Fix edge case bug for long messages (#144)

This commit is contained in:
Daniel Sockwell 2020-04-28 12:47:14 -04:00 committed by GitHub
parent 4a13412f98
commit 66553408fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 44 additions and 40 deletions

2
Cargo.lock generated
View File

@ -416,7 +416,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "flodgatt"
version = "0.9.6"
version = "0.9.7"
dependencies = [
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -1,7 +1,7 @@
[package]
name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.9.6"
version = "0.9.7"
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018"

View File

@ -41,20 +41,19 @@ impl Stream for Manager {
}
while let Async::Ready(msg_len) = self.redis_conn.poll_redis(self.unread_idx.1)? {
self.unread_idx.1 += msg_len;
self.unread_idx = (0, self.unread_idx.1 + msg_len);
let input = &self.redis_conn.input[..self.unread_idx.1];
let mut unread = str::from_utf8(input).unwrap_or_else(|e| {
str::from_utf8(input.split_at(e.valid_up_to()).0).expect("guaranteed by `split_at`")
});
let tag_id_cache = &mut self.redis_conn.tag_id_cache;
let redis_namespace = &self.redis_conn.namespace;
while !unread.is_empty() {
let tag_id_cache = &mut self.redis_conn.tag_id_cache;
let redis_namespace = &self.redis_conn.namespace;
use {Error::InvalidId, RedisParseOutput::*};
unread = match RedisParseOutput::try_from(unread) {
use RedisParseOutput::*;
match RedisParseOutput::try_from(unread) {
Ok(Msg(msg)) => {
let trimmed_tl = match redis_namespace {
let tl_matching_ns = match redis_namespace {
Some(ns) if msg.timeline_txt.starts_with(ns) => {
Some(&msg.timeline_txt[ns.len() + ":timeline:".len()..])
}
@ -62,39 +61,44 @@ impl Stream for Manager {
Some(_non_matching_ns) => None,
};
if let Some(trimmed_tl) = trimmed_tl {
if let Some(trimmed_tl) = tl_matching_ns {
let tl = Timeline::from_redis_text(trimmed_tl, tag_id_cache)?;
let event: Arc<Event> = Arc::new(msg.event_txt.try_into()?);
let channels = self.timelines.get_mut(&tl).ok_or(InvalidId)?;
for (_id, channel) in channels {
if let Ok(Async::NotReady) = channel.poll_ready() {
log::warn!("{:?} channel full", tl);
return Ok(Async::NotReady);
if let Some(channels) = self.timelines.get_mut(&tl) {
for (_id, channel) in channels {
if let Ok(Async::NotReady) = channel.poll_ready() {
log::warn!("{:?} channel full", tl);
return Ok(Async::NotReady);
}
let _ = channel.try_send(event.clone()); // err just means channel will be closed
}
let _ = channel.try_send(event.clone()); // err just means channel will be closed
}
}
unread = msg.leftover_input;
}
Ok(NonMsg(leftover_input)) => unread = leftover_input,
Err(RedisParseErr::Incomplete) => {
if self.unread_idx.0 == 0 {
// msg already first; no copying needed
} else if self.unread_idx.0 >= (self.unread_idx.1 - self.unread_idx.0) {
let (read, unread) = self.redis_conn.input[..self.unread_idx.1]
.split_at_mut(self.unread_idx.0);
for (i, b) in unread.iter().enumerate() {
read[i] = *b;
}
} else {
// skip messages for different Redis namespaces
// Less efficient, but should never occur in production
log::warn!("Moving partial input requires heap allocation");
self.redis_conn.input =
self.redis_conn.input[self.unread_idx.0..].into();
}
msg.leftover_input
self.unread_idx = (0, self.unread_idx.1 - self.unread_idx.0);
unread = "";
}
Ok(NonMsg(leftover_input)) => leftover_input,
Err(RedisParseErr::Incomplete) => {
log::info!("Copying partial message");
let (read, unread) = self.redis_conn.input[..self.unread_idx.1]
.split_at_mut(self.unread_idx.0);
for (i, b) in unread.iter().enumerate() {
read[i] = *b;
}
self.unread_idx = (0, unread.len());
break;
}
Err(e) => Err(e)?,
Err(e) => Err(Error::RedisParseErr(e, unread.to_string()))?,
};
self.unread_idx.0 = self.unread_idx.1 - unread.len();
}
self.unread_idx = (0, 0) // reaching here means last msg was complete; reuse the full buffer
}
Ok(Async::Ready(Some(())))
}

View File

@ -10,7 +10,7 @@ pub enum Error {
InvalidId,
TimelineErr(TimelineErr),
EventErr(EventErr),
RedisParseErr(RedisParseErr),
RedisParseErr(RedisParseErr, String),
RedisConnErr(RedisConnErr),
ChannelSendErr(tokio::sync::mpsc::error::TrySendError<Arc<Event>>),
}
@ -26,7 +26,7 @@ impl fmt::Display for Error {
"tried to access a timeline/channel subscription that does not exist"
),
EventErr(inner) => write!(f, "{}", inner),
RedisParseErr(inner) => write!(f, "{}", inner),
RedisParseErr(inner, input) => write!(f, "error parsing {}\n{}", input, inner),
RedisConnErr(inner) => write!(f, "{}", inner),
TimelineErr(inner) => write!(f, "{}", inner),
ChannelSendErr(inner) => write!(f, "{}", inner),
@ -59,8 +59,8 @@ impl From<TimelineErr> for Error {
}
}
impl From<RedisParseErr> for Error {
fn from(e: RedisParseErr) -> Self {
Self::RedisParseErr(e)
}
}
// impl From<RedisParseErr> for Error {
// fn from(e: RedisParseErr) -> Self {
// Self::RedisParseErr(e)
// }
// }