Error handling, pt1 (#115)

* Initial work to support structured errors

* WIP error handling and RedisConn refactor

* WIP for error handling refactor

* Finish substantive work for Redis error handling

* Apply clippy lints
This commit is contained in:
Daniel Sockwell 2020-04-01 15:35:24 -04:00 committed by GitHub
parent 81b454c88c
commit d5f079a864
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 592 additions and 638 deletions

2
Cargo.lock generated
View File

@ -440,7 +440,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "flodgatt" name = "flodgatt"
version = "0.6.6" version = "0.6.7"
dependencies = [ dependencies = [
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -1,7 +1,7 @@
[package] [package]
name = "flodgatt" name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.6.6" version = "0.6.7"
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"] authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018" edition = "2018"

View File

@ -63,7 +63,7 @@ impl fmt::Display for EnvVar {
] ]
.iter() .iter()
{ {
if let Some(value) = self.get(&env_var.to_string()) { if let Some(value) = self.get(&(*env_var).to_string()) {
result = format!("{}\n {}: {}", result, env_var, value) result = format!("{}\n {}: {}", result, env_var, value)
} }
} }

View File

@ -1,75 +0,0 @@
use std::{error::Error, fmt};
pub fn die_with_msg(msg: impl fmt::Display) -> ! {
eprintln!("FATAL ERROR: {}", msg);
std::process::exit(1);
}
#[macro_export]
macro_rules! log_fatal {
($str:expr, $var:expr) => {{
log::error!($str, $var);
panic!();
};};
}
#[derive(Debug)]
pub enum RedisParseErr {
Incomplete,
InvalidNumber(std::num::ParseIntError),
NonNumericInput,
InvalidLineStart(String),
InvalidLineEnd,
IncorrectRedisType,
MissingField,
UnsupportedTimeline,
UnsupportedEvent(serde_json::Error),
}
impl fmt::Display for RedisParseErr {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "{}", match self {
Self::Incomplete => "The input from Redis does not form a complete message, likely because the input buffer filled partway through a message. Save this input and try again with additional input from Redis.".to_string(),
Self::InvalidNumber(e) => format!( "Redis input cannot be parsed: {}", e),
Self::NonNumericInput => "Received non-numeric input when expecting a Redis number".to_string(),
Self::InvalidLineStart(s) => format!("Got `{}` as a line start from Redis", s),
Self::InvalidLineEnd => "Redis input ended before promised length".to_string(),
Self::IncorrectRedisType => "Received a non-array when expecting a Redis array".to_string(),
Self::MissingField => "Redis input was missing a required field".to_string(),
Self::UnsupportedTimeline => "The raw timeline received from Redis could not be parsed into a supported timeline".to_string(),
Self::UnsupportedEvent(e) => format!("The event text from Redis could not be parsed into a valid event: {}", e)
})
}
}
impl Error for RedisParseErr {}
impl From<std::num::ParseIntError> for RedisParseErr {
fn from(error: std::num::ParseIntError) -> Self {
Self::InvalidNumber(error)
}
}
impl From<serde_json::Error> for RedisParseErr {
fn from(error: serde_json::Error) -> Self {
Self::UnsupportedEvent(error)
}
}
impl From<TimelineErr> for RedisParseErr {
fn from(_: TimelineErr) -> Self {
Self::UnsupportedTimeline
}
}
#[derive(Debug)]
pub enum TimelineErr {
RedisNamespaceMismatch,
InvalidInput,
}
impl From<std::num::ParseIntError> for TimelineErr {
fn from(_error: std::num::ParseIntError) -> Self {
Self::InvalidInput
}
}

18
src/err/mod.rs Normal file
View File

@ -0,0 +1,18 @@
mod timeline;
pub use timeline::TimelineErr;
use std::fmt;
pub fn die_with_msg(msg: impl fmt::Display) -> ! {
eprintln!("FATAL ERROR: {}", msg);
std::process::exit(1);
}
#[macro_export]
macro_rules! log_fatal {
($str:expr, $var:expr) => {{
log::error!($str, $var);
panic!();
};};
}

24
src/err/timeline.rs Normal file
View File

@ -0,0 +1,24 @@
use std::fmt;
#[derive(Debug)]
pub enum TimelineErr {
RedisNamespaceMismatch,
InvalidInput,
}
impl From<std::num::ParseIntError> for TimelineErr {
fn from(_error: std::num::ParseIntError) -> Self {
Self::InvalidInput
}
}
impl fmt::Display for TimelineErr {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
use TimelineErr::*;
let msg = match self {
RedisNamespaceMismatch => "TODO: Cut this error",
InvalidInput => "The timeline text from Redis could not be parsed into a supported timeline. TODO: add incoming timeline text"
};
write!(f, "{}", msg)
}
}

View File

