Finish Postgres changes

This commit is contained in:
Daniel Sockwell 2020-04-22 21:22:10 -04:00
parent 63a6d0ba13
commit 3d18989e9f
4 changed files with 23 additions and 17 deletions

View File

@ -20,7 +20,7 @@ from_env_var!(
from_env_var!(
/// How frequently to poll Redis
let name = RedisInterval;
let default: Duration = Duration::from_millis(100);
let default: Duration = Duration::from_millis(10);
let (env_var, allowed_values) = ("REDIS_FREQ", "a number of milliseconds");
let from_str = |s| s.parse().map(Duration::from_millis).ok();
);

View File

@ -29,9 +29,9 @@ impl RedisCmd {
.concat(),
[
b"*3\r\n$3\r\nSET\r\n$",
tl.len().to_string().as_bytes(),
b"\r\n",
tl.as_bytes(),
(tl.len() + "subscribed:".len()).to_string().as_bytes(),
b"\r\nsubscribed:",
tl.to_string().as_bytes(),
b"\r\n$1\r\n1\r\n",
]
.concat(),
@ -47,9 +47,9 @@ impl RedisCmd {
.concat(),
[
b"*3\r\n$3\r\nSET\r\n$",
tl.len().to_string().as_bytes(),
b"\r\n",
tl.as_bytes(),
(tl.len() + "subscribed:".len()).to_string().as_bytes(),
b"\r\nsubscribed:",
tl.to_string().as_bytes(),
b"\r\n$1\r\n0\r\n",
]
.concat(),

View File

@ -63,7 +63,7 @@ impl RedisConn {
}
}
Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => {
return Ok(Async::NotReady);
break;
}
Err(e) => break log::error!("{}", e),
};
@ -96,19 +96,18 @@ impl RedisConn {
}
Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input),
},
Ok(NonMsg(leftover)) => (Ok(Ready(None)), leftover),
Err(RedisParseErr::Incomplete) => (Ok(NotReady), input),
Err(other_parse_err) => (Err(ManagerErr::RedisParseErr(other_parse_err)), input),
};
self.cursor = [leftover.as_bytes(), invalid_bytes]
.concat()
.bytes()
.fold(0, |acc, cur| {
// TODO - make clearer and comment side-effect
self.redis_input[acc] = cur.expect("TODO");
acc + 1
});
// Store leftover in same buffer and set cursor to start after leftover next time
self.cursor = 0;
for byte in [leftover.as_bytes(), invalid_bytes].concat().iter() {
self.redis_input[self.cursor] = *byte;
self.cursor += 1;
}
res
}
@ -123,6 +122,12 @@ impl RedisConn {
let (primary_cmd, secondary_cmd) = cmd.into_sendable(&tl);
self.primary.write_all(&primary_cmd)?;
// We also need to set a key to tell the Puma server that we've subscribed
// or unsubscribed to the channel because it stops publishing updates when it
// thinks no one is subscribed. (Documented in [PR
// #3278](https://github.com/tootsuite/mastodon/pull/3278))
// Question: why can't the Puma server just use NUMSUB for this?
self.secondary.write_all(&secondary_cmd)?;
Ok(())
}

View File

@ -53,7 +53,7 @@ impl Ws {
let incoming_events = self.ws_rx.clone().map_err(|_| ());
incoming_events.for_each(move |(tl, event)| {
// dbg!(&tl, &event);
//TODO log::info!("{:?}, {:?}", &tl, &event);
if matches!(event, Event::Ping) {
self.send_msg(&event)?
} else if target_timeline == tl {
@ -70,6 +70,7 @@ impl Ws {
fn send_or_filter(&mut self, tl: Timeline, event: &Event, update: &impl Payload) -> Result<()> {
let (blocks, allowed_langs) = (&self.subscription.blocks, &self.subscription.allowed_langs);
const SKIP: Result<()> = Ok(());
match tl {
tl if tl.is_public()
&& !update.language_unset()