From daf7d1ae7f61888035c49a87d3f85a1d93c21ec1 Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Thu, 7 May 2020 10:56:11 -0400 Subject: [PATCH] Add tests for polling for multiple messages (#149) * Add tests for polling for multiple messages This commit adds a mock Redis interface and adds tests that poll the mock interface for multiple messages at a time. These tests test that Flodgatt is robust against receiving incomplete messages, including if the message break results in receiving invalid UTF8. * Remove temporary files --- Cargo.lock | 2 +- Cargo.toml | 2 +- benches/parse_redis.rs | 111 ++++-- src/main.rs | 12 +- src/request.rs | 5 + src/request/timeline.rs | 10 +- src/request/timeline/inner.rs | 8 +- src/response.rs | 4 +- src/response/event.rs | 7 +- src/response/event/checked_event.rs | 9 +- src/response/event/checked_event/account.rs | 66 ++-- src/response/event/checked_event/emoji.rs | 2 +- src/response/event/checked_event/mention.rs | 2 +- src/response/event/checked_event/status.rs | 59 ++-- .../event/checked_event/status/application.rs | 12 +- .../event/checked_event/status/attachment.rs | 22 +- .../event/checked_event/status/card.rs | 30 +- .../event/checked_event/status/poll.rs | 28 +- src/response/event/checked_event/tag.rs | 16 +- .../event/checked_event/visibility.rs | 2 +- src/response/event/dynamic_event.rs | 4 +- src/response/redis.rs | 2 +- src/response/redis/connection.rs | 330 +++++++++++------- src/response/redis/connection/err.rs | 1 + src/response/redis/manager.rs | 141 +++++--- src/response/redis/manager/test.rs | 245 +++++++++++++ src/response/redis/msg.rs | 8 +- src/response/redis/msg/err.rs | 17 +- 28 files changed, 786 insertions(+), 371 deletions(-) create mode 100644 src/response/redis/manager/test.rs diff --git a/Cargo.lock b/Cargo.lock index 84de382..c010f01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -416,7 +416,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.9.8" +version = "0.9.9" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 6ea405d..367ba03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.9.8" +version = "0.9.9" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/benches/parse_redis.rs b/benches/parse_redis.rs index 48c6aeb..670b8c0 100644 --- a/benches/parse_redis.rs +++ b/benches/parse_redis.rs @@ -1,7 +1,12 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use flodgatt::response::{RedisMsg, RedisParseOutput}; +use flodgatt::config; +use flodgatt::request::{Content::*, Reach::*, Stream::*, Timeline}; +use flodgatt::response::{Event, Manager, RedisMsg, RedisParseOutput}; +use flodgatt::Id; +use futures::{Async, Stream}; use lru::LruCache; use std::convert::TryFrom; +use std::fs; fn parse_long_redis_input<'a>(input: &'a str) -> RedisMsg<'a> { if let RedisParseOutput::Msg(msg) = RedisParseOutput::try_from(input).unwrap() { @@ -12,27 +17,34 @@ fn parse_long_redis_input<'a>(input: &'a str) -> RedisMsg<'a> { } } -// fn parse_to_timeline(msg: RedisMsg) -> Timeline { -// let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..]; -// let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap(); -// assert_eq!(tl, Timeline(User(Id(1)), Federated, All)); -// tl -// } -// fn parse_to_checked_event(msg: RedisMsg) -> EventKind { -// EventKind::TypeSafe(serde_json::from_str(msg.event_txt).unwrap()) -// } +fn parse_to_timeline(msg: RedisMsg) -> Timeline { + let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..]; + let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap(); + assert_eq!(tl, Timeline(User(Id(1)), Federated, All)); + tl +} +fn parse_to_checked_event(msg: RedisMsg) -> Event { + Event::TypeSafe(serde_json::from_str(msg.event_txt).unwrap()) +} -// fn parse_to_dyn_event(msg: RedisMsg) -> EventKind { -// EventKind::Dynamic(serde_json::from_str(msg.event_txt).unwrap()) -// } +fn parse_to_dyn_event(msg: RedisMsg) -> Event { + Event::Dynamic(serde_json::from_str(msg.event_txt).unwrap()) +} -// fn redis_msg_to_event_string(msg: RedisMsg) -> String { -// msg.event_txt.to_string() -// } +fn redis_msg_to_event_string(msg: RedisMsg) -> String { + msg.event_txt.to_string() +} -// fn string_to_checked_event(event_txt: &String) -> EventKind { -// EventKind::TypeSafe(serde_json::from_str(event_txt).unwrap()) -// } +fn string_to_checked_event(event_txt: &String) -> Event { + Event::TypeSafe(serde_json::from_str(event_txt).unwrap()) +} + +fn input_msg(i: usize) -> Vec { + fs::read_to_string(format!("test_data/redis_input_{:03}.resp", i)) + .expect("test input not found") + .as_bytes() + .to_vec() +} fn criterion_benchmark(c: &mut Criterion) { let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS; @@ -42,25 +54,54 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| black_box(parse_long_redis_input(input))) }); - // let msg = parse_long_redis_input(input); - // group.bench_function("parse RedisMsg to Timeline", |b| { - // b.iter(|| black_box(parse_to_timeline(msg.clone()))) - // }); + let msg = parse_long_redis_input(input); + group.bench_function("parse RedisMsg to Timeline", |b| { + b.iter(|| black_box(parse_to_timeline(msg.clone()))) + }); - // group.bench_function("parse RedisMsg -> DynamicEvent", |b| { - // b.iter(|| black_box(parse_to_dyn_event(msg.clone()))) - // }); + group.bench_function("parse RedisMsg -> CheckedEvent", |b| { + b.iter(|| black_box(parse_to_checked_event(msg.clone()))) + }); - // group.bench_function("parse RedisMsg -> CheckedEvent", |b| { - // b.iter(|| black_box(parse_to_checked_event(msg.clone()))) - // }); + group.bench_function("parse RedisMsg -> DynamicEvent", |b| { + b.iter(|| black_box(parse_to_dyn_event(msg.clone()))) + }); - // group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| { - // b.iter(|| { - // let txt = black_box(redis_msg_to_event_string(msg.clone())); - // black_box(string_to_checked_event(&txt)); - // }) - // }); + group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| { + b.iter(|| { + let txt = black_box(redis_msg_to_event_string(msg.clone())); + black_box(string_to_checked_event(&txt)); + }) + }); + + group.bench_function("parse six messages from Redis", |b| { + b.iter_batched( + || { + let mut manager = Manager::try_from(&config::Redis::default()).expect("bench"); + for i in 1..=6 { + manager.redis_conn.add(&input_msg(i)); + } + manager + }, + |mut m| { + black_box({ + let mut i = 1; + while let Ok(Async::Ready(Some(len))) = m.redis_conn.poll_redis(m.unread_idx.1) + { + m.unread_idx = (0, m.unread_idx.1 + len); + while let Ok(Async::Ready(Some((_tl, event)))) = m.poll() { + // println!("Parsing Event #{:03}", i + 1); + // assert_eq!(event, output(i)); + i += 1; + } + } + + assert_eq!(i, 7) + }) + }, + criterion::BatchSize::SmallInput, + ) + }); } criterion_group!(benches, criterion_benchmark); diff --git a/src/main.rs b/src/main.rs index 87ae0a5..070609e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -83,12 +83,16 @@ fn main() -> Result<(), Error> { let manager = shared_manager.clone(); let stream = Interval::new(Instant::now(), poll_freq) .map_err(|e| log::error!("{}", e)) - .for_each( - move |_| match manager.lock().unwrap_or_else(RedisManager::recover).poll() { + .for_each(move |_| { + match manager + .lock() + .unwrap_or_else(RedisManager::recover) + .send_msgs() + { Err(e) => Ok(log::error!("{}", e)), Ok(_) => Ok(()), - }, - ); + } + }); warp::spawn(lazy(move || stream)); warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err)) diff --git a/src/request.rs b/src/request.rs index bd5c435..f753706 100644 --- a/src/request.rs +++ b/src/request.rs @@ -9,6 +9,11 @@ mod subscription; pub use err::{Error, Timeline as TimelineErr}; pub use subscription::{Blocks, Subscription}; pub use timeline::Timeline; + +#[cfg(feature = "bench")] +pub use timeline::{Content, Reach, Stream}; + +#[cfg(not(feature = "bench"))] use timeline::{Content, Reach, Stream}; pub use self::postgres::PgPool; diff --git a/src/request/timeline.rs b/src/request/timeline.rs index 87b96d0..3f5eeed 100644 --- a/src/request/timeline.rs +++ b/src/request/timeline.rs @@ -1,6 +1,7 @@ -pub(crate) use self::inner::{Content, Reach, Scope, Stream, UserData}; +pub use self::inner::{Content, Reach, Scope, Stream}; use super::err::Timeline as Error; use super::query::Query; +pub(crate) use inner::UserData; use lru::LruCache; use warp::reject::Rejection; @@ -11,7 +12,7 @@ mod inner; type Result = std::result::Result; #[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)] -pub struct Timeline(pub(crate) Stream, pub(crate) Reach, pub(crate) Content); +pub struct Timeline(pub Stream, pub Reach, pub Content); impl Timeline { pub fn empty() -> Self { @@ -61,10 +62,7 @@ impl Timeline { }) } - pub(crate) fn from_redis_text( - timeline: &str, - cache: &mut LruCache, - ) -> Result { + pub fn from_redis_text(timeline: &str, cache: &mut LruCache) -> Result { use {Content::*, Error::*, Reach::*, Stream::*}; let mut tag_id = |t: &str| cache.get(&t.to_string()).map_or(Err(BadTag), |id| Ok(*id)); diff --git a/src/request/timeline/inner.rs b/src/request/timeline/inner.rs index 4d2e320..87a3724 100644 --- a/src/request/timeline/inner.rs +++ b/src/request/timeline/inner.rs @@ -5,7 +5,7 @@ use hashbrown::HashSet; use std::convert::TryFrom; #[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)] -pub(crate) enum Stream { +pub enum Stream { User(Id), List(i64), Direct(i64), @@ -15,20 +15,20 @@ pub(crate) enum Stream { } #[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)] -pub(crate) enum Reach { +pub enum Reach { Local, Federated, } #[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)] -pub(crate) enum Content { +pub enum Content { All, Media, Notification, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub(crate) enum Scope { +pub enum Scope { Read, Statuses, Notifications, diff --git a/src/response.rs b/src/response.rs index 85b2898..7d63986 100644 --- a/src/response.rs +++ b/src/response.rs @@ -14,4 +14,6 @@ mod stream; pub use redis::Error; #[cfg(feature = "bench")] -pub use redis::{RedisMsg, RedisParseOutput}; +pub use event::EventKind; +#[cfg(feature = "bench")] +pub use redis::{Manager, RedisMsg, RedisParseOutput}; diff --git a/src/response/event.rs b/src/response/event.rs index 6220981..7c78c26 100644 --- a/src/response/event.rs +++ b/src/response/event.rs @@ -1,9 +1,12 @@ +#[cfg(not(test))] mod checked_event; +#[cfg(test)] +pub mod checked_event; mod dynamic_event; pub mod err; -use self::checked_event::CheckedEvent; -use self::dynamic_event::{DynEvent, EventKind}; +pub use self::checked_event::CheckedEvent; +pub use self::dynamic_event::{DynEvent, EventKind}; use crate::Id; use hashbrown::HashSet; diff --git a/src/response/event/checked_event.rs b/src/response/event/checked_event.rs index 25278ca..a77f4eb 100644 --- a/src/response/event/checked_event.rs +++ b/src/response/event/checked_event.rs @@ -1,5 +1,4 @@ -mod account; - +pub(crate) mod account; mod announcement; mod announcement_reaction; mod conversation; @@ -7,9 +6,9 @@ mod emoji; mod id; mod mention; mod notification; -mod status; -mod tag; -mod visibility; +pub(crate) mod status; +pub(crate) mod tag; +pub(crate) mod visibility; pub(self) use super::Payload; pub(super) use announcement_reaction::AnnouncementReaction; diff --git a/src/response/event/checked_event/account.rs b/src/response/event/checked_event/account.rs index b9a6344..b3ae322 100644 --- a/src/response/event/checked_event/account.rs +++ b/src/response/event/checked_event/account.rs @@ -4,47 +4,47 @@ use serde::{Deserialize, Serialize}; #[serde(deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(super) struct Account { +pub(crate) struct Account { pub id: Id, - pub(super) username: String, + pub(crate) username: String, pub acct: String, - pub(super) url: String, - pub(super) display_name: String, - pub(super) note: String, - pub(super) avatar: String, - pub(super) avatar_static: String, - pub(super) header: String, - pub(super) header_static: String, - pub(super) locked: bool, - pub(super) emojis: Vec, - pub(super) discoverable: Option, // Shouldn't be option? - pub(super) created_at: String, - pub(super) statuses_count: i64, - pub(super) followers_count: i64, - pub(super) following_count: i64, - pub(super) moved: Option, - pub(super) fields: Option>, - pub(super) bot: Option, - pub(super) source: Option, - pub(super) group: Option, // undocumented - pub(super) last_status_at: Option, // undocumented + pub(crate) url: String, + pub(crate) display_name: String, + pub(crate) note: String, + pub(crate) avatar: String, + pub(crate) avatar_static: String, + pub(crate) header: String, + pub(crate) header_static: String, + pub(crate) locked: bool, + pub(crate) emojis: Vec, + pub(crate) discoverable: Option, // Shouldn't be option? + pub(crate) created_at: String, + pub(crate) statuses_count: i64, + pub(crate) followers_count: i64, + pub(crate) following_count: i64, + pub(crate) moved: Option, + pub(crate) fields: Option>, + pub(crate) bot: Option, + pub(crate) source: Option, + pub(crate) group: Option, // undocumented + pub(crate) last_status_at: Option, // undocumented } #[serde(deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(super) struct Field { - pub(super) name: String, - pub(super) value: String, - pub(super) verified_at: Option, +pub(crate) struct Field { + pub(crate) name: String, + pub(crate) value: String, + pub(crate) verified_at: Option, } #[serde(deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(super) struct Source { - pub(super) note: String, - pub(super) fields: Vec, - pub(super) privacy: Option, - pub(super) sensitive: bool, - pub(super) language: String, - pub(super) follow_requests_count: i64, +pub(crate) struct Source { + pub(crate) note: String, + pub(crate) fields: Vec, + pub(crate) privacy: Option, + pub(crate) sensitive: bool, + pub(crate) language: String, + pub(crate) follow_requests_count: i64, } diff --git a/src/response/event/checked_event/emoji.rs b/src/response/event/checked_event/emoji.rs index f274113..80cc3e4 100644 --- a/src/response/event/checked_event/emoji.rs +++ b/src/response/event/checked_event/emoji.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; #[serde(deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(super) struct Emoji { +pub(crate) struct Emoji { shortcode: String, url: String, static_url: String, diff --git a/src/response/event/checked_event/mention.rs b/src/response/event/checked_event/mention.rs index c26ee2c..28c8357 100644 --- a/src/response/event/checked_event/mention.rs +++ b/src/response/event/checked_event/mention.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; #[serde(deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(super) struct Mention { +pub(crate) struct Mention { pub id: Id, username: String, acct: String, diff --git a/src/response/event/checked_event/status.rs b/src/response/event/checked_event/status.rs index c702132..0fc65fd 100644 --- a/src/response/event/checked_event/status.rs +++ b/src/response/event/checked_event/status.rs @@ -1,5 +1,5 @@ mod application; -pub(super) mod attachment; +pub(crate) mod attachment; mod card; mod poll; @@ -22,37 +22,36 @@ use std::string::String; #[serde(deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct Status { - pub(super) id: Id, - pub(super) uri: String, - pub(super) created_at: String, - pub(super) account: Account, - pub(super) content: String, - pub(super) visibility: Visibility, - pub(super) sensitive: bool, - pub(super) spoiler_text: String, - pub(super) media_attachments: Vec, - pub(super) application: Option, // Should be non-optional? - pub(super) mentions: Vec, - pub(super) tags: Vec, - pub(super) emojis: Vec, - pub(super) reblogs_count: i64, - pub(super) favourites_count: i64, - pub(super) replies_count: i64, - pub(super) url: Option, - pub(super) in_reply_to_id: Option, - pub(super) in_reply_to_account_id: Option, - pub(super) reblog: Option>, - pub(super) poll: Option, - pub(super) card: Option, + pub(crate) id: Id, + pub(crate) uri: String, + pub(crate) created_at: String, + pub(crate) account: Account, + pub(crate) content: String, + pub(crate) visibility: Visibility, + pub(crate) sensitive: bool, + pub(crate) spoiler_text: String, + pub(crate) media_attachments: Vec, + pub(crate) application: Option, // Should be non-optional? + pub(crate) mentions: Vec, + pub(crate) tags: Vec, + pub(crate) emojis: Vec, + pub(crate) reblogs_count: i64, + pub(crate) favourites_count: i64, + pub(crate) replies_count: i64, + pub(crate) url: Option, + pub(crate) in_reply_to_id: Option, + pub(crate) in_reply_to_account_id: Option, + pub(crate) reblog: Option>, + pub(crate) poll: Option, + pub(crate) card: Option, pub(crate) language: Option, - - pub(super) text: Option, + pub(crate) text: Option, // ↓↓↓ Only for authorized users - pub(super) favourited: Option, - pub(super) reblogged: Option, - pub(super) muted: Option, - pub(super) bookmarked: Option, - pub(super) pinned: Option, + pub(crate) favourited: Option, + pub(crate) reblogged: Option, + pub(crate) muted: Option, + pub(crate) bookmarked: Option, + pub(crate) pinned: Option, } impl Payload for Status { diff --git a/src/response/event/checked_event/status/application.rs b/src/response/event/checked_event/status/application.rs index 2407b00..f8401d2 100644 --- a/src/response/event/checked_event/status/application.rs +++ b/src/response/event/checked_event/status/application.rs @@ -2,10 +2,10 @@ use serde::{Deserialize, Serialize}; #[serde(deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(in super::super) struct Application { - pub(super) name: String, - pub(super) website: Option, - pub(super) vapid_key: Option, - pub(super) client_id: Option, - pub(super) client_secret: Option, +pub(crate) struct Application { + pub(crate) name: String, + pub(crate) website: Option, + pub(crate) vapid_key: Option, + pub(crate) client_id: Option, + pub(crate) client_secret: Option, } diff --git a/src/response/event/checked_event/status/attachment.rs b/src/response/event/checked_event/status/attachment.rs index ec21de4..9c991f8 100644 --- a/src/response/event/checked_event/status/attachment.rs +++ b/src/response/event/checked_event/status/attachment.rs @@ -2,21 +2,21 @@ use serde::{Deserialize, Serialize}; #[serde(deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(in super::super) struct Attachment { - pub(in super::super) id: String, - pub(in super::super) r#type: AttachmentType, - pub(in super::super) url: String, - pub(in super::super) preview_url: String, - pub(in super::super) remote_url: Option, - pub(in super::super) text_url: Option, - pub(in super::super) meta: Option, // TODO - is this the best type for the API? - pub(in super::super) description: Option, - pub(in super::super) blurhash: Option, +pub(crate) struct Attachment { + pub(crate) id: String, + pub(crate) r#type: AttachmentType, + pub(crate) url: String, + pub(crate) preview_url: String, + pub(crate) remote_url: Option, + pub(crate) text_url: Option, + pub(crate) meta: Option, // TODO - is this the best type for the API? + pub(crate) description: Option, + pub(crate) blurhash: Option, } #[serde(rename_all = "lowercase", deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(in super::super) enum AttachmentType { +pub(crate) enum AttachmentType { Unknown, Image, Gifv, diff --git a/src/response/event/checked_event/status/card.rs b/src/response/event/checked_event/status/card.rs index 5bbcabf..33f78d7 100644 --- a/src/response/event/checked_event/status/card.rs +++ b/src/response/event/checked_event/status/card.rs @@ -2,25 +2,25 @@ use serde::{Deserialize, Serialize}; #[serde(deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(in super::super) struct Card { - pub(super) url: String, - pub(super) title: String, - pub(super) description: String, - pub(super) r#type: CardType, - pub(super) author_name: Option, - pub(super) author_url: Option, - pub(super) provider_name: Option, - pub(super) provider_url: Option, - pub(super) html: Option, - pub(super) width: Option, - pub(super) height: Option, - pub(super) image: Option, - pub(super) embed_url: Option, +pub(crate) struct Card { + pub(crate) url: String, + pub(crate) title: String, + pub(crate) description: String, + pub(crate) r#type: CardType, + pub(crate) author_name: Option, + pub(crate) author_url: Option, + pub(crate) provider_name: Option, + pub(crate) provider_url: Option, + pub(crate) html: Option, + pub(crate) width: Option, + pub(crate) height: Option, + pub(crate) image: Option, + pub(crate) embed_url: Option, } #[serde(rename_all = "lowercase", deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(super) enum CardType { +pub(crate) enum CardType { Link, Photo, Video, diff --git a/src/response/event/checked_event/status/poll.rs b/src/response/event/checked_event/status/poll.rs index 47d018c..a7bc95b 100644 --- a/src/response/event/checked_event/status/poll.rs +++ b/src/response/event/checked_event/status/poll.rs @@ -3,22 +3,22 @@ use serde::{Deserialize, Serialize}; #[serde(deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(in super::super) struct Poll { - pub(super) id: String, - pub(super) expires_at: String, - pub(super) expired: bool, - pub(super) multiple: bool, - pub(super) votes_count: i64, - pub(super) voters_count: Option, - pub(super) voted: Option, - pub(super) own_votes: Option>, - pub(super) options: Vec, - pub(super) emojis: Vec, +pub(crate) struct Poll { + pub(crate) id: String, + pub(crate) expires_at: String, + pub(crate) expired: bool, + pub(crate) multiple: bool, + pub(crate) votes_count: i64, + pub(crate) voters_count: Option, + pub(crate) voted: Option, + pub(crate) own_votes: Option>, + pub(crate) options: Vec, + pub(crate) emojis: Vec, } #[serde(deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(super) struct PollOptions { - pub(super) title: String, - pub(super) votes_count: Option, +pub(crate) struct PollOptions { + pub(crate) title: String, + pub(crate) votes_count: Option, } diff --git a/src/response/event/checked_event/tag.rs b/src/response/event/checked_event/tag.rs index 1b39626..275bbb7 100644 --- a/src/response/event/checked_event/tag.rs +++ b/src/response/event/checked_event/tag.rs @@ -2,16 +2,16 @@ use serde::{Deserialize, Serialize}; #[serde(deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(super) struct Tag { - pub(super) name: String, - pub(super) url: String, - pub(super) history: Option>, +pub(crate) struct Tag { + pub(crate) name: String, + pub(crate) url: String, + pub(crate) history: Option>, } #[serde(deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(super) struct History { - pub(super) day: String, - pub(super) uses: String, - pub(super) accounts: String, +pub(crate) struct History { + pub(crate) day: String, + pub(crate) uses: String, + pub(crate) accounts: String, } diff --git a/src/response/event/checked_event/visibility.rs b/src/response/event/checked_event/visibility.rs index 1334b7a..0ec1a05 100644 --- a/src/response/event/checked_event/visibility.rs +++ b/src/response/event/checked_event/visibility.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; #[serde(rename_all = "lowercase", deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub(super) enum Visibility { +pub(crate) enum Visibility { Public, Unlisted, Private, diff --git a/src/response/event/dynamic_event.rs b/src/response/event/dynamic_event.rs index f65b5fa..53ec243 100644 --- a/src/response/event/dynamic_event.rs +++ b/src/response/event/dynamic_event.rs @@ -18,7 +18,7 @@ pub struct DynEvent { } #[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum EventKind { +pub enum EventKind { Update(DynStatus), NonUpdate, } @@ -30,7 +30,7 @@ impl Default for EventKind { } #[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) struct DynStatus { +pub struct DynStatus { pub(crate) id: Id, pub(crate) username: String, pub(crate) language: Option, diff --git a/src/response/redis.rs b/src/response/redis.rs index 6ce2ea7..6e2281c 100644 --- a/src/response/redis.rs +++ b/src/response/redis.rs @@ -13,7 +13,7 @@ pub use msg::{RedisMsg, RedisParseOutput}; use connection::RedisConnErr; use msg::RedisParseErr; -enum RedisCmd { +pub(crate) enum RedisCmd { Subscribe, Unsubscribe, } diff --git a/src/response/redis/connection.rs b/src/response/redis/connection.rs index 8bf0697..b68d7c4 100644 --- a/src/response/redis/connection.rs +++ b/src/response/redis/connection.rs @@ -1,149 +1,227 @@ mod err; -pub(crate) use err::RedisConnErr; +pub(super) use connection::*; +pub use err::RedisConnErr; +#[cfg(any(test, feature = "bench"))] +pub(self) use mock_connection as connection; -use super::Error as ManagerErr; -use super::RedisCmd; -use crate::config::Redis; -use crate::request::Timeline; +#[cfg(not(any(test, feature = "bench")))] +mod connection { + use super::super::Error as ManagerErr; + use super::super::RedisCmd; + use super::err::RedisConnErr; + use crate::config::Redis; + use crate::request::Timeline; -use futures::{Async, Poll}; -use lru::LruCache; -use std::io::{self, Read, Write}; -use std::net::TcpStream; -use std::time::Duration; + use futures::{Async, Poll}; + use lru::LruCache; + use std::io::{self, Read, Write}; + use std::net::TcpStream; + use std::time::Duration; -type Result = std::result::Result; + type Result = std::result::Result; -#[derive(Debug)] -pub(super) struct RedisConn { - primary: TcpStream, - secondary: TcpStream, - pub(super) namespace: Option, - // TODO: eventually, it might make sense to have Mastodon publish to timelines with - // the tag number instead of the tag name. This would save us from dealing - // with a cache here and would be consistent with how lists/users are handled. - pub(super) tag_name_cache: LruCache, - pub(super) input: Vec, -} - -impl RedisConn { - pub(super) fn new(redis_cfg: &Redis) -> Result { - let addr = [&*redis_cfg.host, ":", &*redis_cfg.port.to_string()].concat(); - - let conn = Self::new_connection(&addr, redis_cfg.password.as_ref())?; - conn.set_nonblocking(true) - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - Ok(Self { - primary: conn, - secondary: Self::new_connection(&addr, redis_cfg.password.as_ref())?, - tag_name_cache: LruCache::new(1000), - namespace: redis_cfg.namespace.clone().0, - input: vec![0; 4096 * 4], - }) + #[derive(Debug)] + pub struct RedisConn { + primary: TcpStream, + secondary: TcpStream, + pub(in super::super) namespace: Option, + // TODO: eventually, it might make sense to have Mastodon publish to timelines with + // the tag number instead of the tag name. This would save us from dealing + // with a cache here and would be consistent with how lists/users are handled. + pub(in super::super) tag_name_cache: LruCache, + pub(in super::super) input: Vec, } - pub(super) fn poll_redis(&mut self, start: usize) -> Poll { - const BLOCK: usize = 4096 * 2; - if self.input.len() < start + BLOCK { - self.input.resize(self.input.len() * 2, 0); - log::info!("Resizing input buffer to {} KiB.", self.input.len() / 1024); - // log::info!("Current buffer: {}", String::from_utf8_lossy(&self.input)); + + impl RedisConn { + pub(in super::super) fn new(redis_cfg: &Redis) -> Result { + let addr = [&*redis_cfg.host, ":", &*redis_cfg.port.to_string()].concat(); + + let conn = Self::new_connection(&addr, redis_cfg.password.as_ref())?; + conn.set_nonblocking(true) + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + Ok(Self { + primary: conn, + secondary: Self::new_connection(&addr, redis_cfg.password.as_ref())?, + tag_name_cache: LruCache::new(1000), + namespace: redis_cfg.namespace.clone().0, + input: vec![0; 4096 * 4], + }) } - use Async::*; - match self.primary.read(&mut self.input[start..start + BLOCK]) { - Ok(n) => Ok(Ready(n)), - Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => Ok(NotReady), - Err(e) => { - Ready(log::error!("{}", e)); - Ok(Ready(0)) + pub(in super::super) fn poll_redis(&mut self, i: usize) -> Poll, ManagerErr> { + const BLOCK: usize = 4096 * 2; + if self.input.len() < i + BLOCK { + self.input.resize(self.input.len() * 2, 0); + log::info!("Resizing input buffer to {} KiB.", self.input.len() / 1024); + // log::info!("Current buffer: {}", String::from_utf8_lossy(&self.input)); + } + + use Async::*; + match self.primary.read(&mut self.input[i..i + BLOCK]) { + Ok(n) if n == 0 => Ok(Ready(None)), + Ok(n) => Ok(Ready(Some(n))), + Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => Ok(NotReady), + Err(e) => { + Ready(log::error!("{}", e)); + Ok(Ready(None)) + } + } + } + + pub(crate) fn send_cmd(&mut self, cmd: RedisCmd, timelines: &[Timeline]) -> Result<()> { + let namespace = self.namespace.take(); + let timelines: Result> = timelines + .iter() + .map(|tl| { + let hashtag = tl.tag().and_then(|id| self.tag_name_cache.get(&id)); + match &namespace { + Some(ns) => Ok(format!("{}:{}", ns, tl.to_redis_raw_timeline(hashtag)?)), + None => Ok(tl.to_redis_raw_timeline(hashtag)?), + } + }) + .collect(); + + let (primary_cmd, secondary_cmd) = cmd.into_sendable(&timelines?[..]); + self.primary.write_all(&primary_cmd)?; + + // We also need to set a key to tell the Puma server that we've subscribed or + // unsubscribed to the channel because it stops publishing updates when it thinks + // no one is subscribed. + // (Documented in [PR #3278](https://github.com/tootsuite/mastodon/pull/3278)) + // Question: why can't the Puma server just use NUMSUB for this? + self.secondary.write_all(&secondary_cmd)?; + Ok(()) + } + + fn new_connection(addr: &str, pass: Option<&String>) -> Result { + let mut conn = TcpStream::connect(&addr)?; + if let Some(password) = pass { + Self::auth_connection(&mut conn, &addr, password)?; + } + + Self::validate_connection(&mut conn, &addr)?; + conn.set_read_timeout(Some(Duration::from_millis(10))) + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + Self::set_connection_name(&mut conn, &addr)?; + Ok(conn) + } + + fn auth_connection(conn: &mut TcpStream, addr: &str, pass: &str) -> Result<()> { + conn.write_all( + &[ + b"*2\r\n$4\r\nauth\r\n$", + pass.len().to_string().as_bytes(), + b"\r\n", + pass.as_bytes(), + b"\r\n", + ] + .concat(), + ) + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + let mut buffer = vec![0_u8; 5]; + conn.read_exact(&mut buffer) + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + if String::from_utf8_lossy(&buffer) != "+OK\r\n" { + Err(RedisConnErr::IncorrectPassword(pass.to_string()))? + } + Ok(()) + } + + fn validate_connection(conn: &mut TcpStream, addr: &str) -> Result<()> { + conn.write_all(b"PING\r\n") + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + let mut buffer = vec![0_u8; 100]; + conn.read(&mut buffer) + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + let reply = String::from_utf8_lossy(&buffer); + match &*reply { + r if r.starts_with("+PONG\r\n") => Ok(()), + r if r.starts_with("-NOAUTH") => Err(RedisConnErr::MissingPassword), + r if r.starts_with("HTTP/1.") => Err(RedisConnErr::NotRedis(addr.to_string())), + _ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())), + } + } + + fn set_connection_name(conn: &mut TcpStream, addr: &str) -> Result<()> { + conn.write_all(b"*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$8\r\nflodgatt\r\n") + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + let mut buffer = vec![0_u8; 100]; + conn.read(&mut buffer) + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + let reply = String::from_utf8_lossy(&buffer); + match &*reply { + r if r.starts_with("+OK\r\n") => Ok(()), + _ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())), } } } +} +#[cfg(any(test, feature = "bench"))] +mod mock_connection { + use super::super::Error as ManagerErr; + use super::super::RedisCmd; + use super::err::RedisConnErr; + use crate::config::Redis; + use crate::request::Timeline; - pub(crate) fn send_cmd(&mut self, cmd: RedisCmd, timelines: &[Timeline]) -> Result<()> { - let namespace = self.namespace.take(); - let timelines: Result> = timelines - .iter() - .map(|tl| { - let hashtag = tl.tag().and_then(|id| self.tag_name_cache.get(&id)); - match &namespace { - Some(ns) => Ok(format!("{}:{}", ns, tl.to_redis_raw_timeline(hashtag)?)), - None => Ok(tl.to_redis_raw_timeline(hashtag)?), - } + use futures::{Async, Poll}; + use lru::LruCache; + use std::collections::VecDeque; + + type Result = std::result::Result; + + #[derive(Debug)] + pub struct RedisConn { + pub(in super::super) namespace: Option, + pub(in super::super) tag_name_cache: LruCache, + pub(in super::super) input: Vec, + pub(in super::super) test_input: VecDeque, + } + + impl RedisConn { + pub(in super::super) fn new(redis_cfg: &Redis) -> Result { + Ok(Self { + tag_name_cache: LruCache::new(1000), + namespace: redis_cfg.namespace.clone().0, + input: vec![0; 4096 * 4], + test_input: VecDeque::new(), }) - .collect(); - - let (primary_cmd, secondary_cmd) = cmd.into_sendable(&timelines?[..]); - self.primary.write_all(&primary_cmd)?; - - // We also need to set a key to tell the Puma server that we've subscribed or - // unsubscribed to the channel because it stops publishing updates when it thinks - // no one is subscribed. - // (Documented in [PR #3278](https://github.com/tootsuite/mastodon/pull/3278)) - // Question: why can't the Puma server just use NUMSUB for this? - self.secondary.write_all(&secondary_cmd)?; - Ok(()) - } - - fn new_connection(addr: &str, pass: Option<&String>) -> Result { - let mut conn = TcpStream::connect(&addr)?; - if let Some(password) = pass { - Self::auth_connection(&mut conn, &addr, password)?; } - Self::validate_connection(&mut conn, &addr)?; - conn.set_read_timeout(Some(Duration::from_millis(10))) - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - Self::set_connection_name(&mut conn, &addr)?; - Ok(conn) - } + pub fn poll_redis(&mut self, start: usize) -> Poll, ManagerErr> { + const BLOCK: usize = 4096 * 2; + if self.input.len() < start + BLOCK { + self.input.resize(self.input.len() * 2, 0); + log::info!("Resizing input buffer to {} KiB.", self.input.len() / 1024); + } - fn auth_connection(conn: &mut TcpStream, addr: &str, pass: &str) -> Result<()> { - conn.write_all( - &[ - b"*2\r\n$4\r\nauth\r\n$", - pass.len().to_string().as_bytes(), - b"\r\n", - pass.as_bytes(), - b"\r\n", - ] - .concat(), - ) - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - let mut buffer = vec![0_u8; 5]; - conn.read_exact(&mut buffer) - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - if String::from_utf8_lossy(&buffer) != "+OK\r\n" { - Err(RedisConnErr::IncorrectPassword(pass.to_string()))? + for i in 0..BLOCK { + if let Some(byte) = self.test_input.pop_front() { + self.input[start + i] = byte; + } else if i > 0 { + return Ok(Async::Ready(Some(i))); + } else { + return Ok(Async::Ready(None)); + } + } + Ok(Async::Ready(Some(BLOCK))) } - Ok(()) - } - fn validate_connection(conn: &mut TcpStream, addr: &str) -> Result<()> { - conn.write_all(b"PING\r\n") - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - let mut buffer = vec![0_u8; 100]; - conn.read(&mut buffer) - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - let reply = String::from_utf8_lossy(&buffer); - match &*reply { - r if r.starts_with("+PONG\r\n") => Ok(()), - r if r.starts_with("-NOAUTH") => Err(RedisConnErr::MissingPassword), - r if r.starts_with("HTTP/1.") => Err(RedisConnErr::NotRedis(addr.to_string())), - _ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())), + pub fn add(&mut self, input: &[u8]) { + for byte in input { + self.test_input.push_back(*byte) + } } - } + pub(crate) fn send_cmd(&mut self, cmd: RedisCmd, timelines: &[Timeline]) -> Result<()> { + // stub - does nothing; silences some unused-code warnings + let timelines: Result> = timelines + .iter() + .map(|tl| Ok(tl.to_redis_raw_timeline(None).expect("test"))) + .collect(); - fn set_connection_name(conn: &mut TcpStream, addr: &str) -> Result<()> { - conn.write_all(b"*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$8\r\nflodgatt\r\n") - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - let mut buffer = vec![0_u8; 100]; - conn.read(&mut buffer) - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - let reply = String::from_utf8_lossy(&buffer); - match &*reply { - r if r.starts_with("+OK\r\n") => Ok(()), - _ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())), + let _ = cmd.into_sendable(&timelines?); + + Ok(()) } } } diff --git a/src/response/redis/connection/err.rs b/src/response/redis/connection/err.rs index da98940..a68a890 100644 --- a/src/response/redis/connection/err.rs +++ b/src/response/redis/connection/err.rs @@ -13,6 +13,7 @@ pub enum RedisConnErr { } impl RedisConnErr { + #[allow(unused)] // Not used during testing due to conditional compilation pub(super) fn with_addr>(address: T, inner: std::io::Error) -> Self { Self::ConnectionErr { addr: address.as_ref().to_string(), diff --git a/src/response/redis/manager.rs b/src/response/redis/manager.rs index aaade1f..ab878c1 100644 --- a/src/response/redis/manager.rs +++ b/src/response/redis/manager.rs @@ -25,73 +25,111 @@ type EventChannel = Sender>; /// The item that streams from Redis and is polled by the `ClientAgent` pub struct Manager { - redis_conn: RedisConn, + pub redis_conn: RedisConn, timelines: HashMap>, ping_time: Instant, channel_id: u32, - unread_idx: (usize, usize), + pub unread_idx: (usize, usize), tag_id_cache: LruCache, } impl Stream for Manager { - type Item = (); + type Item = (Timeline, Arc); type Error = Error; - fn poll(&mut self) -> Poll, Error> { - if self.ping_time.elapsed() > Duration::from_secs(30) { - self.send_pings()? - } - - while let Async::Ready(msg_len) = self.redis_conn.poll_redis(self.unread_idx.1)? { - self.unread_idx = (0, self.unread_idx.1 + msg_len); - - let input = &self.redis_conn.input[..self.unread_idx.1]; - let mut unread = str::from_utf8(input).unwrap_or_else(|e| { - str::from_utf8(input.split_at(e.valid_up_to()).0).expect("guaranteed by `split_at`") + fn poll(&mut self) -> Poll, Error> { + let input = &self.redis_conn.input[self.unread_idx.0..self.unread_idx.1]; + let (valid, invalid) = str::from_utf8(input) + .map(|v| (v, &b""[..])) + .unwrap_or_else(|e| { + // NOTE - this bounds check occurs more often than necessary; it could occur only when + // polling Redis. However, benchmarking with Criterion shows it to be *very* + // inexpensive (<1 us) and thus not worth removing (doing so would require `unsafe`). + let (valid, invalid) = input.split_at(e.valid_up_to()); + (str::from_utf8(valid).expect("split_at"), invalid) }); - while !unread.is_empty() { - use RedisParseOutput::*; - match RedisParseOutput::try_from(unread) { - Ok(Msg(msg)) => { - // If we get a message and it matches the redis_namespace, get the msg's - // Event and send it to all channels matching the msg's Timeline - if let Some(tl) = msg.timeline_matching_ns(&self.redis_conn.namespace) { - let tl = Timeline::from_redis_text(tl, &mut self.tag_id_cache)?; - let event: Arc = Arc::new(msg.event_txt.try_into()?); - if let Some(channels) = self.timelines.get_mut(&tl) { - for channel in channels.values_mut() { - if let Ok(Async::NotReady) = channel.poll_ready() { - log::warn!("{:?} channel full\ncan't send:{:?}", tl, event); - return Ok(Async::NotReady); - } - let _ = channel.try_send(event.clone()); // err just means channel will be closed - } - } - } - unread = msg.leftover_input; - self.unread_idx.0 = self.unread_idx.1 - unread.len(); + if !valid.is_empty() { + use RedisParseOutput::*; + match RedisParseOutput::try_from(valid) { + Ok(Msg(msg)) => { + // If we get a message and it matches the redis_namespace, get the msg's + // Event and send it to all channels matching the msg's Timeline + if let Some(tl) = msg.timeline_matching_ns(&self.redis_conn.namespace) { + self.unread_idx.0 = + self.unread_idx.1 - msg.leftover_input.len() - invalid.len(); + + let tl = Timeline::from_redis_text(tl, &mut self.tag_id_cache)?; + let event: Arc = Arc::new(msg.event_txt.try_into()?); + Ok(Async::Ready(Some((tl, event)))) + } else { + Ok(Async::Ready(None)) } - Ok(NonMsg(leftover_input)) => { - unread = leftover_input; - self.unread_idx.0 = self.unread_idx.1 - unread.len(); - } - Err(RedisParseErr::Incomplete) => { - self.copy_partial_msg(); - break; - } - Err(e) => Err(Error::RedisParseErr(e, unread.to_string()))?, - }; - } - if self.unread_idx.0 == self.unread_idx.1 { - self.unread_idx = (0, 0) + } + Ok(NonMsg(leftover_input)) => { + self.unread_idx.0 = self.unread_idx.1 - leftover_input.len(); + Ok(Async::Ready(None)) + } + Err(RedisParseErr::Incomplete) => { + self.copy_partial_msg(); + Ok(Async::NotReady) + } + Err(e) => Err(Error::RedisParseErr(e, valid.to_string()))?, } + } else { + self.unread_idx = (0, 0); + Ok(Async::NotReady) } - Ok(Async::Ready(Some(()))) } } impl Manager { + // untested + pub fn send_msgs(&mut self) -> Poll<(), Error> { + if self.ping_time.elapsed() > Duration::from_secs(30) { + self.send_pings()? + } + + while let Ok(Async::Ready(Some(msg_len))) = self.redis_conn.poll_redis(self.unread_idx.1) { + self.unread_idx.1 += msg_len; + + while let Ok(Async::Ready(msg)) = self.poll() { + if let Some((tl, event)) = msg { + for channel in self.timelines.entry(tl).or_default().values_mut() { + if let Ok(Async::NotReady) = channel.poll_ready() { + log::warn!("{:?} channel full\ncan't send:{:?}", tl, event); + self.rewind_to_prev_msg(); + return Ok(Async::NotReady); + } + + let _ = channel.try_send(event.clone()); // err just means channel will be closed + } + } + } + } + Ok(Async::Ready(())) + } + + fn rewind_to_prev_msg(&mut self) { + self.unread_idx.0 = loop { + let input = &self.redis_conn.input[..self.unread_idx.0]; + let input = str::from_utf8(input).unwrap_or_else(|e| { + str::from_utf8(input.split_at(e.valid_up_to()).0).expect("guaranteed by `split_at`") + }); + + let index = if let Some(i) = input.rfind("\r\n*") { + i + "\r\n".len() + } else { + 0 + }; + self.unread_idx.0 = index; + + if let Ok(Async::Ready(Some(_))) = self.poll() { + break index; + } + } + } + fn copy_partial_msg(&mut self) { if self.unread_idx.0 == 0 { // msg already first; no copying needed @@ -107,7 +145,7 @@ impl Manager { self.redis_conn.input = self.redis_conn.input[self.unread_idx.0..].into(); } self.unread_idx = (0, self.unread_idx.1 - self.unread_idx.0); - dbg!(&self.unread_idx); + &self.unread_idx; } /// Create a new `Manager`, with its own Redis connections (but no active subscriptions). pub fn try_from(redis_cfg: &config::Redis) -> Result { @@ -208,3 +246,6 @@ impl Manager { .collect() } } + +#[cfg(test)] +mod test; diff --git a/src/response/redis/manager/test.rs b/src/response/redis/manager/test.rs new file mode 100644 index 0000000..749aeef --- /dev/null +++ b/src/response/redis/manager/test.rs @@ -0,0 +1,245 @@ +use super::*; +use crate::config; +use crate::response::event::checked_event::{ + account::{Account, Field}, + status::attachment::{Attachment, AttachmentType::*}, + status::Status, + tag::Tag, + visibility::Visibility::*, + CheckedEvent::*, +}; +use crate::Id; +use serde_json::json; +use std::fs; + +type TestResult = std::result::Result<(), Box>; + +fn input(i: usize) -> Vec { + fs::read_to_string(format!("test_data/redis_input_{:03}.resp", i)) + .expect("test input not found") + .as_bytes() + .to_vec() +} +fn output(i: usize) -> Arc { + vec![ + Arc::new(include!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/test_data/event_001.rs" + ))), + Arc::new(include!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/test_data/event_002.rs" + ))), + Arc::new(include!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/test_data/event_003.rs" + ))), + Arc::new(include!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/test_data/event_004.rs" + ))), + Arc::new(include!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/test_data/event_005.rs" + ))), + Arc::new(include!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/test_data/event_006.rs" + ))), + ][i] + .clone() +} + +#[test] +fn manager_poll_matches_six_events() -> TestResult { + let mut manager = Manager::try_from(&config::Redis::default())?; + for i in 1..=6 { + manager.redis_conn.add(&input(i)); + } + let mut i = 0; + while let Ok(Async::Ready(Some(len))) = manager.redis_conn.poll_redis(manager.unread_idx.1) { + manager.unread_idx = (0, manager.unread_idx.1 + len); + while let Ok(Async::Ready(Some((_tl, event)))) = manager.poll() { + println!("Parsing Event #{:03}", i + 1); + assert_eq!(event, output(i)); + i += 1; + } + } + Ok(assert_eq!(i, 6)) +} + +#[test] +fn manager_poll_handles_non_utf8() -> TestResult { + let mut manager = Manager::try_from(&config::Redis::default())?; + let mut input_txt = Vec::new(); + for i in 1..=6 { + input_txt.extend_from_slice(&input(i)) + } + + let invalid_idx = str::from_utf8(&input_txt)? + .chars() + .take_while(|char| char.len_utf8() == 1) + .collect::>() + .len() + + 1; + + manager.redis_conn.add(&input_txt[..invalid_idx]); + + let mut i = 0; + while let Ok(Async::Ready(Some(len))) = manager.redis_conn.poll_redis(manager.unread_idx.1) { + manager.unread_idx.1 += len; + while let Ok(Async::Ready(Some((_tl, event)))) = manager.poll() { + println!("Parsing Event #{:03}", i + 1); + assert_eq!(event, output(i)); + i += 1; + } + } + + manager.redis_conn.add(&input_txt[invalid_idx..]); + + while let Ok(Async::Ready(Some(len))) = manager.redis_conn.poll_redis(manager.unread_idx.1) { + manager.unread_idx.1 += len; + while let Ok(Async::Ready(Some((_tl, event)))) = manager.poll() { + println!("Parsing Event #{:03}", i + 1); + assert_eq!(event, output(i)); + i += 1; + } + } + + Ok(assert_eq!(i, 6)) +} + +#[test] +fn manager_poll_matches_six_events_in_batches() -> TestResult { + let mut manager = Manager::try_from(&config::Redis::default())?; + for i in 1..=3 { + manager.redis_conn.add(&input(i)) + } + let mut i = 0; + while let Ok(Async::Ready(Some(len))) = manager.redis_conn.poll_redis(manager.unread_idx.1) { + manager.unread_idx.1 += len; + while let Ok(Async::Ready(Some((_tl, event)))) = manager.poll() { + println!("Parsing Event #{:03}", i + 1); + assert_eq!(event, output(i)); + i += 1; + } + } + + for i in 4..=6 { + manager.redis_conn.add(&input(i)); + } + while let Ok(Async::Ready(Some(len))) = manager.redis_conn.poll_redis(manager.unread_idx.1) { + manager.unread_idx.1 += len; + while let Ok(Async::Ready(Some((_tl, event)))) = manager.poll() { + println!("Parsing Event #{:03}", i + 1); + assert_eq!(event, output(i)); + i += 1; + } + } + Ok(assert_eq!(i, 6)) +} + +#[test] +fn manager_poll_handles_non_events() -> TestResult { + let mut manager = Manager::try_from(&config::Redis::default())?; + for i in 1..=6 { + manager.redis_conn.add(&input(i)); + manager + .redis_conn + .add(b"*3\r\n$9\r\nsubscribe\r\n$12\r\ntimeline:308\r\n:1\r\n"); + } + let mut i = 0; + + while let Ok(Async::Ready(Some(len))) = manager.redis_conn.poll_redis(manager.unread_idx.1) { + manager.unread_idx.1 += len; + while let Ok(Async::Ready(msg)) = manager.poll() { + if let Some((_tl, event)) = msg { + println!("Parsing Event #{:03}", i + 1); + assert_eq!(event, output(i)); + i += 1; + } + } + } + Ok(assert_eq!(i, 6)) +} + +#[test] +fn manager_poll_handles_partial_events() -> TestResult { + let mut manager = Manager::try_from(&config::Redis::default())?; + for i in 1..=3 { + manager.redis_conn.add(&input(i)); + } + manager.redis_conn.add(&input(4)[..50]); + let mut i = 0; + + while let Ok(Async::Ready(Some(len))) = manager.redis_conn.poll_redis(manager.unread_idx.1) { + manager.unread_idx.1 += len; + while let Ok(Async::Ready(msg)) = manager.poll() { + if let Some((_tl, event)) = msg { + println!("Parsing Event #{:03}", i + 1); + assert_eq!(event, output(i)); + i += 1; + } + } + } + assert_eq!(i, 3); + + manager.redis_conn.add(&input(4)[50..]); + manager.redis_conn.add(&input(5)); + manager.redis_conn.add(&input(6)); + while let Ok(Async::Ready(Some(len))) = manager.redis_conn.poll_redis(manager.unread_idx.1) { + manager.unread_idx.1 += len; + while let Ok(Async::Ready(msg)) = manager.poll() { + if let Some((_tl, event)) = msg { + println!("Parsing Event #{:03}", i + 1); + assert_eq!(event, output(i)); + i += 1; + } + } + } + + Ok(assert_eq!(i, 6)) +} + +#[test] +fn manager_poll_handles_full_channel() -> TestResult { + let mut manager = Manager::try_from(&config::Redis::default())?; + for i in 1..=6 { + manager.redis_conn.add(&input(i)); + } + let (mut i, channel_full) = (0, 3); + 'outer: loop { + while let Ok(Async::Ready(Some(n))) = manager.redis_conn.poll_redis(manager.unread_idx.1) { + manager.unread_idx.1 += n; + while let Ok(Async::Ready(msg)) = manager.poll() { + if let Some((_tl, event)) = msg { + println!("Parsing Event #{:03}", i + 1); + assert_eq!(event, output(i)); + i += 1; + } + // Simulates a `ChannelFull` error after sending `channel_full` msgs + if i == channel_full { + break 'outer; + } + } + } + } + + let _rewind = (|| { + manager.rewind_to_prev_msg(); + i -= 1; + })(); + + while let Ok(Async::Ready(Some(len))) = manager.redis_conn.poll_redis(manager.unread_idx.1) { + manager.unread_idx.1 += len; + while let Ok(Async::Ready(msg)) = manager.poll() { + if let Some((_tl, event)) = msg { + println!("Parsing Event #{:03}", i + 1); + assert_eq!(event, output(i)); + i += 1; + } + } + } + + Ok(assert_eq!(i, 6)) +} diff --git a/src/response/redis/msg.rs b/src/response/redis/msg.rs index e2cdbd9..7b9d16b 100644 --- a/src/response/redis/msg.rs +++ b/src/response/redis/msg.rs @@ -27,15 +27,15 @@ use std::str; mod err; #[derive(Debug, Clone, PartialEq)] -pub(crate) enum RedisParseOutput<'a> { +pub enum RedisParseOutput<'a> { Msg(RedisMsg<'a>), NonMsg(&'a str), } #[derive(Debug, Clone, PartialEq)] -pub(crate) struct RedisMsg<'a> { - pub(crate) timeline_txt: &'a str, - pub(crate) event_txt: &'a str, +pub struct RedisMsg<'a> { + pub timeline_txt: &'a str, + pub event_txt: &'a str, pub(crate) leftover_input: &'a str, } diff --git a/src/response/redis/msg/err.rs b/src/response/redis/msg/err.rs index 95c19db..0c655b0 100644 --- a/src/response/redis/msg/err.rs +++ b/src/response/redis/msg/err.rs @@ -15,8 +15,8 @@ impl fmt::Display for RedisParseErr { use RedisParseErr::*; let msg = match self { Incomplete => "The input from Redis does not form a complete message, likely because \ - the input buffer filled partway through a message. Save this input \ - and try again with additional input from Redis." + the input buffer filled partway through a message. Save this input and try again \ + with additional input from Redis." .to_string(), InvalidNumber(parse_int_err) => format!( "Redis indicated that an item would be a number, but it could not be parsed: {}", @@ -25,20 +25,19 @@ impl fmt::Display for RedisParseErr { InvalidLineStart(line_start_char) => format!( "A line from Redis started with `{}`, which is not a valid character to indicate \ - the type of the Redis line.", + the type of the Redis line.", line_start_char ), - InvalidLineEnd(len, line) => format!( // TODO - FIXME - "A Redis line did not have the promised length of {}. \ - The line is: {}", + InvalidLineEnd(len, line) => format!( + "A Redis line did not have the promised length of {}. The line is: {}", len, line ), IncorrectRedisType => "Received a Redis type that is not supported in this context. \ - Flodgatt expects each message from Redis to be a Redis array \ - consisting of bulk strings or integers." + Flodgatt expects each message from Redis to be a Redis array consisting of bulk \ + strings or integers." .to_string(), MissingField => "Redis input was missing a field Flodgatt expected (e.g., a `message` \ - without a payload line)" + without a payload line)" .to_string(), }; write!(f, "{}", msg)