@ -1,74 +1,71 @@
use flodgatt::{ use flodgatt::{
config::{DeploymentConfig, EnvVar, PostgresConfig, RedisConfig}, config::{DeploymentConfig, EnvVar, PostgresConfig, RedisConfig},
parse_client_request::{PgPool, Subscription}, parse_client_request::{PgPool, Subscription},
redis_to_client_stream::{ClientAgent, EventStream}, redis_to_client_stream::{ClientAgent, EventStream, Receiver},
}; };
use std::{collections::HashMap, env, fs, net, os::unix::fs::PermissionsExt}; use std::{env, fs, net::SocketAddr, os::unix::fs::PermissionsExt};
use tokio::net::UnixListener; use tokio::net::UnixListener;
use warp::{path, ws::Ws2, Filter}; use warp::{http::StatusCode, path, ws::Ws2, Filter, Rejection};
fn main() { fn main() {
dotenv::from_filename(match env::var("ENV").ok().as_ref().map(String::as_str) { dotenv::from_filename(match env::var("ENV").ok().as_deref() {
Some("production") => ".env.production", Some("production") => ".env.production",
Some("development") | None => ".env", Some("development") | None => ".env",
Some(unsupported) => EnvVar::err("ENV", unsupported, "`production` or `development`"), Some(unsupported) => EnvVar::err("ENV", unsupported, "`production` or `development`"),
}) })
.ok(); .ok();
let env_vars_map: HashMap<_, _> = dotenv::vars().collect(); let env_vars = EnvVar::new(dotenv::vars().collect());
let env_vars = EnvVar::new(env_vars_map);
pretty_env_logger::init(); pretty_env_logger::init();
log::info!("Environmental variables Flodgatt received: {}", &env_vars);
log::info!(
"Flodgatt recognized the following environmental variables:{}",
env_vars.clone()
);
let redis_cfg = RedisConfig::from_env(env_vars.clone());
let cfg = DeploymentConfig::from_env(env_vars.clone());
let postgres_cfg = PostgresConfig::from_env(env_vars.clone()); let postgres_cfg = PostgresConfig::from_env(env_vars.clone());
let redis_cfg = RedisConfig::from_env(env_vars.clone());
let cfg = DeploymentConfig::from_env(env_vars);
let pg_pool = PgPool::new(postgres_cfg); let pg_pool = PgPool::new(postgres_cfg);
let client_agent_sse = ClientAgent::blank(redis_cfg); let sharable_receiver = Receiver::try_from(redis_cfg)
let client_agent_ws = client_agent_sse.clone_with_shared_receiver(); .unwrap_or_else(|e| {
log::error!("{}\nFlodgatt shutting down...", e);
std::process::exit(1);
})
.into_arc();
log::info!("Streaming server initialized and ready to accept connections"); log::info!("Streaming server initialized and ready to accept connections");
// Server Sent Events // Server Sent Events
let sse_receiver = sharable_receiver.clone();
let (sse_interval, whitelist_mode) = (*cfg.sse_interval, *cfg.whitelist_mode); let (sse_interval, whitelist_mode) = (*cfg.sse_interval, *cfg.whitelist_mode);
let sse_routes = Subscription::from_sse_query(pg_pool.clone(), whitelist_mode) let sse_routes = Subscription::from_sse_query(pg_pool.clone(), whitelist_mode)
.and(warp::sse()) .and(warp::sse())
.map( .map(
move |subscription: Subscription, sse_connection_to_client: warp::sse::Sse| { move |subscription: Subscription, sse_connection_to_client: warp::sse::Sse| {
log::info!("Incoming SSE request for {:?}", subscription.timeline); log::info!("Incoming SSE request for {:?}", subscription.timeline);
// Create a new ClientAgent let mut client_agent = ClientAgent::new(sse_receiver.clone(), &subscription);
let mut client_agent = client_agent_sse.clone_with_shared_receiver(); client_agent.subscribe();
// Assign ClientAgent to generate stream of updates for the user/timeline pair
client_agent.init_for_user(subscription);
// send the updates through the SSE connection // send the updates through the SSE connection
EventStream::to_sse(client_agent, sse_connection_to_client, sse_interval) EventStream::send_to_sse(client_agent, sse_connection_to_client, sse_interval)
}, },
) )
.with(warp::reply::with::header("Connection", "keep-alive")); .with(warp::reply::with::header("Connection", "keep-alive"));
// WebSocket // WebSocket
let ws_receiver = sharable_receiver;
let (ws_update_interval, whitelist_mode) = (*cfg.ws_interval, *cfg.whitelist_mode); let (ws_update_interval, whitelist_mode) = (*cfg.ws_interval, *cfg.whitelist_mode);
let websocket_routes = Subscription::from_ws_request(pg_pool.clone(), whitelist_mode) let ws_routes = Subscription::from_ws_request(pg_pool, whitelist_mode)
.and(warp::ws::ws2()) .and(warp::ws::ws2())
.map(move |subscription: Subscription, ws: Ws2| { .map(move |subscription: Subscription, ws: Ws2| {
log::info!("Incoming websocket request for {:?}", subscription.timeline); log::info!("Incoming websocket request for {:?}", subscription.timeline);
let mut client_agent = ClientAgent::new(ws_receiver.clone(), &subscription);
client_agent.subscribe();
let token = subscription.access_token.clone(); // send the updates through the WS connection
// Create a new ClientAgent // (along with the User's access_token which is sent for security)
let mut client_agent = client_agent_ws.clone_with_shared_receiver();
// Assign that agent to generate a stream of updates for the user/timeline pair
client_agent.init_for_user(subscription);
// send the updates through the WS connection (along with the User's access_token
// which is sent for security)
( (
ws.on_upgrade(move |socket| { ws.on_upgrade(move |s| {
EventStream::to_ws(socket, client_agent, ws_update_interval) EventStream::send_to_ws(s, client_agent, ws_update_interval)
}), }),
token.unwrap_or_else(String::new), subscription.access_token.unwrap_or_else(String::new),
) )
}) })
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token)); .map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));
@ -84,33 +81,23 @@ fn main() {
log::info!("Using Unix socket {}", socket); log::info!("Using Unix socket {}", socket);
fs::remove_file(socket).unwrap_or_default(); fs::remove_file(socket).unwrap_or_default();
let incoming = UnixListener::bind(socket).unwrap().incoming(); let incoming = UnixListener::bind(socket).unwrap().incoming();
fs::set_permissions(socket, PermissionsExt::from_mode(0o666)).unwrap(); fs::set_permissions(socket, PermissionsExt::from_mode(0o666)).unwrap();
warp::serve( warp::serve(
health.or(websocket_routes.or(sse_routes).with(cors).recover( health.or(ws_routes.or(sse_routes).with(cors).recover(|r: Rejection| {
|rejection: warp::reject::Rejection| { let json_err = match r.cause() {
let err_txt = match rejection.cause() { Some(text) if text.to_string() == "Missing request header 'authorization'" => {
Some(text) warp::reply::json(&"Error: Missing access token".to_string())
if text.to_string() == "Missing request header 'authorization'" => }
{ Some(text) => warp::reply::json(&text.to_string()),
"Error: Missing access token".to_string() None => warp::reply::json(&"Error: Nonexistant endpoint".to_string()),
} };
Some(text) => text.to_string(), Ok(warp::reply::with_status(json_err, StatusCode::UNAUTHORIZED))
None => "Error: Nonexistant endpoint".to_string(), })),
};
let json = warp::reply::json(&err_txt);
Ok(warp::reply::with_status(
json,
warp::http::StatusCode::UNAUTHORIZED,
))
},
)),
) )
.run_incoming(incoming); .run_incoming(incoming);
} else { } else {
let server_addr = net::SocketAddr::new(*cfg.address, cfg.port.0); let server_addr = SocketAddr::new(*cfg.address, *cfg.port);
warp::serve(health.or(websocket_routes.or(sse_routes).with(cors))).run(server_addr); warp::serve(health.or(ws_routes.or(sse_routes).with(cors))).run(server_addr);
} };
} }

View File

