From 2dd9ccbf9120fd21284a37220ab6b0c32fb04731 Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Wed, 25 Mar 2020 17:50:32 -0400 Subject: [PATCH] Performance tuning (#108) * Initial implementation WIP * Add Event type for faster parsing * Add tests and benchmarks * Add additional parsing tests --- Cargo.lock | 5 +- Cargo.toml | 10 +- benches/parse_redis.rs | 400 ++++++-- benches/read_time.rs | 23 - src/lib.rs | 1 + src/messages.rs | 941 ++++++++++++++++++ src/parse_client_request/subscription/mod.rs | 4 +- src/redis_to_client_stream/client_agent.rs | 39 +- src/redis_to_client_stream/message.rs | 176 +--- src/redis_to_client_stream/mod.rs | 54 +- .../receiver/message_queues.rs | 37 +- src/redis_to_client_stream/receiver/mod.rs | 56 +- src/redis_to_client_stream/redis/mod.rs | 28 - src/redis_to_client_stream/redis/redis_msg.rs | 153 ++- .../redis/redis_stream.rs | 104 +- tc --explain E0106 | 53 + 16 files changed, 1609 insertions(+), 475 deletions(-) delete mode 100644 benches/read_time.rs create mode 100644 src/messages.rs create mode 100644 tc --explain E0106 diff --git a/Cargo.lock b/Cargo.lock index 60d5ef3..e33dbe8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -440,7 +440,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.6.3" +version = "0.6.4" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1660,6 +1660,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" name = "serde" version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "serde_derive 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", +] [[package]] name = "serde_derive" diff --git a/Cargo.toml b/Cargo.toml index c73262b..0b5248e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,18 +1,18 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.6.4" +version = "0.6.5" authors = ["Daniel Long Sockwell "] edition = "2018" [dependencies] -log = "0.4.6" +log = { version = "0.4.6", features = ["release_max_level_info"] } futures = "0.1.26" tokio = "0.1.19" warp = { git = "https://github.com/seanmonstar/warp.git"} +serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.39" serde_derive = "1.0.90" -serde = "1.0.90" pretty_env_logger = "0.3.0" postgres = "0.17.0" uuid = { version = "0.7", features = ["v4"] } @@ -33,10 +33,6 @@ regex = "1.3.2" name = "parse_redis" harness = false -[[bench]] -name = "read_time" -harness = false - [features] default = [ "production" ] diff --git a/benches/parse_redis.rs b/benches/parse_redis.rs index b696e71..a683583 100644 --- a/benches/parse_redis.rs +++ b/benches/parse_redis.rs @@ -2,111 +2,339 @@ use criterion::black_box; use criterion::criterion_group; use criterion::criterion_main; use criterion::Criterion; -use flodgatt::redis_to_client_stream::redis::redis_msg::RedisMsg; -use regex::Regex; -use serde_json::Value; -fn regex_parse(input: String) -> Vec<(String, Value)> { - let mut output = Vec::new(); - if input.ends_with("}\r\n") { - // Every valid message is tagged with the string `message`. This means 3 things: - // 1) We can discard everything before the first `message` (with `skip(1)`) - // 2) We can split into separate messages by splitting on `message` - // 3) We can use a regex that discards everything after the *first* valid - // message (since the next message will have a new `message` tag) +const ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS: &str = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:1\r\n$3790\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102775370117886890\",\"created_at\":\"2019-09-11T18:42:19.000Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"unlisted\",\"language\":\"en\",\"uri\":\"https://mastodon.host/users/federationbot/statuses/102775346916917099\",\"url\":\"https://mastodon.host/@federationbot/102775346916917099\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"

Trending tags:
#neverforget
#4styles
#newpipe
#uber
#mercredifiction

\",\"reblog\":null,\"account\":{\"id\":\"78\",\"username\":\"federationbot\",\"acct\":\"federationbot@mastodon.host\",\"display_name\":\"Federation Bot\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-09-10T15:04:25.559Z\",\"note\":\"

Hello, I am mastodon.host official semi bot.

Follow me if you want to have some updates on the view of the fediverse from here ( I only post unlisted ).

I also randomly boost one of my followers toot every hour !

If you don\'t feel confortable with me following you, tell me: unfollow and I\'ll do it :)

If you want me to follow you, just tell me follow !

If you want automatic follow for new users on your instance and you are an instance admin, contact me !

Other commands are private :)

\",\"url\":\"https://mastodon.host/@federationbot\",\"avatar\":\"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863\",\"avatar_static\":\"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":16636,\"following_count\":179532,\"statuses_count\":50554,\"emojis\":[],\"fields\":[{\"name\":\"More stats\",\"value\":\"https://mastodon.host/stats.html\",\"verified_at\":null},{\"name\":\"More infos\",\"value\":\"https://mastodon.host/about/more\",\"verified_at\":null},{\"name\":\"Owner/Friend\",\"value\":\"@gled\",\"verified_at\":null}]},\"media_attachments\":[],\"mentions\":[],\"tags\":[{\"name\":\"4styles\",\"url\":\"https://instance.codesections.com/tags/4styles\"},{\"name\":\"neverforget\",\"url\":\"https://instance.codesections.com/tags/neverforget\"},{\"name\":\"mercredifiction\",\"url\":\"https://instance.codesections.com/tags/mercredifiction\"},{\"name\":\"uber\",\"url\":\"https://instance.codesections.com/tags/uber\"},{\"name\":\"newpipe\",\"url\":\"https://instance.codesections.com/tags/newpipe\"}],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1568227693541}\r\n"; - let messages = input.as_str().split("message").skip(1); - let regex = Regex::new(r"timeline:(?P.*?)\r\n\$\d+\r\n(?P.*?)\r\n") - .expect("Hard-codded"); - for message in messages { - let timeline = - regex.captures(message).expect("Hard-coded timeline regex")["timeline"].to_string(); +/// Parses the Redis message using a Regex. +/// +/// The naive approach from Flodgatt's proof-of-concept stage. +mod regex_parse { + use regex::Regex; + use serde_json::Value; - let redis_msg: Value = serde_json::from_str( - ®ex.captures(message).expect("Hard-coded value regex")["value"], - ) - .expect("Valid json"); + pub fn to_json_value(input: String) -> Value { + if input.ends_with("}\r\n") { + let messages = input.as_str().split("message").skip(1); + let regex = Regex::new(r"timeline:(?P.*?)\r\n\$\d+\r\n(?P.*?)\r\n") + .expect("Hard-codded"); + for message in messages { + let _timeline = regex.captures(message).expect("Hard-coded timeline regex") + ["timeline"] + .to_string(); - output.push((timeline, redis_msg)); + let redis_msg: Value = serde_json::from_str( + ®ex.captures(message).expect("Hard-coded value regex")["value"], + ) + .expect("Valid json"); + + return redis_msg; + } + unreachable!() + } else { + unreachable!() } } - output } -fn hand_parse(input: String) -> Vec<(String, Value)> { - let mut output = Vec::new(); - if input.ends_with("}\r\n") { - let end = 2; - let (end, _) = print_next_str(end, &input); - let (end, timeline) = print_next_str(end, &input); - let (_, msg) = print_next_str(end, &input); - let redis_msg: Value = serde_json::from_str(&msg).unwrap(); - output.push((timeline, redis_msg)); - } - output -} +/// Parse with a simplified inline iterator. +/// +/// Essentially shows best-case performance for producing a serde_json::Value. +mod parse_inline { + use serde_json::Value; + pub fn to_json_value(input: String) -> Value { + fn print_next_str(mut end: usize, input: &str) -> (usize, String) { + let mut start = end + 3; + end = start + 1; -fn print_next_str(mut end: usize, input: &str) -> (usize, String) { - let mut start = end + 3; - end = start + 1; + let mut iter = input.chars(); + iter.nth(start); - let mut iter = input.chars(); - iter.nth(start); - - while iter.next().unwrap().is_digit(10) { - end += 1; - } - let length = &input[start..end].parse::().unwrap(); - start = end + 2; - end = start + length; - - let string = &input[start..end]; - (end, string.to_string()) -} - -fn parse_with_stuct(input: &str) -> Vec<(String, Value)> { - let mut output = Vec::new(); - let mut incoming_raw_msg = input; - - while incoming_raw_msg.len() > 0 { - let mut msg = RedisMsg::from_raw(incoming_raw_msg, "timeline".len()); - let command = msg.next_field(); - match command.as_str() { - "message" => { - let timeline = msg.next_field()["timeline:".len()..].to_string(); - let message: Value = serde_json::from_str(&msg.next_field()).unwrap(); - output.push((timeline, message)); + while iter.next().unwrap().is_digit(10) { + end += 1; } - "subscribe" | "unsubscribe" => { - // This returns a confirmation. We don't need to do anything with it, - // but we do need to advance the cursor past it - msg.next_field(); // name of channel (un)subscribed - msg.cursor += ":".len(); - msg.process_number(); // The number of active subscriptions - msg.cursor += "\r\n".len(); - } - cmd => panic!( - "Invariant violation: bad Redis input. Got {} as a command", - cmd - ), + let length = &input[start..end].parse::().unwrap(); + start = end + 2; + end = start + length; + + let string = &input[start..end]; + (end, string.to_string()) + } + + if input.ends_with("}\r\n") { + let end = 2; + let (end, _) = print_next_str(end, &input); + let (end, _timeline) = print_next_str(end, &input); + let (_, msg) = print_next_str(end, &input); + let redis_msg: Value = serde_json::from_str(&msg).unwrap(); + redis_msg + } else { + unreachable!() } - incoming_raw_msg = &msg.raw[msg.cursor..]; } - output +} + +/// Parse using Flodgatt's current functions +mod flodgatt_parse_event { + use flodgatt::{messages::Event, redis_to_client_stream::receiver::MessageQueues}; + use flodgatt::{ + parse_client_request::subscription::Timeline, + redis_to_client_stream::{receiver::MsgQueue, redis::redis_stream}, + }; + use lru::LruCache; + use std::collections::HashMap; + use uuid::Uuid; + + /// One-time setup, not included in testing time. + pub fn setup() -> MessageQueues { + let mut queues_map = HashMap::new(); + let id = Uuid::default(); + let timeline = Timeline::from_redis_raw_timeline("1", None); + queues_map.insert(id, MsgQueue::new(timeline)); + let queues = MessageQueues(queues_map); + queues + } + + pub fn to_event_struct( + input: String, + mut cache: &mut LruCache, + mut queues: &mut MessageQueues, + id: Uuid, + timeline: Timeline, + ) -> Event { + redis_stream::process_messages(input, &mut None, &mut cache, &mut queues).unwrap(); + queues + .oldest_msg_in_target_queue(id, timeline) + .expect("In test") + } +} + +/// Parse using modified a modified version of Flodgatt's current function. +/// +/// This version is modified to return a serde_json::Value instead of an Event to shows +/// the performance we would see if we used serde's built-in method for handling weakly +/// typed JSON instead of our own strongly typed struct. +mod flodgatt_parse_value { + use flodgatt::{log_fatal, parse_client_request::subscription::Timeline}; + use lru::LruCache; + use serde_json::Value; + use std::{ + collections::{HashMap, VecDeque}, + time::Instant, + }; + use uuid::Uuid; + #[derive(Debug)] + pub struct RedisMsg<'a> { + pub raw: &'a str, + pub cursor: usize, + pub prefix_len: usize, + } + + impl<'a> RedisMsg<'a> { + pub fn from_raw(raw: &'a str, prefix_len: usize) -> Self { + Self { + raw, + cursor: "*3\r\n".len(), //length of intro header + prefix_len, + } + } + + /// Move the cursor from the beginning of a number through its end and return the number + pub fn process_number(&mut self) -> usize { + let (mut selected_number, selection_start) = (0, self.cursor); + while let Ok(number) = self.raw[selection_start..=self.cursor].parse::() { + self.cursor += 1; + selected_number = number; + } + selected_number + } + + /// In a pubsub reply from Redis, an item can be either the name of the subscribed channel + /// or the msg payload. Either way, it follows the same format: + /// `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n` + pub fn next_field(&mut self) -> String { + self.cursor += "$".len(); + + let item_len = self.process_number(); + self.cursor += "\r\n".len(); + let item_start_position = self.cursor; + self.cursor += item_len; + let item = self.raw[item_start_position..self.cursor].to_string(); + self.cursor += "\r\n".len(); + item + } + + pub fn extract_raw_timeline_and_message(&mut self) -> (String, Value) { + let timeline = &self.next_field()[self.prefix_len..]; + let msg_txt = self.next_field(); + let msg_value: Value = serde_json::from_str(&msg_txt) + .unwrap_or_else(|_| log_fatal!("Invalid JSON from Redis: {:?}", &msg_txt)); + (timeline.to_string(), msg_value) + } + } + + pub struct MsgQueue { + pub timeline: Timeline, + pub messages: VecDeque, + _last_polled_at: Instant, + } + + pub struct MessageQueues(HashMap); + impl std::ops::Deref for MessageQueues { + type Target = HashMap; + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl std::ops::DerefMut for MessageQueues { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } + } + + impl MessageQueues { + pub fn oldest_msg_in_target_queue( + &mut self, + id: Uuid, + timeline: Timeline, + ) -> Option { + self.entry(id) + .or_insert_with(|| MsgQueue::new(timeline)) + .messages + .pop_front() + } + } + + impl MsgQueue { + pub fn new(timeline: Timeline) -> Self { + MsgQueue { + messages: VecDeque::new(), + _last_polled_at: Instant::now(), + timeline, + } + } + } + + pub fn process_msg( + raw_utf: String, + namespace: &Option, + hashtag_id_cache: &mut LruCache, + queues: &mut MessageQueues, + ) { + // Only act if we have a full message (end on a msg boundary) + if !raw_utf.ends_with("}\r\n") { + return; + }; + let prefix_to_skip = match namespace { + Some(namespace) => format!("{}:timeline:", namespace), + None => "timeline:".to_string(), + }; + + let mut msg = RedisMsg::from_raw(&raw_utf, prefix_to_skip.len()); + + while !msg.raw.is_empty() { + let command = msg.next_field(); + match command.as_str() { + "message" => { + let (raw_timeline, msg_value) = msg.extract_raw_timeline_and_message(); + let hashtag = hashtag_from_timeline(&raw_timeline, hashtag_id_cache); + let timeline = Timeline::from_redis_raw_timeline(&raw_timeline, hashtag); + for msg_queue in queues.values_mut() { + if msg_queue.timeline == timeline { + msg_queue.messages.push_back(msg_value.clone()); + } + } + } + + "subscribe" | "unsubscribe" => { + // No msg, so ignore & advance cursor to end + let _channel = msg.next_field(); + msg.cursor += ":".len(); + let _active_subscriptions = msg.process_number(); + msg.cursor += "\r\n".len(); + } + cmd => panic!("Invariant violation: {} is unexpected Redis output", cmd), + }; + msg = RedisMsg::from_raw(&msg.raw[msg.cursor..], msg.prefix_len); + } + } + + fn hashtag_from_timeline( + raw_timeline: &str, + hashtag_id_cache: &mut LruCache, + ) -> Option { + if raw_timeline.starts_with("hashtag") { + let tag_name = raw_timeline + .split(':') + .nth(1) + .unwrap_or_else(|| log_fatal!("No hashtag found in `{}`", raw_timeline)) + .to_string(); + + let tag_id = *hashtag_id_cache + .get(&tag_name) + .unwrap_or_else(|| log_fatal!("No cached id for `{}`", tag_name)); + Some(tag_id) + } else { + None + } + } + pub fn setup() -> (LruCache, MessageQueues, Uuid, Timeline) { + let cache: LruCache = LruCache::new(1000); + let mut queues_map = HashMap::new(); + let id = Uuid::default(); + let timeline = Timeline::from_redis_raw_timeline("1", None); + queues_map.insert(id, MsgQueue::new(timeline)); + let queues = MessageQueues(queues_map); + (cache, queues, id, timeline) + } + + pub fn to_json_value( + input: String, + mut cache: &mut LruCache, + mut queues: &mut MessageQueues, + id: Uuid, + timeline: Timeline, + ) -> Value { + process_msg(input, &None, &mut cache, &mut queues); + queues + .oldest_msg_in_target_queue(id, timeline) + .expect("In test") + } } fn criterion_benchmark(c: &mut Criterion) { - let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:1\r\n$3790\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102775370117886890\",\"created_at\":\"2019-09-11T18:42:19.000Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"unlisted\",\"language\":\"en\",\"uri\":\"https://mastodon.host/users/federationbot/statuses/102775346916917099\",\"url\":\"https://mastodon.host/@federationbot/102775346916917099\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"

Trending tags:
#neverforget
#4styles
#newpipe
#uber
#mercredifiction

\",\"reblog\":null,\"account\":{\"id\":\"78\",\"username\":\"federationbot\",\"acct\":\"federationbot@mastodon.host\",\"display_name\":\"Federation Bot\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-09-10T15:04:25.559Z\",\"note\":\"

Hello, I am mastodon.host official semi bot.

Follow me if you want to have some updates on the view of the fediverse from here ( I only post unlisted ).

I also randomly boost one of my followers toot every hour !

If you don\'t feel confortable with me following you, tell me: unfollow and I\'ll do it :)

If you want me to follow you, just tell me follow !

If you want automatic follow for new users on your instance and you are an instance admin, contact me !

Other commands are private :)

