From 19792d9484eddb04f8acb6f9a7f83b9f1b8e0ddf Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Fri, 3 Apr 2020 12:41:53 -0400 Subject: [PATCH] Handle non conforment events (#117) * Initial implementation of DynamicEvent * Restore early Event parsing --- Cargo.lock | 56 +-- Cargo.toml | 7 +- benches/parse_redis.rs | 48 +- old | 447 ++++++++++++++++++ src/messages/event/checked_event/account.rs | 49 ++ .../event/checked_event/announcement.rs | 19 + .../checked_event/announcement_reaction.rs | 10 + .../event/checked_event/conversation.rs | 11 + src/messages/event/checked_event/emoji.rs | 11 + src/messages/event/checked_event/mention.rs | 10 + src/messages/event/checked_event/mod.rs | 35 ++ .../event/checked_event/notification.rs | 23 + .../event/checked_event/status/application.rs | 11 + .../event/checked_event/status/attachment.rs | 25 + .../event/checked_event/status/card.rs | 28 ++ .../event/checked_event/status/mod.rs | 130 +++++ .../event/checked_event/status/poll.rs | 24 + src/messages/event/checked_event/tag.rs | 17 + .../event/checked_event/visibility.rs | 10 + src/messages/event/dynamic_event.rs | 88 ++++ src/messages/event/mod.rs | 94 ++++ src/messages/mod.rs | 431 +---------------- src/messages/test.rs | 46 ++ src/parse_client_request/mod.rs | 2 +- src/redis_to_client_stream/client_agent.rs | 37 +- src/redis_to_client_stream/event_stream.rs | 11 +- src/redis_to_client_stream/mod.rs | 3 + src/redis_to_client_stream/receiver/mod.rs | 15 +- .../redis/redis_connection/mod.rs | 4 +- .../redis/redis_msg/test.rs | 21 + 30 files changed, 1211 insertions(+), 512 deletions(-) create mode 100644 old create mode 100644 src/messages/event/checked_event/account.rs create mode 100644 src/messages/event/checked_event/announcement.rs create mode 100644 src/messages/event/checked_event/announcement_reaction.rs create mode 100644 src/messages/event/checked_event/conversation.rs create mode 100644 src/messages/event/checked_event/emoji.rs create mode 100644 src/messages/event/checked_event/mention.rs create mode 100644 src/messages/event/checked_event/mod.rs create mode 100644 src/messages/event/checked_event/notification.rs create mode 100644 src/messages/event/checked_event/status/application.rs create mode 100644 src/messages/event/checked_event/status/attachment.rs create mode 100644 src/messages/event/checked_event/status/card.rs create mode 100644 src/messages/event/checked_event/status/mod.rs create mode 100644 src/messages/event/checked_event/status/poll.rs create mode 100644 src/messages/event/checked_event/tag.rs create mode 100644 src/messages/event/checked_event/visibility.rs create mode 100644 src/messages/event/dynamic_event.rs create mode 100644 src/messages/event/mod.rs diff --git a/Cargo.lock b/Cargo.lock index ac9700f..106466a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -111,7 +111,7 @@ dependencies = [ "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "memchr 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "regex-automata 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -235,9 +235,9 @@ dependencies = [ "rand_os 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "rand_xoshiro 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "rayon 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)", "tinytemplate 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "walkdir 2.2.9 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -308,7 +308,7 @@ dependencies = [ "csv-core 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "ryu 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -453,9 +453,9 @@ dependencies = [ "r2d2 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)", "r2d2_postgres 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)", "strum 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", "strum_macros 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1597,11 +1597,6 @@ dependencies = [ "semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "ryu" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "ryu" version = "1.0.0" @@ -1658,30 +1653,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "serde" -version = "1.0.91" +version = "1.0.105" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "serde_derive 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "serde_derive" -version = "1.0.91" +version = "1.0.105" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "serde_json" -version = "1.0.39" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", - "ryu 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", + "ryu 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1691,7 +1686,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "dtoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", "url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1886,8 +1881,8 @@ name = "tinytemplate" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2310,8 +2305,8 @@ dependencies = [ "mime_guess 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "multipart 0.16.1 (registry+https://github.com/rust-lang/crates.io-index)", "scoped-tls 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)", "serde_urlencoded 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2562,7 +2557,6 @@ dependencies = [ "checksum rent_to_own 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "05a51ad2b1c5c710fa89e6b1631068dab84ed687bc6a5fe061ad65da3d0c25b2" "checksum rustc-demangle 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "ccc78bfd5acd7bf3e89cffcf899e5cb1a52d6fafa8dec2739ad70c9577a57288" "checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" -"checksum ryu 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "b96a9549dc8d48f2c283938303c4b5a77aa29bfbc5b54b084fb1630408899a8f" "checksum ryu 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c92464b447c0ee8c4fb3824ecc8383b81717b9f1e74ba2e72540aef7b9f82997" "checksum safemem 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d2b08423011dae9a5ca23f07cf57dac3857f5c885d352b76f6d95f4aea9434d0" "checksum same-file 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "585e8ddcedc187886a30fa705c47985c3fa88d06624095856b36ca0b82ff4421" @@ -2572,9 +2566,9 @@ dependencies = [ "checksum scopeguard 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b42e15e59b18a828bbf5c58ea01debb36b9b096346de35d941dcb89009f24a0d" "checksum semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" "checksum semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" -"checksum serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)" = "a72e9b96fa45ce22a4bc23da3858dfccfd60acd28a25bcd328a98fdd6bea43fd" -"checksum serde_derive 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)" = "101b495b109a3e3ca8c4cbe44cf62391527cdfb6ba15821c5ce80bcd5ea23f9f" -"checksum serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)" = "5a23aa71d4a4d43fdbfaac00eff68ba8a06a51759a89ac3304323e800c4dd40d" +"checksum serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)" = "e707fbbf255b8fc8c3b99abb91e7257a622caeb20a9818cbadbeeede4e0932ff" +"checksum serde_derive 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)" = "ac5d00fc561ba2724df6758a17de23df5914f20e41cb00f94d5b7ae42fffaff8" +"checksum serde_json 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)" = "78a7a12c167809363ec3bd7329fc0a3369056996de43c4b37ef3cd54a6ce4867" "checksum serde_urlencoded 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97" "checksum sha-1 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "23962131a91661d643c98940b20fcaffe62d776a823247be80a48fcb8b6fce68" "checksum sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b4d8bfd0e469f417657573d8451fb33d16cfe0989359b93baf3a1ffc639543d" diff --git a/Cargo.toml b/Cargo.toml index 6226c65..a463b92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.6.7" +version = "0.6.8" authors = ["Daniel Long Sockwell "] edition = "2018" @@ -10,8 +10,8 @@ log = { version = "0.4.6", features = ["release_max_level_info"] } futures = "0.1.26" tokio = "0.1.19" warp = { git = "https://github.com/seanmonstar/warp.git"} -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0.39" +serde = { version = "1.0.105", features = ["derive"] } +serde_json = "1.0.50" serde_derive = "1.0.90" pretty_env_logger = "0.3.0" postgres = "0.17.0" @@ -36,4 +36,5 @@ harness = false [features] default = [ "production" ] +bench = [] production = [] diff --git a/benches/parse_redis.rs b/benches/parse_redis.rs index 6e0badf..2ad4e61 100644 --- a/benches/parse_redis.rs +++ b/benches/parse_redis.rs @@ -1,8 +1,8 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use flodgatt::{ - messages::Event, + messages::*, parse_client_request::{Content::*, Reach::*, Stream::*, Timeline}, - redis_to_client_stream::redis_msg::{RedisMsg, RedisParseOutput}, + redis_to_client_stream::{RedisMsg, RedisParseOutput}, }; use lru::LruCache; use std::convert::TryFrom; @@ -17,30 +17,54 @@ fn parse_long_redis_input<'a>(input: &'a str) -> RedisMsg<'a> { } fn parse_to_timeline(msg: RedisMsg) -> Timeline { - let tl = Timeline::from_redis_text(msg.timeline_txt, &mut LruCache::new(1000), &None).unwrap(); + let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..]; + let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap(); assert_eq!(tl, Timeline(User(1), Federated, All)); tl } +fn parse_to_checked_event(msg: RedisMsg) -> Event { + Event::TypeSafe(serde_json::from_str(msg.event_txt).unwrap()) +} -fn parse_to_event(msg: RedisMsg) -> Event { - serde_json::from_str(msg.event_txt).unwrap() +fn parse_to_dyn_event(msg: RedisMsg) -> String { + let event: Event = Event::Dynamic(serde_json::from_str(msg.event_txt).unwrap()); + event.to_json_string() +} + +fn redis_msg_to_event_string(msg: RedisMsg) -> String { + msg.event_txt.to_string() +} + +fn string_to_checked_event(event_txt: &String) -> Event { + Event::TypeSafe(serde_json::from_str(event_txt).unwrap()) } fn criterion_benchmark(c: &mut Criterion) { let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS; let mut group = c.benchmark_group("Parse redis RESP array"); - group.bench_function("parse redis input to RedisMsg", |b| { - b.iter(|| black_box(parse_long_redis_input(input))) - }); + // group.bench_function("parse redis input to RedisMsg", |b| { + // b.iter(|| black_box(parse_long_redis_input(input))) + // }); let msg = parse_long_redis_input(input); - group.bench_function("parse RedisMsg to Timeline", |b| { - b.iter(|| black_box(parse_to_timeline(msg.clone()))) + // group.bench_function("parse RedisMsg to Timeline", |b| { + // b.iter(|| black_box(parse_to_timeline(msg.clone()))) + // }); + + group.bench_function("parse RedisMsg -> DynamicEvent -> JSON string", |b| { + b.iter(|| black_box(parse_to_dyn_event(msg.clone()))) }); - group.bench_function("parse RedisMsg to Event", |b| { - b.iter(|| black_box(parse_to_event(msg.clone()))) + group.bench_function("parse RedisMsg -> CheckedEvent", |b| { + b.iter(|| black_box(parse_to_checked_event(msg.clone()))) + }); + + group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| { + b.iter(|| { + let txt = black_box(redis_msg_to_event_string(msg.clone())); + black_box(string_to_checked_event(&txt)); + }) }); } diff --git a/old b/old new file mode 100644 index 0000000..bc8a053 --- /dev/null +++ b/old @@ -0,0 +1,447 @@ +use crate::log_fatal; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::boxed::Box; +use std::{collections::HashSet, string::String}; + +pub enum Event { + TypeSafe(CheckedEvent), + Dynamic(DynamicEvent), +} + +impl Event { + pub fn to_json_string(&self) -> String { + let event = &self.event_name(); + let sendable_event = match self.payload() { + Some(payload) => SendableEvent::WithPayload { event, payload }, + None => SendableEvent::NoPayload { event }, + }; + serde_json::to_string(&sendable_event) + .unwrap_or_else(|_| log_fatal!("Could not serialize `{:?}`", &sendable_event)) + } + + pub fn event_name(&self) -> String { + String::from(match self { + Self::TypeSafe(checked) => match checked { + CheckedEvent::Update { .. } => "update", + CheckedEvent::Notification { .. } => "notification", + CheckedEvent::Delete { .. } => "delete", + CheckedEvent::Announcement { .. } => "announcement", + CheckedEvent::AnnouncementReaction { .. } => "announcement.reaction", + CheckedEvent::AnnouncementDelete { .. } => "announcement.delete", + CheckedEvent::Conversation { .. } => "conversation", + CheckedEvent::FiltersChanged => "filters_changed", + }, + Self::Dynamic(dyn_event) => &dyn_event.event, + }) + } + + pub fn payload(&self) -> Option { + use CheckedEvent::*; + match self { + Self::TypeSafe(checked) => match checked { + Update { payload, .. } => Some(escaped(payload)), + Notification { payload, .. } => Some(escaped(payload)), + Delete { payload, .. } => Some(payload.0.clone()), + Announcement { payload, .. } => Some(escaped(payload)), + AnnouncementReaction { payload, .. } => Some(escaped(payload)), + AnnouncementDelete { payload, .. } => Some(payload.0.clone()), + Conversation { payload, .. } => Some(escaped(payload)), + FiltersChanged => None, + }, + Self::Dynamic(dyn_event) => Some(dyn_event.payload.to_string()), + } + } +} + +#[derive(Deserialize, Debug, Clone, PartialEq)] +pub struct DynamicEvent { + pub event: String, + payload: Value, + queued_at: Option, +} + +#[serde(rename_all = "snake_case", tag = "event", deny_unknown_fields)] +#[rustfmt::skip] +#[derive(Deserialize, Debug, Clone, PartialEq)] +pub enum CheckedEvent { + Update { payload: Status, queued_at: Option }, + Notification { payload: Notification }, + Delete { payload: DeletedId }, + FiltersChanged, + Announcement { payload: Announcement }, + #[serde(rename(serialize = "announcement.reaction", deserialize = "announcement.reaction"))] + AnnouncementReaction { payload: AnnouncementReaction }, + #[serde(rename(serialize = "announcement.delete", deserialize = "announcement.delete"))] + AnnouncementDelete { payload: DeletedId }, + Conversation { payload: Conversation, queued_at: Option }, +} + +#[derive(Serialize, Debug, Clone)] +#[serde(untagged)] +pub enum SendableEvent<'a> { + WithPayload { event: &'a str, payload: String }, + NoPayload { event: &'a str }, +} + +fn escaped(content: T) -> String { + serde_json::to_string(&content) + .unwrap_or_else(|_| log_fatal!("Could not parse Event with: `{:?}`", &content)) +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Conversation { + id: String, + accounts: Vec, + unread: bool, + last_status: Option, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct DeletedId(String); + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Status { + id: String, + uri: String, + created_at: String, + account: Account, + content: String, + visibility: Visibility, + sensitive: bool, + spoiler_text: String, + media_attachments: Vec, + application: Option, // Should be non-optional? + mentions: Vec, + tags: Vec, + emojis: Vec, + reblogs_count: i64, + favourites_count: i64, + replies_count: i64, + url: Option, + in_reply_to_id: Option, + in_reply_to_account_id: Option, + reblog: Option>, + poll: Option, + card: Option, + language: Option, + text: Option, + // ↓↓↓ Only for authorized users + favourited: Option, + reblogged: Option, + muted: Option, + bookmarked: Option, + pinned: Option, +} + +#[serde(rename_all = "lowercase", deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub enum Visibility { + Public, + Unlisted, + Private, + Direct, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Account { + id: String, + username: String, + acct: String, + url: String, + display_name: String, + note: String, + avatar: String, + avatar_static: String, + header: String, + header_static: String, + locked: bool, + emojis: Vec, + discoverable: Option, // Shouldn't be option? + created_at: String, + statuses_count: i64, + followers_count: i64, + following_count: i64, + moved: Option>, + fields: Option>, + bot: Option, + source: Option, + group: Option, // undocumented + last_status_at: Option, // undocumented +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Attachment { + id: String, + r#type: AttachmentType, + url: String, + preview_url: String, + remote_url: Option, + text_url: Option, + meta: Option, + description: Option, + blurhash: Option, +} + +#[serde(rename_all = "lowercase", deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +enum AttachmentType { + Unknown, + Image, + Gifv, + Video, + Audio, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Application { + name: String, + website: Option, + vapid_key: Option, + client_id: Option, + client_secret: Option, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Emoji { + shortcode: String, + url: String, + static_url: String, + visible_in_picker: bool, + category: Option, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Field { + name: String, + value: String, + verified_at: Option, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Source { + note: String, + fields: Vec, + privacy: Option, + sensitive: bool, + language: String, + follow_requests_count: i64, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Mention { + id: String, + username: String, + acct: String, + url: String, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Tag { + name: String, + url: String, + history: Option>, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Poll { + id: String, + expires_at: String, + expired: bool, + multiple: bool, + votes_count: i64, + voters_count: Option, + voted: Option, + own_votes: Option>, + options: Vec, + emojis: Vec, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct PollOptions { + title: String, + votes_count: Option, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Card { + url: String, + title: String, + description: String, + r#type: CardType, + author_name: Option, + author_url: Option, + provider_name: Option, + provider_url: Option, + html: Option, + width: Option, + height: Option, + image: Option, + embed_url: Option, +} + +#[serde(rename_all = "lowercase", deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +enum CardType { + Link, + Photo, + Video, + Rich, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct History { + day: String, + uses: String, + accounts: String, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Notification { + id: String, + r#type: NotificationType, + created_at: String, + account: Account, + status: Option, +} + +#[serde(rename_all = "snake_case", deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +enum NotificationType { + Follow, + FollowRequest, // Undocumented + Mention, + Reblog, + Favourite, + Poll, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Announcement { + // Fully undocumented + id: String, + tags: Vec, + all_day: bool, + content: String, + emojis: Vec, + starts_at: Option, + ends_at: Option, + published_at: String, + updated_at: String, + mentions: Vec, + reactions: Vec, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct AnnouncementReaction { + #[serde(skip_serializing_if = "Option::is_none")] + announcement_id: Option, + count: i64, + name: String, +} + +impl Status { + /// Returns `true` if the status is filtered out based on its language + pub fn language_not_allowed(&self, allowed_langs: &HashSet) -> bool { + const ALLOW: bool = false; + const REJECT: bool = true; + + let reject_and_maybe_log = |toot_language| { + log::info!("Filtering out toot from `{}`", &self.account.acct); + log::info!("Toot language: `{}`", toot_language); + log::info!("Recipient's allowed languages: `{:?}`", allowed_langs); + REJECT + }; + if allowed_langs.is_empty() { + return ALLOW; // listing no allowed_langs results in allowing all languages + } + + match self.language.as_ref() { + Some(toot_language) if allowed_langs.contains(toot_language) => ALLOW, + None => ALLOW, // If toot language is unknown, toot is always allowed + Some(empty) if empty == &String::new() => ALLOW, + Some(toot_language) => reject_and_maybe_log(toot_language), + } + } + + /// Returns `true` if this toot originated from a domain the User has blocked. + pub fn from_blocked_domain(&self, blocked_domains: &HashSet) -> bool { + let full_username = &self.account.acct; + + match full_username.split('@').nth(1) { + Some(originating_domain) => blocked_domains.contains(originating_domain), + None => false, // None means the user is on the local instance, which can't be blocked + } + } + + /// Returns `true` if the Status is from an account that has blocked the current user. + pub fn from_blocking_user(&self, blocking_users: &HashSet) -> bool { + const ALLOW: bool = false; + const REJECT: bool = true; + let err = |_| log_fatal!("Could not process `account.id` in {:?}", &self); + + if blocking_users.contains(&self.account.id.parse().unwrap_or_else(err)) { + REJECT + } else { + ALLOW + } + } + + /// Returns `true` if the User's list of blocked and muted users includes a user + /// involved in this toot. + /// + /// A user is involved if they: + /// * Are mentioned in this toot + /// * Wrote this toot + /// * Wrote a toot that this toot is replying to (if any) + /// * Wrote the toot that this toot is boosting (if any) + pub fn involves_blocked_user(&self, blocked_users: &HashSet) -> bool { + const ALLOW: bool = false; + const REJECT: bool = true; + let err = |_| log_fatal!("Could not process an `id` field in {:?}", &self); + + // involved_users = mentioned_users + author + replied-to user + boosted user + let mut involved_users: HashSet = self + .mentions + .iter() + .map(|mention| mention.id.parse().unwrap_or_else(err)) + .collect(); + + involved_users.insert(self.account.id.parse::().unwrap_or_else(err)); + + if let Some(replied_to_account_id) = self.in_reply_to_account_id.clone() { + involved_users.insert(replied_to_account_id.parse().unwrap_or_else(err)); + } + + if let Some(boosted_status) = self.reblog.clone() { + involved_users.insert(boosted_status.account.id.parse().unwrap_or_else(err)); + } + + if involved_users.is_disjoint(blocked_users) { + ALLOW + } else { + REJECT + } + } +} + +#[cfg(test)] +mod test; diff --git a/src/messages/event/checked_event/account.rs b/src/messages/event/checked_event/account.rs new file mode 100644 index 0000000..f86ef6c --- /dev/null +++ b/src/messages/event/checked_event/account.rs @@ -0,0 +1,49 @@ +use super::{emoji::Emoji, visibility::Visibility}; +use serde::{Deserialize, Serialize}; + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub(super) struct Account { + pub id: String, + username: String, + pub acct: String, + url: String, + display_name: String, + note: String, + avatar: String, + avatar_static: String, + header: String, + header_static: String, + locked: bool, + emojis: Vec, + discoverable: Option, // Shouldn't be option? + created_at: String, + statuses_count: i64, + followers_count: i64, + following_count: i64, + moved: Option>, + fields: Option>, + bot: Option, + source: Option, + group: Option, // undocumented + last_status_at: Option, // undocumented +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Field { + name: String, + value: String, + verified_at: Option, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Source { + note: String, + fields: Vec, + privacy: Option, + sensitive: bool, + language: String, + follow_requests_count: i64, +} diff --git a/src/messages/event/checked_event/announcement.rs b/src/messages/event/checked_event/announcement.rs new file mode 100644 index 0000000..4ac88b9 --- /dev/null +++ b/src/messages/event/checked_event/announcement.rs @@ -0,0 +1,19 @@ +use super::{emoji::Emoji, mention::Mention, tag::Tag, AnnouncementReaction}; +use serde::{Deserialize, Serialize}; + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Announcement { + // Fully undocumented + id: String, + tags: Vec, + all_day: bool, + content: String, + emojis: Vec, + starts_at: Option, + ends_at: Option, + published_at: String, + updated_at: String, + mentions: Vec, + reactions: Vec, +} diff --git a/src/messages/event/checked_event/announcement_reaction.rs b/src/messages/event/checked_event/announcement_reaction.rs new file mode 100644 index 0000000..b17b0be --- /dev/null +++ b/src/messages/event/checked_event/announcement_reaction.rs @@ -0,0 +1,10 @@ +use serde::{Deserialize, Serialize}; + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct AnnouncementReaction { + #[serde(skip_serializing_if = "Option::is_none")] + announcement_id: Option, + count: i64, + name: String, +} diff --git a/src/messages/event/checked_event/conversation.rs b/src/messages/event/checked_event/conversation.rs new file mode 100644 index 0000000..5b9fde9 --- /dev/null +++ b/src/messages/event/checked_event/conversation.rs @@ -0,0 +1,11 @@ +use super::{account::Account, status::Status}; +use serde::{Deserialize, Serialize}; + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Conversation { + id: String, + accounts: Vec, + unread: bool, + last_status: Option, +} diff --git a/src/messages/event/checked_event/emoji.rs b/src/messages/event/checked_event/emoji.rs new file mode 100644 index 0000000..836f341 --- /dev/null +++ b/src/messages/event/checked_event/emoji.rs @@ -0,0 +1,11 @@ +use serde::{Deserialize, Serialize}; + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub(super) struct Emoji { + shortcode: String, + url: String, + static_url: String, + visible_in_picker: bool, + category: Option, +} diff --git a/src/messages/event/checked_event/mention.rs b/src/messages/event/checked_event/mention.rs new file mode 100644 index 0000000..5f9a876 --- /dev/null +++ b/src/messages/event/checked_event/mention.rs @@ -0,0 +1,10 @@ +use serde::{Deserialize, Serialize}; + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub(super) struct Mention { + pub id: String, + username: String, + acct: String, + url: String, +} diff --git a/src/messages/event/checked_event/mod.rs b/src/messages/event/checked_event/mod.rs new file mode 100644 index 0000000..60c7513 --- /dev/null +++ b/src/messages/event/checked_event/mod.rs @@ -0,0 +1,35 @@ +mod account; + +mod announcement; +mod announcement_reaction; +mod conversation; +mod emoji; +mod mention; +mod notification; +mod status; +mod tag; +mod visibility; + +pub use announcement::Announcement; +pub(in crate::messages::event) use announcement_reaction::AnnouncementReaction; +pub use conversation::Conversation; +pub use notification::Notification; +pub use status::Status; + +use serde::Deserialize; + +#[serde(rename_all = "snake_case", tag = "event", deny_unknown_fields)] +#[rustfmt::skip] +#[derive(Deserialize, Debug, Clone, PartialEq)] +pub enum CheckedEvent { + Update { payload: Status, queued_at: Option }, + Notification { payload: Notification }, + Delete { payload: String }, + FiltersChanged, + Announcement { payload: Announcement }, + #[serde(rename(serialize = "announcement.reaction", deserialize = "announcement.reaction"))] + AnnouncementReaction { payload: AnnouncementReaction }, + #[serde(rename(serialize = "announcement.delete", deserialize = "announcement.delete"))] + AnnouncementDelete { payload: String }, + Conversation { payload: Conversation, queued_at: Option }, +} diff --git a/src/messages/event/checked_event/notification.rs b/src/messages/event/checked_event/notification.rs new file mode 100644 index 0000000..9f70f90 --- /dev/null +++ b/src/messages/event/checked_event/notification.rs @@ -0,0 +1,23 @@ +use super::{account::Account, status::Status}; +use serde::{Deserialize, Serialize}; + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Notification { + id: String, + r#type: NotificationType, + created_at: String, + account: Account, + status: Option, +} + +#[serde(rename_all = "snake_case", deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +enum NotificationType { + Follow, + FollowRequest, // Undocumented + Mention, + Reblog, + Favourite, + Poll, +} diff --git a/src/messages/event/checked_event/status/application.rs b/src/messages/event/checked_event/status/application.rs new file mode 100644 index 0000000..1fd2f88 --- /dev/null +++ b/src/messages/event/checked_event/status/application.rs @@ -0,0 +1,11 @@ +use serde::{Deserialize, Serialize}; + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub(super) struct Application { + name: String, + website: Option, + vapid_key: Option, + client_id: Option, + client_secret: Option, +} diff --git a/src/messages/event/checked_event/status/attachment.rs b/src/messages/event/checked_event/status/attachment.rs new file mode 100644 index 0000000..bd76c14 --- /dev/null +++ b/src/messages/event/checked_event/status/attachment.rs @@ -0,0 +1,25 @@ +use serde::{Deserialize, Serialize}; + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub(super) struct Attachment { + id: String, + r#type: AttachmentType, + url: String, + preview_url: String, + remote_url: Option, + text_url: Option, + meta: Option, + description: Option, + blurhash: Option, +} + +#[serde(rename_all = "lowercase", deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +enum AttachmentType { + Unknown, + Image, + Gifv, + Video, + Audio, +} diff --git a/src/messages/event/checked_event/status/card.rs b/src/messages/event/checked_event/status/card.rs new file mode 100644 index 0000000..2a5667f --- /dev/null +++ b/src/messages/event/checked_event/status/card.rs @@ -0,0 +1,28 @@ +use serde::{Deserialize, Serialize}; + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub(super) struct Card { + url: String, + title: String, + description: String, + r#type: CardType, + author_name: Option, + author_url: Option, + provider_name: Option, + provider_url: Option, + html: Option, + width: Option, + height: Option, + image: Option, + embed_url: Option, +} + +#[serde(rename_all = "lowercase", deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +enum CardType { + Link, + Photo, + Video, + Rich, +} diff --git a/src/messages/event/checked_event/status/mod.rs b/src/messages/event/checked_event/status/mod.rs new file mode 100644 index 0000000..427bc17 --- /dev/null +++ b/src/messages/event/checked_event/status/mod.rs @@ -0,0 +1,130 @@ +mod application; +mod attachment; +mod card; +mod poll; + +use super::{account::Account, emoji::Emoji, mention::Mention, tag::Tag, visibility::Visibility}; +use {application::Application, attachment::Attachment, card::Card, poll::Poll}; + +use crate::log_fatal; +use crate::parse_client_request::Blocks; + +use serde::{Deserialize, Serialize}; +use std::boxed::Box; +use std::{collections::HashSet, string::String}; + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Status { + id: String, + uri: String, + created_at: String, + account: Account, + content: String, + visibility: Visibility, + sensitive: bool, + spoiler_text: String, + media_attachments: Vec, + application: Option, // Should be non-optional? + mentions: Vec, + tags: Vec, + emojis: Vec, + reblogs_count: i64, + favourites_count: i64, + replies_count: i64, + url: Option, + in_reply_to_id: Option, + in_reply_to_account_id: Option, + reblog: Option>, + poll: Option, + card: Option, + language: Option, + text: Option, + // ↓↓↓ Only for authorized users + favourited: Option, + reblogged: Option, + muted: Option, + bookmarked: Option, + pinned: Option, +} + +impl Status { + /// Returns `true` if the status is filtered out based on its language + pub fn language_not(&self, allowed_langs: &HashSet) -> bool { + const ALLOW: bool = false; + const REJECT: bool = true; + + let reject_and_maybe_log = |toot_language| { + log::info!("Filtering out toot from `{}`", &self.account.acct); + log::info!("Toot language: `{}`", toot_language); + log::info!("Recipient's allowed languages: `{:?}`", allowed_langs); + REJECT + }; + if allowed_langs.is_empty() { + return ALLOW; // listing no allowed_langs results in allowing all languages + } + + match self.language.as_ref() { + Some(toot_language) if allowed_langs.contains(toot_language) => ALLOW, + None => ALLOW, // If toot language is unknown, toot is always allowed + Some(empty) if empty == &String::new() => ALLOW, + Some(toot_language) => reject_and_maybe_log(toot_language), + } + } + + /// Returns `true` if the Status originated from a blocked domain, is from an account + /// that has blocked the current user, or if the User's list of blocked/muted users + /// includes a user involved in the Status. + /// + /// A user is involved in the Status/toot if they: + /// * Are mentioned in this toot + /// * Wrote this toot + /// * Wrote a toot that this toot is replying to (if any) + /// * Wrote the toot that this toot is boosting (if any) + pub fn involves_any(&self, blocks: &Blocks) -> bool { + const ALLOW: bool = false; + const REJECT: bool = true; + + let Blocks { + blocked_users, + blocking_users, + blocked_domains, + } = blocks; + + if !self.calculate_involved_users().is_disjoint(blocked_users) { + REJECT + } else if blocking_users.contains(&self.account.id.parse().expect("TODO")) { + REJECT + } else { + let full_username = &self.account.acct; + match full_username.split('@').nth(1) { + Some(originating_domain) if blocked_domains.contains(originating_domain) => REJECT, + Some(_) | None => ALLOW, // None means the local instance, which can't be blocked + } + } + } + + fn calculate_involved_users(&self) -> HashSet { + // TODO replace vvvv with error handling + let err = |_| log_fatal!("Could not process an `id` field in {:?}", &self); + + // involved_users = mentioned_users + author + replied-to user + boosted user + let mut involved_users: HashSet = self + .mentions + .iter() + .map(|mention| mention.id.parse().unwrap_or_else(err)) + .collect(); + + // author + involved_users.insert(self.account.id.parse::().unwrap_or_else(err)); + // replied-to user + if let Some(user_id) = self.in_reply_to_account_id.clone() { + involved_users.insert(user_id.parse().unwrap_or_else(err)); + } + // boosted user + if let Some(boosted_status) = self.reblog.clone() { + involved_users.insert(boosted_status.account.id.parse().unwrap_or_else(err)); + } + involved_users + } +} diff --git a/src/messages/event/checked_event/status/poll.rs b/src/messages/event/checked_event/status/poll.rs new file mode 100644 index 0000000..908358e --- /dev/null +++ b/src/messages/event/checked_event/status/poll.rs @@ -0,0 +1,24 @@ +use super::super::emoji::Emoji; +use serde::{Deserialize, Serialize}; + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub(super) struct Poll { + id: String, + expires_at: String, + expired: bool, + multiple: bool, + votes_count: i64, + voters_count: Option, + voted: Option, + own_votes: Option>, + options: Vec, + emojis: Vec, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct PollOptions { + title: String, + votes_count: Option, +} diff --git a/src/messages/event/checked_event/tag.rs b/src/messages/event/checked_event/tag.rs new file mode 100644 index 0000000..99fe927 --- /dev/null +++ b/src/messages/event/checked_event/tag.rs @@ -0,0 +1,17 @@ +use serde::{Deserialize, Serialize}; + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub(super) struct Tag { + name: String, + url: String, + history: Option>, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct History { + day: String, + uses: String, + accounts: String, +} diff --git a/src/messages/event/checked_event/visibility.rs b/src/messages/event/checked_event/visibility.rs new file mode 100644 index 0000000..2c3efba --- /dev/null +++ b/src/messages/event/checked_event/visibility.rs @@ -0,0 +1,10 @@ +use serde::{Deserialize, Serialize}; + +#[serde(rename_all = "lowercase", deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub(super) enum Visibility { + Public, + Unlisted, + Private, + Direct, +} diff --git a/src/messages/event/dynamic_event.rs b/src/messages/event/dynamic_event.rs new file mode 100644 index 0000000..45ed3ba --- /dev/null +++ b/src/messages/event/dynamic_event.rs @@ -0,0 +1,88 @@ +use crate::parse_client_request::Blocks; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::HashSet; + +#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)] +pub struct DynamicEvent { + pub event: String, + pub payload: Value, + queued_at: Option, +} + +impl DynamicEvent { + /// Returns `true` if the status is filtered out based on its language + pub fn language_not(&self, allowed_langs: &HashSet) -> bool { + const ALLOW: bool = false; + const REJECT: bool = true; + + if allowed_langs.is_empty() { + return ALLOW; // listing no allowed_langs results in allowing all languages + } + + match self.payload["language"].as_str() { + Some(toot_language) if allowed_langs.contains(toot_language) => ALLOW, + None => ALLOW, // If toot language is unknown, toot is always allowed + Some(empty) if empty == &String::new() => ALLOW, + Some(_toot_language) => REJECT, + } + } + /// Returns `true` if the toot contained in this Event originated from a blocked domain, + /// is from an account that has blocked the current user, or if the User's list of + /// blocked/muted users includes a user involved in the toot. + /// + /// A user is involved in the toot if they: + /// * Are mentioned in this toot + /// * Wrote this toot + /// * Wrote a toot that this toot is replying to (if any) + /// * Wrote the toot that this toot is boosting (if any) + pub fn involves_any(&self, blocks: &Blocks) -> bool { + const ALLOW: bool = false; + const REJECT: bool = true; + let Blocks { + blocked_users, + blocking_users, + blocked_domains, + } = blocks; + + let user_id = self.payload["account"]["id"].as_str().expect("TODO"); + let username = self.payload["account"]["acct"].as_str().expect("TODO"); + + if !self.calculate_involved_users().is_disjoint(blocked_users) { + REJECT + } else if blocking_users.contains(&user_id.parse().expect("TODO")) { + REJECT + } else { + let full_username = &username; + match full_username.split('@').nth(1) { + Some(originating_domain) if blocked_domains.contains(originating_domain) => REJECT, + Some(_) | None => ALLOW, // None means the local instance, which can't be blocked + } + } + } + fn calculate_involved_users(&self) -> HashSet { + let mentions = self.payload["mentions"].as_array().expect("TODO"); + // involved_users = mentioned_users + author + replied-to user + boosted user + let mut involved_users: HashSet = mentions + .iter() + .map(|mention| mention["id"].as_str().expect("TODO").parse().expect("TODO")) + .collect(); + + // author + let author_id = self.payload["account"]["id"].as_str().expect("TODO"); + involved_users.insert(author_id.parse::().expect("TODO")); + // replied-to user + let replied_to_user = self.payload["in_reply_to_account_id"].as_str(); + if let Some(user_id) = replied_to_user.clone() { + involved_users.insert(user_id.parse().expect("TODO")); + } + // boosted user + + let id_of_boosted_user = self.payload["reblog"]["account"]["id"] + .as_str() + .expect("TODO"); + involved_users.insert(id_of_boosted_user.parse().expect("TODO")); + + involved_users + } +} diff --git a/src/messages/event/mod.rs b/src/messages/event/mod.rs new file mode 100644 index 0000000..133ffa2 --- /dev/null +++ b/src/messages/event/mod.rs @@ -0,0 +1,94 @@ +mod checked_event; +mod dynamic_event; + +pub use {checked_event::CheckedEvent, dynamic_event::DynamicEvent}; + +use crate::log_fatal; +use serde::Serialize; +use std::string::String; + +#[derive(Debug, Clone)] +pub enum Event { + TypeSafe(CheckedEvent), + Dynamic(DynamicEvent), +} + +impl Event { + pub fn to_json_string(&self) -> String { + let event = &self.event_name(); + let sendable_event = match self.payload() { + Some(payload) => SendableEvent::WithPayload { event, payload }, + None => SendableEvent::NoPayload { event }, + }; + serde_json::to_string(&sendable_event) + .unwrap_or_else(|_| log_fatal!("Could not serialize `{:?}`", &sendable_event)) + } + + pub fn event_name(&self) -> String { + String::from(match self { + Self::TypeSafe(checked) => match checked { + CheckedEvent::Update { .. } => "update", + CheckedEvent::Notification { .. } => "notification", + CheckedEvent::Delete { .. } => "delete", + CheckedEvent::Announcement { .. } => "announcement", + CheckedEvent::AnnouncementReaction { .. } => "announcement.reaction", + CheckedEvent::AnnouncementDelete { .. } => "announcement.delete", + CheckedEvent::Conversation { .. } => "conversation", + CheckedEvent::FiltersChanged => "filters_changed", + }, + Self::Dynamic(dyn_event) => &dyn_event.event, + }) + } + + pub fn payload(&self) -> Option { + use CheckedEvent::*; + match self { + Self::TypeSafe(checked) => match checked { + Update { payload, .. } => Some(escaped(payload)), + Notification { payload, .. } => Some(escaped(payload)), + Delete { payload, .. } => Some(payload.clone()), + Announcement { payload, .. } => Some(escaped(payload)), + AnnouncementReaction { payload, .. } => Some(escaped(payload)), + AnnouncementDelete { payload, .. } => Some(payload.clone()), + Conversation { payload, .. } => Some(escaped(payload)), + FiltersChanged => None, + }, + Self::Dynamic(dyn_event) => Some(dyn_event.payload.to_string()), + } + } +} + +impl From for Event { + fn from(event_txt: String) -> Event { + Event::from(event_txt.as_str()) + } +} +impl From<&str> for Event { + fn from(event_txt: &str) -> Event { + match serde_json::from_str(event_txt) { + Ok(checked_event) => Event::TypeSafe(checked_event), + Err(e) => { + log::error!( + "Error safely parsing Redis input. Mastodon and Flodgatt do not \ + strictly conform to the same version of Mastodon's API.\n{}\ + Forwarding Redis payload without type checking it.", + e + ); + let dyn_event: DynamicEvent = serde_json::from_str(&event_txt).expect("TODO"); + Event::Dynamic(dyn_event) + } + } + } +} + +#[derive(Serialize, Debug, Clone)] +#[serde(untagged)] +enum SendableEvent<'a> { + WithPayload { event: &'a str, payload: String }, + NoPayload { event: &'a str }, +} + +fn escaped(content: T) -> String { + serde_json::to_string(&content) + .unwrap_or_else(|_| log_fatal!("Could not parse Event with: `{:?}`", &content)) +} diff --git a/src/messages/mod.rs b/src/messages/mod.rs index 813b771..cab529c 100644 --- a/src/messages/mod.rs +++ b/src/messages/mod.rs @@ -1,430 +1,3 @@ -use crate::log_fatal; -use serde::{Deserialize, Serialize}; -use serde_json; -use std::boxed::Box; -use std::{collections::HashSet, string::String}; +mod event; -#[serde(rename_all = "snake_case", tag = "event", deny_unknown_fields)] -#[rustfmt::skip] -#[derive(Deserialize, Debug, Clone, PartialEq)] -pub enum Event { - Update { payload: Status, queued_at: Option }, - Notification { payload: Notification }, - Delete { payload: DeletedId }, - FiltersChanged, - Announcement { payload: Announcement }, - #[serde(rename(serialize = "announcement.reaction", deserialize = "announcement.reaction"))] - AnnouncementReaction { payload: AnnouncementReaction }, - #[serde(rename(serialize = "announcement.delete", deserialize = "announcement.delete"))] - AnnouncementDelete { payload: DeletedId }, - Conversation { payload: Conversation, queued_at: Option }, -} - -#[derive(Serialize, Debug, Clone)] -#[serde(untagged)] -pub enum SendableEvent<'a> { - WithPayload { event: &'a str, payload: String }, - NoPayload { event: &'a str }, -} -#[rustfmt::skip] -impl Event { - pub fn event_name(&self) -> String { - use Event::*; - match self { - Update { .. } => "update", - Notification { .. } => "notification", - Delete { .. } => "delete", - Announcement { .. } => "announcement", - AnnouncementReaction { .. } => "announcement.reaction", - AnnouncementDelete { .. } => "announcement.delete", - Conversation { .. } => "conversation", - FiltersChanged => "filters_changed", - } - .to_string() - } - - - pub fn payload(&self) -> Option { - use Event::*; - match self { - Update { payload: status, .. } => Some(escaped(status)), - Notification { payload: notification, .. } => Some(escaped(notification)), - Delete { payload: id, .. } => Some(id.0.clone()), - Announcement { payload: announcement, .. } => Some(escaped(announcement)), - AnnouncementReaction { payload: reaction, .. } => Some(escaped(reaction)), - AnnouncementDelete { payload: id, .. } => Some(id.0.clone()), - Conversation { payload: conversation, ..} => Some(escaped(conversation)), - FiltersChanged => None, - } - } - pub fn to_json_string(&self) -> String { - let event = &self.event_name(); - let sendable_event = match self.payload() { - Some(payload) => SendableEvent::WithPayload { event, payload }, - None => SendableEvent::NoPayload { event }, - }; - serde_json::to_string(&sendable_event) - .unwrap_or_else(|_| log_fatal!("Could not serialize `{:?}`", &sendable_event)) - } -} - -fn escaped(content: T) -> String { - serde_json::to_string(&content) - .unwrap_or_else(|_| log_fatal!("Could not parse Event with: `{:?}`", &content)) -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub struct Conversation { - id: String, - accounts: Vec, - unread: bool, - last_status: Option, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub struct DeletedId(String); - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub struct Status { - id: String, - uri: String, - created_at: String, - account: Account, - content: String, - visibility: Visibility, - sensitive: bool, - spoiler_text: String, - media_attachments: Vec, - application: Option, // Should be non-optional? - mentions: Vec, - tags: Vec, - emojis: Vec, - reblogs_count: i64, - favourites_count: i64, - replies_count: i64, - url: Option, - in_reply_to_id: Option, - in_reply_to_account_id: Option, - reblog: Option>, - poll: Option, - card: Option, - language: Option, - text: Option, - // ↓↓↓ Only for authorized users - favourited: Option, - reblogged: Option, - muted: Option, - bookmarked: Option, - pinned: Option, -} - -#[serde(rename_all = "lowercase", deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub enum Visibility { - Public, - Unlisted, - Private, - Direct, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub struct Account { - id: String, - username: String, - acct: String, - url: String, - display_name: String, - note: String, - avatar: String, - avatar_static: String, - header: String, - header_static: String, - locked: bool, - emojis: Vec, - discoverable: Option, // Shouldn't be option? - created_at: String, - statuses_count: i64, - followers_count: i64, - following_count: i64, - moved: Option>, - fields: Option>, - bot: Option, - source: Option, - group: Option, // undocumented - last_status_at: Option, // undocumented -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -struct Attachment { - id: String, - r#type: AttachmentType, - url: String, - preview_url: String, - remote_url: Option, - text_url: Option, - meta: Option, - description: Option, - blurhash: Option, -} - -#[serde(rename_all = "lowercase", deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -enum AttachmentType { - Unknown, - Image, - Gifv, - Video, - Audio, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub struct Application { - name: String, - website: Option, - vapid_key: Option, - client_id: Option, - client_secret: Option, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -struct Emoji { - shortcode: String, - url: String, - static_url: String, - visible_in_picker: bool, - category: Option, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -struct Field { - name: String, - value: String, - verified_at: Option, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -struct Source { - note: String, - fields: Vec, - privacy: Option, - sensitive: bool, - language: String, - follow_requests_count: i64, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub struct Mention { - id: String, - username: String, - acct: String, - url: String, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -struct Tag { - name: String, - url: String, - history: Option>, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -struct Poll { - id: String, - expires_at: String, - expired: bool, - multiple: bool, - votes_count: i64, - voters_count: Option, - voted: Option, - own_votes: Option>, - options: Vec, - emojis: Vec, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -struct PollOptions { - title: String, - votes_count: Option, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -struct Card { - url: String, - title: String, - description: String, - r#type: CardType, - author_name: Option, - author_url: Option, - provider_name: Option, - provider_url: Option, - html: Option, - width: Option, - height: Option, - image: Option, - embed_url: Option, -} - -#[serde(rename_all = "lowercase", deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -enum CardType { - Link, - Photo, - Video, - Rich, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -struct History { - day: String, - uses: String, - accounts: String, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub struct Notification { - id: String, - r#type: NotificationType, - created_at: String, - account: Account, - status: Option, -} - -#[serde(rename_all = "snake_case", deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -enum NotificationType { - Follow, - FollowRequest, // Undocumented - Mention, - Reblog, - Favourite, - Poll, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub struct Announcement { - // Fully undocumented - id: String, - tags: Vec, - all_day: bool, - content: String, - emojis: Vec, - starts_at: Option, - ends_at: Option, - published_at: String, - updated_at: String, - mentions: Vec, - reactions: Vec, -} - -#[serde(deny_unknown_fields)] -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub struct AnnouncementReaction { - #[serde(skip_serializing_if = "Option::is_none")] - announcement_id: Option, - count: i64, - name: String, -} - -impl Status { - /// Returns `true` if the status is filtered out based on its language - pub fn language_not_allowed(&self, allowed_langs: &HashSet) -> bool { - const ALLOW: bool = false; - const REJECT: bool = true; - - let reject_and_maybe_log = |toot_language| { - log::info!("Filtering out toot from `{}`", &self.account.acct); - log::info!("Toot language: `{}`", toot_language); - log::info!("Recipient's allowed languages: `{:?}`", allowed_langs); - REJECT - }; - if allowed_langs.is_empty() { - return ALLOW; // listing no allowed_langs results in allowing all languages - } - - match self.language.as_ref() { - Some(toot_language) if allowed_langs.contains(toot_language) => ALLOW, - None => ALLOW, // If toot language is unknown, toot is always allowed - Some(empty) if empty == &String::new() => ALLOW, - Some(toot_language) => reject_and_maybe_log(toot_language), - } - } - - /// Returns `true` if this toot originated from a domain the User has blocked. - pub fn from_blocked_domain(&self, blocked_domains: &HashSet) -> bool { - let full_username = &self.account.acct; - - match full_username.split('@').nth(1) { - Some(originating_domain) => blocked_domains.contains(originating_domain), - None => false, // None means the user is on the local instance, which can't be blocked - } - } - /// Returns `true` if the Status is from an account that has blocked the current user. - pub fn from_blocking_user(&self, blocking_users: &HashSet) -> bool { - const ALLOW: bool = false; - const REJECT: bool = true; - let err = |_| log_fatal!("Could not process `account.id` in {:?}", &self); - - if blocking_users.contains(&self.account.id.parse().unwrap_or_else(err)) { - REJECT - } else { - ALLOW - } - } - - /// Returns `true` if the User's list of blocked and muted users includes a user - /// involved in this toot. - /// - /// A user is involved if they: - /// * Are mentioned in this toot - /// * Wrote this toot - /// * Wrote a toot that this toot is replying to (if any) - /// * Wrote the toot that this toot is boosting (if any) - pub fn involves_blocked_user(&self, blocked_users: &HashSet) -> bool { - const ALLOW: bool = false; - const REJECT: bool = true; - let err = |_| log_fatal!("Could not process an `id` field in {:?}", &self); - - // involved_users = mentioned_users + author + replied-to user + boosted user - let mut involved_users: HashSet = self - .mentions - .iter() - .map(|mention| mention.id.parse().unwrap_or_else(err)) - .collect(); - - involved_users.insert(self.account.id.parse::().unwrap_or_else(err)); - - if let Some(replied_to_account_id) = self.in_reply_to_account_id.clone() { - involved_users.insert(replied_to_account_id.parse().unwrap_or_else(err)); - } - - if let Some(boosted_status) = self.reblog.clone() { - involved_users.insert(boosted_status.account.id.parse().unwrap_or_else(err)); - } - - if involved_users.is_disjoint(blocked_users) { - ALLOW - } else { - REJECT - } - } -} - -#[cfg(test)] -mod test; +pub use event::{CheckedEvent, DynamicEvent, Event}; diff --git a/src/messages/test.rs b/src/messages/test.rs index 7dd033a..12b0190 100644 --- a/src/messages/test.rs +++ b/src/messages/test.rs @@ -1,3 +1,47 @@ +use super::*; +use std::collections::HashMap; + +#[serde(rename_all = "snake_case")] +#[derive(Deserialize, Debug, Clone, PartialEq)] +pub enum Event { + Update, + Notification, + Delete, + FiltersChanged, + Announcement, + #[serde(rename( + serialize = "announcement.reaction", + deserialize = "announcement.reaction" + ))] + AnnouncementReaction, + #[serde(rename(serialize = "announcement.delete", deserialize = "announcement.delete"))] + AnnouncementDelete, + Conversation, +} + +#[serde(rename_all = "snake_case", tag = "event")] +#[derive(Deserialize, Debug, Clone, PartialEq)] +pub struct Msg { + event: Event, + queued_at: Option, +} +#[test] +fn parse_redis_msg_to() { + let input = REDIS_MSG_EVENT_TXT; + let msg: Msg = serde_json::from_str(input).expect("TODO"); + dbg!(&msg); + let raw_txt = match (msg.event, msg.queued_at) { + (Event::Update, Some(n)) => { + &input[r#"{"event":"update","payload":"#.len() + ..input.len() - format!(",\"queued_at\":{}", n).len()] + } + _ => unimplemented!(), + }; + dbg!(raw_txt); + panic!("TODO"); +} + +fn parse_redis_msg_to_old() {} // TODO: Revise these tests to cover *only* the RedisMessage -> (Timeline, Event) parsing // use super::*; // use crate::{ @@ -507,3 +551,5 @@ // assert_eq!(rest, String::new()); // Ok(()) // } + +const REDIS_MSG_EVENT_TXT: &str = r#"{"event":"update","payload":{"id":"102775370117886890","created_at":"2019-09-11T18:42:19.000Z","in_reply_to_id":null,"in_reply_to_account_id":null,"sensitive":false,"spoiler_text":"","visibility":"unlisted","language":"en","uri":"https://mastodon.host/users/federationbot/statuses/102775346916917099","url":"https://mastodon.host/@federationbot/102775346916917099","replies_count":0,"reblogs_count":0,"favourites_count":0,"favourited":false,"reblogged":false,"muted":false,"content":"

Trending tags:
#neverforget
#4styles
#newpipe
#uber
#mercredifiction

","reblog":null,"account":{"id":"78","username":"federationbot","acct":"federationbot@mastodon.host","display_name":"Federation Bot","locked":false,"bot":false,"created_at":"2019-09-10T15:04:25.559Z","note":"

Hello, I am mastodon.host official semi bot.

Follow me if you want to have some updates on the view of the fediverse from here ( I only post unlisted ).

I also randomly boost one of my followers toot every hour !

If you don't feel confortable with me following you, tell me: unfollow and I'll do it :)

