Resolve memory-use regression (#140)

* Use monotonically increasing channel_id

Using a monotonically increasing channel_id (instead of a Uuid)
reduces memory use under load by ~3%

* Use replace unbounded channels with bounded

This also slightly reduces memory use

* Heap allocate Event

Wrapping the Event struct in an Arc avoids excessive copying and significantly reduces memory use.

* Implement more efficient unsubscribe strategy

* Fix various Clippy lints; bump version

* Update config defaults
This commit is contained in:
Daniel Sockwell 2020-04-24 13:23:59 -04:00 committed by GitHub
parent 2725439110
commit b18500b884
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 57 additions and 49 deletions

2
Cargo.lock generated
View File

@ -416,7 +416,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "flodgatt"
version = "0.9.1"
version = "0.9.2"
dependencies = [
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -1,7 +1,7 @@
[package]
name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.9.1"
version = "0.9.2"
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018"

View File

@ -20,7 +20,7 @@ from_env_var!(
from_env_var!(
/// How frequently to poll Redis
let name = RedisInterval;
let default: Duration = Duration::from_millis(10);
let default: Duration = Duration::from_millis(100);
let (env_var, allowed_values) = ("REDIS_FREQ", "a number of milliseconds");
let from_str = |s| s.parse().map(Duration::from_millis).ok();
);

View File

@ -31,7 +31,7 @@ fn main() -> Result<(), Error> {
.map(move |subscription: Subscription, sse: warp::sse::Sse| {
log::info!("Incoming SSE request for {:?}", subscription.timeline);
let mut manager = sse_manager.lock().unwrap_or_else(RedisManager::recover);
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (event_tx, event_rx) = mpsc::channel(10);
manager.subscribe(&subscription, event_tx);
let sse_stream = SseStream::new(subscription);
sse_stream.send_events(sse, event_rx)
@ -46,7 +46,7 @@ fn main() -> Result<(), Error> {
.map(move |subscription: Subscription, ws: Ws2| {
log::info!("Incoming websocket request for {:?}", subscription.timeline);
let mut manager = ws_manager.lock().unwrap_or_else(RedisManager::recover);
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (event_tx, event_rx) = mpsc::channel(10);
manager.subscribe(&subscription, event_tx);
let token = subscription.access_token.clone().unwrap_or_default(); // token sent for security
let ws_stream = WsStream::new(subscription);
@ -99,5 +99,5 @@ fn main() -> Result<(), Error> {
let server_addr = SocketAddr::new(*cfg.address, *cfg.port);
tokio::run(lazy(move || streaming_server().bind(server_addr)));
}
Err(Error::Unrecoverable) // only get here if there's an unrecoverable error in poll_broadcast.
Err(Error::Unrecoverable) // only reached if poll_broadcast encounters an unrecoverable error
}

View File

@ -120,7 +120,7 @@ impl Handler {
pub fn err(r: Rejection) -> std::result::Result<impl warp::Reply, warp::Rejection> {
use StatusCode as Code;
let (msg, code) = match &r.cause().map(|s| s.to_string()).as_deref() {
let (msg, code) = match &r.cause().map(|cause| cause.to_string()).as_deref() {
Some(PgPool::BAD_TOKEN) => (PgPool::BAD_TOKEN, Code::UNAUTHORIZED),
Some(PgPool::PG_NULL) => (PgPool::PG_NULL, Code::BAD_REQUEST),
Some(PgPool::MISSING_HASHTAG) => (PgPool::MISSING_HASHTAG, Code::BAD_REQUEST),

View File

@ -216,5 +216,5 @@ fn get_col_or_reject(row: &postgres::row::SimpleQueryRow, col: usize) -> Rejecta
Ok(row
.try_get(col)
.map_err(reject::custom)?
.ok_or(reject::custom(PgPool::PG_NULL))?)
.ok_or_else(|| reject::custom(PgPool::PG_NULL))?)
}

View File

@ -21,13 +21,9 @@ pub enum Event {
pub(crate) trait Payload {
fn language_unset(&self) -> bool;
fn language(&self) -> String;
fn involved_users(&self) -> HashSet<Id>;
fn author(&self) -> &Id;
fn sent_from(&self) -> &str;
}
@ -97,18 +93,19 @@ impl Event {
})
}
#[rustfmt::skip]
fn payload(&self) -> Option<String> {
use CheckedEvent::*;
match self {
Self::TypeSafe(checked) => match checked {
Update { payload, .. } => Some(escaped(payload)),
Notification { payload, .. } => Some(escaped(payload)),
Delete { payload, .. } => Some(payload.clone()),
Announcement { payload, .. } => Some(escaped(payload)),
Update { payload, .. } => Some(escaped(payload)),
Notification { payload, .. } => Some(escaped(payload)),
Conversation { payload, .. } => Some(escaped(payload)),
Announcement { payload, .. } => Some(escaped(payload)),
AnnouncementReaction { payload, .. } => Some(escaped(payload)),
AnnouncementDelete { payload, .. } => Some(payload.clone()),
Conversation { payload, .. } => Some(escaped(payload)),
FiltersChanged => None,
AnnouncementDelete { payload, .. } => Some(payload.clone()),
Delete { payload, .. } => Some(payload.clone()),
FiltersChanged => None,
},
Self::Dynamic(DynEvent { payload, .. }) => Some(payload.to_string()),
Self::Ping => unreachable!(), // private method only called above

View File

@ -104,7 +104,7 @@ impl RedisConn {
// Store leftover in same buffer and set cursor to start after leftover next time
self.cursor = 0;
for byte in [leftover.as_bytes(), invalid_bytes].concat().iter() {
for byte in &[leftover.as_bytes(), invalid_bytes].concat() {
self.redis_input[self.cursor] = *byte;
self.cursor += 1;
}

View File

@ -15,16 +15,16 @@ use futures::Async;
use hashbrown::HashMap;
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use std::time::{Duration, Instant};
use tokio::sync::mpsc::UnboundedSender;
use uuid::Uuid;
use tokio::sync::mpsc::Sender;
type Result<T> = std::result::Result<T, Error>;
/// The item that streams from Redis and is polled by the `ClientAgent`
pub struct Manager {
redis_connection: RedisConn,
timelines: HashMap<Timeline, HashMap<Uuid, UnboundedSender<Event>>>,
timelines: HashMap<Timeline, HashMap<u32, Sender<Arc<Event>>>>,
ping_time: Instant,
channel_id: u32,
}
impl Manager {
@ -35,6 +35,7 @@ impl Manager {
redis_connection: RedisConn::new(redis_cfg)?,
timelines: HashMap::new(),
ping_time: Instant::now(),
channel_id: 0,
})
}
@ -42,14 +43,15 @@ impl Manager {
Arc::new(Mutex::new(self))
}
pub fn subscribe(&mut self, subscription: &Subscription, channel: UnboundedSender<Event>) {
pub fn subscribe(&mut self, subscription: &Subscription, channel: Sender<Arc<Event>>) {
let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline);
if let (Some(hashtag), Some(id)) = (tag, tl.tag()) {
self.redis_connection.update_cache(hashtag, id);
};
let channels = self.timelines.entry(tl).or_default();
channels.insert(Uuid::new_v4(), channel);
channels.insert(self.channel_id, channel);
self.channel_id += 1;
if channels.len() == 1 {
self.redis_connection
@ -58,38 +60,45 @@ impl Manager {
};
}
pub(crate) fn unsubscribe(&mut self, tl: &mut Timeline, id: &Uuid) -> Result<()> {
let channels = self.timelines.get_mut(tl).ok_or(Error::InvalidId)?;
channels.remove(id);
pub(crate) fn unsubscribe(&mut self, tl: &mut Timeline) -> Result<()> {
self.redis_connection.send_cmd(RedisCmd::Unsubscribe, &tl)?;
self.timelines.remove(&tl);
if channels.len() == 0 {
self.redis_connection.send_cmd(RedisCmd::Unsubscribe, &tl)?;
self.timelines.remove(&tl);
};
log::info!("Ended stream for {:?}", tl);
Ok(())
}
pub fn poll_broadcast(&mut self) -> Result<()> {
let mut completed_timelines = Vec::new();
let log_send_err = |tl, e| Some(log::error!("cannot send to {:?}: {}", tl, e)).is_some();
if self.ping_time.elapsed() > Duration::from_secs(30) {
self.ping_time = Instant::now();
for (timeline, channels) in self.timelines.iter_mut() {
for (uuid, channel) in channels.iter_mut() {
match channel.try_send(Event::Ping) {
Ok(_) => (),
Err(_) => completed_timelines.push((*timeline, *uuid)),
}
for (tl, channels) in self.timelines.iter_mut() {
channels.retain(|_, chan| match chan.try_send(Arc::new(Event::Ping)) {
Ok(()) => true,
Err(e) if !e.is_closed() => log_send_err(*tl, e),
Err(_) => false,
});
if channels.is_empty() {
completed_timelines.push(*tl);
}
}
};
loop {
match self.redis_connection.poll_redis() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some((tl, event)))) => {
for (uuid, tx) in self.timelines.get_mut(&tl).ok_or(Error::InvalidId)? {
tx.try_send(event.clone())
.unwrap_or_else(|_| completed_timelines.push((tl, *uuid)))
let sendable_event = Arc::new(event);
let channels = self.timelines.get_mut(&tl).ok_or(Error::InvalidId)?;
channels.retain(|_, chan| match chan.try_send(sendable_event.clone()) {
Ok(()) => true,
Err(e) if !e.is_closed() => log_send_err(tl, e),
Err(_) => false,
});
if channels.is_empty() {
completed_timelines.push(tl);
}
}
Ok(Async::Ready(None)) => (), // cmd or msg for other namespace
@ -97,8 +106,8 @@ impl Manager {
}
}
for (tl, channel) in completed_timelines.iter_mut() {
self.unsubscribe(tl, &channel)?;
for tl in &mut completed_timelines {
self.unsubscribe(tl)?;
}
Ok(())
}
@ -111,7 +120,7 @@ impl Manager {
pub fn count(&self) -> String {
format!(
"Current connections: {}",
self.timelines.values().map(|el| el.len()).sum::<usize>()
self.timelines.values().map(HashMap::len).sum::<usize>()
)
}

View File

View File

@ -2,12 +2,13 @@ use super::{Event, Payload};
use crate::request::Subscription;
use futures::stream::Stream;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::Receiver;
use warp::reply::Reply;
use warp::sse::Sse as WarpSse;
type EventRx = UnboundedReceiver<Event>;
type EventRx = Receiver<Arc<Event>>;
pub struct Sse(Subscription);

View File

@ -3,10 +3,11 @@ use crate::request::Subscription;
use futures::future::Future;
use futures::stream::Stream;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use std::sync::Arc;
use tokio::sync::mpsc::{self, Receiver, UnboundedSender};
use warp::ws::{Message, WebSocket};
type EventRx = UnboundedReceiver<Event>;
type EventRx = Receiver<Arc<Event>>;
type MsgTx = UnboundedSender<Message>;
pub struct Ws(Subscription);
@ -39,7 +40,7 @@ impl Ws {
);
event_rx.map_err(|_| ()).for_each(move |event| {
if matches!(event, Event::Ping) {
if matches!(*event, Event::Ping) {
send_msg(&event, &mut ws_tx)?
} else {
match (event.update_payload(), event.dyn_update_payload()) {