Initial postgres connection pool

This commit is contained in:
Daniel Sockwell 2020-01-10 14:46:29 -05:00
parent 15ff1c8224
commit 8334a3cbfa
7 changed files with 69 additions and 32 deletions

32
Cargo.lock generated
View File

@ -423,6 +423,8 @@ dependencies = [
"postgres 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)",
"postgres-openssl 0.2.0-rc.1 (git+https://github.com/sfackler/rust-postgres.git)",
"pretty_env_logger 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"r2d2 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)",
"r2d2_postgres 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1289,6 +1291,25 @@ dependencies = [
"proc-macro2 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "r2d2"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
"scheduled-thread-pool 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "r2d2_postgres"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"postgres 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)",
"r2d2 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rand"
version = "0.6.5"
@ -1555,6 +1576,14 @@ dependencies = [
"winapi-util 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"parking_lot 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "scoped-tls"
version = "1.0.0"
@ -2451,6 +2480,8 @@ dependencies = [
"checksum quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9274b940887ce9addde99c4eee6b5c44cc494b182b97e73dc8ffdcb3397fd3f0"
"checksum quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)" = "faf4799c5d274f3868a4aae320a0a182cbd2baee377b378f080e16a23e9d80db"
"checksum quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "053a8c8bcc71fcce321828dc897a98ab9760bef03a4fc36693c231e5b3216cfe"
"checksum r2d2 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)" = "1497e40855348e4a8a40767d8e55174bce1e445a3ac9254ad44ad468ee0485af"
"checksum r2d2_postgres 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)" = "707d27f66f43bac1081141f6d9611fffcce7da2841ae97c7ac53619d098efe8f"
"checksum rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca"
"checksum rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3ae1b169243eaf61759b8475a998f0a385e42042370f3a7dbaf35246eacc8412"
"checksum rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef"
@ -2483,6 +2514,7 @@ dependencies = [
"checksum ryu 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c92464b447c0ee8c4fb3824ecc8383b81717b9f1e74ba2e72540aef7b9f82997"
"checksum safemem 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d2b08423011dae9a5ca23f07cf57dac3857f5c885d352b76f6d95f4aea9434d0"
"checksum same-file 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "585e8ddcedc187886a30fa705c47985c3fa88d06624095856b36ca0b82ff4421"
"checksum scheduled-thread-pool 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "f5de7bc31f28f8e6c28df5e1bf3d10610f5fdc14cc95f272853512c70a2bd779"
"checksum scoped-tls 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2"
"checksum scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "94258f53601af11e6a49f722422f6e3425c52b06245a5cf9bc09908b174f5e27"
"checksum scopeguard 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b42e15e59b18a828bbf5c58ea01debb36b9b096346de35d941dcb89009f24a0d"

View File

@ -21,6 +21,8 @@ postgres-openssl = { git = "https://github.com/sfackler/rust-postgres.git"}
url = "2.1.0"
strum = "0.16.0"
strum_macros = "0.16.0"
r2d2_postgres = "0.16.0"
r2d2 = "0.8.8"
[dev-dependencies]
criterion = "0.3"

View File

@ -30,7 +30,7 @@ fn main() {
let client_agent_sse = ClientAgent::blank(redis_cfg);
let client_agent_ws = client_agent_sse.clone_with_shared_receiver();
let pg_conn = user::PostgresConn::new(postgres_cfg);
let pg_conn = user::PostgresPool::new(postgres_cfg);
warn!("Streaming server initialized and ready to accept connections");

View File

@ -1,7 +1,7 @@
//! Filters for all the endpoints accessible for Server Sent Event updates
use super::{
query::{self, Query},
user::{PostgresConn, User},
user::{PostgresPool, User},
};
use warp::{filters::BoxedFilter, path, Filter};
#[allow(dead_code)]
@ -39,7 +39,7 @@ macro_rules! parse_query {
.boxed()
};
}
pub fn extract_user_or_reject(pg_conn: PostgresConn) -> BoxedFilter<(User,)> {
pub fn extract_user_or_reject(pg_conn: PostgresPool) -> BoxedFilter<(User,)> {
any_of!(
parse_query!(
path => "api" / "v1" / "streaming" / "user" / "notification"

View File

@ -1,11 +1,11 @@
//! `User` struct and related functionality
#[cfg(test)]
mod mock_postgres;
#[cfg(test)]
use mock_postgres as postgres;
#[cfg(not(test))]
//#[cfg(test)]
//mod mock_postgres;
//#[cfg(test)]
//use mock_postgres as postgres;
//#[cfg(not(test))]
mod postgres;
pub use self::postgres::PostgresConn;
pub use self::postgres::PostgresPool;
use super::query::Query;
use warp::reject::Rejection;
@ -58,7 +58,7 @@ impl From<Vec<String>> for OauthScope {
}
impl User {
pub fn from_query(q: Query, pg_conn: PostgresConn) -> Result<Self, Rejection> {
pub fn from_query(q: Query, pg_conn: PostgresPool) -> Result<Self, Rejection> {
let (id, access_token, scopes, langs, logged_in) = match q.access_token.clone() {
None => (
-1,
@ -95,7 +95,7 @@ impl User {
fn update_timeline_and_filter(
mut self,
q: Query,
pg_conn: PostgresConn,
pg_conn: PostgresPool,
) -> Result<Self, Rejection> {
let read_scope = self.scopes.clone();
@ -139,7 +139,7 @@ impl User {
}
/// Determine whether the User is authorised for a specified list
pub fn owns_list(&self, list: i64, pg_conn: PostgresConn) -> bool {
pub fn owns_list(&self, list: i64, pg_conn: PostgresPool) -> bool {
match postgres::query_list_owner(list, pg_conn) {
Some(i) if i == self.id => true,
_ => false,

View File

@ -1,33 +1,36 @@
//! Postgres queries
use crate::config;
use ::postgres;
use std::sync::{Arc, Mutex};
use r2d2_postgres::PostgresConnectionManager;
#[derive(Clone)]
pub struct PostgresConn(pub Arc<Mutex<postgres::Client>>);
impl PostgresConn {
pub struct PostgresPool(pub r2d2::Pool<PostgresConnectionManager<postgres::NoTls>>);
impl PostgresPool {
pub fn new(pg_cfg: config::PostgresConfig) -> Self {
let mut con = postgres::Config::new();
con.user(&pg_cfg.user)
let mut cfg = postgres::Config::new();
cfg.user(&pg_cfg.user)
.host(&*pg_cfg.host.to_string())
.port(*pg_cfg.port)
.dbname(&pg_cfg.database);
if let Some(password) = &*pg_cfg.password {
con.password(password);
cfg.password(password);
};
Self(Arc::new(Mutex::new(
con.connect(postgres::NoTls)
.expect("Can connect to local Postgres"),
)))
let manager = PostgresConnectionManager::new(cfg, postgres::NoTls);
let pool = r2d2::Pool::builder()
.max_size(10)
.build(manager)
.expect("Can connect to local postgres");
Self(pool)
}
}
#[cfg(not(test))]
pub fn query_for_user_data(
access_token: &str,
pg_conn: PostgresConn,
pg_pool: PostgresPool,
) -> (i64, Option<Vec<String>>, Vec<String>) {
let mut conn = pg_conn.0.lock().unwrap();
let mut conn = pg_pool.0.get().unwrap();
let query_result = conn
.query(
@ -77,8 +80,8 @@ pub fn query_for_user_data(access_token: &str) -> (i64, Option<Vec<String>>, Vec
}
#[cfg(not(test))]
pub fn query_list_owner(list_id: i64, pg_conn: PostgresConn) -> Option<i64> {
let mut conn = pg_conn.0.lock().unwrap();
pub fn query_list_owner(list_id: i64, pg_pool: PostgresPool) -> Option<i64> {
let mut conn = pg_pool.0.get().unwrap();
// For the Postgres query, `id` = list number; `account_id` = user.id
let rows = &conn
.query(
@ -97,7 +100,7 @@ LIMIT 1",
}
}
#[cfg(test)]
pub fn query_list_owner(_list_id: i64) -> Option<i64> {
Some(1)
}
//#[cfg(test)]
//pub fn query_list_owner(_list_id: i64) -> Option<i64> {
// Some(1)
//}

View File

@ -1,7 +1,7 @@
//! Filters for the WebSocket endpoint
use super::{
query::{self, Query},
user::{PostgresConn, User},
user::{PostgresPool, User},
};
use warp::{filters::BoxedFilter, path, Filter};
@ -32,7 +32,7 @@ fn parse_query() -> BoxedFilter<(Query,)> {
.boxed()
}
pub fn extract_user_or_reject(pg_conn: PostgresConn) -> BoxedFilter<(User,)> {
pub fn extract_user_or_reject(pg_conn: PostgresPool) -> BoxedFilter<(User,)> {
parse_query()
.and(query::OptionalAccessToken::from_ws_header())
.and_then(Query::update_access_token)