From d23cc40beafe6c109f7905cfad9d50f4d58cae6c Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Sun, 5 Apr 2020 17:54:05 -0400 Subject: [PATCH] Iowait (#125) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Remove use of last_polled_time [WIP] This commit stops removing subscriptions based on their last polled time to test the impact of this change on CPU use. This is a WIP because it does not yet remove subscriptions in any other way, which (if deployed in production) would cause a memory leak – memory use would grow with each new subscription and would never be reduced as clients end their subscriptions. * Fix bug with RedisConnection polling freqeuency * Improve performance of EventStream This commit changes the EventStream so no longer polls client WebSocket connections to see if it should clean up the connection. Instead, it cleans up the connection whenever it attempts to send a ping or a message through the connection and receives an error indicating that the client has disconnected. As a result, client connections aren't cleaned up quite as quickly, but overall sys CPU time should be dramatically improved. * Remove empty entries from MsgQueues hashmap Before this change, entries in the MsgQueue hashmap would remain once added, even if their value fell to 0. This could lead to a very slight memory leak/increase, because the hashmap would grow each time a new user connected and would not decrease again. This is now fixed. * Bump version and remove unused benchmark --- Cargo.toml | 2 +- src/config/environmental_variables.rs | 1 + src/config/redis_cfg.rs | 2 +- src/config/redis_cfg_types.rs | 2 +- src/redis_to_client_stream/client_agent.rs | 8 ++ src/redis_to_client_stream/event_stream.rs | 68 +++++++--------- .../receiver/message_queues.rs | 42 +--------- src/redis_to_client_stream/receiver/mod.rs | 78 +++++++++---------- .../redis/redis_connection/mod.rs | 3 +- 9 files changed, 87 insertions(+), 119 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 627174d..aaee961 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.7.0" +version = "0.7.1" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/src/config/environmental_variables.rs b/src/config/environmental_variables.rs index a65025f..6588765 100644 --- a/src/config/environmental_variables.rs +++ b/src/config/environmental_variables.rs @@ -60,6 +60,7 @@ impl fmt::Display for EnvVar { "REDIS_PASSWORD", "REDIS_USER", "REDIS_DB", + "REDIS_FREQ", ] .iter() { diff --git a/src/config/redis_cfg.rs b/src/config/redis_cfg.rs index f78a0fb..e12766d 100644 --- a/src/config/redis_cfg.rs +++ b/src/config/redis_cfg.rs @@ -59,7 +59,7 @@ For similar functionality, you may wish to set a REDIS_NAMESPACE"; host: RedisHost::default().maybe_update(env.get("REDIS_HOST")), db: RedisDb::default().maybe_update(env.get("REDIS_DB")), namespace: RedisNamespace::default().maybe_update(env.get("REDIS_NAMESPACE")), - polling_interval: RedisInterval::default().maybe_update(env.get("REDIS_POLL_INTERVAL")), + polling_interval: RedisInterval::default().maybe_update(env.get("REDIS_FREQ")), }; if cfg.db.is_some() { diff --git a/src/config/redis_cfg_types.rs b/src/config/redis_cfg_types.rs index 8d43c76..94e7d1e 100644 --- a/src/config/redis_cfg_types.rs +++ b/src/config/redis_cfg_types.rs @@ -21,7 +21,7 @@ from_env_var!( /// How frequently to poll Redis let name = RedisInterval; let default: Duration = Duration::from_millis(100); - let (env_var, allowed_values) = ("REDIS_POLL_INTERVAL", "a number of milliseconds"); + let (env_var, allowed_values) = ("REDIS_FREQ", "a number of milliseconds"); let from_str = |s| s.parse().map(Duration::from_millis).ok(); ); from_env_var!( diff --git a/src/redis_to_client_stream/client_agent.rs b/src/redis_to_client_stream/client_agent.rs index 187cd1d..1a729ed 100644 --- a/src/redis_to_client_stream/client_agent.rs +++ b/src/redis_to_client_stream/client_agent.rs @@ -56,6 +56,14 @@ impl ClientAgent { .unwrap_or_else(|e| log::error!("Could not subscribe to the Redis channel: {}", e)) } + pub fn disconnect(&self) -> futures::future::FutureResult { + let mut receiver = self.lock_receiver(); + receiver + .remove_subscription(&self.subscription) + .unwrap_or_else(|e| log::error!("Could not unsubscribe from: {}", e)); + futures::future::ok(false) + } + fn lock_receiver(&self) -> MutexGuard { match self.receiver.lock() { Ok(inner) => inner, diff --git a/src/redis_to_client_stream/event_stream.rs b/src/redis_to_client_stream/event_stream.rs index 5eb994b..9f56c84 100644 --- a/src/redis_to_client_stream/event_stream.rs +++ b/src/redis_to_client_stream/event_stream.rs @@ -17,17 +17,17 @@ impl EventStream { mut client_agent: ClientAgent, interval: Duration, ) -> impl Future { - let (ws_tx, mut ws_rx) = ws.split(); + let (transmit_to_ws, _receive_from_ws) = ws.split(); let timeline = client_agent.subscription.timeline; // Create a pipe let (tx, rx) = futures::sync::mpsc::unbounded(); // Send one end of it to a different thread and tell that end to forward whatever it gets - // on to the websocket client + // on to the WebSocket client warp::spawn( rx.map_err(|()| -> warp::Error { unreachable!() }) - .forward(ws_tx) + .forward(transmit_to_ws) .map(|_r| ()) .map_err(|e| match e.to_string().as_ref() { "IO error: Broken pipe (os error 32)" => (), // just closed unix socket @@ -35,50 +35,42 @@ impl EventStream { }), ); - // Yield new events for as long as the client is still connected - let event_stream = - tokio::timer::Interval::new(Instant::now(), interval).take_while(move |_| { - match ws_rx.poll() { - Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true), - Ok(Async::Ready(None)) => { - log::info!("Client closed WebSocket connection for {:?}", timeline); - futures::future::ok(false) - } - Err(e) if e.to_string() == "IO error: Broken pipe (os error 32)" => { - // no err, just closed Unix socket - log::info!("Client closed WebSocket connection for {:?}", timeline); - futures::future::ok(false) - } - Err(e) => { - log::warn!("Error in {:?}: {}", timeline, e); - futures::future::ok(false) - } - } - }); - let mut last_ping_time = Instant::now(); - // Every time you get an event from that stream, send it through the pipe - event_stream - .for_each(move |_instant| { + tokio::timer::Interval::new(Instant::now(), interval) + .take_while(move |_| { + // Right now, we do not need to see if we have any messages _from_ the + // WebSocket connection because the API doesn't support clients sending + // commands via the WebSocket. However, if the [stream multiplexing API + // change](github.com/tootsuite/flodgatt/issues/121) is implemented, we'll + // need to receive messages from the client. If so, we'll need a + // `receive_from_ws.poll() call here (or later)` + match client_agent.poll() { Ok(Async::Ready(Some(msg))) => { - tx.unbounded_send(Message::text(msg.to_json_string())) - .unwrap_or_else(|e| { - log::error!("Could not send message to WebSocket: {}", e) - }); + match tx.unbounded_send(Message::text(msg.to_json_string())) { + Ok(_) => futures::future::ok(true), + Err(_) => client_agent.disconnect(), + } + } + Ok(Async::Ready(None)) => { + log::info!("WebSocket ClientAgent got Ready(None)"); + futures::future::ok(true) } - Ok(Async::Ready(None)) => log::info!("WebSocket ClientAgent got Ready(None)"), Ok(Async::NotReady) if last_ping_time.elapsed() > Duration::from_secs(30) => { - tx.unbounded_send(Message::text("{}")).unwrap_or_else(|e| { - log::error!("Could not send ping to WebSocket: {}", e) - }); last_ping_time = Instant::now(); + match tx.unbounded_send(Message::text("{}")) { + Ok(_) => futures::future::ok(true), + Err(_) => client_agent.disconnect(), + } + } + Ok(Async::NotReady) => futures::future::ok(true), // no new messages; nothing to do + Err(e) => { + log::error!("{}\n Dropping WebSocket message and continuing.", e); + futures::future::ok(true) } - Ok(Async::NotReady) => (), // no new messages; nothing to do - Err(e) => log::error!("{}\n Dropping WebSocket message and continuing.", e), } - Ok(()) }) + .for_each(move |_instant| Ok(())) .then(move |result| { log::info!("WebSocket connection for {:?} closed.", timeline); result diff --git a/src/redis_to_client_stream/receiver/message_queues.rs b/src/redis_to_client_stream/receiver/message_queues.rs index 593f660..9044460 100644 --- a/src/redis_to_client_stream/receiver/message_queues.rs +++ b/src/redis_to_client_stream/receiver/message_queues.rs @@ -4,7 +4,6 @@ use crate::parse_client_request::Timeline; use std::{ collections::{HashMap, VecDeque}, fmt, - time::{Duration, Instant}, }; use uuid::Uuid; @@ -12,52 +11,22 @@ use uuid::Uuid; pub struct MsgQueue { pub timeline: Timeline, pub messages: VecDeque, - last_polled_at: Instant, } impl MsgQueue { pub fn new(timeline: Timeline) -> Self { MsgQueue { messages: VecDeque::new(), - last_polled_at: Instant::now(), + timeline, } } - pub fn update_polled_at_time(&mut self) { - self.last_polled_at = Instant::now(); - } } #[derive(Debug)] pub struct MessageQueues(pub HashMap); -impl MessageQueues { - pub fn calculate_timelines_to_add_or_drop(&mut self, timeline: Timeline) -> Vec { - let mut timelines_to_modify = Vec::new(); - - timelines_to_modify.push(Change { - timeline, - in_subscriber_number: 1, - }); - self.retain(|_id, msg_queue| { - if msg_queue.last_polled_at.elapsed() < Duration::from_secs(30) { - true - } else { - let timeline = &msg_queue.timeline; - timelines_to_modify.push(Change { - timeline: *timeline, - in_subscriber_number: -1, - }); - false - } - }); - timelines_to_modify - } -} -pub struct Change { - pub timeline: Timeline, - pub in_subscriber_number: i32, -} +impl MessageQueues {} impl fmt::Debug for MsgQueue { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -66,12 +35,9 @@ impl fmt::Debug for MsgQueue { "\ MsgQueue {{ timeline: {:?}, - messages: {:?}, - last_polled_at: {:?} ago, + messages: {:?}, }}", - self.timeline, - self.messages, - self.last_polled_at.elapsed(), + self.timeline, self.messages, ) } } diff --git a/src/redis_to_client_stream/receiver/mod.rs b/src/redis_to_client_stream/receiver/mod.rs index 8abdaea..2229f2f 100644 --- a/src/redis_to_client_stream/receiver/mod.rs +++ b/src/redis_to_client_stream/receiver/mod.rs @@ -52,10 +52,6 @@ impl Receiver { /// Assigns the `Receiver` a new timeline to monitor and runs other /// first-time setup. - /// - /// Note: this method calls `subscribe_or_unsubscribe_as_needed`, - /// so Redis PubSub subscriptions are only updated when a new timeline - /// comes under management for the first time. pub fn add_subscription(&mut self, subscription: &Subscription) -> Result<()> { let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline); @@ -63,7 +59,41 @@ impl Receiver { self.redis_connection.update_cache(hashtag, id); }; self.msg_queues.insert(subscription.id, MsgQueue::new(tl)); - self.subscribe_or_unsubscribe_as_needed(tl)?; + + let number_of_subscriptions = self + .clients_per_timeline + .entry(tl) + .and_modify(|n| *n += 1) + .or_insert(1); + + use RedisCmd::*; + if *number_of_subscriptions == 1 { + self.redis_connection.send_cmd(Subscribe, &tl)? + }; + + Ok(()) + } + + pub fn remove_subscription(&mut self, subscription: &Subscription) -> Result<()> { + let tl = subscription.timeline; + self.msg_queues.remove(&subscription.id); + let number_of_subscriptions = self + .clients_per_timeline + .entry(tl) + .and_modify(|n| *n -= 1) + .or_insert_with(|| { + log::error!( + "Attempted to unsubscribe from a timeline to which you were not subscribed: {:?}", + tl + ); + 0 + }); + use RedisCmd::*; + if *number_of_subscriptions == 0 { + self.redis_connection.send_cmd(Unsubscribe, &tl)?; + self.clients_per_timeline.remove_entry(&tl); + }; + Ok(()) } @@ -92,13 +122,10 @@ impl Receiver { // If the `msg_queue` being polled has any new messages, return the first (oldest) one match self.msg_queues.get_mut(&id) { - Some(msg_q) => { - msg_q.update_polled_at_time(); - match msg_q.messages.pop_front() { - Some(event) => Ok(Async::Ready(Some(event))), - None => Ok(Async::NotReady), - } - } + Some(msg_q) => match msg_q.messages.pop_front() { + Some(event) => Ok(Async::Ready(Some(event))), + None => Ok(Async::NotReady), + }, None => { log::error!("Polled a MsgQueue that had not been set up. Setting it up now."); self.msg_queues.insert(id, MsgQueue::new(timeline)); @@ -137,31 +164,4 @@ impl Receiver { .fold(0, |acc, el| acc.max(el.messages.len())) ) } - - /// Drop any PubSub subscriptions that don't have active clients and check - /// that there's a subscription to the current one. If there isn't, then - /// subscribe to it. - fn subscribe_or_unsubscribe_as_needed(&mut self, tl: Timeline) -> Result<()> { - let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(tl); - - // Record the lower number of clients subscribed to that channel - for change in timelines_to_modify { - let timeline = change.timeline; - - let count_of_subscribed_clients = self - .clients_per_timeline - .entry(timeline) - .and_modify(|n| *n += change.in_subscriber_number) - .or_insert_with(|| 1); - - // If no clients, unsubscribe from the channel - use RedisCmd::*; - if *count_of_subscribed_clients <= 0 { - self.redis_connection.send_cmd(Unsubscribe, &timeline)?; - } else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 { - self.redis_connection.send_cmd(Subscribe, &timeline)? - } - } - Ok(()) - } } diff --git a/src/redis_to_client_stream/redis/redis_connection/mod.rs b/src/redis_to_client_stream/redis/redis_connection/mod.rs index 3dadf01..bab23ca 100644 --- a/src/redis_to_client_stream/redis/redis_connection/mod.rs +++ b/src/redis_to_client_stream/redis/redis_connection/mod.rs @@ -61,7 +61,8 @@ impl RedisConn { if self.redis_polled_at.elapsed() > self.redis_poll_interval { if let Ok(bytes_read) = self.primary.read(&mut buffer) { self.redis_input.extend_from_slice(&buffer[..bytes_read]); - } + }; + self.redis_polled_at = Instant::now(); } if self.redis_input.is_empty() { return Ok(Async::NotReady);