Implement faster buffered input

This commit implements a modified ring buffer for input from Redis.
Specifically, Flodgatt now limits the amount of data it fetches from
Redis in one syscall to 8 KiB (two pages on most systems). Flodgatt
will process all complete messages it receives from Redis and then
re-use the same buffer for the next time it retrieves data.  If
Flodgatt received a partial message, it will copy the partial message
to the beginning of the buffer before its next read.

This change has little effect on Flodgatt under light load (because it
was rare for Redis to have more than 8 KiB of messages available at
any one time).  However, my hope is that this will significantly
reduce memory use on the largest instances.
This commit is contained in:
Daniel Sockwell 2020-04-25 16:40:37 -04:00
parent d8b07b4b03
commit 3782ab13a3
12 changed files with 250 additions and 234 deletions

2
Cargo.lock generated
View File

@ -416,7 +416,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "flodgatt" name = "flodgatt"
version = "0.9.4" version = "0.9.5"
dependencies = [ dependencies = [
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -1,7 +1,7 @@
[package] [package]
name = "flodgatt" name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.9.4" version = "0.9.5"
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"] authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018" edition = "2018"

View File

@ -1,9 +1,5 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion}; use criterion::{black_box, criterion_group, criterion_main, Criterion};
use flodgatt::{ use flodgatt::response::{RedisMsg, RedisParseOutput};
event::*,
request::{Content::*, Reach::*, Stream::*, Timeline},
response::{RedisMsg, RedisParseOutput},
};
use lru::LruCache; use lru::LruCache;
use std::convert::TryFrom; use std::convert::TryFrom;
@ -16,27 +12,27 @@ fn parse_long_redis_input<'a>(input: &'a str) -> RedisMsg<'a> {
} }
} }
fn parse_to_timeline(msg: RedisMsg) -> Timeline { // fn parse_to_timeline(msg: RedisMsg) -> Timeline {
let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..]; // let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..];
let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap(); // let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap();
assert_eq!(tl, Timeline(User(Id(1)), Federated, All)); // assert_eq!(tl, Timeline(User(Id(1)), Federated, All));
tl // tl
} // }
fn parse_to_checked_event(msg: RedisMsg) -> EventKind { // fn parse_to_checked_event(msg: RedisMsg) -> EventKind {
EventKind::TypeSafe(serde_json::from_str(msg.event_txt).unwrap()) // EventKind::TypeSafe(serde_json::from_str(msg.event_txt).unwrap())
} // }
fn parse_to_dyn_event(msg: RedisMsg) -> EventKind { // fn parse_to_dyn_event(msg: RedisMsg) -> EventKind {
EventKind::Dynamic(serde_json::from_str(msg.event_txt).unwrap()) // EventKind::Dynamic(serde_json::from_str(msg.event_txt).unwrap())
} // }
fn redis_msg_to_event_string(msg: RedisMsg) -> String { // fn redis_msg_to_event_string(msg: RedisMsg) -> String {
msg.event_txt.to_string() // msg.event_txt.to_string()
} // }
fn string_to_checked_event(event_txt: &String) -> EventKind { // fn string_to_checked_event(event_txt: &String) -> EventKind {
EventKind::TypeSafe(serde_json::from_str(event_txt).unwrap()) // EventKind::TypeSafe(serde_json::from_str(event_txt).unwrap())
} // }
fn criterion_benchmark(c: &mut Criterion) { fn criterion_benchmark(c: &mut Criterion) {
let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS; let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS;
@ -46,25 +42,25 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| black_box(parse_long_redis_input(input))) b.iter(|| black_box(parse_long_redis_input(input)))
}); });
let msg = parse_long_redis_input(input); // let msg = parse_long_redis_input(input);
group.bench_function("parse RedisMsg to Timeline", |b| { // group.bench_function("parse RedisMsg to Timeline", |b| {
b.iter(|| black_box(parse_to_timeline(msg.clone()))) // b.iter(|| black_box(parse_to_timeline(msg.clone())))
}); // });
group.bench_function("parse RedisMsg -> DynamicEvent", |b| { // group.bench_function("parse RedisMsg -> DynamicEvent", |b| {
b.iter(|| black_box(parse_to_dyn_event(msg.clone()))) // b.iter(|| black_box(parse_to_dyn_event(msg.clone())))
}); // });
group.bench_function("parse RedisMsg -> CheckedEvent", |b| { // group.bench_function("parse RedisMsg -> CheckedEvent", |b| {
b.iter(|| black_box(parse_to_checked_event(msg.clone()))) // b.iter(|| black_box(parse_to_checked_event(msg.clone())))
}); // });
group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| { // group.bench_function("parse RedisMsg -> String -> CheckedEvent", |b| {
b.iter(|| { // b.iter(|| {
let txt = black_box(redis_msg_to_event_string(msg.clone())); // let txt = black_box(redis_msg_to_event_string(msg.clone()));
black_box(string_to_checked_event(&txt)); // black_box(string_to_checked_event(&txt));
}) // })
}); // });
} }
criterion_group!(benches, criterion_benchmark); criterion_group!(benches, criterion_benchmark);

