Browse Source

feat: add S3-compatible OpenDAL URI parameter parsing

pull/6840/head
g-roliveira 7 months ago
parent
commit
2fdcfa66eb
  1. 287
      src/config.rs

287
src/config.rs

@ -1404,16 +1404,166 @@ fn opendal_operator_for_path(path: &str) -> Result<opendal::Operator, Error> {
Ok(operator) Ok(operator)
} }
#[cfg(s3)]
/// Reports whether `param_name` names an S3 query parameter that carries a credential.
///
/// The recognised names are `access_key_id`, `secret_access_key` and `session_token`;
/// the comparison is exact and case-sensitive.
fn is_s3_secret_param(param_name: &str) -> bool {
    const SECRET_PARAMS: [&str; 3] = ["access_key_id", "secret_access_key", "session_token"];
    SECRET_PARAMS.contains(&param_name)
}
#[cfg(s3)]
/// Interprets a query-string value as a boolean, ignoring ASCII case.
///
/// Returns `Some(true)` for `true`, `1` or `yes`, `Some(false)` for `false`, `0` or `no`,
/// and `None` for anything else (including the empty string).
fn parse_s3_bool(value: &str) -> Option<bool> {
    const TRUTHY: [&str; 3] = ["true", "1", "yes"];
    const FALSY: [&str; 3] = ["false", "0", "no"];

    if TRUTHY.iter().any(|word| value.eq_ignore_ascii_case(word)) {
        Some(true)
    } else if FALSY.iter().any(|word| value.eq_ignore_ascii_case(word)) {
        Some(false)
    } else {
        None
    }
}
#[cfg(s3)]
/// Reports whether a parameter may appear in the query string without a value.
///
/// A parameter qualifies when its name begins with `enable_`, `disable_` or `allow_`;
/// callers treat such a bare flag as `true`.
fn is_s3_implicit_bool_param(param_name: &str) -> bool {
    const IMPLICIT_BOOL_PREFIXES: [&str; 3] = ["enable_", "disable_", "allow_"];
    IMPLICIT_BOOL_PREFIXES.iter().any(|&prefix| param_name.starts_with(prefix))
}
#[cfg(s3)]
/// Set S3Config fields from query parameters using serde.
fn set_s3_config_param(
config: opendal::services::S3Config,
param_name: &str,
param_value: Option<&str>,
) -> Result<opendal::services::S3Config, Error> {
use serde_json::{json, Value};
// Special handling for blocked parameters
const BLOCKED_PARAMS: &[&str] = &["bucket", "root"];
if BLOCKED_PARAMS.contains(&param_name) {
return Err(format!("S3 OpenDAL Parameter '{param_name}' cannot be overridden via query string").into());
}
// Parse the parameter value
let json_value = match param_value {
None => {
// For boolean fields that default to true when present without value
// This includes fields starting with enable_, disable_, or allow_
if is_s3_implicit_bool_param(param_name) {
json!(true)
} else {
return Err(format!("S3 OpenDAL Parameter '{param_name}' requires a value").into());
}
}
Some(value) => {
// Try to parse as boolean first
if let Some(bool_value) = parse_s3_bool(value) {
json!(bool_value)
} else if let Ok(num) = value.parse::<usize>() {
// Try to parse as number (for fields like delete_max_size, batch_max_operations)
json!(num)
} else {
// Default to string
json!(value)
}
}
};
// Convert current config to JSON
let config_json =
serde_json::to_value(config).map_err(|e| Error::from(format!("Failed to serialize S3Config to JSON: {e}")))?;
// Merge with the new field and deserialize
if let Value::Object(mut config_obj) = config_json {
// Insert the new field
config_obj.insert(param_name.to_string(), json_value.clone());
// Try to deserialize with the new field
let display_json_value = if is_s3_secret_param(param_name) {
json!("***")
} else {
json_value.clone()
};
let new_config = serde_json::from_value::<opendal::services::S3Config>(Value::Object(config_obj))
.map_err(|e| Error::from(format!("Failed to deserialize S3Config from JSON after updating parameter '{param_name}' to value {display_json_value}: {e}")))?;
Ok(new_config)
} else {
unreachable!("S3Config should always serialize to an object");
}
}
#[cfg(s3)]
fn parse_s3_config_for_path(path: &str) -> Result<opendal::services::S3Config, Error> {
use opendal::services::S3Config;
let url = Url::parse(path).map_err(|e| format!("Invalid path S3 URL path {path:?}: {e}"))?;
let bucket = url.host_str().ok_or_else(|| format!("Missing Bucket name in data folder S3 URL {path:?}"))?;
// Create S3Config and set base configuration based on best practices for
// the official AWS S3 service.
let mut config = S3Config::default();
config.bucket = bucket.to_string();
config.root = Some(url.path().to_string());
// Default to virtual host style enabled (AWS S3 has deprecated path style)
//
// Note: Some providers may not support virtual host style
config.enable_virtual_host_style = true;
// Default to AWS S3's Intelligent Tiering storage class for optimal
// cost/performance
//
// Note: Some providers may not support this storage class
config.default_storage_class = Some("INTELLIGENT_TIERING".to_string());
// Process query parameters
for (param_name, param_value) in url.query_pairs() {
let param_name = param_name.as_ref();
let mut param_value = if param_value.is_empty() {
None
} else {
Some(param_value.as_ref())
};
if param_name == "disable_virtual_host_style" {
let value = param_value.unwrap_or("true");
let bool_value = parse_s3_bool(value)
.ok_or_else(|| format!("S3 OpenDAL Parameter 'disable_virtual_host_style' has invalid boolean value {value:?}"))?;
let enabled_value = if bool_value { "false" } else { "true" };
config = set_s3_config_param(config, "enable_virtual_host_style", Some(enabled_value))?;
continue;
}
if param_name == "default_storage_class" && param_value.is_none() {
param_value = Some("");
}
// Use the generated setter function to handle parameters
config = set_s3_config_param(config, param_name, param_value)?;
}
if config.access_key_id.is_some() || config.secret_access_key.is_some() || config.session_token.is_some() {
warn!(
"S3 static credentials provided through path query parameters. This works, but using environment credentials or IAM is recommended."
);
}
if config.default_storage_class.as_deref() == Some("") {
config.default_storage_class = None;
}
Ok(config)
}
#[cfg(s3)] #[cfg(s3)]
fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error> { fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error> {
use crate::http_client::aws::AwsReqwestConnector; use crate::http_client::aws::AwsReqwestConnector;
use aws_config::{default_provider::credentials::DefaultCredentialsChain, provider_config::ProviderConfig}; use aws_config::{default_provider::credentials::DefaultCredentialsChain, provider_config::ProviderConfig};
use opendal::{services::S3Config, Configurator};
// This is a custom AWS credential loader that uses the official AWS Rust // This is a custom AWS credential loader that uses the official AWS Rust
// SDK config crate to load credentials. This ensures maximum compatibility // SDK config crate to load credentials. This ensures maximum compatibility
// with AWS credential configurations. For example, OpenDAL doesn't support // with AWS credential configurations. For example, OpenDAL doesn't support
// AWS SSO temporary credentials yet. // AWS SSO temporary credentials yet.
struct OpenDALS3CredentialLoader {} struct OpenDALS3CredentialLoader {
config: S3Config,
}
#[async_trait] #[async_trait]
impl reqsign::AwsCredentialLoad for OpenDALS3CredentialLoader { impl reqsign::AwsCredentialLoad for OpenDALS3CredentialLoader {
@ -1421,6 +1571,23 @@ fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error>
use aws_credential_types::provider::ProvideCredentials as _; use aws_credential_types::provider::ProvideCredentials as _;
use tokio::sync::OnceCell; use tokio::sync::OnceCell;
// If static credentials are provided, use them directly
match (&self.config.access_key_id, &self.config.secret_access_key) {
(Some(access_key_id), Some(secret_access_key)) => {
return Ok(Some(reqsign::AwsCredential {
access_key_id: access_key_id.clone(),
secret_access_key: secret_access_key.clone(),
session_token: self.config.session_token.clone(),
expires_in: None,
}));
}
(None, None) if self.config.session_token.is_none() => (),
_ => anyhow::bail!(
"s3 path must have access_key_id and secret_access_key both set, optionally with session_token set, or all three must be unset"
),
};
// Use the default credentials chain from the AWS SDK (especially useful for SSO)
static DEFAULT_CREDENTIAL_CHAIN: OnceCell<DefaultCredentialsChain> = OnceCell::const_new(); static DEFAULT_CREDENTIAL_CHAIN: OnceCell<DefaultCredentialsChain> = OnceCell::const_new();
let chain = DEFAULT_CREDENTIAL_CHAIN let chain = DEFAULT_CREDENTIAL_CHAIN
@ -1447,22 +1614,120 @@ fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error>
} }
} }
const OPEN_DAL_S3_CREDENTIAL_LOADER: OpenDALS3CredentialLoader = OpenDALS3CredentialLoader {}; let config = parse_s3_config_for_path(path)?;
let url = Url::parse(path).map_err(|e| format!("Invalid path S3 URL path {path:?}: {e}"))?;
let bucket = url.host_str().ok_or_else(|| format!("Missing Bucket name in data folder S3 URL {path:?}"))?; let credential_loader = OpenDALS3CredentialLoader {
config: config.clone(),
};
let builder = opendal::services::S3::default() // Convert config to builder and add custom credential loader
.customized_credential_load(Box::new(OPEN_DAL_S3_CREDENTIAL_LOADER)) let builder = config.into_builder().customized_credential_load(Box::new(credential_loader));
.enable_virtual_host_style()
.bucket(bucket)
.root(url.path())
.default_storage_class("INTELLIGENT_TIERING");
Ok(opendal::Operator::new(builder)?.finish()) Ok(opendal::Operator::new(builder)?.finish())
} }
#[cfg(all(test, s3))]
/// Tests for S3 data-folder URL parsing, plus an opt-in integration test that talks to
/// an S3-compatible server.
mod s3_tests {
    use super::{opendal_s3_operator_for_path, parse_s3_config_for_path};

