// Patch summary (review preamble; text ahead of the `diff --git` header is not part of any hunk).
//
// This patch touches src/config.rs and does two things:
//
// 1. Adds `set_s3_config_param` (compiled only under `#[cfg(s3)]`), which applies one URL query
//    parameter to an S3Config:
//      - `bucket` and `root` are rejected with an error; they cannot be overridden via query string.
//      - A parameter with no value becomes JSON `true` only when its name starts with `enable_`,
//        `disable_` or `allow_`; any other valueless parameter is an error.
//      - A value of "true"/"false" becomes a JSON boolean, a value that parses as a number becomes
//        a JSON number, and everything else becomes a JSON string.
//      - The config is serialized to a JSON object, the field is inserted (replacing any existing
//        entry with the same name), and the object is deserialized back into a config.
//
// 2. Reworks `opendal_s3_operator_for_path` to build an S3Config instead of calling builder setters:
//      - bucket comes from the URL host, root from the URL path;
//      - `enable_virtual_host_style = true` and `default_storage_class = "INTELLIGENT_TIERING"`
//        are set as defaults before query parameters are applied, so a query parameter can
//        overwrite them;
//      - every query pair is passed through `set_s3_config_param`; an empty value is treated the
//        same as a missing value (`None`);
//      - the credential loader now holds a clone of the config. It returns the static credentials
//        when both access_key_id and secret_access_key are set (session_token optional), falls
//        through to the AWS default credentials chain when all three are unset, and bails on any
//        other combination.
//
// NOTE(review): this copy of the patch looks extraction-damaged. `¶m_name` is presumably
//   `&param_name`, and generic arguments appear stripped (`parse::()`, `from_value::(`,
//   `-> Result {`, `OnceCell =`). Confirm against the original patch before applying.
// NOTE(review): the JSON type is inferred from the shape of the value, not from the target field.
//   A numeric-looking or "true"/"false" value destined for a string-typed field (for example an
//   all-digit key or token) would presumably fail deserialization - TODO confirm against the
//   S3Config field types and consider falling back to a string on failure.
// NOTE(review): the deserialization error message interpolates the parameter value
//   (`{json_value}`). If the failing parameter is secret_access_key or session_token, the secret
//   ends up in the error text - consider redacting the value for those names.
// NOTE(review): whether an unknown parameter name is rejected or silently ignored depends on how
//   S3Config deserializes unknown keys, which is not visible here - verify.
diff --git a/src/config.rs b/src/config.rs index 545d7dce..f8088a0f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1264,16 +1264,83 @@ fn opendal_operator_for_path(path: &str) -> Result { Ok(operator) } +#[cfg(s3)] +/// Set S3Config fields from query parameters using serde +fn set_s3_config_param( + config: opendal::services::S3Config, + param_name: &str, + param_value: Option<&str>, +) -> Result { + use serde_json::{json, Value}; + + // Special handling for blocked parameters + const BLOCKED_PARAMS: &[&str] = &["bucket", "root"]; + if BLOCKED_PARAMS.contains(¶m_name) { + return Err(format!("S3 OpenDAL Parameter '{param_name}' cannot be overridden via query string").into()); + } + + // Parse the parameter value + let json_value = match param_value { + None => { + // For boolean fields that default to true when present without value + // This includes fields starting with enable_, disable_, or allow_ + if param_name.starts_with("enable_") + || param_name.starts_with("disable_") + || param_name.starts_with("allow_") + { + json!(true) + } else { + return Err(format!("S3 OpenDAL Parameter '{param_name}' requires a value").into()); + } + } + Some(value) => { + // Try to parse as boolean first + if value == "true" { + json!(true) + } else if value == "false" { + json!(false) + } else if let Ok(num) = value.parse::() { + // Try to parse as number (for fields like delete_max_size, batch_max_operations) + json!(num) + } else { + // Default to string + json!(value) + } + } + }; + + // Convert current config to JSON + let config_json = + serde_json::to_value(config).map_err(|e| Error::from(format!("Failed to serialize S3Config to JSON: {e}")))?; + + // Merge with the new field and deserialize + if let Value::Object(mut config_obj) = config_json { + // Insert the new field + config_obj.insert(param_name.to_string(), json_value.clone()); + + // Try to deserialize with the new field + let new_config = serde_json::from_value::(Value::Object(config_obj)) + .map_err(|e| 
Error::from(format!("Failed to deserialize S3Config from JSON after updating parameter '{param_name}' to value {json_value}: {e}")))?; + + Ok(new_config) + } else { + unreachable!("S3Config should always serialize to an object"); + } +} + #[cfg(s3)] fn opendal_s3_operator_for_path(path: &str) -> Result { use crate::http_client::aws::AwsReqwestConnector; use aws_config::{default_provider::credentials::DefaultCredentialsChain, provider_config::ProviderConfig}; + use opendal::{services::S3Config, Configurator}; // This is a custom AWS credential loader that uses the official AWS Rust // SDK config crate to load credentials. This ensures maximum compatibility // with AWS credential configurations. For example, OpenDAL doesn't support // AWS SSO temporary credentials yet. - struct OpenDALS3CredentialLoader {} + struct OpenDALS3CredentialLoader { + config: S3Config, + } #[async_trait] impl reqsign::AwsCredentialLoad for OpenDALS3CredentialLoader { @@ -1281,6 +1348,23 @@ fn opendal_s3_operator_for_path(path: &str) -> Result use aws_credential_types::provider::ProvideCredentials as _; use tokio::sync::OnceCell; + // If static credentials are provided, use them directly + match (&self.config.access_key_id, &self.config.secret_access_key) { + (Some(access_key_id), Some(secret_access_key)) => { + return Ok(Some(reqsign::AwsCredential { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + session_token: self.config.session_token.clone(), + expires_in: None, + })); + } + (None, None) if self.config.session_token.is_none() => (), + _ => anyhow::bail!( + "s3 path must have access_key_id and secret_access_key both set, optionally with session_token set, or all three must be unset" + ), + }; + + // Use the default credentials chain from the AWS SDK (especially useful for SSO) static DEFAULT_CREDENTIAL_CHAIN: OnceCell = OnceCell::const_new(); let chain = DEFAULT_CREDENTIAL_CHAIN @@ -1307,18 +1391,46 @@ fn opendal_s3_operator_for_path(path: &str) 
-> Result } } - const OPEN_DAL_S3_CREDENTIAL_LOADER: OpenDALS3CredentialLoader = OpenDALS3CredentialLoader {}; - let url = Url::parse(path).map_err(|e| format!("Invalid path S3 URL path {path:?}: {e}"))?; let bucket = url.host_str().ok_or_else(|| format!("Missing Bucket name in data folder S3 URL {path:?}"))?; - let builder = opendal::services::S3::default() - .customized_credential_load(Box::new(OPEN_DAL_S3_CREDENTIAL_LOADER)) - .enable_virtual_host_style() - .bucket(bucket) - .root(url.path()) - .default_storage_class("INTELLIGENT_TIERING"); + // Create S3Config and set base configuration based on best practices for + // the official AWS S3 service. + let mut config = S3Config::default(); + config.bucket = bucket.to_string(); + config.root = Some(url.path().to_string()); + + // Default to virtual host style enabled (AWS S3 has deprecated path style) + // + // Note: Some providers may not support virtual host style + config.enable_virtual_host_style = true; + + // Default to AWS S3's Intelligent Tiering storage class for optimal + // cost/performance + // + // Note: Some providers may not support this storage class + config.default_storage_class = Some("INTELLIGENT_TIERING".to_string()); + + // Process query parameters + for (param_name, param_value) in url.query_pairs() { + let param_name = param_name.as_ref(); + let param_value = if param_value.is_empty() { + None + } else { + Some(param_value.as_ref()) + }; + + // Use the generated setter function to handle parameters + config = set_s3_config_param(config, param_name, param_value)?; + } + + let credential_loader = OpenDALS3CredentialLoader { + config: config.clone(), + }; + + // Convert config to builder and add custom credential loader + let builder = config.into_builder().customized_credential_load(Box::new(credential_loader)); Ok(opendal::Operator::new(builder)?.finish()) }