Finish second reorganization

This commit is contained in:
Daniel Sockwell 2020-03-30 18:39:20 -04:00
parent e1909bf14f
commit 4a66be90ac
7 changed files with 149 additions and 147 deletions

View File

@ -1,7 +1,7 @@
[package]
name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.6.5"
version = "0.6.6"
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018"

View File

@ -13,7 +13,7 @@ macro_rules! log_fatal {
};};
}
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug)]
pub enum RedisParseErr {
Incomplete,
InvalidNumber(std::num::ParseIntError),
@ -21,6 +21,9 @@ pub enum RedisParseErr {
InvalidLineStart(String),
InvalidLineEnd,
IncorrectRedisType,
MissingField,
UnsupportedTimeline,
UnsupportedEvent(serde_json::Error),
}
impl fmt::Display for RedisParseErr {
@ -28,7 +31,13 @@ impl fmt::Display for RedisParseErr {
write!(f, "{}", match self {
Self::Incomplete => "The input from Redis does not form a complete message, likely because the input buffer filled partway through a message. Save this input and try again with additional input from Redis.".to_string(),
Self::InvalidNumber(e) => format!( "Redis input cannot be parsed: {}", e),
_ => "TODO".to_string(),
Self::NonNumericInput => "Received non-numeric input when expecting a Redis number".to_string(),
Self::InvalidLineStart(s) => format!("Got `{}` as a line start from Redis", s),
Self::InvalidLineEnd => "Redis input ended before promised length".to_string(),
Self::IncorrectRedisType => "Received a non-array when expecting a Redis array".to_string(),
Self::MissingField => "Redis input was missing a required field".to_string(),
Self::UnsupportedTimeline => "The raw timeline received from Redis could not be parsed into a supported timeline".to_string(),
Self::UnsupportedEvent(e) => format!("The event text from Redis could not be parsed into a valid event: {}", e)
})
}
}
@ -41,6 +50,18 @@ impl From<std::num::ParseIntError> for RedisParseErr {
}
}
impl From<serde_json::Error> for RedisParseErr {
fn from(error: serde_json::Error) -> Self {
Self::UnsupportedEvent(error)
}
}
impl From<TimelineErr> for RedisParseErr {
fn from(_: TimelineErr) -> Self {
Self::UnsupportedTimeline
}
}
#[derive(Debug)]
pub enum TimelineErr {
RedisNamespaceMismatch,

View File

@ -18,6 +18,7 @@
use super::receiver::Receiver;
use crate::{
config,
err::RedisParseErr,
messages::Event,
parse_client_request::{Stream::Public, Subscription, Timeline},
};
@ -26,7 +27,6 @@ use futures::{
Poll,
};
use std::sync::{Arc, Mutex};
use tokio::io::Error;
use uuid::Uuid;
/// Struct for managing all Redis streams.
@ -82,7 +82,7 @@ impl ClientAgent {
/// The stream that the `ClientAgent` manages. `Poll` is the only method implemented.
impl futures::stream::Stream for ClientAgent {
type Item = Event;
type Error = Error;
type Error = RedisParseErr;
/// Checks for any new messages that should be sent to the client.
///

View File

@ -1,103 +1,101 @@
use super::ClientAgent;
use warp::ws::WebSocket;
use futures::{future::Future, stream::Stream, Async};
use log;
use std::time::{Duration, Instant};
use warp::{
reply::Reply,
sse::Sse,
ws::{Message, WebSocket},
};
pub struct EventStream;
impl EventStream {
/// Send a stream of replies to a WebSocket client.
/// Send a stream of replies to a WebSocket client.
pub fn to_ws(
socket: WebSocket,
mut client_agent: ClientAgent,
update_interval: Duration,
) -> impl Future<Item = (), Error = ()> {
let (ws_tx, mut ws_rx) = socket.split();
let timeline = client_agent.subscription.timeline;
ws: WebSocket,
mut client_agent: ClientAgent,
interval: Duration,
) -> impl Future<Item = (), Error = ()> {
let (ws_tx, mut ws_rx) = ws.split();
let timeline = client_agent.subscription.timeline;
// Create a pipe
let (tx, rx) = futures::sync::mpsc::unbounded();
// Create a pipe
let (tx, rx) = futures::sync::mpsc::unbounded();
// Send one end of it to a different thread and tell that end to forward whatever it gets
// on to the websocket client
warp::spawn(
rx.map_err(|()| -> warp::Error { unreachable!() })
.forward(ws_tx)
.map(|_r| ())
.map_err(|e| match e.to_string().as_ref() {
"IO error: Broken pipe (os error 32)" => (), // just closed unix socket
_ => log::warn!("websocket send error: {}", e),
}),
);
// Send one end of it to a different thread and tell that end to forward whatever it gets
// on to the websocket client
warp::spawn(
rx.map_err(|()| -> warp::Error { unreachable!() })
.forward(ws_tx)
.map(|_r| ())
.map_err(|e| match e.to_string().as_ref() {
"IO error: Broken pipe (os error 32)" => (), // just closed unix socket
_ => log::warn!("websocket send error: {}", e),
}),
);
// Yield new events for as long as the client is still connected
let event_stream = tokio::timer::Interval::new(Instant::now(), update_interval).take_while(
move |_| match ws_rx.poll() {
Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true),
Ok(Async::Ready(None)) => {
// Yield new events for as long as the client is still connected
let event_stream =
tokio::timer::Interval::new(Instant::now(), interval).take_while(move |_| {
match ws_rx.poll() {
Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true),
Ok(Async::Ready(None)) => {
// TODO: consider whether we should manually drop closed connections here
log::info!("Client closed WebSocket connection for {:?}", timeline);
futures::future::ok(false)
}
Err(e) if e.to_string() == "IO error: Broken pipe (os error 32)" => {
// no err, just closed Unix socket
log::info!("Client closed WebSocket connection for {:?}", timeline);
futures::future::ok(false)
}
Err(e) => {
log::warn!("Error in {:?}: {}", timeline, e);
futures::future::ok(false)
}
}
});
let mut time = Instant::now();
// Every time you get an event from that stream, send it through the pipe
event_stream
.for_each(move |_instant| {
if let Ok(Async::Ready(Some(msg))) = client_agent.poll() {
tx.unbounded_send(Message::text(msg.to_json_string()))
.expect("No send error");
};
if time.elapsed() > Duration::from_secs(30) {
tx.unbounded_send(Message::text("{}")).expect("Can ping");
time = Instant::now();
}
Ok(())
})
.then(move |result| {
// TODO: consider whether we should manually drop closed connections here
log::info!("Client closed WebSocket connection for {:?}", timeline);
futures::future::ok(false)
}
Err(e) if e.to_string() == "IO error: Broken pipe (os error 32)" => {
// no err, just closed Unix socket
log::info!("Client closed WebSocket connection for {:?}", timeline);
futures::future::ok(false)
}
Err(e) => {
log::warn!("Error in {:?}: {}", timeline, e);
futures::future::ok(false)
}
},
);
log::info!("WebSocket connection for {:?} closed.", timeline);
result
})
.map_err(move |e| log::warn!("Error sending to {:?}: {}", timeline, e))
}
pub fn to_sse(mut client_agent: ClientAgent, sse: Sse, interval: Duration) -> impl Reply {
let event_stream =
tokio::timer::Interval::new(Instant::now(), interval).filter_map(move |_| {
match client_agent.poll() {
Ok(Async::Ready(Some(event))) => Some((
warp::sse::event(event.event_name()),
warp::sse::data(event.payload().unwrap_or_else(String::new)),
)),
_ => None,
}
});
let mut time = Instant::now();
// Every time you get an event from that stream, send it through the pipe
event_stream
.for_each(move |_instant| {
if let Ok(Async::Ready(Some(msg))) = client_agent.poll() {
tx.unbounded_send(warp::ws::Message::text(msg.to_json_string()))
.expect("No send error");
};
if time.elapsed() > Duration::from_secs(30) {
tx.unbounded_send(warp::ws::Message::text("{}"))
.expect("Can ping");
time = Instant::now();
}
Ok(())
})
.then(move |result| {
// TODO: consider whether we should manually drop closed connections here
log::info!("WebSocket connection for {:?} closed.", timeline);
result
})
.map_err(move |e| log::warn!("Error sending to {:?}: {}", timeline, e))
}
pub fn to_sse(
mut client_agent: ClientAgent,
connection: warp::sse::Sse,
update_interval: Duration,
) ->impl warp::reply::Reply {
let event_stream =
tokio::timer::Interval::new(Instant::now(), update_interval).filter_map(move |_| {
match client_agent.poll() {
Ok(Async::Ready(Some(event))) => Some((
warp::sse::event(event.event_name()),
warp::sse::data(event.payload().unwrap_or_else(String::new)),
)),
_ => None,
}
});
connection.reply(
warp::sse::keep_alive()
.interval(Duration::from_secs(30))
.text("thump".to_string())
.stream(event_stream),
)
}
sse.reply(
warp::sse::keep_alive()
.interval(Duration::from_secs(30))
.text("thump".to_string())
.stream(event_stream),
)
}
}

