Browse Source

storage: add OpenDAL S3 URI options

OpenDAL S3 storage accepts bucket and root path data today, but
serverless deployments also need URI query parameters to describe provider
behavior in one DATA_FOLDER value.

Update OpenDAL to 0.56.0 and build S3 operators with
S3Config::from_uri(). Keep Vaultwarden's AWS SDK credential chain by
installing a reqsign provider when the URI does not explicitly request
OpenDAL-native credential handling.

Move path handling and operator construction into storage.rs so S3-specific
parsing, credential setup, and URI path manipulation stay out of
configuration handling. Local filesystem behavior is unchanged, and S3
child paths are derived before query strings.
pull/6127/head
Chase Douglas 3 weeks ago
parent
commit
67429c55c3
  1. 2
      src/api/core/sends.rs
  2. 8
      src/auth.rs
  3. 148
      src/config.rs
  4. 2
      src/db/models/attachment.rs
  5. 1
      src/main.rs
  6. 297
      src/storage.rs

2
src/api/core/sends.rs

@ -568,7 +568,7 @@ async fn post_access_file(
async fn download_url(host: &Host, send_id: &SendId, file_id: &SendFileId) -> Result<String, crate::Error> {
let operator = CONFIG.opendal_operator_for_path_type(&PathType::Sends)?;
if operator.info().scheme() == opendal::services::FS_SCHEME {
if crate::storage::is_fs_operator(&operator) {
let token_claims = crate::auth::generate_send_claims(send_id, file_id);
let token = crate::auth::encode_jwt(&token_claims);

8
src/auth.rs

@ -54,12 +54,8 @@ static PUBLIC_RSA_KEY: OnceLock<DecodingKey> = OnceLock::new();
pub async fn initialize_keys() -> Result<(), Error> {
use std::io::Error;
let rsa_key_filename = std::path::PathBuf::from(CONFIG.private_rsa_key())
.file_name()
.ok_or_else(|| Error::other("Private RSA key path missing filename"))?
.to_str()
.ok_or_else(|| Error::other("Private RSA key path filename is not valid UTF-8"))?
.to_string();
let rsa_key_filename = crate::storage::file_name(&CONFIG.private_rsa_key())
.ok_or_else(|| Error::other("Private RSA key path missing filename"))?;
let operator = CONFIG.opendal_operator_for_path_type(&PathType::RsaKey).map_err(Error::other)?;

148
src/config.rs

@ -14,6 +14,7 @@ use serde::de::{self, Deserialize, Deserializer, MapAccess, Visitor};
use crate::{
error::Error,
storage,
util::{
get_active_web_release, get_env, get_env_bool, is_valid_email, parse_experimental_client_feature_flags,
FeatureFlagFilter,
@ -22,18 +23,14 @@ use crate::{
static CONFIG_FILE: LazyLock<String> = LazyLock::new(|| {
let data_folder = get_env("DATA_FOLDER").unwrap_or_else(|| String::from("data"));
get_env("CONFIG_FILE").unwrap_or_else(|| format!("{data_folder}/config.json"))
get_env("CONFIG_FILE").unwrap_or_else(|| storage::join_path(&data_folder, "config.json"))
});
static CONFIG_FILE_PARENT_DIR: LazyLock<String> = LazyLock::new(|| {
let path = std::path::PathBuf::from(&*CONFIG_FILE);
path.parent().unwrap_or(std::path::Path::new("data")).to_str().unwrap_or("data").to_string()
});
static CONFIG_FILE_PARENT_DIR: LazyLock<String> =
LazyLock::new(|| storage::parent(&CONFIG_FILE).unwrap_or_else(|| "data".to_string()));
static CONFIG_FILENAME: LazyLock<String> = LazyLock::new(|| {
let path = std::path::PathBuf::from(&*CONFIG_FILE);
path.file_name().unwrap_or(std::ffi::OsStr::new("config.json")).to_str().unwrap_or("config.json").to_string()
});
static CONFIG_FILENAME: LazyLock<String> =
LazyLock::new(|| storage::file_name(&CONFIG_FILE).unwrap_or_else(|| "config.json".to_string()));
pub static SKIP_CONFIG_VALIDATION: AtomicBool = AtomicBool::new(false);
@ -263,7 +260,7 @@ macro_rules! make_config {
}
async fn from_file() -> Result<Self, Error> {
let operator = opendal_operator_for_path(&CONFIG_FILE_PARENT_DIR)?;
let operator = storage::operator_for_path(&CONFIG_FILE_PARENT_DIR)?;
let config_bytes = operator.read(&CONFIG_FILENAME).await?;
println!("[INFO] Using saved config from `{}` for configuration.\n", *CONFIG_FILE);
serde_json::from_slice(&config_bytes.to_vec()).map_err(Into::into)
@ -507,19 +504,19 @@ make_config! {
/// Data folder |> Main data folder
data_folder: String, false, def, "data".to_string();
/// Database URL
database_url: String, false, auto, |c| format!("{}/db.sqlite3", c.data_folder);
database_url: String, false, auto, |c| storage::join_path(&c.data_folder, "db.sqlite3");
/// Icon cache folder
icon_cache_folder: String, false, auto, |c| format!("{}/icon_cache", c.data_folder);
icon_cache_folder: String, false, auto, |c| storage::join_path(&c.data_folder, "icon_cache");
/// Attachments folder
attachments_folder: String, false, auto, |c| format!("{}/attachments", c.data_folder);
attachments_folder: String, false, auto, |c| storage::join_path(&c.data_folder, "attachments");
/// Sends folder
sends_folder: String, false, auto, |c| format!("{}/sends", c.data_folder);
sends_folder: String, false, auto, |c| storage::join_path(&c.data_folder, "sends");
/// Temp folder |> Used for storing temporary file uploads
tmp_folder: String, false, auto, |c| format!("{}/tmp", c.data_folder);
tmp_folder: String, false, auto, |c| storage::join_path(&c.data_folder, "tmp");
/// Templates folder
templates_folder: String, false, auto, |c| format!("{}/templates", c.data_folder);
templates_folder: String, false, auto, |c| storage::join_path(&c.data_folder, "templates");
/// Session JWT key
rsa_key_filename: String, false, auto, |c| format!("{}/rsa_key", c.data_folder);
rsa_key_filename: String, false, auto, |c| storage::join_path(&c.data_folder, "rsa_key");
/// Web vault folder
web_vault_folder: String, false, def, "web-vault/".to_string();
},
@ -1366,107 +1363,6 @@ fn smtp_convert_deprecated_ssl_options(smtp_ssl: Option<bool>, smtp_explicit_tls
"starttls".to_string()
}
fn opendal_operator_for_path(path: &str) -> Result<opendal::Operator, Error> {
// Cache of previously built operators by path
static OPERATORS_BY_PATH: LazyLock<dashmap::DashMap<String, opendal::Operator>> =
LazyLock::new(dashmap::DashMap::new);
if let Some(operator) = OPERATORS_BY_PATH.get(path) {
return Ok(operator.clone());
}
let operator = if path.starts_with("s3://") {
#[cfg(not(s3))]
return Err(opendal::Error::new(opendal::ErrorKind::ConfigInvalid, "S3 support is not enabled").into());
#[cfg(s3)]
opendal_s3_operator_for_path(path)?
} else {
let builder = opendal::services::Fs::default().root(path);
opendal::Operator::new(builder)?.finish()
};
OPERATORS_BY_PATH.insert(path.to_string(), operator.clone());
Ok(operator)
}
#[cfg(s3)]
fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error> {
use crate::http_client::aws::AwsReqwestConnector;
use aws_config::{default_provider::credentials::DefaultCredentialsChain, provider_config::ProviderConfig};
use reqsign_aws_v4::Credential;
use reqsign_core::{Context, ProvideCredential, ProvideCredentialChain};
// This is a custom AWS credential loader that uses the official AWS Rust
// SDK config crate to load credentials. This ensures maximum compatibility
// with AWS credential configurations. For example, OpenDAL doesn't support
// AWS SSO temporary credentials yet.
#[derive(Debug)]
struct OpenDALS3CredentialProvider;
impl ProvideCredential for OpenDALS3CredentialProvider {
type Credential = Credential;
async fn provide_credential(&self, _ctx: &Context) -> reqsign_core::Result<Option<Self::Credential>> {
use aws_credential_types::provider::ProvideCredentials as _;
use reqsign_core::time::Timestamp;
use tokio::sync::OnceCell;
static DEFAULT_CREDENTIAL_CHAIN: OnceCell<DefaultCredentialsChain> = OnceCell::const_new();
let chain = DEFAULT_CREDENTIAL_CHAIN
.get_or_init(|| {
let reqwest_client = reqwest::Client::builder().build().unwrap();
let connector = AwsReqwestConnector {
client: reqwest_client,
};
let conf = ProviderConfig::default().with_http_client(connector);
DefaultCredentialsChain::builder().configure(conf).build()
})
.await;
let creds = chain.provide_credentials().await.map_err(|e| {
reqsign_core::Error::unexpected("failed to load AWS credentials via AWS SDK").with_source(e)
})?;
let expires_in = if let Some(expiration) = creds.expiry() {
let duration = expiration.duration_since(std::time::UNIX_EPOCH).map_err(|e| {
reqsign_core::Error::unexpected("AWS credential expiration is before the Unix epoch").with_source(e)
})?;
let seconds = i64::try_from(duration.as_secs()).map_err(|e| {
reqsign_core::Error::unexpected("AWS credential expiration is too large").with_source(e)
})?;
Some(Timestamp::from_second(seconds)?)
} else {
None
};
Ok(Some(Credential {
access_key_id: creds.access_key_id().to_string(),
secret_access_key: creds.secret_access_key().to_string(),
session_token: creds.session_token().map(|s| s.to_string()),
expires_in,
}))
}
}
let url = Url::parse(path).map_err(|e| format!("Invalid path S3 URL path {path:?}: {e}"))?;
let bucket = url.host_str().ok_or_else(|| format!("Missing Bucket name in data folder S3 URL {path:?}"))?;
let builder = opendal::services::S3::default()
.credential_provider_chain(ProvideCredentialChain::new().push(OpenDALS3CredentialProvider))
.enable_virtual_host_style()
.bucket(bucket)
.root(url.path())
.default_storage_class("INTELLIGENT_TIERING");
Ok(opendal::Operator::new(builder)?.finish())
}
pub enum PathType {
Data,
IconCache,
@ -1564,7 +1460,7 @@ impl Config {
}
//Save to file
let operator = opendal_operator_for_path(&CONFIG_FILE_PARENT_DIR)?;
let operator = storage::operator_for_path(&CONFIG_FILE_PARENT_DIR)?;
operator.write(&CONFIG_FILENAME, config_str).await?;
Ok(())
@ -1629,7 +1525,7 @@ impl Config {
}
pub async fn delete_user_config(&self) -> Result<(), Error> {
let operator = opendal_operator_for_path(&CONFIG_FILE_PARENT_DIR)?;
let operator = storage::operator_for_path(&CONFIG_FILE_PARENT_DIR)?;
operator.delete(&CONFIG_FILENAME).await?;
// Empty user config
@ -1653,7 +1549,7 @@ impl Config {
}
pub fn private_rsa_key(&self) -> String {
format!("{}.pem", self.rsa_key_filename())
storage::with_extension(&self.rsa_key_filename(), "pem")
}
pub fn mail_enabled(&self) -> bool {
let inner = &self.inner.read().unwrap().config;
@ -1694,15 +1590,11 @@ impl Config {
PathType::IconCache => self.icon_cache_folder(),
PathType::Attachments => self.attachments_folder(),
PathType::Sends => self.sends_folder(),
PathType::RsaKey => std::path::Path::new(&self.rsa_key_filename())
.parent()
.ok_or_else(|| std::io::Error::other("Failed to get directory of RSA key file"))?
.to_str()
.ok_or_else(|| std::io::Error::other("Failed to convert RSA key file directory to UTF-8 string"))?
.to_string(),
PathType::RsaKey => storage::parent(&self.private_rsa_key())
.ok_or_else(|| std::io::Error::other("Failed to get directory of RSA key file"))?,
};
opendal_operator_for_path(&path)
storage::operator_for_path(&path)
}
pub fn render_template<T: serde::ser::Serialize>(&self, name: &str, data: &T) -> Result<String, Error> {

2
src/db/models/attachment.rs

@ -46,7 +46,7 @@ impl Attachment {
pub async fn get_url(&self, host: &str) -> Result<String, crate::Error> {
let operator = CONFIG.opendal_operator_for_path_type(&PathType::Attachments)?;
if operator.info().scheme() == opendal::services::FS_SCHEME {
if crate::storage::is_fs_operator(&operator) {
let token = encode_jwt(&generate_file_download_claims(self.cipher_uuid.clone(), self.id.clone()));
Ok(format!("{host}/attachments/{}/{}?token={token}", self.cipher_uuid, self.id))
} else {

1
src/main.rs

@ -57,6 +57,7 @@ mod mail;
mod ratelimit;
mod sso;
mod sso_client;
mod storage;
mod util;
use crate::api::core::two_factor::duo_oidc::purge_duo_contexts;

297
src/storage.rs

@ -0,0 +1,297 @@
use std::sync::LazyLock;
pub(crate) fn join_path(base: &str, child: &str) -> String {
#[cfg(s3)]
if s3::is_uri(base) {
return s3::join_path(base, child);
}
let base = base.trim_end_matches('/');
let child = child.trim_start_matches('/');
if base.is_empty() {
child.to_string()
} else if child.is_empty() {
base.to_string()
} else {
format!("{base}/{child}")
}
}
pub(crate) fn with_extension(path: &str, extension: &str) -> String {
let extension = extension.trim_start_matches('.');
#[cfg(s3)]
if s3::is_uri(path) {
return s3::with_extension(path, extension);
}
format!("{path}.{extension}")
}
pub(crate) fn parent(path: &str) -> Option<String> {
#[cfg(s3)]
if s3::is_uri(path) {
return s3::parent(path);
}
std::path::Path::new(path).parent()?.to_str().map(ToString::to_string)
}
pub(crate) fn file_name(path: &str) -> Option<String> {
#[cfg(s3)]
if s3::is_uri(path) {
return s3::file_name(path);
}
std::path::Path::new(path).file_name()?.to_str().map(ToString::to_string)
}
pub(crate) fn is_fs_operator(operator: &opendal::Operator) -> bool {
operator.info().scheme() == opendal::services::FS_SCHEME
}
pub(crate) fn operator_for_path(path: &str) -> Result<opendal::Operator, crate::Error> {
// Cache of previously built operators by path
static OPERATORS_BY_PATH: LazyLock<dashmap::DashMap<String, opendal::Operator>> =
LazyLock::new(dashmap::DashMap::new);
if let Some(operator) = OPERATORS_BY_PATH.get(path) {
return Ok(operator.clone());
}
let operator = if path.starts_with("s3://") {
#[cfg(not(s3))]
return Err(opendal::Error::new(opendal::ErrorKind::ConfigInvalid, "S3 support is not enabled").into());
#[cfg(s3)]
s3::operator_for_path(path)?
} else {
let builder = opendal::services::Fs::default().root(path);
opendal::Operator::new(builder)?.finish()
};
OPERATORS_BY_PATH.insert(path.to_string(), operator.clone());
Ok(operator)
}
#[cfg(s3)]
mod s3 {
use reqwest::Url;
use crate::error::Error;
pub(super) fn is_uri(path: &str) -> bool {
path.starts_with("s3://")
}
pub(super) fn join_path(base: &str, child: &str) -> String {
if let Ok(mut url) = Url::parse(base) {
let mut segments = path_segments(&url);
segments.extend(child.split('/').filter(|segment| !segment.is_empty()).map(ToString::to_string));
set_path_segments(&mut url, &segments);
return url.to_string();
}
let base = base.trim_end_matches('/');
let child = child.trim_start_matches('/');
if base.is_empty() {
child.to_string()
} else if child.is_empty() {
base.to_string()
} else {
format!("{base}/{child}")
}
}
pub(super) fn with_extension(path: &str, extension: &str) -> String {
if let Ok(mut url) = Url::parse(path) {
let mut segments = path_segments(&url);
if let Some(file_name) = segments.last_mut() {
file_name.push('.');
file_name.push_str(extension);
set_path_segments(&mut url, &segments);
return url.to_string();
}
}
format!("{path}.{extension}")
}
pub(super) fn parent(path: &str) -> Option<String> {
if let Ok(mut url) = Url::parse(path) {
let mut segments = path_segments(&url);
segments.pop()?;
set_path_segments(&mut url, &segments);
return Some(url.to_string());
}
std::path::Path::new(path).parent()?.to_str().map(ToString::to_string)
}
pub(super) fn file_name(path: &str) -> Option<String> {
if let Ok(url) = Url::parse(path) {
return path_segments(&url).pop();
}
std::path::Path::new(path).file_name()?.to_str().map(ToString::to_string)
}
fn path_segments(url: &Url) -> Vec<String> {
url.path_segments()
.map(|segments| segments.filter(|segment| !segment.is_empty()).map(ToString::to_string).collect())
.unwrap_or_default()
}
fn set_path_segments(url: &mut Url, segments: &[String]) {
if segments.is_empty() {
url.set_path("");
} else {
url.set_path(&format!("/{}", segments.join("/")));
}
}
pub(super) fn operator_for_path(path: &str) -> Result<opendal::Operator, Error> {
use crate::http_client::aws::AwsReqwestConnector;
use aws_config::{default_provider::credentials::DefaultCredentialsChain, provider_config::ProviderConfig};
use opendal::Configurator;
use reqsign_aws_v4::Credential;
use reqsign_core::{Context, ProvideCredential, ProvideCredentialChain};
// This is a custom AWS credential loader that uses the official AWS Rust
// SDK config crate to load credentials. This ensures maximum compatibility
// with AWS credential configurations. For example, OpenDAL doesn't support
// AWS SSO temporary credentials yet.
#[derive(Debug)]
struct OpenDALS3CredentialProvider;
impl ProvideCredential for OpenDALS3CredentialProvider {
type Credential = Credential;
async fn provide_credential(&self, _ctx: &Context) -> reqsign_core::Result<Option<Self::Credential>> {
use aws_credential_types::provider::ProvideCredentials as _;
use reqsign_core::time::Timestamp;
use tokio::sync::OnceCell;
static DEFAULT_CREDENTIAL_CHAIN: OnceCell<DefaultCredentialsChain> = OnceCell::const_new();
let chain = DEFAULT_CREDENTIAL_CHAIN
.get_or_init(|| {
let reqwest_client = reqwest::Client::builder().build().unwrap();
let connector = AwsReqwestConnector {
client: reqwest_client,
};
let conf = ProviderConfig::default().with_http_client(connector);
DefaultCredentialsChain::builder().configure(conf).build()
})
.await;
let creds = chain.provide_credentials().await.map_err(|e| {
reqsign_core::Error::unexpected("failed to load AWS credentials via AWS SDK").with_source(e)
})?;
let expires_in = if let Some(expiration) = creds.expiry() {
let duration = expiration.duration_since(std::time::UNIX_EPOCH).map_err(|e| {
reqsign_core::Error::unexpected("AWS credential expiration is before the Unix epoch")
.with_source(e)
})?;
let seconds = i64::try_from(duration.as_secs()).map_err(|e| {
reqsign_core::Error::unexpected("AWS credential expiration is too large").with_source(e)
})?;
Some(Timestamp::from_second(seconds)?)
} else {
None
};
Ok(Some(Credential {
access_key_id: creds.access_key_id().to_string(),
secret_access_key: creds.secret_access_key().to_string(),
session_token: creds.session_token().map(|s| s.to_string()),
expires_in,
}))
}
}
let uri = opendal::OperatorUri::new(path, std::iter::empty::<(String, String)>())?;
let mut config = opendal::services::S3Config::from_uri(&uri)?;
if !uri_has_option(&uri, &["default_storage_class"]) {
config.default_storage_class = Some("INTELLIGENT_TIERING".to_string());
}
if !uri_has_option(
&uri,
&["enable_virtual_host_style", "aws_virtual_hosted_style_request", "virtual_hosted_style_request"],
) {
config.enable_virtual_host_style = true;
}
let use_aws_sdk_credentials = !uri_has_credential_options(&uri, &config);
let mut builder = config.into_builder();
if use_aws_sdk_credentials {
builder =
builder.credential_provider_chain(ProvideCredentialChain::new().push(OpenDALS3CredentialProvider));
}
Ok(opendal::Operator::new(builder)?.finish())
}
fn uri_has_option(uri: &opendal::OperatorUri, names: &[&str]) -> bool {
names.iter().any(|name| uri.options().contains_key(*name))
}
fn uri_has_credential_options(uri: &opendal::OperatorUri, config: &opendal::services::S3Config) -> bool {
config.access_key_id.is_some()
|| config.secret_access_key.is_some()
|| config.session_token.is_some()
|| config.role_arn.is_some()
|| config.external_id.is_some()
|| config.role_session_name.is_some()
|| uri_has_option(uri, &["allow_anonymous", "disable_config_load", "disable_ec2_metadata"])
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn handles_local_paths() {
assert_eq!(join_path("data", "attachments"), "data/attachments");
assert_eq!(with_extension("data/rsa_key", "pem"), "data/rsa_key.pem");
assert_eq!(parent("data/rsa_key.pem").as_deref(), Some("data"));
assert_eq!(file_name("data/rsa_key.pem").as_deref(), Some("rsa_key.pem"));
}
}
#[cfg(all(test, s3))]
mod s3_tests {
use super::*;
#[test]
fn joins_s3_path_before_query_string() {
assert_eq!(
join_path("s3://bucket/base?region=us-west-2", "attachments"),
"s3://bucket/base/attachments?region=us-west-2"
);
}
#[test]
fn appends_extension_before_s3_query_string() {
assert_eq!(
with_extension("s3://bucket/base/rsa_key?region=us-west-2", "pem"),
"s3://bucket/base/rsa_key.pem?region=us-west-2"
);
}
#[test]
fn splits_s3_parent_and_file_name_without_query_string() {
let path = "s3://bucket/base/config.json?region=us-west-2";
assert_eq!(parent(path).as_deref(), Some("s3://bucket/base?region=us-west-2"));
assert_eq!(file_name(path).as_deref(), Some("config.json"));
}
}
Loading…
Cancel
Save