Browse Source

Merge 64268f4b18 into 8e7eeab293

pull/6127/merge
Chase Douglas 3 days ago
committed by GitHub
parent
commit
d41b1c9136
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 130
      src/config.rs

130
src/config.rs

@ -1264,16 +1264,83 @@ fn opendal_operator_for_path(path: &str) -> Result<opendal::Operator, Error> {
Ok(operator)
}
#[cfg(s3)]
/// Set a single `S3Config` field from a URL query parameter using serde.
///
/// The current `config` is serialized to a JSON object, the entry named
/// `param_name` is overwritten, and the object is deserialized back into an
/// `S3Config`.
///
/// # Arguments
///
/// * `config` - The configuration to update (consumed).
/// * `param_name` - Query parameter name, used verbatim as the JSON key.
/// * `param_value` - Query parameter value, or `None` when the parameter was
///   given without a value.
///
/// # Value interpretation
///
/// * No value: treated as `true`, but only for names starting with `enable_`,
///   `disable_` or `allow_`; any other name is an error.
/// * `"true"` / `"false"`: boolean.
/// * Text that parses as `usize`: number.
/// * Anything else: string.
///
/// The type is inferred from the text alone, so the inferred boolean/number
/// can be rejected by the target field (for example an all-digit value meant
/// for a string field). In that case the raw text is retried as a plain
/// string before giving up.
///
/// # Errors
///
/// Returns an error when `param_name` is `bucket` or `root`, when a value is
/// required but missing, or when the updated JSON cannot be converted back
/// into an `S3Config`.
fn set_s3_config_param(
    config: opendal::services::S3Config,
    param_name: &str,
    param_value: Option<&str>,
) -> Result<opendal::services::S3Config, Error> {
    use serde_json::{json, Value};

    // Special handling for blocked parameters
    const BLOCKED_PARAMS: &[&str] = &["bucket", "root"];

    if BLOCKED_PARAMS.contains(&param_name) {
        return Err(format!("S3 OpenDAL Parameter '{param_name}' cannot be overridden via query string").into());
    }

    // Parse the parameter value
    let json_value = match param_value {
        None => {
            // For boolean fields that default to true when present without value
            // This includes fields starting with enable_, disable_, or allow_
            if param_name.starts_with("enable_")
                || param_name.starts_with("disable_")
                || param_name.starts_with("allow_")
            {
                json!(true)
            } else {
                return Err(format!("S3 OpenDAL Parameter '{param_name}' requires a value").into());
            }
        }
        Some(value) => {
            // Try to parse as boolean first
            if value == "true" {
                json!(true)
            } else if value == "false" {
                json!(false)
            } else if let Ok(num) = value.parse::<usize>() {
                // Try to parse as number (for fields like delete_max_size, batch_max_operations)
                json!(num)
            } else {
                // Default to string
                json!(value)
            }
        }
    };

    // Convert current config to JSON
    let config_json =
        serde_json::to_value(config).map_err(|e| Error::from(format!("Failed to serialize S3Config to JSON: {e}")))?;

    // Merge with the new field and deserialize
    if let Value::Object(mut config_obj) = config_json {
        // Prepare a string-typed alternative only when the value was inferred
        // as a boolean/number from explicit text; a valueless flag or an
        // already-string value has nothing else to fall back to.
        let string_fallback = match param_value {
            Some(raw) if !json_value.is_string() => {
                let mut alt_obj = config_obj.clone();
                alt_obj.insert(param_name.to_string(), json!(raw));
                Some(alt_obj)
            }
            _ => None,
        };

        // Insert the new field
        config_obj.insert(param_name.to_string(), json_value.clone());

        // Try to deserialize with the new field
        match serde_json::from_value::<opendal::services::S3Config>(Value::Object(config_obj)) {
            Ok(new_config) => Ok(new_config),
            Err(e) => {
                // The inferred type was rejected; retry with the raw text as a string.
                if let Some(alt_obj) = string_fallback {
                    if let Ok(new_config) =
                        serde_json::from_value::<opendal::services::S3Config>(Value::Object(alt_obj))
                    {
                        return Ok(new_config);
                    }
                }
                // Report the error from the first (typed) attempt.
                Err(Error::from(format!("Failed to deserialize S3Config from JSON after updating parameter '{param_name}' to value {json_value}: {e}")))
            }
        }
    } else {
        unreachable!("S3Config should always serialize to an object");
    }
}
#[cfg(s3)]
fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error> {
use crate::http_client::aws::AwsReqwestConnector;
use aws_config::{default_provider::credentials::DefaultCredentialsChain, provider_config::ProviderConfig};
use opendal::{services::S3Config, Configurator};
// This is a custom AWS credential loader that uses the official AWS Rust
// SDK config crate to load credentials. This ensures maximum compatibility
// with AWS credential configurations. For example, OpenDAL doesn't support
// AWS SSO temporary credentials yet.
struct OpenDALS3CredentialLoader {}
struct OpenDALS3CredentialLoader {
config: S3Config,
}
#[async_trait]
impl reqsign::AwsCredentialLoad for OpenDALS3CredentialLoader {
@ -1281,6 +1348,23 @@ fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error>
use aws_credential_types::provider::ProvideCredentials as _;
use tokio::sync::OnceCell;
// If static credentials are provided, use them directly
match (&self.config.access_key_id, &self.config.secret_access_key) {
(Some(access_key_id), Some(secret_access_key)) => {
return Ok(Some(reqsign::AwsCredential {
access_key_id: access_key_id.clone(),
secret_access_key: secret_access_key.clone(),
session_token: self.config.session_token.clone(),
expires_in: None,
}));
}
(None, None) if self.config.session_token.is_none() => (),
_ => anyhow::bail!(
"s3 path must have access_key_id and secret_access_key both set, optionally with session_token set, or all three must be unset"
),
};
// Use the default credentials chain from the AWS SDK (especially useful for SSO)
static DEFAULT_CREDENTIAL_CHAIN: OnceCell<DefaultCredentialsChain> = OnceCell::const_new();
let chain = DEFAULT_CREDENTIAL_CHAIN
@ -1307,18 +1391,46 @@ fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error>
}
}
const OPEN_DAL_S3_CREDENTIAL_LOADER: OpenDALS3CredentialLoader = OpenDALS3CredentialLoader {};
let url = Url::parse(path).map_err(|e| format!("Invalid path S3 URL path {path:?}: {e}"))?;
let bucket = url.host_str().ok_or_else(|| format!("Missing Bucket name in data folder S3 URL {path:?}"))?;
let builder = opendal::services::S3::default()
.customized_credential_load(Box::new(OPEN_DAL_S3_CREDENTIAL_LOADER))
.enable_virtual_host_style()
.bucket(bucket)
.root(url.path())
.default_storage_class("INTELLIGENT_TIERING");
// Create S3Config and set base configuration based on best practices for
// the official AWS S3 service.
let mut config = S3Config::default();
config.bucket = bucket.to_string();
config.root = Some(url.path().to_string());
// Default to virtual host style enabled (AWS S3 has deprecated path style)
//
// Note: Some providers may not support virtual host style
config.enable_virtual_host_style = true;
// Default to AWS S3's Intelligent Tiering storage class for optimal
// cost/performance
//
// Note: Some providers may not support this storage class
config.default_storage_class = Some("INTELLIGENT_TIERING".to_string());
// Process query parameters
for (param_name, param_value) in url.query_pairs() {
let param_name = param_name.as_ref();
let param_value = if param_value.is_empty() {
None
} else {
Some(param_value.as_ref())
};
// Use the generated setter function to handle parameters
config = set_s3_config_param(config, param_name, param_value)?;
}
let credential_loader = OpenDALS3CredentialLoader {
config: config.clone(),
};
// Convert config to builder and add custom credential loader
let builder = config.into_builder().customized_credential_load(Box::new(credential_loader));
Ok(opendal::Operator::new(builder)?.finish())
}

Loading…
Cancel
Save