Fix Receiver for tests

This commit is contained in:
Daniel Sockwell 2020-03-27 11:50:09 -04:00
parent fb2d7b39fb
commit ca4f1263fc
2 changed files with 51 additions and 32 deletions

View File

@ -21,7 +21,7 @@ use tokio::io::AsyncRead;
use std::{
collections::HashMap,
io::Read,
net,
net, str,
time::{Duration, Instant},
};
use tokio::io::Error;
@ -39,7 +39,7 @@ pub struct Receiver {
pub msg_queues: MessageQueues,
clients_per_timeline: HashMap<Timeline, i32>,
cache: Cache,
redis_input: String,
redis_input: Vec<u8>,
redis_namespace: Option<String>,
}
@ -77,7 +77,7 @@ impl Receiver {
id_to_hashtag: LruCache::new(1000),
hashtag_to_id: LruCache::new(1000),
}, // should these be run-time options?
redis_input: String::new(),
redis_input: Vec::new(),
redis_namespace,
}
}
@ -157,33 +157,23 @@ impl futures::stream::Stream for Receiver {
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
let mut buffer = vec![0u8; 6000];
if let Ok(Async::Ready(bytes_read)) = self.poll_read(&mut buffer) {
let raw_utf = String::from_utf8(buffer[..bytes_read].to_vec())
.expect("TODO: Get next byte if not on Unicode boundary");
let (mut cache, namespace) = (&mut self.cache.hashtag_to_id, &self.redis_namespace);
self.redis_input.push_str(&raw_utf);
let mut input = self.redis_input.as_str();
let binary_input = buffer[..bytes_read].to_vec();
let (input, extra_bytes) = match str::from_utf8(&binary_input) {
Ok(input) => (input, "".as_bytes()),
Err(e) => {
let (valid, after_valid) = binary_input.split_at(e.valid_up_to());
let input = str::from_utf8(valid).expect("Guaranteed by `.valid_up_to`");
(input, after_valid)
}
};
use RedisMsg::*;
loop {
match RedisMsg::from_raw(&mut input, &mut cache, namespace) {
Ok((EventMsg(timeline, event), rest)) => {
for msg_queue in self.msg_queues.values_mut() {
if msg_queue.timeline == timeline {
msg_queue.messages.push_back(event.clone());
}
}
input = rest;
}
Ok((SubscriptionMsg, rest)) | Ok((MsgForDifferentNamespace, rest)) => {
input = rest;
}
Err(RedisParseErr::Incomplete) => break,
Err(RedisParseErr::Unrecoverable) => {
panic!("Failed parsing Redis msg: {}", &self.redis_input)
}
};
}
self.redis_input = input.to_string();
let (cache, namespace) = (&mut self.cache.hashtag_to_id, &self.redis_namespace);
let remaining_input =
process_messages(input, cache, namespace, &mut self.msg_queues);
self.redis_input.extend_from_slice(remaining_input);
self.redis_input.extend_from_slice(extra_bytes);
}
}
@ -212,3 +202,34 @@ impl AsyncRead for Receiver {
}
}
}
#[must_use]
pub fn process_messages<'a>(
input: &'a str,
mut cache: &mut LruCache<String, i64>,
namespace: &Option<String>,
msg_queues: &mut MessageQueues,
) -> &'a [u8] {
let mut remaining_input = input;
use RedisMsg::*;
loop {
match RedisMsg::from_raw(&mut remaining_input, &mut cache, namespace) {
Ok((EventMsg(timeline, event), rest)) => {
for msg_queue in msg_queues.values_mut() {
if msg_queue.timeline == timeline {
msg_queue.messages.push_back(event.clone());
}
}
remaining_input = rest;
}
Ok((SubscriptionMsg, rest)) | Ok((MsgForDifferentNamespace, rest)) => {
remaining_input = rest;
}
Err(RedisParseErr::Incomplete) => break,
Err(RedisParseErr::Unrecoverable) => {
panic!("Failed parsing Redis msg: {}", &remaining_input)
}
};
}
remaining_input.as_bytes()
}

View File

@ -7,9 +7,7 @@ macro_rules! pubsub_cmd {
($cmd:expr, $self:expr, $tl:expr) => {{
use std::io::Write;
log::info!("Sending {} command to {}", $cmd, $tl);
// let namespace = $self.pubsub_connection.namespace.clone();
// TODO: fixme
let namespace = None;
let namespace = $self.redis_namespace.clone();
$self
.pubsub_connection