From 4a13412f987b37f20906f22d73c134c98ab1cbe9 Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Mon, 27 Apr 2020 16:03:05 -0400 Subject: [PATCH] Improve handling of large Redis input (#143) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Implement faster buffered input This commit implements a modified ring buffer for input from Redis. Specifically, Flodgatt now limits the amount of data it fetches from Redis in one syscall to 8 KiB (two pages on most systems). Flodgatt will process all complete messages it receives from Redis and then re-use the same buffer for the next time it retrieves data. If Flodgatt received a partial message, it will copy the partial message to the beginning of the buffer before its next read. This change has little effect on Flodgatt under light load (because it was rare for Redis to have more than 8 KiB of messages available at any one time). However, my hope is that this will significantly reduce memory use on the largest instances. * Improve handling of backpresure This commit alters how Flodgatt behaves if it receives enough messages for a single client to fill that clients channel. (Because the clients regularly send their messages, should only occur if a single client receives a large number of messages nearly simultaneously; this is rare, but could occur, especially on large instances). Previously, Flodgatt would drop messages in the rare case when the client's channel was full. Now, Flodgatt will pause the current Redis poll and yield control back to the client streams, allowing the clients to empty their channels; Flodgatt will then resume polling Redis/sending the messages it previously received. With the approach, Flodgatt will never drop messages. However, the risk to this approach is that, by never dropping messages, Flodgatt does not have any way to reduce the amount of work it needs to do when under heavy load – it delays the work slightly, but doesn't reduce it. What this means is that it would be *theoretically* possible for Flodgatt to fall increasingly behind, if it is continuously receiving more messages than it can process. Due to how quickly Flodgatt can process messages, though, I suspect this would only come up if an admin were running Flodgatt in a *significantly* resource constrained environment, but I wanted to mention it for the sake of completeness. This commit also adds a new /status/backpressure endpoint that displays the current length of the Redis input buffer (which should typically be low or 0). Like the other /status endpoints, this endpoint is only enabled when Flodgatt is compiled with the `stub_status` feature. --- Cargo.lock | 2 +- Cargo.toml | 2 +- benches/parse_redis.rs | 74 ++++++------ src/err.rs | 6 - src/main.rs | 17 ++- src/request.rs | 4 + src/request/timeline.rs | 1 - src/response.rs | 2 +- src/response/redis.rs | 83 +++++++------ src/response/redis/connection.rs | 142 +++++++++------------- src/response/redis/connection/err.rs | 2 +- src/response/redis/manager.rs | 174 +++++++++++++++++---------- src/response/redis/manager/err.rs | 18 +-- src/response/redis/msg.rs | 5 +- src/response/stream/ws.rs | 7 +- 15 files changed, 284 insertions(+), 255 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0cf4dd3..86deab7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -416,7 +416,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.9.4" +version = "0.9.6" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index d7e1d43..abda086 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.9.4" +version = "0.9.6" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/benches/parse_redis.rs b/benches/parse_redis.rs index f5f1980..48c6aeb 100644 --- a/benches/parse_redis.rs +++ b/benches/parse_redis.rs @@ -1,9 +1,5 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use flodgatt::{ - event::*, - request::{Content::*, Reach::*, Stream::*, Timeline}, - response::{RedisMsg, RedisParseOutput}, -}; +use flodgatt::response::{RedisMsg, RedisParseOutput}; use lru::LruCache; use std::convert::TryFrom; @@ -16,27 +12,27 @@ fn parse_long_redis_input<'a>(input: &'a str) -> RedisMsg<'a> { } } -fn parse_to_timeline(msg: RedisMsg) -> Timeline { - let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..]; - let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap(); - assert_eq!(tl, Timeline(User(Id(1)), Federated, All)); - tl -} -fn parse_to_checked_event(msg: RedisMsg) -> EventKind { - EventKind::TypeSafe(serde_json::from_str(msg.event_txt).unwrap()) -} +// fn parse_to_timeline(msg: RedisMsg) -> Timeline { +// let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..]; +// let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap(); +// assert_eq!(tl, Timeline(User(Id(1)), Federated, All)); +// tl +// } +// fn parse_to_checked_event(msg: RedisMsg) -> EventKind { +// EventKind::TypeSafe(serde_json::from_str(msg.event_txt).unwrap()) +// } -fn parse_to_dyn_event(msg: RedisMsg) -> EventKind { - EventKind::Dynamic(serde_json::from_str(msg.event_txt).unwrap()) -} +// fn parse_to_dyn_event(msg: RedisMsg) -> EventKind { +// EventKind::Dynamic(serde_json::from_str(msg.event_txt).unwrap()) +// } -fn redis_msg_to_event_string(msg: RedisMsg) -> String { - msg.event_txt.to_string() -} +// fn redis_msg_to_event_string(msg: RedisMsg) -> String { +// msg.event_txt.to_string() +// } -fn string_to_checked_event(event_txt: &String) -> EventKind { - EventKind::TypeSafe(serde_json::from_str(event_txt).unwrap()) -} +// fn string_to_checked_event(event_txt: &String) -> EventKind { +// EventKind::TypeSafe(serde_json::from_str(event_txt).unwrap()) +// } fn criterion_benchmark(c: &mut Criterion) { let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS; @@ -46,25 +42,25 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| black_box(parse_long_redis_input(input))) }); - let msg = parse_long_redis_input(input); - group.bench_function("parse RedisMsg to Timeline", |b| { - b.iter(|| black_box(parse_to_timeline(msg.clone()))) - }); + // let msg = parse_long_redis_input(input); + // group.bench_function("parse RedisMsg to Timeline", |b| { + // b.iter(|| black_box(parse_to_timeline(msg.clone()))) + // }); - group.bench_function("parse RedisMsg -> DynamicEvent", |b| { - b.iter(|| black_box(parse_to_dyn_event(msg.clone()))) - }); + // group.bench_function("parse RedisMsg -> DynamicEvent", |b| { + // b.iter(|| black_box(parse_to_dyn_event(msg.clone()))) + // }); - group.bench_function("parse RedisMsg -> CheckedEvent", |b| { - b.iter(|| black_box(parse_to_checked_event(msg.clone()))) - }); + // group.bench_function("parse RedisMsg -> CheckedEvent", |b| { + // b.iter(|| black_box(parse_to_checked_event(msg.clone()))) + // }); - group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| { - b.iter(|| { - let txt = black_box(redis_msg_to_event_string(msg.clone())); - black_box(string_to_checked_event(&txt)); - }) - }); + // group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| { + // b.iter(|| { + // let txt = black_box(redis_msg_to_event_string(msg.clone())); + // black_box(string_to_checked_event(&txt)); + // }) + // }); } criterion_group!(benches, criterion_benchmark); diff --git a/src/err.rs b/src/err.rs index bb4cf61..e9120d0 100644 --- a/src/err.rs +++ b/src/err.rs @@ -13,12 +13,6 @@ pub enum Error { Config(config::Error), } -impl Error { - pub fn log(msg: impl fmt::Display) { - eprintln!("{}", msg); - } -} - impl std::error::Error for Error {} impl fmt::Debug for Error { diff --git a/src/main.rs b/src/main.rs index 3d151c1..87ae0a5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,8 @@ use flodgatt::request::{Handler, Subscription}; use flodgatt::response::{RedisManager, SseStream, WsStream}; use flodgatt::Error; -use futures::{future::lazy, stream::Stream as _}; +use futures::future::lazy; +use futures::stream::Stream as _; use std::fs; use std::net::SocketAddr; use std::os::unix::fs::PermissionsExt; @@ -61,10 +62,12 @@ fn main() -> Result<(), Error> { #[cfg(feature = "stub_status")] #[rustfmt::skip] let status = { - let (r1, r3) = (shared_manager.clone(), shared_manager.clone()); + let (r1, r2, r3) = (shared_manager.clone(), shared_manager.clone(), shared_manager.clone()); request.health().map(|| "OK") .or(request.status() .map(move || r1.lock().unwrap_or_else(RedisManager::recover).count())) + .or(request.status_backpresure() + .map(move || r2.lock().unwrap_or_else(RedisManager::recover).backpresure())) .or(request.status_per_timeline() .map(move || r3.lock().unwrap_or_else(RedisManager::recover).list())) }; @@ -80,10 +83,12 @@ fn main() -> Result<(), Error> { let manager = shared_manager.clone(); let stream = Interval::new(Instant::now(), poll_freq) .map_err(|e| log::error!("{}", e)) - .for_each(move |_| { - let mut manager = manager.lock().unwrap_or_else(RedisManager::recover); - manager.poll_broadcast().map_err(Error::log) - }); + .for_each( + move |_| match manager.lock().unwrap_or_else(RedisManager::recover).poll() { + Err(e) => Ok(log::error!("{}", e)), + Ok(_) => Ok(()), + }, + ); warp::spawn(lazy(move || stream)); warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err)) diff --git a/src/request.rs b/src/request.rs index 2b3e58a..bd5c435 100644 --- a/src/request.rs +++ b/src/request.rs @@ -118,6 +118,10 @@ impl Handler { warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline").boxed() } + pub fn status_backpresure(&self) -> BoxedFilter<()> { + warp::path!("api" / "v1" / "streaming" / "status" / "backpresure").boxed() + } + pub fn err(r: Rejection) -> std::result::Result { use StatusCode as Code; let (msg, code) = match &r.cause().map(|cause| cause.to_string()).as_deref() { diff --git a/src/request/timeline.rs b/src/request/timeline.rs index 154a839..87b96d0 100644 --- a/src/request/timeline.rs +++ b/src/request/timeline.rs @@ -35,7 +35,6 @@ impl Timeline { } pub(crate) fn to_redis_raw_timeline(&self, hashtag: Option<&String>) -> Result { - // TODO -- does this need to account for namespaces? use {Content::*, Error::*, Reach::*, Stream::*}; Ok(match self { diff --git a/src/response.rs b/src/response.rs index be209c3..85b2898 100644 --- a/src/response.rs +++ b/src/response.rs @@ -14,4 +14,4 @@ mod stream; pub use redis::Error; #[cfg(feature = "bench")] -pub use redis::msg::{RedisMsg, RedisParseOutput}; +pub use redis::{RedisMsg, RedisParseOutput}; diff --git a/src/response/redis.rs b/src/response/redis.rs index 3f44f4d..6ce2ea7 100644 --- a/src/response/redis.rs +++ b/src/response/redis.rs @@ -7,6 +7,9 @@ pub(self) use connection::RedisConn; pub use manager::Error; pub use manager::Manager; +#[cfg(feature = "bench")] +pub use msg::{RedisMsg, RedisParseOutput}; + use connection::RedisConnErr; use msg::RedisParseErr; @@ -16,44 +19,50 @@ enum RedisCmd { } impl RedisCmd { - fn into_sendable(self, tl: &str) -> (Vec, Vec) { + fn into_sendable(self, timelines: &[String]) -> (Vec, Vec) { match self { - RedisCmd::Subscribe => ( - [ - b"*2\r\n$9\r\nsubscribe\r\n$", - tl.len().to_string().as_bytes(), - b"\r\n", - tl.as_bytes(), - b"\r\n", - ] - .concat(), - [ - b"*3\r\n$3\r\nSET\r\n$", - (tl.len() + "subscribed:".len()).to_string().as_bytes(), - b"\r\nsubscribed:", - tl.to_string().as_bytes(), - b"\r\n$1\r\n1\r\n", - ] - .concat(), - ), - RedisCmd::Unsubscribe => ( - [ - b"*2\r\n$11\r\nunsubscribe\r\n$", - tl.len().to_string().as_bytes(), - b"\r\n", - tl.as_bytes(), - b"\r\n", - ] - .concat(), - [ - b"*3\r\n$3\r\nSET\r\n$", - (tl.len() + "subscribed:".len()).to_string().as_bytes(), - b"\r\nsubscribed:", - tl.to_string().as_bytes(), - b"\r\n$1\r\n0\r\n", - ] - .concat(), - ), + RedisCmd::Subscribe => { + let primary = { + let mut cmd = format!("*{}\r\n$9\r\nsubscribe\r\n", 1 + timelines.len()); + for tl in timelines { + cmd.push_str(&format!("${}\r\n{}\r\n", tl.len(), tl)); + } + cmd + }; + let secondary = { + let mut cmd = format!("*{}\r\n$4\r\nMSET\r\n", 1 + timelines.len()); + for tl in timelines { + cmd.push_str(&format!( + "${}\r\nsubscribed:{}\r\n$1\r\n$1\r\n", + "subscribed:".len() + tl.len(), + tl + )); + } + cmd + }; + (primary.as_bytes().to_vec(), secondary.as_bytes().to_vec()) + } + RedisCmd::Unsubscribe => { + let primary = { + let mut cmd = format!("*{}\r\n$11\r\nunsubscribe\r\n", 1 + timelines.len()); + for tl in timelines { + cmd.push_str(&format!("${}\r\n{}\r\n", tl.len(), tl)); + } + cmd + }; + let secondary = { + let mut cmd = format!("*{}\r\n$4\r\nMSET\r\n", 1 + timelines.len()); + for tl in timelines { + cmd.push_str(&format!( + "${}\r\nsubscribed:{}\r\n$1\r\n$0\r\n", + "subscribed:".len() + tl.len(), + tl + )); + } + cmd + }; + (primary.as_bytes().to_vec(), secondary.as_bytes().to_vec()) + } } } } diff --git a/src/response/redis/connection.rs b/src/response/redis/connection.rs index 49c8cfe..c7edbf4 100644 --- a/src/response/redis/connection.rs +++ b/src/response/redis/connection.rs @@ -1,19 +1,15 @@ mod err; pub(crate) use err::RedisConnErr; -use super::msg::{RedisParseErr, RedisParseOutput}; use super::Error as ManagerErr; -use super::Event; use super::RedisCmd; use crate::config::Redis; use crate::request::Timeline; use futures::{Async, Poll}; use lru::LruCache; -use std::convert::{TryFrom, TryInto}; use std::io::{self, Read, Write}; use std::net::TcpStream; -use std::str; use std::time::Duration; type Result = std::result::Result; @@ -22,11 +18,13 @@ type Result = std::result::Result; pub(super) struct RedisConn { primary: TcpStream, secondary: TcpStream, - redis_namespace: Option, - tag_id_cache: LruCache, + pub(super) namespace: Option, + // TODO: eventually, it might make sense to have Mastodon publish to timelines with + // the tag number instead of the tag name. This would save us from dealing + // with a cache here and would be consistent with how lists/users are handled. + pub(super) tag_id_cache: LruCache, tag_name_cache: LruCache, - redis_input: Vec, - cursor: usize, + pub(super) input: Vec, } impl RedisConn { @@ -36,79 +34,32 @@ impl RedisConn { let conn = Self::new_connection(&addr, redis_cfg.password.as_ref())?; conn.set_nonblocking(true) .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - let redis_conn = Self { + Ok(Self { primary: conn, secondary: Self::new_connection(&addr, redis_cfg.password.as_ref())?, tag_id_cache: LruCache::new(1000), tag_name_cache: LruCache::new(1000), - // TODO: eventually, it might make sense to have Mastodon publish to timelines with - // the tag number instead of the tag name. This would save us from dealing - // with a cache here and would be consistent with how lists/users are handled. - redis_namespace: redis_cfg.namespace.clone().0, - redis_input: vec![0_u8; 5000], - cursor: 0, - }; - Ok(redis_conn) + namespace: redis_cfg.namespace.clone().0, + input: vec![0; 4096 * 4], + }) } - - pub(super) fn poll_redis(&mut self) -> Poll, ManagerErr> { - loop { - match self.primary.read(&mut self.redis_input[self.cursor..]) { - Ok(n) => { - self.cursor += n; - if self.redis_input.len() - 1 == self.cursor { - self.redis_input.resize(self.redis_input.len() * 2, 0); - } else { - break; - } - } - Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => { - break; - } - Err(e) => break log::error!("{}", e), - }; + pub(super) fn poll_redis(&mut self, start: usize) -> Poll { + const BLOCK: usize = 4096 * 2; + if self.input.len() < start + BLOCK { + self.input.resize(self.input.len() * 2, 0); + log::info!("Resizing input buffer to {} KiB.", self.input.len() / 1024); + // log::info!("Current buffer: {}", String::from_utf8_lossy(&self.input)); } - // at this point, we have the raw bytes; now, parse a msg - let input = &self.redis_input[..self.cursor]; - - let (input, invalid_bytes) = str::from_utf8(&input) - .map(|input| (input, "".as_bytes())) - .unwrap_or_else(|e| { - let (valid, invalid) = input.split_at(e.valid_up_to()); - (str::from_utf8(valid).expect("Guaranteed by ^^^^"), invalid) - }); - - use {Async::*, RedisParseOutput::*}; - let (res, leftover) = match RedisParseOutput::try_from(input) { - Ok(Msg(msg)) => match &self.redis_namespace { - Some(ns) if msg.timeline_txt.starts_with(&[ns, ":timeline:"].concat()) => { - let trimmed_tl = &msg.timeline_txt[ns.len() + ":timeline:".len()..]; - let tl = Timeline::from_redis_text(trimmed_tl, &mut self.tag_id_cache)?; - let event = msg.event_txt.try_into()?; - (Ok(Ready(Some((tl, event)))), (msg.leftover_input)) - } - None => { - let trimmed_tl = &msg.timeline_txt["timeline:".len()..]; - let tl = Timeline::from_redis_text(trimmed_tl, &mut self.tag_id_cache)?; - let event = msg.event_txt.try_into()?; - (Ok(Ready(Some((tl, event)))), (msg.leftover_input)) - } - Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input), - }, - - Ok(NonMsg(leftover)) => (Ok(Ready(None)), leftover), - Err(RedisParseErr::Incomplete) => (Ok(NotReady), input), - Err(other_parse_err) => (Err(ManagerErr::RedisParseErr(other_parse_err)), input), - }; - - // Store leftover in same buffer and set cursor to start after leftover next time - self.cursor = 0; - for byte in &[leftover.as_bytes(), invalid_bytes].concat() { - self.redis_input[self.cursor] = *byte; - self.cursor += 1; + use Async::*; + match self.primary.read(&mut self.input[start..start + BLOCK]) { + Ok(n) => Ok(Ready(n)), + Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => Ok(NotReady), + Err(e) => { + Ready(log::error!("{}", e)); + Ok(Ready(0)) + } } - res } pub(super) fn update_cache(&mut self, hashtag: String, id: i64) { @@ -116,15 +67,20 @@ impl RedisConn { self.tag_name_cache.put(id, hashtag); } - pub(crate) fn send_cmd(&mut self, cmd: RedisCmd, timeline: &Timeline) -> Result<()> { - let namespace = self.redis_namespace.take(); - let hashtag = timeline.tag().and_then(|id| self.tag_name_cache.get(&id)); - let tl = match &namespace { - Some(ns) => format!("{}:{}", ns, timeline.to_redis_raw_timeline(hashtag)?), - None => timeline.to_redis_raw_timeline(hashtag)?, - }; + pub(crate) fn send_cmd(&mut self, cmd: RedisCmd, timelines: &[Timeline]) -> Result<()> { + let namespace = self.namespace.take(); + let timelines: Result> = timelines + .iter() + .map(|tl| { + let hashtag = tl.tag().and_then(|id| self.tag_name_cache.get(&id)); + match &namespace { + Some(ns) => Ok(format!("{}:{}", ns, tl.to_redis_raw_timeline(hashtag)?)), + None => Ok(tl.to_redis_raw_timeline(hashtag)?), + } + }) + .collect(); - let (primary_cmd, secondary_cmd) = cmd.into_sendable(&tl); + let (primary_cmd, secondary_cmd) = cmd.into_sendable(&timelines?[..]); self.primary.write_all(&primary_cmd)?; // We also need to set a key to tell the Puma server that we've subscribed or @@ -145,6 +101,7 @@ impl RedisConn { Self::validate_connection(&mut conn, &addr)?; conn.set_read_timeout(Some(Duration::from_millis(10))) .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + Self::set_connection_name(&mut conn, &addr)?; Ok(conn) } @@ -172,14 +129,27 @@ impl RedisConn { fn validate_connection(conn: &mut TcpStream, addr: &str) -> Result<()> { conn.write_all(b"PING\r\n") .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - let mut buffer = vec![0_u8; 7]; - conn.read_exact(&mut buffer) + let mut buffer = vec![0_u8; 100]; + conn.read(&mut buffer) .map_err(|e| RedisConnErr::with_addr(&addr, e))?; let reply = String::from_utf8_lossy(&buffer); match &*reply { - "+PONG\r\n" => Ok(()), - "-NOAUTH" => Err(RedisConnErr::MissingPassword), - "HTTP/1." => Err(RedisConnErr::NotRedis(addr.to_string())), + r if r.starts_with("+PONG\r\n") => Ok(()), + r if r.starts_with("-NOAUTH") => Err(RedisConnErr::MissingPassword), + r if r.starts_with("HTTP/1.") => Err(RedisConnErr::NotRedis(addr.to_string())), + _ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())), + } + } + + fn set_connection_name(conn: &mut TcpStream, addr: &str) -> Result<()> { + conn.write_all(b"*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$8\r\nflodgatt\r\n") + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + let mut buffer = vec![0_u8; 100]; + conn.read(&mut buffer) + .map_err(|e| RedisConnErr::with_addr(&addr, e))?; + let reply = String::from_utf8_lossy(&buffer); + match &*reply { + r if r.starts_with("+OK\r\n") => Ok(()), _ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())), } } diff --git a/src/response/redis/connection/err.rs b/src/response/redis/connection/err.rs index 893dd30..da98940 100644 --- a/src/response/redis/connection/err.rs +++ b/src/response/redis/connection/err.rs @@ -31,7 +31,7 @@ impl fmt::Display for RedisConnErr { addr, inner ), InvalidRedisReply(unexpected_reply) => format!( - "Received and unexpected reply from Redis. Expected `+PONG` reply but got `{}`", + "Received and unexpected reply from Redis: `{}`", unexpected_reply ), UnknownRedisErr(io_err) => { diff --git a/src/response/redis/manager.rs b/src/response/redis/manager.rs index 4a3d3f1..ceaf243 100644 --- a/src/response/redis/manager.rs +++ b/src/response/redis/manager.rs @@ -4,37 +4,111 @@ mod err; pub use err::Error; -use super::Event; -use super::{RedisCmd, RedisConn}; +use super::msg::{RedisParseErr, RedisParseOutput}; +use super::{Event, RedisCmd, RedisConn}; use crate::config; use crate::request::{Subscription, Timeline}; pub(self) use super::EventErr; -use futures::Async; +use futures::{Async, Poll, Stream}; use hashbrown::{HashMap, HashSet}; +use std::convert::{TryFrom, TryInto}; +use std::str; use std::sync::{Arc, Mutex, MutexGuard, PoisonError}; use std::time::{Duration, Instant}; use tokio::sync::mpsc::Sender; type Result = std::result::Result; +type EventChannel = Sender>; /// The item that streams from Redis and is polled by the `ClientAgent` pub struct Manager { - redis_connection: RedisConn, - timelines: HashMap>>>, + redis_conn: RedisConn, + timelines: HashMap>, ping_time: Instant, channel_id: u32, + unread_idx: (usize, usize), +} + +impl Stream for Manager { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll, Error> { + if self.ping_time.elapsed() > Duration::from_secs(30) { + self.send_pings()? + } + + while let Async::Ready(msg_len) = self.redis_conn.poll_redis(self.unread_idx.1)? { + self.unread_idx.1 += msg_len; + let input = &self.redis_conn.input[..self.unread_idx.1]; + let mut unread = str::from_utf8(input).unwrap_or_else(|e| { + str::from_utf8(input.split_at(e.valid_up_to()).0).expect("guaranteed by `split_at`") + }); + + while !unread.is_empty() { + let tag_id_cache = &mut self.redis_conn.tag_id_cache; + let redis_namespace = &self.redis_conn.namespace; + + use {Error::InvalidId, RedisParseOutput::*}; + unread = match RedisParseOutput::try_from(unread) { + Ok(Msg(msg)) => { + let trimmed_tl = match redis_namespace { + Some(ns) if msg.timeline_txt.starts_with(ns) => { + Some(&msg.timeline_txt[ns.len() + ":timeline:".len()..]) + } + None => Some(&msg.timeline_txt["timeline:".len()..]), + Some(_non_matching_ns) => None, + }; + + if let Some(trimmed_tl) = trimmed_tl { + let tl = Timeline::from_redis_text(trimmed_tl, tag_id_cache)?; + let event: Arc = Arc::new(msg.event_txt.try_into()?); + let channels = self.timelines.get_mut(&tl).ok_or(InvalidId)?; + for (_id, channel) in channels { + if let Ok(Async::NotReady) = channel.poll_ready() { + log::warn!("{:?} channel full", tl); + return Ok(Async::NotReady); + } + let _ = channel.try_send(event.clone()); // err just means channel will be closed + } + } else { + // skip messages for different Redis namespaces + } + msg.leftover_input + } + Ok(NonMsg(leftover_input)) => leftover_input, + Err(RedisParseErr::Incomplete) => { + log::info!("Copying partial message"); + let (read, unread) = self.redis_conn.input[..self.unread_idx.1] + .split_at_mut(self.unread_idx.0); + for (i, b) in unread.iter().enumerate() { + read[i] = *b; + } + self.unread_idx = (0, unread.len()); + break; + } + Err(e) => Err(e)?, + }; + self.unread_idx.0 = self.unread_idx.1 - unread.len(); + } + + self.unread_idx = (0, 0) // reaching here means last msg was complete; reuse the full buffer + } + Ok(Async::Ready(Some(()))) + } } impl Manager { /// Create a new `Manager`, with its own Redis connections (but no active subscriptions). pub fn try_from(redis_cfg: &config::Redis) -> Result { Ok(Self { - redis_connection: RedisConn::new(redis_cfg)?, + redis_conn: RedisConn::new(redis_cfg)?, timelines: HashMap::new(), ping_time: Instant::now(), channel_id: 0, + unread_idx: (0, 0), }) } @@ -42,10 +116,10 @@ impl Manager { Arc::new(Mutex::new(self)) } - pub fn subscribe(&mut self, subscription: &Subscription, channel: Sender>) { + pub fn subscribe(&mut self, subscription: &Subscription, channel: EventChannel) { let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline); if let (Some(hashtag), Some(id)) = (tag, tl.tag()) { - self.redis_connection.update_cache(hashtag, id); + self.redis_conn.update_cache(hashtag, id); }; let channels = self.timelines.entry(tl).or_default(); @@ -53,66 +127,37 @@ impl Manager { self.channel_id += 1; if channels.len() == 1 { - self.redis_connection - .send_cmd(RedisCmd::Subscribe, &tl) + self.redis_conn + .send_cmd(RedisCmd::Subscribe, &[tl]) .unwrap_or_else(|e| log::error!("Could not subscribe to the Redis channel: {}", e)); + log::info!("Subscribed to {:?}", tl); }; } - pub(crate) fn unsubscribe(&mut self, tl: &Timeline) -> Result<()> { - self.redis_connection.send_cmd(RedisCmd::Unsubscribe, &tl)?; - self.timelines.remove(&tl); - Ok(log::info!("Ended stream for {:?}", tl)) - } + fn send_pings(&mut self) -> Result<()> { + // NOTE: this takes two cycles to close a connection after the client times out: on + // the first cycle, this successfully sends the Event to the response::Ws thread but + // that thread fatally errors sending to the client. On the *second* cycle, this + // gets the error. This isn't ideal, but is harmless. - pub fn poll_broadcast(&mut self) -> Result<()> { - let mut completed_timelines = HashSet::new(); - let log_send_err = |tl, e| Some(log::error!("cannot send to {:?}: {}", tl, e)).is_some(); + self.ping_time = Instant::now(); + let mut subscriptions_to_close = HashSet::new(); + self.timelines.retain(|tl, channels| { + channels.retain(|_, chan| chan.try_send(Arc::new(Event::Ping)).is_ok()); - if self.ping_time.elapsed() > Duration::from_secs(30) { - self.ping_time = Instant::now(); - for (tl, channels) in self.timelines.iter_mut() { - channels.retain(|_, chan| match chan.try_send(Arc::new(Event::Ping)) { - Ok(()) => true, - Err(e) if !e.is_closed() => log_send_err(*tl, e), - Err(_) => false, - }); - - // NOTE: this takes two cycles to close a connection after the client - // times out: on the first cycle, this fn sends the Event to the - // response::Ws thread without any error, but that thread encounters an - // error sending to the client and ends. On the *second* cycle, this fn - // gets the error it's waiting on to clean up the connection. This isn't - // ideal, but is harmless, since the only reason we haven't cleaned up the - // connection is that no messages are being sent to that client. - if channels.is_empty() { - completed_timelines.insert(*tl); - } + if channels.is_empty() { + subscriptions_to_close.insert(*tl); + false + } else { + true } - }; - - loop { - match self.redis_connection.poll_redis() { - Ok(Async::NotReady) => break, - Ok(Async::Ready(Some((tl, event)))) => { - let sendable_event = Arc::new(event); - let channels = self.timelines.get_mut(&tl).ok_or(Error::InvalidId)?; - channels.retain(|_, chan| match chan.try_send(sendable_event.clone()) { - Ok(()) => true, - Err(e) if !e.is_closed() => log_send_err(tl, e), - Err(_) => false, - }); - if channels.is_empty() { - completed_timelines.insert(tl); - } - } - Ok(Async::Ready(None)) => (), // cmd or msg for other namespace - Err(err) => log::error!("{}", err), // drop msg, log err, and proceed - } - } - - for tl in &mut completed_timelines.iter() { - self.unsubscribe(tl)?; + }); + if !subscriptions_to_close.is_empty() { + let timelines: Vec<_> = subscriptions_to_close.into_iter().collect(); + &self + .redis_conn + .send_cmd(RedisCmd::Unsubscribe, &timelines[..])?; + log::info!("Unsubscribed from {:?}", timelines); } Ok(()) } @@ -129,6 +174,13 @@ impl Manager { ) } + pub fn backpresure(&self) -> String { + format!( + "Input buffer size: {} KiB", + (self.unread_idx.1 - self.unread_idx.0) / 1024 + ) + } + pub fn list(&self) -> String { let max_len = self .timelines diff --git a/src/response/redis/manager/err.rs b/src/response/redis/manager/err.rs index 9f7dcad..b4dc064 100644 --- a/src/response/redis/manager/err.rs +++ b/src/response/redis/manager/err.rs @@ -1,18 +1,18 @@ use super::super::{RedisConnErr, RedisParseErr}; use super::{Event, EventErr}; -use crate::request::{Timeline, TimelineErr}; +use crate::request::TimelineErr; use std::fmt; +use std::sync::Arc; + #[derive(Debug)] pub enum Error { InvalidId, - TimelineErr(TimelineErr), EventErr(EventErr), RedisParseErr(RedisParseErr), RedisConnErr(RedisConnErr), - ChannelSendErr(tokio::sync::watch::error::SendError<(Timeline, Event)>), - ChannelSendErr2(tokio::sync::mpsc::error::UnboundedTrySendError), + ChannelSendErr(tokio::sync::mpsc::error::TrySendError>), } impl std::error::Error for Error {} @@ -30,22 +30,16 @@ impl fmt::Display for Error { RedisConnErr(inner) => write!(f, "{}", inner), TimelineErr(inner) => write!(f, "{}", inner), ChannelSendErr(inner) => write!(f, "{}", inner), - ChannelSendErr2(inner) => write!(f, "{}", inner), }?; Ok(()) } } -impl From> for Error { - fn from(error: tokio::sync::watch::error::SendError<(Timeline, Event)>) -> Self { +impl From>> for Error { + fn from(error: tokio::sync::mpsc::error::TrySendError>) -> Self { Self::ChannelSendErr(error) } } -impl From> for Error { - fn from(error: tokio::sync::mpsc::error::UnboundedTrySendError) -> Self { - Self::ChannelSendErr2(error) - } -} impl From for Error { fn from(error: EventErr) -> Self { diff --git a/src/response/redis/msg.rs b/src/response/redis/msg.rs index bfa2344..8af186b 100644 --- a/src/response/redis/msg.rs +++ b/src/response/redis/msg.rs @@ -82,8 +82,11 @@ fn utf8_to_redis_data<'a>(s: &'a str) -> Result<(RedisData, &'a str), RedisParse fn after_newline_at(s: &str, start: usize) -> RedisParser<&str> { let s = s.get(start..).ok_or(Incomplete)?; + if s.len() < 2 { + Err(Incomplete)?; + } if !s.starts_with("\r\n") { - return Err(RedisParseErr::InvalidLineEnd); + Err(InvalidLineEnd)?; } Ok(s.get("\r\n".len()..).ok_or(Incomplete)?) } diff --git a/src/response/stream/ws.rs b/src/response/stream/ws.rs index e5b1227..f350def 100644 --- a/src/response/stream/ws.rs +++ b/src/response/stream/ws.rs @@ -50,9 +50,12 @@ impl Ws { e => log::warn!("WebSocket send error: {}", e), }) } - fn filtered(&mut self, update: &impl Payload) -> bool { + fn filtered(&mut self, update: &T) -> bool { let (blocks, allowed_langs) = (&self.0.blocks, &self.0.allowed_langs); - let skip = |msg| Some(log::info!("{:?} msg skipped - {}", self.0.timeline, msg)).is_some(); + let skip = |msg| { + // Some(log::info!("{:?} msg skipped - {}\n{:?}", self.0.timeline, msg, update)).is_some() + Some(log::info!("{:?} msg skipped - {}", self.0.timeline, msg)).is_some() + }; match self.0.timeline { tl if tl.is_public()