diff --git a/Cargo.toml b/Cargo.toml index 627174d..aaee961 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.7.0" +version = "0.7.1" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/src/config/environmental_variables.rs b/src/config/environmental_variables.rs index a65025f..6588765 100644 --- a/src/config/environmental_variables.rs +++ b/src/config/environmental_variables.rs @@ -60,6 +60,7 @@ impl fmt::Display for EnvVar { "REDIS_PASSWORD", "REDIS_USER", "REDIS_DB", + "REDIS_FREQ", ] .iter() { diff --git a/src/config/redis_cfg.rs b/src/config/redis_cfg.rs index f78a0fb..e12766d 100644 --- a/src/config/redis_cfg.rs +++ b/src/config/redis_cfg.rs @@ -59,7 +59,7 @@ For similar functionality, you may wish to set a REDIS_NAMESPACE"; host: RedisHost::default().maybe_update(env.get("REDIS_HOST")), db: RedisDb::default().maybe_update(env.get("REDIS_DB")), namespace: RedisNamespace::default().maybe_update(env.get("REDIS_NAMESPACE")), - polling_interval: RedisInterval::default().maybe_update(env.get("REDIS_POLL_INTERVAL")), + polling_interval: RedisInterval::default().maybe_update(env.get("REDIS_FREQ")), }; if cfg.db.is_some() { diff --git a/src/config/redis_cfg_types.rs b/src/config/redis_cfg_types.rs index 8d43c76..94e7d1e 100644 --- a/src/config/redis_cfg_types.rs +++ b/src/config/redis_cfg_types.rs @@ -21,7 +21,7 @@ from_env_var!( /// How frequently to poll Redis let name = RedisInterval; let default: Duration = Duration::from_millis(100); - let (env_var, allowed_values) = ("REDIS_POLL_INTERVAL", "a number of milliseconds"); + let (env_var, allowed_values) = ("REDIS_FREQ", "a number of milliseconds"); let from_str = |s| s.parse().map(Duration::from_millis).ok(); ); from_env_var!( diff --git a/src/redis_to_client_stream/client_agent.rs b/src/redis_to_client_stream/client_agent.rs index 187cd1d..1a729ed 100644 --- a/src/redis_to_client_stream/client_agent.rs +++ b/src/redis_to_client_stream/client_agent.rs @@ -56,6 +56,14 @@ impl ClientAgent { .unwrap_or_else(|e| log::error!("Could not subscribe to the Redis channel: {}", e)) } + pub fn disconnect(&self) -> futures::future::FutureResult { + let mut receiver = self.lock_receiver(); + receiver + .remove_subscription(&self.subscription) + .unwrap_or_else(|e| log::error!("Could not unsubscribe from: {}", e)); + futures::future::ok(false) + } + fn lock_receiver(&self) -> MutexGuard { match self.receiver.lock() { Ok(inner) => inner, diff --git a/src/redis_to_client_stream/event_stream.rs b/src/redis_to_client_stream/event_stream.rs index 5eb994b..9f56c84 100644 --- a/src/redis_to_client_stream/event_stream.rs +++ b/src/redis_to_client_stream/event_stream.rs @@ -17,17 +17,17 @@ impl EventStream { mut client_agent: ClientAgent, interval: Duration, ) -> impl Future { - let (ws_tx, mut ws_rx) = ws.split(); + let (transmit_to_ws, _receive_from_ws) = ws.split(); let timeline = client_agent.subscription.timeline; // Create a pipe let (tx, rx) = futures::sync::mpsc::unbounded(); // Send one end of it to a different thread and tell that end to forward whatever it gets - // on to the websocket client + // on to the WebSocket client warp::spawn( rx.map_err(|()| -> warp::Error { unreachable!() }) - .forward(ws_tx) + .forward(transmit_to_ws) .map(|_r| ()) .map_err(|e| match e.to_string().as_ref() { "IO error: Broken pipe (os error 32)" => (), // just closed unix socket @@ -35,50 +35,42 @@ impl EventStream { }), ); - // Yield new events for as long as the client is still connected - let event_stream = - tokio::timer::Interval::new(Instant::now(), interval).take_while(move |_| { - match ws_rx.poll() { - Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true), - Ok(Async::Ready(None)) => { - log::info!("Client closed WebSocket connection for {:?}", timeline); - futures::future::ok(false) - } - Err(e) if e.to_string() == "IO error: Broken pipe (os error 32)" => { - // no err, just closed Unix socket - log::info!("Client closed WebSocket connection for {:?}", timeline); - futures::future::ok(false) - } - Err(e) => { - log::warn!("Error in {:?}: {}", timeline, e); - futures::future::ok(false) - } - } - }); - let mut last_ping_time = Instant::now(); - // Every time you get an event from that stream, send it through the pipe - event_stream - .for_each(move |_instant| { + tokio::timer::Interval::new(Instant::now(), interval) + .take_while(move |_| { + // Right now, we do not need to see if we have any messages _from_ the + // WebSocket connection because the API doesn't support clients sending + // commands via the WebSocket. However, if the [stream multiplexing API + // change](github.com/tootsuite/flodgatt/issues/121) is implemented, we'll + // need to receive messages from the client. If so, we'll need a + // `receive_from_ws.poll() call here (or later)` + match client_agent.poll() { Ok(Async::Ready(Some(msg))) => { - tx.unbounded_send(Message::text(msg.to_json_string())) - .unwrap_or_else(|e| { - log::error!("Could not send message to WebSocket: {}", e) - }); + match tx.unbounded_send(Message::text(msg.to_json_string())) { + Ok(_) => futures::future::ok(true), + Err(_) => client_agent.disconnect(), + } + } + Ok(Async::Ready(None)) => { + log::info!("WebSocket ClientAgent got Ready(None)"); + futures::future::ok(true) } - Ok(Async::Ready(None)) => log::info!("WebSocket ClientAgent got Ready(None)"), Ok(Async::NotReady) if last_ping_time.elapsed() > Duration::from_secs(30) => { - tx.unbounded_send(Message::text("{}")).unwrap_or_else(|e| { - log::error!("Could not send ping to WebSocket: {}", e) - }); last_ping_time = Instant::now(); + match tx.unbounded_send(Message::text("{}")) { + Ok(_) => futures::future::ok(true), + Err(_) => client_agent.disconnect(), + } + } + Ok(Async::NotReady) => futures::future::ok(true), // no new messages; nothing to do + Err(e) => { + log::error!("{}\n Dropping WebSocket message and continuing.", e); + futures::future::ok(true) } - Ok(Async::NotReady) => (), // no new messages; nothing to do - Err(e) => log::error!("{}\n Dropping WebSocket message and continuing.", e), } - Ok(()) }) + .for_each(move |_instant| Ok(())) .then(move |result| { log::info!("WebSocket connection for {:?} closed.", timeline); result diff --git a/src/redis_to_client_stream/receiver/message_queues.rs b/src/redis_to_client_stream/receiver/message_queues.rs index 593f660..9044460 100644 --- a/src/redis_to_client_stream/receiver/message_queues.rs +++ b/src/redis_to_client_stream/receiver/message_queues.rs @@ -4,7 +4,6 @@ use crate::parse_client_request::Timeline; use std::{ collections::{HashMap, VecDeque}, fmt, - time::{Duration, Instant}, }; use uuid::Uuid; @@ -12,52 +11,22 @@ use uuid::Uuid; pub struct MsgQueue { pub timeline: Timeline, pub messages: VecDeque, - last_polled_at: Instant, } impl MsgQueue { pub fn new(timeline: Timeline) -> Self { MsgQueue { messages: VecDeque::new(), - last_polled_at: Instant::now(), + timeline, } } - pub fn update_polled_at_time(&mut self) { - self.last_polled_at = Instant::now(); - } } #[derive(Debug)] pub struct MessageQueues(pub HashMap); -impl MessageQueues { - pub fn calculate_timelines_to_add_or_drop(&mut self, timeline: Timeline) -> Vec { - let mut timelines_to_modify = Vec::new(); - - timelines_to_modify.push(Change { - timeline, - in_subscriber_number: 1, - }); - self.retain(|_id, msg_queue| { - if msg_queue.last_polled_at.elapsed() < Duration::from_secs(30) { - true - } else { - let timeline = &msg_queue.timeline; - timelines_to_modify.push(Change { - timeline: *timeline, - in_subscriber_number: -1, - }); - false - } - }); - timelines_to_modify - } -} -pub struct Change { - pub timeline: Timeline, - pub in_subscriber_number: i32, -} +impl MessageQueues {} impl fmt::Debug for MsgQueue { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -66,12 +35,9 @@ impl fmt::Debug for MsgQueue { "\ MsgQueue {{ timeline: {:?}, - messages: {:?}, - last_polled_at: {:?} ago, + messages: {:?}, }}", - self.timeline, - self.messages, - self.last_polled_at.elapsed(), + self.timeline, self.messages, ) } } diff --git a/src/redis_to_client_stream/receiver/mod.rs b/src/redis_to_client_stream/receiver/mod.rs index 8abdaea..2229f2f 100644 --- a/src/redis_to_client_stream/receiver/mod.rs +++ b/src/redis_to_client_stream/receiver/mod.rs @@ -52,10 +52,6 @@ impl Receiver { /// Assigns the `Receiver` a new timeline to monitor and runs other /// first-time setup. - /// - /// Note: this method calls `subscribe_or_unsubscribe_as_needed`, - /// so Redis PubSub subscriptions are only updated when a new timeline - /// comes under management for the first time. pub fn add_subscription(&mut self, subscription: &Subscription) -> Result<()> { let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline); @@ -63,7 +59,41 @@ impl Receiver { self.redis_connection.update_cache(hashtag, id); }; self.msg_queues.insert(subscription.id, MsgQueue::new(tl)); - self.subscribe_or_unsubscribe_as_needed(tl)?; + + let number_of_subscriptions = self + .clients_per_timeline + .entry(tl) + .and_modify(|n| *n += 1) + .or_insert(1); + + use RedisCmd::*; + if *number_of_subscriptions == 1 { + self.redis_connection.send_cmd(Subscribe, &tl)? + }; + + Ok(()) + } + + pub fn remove_subscription(&mut self, subscription: &Subscription) -> Result<()> { + let tl = subscription.timeline; + self.msg_queues.remove(&subscription.id); + let number_of_subscriptions = self + .clients_per_timeline + .entry(tl) + .and_modify(|n| *n -= 1) + .or_insert_with(|| { + log::error!( + "Attempted to unsubscribe from a timeline to which you were not subscribed: {:?}", + tl + ); + 0 + }); + use RedisCmd::*; + if *number_of_subscriptions == 0 { + self.redis_connection.send_cmd(Unsubscribe, &tl)?; + self.clients_per_timeline.remove_entry(&tl); + }; + Ok(()) } @@ -92,13 +122,10 @@ impl Receiver { // If the `msg_queue` being polled has any new messages, return the first (oldest) one match self.msg_queues.get_mut(&id) { - Some(msg_q) => { - msg_q.update_polled_at_time(); - match msg_q.messages.pop_front() { - Some(event) => Ok(Async::Ready(Some(event))), - None => Ok(Async::NotReady), - } - } + Some(msg_q) => match msg_q.messages.pop_front() { + Some(event) => Ok(Async::Ready(Some(event))), + None => Ok(Async::NotReady), + }, None => { log::error!("Polled a MsgQueue that had not been set up. Setting it up now."); self.msg_queues.insert(id, MsgQueue::new(timeline)); @@ -137,31 +164,4 @@ impl Receiver { .fold(0, |acc, el| acc.max(el.messages.len())) ) } - - /// Drop any PubSub subscriptions that don't have active clients and check - /// that there's a subscription to the current one. If there isn't, then - /// subscribe to it. - fn subscribe_or_unsubscribe_as_needed(&mut self, tl: Timeline) -> Result<()> { - let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(tl); - - // Record the lower number of clients subscribed to that channel - for change in timelines_to_modify { - let timeline = change.timeline; - - let count_of_subscribed_clients = self - .clients_per_timeline - .entry(timeline) - .and_modify(|n| *n += change.in_subscriber_number) - .or_insert_with(|| 1); - - // If no clients, unsubscribe from the channel - use RedisCmd::*; - if *count_of_subscribed_clients <= 0 { - self.redis_connection.send_cmd(Unsubscribe, &timeline)?; - } else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 { - self.redis_connection.send_cmd(Subscribe, &timeline)? - } - } - Ok(()) - } } diff --git a/src/redis_to_client_stream/redis/redis_connection/mod.rs b/src/redis_to_client_stream/redis/redis_connection/mod.rs index 3dadf01..bab23ca 100644 --- a/src/redis_to_client_stream/redis/redis_connection/mod.rs +++ b/src/redis_to_client_stream/redis/redis_connection/mod.rs @@ -61,7 +61,8 @@ impl RedisConn { if self.redis_polled_at.elapsed() > self.redis_poll_interval { if let Ok(bytes_read) = self.primary.read(&mut buffer) { self.redis_input.extend_from_slice(&buffer[..bytes_read]); - } + }; + self.redis_polled_at = Instant::now(); } if self.redis_input.is_empty() { return Ok(Async::NotReady);