\",\"url\":\"https://mastodon.host/@federationbot\",\"avatar\":\"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863\",\"avatar_static\":\"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":16636,\"following_count\":179532,\"statuses_count\":50554,\"emojis\":[],\"fields\":[{\"name\":\"More stats\",\"value\":\"https://mastodon.host/stats.html\",\"verified_at\":null},{\"name\":\"More infos\",\"value\":\"https://mastodon.host/about/more\",\"verified_at\":null},{\"name\":\"Owner/Friend\",\"value\":\"@gled\",\"verified_at\":null}]},\"media_attachments\":[],\"mentions\":[],\"tags\":[{\"name\":\"4styles\",\"url\":\"https://instance.codesections.com/tags/4styles\"},{\"name\":\"neverforget\",\"url\":\"https://instance.codesections.com/tags/neverforget\"},{\"name\":\"mercredifiction\",\"url\":\"https://instance.codesections.com/tags/mercredifiction\"},{\"name\":\"uber\",\"url\":\"https://instance.codesections.com/tags/uber\"},{\"name\":\"newpipe\",\"url\":\"https://instance.codesections.com/tags/newpipe\"}],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1568227693541}\r\n".to_string(); - + let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS.to_string(); //INPUT.to_string(); let mut group = c.benchmark_group("Parse redis RESP array"); - group.bench_function("regex parse", |b| { - b.iter(|| regex_parse(black_box(input.clone()))) + + // group.bench_function("parse to Value with a regex", |b| { + // b.iter(|| regex_parse::to_json_value(black_box(input.clone()))) + // }); + group.bench_function("parse to Value inline", |b| { + b.iter(|| parse_inline::to_json_value(black_box(input.clone()))) }); - group.bench_function("hand parse", |b| { - b.iter(|| hand_parse(black_box(input.clone()))) + let (mut cache, mut queues, id, timeline) = flodgatt_parse_value::setup(); + group.bench_function("parse to Value using Flodgatt functions", |b| { + b.iter(|| { + black_box(flodgatt_parse_value::to_json_value( + black_box(input.clone()), + black_box(&mut cache), + black_box(&mut queues), + black_box(id), + black_box(timeline), + )) + }) }); - group.bench_function("stuct parse", |b| { - b.iter(|| parse_with_stuct(black_box(&input))) + let mut queues = flodgatt_parse_event::setup(); + group.bench_function("parse to Event using Flodgatt functions", |b| { + b.iter(|| { + black_box(flodgatt_parse_event::to_event_struct( + black_box(input.clone()), + black_box(&mut cache), + black_box(&mut queues), + black_box(id), + black_box(timeline), + )) + }) }); } diff --git a/benches/read_time.rs b/benches/read_time.rs deleted file mode 100644 index fb45e6f..0000000 --- a/benches/read_time.rs +++ /dev/null @@ -1,23 +0,0 @@ -use criterion::black_box; -use criterion::criterion_group; -use criterion::criterion_main; -use criterion::Criterion; -use log; -use std::time; - -fn simple_read_time(_: &str) { - let start_time = time::Instant::now(); - - if start_time.elapsed() > time::Duration::from_millis(20) { - log::warn!("Polling took: {:?}", start_time.elapsed()); - } -} - -fn criterion_benchmark2(c: &mut Criterion) { - c.bench_function("read elapsed time", |b| { - b.iter(|| simple_read_time(black_box("foo"))) - }); -} - -criterion_group!(benches, criterion_benchmark2); -criterion_main!(benches); diff --git a/src/lib.rs b/src/lib.rs index a1c374a..ede8963 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,5 +36,6 @@ //! pub mod config; pub mod err; +pub mod messages; pub mod parse_client_request; pub mod redis_to_client_stream; diff --git a/src/messages.rs b/src/messages.rs new file mode 100644 index 0000000..5facc8e --- /dev/null +++ b/src/messages.rs @@ -0,0 +1,941 @@ +use crate::log_fatal; +use serde::{Deserialize, Serialize}; +use serde_json; +use std::boxed::Box; +use std::{collections::HashSet, string::String}; + +#[serde(rename_all = "snake_case", tag = "event", deny_unknown_fields)] +#[rustfmt::skip] +#[derive(Deserialize, Debug, Clone, PartialEq)] +pub enum Event { + Update { payload: Status, queued_at: Option }, + Notification { payload: Notification }, + Delete { payload: DeletedId }, + FiltersChanged, + Announcement { payload: Announcement }, + #[serde(rename(serialize = "announcement.reaction", deserialize = "announcement.reaction"))] + AnnouncementReaction { payload: AnnouncementReaction }, + #[serde(rename(serialize = "announcement.delete", deserialize = "announcement.delete"))] + AnnouncementDelete { payload: DeletedId }, + Conversation { payload: Conversation, queued_at: Option }, +} + +#[derive(Serialize, Debug, Clone)] +#[serde(untagged)] +pub enum SendableEvent<'a> { + WithPayload { event: &'a str, payload: String }, + NoPayload { event: &'a str }, +} +#[rustfmt::skip] +impl Event { + pub fn event_name(&self) -> String { + use Event::*; + match self { + Update { .. } => "update", + Notification { .. } => "notification", + Delete { .. } => "delete", + Announcement { .. } => "announcement", + AnnouncementReaction { .. } => "announcement.reaction", + AnnouncementDelete { .. } => "announcement.delete", + Conversation { .. } => "conversation", + FiltersChanged => "filters_changed", + } + .to_string() + } + + + pub fn payload(&self) -> Option { + use Event::*; + match self { + Update { payload: status, .. } => Some(escaped(status)), + Notification { payload: notification, .. } => Some(escaped(notification)), + Delete { payload: id, .. } => Some(id.0.clone()), + Announcement { payload: announcement, .. } => Some(escaped(announcement)), + AnnouncementReaction { payload: reaction, .. } => Some(escaped(reaction)), + AnnouncementDelete { payload: id, .. } => Some(id.0.clone()), + Conversation { payload: conversation, ..} => Some(escaped(conversation)), + FiltersChanged => None, + } + } + pub fn to_json_string(&self) -> String { + let event = &self.event_name(); + let sendable_event = match self.payload() { + Some(payload) => SendableEvent::WithPayload { event, payload }, + None => SendableEvent::NoPayload { event }, + }; + serde_json::to_string(&sendable_event) + .unwrap_or_else(|_| log_fatal!("Could not serialize `{:?}`", &sendable_event)) + } +} + +fn escaped(content: T) -> String { + serde_json::to_string(&content) + .unwrap_or_else(|_| log_fatal!("Could not parse Event with: `{:?}`", &content)) +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Conversation { + id: String, + accounts: Vec, + unread: bool, + last_status: Option, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct DeletedId(String); + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Status { + id: String, + uri: String, + created_at: String, + account: Account, + content: String, + visibility: Visibility, + sensitive: bool, + spoiler_text: String, + media_attachments: Vec, + application: Option, // Should be non-optional? + mentions: Vec, + tags: Vec, + emojis: Vec, + reblogs_count: i64, + favourites_count: i64, + replies_count: i64, + url: Option, + in_reply_to_id: Option, + in_reply_to_account_id: Option, + reblog: Option>, + poll: Option, + card: Option, + language: Option, + text: Option, + // ↓↓↓ Only for authorized users + favourited: Option, + reblogged: Option, + muted: Option, + bookmarked: Option, + pinned: Option, +} + +#[serde(rename_all = "lowercase", deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub enum Visibility { + Public, + Unlisted, + Private, + Direct, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Account { + id: String, + username: String, + acct: String, + url: String, + display_name: String, + note: String, + avatar: String, + avatar_static: String, + header: String, + header_static: String, + locked: bool, + emojis: Vec, + discoverable: Option, // Shouldn't be option? + created_at: String, + statuses_count: i64, + followers_count: i64, + following_count: i64, + moved: Option>, + fields: Option>, + bot: Option, + source: Option, + group: Option, // undocumented + last_status_at: Option, // undocumented +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Attachment { + id: String, + r#type: AttachmentType, + url: String, + preview_url: String, + remote_url: Option, + text_url: Option, + meta: Option, + description: Option, + blurhash: Option, +} + +#[serde(rename_all = "lowercase", deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +enum AttachmentType { + Unknown, + Image, + Gifv, + Video, + Audio, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Application { + name: String, + website: Option, + vapid_key: Option, + client_id: Option, + client_secret: Option, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Emoji { + shortcode: String, + url: String, + static_url: String, + visible_in_picker: bool, + category: Option, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Field { + name: String, + value: String, + verified_at: Option, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Source { + note: String, + fields: Vec, + privacy: Option, + sensitive: bool, + language: String, + follow_requests_count: i64, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Mention { + id: String, + username: String, + acct: String, + url: String, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Tag { + name: String, + url: String, + history: Option>, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Poll { + id: String, + expires_at: String, + expired: bool, + multiple: bool, + votes_count: i64, + voters_count: Option, + voted: Option, + own_votes: Option>, + options: Vec, + emojis: Vec, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct PollOptions { + title: String, + votes_count: Option, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Card { + url: String, + title: String, + description: String, + r#type: CardType, + author_name: Option, + author_url: Option, + provider_name: Option, + provider_url: Option, + html: Option, + width: Option, + height: Option, + image: Option, + embed_url: Option, +} + +#[serde(rename_all = "lowercase", deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +enum CardType { + Link, + Photo, + Video, + Rich, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct History { + day: String, + uses: String, + accounts: String, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Notification { + id: String, + r#type: NotificationType, + created_at: String, + account: Account, + status: Option, +} + +#[serde(rename_all = "lowercase", deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +enum NotificationType { + Follow, + Mention, + Reblog, + Favourite, + Poll, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Announcement { + // Fully undocumented + id: String, + tags: Vec, + all_day: bool, + content: String, + emojis: Vec, + starts_at: Option, + ends_at: Option, + published_at: String, + updated_at: String, + mentions: Vec, + reactions: Vec, +} + +#[serde(deny_unknown_fields)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct AnnouncementReaction { + #[serde(skip_serializing_if = "Option::is_none")] + announcement_id: Option, + count: i64, + name: String, +} + +impl Status { + /// Returns `true` if the status is filtered out based on its language + pub fn language_not_allowed(&self, allowed_langs: &HashSet) -> bool { + const ALLOW: bool = false; + const REJECT: bool = true; + + let reject_and_maybe_log = |toot_language| { + log::info!("Filtering out toot from `{}`", &self.account.acct); + log::info!("Toot language: `{}`", toot_language); + log::info!("Recipient's allowed languages: `{:?}`", allowed_langs); + REJECT + }; + if allowed_langs.is_empty() { + return ALLOW; // listing no allowed_langs results in allowing all languages + } + + match self.language.as_ref() { + Some(toot_language) if allowed_langs.contains(toot_language) => ALLOW, + None => ALLOW, // If toot language is unknown, toot is always allowed + Some(empty) if empty == &String::new() => ALLOW, + Some(toot_language) => reject_and_maybe_log(toot_language), + } + } + + /// Returns `true` if this toot originated from a domain the User has blocked. + pub fn from_blocked_domain(&self, blocked_domains: &HashSet) -> bool { + let full_username = &self.account.acct; + + match full_username.split('@').nth(1) { + Some(originating_domain) => blocked_domains.contains(originating_domain), + None => false, // None means the user is on the local instance, which can't be blocked + } + } + /// Returns `true` if the Status is from an account that has blocked the current user. + pub fn from_blocking_user(&self, blocking_users: &HashSet) -> bool { + const ALLOW: bool = false; + const REJECT: bool = true; + let err = |_| log_fatal!("Could not process `account.id` in {:?}", &self); + + if blocking_users.contains(&self.account.id.parse().unwrap_or_else(err)) { + REJECT + } else { + ALLOW + } + } + + /// Returns `true` if the User's list of blocked and muted users includes a user + /// involved in this toot. + /// + /// A user is involved if they: + /// * Are mentioned in this toot + /// * Wrote this toot + /// * Wrote a toot that this toot is replying to (if any) + /// * Wrote the toot that this toot is boosting (if any) + pub fn involves_blocked_user(&self, blocked_users: &HashSet) -> bool { + const ALLOW: bool = false; + const REJECT: bool = true; + let err = |_| log_fatal!("Could not process an `id` field in {:?}", &self); + + // involved_users = mentioned_users + author + replied-to user + boosted user + let mut involved_users: HashSet = self + .mentions + .iter() + .map(|mention| mention.id.parse().unwrap_or_else(err)) + .collect(); + + involved_users.insert(self.account.id.parse::().unwrap_or_else(err)); + + if let Some(replied_to_account_id) = self.in_reply_to_account_id.clone() { + involved_users.insert(replied_to_account_id.parse().unwrap_or_else(err)); + } + + if let Some(boosted_status) = self.reblog.clone() { + involved_users.insert(boosted_status.account.id.parse().unwrap_or_else(err)); + } + + if involved_users.is_disjoint(blocked_users) { + ALLOW + } else { + REJECT + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::{ + parse_client_request::subscription::{Content::*, Reach::*, Stream::*, Timeline}, + redis_to_client_stream::{ + receiver::{MessageQueues, MsgQueue}, + redis::{ + redis_msg::{ParseErr, RedisMsg}, + redis_stream, + }, + }, + }; + use lru::LruCache; + use std::collections::HashMap; + use uuid::Uuid; + type Err = ParseErr; + + /// Set up state shared between multiple tests of Redis parsing + pub fn shared_setup() -> (LruCache, MessageQueues, Uuid, Timeline) { + let cache: LruCache = LruCache::new(1000); + let mut queues_map = HashMap::new(); + let id = Uuid::default(); + + let timeline = Timeline::from_redis_raw_timeline("4", None); + queues_map.insert(id, MsgQueue::new(timeline)); + let queues = MessageQueues(queues_map); + (cache, queues, id, timeline) + } + + #[test] + fn accurately_parse_redis_output_into_event() -> Result<(), Err> { + let input ="*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:4\r\n$1386\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102866835379605039\",\"created_at\":\"2019-09-27T22:29:02.590Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"http://localhost:3000/users/admin/statuses/102866835379605039\",\"url\":\"http://localhost:3000/@admin/102866835379605039\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"