View File

@ -7,15 +7,14 @@ pub use message_queues::{MessageQueues, MsgQueue};
use crate::{
config,
err::RedisParseErr,
messages::Event,
parse_client_request::{Stream, Timeline},
redis_to_client_stream::redis::RedisConn,
};
use futures::{Async, Poll};
use lru::LruCache;
use std::{collections::HashMap, time::Instant};
use tokio::io::Error;
use uuid::Uuid;
/// The item that streams from Redis and is polled by the `ClientAgent`
@ -26,17 +25,10 @@ pub struct Receiver {
manager_id: Uuid,
pub msg_queues: MessageQueues,
clients_per_timeline: HashMap<Timeline, i32>,
cache: Cache,
// TODO remove items ^^^^^ moved to RedisConn
}
#[derive(Debug)]
pub struct Cache {
hashtag_cache: LruCache<i64, String>,
// TODO: eventually, it might make sense to have Mastodon publish to timelines with
// the tag number instead of the tag name. This would save us from dealing
// with a cache here and would be consistent with how lists/users are handled.
id_to_hashtag: LruCache<i64, String>,
pub hashtag_to_id: LruCache<String, i64>,
}
impl Receiver {
@ -51,10 +43,8 @@ impl Receiver {
manager_id: Uuid::default(),
msg_queues: MessageQueues(HashMap::new()),
clients_per_timeline: HashMap::new(),
cache: Cache {
id_to_hashtag: LruCache::new(1000),
hashtag_to_id: LruCache::new(1000),
}, // should these be run-time options?
hashtag_cache: LruCache::new(1000),
// should this be a run-time option?
}
}
@ -67,8 +57,7 @@ impl Receiver {
pub fn manage_new_timeline(&mut self, id: Uuid, tl: Timeline, hashtag: Option<String>) {
self.timeline = tl;
if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (hashtag, tl) {
self.cache.id_to_hashtag.put(id, hashtag.clone());
self.cache.hashtag_to_id.put(hashtag.clone(), id);
self.hashtag_cache.put(id, hashtag.clone());
self.redis_connection.update_cache(hashtag, id);
};
@ -94,7 +83,7 @@ impl Receiver {
for change in timelines_to_modify {
let timeline = change.timeline;
let hashtag = match timeline {
Timeline(Stream::Hashtag(id), _, _) => self.cache.id_to_hashtag.get(&id),
Timeline(Stream::Hashtag(id), _, _) => self.hashtag_cache.get(&id),
_non_hashtag_timeline => None,
};
@ -122,7 +111,7 @@ impl Receiver {
/// The stream that the ClientAgent polls to learn about new messages.
impl futures::stream::Stream for Receiver {
type Item = Event;
type Error = Error;
type Error = RedisParseErr;
/// Returns the oldest message in the `ClientAgent`'s queue (if any).
///
@ -143,7 +132,7 @@ impl futures::stream::Stream for Receiver {
}),
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) => (),
Err(_) => todo!(),
Err(err) => Err(err)?,
}
}

View File

@ -1,12 +1,12 @@
use super::{
redis_cmd,
redis_msg::{RedisMsg, RedisParseOutput},
use super::{redis_cmd, redis_msg::RedisParseOutput};
use crate::{
config::RedisConfig,
err::{self, RedisParseErr},
messages::Event,
parse_client_request::Timeline,
pubsub_cmd,
};
use crate::config::RedisConfig;
use crate::err::{self, RedisParseErr};
use crate::messages::Event;
use crate::parse_client_request::Timeline;
use crate::pubsub_cmd;
use futures::{Async, Poll};
use lru::LruCache;
use std::{
@ -78,48 +78,42 @@ impl RedisConn {
}
let input = self.redis_input.clone();
self.redis_input.clear();
let (input, invalid_bytes) = match str::from_utf8(&input) {
Ok(input) => (input, None),
Err(e) => {
let (valid, invalid) = input.split_at(e.valid_up_to());
(str::from_utf8(valid).expect("Guaranteed ^"), Some(invalid))
}
};
let (cache, ns) = (&mut self.cache, &self.redis_namespace);
let (input, invalid_bytes) = str::from_utf8(&input)
.map(|input| (input, "".as_bytes()))
.unwrap_or_else(|e| {
let (valid, invalid) = input.split_at(e.valid_up_to());
(str::from_utf8(valid).expect("Guaranteed by ^^^^"), invalid)
});
use {Async::*, RedisParseOutput::*};
let (res, leftover) = match RedisParseOutput::try_from(input) {
Ok(Msg(msg)) => match ns {
Ok(Msg(msg)) => match &self.redis_namespace {
Some(ns) if msg.timeline_txt.starts_with(&format!("{}:timeline:", ns)) => {
let tl = Timeline::from_redis_text(
&msg.timeline_txt[ns.len() + ":timeline:".len()..],
cache,
)
.unwrap_or_else(|_| todo!());
let event: Event = serde_json::from_str(msg.event_txt).expect("TODO");
&mut self.cache,
)?;
let event: Event = serde_json::from_str(msg.event_txt)?;
(Ok(Ready(Some((tl, event)))), msg.leftover_input)
}
None => {
let tl =
Timeline::from_redis_text(&msg.timeline_txt["timeline:".len()..], cache)
.unwrap_or_else(|_| todo!());
let event: Event = serde_json::from_str(msg.event_txt).expect("TODO");
let tl = Timeline::from_redis_text(
&msg.timeline_txt["timeline:".len()..],
&mut self.cache,
)?;
let event: Event = serde_json::from_str(msg.event_txt)?;
(Ok(Ready(Some((tl, event)))), msg.leftover_input)
}
Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input),
},
Ok(NonMsg(leftover)) => (Ok(Ready(None)), leftover),
Err(RedisParseErr::Incomplete) => (Ok(NotReady), input),
Err(_other) => todo!(),
Err(other) => (Err(other), input),
};
self.redis_input.extend_from_slice(leftover.as_bytes());
if let Some(bytes) = invalid_bytes {
self.redis_input.extend_from_slice(bytes);
};
self.redis_input.extend_from_slice(invalid_bytes);
res
}

View File

@ -144,7 +144,7 @@ impl<'a> TryFrom<RedisStructuredText<'a>> for RedisParseOutput<'a> {
fn try_from(input: RedisStructuredText<'a>) -> Result<RedisParseOutput<'a>, Self::Error> {
if let RedisData::RedisArray(mut redis_strings) = input.structured_txt {
let command = redis_strings.pop().expect("TODO").try_into()?;
let command = redis_strings.pop().ok_or(MissingField)?.try_into()?;
match command {
// subscription statuses look like:
// $14\r\ntimeline:local\r\n
@ -154,14 +154,14 @@ impl<'a> TryFrom<RedisStructuredText<'a>> for RedisParseOutput<'a> {
// $10\r\ntimeline:4\r\n
// $1386\r\n{\"event\":\"update\",\"payload\"...\"queued_at\":1569623342825}\r\n
"message" => Ok(Msg(RedisMsg {
timeline_txt: redis_strings.pop().expect("TODO").try_into()?,
event_txt: redis_strings.pop().expect("TODO").try_into()?,
timeline_txt: redis_strings.pop().ok_or(MissingField)?.try_into()?,
event_txt: redis_strings.pop().ok_or(MissingField)?.try_into()?,
leftover_input: input.leftover_input,
})),
_cmd => Err(Incomplete)?,
_cmd => Err(Incomplete),
}
} else {
panic!("TODO");
Err(IncorrectRedisType)
}
}
}