|
@ -75,12 +75,10 @@ macro_rules! generate_connections { |
|
|
#[cfg($name)] |
|
|
#[cfg($name)] |
|
|
impl CustomizeConnection<$ty, diesel::r2d2::Error> for DbConnOptions { |
|
|
impl CustomizeConnection<$ty, diesel::r2d2::Error> for DbConnOptions { |
|
|
fn on_acquire(&self, conn: &mut $ty) -> Result<(), diesel::r2d2::Error> { |
|
|
fn on_acquire(&self, conn: &mut $ty) -> Result<(), diesel::r2d2::Error> { |
|
|
(|| { |
|
|
if !self.init_stmts.is_empty() { |
|
|
if !self.init_stmts.is_empty() { |
|
|
conn.batch_execute(&self.init_stmts).map_err(diesel::r2d2::Error::QueryError)?; |
|
|
conn.batch_execute(&self.init_stmts)?; |
|
|
} |
|
|
} |
|
|
Ok(()) |
|
|
Ok(()) |
|
|
|
|
|
})().map_err(diesel::r2d2::Error::QueryError) |
|
|
|
|
|
} |
|
|
} |
|
|
})+ |
|
|
})+ |
|
|
|
|
|
|
|
@ -97,7 +95,7 @@ macro_rules! generate_connections { |
|
|
|
|
|
|
|
|
impl Drop for DbConn { |
|
|
impl Drop for DbConn { |
|
|
fn drop(&mut self) { |
|
|
fn drop(&mut self) { |
|
|
let conn = self.conn.clone(); |
|
|
let conn = Arc::clone(&self.conn); |
|
|
let permit = self.permit.take(); |
|
|
let permit = self.permit.take(); |
|
|
|
|
|
|
|
|
// Since connection can't be on the stack in an async fn during an
|
|
|
// Since connection can't be on the stack in an async fn during an
|
|
@ -143,21 +141,20 @@ macro_rules! generate_connections { |
|
|
})) |
|
|
})) |
|
|
.build(manager) |
|
|
.build(manager) |
|
|
.map_res("Failed to create pool")?; |
|
|
.map_res("Failed to create pool")?; |
|
|
return Ok(DbPool { |
|
|
Ok(DbPool { |
|
|
pool: Some(DbPoolInner::$name(pool)), |
|
|
pool: Some(DbPoolInner::$name(pool)), |
|
|
semaphore: Arc::new(Semaphore::new(CONFIG.database_max_conns() as usize)), |
|
|
semaphore: Arc::new(Semaphore::new(CONFIG.database_max_conns() as usize)), |
|
|
}); |
|
|
}) |
|
|
} |
|
|
} |
|
|
#[cfg(not($name))] |
|
|
#[cfg(not($name))] |
|
|
#[allow(unreachable_code)] |
|
|
unreachable!("Trying to use a DB backend when it's feature is disabled") |
|
|
return unreachable!("Trying to use a DB backend when it's feature is disabled"); |
|
|
|
|
|
}, |
|
|
}, |
|
|
)+ } |
|
|
)+ } |
|
|
} |
|
|
} |
|
|
// Get a connection from the pool
|
|
|
// Get a connection from the pool
|
|
|
pub async fn get(&self) -> Result<DbConn, Error> { |
|
|
pub async fn get(&self) -> Result<DbConn, Error> { |
|
|
let duration = Duration::from_secs(CONFIG.database_timeout()); |
|
|
let duration = Duration::from_secs(CONFIG.database_timeout()); |
|
|
let permit = match timeout(duration, self.semaphore.clone().acquire_owned()).await { |
|
|
let permit = match timeout(duration, Arc::clone(&self.semaphore).acquire_owned()).await { |
|
|
Ok(p) => p.expect("Semaphore should be open"), |
|
|
Ok(p) => p.expect("Semaphore should be open"), |
|
|
Err(_) => { |
|
|
Err(_) => { |
|
|
err!("Timeout waiting for database connection"); |
|
|
err!("Timeout waiting for database connection"); |
|
@ -170,10 +167,10 @@ macro_rules! generate_connections { |
|
|
let pool = p.clone(); |
|
|
let pool = p.clone(); |
|
|
let c = run_blocking(move || pool.get_timeout(duration)).await.map_res("Error retrieving connection from pool")?; |
|
|
let c = run_blocking(move || pool.get_timeout(duration)).await.map_res("Error retrieving connection from pool")?; |
|
|
|
|
|
|
|
|
return Ok(DbConn { |
|
|
Ok(DbConn { |
|
|
conn: Arc::new(Mutex::new(Some(DbConnInner::$name(c)))), |
|
|
conn: Arc::new(Mutex::new(Some(DbConnInner::$name(c)))), |
|
|
permit: Some(permit) |
|
|
permit: Some(permit) |
|
|
}); |
|
|
}) |
|
|
}, |
|
|
}, |
|
|
)+ } |
|
|
)+ } |
|
|
} |
|
|
} |
|
|