@susan hi

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"1\",\"username\":\"admin\",\"acct\":\"admin\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-07-04T00:21:05.890Z\",\"note\":\"

\",\"url\":\"http://localhost:3000/@admin\",\"avatar\":\"http://localhost:3000/avatars/original/missing.png\",\"avatar_static\":\"http://localhost:3000/avatars/original/missing.png\",\"header\":\"http://localhost:3000/headers/original/missing.png\",\"header_static\":\"http://localhost:3000/headers/original/missing.png\",\"followers_count\":3,\"following_count\":3,\"statuses_count\":192,\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"4\",\"username\":\"susan\",\"url\":\"http://localhost:3000/@susan\",\"acct\":\"susan\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1569623342825}\r\n"; + + let (mut cache, mut queues, id, timeline) = shared_setup(); + redis_stream::process_messages(input.to_string(), &mut None, &mut cache, &mut queues) + .map_err(|_| ParseErr::Unrecoverable)?; + let parsed_event = queues.oldest_msg_in_target_queue(id, timeline).unwrap(); + let test_event = Event::Update{ payload: Status { + id: "102866835379605039".to_string(), + created_at: "2019-09-27T22:29:02.590Z".to_string(), + in_reply_to_id: None, + in_reply_to_account_id: None, + sensitive: false, + spoiler_text: "".to_string(), + visibility: Visibility::Public, + language: Some("en".to_string()), + uri: "http://localhost:3000/users/admin/statuses/102866835379605039".to_string(), + url: Some("http://localhost:3000/@admin/102866835379605039".to_string()), + replies_count: 0, + reblogs_count: 0, + favourites_count: 0, + favourited: Some(false), + reblogged: Some(false), + muted: Some(false), + bookmarked: None, + pinned: None, + content: "

@susan hi

".to_string(), + reblog: None, + application: Some(Application { + name: "Web".to_string(), + website: None, + vapid_key: None, + client_id: None, + client_secret: None, + }), + account: Account { + id: "1".to_string(), + username: "admin".to_string(), + acct: "admin".to_string(), + display_name: "".to_string(), + locked:false, + bot:Some(false), + created_at: "2019-07-04T00:21:05.890Z".to_string(), + note:"

".to_string(), + url:"http://localhost:3000/@admin".to_string(), + avatar: "http://localhost:3000/avatars/original/missing.png".to_string(), + avatar_static:"http://localhost:3000/avatars/original/missing.png".to_string(), + header: "http://localhost:3000/headers/original/missing.png".to_string(), + header_static:"http://localhost:3000/headers/original/missing.png".to_string(), + followers_count:3, + following_count:3, + statuses_count:192, + emojis:vec![], + fields:Some(vec![]), + moved: None, + group: None, + last_status_at: None, + discoverable: None, + source: None, + }, + media_attachments:vec![], + mentions: vec![ Mention {id:"4".to_string(), + username:"susan".to_string(), + url:"http://localhost:3000/@susan".to_string(), + acct:"susan".to_string()}], + tags:vec![], + emojis:vec![], + card:None,poll:None, + text: None, + }, + queued_at: Some(1569623342825)}; + + assert_eq!(parsed_event, test_event); + Ok(()) + } + + #[test] + fn parse_redis_input_subscription_msgs_and_update() -> Result<(), Err> { + let input = "*3\r\n$9\r\nsubscribe\r\n$11\r\ntimeline:56\r\n:1\r\n*3\r\n$9\r\nsubscribe\r\n$12\r\ntimeline:308\r\n:2\r\n*3\r\n$9\r\nsubscribe\r\n$21\r\ntimeline:hashtag:test\r\n:3\r\n*3\r\n$9\r\nsubscribe\r\n$21\r\ntimeline:public:local\r\n:4\r\n*3\r\n$9\r\nsubscribe\r\n$11\r\ntimeline:55\r\n:5\r\n*3\r\n$7\r\nmessage\r\n$21\r\ntimeline:public:local\r\n$1249\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103881102123251272\",\"created_at\":\"2020-03-25T01:30:24.914Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/bob/statuses/103881102123251272\",\"url\":\"https://instance.codesections.com/@bob/103881102123251272\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"content\":\"

0111

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"55\",\"username\":\"bob\",\"acct\":\"bob\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T03:03:53.068Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@bob\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":1,\"statuses_count\":57,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null}}\r\n*3\r\n$7\r\nmessage\r\n$11\r\ntimeline:55\r\n$1360\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103881102123251272\",\"created_at\":\"2020-03-25T01:30:24.914Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/bob/statuses/103881102123251272\",\"url\":\"https://instance.codesections.com/@bob/103881102123251272\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":false,\"pinned\":false,\"content\":\"

0111

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"55\",\"username\":\"bob\",\"acct\":\"bob\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T03:03:53.068Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@bob\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":1,\"statuses_count\":57,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1585099825263}\r\n*3\r\n$7\r\nmessage\r\n$21\r\ntimeline:public:local\r\n$1249\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103881103451006570\",\"created_at\":\"2020-03-25T01:30:45.152Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/bob/statuses/103881103451006570\",\"url\":\"https://instance.codesections.com/@bob/103881103451006570\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"content\":\"

1000

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"55\",\"username\":\"bob\",\"acct\":\"bob\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T03:03:53.068Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@bob\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":1,\"statuses_count\":58,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null}}\r\n*3\r\n$7\r\nmessage\r\n$11\r\ntimeline:55\r\n$1360\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103881103451006570\",\"created_at\":\"2020-03-25T01:30:45.152Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/bob/statuses/103881103451006570\",\"url\":\"https://instance.codesections.com/@bob/103881103451006570\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":false,\"pinned\":false,\"content\":\"

1000

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"55\",\"username\":\"bob\",\"acct\":\"bob\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T03:03:53.068Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@bob\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":1,\"statuses_count\":58,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1585099845405}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (subscription_msg1, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + assert!(matches!(subscription_msg1, RedisMsg::SubscriptionMsg)); + + let (subscription_msg2, rest) = RedisMsg::from_raw(rest, &mut cache, "timeline:".len())?; + assert!(matches!(subscription_msg2, RedisMsg::SubscriptionMsg)); + + let (subscription_msg3, rest) = RedisMsg::from_raw(rest, &mut cache, "timeline:".len())?; + assert!(matches!(subscription_msg3, RedisMsg::SubscriptionMsg)); + + let (subscription_msg4, rest) = RedisMsg::from_raw(rest, &mut cache, "timeline:".len())?; + assert!(matches!(subscription_msg4, RedisMsg::SubscriptionMsg)); + + let (subscription_msg5, rest) = RedisMsg::from_raw(rest, &mut cache, "timeline:".len())?; + assert!(matches!(subscription_msg5, RedisMsg::SubscriptionMsg)); + + let (update_msg1, rest) = RedisMsg::from_raw(rest, &mut cache, "timeline:".len())?; + assert!(matches!( + update_msg1, + RedisMsg::EventMsg(_, Event::Update { .. }) + )); + + let (update_msg2, rest) = RedisMsg::from_raw(rest, &mut cache, "timeline:".len())?; + assert!(matches!( + update_msg2, + RedisMsg::EventMsg(_, Event::Update { .. }) + )); + + let (update_msg3, rest) = RedisMsg::from_raw(rest, &mut cache, "timeline:".len())?; + assert!(matches!( + update_msg3, + RedisMsg::EventMsg(_, Event::Update { .. }) + )); + + let (update_msg4, rest) = RedisMsg::from_raw(rest, &mut cache, "timeline:".len())?; + assert!(matches!( + update_msg4, + RedisMsg::EventMsg(_, Event::Update { .. }) + )); + + assert_eq!(rest, "".to_string()); + + Ok(()) + } + + #[test] + fn parse_redis_input_notification() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$11\r\ntimeline:55\r\n$2311\r\n{\"event\":\"notification\",\"payload\":{\"id\":\"147\",\"type\":\"mention\",\"created_at\":\"2020-03-25T14:25:09.295Z\",\"account\":{\"id\":\"308\",\"username\":\"ralph\",\"acct\":\"ralph\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T19:55:20.933Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@ralph\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":2,\"statuses_count\":100,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"status\":{\"id\":\"103884148503208016\",\"created_at\":\"2020-03-25T14:25:08.995Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/ralph/statuses/103884148503208016\",\"url\":\"https://instance.codesections.com/@ralph/103884148503208016\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":false,\"content\":\"

@bob notification test

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"308\",\"username\":\"ralph\",\"acct\":\"ralph\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T19:55:20.933Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@ralph\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":2,\"statuses_count\":100,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"55\",\"username\":\"bob\",\"url\":\"https://instance.codesections.com/@bob\",\"acct\":\"bob\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null}}}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (subscription_msg1, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + assert!(matches!( + subscription_msg1, + RedisMsg::EventMsg(Timeline(User(id), Federated, All), Event::Notification { .. }) if id == 55 + )); + + assert_eq!(rest, "".to_string()); + + Ok(()) + } + + #[test] + fn parse_redis_input_delete() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$49\r\n{\"event\":\"delete\",\"payload\":\"103864778284581232\"}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (subscription_msg1, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + assert!(matches!( + subscription_msg1, + RedisMsg::EventMsg( + Timeline(User(308), Federated, All), + Event::Delete { payload: DeletedId(id) } + ) if id == "103864778284581232".to_string() + )); + + assert_eq!(rest, "".to_string()); + + Ok(()) + } + + #[test] + fn parse_redis_input_filters_changed() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$11\r\ntimeline:56\r\n$27\r\n{\"event\":\"filters_changed\"}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (subscription_msg1, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + assert!(matches!( + subscription_msg1, + RedisMsg::EventMsg(Timeline(User(id), Federated, All), Event::FiltersChanged) if id == 56 + )); + + assert_eq!(rest, "".to_string()); + + Ok(()) + } + + #[test] + fn parse_redis_input_announcement() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$293\r\n{\"event\":\"announcement\",\"payload\":{\"id\":\"2\",\"content\":\"

Test announcement 0010

\",\"starts_at\":null,\"ends_at\":null,\"all_day\":false,\"published_at\":\"2020-03-25T14:57:57.550Z\",\"updated_at\":\"2020-03-25T14:57:57.566Z\",\"mentions\":[],\"tags\":[],\"emojis\":[],\"reactions\":[{\"name\":\"👍\",\"count\":2}]}}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + assert!(matches!( + msg, + RedisMsg::EventMsg( + Timeline(User(id), Federated, All), + Event::Announcement { .. }) if id == 308 + )); + + assert_eq!(rest, "".to_string()); + Ok(()) + } + + #[test] + fn parse_redis_input_announcement_reaction() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$91\r\n{\"event\":\"announcement.reaction\",\"payload\":{\"name\":\"👽\",\"count\":2,\"announcement_id\":\"8\"}}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + assert!(matches!( + msg, + RedisMsg::EventMsg( + Timeline(User(id), Federated, All), + Event::AnnouncementReaction{ .. } + ) if id == 308 + )); + + assert_eq!(rest, "".to_string()); + Ok(()) + } + + #[test] + fn parse_redis_input_announcement_delete() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$45\r\n{\"event\":\"announcement.delete\",\"payload\":\"5\"}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + assert!(matches!( + msg, + RedisMsg::EventMsg( + Timeline(User(id), Federated, All), + Event::AnnouncementDelete{ + payload: DeletedId(del_id), + + } + ) if id == 308 && del_id == "5".to_string() + )); + + assert_eq!(rest, "".to_string()); + Ok(()) + } + + #[test] + fn parse_redis_input_status_with_attachments() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$2049\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103884996729070829\",\"created_at\":\"2020-03-25T18:00:52.026Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/ralph/statuses/103884996729070829\",\"url\":\"https://instance.codesections.com/@ralph/103884996729070829\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":false,\"pinned\":false,\"content\":\"

Test with media attachment

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"308\",\"username\":\"ralph\",\"acct\":\"ralph\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T19:55:20.933Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@ralph\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":2,\"statuses_count\":103,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[{\"id\":\"3102\",\"type\":\"image\",\"url\":\"https://instance.codesections.com/system/media_attachments/files/000/003/102/original/1753cf5b8edd544a.jpg?1585159208\",\"preview_url\":\"https://instance.codesections.com/system/media_attachments/files/000/003/102/small/1753cf5b8edd544a.jpg?1585159208\",\"remote_url\":null,\"text_url\":\"https://instance.codesections.com/media/7XPfdkmAIHb3TQcLYII\",\"meta\":{\"original\":{\"width\":828,\"height\":340,\"size\":\"828x340\",\"aspect\":2.4352941176470586},\"small\":{\"width\":623,\"height\":256,\"size\":\"623x256\",\"aspect\":2.43359375},\"focus\":{\"x\":0.0,\"y\":0.0}},\"description\":\"Test image discription\",\"blurhash\":\"UBR{.4M{s;IU0JkBWBWB9bM{ofxu4^WAWBj[\"}],\"mentions\":[],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1585159252656}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + dbg!(&msg); + assert!(matches!( + msg, + RedisMsg::EventMsg( + Timeline(User(308), Federated, All), + Event::Update{ payload: Status { media_attachments: attachments, .. }, .. } + ) if attachments.len() > 0 + )); + + assert_eq!(rest, "".to_string()); + Ok(()) + } + + #[test] + fn parse_redis_input_status_with_mentions() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$2094\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103885034181231245\",\"created_at\":\"2020-03-25T18:10:23.420Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/ralph/statuses/103885034181231245\",\"url\":\"https://instance.codesections.com/@ralph/103885034181231245\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":false,\"pinned\":false,\"content\":\"

@bob @susan @codesections

Test with mentions

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"308\",\"username\":\"ralph\",\"acct\":\"ralph\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T19:55:20.933Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@ralph\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":2,\"statuses_count\":104,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"55\",\"username\":\"bob\",\"url\":\"https://instance.codesections.com/@bob\",\"acct\":\"bob\"},{\"id\":\"56\",\"username\":\"susan\",\"url\":\"https://instance.codesections.com/@susan\",\"acct\":\"susan\"},{\"id\":\"9\",\"username\":\"codesections\",\"url\":\"https://instance.codesections.com/@codesections\",\"acct\":\"codesections\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1585159824540}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + dbg!(&msg); + assert!(matches!( + msg, + RedisMsg::EventMsg( + Timeline(User(308), Federated, All), + Event::Update{ payload: Status { mentions, .. }, .. } + ) if mentions.len() > 0 + )); + + assert_eq!(rest, "".to_string()); + Ok(()) + } + + #[test] + fn parse_redis_input_status_with_tags() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$1770\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103885047114641861\",\"created_at\":\"2020-03-25T18:13:40.741Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/ralph/statuses/103885047114641861\",\"url\":\"https://instance.codesections.com/@ralph/103885047114641861\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":false,\"pinned\":false,\"content\":\"

#test #hashtag

Test with tags

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"308\",\"username\":\"ralph\",\"acct\":\"ralph\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T19:55:20.933Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@ralph\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":2,\"statuses_count\":105,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[],\"tags\":[{\"name\":\"hashtag\",\"url\":\"https://instance.codesections.com/tags/hashtag\"},{\"name\":\"test\",\"url\":\"https://instance.codesections.com/tags/test\"}],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1585160021281}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + dbg!(&msg); + assert!(matches!( + msg, + RedisMsg::EventMsg( + Timeline(User(308), Federated, All), + Event::Update{ payload: Status { tags, .. }, .. } + ) if tags.len() > 0 + )); + + assert_eq!(rest, "".to_string()); + Ok(()) + } + + #[test] + fn parse_redis_input_status_with_emojis() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$1703\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103885068078872546\",\"created_at\":\"2020-03-25T18:19:00.620Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/ralph/statuses/103885068078872546\",\"url\":\"https://instance.codesections.com/@ralph/103885068078872546\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":false,\"pinned\":false,\"content\":\"

Test with custom emoji

:patcat:

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"308\",\"username\":\"ralph\",\"acct\":\"ralph\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T19:55:20.933Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@ralph\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":2,\"statuses_count\":106,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[],\"tags\":[],\"emojis\":[{\"shortcode\":\"patcat\",\"url\":\"https://instance.codesections.com/system/custom_emojis/images/000/001/071/original/d87fcdf79ed6fe20.png?1585160295\",\"static_url\":\"https://instance.codesections.com/system/custom_emojis/images/000/001/071/static/d87fcdf79ed6fe20.png?1585160295\",\"visible_in_picker\":true}],\"card\":null,\"poll\":null},\"queued_at\":1585160340991}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + dbg!(&msg); + assert!(matches!( + msg, + RedisMsg::EventMsg( + Timeline(User(308), Federated, All), + Event::Update{ payload: Status { emojis, .. }, .. } + ) if emojis.len() > 0 + )); + + assert_eq!(rest, "".to_string()); + Ok(()) + } + + #[test] + fn parse_redis_input_status_is_reply() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$1612\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103885083636011552\",\"created_at\":\"2020-03-25T18:22:57.963Z\",\"in_reply_to_id\":\"103881103451006570\",\"in_reply_to_account_id\":\"55\",\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/ralph/statuses/103885083636011552\",\"url\":\"https://instance.codesections.com/@ralph/103885083636011552\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":false,\"pinned\":false,\"content\":\"

@bob Test is reply

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"308\",\"username\":\"ralph\",\"acct\":\"ralph\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T19:55:20.933Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@ralph\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":2,\"statuses_count\":107,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"55\",\"username\":\"bob\",\"url\":\"https://instance.codesections.com/@bob\",\"acct\":\"bob\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1585160578486}\r\n*3\r\n$7\r\nmessage\r\n$11\r\ntimeline:55\r\n$2323\r\n{\"event\":\"notification\",\"payload\":{\"id\":\"156\",\"type\":\"mention\",\"created_at\":\"2020-03-25T18:22:58.293Z\",\"account\":{\"id\":\"308\",\"username\":\"ralph\",\"acct\":\"ralph\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T19:55:20.933Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@ralph\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":2,\"statuses_count\":107,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"status\":{\"id\":\"103885083636011552\",\"created_at\":\"2020-03-25T18:22:57.963Z\",\"in_reply_to_id\":\"103881103451006570\",\"in_reply_to_account_id\":\"55\",\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/ralph/statuses/103885083636011552\",\"url\":\"https://instance.codesections.com/@ralph/103885083636011552\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":false,\"content\":\"

@bob Test is reply

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"308\",\"username\":\"ralph\",\"acct\":\"ralph\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T19:55:20.933Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@ralph\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":2,\"statuses_count\":107,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"55\",\"username\":\"bob\",\"url\":\"https://instance.codesections.com/@bob\",\"acct\":\"bob\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null}}}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + dbg!(&msg); + assert!(matches!( + msg, + RedisMsg::EventMsg( + Timeline(User(308), Federated, All), + Event::Update { + payload: + Status { + in_reply_to_id: Some(_), + .. + }, + .. + }, + ) + )); + let (msg2, rest) = RedisMsg::from_raw(rest, &mut cache, "timeline:".len())?; + dbg!(&msg2); + assert!(matches!( + msg2, + RedisMsg::EventMsg(Timeline(User(55), Federated, All), Event::Notification { .. }) + )); + + assert_eq!(rest, "".to_string()); + Ok(()) + } + + #[test] + fn parse_redis_input_status_is_reblog() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$2778\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103885156768039822\",\"created_at\":\"2020-03-25T18:41:33.859Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":null,\"uri\":\"https://instance.codesections.com/users/ralph/statuses/103885156768039822/activity\",\"url\":\"https://instance.codesections.com/users/ralph/statuses/103885156768039822/activity\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":true,\"muted\":false,\"bookmarked\":false,\"content\":\"

RT @bob 0010

\",\"reblog\":{\"id\":\"103881061540314589\",\"created_at\":\"2020-03-25T01:20:05.648Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/bob/statuses/103881061540314589\",\"url\":\"https://instance.codesections.com/@bob/103881061540314589\",\"replies_count\":0,\"reblogs_count\":1,\"favourites_count\":0,\"favourited\":false,\"reblogged\":true,\"muted\":false,\"bookmarked\":false,\"content\":\"

0010

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"55\",\"username\":\"bob\",\"acct\":\"bob\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T03:03:53.068Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@bob\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":1,\"statuses_count\":58,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"application\":null,\"account\":{\"id\":\"308\",\"username\":\"ralph\",\"acct\":\"ralph\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T19:55:20.933Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@ralph\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":2,\"statuses_count\":110,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1585161694429}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + dbg!(&msg); + assert!(matches!( + msg, + RedisMsg::EventMsg( + Timeline(User(308), Federated, All), + Event::Update { + payload: + Status { + reblogged: Some(t), .. + }, + .. + }, + ) if t + )); + + assert_eq!(rest, "".to_string()); + Ok(()) + } + + #[test] + fn parse_redis_input_status_with_poll() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$1663\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103885228849512739\",\"created_at\":\"2020-03-25T18:59:53.788Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/ralph/statuses/103885228849512739\",\"url\":\"https://instance.codesections.com/@ralph/103885228849512739\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":false,\"pinned\":false,\"content\":\"

test poll:

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"308\",\"username\":\"ralph\",\"acct\":\"ralph\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T19:55:20.933Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@ralph\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":2,\"statuses_count\":112,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":{\"id\":\"46\",\"expires_at\":\"2020-03-26T18:59:53.747Z\",\"expired\":false,\"multiple\":false,\"votes_count\":0,\"voters_count\":0,\"voted\":true,\"own_votes\":[],\"options\":[{\"title\":\"1\",\"votes_count\":0},{\"title\":\"2\",\"votes_count\":0},{\"title\":\"3\",\"votes_count\":0},{\"title\":\"4\",\"votes_count\":0}],\"emojis\":[]}},\"queued_at\":1585162794362}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + dbg!(&msg); + assert!(matches!( + msg, + RedisMsg::EventMsg( + Timeline(User(308), Federated, All), + Event::Update { + payload: Status { poll: Some(_), .. }, + .. + }, + ) + )); + + assert_eq!(rest, "".to_string()); + Ok(()) + } + + #[test] + fn parse_redis_input_status_with_preview_card() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$11\r\ntimeline:55\r\n$2256\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103885300935387207\",\"created_at\":\"2020-03-25T19:18:13.753Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/ralph/statuses/103885300935387207\",\"url\":\"https://instance.codesections.com/@ralph/103885300935387207\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":false,\"content\":\"

Test with preview card:

https://www.codesections.com/blog/mastodon-elevator-pitch/

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"308\",\"username\":\"ralph\",\"acct\":\"ralph\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T19:55:20.933Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@ralph\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":2,\"following_count\":2,\"statuses_count\":120,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[],\"tags\":[],\"emojis\":[],\"card\":{\"url\":\"https://www.codesections.com/blog/mastodon-elevator-pitch/\",\"title\":\" Mastodon Is Better than Twitter: Elevator Pitch | CodeSections\",\"description\":\"The personal website and blog of Daniel Long Sockwell, a lawyer-turned-programmer with an interest in web development, open source, and making things as simple as possible.\",\"type\":\"link\",\"author_name\":\"\",\"author_url\":\"\",\"provider_name\":\"\",\"provider_url\":\"\",\"html\":\"\",\"width\":400,\"height\":400,\"image\":\"https://instance.codesections.com/system/preview_cards/images/000/000/002/original/f6e89baa729668e7.png?1585163010\",\"embed_url\":\"\"},\"poll\":null},\"queued_at\":1585163894281}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + dbg!(&msg); + assert!(matches!( + msg, + RedisMsg::EventMsg( + Timeline(User(55), Federated, All), + Event::Update { + payload: Status { card: Some(_), .. }, + .. + }, + ) + )); + + assert_eq!(rest, "".to_string()); + Ok(()) + } + + #[test] + fn parse_redis_input_conversation() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$17\r\ntimeline:direct:9\r\n$2442\r\n{\"event\":\"conversation\",\"payload\":{\"id\":\"22\",\"unread\":false,\"accounts\":[{\"id\":\"55\",\"username\":\"bob\",\"acct\":\"bob\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"discoverable\":null,\"group\":false,\"created_at\":\"2020-03-11T03:03:53.068Z\",\"note\":\"

\",\"url\":\"https://instance.codesections.com/@bob\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":1,\"following_count\":1,\"statuses_count\":58,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]}],\"last_status\":{\"id\":\"103884351200485419\",\"created_at\":\"2020-03-25T15:16:41.915Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"direct\",\"language\":\"en\",\"uri\":\"https://instance.codesections.com/users/codesections/statuses/103884351200485419\",\"url\":\"https://instance.codesections.com/@codesections/103884351200485419\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":false,\"content\":\"

@bob Test Conversation

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"9\",\"username\":\"codesections\",\"acct\":\"codesections\",\"display_name\":\"TEST ACCOUT for codesections\",\"locked\":false,\"bot\":false,\"discoverable\":false,\"group\":false,\"created_at\":\"2020-03-11T01:17:13.412Z\",\"note\":\"

Used in the testing and development of flodgatt, the WIP streaming server for Mastodon

\",\"url\":\"https://instance.codesections.com/@codesections\",\"avatar\":\"https://instance.codesections.com/avatars/original/missing.png\",\"avatar_static\":\"https://instance.codesections.com/avatars/original/missing.png\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":79,\"following_count\":97,\"statuses_count\":7,\"last_status_at\":\"2020-03-25\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"55\",\"username\":\"bob\",\"url\":\"https://instance.codesections.com/@bob\",\"acct\":\"bob\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null}},\"queued_at\":1585149402344}\r\n"; + + let (mut cache, _, _, _) = shared_setup(); + + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + dbg!(&msg); + assert!(matches!( + msg, + RedisMsg::EventMsg( + Timeline(Direct(id), Federated, All), + Event::Conversation{ ..} + ) if id == 9 + )); + + assert_eq!(rest, "".to_string()); + Ok(()) + } + + #[test] + fn parse_redis_input_from_live_data_1() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$15\r\ntimeline:public\r\n$2799\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103880088450458596\",\"created_at\":\"2020-03-24T21:12:37.000Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"es\",\"uri\":\"https://mastodon.social/users/durru/statuses/103880088436492032\",\"url\":\"https://mastodon.social/@durru/103880088436492032\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"content\":\"

¡No puedes salir, loca!

\",\"reblog\":null,\"account\":{\"id\":\"2271\",\"username\":\"durru\",\"acct\":\"durru@mastodon.social\",\"display_name\":\"Cloaca Maxima\",\"locked\":false,\"bot\":false,\"discoverable\":true,\"group\":false,\"created_at\":\"2020-03-24T21:27:31.669Z\",\"note\":\"

Todo pasa, antes o después, por la Cloaca, diría Vitruvio.
También compongo palíndromos.

\",\"url\":\"https://mastodon.social/@durru\",\"avatar\":\"https://instance.codesections.com/system/accounts/avatars/000/002/271/original/d7675a6ff9d9baa7.jpeg?1585085250\",\"avatar_static\":\"https://instance.codesections.com/system/accounts/avatars/000/002/271/original/d7675a6ff9d9baa7.jpeg?1585085250\",\"header\":\"https://instance.codesections.com/system/accounts/headers/000/002/271/original/e3f0a1989b0d8efc.jpeg?1585085250\",\"header_static\":\"https://instance.codesections.com/system/accounts/headers/000/002/271/original/e3f0a1989b0d8efc.jpeg?1585085250\",\"followers_count\":222,\"following_count\":81,\"statuses_count\":5443,\"last_status_at\":\"2020-03-24\",\"emojis\":[],\"fields\":[{\"name\":\"Mis fotos\",\"value\":\"https://pixelfed.de/durru\",\"verified_at\":null},{\"name\":\"diaspora*\",\"value\":\"https://joindiaspora.com/people/75fec0e05114013484870242ac110007\",\"verified_at\":null}]},\"media_attachments\":[{\"id\":\"2864\",\"type\":\"image\",\"url\":\"https://instance.codesections.com/system/media_attachments/files/000/002/864/original/3988312d30936494.jpeg?1585085251\",\"preview_url\":\"https://instance.codesections.com/system/media_attachments/files/000/002/864/small/3988312d30936494.jpeg?1585085251\",\"remote_url\":\"https://files.mastodon.social/media_attachments/files/026/669/690/original/d8171331f956cf38.jpg\",\"text_url\":null,\"meta\":{\"original\":{\"width\":1001,\"height\":662,\"size\":\"1001x662\",\"aspect\":1.512084592145015},\"small\":{\"width\":491,\"height\":325,\"size\":\"491x325\",\"aspect\":1.5107692307692309}},\"description\":null,\"blurhash\":\"UdLqhI4n4TIUIAt7t7ay~qIojtRj?bM{M{of\"}],\"mentions\":[],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null}}\r\n"; + let (mut cache, _, _, _) = shared_setup(); + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + assert!(matches!( + msg, + RedisMsg::EventMsg(Timeline(Public, Federated, All), Event::Update { .. }) + )); + assert_eq!(rest, String::new()); + Ok(()) + } + + #[test] + fn parse_redis_input_from_live_data_2() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$15\r\ntimeline:public\r\n$3888\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103880373579328660\",\"created_at\":\"2020-03-24T22:25:05.000Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://newsbots.eu/users/granma/statuses/103880373417385978\",\"url\":\"https://newsbots.eu/@granma/103880373417385978\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"content\":\"

A total of 11 measures have been established for the pre-epidemic stage of the battle against #Covid-19 in #Cuba
#CubaPorLaSalud
http://en.granma.cu/cuba/2020-03-23/public-health-measures-in-covid-19-pre-epidemic-stage 

\",\"reblog\":null,\"account\":{\"id\":\"717\",\"username\":\"granma\",\"acct\":\"granma@newsbots.eu\",\"display_name\":\"Granma (Unofficial)\",\"locked\":false,\"bot\":true,\"discoverable\":false,\"group\":false,\"created_at\":\"2020-03-13T11:08:08.420Z\",\"note\":\"

\",\"url\":\"https://newsbots.eu/@granma\",\"avatar\":\"https://instance.codesections.com/system/accounts/avatars/000/000/717/original/4a1f9ed090fc36e9.jpeg?1584097687\",\"avatar_static\":\"https://instance.codesections.com/system/accounts/avatars/000/000/717/original/4a1f9ed090fc36e9.jpeg?1584097687\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":57,\"following_count\":1,\"statuses_count\":742,\"last_status_at\":\"2020-03-24\",\"emojis\":[],\"fields\":[{\"name\":\"Source\",\"value\":\"https://twitter.com/Granma_English\",\"verified_at\":null},{\"name\":\"Operator\",\"value\":\"@felix\",\"verified_at\":null},{\"name\":\"Code\",\"value\":\"https://yerbamate.dev/nutomic/tootbot\",\"verified_at\":null}]},\"media_attachments\":[{\"id\":\"2881\",\"type\":\"image\",\"url\":\"https://instance.codesections.com/system/media_attachments/files/000/002/881/original/a1e97908e84efbcd.jpeg?1585088707\",\"preview_url\":\"https://instance.codesections.com/system/media_attachments/files/000/002/881/small/a1e97908e84efbcd.jpeg?1585088707\",\"remote_url\":\"https://newsbots.eu/system/media_attachments/files/000/176/298/original/f30a877d5035f4a6.jpeg\",\"text_url\":null,\"meta\":{\"original\":{\"width\":700,\"height\":795,\"size\":\"700x795\",\"aspect\":0.8805031446540881},\"small\":{\"width\":375,\"height\":426,\"size\":\"375x426\",\"aspect\":0.8802816901408451}},\"description\":null,\"blurhash\":\"UHCY?%sD%1t6}snOxuxu#7rrx]xu$*i_NFNF\"}],\"mentions\":[],\"tags\":[{\"name\":\"covid\",\"url\":\"https://instance.codesections.com/tags/covid\"},{\"name\":\"cuba\",\"url\":\"https://instance.codesections.com/tags/cuba\"},{\"name\":\"CubaPorLaSalud\",\"url\":\"https://instance.codesections.com/tags/CubaPorLaSalud\"}],\"emojis\":[],\"card\":null,\"poll\":null}}\r\n"; + let (mut cache, _, _, _) = shared_setup(); + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + assert!(matches!( + msg, + RedisMsg::EventMsg(Timeline(Public, Federated, All), Event::Update { .. }) + )); + assert_eq!(rest, String::new()); + Ok(()) + } + + #[test] + fn parse_redis_input_from_live_data_3() -> Result<(), Err> { + let input = "*3\r\n$7\r\nmessage\r\n$15\r\ntimeline:public\r\n$4803\r\n{\"event\":\"update\",\"payload\":{\"id\":\"103880453908763088\",\"created_at\":\"2020-03-24T22:45:33.000Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://mstdn.social/users/stux/statuses/103880453855603541\",\"url\":\"https://mstdn.social/@stux/103880453855603541\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"content\":\"

