diff --git a/Cargo.lock b/Cargo.lock
index 7c54ed8..ac9700f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -440,7 +440,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
 name = "flodgatt"
-version = "0.6.6"
+version = "0.6.7"
 dependencies = [
  "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/Cargo.toml b/Cargo.toml
index 45b984e..6226c65 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "flodgatt"
 description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
-version = "0.6.6"
+version = "0.6.7"
 authors = ["Daniel Long Sockwell "]
 edition = "2018"
 
diff --git a/src/config/environmental_variables.rs b/src/config/environmental_variables.rs
index a790c0b..a65025f 100644
--- a/src/config/environmental_variables.rs
+++ b/src/config/environmental_variables.rs
@@ -63,7 +63,7 @@ impl fmt::Display for EnvVar {
         ]
         .iter()
         {
-            if let Some(value) = self.get(&env_var.to_string()) {
+            if let Some(value) = self.get(&(*env_var).to_string()) {
                 result = format!("{}\n {}: {}", result, env_var, value)
             }
         }
diff --git a/src/err.rs b/src/err.rs
deleted file mode 100644
index 80dbdb0..0000000
--- a/src/err.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-use std::{error::Error, fmt};
-
-pub fn die_with_msg(msg: impl fmt::Display) -> ! {
-    eprintln!("FATAL ERROR: {}", msg);
-    std::process::exit(1);
-}
-
-#[macro_export]
-macro_rules! log_fatal {
-    ($str:expr, $var:expr) => {{
-        log::error!($str, $var);
-        panic!();
-    };};
-}
-
-#[derive(Debug)]
-pub enum RedisParseErr {
-    Incomplete,
-    InvalidNumber(std::num::ParseIntError),
-    NonNumericInput,
-    InvalidLineStart(String),
-    InvalidLineEnd,
-    IncorrectRedisType,
-    MissingField,
-    UnsupportedTimeline,
-    UnsupportedEvent(serde_json::Error),
-}
-
-impl fmt::Display for RedisParseErr {
-    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
-        write!(f, "{}", match self {
-            Self::Incomplete => "The input from Redis does not form a complete message, likely because the input buffer filled partway through a message. Save this input and try again with additional input from Redis.".to_string(),
-            Self::InvalidNumber(e) => format!( "Redis input cannot be parsed: {}", e),
-            Self::NonNumericInput => "Received non-numeric input when expecting a Redis number".to_string(),
-            Self::InvalidLineStart(s) => format!("Got `{}` as a line start from Redis", s),
-            Self::InvalidLineEnd => "Redis input ended before promised length".to_string(),
-            Self::IncorrectRedisType => "Received a non-array when expecting a Redis array".to_string(),
-            Self::MissingField => "Redis input was missing a required field".to_string(),
-            Self::UnsupportedTimeline => "The raw timeline received from Redis could not be parsed into a supported timeline".to_string(),
-            Self::UnsupportedEvent(e) => format!("The event text from Redis could not be parsed into a valid event: {}", e)
-        })
-    }
-}
-
-impl Error for RedisParseErr {}
-
-impl From<std::num::ParseIntError> for RedisParseErr {
-    fn from(error: std::num::ParseIntError) -> Self {
-        Self::InvalidNumber(error)
-    }
-}
-
-impl From<serde_json::Error> for RedisParseErr {
-    fn from(error: serde_json::Error) -> Self {
-        Self::UnsupportedEvent(error)
-    }
-}
-
-impl From<TimelineErr> for RedisParseErr {
-    fn from(_: TimelineErr) -> Self {
-        Self::UnsupportedTimeline
-    }
-}
-
-#[derive(Debug)]
-pub enum TimelineErr {
-    RedisNamespaceMismatch,
-    InvalidInput,
-}
-
-impl From<std::num::ParseIntError> for TimelineErr {
-    fn from(_error: std::num::ParseIntError) -> Self {
-        Self::InvalidInput
-    }
-}
diff --git a/src/err/mod.rs b/src/err/mod.rs
new file mode 100644
index 0000000..dc96bbb
--- /dev/null
+++ b/src/err/mod.rs
@@ -0,0 +1,18 @@
+mod timeline;
+
+pub use timeline::TimelineErr;
+
+use std::fmt;
+
+pub fn die_with_msg(msg: impl fmt::Display) -> ! {
+    eprintln!("FATAL ERROR: {}", msg);
+    std::process::exit(1);
+}
+
+#[macro_export]
+macro_rules! log_fatal {
+    ($str:expr, $var:expr) => {{
+        log::error!($str, $var);
+        panic!();
+    };};
+}
diff --git a/src/err/timeline.rs b/src/err/timeline.rs
new file mode 100644
index 0000000..4ba9f34
--- /dev/null
+++ b/src/err/timeline.rs
@@ -0,0 +1,24 @@
+use std::fmt;
+
+#[derive(Debug)]
+pub enum TimelineErr {
+    RedisNamespaceMismatch,
+    InvalidInput,
+}
+
+impl From<std::num::ParseIntError> for TimelineErr {
+    fn from(_error: std::num::ParseIntError) -> Self {
+        Self::InvalidInput
+    }
+}
+
+impl fmt::Display for TimelineErr {
+    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
+        use TimelineErr::*;
+        let msg = match self {
+            RedisNamespaceMismatch => "TODO: Cut this error",
+            InvalidInput => "The timeline text from Redis could not be parsed into a supported timeline. 
TODO: add incoming timeline text" + }; + write!(f, "{}", msg) + } +} diff --git a/src/main.rs b/src/main.rs index 2d2aaa4..2535f46 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,74 +1,71 @@ use flodgatt::{ config::{DeploymentConfig, EnvVar, PostgresConfig, RedisConfig}, parse_client_request::{PgPool, Subscription}, - redis_to_client_stream::{ClientAgent, EventStream}, + redis_to_client_stream::{ClientAgent, EventStream, Receiver}, }; -use std::{collections::HashMap, env, fs, net, os::unix::fs::PermissionsExt}; +use std::{env, fs, net::SocketAddr, os::unix::fs::PermissionsExt}; use tokio::net::UnixListener; -use warp::{path, ws::Ws2, Filter}; +use warp::{http::StatusCode, path, ws::Ws2, Filter, Rejection}; fn main() { - dotenv::from_filename(match env::var("ENV").ok().as_ref().map(String::as_str) { + dotenv::from_filename(match env::var("ENV").ok().as_deref() { Some("production") => ".env.production", Some("development") | None => ".env", Some(unsupported) => EnvVar::err("ENV", unsupported, "`production` or `development`"), }) .ok(); - let env_vars_map: HashMap<_, _> = dotenv::vars().collect(); - let env_vars = EnvVar::new(env_vars_map); + let env_vars = EnvVar::new(dotenv::vars().collect()); pretty_env_logger::init(); - - log::info!( - "Flodgatt recognized the following environmental variables:{}", - env_vars.clone() - ); - let redis_cfg = RedisConfig::from_env(env_vars.clone()); - let cfg = DeploymentConfig::from_env(env_vars.clone()); + log::info!("Environmental variables Flodgatt received: {}", &env_vars); let postgres_cfg = PostgresConfig::from_env(env_vars.clone()); + let redis_cfg = RedisConfig::from_env(env_vars.clone()); + let cfg = DeploymentConfig::from_env(env_vars); + let pg_pool = PgPool::new(postgres_cfg); - let client_agent_sse = ClientAgent::blank(redis_cfg); - let client_agent_ws = client_agent_sse.clone_with_shared_receiver(); - + let sharable_receiver = Receiver::try_from(redis_cfg) + .unwrap_or_else(|e| { + log::error!("{}\nFlodgatt shutting down...", e); + std::process::exit(1); + }) + .into_arc(); log::info!("Streaming server initialized and ready to accept connections"); // Server Sent Events + let sse_receiver = sharable_receiver.clone(); let (sse_interval, whitelist_mode) = (*cfg.sse_interval, *cfg.whitelist_mode); let sse_routes = Subscription::from_sse_query(pg_pool.clone(), whitelist_mode) .and(warp::sse()) .map( move |subscription: Subscription, sse_connection_to_client: warp::sse::Sse| { log::info!("Incoming SSE request for {:?}", subscription.timeline); - // Create a new ClientAgent - let mut client_agent = client_agent_sse.clone_with_shared_receiver(); - // Assign ClientAgent to generate stream of updates for the user/timeline pair - client_agent.init_for_user(subscription); + let mut client_agent = ClientAgent::new(sse_receiver.clone(), &subscription); + client_agent.subscribe(); + // send the updates through the SSE connection - EventStream::to_sse(client_agent, sse_connection_to_client, sse_interval) + EventStream::send_to_sse(client_agent, sse_connection_to_client, sse_interval) }, ) .with(warp::reply::with::header("Connection", "keep-alive")); // WebSocket + let ws_receiver = sharable_receiver; let (ws_update_interval, whitelist_mode) = (*cfg.ws_interval, *cfg.whitelist_mode); - let websocket_routes = Subscription::from_ws_request(pg_pool.clone(), whitelist_mode) + let ws_routes = Subscription::from_ws_request(pg_pool, whitelist_mode) .and(warp::ws::ws2()) .map(move |subscription: Subscription, ws: Ws2| { log::info!("Incoming websocket request for 
{:?}", subscription.timeline); + let mut client_agent = ClientAgent::new(ws_receiver.clone(), &subscription); + client_agent.subscribe(); - let token = subscription.access_token.clone(); - // Create a new ClientAgent - let mut client_agent = client_agent_ws.clone_with_shared_receiver(); - // Assign that agent to generate a stream of updates for the user/timeline pair - client_agent.init_for_user(subscription); - // send the updates through the WS connection (along with the User's access_token - // which is sent for security) + // send the updates through the WS connection + // (along with the User's access_token which is sent for security) ( - ws.on_upgrade(move |socket| { - EventStream::to_ws(socket, client_agent, ws_update_interval) + ws.on_upgrade(move |s| { + EventStream::send_to_ws(s, client_agent, ws_update_interval) }), - token.unwrap_or_else(String::new), + subscription.access_token.unwrap_or_else(String::new), ) }) .map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token)); @@ -84,33 +81,23 @@ fn main() { log::info!("Using Unix socket {}", socket); fs::remove_file(socket).unwrap_or_default(); let incoming = UnixListener::bind(socket).unwrap().incoming(); - fs::set_permissions(socket, PermissionsExt::from_mode(0o666)).unwrap(); warp::serve( - health.or(websocket_routes.or(sse_routes).with(cors).recover( - |rejection: warp::reject::Rejection| { - let err_txt = match rejection.cause() { - Some(text) - if text.to_string() == "Missing request header 'authorization'" => - { - "Error: Missing access token".to_string() - } - Some(text) => text.to_string(), - None => "Error: Nonexistant endpoint".to_string(), - }; - let json = warp::reply::json(&err_txt); - - Ok(warp::reply::with_status( - json, - warp::http::StatusCode::UNAUTHORIZED, - )) - }, - )), + health.or(ws_routes.or(sse_routes).with(cors).recover(|r: Rejection| { + let json_err = match r.cause() { + Some(text) if text.to_string() == "Missing request header 'authorization'" => { + warp::reply::json(&"Error: Missing access token".to_string()) + } + Some(text) => warp::reply::json(&text.to_string()), + None => warp::reply::json(&"Error: Nonexistant endpoint".to_string()), + }; + Ok(warp::reply::with_status(json_err, StatusCode::UNAUTHORIZED)) + })), ) .run_incoming(incoming); } else { - let server_addr = net::SocketAddr::new(*cfg.address, cfg.port.0); - warp::serve(health.or(websocket_routes.or(sse_routes).with(cors))).run(server_addr); - } + let server_addr = SocketAddr::new(*cfg.address, *cfg.port); + warp::serve(health.or(ws_routes.or(sse_routes).with(cors))).run(server_addr); + }; } diff --git a/src/messages/mod.rs b/src/messages/mod.rs index f162725..813b771 100644 --- a/src/messages/mod.rs +++ b/src/messages/mod.rs @@ -305,10 +305,11 @@ pub struct Notification { status: Option, } -#[serde(rename_all = "lowercase", deny_unknown_fields)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] enum NotificationType { Follow, + FollowRequest, // Undocumented Mention, Reblog, Favourite, diff --git a/src/parse_client_request/postgres.rs b/src/parse_client_request/postgres.rs index 69650a5..4bb1348 100644 --- a/src/parse_client_request/postgres.rs +++ b/src/parse_client_request/postgres.rs @@ -83,7 +83,7 @@ LIMIT 1", } } - pub fn select_hashtag_id(self, tag_name: &String) -> Result { + pub fn select_hashtag_id(self, tag_name: &str) -> Result { let mut conn = self.0.get().unwrap(); let rows = &conn .query( diff --git 
a/src/parse_client_request/subscription.rs b/src/parse_client_request/subscription.rs index db6a94a..790a6ab 100644 --- a/src/parse_client_request/subscription.rs +++ b/src/parse_client_request/subscription.rs @@ -11,6 +11,7 @@ use crate::err::TimelineErr; use crate::log_fatal; use lru::LruCache; use std::collections::HashSet; +use uuid::Uuid; use warp::reject::Rejection; use super::query; @@ -50,6 +51,7 @@ macro_rules! parse_sse_query { #[derive(Clone, Debug, PartialEq)] pub struct Subscription { + pub id: Uuid, pub timeline: Timeline, pub allowed_langs: HashSet, pub blocks: Blocks, @@ -60,6 +62,7 @@ pub struct Subscription { impl Default for Subscription { fn default() -> Self { Self { + id: Uuid::new_v4(), timeline: Timeline(Stream::Unset, Reach::Local, Content::Notification), allowed_langs: HashSet::new(), blocks: Blocks::default(), @@ -123,12 +126,13 @@ impl Subscription { }; Ok(Subscription { + id: Uuid::new_v4(), timeline, allowed_langs: user.allowed_langs, blocks: Blocks { blocking_users: pool.clone().select_blocking_users(user.id), blocked_users: pool.clone().select_blocked_users(user.id), - blocked_domains: pool.clone().select_blocked_domains(user.id), + blocked_domains: pool.select_blocked_domains(user.id), }, hashtag_name, access_token: q.access_token, diff --git a/src/redis_to_client_stream/client_agent.rs b/src/redis_to_client_stream/client_agent.rs index 2c6de80..1948c63 100644 --- a/src/redis_to_client_stream/client_agent.rs +++ b/src/redis_to_client_stream/client_agent.rs @@ -15,10 +15,8 @@ //! Because `StreamManagers` are lightweight data structures that do not directly //! communicate with Redis, it we create a new `ClientAgent` for //! each new client connection (each in its own thread).use super::{message::Message, receiver::Receiver} -use super::receiver::Receiver; +use super::receiver::{Receiver, ReceiverErr}; use crate::{ - config, - err::RedisParseErr, messages::Event, parse_client_request::{Stream::Public, Subscription, Timeline}, }; @@ -26,33 +24,20 @@ use futures::{ Async::{self, NotReady, Ready}, Poll, }; -use std::sync::{Arc, Mutex}; -use uuid::Uuid; +use std::sync::{Arc, Mutex, MutexGuard}; /// Struct for managing all Redis streams. #[derive(Clone, Debug)] pub struct ClientAgent { receiver: Arc>, - id: Uuid, pub subscription: Subscription, } impl ClientAgent { - /// Create a new `ClientAgent` with no shared data. - pub fn blank(redis_cfg: config::RedisConfig) -> Self { + pub fn new(receiver: Arc>, subscription: &Subscription) -> Self { ClientAgent { - receiver: Arc::new(Mutex::new(Receiver::new(redis_cfg))), - id: Uuid::default(), - subscription: Subscription::default(), - } - } - - /// Clones the `ClientAgent`, sharing the `Receiver`. - pub fn clone_with_shared_receiver(&self) -> Self { - Self { - receiver: self.receiver.clone(), - id: self.id, - subscription: self.subscription.clone(), + receiver, + subscription: subscription.clone(), } } @@ -64,25 +49,32 @@ impl ClientAgent { /// a different user, the `Receiver` is responsible for figuring /// that out and avoiding duplicated connections. Thus, it is safe to /// use this method for each new client connection. 
- pub fn init_for_user(&mut self, subscription: Subscription) { - use std::time::Instant; - self.id = Uuid::new_v4(); - self.subscription = subscription; - let start_time = Instant::now(); - let mut receiver = self.receiver.lock().expect("No thread panic (stream.rs)"); - receiver.manage_new_timeline( - self.id, - self.subscription.timeline, - self.subscription.hashtag_name.clone(), - ); - log::info!("init_for_user had lock for: {:?}", start_time.elapsed()); + pub fn subscribe(&mut self) { + let mut receiver = self.lock_receiver(); + receiver + .add_subscription(&self.subscription) + .unwrap_or_else(|e| log::error!("Could not subscribe to the Redis channel: {}", e)) + } + + fn lock_receiver(&self) -> MutexGuard { + match self.receiver.lock() { + Ok(inner) => inner, + Err(e) => { + log::error!( + "Another thread crashed: {}\n + Attempting to continue, possibly with invalid data", + e + ); + e.into_inner() + } + } } } /// The stream that the `ClientAgent` manages. `Poll` is the only method implemented. impl futures::stream::Stream for ClientAgent { type Item = Event; - type Error = RedisParseErr; + type Error = ReceiverErr; /// Checks for any new messages that should be sent to the client. /// @@ -94,12 +86,8 @@ impl futures::stream::Stream for ClientAgent { /// errors from the underlying data structures. fn poll(&mut self) -> Poll, Self::Error> { let result = { - let mut receiver = self - .receiver - .lock() - .expect("ClientAgent: No other thread panic"); - receiver.configure_for_polling(self.id, self.subscription.timeline); - receiver.poll() + let mut receiver = self.lock_receiver(); + receiver.poll_for(self.subscription.id, self.subscription.timeline) }; let allowed_langs = &self.subscription.allowed_langs; @@ -107,6 +95,7 @@ impl futures::stream::Stream for ClientAgent { let blocking_users = &self.subscription.blocks.blocking_users; let blocked_domains = &self.subscription.blocks.blocked_domains; let (send, block) = (|msg| Ok(Ready(Some(msg))), Ok(NotReady)); + use Event::*; match result { Ok(Async::Ready(Some(event))) => match event { diff --git a/src/redis_to_client_stream/event_stream.rs b/src/redis_to_client_stream/event_stream.rs index ddcf1d3..11705b0 100644 --- a/src/redis_to_client_stream/event_stream.rs +++ b/src/redis_to_client_stream/event_stream.rs @@ -8,12 +8,11 @@ use warp::{ sse::Sse, ws::{Message, WebSocket}, }; - pub struct EventStream; impl EventStream { /// Send a stream of replies to a WebSocket client. 
- pub fn to_ws( + pub fn send_to_ws( ws: WebSocket, mut client_agent: ClientAgent, interval: Duration, @@ -32,7 +31,7 @@ impl EventStream { .map(|_r| ()) .map_err(|e| match e.to_string().as_ref() { "IO error: Broken pipe (os error 32)" => (), // just closed unix socket - _ => log::warn!("websocket send error: {}", e), + _ => log::warn!("WebSocket send error: {}", e), }), ); @@ -42,7 +41,6 @@ impl EventStream { match ws_rx.poll() { Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true), Ok(Async::Ready(None)) => { - // TODO: consider whether we should manually drop closed connections here log::info!("Client closed WebSocket connection for {:?}", timeline); futures::future::ok(false) } @@ -58,28 +56,36 @@ impl EventStream { } }); - let mut time = Instant::now(); + let mut last_ping_time = Instant::now(); // Every time you get an event from that stream, send it through the pipe event_stream .for_each(move |_instant| { - if let Ok(Async::Ready(Some(msg))) = client_agent.poll() { - tx.unbounded_send(Message::text(msg.to_json_string())) - .expect("No send error"); - }; - if time.elapsed() > Duration::from_secs(30) { - tx.unbounded_send(Message::text("{}")).expect("Can ping"); - time = Instant::now(); + match client_agent.poll() { + Ok(Async::Ready(Some(msg))) => tx + .unbounded_send(Message::text(msg.to_json_string())) + .unwrap_or_else(|e| { + log::error!("Could not send message to WebSocket: {}", e) + }), + Ok(Async::Ready(None)) => log::info!("WebSocket ClientAgent got Ready(None)"), + Ok(Async::NotReady) if last_ping_time.elapsed() > Duration::from_secs(30) => { + tx.unbounded_send(Message::text("{}")).unwrap_or_else(|e| { + log::error!("Could not send ping to WebSocket: {}", e) + }); + last_ping_time = Instant::now(); + } + Ok(Async::NotReady) => (), // no new messages; nothing to do + Err(e) => log::error!("{}\n Dropping WebSocket message and continuing.", e), } Ok(()) }) .then(move |result| { - // TODO: consider whether we should manually drop closed connections here log::info!("WebSocket connection for {:?} closed.", timeline); result }) .map_err(move |e| log::warn!("Error sending to {:?}: {}", timeline, e)) } - pub fn to_sse(mut client_agent: ClientAgent, sse: Sse, interval: Duration) -> impl Reply { + + pub fn send_to_sse(mut client_agent: ClientAgent, sse: Sse, interval: Duration) -> impl Reply { let event_stream = tokio::timer::Interval::new(Instant::now(), interval).filter_map(move |_| { match client_agent.poll() { @@ -87,7 +93,15 @@ impl EventStream { warp::sse::event(event.event_name()), warp::sse::data(event.payload().unwrap_or_else(String::new)), )), - _ => None, + Ok(Async::Ready(None)) => { + log::info!("SSE ClientAgent got Ready(None)"); + None + } + Ok(Async::NotReady) => None, + Err(e) => { + log::error!("{}\n Dropping SSE message and continuing.", e); + None + } } }); diff --git a/src/redis_to_client_stream/mod.rs b/src/redis_to_client_stream/mod.rs index ce7e1c5..4b73844 100644 --- a/src/redis_to_client_stream/mod.rs +++ b/src/redis_to_client_stream/mod.rs @@ -4,14 +4,4 @@ mod event_stream; mod receiver; mod redis; -pub use {client_agent::ClientAgent, event_stream::EventStream}; - -// TODO remove -pub use redis::redis_msg; - -//#[cfg(test)] -//pub use receiver::process_messages; -//#[cfg(test)] -pub use receiver::{MessageQueues, MsgQueue}; -//#[cfg(test)] -//pub use redis::redis_msg::{RedisMsg, RedisUtf8}; +pub use {client_agent::ClientAgent, event_stream::EventStream, receiver::Receiver}; diff --git a/src/redis_to_client_stream/receiver/err.rs 
b/src/redis_to_client_stream/receiver/err.rs new file mode 100644 index 0000000..b3723e1 --- /dev/null +++ b/src/redis_to_client_stream/receiver/err.rs @@ -0,0 +1,50 @@ +use super::super::redis::{RedisConnErr, RedisParseErr}; +use crate::err::TimelineErr; + +use serde_json; +use std::fmt; + +#[derive(Debug)] +pub enum ReceiverErr { + TimelineErr(TimelineErr), + EventErr(serde_json::Error), + RedisParseErr(RedisParseErr), + RedisConnErr(RedisConnErr), +} + +impl fmt::Display for ReceiverErr { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + use ReceiverErr::*; + match self { + EventErr(inner) => write!(f, "{}", inner), + RedisParseErr(inner) => write!(f, "{}", inner), + RedisConnErr(inner) => write!(f, "{}", inner), + TimelineErr(inner) => write!(f, "{}", inner), + }?; + Ok(()) + } +} + +impl From for ReceiverErr { + fn from(error: serde_json::Error) -> Self { + Self::EventErr(error) + } +} + +impl From for ReceiverErr { + fn from(e: RedisConnErr) -> Self { + Self::RedisConnErr(e) + } +} + +impl From for ReceiverErr { + fn from(e: TimelineErr) -> Self { + Self::TimelineErr(e) + } +} + +impl From for ReceiverErr { + fn from(e: RedisParseErr) -> Self { + Self::RedisParseErr(e) + } +} diff --git a/src/redis_to_client_stream/receiver/message_queues.rs b/src/redis_to_client_stream/receiver/message_queues.rs index eb45b30..593f660 100644 --- a/src/redis_to_client_stream/receiver/message_queues.rs +++ b/src/redis_to_client_stream/receiver/message_queues.rs @@ -1,5 +1,6 @@ use crate::messages::Event; use crate::parse_client_request::Timeline; + use std::{ collections::{HashMap, VecDeque}, fmt, @@ -13,22 +14,6 @@ pub struct MsgQueue { pub messages: VecDeque, last_polled_at: Instant, } -impl fmt::Debug for MsgQueue { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "\ -MsgQueue {{ - timeline: {:?}, - messages: {:?}, - last_polled_at: {:?}, -}}", - self.timeline, - self.messages, - self.last_polled_at.elapsed(), - ) - } -} impl MsgQueue { pub fn new(timeline: Timeline) -> Self { @@ -38,26 +23,15 @@ impl MsgQueue { timeline, } } + pub fn update_polled_at_time(&mut self) { + self.last_polled_at = Instant::now(); + } } #[derive(Debug)] pub struct MessageQueues(pub HashMap); impl MessageQueues { - pub fn update_time_for_target_queue(&mut self, id: Uuid) { - self.entry(id) - .and_modify(|queue| queue.last_polled_at = Instant::now()); - } - - pub fn oldest_msg_in_target_queue(&mut self, id: Uuid, timeline: Timeline) -> Option { - let msg_qs_entry = self.entry(id); - let mut inserted_tl = false; - let msg_q = msg_qs_entry.or_insert_with(|| { - inserted_tl = true; - MsgQueue::new(timeline) - }); - msg_q.messages.pop_front() - } pub fn calculate_timelines_to_add_or_drop(&mut self, timeline: Timeline) -> Vec { let mut timelines_to_modify = Vec::new(); @@ -85,6 +59,23 @@ pub struct Change { pub in_subscriber_number: i32, } +impl fmt::Debug for MsgQueue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "\ +MsgQueue {{ + timeline: {:?}, + messages: {:?}, + last_polled_at: {:?} ago, +}}", + self.timeline, + self.messages, + self.last_polled_at.elapsed(), + ) + } +} + impl std::ops::Deref for MessageQueues { type Target = HashMap; fn deref(&self) -> &Self::Target { diff --git a/src/redis_to_client_stream/receiver/mod.rs b/src/redis_to_client_stream/receiver/mod.rs index 23e5948..63fa863 100644 --- a/src/redis_to_client_stream/receiver/mod.rs +++ b/src/redis_to_client_stream/receiver/mod.rs @@ -1,51 +1,53 @@ //! 
Receives data from Redis, sorts it by `ClientAgent`, and stores it until //! polled by the correct `ClientAgent`. Also manages sububscriptions and //! unsubscriptions to/from Redis. +mod err; mod message_queues; +pub use err::ReceiverErr; pub use message_queues::{MessageQueues, MsgQueue}; +use super::redis::{redis_connection::RedisCmd, RedisConn}; + use crate::{ config, - err::RedisParseErr, messages::Event, - parse_client_request::{Stream, Timeline}, - redis_to_client_stream::redis::RedisConn, + parse_client_request::{Stream, Subscription, Timeline}, }; + use futures::{Async, Poll}; -use lru::LruCache; -use std::{collections::HashMap, time::Instant}; +use std::{ + collections::HashMap, + result, + sync::{Arc, Mutex}, +}; use uuid::Uuid; +type Result = result::Result; + /// The item that streams from Redis and is polled by the `ClientAgent` #[derive(Debug)] pub struct Receiver { redis_connection: RedisConn, - timeline: Timeline, - manager_id: Uuid, pub msg_queues: MessageQueues, clients_per_timeline: HashMap, - hashtag_cache: LruCache, - // TODO: eventually, it might make sense to have Mastodon publish to timelines with - // the tag number instead of the tag name. This would save us from dealing - // with a cache here and would be consistent with how lists/users are handled. } impl Receiver { /// Create a new `Receiver`, with its own Redis connections (but, as yet, no /// active subscriptions). - pub fn new(redis_cfg: config::RedisConfig) -> Self { - let redis_connection = RedisConn::new(redis_cfg); + pub fn try_from(redis_cfg: config::RedisConfig) -> Result { + let redis_connection = RedisConn::new(redis_cfg)?; - Self { + Ok(Self { redis_connection, - timeline: Timeline::empty(), - manager_id: Uuid::default(), msg_queues: MessageQueues(HashMap::new()), clients_per_timeline: HashMap::new(), - hashtag_cache: LruCache::new(1000), - // should this be a run-time option? - } + }) + } + + pub fn into_arc(self) -> Arc> { + Arc::new(Mutex::new(self)) } /// Assigns the `Receiver` a new timeline to monitor and runs other @@ -54,73 +56,24 @@ impl Receiver { /// Note: this method calls `subscribe_or_unsubscribe_as_needed`, /// so Redis PubSub subscriptions are only updated when a new timeline /// comes under management for the first time. - pub fn manage_new_timeline(&mut self, id: Uuid, tl: Timeline, hashtag: Option) { - self.timeline = tl; - if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (hashtag, tl) { - self.hashtag_cache.put(id, hashtag.clone()); + pub fn add_subscription(&mut self, subscription: &Subscription) -> Result<()> { + let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline); + + if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (tag, tl) { self.redis_connection.update_cache(hashtag, id); }; - - self.msg_queues.insert(id, MsgQueue::new(tl)); - self.subscribe_or_unsubscribe_as_needed(tl); + self.msg_queues.insert(subscription.id, MsgQueue::new(tl)); + self.subscribe_or_unsubscribe_as_needed(tl)?; + Ok(()) } - /// Set the `Receiver`'s manager_id and target_timeline fields to the appropriate - /// value to be polled by the current `StreamManager`. - pub fn configure_for_polling(&mut self, manager_id: Uuid, timeline: Timeline) { - self.manager_id = manager_id; - self.timeline = timeline; - } - - /// Drop any PubSub subscriptions that don't have active clients and check - /// that there's a subscription to the current one. If there isn't, then - /// subscribe to it. 
- fn subscribe_or_unsubscribe_as_needed(&mut self, timeline: Timeline) { - let start_time = Instant::now(); - let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(timeline); - - // Record the lower number of clients subscribed to that channel - for change in timelines_to_modify { - let timeline = change.timeline; - let hashtag = match timeline { - Timeline(Stream::Hashtag(id), _, _) => self.hashtag_cache.get(&id), - _non_hashtag_timeline => None, - }; - - let count_of_subscribed_clients = self - .clients_per_timeline - .entry(timeline) - .and_modify(|n| *n += change.in_subscriber_number) - .or_insert_with(|| 1); - - // If no clients, unsubscribe from the channel - if *count_of_subscribed_clients <= 0 { - self.redis_connection - .send_unsubscribe_cmd(&timeline.to_redis_raw_timeline(hashtag)); - } else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 { - self.redis_connection - .send_subscribe_cmd(&timeline.to_redis_raw_timeline(hashtag)); - } - } - if start_time.elapsed().as_millis() > 1 { - log::warn!("Sending cmd to Redis took: {:?}", start_time.elapsed()); - }; - } -} - -/// The stream that the ClientAgent polls to learn about new messages. -impl futures::stream::Stream for Receiver { - type Item = Event; - type Error = RedisParseErr; - /// Returns the oldest message in the `ClientAgent`'s queue (if any). /// /// Note: This method does **not** poll Redis every time, because polling /// Redis is significantly more time consuming that simply returning the /// message already in a queue. Thus, we only poll Redis if it has not /// been polled lately. - fn poll(&mut self) -> Poll, Self::Error> { - let (timeline, id) = (self.timeline.clone(), self.manager_id); + pub fn poll_for(&mut self, id: Uuid, timeline: Timeline) -> Poll, ReceiverErr> { loop { match self.redis_connection.poll_redis() { Ok(Async::Ready(Some((timeline, event)))) => self @@ -136,13 +89,47 @@ impl futures::stream::Stream for Receiver { } } - // Record current time as last polled time - self.msg_queues.update_time_for_target_queue(id); - // If the `msg_queue` being polled has any new messages, return the first (oldest) one - match self.msg_queues.oldest_msg_in_target_queue(id, timeline) { - Some(value) => Ok(Async::Ready(Some(value))), - _ => Ok(Async::NotReady), + match self.msg_queues.get_mut(&id) { + Some(msg_q) => { + msg_q.update_polled_at_time(); + match msg_q.messages.pop_front() { + Some(event) => Ok(Async::Ready(Some(event))), + None => Ok(Async::NotReady), + } + } + None => { + log::error!("Polled a MsgQueue that had not been set up. Setting it up now."); + self.msg_queues.insert(id, MsgQueue::new(timeline)); + Ok(Async::NotReady) + } } } + + /// Drop any PubSub subscriptions that don't have active clients and check + /// that there's a subscription to the current one. If there isn't, then + /// subscribe to it. 
+ fn subscribe_or_unsubscribe_as_needed(&mut self, tl: Timeline) -> Result<()> { + let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(tl); + + // Record the lower number of clients subscribed to that channel + for change in timelines_to_modify { + let timeline = change.timeline; + + let count_of_subscribed_clients = self + .clients_per_timeline + .entry(timeline) + .and_modify(|n| *n += change.in_subscriber_number) + .or_insert_with(|| 1); + + // If no clients, unsubscribe from the channel + use RedisCmd::*; + if *count_of_subscribed_clients <= 0 { + self.redis_connection.send_cmd(Unsubscribe, &timeline)?; + } else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 { + self.redis_connection.send_cmd(Subscribe, &timeline)? + } + } + Ok(()) + } } diff --git a/src/redis_to_client_stream/redis/mod.rs b/src/redis_to_client_stream/redis/mod.rs index 70ad337..69334d6 100644 --- a/src/redis_to_client_stream/redis/mod.rs +++ b/src/redis_to_client_stream/redis/mod.rs @@ -1,5 +1,5 @@ -pub mod redis_cmd; pub mod redis_connection; pub mod redis_msg; -pub use redis_connection::RedisConn; +pub use redis_connection::{RedisConn, RedisConnErr}; +pub use redis_msg::RedisParseErr; diff --git a/src/redis_to_client_stream/redis/redis_cmd.rs b/src/redis_to_client_stream/redis/redis_cmd.rs deleted file mode 100644 index f7f24a4..0000000 --- a/src/redis_to_client_stream/redis/redis_cmd.rs +++ /dev/null @@ -1,75 +0,0 @@ -//! Send raw TCP commands to the Redis server -use std::fmt::Display; - -/// Send a subscribe or unsubscribe to the Redis PubSub channel -#[macro_export] -macro_rules! pubsub_cmd { - ($cmd:expr, $self:expr, $tl:expr) => {{ - use std::io::Write; - log::info!("Sending {} command to {}", $cmd, $tl); - let namespace = $self.redis_namespace.clone(); - - $self - .primary - .write_all(&redis_cmd::pubsub($cmd, $tl, namespace.clone())) - .expect("Can send command to Redis"); - // Because we keep track of the number of clients subscribed to a channel on our end, - // we need to manually tell Redis when we have subscribed or unsubscribed - let subscription_new_number = match $cmd { - "unsubscribe" => "0", - "subscribe" => "1", - _ => panic!("Given unacceptable PUBSUB command"), - }; - $self - .secondary - .write_all(&redis_cmd::set( - format!("subscribed:{}", $tl), - subscription_new_number, - namespace.clone(), - )) - .expect("Can set Redis"); - - // TODO: re-enable info logging >>> log::info!("Now subscribed to: {:#?}", $self.msg_queues); - }}; -} -/// Send a `SUBSCRIBE` or `UNSUBSCRIBE` command to a specific timeline -pub fn pubsub(command: impl Display, timeline: impl Display, ns: Option) -> Vec { - let arg = match ns { - Some(namespace) => format!("{}:{}", namespace, timeline), - None => format!("{}", timeline), - }; - cmd(command, arg) -} - -/// Send a generic two-item command to Redis -pub fn cmd(command: impl Display, arg: impl Display) -> Vec { - let (command, arg) = (command.to_string(), arg.to_string()); - log::info!("Sent {} command", &command); - format!( - "*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n", - cmd_length = command.len(), - cmd = command, - arg_length = arg.len(), - arg = arg - ) - .as_bytes() - .to_owned() -} - -/// Send a `SET` command (used to manually unsubscribe from Redis) -pub fn set(key: impl Display, value: impl Display, ns: Option) -> Vec { - let key = match ns { - Some(namespace) => format!("{}:{}", namespace, key), - None => key.to_string(), - }; - let value = value.to_string(); - format!( - 
"*3\r\n$3\r\nSET\r\n${key_length}\r\n{key}\r\n${value_length}\r\n{value}\r\n", - key_length = key.len(), - key = key, - value_length = value.len(), - value = value - ) - .as_bytes() - .to_owned() -} diff --git a/src/redis_to_client_stream/redis/redis_connection.rs b/src/redis_to_client_stream/redis/redis_connection.rs deleted file mode 100644 index 2689786..0000000 --- a/src/redis_to_client_stream/redis/redis_connection.rs +++ /dev/null @@ -1,189 +0,0 @@ -use super::{redis_cmd, redis_msg::RedisParseOutput}; -use crate::{ - config::RedisConfig, - err::{self, RedisParseErr}, - messages::Event, - parse_client_request::Timeline, - pubsub_cmd, -}; - -use futures::{Async, Poll}; -use lru::LruCache; -use std::{ - convert::TryFrom, - io::Read, - io::Write, - net, str, - time::{Duration, Instant}, -}; -use tokio::io::AsyncRead; - -#[derive(Debug)] -pub struct RedisConn { - primary: net::TcpStream, - secondary: net::TcpStream, - redis_poll_interval: Duration, - redis_polled_at: Instant, - redis_namespace: Option, - cache: LruCache, - redis_input: Vec, // TODO: Consider queue internal to RedisConn -} - -impl RedisConn { - pub fn new(redis_cfg: RedisConfig) -> Self { - let addr = format!("{}:{}", *redis_cfg.host, *redis_cfg.port); - let conn_err = |e| { - err::die_with_msg(format!( - "Could not connect to Redis at {}:{}.\n Error detail: {}", - *redis_cfg.host, *redis_cfg.port, e, - )) - }; - let update_conn = |mut conn| { - if let Some(password) = redis_cfg.password.clone() { - conn = send_password(conn, &password); - } - conn = send_test_ping(conn); - conn.set_read_timeout(Some(Duration::from_millis(10))) - .expect("Can set read timeout for Redis connection"); - if let Some(db) = &*redis_cfg.db { - conn = set_db(conn, db); - } - conn - }; - let (primary_conn, secondary_conn) = ( - update_conn(net::TcpStream::connect(addr.clone()).unwrap_or_else(conn_err)), - update_conn(net::TcpStream::connect(addr).unwrap_or_else(conn_err)), - ); - primary_conn - .set_nonblocking(true) - .expect("set_nonblocking call failed"); - - Self { - primary: primary_conn, - secondary: secondary_conn, - cache: LruCache::new(1000), - redis_namespace: redis_cfg.namespace.clone(), - redis_poll_interval: *redis_cfg.polling_interval, - redis_input: Vec::new(), - redis_polled_at: Instant::now(), - } - } - - pub fn poll_redis(&mut self) -> Poll, RedisParseErr> { - let mut buffer = vec![0u8; 6000]; - if self.redis_polled_at.elapsed() > self.redis_poll_interval { - if let Ok(Async::Ready(bytes_read)) = self.poll_read(&mut buffer) { - self.redis_input.extend_from_slice(&buffer[..bytes_read]); - } - } - let input = self.redis_input.clone(); - self.redis_input.clear(); - - let (input, invalid_bytes) = str::from_utf8(&input) - .map(|input| (input, "".as_bytes())) - .unwrap_or_else(|e| { - let (valid, invalid) = input.split_at(e.valid_up_to()); - (str::from_utf8(valid).expect("Guaranteed by ^^^^"), invalid) - }); - - use {Async::*, RedisParseOutput::*}; - let (res, leftover) = match RedisParseOutput::try_from(input) { - Ok(Msg(msg)) => match &self.redis_namespace { - Some(ns) if msg.timeline_txt.starts_with(&format!("{}:timeline:", ns)) => { - let tl = Timeline::from_redis_text( - &msg.timeline_txt[ns.len() + ":timeline:".len()..], - &mut self.cache, - )?; - let event: Event = serde_json::from_str(msg.event_txt)?; - (Ok(Ready(Some((tl, event)))), msg.leftover_input) - } - None => { - let tl = Timeline::from_redis_text( - &msg.timeline_txt["timeline:".len()..], - &mut self.cache, - )?; - - let event: Event = 
serde_json::from_str(msg.event_txt)?; - (Ok(Ready(Some((tl, event)))), msg.leftover_input) - } - Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input), - }, - Ok(NonMsg(leftover)) => (Ok(Ready(None)), leftover), - Err(RedisParseErr::Incomplete) => (Ok(NotReady), input), - Err(other) => (Err(other), input), - }; - self.redis_input.extend_from_slice(leftover.as_bytes()); - self.redis_input.extend_from_slice(invalid_bytes); - res - } - - pub fn update_cache(&mut self, hashtag: String, id: i64) { - self.cache.put(hashtag, id); - } - - pub fn send_unsubscribe_cmd(&mut self, timeline: &str) { - pubsub_cmd!("unsubscribe", self, timeline); - } - pub fn send_subscribe_cmd(&mut self, timeline: &str) { - pubsub_cmd!("subscribe", self, timeline); - } -} - -fn send_password(mut conn: net::TcpStream, password: &str) -> net::TcpStream { - conn.write_all(&redis_cmd::cmd("auth", &password)).unwrap(); - let mut buffer = vec![0u8; 5]; - conn.read_exact(&mut buffer).unwrap(); - let reply = String::from_utf8(buffer.to_vec()).unwrap(); - if reply != "+OK\r\n" { - err::die_with_msg(format!( - r"Incorrect Redis password. You supplied `{}`. - Please supply correct password with REDIS_PASSWORD environmental variable.", - password, - )) - }; - conn -} - -fn set_db(mut conn: net::TcpStream, db: &str) -> net::TcpStream { - conn.write_all(&redis_cmd::cmd("SELECT", &db)).unwrap(); - conn -} - -fn send_test_ping(mut conn: net::TcpStream) -> net::TcpStream { - conn.write_all(b"PING\r\n").unwrap(); - let mut buffer = vec![0u8; 7]; - conn.read_exact(&mut buffer).unwrap(); - let reply = String::from_utf8(buffer.to_vec()).unwrap(); - match reply.as_str() { - "+PONG\r\n" => (), - "-NOAUTH" => err::die_with_msg( - r"Invalid authentication for Redis. - Redis reports that it needs a password, but you did not provide one. - You can set a password with the REDIS_PASSWORD environmental variable.", - ), - "HTTP/1." => err::die_with_msg( - r"The server at REDIS_HOST and REDIS_PORT is not a Redis server. - Please update the REDIS_HOST and/or REDIS_PORT environmental variables.", - ), - _ => err::die_with_msg(format!( - "Could not connect to Redis for unknown reason. 
Expected `+PONG` reply but got {}",
-            reply
-        )),
-    };
-    conn
-}
-
-impl Read for RedisConn {
-    fn read(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> {
-        self.primary.read(buffer)
-    }
-}
-
-impl AsyncRead for RedisConn {
-    fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
-        match self.read(buf) {
-            Ok(t) => Ok(Async::Ready(t)),
-            Err(_) => Ok(Async::NotReady),
-        }
-    }
-}
diff --git a/src/redis_to_client_stream/redis/redis_connection/err.rs b/src/redis_to_client_stream/redis/redis_connection/err.rs
new file mode 100644
index 0000000..ddaf3b4
--- /dev/null
+++ b/src/redis_to_client_stream/redis/redis_connection/err.rs
@@ -0,0 +1,61 @@
+use std::fmt;
+
+#[derive(Debug)]
+pub enum RedisConnErr {
+    ConnectionErr { addr: String, inner: std::io::Error },
+    InvalidRedisReply(String),
+    UnknownRedisErr(std::io::Error),
+    IncorrectPassword(String),
+    MissingPassword,
+    NotRedis(String),
+}
+
+impl RedisConnErr {
+    pub fn with_addr<T: AsRef<str>>(address: T, inner: std::io::Error) -> Self {
+        Self::ConnectionErr {
+            addr: address.as_ref().to_string(),
+            inner,
+        }
+    }
+}
+
+impl fmt::Display for RedisConnErr {
+    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
+        use RedisConnErr::*;
+        let msg = match self {
+            ConnectionErr { addr, inner } => format!(
+                "Error connecting to Redis at {}.\n\
+                 Connection Error: {}",
+                addr, inner
+            ),
+            InvalidRedisReply(unexpected_reply) => format!(
+                "Received an unexpected reply from Redis. Expected `+PONG` reply but got `{}`",
+                unexpected_reply
+            ),
+            UnknownRedisErr(io_err) => {
+                format!("Unexpected failure communicating with Redis: {}", io_err)
+            }
+            IncorrectPassword(attempted_password) => format!(
+                "Incorrect Redis password. You supplied `{}`.\n \
+                 Please supply correct password with REDIS_PASSWORD environmental variable.",
+                attempted_password
+            ),
+            MissingPassword => "Invalid authentication for Redis. Redis is configured to require \
+                                a password, but you did not provide one. \n\
+                                Set a password using the REDIS_PASSWORD environmental variable."
+                .to_string(),
+            NotRedis(addr) => format!(
+                "The server at {} is not a Redis server. 
Please update the REDIS_HOST and/or \ + REDIS_PORT environmental variables and try again.", + addr + ), + }; + write!(f, "{}", msg) + } +} + +impl From for RedisConnErr { + fn from(e: std::io::Error) -> RedisConnErr { + RedisConnErr::UnknownRedisErr(e) + } +} diff --git a/src/redis_to_client_stream/redis/redis_connection/mod.rs b/src/redis_to_client_stream/redis/redis_connection/mod.rs new file mode 100644 index 0000000..45fb3ca --- /dev/null +++ b/src/redis_to_client_stream/redis/redis_connection/mod.rs @@ -0,0 +1,180 @@ +mod err; +pub use err::RedisConnErr; + +use super::super::receiver::ReceiverErr; +use super::redis_msg::{RedisParseErr, RedisParseOutput}; +use crate::{ + config::RedisConfig, + messages::Event, + parse_client_request::{Stream, Timeline}, +}; + +use std::{ + convert::TryFrom, + io::{Read, Write}, + net::TcpStream, + str, + time::{Duration, Instant}, +}; + +use futures::{Async, Poll}; +use lru::LruCache; + +type Result = std::result::Result; + +#[derive(Debug)] +pub struct RedisConn { + primary: TcpStream, + secondary: TcpStream, + redis_poll_interval: Duration, + redis_polled_at: Instant, + redis_namespace: Option, + tag_id_cache: LruCache, + tag_name_cache: LruCache, + redis_input: Vec, +} + +impl RedisConn { + pub fn new(redis_cfg: RedisConfig) -> Result { + let addr = format!("{}:{}", *redis_cfg.host, *redis_cfg.port); + let conn = Self::new_connection(&addr, redis_cfg.password.as_ref())?; + conn.set_nonblocking(true) + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + let redis_conn = Self { + primary: conn, + secondary: Self::new_connection(&addr, redis_cfg.password.as_ref())?, + tag_id_cache: LruCache::new(1000), + tag_name_cache: LruCache::new(1000), + // TODO: eventually, it might make sense to have Mastodon publish to timelines with + // the tag number instead of the tag name. This would save us from dealing + // with a cache here and would be consistent with how lists/users are handled. 
+ redis_namespace: redis_cfg.namespace.clone(), + redis_poll_interval: *redis_cfg.polling_interval, + redis_input: Vec::new(), + redis_polled_at: Instant::now(), + }; + Ok(redis_conn) + } + + pub fn poll_redis(&mut self) -> Poll, ReceiverErr> { + let mut buffer = vec![0u8; 6000]; + if self.redis_polled_at.elapsed() > self.redis_poll_interval { + if let Ok(bytes_read) = self.primary.read(&mut buffer) { + self.redis_input.extend_from_slice(&buffer[..bytes_read]); + } + } + if self.redis_input.is_empty() { + return Ok(Async::NotReady); + } + let input = self.redis_input.clone(); + self.redis_input.clear(); + + let (input, invalid_bytes) = str::from_utf8(&input) + .map(|input| (input, "".as_bytes())) + .unwrap_or_else(|e| { + let (valid, invalid) = input.split_at(e.valid_up_to()); + (str::from_utf8(valid).expect("Guaranteed by ^^^^"), invalid) + }); + + use {Async::*, RedisParseOutput::*}; + let (res, leftover) = match RedisParseOutput::try_from(input) { + Ok(Msg(msg)) => match &self.redis_namespace { + Some(ns) if msg.timeline_txt.starts_with(&format!("{}:timeline:", ns)) => { + let trimmed_tl_txt = &msg.timeline_txt[ns.len() + ":timeline:".len()..]; + let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut self.tag_id_cache)?; + let event: Event = serde_json::from_str(msg.event_txt)?; + (Ok(Ready(Some((tl, event)))), msg.leftover_input) + } + None => { + let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..]; + let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut self.tag_id_cache)?; + let event: Event = serde_json::from_str(msg.event_txt)?; + (Ok(Ready(Some((tl, event)))), msg.leftover_input) + } + Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input), + }, + Ok(NonMsg(leftover)) => (Ok(Ready(None)), leftover), + Err(RedisParseErr::Incomplete) => (Ok(NotReady), input), + Err(other_parse_err) => (Err(ReceiverErr::RedisParseErr(other_parse_err)), input), + }; + self.redis_input.extend_from_slice(leftover.as_bytes()); + self.redis_input.extend_from_slice(invalid_bytes); + res + } + + pub fn update_cache(&mut self, hashtag: String, id: i64) { + self.tag_id_cache.put(hashtag.clone(), id); + self.tag_name_cache.put(id, hashtag); + } + + fn new_connection(addr: &str, pass: Option<&String>) -> Result { + match TcpStream::connect(&addr) { + Ok(mut conn) => { + if let Some(password) = pass { + Self::auth_connection(&mut conn, &addr, password)?; + } + + Self::validate_connection(&mut conn, &addr)?; + conn.set_read_timeout(Some(Duration::from_millis(10))) + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + Ok(conn) + } + Err(e) => Err(RedisConnErr::with_addr(&addr, e)), + } + } + fn auth_connection(conn: &mut TcpStream, addr: &str, pass: &str) -> Result<()> { + conn.write_all(&format!("*2\r\n$4\r\nauth\r\n${}\r\n{}\r\n", pass.len(), pass).as_bytes()) + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + let mut buffer = vec![0u8; 5]; + conn.read_exact(&mut buffer) + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + let reply = String::from_utf8_lossy(&buffer); + match &*reply { + "+OK\r\n" => (), + _ => Err(RedisConnErr::IncorrectPassword(pass.to_string()))?, + }; + Ok(()) + } + + fn validate_connection(conn: &mut TcpStream, addr: &str) -> Result<()> { + conn.write_all(b"PING\r\n") + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + let mut buffer = vec![0u8; 7]; + conn.read_exact(&mut buffer) + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + let reply = String::from_utf8_lossy(&buffer); + match &*reply { + "+PONG\r\n" => Ok(()), + "-NOAUTH" => 
Err(RedisConnErr::MissingPassword), + "HTTP/1." => Err(RedisConnErr::NotRedis(addr.to_string())), + _ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())), + } + } + + pub fn send_cmd(&mut self, cmd: RedisCmd, timeline: &Timeline) -> Result<()> { + let hashtag = match timeline { + Timeline(Stream::Hashtag(id), _, _) => self.tag_name_cache.get(id), + _non_hashtag_timeline => None, + }; + let tl = timeline.to_redis_raw_timeline(hashtag); + + let (primary_cmd, secondary_cmd) = match cmd { + RedisCmd::Subscribe => ( + format!("*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n", tl.len(), tl), + format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n$1\r\n1\r\n", tl.len(), tl), + ), + RedisCmd::Unsubscribe => ( + format!("*2\r\n$11\r\nunsubscribe\r\n${}\r\n{}\r\n", tl.len(), tl), + format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n$1\r\n0\r\n", tl.len(), tl), + ), + }; + self.primary.write_all(&primary_cmd.as_bytes())?; + self.secondary.write_all(&secondary_cmd.as_bytes())?; + Ok(()) + } +} + +pub enum RedisCmd { + Subscribe, + Unsubscribe, +} diff --git a/src/redis_to_client_stream/redis/redis_msg/err.rs b/src/redis_to_client_stream/redis/redis_msg/err.rs new file mode 100644 index 0000000..9a3c9ee --- /dev/null +++ b/src/redis_to_client_stream/redis/redis_msg/err.rs @@ -0,0 +1,50 @@ +use std::{error::Error, fmt}; + +#[derive(Debug)] +pub enum RedisParseErr { + Incomplete, + InvalidNumber(std::num::ParseIntError), + InvalidLineStart(String), + InvalidLineEnd, + IncorrectRedisType, + MissingField, +} + +impl fmt::Display for RedisParseErr { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + use RedisParseErr::*; + let msg = match self { + Incomplete => "The input from Redis does not form a complete message, likely because \ + the input buffer filled partway through a message. Save this input \ + and try again with additional input from Redis." + .to_string(), + InvalidNumber(parse_int_err) => format!( + "Redis indicated that an item would be a number, but it could not be parsed: {}", + parse_int_err + ), + + InvalidLineStart(line_start_char) => format!( + "A line from Redis started with `{}`, which is not a valid character to indicate \ + the type of the Redis line.", + line_start_char + ), + InvalidLineEnd => "A Redis line ended before expected line length".to_string(), + IncorrectRedisType => "Received a Redis type that is not supported in this context. \ + Flodgatt expects each message from Redis to be a Redis array \ + consisting of bulk strings or integers." + .to_string(), + MissingField => "Redis input was missing a field Flodgatt expected (e.g., a `message` \ + without a payload line)" + .to_string(), + }; + write!(f, "{}", msg) + } +} + +impl Error for RedisParseErr {} + +impl From for RedisParseErr { + fn from(error: std::num::ParseIntError) -> Self { + Self::InvalidNumber(error) + } +} diff --git a/src/redis_to_client_stream/redis/redis_msg/mod.rs b/src/redis_to_client_stream/redis/redis_msg/mod.rs index 8c3386d..de9c542 100644 --- a/src/redis_to_client_stream/redis/redis_msg/mod.rs +++ b/src/redis_to_client_stream/redis/redis_msg/mod.rs @@ -20,8 +20,11 @@ //! three characters, the second is a bulk string with ten characters, and the third is a //! bulk string with 1,386 characters. 
+mod err; +pub use err::RedisParseErr; + use self::RedisParseOutput::*; -use crate::err::RedisParseErr; + use std::{ convert::{TryFrom, TryInto}, str, @@ -84,7 +87,7 @@ fn utf8_to_redis_data<'a>(s: &'a str) -> Result<(RedisData, &'a str), RedisParse } } -fn after_newline_at<'a>(s: &'a str, start: usize) -> RedisParser<'a, &'a str> { +fn after_newline_at(s: &str, start: usize) -> RedisParser<&str> { let s = s.get(start..).ok_or(Incomplete)?; if !s.starts_with("\r\n") { return Err(RedisParseErr::InvalidLineEnd); @@ -93,10 +96,7 @@ fn after_newline_at<'a>(s: &'a str, start: usize) -> RedisParser<'a, &'a str> { } fn parse_number_at<'a>(s: &'a str) -> RedisParser<(usize, &'a str)> { - let len = s - .chars() - .position(|c| !c.is_numeric()) - .ok_or(NonNumericInput)?; + let len = s.chars().position(|c| !c.is_numeric()).ok_or(Incomplete)?; Ok((s[..len].parse()?, after_newline_at(s, len)?)) } diff --git a/tc --explain E0106 b/tc --explain E0106 deleted file mode 100644 index b9b1d97..0000000 --- a/tc --explain E0106 +++ /dev/null @@ -1,53 +0,0 @@ -This error indicates that a lifetime is missing from a type. If it is an error -inside a function signature, the problem may be with failing to adhere to the -lifetime elision rules (see below). - -Erroneous code examples: - -``` -struct Foo1 { x: &bool } - // ^ expected lifetime parameter -struct Foo2<'a> { x: &'a bool } // correct - -struct Bar1 { x: Foo2 } - // ^^^^ expected lifetime parameter -struct Bar2<'a> { x: Foo2<'a> } // correct - -enum Baz1 { A(u8), B(&bool), } - // ^ expected lifetime parameter -enum Baz2<'a> { A(u8), B(&'a bool), } // correct - -type MyStr1 = &str; - // ^ expected lifetime parameter -type MyStr2<'a> = &'a str; // correct -``` - -Lifetime elision is a special, limited kind of inference for lifetimes in -function signatures which allows you to leave out lifetimes in certain cases. -For more background on lifetime elision see [the book][book-le]. - -The lifetime elision rules require that any function signature with an elided -output lifetime must either have: - - - exactly one input lifetime - - or, multiple input lifetimes, but the function must also be a method with a - `&self` or `&mut self` receiver - -In the first case, the output lifetime is inferred to be the same as the unique -input lifetime. In the second case, the lifetime is instead inferred to be the -same as the lifetime on `&self` or `&mut self`. - -Here are some examples of elision errors: - -``` -// error, no input lifetimes -fn foo() -> &str { } - -// error, `x` and `y` have distinct lifetimes inferred -fn bar(x: &str, y: &str) -> &str { } - -// error, `y`'s lifetime is inferred to be distinct from `x`'s -fn baz<'a>(x: &'a str, y: &str) -> &str { } -``` - -[book-le]: https://doc.rust-lang.org/book/ch10-03-lifetime-syntax.html#lifetime-elision