If you want me to follow you, just tell me follow !

If you want automatic follow for new users on your instance and you are an instance admin, contact me !

Other commands are private :)

","url":"https://mastodon.host/@federationbot","avatar":"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863","avatar_static":"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863","header":"https://instance.codesections.com/headers/original/missing.png","header_static":"https://instance.codesections.com/headers/original/missing.png","followers_count":16636,"following_count":179532,"statuses_count":50554,"emojis":[],"fields":[{"name":"More stats","value":"https://mastodon.host/stats.html","verified_at":null},{"name":"More infos","value":"https://mastodon.host/about/more","verified_at":null},{"name":"Owner/Friend","value":"@gled","verified_at":null}]},"media_attachments":[],"mentions":[],"tags":[{"name":"4styles","url":"https://instance.codesections.com/tags/4styles"},{"name":"neverforget","url":"https://instance.codesections.com/tags/neverforget"},{"name":"mercredifiction","url":"https://instance.codesections.com/tags/mercredifiction"},{"name":"uber","url":"https://instance.codesections.com/tags/uber"},{"name":"newpipe","url":"https://instance.codesections.com/tags/newpipe"}],"emojis":[],"card":null,"poll":null},"queued_at":1568227693541}"#; diff --git a/src/parse_client_request/mod.rs b/src/parse_client_request/mod.rs index 32a5b08..83b48ed 100644 --- a/src/parse_client_request/mod.rs +++ b/src/parse_client_request/mod.rs @@ -6,7 +6,7 @@ mod subscription; pub use self::postgres::PgPool; // TODO consider whether we can remove `Stream` from public API -pub use subscription::{Stream, Subscription, Timeline}; +pub use subscription::{Blocks, Stream, Subscription, Timeline}; //#[cfg(test)] pub use subscription::{Content, Reach}; diff --git a/src/redis_to_client_stream/client_agent.rs b/src/redis_to_client_stream/client_agent.rs index 1948c63..187cd1d 100644 --- a/src/redis_to_client_stream/client_agent.rs +++ b/src/redis_to_client_stream/client_agent.rs @@ -90,35 +90,28 @@ impl futures::stream::Stream for ClientAgent { receiver.poll_for(self.subscription.id, self.subscription.timeline) }; + let timeline = &self.subscription.timeline; let allowed_langs = &self.subscription.allowed_langs; - let blocked_users = &self.subscription.blocks.blocked_users; - let blocking_users = &self.subscription.blocks.blocking_users; - let blocked_domains = &self.subscription.blocks.blocked_domains; + let blocks = &self.subscription.blocks; let (send, block) = (|msg| Ok(Ready(Some(msg))), Ok(NotReady)); - use Event::*; + use crate::messages::{CheckedEvent::Update, Event::*}; match result { Ok(Async::Ready(Some(event))) => match event { - Update { - payload: status, .. - } => match self.subscription.timeline { - _ if status.involves_blocked_user(blocked_users) => block, - _ if status.from_blocked_domain(blocked_domains) => block, - _ if status.from_blocking_user(blocking_users) => block, - Timeline(Public, _, _) if status.language_not_allowed(allowed_langs) => block, - _ => send(Update { - payload: status, - queued_at: None, - }), + TypeSafe(Update { payload, queued_at }) => match timeline { + Timeline(Public, _, _) if payload.language_not(allowed_langs) => block, + _ if payload.involves_any(blocks) => block, + _ => send(TypeSafe(Update { payload, queued_at })), }, - Notification { .. } - | Conversation { .. } - | Delete { .. } - | FiltersChanged - | Announcement { .. } - | AnnouncementReaction { .. } - | AnnouncementDelete { .. } => send(event), + TypeSafe(non_update) => send(Event::TypeSafe(non_update)), + Dynamic(event) if event.event == "update" => match timeline { + Timeline(Public, _, _) if event.language_not(allowed_langs) => block, + _ if event.involves_any(blocks) => block, + _ => send(Dynamic(event)), + }, + Dynamic(non_update) => send(Dynamic(non_update)), }, + Ok(Ready(None)) => Ok(Ready(None)), Ok(NotReady) => Ok(NotReady), Err(e) => Err(e), diff --git a/src/redis_to_client_stream/event_stream.rs b/src/redis_to_client_stream/event_stream.rs index 11705b0..5eb994b 100644 --- a/src/redis_to_client_stream/event_stream.rs +++ b/src/redis_to_client_stream/event_stream.rs @@ -61,11 +61,12 @@ impl EventStream { event_stream .for_each(move |_instant| { match client_agent.poll() { - Ok(Async::Ready(Some(msg))) => tx - .unbounded_send(Message::text(msg.to_json_string())) - .unwrap_or_else(|e| { - log::error!("Could not send message to WebSocket: {}", e) - }), + Ok(Async::Ready(Some(msg))) => { + tx.unbounded_send(Message::text(msg.to_json_string())) + .unwrap_or_else(|e| { + log::error!("Could not send message to WebSocket: {}", e) + }); + } Ok(Async::Ready(None)) => log::info!("WebSocket ClientAgent got Ready(None)"), Ok(Async::NotReady) if last_ping_time.elapsed() > Duration::from_secs(30) => { tx.unbounded_send(Message::text("{}")).unwrap_or_else(|e| { diff --git a/src/redis_to_client_stream/mod.rs b/src/redis_to_client_stream/mod.rs index 4b73844..3f419d1 100644 --- a/src/redis_to_client_stream/mod.rs +++ b/src/redis_to_client_stream/mod.rs @@ -5,3 +5,6 @@ mod receiver; mod redis; pub use {client_agent::ClientAgent, event_stream::EventStream, receiver::Receiver}; + +#[cfg(feature = "bench")] +pub use redis::redis_msg::{RedisMsg, RedisParseOutput}; diff --git a/src/redis_to_client_stream/receiver/mod.rs b/src/redis_to_client_stream/receiver/mod.rs index 63fa863..3e2e07b 100644 --- a/src/redis_to_client_stream/receiver/mod.rs +++ b/src/redis_to_client_stream/receiver/mod.rs @@ -76,13 +76,14 @@ impl Receiver { pub fn poll_for(&mut self, id: Uuid, timeline: Timeline) -> Poll, ReceiverErr> { loop { match self.redis_connection.poll_redis() { - Ok(Async::Ready(Some((timeline, event)))) => self - .msg_queues - .values_mut() - .filter(|msg_queue| msg_queue.timeline == timeline) - .for_each(|msg_queue| { - msg_queue.messages.push_back(event.clone()); - }), + Ok(Async::Ready(Some((timeline, event)))) => { + self.msg_queues + .values_mut() + .filter(|msg_queue| msg_queue.timeline == timeline) + .for_each(|msg_queue| { + msg_queue.messages.push_back(event.clone()); + }); + } Ok(Async::NotReady) => break, Ok(Async::Ready(None)) => (), Err(err) => Err(err)?, diff --git a/src/redis_to_client_stream/redis/redis_connection/mod.rs b/src/redis_to_client_stream/redis/redis_connection/mod.rs index 45fb3ca..3dadf01 100644 --- a/src/redis_to_client_stream/redis/redis_connection/mod.rs +++ b/src/redis_to_client_stream/redis/redis_connection/mod.rs @@ -82,13 +82,13 @@ impl RedisConn { Some(ns) if msg.timeline_txt.starts_with(&format!("{}:timeline:", ns)) => { let trimmed_tl_txt = &msg.timeline_txt[ns.len() + ":timeline:".len()..]; let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut self.tag_id_cache)?; - let event: Event = serde_json::from_str(msg.event_txt)?; + let event = msg.event_txt.into(); (Ok(Ready(Some((tl, event)))), msg.leftover_input) } None => { let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..]; let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut self.tag_id_cache)?; - let event: Event = serde_json::from_str(msg.event_txt)?; + let event = msg.event_txt.into(); (Ok(Ready(Some((tl, event)))), msg.leftover_input) } Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input), diff --git a/src/redis_to_client_stream/redis/redis_msg/test.rs b/src/redis_to_client_stream/redis/redis_msg/test.rs index b59760f..2107caf 100644 --- a/src/redis_to_client_stream/redis/redis_msg/test.rs +++ b/src/redis_to_client_stream/redis/redis_msg/test.rs @@ -34,6 +34,7 @@ fn parse_redis_detects_non_newline() -> Result<(), RedisParseErr> { Ok(()) } +#[test] fn parse_redis_msg() -> Result<(), RedisParseErr> { let input = "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$38\r\n{\"event\":\"delete\",\"payload\":\"1038647\"}\r\n"; @@ -52,3 +53,23 @@ fn parse_redis_msg() -> Result<(), RedisParseErr> { assert_eq!(r_msg.event_txt, r#"{"event":"delete","payload":"1038647"}"#); Ok(()) } + +#[test] +fn parse_long_redis_msg() -> Result<(), RedisParseErr> { + let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS; + + let r_msg = match RedisParseOutput::try_from(input) { + Ok(NonMsg(leftover)) => panic!( + "Parsed a msg as a non-msg.\nInput `{}` parsed to NonMsg({:?})", + &input, leftover + ), + Ok(Msg(msg)) => msg, + Err(e) => panic!("Error in parsing subscribe command: {:?}", e), + }; + + assert!(r_msg.leftover_input.is_empty()); + assert_eq!(r_msg.timeline_txt, "timeline:1"); + Ok(()) +} + +const ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS: &str = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:1\r\n$3790\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102775370117886890\",\"created_at\":\"2019-09-11T18:42:19.000Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"unlisted\",\"language\":\"en\",\"uri\":\"https://mastodon.host/users/federationbot/statuses/102775346916917099\",\"url\":\"https://mastodon.host/@federationbot/102775346916917099\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"

