Filter out toots involving blocked/muted users

This commit is contained in:
Daniel Sockwell 2020-03-12 21:35:33 -04:00
parent 32fc17a89a
commit 9ec8692ba5
4 changed files with 64 additions and 38 deletions

View File

@ -7,6 +7,7 @@ use mock_postgres as postgres;
mod postgres;
pub use self::postgres::PgPool;
use super::query::Query;
use std::collections::HashSet;
use warp::reject::Rejection;
/// The filters that can be applied to toots after they come from Redis
@ -47,8 +48,8 @@ impl From<Vec<String>> for OauthScope {
#[derive(Clone, Default, Debug, PartialEq)]
pub struct Blocks {
domain_blocks: Vec<String>,
user_blocks: Vec<i64>,
pub domain_blocks: HashSet<String>,
pub user_blocks: HashSet<i64>,
}
/// The User (with data read from Postgres)
@ -83,6 +84,7 @@ impl Default for User {
impl User {
pub fn from_query(q: Query, pool: PgPool) -> Result<Self, Rejection> {
println!("Creating user...");
let mut user: User = match q.access_token.clone() {
None => User::default(),
Some(token) => postgres::select_user(&token, pool.clone())?,
@ -91,7 +93,7 @@ impl User {
user = user.set_timeline_and_filter(q, pool.clone())?;
user.blocks.user_blocks = postgres::select_user_blocks(user.id, pool.clone());
user.blocks.domain_blocks = postgres::select_domain_blocks(pool.clone());
dbg!(&user);
Ok(user)
}

View File

@ -5,6 +5,7 @@ use crate::{
};
use ::postgres;
use r2d2_postgres::PostgresConnectionManager;
use std::collections::HashSet;
use warp::reject::Rejection;
#[derive(Clone)]
@ -60,6 +61,7 @@ LIMIT 1",
.collect();
Ok(User {
id: only_row.get(1),
access_token: access_token.to_string(),
email: only_row.get(2),
logged_in: true,
scopes: OauthScope::from(scope_vec),
@ -69,29 +71,11 @@ LIMIT 1",
}
}
#[cfg(test)]
pub fn query_for_user_data(access_token: &str) -> (i64, Option<Vec<String>>, Vec<String>) {
let (user_id, lang, scopes) = if access_token == "TEST_USER" {
(
1,
None,
vec![
"read".to_string(),
"write".to_string(),
"follow".to_string(),
],
)
} else {
(-1, None, Vec::new())
};
(user_id, lang, scopes)
}
/// Query Postgres for everyone the user has blocked or muted
///
/// **NOTE**: because we check this when the user connects, it will not include any blocks
/// the user adds until they refresh/reconnect.
pub fn select_user_blocks(user_id: i64, pg_pool: PgPool) -> Vec<i64> {
pub fn select_user_blocks(user_id: i64, pg_pool: PgPool) -> HashSet<i64> {
pg_pool
.0
.get()
@ -117,7 +101,7 @@ UNION SELECT target_account_id
/// **NOTE**: because we check this when the user connects, it will not include any blocks
/// the user adds until they refresh/reconnect. Additionally, we are querying it once per
/// user, even though it is constant for all users (at any given time).
pub fn select_domain_blocks(pg_pool: PgPool) -> Vec<String> {
pub fn select_domain_blocks(pg_pool: PgPool) -> HashSet<String> {
pg_pool
.0
.get()

View File

@ -19,7 +19,7 @@ use super::receiver::Receiver;
use crate::{config, parse_client_request::user::User};
use futures::{Async, Poll};
use serde_json::Value;
use std::sync;
use std::{collections::HashSet, sync};
use tokio::io::Error;
use uuid::Uuid;
@ -110,6 +110,7 @@ impl futures::stream::Stream for ClientAgent {
}
/// The message to send to the client (which might not literally be a toot in some cases).
#[derive(Debug, Clone)]
pub struct Toot {
pub category: String,
pub payload: Value,
@ -118,7 +119,7 @@ pub struct Toot {
impl Toot {
/// Construct a `Toot` from well-formed JSON.
fn from_json(value: Value) -> Self {
pub fn from_json(value: Value) -> Self {
let category = value["event"].as_str().expect("Redis string").to_owned();
let language = if category == "update" {
Some(value["payload"]["language"].to_string())
@ -133,8 +134,29 @@ impl Toot {
}
}
pub fn get_involved_users(&self) -> HashSet<i64> {
let mut involved_users: HashSet<i64> = HashSet::new();
let msg = self.payload.clone();
let api = "Invariant Violation: JSON value does not conform to Mastdon API";
involved_users.insert(msg["account"]["id"].str_to_i64().expect(&api));
if let Some(mentions) = msg["mentions"].as_array() {
for mention in mentions {
involved_users.insert(mention["id"].str_to_i64().expect(&api));
}
}
if let Some(replied_to_account) = msg["in_reply_to_account_id"].as_str() {
involved_users.insert(replied_to_account.parse().expect(&api));
}
if let Some(reblog) = msg["reblog"].as_object() {
involved_users.insert(reblog["account"]["id"].str_to_i64().expect(&api));
}
involved_users
}
/// Filter out any `Toot`'s that fail the provided filter.
fn filter(self, user: &User) -> Result<Async<Option<Self>>, Error> {
pub fn filter(self, user: &User) -> Result<Async<Option<Self>>, Error> {
let toot = self;
let category = toot.category.clone();
@ -161,3 +183,17 @@ impl Toot {
}
}
}
trait ConvertValue {
fn str_to_i64(&self) -> Result<i64, Box<dyn std::error::Error>>;
}
impl ConvertValue for Value {
fn str_to_i64(&self) -> Result<i64, Box<dyn std::error::Error>> {
Ok(self
.as_str()
.ok_or(format!("{} is not a string", &self))?
.parse()
.map_err(|_| "Could not parse str")?)
}
}

View File

@ -82,29 +82,33 @@ pub fn send_updates_to_ws(
let mut time = time::Instant::now();
let (tl, email, id) = (
let (tl, email, id, blocked_users) = (
client_agent.current_user.target_timeline.clone(),
client_agent.current_user.email.clone(),
client_agent.current_user.id,
client_agent.current_user.blocks.user_blocks.clone(),
);
// Every time you get an event from that stream, send it through the pipe
event_stream
.for_each(move |_instant| {
if let Ok(Async::Ready(Some(toot))) = client_agent.poll() {
let txt = &toot.payload["content"];
log::warn!("toot: {}\n in TL: {}\nuser: {}({})", txt, tl, email, id);
if blocked_users.is_disjoint(&toot.get_involved_users()) {
let txt = &toot.payload["content"];
log::warn!("toot: {}\nTL: {}\nUser: {}({})", txt, tl, email, id);
let msg = warp::ws::Message::text(
json!({"event": toot.category,
"payload": toot.payload.to_string()})
.to_string(),
);
tx.unbounded_send(msg).expect("No send error");
tx.unbounded_send(warp::ws::Message::text(
json!({ "event": toot.category,
"payload": &toot.payload.to_string() })
.to_string(),
))
.expect("No send error");
} else {
log::info!("Blocked a message to {}", email);
}
};
if time.elapsed() > time::Duration::from_secs(30) {
let msg = warp::ws::Message::text("{}");
tx.unbounded_send(msg).expect("Can ping");
tx.unbounded_send(warp::ws::Message::text("{}"))
.expect("Can ping");
time = time::Instant::now();
}
Ok(())