@ -1186,16 +1186,83 @@ fn opendal_operator_for_path(path: &str) -> Result<opendal::Operator, Error> {
Ok ( operator )
}
#[cfg(s3)]
/// Applies one URL query parameter to an `S3Config` by round-tripping the
/// config through JSON with serde.
///
/// The config is serialized to a JSON object, `param_name` is inserted (or
/// overwritten) as a key, and the object is deserialized back into an
/// `S3Config`, which is returned.
///
/// Value interpretation:
/// - `None` (parameter present without a value) is accepted only for names
///   starting with `enable_`, `disable_` or `allow_`, and is treated as `true`.
/// - `"true"` / `"false"` are first tried as JSON booleans.
/// - A value that parses as `usize` is first tried as a JSON number.
/// - Anything else is passed as a JSON string.
///
/// Because the target field's type is not known up front, a boolean- or
/// number-looking value that the config rejects is retried as a plain string.
/// This keeps string-typed settings usable when their value happens to be all
/// digits or the literal text `true`/`false`.
///
/// NOTE(review): whether an unknown `param_name` is rejected or silently
/// ignored depends on `S3Config`'s serde attributes — confirm against the
/// OpenDAL version in use.
///
/// # Errors
///
/// Returns an error when `param_name` is `bucket` or `root` (these may not be
/// overridden via the query string), when a value is required but missing,
/// when the config cannot be serialized, or when no interpretation of the
/// value deserializes into a valid `S3Config`.
///
/// # Panics
///
/// Panics if `S3Config` serializes to something other than a JSON object.
fn set_s3_config_param(
    config: opendal::services::S3Config,
    param_name: &str,
    param_value: Option<&str>,
) -> Result<opendal::services::S3Config, Error> {
    use serde_json::{json, Value};

    // These parameters may not be overridden through the query string.
    const BLOCKED_PARAMS: &[&str] = &["bucket", "root"];
    // Name prefixes for which a bare parameter (no value) means `true`.
    const FLAG_PREFIXES: &[&str] = &["enable_", "disable_", "allow_"];

    if BLOCKED_PARAMS.contains(&param_name) {
        return Err(format!("S3 OpenDAL Parameter '{param_name}' cannot be overridden via query string").into());
    }

    // First guess at the JSON type of the value, based purely on its text.
    let json_value = match param_value {
        None => {
            if !FLAG_PREFIXES.iter().any(|prefix| param_name.starts_with(prefix)) {
                return Err(format!("S3 OpenDAL Parameter '{param_name}' requires a value").into());
            }
            json!(true)
        }
        Some("true") => json!(true),
        Some("false") => json!(false),
        Some(value) => match value.parse::<usize>() {
            // Numeric fields such as delete_max_size / batch_max_operations.
            Ok(num) => json!(num),
            Err(_) => json!(value),
        },
    };

    // Convert the current config to JSON so a single key can be replaced.
    let config_json = serde_json::to_value(config)
        .map_err(|e| Error::from(format!("Failed to serialize S3Config to JSON: {e}")))?;
    let config_obj = match config_json {
        Value::Object(map) => map,
        _ => unreachable!("S3Config should always serialize to an object"),
    };

    // Builds a candidate config with `param_name` set to `value`.
    let with_param = |value: Value| {
        let mut candidate = config_obj.clone();
        candidate.insert(param_name.to_string(), value);
        serde_json::from_value::<opendal::services::S3Config>(Value::Object(candidate))
    };

    let typed_err = match with_param(json_value.clone()) {
        Ok(new_config) => return Ok(new_config),
        Err(e) => e,
    };

    // The typed guess was rejected. If the caller supplied text that we turned
    // into a bool/number, the field may simply be string-typed: retry verbatim.
    if let Some(raw) = param_value {
        if !json_value.is_string() {
            if let Ok(new_config) = with_param(json!(raw)) {
                return Ok(new_config);
            }
        }
    }

    // Report the failure of the typed attempt, which reflects how the value
    // was interpreted.
    Err(Error::from(format!(
        "Failed to deserialize S3Config from JSON after updating parameter '{param_name}' to value {json_value}: {typed_err}"
    )))
}
#[ cfg(s3) ]
fn opendal_s3_operator_for_path ( path : & str ) -> Result < opendal ::Operator , Error > {
use crate ::http_client ::aws ::AwsReqwestConnector ;
use aws_config ::{ default_provider ::credentials ::DefaultCredentialsChain , provider_config ::ProviderConfig } ;
use opendal ::{ services ::S3Config , Configurator } ;
// This is a custom AWS credential loader that uses the official AWS Rust
// SDK config crate to load credentials. This ensures maximum compatibility
// with AWS credential configurations. For example, OpenDAL doesn't support
// AWS SSO temporary credentials yet.
struct OpenDALS3CredentialLoader { }
struct OpenDALS3CredentialLoader {
config : S3Config ,
}
#[ async_trait ]
impl reqsign ::AwsCredentialLoad for OpenDALS3CredentialLoader {
@ -1203,6 +1270,23 @@ fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error>
use aws_credential_types ::provider ::ProvideCredentials as _ ;
use tokio ::sync ::OnceCell ;
// If static credentials are provided, use them directly
match ( & self . config . access_key_id , & self . config . secret_access_key ) {
( Some ( access_key_id ) , Some ( secret_access_key ) ) = > {
return Ok ( Some ( reqsign ::AwsCredential {
access_key_id : access_key_id . clone ( ) ,
secret_access_key : secret_access_key . clone ( ) ,
session_token : self . config . session_token . clone ( ) ,
expires_in : None ,
} ) ) ;
}
( None , None ) if self . config . session_token . is_none ( ) = > ( ) ,
_ = > anyhow ::bail ! (
"s3 path must have access_key_id and secret_access_key both set, optionally with session_token set, or all three must be unset"
) ,
} ;
// Use the default credentials chain from the AWS SDK (especially useful for SSO)
static DEFAULT_CREDENTIAL_CHAIN : OnceCell < DefaultCredentialsChain > = OnceCell ::const_new ( ) ;
let chain = DEFAULT_CREDENTIAL_CHAIN
@ -1229,18 +1313,46 @@ fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error>
}
}
const OPEN_DAL_S3_CREDENTIAL_LOADER : OpenDALS3CredentialLoader = OpenDALS3CredentialLoader { } ;
let url = Url ::parse ( path ) . map_err ( | e | format ! ( "Invalid path S3 URL path {path:?}: {e}" ) ) ? ;
let bucket = url . host_str ( ) . ok_or_else ( | | format ! ( "Missing Bucket name in data folder S3 URL {path:?}" ) ) ? ;
let builder = opendal ::services ::S3 ::default ( )
. customized_credential_load ( Box ::new ( OPEN_DAL_S3_CREDENTIAL_LOADER ) )
. enable_virtual_host_style ( )
. bucket ( bucket )
. root ( url . path ( ) )
. default_storage_class ( "INTELLIGENT_TIERING" ) ;
// Create S3Config and set base configuration based on best practices for
// the official AWS S3 service.
let mut config = S3Config ::default ( ) ;
config . bucket = bucket . to_string ( ) ;
config . root = Some ( url . path ( ) . to_string ( ) ) ;
// Default to virtual host style enabled (AWS S3 has deprecated path style)
//
// Note: Some providers may not support virtual host style
config . enable_virtual_host_style = true ;
// Default to AWS S3's Intelligent Tiering storage class for optimal
// cost/performance
//
// Note: Some providers may not support this storage class
config . default_storage_class = Some ( "INTELLIGENT_TIERING" . to_string ( ) ) ;
// Process query parameters
for ( param_name , param_value ) in url . query_pairs ( ) {
let param_name = param_name . as_ref ( ) ;
let param_value = if param_value . is_empty ( ) {
None
} else {
Some ( param_value . as_ref ( ) )
} ;
// Use the generated setter function to handle parameters
config = set_s3_config_param ( config , param_name , param_value ) ? ;
}
let credential_loader = OpenDALS3CredentialLoader {
config : config . clone ( ) ,
} ;
// Convert config to builder and add custom credential loader
let builder = config . into_builder ( ) . customized_credential_load ( Box ::new ( credential_loader ) ) ;
Ok ( opendal ::Operator ::new ( builder ) ? . finish ( ) )
}