    /// Without query parameters the bucket and root come from the URL and the defaults
    /// (virtual host style, INTELLIGENT_TIERING) apply.
    #[test]
    fn test_parse_s3_config_defaults() {
        let config = parse_s3_config_for_path("s3://vaultwarden-data/path/to/root").expect("config should parse");

        assert_eq!(config.bucket, "vaultwarden-data");
        assert_eq!(config.root.as_deref(), Some("/path/to/root"));
        assert!(config.enable_virtual_host_style);
        assert_eq!(config.default_storage_class.as_deref(), Some("INTELLIGENT_TIERING"));
    }

    /// Query parameters override the defaults; the endpoint value is percent-decoded.
    #[test]
    fn test_parse_s3_config_custom_endpoint_and_path_style() {
        let config = parse_s3_config_for_path(
            "s3://vw/path?endpoint=http%3A%2F%2F127.0.0.1%3A9000&enable_virtual_host_style=false&default_storage_class=STANDARD&region=us-east-1",
        )
        .expect("config should parse");

        assert_eq!(config.endpoint.as_deref(), Some("http://127.0.0.1:9000"));
        assert!(!config.enable_virtual_host_style);
        assert_eq!(config.default_storage_class.as_deref(), Some("STANDARD"));
        assert_eq!(config.region.as_deref(), Some("us-east-1"));
    }