When they say lockdown. LOCKDOWN.

\",\"reblog\":null,\"account\":{\"id\":\"806\",\"username\":\"stux\",\"acct\":\"stux@mstdn.social\",\"display_name\":\"sтυx⚡\",\"locked\":false,\"bot\":false,\"discoverable\":true,\"group\":false,\"created_at\":\"2020-03-13T23:02:29.970Z\",\"note\":\"

Hi, Stux here! I am running the mstdn.social :mastodon: instance!

For questions and help or just for fun you can always send me a toot♥\u{fe0f}

Oh and no, I am not really a cat! Or am I?

\",\"url\":\"https://mstdn.social/@stux\",\"avatar\":\"https://instance.codesections.com/system/accounts/avatars/000/000/806/original/dae8d9d01d57d7f8.gif?1584140547\",\"avatar_static\":\"https://instance.codesections.com/system/accounts/avatars/000/000/806/static/dae8d9d01d57d7f8.png?1584140547\",\"header\":\"https://instance.codesections.com/system/accounts/headers/000/000/806/original/88c874d69f7d6989.gif?1584140548\",\"header_static\":\"https://instance.codesections.com/system/accounts/headers/000/000/806/static/88c874d69f7d6989.png?1584140548\",\"followers_count\":13954,\"following_count\":7600,\"statuses_count\":10207,\"last_status_at\":\"2020-03-24\",\"emojis\":[{\"shortcode\":\"mastodon\",\"url\":\"https://instance.codesections.com/system/custom_emojis/images/000/000/418/original/25ccc64333645735.png?1584140550\",\"static_url\":\"https://instance.codesections.com/system/custom_emojis/images/000/000/418/static/25ccc64333645735.png?1584140550\",\"visible_in_picker\":true},{\"shortcode\":\"patreon\",\"url\":\"https://instance.codesections.com/system/custom_emojis/images/000/000/419/original/3cc463d3dfc1e489.png?1584140550\",\"static_url\":\"https://instance.codesections.com/system/custom_emojis/images/000/000/419/static/3cc463d3dfc1e489.png?1584140550\",\"visible_in_picker\":true},{\"shortcode\":\"liberapay\",\"url\":\"https://instance.codesections.com/system/custom_emojis/images/000/000/420/original/893854353dfa9706.png?1584140551\",\"static_url\":\"https://instance.codesections.com/system/custom_emojis/images/000/000/420/static/893854353dfa9706.png?1584140551\",\"visible_in_picker\":true},{\"shortcode\":\"team_valor\",\"url\":\"https://instance.codesections.com/system/custom_emojis/images/000/000/958/original/96aae26b45292a12.png?1584910917\",\"static_url\":\"https://instance.codesections.com/system/custom_emojis/images/000/000/958/static/96aae26b45292a12.png?1584910917\",\"visible_in_picker\":true}],\"fields\":[{\"name\":\"Patreon :patreon:\",\"value\":\"https://www.patreon.com/mstdn\",\"verified_at\":null},{\"name\":\"LiberaPay :liberapay:\",\"value\":\"https://liberapay.com/mstdn\",\"verified_at\":null},{\"name\":\"Team :team_valor:\",\"value\":\"https://mstdn.social/team\",\"verified_at\":null},{\"name\":\"Support :mastodon:\",\"value\":\"https://mstdn.social/funding\",\"verified_at\":null}]},\"media_attachments\":[{\"id\":\"2886\",\"type\":\"video\",\"url\":\"https://instance.codesections.com/system/media_attachments/files/000/002/886/original/22b3f98a5e8f86d8.mp4?1585090023\",\"preview_url\":\"https://instance.codesections.com/system/media_attachments/files/000/002/886/small/22b3f98a5e8f86d8.png?1585090023\",\"remote_url\":\"https://cdn.mstdn.social/mstdn-social/media_attachments/files/003/338/384/original/c146f62ba86fe63e.mp4\",\"text_url\":null,\"meta\":{\"length\":\"0:00:27.03\",\"duration\":27.03,\"fps\":30,\"size\":\"272x480\",\"width\":272,\"height\":480,\"aspect\":0.5666666666666667,\"audio_encode\":\"aac (LC) (mp4a / 0x6134706D)\",\"audio_bitrate\":\"44100 Hz\",\"audio_channels\":\"stereo\",\"original\":{\"width\":272,\"height\":480,\"frame_rate\":\"30/1\",\"duration\":27.029,\"bitrate\":481885},\"small\":{\"width\":227,\"height\":400,\"size\":\"227x400\",\"aspect\":0.5675}},\"description\":null,\"blurhash\":\"UBF~N@OF-:xv4mM|s+ob9FE2t6tQ9Fs:t8oN\"}],\"mentions\":[],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null}}\r\n"; + let (mut cache, _, _, _) = shared_setup(); + let (msg, rest) = RedisMsg::from_raw(input, &mut cache, "timeline:".len())?; + assert!(matches!( + msg, + RedisMsg::EventMsg(Timeline(Public, Federated, All), Event::Update { .. }) + )); + assert_eq!(rest, String::new()); + Ok(()) + } +} diff --git a/src/parse_client_request/subscription/mod.rs b/src/parse_client_request/subscription/mod.rs index 97661bb..3fdb060 100644 --- a/src/parse_client_request/subscription/mod.rs +++ b/src/parse_client_request/subscription/mod.rs @@ -57,7 +57,7 @@ impl Timeline { Self(Unset, Local, Notification) } - pub fn to_redis_str(&self, hashtag: Option<&String>) -> String { + pub fn to_redis_raw_timeline(&self, hashtag: Option<&String>) -> String { use {Content::*, Reach::*, Stream::*}; match self { Timeline(Public, Federated, All) => "timeline:public".into(), @@ -82,7 +82,7 @@ impl Timeline { } } } - pub fn from_redis_str(raw_timeline: &str, hashtag: Option) -> Self { + pub fn from_redis_raw_timeline(raw_timeline: &str, hashtag: Option) -> Self { use {Content::*, Reach::*, Stream::*}; match raw_timeline.split(':').collect::>()[..] { ["public"] => Timeline(Public, Federated, All), diff --git a/src/redis_to_client_stream/client_agent.rs b/src/redis_to_client_stream/client_agent.rs index 4ff069d..0162598 100644 --- a/src/redis_to_client_stream/client_agent.rs +++ b/src/redis_to_client_stream/client_agent.rs @@ -15,9 +15,10 @@ //! Because `StreamManagers` are lightweight data structures that do not directly //! communicate with Redis, it we create a new `ClientAgent` for //! each new client connection (each in its own thread).use super::{message::Message, receiver::Receiver} -use super::{message::Message, receiver::Receiver}; +use super::receiver::Receiver; use crate::{ config, + messages::Event, parse_client_request::subscription::{PgPool, Stream::Public, Subscription, Timeline}, }; use futures::{ @@ -64,16 +65,19 @@ impl ClientAgent { /// that out and avoiding duplicated connections. Thus, it is safe to /// use this method for each new client connection. pub fn init_for_user(&mut self, subscription: Subscription) { + use std::time::Instant; self.id = Uuid::new_v4(); self.subscription = subscription; + let start_time = Instant::now(); let mut receiver = self.receiver.lock().expect("No thread panic (stream.rs)"); receiver.manage_new_timeline(self.id, self.subscription.timeline); + log::info!("init_for_user had lock for: {:?}", start_time.elapsed()); } } /// The stream that the `ClientAgent` manages. `Poll` is the only method implemented. impl futures::stream::Stream for ClientAgent { - type Item = Message; + type Item = Event; type Error = Error; /// Checks for any new messages that should be sent to the client. @@ -85,7 +89,6 @@ impl futures::stream::Stream for ClientAgent { /// replies with `Ok(NotReady)`. The `ClientAgent` bubles up any /// errors from the underlying data structures. fn poll(&mut self) -> Poll, Self::Error> { - let start_time = std::time::Instant::now(); let result = { let mut receiver = self .receiver @@ -94,32 +97,34 @@ impl futures::stream::Stream for ClientAgent { receiver.configure_for_polling(self.id, self.subscription.timeline); receiver.poll() }; - if start_time.elapsed().as_millis() > 1 { - log::warn!("Polling the Receiver took: {:?}", start_time.elapsed()); - log::info!("Longer polling yielded: {:#?}", &result); - }; let allowed_langs = &self.subscription.allowed_langs; let blocked_users = &self.subscription.blocks.blocked_users; let blocking_users = &self.subscription.blocks.blocking_users; let blocked_domains = &self.subscription.blocks.blocked_domains; let (send, block) = (|msg| Ok(Ready(Some(msg))), Ok(NotReady)); - use Message::*; + use Event::*; match result { - Ok(Async::Ready(Some(json))) => match Message::from_json(json) { - Update(status) => match self.subscription.timeline { + Ok(Async::Ready(Some(event))) => match event { + Update { + payload: status, .. + } => match self.subscription.timeline { _ if status.involves_blocked_user(blocked_users) => block, _ if status.from_blocked_domain(blocked_domains) => block, _ if status.from_blocking_user(blocking_users) => block, Timeline(Public, _, _) if status.language_not_allowed(allowed_langs) => block, - _ => send(Update(status)), + _ => send(Update { + payload: status, + queued_at: None, + }), }, - Notification(payload) => send(Notification(payload)), - Conversation(payload) => send(Conversation(payload)), - Delete(status_id) => send(Delete(status_id)), - FiltersChanged => send(FiltersChanged), - Announcement(content) => send(Announcement(content)), - UnknownEvent(event, payload) => send(UnknownEvent(event, payload)), + Notification { .. } + | Conversation { .. } + | Delete { .. } + | FiltersChanged + | Announcement { .. } + | AnnouncementReaction { .. } + | AnnouncementDelete { .. } => send(event), }, Ok(Ready(None)) => Ok(Ready(None)), Ok(NotReady) => Ok(NotReady), diff --git a/src/redis_to_client_stream/message.rs b/src/redis_to_client_stream/message.rs index 819af87..10cd1d6 100644 --- a/src/redis_to_client_stream/message.rs +++ b/src/redis_to_client_stream/message.rs @@ -1,4 +1,5 @@ use crate::log_fatal; +use crate::messages::Event; use serde_json::Value; use std::{collections::HashSet, string::String}; use strum_macros::Display; @@ -25,36 +26,36 @@ pub enum AnnouncementType { } impl Message { - pub fn from_json(json: Value) -> Self { - use AnnouncementType::*; - let event = json["event"] - .as_str() - .unwrap_or_else(|| log_fatal!("Could not process `event` in {:?}", json)); - match event { - "update" => Self::Update(Status(json["payload"].clone())), - "conversation" => Self::Conversation(json["payload"].clone()), - "notification" => Self::Notification(json["payload"].clone()), - "delete" => Self::Delete( - json["payload"] - .as_str() - .unwrap_or_else(|| log_fatal!("Could not process `payload` in {:?}", json)) - .to_string(), - ), - "filters_changed" => Self::FiltersChanged, - "announcement" => Self::Announcement(New(json["payload"].clone())), - "announcement.reaction" => Self::Announcement(Reaction(json["payload"].clone())), - "announcement.delete" => Self::Announcement(Delete( - json["payload"] - .as_str() - .unwrap_or_else(|| log_fatal!("Could not process `payload` in {:?}", json)) - .to_string(), - )), - other => { - log::warn!("Received unexpected `event` from Redis: {}", other); - Self::UnknownEvent(event.to_string(), json["payload"].clone()) - } - } - } + // pub fn from_json(event: Event) -> Self { + // use AnnouncementType::*; + + // match event.event.as_ref() { + // "update" => Self::Update(Status(event.payload)), + // "conversation" => Self::Conversation(event.payload), + // "notification" => Self::Notification(event.payload), + // "delete" => Self::Delete( + // event + // .payload + // .as_str() + // .unwrap_or_else(|| log_fatal!("Could not process `payload` in {:?}", event)) + // .to_string(), + // ), + // "filters_changed" => Self::FiltersChanged, + // "announcement" => Self::Announcement(New(event.payload)), + // "announcement.reaction" => Self::Announcement(Reaction(event.payload)), + // "announcement.delete" => Self::Announcement(Delete( + // event + // .payload + // .as_str() + // .unwrap_or_else(|| log_fatal!("Could not process `payload` in {:?}", event)) + // .to_string(), + // )), + // other => { + // log::warn!("Received unexpected `event` from Redis: {}", other); + // Self::UnknownEvent(event.event.to_string(), event.payload) + // } + // } + // } pub fn event(&self) -> String { use AnnouncementType::*; match self { @@ -84,118 +85,3 @@ impl Message { } } } - -impl Status { - /// Returns `true` if the status is filtered out based on its language - pub fn language_not_allowed(&self, allowed_langs: &HashSet) -> bool { - const ALLOW: bool = false; - const REJECT: bool = true; - - let reject_and_maybe_log = |toot_language| { - log::info!("Filtering out toot from `{}`", &self.0["account"]["acct"]); - log::info!("Toot language: `{}`", toot_language); - log::info!("Recipient's allowed languages: `{:?}`", allowed_langs); - REJECT - }; - if allowed_langs.is_empty() { - return ALLOW; // listing no allowed_langs results in allowing all languages - } - match self.0["language"].as_str() { - Some(toot_language) if allowed_langs.contains(toot_language) => ALLOW, - None | Some("") => ALLOW, // If toot language is unknown, toot is always allowed - Some(toot_language) => reject_and_maybe_log(toot_language), - } - } - - /// Returns `true` if this toot originated from a domain the User has blocked. - pub fn from_blocked_domain(&self, blocked_domains: &HashSet) -> bool { - let full_username = self.0["account"]["acct"] - .as_str() - .unwrap_or_else(|| log_fatal!("Could not process `account.acct` in {:?}", self.0)); - - match full_username.split('@').nth(1) { - Some(originating_domain) => blocked_domains.contains(originating_domain), - None => false, // None means the user is on the local instance, which can't be blocked - } - } - /// Returns `true` if the Status is from an account that has blocked the current user. - pub fn from_blocking_user(&self, blocking_users: &HashSet) -> bool { - let toot = self.0.clone(); - const ALLOW: bool = false; - const REJECT: bool = true; - - let author = toot["account"]["id"] - .str_to_i64() - .unwrap_or_else(|_| log_fatal!("Could not process `account.id` in {:?}", toot)); - - if blocking_users.contains(&author) { - REJECT - } else { - ALLOW - } - } - - /// Returns `true` if the User's list of blocked and muted users includes a user - /// involved in this toot. - /// - /// A user is involved if they: - /// * Wrote this toot - /// * Are mentioned in this toot - /// * Wrote a toot that this toot is replying to (if any) - /// * Wrote the toot that this toot is boosting (if any) - pub fn involves_blocked_user(&self, blocked_users: &HashSet) -> bool { - let toot = self.0.clone(); - const ALLOW: bool = false; - const REJECT: bool = true; - - let author_user = match toot["account"]["id"].str_to_i64() { - Ok(user_id) => vec![user_id].into_iter(), - Err(_) => log_fatal!("Could not process `account.id` in {:?}", toot), - }; - - let mentioned_users = (match &toot["mentions"] { - Value::Array(inner) => inner, - _ => log_fatal!("Could not process `mentions` in {:?}", toot), - }) - .into_iter() - .map(|mention| match mention["id"].str_to_i64() { - Ok(user_id) => user_id, - Err(_) => log_fatal!("Could not process `id` field of mention in {:?}", toot), - }); - - let replied_to_user = match toot["in_reply_to_account_id"].str_to_i64() { - Ok(user_id) => vec![user_id].into_iter(), - Err(_) => vec![].into_iter(), // no error; just no replied_to_user - }; - - let boosted_user = match toot["reblog"].as_object() { - Some(boosted_user) => match boosted_user["account"]["id"].str_to_i64() { - Ok(user_id) => vec![user_id].into_iter(), - Err(_) => log_fatal!("Could not process `reblog.account.id` in {:?}", toot), - }, - None => vec![].into_iter(), // no error; just no boosted_user - }; - - let involved_users = author_user - .chain(mentioned_users) - .chain(replied_to_user) - .chain(boosted_user) - .collect::>(); - - if involved_users.is_disjoint(blocked_users) { - ALLOW - } else { - REJECT - } - } -} - -trait ConvertValue { - fn str_to_i64(&self) -> Result>; -} - -impl ConvertValue for Value { - fn str_to_i64(&self) -> Result> { - Ok(self.as_str().ok_or("none_err")?.parse()?) - } -} diff --git a/src/redis_to_client_stream/mod.rs b/src/redis_to_client_stream/mod.rs index 1f9d20f..5295073 100644 --- a/src/redis_to_client_stream/mod.rs +++ b/src/redis_to_client_stream/mod.rs @@ -1,43 +1,45 @@ //! Stream the updates appropriate for a given `User`/`timeline` pair from Redis. pub mod client_agent; -pub mod message; pub mod receiver; pub mod redis; pub use client_agent::ClientAgent; use futures::{future::Future, stream::Stream, Async}; use log; -use serde_json::json; -use std::time; +use std::time::{Duration, Instant}; /// Send a stream of replies to a Server Sent Events client. pub fn send_updates_to_sse( mut client_agent: ClientAgent, connection: warp::sse::Sse, - update_interval: time::Duration, + update_interval: Duration, ) -> impl warp::reply::Reply { - let event_stream = tokio::timer::Interval::new(time::Instant::now(), update_interval) - .filter_map(move |_| match client_agent.poll() { - Ok(Async::Ready(Some(msg))) => Some(( - warp::sse::event(msg.event()), - warp::sse::data(msg.payload()), - )), - _ => None, + let event_stream = + tokio::timer::Interval::new(Instant::now(), update_interval).filter_map(move |_| { + match client_agent.poll() { + Ok(Async::Ready(Some(event))) => Some(( + warp::sse::event(event.event_name()), + warp::sse::data(event.payload().unwrap_or_else(String::new)), + )), + _ => None, + } }); connection.reply( warp::sse::keep_alive() - .interval(time::Duration::from_secs(30)) + .interval(Duration::from_secs(30)) .text("thump".to_string()) .stream(event_stream), ) } +use warp::ws::WebSocket; + /// Send a stream of replies to a WebSocket client. pub fn send_updates_to_ws( - socket: warp::ws::WebSocket, + socket: WebSocket, mut client_agent: ClientAgent, - update_interval: time::Duration, -) -> impl futures::future::Future { + update_interval: Duration, +) -> impl Future { let (ws_tx, mut ws_rx) = socket.split(); let timeline = client_agent.subscription.timeline; @@ -57,8 +59,8 @@ pub fn send_updates_to_ws( ); // Yield new events for as long as the client is still connected - let event_stream = tokio::timer::Interval::new(time::Instant::now(), update_interval) - .take_while(move |_| match ws_rx.poll() { + let event_stream = tokio::timer::Interval::new(Instant::now(), update_interval).take_while( + move |_| match ws_rx.poll() { Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true), Ok(Async::Ready(None)) => { // TODO: consider whether we should manually drop closed connections here @@ -74,25 +76,21 @@ pub fn send_updates_to_ws( log::warn!("Error in {:?}: {}", timeline, e); futures::future::ok(false) } - }); - - let mut time = time::Instant::now(); + }, + ); + let mut time = Instant::now(); // Every time you get an event from that stream, send it through the pipe event_stream .for_each(move |_instant| { if let Ok(Async::Ready(Some(msg))) = client_agent.poll() { - tx.unbounded_send(warp::ws::Message::text( - json!({ "event": msg.event(), - "payload": msg.payload() }) - .to_string(), - )) - .expect("No send error"); + tx.unbounded_send(warp::ws::Message::text(msg.to_json_string())) + .expect("No send error"); }; - if time.elapsed() > time::Duration::from_secs(30) { + if time.elapsed() > Duration::from_secs(30) { tx.unbounded_send(warp::ws::Message::text("{}")) .expect("Can ping"); - time = time::Instant::now(); + time = Instant::now(); } Ok(()) }) diff --git a/src/redis_to_client_stream/receiver/message_queues.rs b/src/redis_to_client_stream/receiver/message_queues.rs index 670bc45..ffcd31e 100644 --- a/src/redis_to_client_stream/receiver/message_queues.rs +++ b/src/redis_to_client_stream/receiver/message_queues.rs @@ -1,13 +1,17 @@ +use crate::messages::Event; use crate::parse_client_request::subscription::Timeline; -use serde_json::Value; -use std::{collections, fmt, time}; +use std::{ + collections::{HashMap, VecDeque}, + fmt, + time::{Duration, Instant}, +}; use uuid::Uuid; #[derive(Clone)] pub struct MsgQueue { pub timeline: Timeline, - pub messages: collections::VecDeque, - last_polled_at: time::Instant, + pub messages: VecDeque, + last_polled_at: Instant, } impl fmt::Debug for MsgQueue { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -29,27 +33,30 @@ MsgQueue {{ impl MsgQueue { pub fn new(timeline: Timeline) -> Self { MsgQueue { - messages: collections::VecDeque::new(), - last_polled_at: time::Instant::now(), + messages: VecDeque::new(), + last_polled_at: Instant::now(), timeline, } } } #[derive(Debug)] -pub struct MessageQueues(pub collections::HashMap); +pub struct MessageQueues(pub HashMap); impl MessageQueues { pub fn update_time_for_target_queue(&mut self, id: Uuid) { self.entry(id) - .and_modify(|queue| queue.last_polled_at = time::Instant::now()); + .and_modify(|queue| queue.last_polled_at = Instant::now()); } - pub fn oldest_msg_in_target_queue(&mut self, id: Uuid, timeline: Timeline) -> Option { - self.entry(id) - .or_insert_with(|| MsgQueue::new(timeline)) - .messages - .pop_front() + pub fn oldest_msg_in_target_queue(&mut self, id: Uuid, timeline: Timeline) -> Option { + let msg_qs_entry = self.entry(id); + let mut inserted_tl = false; + let msg_q = msg_qs_entry.or_insert_with(|| { + inserted_tl = true; + MsgQueue::new(timeline) + }); + msg_q.messages.pop_front() } pub fn calculate_timelines_to_add_or_drop(&mut self, timeline: Timeline) -> Vec { let mut timelines_to_modify = Vec::new(); @@ -59,7 +66,7 @@ impl MessageQueues { in_subscriber_number: 1, }); self.retain(|_id, msg_queue| { - if msg_queue.last_polled_at.elapsed() < time::Duration::from_secs(30) { + if msg_queue.last_polled_at.elapsed() < Duration::from_secs(30) { true } else { let timeline = &msg_queue.timeline; @@ -79,7 +86,7 @@ pub struct Change { } impl std::ops::Deref for MessageQueues { - type Target = collections::HashMap; + type Target = HashMap; fn deref(&self) -> &Self::Target { &self.0 } diff --git a/src/redis_to_client_stream/receiver/mod.rs b/src/redis_to_client_stream/receiver/mod.rs index 5b142d3..364d1fd 100644 --- a/src/redis_to_client_stream/receiver/mod.rs +++ b/src/redis_to_client_stream/receiver/mod.rs @@ -5,6 +5,7 @@ mod message_queues; use crate::{ config::{self, RedisInterval}, log_fatal, + messages::Event, parse_client_request::subscription::{self, postgres, PgPool, Timeline}, pubsub_cmd, redis_to_client_stream::redis::{redis_cmd, RedisConn, RedisStream}, @@ -12,8 +13,7 @@ use crate::{ use futures::{Async, Poll}; use lru::LruCache; pub use message_queues::{MessageQueues, MsgQueue}; -use serde_json::Value; -use std::{collections::HashMap, net, time}; +use std::{collections::HashMap, net, time::Instant}; use tokio::io::Error; use uuid::Uuid; @@ -23,7 +23,7 @@ pub struct Receiver { pub pubsub_connection: RedisStream, secondary_redis_connection: net::TcpStream, redis_poll_interval: RedisInterval, - redis_polled_at: time::Instant, + redis_polled_at: Instant, timeline: Timeline, manager_id: Uuid, pub msg_queues: MessageQueues, @@ -32,9 +32,9 @@ pub struct Receiver { pool: PgPool, } #[derive(Debug)] -struct Cache { +pub struct Cache { id_to_hashtag: LruCache, - hashtag_to_id: LruCache, + pub hashtag_to_id: LruCache, } impl Cache { fn new(size: usize) -> Self { @@ -60,7 +60,7 @@ impl Receiver { .with_namespace(redis_namespace), secondary_redis_connection, redis_poll_interval, - redis_polled_at: time::Instant::now(), + redis_polled_at: Instant::now(), timeline: Timeline::empty(), manager_id: Uuid::default(), msg_queues: MessageQueues(HashMap::new()), @@ -115,14 +115,14 @@ impl Receiver { /// that there's a subscription to the current one. If there isn't, then /// subscribe to it. fn subscribe_or_unsubscribe_as_needed(&mut self, timeline: Timeline) { - let start_time = std::time::Instant::now(); + let start_time = Instant::now(); let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(timeline); // Record the lower number of clients subscribed to that channel for change in timelines_to_modify { let timeline = change.timeline; - let opt_hashtag = self.if_hashtag_timeline_get_hashtag_name(timeline); - let opt_hashtag = opt_hashtag.as_ref(); + let hashtag = self.if_hashtag_timeline_get_hashtag_name(timeline); + let hashtag = hashtag.as_ref(); let count_of_subscribed_clients = self .clients_per_timeline @@ -132,9 +132,9 @@ impl Receiver { // If no clients, unsubscribe from the channel if *count_of_subscribed_clients <= 0 { - pubsub_cmd!("unsubscribe", self, timeline.to_redis_str(opt_hashtag)); + pubsub_cmd!("unsubscribe", self, timeline.to_redis_raw_timeline(hashtag)); } else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 { - pubsub_cmd!("subscribe", self, timeline.to_redis_str(opt_hashtag)); + pubsub_cmd!("subscribe", self, timeline.to_redis_raw_timeline(hashtag)); } } if start_time.elapsed().as_millis() > 1 { @@ -145,42 +145,22 @@ impl Receiver { /// The stream that the ClientAgent polls to learn about new messages. impl futures::stream::Stream for Receiver { - type Item = Value; + type Item = Event; type Error = Error; /// Returns the oldest message in the `ClientAgent`'s queue (if any). /// /// Note: This method does **not** poll Redis every time, because polling - /// Redis is signifiantly more time consuming that simply returning the + /// Redis is significantly more time consuming that simply returning the /// message already in a queue. Thus, we only poll Redis if it has not /// been polled lately. - fn poll(&mut self) -> Poll, Self::Error> { + fn poll(&mut self) -> Poll, Self::Error> { let (timeline, id) = (self.timeline.clone(), self.manager_id); + if self.redis_polled_at.elapsed() > *self.redis_poll_interval { - for (raw_timeline, msg_value) in self.pubsub_connection.poll_redis() { - let hashtag = if raw_timeline.starts_with("hashtag") { - let tag_name = raw_timeline - .split(':') - .nth(1) - .unwrap_or_else(|| log_fatal!("No hashtag found in `{}`", raw_timeline)) - .to_string(); - let tag_id = *self - .cache - .hashtag_to_id - .get(&tag_name) - .unwrap_or_else(|| log_fatal!("No cached id for `{}`", tag_name)); - Some(tag_id) - } else { - None - }; - let timeline = Timeline::from_redis_str(&raw_timeline, hashtag); - for msg_queue in self.msg_queues.values_mut() { - if msg_queue.timeline == timeline { - msg_queue.messages.push_back(msg_value.clone()); - } - } - } - self.redis_polled_at = time::Instant::now(); + self.pubsub_connection + .poll_redis(&mut self.cache.hashtag_to_id, &mut self.msg_queues); + self.redis_polled_at = Instant::now(); } // Record current time as last polled time diff --git a/src/redis_to_client_stream/redis/mod.rs b/src/redis_to_client_stream/redis/mod.rs index 2ce67a6..c63e243 100644 --- a/src/redis_to_client_stream/redis/mod.rs +++ b/src/redis_to_client_stream/redis/mod.rs @@ -6,32 +6,4 @@ pub mod redis_stream; pub use redis_connection::RedisConn; pub use redis_stream::RedisStream; -#[cfg(test)] -mod test { - use super::*; - #[test] - fn simple_redis_parse() { - let input = "*3\r\n$9\r\nSUBSCRIBE\r\n$10\r\ntimeline:1\r\n:1\r\n"; - let mut msg = redis_msg::RedisMsg::from_raw(input, "timeline".len()); - let cmd = msg.next_field(); - assert_eq!(&cmd, "SUBSCRIBE"); - let timeline = msg.next_field(); - assert_eq!(&timeline, "timeline:1"); - msg.cursor += ":1\r\n".len(); - assert_eq!(msg.cursor, input.len()); - } - - #[test] - fn realistic_redis_parse() { - let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:4\r\n$1386\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102866835379605039\",\"created_at\":\"2019-09-27T22:29:02.590Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"http://localhost:3000/users/admin/statuses/102866835379605039\",\"url\":\"http://localhost:3000/@admin/102866835379605039\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"

@susan hi

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"1\",\"username\":\"admin\",\"acct\":\"admin\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-07-04T00:21:05.890Z\",\"note\":\"

\",\"url\":\"http://localhost:3000/@admin\",\"avatar\":\"http://localhost:3000/avatars/original/missing.png\",\"avatar_static\":\"http://localhost:3000/avatars/original/missing.png\",\"header\":\"http://localhost:3000/headers/original/missing.png\",\"header_static\":\"http://localhost:3000/headers/original/missing.png\",\"followers_count\":3,\"following_count\":3,\"statuses_count\":192,\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"4\",\"username\":\"susan\",\"url\":\"http://localhost:3000/@susan\",\"acct\":\"susan\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1569623342825}\r\n"; - let mut msg = redis_msg::RedisMsg::from_raw(input, "timeline".len()); - let cmd = msg.next_field(); - assert_eq!(&cmd, "message"); - let timeline = msg.next_field(); - assert_eq!(&timeline, "timeline:4"); - let message_str = msg.next_field(); - assert_eq!(message_str, input[41..input.len() - 2]); - assert_eq!(msg.cursor, input.len()); - } -} diff --git a/src/redis_to_client_stream/redis/redis_msg.rs b/src/redis_to_client_stream/redis/redis_msg.rs index 0520d4f..f03cd09 100644 --- a/src/redis_to_client_stream/redis/redis_msg.rs +++ b/src/redis_to_client_stream/redis/redis_msg.rs @@ -1,49 +1,118 @@ -use serde_json::Value; +//! Methods for parsing input in the small subset of the Redis Serialization Protocol we +//! support. +//! +//! Every message Flodgatt receives from Redis is a Redis Array; the elements in the array +//! will be either Bulk Strings or Integers (as Redis defines those terms). See the +//! [Redis protocol documentation](https://redis.io/topics/protocol) for details. A raw +//! message might look slightly like this (simplified, with line brakes added between +//! fields): +//! +//! ```text +//! *3\r\n +//! $7\r\nmessage\r\n +//! $10\r\ntimeline:4\r\n +//! $1386\r\n{\"event\":\"update\",\"payload\"...\"queued_at\":1569623342825}\r\n +//! ``` +//! +//! Read that as: an array with three elements: the first element is a bulk string with +//! three characters, the second is a bulk string with ten characters, and the third is a +//! bulk string with 1,386 characters. +use crate::{log_fatal, messages::Event, parse_client_request::subscription::Timeline}; +use lru::LruCache; +type Parser<'a, Item> = Result<(Item, &'a str), ParseErr>; #[derive(Debug)] -pub struct RedisMsg<'a> { - pub raw: &'a str, - pub cursor: usize, - pub prefix_len: usize, +pub enum ParseErr { + Incomplete, + Unrecoverable, +} +use ParseErr::*; + +/// A message that has been parsed from an incoming raw message from Redis. +#[derive(Debug, Clone)] +pub enum RedisMsg { + EventMsg(Timeline, Event), + SubscriptionMsg, } -impl<'a> RedisMsg<'a> { - pub fn from_raw(raw: &'a str, prefix_len: usize) -> Self { - Self { - raw, - cursor: "*3\r\n".len(), //length of intro header - prefix_len, - } - } - /// Move the cursor from the beginning of a number through its end and return the number - pub fn process_number(&mut self) -> usize { - let (mut selected_number, selection_start) = (0, self.cursor); - while let Ok(number) = self.raw[selection_start..=self.cursor].parse::() { - self.cursor += 1; - selected_number = number; - } - selected_number - } - /// In a pubsub reply from Redis, an item can be either the name of the subscribed channel - /// or the msg payload. Either way, it follows the same format: - /// `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n` - pub fn next_field(&mut self) -> String { - self.cursor += "$".len(); +type Hashtags = LruCache; +impl RedisMsg { + pub fn from_raw<'a>(input: &'a str, cache: &mut Hashtags, prefix: usize) -> Parser<'a, Self> { + // No need to parse the Redis Array header, just skip it + let input = input.get("*3\r\n".len()..).ok_or(Incomplete)?; + let (command, rest) = parse_redis_bulk_string(&input)?; + match command { + "message" => { + // Messages look like; + // $10\r\ntimeline:4\r\n + // $1386\r\n{\"event\":\"update\",\"payload\"...\"queued_at\":1569623342825}\r\n + let (raw_timeline, rest) = parse_redis_bulk_string(&rest)?; + let (msg_txt, rest) = parse_redis_bulk_string(&rest)?; - let item_len = self.process_number(); - self.cursor += "\r\n".len(); - let item_start_position = self.cursor; - self.cursor += item_len; - let item = self.raw[item_start_position..self.cursor].to_string(); - self.cursor += "\r\n".len(); - item - } - - pub fn extract_raw_timeline_and_message(&mut self) -> (String, Value) { - let timeline = &self.next_field()[self.prefix_len..]; - let msg_txt = self.next_field(); - let msg_value: Value = - serde_json::from_str(&msg_txt).expect("Invariant violation: Invalid JSON from Redis"); - (timeline.to_string(), msg_value) + let raw_timeline = &raw_timeline.get(prefix..).ok_or(Unrecoverable)?; + let event: Event = serde_json::from_str(&msg_txt).unwrap(); + let hashtag = hashtag_from_timeline(&raw_timeline, cache); + let timeline = Timeline::from_redis_raw_timeline(&raw_timeline, hashtag); + Ok((Self::EventMsg(timeline, event), rest)) + } + "subscribe" | "unsubscribe" => { + // subscription statuses look like: + // $14\r\ntimeline:local\r\n + // :47\r\n + let (_raw_timeline, rest) = parse_redis_bulk_string(&rest)?; + let (_number_of_subscriptions, rest) = parse_redis_int(&rest)?; + Ok((Self::SubscriptionMsg, &rest)) + } + _cmd => Err(Incomplete)?, + } + } +} + +/// Parse a Redis bulk string and return the content of that string and the unparsed remainder. +/// +/// All bulk strings have the format `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n` +fn parse_redis_bulk_string(input: &str) -> Parser<&str> { + let input = &input.get("$".len()..).ok_or(Incomplete)?; + let (field_len, rest) = parse_redis_length(input)?; + let field_content = rest.get(..field_len).ok_or(Incomplete)?; + Ok((field_content, &rest[field_len + "\r\n".len()..])) +} + +fn parse_redis_int(input: &str) -> Parser { + let input = &input.get(":".len()..).ok_or(Incomplete)?; + let (number, rest_with_newline) = parse_number_at(input)?; + let rest = &rest_with_newline.get("\r\n".len()..).ok_or(Incomplete)?; + Ok((number, rest)) +} + +/// Return the value of a Redis length (for an array or bulk string) and the unparsed remainder +fn parse_redis_length(input: &str) -> Parser { + let (number, rest_with_newline) = parse_number_at(input)?; + let rest = &rest_with_newline.get("\r\n".len()..).ok_or(Incomplete)?; + Ok((number, rest)) +} + +fn parse_number_at(input: &str) -> Parser { + let number_len = input + .chars() + .position(|c| !c.is_numeric()) + .ok_or(Unrecoverable)?; + let number = input[..number_len].parse().map_err(|_| Unrecoverable)?; + let rest = &input.get(number_len..).ok_or(Incomplete)?; + Ok((number, rest)) +} +fn hashtag_from_timeline(raw_timeline: &str, hashtag_id_cache: &mut Hashtags) -> Option { + if raw_timeline.starts_with("hashtag") { + let tag_name = raw_timeline + .split(':') + .nth(1) + .unwrap_or_else(|| log_fatal!("No hashtag found in `{}`", raw_timeline)) + .to_string(); + let tag_id = *hashtag_id_cache + .get(&tag_name) + .unwrap_or_else(|| log_fatal!("No cached id for `{}`", tag_name)); + Some(tag_id) + } else { + None } } diff --git a/src/redis_to_client_stream/redis/redis_stream.rs b/src/redis_to_client_stream/redis/redis_stream.rs index 9eeff44..b50ca0c 100644 --- a/src/redis_to_client_stream/redis/redis_stream.rs +++ b/src/redis_to_client_stream/redis/redis_stream.rs @@ -1,8 +1,10 @@ -use super::redis_msg::RedisMsg; +use super::redis_msg::{ParseErr, RedisMsg}; use crate::config::RedisNamespace; +use crate::log_fatal; +use crate::redis_to_client_stream::receiver::MessageQueues; use futures::{Async, Poll}; -use serde_json::Value; -use std::{io::Read, net}; +use lru::LruCache; +use std::{error::Error, io::Read, net}; use tokio::io::AsyncRead; #[derive(Debug)] @@ -23,52 +25,34 @@ impl RedisStream { pub fn with_namespace(self, namespace: RedisNamespace) -> Self { RedisStream { namespace, ..self } } - // Text comes in from redis as a raw stream, which could be more than one message - // and is not guaranteed to end on a message boundary. We need to break it down - // into messages. Incoming messages *are* guaranteed to be RESP arrays, - // https://redis.io/topics/protocol + // Text comes in from redis as a raw stream, which could be more than one message and + // is not guaranteed to end on a message boundary. We need to break it down into + // messages. Incoming messages *are* guaranteed to be RESP arrays (though still not + // guaranteed to end at an array boundary). See https://redis.io/topics/protocol /// Adds any new Redis messages to the `MsgQueue` for the appropriate `ClientAgent`. - pub fn poll_redis(&mut self) -> Vec<(String, Value)> { + pub fn poll_redis( + &mut self, + hashtag_to_id_cache: &mut LruCache, + queues: &mut MessageQueues, + ) { let mut buffer = vec![0u8; 6000]; - let mut messages = Vec::new(); - - if let Async::Ready(num_bytes_read) = self.poll_read(&mut buffer).unwrap() { + if let Ok(Async::Ready(num_bytes_read)) = self.poll_read(&mut buffer) { let raw_utf = self.as_utf8(buffer, num_bytes_read); self.incoming_raw_msg.push_str(&raw_utf); - - // Only act if we have a full message (end on a msg boundary) - if !self.incoming_raw_msg.ends_with("}\r\n") { - return messages; - }; - let prefix_to_skip = match &*self.namespace { - Some(namespace) => format!("{}:timeline:", namespace), - None => "timeline:".to_string(), - }; - - let mut msg = RedisMsg::from_raw(&self.incoming_raw_msg, prefix_to_skip.len()); - - while !msg.raw.is_empty() { - let command = msg.next_field(); - match command.as_str() { - "message" => { - let (raw_timeline, msg_value) = msg.extract_raw_timeline_and_message(); - messages.push((raw_timeline, msg_value)); - } - - "subscribe" | "unsubscribe" => { - // No msg, so ignore & advance cursor to end - let _channel = msg.next_field(); - msg.cursor += ":".len(); - let _active_subscriptions = msg.process_number(); - msg.cursor += "\r\n".len(); - } - cmd => panic!("Invariant violation: {} is unexpected Redis output", cmd), - }; - msg = RedisMsg::from_raw(&msg.raw[msg.cursor..], msg.prefix_len); + match process_messages( + self.incoming_raw_msg.clone(), + &mut self.namespace.0, + hashtag_to_id_cache, + queues, + ) { + Ok(None) => self.incoming_raw_msg.clear(), + Ok(Some(msg_fragment)) => self.incoming_raw_msg = msg_fragment, + Err(e) => { + log::error!("{}", e); + log_fatal!("Could not process RedisStream: {:?}", &self); + } } - self.incoming_raw_msg.clear(); } - messages } fn as_utf8(&mut self, cur_buffer: Vec, size: usize) -> String { @@ -81,12 +65,46 @@ impl RedisStream { } } +type HashtagCache = LruCache; +pub fn process_messages( + raw_msg: String, + namespace: &mut Option, + cache: &mut HashtagCache, + queues: &mut MessageQueues, +) -> Result, Box> { + let prefix_len = match namespace { + Some(namespace) => format!("{}:timeline:", namespace).len(), + None => "timeline:".len(), + }; + + let mut input = raw_msg.as_str(); + loop { + let rest = match RedisMsg::from_raw(&input, cache, prefix_len) { + Ok((RedisMsg::EventMsg(timeline, event), rest)) => { + for msg_queue in queues.values_mut() { + if msg_queue.timeline == timeline { + msg_queue.messages.push_back(event.clone()); + } + } + rest + } + Ok((RedisMsg::SubscriptionMsg, rest)) => rest, + Err(ParseErr::Incomplete) => break, + Err(ParseErr::Unrecoverable) => log_fatal!("Failed parsing Redis msg: {}", &input), + }; + input = rest + } + + Ok(Some(input.to_string())) +} + impl std::ops::Deref for RedisStream { type Target = net::TcpStream; fn deref(&self) -> &Self::Target { &self.inner } } + impl std::ops::DerefMut for RedisStream { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner diff --git a/tc --explain E0106 b/tc --explain E0106 new file mode 100644 index 0000000..b9b1d97 --- /dev/null +++ b/tc --explain E0106 @@ -0,0 +1,53 @@ +This error indicates that a lifetime is missing from a type. If it is an error +inside a function signature, the problem may be with failing to adhere to the +lifetime elision rules (see below). + +Erroneous code examples: + +``` +struct Foo1 { x: &bool } + // ^ expected lifetime parameter +struct Foo2<'a> { x: &'a bool } // correct + +struct Bar1 { x: Foo2 } + // ^^^^ expected lifetime parameter +struct Bar2<'a> { x: Foo2<'a> } // correct + +enum Baz1 { A(u8), B(&bool), } + // ^ expected lifetime parameter +enum Baz2<'a> { A(u8), B(&'a bool), } // correct + +type MyStr1 = &str; + // ^ expected lifetime parameter +type MyStr2<'a> = &'a str; // correct +``` + +Lifetime elision is a special, limited kind of inference for lifetimes in +function signatures which allows you to leave out lifetimes in certain cases. +For more background on lifetime elision see [the book][book-le]. + +The lifetime elision rules require that any function signature with an elided +output lifetime must either have: + + - exactly one input lifetime + - or, multiple input lifetimes, but the function must also be a method with a + `&self` or `&mut self` receiver + +In the first case, the output lifetime is inferred to be the same as the unique +input lifetime. In the second case, the lifetime is instead inferred to be the +same as the lifetime on `&self` or `&mut self`. + +Here are some examples of elision errors: + +``` +// error, no input lifetimes +fn foo() -> &str { } + +// error, `x` and `y` have distinct lifetimes inferred +fn bar(x: &str, y: &str) -> &str { } + +// error, `y`'s lifetime is inferred to be distinct from `x`'s +fn baz<'a>(x: &'a str, y: &str) -> &str { } +``` + +[book-le]: https://doc.rust-lang.org/book/ch10-03-lifetime-syntax.html#lifetime-elision