@ -305,10 +305,11 @@ pub struct Notification {
status: Option<Status>, status: Option<Status>,
} }
#[serde(rename_all = "lowercase", deny_unknown_fields)] #[serde(rename_all = "snake_case", deny_unknown_fields)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
enum NotificationType { enum NotificationType {
Follow, Follow,
FollowRequest, // Undocumented
Mention, Mention,
Reblog, Reblog,
Favourite, Favourite,

View File

@ -83,7 +83,7 @@ LIMIT 1",
} }
} }
pub fn select_hashtag_id(self, tag_name: &String) -> Result<i64, Rejection> { pub fn select_hashtag_id(self, tag_name: &str) -> Result<i64, Rejection> {
let mut conn = self.0.get().unwrap(); let mut conn = self.0.get().unwrap();
let rows = &conn let rows = &conn
.query( .query(

View File

@ -11,6 +11,7 @@ use crate::err::TimelineErr;
use crate::log_fatal; use crate::log_fatal;
use lru::LruCache; use lru::LruCache;
use std::collections::HashSet; use std::collections::HashSet;
use uuid::Uuid;
use warp::reject::Rejection; use warp::reject::Rejection;
use super::query; use super::query;
@ -50,6 +51,7 @@ macro_rules! parse_sse_query {
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub struct Subscription { pub struct Subscription {
pub id: Uuid,
pub timeline: Timeline, pub timeline: Timeline,
pub allowed_langs: HashSet<String>, pub allowed_langs: HashSet<String>,
pub blocks: Blocks, pub blocks: Blocks,
@ -60,6 +62,7 @@ pub struct Subscription {
impl Default for Subscription { impl Default for Subscription {
fn default() -> Self { fn default() -> Self {
Self { Self {
id: Uuid::new_v4(),
timeline: Timeline(Stream::Unset, Reach::Local, Content::Notification), timeline: Timeline(Stream::Unset, Reach::Local, Content::Notification),
allowed_langs: HashSet::new(), allowed_langs: HashSet::new(),
blocks: Blocks::default(), blocks: Blocks::default(),
@ -123,12 +126,13 @@ impl Subscription {
}; };
Ok(Subscription { Ok(Subscription {
id: Uuid::new_v4(),
timeline, timeline,
allowed_langs: user.allowed_langs, allowed_langs: user.allowed_langs,
blocks: Blocks { blocks: Blocks {
blocking_users: pool.clone().select_blocking_users(user.id), blocking_users: pool.clone().select_blocking_users(user.id),
blocked_users: pool.clone().select_blocked_users(user.id), blocked_users: pool.clone().select_blocked_users(user.id),
blocked_domains: pool.clone().select_blocked_domains(user.id), blocked_domains: pool.select_blocked_domains(user.id),
}, },
hashtag_name, hashtag_name,
access_token: q.access_token, access_token: q.access_token,

View File

@ -15,10 +15,8 @@
//! Because `StreamManagers` are lightweight data structures that do not directly //! Because `StreamManagers` are lightweight data structures that do not directly
//! communicate with Redis, it we create a new `ClientAgent` for //! communicate with Redis, it we create a new `ClientAgent` for
//! each new client connection (each in its own thread).use super::{message::Message, receiver::Receiver} //! each new client connection (each in its own thread).use super::{message::Message, receiver::Receiver}
use super::receiver::Receiver; use super::receiver::{Receiver, ReceiverErr};
use crate::{ use crate::{
config,
err::RedisParseErr,
messages::Event, messages::Event,
parse_client_request::{Stream::Public, Subscription, Timeline}, parse_client_request::{Stream::Public, Subscription, Timeline},
}; };
@ -26,33 +24,20 @@ use futures::{
Async::{self, NotReady, Ready}, Async::{self, NotReady, Ready},
Poll, Poll,
}; };
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex, MutexGuard};
use uuid::Uuid;
/// Struct for managing all Redis streams. /// Struct for managing all Redis streams.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct ClientAgent { pub struct ClientAgent {
receiver: Arc<Mutex<Receiver>>, receiver: Arc<Mutex<Receiver>>,
id: Uuid,
pub subscription: Subscription, pub subscription: Subscription,
} }
impl ClientAgent { impl ClientAgent {
/// Create a new `ClientAgent` with no shared data. pub fn new(receiver: Arc<Mutex<Receiver>>, subscription: &Subscription) -> Self {
pub fn blank(redis_cfg: config::RedisConfig) -> Self {
ClientAgent { ClientAgent {
receiver: Arc::new(Mutex::new(Receiver::new(redis_cfg))), receiver,
id: Uuid::default(), subscription: subscription.clone(),
subscription: Subscription::default(),
}
}
/// Clones the `ClientAgent`, sharing the `Receiver`.
pub fn clone_with_shared_receiver(&self) -> Self {
Self {
receiver: self.receiver.clone(),
id: self.id,
subscription: self.subscription.clone(),
} }
} }
@ -64,25 +49,32 @@ impl ClientAgent {
/// a different user, the `Receiver` is responsible for figuring /// a different user, the `Receiver` is responsible for figuring
/// that out and avoiding duplicated connections. Thus, it is safe to /// that out and avoiding duplicated connections. Thus, it is safe to
/// use this method for each new client connection. /// use this method for each new client connection.
pub fn init_for_user(&mut self, subscription: Subscription) { pub fn subscribe(&mut self) {
use std::time::Instant; let mut receiver = self.lock_receiver();
self.id = Uuid::new_v4(); receiver
self.subscription = subscription; .add_subscription(&self.subscription)
let start_time = Instant::now(); .unwrap_or_else(|e| log::error!("Could not subscribe to the Redis channel: {}", e))
let mut receiver = self.receiver.lock().expect("No thread panic (stream.rs)"); }
receiver.manage_new_timeline(
self.id, fn lock_receiver(&self) -> MutexGuard<Receiver> {
self.subscription.timeline, match self.receiver.lock() {
self.subscription.hashtag_name.clone(), Ok(inner) => inner,
); Err(e) => {
log::info!("init_for_user had lock for: {:?}", start_time.elapsed()); log::error!(
"Another thread crashed: {}\n
Attempting to continue, possibly with invalid data",
e
);
e.into_inner()
}
}
} }
} }
/// The stream that the `ClientAgent` manages. `Poll` is the only method implemented. /// The stream that the `ClientAgent` manages. `Poll` is the only method implemented.
impl futures::stream::Stream for ClientAgent { impl futures::stream::Stream for ClientAgent {
type Item = Event; type Item = Event;
type Error = RedisParseErr; type Error = ReceiverErr;
/// Checks for any new messages that should be sent to the client. /// Checks for any new messages that should be sent to the client.
/// ///
@ -94,12 +86,8 @@ impl futures::stream::Stream for ClientAgent {
/// errors from the underlying data structures. /// errors from the underlying data structures.
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let result = { let result = {
let mut receiver = self let mut receiver = self.lock_receiver();
.receiver receiver.poll_for(self.subscription.id, self.subscription.timeline)
.lock()
.expect("ClientAgent: No other thread panic");
receiver.configure_for_polling(self.id, self.subscription.timeline);
receiver.poll()
}; };
let allowed_langs = &self.subscription.allowed_langs; let allowed_langs = &self.subscription.allowed_langs;
@ -107,6 +95,7 @@ impl futures::stream::Stream for ClientAgent {
let blocking_users = &self.subscription.blocks.blocking_users; let blocking_users = &self.subscription.blocks.blocking_users;
let blocked_domains = &self.subscription.blocks.blocked_domains; let blocked_domains = &self.subscription.blocks.blocked_domains;
let (send, block) = (|msg| Ok(Ready(Some(msg))), Ok(NotReady)); let (send, block) = (|msg| Ok(Ready(Some(msg))), Ok(NotReady));
use Event::*; use Event::*;
match result { match result {
Ok(Async::Ready(Some(event))) => match event { Ok(Async::Ready(Some(event))) => match event {

View File

@ -8,12 +8,11 @@ use warp::{
sse::Sse, sse::Sse,
ws::{Message, WebSocket}, ws::{Message, WebSocket},
}; };
pub struct EventStream; pub struct EventStream;
impl EventStream { impl EventStream {
/// Send a stream of replies to a WebSocket client. /// Send a stream of replies to a WebSocket client.
pub fn to_ws( pub fn send_to_ws(
ws: WebSocket, ws: WebSocket,
mut client_agent: ClientAgent, mut client_agent: ClientAgent,
interval: Duration, interval: Duration,
@ -32,7 +31,7 @@ impl EventStream {
.map(|_r| ()) .map(|_r| ())
.map_err(|e| match e.to_string().as_ref() { .map_err(|e| match e.to_string().as_ref() {
"IO error: Broken pipe (os error 32)" => (), // just closed unix socket "IO error: Broken pipe (os error 32)" => (), // just closed unix socket
_ => log::warn!("websocket send error: {}", e), _ => log::warn!("WebSocket send error: {}", e),
}), }),
); );
@ -42,7 +41,6 @@ impl EventStream {
match ws_rx.poll() { match ws_rx.poll() {
Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true), Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true),
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
// TODO: consider whether we should manually drop closed connections here
log::info!("Client closed WebSocket connection for {:?}", timeline); log::info!("Client closed WebSocket connection for {:?}", timeline);
futures::future::ok(false) futures::future::ok(false)
} }
@ -58,28 +56,36 @@ impl EventStream {
} }
}); });
let mut time = Instant::now(); let mut last_ping_time = Instant::now();
// Every time you get an event from that stream, send it through the pipe // Every time you get an event from that stream, send it through the pipe
event_stream event_stream
.for_each(move |_instant| { .for_each(move |_instant| {
if let Ok(Async::Ready(Some(msg))) = client_agent.poll() { match client_agent.poll() {
tx.unbounded_send(Message::text(msg.to_json_string())) Ok(Async::Ready(Some(msg))) => tx
.expect("No send error"); .unbounded_send(Message::text(msg.to_json_string()))
}; .unwrap_or_else(|e| {
if time.elapsed() > Duration::from_secs(30) { log::error!("Could not send message to WebSocket: {}", e)
tx.unbounded_send(Message::text("{}")).expect("Can ping"); }),
time = Instant::now(); Ok(Async::Ready(None)) => log::info!("WebSocket ClientAgent got Ready(None)"),
Ok(Async::NotReady) if last_ping_time.elapsed() > Duration::from_secs(30) => {
tx.unbounded_send(Message::text("{}")).unwrap_or_else(|e| {
log::error!("Could not send ping to WebSocket: {}", e)
});
last_ping_time = Instant::now();
}
Ok(Async::NotReady) => (), // no new messages; nothing to do
Err(e) => log::error!("{}\n Dropping WebSocket message and continuing.", e),
} }
Ok(()) Ok(())
}) })
.then(move |result| { .then(move |result| {
// TODO: consider whether we should manually drop closed connections here
log::info!("WebSocket connection for {:?} closed.", timeline); log::info!("WebSocket connection for {:?} closed.", timeline);
result result
}) })
.map_err(move |e| log::warn!("Error sending to {:?}: {}", timeline, e)) .map_err(move |e| log::warn!("Error sending to {:?}: {}", timeline, e))
} }
pub fn to_sse(mut client_agent: ClientAgent, sse: Sse, interval: Duration) -> impl Reply {
pub fn send_to_sse(mut client_agent: ClientAgent, sse: Sse, interval: Duration) -> impl Reply {
let event_stream = let event_stream =
tokio::timer::Interval::new(Instant::now(), interval).filter_map(move |_| { tokio::timer::Interval::new(Instant::now(), interval).filter_map(move |_| {
match client_agent.poll() { match client_agent.poll() {
@ -87,7 +93,15 @@ impl EventStream {
warp::sse::event(event.event_name()), warp::sse::event(event.event_name()),
warp::sse::data(event.payload().unwrap_or_else(String::new)), warp::sse::data(event.payload().unwrap_or_else(String::new)),
)), )),
_ => None, Ok(Async::Ready(None)) => {
log::info!("SSE ClientAgent got Ready(None)");
None
}
Ok(Async::NotReady) => None,
Err(e) => {
log::error!("{}\n Dropping SSE message and continuing.", e);
None
}
} }
}); });

View File

@ -4,14 +4,4 @@ mod event_stream;
mod receiver; mod receiver;
mod redis; mod redis;
pub use {client_agent::ClientAgent, event_stream::EventStream}; pub use {client_agent::ClientAgent, event_stream::EventStream, receiver::Receiver};
// TODO remove
pub use redis::redis_msg;
//#[cfg(test)]
//pub use receiver::process_messages;
//#[cfg(test)]
pub use receiver::{MessageQueues, MsgQueue};
//#[cfg(test)]
//pub use redis::redis_msg::{RedisMsg, RedisUtf8};

View File

@ -0,0 +1,50 @@
use super::super::redis::{RedisConnErr, RedisParseErr};
use crate::err::TimelineErr;
use serde_json;
use std::fmt;
#[derive(Debug)]
pub enum ReceiverErr {
TimelineErr(TimelineErr),
EventErr(serde_json::Error),
RedisParseErr(RedisParseErr),
RedisConnErr(RedisConnErr),
}
impl fmt::Display for ReceiverErr {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
use ReceiverErr::*;
match self {
EventErr(inner) => write!(f, "{}", inner),
RedisParseErr(inner) => write!(f, "{}", inner),
RedisConnErr(inner) => write!(f, "{}", inner),
TimelineErr(inner) => write!(f, "{}", inner),
}?;
Ok(())
}
}
impl From<serde_json::Error> for ReceiverErr {
fn from(error: serde_json::Error) -> Self {
Self::EventErr(error)
}
}
impl From<RedisConnErr> for ReceiverErr {
fn from(e: RedisConnErr) -> Self {
Self::RedisConnErr(e)
}
}
impl From<TimelineErr> for ReceiverErr {
fn from(e: TimelineErr) -> Self {
Self::TimelineErr(e)
}
}
impl From<RedisParseErr> for ReceiverErr {
fn from(e: RedisParseErr) -> Self {
Self::RedisParseErr(e)
}
}

View File

@ -1,5 +1,6 @@
use crate::messages::Event; use crate::messages::Event;
use crate::parse_client_request::Timeline; use crate::parse_client_request::Timeline;
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
fmt, fmt,
@ -13,22 +14,6 @@ pub struct MsgQueue {
pub messages: VecDeque<Event>, pub messages: VecDeque<Event>,
last_polled_at: Instant, last_polled_at: Instant,
} }
impl fmt::Debug for MsgQueue {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"\
MsgQueue {{
timeline: {:?},
messages: {:?},
last_polled_at: {:?},
}}",
self.timeline,
self.messages,
self.last_polled_at.elapsed(),
)
}
}
impl MsgQueue { impl MsgQueue {
pub fn new(timeline: Timeline) -> Self { pub fn new(timeline: Timeline) -> Self {
@ -38,26 +23,15 @@ impl MsgQueue {
timeline, timeline,
} }
} }
pub fn update_polled_at_time(&mut self) {
self.last_polled_at = Instant::now();
}
} }
#[derive(Debug)] #[derive(Debug)]
pub struct MessageQueues(pub HashMap<Uuid, MsgQueue>); pub struct MessageQueues(pub HashMap<Uuid, MsgQueue>);
impl MessageQueues { impl MessageQueues {
pub fn update_time_for_target_queue(&mut self, id: Uuid) {
self.entry(id)
.and_modify(|queue| queue.last_polled_at = Instant::now());
}
pub fn oldest_msg_in_target_queue(&mut self, id: Uuid, timeline: Timeline) -> Option<Event> {
let msg_qs_entry = self.entry(id);
let mut inserted_tl = false;
let msg_q = msg_qs_entry.or_insert_with(|| {
inserted_tl = true;
MsgQueue::new(timeline)
});
msg_q.messages.pop_front()
}
pub fn calculate_timelines_to_add_or_drop(&mut self, timeline: Timeline) -> Vec<Change> { pub fn calculate_timelines_to_add_or_drop(&mut self, timeline: Timeline) -> Vec<Change> {
let mut timelines_to_modify = Vec::new(); let mut timelines_to_modify = Vec::new();
@ -85,6 +59,23 @@ pub struct Change {
pub in_subscriber_number: i32, pub in_subscriber_number: i32,
} }
impl fmt::Debug for MsgQueue {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"\
MsgQueue {{
timeline: {:?},
messages: {:?},
last_polled_at: {:?} ago,
}}",
self.timeline,
self.messages,
self.last_polled_at.elapsed(),
)
}
}
impl std::ops::Deref for MessageQueues { impl std::ops::Deref for MessageQueues {
type Target = HashMap<Uuid, MsgQueue>; type Target = HashMap<Uuid, MsgQueue>;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {

View File

@ -1,51 +1,53 @@
//! Receives data from Redis, sorts it by `ClientAgent`, and stores it until //! Receives data from Redis, sorts it by `ClientAgent`, and stores it until
//! polled by the correct `ClientAgent`. Also manages sububscriptions and //! polled by the correct `ClientAgent`. Also manages sububscriptions and
//! unsubscriptions to/from Redis. //! unsubscriptions to/from Redis.
mod err;
mod message_queues; mod message_queues;
pub use err::ReceiverErr;
pub use message_queues::{MessageQueues, MsgQueue}; pub use message_queues::{MessageQueues, MsgQueue};
use super::redis::{redis_connection::RedisCmd, RedisConn};
use crate::{ use crate::{
config, config,
err::RedisParseErr,
messages::Event, messages::Event,
parse_client_request::{Stream, Timeline}, parse_client_request::{Stream, Subscription, Timeline},
redis_to_client_stream::redis::RedisConn,
}; };
use futures::{Async, Poll}; use futures::{Async, Poll};
use lru::LruCache; use std::{
use std::{collections::HashMap, time::Instant}; collections::HashMap,
result,
sync::{Arc, Mutex},
};
use uuid::Uuid; use uuid::Uuid;
type Result<T> = result::Result<T, ReceiverErr>;
/// The item that streams from Redis and is polled by the `ClientAgent` /// The item that streams from Redis and is polled by the `ClientAgent`
#[derive(Debug)] #[derive(Debug)]
pub struct Receiver { pub struct Receiver {
redis_connection: RedisConn, redis_connection: RedisConn,
timeline: Timeline,
manager_id: Uuid,
pub msg_queues: MessageQueues, pub msg_queues: MessageQueues,
clients_per_timeline: HashMap<Timeline, i32>, clients_per_timeline: HashMap<Timeline, i32>,
hashtag_cache: LruCache<i64, String>,
// TODO: eventually, it might make sense to have Mastodon publish to timelines with
// the tag number instead of the tag name. This would save us from dealing
// with a cache here and would be consistent with how lists/users are handled.
} }
impl Receiver { impl Receiver {
/// Create a new `Receiver`, with its own Redis connections (but, as yet, no /// Create a new `Receiver`, with its own Redis connections (but, as yet, no
/// active subscriptions). /// active subscriptions).
pub fn new(redis_cfg: config::RedisConfig) -> Self { pub fn try_from(redis_cfg: config::RedisConfig) -> Result<Self> {
let redis_connection = RedisConn::new(redis_cfg); let redis_connection = RedisConn::new(redis_cfg)?;
Self { Ok(Self {
redis_connection, redis_connection,
timeline: Timeline::empty(),
manager_id: Uuid::default(),
msg_queues: MessageQueues(HashMap::new()), msg_queues: MessageQueues(HashMap::new()),
clients_per_timeline: HashMap::new(), clients_per_timeline: HashMap::new(),
hashtag_cache: LruCache::new(1000), })
// should this be a run-time option? }
}
pub fn into_arc(self) -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(self))
} }
/// Assigns the `Receiver` a new timeline to monitor and runs other /// Assigns the `Receiver` a new timeline to monitor and runs other
@ -54,73 +56,24 @@ impl Receiver {
/// Note: this method calls `subscribe_or_unsubscribe_as_needed`, /// Note: this method calls `subscribe_or_unsubscribe_as_needed`,
/// so Redis PubSub subscriptions are only updated when a new timeline /// so Redis PubSub subscriptions are only updated when a new timeline
/// comes under management for the first time. /// comes under management for the first time.
pub fn manage_new_timeline(&mut self, id: Uuid, tl: Timeline, hashtag: Option<String>) { pub fn add_subscription(&mut self, subscription: &Subscription) -> Result<()> {
self.timeline = tl; let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline);
if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (hashtag, tl) {
self.hashtag_cache.put(id, hashtag.clone()); if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (tag, tl) {
self.redis_connection.update_cache(hashtag, id); self.redis_connection.update_cache(hashtag, id);
}; };
self.msg_queues.insert(subscription.id, MsgQueue::new(tl));
self.msg_queues.insert(id, MsgQueue::new(tl)); self.subscribe_or_unsubscribe_as_needed(tl)?;
self.subscribe_or_unsubscribe_as_needed(tl); Ok(())
} }
/// Set the `Receiver`'s manager_id and target_timeline fields to the appropriate
/// value to be polled by the current `StreamManager`.
pub fn configure_for_polling(&mut self, manager_id: Uuid, timeline: Timeline) {
self.manager_id = manager_id;
self.timeline = timeline;
}
/// Drop any PubSub subscriptions that don't have active clients and check
/// that there's a subscription to the current one. If there isn't, then
/// subscribe to it.
fn subscribe_or_unsubscribe_as_needed(&mut self, timeline: Timeline) {
let start_time = Instant::now();
let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(timeline);
// Record the lower number of clients subscribed to that channel
for change in timelines_to_modify {
let timeline = change.timeline;
let hashtag = match timeline {
Timeline(Stream::Hashtag(id), _, _) => self.hashtag_cache.get(&id),
_non_hashtag_timeline => None,
};
let count_of_subscribed_clients = self
.clients_per_timeline
.entry(timeline)
.and_modify(|n| *n += change.in_subscriber_number)
.or_insert_with(|| 1);
// If no clients, unsubscribe from the channel
if *count_of_subscribed_clients <= 0 {
self.redis_connection
.send_unsubscribe_cmd(&timeline.to_redis_raw_timeline(hashtag));
} else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 {
self.redis_connection
.send_subscribe_cmd(&timeline.to_redis_raw_timeline(hashtag));
}
}
if start_time.elapsed().as_millis() > 1 {
log::warn!("Sending cmd to Redis took: {:?}", start_time.elapsed());
};
}
}
/// The stream that the ClientAgent polls to learn about new messages.
impl futures::stream::Stream for Receiver {
type Item = Event;
type Error = RedisParseErr;
/// Returns the oldest message in the `ClientAgent`'s queue (if any). /// Returns the oldest message in the `ClientAgent`'s queue (if any).
/// ///
/// Note: This method does **not** poll Redis every time, because polling /// Note: This method does **not** poll Redis every time, because polling
/// Redis is significantly more time consuming that simply returning the /// Redis is significantly more time consuming that simply returning the
/// message already in a queue. Thus, we only poll Redis if it has not /// message already in a queue. Thus, we only poll Redis if it has not
/// been polled lately. /// been polled lately.
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { pub fn poll_for(&mut self, id: Uuid, timeline: Timeline) -> Poll<Option<Event>, ReceiverErr> {
let (timeline, id) = (self.timeline.clone(), self.manager_id);
loop { loop {
match self.redis_connection.poll_redis() { match self.redis_connection.poll_redis() {
Ok(Async::Ready(Some((timeline, event)))) => self Ok(Async::Ready(Some((timeline, event)))) => self
@ -136,13 +89,47 @@ impl futures::stream::Stream for Receiver {
} }
} }
// Record current time as last polled time
self.msg_queues.update_time_for_target_queue(id);
// If the `msg_queue` being polled has any new messages, return the first (oldest) one // If the `msg_queue` being polled has any new messages, return the first (oldest) one
match self.msg_queues.oldest_msg_in_target_queue(id, timeline) { match self.msg_queues.get_mut(&id) {
Some(value) => Ok(Async::Ready(Some(value))), Some(msg_q) => {
_ => Ok(Async::NotReady), msg_q.update_polled_at_time();
match msg_q.messages.pop_front() {
Some(event) => Ok(Async::Ready(Some(event))),
None => Ok(Async::NotReady),
}
}
None => {
log::error!("Polled a MsgQueue that had not been set up. Setting it up now.");
self.msg_queues.insert(id, MsgQueue::new(timeline));
Ok(Async::NotReady)
}
} }
} }
/// Drop any PubSub subscriptions that don't have active clients and check
/// that there's a subscription to the current one. If there isn't, then
/// subscribe to it.
fn subscribe_or_unsubscribe_as_needed(&mut self, tl: Timeline) -> Result<()> {
let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(tl);
// Record the lower number of clients subscribed to that channel
for change in timelines_to_modify {
let timeline = change.timeline;
let count_of_subscribed_clients = self
.clients_per_timeline
.entry(timeline)
.and_modify(|n| *n += change.in_subscriber_number)
.or_insert_with(|| 1);
// If no clients, unsubscribe from the channel
use RedisCmd::*;
if *count_of_subscribed_clients <= 0 {
self.redis_connection.send_cmd(Unsubscribe, &timeline)?;
} else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 {
self.redis_connection.send_cmd(Subscribe, &timeline)?
}
}
Ok(())
}
} }

View File

@ -1,5 +1,5 @@
pub mod redis_cmd;
pub mod redis_connection; pub mod redis_connection;
pub mod redis_msg; pub mod redis_msg;
pub use redis_connection::RedisConn; pub use redis_connection::{RedisConn, RedisConnErr};
pub use redis_msg::RedisParseErr;

View File

@ -1,75 +0,0 @@
//! Send raw TCP commands to the Redis server
use std::fmt::Display;
/// Send a subscribe or unsubscribe to the Redis PubSub channel
#[macro_export]
macro_rules! pubsub_cmd {
($cmd:expr, $self:expr, $tl:expr) => {{
use std::io::Write;
log::info!("Sending {} command to {}", $cmd, $tl);
let namespace = $self.redis_namespace.clone();
$self
.primary
.write_all(&redis_cmd::pubsub($cmd, $tl, namespace.clone()))
.expect("Can send command to Redis");
// Because we keep track of the number of clients subscribed to a channel on our end,
// we need to manually tell Redis when we have subscribed or unsubscribed
let subscription_new_number = match $cmd {
"unsubscribe" => "0",
"subscribe" => "1",
_ => panic!("Given unacceptable PUBSUB command"),
};
$self
.secondary
.write_all(&redis_cmd::set(
format!("subscribed:{}", $tl),
subscription_new_number,
namespace.clone(),
))
.expect("Can set Redis");
// TODO: re-enable info logging >>> log::info!("Now subscribed to: {:#?}", $self.msg_queues);
}};
}
/// Send a `SUBSCRIBE` or `UNSUBSCRIBE` command to a specific timeline
pub fn pubsub(command: impl Display, timeline: impl Display, ns: Option<String>) -> Vec<u8> {
let arg = match ns {
Some(namespace) => format!("{}:{}", namespace, timeline),
None => format!("{}", timeline),
};
cmd(command, arg)
}
/// Send a generic two-item command to Redis
pub fn cmd(command: impl Display, arg: impl Display) -> Vec<u8> {
let (command, arg) = (command.to_string(), arg.to_string());
log::info!("Sent {} command", &command);
format!(
"*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n",
cmd_length = command.len(),
cmd = command,
arg_length = arg.len(),
arg = arg
)
.as_bytes()
.to_owned()
}
/// Send a `SET` command (used to manually unsubscribe from Redis)
pub fn set(key: impl Display, value: impl Display, ns: Option<String>) -> Vec<u8> {
let key = match ns {
Some(namespace) => format!("{}:{}", namespace, key),
None => key.to_string(),
};
let value = value.to_string();
format!(
"*3\r\n$3\r\nSET\r\n${key_length}\r\n{key}\r\n${value_length}\r\n{value}\r\n",
key_length = key.len(),
key = key,
value_length = value.len(),
value = value
)
.as_bytes()
.to_owned()
}

View File

@ -1,189 +0,0 @@
use super::{redis_cmd, redis_msg::RedisParseOutput};
use crate::{
config::RedisConfig,
err::{self, RedisParseErr},
messages::Event,
parse_client_request::Timeline,
pubsub_cmd,
};
use futures::{Async, Poll};
use lru::LruCache;
use std::{
convert::TryFrom,
io::Read,
io::Write,
net, str,
time::{Duration, Instant},
};
use tokio::io::AsyncRead;
#[derive(Debug)]
pub struct RedisConn {
primary: net::TcpStream,
secondary: net::TcpStream,
redis_poll_interval: Duration,
redis_polled_at: Instant,
redis_namespace: Option<String>,
cache: LruCache<String, i64>,
redis_input: Vec<u8>, // TODO: Consider queue internal to RedisConn
}
impl RedisConn {
pub fn new(redis_cfg: RedisConfig) -> Self {
let addr = format!("{}:{}", *redis_cfg.host, *redis_cfg.port);
let conn_err = |e| {
err::die_with_msg(format!(
"Could not connect to Redis at {}:{}.\n Error detail: {}",
*redis_cfg.host, *redis_cfg.port, e,
))
};
let update_conn = |mut conn| {
if let Some(password) = redis_cfg.password.clone() {
conn = send_password(conn, &password);
}
conn = send_test_ping(conn);
conn.set_read_timeout(Some(Duration::from_millis(10)))
.expect("Can set read timeout for Redis connection");
if let Some(db) = &*redis_cfg.db {
conn = set_db(conn, db);
}
conn
};
let (primary_conn, secondary_conn) = (
update_conn(net::TcpStream::connect(addr.clone()).unwrap_or_else(conn_err)),
update_conn(net::TcpStream::connect(addr).unwrap_or_else(conn_err)),
);
primary_conn
.set_nonblocking(true)
.expect("set_nonblocking call failed");
Self {
primary: primary_conn,
secondary: secondary_conn,
cache: LruCache::new(1000),
redis_namespace: redis_cfg.namespace.clone(),
redis_poll_interval: *redis_cfg.polling_interval,
redis_input: Vec::new(),
redis_polled_at: Instant::now(),
}
}
pub fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, RedisParseErr> {
let mut buffer = vec![0u8; 6000];
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
if let Ok(Async::Ready(bytes_read)) = self.poll_read(&mut buffer) {
self.redis_input.extend_from_slice(&buffer[..bytes_read]);
}
}
let input = self.redis_input.clone();
self.redis_input.clear();
let (input, invalid_bytes) = str::from_utf8(&input)
.map(|input| (input, "".as_bytes()))
.unwrap_or_else(|e| {
let (valid, invalid) = input.split_at(e.valid_up_to());
(str::from_utf8(valid).expect("Guaranteed by ^^^^"), invalid)
});
use {Async::*, RedisParseOutput::*};
let (res, leftover) = match RedisParseOutput::try_from(input) {
Ok(Msg(msg)) => match &self.redis_namespace {
Some(ns) if msg.timeline_txt.starts_with(&format!("{}:timeline:", ns)) => {
let tl = Timeline::from_redis_text(
&msg.timeline_txt[ns.len() + ":timeline:".len()..],
&mut self.cache,
)?;
let event: Event = serde_json::from_str(msg.event_txt)?;
(Ok(Ready(Some((tl, event)))), msg.leftover_input)
}
None => {
let tl = Timeline::from_redis_text(
&msg.timeline_txt["timeline:".len()..],
&mut self.cache,
)?;
let event: Event = serde_json::from_str(msg.event_txt)?;
(Ok(Ready(Some((tl, event)))), msg.leftover_input)
}
Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input),
},
Ok(NonMsg(leftover)) => (Ok(Ready(None)), leftover),
Err(RedisParseErr::Incomplete) => (Ok(NotReady), input),
Err(other) => (Err(other), input),
};
self.redis_input.extend_from_slice(leftover.as_bytes());
self.redis_input.extend_from_slice(invalid_bytes);
res
}
pub fn update_cache(&mut self, hashtag: String, id: i64) {
self.cache.put(hashtag, id);
}
pub fn send_unsubscribe_cmd(&mut self, timeline: &str) {
pubsub_cmd!("unsubscribe", self, timeline);
}
pub fn send_subscribe_cmd(&mut self, timeline: &str) {
pubsub_cmd!("subscribe", self, timeline);
}
}
fn send_password(mut conn: net::TcpStream, password: &str) -> net::TcpStream {
conn.write_all(&redis_cmd::cmd("auth", &password)).unwrap();
let mut buffer = vec![0u8; 5];
conn.read_exact(&mut buffer).unwrap();
let reply = String::from_utf8(buffer.to_vec()).unwrap();
if reply != "+OK\r\n" {
err::die_with_msg(format!(
r"Incorrect Redis password. You supplied `{}`.
Please supply correct password with REDIS_PASSWORD environmental variable.",
password,
))
};
conn
}
fn set_db(mut conn: net::TcpStream, db: &str) -> net::TcpStream {
conn.write_all(&redis_cmd::cmd("SELECT", &db)).unwrap();
conn
}
fn send_test_ping(mut conn: net::TcpStream) -> net::TcpStream {
conn.write_all(b"PING\r\n").unwrap();
let mut buffer = vec![0u8; 7];
conn.read_exact(&mut buffer).unwrap();
let reply = String::from_utf8(buffer.to_vec()).unwrap();
match reply.as_str() {
"+PONG\r\n" => (),
"-NOAUTH" => err::die_with_msg(
r"Invalid authentication for Redis.
Redis reports that it needs a password, but you did not provide one.
You can set a password with the REDIS_PASSWORD environmental variable.",
),
"HTTP/1." => err::die_with_msg(
r"The server at REDIS_HOST and REDIS_PORT is not a Redis server.
Please update the REDIS_HOST and/or REDIS_PORT environmental variables.",
),
_ => err::die_with_msg(format!(
"Could not connect to Redis for unknown reason. Expected `+PONG` reply but got {}",
reply
)),
};
conn
}
impl Read for RedisConn {
fn read(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> {
self.primary.read(buffer)
}
}
impl AsyncRead for RedisConn {
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
match self.read(buf) {
Ok(t) => Ok(Async::Ready(t)),
Err(_) => Ok(Async::NotReady),
}
}
}

View File

@ -0,0 +1,61 @@
use std::fmt;
#[derive(Debug)]
pub enum RedisConnErr {
ConnectionErr { addr: String, inner: std::io::Error },
InvalidRedisReply(String),
UnknownRedisErr(std::io::Error),
IncorrectPassword(String),
MissingPassword,
NotRedis(String),
}
impl RedisConnErr {
pub fn with_addr<T: AsRef<str>>(address: T, inner: std::io::Error) -> Self {
Self::ConnectionErr {
addr: address.as_ref().to_string(),
inner,
}
}
}
impl fmt::Display for RedisConnErr {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
use RedisConnErr::*;
let msg = match self {
ConnectionErr { addr, inner } => format!(
"Error connecting to Redis at {}.\n\
Connection Error: {}",
addr, inner
),
InvalidRedisReply(unexpected_reply) => format!(
"Received and unexpected reply from Redis. Expected `+PONG` reply but got `{}`",
unexpected_reply
),
UnknownRedisErr(io_err) => {
format!("Unexpected failure communicating with Redis: {}", io_err)
}
IncorrectPassword(attempted_password) => format!(
"Incorrect Redis password. You supplied `{}`.\n \
Please supply correct password with REDIS_PASSWORD environmental variable.",
attempted_password
),
MissingPassword => "Invalid authentication for Redis. Redis is configured to require \
a password, but you did not provide one. \n\
Set a password using the REDIS_PASSWORD environmental variable."
.to_string(),
NotRedis(addr) => format!(
"The server at {} is not a Redis server. Please update the REDIS_HOST and/or \
REDIS_PORT environmental variables and try again.",
addr
),
};
write!(f, "{}", msg)
}
}
impl From<std::io::Error> for RedisConnErr {
fn from(e: std::io::Error) -> RedisConnErr {
RedisConnErr::UnknownRedisErr(e)
}
}

View File

@ -0,0 +1,180 @@
mod err;
pub use err::RedisConnErr;
use super::super::receiver::ReceiverErr;
use super::redis_msg::{RedisParseErr, RedisParseOutput};
use crate::{
config::RedisConfig,
messages::Event,
parse_client_request::{Stream, Timeline},
};
use std::{
convert::TryFrom,
io::{Read, Write},
net::TcpStream,
str,
time::{Duration, Instant},
};
use futures::{Async, Poll};
use lru::LruCache;
type Result<T> = std::result::Result<T, RedisConnErr>;
#[derive(Debug)]
pub struct RedisConn {
primary: TcpStream,
secondary: TcpStream,
redis_poll_interval: Duration,
redis_polled_at: Instant,
redis_namespace: Option<String>,
tag_id_cache: LruCache<String, i64>,
tag_name_cache: LruCache<i64, String>,
redis_input: Vec<u8>,
}
impl RedisConn {
pub fn new(redis_cfg: RedisConfig) -> Result<Self> {
let addr = format!("{}:{}", *redis_cfg.host, *redis_cfg.port);
let conn = Self::new_connection(&addr, redis_cfg.password.as_ref())?;
conn.set_nonblocking(true)
.map_err(|e| RedisConnErr::with_addr(&addr, e))?;
let redis_conn = Self {
primary: conn,
secondary: Self::new_connection(&addr, redis_cfg.password.as_ref())?,
tag_id_cache: LruCache::new(1000),
tag_name_cache: LruCache::new(1000),
// TODO: eventually, it might make sense to have Mastodon publish to timelines with
// the tag number instead of the tag name. This would save us from dealing
// with a cache here and would be consistent with how lists/users are handled.
redis_namespace: redis_cfg.namespace.clone(),
redis_poll_interval: *redis_cfg.polling_interval,
redis_input: Vec::new(),
redis_polled_at: Instant::now(),
};
Ok(redis_conn)
}
pub fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, ReceiverErr> {
let mut buffer = vec![0u8; 6000];
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
if let Ok(bytes_read) = self.primary.read(&mut buffer) {
self.redis_input.extend_from_slice(&buffer[..bytes_read]);
}
}
if self.redis_input.is_empty() {
return Ok(Async::NotReady);
}
let input = self.redis_input.clone();
self.redis_input.clear();
let (input, invalid_bytes) = str::from_utf8(&input)
.map(|input| (input, "".as_bytes()))
.unwrap_or_else(|e| {
let (valid, invalid) = input.split_at(e.valid_up_to());
(str::from_utf8(valid).expect("Guaranteed by ^^^^"), invalid)
});
use {Async::*, RedisParseOutput::*};
let (res, leftover) = match RedisParseOutput::try_from(input) {
Ok(Msg(msg)) => match &self.redis_namespace {
Some(ns) if msg.timeline_txt.starts_with(&format!("{}:timeline:", ns)) => {
let trimmed_tl_txt = &msg.timeline_txt[ns.len() + ":timeline:".len()..];
let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut self.tag_id_cache)?;
let event: Event = serde_json::from_str(msg.event_txt)?;
(Ok(Ready(Some((tl, event)))), msg.leftover_input)
}
None => {
let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..];
let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut self.tag_id_cache)?;
let event: Event = serde_json::from_str(msg.event_txt)?;
(Ok(Ready(Some((tl, event)))), msg.leftover_input)
}
Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input),
},
Ok(NonMsg(leftover)) => (Ok(Ready(None)), leftover),
Err(RedisParseErr::Incomplete) => (Ok(NotReady), input),
Err(other_parse_err) => (Err(ReceiverErr::RedisParseErr(other_parse_err)), input),
};
self.redis_input.extend_from_slice(leftover.as_bytes());
self.redis_input.extend_from_slice(invalid_bytes);
res
}
pub fn update_cache(&mut self, hashtag: String, id: i64) {
self.tag_id_cache.put(hashtag.clone(), id);
self.tag_name_cache.put(id, hashtag);
}
fn new_connection(addr: &str, pass: Option<&String>) -> Result<TcpStream> {
match TcpStream::connect(&addr) {
Ok(mut conn) => {
if let Some(password) = pass {
Self::auth_connection(&mut conn, &addr, password)?;
}
Self::validate_connection(&mut conn, &addr)?;
conn.set_read_timeout(Some(Duration::from_millis(10)))
.map_err(|e| RedisConnErr::with_addr(&addr, e))?;
Ok(conn)
}
Err(e) => Err(RedisConnErr::with_addr(&addr, e)),
}
}
fn auth_connection(conn: &mut TcpStream, addr: &str, pass: &str) -> Result<()> {
conn.write_all(&format!("*2\r\n$4\r\nauth\r\n${}\r\n{}\r\n", pass.len(), pass).as_bytes())
.map_err(|e| RedisConnErr::with_addr(&addr, e))?;
let mut buffer = vec![0u8; 5];
conn.read_exact(&mut buffer)
.map_err(|e| RedisConnErr::with_addr(&addr, e))?;
let reply = String::from_utf8_lossy(&buffer);
match &*reply {
"+OK\r\n" => (),
_ => Err(RedisConnErr::IncorrectPassword(pass.to_string()))?,
};
Ok(())
}
fn validate_connection(conn: &mut TcpStream, addr: &str) -> Result<()> {
conn.write_all(b"PING\r\n")
.map_err(|e| RedisConnErr::with_addr(&addr, e))?;
let mut buffer = vec![0u8; 7];
conn.read_exact(&mut buffer)
.map_err(|e| RedisConnErr::with_addr(&addr, e))?;
let reply = String::from_utf8_lossy(&buffer);
match &*reply {
"+PONG\r\n" => Ok(()),
"-NOAUTH" => Err(RedisConnErr::MissingPassword),
"HTTP/1." => Err(RedisConnErr::NotRedis(addr.to_string())),
_ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())),
}
}
pub fn send_cmd(&mut self, cmd: RedisCmd, timeline: &Timeline) -> Result<()> {
let hashtag = match timeline {
Timeline(Stream::Hashtag(id), _, _) => self.tag_name_cache.get(id),
_non_hashtag_timeline => None,
};
let tl = timeline.to_redis_raw_timeline(hashtag);
let (primary_cmd, secondary_cmd) = match cmd {
RedisCmd::Subscribe => (
format!("*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n", tl.len(), tl),
format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n$1\r\n1\r\n", tl.len(), tl),
),
RedisCmd::Unsubscribe => (
format!("*2\r\n$11\r\nunsubscribe\r\n${}\r\n{}\r\n", tl.len(), tl),
format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n$1\r\n0\r\n", tl.len(), tl),
),
};
self.primary.write_all(&primary_cmd.as_bytes())?;
self.secondary.write_all(&secondary_cmd.as_bytes())?;
Ok(())
}
}
pub enum RedisCmd {
Subscribe,
Unsubscribe,
}

