Reorganize code, pt2 (#112)

* Cleanup RedisMsg parsing [WIP]

* Add tests to Redis parsing

* WIP RedisMsg refactor

Committing WIP before trying a different approach

* WIP

* Refactor RedisConn and Receiver

* Finish second reorganization
This commit is contained in:
Daniel Sockwell 2020-03-30 18:54:00 -04:00 committed by GitHub
parent 0acbde3eee
commit 5965a514fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1187 additions and 989 deletions

View File

@ -1,7 +1,7 @@
[package] [package]
name = "flodgatt" name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.6.5" version = "0.6.6"
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"] authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018" edition = "2018"

View File

@ -1,60 +1,46 @@
use criterion::black_box; use criterion::{black_box, criterion_group, criterion_main, Criterion};
use criterion::criterion_group; use flodgatt::{
use criterion::criterion_main; messages::Event,
use criterion::Criterion; parse_client_request::{Content::*, Reach::*, Stream::*, Timeline},
redis_to_client_stream::redis_msg::{RedisMsg, RedisParseOutput},
};
use lru::LruCache;
use std::convert::TryFrom;
/// Parse using Flodgatt's current functions fn parse_long_redis_input<'a>(input: &'a str) -> RedisMsg<'a> {
mod flodgatt_parse_event { if let RedisParseOutput::Msg(msg) = RedisParseOutput::try_from(input).unwrap() {
use flodgatt::{ assert_eq!(msg.timeline_txt, "timeline:1");
messages::Event, msg
parse_client_request::Timeline, } else {
redis_to_client_stream::{process_messages, MessageQueues, MsgQueue}, panic!()
};
use lru::LruCache;
use std::collections::HashMap;
use uuid::Uuid;
/// One-time setup, not included in testing time.
pub fn setup() -> (LruCache<String, i64>, MessageQueues, Uuid, Timeline) {
let mut cache: LruCache<String, i64> = LruCache::new(1000);
let mut queues_map = HashMap::new();
let id = Uuid::default();
let timeline =
Timeline::from_redis_raw_timeline("timeline:1", &mut cache, &None).expect("In test");
queues_map.insert(id, MsgQueue::new(timeline));
let queues = MessageQueues(queues_map);
(cache, queues, id, timeline)
}
pub fn to_event_struct(
input: String,
mut cache: &mut LruCache<String, i64>,
mut queues: &mut MessageQueues,
id: Uuid,
timeline: Timeline,
) -> Event {
process_messages(&input, &mut cache, &mut None, &mut queues);
queues
.oldest_msg_in_target_queue(id, timeline)
.expect("In test")
} }
} }
fn parse_to_timeline(msg: RedisMsg) -> Timeline {
let tl = Timeline::from_redis_text(msg.timeline_txt, &mut LruCache::new(1000), &None).unwrap();
assert_eq!(tl, Timeline(User(1), Federated, All));
tl
}
fn parse_to_event(msg: RedisMsg) -> Event {
serde_json::from_str(msg.event_txt).unwrap()
}
fn criterion_benchmark(c: &mut Criterion) { fn criterion_benchmark(c: &mut Criterion) {
let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS.to_string(); let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS;
let mut group = c.benchmark_group("Parse redis RESP array"); let mut group = c.benchmark_group("Parse redis RESP array");
let (mut cache, mut queues, id, timeline) = flodgatt_parse_event::setup(); group.bench_function("parse redis input to RedisMsg", |b| {
group.bench_function("parse to Event using Flodgatt functions", |b| { b.iter(|| black_box(parse_long_redis_input(input)))
b.iter(|| { });
black_box(flodgatt_parse_event::to_event_struct(
black_box(input.clone()), let msg = parse_long_redis_input(input);
black_box(&mut cache), group.bench_function("parse RedisMsg to Timeline", |b| {
black_box(&mut queues), b.iter(|| black_box(parse_to_timeline(msg.clone())))
black_box(id), });
black_box(timeline),
)) group.bench_function("parse RedisMsg to Event", |b| {
}) b.iter(|| black_box(parse_to_event(msg.clone())))
}); });
} }

View File

@ -1,6 +1,6 @@
use std::fmt::Display; use std::{error::Error, fmt};
pub fn die_with_msg(msg: impl Display) -> ! { pub fn die_with_msg(msg: impl fmt::Display) -> ! {
eprintln!("FATAL ERROR: {}", msg); eprintln!("FATAL ERROR: {}", msg);
std::process::exit(1); std::process::exit(1);
} }
@ -16,7 +16,50 @@ macro_rules! log_fatal {
#[derive(Debug)] #[derive(Debug)]
pub enum RedisParseErr { pub enum RedisParseErr {
Incomplete, Incomplete,
Unrecoverable, InvalidNumber(std::num::ParseIntError),
NonNumericInput,
InvalidLineStart(String),
InvalidLineEnd,
IncorrectRedisType,
MissingField,
UnsupportedTimeline,
UnsupportedEvent(serde_json::Error),
}
impl fmt::Display for RedisParseErr {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "{}", match self {
Self::Incomplete => "The input from Redis does not form a complete message, likely because the input buffer filled partway through a message. Save this input and try again with additional input from Redis.".to_string(),
Self::InvalidNumber(e) => format!( "Redis input cannot be parsed: {}", e),
Self::NonNumericInput => "Received non-numeric input when expecting a Redis number".to_string(),
Self::InvalidLineStart(s) => format!("Got `{}` as a line start from Redis", s),
Self::InvalidLineEnd => "Redis input ended before promised length".to_string(),
Self::IncorrectRedisType => "Received a non-array when expecting a Redis array".to_string(),
Self::MissingField => "Redis input was missing a required field".to_string(),
Self::UnsupportedTimeline => "The raw timeline received from Redis could not be parsed into a supported timeline".to_string(),
Self::UnsupportedEvent(e) => format!("The event text from Redis could not be parsed into a valid event: {}", e)
})
}
}
impl Error for RedisParseErr {}
impl From<std::num::ParseIntError> for RedisParseErr {
fn from(error: std::num::ParseIntError) -> Self {
Self::InvalidNumber(error)
}
}
impl From<serde_json::Error> for RedisParseErr {
fn from(error: serde_json::Error) -> Self {
Self::UnsupportedEvent(error)
}
}
impl From<TimelineErr> for RedisParseErr {
fn from(_: TimelineErr) -> Self {
Self::UnsupportedTimeline
}
} }
#[derive(Debug)] #[derive(Debug)]

File diff suppressed because one or more lines are too long

View File

@ -9,5 +9,5 @@ pub use self::postgres::PgPool;
// TODO consider whether we can remove `Stream` from public API // TODO consider whether we can remove `Stream` from public API
pub use subscription::{Stream, Subscription, Timeline}; pub use subscription::{Stream, Subscription, Timeline};
#[cfg(test)] //#[cfg(test)]
pub use subscription::{Content, Reach}; pub use subscription::{Content, Reach};

