diff --git a/Cargo.lock b/Cargo.lock index e0d0277..9a0bdc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -453,7 +453,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.8.0" +version = "0.8.1" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 35a728e..881c450 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.8.0" +version = "0.8.1" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/src/err/mod.rs b/src/err/mod.rs index eb5839e..55a8927 100644 --- a/src/err/mod.rs +++ b/src/err/mod.rs @@ -35,20 +35,9 @@ impl From for FatalErr { Self::ReceiverErr(e) } } -pub fn die_with_msg2(msg: impl fmt::Display) { - eprintln!("{}", msg); - std::process::exit(1); -} +// TODO delete vvvv when postgres_cfg.rs has better error handling pub fn die_with_msg(msg: impl fmt::Display) -> ! { eprintln!("FATAL ERROR: {}", msg); std::process::exit(1); } - -#[macro_export] -macro_rules! log_fatal { - ($str:expr, $var:expr) => {{ - log::error!($str, $var); - panic!(); - };}; -} diff --git a/src/err/timeline.rs b/src/err/timeline.rs index 4ba9f34..6f05f89 100644 --- a/src/err/timeline.rs +++ b/src/err/timeline.rs @@ -2,10 +2,12 @@ use std::fmt; #[derive(Debug)] pub enum TimelineErr { - RedisNamespaceMismatch, + MissingHashtag, InvalidInput, } +impl std::error::Error for TimelineErr {} + impl From for TimelineErr { fn from(_error: std::num::ParseIntError) -> Self { Self::InvalidInput @@ -16,8 +18,8 @@ impl fmt::Display for TimelineErr { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { use TimelineErr::*; let msg = match self { - RedisNamespaceMismatch => "TODO: Cut this error", - InvalidInput => "The timeline text from Redis could not be parsed into a supported timeline. TODO: add incoming timeline text" + InvalidInput => "The timeline text from Redis could not be parsed into a supported timeline. TODO: add incoming timeline text", + MissingHashtag => "Attempted to send a hashtag timeline without supplying a tag name", }; write!(f, "{}", msg) } diff --git a/src/main.rs b/src/main.rs index 4a79a8f..fed290e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,6 +66,7 @@ fn main() -> Result<(), FatalErr> { log::info!("Incoming websocket request for {:?}", subscription.timeline); { let mut receiver = ws_receiver.lock().unwrap_or_else(Receiver::recover); + receiver.subscribe(&subscription).unwrap_or_else(|e| { log::error!("Could not subscribe to the Redis channel: {}", e) }); diff --git a/src/messages/event/checked_event/id.rs b/src/messages/event/checked_event/id.rs index 8eb031a..0226e5d 100644 --- a/src/messages/event/checked_event/id.rs +++ b/src/messages/event/checked_event/id.rs @@ -37,6 +37,12 @@ impl FromStr for Id { } } +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "{}", self.0) + } +} + impl Serialize for Id { fn serialize(&self, serializer: S) -> Result where diff --git a/src/messages/event/dynamic_event.rs b/src/messages/event/dynamic_event.rs deleted file mode 100644 index fa87142..0000000 --- a/src/messages/event/dynamic_event.rs +++ /dev/null @@ -1,87 +0,0 @@ -use crate::parse_client_request::Blocks; -use hashbrown::HashSet; -use serde::{Deserialize, Serialize}; -use serde_json::Value; - -#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)] -pub struct DynamicEvent { - pub event: String, - pub payload: Value, - queued_at: Option, -} - -impl DynamicEvent { - /// Returns `true` if the status is filtered out based on its language - pub fn language_not(&self, allowed_langs: &HashSet) -> bool { - const ALLOW: bool = false; - const REJECT: bool = true; - - if allowed_langs.is_empty() { - return ALLOW; // listing no allowed_langs results in allowing all languages - } - - match self.payload["language"].as_str() { - Some(toot_language) if allowed_langs.contains(toot_language) => ALLOW, - None => ALLOW, // If toot language is unknown, toot is always allowed - Some(empty) if empty == String::new() => ALLOW, - Some(_toot_language) => REJECT, - } - } - /// Returns `true` if the toot contained in this Event originated from a blocked domain, - /// is from an account that has blocked the current user, or if the User's list of - /// blocked/muted users includes a user involved in the toot. - /// - /// A user is involved in the toot if they: - /// * Are mentioned in this toot - /// * Wrote this toot - /// * Wrote a toot that this toot is replying to (if any) - /// * Wrote the toot that this toot is boosting (if any) - pub fn involves_any(&self, blocks: &Blocks) -> bool { - const ALLOW: bool = false; - const REJECT: bool = true; - let Blocks { - blocked_users, - blocking_users, - blocked_domains, - } = blocks; - - let id = self.payload["account"]["id"].as_str().expect("TODO"); - let username = self.payload["account"]["acct"].as_str().expect("TODO"); - - if self.involves(blocked_users) || blocking_users.contains(&id.parse().expect("TODO")) { - REJECT - } else { - let full_username = &username; - match full_username.split('@').nth(1) { - Some(originating_domain) if blocked_domains.contains(originating_domain) => REJECT, - Some(_) | None => ALLOW, // None means the local instance, which can't be blocked - } - } - } - - // involved_users = mentioned_users + author + replied-to user + boosted user - fn involves(&self, blocked_users: &HashSet) -> bool { - // mentions - let mentions = self.payload["mentions"].as_array().expect("TODO"); - let mut involved_users: HashSet = mentions - .iter() - .map(|mention| mention["id"].as_str().expect("TODO").parse().expect("TODO")) - .collect(); - - // author - let author_id = self.payload["account"]["id"].as_str().expect("TODO"); - involved_users.insert(author_id.parse::().expect("TODO")); - // replied-to user - let replied_to_user = self.payload["in_reply_to_account_id"].as_str(); - if let Some(user_id) = replied_to_user { - involved_users.insert(user_id.parse().expect("TODO")); - } - // boosted user - let id_of_boosted_user = self.payload["reblog"]["account"]["id"].as_str(); - if let Some(user_id) = id_of_boosted_user { - involved_users.insert(user_id.parse().expect("TODO")); - } - - !involved_users.is_disjoint(blocked_users) - } -} diff --git a/src/messages/event/dynamic_event/mod.rs b/src/messages/event/dynamic_event/mod.rs index adcc150..fbb8462 100644 --- a/src/messages/event/dynamic_event/mod.rs +++ b/src/messages/event/dynamic_event/mod.rs @@ -21,6 +21,7 @@ pub enum EventKind { Update(DynStatus), NonUpdate, } + impl Default for EventKind { fn default() -> Self { Self::NonUpdate @@ -35,10 +36,9 @@ pub struct DynStatus { pub mentioned_users: HashSet, pub replied_to_user: Option, pub boosted_user: Option, - pub payload: Value, } -type Result = std::result::Result; // TODO cut if not used more than once +type Result = std::result::Result; impl DynEvent { pub fn set_update(self) -> Result { @@ -65,7 +65,6 @@ impl DynStatus { mentioned_users: HashSet::new(), replied_to_user: Id::try_from(&payload["in_reply_to_account_id"]).ok(), boosted_user: Id::try_from(&payload["reblog"]["account"]["id"]).ok(), - payload, }) } /// Returns `true` if the status is filtered out based on its language diff --git a/src/messages/event/mod.rs b/src/messages/event/mod.rs index 187fe82..8583b98 100644 --- a/src/messages/event/mod.rs +++ b/src/messages/event/mod.rs @@ -8,7 +8,6 @@ pub use { err::EventErr, }; -use crate::log_fatal; use serde::Serialize; use std::{convert::TryFrom, string::String}; @@ -26,8 +25,7 @@ impl Event { Some(payload) => SendableEvent::WithPayload { event, payload }, None => SendableEvent::NoPayload { event }, }; - serde_json::to_string(&sendable_event) - .unwrap_or_else(|_| log_fatal!("Could not serialize `{:?}`", &sendable_event)) + serde_json::to_string(&sendable_event).expect("Guaranteed: SendableEvent is Serialize") } pub fn event_name(&self) -> String { @@ -47,7 +45,7 @@ impl Event { .. }) => "update", Self::Dynamic(DynEvent { event, .. }) => event, - Self::Ping => panic!("event_name() called on EventNotReady"), + Self::Ping => panic!("event_name() called on Ping"), }) } @@ -65,7 +63,7 @@ impl Event { FiltersChanged => None, }, Self::Dynamic(DynEvent { payload, .. }) => Some(payload.to_string()), - Self::Ping => panic!("payload() called on EventNotReady"), + Self::Ping => panic!("payload() called on Ping"), } } } @@ -105,6 +103,5 @@ enum SendableEvent<'a> { } fn escaped(content: T) -> String { - serde_json::to_string(&content) - .unwrap_or_else(|_| log_fatal!("Could not parse Event with: `{:?}`", &content)) + serde_json::to_string(&content).expect("Guaranteed by Serialize trait bound") } diff --git a/src/parse_client_request/subscription.rs b/src/parse_client_request/subscription.rs index 3ad632b..64b2a21 100644 --- a/src/parse_client_request/subscription.rs +++ b/src/parse_client_request/subscription.rs @@ -6,17 +6,15 @@ // #[cfg(not(test))] use super::postgres::PgPool; +use super::query; use super::query::Query; use crate::err::TimelineErr; -use crate::log_fatal; + use crate::messages::Id; + use hashbrown::HashSet; use lru::LruCache; -use uuid::Uuid; -use warp::reject::Rejection; - -use super::query; -use warp::{filters::BoxedFilter, path, Filter}; +use warp::{filters::BoxedFilter, path, reject::Rejection, Filter}; /// Helper macro to match on the first of any of the provided filters macro_rules! any_of { @@ -52,7 +50,6 @@ macro_rules! parse_sse_query { #[derive(Clone, Debug, PartialEq)] pub struct Subscription { - pub id: Uuid, pub timeline: Timeline, pub allowed_langs: HashSet, pub blocks: Blocks, @@ -70,7 +67,6 @@ pub struct Blocks { impl Default for Subscription { fn default() -> Self { Self { - id: Uuid::new_v4(), timeline: Timeline(Stream::Unset, Reach::Local, Content::Notification), allowed_langs: HashSet::new(), blocks: Blocks::default(), @@ -134,7 +130,6 @@ impl Subscription { }; Ok(Subscription { - id: Uuid::new_v4(), timeline, allowed_langs: user.allowed_langs, blocks: Blocks { @@ -183,30 +178,28 @@ impl Timeline { Self(Unset, Local, Notification) } - pub fn to_redis_raw_timeline(&self, hashtag: Option<&String>) -> String { + pub fn to_redis_raw_timeline(&self, hashtag: Option<&String>) -> Result { use {Content::*, Reach::*, Stream::*}; - match self { + Ok(match self { Timeline(Public, Federated, All) => "timeline:public".into(), Timeline(Public, Local, All) => "timeline:public:local".into(), Timeline(Public, Federated, Media) => "timeline:public:media".into(), Timeline(Public, Local, Media) => "timeline:public:local:media".into(), - Timeline(Hashtag(id), Federated, All) => format!( + Timeline(Hashtag(_id), Federated, All) => format!( "timeline:hashtag:{}", - hashtag.unwrap_or_else(|| log_fatal!("Did not supply a name for hashtag #{}", id)) + hashtag.ok_or_else(|| TimelineErr::MissingHashtag)? ), - Timeline(Hashtag(id), Local, All) => format!( + Timeline(Hashtag(_id), Local, All) => format!( "timeline:hashtag:{}:local", - hashtag.unwrap_or_else(|| log_fatal!("Did not supply a name for hashtag #{}", id)) + hashtag.ok_or_else(|| TimelineErr::MissingHashtag)? ), Timeline(User(id), Federated, All) => format!("timeline:{}", id), Timeline(User(id), Federated, Notification) => format!("timeline:{}:notification", id), Timeline(List(id), Federated, All) => format!("timeline:list:{}", id), Timeline(Direct(id), Federated, All) => format!("timeline:direct:{}", id), - Timeline(one, _two, _three) => { - log_fatal!("Supposedly impossible timeline reached: {:?}", one) - } - } + Timeline(_one, _two, _three) => Err(TimelineErr::InvalidInput)?, + }) } pub fn from_redis_text( @@ -226,10 +219,10 @@ impl Timeline { ["public", "local", "media"] => Timeline(Public, Local, Media), ["hashtag", tag] => Timeline(Hashtag(id_from_tag(tag)?), Federated, All), ["hashtag", tag, "local"] => Timeline(Hashtag(id_from_tag(tag)?), Local, All), - [id] => Timeline(User(id.parse().unwrap()), Federated, All), - [id, "notification"] => Timeline(User(id.parse().unwrap()), Federated, Notification), - ["list", id] => Timeline(List(id.parse().unwrap()), Federated, All), - ["direct", id] => Timeline(Direct(id.parse().unwrap()), Federated, All), + [id] => Timeline(User(id.parse()?), Federated, All), + [id, "notification"] => Timeline(User(id.parse()?), Federated, Notification), + ["list", id] => Timeline(List(id.parse()?), Federated, All), + ["direct", id] => Timeline(Direct(id.parse()?), Federated, All), // Other endpoints don't exist: [..] => Err(TimelineErr::InvalidInput)?, }) @@ -255,11 +248,11 @@ impl Timeline { "hashtag" => Timeline(Hashtag(id_from_hashtag()?), Federated, All), "hashtag:local" => Timeline(Hashtag(id_from_hashtag()?), Local, All), "user" => match user.scopes.contains(&Statuses) { - true => Timeline(User(*user.id), Federated, All), + true => Timeline(User(user.id), Federated, All), false => Err(custom("Error: Missing access token"))?, }, "user:notification" => match user.scopes.contains(&Statuses) { - true => Timeline(User(*user.id), Federated, Notification), + true => Timeline(User(user.id), Federated, Notification), false => Err(custom("Error: Missing access token"))?, }, "list" => match user.scopes.contains(&Lists) && user_owns_list() { @@ -280,7 +273,8 @@ impl Timeline { #[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)] pub enum Stream { - User(i64), + User(Id), + // TODO consider whether List, Direct, and Hashtag should all be `id::Id`s List(i64), Direct(i64), Hashtag(i64), diff --git a/src/redis_to_client_stream/event_stream.rs b/src/redis_to_client_stream/event_stream.rs index 6c44e47..abd13e0 100644 --- a/src/redis_to_client_stream/event_stream.rs +++ b/src/redis_to_client_stream/event_stream.rs @@ -133,7 +133,7 @@ impl SseStream { .filter_map(move |(timeline, event)| { if target_timeline == timeline { use crate::messages::{ - CheckedEvent, CheckedEvent::Update, Event::*, EventKind, + CheckedEvent, CheckedEvent::Update, DynEvent, Event::*, EventKind, }; use crate::parse_client_request::Stream::Public; @@ -148,13 +148,16 @@ impl SseStream { }, TypeSafe(non_update) => Self::reply_with(Event::TypeSafe(non_update)), Dynamic(dyn_event) => { - if let EventKind::Update(s) = dyn_event.kind.clone() { + if let EventKind::Update(s) = dyn_event.kind { match timeline { Timeline(Public, _, _) if s.language_not(&allowed_langs) => { None } _ if s.involves_any(&blocks) => None, - _ => Self::reply_with(Dynamic(dyn_event)), + _ => Self::reply_with(Dynamic(DynEvent { + kind: EventKind::Update(s), + ..dyn_event + })), } } else { None diff --git a/src/redis_to_client_stream/receiver/mod.rs b/src/redis_to_client_stream/receiver/mod.rs index fbc00a7..cb70387 100644 --- a/src/redis_to_client_stream/receiver/mod.rs +++ b/src/redis_to_client_stream/receiver/mod.rs @@ -59,7 +59,6 @@ impl Receiver { pub fn subscribe(&mut self, subscription: &Subscription) -> Result<()> { let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline); - if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (tag, tl) { self.redis_connection.update_cache(hashtag, id); }; @@ -74,7 +73,6 @@ impl Receiver { if *number_of_subscriptions == 1 { self.redis_connection.send_cmd(Subscribe, &tl)? }; - log::info!("Started stream for {:?}", tl); Ok(()) } diff --git a/src/redis_to_client_stream/redis/redis_connection/err.rs b/src/redis_to_client_stream/redis/redis_connection/err.rs index ddaf3b4..bb702c2 100644 --- a/src/redis_to_client_stream/redis/redis_connection/err.rs +++ b/src/redis_to_client_stream/redis/redis_connection/err.rs @@ -1,3 +1,4 @@ +use crate::err::TimelineErr; use std::fmt; #[derive(Debug)] @@ -8,6 +9,7 @@ pub enum RedisConnErr { IncorrectPassword(String), MissingPassword, NotRedis(String), + TimelineErr(TimelineErr), } impl RedisConnErr { @@ -49,11 +51,18 @@ impl fmt::Display for RedisConnErr { REDIS_PORT environmental variables and try again.", addr ), + TimelineErr(inner) => format!("{}", inner), }; write!(f, "{}", msg) } } +impl From for RedisConnErr { + fn from(e: TimelineErr) -> RedisConnErr { + RedisConnErr::TimelineErr(e) + } +} + impl From for RedisConnErr { fn from(e: std::io::Error) -> RedisConnErr { RedisConnErr::UnknownRedisErr(e) diff --git a/src/redis_to_client_stream/redis/redis_connection/mod.rs b/src/redis_to_client_stream/redis/redis_connection/mod.rs index acc3415..a63c03c 100644 --- a/src/redis_to_client_stream/redis/redis_connection/mod.rs +++ b/src/redis_to_client_stream/redis/redis_connection/mod.rs @@ -166,8 +166,8 @@ impl RedisConn { Timeline(Stream::Hashtag(id), _, _) => self.tag_name_cache.get(id), _non_hashtag_timeline => None, }; - let tl = timeline.to_redis_raw_timeline(hashtag); + let tl = timeline.to_redis_raw_timeline(hashtag)?; let (primary_cmd, secondary_cmd) = match cmd { RedisCmd::Subscribe => ( format!("*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n", tl.len(), tl),