From b18500b8843abf7b2f62938f08629d644d40ce6b Mon Sep 17 00:00:00 2001
From: Daniel Sockwell
Date: Fri, 24 Apr 2020 13:23:59 -0400
Subject: [PATCH] Resolve memory-use regression (#140)

* Use monotonically increasing channel_id

Using a monotonically increasing channel_id (instead of a Uuid) reduces
memory use under load by ~3%

* Replace unbounded channels with bounded channels

This also slightly reduces memory use

* Heap allocate Event

Wrapping the Event struct in an Arc avoids excessive copying and
significantly reduces memory use.

* Implement more efficient unsubscribe strategy

* Fix various Clippy lints; bump version

* Update config defaults
---
 Cargo.lock                          |  2 +-
 Cargo.toml                          |  2 +-
 src/config/redis_cfg_types.rs       |  2 +-
 src/main.rs                         |  6 +--
 src/request.rs                      |  2 +-
 src/request/postgres.rs             |  2 +-
 src/response/event.rs               | 19 ++++------
 src/response/redis/connection.rs    |  2 +-
 src/response/redis/manager.rs       | 57 +++++++++++++++++------------
 src/response/redis/manager.rs:115:5 |  0
 src/response/stream/sse.rs          |  5 ++-
 src/response/stream/ws.rs           |  7 ++--
 12 files changed, 57 insertions(+), 49 deletions(-)
 create mode 100644 src/response/redis/manager.rs:115:5

diff --git a/Cargo.lock b/Cargo.lock
index 6356baf..8ae62a7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -416,7 +416,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
 name = "flodgatt"
-version = "0.9.1"
+version = "0.9.2"
 dependencies = [
  "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/Cargo.toml b/Cargo.toml
index 7df2294..6946bf4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "flodgatt"
 description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
-version = "0.9.1"
+version = "0.9.2"
 authors = ["Daniel Long Sockwell "]
 edition = "2018"
 
diff --git a/src/config/redis_cfg_types.rs b/src/config/redis_cfg_types.rs
index 0780377..6e07063 100644
--- a/src/config/redis_cfg_types.rs
+++ b/src/config/redis_cfg_types.rs
@@ -20,7 +20,7 @@ from_env_var!(
 from_env_var!(
     /// How frequently to poll Redis
     let name = RedisInterval;
-    let default: Duration = Duration::from_millis(10);
+    let default: Duration = Duration::from_millis(100);
     let (env_var, allowed_values) = ("REDIS_FREQ", "a number of milliseconds");
     let from_str = |s| s.parse().map(Duration::from_millis).ok();
 );
diff --git a/src/main.rs b/src/main.rs
index 3d9f191..a3c6b21 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -31,7 +31,7 @@ fn main() -> Result<(), Error> {
         .map(move |subscription: Subscription, sse: warp::sse::Sse| {
             log::info!("Incoming SSE request for {:?}", subscription.timeline);
             let mut manager = sse_manager.lock().unwrap_or_else(RedisManager::recover);
-            let (event_tx, event_rx) = mpsc::unbounded_channel();
+            let (event_tx, event_rx) = mpsc::channel(10);
             manager.subscribe(&subscription, event_tx);
             let sse_stream = SseStream::new(subscription);
             sse_stream.send_events(sse, event_rx)
@@ -46,7 +46,7 @@ fn main() -> Result<(), Error> {
         .map(move |subscription: Subscription, ws: Ws2| {
             log::info!("Incoming websocket request for {:?}", subscription.timeline);
             let mut manager = ws_manager.lock().unwrap_or_else(RedisManager::recover);
-            let (event_tx, event_rx) = mpsc::unbounded_channel();
+            let (event_tx, event_rx) = mpsc::channel(10);
             manager.subscribe(&subscription, event_tx);
             let token = subscription.access_token.clone().unwrap_or_default(); // token sent for security
             let ws_stream = WsStream::new(subscription);
@@ -99,5 +99,5 @@ fn main() -> Result<(), Error> {
         let server_addr = SocketAddr::new(*cfg.address, *cfg.port);
         tokio::run(lazy(move || streaming_server().bind(server_addr)));
     }
-    Err(Error::Unrecoverable) // only get here if there's an unrecoverable error in poll_broadcast.
+    Err(Error::Unrecoverable) // only reached if poll_broadcast encounters an unrecoverable error
 }
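The two main.rs hunks above replace mpsc::unbounded_channel() with the bounded mpsc::channel(10), so a client that stops reading can no longer make the server queue events for it without limit. The behavior this relies on can be sketched with the standard library's bounded channel (a minimal illustration only: std's sync_channel stands in for tokio's bounded mpsc, and the buffer size and names here are made up for the sketch, not flodgatt's):

    use std::sync::mpsc::{sync_channel, TrySendError};

    fn main() {
        // A bounded channel: at most 2 events may be queued in this sketch
        // (flodgatt uses a buffer of 10, as in the hunks above).
        let (tx, _rx) = sync_channel::<String>(2);

        tx.try_send("event 1".to_string()).unwrap();
        tx.try_send("event 2".to_string()).unwrap();

        // The buffer is full: try_send fails instead of allocating more memory,
        // which is what keeps one slow subscriber from growing an unbounded queue.
        match tx.try_send("event 3".to_string()) {
            Ok(()) => println!("queued"),
            Err(TrySendError::Full(_)) => println!("receiver is lagging; nothing queued"),
            Err(TrySendError::Disconnected(_)) => println!("receiver is gone"),
        }
    }

The buffer size is the trade-off: a larger buffer tolerates short stalls, a smaller one bounds per-client memory more tightly.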
diff --git a/src/request.rs b/src/request.rs
index 84c8e0b..2b3e58a 100644
--- a/src/request.rs
+++ b/src/request.rs
@@ -120,7 +120,7 @@ impl Handler {
     pub fn err(r: Rejection) -> std::result::Result {
         use StatusCode as Code;
-        let (msg, code) = match &r.cause().map(|s| s.to_string()).as_deref() {
+        let (msg, code) = match &r.cause().map(|cause| cause.to_string()).as_deref() {
            Some(PgPool::BAD_TOKEN) => (PgPool::BAD_TOKEN, Code::UNAUTHORIZED),
            Some(PgPool::PG_NULL) => (PgPool::PG_NULL, Code::BAD_REQUEST),
            Some(PgPool::MISSING_HASHTAG) => (PgPool::MISSING_HASHTAG, Code::BAD_REQUEST),
diff --git a/src/request/postgres.rs b/src/request/postgres.rs
index b143b0c..5b7a082 100644
--- a/src/request/postgres.rs
+++ b/src/request/postgres.rs
@@ -216,5 +216,5 @@ fn get_col_or_reject(row: &postgres::row::SimpleQueryRow, col: usize) -> Rejecta
     Ok(row
         .try_get(col)
         .map_err(reject::custom)?
-        .ok_or(reject::custom(PgPool::PG_NULL))?)
+        .ok_or_else(|| reject::custom(PgPool::PG_NULL))?)
 }
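The ok_or to ok_or_else change in postgres.rs is the usual fix for Clippy's or_fun_call lint: the argument to ok_or is built eagerly even when the Option is Some, while ok_or_else only runs its closure in the None case. A small, self-contained illustration (the expensive_rejection helper is hypothetical, not a flodgatt function):

    fn expensive_rejection() -> String {
        println!("building a rejection value"); // side effect makes the evaluation visible
        String::from("PG_NULL")
    }

    fn main() {
        let value: Option<i32> = Some(42);

        // Eager: expensive_rejection() runs even though the value is Some.
        let _a: Result<i32, String> = value.ok_or(expensive_rejection());

        // Lazy: the closure is only invoked if the value is None.
        let _b: Result<i32, String> = value.ok_or_else(expensive_rejection);
    }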
diff --git a/src/response/event.rs b/src/response/event.rs
index 091a250..37c7af1 100644
--- a/src/response/event.rs
+++ b/src/response/event.rs
@@ -21,13 +21,9 @@ pub enum Event {
 
 pub(crate) trait Payload {
     fn language_unset(&self) -> bool;
-
     fn language(&self) -> String;
-
     fn involved_users(&self) -> HashSet<Id>;
-
     fn author(&self) -> &Id;
-
     fn sent_from(&self) -> &str;
 }
 
@@ -97,18 +93,19 @@ impl Event {
         })
     }
 
+    #[rustfmt::skip]
     fn payload(&self) -> Option<String> {
         use CheckedEvent::*;
         match self {
             Self::TypeSafe(checked) => match checked {
-                Update { payload, .. } => Some(escaped(payload)),
-                Notification { payload, .. } => Some(escaped(payload)),
-                Delete { payload, .. } => Some(payload.clone()),
-                Announcement { payload, .. } => Some(escaped(payload)),
+                Update { payload, .. }               => Some(escaped(payload)),
+                Notification { payload, .. }         => Some(escaped(payload)),
+                Conversation { payload, .. }         => Some(escaped(payload)),
+                Announcement { payload, .. }         => Some(escaped(payload)),
                 AnnouncementReaction { payload, .. } => Some(escaped(payload)),
-                AnnouncementDelete { payload, .. } => Some(payload.clone()),
-                Conversation { payload, .. } => Some(escaped(payload)),
-                FiltersChanged => None,
+                AnnouncementDelete { payload, .. }   => Some(payload.clone()),
+                Delete { payload, .. }               => Some(payload.clone()),
+                FiltersChanged                       => None,
             },
             Self::Dynamic(DynEvent { payload, .. }) => Some(payload.to_string()),
             Self::Ping => unreachable!(), // private method only called above
diff --git a/src/response/redis/connection.rs b/src/response/redis/connection.rs
index 557d934..1ac980b 100644
--- a/src/response/redis/connection.rs
+++ b/src/response/redis/connection.rs
@@ -104,7 +104,7 @@ impl RedisConn {
 
             // Store leftover in same buffer and set cursor to start after leftover next time
             self.cursor = 0;
-            for byte in [leftover.as_bytes(), invalid_bytes].concat().iter() {
+            for byte in &[leftover.as_bytes(), invalid_bytes].concat() {
                 self.redis_input[self.cursor] = *byte;
                 self.cursor += 1;
             }
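The manager changes below carry out the "Heap allocate Event" bullet from the commit message: an event is parsed once, wrapped in an Arc, and only the Arc handle is cloned for each subscriber on the timeline. A rough illustration of why that is cheaper (this Event struct is a stand-in for the sketch, not flodgatt's real type):

    use std::sync::Arc;

    #[derive(Clone)]
    struct Event {
        payload: String, // imagine a large JSON payload
    }

    fn main() {
        let event = Event { payload: "x".repeat(100_000) };

        // Deep copy: every subscriber would get its own heap allocation of the payload.
        let copied = event.clone();

        // Shared: one allocation; each additional subscriber only bumps a reference count.
        let shared = Arc::new(event);
        let handle = Arc::clone(&shared);

        assert_eq!(copied.payload.len(), handle.payload.len());
    }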
diff --git a/src/response/redis/manager.rs b/src/response/redis/manager.rs
index 3c7f5cc..a179115 100644
--- a/src/response/redis/manager.rs
+++ b/src/response/redis/manager.rs
@@ -15,16 +15,16 @@
 use futures::Async;
 use hashbrown::HashMap;
 use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
 use std::time::{Duration, Instant};
-use tokio::sync::mpsc::UnboundedSender;
-use uuid::Uuid;
+use tokio::sync::mpsc::Sender;
 
 type Result<T> = std::result::Result<T, Error>;
 
 /// The item that streams from Redis and is polled by the `ClientAgent`
 pub struct Manager {
     redis_connection: RedisConn,
-    timelines: HashMap<Timeline, HashMap<Uuid, UnboundedSender<Event>>>,
+    timelines: HashMap<Timeline, HashMap<u32, Sender<Arc<Event>>>>,
     ping_time: Instant,
+    channel_id: u32,
 }
 
@@ -35,6 +35,7 @@ impl Manager {
             redis_connection: RedisConn::new(redis_cfg)?,
             timelines: HashMap::new(),
             ping_time: Instant::now(),
+            channel_id: 0,
         })
     }
 
@@ -42,14 +43,15 @@
         Arc::new(Mutex::new(self))
     }
 
-    pub fn subscribe(&mut self, subscription: &Subscription, channel: UnboundedSender<Event>) {
+    pub fn subscribe(&mut self, subscription: &Subscription, channel: Sender<Arc<Event>>) {
         let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline);
         if let (Some(hashtag), Some(id)) = (tag, tl.tag()) {
             self.redis_connection.update_cache(hashtag, id);
         };
 
         let channels = self.timelines.entry(tl).or_default();
-        channels.insert(Uuid::new_v4(), channel);
+        channels.insert(self.channel_id, channel);
+        self.channel_id += 1;
 
         if channels.len() == 1 {
             self.redis_connection
@@ -58,38 +60,45 @@ impl Manager {
         };
     }
 
-    pub(crate) fn unsubscribe(&mut self, tl: &mut Timeline, id: &Uuid) -> Result<()> {
-        let channels = self.timelines.get_mut(tl).ok_or(Error::InvalidId)?;
-        channels.remove(id);
+    pub(crate) fn unsubscribe(&mut self, tl: &mut Timeline) -> Result<()> {
+        self.redis_connection.send_cmd(RedisCmd::Unsubscribe, &tl)?;
+        self.timelines.remove(&tl);
 
-        if channels.len() == 0 {
-            self.redis_connection.send_cmd(RedisCmd::Unsubscribe, &tl)?;
-            self.timelines.remove(&tl);
-        };
         log::info!("Ended stream for {:?}", tl);
         Ok(())
     }
 
     pub fn poll_broadcast(&mut self) -> Result<()> {
         let mut completed_timelines = Vec::new();
+        let log_send_err = |tl, e| Some(log::error!("cannot send to {:?}: {}", tl, e)).is_some();
+
         if self.ping_time.elapsed() > Duration::from_secs(30) {
             self.ping_time = Instant::now();
-            for (timeline, channels) in self.timelines.iter_mut() {
-                for (uuid, channel) in channels.iter_mut() {
-                    match channel.try_send(Event::Ping) {
-                        Ok(_) => (),
-                        Err(_) => completed_timelines.push((*timeline, *uuid)),
-                    }
+            for (tl, channels) in self.timelines.iter_mut() {
+                channels.retain(|_, chan| match chan.try_send(Arc::new(Event::Ping)) {
+                    Ok(()) => true,
+                    Err(e) if !e.is_closed() => log_send_err(*tl, e),
+                    Err(_) => false,
+                });
+                if channels.is_empty() {
+                    completed_timelines.push(*tl);
                 }
             }
         };
+
         loop {
             match self.redis_connection.poll_redis() {
                 Ok(Async::NotReady) => break,
                 Ok(Async::Ready(Some((tl, event)))) => {
-                    for (uuid, tx) in self.timelines.get_mut(&tl).ok_or(Error::InvalidId)? {
-                        tx.try_send(event.clone())
-                            .unwrap_or_else(|_| completed_timelines.push((tl, *uuid)))
+                    let sendable_event = Arc::new(event);
+                    let channels = self.timelines.get_mut(&tl).ok_or(Error::InvalidId)?;
+                    channels.retain(|_, chan| match chan.try_send(sendable_event.clone()) {
+                        Ok(()) => true,
+                        Err(e) if !e.is_closed() => log_send_err(tl, e),
+                        Err(_) => false,
+                    });
+                    if channels.is_empty() {
+                        completed_timelines.push(tl);
                     }
                 }
                 Ok(Async::Ready(None)) => (), // cmd or msg for other namespace
@@ -97,8 +106,8 @@
             }
         }
 
-        for (tl, channel) in completed_timelines.iter_mut() {
-            self.unsubscribe(tl, &channel)?;
+        for tl in &mut completed_timelines {
+            self.unsubscribe(tl)?;
         }
         Ok(())
     }
@@ -111,7 +120,7 @@
     pub fn count(&self) -> String {
         format!(
             "Current connections: {}",
-            self.timelines.values().map(|el| el.len()).sum::<usize>()
+            self.timelines.values().map(HashMap::len).sum::<usize>()
         )
     }
diff --git a/src/response/redis/manager.rs:115:5 b/src/response/redis/manager.rs:115:5
new file mode 100644
index 0000000..e69de29
diff --git a/src/response/stream/sse.rs b/src/response/stream/sse.rs
index 973c04c..2375308 100644
--- a/src/response/stream/sse.rs
+++ b/src/response/stream/sse.rs
@@ -2,12 +2,13 @@
 use super::{Event, Payload};
 use crate::request::Subscription;
 
 use futures::stream::Stream;
+use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::mpsc::UnboundedReceiver;
+use tokio::sync::mpsc::Receiver;
 use warp::reply::Reply;
 use warp::sse::Sse as WarpSse;
 
-type EventRx = UnboundedReceiver<Event>;
+type EventRx = Receiver<Arc<Event>>;
 
 pub struct Sse(Subscription);
diff --git a/src/response/stream/ws.rs b/src/response/stream/ws.rs
index e4d38e1..515e572 100644
--- a/src/response/stream/ws.rs
+++ b/src/response/stream/ws.rs
@@ -3,10 +3,11 @@ use crate::request::Subscription;
 
 use futures::future::Future;
 use futures::stream::Stream;
-use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
+use std::sync::Arc;
+use tokio::sync::mpsc::{self, Receiver, UnboundedSender};
 use warp::ws::{Message, WebSocket};
 
-type EventRx = UnboundedReceiver<Event>;
+type EventRx = Receiver<Arc<Event>>;
 type MsgTx = UnboundedSender<Message>;
 
 pub struct Ws(Subscription);
@@ -39,7 +40,7 @@ impl Ws {
         );
 
         event_rx.map_err(|_| ()).for_each(move |event| {
-            if matches!(event, Event::Ping) {
+            if matches!(*event, Event::Ping) {
                 send_msg(&event, &mut ws_tx)?
             } else {
                 match (event.update_payload(), event.dyn_update_payload()) {
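Taken together, the poll_broadcast changes above amount to a fan-out loop: one Arc<Event> is shared across every subscriber of a timeline, try_send refuses to queue for a subscriber whose bounded buffer is full, and channels whose receiver has gone away are pruned with retain so the whole timeline can be unsubscribed once it has no listeners. A self-contained sketch of that pattern using only std types (tokio's Sender, its is_closed() error check, and the Redis/timeline bookkeeping are deliberately simplified away):

    use std::collections::HashMap;
    use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
    use std::sync::Arc;

    type Event = String;

    fn broadcast(channels: &mut HashMap<u32, SyncSender<Arc<Event>>>, event: Event) {
        // One allocation for the event; each subscriber clones only the Arc handle.
        let shared = Arc::new(event);
        // Keep a channel when the send succeeds or its buffer is merely full;
        // drop it once the receiver has disconnected (the subscriber went away).
        channels.retain(|_, tx| match tx.try_send(Arc::clone(&shared)) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) => true,
            Err(TrySendError::Disconnected(_)) => false,
        });
    }

    fn main() {
        let mut channels: HashMap<u32, SyncSender<Arc<Event>>> = HashMap::new();
        let (tx, rx) = sync_channel(10);
        channels.insert(0, tx);

        broadcast(&mut channels, "an update".to_string());
        assert_eq!(*rx.recv().unwrap(), "an update");

        drop(rx); // the subscriber disconnects
        broadcast(&mut channels, "another update".to_string());
        assert!(channels.is_empty()); // the dead channel was pruned
    }

As in the manager above, an empty per-timeline map is the signal to drop the Redis subscription for that timeline.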