Browse Source

Add AWS S3 support via OpenDAL for data files

pull/5626/head
Chase Douglas 4 weeks ago
parent
commit
d668402c8e
  1. 778
      Cargo.lock
  2. 7
      Cargo.toml
  3. 3
      build.rs
  4. 18
      src/api/core/sends.rs
  5. 60
      src/config.rs
  6. 17
      src/db/models/attachment.rs
  7. 20
      src/main.rs

778
Cargo.lock

File diff suppressed because it is too large

7
Cargo.toml

@ -31,6 +31,7 @@ enable_mimalloc = ["dep:mimalloc"]
# You also need to set an env variable `QUERY_LOGGER=1` to fully activate this so you do not have to re-compile
# if you want to turn off the logging for a specific run.
query_logger = ["dep:diesel_logger"]
s3 = ["opendal/services-s3", "dep:aws-config", "dep:aws-credential-types", "dep:anyhow", "dep:reqsign"]
# Enable unstable features, requires nightly
# Currently only used to enable Rust's official IP support
@ -177,6 +178,12 @@ grass_compiler = { version = "0.13.4", default-features = false }
# Files are accessed through Apache OpenDAL
opendal = { version = "0.51.2", features = ["services-fs"] }
# For retrieving AWS credentials, including temporary SSO credentials
anyhow = { version = "1.0.96", optional = true }
aws-config = { version = "1.5.12", features = ["behavior-version-latest"], optional = true }
aws-credential-types = { version = "1.2.1", optional = true }
reqsign = { version = "0.16.1", optional = true }
[patch.crates-io]
# Patch yubico to remove duplicate crates of older versions
yubico = { git = "https://github.com/BlackDex/yubico-rs", rev = "00df14811f58155c0f02e3ab10f1570ed3e115c6" }

3
build.rs

