mirror of https://github.com/mastodon/flodgatt
Read user and domain blocks from Postgres
This commit reads the blocks from pg and stores them in the User struct; it does not yet actually filter the responses. It also does not update the tests.
This commit is contained in:
parent
ebe9aeccbc
commit
35ab488b11
|
@ -29,7 +29,7 @@ fn main() {
|
|||
|
||||
let client_agent_sse = ClientAgent::blank(redis_cfg);
|
||||
let client_agent_ws = client_agent_sse.clone_with_shared_receiver();
|
||||
let pg_pool = user::PostgresPool::new(postgres_cfg);
|
||||
let pg_pool = user::PgPool::new(postgres_cfg);
|
||||
|
||||
log::warn!("Streaming server initialized and ready to accept connections");
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//! Filters for all the endpoints accessible for Server Sent Event updates
|
||||
use super::{
|
||||
query::{self, Query},
|
||||
user::{PostgresPool, User},
|
||||
user::{PgPool, User},
|
||||
};
|
||||
use warp::{filters::BoxedFilter, path, Filter};
|
||||
#[allow(dead_code)]
|
||||
|
@ -39,7 +39,7 @@ macro_rules! parse_query {
|
|||
.boxed()
|
||||
};
|
||||
}
|
||||
pub fn extract_user_or_reject(pg_pool: PostgresPool) -> BoxedFilter<(User,)> {
|
||||
pub fn extract_user_or_reject(pg_pool: PgPool) -> BoxedFilter<(User,)> {
|
||||
any_of!(
|
||||
parse_query!(
|
||||
path => "api" / "v1" / "streaming" / "user" / "notification"
|
||||
|
|
|
@ -5,7 +5,7 @@ mod mock_postgres;
|
|||
use mock_postgres as postgres;
|
||||
#[cfg(not(test))]
|
||||
mod postgres;
|
||||
pub use self::postgres::PostgresPool;
|
||||
pub use self::postgres::PostgresPool as PgPool;
|
||||
use super::query::Query;
|
||||
use warp::reject::Rejection;
|
||||
|
||||
|
@ -22,19 +22,6 @@ impl Default for Filter {
|
|||
}
|
||||
}
|
||||
|
||||
/// The User (with data read from Postgres)
|
||||
#[derive(Clone, Debug, Default, PartialEq)]
|
||||
pub struct User {
|
||||
pub target_timeline: String,
|
||||
pub email: String, // We only use email for logging; we could cut it for performance
|
||||
pub id: i64,
|
||||
pub access_token: String,
|
||||
pub scopes: OauthScope,
|
||||
pub langs: Option<Vec<String>>,
|
||||
pub logged_in: bool,
|
||||
pub filter: Filter,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, PartialEq)]
|
||||
pub struct OauthScope {
|
||||
pub all: bool,
|
||||
|
@ -58,51 +45,58 @@ impl From<Vec<String>> for OauthScope {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default, Debug, PartialEq)]
|
||||
pub struct Blocks {
|
||||
domain_blocks: Vec<String>,
|
||||
user_blocks: Vec<i64>,
|
||||
}
|
||||
|
||||
/// The User (with data read from Postgres)
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct User {
|
||||
pub target_timeline: String,
|
||||
pub email: String, // We only use email for logging; we could cut it for performance
|
||||
pub access_token: String, // We only need this once (to send back with the WS reply). Cut?
|
||||
pub id: i64,
|
||||
pub scopes: OauthScope,
|
||||
pub langs: Option<Vec<String>>,
|
||||
pub logged_in: bool,
|
||||
pub filter: Filter,
|
||||
pub blocks: Blocks,
|
||||
}
|
||||
|
||||
impl Default for User {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
id: -1,
|
||||
email: "".to_string(),
|
||||
access_token: "".to_string(),
|
||||
scopes: OauthScope::default(),
|
||||
langs: None,
|
||||
logged_in: false,
|
||||
target_timeline: String::new(),
|
||||
filter: Filter::default(),
|
||||
blocks: Blocks::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl User {
|
||||
pub fn from_query(q: Query, pg_pool: PostgresPool) -> Result<Self, Rejection> {
|
||||
let (id, access_token, email, scopes, langs, logged_in) = match q.access_token.clone() {
|
||||
None => (
|
||||
-1,
|
||||
"no access token".to_owned(),
|
||||
"".to_string(),
|
||||
OauthScope::default(),
|
||||
None,
|
||||
false,
|
||||
),
|
||||
Some(token) => {
|
||||
let (id, email, langs, scope_list) =
|
||||
postgres::query_for_user_data(&token, pg_pool.clone());
|
||||
|
||||
if id == -1 {
|
||||
return Err(warp::reject::custom("Error: Invalid access token"));
|
||||
}
|
||||
let scopes = OauthScope::from(scope_list);
|
||||
(id, token, email, scopes, langs, true)
|
||||
}
|
||||
};
|
||||
let mut user = User {
|
||||
id,
|
||||
email,
|
||||
target_timeline: "PLACEHOLDER".to_string(),
|
||||
access_token,
|
||||
scopes,
|
||||
langs,
|
||||
logged_in,
|
||||
filter: Filter::Language,
|
||||
pub fn from_query(q: Query, pool: PgPool) -> Result<Self, Rejection> {
|
||||
let mut user: User = match q.access_token.clone() {
|
||||
None => User::default(),
|
||||
Some(token) => postgres::select_user(&token, pool.clone())?,
|
||||
};
|
||||
|
||||
user = user.update_timeline_and_filter(q, pg_pool.clone())?;
|
||||
user = user.set_timeline_and_filter(q, pool.clone())?;
|
||||
user.blocks.user_blocks = postgres::select_user_blocks(user.id, pool.clone());
|
||||
user.blocks.domain_blocks = postgres::select_domain_blocks(pool.clone());
|
||||
|
||||
Ok(user)
|
||||
}
|
||||
|
||||
fn update_timeline_and_filter(
|
||||
mut self,
|
||||
q: Query,
|
||||
pg_pool: PostgresPool,
|
||||
) -> Result<Self, Rejection> {
|
||||
fn set_timeline_and_filter(mut self, q: Query, pool: PgPool) -> Result<Self, Rejection> {
|
||||
let read_scope = self.scopes.clone();
|
||||
|
||||
let timeline = match q.stream.as_ref() {
|
||||
// Public endpoints:
|
||||
tl @ "public" | tl @ "public:local" if q.media => format!("{}:media", tl),
|
||||
|
@ -110,7 +104,7 @@ impl User {
|
|||
tl @ "public" | tl @ "public:local" => tl.to_string(),
|
||||
// Hashtag endpoints:
|
||||
tl @ "hashtag" | tl @ "hashtag:local" => format!("{}:{}", tl, q.hashtag),
|
||||
// Private endpoints: User
|
||||
// Private endpoints: User:
|
||||
"user" if self.logged_in && (read_scope.all || read_scope.statuses) => {
|
||||
self.filter = Filter::NoFilter;
|
||||
format!("{}", self.id)
|
||||
|
@ -120,7 +114,7 @@ impl User {
|
|||
format!("{}", self.id)
|
||||
}
|
||||
// List endpoint:
|
||||
"list" if self.owns_list(q.list, pg_pool) && (read_scope.all || read_scope.lists) => {
|
||||
"list" if self.owns_list(q.list, pool) && (read_scope.all || read_scope.lists) => {
|
||||
self.filter = Filter::NoFilter;
|
||||
format!("list:{}", q.list)
|
||||
}
|
||||
|
@ -142,11 +136,7 @@ impl User {
|
|||
})
|
||||
}
|
||||
|
||||
/// Determine whether the User is authorised for a specified list
|
||||
pub fn owns_list(&self, list: i64, pg_pool: PostgresPool) -> bool {
|
||||
match postgres::query_list_owner(list, pg_pool) {
|
||||
Some(i) if i == self.id => true,
|
||||
_ => false,
|
||||
}
|
||||
fn owns_list(&self, list: i64, pool: PgPool) -> bool {
|
||||
postgres::user_owns_list(self.id, list, pool)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,11 @@
|
|||
//! Postgres queries
|
||||
use crate::config;
|
||||
use crate::{
|
||||
config,
|
||||
parse_client_request::user::{OauthScope, User},
|
||||
};
|
||||
use ::postgres;
|
||||
use r2d2_postgres::PostgresConnectionManager;
|
||||
use warp::reject::Rejection;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PostgresPool(pub r2d2::Pool<PostgresConnectionManager<postgres::NoTls>>);
|
||||
|
@ -25,12 +29,12 @@ impl PostgresPool {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn query_for_user_data(
|
||||
access_token: &str,
|
||||
pg_pool: PostgresPool,
|
||||
) -> (i64, String, Option<Vec<String>>, Vec<String>) {
|
||||
/// Build a user based on the result of querying Postgres with the access token
|
||||
///
|
||||
/// This does _not_ set the timeline, filter, or blocks fields. Use the various `User`
|
||||
/// methods to do so. In general, this function shouldn't be needed outside `User`.
|
||||
pub fn select_user(access_token: &str, pg_pool: PostgresPool) -> Result<User, Rejection> {
|
||||
let mut conn = pg_pool.0.get().unwrap();
|
||||
|
||||
let query_result = conn
|
||||
.query(
|
||||
"
|
||||
|
@ -45,19 +49,23 @@ LIMIT 1",
|
|||
&[&access_token.to_owned()],
|
||||
)
|
||||
.expect("Hard-coded query will return Some([0 or more rows])");
|
||||
if !query_result.is_empty() {
|
||||
if query_result.is_empty() {
|
||||
Err(warp::reject::custom("Error: Invalid access token"))
|
||||
} else {
|
||||
let only_row: &postgres::Row = query_result.get(0).unwrap();
|
||||
let id: i64 = only_row.get(1);
|
||||
let email: String = only_row.get(2);
|
||||
let scopes = only_row
|
||||
let scope_vec: Vec<String> = only_row
|
||||
.get::<_, String>(4)
|
||||
.split(' ')
|
||||
.map(|s| s.to_owned())
|
||||
.collect();
|
||||
let langs: Option<Vec<String>> = only_row.get(3);
|
||||
(id, email, langs, scopes)
|
||||
} else {
|
||||
(-1, "".to_string(), None, Vec::new())
|
||||
Ok(User {
|
||||
id: only_row.get(1),
|
||||
email: only_row.get(2),
|
||||
logged_in: true,
|
||||
scopes: OauthScope::from(scope_vec),
|
||||
langs: only_row.get(3),
|
||||
..User::default()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -79,7 +87,50 @@ pub fn query_for_user_data(access_token: &str) -> (i64, Option<Vec<String>>, Vec
|
|||
(user_id, lang, scopes)
|
||||
}
|
||||
|
||||
pub fn query_list_owner(list_id: i64, pg_pool: PostgresPool) -> Option<i64> {
|
||||
/// Query Postgres for everyone the user has blocked or muted
|
||||
///
|
||||
/// **NOTE**: because we check this when the user connects, it will not include any blocks
|
||||
/// the user adds until they refresh/reconnect.
|
||||
pub fn select_user_blocks(user_id: i64, pg_pool: PostgresPool) -> Vec<i64> {
|
||||
pg_pool
|
||||
.0
|
||||
.get()
|
||||
.unwrap()
|
||||
.query(
|
||||
"
|
||||
SELECT target_account_id
|
||||
FROM blocks
|
||||
WHERE account_id = $1
|
||||
UNION SELECT target_account_id
|
||||
FROM mutes
|
||||
WHERE account_id = $1",
|
||||
&[&user_id],
|
||||
)
|
||||
.expect("Hard-coded query will return Some([0 or more rows])")
|
||||
.iter()
|
||||
.map(|row| row.get(0))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Query Postgres for all current domain blocks
|
||||
///
|
||||
/// **NOTE**: because we check this when the user connects, it will not include any blocks
|
||||
/// the user adds until they refresh/reconnect. Additionally, we are querying it once per
|
||||
/// user, even though it is constant for all users (at any given time).
|
||||
pub fn select_domain_blocks(pg_pool: PostgresPool) -> Vec<String> {
|
||||
pg_pool
|
||||
.0
|
||||
.get()
|
||||
.unwrap()
|
||||
.query("SELECT domain FROM domain_blocks", &[])
|
||||
.expect("Hard-coded query will return Some([0 or more rows])")
|
||||
.iter()
|
||||
.map(|row| row.get(0))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Test whether a user owns a list
|
||||
pub fn user_owns_list(user_id: i64, list_id: i64, pg_pool: PostgresPool) -> bool {
|
||||
let mut conn = pg_pool.0.get().unwrap();
|
||||
// For the Postgres query, `id` = list number; `account_id` = user.id
|
||||
let rows = &conn
|
||||
|
@ -92,9 +143,12 @@ LIMIT 1",
|
|||
&[&list_id],
|
||||
)
|
||||
.expect("Hard-coded query will return Some([0 or more rows])");
|
||||
if rows.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(rows.get(0).unwrap().get(1))
|
||||
|
||||
match rows.get(0) {
|
||||
None => false,
|
||||
Some(row) => {
|
||||
let list_owner_id: i64 = row.get(1);
|
||||
list_owner_id == user_id
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//! Filters for the WebSocket endpoint
|
||||
use super::{
|
||||
query::{self, Query},
|
||||
user::{PostgresPool, User},
|
||||
user::{PgPool, User},
|
||||
};
|
||||
use warp::{filters::BoxedFilter, path, Filter};
|
||||
|
||||
|
@ -32,7 +32,7 @@ fn parse_query() -> BoxedFilter<(Query,)> {
|
|||
.boxed()
|
||||
}
|
||||
|
||||
pub fn extract_user_or_reject(pg_pool: PostgresPool) -> BoxedFilter<(User,)> {
|
||||
pub fn extract_user_or_reject(pg_pool: PgPool) -> BoxedFilter<(User,)> {
|
||||
parse_query()
|
||||
.and(query::OptionalAccessToken::from_ws_header())
|
||||
.and_then(Query::update_access_token)
|
||||
|
|
Loading…
Reference in New Issue