Browse Source

Add AWS S3 support via OpenDAL for data files

pull/5626/head
Chase Douglas 5 months ago
parent
commit
4a5d54777e
  1. 938
      Cargo.lock
  2. 7
      Cargo.toml
  3. 3
      build.rs
  4. 2
      src/api/core/ciphers.rs
  5. 22
      src/api/core/sends.rs
  6. 60
      src/config.rs
  7. 23
      src/db/models/attachment.rs
  8. 4
      src/db/models/cipher.rs
  9. 20
      src/main.rs

938
Cargo.lock

File diff suppressed because it is too large

7
Cargo.toml

@@ -32,6 +32,7 @@ enable_mimalloc = ["dep:mimalloc"]
# You also need to set an env variable `QUERY_LOGGER=1` to fully activate this so you do not have to re-compile # You also need to set an env variable `QUERY_LOGGER=1` to fully activate this so you do not have to re-compile
# if you want to turn off the logging for a specific run. # if you want to turn off the logging for a specific run.
query_logger = ["dep:diesel_logger"] query_logger = ["dep:diesel_logger"]
s3 = ["opendal/services-s3", "dep:aws-config", "dep:aws-credential-types", "dep:anyhow", "dep:reqsign"]
# Enable unstable features, requires nightly # Enable unstable features, requires nightly
# Currently only used to enable rusts official ip support # Currently only used to enable rusts official ip support
@@ -179,6 +180,12 @@ grass_compiler = { version = "0.13.4", default-features = false }
# File are accessed through Apache OpenDAL # File are accessed through Apache OpenDAL
opendal = { version = "0.53.2", features = ["services-fs"] } opendal = { version = "0.53.2", features = ["services-fs"] }
# For retrieving AWS credentials, including temporary SSO credentials
anyhow = { version = "1.0.98", optional = true }
aws-config = { version = "1.6.3", features = ["behavior-version-latest"], optional = true }
aws-credential-types = { version = "1.2.3", optional = true }
reqsign = { version = "0.16.3", optional = true }
# Strip debuginfo from the release builds # Strip debuginfo from the release builds
# The debug symbols are to provide better panic traces # The debug symbols are to provide better panic traces
# Also enable fat LTO and use 1 codegen unit for optimizations # Also enable fat LTO and use 1 codegen unit for optimizations

3
build.rs