@ -11,6 +11,8 @@ fn main() {
println!("cargo:rustc-cfg=postgresql");
#[cfg(feature = "query_logger")]
println!("cargo:rustc-cfg=query_logger");
#[cfg(feature = "s3")]
println!("cargo:rustc-cfg=s3");
#[cfg(not(any(feature = "sqlite", feature = "mysql", feature = "postgresql")))]
compile_error!(
@ -23,6 +25,7 @@ fn main() {
println!("cargo::rustc-check-cfg=cfg(mysql)");
println!("cargo::rustc-check-cfg=cfg(postgresql)");
println!("cargo::rustc-check-cfg=cfg(query_logger)");
println!("cargo::rustc-check-cfg=cfg(s3)");
// Rerun when these paths are changed.
// Someone could have checked-out a tag or specific commit, but no other files changed.

18
src/api/core/sends.rs

@ -1,4 +1,5 @@
use std::path::Path;
use std::time::Duration;
use chrono::{DateTime, TimeDelta, Utc};
use num_traits::ToPrimitive;
@ -556,10 +557,21 @@ async fn post_access_file(
}
/// Builds the URL a client should use to download a Send's file.
///
/// For local filesystem storage (`opendal::Scheme::Fs`) this points back at
/// our own `/api/sends/...` endpoint, authorized with a short-lived JWT.
/// For any other OpenDAL backend (e.g. S3) a pre-signed read URL valid for
/// 5 minutes is returned instead, so the client downloads directly from the
/// object store.
///
/// NOTE(review): the diff residue here duplicated the old token/format lines
/// next to the new ones; this is the reconstructed post-change function.
async fn download_url(host: &Host, send_id: &SendId, file_id: &SendFileId) -> Result<String, crate::Error> {
    let operator = CONFIG.opendal_operator_for_path_type(PathType::Sends)?;

    if operator.info().scheme() == opendal::Scheme::Fs {
        // Local storage: serve through our own API, gated by a send-scoped JWT.
        let token_claims = crate::auth::generate_send_claims(send_id, file_id);
        let token = crate::auth::encode_jwt(&token_claims);
        Ok(format!("{}/api/sends/{}/{}?t={}", &host.host, send_id, file_id, token))
    } else {
        // Remote object storage: hand out a time-limited pre-signed URL.
        Ok(operator
            .presign_read(&format!("{send_id}/{file_id}"), Duration::from_secs(5 * 60))
            .await
            .map_err(Into::<crate::Error>::into)?
            .uri()
            .to_string())
    }
}
#[get("/sends/<send_id>/<file_id>?<t>")]

60
src/config.rs

@ -1170,14 +1170,70 @@ fn opendal_operator_for_path(path: &str) -> Result<opendal::Operator, Error> {
return Ok(operator.clone());
}
let builder = opendal::services::Fs::default().root(path);
let operator = opendal::Operator::new(builder).map_err(Into::<Error>::into)?.finish();
let operator = if path.starts_with("s3://") {
#[cfg(not(s3))]
return Err(opendal::Error::new(opendal::ErrorKind::ConfigInvalid, "S3 support is not enabled").into());
#[cfg(s3)]
opendal_s3_operator_for_path(path)?
} else {
let builder = opendal::services::Fs::default().root(path);
opendal::Operator::new(builder).map_err(Into::<Error>::into)?.finish()
};
operators_by_path.insert(path.to_string(), operator.clone());
Ok(operator)
}
#[cfg(s3)]
/// Creates an OpenDAL S3 operator for an `s3://bucket/optional/prefix` URL.
///
/// The bucket name is taken from the URL host and the object-key prefix from
/// the URL path. Credentials are resolved via the official AWS SDK default
/// provider chain rather than OpenDAL's built-in loader, for maximum
/// compatibility (e.g. SSO temporary credentials, which OpenDAL does not
/// support yet).
///
/// # Errors
/// Returns an error when the URL cannot be parsed, has no bucket (host)
/// component, or the operator cannot be constructed.
fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error> {
    // Custom AWS credential loader that delegates to the official AWS Rust
    // SDK config crate. This ensures maximum compatibility with AWS
    // credential configurations.
    struct OpenDALS3CredentialLoader {}

    #[async_trait]
    impl reqsign::AwsCredentialLoad for OpenDALS3CredentialLoader {
        async fn load_credential(&self, _client: reqwest::Client) -> anyhow::Result<Option<reqsign::AwsCredential>> {
            use aws_credential_types::provider::ProvideCredentials as _;
            use tokio::sync::OnceCell;

            // Build the default provider chain once and reuse it for every
            // subsequent signing request.
            static DEFAULT_CREDENTIAL_CHAIN: OnceCell<
                aws_config::default_provider::credentials::DefaultCredentialsChain,
            > = OnceCell::const_new();

            let chain = DEFAULT_CREDENTIAL_CHAIN
                .get_or_init(|| aws_config::default_provider::credentials::DefaultCredentialsChain::builder().build())
                .await;

            let creds = chain.provide_credentials().await?;

            Ok(Some(reqsign::AwsCredential {
                access_key_id: creds.access_key_id().to_string(),
                secret_access_key: creds.secret_access_key().to_string(),
                session_token: creds.session_token().map(|s| s.to_string()),
                expires_in: creds.expiry().map(|expiration| expiration.into()),
            }))
        }
    }

    const OPEN_DAL_S3_CREDENTIAL_LOADER: OpenDALS3CredentialLoader = OpenDALS3CredentialLoader {};

    // `s3://bucket/prefix` — bucket comes from the host, root from the path.
    let url = Url::parse(path).map_err(|e| format!("Invalid S3 data folder URL {path:?}: {e}"))?;
    let bucket = url.host_str().ok_or_else(|| format!("Missing Bucket name in data folder S3 URL {path:?}"))?;

    let builder = opendal::services::S3::default()
        .customized_credential_load(Box::new(OPEN_DAL_S3_CREDENTIAL_LOADER))
        .bucket(bucket)
        .root(url.path())
        .default_storage_class("INTELLIGENT_TIERING");

    Ok(opendal::Operator::new(builder).map_err(Into::<Error>::into)?.finish())
}
pub enum PathType {
Data,
IconCache,

17
src/db/models/attachment.rs

@ -1,3 +1,5 @@
use std::time::Duration;
use bigdecimal::{BigDecimal, ToPrimitive};
use derive_more::{AsRef, Deref, Display};
use serde_json::Value;
@ -43,8 +45,19 @@ impl Attachment {
}
/// Returns the URL a client should use to download this attachment.
///
/// For local filesystem storage (`opendal::Scheme::Fs`) the URL points at
/// our own `/attachments/...` endpoint, authorized by a short-lived download
/// JWT. For any other OpenDAL backend (e.g. S3) a pre-signed read URL valid
/// for 5 minutes is returned, so the client fetches directly from the store.
///
/// NOTE(review): the diff residue here duplicated the old token/format lines
/// next to the new ones; this is the reconstructed post-change method.
pub async fn get_url(&self, host: &str) -> Result<String, crate::Error> {
    let operator = CONFIG.opendal_operator_for_path_type(PathType::Attachments)?;

    if operator.info().scheme() == opendal::Scheme::Fs {
        // Local storage: serve through our own API, gated by a download JWT.
        let token = encode_jwt(&generate_file_download_claims(self.cipher_uuid.clone(), self.id.clone()));
        Ok(format!("{}/attachments/{}/{}?token={}", host, self.cipher_uuid, self.id, token))
    } else {
        // Remote object storage: hand out a time-limited pre-signed URL.
        Ok(operator
            .presign_read(&self.get_file_path(), Duration::from_secs(5 * 60))
            .await
            .map_err(Into::<crate::Error>::into)?
            .uri()
            .to_string())
    }
}
pub async fn to_json(&self, host: &str) -> Result<Value, crate::Error> {

20
src/main.rs

@ -61,7 +61,7 @@ mod util;
use crate::api::core::two_factor::duo_oidc::purge_duo_contexts;
use crate::api::purge_auth_requests;
use crate::api::{WS_ANONYMOUS_SUBSCRIPTIONS, WS_USERS};
pub use config::CONFIG;
pub use config::{PathType, CONFIG};
pub use error::{Error, MapResult};
use rocket::data::{Limits, ToByteUnit};
use std::sync::{atomic::Ordering, Arc};
@ -464,6 +464,24 @@ fn create_dir(path: &str, description: &str) {
async fn check_data_folder() {
let data_folder = &CONFIG.data_folder();
if data_folder.starts_with("s3://") {
if let Err(e) = CONFIG
.opendal_operator_for_path_type(PathType::Data)
.unwrap_or_else(|e| {
error!("Failed to create S3 operator for data folder '{data_folder}': {e:?}");
exit(1);
})
.check()
.await
{
error!("Could not access S3 data folder '{data_folder}': {e:?}");
exit(1);
}
return;
}
let path = Path::new(data_folder);
if !path.exists() {
error!("Data folder '{}' doesn't exist.", data_folder);

Loading…
Cancel
Save