|
|
@ -7,7 +7,7 @@ use std::{ |
|
|
|
|
|
|
|
use diesel::{ |
|
|
|
connection::SimpleConnection, |
|
|
|
r2d2::{ConnectionManager, CustomizeConnection, Pool, PooledConnection}, |
|
|
|
r2d2::{CustomizeConnection, Pool, PooledConnection}, |
|
|
|
Connection, RunQueryDsl, |
|
|
|
}; |
|
|
|
|
|
|
@ -54,6 +54,64 @@ pub enum DbConnInner { |
|
|
|
Sqlite(diesel::sqlite::SqliteConnection), |
|
|
|
} |
|
|
|
|
|
|
|
/// Custom connection manager that implements manual connection establishment
|
|
|
|
pub struct DbConnManager { |
|
|
|
database_url: String, |
|
|
|
} |
|
|
|
|
|
|
|
impl DbConnManager { |
|
|
|
pub fn new(database_url: &str) -> Self { |
|
|
|
Self { |
|
|
|
database_url: database_url.to_string(), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
fn establish_connection(&self) -> Result<DbConnInner, diesel::r2d2::Error> { |
|
|
|
let url = &self.database_url; |
|
|
|
|
|
|
|
match DbConnType::from_url(url) { |
|
|
|
#[cfg(mysql)] |
|
|
|
Ok(DbConnType::Mysql) => { |
|
|
|
let conn = diesel::mysql::MysqlConnection::establish(url)?; |
|
|
|
Ok(DbConnInner::Mysql(conn)) |
|
|
|
} |
|
|
|
#[cfg(postgresql)] |
|
|
|
Ok(DbConnType::Postgresql) => { |
|
|
|
let conn = diesel::pg::PgConnection::establish(url)?; |
|
|
|
Ok(DbConnInner::Postgresql(conn)) |
|
|
|
} |
|
|
|
#[cfg(sqlite)] |
|
|
|
Ok(DbConnType::Sqlite) => { |
|
|
|
let conn = diesel::sqlite::SqliteConnection::establish(url)?; |
|
|
|
Ok(DbConnInner::Sqlite(conn)) |
|
|
|
} |
|
|
|
|
|
|
|
Err(e) => Err(diesel::r2d2::Error::ConnectionError(diesel::ConnectionError::InvalidConnectionUrl( |
|
|
|
format!("Unable to estabilsh a connection: {e:?}"), |
|
|
|
))), |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
impl diesel::r2d2::ManageConnection for DbConnManager { |
|
|
|
type Connection = DbConnInner; |
|
|
|
type Error = diesel::r2d2::Error; |
|
|
|
|
|
|
|
fn connect(&self) -> Result<Self::Connection, Self::Error> { |
|
|
|
self.establish_connection() |
|
|
|
} |
|
|
|
|
|
|
|
fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { |
|
|
|
use diesel::r2d2::R2D2Connection; |
|
|
|
conn.ping().map_err(diesel::r2d2::Error::QueryError) |
|
|
|
} |
|
|
|
|
|
|
|
fn has_broken(&self, conn: &mut Self::Connection) -> bool { |
|
|
|
use diesel::r2d2::R2D2Connection; |
|
|
|
conn.is_broken() |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
#[derive(Eq, PartialEq)] |
|
|
|
pub enum DbConnType { |
|
|
|
#[cfg(mysql)] |
|
|
@ -67,7 +125,7 @@ pub enum DbConnType { |
|
|
|
pub static ACTIVE_DB_TYPE: OnceLock<DbConnType> = OnceLock::new(); |
|
|
|
|
|
|
|
pub struct DbConn { |
|
|
|
conn: Arc<Mutex<Option<PooledConnection<ConnectionManager<DbConnInner>>>>>, |
|
|
|
conn: Arc<Mutex<Option<PooledConnection<DbConnManager>>>>, |
|
|
|
permit: Option<OwnedSemaphorePermit>, |
|
|
|
} |
|
|
|
|
|
|
@ -88,7 +146,7 @@ impl CustomizeConnection<DbConnInner, diesel::r2d2::Error> for DbConnOptions { |
|
|
|
#[derive(Clone)] |
|
|
|
pub struct DbPool { |
|
|
|
// This is an 'Option' so that we can drop the pool in a 'spawn_blocking'.
|
|
|
|
pool: Option<Pool<ConnectionManager<DbConnInner>>>, |
|
|
|
pool: Option<Pool<DbConnManager>>, |
|
|
|
semaphore: Arc<Semaphore>, |
|
|
|
} |
|
|
|
|
|
|
@ -154,7 +212,7 @@ impl DbPool { |
|
|
|
} |
|
|
|
|
|
|
|
let max_conns = CONFIG.database_max_conns(); |
|
|
|
let manager = ConnectionManager::<DbConnInner>::new(&db_url); |
|
|
|
let manager = DbConnManager::new(&db_url); |
|
|
|
let pool = Pool::builder() |
|
|
|
.max_size(max_conns) |
|
|
|
.min_idle(Some(CONFIG.database_min_conns())) |
|
|
|