View File

@ -15,7 +15,7 @@ pub enum Error {
impl Error { impl Error {
pub fn log(msg: impl fmt::Display) { pub fn log(msg: impl fmt::Display) {
eprintln!("{}", msg); eprintln!("Error: {}", msg);
} }
} }

View File

@ -35,7 +35,6 @@ impl Timeline {
} }
pub(crate) fn to_redis_raw_timeline(&self, hashtag: Option<&String>) -> Result<String> { pub(crate) fn to_redis_raw_timeline(&self, hashtag: Option<&String>) -> Result<String> {
// TODO -- does this need to account for namespaces?
use {Content::*, Error::*, Reach::*, Stream::*}; use {Content::*, Error::*, Reach::*, Stream::*};
Ok(match self { Ok(match self {

View File

@ -14,4 +14,4 @@ mod stream;
pub use redis::Error; pub use redis::Error;
#[cfg(feature = "bench")] #[cfg(feature = "bench")]
pub use redis::msg::{RedisMsg, RedisParseOutput}; pub use redis::{RedisMsg, RedisParseOutput};

View File

@ -7,6 +7,9 @@ pub(self) use connection::RedisConn;
pub use manager::Error; pub use manager::Error;
pub use manager::Manager; pub use manager::Manager;
#[cfg(feature = "bench")]
pub use msg::{RedisMsg, RedisParseOutput};
use connection::RedisConnErr; use connection::RedisConnErr;
use msg::RedisParseErr; use msg::RedisParseErr;
@ -16,44 +19,50 @@ enum RedisCmd {
} }
impl RedisCmd { impl RedisCmd {
fn into_sendable(self, tl: &str) -> (Vec<u8>, Vec<u8>) { fn into_sendable(self, timelines: &[String]) -> (Vec<u8>, Vec<u8>) {
match self { match self {
RedisCmd::Subscribe => ( RedisCmd::Subscribe => {
[ let primary = {
b"*2\r\n$9\r\nsubscribe\r\n$", let mut cmd = format!("*{}\r\n$9\r\nsubscribe\r\n", 1 + timelines.len());
tl.len().to_string().as_bytes(), for tl in timelines {
b"\r\n", cmd.push_str(&format!("${}\r\n{}\r\n", tl.len(), tl));
tl.as_bytes(), }
b"\r\n", cmd
] };
.concat(), let secondary = {
[ let mut cmd = format!("*{}\r\n$4\r\nMSET\r\n", 1 + timelines.len());
b"*3\r\n$3\r\nSET\r\n$", for tl in timelines {
(tl.len() + "subscribed:".len()).to_string().as_bytes(), cmd.push_str(&format!(
b"\r\nsubscribed:", "${}\r\nsubscribed:{}\r\n$1\r\n$1\r\n",
tl.to_string().as_bytes(), "subscribed:".len() + tl.len(),
b"\r\n$1\r\n1\r\n", tl
] ));
.concat(), }
), cmd
RedisCmd::Unsubscribe => ( };
[ (primary.as_bytes().to_vec(), secondary.as_bytes().to_vec())
b"*2\r\n$11\r\nunsubscribe\r\n$", }
tl.len().to_string().as_bytes(), RedisCmd::Unsubscribe => {
b"\r\n", let primary = {
tl.as_bytes(), let mut cmd = format!("*{}\r\n$11\r\nunsubscribe\r\n", 1 + timelines.len());
b"\r\n", for tl in timelines {
] cmd.push_str(&format!("${}\r\n{}\r\n", tl.len(), tl));
.concat(), }
[ cmd
b"*3\r\n$3\r\nSET\r\n$", };
(tl.len() + "subscribed:".len()).to_string().as_bytes(), let secondary = {
b"\r\nsubscribed:", let mut cmd = format!("*{}\r\n$4\r\nMSET\r\n", 1 + timelines.len());
tl.to_string().as_bytes(), for tl in timelines {
b"\r\n$1\r\n0\r\n", cmd.push_str(&format!(
] "${}\r\nsubscribed:{}\r\n$1\r\n$0\r\n",
.concat(), "subscribed:".len() + tl.len(),
), tl
));
}
cmd
};
(primary.as_bytes().to_vec(), secondary.as_bytes().to_vec())
}
} }
} }
} }

View File

@ -1,19 +1,15 @@
mod err; mod err;
pub(crate) use err::RedisConnErr; pub(crate) use err::RedisConnErr;
use super::msg::{RedisParseErr, RedisParseOutput};
use super::Error as ManagerErr; use super::Error as ManagerErr;
use super::Event;
use super::RedisCmd; use super::RedisCmd;
use crate::config::Redis; use crate::config::Redis;
use crate::request::Timeline; use crate::request::Timeline;
use futures::{Async, Poll}; use futures::{Async, Poll};
use lru::LruCache; use lru::LruCache;
use std::convert::{TryFrom, TryInto};
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::net::TcpStream; use std::net::TcpStream;
use std::str;
use std::time::Duration; use std::time::Duration;
type Result<T> = std::result::Result<T, RedisConnErr>; type Result<T> = std::result::Result<T, RedisConnErr>;
@ -22,11 +18,13 @@ type Result<T> = std::result::Result<T, RedisConnErr>;
pub(super) struct RedisConn { pub(super) struct RedisConn {
primary: TcpStream, primary: TcpStream,
secondary: TcpStream, secondary: TcpStream,
redis_namespace: Option<String>, pub(super) namespace: Option<String>,
tag_id_cache: LruCache<String, i64>, // TODO: eventually, it might make sense to have Mastodon publish to timelines with
// the tag number instead of the tag name. This would save us from dealing
// with a cache here and would be consistent with how lists/users are handled.
pub(super) tag_id_cache: LruCache<String, i64>,
tag_name_cache: LruCache<i64, String>, tag_name_cache: LruCache<i64, String>,
redis_input: Vec<u8>, pub(super) input: Vec<u8>,
cursor: usize,
} }
impl RedisConn { impl RedisConn {
@ -36,79 +34,31 @@ impl RedisConn {
let conn = Self::new_connection(&addr, redis_cfg.password.as_ref())?; let conn = Self::new_connection(&addr, redis_cfg.password.as_ref())?;
conn.set_nonblocking(true) conn.set_nonblocking(true)
.map_err(|e| RedisConnErr::with_addr(&addr, e))?; .map_err(|e| RedisConnErr::with_addr(&addr, e))?;
let redis_conn = Self { Ok(Self {
primary: conn, primary: conn,
secondary: Self::new_connection(&addr, redis_cfg.password.as_ref())?, secondary: Self::new_connection(&addr, redis_cfg.password.as_ref())?,
tag_id_cache: LruCache::new(1000), tag_id_cache: LruCache::new(1000),
tag_name_cache: LruCache::new(1000), tag_name_cache: LruCache::new(1000),
// TODO: eventually, it might make sense to have Mastodon publish to timelines with namespace: redis_cfg.namespace.clone().0,
// the tag number instead of the tag name. This would save us from dealing input: vec![47_u8; 10_000], // TODO - set to something reasonable
// with a cache here and would be consistent with how lists/users are handled. })
redis_namespace: redis_cfg.namespace.clone().0,
redis_input: vec![0_u8; 5000],
cursor: 0,
};
Ok(redis_conn)
} }
pub(super) fn poll_redis(&mut self, start: usize) -> Poll<usize, ManagerErr> {
pub(super) fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, ManagerErr> { const BLOCK: usize = 8192;
loop { if self.input.len() <= start + BLOCK {
match self.primary.read(&mut self.redis_input[self.cursor..]) { self.input.resize(self.input.len() * 2, 0);
Ok(n) => { log::info!("Resizing input buffer. (Old input was {} bytes)", start);
self.cursor += n;
if self.redis_input.len() - 1 == self.cursor {
self.redis_input.resize(self.redis_input.len() * 2, 0);
} else {
break;
}
}
Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => {
break;
}
Err(e) => break log::error!("{}", e),
};
} }
// at this point, we have the raw bytes; now, parse a msg use Async::*;
let input = &self.redis_input[..self.cursor]; match self.primary.read(&mut self.input[start..start + BLOCK]) {
Ok(n) => Ok(Ready(n)),
let (input, invalid_bytes) = str::from_utf8(&input) Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => Ok(NotReady),
.map(|input| (input, "".as_bytes())) Err(e) => {
.unwrap_or_else(|e| { Ready(log::error!("{}", e));
let (valid, invalid) = input.split_at(e.valid_up_to()); Ok(Ready(0))
(str::from_utf8(valid).expect("Guaranteed by ^^^^"), invalid) }
});
use {Async::*, RedisParseOutput::*};
let (res, leftover) = match RedisParseOutput::try_from(input) {
Ok(Msg(msg)) => match &self.redis_namespace {
Some(ns) if msg.timeline_txt.starts_with(&[ns, ":timeline:"].concat()) => {
let trimmed_tl = &msg.timeline_txt[ns.len() + ":timeline:".len()..];
let tl = Timeline::from_redis_text(trimmed_tl, &mut self.tag_id_cache)?;
let event = msg.event_txt.try_into()?;
(Ok(Ready(Some((tl, event)))), (msg.leftover_input))
}
None => {
let trimmed_tl = &msg.timeline_txt["timeline:".len()..];
let tl = Timeline::from_redis_text(trimmed_tl, &mut self.tag_id_cache)?;
let event = msg.event_txt.try_into()?;
(Ok(Ready(Some((tl, event)))), (msg.leftover_input))
}
Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input),
},
Ok(NonMsg(leftover)) => (Ok(Ready(None)), leftover),
Err(RedisParseErr::Incomplete) => (Ok(NotReady), input),
Err(other_parse_err) => (Err(ManagerErr::RedisParseErr(other_parse_err)), input),
};
// Store leftover in same buffer and set cursor to start after leftover next time
self.cursor = 0;
for byte in &[leftover.as_bytes(), invalid_bytes].concat() {
self.redis_input[self.cursor] = *byte;
self.cursor += 1;
} }
res
} }
pub(super) fn update_cache(&mut self, hashtag: String, id: i64) { pub(super) fn update_cache(&mut self, hashtag: String, id: i64) {
@ -116,15 +66,20 @@ impl RedisConn {
self.tag_name_cache.put(id, hashtag); self.tag_name_cache.put(id, hashtag);
} }
pub(crate) fn send_cmd(&mut self, cmd: RedisCmd, timeline: &Timeline) -> Result<()> { pub(crate) fn send_cmd(&mut self, cmd: RedisCmd, timelines: &[Timeline]) -> Result<()> {
let namespace = self.redis_namespace.take(); let namespace = self.namespace.take();
let hashtag = timeline.tag().and_then(|id| self.tag_name_cache.get(&id)); let timelines: Result<Vec<String>> = timelines
let tl = match &namespace { .iter()
Some(ns) => format!("{}:{}", ns, timeline.to_redis_raw_timeline(hashtag)?), .map(|tl| {
None => timeline.to_redis_raw_timeline(hashtag)?, let hashtag = tl.tag().and_then(|id| self.tag_name_cache.get(&id));
}; match &namespace {
Some(ns) => Ok(format!("{}:{}", ns, tl.to_redis_raw_timeline(hashtag)?)),
None => Ok(tl.to_redis_raw_timeline(hashtag)?),
}
})
.collect();
let (primary_cmd, secondary_cmd) = cmd.into_sendable(&tl); let (primary_cmd, secondary_cmd) = cmd.into_sendable(&timelines?[..]);
self.primary.write_all(&primary_cmd)?; self.primary.write_all(&primary_cmd)?;
// We also need to set a key to tell the Puma server that we've subscribed or // We also need to set a key to tell the Puma server that we've subscribed or
@ -145,6 +100,7 @@ impl RedisConn {
Self::validate_connection(&mut conn, &addr)?; Self::validate_connection(&mut conn, &addr)?;
conn.set_read_timeout(Some(Duration::from_millis(10))) conn.set_read_timeout(Some(Duration::from_millis(10)))
.map_err(|e| RedisConnErr::with_addr(&addr, e))?; .map_err(|e| RedisConnErr::with_addr(&addr, e))?;
Self::set_connection_name(&mut conn, &addr)?;
Ok(conn) Ok(conn)
} }
@ -172,14 +128,27 @@ impl RedisConn {
fn validate_connection(conn: &mut TcpStream, addr: &str) -> Result<()> { fn validate_connection(conn: &mut TcpStream, addr: &str) -> Result<()> {
conn.write_all(b"PING\r\n") conn.write_all(b"PING\r\n")
.map_err(|e| RedisConnErr::with_addr(&addr, e))?; .map_err(|e| RedisConnErr::with_addr(&addr, e))?;
let mut buffer = vec![0_u8; 7]; let mut buffer = vec![0_u8; 100];
conn.read_exact(&mut buffer) conn.read(&mut buffer)
.map_err(|e| RedisConnErr::with_addr(&addr, e))?; .map_err(|e| RedisConnErr::with_addr(&addr, e))?;
let reply = String::from_utf8_lossy(&buffer); let reply = String::from_utf8_lossy(&buffer);
match &*reply { match &*reply {
"+PONG\r\n" => Ok(()), r if r.starts_with("+PONG\r\n") => Ok(()),
"-NOAUTH" => Err(RedisConnErr::MissingPassword), r if r.starts_with("-NOAUTH") => Err(RedisConnErr::MissingPassword),
"HTTP/1." => Err(RedisConnErr::NotRedis(addr.to_string())), r if r.starts_with("HTTP/1.") => Err(RedisConnErr::NotRedis(addr.to_string())),
_ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())),
}
}
fn set_connection_name(conn: &mut TcpStream, addr: &str) -> Result<()> {
conn.write_all(b"*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$8\r\nflodgatt\r\n")
.map_err(|e| RedisConnErr::with_addr(&addr, e))?;
let mut buffer = vec![0_u8; 100];
conn.read(&mut buffer)
.map_err(|e| RedisConnErr::with_addr(&addr, e))?;
let reply = String::from_utf8_lossy(&buffer);
match &*reply {
r if r.starts_with("+OK\r\n") => Ok(()),
_ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())), _ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())),
} }
} }

View File

@ -31,7 +31,7 @@ impl fmt::Display for RedisConnErr {
addr, inner addr, inner
), ),
InvalidRedisReply(unexpected_reply) => format!( InvalidRedisReply(unexpected_reply) => format!(
"Received and unexpected reply from Redis. Expected `+PONG` reply but got `{}`", "Received and unexpected reply from Redis: `{}`",
unexpected_reply unexpected_reply
), ),
UnknownRedisErr(io_err) => { UnknownRedisErr(io_err) => {

View File

@ -4,8 +4,8 @@
mod err; mod err;
pub use err::Error; pub use err::Error;
use super::Event; use super::msg::{RedisParseErr, RedisParseOutput};
use super::{RedisCmd, RedisConn}; use super::{Event, RedisCmd, RedisConn};
use crate::config; use crate::config;
use crate::request::{Subscription, Timeline}; use crate::request::{Subscription, Timeline};
@ -13,16 +13,19 @@ pub(self) use super::EventErr;
use futures::Async; use futures::Async;
use hashbrown::{HashMap, HashSet}; use hashbrown::{HashMap, HashSet};
use std::convert::{TryFrom, TryInto};
use std::str;
use std::sync::{Arc, Mutex, MutexGuard, PoisonError}; use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
type Result<T> = std::result::Result<T, Error>; type Result<T> = std::result::Result<T, Error>;
type EventChannel = Sender<Arc<Event>>;
/// The item that streams from Redis and is polled by the `ClientAgent` /// The item that streams from Redis and is polled by the `ClientAgent`
pub struct Manager { pub struct Manager {
redis_connection: RedisConn, redis_conn: RedisConn,
timelines: HashMap<Timeline, HashMap<u32, Sender<Arc<Event>>>>, timelines: HashMap<Timeline, HashMap<u32, EventChannel>>,
ping_time: Instant, ping_time: Instant,
channel_id: u32, channel_id: u32,
} }
@ -31,7 +34,7 @@ impl Manager {
/// Create a new `Manager`, with its own Redis connections (but no active subscriptions). /// Create a new `Manager`, with its own Redis connections (but no active subscriptions).
pub fn try_from(redis_cfg: &config::Redis) -> Result<Self> { pub fn try_from(redis_cfg: &config::Redis) -> Result<Self> {
Ok(Self { Ok(Self {
redis_connection: RedisConn::new(redis_cfg)?, redis_conn: RedisConn::new(redis_cfg)?,
timelines: HashMap::new(), timelines: HashMap::new(),
ping_time: Instant::now(), ping_time: Instant::now(),
channel_id: 0, channel_id: 0,
@ -42,10 +45,10 @@ impl Manager {
Arc::new(Mutex::new(self)) Arc::new(Mutex::new(self))
} }
pub fn subscribe(&mut self, subscription: &Subscription, channel: Sender<Arc<Event>>) { pub fn subscribe(&mut self, subscription: &Subscription, channel: EventChannel) {
let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline); let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline);
if let (Some(hashtag), Some(id)) = (tag, tl.tag()) { if let (Some(hashtag), Some(id)) = (tag, tl.tag()) {
self.redis_connection.update_cache(hashtag, id); self.redis_conn.update_cache(hashtag, id);
}; };
let channels = self.timelines.entry(tl).or_default(); let channels = self.timelines.entry(tl).or_default();
@ -53,67 +56,99 @@ impl Manager {
self.channel_id += 1; self.channel_id += 1;
if channels.len() == 1 { if channels.len() == 1 {
self.redis_connection self.redis_conn
.send_cmd(RedisCmd::Subscribe, &tl) .send_cmd(RedisCmd::Subscribe, &[tl])
.unwrap_or_else(|e| log::error!("Could not subscribe to the Redis channel: {}", e)); .unwrap_or_else(|e| log::error!("Could not subscribe to the Redis channel: {}", e));
log::info!("Subscribed to {:?}", tl);
}; };
} }
pub(crate) fn unsubscribe(&mut self, tl: &Timeline) -> Result<()> { fn send_pings(&mut self) -> Result<()> {
self.redis_connection.send_cmd(RedisCmd::Unsubscribe, &tl)?; // NOTE: this takes two cycles to close a connection after the client times out: on
self.timelines.remove(&tl); // the first cycle, this successfully sends the Event to the response::Ws thread but
Ok(log::info!("Ended stream for {:?}", tl)) // that thread fatally errors sending to the client. On the *second* cycle, this
// gets the error. This isn't ideal, but is harmless.
self.ping_time = Instant::now();
let mut subscriptions_to_close = HashSet::new();
self.timelines.retain(|tl, channels| {
channels.retain(|_, chan| try_send_event(Arc::new(Event::Ping), chan, *tl).is_ok());
if channels.is_empty() {
subscriptions_to_close.insert(*tl);
false
} else {
true
}
});
if !subscriptions_to_close.is_empty() {
let timelines: Vec<_> = subscriptions_to_close.into_iter().collect();
&self
.redis_conn
.send_cmd(RedisCmd::Unsubscribe, &timelines[..])?;
log::info!("Unsubscribed from {:?}", timelines);
}
Ok(())
} }
pub fn poll_broadcast(&mut self) -> Result<()> { pub fn poll_broadcast(&mut self) -> Result<()> {
let mut completed_timelines = HashSet::new();
let log_send_err = |tl, e| Some(log::error!("cannot send to {:?}: {}", tl, e)).is_some();
if self.ping_time.elapsed() > Duration::from_secs(30) { if self.ping_time.elapsed() > Duration::from_secs(30) {
self.ping_time = Instant::now(); self.send_pings()?
for (tl, channels) in self.timelines.iter_mut() { }
channels.retain(|_, chan| match chan.try_send(Arc::new(Event::Ping)) {
Ok(()) => true,
Err(e) if !e.is_closed() => log_send_err(*tl, e),
Err(_) => false,
});
// NOTE: this takes two cycles to close a connection after the client let (mut unread_start, mut msg_end) = (0, 0);
// times out: on the first cycle, this fn sends the Event to the
// response::Ws thread without any error, but that thread encounters an
// error sending to the client and ends. On the *second* cycle, this fn
// gets the error it's waiting on to clean up the connection. This isn't
// ideal, but is harmless, since the only reason we haven't cleaned up the
// connection is that no messages are being sent to that client.
if channels.is_empty() {
completed_timelines.insert(*tl);
}
}
};
loop { while let Async::Ready(msg_len) = self.redis_conn.poll_redis(msg_end)? {
match self.redis_connection.poll_redis() { msg_end += msg_len;
Ok(Async::NotReady) => break, let input = &self.redis_conn.input[..msg_end];
Ok(Async::Ready(Some((tl, event)))) => { let mut unread = str::from_utf8(input).unwrap_or_else(|e| {
let sendable_event = Arc::new(event); str::from_utf8(input.split_at(e.valid_up_to()).0).expect("guaranteed by `split_at`")
let channels = self.timelines.get_mut(&tl).ok_or(Error::InvalidId)?; });
channels.retain(|_, chan| match chan.try_send(sendable_event.clone()) {
Ok(()) => true, while !unread.is_empty() {
Err(e) if !e.is_closed() => log_send_err(tl, e), let tag_id_cache = &mut self.redis_conn.tag_id_cache;
Err(_) => false, let redis_namespace = &self.redis_conn.namespace;
});
if channels.is_empty() { use {Error::InvalidId, RedisParseOutput::*};
completed_timelines.insert(tl); unread = match RedisParseOutput::try_from(unread) {
Ok(Msg(msg)) => {
let trimmed_tl = match redis_namespace {
Some(ns) if msg.timeline_txt.starts_with(ns) => {
Some(&msg.timeline_txt[ns.len() + ":timeline:".len()..])
}
None => Some(&msg.timeline_txt["timeline:".len()..]),
Some(_non_matching_ns) => None,
};
if let Some(trimmed_tl) = trimmed_tl {
let tl = Timeline::from_redis_text(trimmed_tl, tag_id_cache)?;
let event: Arc<Event> = Arc::new(msg.event_txt.try_into()?);
let channels = self.timelines.get_mut(&tl).ok_or(InvalidId)?;
channels.retain(|_, c| try_send_event(event.clone(), c, tl).is_ok());
} else {
// skip messages for different Redis namespaces
}
msg.leftover_input
} }
Ok(NonMsg(leftover_input)) => leftover_input,
Err(RedisParseErr::Incomplete) => break,
Err(e) => Err(e)?,
};
unread_start = msg_end - unread.len();
}
if !unread.is_empty() && unread_start > unread.len() {
log::info!("Re-using memory");
let (read, unread) = self.redis_conn.input[..msg_end].split_at_mut(unread_start);
for (i, b) in unread.iter().enumerate() {
read[i] = *b;
} }
Ok(Async::Ready(None)) => (), // cmd or msg for other namespace msg_end = unread.len();
Err(err) => log::error!("{}", err), // drop msg, log err, and proceed unread_start = 0;
} }
} }
for tl in &mut completed_timelines.iter() {
self.unsubscribe(tl)?;
}
Ok(()) Ok(())
} }
@ -146,3 +181,14 @@ impl Manager {
.collect() .collect()
} }
} }
fn try_send_event(event: Arc<Event>, chan: &mut EventChannel, tl: Timeline) -> Result<()> {
match chan.try_send(event) {
Ok(()) => Ok(()),
Err(e) if !e.is_closed() => {
log::error!("cannot send to {:?}: {}", tl, e);
Ok(())
}
Err(e) => Err(e)?,
}
}

View File

@ -1,18 +1,18 @@
use super::super::{RedisConnErr, RedisParseErr}; use super::super::{RedisConnErr, RedisParseErr};
use super::{Event, EventErr}; use super::{Event, EventErr};
use crate::request::{Timeline, TimelineErr}; use crate::request::TimelineErr;
use std::fmt; use std::fmt;
use std::sync::Arc;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
InvalidId, InvalidId,
TimelineErr(TimelineErr), TimelineErr(TimelineErr),
EventErr(EventErr), EventErr(EventErr),
RedisParseErr(RedisParseErr), RedisParseErr(RedisParseErr),
RedisConnErr(RedisConnErr), RedisConnErr(RedisConnErr),
ChannelSendErr(tokio::sync::watch::error::SendError<(Timeline, Event)>), ChannelSendErr(tokio::sync::mpsc::error::TrySendError<Arc<Event>>),
ChannelSendErr2(tokio::sync::mpsc::error::UnboundedTrySendError<Event>),
} }
impl std::error::Error for Error {} impl std::error::Error for Error {}
@ -30,22 +30,16 @@ impl fmt::Display for Error {
RedisConnErr(inner) => write!(f, "{}", inner), RedisConnErr(inner) => write!(f, "{}", inner),
TimelineErr(inner) => write!(f, "{}", inner), TimelineErr(inner) => write!(f, "{}", inner),
ChannelSendErr(inner) => write!(f, "{}", inner), ChannelSendErr(inner) => write!(f, "{}", inner),
ChannelSendErr2(inner) => write!(f, "{}", inner),
}?; }?;
Ok(()) Ok(())
} }
} }
impl From<tokio::sync::watch::error::SendError<(Timeline, Event)>> for Error { impl From<tokio::sync::mpsc::error::TrySendError<Arc<Event>>> for Error {
fn from(error: tokio::sync::watch::error::SendError<(Timeline, Event)>) -> Self { fn from(error: tokio::sync::mpsc::error::TrySendError<Arc<Event>>) -> Self {
Self::ChannelSendErr(error) Self::ChannelSendErr(error)
} }
} }
impl From<tokio::sync::mpsc::error::UnboundedTrySendError<Event>> for Error {
fn from(error: tokio::sync::mpsc::error::UnboundedTrySendError<Event>) -> Self {
Self::ChannelSendErr2(error)
}
}
impl From<EventErr> for Error { impl From<EventErr> for Error {
fn from(error: EventErr) -> Self { fn from(error: EventErr) -> Self {

View File

@ -82,8 +82,11 @@ fn utf8_to_redis_data<'a>(s: &'a str) -> Result<(RedisData, &'a str), RedisParse
fn after_newline_at(s: &str, start: usize) -> RedisParser<&str> { fn after_newline_at(s: &str, start: usize) -> RedisParser<&str> {
let s = s.get(start..).ok_or(Incomplete)?; let s = s.get(start..).ok_or(Incomplete)?;
if s.len() < 2 {
Err(Incomplete)?;
}
if !s.starts_with("\r\n") { if !s.starts_with("\r\n") {
return Err(RedisParseErr::InvalidLineEnd); Err(InvalidLineEnd)?;
} }
Ok(s.get("\r\n".len()..).ok_or(Incomplete)?) Ok(s.get("\r\n".len()..).ok_or(Incomplete)?)
} }