    /// `disable_virtual_host_style=true` is the negated alias of
    /// `enable_virtual_host_style`.
    #[test]
    fn test_parse_s3_config_disable_virtual_host_style_alias() {
        let config =
            parse_s3_config_for_path("s3://vw/path?disable_virtual_host_style=true").expect("config should parse");

        assert!(!config.enable_virtual_host_style);
    }

    /// An empty `default_storage_class=` removes the default storage class.
    #[test]
    fn test_parse_s3_config_storage_class_can_be_omitted() {
        let config = parse_s3_config_for_path("s3://vw/path?default_storage_class=").expect("config should parse");

        assert_eq!(config.default_storage_class, None);
    }

    /// A flag given without a value means `true`.
    #[test]
    fn test_parse_s3_config_implicit_boolean_flag() {
        let config = parse_s3_config_for_path("s3://vw/path?enable_virtual_host_style")
            .expect("config should parse");
        assert!(config.enable_virtual_host_style);

        // The field defaults to `true`, so the assertion above cannot tell an applied
        // flag from an ignored one. Switch it off first, then back on with a bare flag.
        let config =
            parse_s3_config_for_path("s3://vw/path?enable_virtual_host_style=false&enable_virtual_host_style")
                .expect("config should parse");
        assert!(config.enable_virtual_host_style);

        // The bare alias must flip the default.
        let config =
            parse_s3_config_for_path("s3://vw/path?disable_virtual_host_style").expect("config should parse");
        assert!(!config.enable_virtual_host_style);
    }

