From d5528aaf0c917e3b904428a84e81abfb3422ba06 Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Wed, 1 Apr 2020 14:18:44 -0400 Subject: [PATCH] Finish substantive work for Redis error handling --- src/err/timeline.rs | 13 ++- src/main.rs | 89 ++++++++----------- src/parse_client_request/subscription.rs | 4 + src/redis_to_client_stream/client_agent.rs | 66 ++++++-------- src/redis_to_client_stream/event_stream.rs | 40 ++++++--- src/redis_to_client_stream/mod.rs | 2 +- src/redis_to_client_stream/receiver/err.rs | 14 +++ src/redis_to_client_stream/receiver/mod.rs | 52 ++++++----- src/redis_to_client_stream/redis/mod.rs | 1 - .../redis/redis_connection/err.rs | 61 +++---------- .../redis/redis_connection/mod.rs | 40 +++++---- 11 files changed, 185 insertions(+), 197 deletions(-) diff --git a/src/err/timeline.rs b/src/err/timeline.rs index 652e9de..4ba9f34 100644 --- a/src/err/timeline.rs +++ b/src/err/timeline.rs @@ -1,4 +1,4 @@ -//use std::{error::Error, fmt}; +use std::fmt; #[derive(Debug)] pub enum TimelineErr { @@ -11,3 +11,14 @@ impl From for TimelineErr { Self::InvalidInput } } + +impl fmt::Display for TimelineErr { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + use TimelineErr::*; + let msg = match self { + RedisNamespaceMismatch => "TODO: Cut this error", + InvalidInput => "The timeline text from Redis could not be parsed into a supported timeline. TODO: add incoming timeline text" + }; + write!(f, "{}", msg) + } +} diff --git a/src/main.rs b/src/main.rs index 2d2aaa4..fa452fd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,11 @@ use flodgatt::{ config::{DeploymentConfig, EnvVar, PostgresConfig, RedisConfig}, parse_client_request::{PgPool, Subscription}, - redis_to_client_stream::{ClientAgent, EventStream}, + redis_to_client_stream::{ClientAgent, EventStream, Receiver}, }; -use std::{collections::HashMap, env, fs, net, os::unix::fs::PermissionsExt}; +use std::{env, fs, net::SocketAddr, os::unix::fs::PermissionsExt}; use tokio::net::UnixListener; -use warp::{path, ws::Ws2, Filter}; +use warp::{http::StatusCode, path, ws::Ws2, Filter, Rejection}; fn main() { dotenv::from_filename(match env::var("ENV").ok().as_ref().map(String::as_str) { @@ -14,36 +14,35 @@ fn main() { Some(unsupported) => EnvVar::err("ENV", unsupported, "`production` or `development`"), }) .ok(); - let env_vars_map: HashMap<_, _> = dotenv::vars().collect(); - let env_vars = EnvVar::new(env_vars_map); + let env_vars = EnvVar::new(dotenv::vars().collect()); pretty_env_logger::init(); + log::info!("Environmental variables Flodgatt received: {}", &env_vars); - log::info!( - "Flodgatt recognized the following environmental variables:{}", - env_vars.clone() - ); + let postgres_cfg = PostgresConfig::from_env(env_vars.clone()); let redis_cfg = RedisConfig::from_env(env_vars.clone()); let cfg = DeploymentConfig::from_env(env_vars.clone()); - let postgres_cfg = PostgresConfig::from_env(env_vars.clone()); let pg_pool = PgPool::new(postgres_cfg); - let client_agent_sse = ClientAgent::blank(redis_cfg); - let client_agent_ws = client_agent_sse.clone_with_shared_receiver(); - + let sharable_receiver = Receiver::try_from(redis_cfg) + .unwrap_or_else(|e| { + log::error!("{}\nFlodgatt shutting down...", e); + std::process::exit(1); + }) + .into_arc(); log::info!("Streaming server initialized and ready to accept connections"); // Server Sent Events + let sse_receiver = sharable_receiver.clone(); let (sse_interval, whitelist_mode) = (*cfg.sse_interval, *cfg.whitelist_mode); let sse_routes = Subscription::from_sse_query(pg_pool.clone(), whitelist_mode) .and(warp::sse()) .map( move |subscription: Subscription, sse_connection_to_client: warp::sse::Sse| { log::info!("Incoming SSE request for {:?}", subscription.timeline); - // Create a new ClientAgent - let mut client_agent = client_agent_sse.clone_with_shared_receiver(); - // Assign ClientAgent to generate stream of updates for the user/timeline pair - client_agent.init_for_user(subscription); + let mut client_agent = ClientAgent::new(sse_receiver.clone(), &subscription); + client_agent.subscribe(); + // send the updates through the SSE connection EventStream::to_sse(client_agent, sse_connection_to_client, sse_interval) }, @@ -51,24 +50,20 @@ fn main() { .with(warp::reply::with::header("Connection", "keep-alive")); // WebSocket + let ws_receiver = sharable_receiver.clone(); let (ws_update_interval, whitelist_mode) = (*cfg.ws_interval, *cfg.whitelist_mode); - let websocket_routes = Subscription::from_ws_request(pg_pool.clone(), whitelist_mode) + let ws_routes = Subscription::from_ws_request(pg_pool.clone(), whitelist_mode) .and(warp::ws::ws2()) .map(move |subscription: Subscription, ws: Ws2| { log::info!("Incoming websocket request for {:?}", subscription.timeline); + let mut client_agent = ClientAgent::new(ws_receiver.clone(), &subscription); + client_agent.subscribe(); - let token = subscription.access_token.clone(); - // Create a new ClientAgent - let mut client_agent = client_agent_ws.clone_with_shared_receiver(); - // Assign that agent to generate a stream of updates for the user/timeline pair - client_agent.init_for_user(subscription); - // send the updates through the WS connection (along with the User's access_token - // which is sent for security) + // send the updates through the WS connection + // (along with the User's access_token which is sent for security) ( - ws.on_upgrade(move |socket| { - EventStream::to_ws(socket, client_agent, ws_update_interval) - }), - token.unwrap_or_else(String::new), + ws.on_upgrade(move |s| EventStream::to_ws(s, client_agent, ws_update_interval)), + subscription.access_token.unwrap_or_else(String::new), ) }) .map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token)); @@ -84,33 +79,23 @@ fn main() { log::info!("Using Unix socket {}", socket); fs::remove_file(socket).unwrap_or_default(); let incoming = UnixListener::bind(socket).unwrap().incoming(); - fs::set_permissions(socket, PermissionsExt::from_mode(0o666)).unwrap(); warp::serve( - health.or(websocket_routes.or(sse_routes).with(cors).recover( - |rejection: warp::reject::Rejection| { - let err_txt = match rejection.cause() { - Some(text) - if text.to_string() == "Missing request header 'authorization'" => - { - "Error: Missing access token".to_string() - } - Some(text) => text.to_string(), - None => "Error: Nonexistant endpoint".to_string(), - }; - let json = warp::reply::json(&err_txt); - - Ok(warp::reply::with_status( - json, - warp::http::StatusCode::UNAUTHORIZED, - )) - }, - )), + health.or(ws_routes.or(sse_routes).with(cors).recover(|r: Rejection| { + let json_err = match r.cause() { + Some(text) if text.to_string() == "Missing request header 'authorization'" => { + warp::reply::json(&"Error: Missing access token".to_string()) + } + Some(text) => warp::reply::json(&text.to_string()), + None => warp::reply::json(&"Error: Nonexistant endpoint".to_string()), + }; + Ok(warp::reply::with_status(json_err, StatusCode::UNAUTHORIZED)) + })), ) .run_incoming(incoming); } else { - let server_addr = net::SocketAddr::new(*cfg.address, cfg.port.0); - warp::serve(health.or(websocket_routes.or(sse_routes).with(cors))).run(server_addr); - } + let server_addr = SocketAddr::new(*cfg.address, *cfg.port); + warp::serve(health.or(ws_routes.or(sse_routes).with(cors))).run(server_addr); + }; } diff --git a/src/parse_client_request/subscription.rs b/src/parse_client_request/subscription.rs index db6a94a..004905e 100644 --- a/src/parse_client_request/subscription.rs +++ b/src/parse_client_request/subscription.rs @@ -11,6 +11,7 @@ use crate::err::TimelineErr; use crate::log_fatal; use lru::LruCache; use std::collections::HashSet; +use uuid::Uuid; use warp::reject::Rejection; use super::query; @@ -50,6 +51,7 @@ macro_rules! parse_sse_query { #[derive(Clone, Debug, PartialEq)] pub struct Subscription { + pub id: Uuid, pub timeline: Timeline, pub allowed_langs: HashSet, pub blocks: Blocks, @@ -60,6 +62,7 @@ pub struct Subscription { impl Default for Subscription { fn default() -> Self { Self { + id: Uuid::new_v4(), timeline: Timeline(Stream::Unset, Reach::Local, Content::Notification), allowed_langs: HashSet::new(), blocks: Blocks::default(), @@ -123,6 +126,7 @@ impl Subscription { }; Ok(Subscription { + id: Uuid::new_v4(), timeline, allowed_langs: user.allowed_langs, blocks: Blocks { diff --git a/src/redis_to_client_stream/client_agent.rs b/src/redis_to_client_stream/client_agent.rs index 7b97c54..1948c63 100644 --- a/src/redis_to_client_stream/client_agent.rs +++ b/src/redis_to_client_stream/client_agent.rs @@ -15,9 +15,8 @@ //! Because `StreamManagers` are lightweight data structures that do not directly //! communicate with Redis, it we create a new `ClientAgent` for //! each new client connection (each in its own thread).use super::{message::Message, receiver::Receiver} -use super::{receiver::Receiver, redis::RedisConnErr}; +use super::receiver::{Receiver, ReceiverErr}; use crate::{ - config, messages::Event, parse_client_request::{Stream::Public, Subscription, Timeline}, }; @@ -25,33 +24,20 @@ use futures::{ Async::{self, NotReady, Ready}, Poll, }; -use std::sync::{Arc, Mutex}; -use uuid::Uuid; +use std::sync::{Arc, Mutex, MutexGuard}; /// Struct for managing all Redis streams. #[derive(Clone, Debug)] pub struct ClientAgent { receiver: Arc>, - id: Uuid, pub subscription: Subscription, } impl ClientAgent { - /// Create a new `ClientAgent` with no shared data. - pub fn blank(redis_cfg: config::RedisConfig) -> Self { + pub fn new(receiver: Arc>, subscription: &Subscription) -> Self { ClientAgent { - receiver: Arc::new(Mutex::new(Receiver::new(redis_cfg))), - id: Uuid::default(), - subscription: Subscription::default(), - } - } - - /// Clones the `ClientAgent`, sharing the `Receiver`. - pub fn clone_with_shared_receiver(&self) -> Self { - Self { - receiver: self.receiver.clone(), - id: self.id, - subscription: self.subscription.clone(), + receiver, + subscription: subscription.clone(), } } @@ -63,25 +49,32 @@ impl ClientAgent { /// a different user, the `Receiver` is responsible for figuring /// that out and avoiding duplicated connections. Thus, it is safe to /// use this method for each new client connection. - pub fn init_for_user(&mut self, subscription: Subscription) { - use std::time::Instant; - self.id = Uuid::new_v4(); - self.subscription = subscription; - let start_time = Instant::now(); - let mut receiver = self.receiver.lock().expect("No thread panic (stream.rs)"); - receiver.manage_new_timeline( - self.id, - self.subscription.timeline, - self.subscription.hashtag_name.clone(), - ); - log::info!("init_for_user had lock for: {:?}", start_time.elapsed()); + pub fn subscribe(&mut self) { + let mut receiver = self.lock_receiver(); + receiver + .add_subscription(&self.subscription) + .unwrap_or_else(|e| log::error!("Could not subscribe to the Redis channel: {}", e)) + } + + fn lock_receiver(&self) -> MutexGuard { + match self.receiver.lock() { + Ok(inner) => inner, + Err(e) => { + log::error!( + "Another thread crashed: {}\n + Attempting to continue, possibly with invalid data", + e + ); + e.into_inner() + } + } } } /// The stream that the `ClientAgent` manages. `Poll` is the only method implemented. impl futures::stream::Stream for ClientAgent { type Item = Event; - type Error = RedisConnErr; + type Error = ReceiverErr; /// Checks for any new messages that should be sent to the client. /// @@ -93,11 +86,8 @@ impl futures::stream::Stream for ClientAgent { /// errors from the underlying data structures. fn poll(&mut self) -> Poll, Self::Error> { let result = { - let mut receiver = self - .receiver - .lock() - .expect("ClientAgent: No other thread panic"); - receiver.poll_for(self.id, self.subscription.timeline) + let mut receiver = self.lock_receiver(); + receiver.poll_for(self.subscription.id, self.subscription.timeline) }; let allowed_langs = &self.subscription.allowed_langs; @@ -131,7 +121,7 @@ impl futures::stream::Stream for ClientAgent { }, Ok(Ready(None)) => Ok(Ready(None)), Ok(NotReady) => Ok(NotReady), - Err(_e) => todo!("Handle err gracefully"), + Err(e) => Err(e), } } } diff --git a/src/redis_to_client_stream/event_stream.rs b/src/redis_to_client_stream/event_stream.rs index ddcf1d3..9c444e5 100644 --- a/src/redis_to_client_stream/event_stream.rs +++ b/src/redis_to_client_stream/event_stream.rs @@ -8,7 +8,6 @@ use warp::{ sse::Sse, ws::{Message, WebSocket}, }; - pub struct EventStream; impl EventStream { @@ -32,7 +31,7 @@ impl EventStream { .map(|_r| ()) .map_err(|e| match e.to_string().as_ref() { "IO error: Broken pipe (os error 32)" => (), // just closed unix socket - _ => log::warn!("websocket send error: {}", e), + _ => log::warn!("WebSocket send error: {}", e), }), ); @@ -42,7 +41,6 @@ impl EventStream { match ws_rx.poll() { Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true), Ok(Async::Ready(None)) => { - // TODO: consider whether we should manually drop closed connections here log::info!("Client closed WebSocket connection for {:?}", timeline); futures::future::ok(false) } @@ -58,27 +56,35 @@ impl EventStream { } }); - let mut time = Instant::now(); + let mut last_ping_time = Instant::now(); // Every time you get an event from that stream, send it through the pipe event_stream .for_each(move |_instant| { - if let Ok(Async::Ready(Some(msg))) = client_agent.poll() { - tx.unbounded_send(Message::text(msg.to_json_string())) - .expect("No send error"); - }; - if time.elapsed() > Duration::from_secs(30) { - tx.unbounded_send(Message::text("{}")).expect("Can ping"); - time = Instant::now(); + match client_agent.poll() { + Ok(Async::Ready(Some(msg))) => tx + .unbounded_send(Message::text(msg.to_json_string())) + .unwrap_or_else(|e| { + log::error!("Could not send message to WebSocket: {}", e) + }), + Ok(Async::Ready(None)) => log::info!("WebSocket ClientAgent got Ready(None)"), + Ok(Async::NotReady) if last_ping_time.elapsed() > Duration::from_secs(30) => { + tx.unbounded_send(Message::text("{}")).unwrap_or_else(|e| { + log::error!("Could not send ping to WebSocket: {}", e) + }); + last_ping_time = Instant::now(); + } + Ok(Async::NotReady) => (), // no new messages; nothing to do + Err(e) => log::error!("{}\n Dropping WebSocket message and continuing.", e), } Ok(()) }) .then(move |result| { - // TODO: consider whether we should manually drop closed connections here log::info!("WebSocket connection for {:?} closed.", timeline); result }) .map_err(move |e| log::warn!("Error sending to {:?}: {}", timeline, e)) } + pub fn to_sse(mut client_agent: ClientAgent, sse: Sse, interval: Duration) -> impl Reply { let event_stream = tokio::timer::Interval::new(Instant::now(), interval).filter_map(move |_| { @@ -87,7 +93,15 @@ impl EventStream { warp::sse::event(event.event_name()), warp::sse::data(event.payload().unwrap_or_else(String::new)), )), - _ => None, + Ok(Async::Ready(None)) => { + log::info!("SSE ClientAgent got Ready(None)"); + None + } + Ok(Async::NotReady) => None, + Err(e) => { + log::error!("{}\n Dropping SSE message and continuing.", e); + None + } } }); diff --git a/src/redis_to_client_stream/mod.rs b/src/redis_to_client_stream/mod.rs index ce7e1c5..51df137 100644 --- a/src/redis_to_client_stream/mod.rs +++ b/src/redis_to_client_stream/mod.rs @@ -12,6 +12,6 @@ pub use redis::redis_msg; //#[cfg(test)] //pub use receiver::process_messages; //#[cfg(test)] -pub use receiver::{MessageQueues, MsgQueue}; +pub use receiver::{MessageQueues, MsgQueue, Receiver, ReceiverErr}; //#[cfg(test)] //pub use redis::redis_msg::{RedisMsg, RedisUtf8}; diff --git a/src/redis_to_client_stream/receiver/err.rs b/src/redis_to_client_stream/receiver/err.rs index 8252bec..fb07702 100644 --- a/src/redis_to_client_stream/receiver/err.rs +++ b/src/redis_to_client_stream/receiver/err.rs @@ -2,6 +2,7 @@ use super::super::{redis::RedisConnErr, redis_msg::RedisParseErr}; use crate::err::TimelineErr; use serde_json; +use std::fmt; #[derive(Debug)] pub enum ReceiverErr { @@ -11,6 +12,19 @@ pub enum ReceiverErr { RedisConnErr(RedisConnErr), } +impl fmt::Display for ReceiverErr { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + use ReceiverErr::*; + match self { + EventErr(inner) => write!(f, "{}", inner), + RedisParseErr(inner) => write!(f, "{}", inner), + RedisConnErr(inner) => write!(f, "{}", inner), + TimelineErr(inner) => write!(f, "{}", inner), + }?; + Ok(()) + } +} + impl From for ReceiverErr { fn from(error: serde_json::Error) -> Self { Self::EventErr(error) diff --git a/src/redis_to_client_stream/receiver/mod.rs b/src/redis_to_client_stream/receiver/mod.rs index 3a27da8..63fa863 100644 --- a/src/redis_to_client_stream/receiver/mod.rs +++ b/src/redis_to_client_stream/receiver/mod.rs @@ -12,37 +12,42 @@ use super::redis::{redis_connection::RedisCmd, RedisConn}; use crate::{ config, messages::Event, - parse_client_request::{Stream, Timeline}, + parse_client_request::{Stream, Subscription, Timeline}, }; use futures::{Async, Poll}; -use std::collections::HashMap; +use std::{ + collections::HashMap, + result, + sync::{Arc, Mutex}, +}; use uuid::Uuid; +type Result = result::Result; + /// The item that streams from Redis and is polled by the `ClientAgent` #[derive(Debug)] pub struct Receiver { redis_connection: RedisConn, pub msg_queues: MessageQueues, clients_per_timeline: HashMap, - // hashtag_cache: LruCache, - // TODO: eventually, it might make sense to have Mastodon publish to timelines with - // the tag number instead of the tag name. This would save us from dealing - // with a cache here and would be consistent with how lists/users are handled. } impl Receiver { /// Create a new `Receiver`, with its own Redis connections (but, as yet, no /// active subscriptions). - pub fn new(redis_cfg: config::RedisConfig) -> Self { - let redis_connection = RedisConn::new(redis_cfg).expect("TODO"); + pub fn try_from(redis_cfg: config::RedisConfig) -> Result { + let redis_connection = RedisConn::new(redis_cfg)?; - Self { + Ok(Self { redis_connection, msg_queues: MessageQueues(HashMap::new()), clients_per_timeline: HashMap::new(), - // should this be a run-time option? - } + }) + } + + pub fn into_arc(self) -> Arc> { + Arc::new(Mutex::new(self)) } /// Assigns the `Receiver` a new timeline to monitor and runs other @@ -51,13 +56,15 @@ impl Receiver { /// Note: this method calls `subscribe_or_unsubscribe_as_needed`, /// so Redis PubSub subscriptions are only updated when a new timeline /// comes under management for the first time. - pub fn manage_new_timeline(&mut self, id: Uuid, tl: Timeline, hashtag: Option) { - if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (hashtag, tl) { + pub fn add_subscription(&mut self, subscription: &Subscription) -> Result<()> { + let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline); + + if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (tag, tl) { self.redis_connection.update_cache(hashtag, id); }; - - self.msg_queues.insert(id, MsgQueue::new(tl)); - self.subscribe_or_unsubscribe_as_needed(tl); + self.msg_queues.insert(subscription.id, MsgQueue::new(tl)); + self.subscribe_or_unsubscribe_as_needed(tl)?; + Ok(()) } /// Returns the oldest message in the `ClientAgent`'s queue (if any). @@ -102,8 +109,8 @@ impl Receiver { /// Drop any PubSub subscriptions that don't have active clients and check /// that there's a subscription to the current one. If there isn't, then /// subscribe to it. - fn subscribe_or_unsubscribe_as_needed(&mut self, timeline: Timeline) { - let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(timeline); + fn subscribe_or_unsubscribe_as_needed(&mut self, tl: Timeline) -> Result<()> { + let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(tl); // Record the lower number of clients subscribed to that channel for change in timelines_to_modify { @@ -118,14 +125,11 @@ impl Receiver { // If no clients, unsubscribe from the channel use RedisCmd::*; if *count_of_subscribed_clients <= 0 { - self.redis_connection - .send_cmd(Unsubscribe, &timeline) - .expect("TODO"); + self.redis_connection.send_cmd(Unsubscribe, &timeline)?; } else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 { - self.redis_connection - .send_cmd(Subscribe, &timeline) - .expect("TODO"); + self.redis_connection.send_cmd(Subscribe, &timeline)? } } + Ok(()) } } diff --git a/src/redis_to_client_stream/redis/mod.rs b/src/redis_to_client_stream/redis/mod.rs index a075463..92e672b 100644 --- a/src/redis_to_client_stream/redis/mod.rs +++ b/src/redis_to_client_stream/redis/mod.rs @@ -1,4 +1,3 @@ -pub mod redis_cmd; pub mod redis_connection; pub mod redis_msg; diff --git a/src/redis_to_client_stream/redis/redis_connection/err.rs b/src/redis_to_client_stream/redis/redis_connection/err.rs index c5afaf9..ddaf3b4 100644 --- a/src/redis_to_client_stream/redis/redis_connection/err.rs +++ b/src/redis_to_client_stream/redis/redis_connection/err.rs @@ -3,8 +3,8 @@ use std::fmt; #[derive(Debug)] pub enum RedisConnErr { ConnectionErr { addr: String, inner: std::io::Error }, - // TODO ^^^^ better name? - UnknownRedisErr(String), + InvalidRedisReply(String), + UnknownRedisErr(std::io::Error), IncorrectPassword(String), MissingPassword, NotRedis(String), @@ -28,10 +28,13 @@ impl fmt::Display for RedisConnErr { Connection Error: {}", addr, inner ), - UnknownRedisErr(unexpected_reply) => format!( - "Could not connect to Redis for an unknown reason. Expected `+PONG` reply but got `{}`", + InvalidRedisReply(unexpected_reply) => format!( + "Received and unexpected reply from Redis. Expected `+PONG` reply but got `{}`", unexpected_reply ), + UnknownRedisErr(io_err) => { + format!("Unexpected failure communicating with Redis: {}", io_err) + } IncorrectPassword(attempted_password) => format!( "Incorrect Redis password. You supplied `{}`.\n \ Please supply correct password with REDIS_PASSWORD environmental variable.", @@ -51,48 +54,8 @@ impl fmt::Display for RedisConnErr { } } -// die_with_msg(format!( -// r"Incorrect Redis password. You supplied `{}`. -// Please supply correct password with REDIS_PASSWORD environmental variable.", -// password, -// )) - -// impl fmt::Display for RedisParseErr { -// fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { -// use RedisParseErr::*; -// let msg = match self { -// Incomplete => "The input from Redis does not form a complete message, likely because \ -// the input buffer filled partway through a message. Save this input \ -// and try again with additional input from Redis." -// .to_string(), -// InvalidNumber(parse_int_err) => format!( -// "Redis indicated that an item would be a number, but it could not be parsed: {}", -// parse_int_err -// ), - -// InvalidLineStart(line_start_char) => format!( -// "A line from Redis started with `{}`, which is not a valid character to indicate \ -// the type of the Redis line.", -// line_start_char -// ), -// InvalidLineEnd => "A Redis line ended before expected line length".to_string(), -// IncorrectRedisType => "Received a Redis type that is not supported in this context. \ -// Flodgatt expects each message from Redis to be a Redis array \ -// consisting of bulk strings or integers." -// .to_string(), -// MissingField => "Redis input was missing a field Flodgatt expected (e.g., a `message` \ -// without a payload line)" -// .to_string(), -// UnsupportedTimeline => { -// "The raw timeline received from Redis could not be parsed into a \ -// supported timeline" -// .to_string() -// } -// UnsupportedEvent(e) => format!( -// "The event text from Redis could not be parsed into a valid event: {}", -// e -// ), -// }; -// write!(f, "{}", msg) -// } -// } +impl From for RedisConnErr { + fn from(e: std::io::Error) -> RedisConnErr { + RedisConnErr::UnknownRedisErr(e) + } +} diff --git a/src/redis_to_client_stream/redis/redis_connection/mod.rs b/src/redis_to_client_stream/redis/redis_connection/mod.rs index b99ddb7..e2745ef 100644 --- a/src/redis_to_client_stream/redis/redis_connection/mod.rs +++ b/src/redis_to_client_stream/redis/redis_connection/mod.rs @@ -20,6 +20,8 @@ use std::{ use futures::{Async, Poll}; use lru::LruCache; +type Result = std::result::Result; + #[derive(Debug)] pub struct RedisConn { primary: TcpStream, @@ -33,32 +35,38 @@ pub struct RedisConn { } impl RedisConn { - pub fn new(redis_cfg: RedisConfig) -> Result { + pub fn new(redis_cfg: RedisConfig) -> Result { let addr = format!("{}:{}", *redis_cfg.host, *redis_cfg.port); let conn = Self::new_connection(&addr, &redis_cfg.password.as_ref())?; conn.set_nonblocking(true) .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - - Ok(Self { + let redis_conn = Self { primary: conn, secondary: Self::new_connection(&addr, &redis_cfg.password.as_ref())?, tag_id_cache: LruCache::new(1000), tag_name_cache: LruCache::new(1000), + // TODO: eventually, it might make sense to have Mastodon publish to timelines with + // the tag number instead of the tag name. This would save us from dealing + // with a cache here and would be consistent with how lists/users are handled. redis_namespace: redis_cfg.namespace.clone(), redis_poll_interval: *redis_cfg.polling_interval, redis_input: Vec::new(), redis_polled_at: Instant::now(), - }) + }; + + Ok(redis_conn) } pub fn poll_redis(&mut self) -> Poll, ReceiverErr> { let mut buffer = vec![0u8; 6000]; if self.redis_polled_at.elapsed() > self.redis_poll_interval { - match self.primary.read(&mut buffer) { - Ok(bytes_read) => self.redis_input.extend_from_slice(&buffer[..bytes_read]), - Err(e) => log::error!("Error polling Redis: {}\nRetrying...", e), + if let Ok(bytes_read) = self.primary.read(&mut buffer) { + self.redis_input.extend_from_slice(&buffer[..bytes_read]); } } + if self.redis_input.is_empty() { + return Ok(Async::NotReady); + } let input = self.redis_input.clone(); self.redis_input.clear(); @@ -100,7 +108,7 @@ impl RedisConn { self.tag_name_cache.put(id, hashtag); } - fn new_connection(addr: &String, pass: &Option<&String>) -> Result { + fn new_connection(addr: &String, pass: &Option<&String>) -> Result { match TcpStream::connect(&addr) { Ok(mut conn) => { if let Some(password) = pass { @@ -115,7 +123,7 @@ impl RedisConn { Err(e) => Err(RedisConnErr::with_addr(&addr, e)), } } - fn auth_connection(conn: &mut TcpStream, addr: &str, pass: &str) -> Result<(), RedisConnErr> { + fn auth_connection(conn: &mut TcpStream, addr: &str, pass: &str) -> Result<()> { conn.write_all(&format!("*2\r\n$4\r\nauth\r\n${}\r\n{}\r\n", pass.len(), pass).as_bytes()) .map_err(|e| RedisConnErr::with_addr(&addr, e))?; let mut buffer = vec![0u8; 5]; @@ -129,7 +137,7 @@ impl RedisConn { Ok(()) } - fn validate_connection(conn: &mut TcpStream, addr: &str) -> Result<(), RedisConnErr> { + fn validate_connection(conn: &mut TcpStream, addr: &str) -> Result<()> { conn.write_all(b"PING\r\n") .map_err(|e| RedisConnErr::with_addr(&addr, e))?; let mut buffer = vec![0u8; 7]; @@ -140,11 +148,11 @@ impl RedisConn { "+PONG\r\n" => Ok(()), "-NOAUTH" => Err(RedisConnErr::MissingPassword), "HTTP/1." => Err(RedisConnErr::NotRedis(addr.to_string())), - _ => Err(RedisConnErr::UnknownRedisErr(reply.to_string())), + _ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())), } } - pub fn send_cmd(&mut self, cmd: RedisCmd, timeline: &Timeline) -> Result<(), RedisConnErr> { + pub fn send_cmd(&mut self, cmd: RedisCmd, timeline: &Timeline) -> Result<()> { let hashtag = match timeline { Timeline(Stream::Hashtag(id), _, _) => self.tag_name_cache.get(id), _non_hashtag_timeline => None, @@ -161,12 +169,8 @@ impl RedisConn { format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n$1\r\n0\r\n", tl.len(), tl), ), }; - self.secondary - .write_all(&primary_cmd.as_bytes()) - .expect("TODO"); - self.secondary - .write_all(&secondary_cmd.as_bytes()) - .expect("TODO"); + self.primary.write_all(&primary_cmd.as_bytes())?; + self.secondary.write_all(&secondary_cmd.as_bytes())?; Ok(()) } }