Trending tags:
#neverforget
#4styles
#newpipe
#uber
#mercredifiction

\",\"reblog\":null,\"account\":{\"id\":\"78\",\"username\":\"federationbot\",\"acct\":\"federationbot@mastodon.host\",\"display_name\":\"Federation Bot\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-09-10T15:04:25.559Z\",\"note\":\"

Hello, I am mastodon.host official semi bot.

Follow me if you want to have some updates on the view of the fediverse from here ( I only post unlisted ).

I also randomly boost one of my followers toot every hour !

If you don\'t feel confortable with me following you, tell me: unfollow and I\'ll do it :)

If you want me to follow you, just tell me follow !

If you want automatic follow for new users on your instance and you are an instance admin, contact me !

Other commands are private :)

\",\"url\":\"https://mastodon.host/@federationbot\",\"avatar\":\"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863\",\"avatar_static\":\"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":16636,\"following_count\":179532,\"statuses_count\":50554,\"emojis\":[],\"fields\":[{\"name\":\"More stats\",\"value\":\"https://mastodon.host/stats.html\",\"verified_at\":null},{\"name\":\"More infos\",\"value\":\"https://mastodon.host/about/more\",\"verified_at\":null},{\"name\":\"Owner/Friend\",\"value\":\"@gled\",\"verified_at\":null}]},\"media_attachments\":[],\"mentions\":[],\"tags\":[{\"name\":\"4styles\",\"url\":\"https://instance.codesections.com/tags/4styles\"},{\"name\":\"neverforget\",\"url\":\"https://instance.codesections.com/tags/neverforget\"},{\"name\":\"mercredifiction\",\"url\":\"https://instance.codesections.com/tags/mercredifiction\"},{\"name\":\"uber\",\"url\":\"https://instance.codesections.com/tags/uber\"},{\"name\":\"newpipe\",\"url\":\"https://instance.codesections.com/tags/newpipe\"}],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1568227693541}\r\n";