From 66553408fbe98684f6afa16ccc611d880a3f00a1 Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Tue, 28 Apr 2020 12:47:14 -0400 Subject: [PATCH] Fix edge case bug for long messages (#144) --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/response/redis/manager.rs | 66 ++++++++++++++++--------------- src/response/redis/manager/err.rs | 14 +++---- 4 files changed, 44 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86deab7..ec3f6cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -416,7 +416,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.9.6" +version = "0.9.7" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index abda086..f13c9ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.9.6" +version = "0.9.7" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/src/response/redis/manager.rs b/src/response/redis/manager.rs index ceaf243..8a4f9b6 100644 --- a/src/response/redis/manager.rs +++ b/src/response/redis/manager.rs @@ -41,20 +41,19 @@ impl Stream for Manager { } while let Async::Ready(msg_len) = self.redis_conn.poll_redis(self.unread_idx.1)? { - self.unread_idx.1 += msg_len; + self.unread_idx = (0, self.unread_idx.1 + msg_len); + let input = &self.redis_conn.input[..self.unread_idx.1]; let mut unread = str::from_utf8(input).unwrap_or_else(|e| { str::from_utf8(input.split_at(e.valid_up_to()).0).expect("guaranteed by `split_at`") }); - + let tag_id_cache = &mut self.redis_conn.tag_id_cache; + let redis_namespace = &self.redis_conn.namespace; while !unread.is_empty() { - let tag_id_cache = &mut self.redis_conn.tag_id_cache; - let redis_namespace = &self.redis_conn.namespace; - - use {Error::InvalidId, RedisParseOutput::*}; - unread = match RedisParseOutput::try_from(unread) { + use RedisParseOutput::*; + match RedisParseOutput::try_from(unread) { Ok(Msg(msg)) => { - let trimmed_tl = match redis_namespace { + let tl_matching_ns = match redis_namespace { Some(ns) if msg.timeline_txt.starts_with(ns) => { Some(&msg.timeline_txt[ns.len() + ":timeline:".len()..]) } @@ -62,39 +61,44 @@ impl Stream for Manager { Some(_non_matching_ns) => None, }; - if let Some(trimmed_tl) = trimmed_tl { + if let Some(trimmed_tl) = tl_matching_ns { let tl = Timeline::from_redis_text(trimmed_tl, tag_id_cache)?; let event: Arc = Arc::new(msg.event_txt.try_into()?); - let channels = self.timelines.get_mut(&tl).ok_or(InvalidId)?; - for (_id, channel) in channels { - if let Ok(Async::NotReady) = channel.poll_ready() { - log::warn!("{:?} channel full", tl); - return Ok(Async::NotReady); + if let Some(channels) = self.timelines.get_mut(&tl) { + for (_id, channel) in channels { + if let Ok(Async::NotReady) = channel.poll_ready() { + log::warn!("{:?} channel full", tl); + return Ok(Async::NotReady); + } + let _ = channel.try_send(event.clone()); // err just means channel will be closed } - let _ = channel.try_send(event.clone()); // err just means channel will be closed + } + } + unread = msg.leftover_input; + } + Ok(NonMsg(leftover_input)) => unread = leftover_input, + Err(RedisParseErr::Incomplete) => { + if self.unread_idx.0 == 0 { + // msg already first; no copying needed + } else if self.unread_idx.0 >= (self.unread_idx.1 - self.unread_idx.0) { + let (read, unread) = self.redis_conn.input[..self.unread_idx.1] + .split_at_mut(self.unread_idx.0); + for (i, b) in unread.iter().enumerate() { + read[i] = *b; } } else { - // skip messages for different Redis namespaces + // Less efficient, but should never occur in production + log::warn!("Moving partial input requires heap allocation"); + self.redis_conn.input = + self.redis_conn.input[self.unread_idx.0..].into(); } - msg.leftover_input + self.unread_idx = (0, self.unread_idx.1 - self.unread_idx.0); + unread = ""; } - Ok(NonMsg(leftover_input)) => leftover_input, - Err(RedisParseErr::Incomplete) => { - log::info!("Copying partial message"); - let (read, unread) = self.redis_conn.input[..self.unread_idx.1] - .split_at_mut(self.unread_idx.0); - for (i, b) in unread.iter().enumerate() { - read[i] = *b; - } - self.unread_idx = (0, unread.len()); - break; - } - Err(e) => Err(e)?, + Err(e) => Err(Error::RedisParseErr(e, unread.to_string()))?, }; self.unread_idx.0 = self.unread_idx.1 - unread.len(); } - - self.unread_idx = (0, 0) // reaching here means last msg was complete; reuse the full buffer } Ok(Async::Ready(Some(()))) } diff --git a/src/response/redis/manager/err.rs b/src/response/redis/manager/err.rs index b4dc064..3e247a7 100644 --- a/src/response/redis/manager/err.rs +++ b/src/response/redis/manager/err.rs @@ -10,7 +10,7 @@ pub enum Error { InvalidId, TimelineErr(TimelineErr), EventErr(EventErr), - RedisParseErr(RedisParseErr), + RedisParseErr(RedisParseErr, String), RedisConnErr(RedisConnErr), ChannelSendErr(tokio::sync::mpsc::error::TrySendError>), } @@ -26,7 +26,7 @@ impl fmt::Display for Error { "tried to access a timeline/channel subscription that does not exist" ), EventErr(inner) => write!(f, "{}", inner), - RedisParseErr(inner) => write!(f, "{}", inner), + RedisParseErr(inner, input) => write!(f, "error parsing {}\n{}", input, inner), RedisConnErr(inner) => write!(f, "{}", inner), TimelineErr(inner) => write!(f, "{}", inner), ChannelSendErr(inner) => write!(f, "{}", inner), @@ -59,8 +59,8 @@ impl From for Error { } } -impl From for Error { - fn from(e: RedisParseErr) -> Self { - Self::RedisParseErr(e) - } -} +// impl From for Error { +// fn from(e: RedisParseErr) -> Self { +// Self::RedisParseErr(e) +// } +// }