    /// `0` is accepted as a spelling of `false`.
    #[test]
    fn test_parse_s3_config_boolean_variants() {
        let config = parse_s3_config_for_path("s3://vw/path?enable_virtual_host_style=0")
            .expect("config should parse");

        assert!(!config.enable_virtual_host_style);
    }

    /// The root must be the decoded prefix. `Url::path` yields the percent-encoded form,
    /// so this only passes when the parser decodes the path before storing it as `root`.
    #[test]
    fn test_parse_s3_config_percent_encoded_prefix() {
        let config = parse_s3_config_for_path("s3://vw/path%20with%20spaces").expect("config should parse");

        assert_eq!(config.root.as_deref(), Some("/path with spaces"));
    }

    /// Parameters that are not `S3Config` fields are rejected.
    #[test]
    fn test_parse_s3_config_rejects_unknown_parameter() {
        let error = parse_s3_config_for_path("s3://vw/path?unknown_param=value")
            .expect_err("unknown params should fail");

        let error_string = error.to_string();
        assert!(error_string.contains("unknown field"));
    }

    /// `bucket` and `root` may not be overridden through the query string.
    #[test]
    fn test_parse_s3_config_rejects_blocked_parameters() {
        for blocked in ["bucket", "root"] {
            let error = parse_s3_config_for_path(&format!("s3://vw/path?{blocked}=other"))
                .expect_err("blocked params should fail");

            assert!(error.to_string().contains("cannot be overridden"), "{blocked}");
        }
    }

    /// Round-trips an object through an S3-compatible server (MinIO by default).
    ///
    /// Ignored by default because it needs a running server. Endpoint, bucket, root and
    /// credentials are read from the `VW_S3_MINIO_*` environment variables, with the
    /// fallbacks given below.
    #[test]
    #[ignore]
    fn test_s3_minio_integration_put_get_delete() {
        let endpoint = std::env::var("VW_S3_MINIO_ENDPOINT").unwrap_or_else(|_| "http://127.0.0.1:9000".to_string());
        let bucket = std::env::var("VW_S3_MINIO_BUCKET").unwrap_or_else(|_| "vaultwarden-test".to_string());
        let mut root = std::env::var("VW_S3_MINIO_ROOT").unwrap_or_else(|_| "/vaultwarden-s3-test".to_string());
        if !root.starts_with('/') {
            root = format!("/{root}");
        }
        let access_key = std::env::var("VW_S3_MINIO_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string());
        let secret_key = std::env::var("VW_S3_MINIO_SECRET_KEY").unwrap_or_else(|_| "minioadmin".to_string());

        // Build the query string with proper form encoding so the endpoint URL and the
        // credentials survive being embedded in the S3 URL.
        let mut query = url::form_urlencoded::Serializer::new(String::new());
        query.append_pair("endpoint", &endpoint);
        query.append_pair("enable_virtual_host_style", "false");
        query.append_pair("default_storage_class", "STANDARD");
        query.append_pair("access_key_id", &access_key);
        query.append_pair("secret_access_key", &secret_key);
        let s3_path = format!("s3://{bucket}{root}?{}", query.finish());

        let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().expect("tokio runtime should build");
        rt.block_on(async move {
            let operator = opendal_s3_operator_for_path(&s3_path).expect("operator should be created");
            // A random key keeps concurrent or repeated runs from colliding.
            let key = format!("integration/{}.txt", uuid::Uuid::new_v4());
            let payload = b"vaultwarden-opendal-s3-compatible";

            operator.write(&key, payload.as_slice()).await.expect("object upload should succeed");
            let buffer = operator.read(&key).await.expect("object download should succeed");
            assert_eq!(buffer.to_vec(), payload.as_slice());
            operator.delete(&key).await.expect("object delete should succeed");
        });
    }
}
pub enum PathType { pub enum PathType {
Data, Data,
IconCache, IconCache,

Loading…
Cancel
Save