View File

@ -196,109 +196,120 @@ impl Timeline {
} }
} }
pub fn from_redis_raw_timeline( pub fn from_redis_text(
timeline: &str, timeline: &str,
cache: &mut LruCache<String, i64>, cache: &mut LruCache<String, i64>,
namespace: &Option<String>,
) -> Result<Self, TimelineErr> { ) -> Result<Self, TimelineErr> {
use crate::err::TimelineErr::RedisNamespaceMismatch; let mut id_from_tag = |tag: &str| match cache.get(&tag.to_string()) {
use {Content::*, Reach::*, Stream::*}; Some(id) => Ok(*id),
let timeline_slice = &timeline.split(":").collect::<Vec<&str>>()[..]; None => Err(TimelineErr::InvalidInput), // TODO more specific
#[rustfmt::skip]
let (stream, reach, content) = if let Some(ns) = namespace {
match timeline_slice {
[n, "timeline", "public"] if n == ns => (Public, Federated, All),
[_, "timeline", "public"]
| ["timeline", "public"] => Err(RedisNamespaceMismatch)?,
[n, "timeline", "public", "local"] if ns == n => (Public, Local, All),
[_, "timeline", "public", "local"]
| ["timeline", "public", "local"] => Err(RedisNamespaceMismatch)?,
[n, "timeline", "public", "media"] if ns == n => (Public, Federated, Media),
[_, "timeline", "public", "media"]
| ["timeline", "public", "media"] => Err(RedisNamespaceMismatch)?,
[n, "timeline", "public", "local", "media"] if ns == n => (Public, Local, Media),
[_, "timeline", "public", "local", "media"]
| ["timeline", "public", "local", "media"] => Err(RedisNamespaceMismatch)?,
[n, "timeline", "hashtag", tag_name] if ns == n => {
let tag_id = *cache
.get(&tag_name.to_string())
.unwrap_or_else(|| log_fatal!("No cached id for `{}`", tag_name));
(Hashtag(tag_id), Federated, All)
}
[_, "timeline", "hashtag", _tag]
| ["timeline", "hashtag", _tag] => Err(RedisNamespaceMismatch)?,
[n, "timeline", "hashtag", _tag, "local"] if ns == n => (Hashtag(0), Local, All),
[_, "timeline", "hashtag", _tag, "local"]
| ["timeline", "hashtag", _tag, "local"] => Err(RedisNamespaceMismatch)?,
[n, "timeline", id] if ns == n => (User(id.parse().unwrap()), Federated, All),
[_, "timeline", _id]
| ["timeline", _id] => Err(RedisNamespaceMismatch)?,
[n, "timeline", id, "notification"] if ns == n =>
(User(id.parse()?), Federated, Notification),
[_, "timeline", _id, "notification"]
| ["timeline", _id, "notification"] => Err(RedisNamespaceMismatch)?,
[n, "timeline", "list", id] if ns == n => (List(id.parse()?), Federated, All),
[_, "timeline", "list", _id]
| ["timeline", "list", _id] => Err(RedisNamespaceMismatch)?,
[n, "timeline", "direct", id] if ns == n => (Direct(id.parse()?), Federated, All),
[_, "timeline", "direct", _id]
| ["timeline", "direct", _id] => Err(RedisNamespaceMismatch)?,
[..] => log_fatal!("Unexpected channel from Redis: {:?}", timeline_slice),
}
} else {
match timeline_slice {
["timeline", "public"] => (Public, Federated, All),
[_, "timeline", "public"] => Err(RedisNamespaceMismatch)?,
["timeline", "public", "local"] => (Public, Local, All),
[_, "timeline", "public", "local"] => Err(RedisNamespaceMismatch)?,
["timeline", "public", "media"] => (Public, Federated, Media),
[_, "timeline", "public", "media"] => Err(RedisNamespaceMismatch)?,
["timeline", "public", "local", "media"] => (Public, Local, Media),
[_, "timeline", "public", "local", "media"] => Err(RedisNamespaceMismatch)?,
["timeline", "hashtag", _tag] => (Hashtag(0), Federated, All),
[_, "timeline", "hashtag", _tag] => Err(RedisNamespaceMismatch)?,
["timeline", "hashtag", _tag, "local"] => (Hashtag(0), Local, All),
[_, "timeline", "hashtag", _tag, "local"] => Err(RedisNamespaceMismatch)?,
["timeline", id] => (User(id.parse().unwrap()), Federated, All),
[_, "timeline", _id] => Err(RedisNamespaceMismatch)?,
["timeline", id, "notification"] => {
(User(id.parse().unwrap()), Federated, Notification)
}
[_, "timeline", _id, "notification"] => Err(RedisNamespaceMismatch)?,
["timeline", "list", id] => (List(id.parse().unwrap()), Federated, All),
[_, "timeline", "list", _id] => Err(RedisNamespaceMismatch)?,
["timeline", "direct", id] => (Direct(id.parse().unwrap()), Federated, All),
[_, "timeline", "direct", _id] => Err(RedisNamespaceMismatch)?,
// Other endpoints don't exist:
[..] => Err(TimelineErr::InvalidInput)?,
}
}; };
Ok(Timeline(stream, reach, content)) use {Content::*, Reach::*, Stream::*};
Ok(match &timeline.split(":").collect::<Vec<&str>>()[..] {
["public"] => Timeline(Public, Federated, All),
["public", "local"] => Timeline(Public, Local, All),
["public", "media"] => Timeline(Public, Federated, Media),
["public", "local", "media"] => Timeline(Public, Local, Media),
["hashtag", tag] => Timeline(Hashtag(id_from_tag(tag)?), Federated, All),
["hashtag", tag, "local"] => Timeline(Hashtag(id_from_tag(tag)?), Local, All),
[id] => Timeline(User(id.parse().unwrap()), Federated, All),
[id, "notification"] => Timeline(User(id.parse().unwrap()), Federated, Notification),
["list", id] => Timeline(List(id.parse().unwrap()), Federated, All),
["direct", id] => Timeline(Direct(id.parse().unwrap()), Federated, All),
// Other endpoints don't exist:
[..] => Err(TimelineErr::InvalidInput)?,
})
// let (stream, reach, content) = if let Some(ns) = namespace {
// match timeline_slice {
// [n, "timeline", "public"] if n == ns => (Public, Federated, All),
// [_, "timeline", "public"]
// | ["timeline", "public"] => Err(RedisNamespaceMismatch)?,
// [n, "timeline", "public", "local"] if ns == n => (Public, Local, All),
// [_, "timeline", "public", "local"]
// | ["timeline", "public", "local"] => Err(RedisNamespaceMismatch)?,
// [n, "timeline", "public", "media"] if ns == n => (Public, Federated, Media),
// [_, "timeline", "public", "media"]
// | ["timeline", "public", "media"] => Err(RedisNamespaceMismatch)?,
// [n, "timeline", "public", "local", "media"] if ns == n => (Public, Local, Media),
// [_, "timeline", "public", "local", "media"]
// | ["timeline", "public", "local", "media"] => Err(RedisNamespaceMismatch)?,
// [n, "timeline", "hashtag", tag_name] if ns == n => {
// let tag_id = *cache
// .get(&tag_name.to_string())
// .unwrap_or_else(|| log_fatal!("No cached id for `{}`", tag_name));
// (Hashtag(tag_id), Federated, All)
// }
// [_, "timeline", "hashtag", _tag]
// | ["timeline", "hashtag", _tag] => Err(RedisNamespaceMismatch)?,
// [n, "timeline", "hashtag", _tag, "local"] if ns == n => (Hashtag(0), Local, All),
// [_, "timeline", "hashtag", _tag, "local"]
// | ["timeline", "hashtag", _tag, "local"] => Err(RedisNamespaceMismatch)?,
// [n, "timeline", id] if ns == n => (User(id.parse().unwrap()), Federated, All),
// [_, "timeline", _id]
// | ["timeline", _id] => Err(RedisNamespaceMismatch)?,
// [n, "timeline", id, "notification"] if ns == n =>
// (User(id.parse()?), Federated, Notification),
// [_, "timeline", _id, "notification"]
// | ["timeline", _id, "notification"] => Err(RedisNamespaceMismatch)?,
// [n, "timeline", "list", id] if ns == n => (List(id.parse()?), Federated, All),
// [_, "timeline", "list", _id]
// | ["timeline", "list", _id] => Err(RedisNamespaceMismatch)?,
// [n, "timeline", "direct", id] if ns == n => (Direct(id.parse()?), Federated, All),
// [_, "timeline", "direct", _id]
// | ["timeline", "direct", _id] => Err(RedisNamespaceMismatch)?,
// [..] => log_fatal!("Unexpected channel from Redis: {:?}", timeline_slice),
// }
// } else {
// match timeline_slice {
// ["timeline", "public"] => (Public, Federated, All),
// [_, "timeline", "public"] => Err(RedisNamespaceMismatch)?,
// ["timeline", "public", "local"] => (Public, Local, All),
// [_, "timeline", "public", "local"] => Err(RedisNamespaceMismatch)?,
// ["timeline", "public", "media"] => (Public, Federated, Media),
// [_, "timeline", "public", "media"] => Err(RedisNamespaceMismatch)?,
// ["timeline", "public", "local", "media"] => (Public, Local, Media),
// [_, "timeline", "public", "local", "media"] => Err(RedisNamespaceMismatch)?,
// ["timeline", "hashtag", _tag] => (Hashtag(0), Federated, All),
// [_, "timeline", "hashtag", _tag] => Err(RedisNamespaceMismatch)?,
// ["timeline", "hashtag", _tag, "local"] => (Hashtag(0), Local, All),
// [_, "timeline", "hashtag", _tag, "local"] => Err(RedisNamespaceMismatch)?,
// ["timeline", id] => (User(id.parse().unwrap()), Federated, All),
// [_, "timeline", _id] => Err(RedisNamespaceMismatch)?,
// ["timeline", id, "notification"] => {
// (User(id.parse().unwrap()), Federated, Notification)
// }
// [_, "timeline", _id, "notification"] => Err(RedisNamespaceMismatch)?,
// ["timeline", "list", id] => (List(id.parse().unwrap()), Federated, All),
// [_, "timeline", "list", _id] => Err(RedisNamespaceMismatch)?,
// ["timeline", "direct", id] => (Direct(id.parse().unwrap()), Federated, All),
// [_, "timeline", "direct", _id] => Err(RedisNamespaceMismatch)?,
// // Other endpoints don't exist:
// [..] => Err(TimelineErr::InvalidInput)?,
// }
// };
} }
fn from_query_and_user(q: &Query, user: &UserData, pool: PgPool) -> Result<Self, Rejection> { fn from_query_and_user(q: &Query, user: &UserData, pool: PgPool) -> Result<Self, Rejection> {
use {warp::reject::custom, Content::*, Reach::*, Scope::*, Stream::*}; use {warp::reject::custom, Content::*, Reach::*, Scope::*, Stream::*};

View File

@ -18,6 +18,7 @@
use super::receiver::Receiver; use super::receiver::Receiver;
use crate::{ use crate::{
config, config,
err::RedisParseErr,
messages::Event, messages::Event,
parse_client_request::{Stream::Public, Subscription, Timeline}, parse_client_request::{Stream::Public, Subscription, Timeline},
}; };
@ -26,7 +27,6 @@ use futures::{
Poll, Poll,
}; };
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use tokio::io::Error;
use uuid::Uuid; use uuid::Uuid;
/// Struct for managing all Redis streams. /// Struct for managing all Redis streams.
@ -82,7 +82,7 @@ impl ClientAgent {
/// The stream that the `ClientAgent` manages. `Poll` is the only method implemented. /// The stream that the `ClientAgent` manages. `Poll` is the only method implemented.
impl futures::stream::Stream for ClientAgent { impl futures::stream::Stream for ClientAgent {
type Item = Event; type Item = Event;
type Error = Error; type Error = RedisParseErr;
/// Checks for any new messages that should be sent to the client. /// Checks for any new messages that should be sent to the client.
/// ///

View File

@ -1,103 +1,101 @@
use super::ClientAgent; use super::ClientAgent;
use warp::ws::WebSocket;
use futures::{future::Future, stream::Stream, Async}; use futures::{future::Future, stream::Stream, Async};
use log; use log;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use warp::{
reply::Reply,
sse::Sse,
ws::{Message, WebSocket},
};
pub struct EventStream; pub struct EventStream;
impl EventStream { impl EventStream {
/// Send a stream of replies to a WebSocket client.
/// Send a stream of replies to a WebSocket client.
pub fn to_ws( pub fn to_ws(
socket: WebSocket, ws: WebSocket,
mut client_agent: ClientAgent, mut client_agent: ClientAgent,
update_interval: Duration, interval: Duration,
) -> impl Future<Item = (), Error = ()> { ) -> impl Future<Item = (), Error = ()> {
let (ws_tx, mut ws_rx) = socket.split(); let (ws_tx, mut ws_rx) = ws.split();
let timeline = client_agent.subscription.timeline; let timeline = client_agent.subscription.timeline;
// Create a pipe // Create a pipe
let (tx, rx) = futures::sync::mpsc::unbounded(); let (tx, rx) = futures::sync::mpsc::unbounded();
// Send one end of it to a different thread and tell that end to forward whatever it gets // Send one end of it to a different thread and tell that end to forward whatever it gets
// on to the websocket client // on to the websocket client
warp::spawn( warp::spawn(
rx.map_err(|()| -> warp::Error { unreachable!() }) rx.map_err(|()| -> warp::Error { unreachable!() })
.forward(ws_tx) .forward(ws_tx)
.map(|_r| ()) .map(|_r| ())
.map_err(|e| match e.to_string().as_ref() { .map_err(|e| match e.to_string().as_ref() {
"IO error: Broken pipe (os error 32)" => (), // just closed unix socket "IO error: Broken pipe (os error 32)" => (), // just closed unix socket
_ => log::warn!("websocket send error: {}", e), _ => log::warn!("websocket send error: {}", e),
}), }),
); );
// Yield new events for as long as the client is still connected // Yield new events for as long as the client is still connected
let event_stream = tokio::timer::Interval::new(Instant::now(), update_interval).take_while( let event_stream =
move |_| match ws_rx.poll() { tokio::timer::Interval::new(Instant::now(), interval).take_while(move |_| {
Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true), match ws_rx.poll() {
Ok(Async::Ready(None)) => { Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true),
Ok(Async::Ready(None)) => {
// TODO: consider whether we should manually drop closed connections here
log::info!("Client closed WebSocket connection for {:?}", timeline);
futures::future::ok(false)
}
Err(e) if e.to_string() == "IO error: Broken pipe (os error 32)" => {
// no err, just closed Unix socket
log::info!("Client closed WebSocket connection for {:?}", timeline);
futures::future::ok(false)
}
Err(e) => {
log::warn!("Error in {:?}: {}", timeline, e);
futures::future::ok(false)
}
}
});
let mut time = Instant::now();
// Every time you get an event from that stream, send it through the pipe
event_stream
.for_each(move |_instant| {
if let Ok(Async::Ready(Some(msg))) = client_agent.poll() {
tx.unbounded_send(Message::text(msg.to_json_string()))
.expect("No send error");
};
if time.elapsed() > Duration::from_secs(30) {
tx.unbounded_send(Message::text("{}")).expect("Can ping");
time = Instant::now();
}
Ok(())
})
.then(move |result| {
// TODO: consider whether we should manually drop closed connections here // TODO: consider whether we should manually drop closed connections here
log::info!("Client closed WebSocket connection for {:?}", timeline); log::info!("WebSocket connection for {:?} closed.", timeline);
futures::future::ok(false) result
} })
Err(e) if e.to_string() == "IO error: Broken pipe (os error 32)" => { .map_err(move |e| log::warn!("Error sending to {:?}: {}", timeline, e))
// no err, just closed Unix socket }
log::info!("Client closed WebSocket connection for {:?}", timeline); pub fn to_sse(mut client_agent: ClientAgent, sse: Sse, interval: Duration) -> impl Reply {
futures::future::ok(false) let event_stream =
} tokio::timer::Interval::new(Instant::now(), interval).filter_map(move |_| {
Err(e) => { match client_agent.poll() {
log::warn!("Error in {:?}: {}", timeline, e); Ok(Async::Ready(Some(event))) => Some((
futures::future::ok(false) warp::sse::event(event.event_name()),
} warp::sse::data(event.payload().unwrap_or_else(String::new)),
}, )),
); _ => None,
}
});
let mut time = Instant::now(); sse.reply(
// Every time you get an event from that stream, send it through the pipe warp::sse::keep_alive()
event_stream .interval(Duration::from_secs(30))
.for_each(move |_instant| { .text("thump".to_string())
if let Ok(Async::Ready(Some(msg))) = client_agent.poll() { .stream(event_stream),
tx.unbounded_send(warp::ws::Message::text(msg.to_json_string())) )
.expect("No send error"); }
};
if time.elapsed() > Duration::from_secs(30) {
tx.unbounded_send(warp::ws::Message::text("{}"))
.expect("Can ping");
time = Instant::now();
}
Ok(())
})
.then(move |result| {
// TODO: consider whether we should manually drop closed connections here
log::info!("WebSocket connection for {:?} closed.", timeline);
result
})
.map_err(move |e| log::warn!("Error sending to {:?}: {}", timeline, e))
}
pub fn to_sse(
mut client_agent: ClientAgent,
connection: warp::sse::Sse,
update_interval: Duration,
) ->impl warp::reply::Reply {
let event_stream =
tokio::timer::Interval::new(Instant::now(), update_interval).filter_map(move |_| {
match client_agent.poll() {
Ok(Async::Ready(Some(event))) => Some((
warp::sse::event(event.event_name()),
warp::sse::data(event.payload().unwrap_or_else(String::new)),
)),
_ => None,
}
});
connection.reply(
warp::sse::keep_alive()
.interval(Duration::from_secs(30))
.text("thump".to_string())
.stream(event_stream),
)
}
} }

View File

@ -6,9 +6,12 @@ mod redis;
pub use {client_agent::ClientAgent, event_stream::EventStream}; pub use {client_agent::ClientAgent, event_stream::EventStream};
#[cfg(test)] // TODO remove
pub use receiver::process_messages; pub use redis::redis_msg;
#[cfg(test)]
//#[cfg(test)]
//pub use receiver::process_messages;
//#[cfg(test)]
pub use receiver::{MessageQueues, MsgQueue}; pub use receiver::{MessageQueues, MsgQueue};
#[cfg(test)] //#[cfg(test)]
pub use redis::redis_msg::RedisMsg; //pub use redis::redis_msg::{RedisMsg, RedisUtf8};

View File

@ -10,75 +10,41 @@ use crate::{
err::RedisParseErr, err::RedisParseErr,
messages::Event, messages::Event,
parse_client_request::{Stream, Timeline}, parse_client_request::{Stream, Timeline},
pubsub_cmd, redis_to_client_stream::redis::RedisConn,
redis_to_client_stream::redis::redis_msg::RedisMsg,
redis_to_client_stream::redis::{redis_cmd, RedisConn},
}; };
use futures::{Async, Poll}; use futures::{Async, Poll};
use lru::LruCache; use lru::LruCache;
use tokio::io::AsyncRead; use std::{collections::HashMap, time::Instant};
use std::{
collections::HashMap,
io::Read,
net, str,
time::{Duration, Instant},
};
use tokio::io::Error;
use uuid::Uuid; use uuid::Uuid;
/// The item that streams from Redis and is polled by the `ClientAgent` /// The item that streams from Redis and is polled by the `ClientAgent`
#[derive(Debug)] #[derive(Debug)]
pub struct Receiver { pub struct Receiver {
pub pubsub_connection: net::TcpStream, redis_connection: RedisConn,
secondary_redis_connection: net::TcpStream,
redis_poll_interval: Duration,
redis_polled_at: Instant,
timeline: Timeline, timeline: Timeline,
manager_id: Uuid, manager_id: Uuid,
pub msg_queues: MessageQueues, pub msg_queues: MessageQueues,
clients_per_timeline: HashMap<Timeline, i32>, clients_per_timeline: HashMap<Timeline, i32>,
cache: Cache, hashtag_cache: LruCache<i64, String>,
redis_input: Vec<u8>,
redis_namespace: Option<String>,
}
#[derive(Debug)]
pub struct Cache {
// TODO: eventually, it might make sense to have Mastodon publish to timelines with // TODO: eventually, it might make sense to have Mastodon publish to timelines with
// the tag number instead of the tag name. This would save us from dealing // the tag number instead of the tag name. This would save us from dealing
// with a cache here and would be consistent with how lists/users are handled. // with a cache here and would be consistent with how lists/users are handled.
id_to_hashtag: LruCache<i64, String>,
pub hashtag_to_id: LruCache<String, i64>,
} }
impl Receiver { impl Receiver {
/// Create a new `Receiver`, with its own Redis connections (but, as yet, no /// Create a new `Receiver`, with its own Redis connections (but, as yet, no
/// active subscriptions). /// active subscriptions).
pub fn new(redis_cfg: config::RedisConfig) -> Self { pub fn new(redis_cfg: config::RedisConfig) -> Self {
let redis_namespace = redis_cfg.namespace.clone(); let redis_connection = RedisConn::new(redis_cfg);
let RedisConn {
primary: pubsub_connection,
secondary: secondary_redis_connection,
polling_interval: redis_poll_interval,
} = RedisConn::new(redis_cfg);
Self { Self {
pubsub_connection, redis_connection,
secondary_redis_connection,
redis_poll_interval,
redis_polled_at: Instant::now(),
timeline: Timeline::empty(), timeline: Timeline::empty(),
manager_id: Uuid::default(), manager_id: Uuid::default(),
msg_queues: MessageQueues(HashMap::new()), msg_queues: MessageQueues(HashMap::new()),
clients_per_timeline: HashMap::new(), clients_per_timeline: HashMap::new(),
cache: Cache { hashtag_cache: LruCache::new(1000),
id_to_hashtag: LruCache::new(1000), // should this be a run-time option?
hashtag_to_id: LruCache::new(1000),
}, // should these be run-time options?
redis_input: Vec::new(),
redis_namespace,
} }
} }
@ -91,8 +57,8 @@ impl Receiver {
pub fn manage_new_timeline(&mut self, id: Uuid, tl: Timeline, hashtag: Option<String>) { pub fn manage_new_timeline(&mut self, id: Uuid, tl: Timeline, hashtag: Option<String>) {
self.timeline = tl; self.timeline = tl;
if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (hashtag, tl) { if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (hashtag, tl) {
self.cache.id_to_hashtag.put(id, hashtag.clone()); self.hashtag_cache.put(id, hashtag.clone());
self.cache.hashtag_to_id.put(hashtag, id); self.redis_connection.update_cache(hashtag, id);
}; };
self.msg_queues.insert(id, MsgQueue::new(tl)); self.msg_queues.insert(id, MsgQueue::new(tl));
@ -117,7 +83,7 @@ impl Receiver {
for change in timelines_to_modify { for change in timelines_to_modify {
let timeline = change.timeline; let timeline = change.timeline;
let hashtag = match timeline { let hashtag = match timeline {
Timeline(Stream::Hashtag(id), _, _) => self.cache.id_to_hashtag.get(&id), Timeline(Stream::Hashtag(id), _, _) => self.hashtag_cache.get(&id),
_non_hashtag_timeline => None, _non_hashtag_timeline => None,
}; };
@ -129,9 +95,11 @@ impl Receiver {
// If no clients, unsubscribe from the channel // If no clients, unsubscribe from the channel
if *count_of_subscribed_clients <= 0 { if *count_of_subscribed_clients <= 0 {
pubsub_cmd!("unsubscribe", self, timeline.to_redis_raw_timeline(hashtag)); self.redis_connection
.send_unsubscribe_cmd(&timeline.to_redis_raw_timeline(hashtag));
} else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 { } else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 {
pubsub_cmd!("subscribe", self, timeline.to_redis_raw_timeline(hashtag)); self.redis_connection
.send_subscribe_cmd(&timeline.to_redis_raw_timeline(hashtag));
} }
} }
if start_time.elapsed().as_millis() > 1 { if start_time.elapsed().as_millis() > 1 {
@ -143,7 +111,7 @@ impl Receiver {
/// The stream that the ClientAgent polls to learn about new messages. /// The stream that the ClientAgent polls to learn about new messages.
impl futures::stream::Stream for Receiver { impl futures::stream::Stream for Receiver {
type Item = Event; type Item = Event;
type Error = Error; type Error = RedisParseErr;
/// Returns the oldest message in the `ClientAgent`'s queue (if any). /// Returns the oldest message in the `ClientAgent`'s queue (if any).
/// ///
@ -153,27 +121,18 @@ impl futures::stream::Stream for Receiver {
/// been polled lately. /// been polled lately.
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let (timeline, id) = (self.timeline.clone(), self.manager_id); let (timeline, id) = (self.timeline.clone(), self.manager_id);
loop {
if self.redis_polled_at.elapsed() > self.redis_poll_interval { match self.redis_connection.poll_redis() {
let mut buffer = vec![0u8; 6000]; Ok(Async::Ready(Some((timeline, event)))) => self
if let Ok(Async::Ready(bytes_read)) = self.poll_read(&mut buffer) { .msg_queues
let binary_input = buffer[..bytes_read].to_vec(); .values_mut()
let (input, extra_bytes) = match str::from_utf8(&binary_input) { .filter(|msg_queue| msg_queue.timeline == timeline)
Ok(input) => (input, "".as_bytes()), .for_each(|msg_queue| {
Err(e) => { msg_queue.messages.push_back(event.clone());
let (valid, after_valid) = binary_input.split_at(e.valid_up_to()); }),
let input = str::from_utf8(valid).expect("Guaranteed by `.valid_up_to`"); Ok(Async::NotReady) => break,
(input, after_valid) Ok(Async::Ready(None)) => (),
} Err(err) => Err(err)?,
};
let (cache, namespace) = (&mut self.cache.hashtag_to_id, &self.redis_namespace);
let remaining_input =
process_messages(input, cache, namespace, &mut self.msg_queues);
self.redis_input.extend_from_slice(remaining_input);
self.redis_input.extend_from_slice(extra_bytes);
} }
} }
@ -187,49 +146,3 @@ impl futures::stream::Stream for Receiver {
} }
} }
} }
impl Read for Receiver {
fn read(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> {
self.pubsub_connection.read(buffer)
}
}
impl AsyncRead for Receiver {
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
match self.read(buf) {
Ok(t) => Ok(Async::Ready(t)),
Err(_) => Ok(Async::NotReady),
}
}
}
#[must_use]
pub fn process_messages<'a>(
input: &'a str,
mut cache: &mut LruCache<String, i64>,
namespace: &Option<String>,
msg_queues: &mut MessageQueues,
) -> &'a [u8] {
let mut remaining_input = input;
use RedisMsg::*;
loop {
match RedisMsg::from_raw(&mut remaining_input, &mut cache, namespace) {
Ok((EventMsg(timeline, event), rest)) => {
for msg_queue in msg_queues.values_mut() {
if msg_queue.timeline == timeline {
msg_queue.messages.push_back(event.clone());
}
}
remaining_input = rest;
}
Ok((SubscriptionMsg, rest)) | Ok((MsgForDifferentNamespace, rest)) => {
remaining_input = rest;
}
Err(RedisParseErr::Incomplete) => break,
Err(RedisParseErr::Unrecoverable) => {
panic!("Failed parsing Redis msg: {}", &remaining_input)
}
};
}
remaining_input.as_bytes()
}

View File

@ -10,7 +10,7 @@ macro_rules! pubsub_cmd {
let namespace = $self.redis_namespace.clone(); let namespace = $self.redis_namespace.clone();
$self $self
.pubsub_connection .primary
.write_all(&redis_cmd::pubsub($cmd, $tl, namespace.clone())) .write_all(&redis_cmd::pubsub($cmd, $tl, namespace.clone()))
.expect("Can send command to Redis"); .expect("Can send command to Redis");
// Because we keep track of the number of clients subscribed to a channel on our end, // Because we keep track of the number of clients subscribed to a channel on our end,
@ -21,7 +21,7 @@ macro_rules! pubsub_cmd {
_ => panic!("Given unacceptable PUBSUB command"), _ => panic!("Given unacceptable PUBSUB command"),
}; };
$self $self
.secondary_redis_connection .secondary
.write_all(&redis_cmd::set( .write_all(&redis_cmd::set(
format!("subscribed:{}", $tl), format!("subscribed:{}", $tl),
subscription_new_number, subscription_new_number,
@ -29,7 +29,7 @@ macro_rules! pubsub_cmd {
)) ))
.expect("Can set Redis"); .expect("Can set Redis");
log::info!("Now subscribed to: {:#?}", $self.msg_queues); // TODO: re-enable info logging >>> log::info!("Now subscribed to: {:#?}", $self.msg_queues);
}}; }};
} }
/// Send a `SUBSCRIBE` or `UNSUBSCRIBE` command to a specific timeline /// Send a `SUBSCRIBE` or `UNSUBSCRIBE` command to a specific timeline

View File

@ -1,12 +1,132 @@
use super::redis_cmd; use super::{redis_cmd, redis_msg::RedisParseOutput};
use crate::config::RedisConfig; use crate::{
use crate::err; config::RedisConfig,
use std::{io::Read, io::Write, net, time::Duration}; err::{self, RedisParseErr},
messages::Event,
parse_client_request::Timeline,
pubsub_cmd,
};
use futures::{Async, Poll};
use lru::LruCache;
use std::{
convert::TryFrom,
io::Read,
io::Write,
net, str,
time::{Duration, Instant},
};
use tokio::io::AsyncRead;
#[derive(Debug)]
pub struct RedisConn { pub struct RedisConn {
pub primary: net::TcpStream, primary: net::TcpStream,
pub secondary: net::TcpStream, secondary: net::TcpStream,
pub polling_interval: Duration, redis_poll_interval: Duration,
redis_polled_at: Instant,
redis_namespace: Option<String>,
cache: LruCache<String, i64>,
redis_input: Vec<u8>, // TODO: Consider queue internal to RedisConn
}
impl RedisConn {
pub fn new(redis_cfg: RedisConfig) -> Self {
let addr = format!("{}:{}", *redis_cfg.host, *redis_cfg.port);
let conn_err = |e| {
err::die_with_msg(format!(
"Could not connect to Redis at {}:{}.\n Error detail: {}",
*redis_cfg.host, *redis_cfg.port, e,
))
};
let update_conn = |mut conn| {
if let Some(password) = redis_cfg.password.clone() {
conn = send_password(conn, &password);
}
conn = send_test_ping(conn);
conn.set_read_timeout(Some(Duration::from_millis(10)))
.expect("Can set read timeout for Redis connection");
if let Some(db) = &*redis_cfg.db {
conn = set_db(conn, db);
}
conn
};
let (primary_conn, secondary_conn) = (
update_conn(net::TcpStream::connect(addr.clone()).unwrap_or_else(conn_err)),
update_conn(net::TcpStream::connect(addr).unwrap_or_else(conn_err)),
);
primary_conn
.set_nonblocking(true)
.expect("set_nonblocking call failed");
Self {
primary: primary_conn,
secondary: secondary_conn,
cache: LruCache::new(1000),
redis_namespace: redis_cfg.namespace.clone(),
redis_poll_interval: *redis_cfg.polling_interval,
redis_input: Vec::new(),
redis_polled_at: Instant::now(),
}
}
pub fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, RedisParseErr> {
let mut buffer = vec![0u8; 6000];
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
if let Ok(Async::Ready(bytes_read)) = self.poll_read(&mut buffer) {
self.redis_input.extend_from_slice(&buffer[..bytes_read]);
}
}
let input = self.redis_input.clone();
self.redis_input.clear();
let (input, invalid_bytes) = str::from_utf8(&input)
.map(|input| (input, "".as_bytes()))
.unwrap_or_else(|e| {
let (valid, invalid) = input.split_at(e.valid_up_to());
(str::from_utf8(valid).expect("Guaranteed by ^^^^"), invalid)
});
use {Async::*, RedisParseOutput::*};
let (res, leftover) = match RedisParseOutput::try_from(input) {
Ok(Msg(msg)) => match &self.redis_namespace {
Some(ns) if msg.timeline_txt.starts_with(&format!("{}:timeline:", ns)) => {
let tl = Timeline::from_redis_text(
&msg.timeline_txt[ns.len() + ":timeline:".len()..],
&mut self.cache,
)?;
let event: Event = serde_json::from_str(msg.event_txt)?;
(Ok(Ready(Some((tl, event)))), msg.leftover_input)
}
None => {
let tl = Timeline::from_redis_text(
&msg.timeline_txt["timeline:".len()..],
&mut self.cache,
)?;
let event: Event = serde_json::from_str(msg.event_txt)?;
(Ok(Ready(Some((tl, event)))), msg.leftover_input)
}
Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input),
},
Ok(NonMsg(leftover)) => (Ok(Ready(None)), leftover),
Err(RedisParseErr::Incomplete) => (Ok(NotReady), input),
Err(other) => (Err(other), input),
};
self.redis_input.extend_from_slice(leftover.as_bytes());
self.redis_input.extend_from_slice(invalid_bytes);
res
}
pub fn update_cache(&mut self, hashtag: String, id: i64) {
self.cache.put(hashtag, id);
}
pub fn send_unsubscribe_cmd(&mut self, timeline: &str) {
pubsub_cmd!("unsubscribe", self, timeline);
}
pub fn send_subscribe_cmd(&mut self, timeline: &str) {
pubsub_cmd!("subscribe", self, timeline);
}
} }
fn send_password(mut conn: net::TcpStream, password: &str) -> net::TcpStream { fn send_password(mut conn: net::TcpStream, password: &str) -> net::TcpStream {
@ -53,39 +173,17 @@ fn send_test_ping(mut conn: net::TcpStream) -> net::TcpStream {
conn conn
} }
impl RedisConn { impl Read for RedisConn {
pub fn new(redis_cfg: RedisConfig) -> Self { fn read(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> {
let addr = format!("{}:{}", *redis_cfg.host, *redis_cfg.port); self.primary.read(buffer)
let conn_err = |e| { }
err::die_with_msg(format!( }
"Could not connect to Redis at {}:{}.\n Error detail: {}",
*redis_cfg.host, *redis_cfg.port, e,
))
};
let update_conn = |mut conn| {
if let Some(password) = redis_cfg.password.clone() {
conn = send_password(conn, &password);
}
conn = send_test_ping(conn);
conn.set_read_timeout(Some(Duration::from_millis(10)))
.expect("Can set read timeout for Redis connection");
if let Some(db) = &*redis_cfg.db {
conn = set_db(conn, db);
}
conn
};
let (primary_conn, secondary_conn) = (
update_conn(net::TcpStream::connect(addr.clone()).unwrap_or_else(conn_err)),
update_conn(net::TcpStream::connect(addr).unwrap_or_else(conn_err)),
);
primary_conn
.set_nonblocking(true)
.expect("set_nonblocking call failed");
Self { impl AsyncRead for RedisConn {
primary: primary_conn, fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
secondary: secondary_conn, match self.read(buf) {
polling_interval: *redis_cfg.polling_interval, Ok(t) => Ok(Async::Ready(t)),
Err(_) => Ok(Async::NotReady),
} }
} }
} }

View File

@ -9,8 +9,10 @@
//! //!
//! ```text //! ```text
//! *3\r\n //! *3\r\n
//! $7\r\nmessage\r\n //! $7\r\n
//! $10\r\ntimeline:4\r\n //! message\r\n
//! $10\r\n
//! timeline:4\r\n
//! $1386\r\n{\"event\":\"update\",\"payload\"...\"queued_at\":1569623342825}\r\n //! $1386\r\n{\"event\":\"update\",\"payload\"...\"queued_at\":1569623342825}\r\n
//! ``` //! ```
//! //!
@ -18,93 +20,236 @@
//! three characters, the second is a bulk string with ten characters, and the third is a //! three characters, the second is a bulk string with ten characters, and the third is a
//! bulk string with 1,386 characters. //! bulk string with 1,386 characters.
use crate::{ use self::RedisParseOutput::*;
err::{RedisParseErr, TimelineErr}, use crate::err::RedisParseErr;
messages::Event, use std::{
parse_client_request::Timeline, convert::{TryFrom, TryInto},
str,
}; };
use lru::LruCache;
type Parser<'a, Item> = Result<(Item, &'a str), RedisParseErr>; #[derive(Debug, Clone, PartialEq)]
pub enum RedisParseOutput<'a> {
/// A message that has been parsed from an incoming raw message from Redis. Msg(RedisMsg<'a>),
#[derive(Debug, Clone)] NonMsg(&'a str),
pub enum RedisMsg {
EventMsg(Timeline, Event),
SubscriptionMsg,
MsgForDifferentNamespace,
} }
use RedisParseErr::*; #[derive(Debug, Clone, PartialEq)]
type Hashtags = LruCache<String, i64>; pub struct RedisMsg<'a> {
impl RedisMsg { pub timeline_txt: &'a str,
pub fn from_raw<'a>( pub event_txt: &'a str,
input: &'a str, pub leftover_input: &'a str,
cache: &mut Hashtags, }
namespace: &Option<String>,
) -> Parser<'a, Self> {
// No need to parse the Redis Array header, just skip it
let input = input.get("*3\r\n".len()..).ok_or(Incomplete)?;
let (command, rest) = parse_redis_bulk_string(&input)?;
match command {
"message" => {
// Messages look like;
// $10\r\ntimeline:4\r\n
// $1386\r\n{\"event\":\"update\",\"payload\"...\"queued_at\":1569623342825}\r\n
let (timeline, rest) = parse_redis_bulk_string(&rest)?;
let (msg_txt, rest) = parse_redis_bulk_string(&rest)?;
let event: Event = serde_json::from_str(&msg_txt).map_err(|_| Unrecoverable)?;
use TimelineErr::*; impl<'a> TryFrom<&'a str> for RedisParseOutput<'a> {
match Timeline::from_redis_raw_timeline(timeline, cache, namespace) { type Error = RedisParseErr;
Ok(timeline) => Ok((Self::EventMsg(timeline, event), rest)), fn try_from(utf8: &'a str) -> Result<RedisParseOutput<'a>, Self::Error> {
Err(RedisNamespaceMismatch) => Ok((Self::MsgForDifferentNamespace, rest)), let (structured_txt, leftover_utf8) = utf8_to_redis_data(utf8)?;
Err(InvalidInput) => Err(RedisParseErr::Unrecoverable), let structured_txt = RedisStructuredText {
} structured_txt,
} leftover_input: leftover_utf8,
"subscribe" | "unsubscribe" => { };
// subscription statuses look like: Ok(structured_txt.try_into()?)
// $14\r\ntimeline:local\r\n
// :47\r\n
let (_raw_timeline, rest) = parse_redis_bulk_string(&rest)?;
let (_number_of_subscriptions, rest) = parse_redis_int(&rest)?;
Ok((Self::SubscriptionMsg, &rest))
}
_cmd => Err(Incomplete)?,
}
} }
} }
#[derive(Debug, Clone, PartialEq)]
struct RedisStructuredText<'a> {
structured_txt: RedisData<'a>,
leftover_input: &'a str,
}
#[derive(Debug, Clone, PartialEq)]
enum RedisData<'a> {
RedisArray(Vec<RedisData<'a>>),
BulkString(&'a str),
Integer(usize),
Uninitilized,
}
use RedisData::*;
use RedisParseErr::*;
type RedisParser<'a, Item> = Result<Item, RedisParseErr>;
fn utf8_to_redis_data<'a>(s: &'a str) -> Result<(RedisData, &'a str), RedisParseErr> {
if s.len() < 4 {
Err(Incomplete)?
};
let (first_char, s) = s.split_at(1);
match first_char {
":" => parse_redis_int(s),
"$" => parse_redis_bulk_string(s),
"*" => parse_redis_array(s),
e => Err(InvalidLineStart(format!(
"Encountered invalid initial character `{}` in line `{}`",
e, s
))),
}
}
fn after_newline_at<'a>(s: &'a str, start: usize) -> RedisParser<'a, &'a str> {
let s = s.get(start..).ok_or(Incomplete)?;
if !s.starts_with("\r\n") {
return Err(RedisParseErr::InvalidLineEnd);
}
Ok(s.get("\r\n".len()..).ok_or(Incomplete)?)
}
fn parse_number_at<'a>(s: &'a str) -> RedisParser<(usize, &'a str)> {
let len = s
.chars()
.position(|c| !c.is_numeric())
.ok_or(NonNumericInput)?;
Ok((s[..len].parse()?, after_newline_at(s, len)?))
}
/// Parse a Redis bulk string and return the content of that string and the unparsed remainder. /// Parse a Redis bulk string and return the content of that string and the unparsed remainder.
/// ///
/// All bulk strings have the format `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n` /// All bulk strings have the format `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n`
fn parse_redis_bulk_string(input: &str) -> Parser<&str> { fn parse_redis_bulk_string<'a>(s: &'a str) -> RedisParser<(RedisData, &'a str)> {
let input = &input.get("$".len()..).ok_or(Incomplete)?; let (len, rest) = parse_number_at(s)?;
let (field_len, rest) = parse_redis_length(input)?; let content = rest.get(..len).ok_or(Incomplete)?;
let field_content = rest.get(..field_len).ok_or(Incomplete)?; Ok((BulkString(content), after_newline_at(&rest, len)?))
Ok((field_content, &rest[field_len + "\r\n".len()..]))
} }
fn parse_redis_int(input: &str) -> Parser<usize> { fn parse_redis_int<'a>(s: &'a str) -> RedisParser<(RedisData, &'a str)> {
let input = &input.get(":".len()..).ok_or(Incomplete)?; let (number, rest) = parse_number_at(s)?;
let (number, rest_with_newline) = parse_number_at(input)?; Ok((Integer(number), rest))
let rest = &rest_with_newline.get("\r\n".len()..).ok_or(Incomplete)?;
Ok((number, rest))
} }
/// Return the value of a Redis length (for an array or bulk string) and the unparsed remainder fn parse_redis_array<'a>(s: &'a str) -> RedisParser<(RedisData, &'a str)> {
fn parse_redis_length(input: &str) -> Parser<usize> { let (number_of_elements, mut rest) = parse_number_at(s)?;
let (number, rest_with_newline) = parse_number_at(input)?;
let rest = &rest_with_newline.get("\r\n".len()..).ok_or(Incomplete)?; let mut inner = Vec::with_capacity(number_of_elements);
Ok((number, rest)) inner.resize(number_of_elements, RedisData::Uninitilized);
for i in (0..number_of_elements).rev() {
let (next_el, new_rest) = utf8_to_redis_data(rest)?;
rest = new_rest;
inner[i] = next_el;
}
Ok((RedisData::RedisArray(inner), rest))
} }
fn parse_number_at(input: &str) -> Parser<usize> { impl<'a> TryFrom<RedisData<'a>> for &'a str {
let number_len = input type Error = RedisParseErr;
.chars()
.position(|c| !c.is_numeric()) fn try_from(val: RedisData<'a>) -> Result<Self, Self::Error> {
.ok_or(Unrecoverable)?; match val {
let number = input[..number_len].parse().map_err(|_| Unrecoverable)?; RedisData::BulkString(inner) => Ok(inner),
let rest = &input.get(number_len..).ok_or(Incomplete)?; _ => Err(IncorrectRedisType),
Ok((number, rest)) }
}
} }
impl<'a> TryFrom<RedisStructuredText<'a>> for RedisParseOutput<'a> {
type Error = RedisParseErr;
fn try_from(input: RedisStructuredText<'a>) -> Result<RedisParseOutput<'a>, Self::Error> {
if let RedisData::RedisArray(mut redis_strings) = input.structured_txt {
let command = redis_strings.pop().ok_or(MissingField)?.try_into()?;
match command {
// subscription statuses look like:
// $14\r\ntimeline:local\r\n
// :47\r\n
"subscribe" | "unsubscribe" => Ok(NonMsg(input.leftover_input)),
// Messages look like;
// $10\r\ntimeline:4\r\n
// $1386\r\n{\"event\":\"update\",\"payload\"...\"queued_at\":1569623342825}\r\n
"message" => Ok(Msg(RedisMsg {
timeline_txt: redis_strings.pop().ok_or(MissingField)?.try_into()?,
event_txt: redis_strings.pop().ok_or(MissingField)?.try_into()?,
leftover_input: input.leftover_input,
})),
_cmd => Err(Incomplete),
}
} else {
Err(IncorrectRedisType)
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn parse_redis_subscribe() -> Result<(), RedisParseErr> {
let input = "*3\r\n$9\r\nsubscribe\r\n$15\r\ntimeline:public\r\n:1\r\n";
let r_subscribe = match RedisParseOutput::try_from(input) {
Ok(NonMsg(leftover)) => leftover,
Ok(Msg(msg)) => panic!("unexpectedly got a msg: {:?}", msg),
Err(e) => panic!("Error in parsing subscribe command: {:?}", e),
};
assert!(r_subscribe.is_empty());
Ok(())
}
#[test]
fn parse_redis_detects_non_newline() -> Result<(), RedisParseErr> {
let input =
"*3QQ$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$38\r\n{\"event\":\"delete\",\"payload\":\"1038647\"}\r\n";
match RedisParseOutput::try_from(input) {
Ok(NonMsg(leftover)) => panic!(
"Parsed an invalid msg as a non-msg.\nInput `{}` parsed to NonMsg({:?})",
&input, leftover
),
Ok(Msg(msg)) => panic!(
"Parsed an invalid msg as a msg.\nInput `{:?}` parsed to {:?}",
&input, msg
),
Err(_) => (), // should err
};
Ok(())
}
fn parse_redis_msg() -> Result<(), RedisParseErr> {
let input =
"*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$38\r\n{\"event\":\"delete\",\"payload\":\"1038647\"}\r\n";
let r_msg = match RedisParseOutput::try_from(input) {
Ok(NonMsg(leftover)) => panic!(
"Parsed a msg as a non-msg.\nInput `{}` parsed to NonMsg({:?})",
&input, leftover
),
Ok(Msg(msg)) => msg,
Err(e) => panic!("Error in parsing subscribe command: {:?}", e),
};
assert!(r_msg.leftover_input.is_empty());
assert_eq!(r_msg.timeline_txt, "timeline:308");
assert_eq!(r_msg.event_txt, r#"{"event":"delete","payload":"1038647"}"#);
Ok(())
}
}
// #[derive(Debug, Clone, PartialEq, Copy)]
// pub struct RedisUtf8<'a> {
// pub valid_utf8: &'a str,
// pub leftover_bytes: &'a [u8],
// }
// impl<'a> From<&'a [u8]> for RedisUtf8<'a> {
// fn from(bytes: &'a [u8]) -> Self {
// match str::from_utf8(bytes) {
// Ok(valid_utf8) => Self {
// valid_utf8,
// leftover_bytes: "".as_bytes(),
// },
// Err(e) => {
// let (valid, after_valid) = bytes.split_at(e.valid_up_to());
// Self {
// valid_utf8: str::from_utf8(valid).expect("Guaranteed by `.valid_up_to`"),
// leftover_bytes: after_valid,
// }
// }
// }
// }
// }
// impl<'a> Default for RedisUtf8<'a> {
// fn default() -> Self {
// Self::from("".as_bytes())
// }
// }