Fix `channel full` codepath

A bug was causing the buffer index to not properly update when
handling full channels; this is now fixed.
This commit is contained in:
Daniel Sockwell 2020-04-29 16:58:09 -04:00
parent 2f07cf7d16
commit 000deaa9a6
3 changed files with 9 additions and 5 deletions

2
Cargo.lock generated
View File

@ -416,7 +416,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "flodgatt" name = "flodgatt"
version = "0.9.7" version = "0.9.8"
dependencies = [ dependencies = [
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -1,7 +1,7 @@
[package] [package]
name = "flodgatt" name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.9.7" version = "0.9.8"
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"] authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018" edition = "2018"

View File

@ -70,15 +70,18 @@ impl Stream for Manager {
} }
} }
unread = msg.leftover_input; unread = msg.leftover_input;
self.unread_idx.0 = self.unread_idx.1 - unread.len();
}
Ok(NonMsg(leftover_input)) => {
unread = leftover_input;
self.unread_idx.0 = self.unread_idx.1 - unread.len();
} }
Ok(NonMsg(leftover_input)) => unread = leftover_input,
Err(RedisParseErr::Incomplete) => { Err(RedisParseErr::Incomplete) => {
self.copy_partial_msg(); self.copy_partial_msg();
unread = ""; break;
} }
Err(e) => Err(Error::RedisParseErr(e, unread.to_string()))?, Err(e) => Err(Error::RedisParseErr(e, unread.to_string()))?,
}; };
self.unread_idx.0 = self.unread_idx.1 - unread.len();
} }
if self.unread_idx.0 == self.unread_idx.1 { if self.unread_idx.0 == self.unread_idx.1 {
self.unread_idx = (0, 0) self.unread_idx = (0, 0)
@ -104,6 +107,7 @@ impl Manager {
self.redis_conn.input = self.redis_conn.input[self.unread_idx.0..].into(); self.redis_conn.input = self.redis_conn.input[self.unread_idx.0..].into();
} }
self.unread_idx = (0, self.unread_idx.1 - self.unread_idx.0); self.unread_idx = (0, self.unread_idx.1 - self.unread_idx.0);
dbg!(&self.unread_idx);
} }
/// Create a new `Manager`, with its own Redis connections (but no active subscriptions). /// Create a new `Manager`, with its own Redis connections (but no active subscriptions).
pub fn try_from(redis_cfg: &config::Redis) -> Result<Self> { pub fn try_from(redis_cfg: &config::Redis) -> Result<Self> {