View File

@ -0,0 +1,50 @@
use std::{error::Error, fmt};
#[derive(Debug)]
pub enum RedisParseErr {
Incomplete,
InvalidNumber(std::num::ParseIntError),
InvalidLineStart(String),
InvalidLineEnd,
IncorrectRedisType,
MissingField,
}
impl fmt::Display for RedisParseErr {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
use RedisParseErr::*;
let msg = match self {
Incomplete => "The input from Redis does not form a complete message, likely because \
the input buffer filled partway through a message. Save this input \
and try again with additional input from Redis."
.to_string(),
InvalidNumber(parse_int_err) => format!(
"Redis indicated that an item would be a number, but it could not be parsed: {}",
parse_int_err
),
InvalidLineStart(line_start_char) => format!(
"A line from Redis started with `{}`, which is not a valid character to indicate \
the type of the Redis line.",
line_start_char
),
InvalidLineEnd => "A Redis line ended before expected line length".to_string(),
IncorrectRedisType => "Received a Redis type that is not supported in this context. \
Flodgatt expects each message from Redis to be a Redis array \
consisting of bulk strings or integers."
.to_string(),
MissingField => "Redis input was missing a field Flodgatt expected (e.g., a `message` \
without a payload line)"
.to_string(),
};
write!(f, "{}", msg)
}
}
impl Error for RedisParseErr {}
impl From<std::num::ParseIntError> for RedisParseErr {
fn from(error: std::num::ParseIntError) -> Self {
Self::InvalidNumber(error)
}
}

