From 009565735c2088ae7fad1c073045089e8150101c Mon Sep 17 00:00:00 2001 From: 0x484558 <0x484558@pm.me> Date: Tue, 24 Mar 2026 22:55:37 +0100 Subject: [PATCH] add single-job worker entrypoint for scheduled tasks --- src/db/mod.rs | 40 +++++++++++++------- src/jobs.rs | 86 ++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 101 +++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 196 insertions(+), 31 deletions(-) create mode 100644 src/jobs.rs diff --git a/src/db/mod.rs b/src/db/mod.rs index ae2b1221..a01e9a49 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -181,8 +181,8 @@ impl Drop for DbPool { } impl DbPool { - // For the given database URL, guess its type, run migrations, create pool, and return it - pub fn from_config() -> Result { + // For the given database URL, guess its type, optionally run migrations, create pool, and return it + pub fn from_config_with_migrations(run_migrations: bool) -> Result { let db_url = CONFIG.database_url(); let conn_type = DbConnType::from_url(&db_url)?; @@ -194,18 +194,20 @@ impl DbPool { drop(diesel::connection::set_default_instrumentation(query_logger::simple_logger)); } - match conn_type { - #[cfg(mysql)] - DbConnType::Mysql => { - mysql_migrations::run_migrations(&db_url)?; - } - #[cfg(postgresql)] - DbConnType::Postgresql => { - postgresql_migrations::run_migrations(&db_url)?; - } - #[cfg(sqlite)] - DbConnType::Sqlite => { - sqlite_migrations::run_migrations(&db_url)?; + if run_migrations { + match conn_type { + #[cfg(mysql)] + DbConnType::Mysql => { + mysql_migrations::run_migrations(&db_url)?; + } + #[cfg(postgresql)] + DbConnType::Postgresql => { + postgresql_migrations::run_migrations(&db_url)?; + } + #[cfg(sqlite)] + DbConnType::Sqlite => { + sqlite_migrations::run_migrations(&db_url)?; + } } } @@ -233,6 +235,16 @@ impl DbPool { }) } + // For the given database URL, guess its type, run migrations, create pool, and return it + pub fn from_config() -> Result { + Self::from_config_with_migrations(true) + } + + // For the given database URL, guess its type, skip migrations, create pool, and return it + pub fn from_config_no_migrations() -> Result { + Self::from_config_with_migrations(false) + } + // Get a connection from the pool pub async fn get(&self) -> Result { let duration = Duration::from_secs(CONFIG.database_timeout()); diff --git a/src/jobs.rs b/src/jobs.rs new file mode 100644 index 00000000..b8c3ff70 --- /dev/null +++ b/src/jobs.rs @@ -0,0 +1,86 @@ +use crate::{ + api::{self, purge_auth_requests}, + db::{models::SsoAuth, DbPool}, + Error, +}; + +use crate::api::core::two_factor::duo_oidc::purge_duo_contexts; + +#[derive(Clone, Copy, Debug)] +pub enum ScheduledJob { + SendPurge, + TrashPurge, + Incomplete2faNotifications, + EmergencyRequestTimeout, + EmergencyNotificationReminder, + AuthRequestPurge, + DuoContextPurge, + EventCleanup, + PurgeIncompleteSsoAuth, +} + +const ALL_JOBS: [ScheduledJob; 9] = [ + ScheduledJob::SendPurge, + ScheduledJob::TrashPurge, + ScheduledJob::Incomplete2faNotifications, + ScheduledJob::EmergencyRequestTimeout, + ScheduledJob::EmergencyNotificationReminder, + ScheduledJob::AuthRequestPurge, + ScheduledJob::DuoContextPurge, + ScheduledJob::EventCleanup, + ScheduledJob::PurgeIncompleteSsoAuth, +]; + +impl ScheduledJob { + pub const fn as_str(self) -> &'static str { + match self { + ScheduledJob::SendPurge => "send_purge", + ScheduledJob::TrashPurge => "trash_purge", + ScheduledJob::Incomplete2faNotifications => "incomplete_2fa_notifications", + ScheduledJob::EmergencyRequestTimeout => "emergency_request_timeout", + ScheduledJob::EmergencyNotificationReminder => "emergency_notification_reminder", + ScheduledJob::AuthRequestPurge => "auth_request_purge", + ScheduledJob::DuoContextPurge => "duo_context_purge", + ScheduledJob::EventCleanup => "event_cleanup", + ScheduledJob::PurgeIncompleteSsoAuth => "purge_incomplete_sso_auth", + } + } + + pub fn from_str(name: &str) -> Option { + match name { + "send_purge" => Some(ScheduledJob::SendPurge), + "trash_purge" => Some(ScheduledJob::TrashPurge), + "incomplete_2fa_notifications" => Some(ScheduledJob::Incomplete2faNotifications), + "emergency_request_timeout" => Some(ScheduledJob::EmergencyRequestTimeout), + "emergency_notification_reminder" => Some(ScheduledJob::EmergencyNotificationReminder), + "auth_request_purge" => Some(ScheduledJob::AuthRequestPurge), + "duo_context_purge" => Some(ScheduledJob::DuoContextPurge), + "event_cleanup" => Some(ScheduledJob::EventCleanup), + "purge_incomplete_sso_auth" => Some(ScheduledJob::PurgeIncompleteSsoAuth), + _ => None, + } + } + + pub fn names() -> Vec<&'static str> { + ALL_JOBS.iter().map(|job| job.as_str()).collect() + } +} + +pub async fn run(pool: DbPool, job: ScheduledJob) -> Result<(), Error> { + info!("Running job: {}", job.as_str()); + + match job { + ScheduledJob::SendPurge => api::purge_sends(pool).await, + ScheduledJob::TrashPurge => api::purge_trashed_ciphers(pool).await, + ScheduledJob::Incomplete2faNotifications => api::send_incomplete_2fa_notifications(pool).await, + ScheduledJob::EmergencyRequestTimeout => api::emergency_request_timeout_job(pool).await, + ScheduledJob::EmergencyNotificationReminder => api::emergency_notification_reminder_job(pool).await, + ScheduledJob::AuthRequestPurge => purge_auth_requests(pool).await, + ScheduledJob::DuoContextPurge => purge_duo_contexts(pool).await, + ScheduledJob::EventCleanup => api::event_cleanup_job(pool).await, + ScheduledJob::PurgeIncompleteSsoAuth => SsoAuth::delete_expired(pool).await?, + } + + info!("Finished job: {}", job.as_str()); + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 8eef2e8c..2a524fa0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -53,6 +53,7 @@ mod crypto; #[macro_use] mod db; mod http_client; +mod jobs; mod mail; mod ratelimit; mod sso; @@ -70,27 +71,38 @@ pub use util::is_running_in_container; #[rocket::main] async fn main() -> Result<(), Error> { - parse_args(); - launch_info(); + let run_mode = parse_args(); + if matches!(run_mode, RunMode::Server) { + launch_info(); + } let level = init_logging()?; check_data_folder().await; - auth::initialize_keys().await.unwrap_or_else(|e| { - error!("Error creating private key '{}'\n{e:?}\nExiting Vaultwarden!", CONFIG.private_rsa_key()); - exit(1); - }); - check_web_vault(); - create_dir(&CONFIG.tmp_folder(), "tmp folder"); + match run_mode { + RunMode::Server => { + auth::initialize_keys().await.unwrap_or_else(|e| { + error!("Error creating private key '{}'\n{e:?}\nExiting Vaultwarden!", CONFIG.private_rsa_key()); + exit(1); + }); + check_web_vault(); + create_dir(&CONFIG.tmp_folder(), "tmp folder"); - let pool = create_db_pool().await; - schedule_jobs(pool.clone()); - db::models::TwoFactor::migrate_u2f_to_webauthn(&pool.get().await.unwrap()).await.unwrap(); - db::models::TwoFactor::migrate_credential_to_passkey(&pool.get().await.unwrap()).await.unwrap(); + let pool = create_db_pool(true).await; + schedule_jobs(pool.clone()); + db::models::TwoFactor::migrate_u2f_to_webauthn(&pool.get().await.unwrap()).await.unwrap(); + db::models::TwoFactor::migrate_credential_to_passkey(&pool.get().await.unwrap()).await.unwrap(); - let extra_debug = matches!(level, log::LevelFilter::Trace | log::LevelFilter::Debug); - launch_rocket(pool, extra_debug).await // Blocks until program termination. + let extra_debug = matches!(level, log::LevelFilter::Trace | log::LevelFilter::Debug); + launch_rocket(pool, extra_debug).await // Blocks until program termination. + } + RunMode::RunSingleJob(job) => { + create_dir(&CONFIG.tmp_folder(), "tmp folder"); + let pool = create_db_pool(false).await; + jobs::run(pool, job).await + } + } } const HELP: &str = "\ @@ -107,6 +119,8 @@ COMMAND: hash [--preset {bitwarden|owasp}] Generate an Argon2id PHC ADMIN_TOKEN backup Create a backup of the SQLite database You can also send the USR1 signal to trigger a backup + jobs run Run one scheduled background job and exit + jobs run --list List available scheduled background jobs PRESETS: m= t= p= bitwarden (default) 64MiB, 3 Iterations, 4 Threads @@ -116,7 +130,12 @@ PRESETS: m= t= p= pub const VERSION: Option<&str> = option_env!("VW_VERSION"); -fn parse_args() { +enum RunMode { + Server, + RunSingleJob(jobs::ScheduledJob), +} + +fn parse_args() -> RunMode { let mut pargs = pico_args::Arguments::from_env(); let version = VERSION.unwrap_or("(Version info from Git not present)"); @@ -197,9 +216,49 @@ fn parse_args() { exit(1); } } + } else if command == "jobs" { + let action = pargs.subcommand().unwrap_or_default(); + let action = action.as_deref().unwrap_or_default(); + + match action { + "run" => { + let args = pargs.finish(); + if args.len() == 1 && (args[0] == "--list" || args[0] == "-l") { + println!("Available jobs:"); + for name in jobs::ScheduledJob::names() { + println!(" {name}"); + } + exit(0); + } + + if args.len() != 1 { + println!("Usage: vaultwarden jobs run "); + println!("Usage: vaultwarden jobs run --list"); + println!("Available jobs: {}", jobs::ScheduledJob::names().join(", ")); + exit(1); + } + + let job_name = args[0].to_string_lossy(); + let Some(job) = jobs::ScheduledJob::from_str(&job_name) else { + println!("Unknown job '{job_name}'"); + println!("Available jobs: {}", jobs::ScheduledJob::names().join(", ")); + exit(1); + }; + + return RunMode::RunSingleJob(job); + } + _ => { + println!("Usage: vaultwarden jobs run "); + println!("Usage: vaultwarden jobs run --list"); + println!("Available jobs: {}", jobs::ScheduledJob::names().join(", ")); + exit(1); + } + } } exit(0); } + + RunMode::Server } fn launch_info() { @@ -544,8 +603,16 @@ fn check_web_vault() { } } -async fn create_db_pool() -> db::DbPool { - match util::retry_db(db::DbPool::from_config, CONFIG.db_connection_retries()).await { +async fn create_db_pool(run_migrations: bool) -> db::DbPool { + let build_pool = || { + if run_migrations { + db::DbPool::from_config() + } else { + db::DbPool::from_config_no_migrations() + } + }; + + match util::retry_db(build_pool, CONFIG.db_connection_retries()).await { Ok(p) => p, Err(e) => { error!("Error creating database pool: {e:?}");