@@ -11,6 +11,8 @@ fn main() {
println!("cargo:rustc-cfg=postgresql"); println!("cargo:rustc-cfg=postgresql");
#[cfg(feature = "query_logger")] #[cfg(feature = "query_logger")]
println!("cargo:rustc-cfg=query_logger"); println!("cargo:rustc-cfg=query_logger");
#[cfg(feature = "s3")]
println!("cargo:rustc-cfg=s3");
#[cfg(not(any(feature = "sqlite", feature = "mysql", feature = "postgresql")))] #[cfg(not(any(feature = "sqlite", feature = "mysql", feature = "postgresql")))]
compile_error!( compile_error!(
@@ -23,6 +25,7 @@ fn main() {
println!("cargo::rustc-check-cfg=cfg(mysql)"); println!("cargo::rustc-check-cfg=cfg(mysql)");
println!("cargo::rustc-check-cfg=cfg(postgresql)"); println!("cargo::rustc-check-cfg=cfg(postgresql)");
println!("cargo::rustc-check-cfg=cfg(query_logger)"); println!("cargo::rustc-check-cfg=cfg(query_logger)");
println!("cargo::rustc-check-cfg=cfg(s3)");
// Rerun when these paths are changed. // Rerun when these paths are changed.
// Someone could have checked-out a tag or specific commit, but no other files changed. // Someone could have checked-out a tag or specific commit, but no other files changed.

2
src/api/core/ciphers.rs

@@ -1051,7 +1051,7 @@ async fn get_attachment(
} }
match Attachment::find_by_id(&attachment_id, &mut conn).await { match Attachment::find_by_id(&attachment_id, &mut conn).await {
Some(attachment) if cipher_id == attachment.cipher_uuid => Ok(Json(attachment.to_json(&headers.host)?)), Some(attachment) if cipher_id == attachment.cipher_uuid => Ok(Json(attachment.to_json(&headers.host).await?)),
Some(_) => err!("Attachment doesn't belong to cipher"), Some(_) => err!("Attachment doesn't belong to cipher"),
None => err!("Attachment doesn't exist"), None => err!("Attachment doesn't exist"),
} }

22
src/api/core/sends.rs

@@ -1,4 +1,5 @@
use std::path::Path; use std::path::Path;
use std::time::Duration;
use chrono::{DateTime, TimeDelta, Utc}; use chrono::{DateTime, TimeDelta, Utc};
use num_traits::ToPrimitive; use num_traits::ToPrimitive;
@@ -569,15 +570,26 @@ async fn post_access_file(
Ok(Json(json!({ Ok(Json(json!({
"object": "send-fileDownload", "object": "send-fileDownload",
"id": file_id, "id": file_id,
"url": download_url(&host, &send_id, &file_id)?, "url": download_url(&host, &send_id, &file_id).await?,
}))) })))
} }
fn download_url(host: &Host, send_id: &SendId, file_id: &SendFileId) -> Result<String, crate::Error> { async fn download_url(host: &Host, send_id: &SendId, file_id: &SendFileId) -> Result<String, crate::Error> {
let token_claims = crate::auth::generate_send_claims(send_id, file_id); let operator = CONFIG.opendal_operator_for_path_type(PathType::Sends)?;
let token = crate::auth::encode_jwt(&token_claims);
if operator.info().scheme() == opendal::Scheme::Fs {
let token_claims = crate::auth::generate_send_claims(send_id, file_id);
let token = crate::auth::encode_jwt(&token_claims);
Ok(format!("{}/api/sends/{send_id}/{file_id}?t={token}", &host.host)) Ok(format!("{}/api/sends/{send_id}/{file_id}?t={token}", &host.host))
} else {
Ok(operator
.presign_read(&format!("{send_id}/{file_id}"), Duration::from_secs(5 * 60))
.await
.map_err(Into::<crate::Error>::into)?
.uri()
.to_string())
}
} }
#[get("/sends/<send_id>/<file_id>?<t>")] #[get("/sends/<send_id>/<file_id>?<t>")]

60
src/config.rs

@@ -1176,14 +1176,70 @@ fn opendal_operator_for_path(path: &str) -> Result<opendal::Operator, Error> {
return Ok(operator.clone()); return Ok(operator.clone());
} }
let builder = opendal::services::Fs::default().root(path); let operator = if path.starts_with("s3://") {
let operator = opendal::Operator::new(builder).map_err(Into::<Error>::into)?.finish(); #[cfg(not(s3))]
return Err(opendal::Error::new(opendal::ErrorKind::ConfigInvalid, "S3 support is not enabled").into());
#[cfg(s3)]
opendal_s3_operator_for_path(path)?
} else {
let builder = opendal::services::Fs::default().root(path);
opendal::Operator::new(builder).map_err(Into::<Error>::into)?.finish()
};
operators_by_path.insert(path.to_string(), operator.clone()); operators_by_path.insert(path.to_string(), operator.clone());
Ok(operator) Ok(operator)
} }
#[cfg(s3)]
fn opendal_s3_operator_for_path(path: &str) -> Result<opendal::Operator, Error> {
// This is a custom AWS credential loader that uses the official AWS Rust
// SDK config crate to load credentials. This ensures maximum compatibility
// with AWS credential configurations. For example, OpenDAL doesn't support
// AWS SSO temporary credentials yet.
struct OpenDALS3CredentialLoader {}
#[async_trait]
impl reqsign::AwsCredentialLoad for OpenDALS3CredentialLoader {
async fn load_credential(&self, _client: reqwest::Client) -> anyhow::Result<Option<reqsign::AwsCredential>> {
use aws_credential_types::provider::ProvideCredentials as _;
use tokio::sync::OnceCell;
static DEFAULT_CREDENTIAL_CHAIN: OnceCell<
aws_config::default_provider::credentials::DefaultCredentialsChain,
> = OnceCell::const_new();
let chain = DEFAULT_CREDENTIAL_CHAIN
.get_or_init(|| aws_config::default_provider::credentials::DefaultCredentialsChain::builder().build())
.await;
let creds = chain.provide_credentials().await?;
Ok(Some(reqsign::AwsCredential {
access_key_id: creds.access_key_id().to_string(),
secret_access_key: creds.secret_access_key().to_string(),
session_token: creds.session_token().map(|s| s.to_string()),
expires_in: creds.expiry().map(|expiration| expiration.into()),
}))
}
}
const OPEN_DAL_S3_CREDENTIAL_LOADER: OpenDALS3CredentialLoader = OpenDALS3CredentialLoader {};
let url = Url::parse(path).map_err(|e| format!("Invalid path S3 URL path {path:?}: {e}"))?;
let bucket = url.host_str().ok_or_else(|| format!("Missing Bucket name in data folder S3 URL {path:?}"))?;
let builder = opendal::services::S3::default()
.customized_credential_load(Box::new(OPEN_DAL_S3_CREDENTIAL_LOADER))
.bucket(bucket)
.root(url.path())
.default_storage_class("INTELLIGENT_TIERING");
Ok(opendal::Operator::new(builder).map_err(Into::<Error>::into)?.finish())
}
pub enum PathType { pub enum PathType {
Data, Data,
IconCache, IconCache,

23
src/db/models/attachment.rs

@@ -1,3 +1,5 @@
use std::time::Duration;
use bigdecimal::{BigDecimal, ToPrimitive}; use bigdecimal::{BigDecimal, ToPrimitive};
use derive_more::{AsRef, Deref, Display}; use derive_more::{AsRef, Deref, Display};
use serde_json::Value; use serde_json::Value;
@@ -42,15 +44,26 @@ impl Attachment {
format!("{}/{}", self.cipher_uuid, self.id) format!("{}/{}", self.cipher_uuid, self.id)
} }
pub fn get_url(&self, host: &str) -> Result<String, crate::Error> { pub async fn get_url(&self, host: &str) -> Result<String, crate::Error> {
let token = encode_jwt(&generate_file_download_claims(self.cipher_uuid.clone(), self.id.clone())); let operator = CONFIG.opendal_operator_for_path_type(PathType::Attachments)?;
Ok(format!("{host}/attachments/{}/{}?token={token}", self.cipher_uuid, self.id))
if operator.info().scheme() == opendal::Scheme::Fs {
let token = encode_jwt(&generate_file_download_claims(self.cipher_uuid.clone(), self.id.clone()));
Ok(format!("{host}/attachments/{}/{}?token={token}", self.cipher_uuid, self.id))
} else {
Ok(operator
.presign_read(&self.get_file_path(), Duration::from_secs(5 * 60))
.await
.map_err(Into::<crate::Error>::into)?
.uri()
.to_string())
}
} }
pub fn to_json(&self, host: &str) -> Result<Value, crate::Error> { pub async fn to_json(&self, host: &str) -> Result<Value, crate::Error> {
Ok(json!({ Ok(json!({
"id": self.id, "id": self.id,
"url": self.get_url(host)?, "url": self.get_url(host).await?,
"fileName": self.file_name, "fileName": self.file_name,
"size": self.file_size.to_string(), "size": self.file_size.to_string(),
"sizeName": crate::util::get_display_size(self.file_size), "sizeName": crate::util::get_display_size(self.file_size),

4
src/db/models/cipher.rs

@@ -150,7 +150,7 @@ impl Cipher {
if !attachments.is_empty() { if !attachments.is_empty() {
let mut attachments_json_vec = vec![]; let mut attachments_json_vec = vec![];
for attachment in attachments { for attachment in attachments {
attachments_json_vec.push(attachment.to_json(host)?); attachments_json_vec.push(attachment.to_json(host).await?);
} }
attachments_json = Value::Array(attachments_json_vec); attachments_json = Value::Array(attachments_json_vec);
} }
@@ -160,7 +160,7 @@ impl Cipher {
if !attachments.is_empty() { if !attachments.is_empty() {
let mut attachments_json_vec = vec![]; let mut attachments_json_vec = vec![];
for attachment in attachments { for attachment in attachments {
attachments_json_vec.push(attachment.to_json(host)?); attachments_json_vec.push(attachment.to_json(host).await?);
} }
attachments_json = Value::Array(attachments_json_vec); attachments_json = Value::Array(attachments_json_vec);
} }

20
src/main.rs

@@ -61,7 +61,7 @@ mod util;
use crate::api::core::two_factor::duo_oidc::purge_duo_contexts; use crate::api::core::two_factor::duo_oidc::purge_duo_contexts;
use crate::api::purge_auth_requests; use crate::api::purge_auth_requests;
use crate::api::{WS_ANONYMOUS_SUBSCRIPTIONS, WS_USERS}; use crate::api::{WS_ANONYMOUS_SUBSCRIPTIONS, WS_USERS};
pub use config::CONFIG; pub use config::{PathType, CONFIG};
pub use error::{Error, MapResult}; pub use error::{Error, MapResult};
use rocket::data::{Limits, ToByteUnit}; use rocket::data::{Limits, ToByteUnit};
use std::sync::{atomic::Ordering, Arc}; use std::sync::{atomic::Ordering, Arc};
@@ -461,6 +461,24 @@ fn create_dir(path: &str, description: &str) {
async fn check_data_folder() { async fn check_data_folder() {
let data_folder = &CONFIG.data_folder(); let data_folder = &CONFIG.data_folder();
if data_folder.starts_with("s3://") {
if let Err(e) = CONFIG
.opendal_operator_for_path_type(PathType::Data)
.unwrap_or_else(|e| {
error!("Failed to create S3 operator for data folder '{data_folder}': {e:?}");
exit(1);
})
.check()
.await
{
error!("Could not access S3 data folder '{data_folder}': {e:?}");
exit(1);
}
return;
}
let path = Path::new(data_folder); let path = Path::new(data_folder);
if !path.exists() { if !path.exists() {
error!("Data folder '{data_folder}' doesn't exist."); error!("Data folder '{data_folder}' doesn't exist.");

Loading…
Cancel
Save