View File

@ -20,8 +20,11 @@
//! three characters, the second is a bulk string with ten characters, and the third is a //! three characters, the second is a bulk string with ten characters, and the third is a
//! bulk string with 1,386 characters. //! bulk string with 1,386 characters.
mod err;
pub use err::RedisParseErr;
use self::RedisParseOutput::*; use self::RedisParseOutput::*;
use crate::err::RedisParseErr;
use std::{ use std::{
convert::{TryFrom, TryInto}, convert::{TryFrom, TryInto},
str, str,
@ -84,7 +87,7 @@ fn utf8_to_redis_data<'a>(s: &'a str) -> Result<(RedisData, &'a str), RedisParse
} }
} }
fn after_newline_at<'a>(s: &'a str, start: usize) -> RedisParser<'a, &'a str> { fn after_newline_at(s: &str, start: usize) -> RedisParser<&str> {
let s = s.get(start..).ok_or(Incomplete)?; let s = s.get(start..).ok_or(Incomplete)?;
if !s.starts_with("\r\n") { if !s.starts_with("\r\n") {
return Err(RedisParseErr::InvalidLineEnd); return Err(RedisParseErr::InvalidLineEnd);
@ -93,10 +96,7 @@ fn after_newline_at<'a>(s: &'a str, start: usize) -> RedisParser<'a, &'a str> {
} }
fn parse_number_at<'a>(s: &'a str) -> RedisParser<(usize, &'a str)> { fn parse_number_at<'a>(s: &'a str) -> RedisParser<(usize, &'a str)> {
let len = s let len = s.chars().position(|c| !c.is_numeric()).ok_or(Incomplete)?;
.chars()
.position(|c| !c.is_numeric())
.ok_or(NonNumericInput)?;
Ok((s[..len].parse()?, after_newline_at(s, len)?)) Ok((s[..len].parse()?, after_newline_at(s, len)?))
} }

View File

@ -1,53 +0,0 @@
This error indicates that a lifetime is missing from a type. If it is an error
inside a function signature, the problem may be with failing to adhere to the
lifetime elision rules (see below).
Erroneous code examples:
```
struct Foo1 { x: &bool }
// ^ expected lifetime parameter
struct Foo2<'a> { x: &'a bool } // correct
struct Bar1 { x: Foo2 }
// ^^^^ expected lifetime parameter
struct Bar2<'a> { x: Foo2<'a> } // correct
enum Baz1 { A(u8), B(&bool), }
// ^ expected lifetime parameter
enum Baz2<'a> { A(u8), B(&'a bool), } // correct
type MyStr1 = &str;
// ^ expected lifetime parameter
type MyStr2<'a> = &'a str; // correct
```
Lifetime elision is a special, limited kind of inference for lifetimes in
function signatures which allows you to leave out lifetimes in certain cases.
For more background on lifetime elision see [the book][book-le].
The lifetime elision rules require that any function signature with an elided
output lifetime must either have:
- exactly one input lifetime
- or, multiple input lifetimes, but the function must also be a method with a
`&self` or `&mut self` receiver
In the first case, the output lifetime is inferred to be the same as the unique
input lifetime. In the second case, the lifetime is instead inferred to be the
same as the lifetime on `&self` or `&mut self`.
Here are some examples of elision errors:
```
// error, no input lifetimes
fn foo() -> &str { }
// error, `x` and `y` have distinct lifetimes inferred
fn bar(x: &str, y: &str) -> &str { }
// error, `y`'s lifetime is inferred to be distinct from `x`'s
fn baz<'a>(x: &'a str, y: &str) -> &str { }
```
[book-le]: https://doc.rust-lang.org/book/ch10-03-lifetime-syntax.html#lifetime-elision