From 64268f4b18eba95e06e08abf0f8c97f8a49c4f12 Mon Sep 17 00:00:00 2001 From: Chase Douglas Date: Sun, 3 Aug 2025 13:12:26 -0700 Subject: [PATCH] OpenDAL S3 parameter support This change allows s3:// data paths to include additional parameters as querystrings. The default parameters remain optimized for the official AWS S3 service, but any OpenDAL S3 parameters can be set. For example, to override the default `true` value for `enable_virtual_host_style` you can set the data config to: s3://my-bucket?enable_virtual_host_style=false Boolean parameters can be set to `true` by simply adding the parameter: s3://my-bucket?enable_versioning Fixes #6112 --- src/config.rs | 130 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 121 insertions(+), 9 deletions(-) diff --git a/src/config.rs b/src/config.rs index 5a3d060f..9283ab42 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1186,16 +1186,83 @@ fn opendal_operator_for_path(path: &str) -> Result<opendal::Operator, Error> { Ok(operator) } +#[cfg(s3)] +/// Set S3Config fields from query parameters using serde +fn set_s3_config_param( + config: opendal::services::S3Config, + param_name: &str, + param_value: Option<&str>, +) -> Result<opendal::services::S3Config, Error> { + use serde_json::{json, Value}; + + // Special handling for blocked parameters + const BLOCKED_PARAMS: &[&str] = &["bucket", "root"]; + if BLOCKED_PARAMS.contains(&param_name) { + return Err(format!("S3 OpenDAL Parameter '{param_name}' cannot be overridden via query string").into()); + } + + // Parse the parameter value + let json_value = match param_value { + None => { + // For boolean fields that default to true when present without value + // This includes fields starting with enable_, disable_, or allow_ + if param_name.starts_with("enable_") + || param_name.starts_with("disable_") + || param_name.starts_with("allow_") + { + json!(true) + } else { + return Err(format!("S3 OpenDAL Parameter '{param_name}' requires a value").into()); + } + } + Some(value) => { + // Try to parse as boolean first + if value
== "true" { + json!(true) + } else if value == "false" { + json!(false) + } else if let Ok(num) = value.parse::<usize>() { + // Try to parse as number (for fields like delete_max_size, batch_max_operations) + json!(num) + } else { + // Default to string + json!(value) + } + } + }; + + // Convert current config to JSON + let config_json = + serde_json::to_value(config).map_err(|e| Error::from(format!("Failed to serialize S3Config to JSON: {e}")))?; + + // Merge with the new field and deserialize + if let Value::Object(mut config_obj) = config_json { + // Insert the new field + config_obj.insert(param_name.to_string(), json_value.clone()); + + // Try to deserialize with the new field + let new_config = serde_json::from_value::<opendal::services::S3Config>(Value::Object(config_obj)) + .map_err(|e| Error::from(format!("Failed to deserialize S3Config from JSON after updating parameter '{param_name}' to value {json_value}: {e}")))?; + + Ok(new_config) + } else { + unreachable!("S3Config should always serialize to an object"); + } +} + #[cfg(s3)] fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error> { use crate::http_client::aws::AwsReqwestConnector; use aws_config::{default_provider::credentials::DefaultCredentialsChain, provider_config::ProviderConfig}; + use opendal::{services::S3Config, Configurator}; // This is a custom AWS credential loader that uses the official AWS Rust // SDK config crate to load credentials. This ensures maximum compatibility // with AWS credential configurations. For example, OpenDAL doesn't support // AWS SSO temporary credentials yet.
- struct OpenDALS3CredentialLoader {} + struct OpenDALS3CredentialLoader { + config: S3Config, + } #[async_trait] impl reqsign::AwsCredentialLoad for OpenDALS3CredentialLoader { @@ -1203,6 +1270,23 @@ fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error> use aws_credential_types::provider::ProvideCredentials as _; use tokio::sync::OnceCell; + // If static credentials are provided, use them directly + match (&self.config.access_key_id, &self.config.secret_access_key) { + (Some(access_key_id), Some(secret_access_key)) => { + return Ok(Some(reqsign::AwsCredential { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + session_token: self.config.session_token.clone(), + expires_in: None, + })); + } + (None, None) if self.config.session_token.is_none() => (), + _ => anyhow::bail!( + "s3 path must have access_key_id and secret_access_key both set, optionally with session_token set, or all three must be unset" + ), + }; + + // Use the default credentials chain from the AWS SDK (especially useful for SSO) static DEFAULT_CREDENTIAL_CHAIN: OnceCell<DefaultCredentialsChain> = OnceCell::const_new(); let chain = DEFAULT_CREDENTIAL_CHAIN @@ -1229,18 +1313,46 @@ fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error> } } - const OPEN_DAL_S3_CREDENTIAL_LOADER: OpenDALS3CredentialLoader = OpenDALS3CredentialLoader {}; - let url = Url::parse(path).map_err(|e| format!("Invalid path S3 URL path {path:?}: {e}"))?; let bucket = url.host_str().ok_or_else(|| format!("Missing Bucket name in data folder S3 URL {path:?}"))?; - let builder = opendal::services::S3::default() - .customized_credential_load(Box::new(OPEN_DAL_S3_CREDENTIAL_LOADER)) - .enable_virtual_host_style() - .bucket(bucket) - .root(url.path()) - .default_storage_class("INTELLIGENT_TIERING"); + // Create S3Config and set base configuration based on best practices for + // the official AWS S3 service.
+ let mut config = S3Config::default(); + config.bucket = bucket.to_string(); + config.root = Some(url.path().to_string()); + + // Default to virtual host style enabled (AWS S3 has deprecated path style) + // + // Note: Some providers may not support virtual host style + config.enable_virtual_host_style = true; + + // Default to AWS S3's Intelligent Tiering storage class for optimal + // cost/performance + // + // Note: Some providers may not support this storage class + config.default_storage_class = Some("INTELLIGENT_TIERING".to_string()); + + // Process query parameters + for (param_name, param_value) in url.query_pairs() { + let param_name = param_name.as_ref(); + let param_value = if param_value.is_empty() { + None + } else { + Some(param_value.as_ref()) + }; + + // Apply this parameter to the config via set_s3_config_param + config = set_s3_config_param(config, param_name, param_value)?; + } + + let credential_loader = OpenDALS3CredentialLoader { + config: config.clone(), + }; + + // Convert config to builder and add custom credential loader + let builder = config.into_builder().customized_credential_load(Box::new(credential_loader)); Ok(opendal::Operator::new(builder)?.finish()) }