WIP RedisMsg refactor

Committing WIP before trying a different approach
This commit is contained in:
Daniel Sockwell 2020-03-29 15:12:30 -04:00
parent 89faddfe64
commit f59890159d
5 changed files with 238 additions and 222 deletions

View File

@ -13,7 +13,7 @@ macro_rules! log_fatal {
};};
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum RedisParseErr {
Incomplete,
InvalidNumber(std::num::ParseIntError),

View File

@ -196,7 +196,7 @@ impl Timeline {
}
}
pub fn from_redis_raw_timeline(
pub fn from_redis_text(
timeline: &str,
cache: &mut LruCache<String, i64>,
namespace: &Option<String>,
@ -243,10 +243,9 @@ impl Timeline {
[n, "timeline", id, "notification"] if ns == n =>
(User(id.parse()?), Federated, Notification),
[_, "timeline", _id, "notification"]
| ["timeline", _id, "notification"] => Err(RedisNamespaceMismatch)?,
[n, "timeline", "list", id] if ns == n => (List(id.parse()?), Federated, All),
[_, "timeline", "list", _id]

View File

@ -10,7 +10,7 @@ pub use {client_agent::ClientAgent, event_stream::EventStream};
pub use redis::redis_msg::{self, RedisUtf8};
//#[cfg(test)]
pub use receiver::process_messages;
//pub use receiver::process_messages;
//#[cfg(test)]
pub use receiver::{MessageQueues, MsgQueue};
//#[cfg(test)]

View File

@ -7,11 +7,15 @@ pub use message_queues::{MessageQueues, MsgQueue};
use crate::{
config,
err::RedisParseErr,
err::{RedisParseErr, TimelineErr},
messages::Event,
parse_client_request::{Stream, Timeline},
pubsub_cmd,
redis_to_client_stream::redis::{redis_cmd, RedisConn},
redis_to_client_stream::redis::{
redis_cmd,
redis_msg::{RedisMsg, RedisParseOutput, RedisUtf8},
RedisConn,
},
};
use futures::{Async, Poll};
use lru::LruCache;
@ -19,8 +23,9 @@ use tokio::io::AsyncRead;
use std::{
collections::HashMap,
convert::TryFrom,
io::Read,
net, str,
net,
time::{Duration, Instant},
};
use tokio::io::Error;
@ -156,23 +161,66 @@ impl futures::stream::Stream for Receiver {
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
let mut buffer = vec![0u8; 6000];
if let Ok(Async::Ready(bytes_read)) = self.poll_read(&mut buffer) {
let binary_input = buffer[..bytes_read].to_vec();
let (input, extra_bytes) = match str::from_utf8(&binary_input) {
Ok(input) => (input, "".as_bytes()),
Err(e) => {
let (valid, after_valid) = binary_input.split_at(e.valid_up_to());
let input = str::from_utf8(valid).expect("Guaranteed by `.valid_up_to`");
(input, after_valid)
}
};
let (cache, ns) = (&mut self.cache.hashtag_to_id, &self.redis_namespace);
let (cache, namespace) = (&mut self.cache.hashtag_to_id, &self.redis_namespace);
use TimelineErr::*;
let mut remaining_input = RedisUtf8::from(&buffer[..bytes_read]);
loop {
use RedisParseOutput::*;
match RedisParseOutput::try_from(remaining_input) {
Ok(Msg(msg)) => {
let timeline =
match Timeline::from_redis_text(msg.timeline_txt, cache, ns) {
Ok(timeline) => timeline,
Err(TimelineErr::RedisNamespaceMismatch) => {
remaining_input = msg.leftover_input;
break;
}
Err(TimelineErr::InvalidInput) => {
log::error!("{:?}\n{:?}", InvalidInput, msg);
remaining_input = msg.leftover_input;
break;
}
};
let remaining_input =
process_messages(input, cache, namespace, &mut self.msg_queues).expect("TODO");
let event: Event = serde_json::from_str(msg.event_txt).expect("TODO");
self.redis_input.extend_from_slice(&remaining_input);
self.redis_input.extend_from_slice(extra_bytes);
for msg_queue in self.msg_queues.values_mut() {
if msg_queue.timeline == timeline {
msg_queue.messages.push_back(event.clone());
}
}
log::info!("Got a msg from Redis for {:?}", timeline);
remaining_input = msg.leftover_input;
continue;
}
Ok(NonMsg(leftover_input)) => {
log::info!("Got a non-msg from Redis.");
remaining_input = leftover_input;
continue;
}
Err(RedisParseErr::Incomplete) => {
log::info!(
"Got an incomplete msg from Redis: {:?}",
String::from_utf8_lossy(&buffer[..bytes_read])
);
break;
}
Err(other) => {
log::error!(
"{:?}\nRedis input: {:?}",
other,
String::from_utf8_lossy(&buffer[..bytes_read])
);
break;
}
};
}
self.redis_input
.extend_from_slice(remaining_input.valid_utf8.as_bytes());
self.redis_input
.extend_from_slice(remaining_input.leftover_bytes);
}
}
@ -202,29 +250,23 @@ impl AsyncRead for Receiver {
}
}
#[must_use]
pub fn process_messages<'a>(
input: &'a str,
cache: &mut LruCache<String, i64>,
namespace: &Option<String>,
msg_queues: &mut MessageQueues,
) -> Result<Vec<u8>, RedisParseErr> {
use crate::redis_to_client_stream::redis::redis_msg::RedisBytes;
let r_msg = RedisBytes::new(input.as_bytes())
.into_redis_utf8()
.try_into_redis_structured_text()?
.try_into_redis_message()?;
// #[must_use]
// pub fn process_messages<'a>(
// input: RedisUtf8<'a>,
// cache: &mut LruCache<String, i64>,
// namespace: &Option<String>,
// msg_queues: &mut MessageQueues,
// ) -> Result<RedisUtf8<'a>, RedisParseErr> {
// let r_msg = RedisMsg::try_from(RedisUtf8::from(input))?;
// let timeline = Timeline::from_redis_text(r_msg.timeline_txt, cache, namespace)
// .expect("TODO and handle timelines we skip");
// let event: Event = serde_json::from_str(r_msg.event_txt).expect("TODO");
let (timeline, event) = (
r_msg.parse_timeline(cache, namespace)?,
r_msg.parse_event()?,
);
// for msg_queue in msg_queues.values_mut() {
// if msg_queue.timeline == timeline {
// msg_queue.messages.push_back(event.clone());
// }
// }
for msg_queue in msg_queues.values_mut() {
if msg_queue.timeline == timeline {
msg_queue.messages.push_back(event.clone());
}
}
Ok(r_msg.leftover_input.as_leftover_bytes())
}
// Ok(r_msg.leftover_input)
// }

View File

@ -20,192 +20,190 @@
//! three characters, the second is a bulk string with ten characters, and the third is a
//! bulk string with 1,386 characters.
use crate::{
err::{RedisParseErr, TimelineErr},
messages::Event,
parse_client_request::Timeline,
};
use lru::LruCache;
use self::RedisParseOutput::*;
use crate::err::RedisParseErr;
use std::{
convert::{TryFrom, TryInto},
str,
};
#[derive(Debug, Clone, PartialEq)]
pub struct RedisBytes<'a>(&'a [u8]);
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Copy)]
pub struct RedisUtf8<'a> {
valid_utf8: &'a str,
leftover_bytes: RedisBytes<'a>,
pub valid_utf8: &'a str,
pub leftover_bytes: &'a [u8],
}
#[derive(Debug, Clone, PartialEq)]
pub struct RedisStructuredText<'a> {
parsed_reply: RedisDataType<'a>,
pub leftover_input: RedisUtf8<'a>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct RedisMessage<'a> {
timeline_txt: &'a str,
event_txt: &'a str,
pub leftover_input: RedisUtf8<'a>,
}
#[derive(Debug, Clone, PartialEq)]
enum RedisDataType<'a> {
RedisArray(Vec<RedisDataType<'a>>),
BulkString(&'a str),
Integer(usize),
Uninitilized,
}
impl<'a> RedisBytes<'a> {
pub fn new(bytes: &'a [u8]) -> Self {
Self(bytes)
}
pub fn into_redis_utf8(self) -> RedisUtf8<'a> {
RedisUtf8::from(self)
}
}
impl<'a> From<RedisBytes<'a>> for RedisUtf8<'a> {
fn from(val: RedisBytes<'a>) -> Self {
match str::from_utf8(val.0) {
impl<'a> From<&'a [u8]> for RedisUtf8<'a> {
fn from(bytes: &'a [u8]) -> Self {
match str::from_utf8(bytes) {
Ok(valid_utf8) => Self {
valid_utf8,
leftover_bytes: RedisBytes("".as_bytes()),
leftover_bytes: "".as_bytes(),
},
Err(e) => {
let (valid, after_valid) = val.0.split_at(e.valid_up_to());
let (valid, after_valid) = bytes.split_at(e.valid_up_to());
Self {
valid_utf8: str::from_utf8(valid).expect("Guaranteed by `.valid_up_to`"),
leftover_bytes: RedisBytes(after_valid),
leftover_bytes: after_valid,
}
}
}
}
}
use RedisDataType::*;
use RedisParseErr::*;
type RedisParser<'a, Item> = Result<Item, RedisParseErr>;
impl<'a> RedisUtf8<'a> {
pub fn try_into_redis_structured_text(self) -> Result<RedisStructuredText<'a>, RedisParseErr> {
Self::new_try_from(self.valid_utf8)
}
pub fn as_leftover_bytes(&self) -> Vec<u8> {
[self.valid_utf8.as_bytes(), self.leftover_bytes.0].concat()
}
fn from_str(valid_utf8: &'a str) -> Self {
Self {
valid_utf8,
leftover_bytes: RedisBytes("".as_bytes()),
}
}
fn after_newline_at(s: &'a str, start: usize) -> RedisParser<'a, &'a str> {
Ok(s.get(start + "\r\n".len()..).ok_or(Incomplete)?)
}
fn parse_number_at(s: &'a str) -> RedisParser<(usize, &'a str)> {
let len = s
.chars()
.position(|c| !c.is_numeric())
.ok_or(NonNumericInput)?;
// TODO: Test how this error looks when triggered. Consider adding new variant
Ok((s[..len].parse()?, Self::after_newline_at(s, len)?))
}
/// Parse a Redis bulk string and return the content of that string and the unparsed remainder.
///
/// All bulk strings have the format `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n`
fn parse_redis_bulk_string(s: &'a str) -> RedisParser<RedisStructuredText> {
let (field_len, rest) = Self::parse_number_at(s)?;
let field_content = rest.get(..field_len).ok_or(Incomplete)?;
Ok(RedisStructuredText {
parsed_reply: BulkString(field_content),
leftover_input: RedisUtf8::from_str(Self::after_newline_at(&rest, field_len)?),
})
}
fn parse_redis_int(s: &'a str) -> RedisParser<RedisStructuredText> {
let (number, rest) = Self::parse_number_at(s)?;
let leftover_utf8 = RedisUtf8::from_str(Self::after_newline_at(rest, 0)?);
Ok(RedisStructuredText {
parsed_reply: Integer(number),
leftover_input: leftover_utf8,
})
}
fn parse_redis_array(s: &'a str) -> RedisParser<RedisStructuredText> {
let (number_of_elements, rest) = Self::parse_number_at(s)?;
let mut str_left_to_parse = RedisUtf8::from_str(rest);
let mut inner = Vec::with_capacity(number_of_elements);
inner.resize(number_of_elements, RedisDataType::Uninitilized);
for i in (0..number_of_elements).rev() {
let next_el = Self::new_try_from(str_left_to_parse.valid_utf8)?;
str_left_to_parse = next_el.leftover_input;
inner[i] = next_el.parsed_reply;
}
Ok(RedisStructuredText {
parsed_reply: RedisDataType::RedisArray(inner),
leftover_input: str_left_to_parse,
})
}
fn new_try_from(s: &'a str) -> Result<RedisStructuredText, RedisParseErr> {
if s.len() < 4 {
Err(Incomplete)?
};
let (first_char, s) = s.split_at(1);
match first_char {
":" => Self::parse_redis_int(s),
"$" => Self::parse_redis_bulk_string(s),
"*" => Self::parse_redis_array(s),
e => Err(InvalidLineStart(format!(
"Encountered invalid initial character `{}` in line `{}`",
e, s
))),
}
impl<'a> Default for RedisUtf8<'a> {
fn default() -> Self {
Self::from("".as_bytes())
}
}
impl<'a> TryFrom<RedisDataType<'a>> for &'a str {
#[derive(Debug, Clone, PartialEq)]
pub enum RedisParseOutput<'a> {
Msg(RedisMsg<'a>),
NonMsg(RedisUtf8<'a>),
}
#[derive(Debug, Clone, PartialEq)]
pub struct RedisMsg<'a> {
pub timeline_txt: &'a str,
pub event_txt: &'a str,
pub leftover_input: RedisUtf8<'a>,
}
impl<'a> TryFrom<&'a str> for RedisParseOutput<'a> {
type Error = RedisParseErr;
fn try_from(utf8: &'a str) -> Result<RedisParseOutput<'a>, Self::Error> {
let (structured_txt, leftover_utf8) = utf8_to_redis_data(utf8.valid_utf8)?;
let structured_txt = RedisStructuredText {
structured_txt,
leftover_input: RedisUtf8 {
valid_utf8: leftover_utf8,
leftover_bytes: utf8.leftover_bytes,
},
};
Ok(structured_txt.try_into()?)
}
}
impl<'a> TryFrom<RedisUtf8<'a>> for RedisParseOutput<'a> {
type Error = RedisParseErr;
fn try_from(utf8: RedisUtf8<'a>) -> Result<RedisParseOutput<'a>, Self::Error> {
let (structured_txt, leftover_utf8) = utf8_to_redis_data(utf8.valid_utf8)?;
let structured_txt = RedisStructuredText {
structured_txt,
leftover_input: RedisUtf8 {
valid_utf8: leftover_utf8,
leftover_bytes: utf8.leftover_bytes,
},
};
Ok(structured_txt.try_into()?)
}
}
use RedisData::*;
use RedisParseErr::*;
type RedisParser<'a, Item> = Result<Item, RedisParseErr>;
#[derive(Debug, Clone, PartialEq)]
struct RedisStructuredText<'a> {
structured_txt: RedisData<'a>,
leftover_input: RedisUtf8<'a>,
}
#[derive(Debug, Clone, PartialEq)]
enum RedisData<'a> {
RedisArray(Vec<RedisData<'a>>),
BulkString(&'a str),
Integer(usize),
Uninitilized,
}
fn utf8_to_redis_data<'a>(s: &'a str) -> Result<(RedisData, &'a str), RedisParseErr> {
if s.len() < 4 {
Err(Incomplete)?
};
let (first_char, s) = s.split_at(1);
match first_char {
":" => parse_redis_int(s),
"$" => parse_redis_bulk_string(s),
"*" => parse_redis_array(s),
e => Err(InvalidLineStart(format!(
"Encountered invalid initial character `{}` in line `{}`",
e, s
))),
}
}
fn after_newline_at<'a>(s: &'a str, start: usize) -> RedisParser<'a, &'a str> {
Ok(s.get(start + "\r\n".len()..).ok_or(Incomplete)?)
}
fn parse_number_at<'a>(s: &'a str) -> RedisParser<(usize, &'a str)> {
let len = s
.chars()
.position(|c| !c.is_numeric())
.ok_or(NonNumericInput)?;
Ok((s[..len].parse()?, after_newline_at(s, len)?))
}
/// Parse a Redis bulk string and return the content of that string and the unparsed remainder.
///
/// All bulk strings have the format `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n`
fn parse_redis_bulk_string<'a>(s: &'a str) -> RedisParser<(RedisData, &'a str)> {
let (len, rest) = parse_number_at(s)?;
let content = rest.get(..len).ok_or(Incomplete)?;
Ok((BulkString(content), after_newline_at(&rest, len)?))
}
fn parse_redis_int<'a>(s: &'a str) -> RedisParser<(RedisData, &'a str)> {
let (number, rest) = parse_number_at(s)?;
Ok((Integer(number), rest))
}
fn parse_redis_array<'a>(s: &'a str) -> RedisParser<(RedisData, &'a str)> {
let (number_of_elements, mut rest) = parse_number_at(s)?;
let mut inner = Vec::with_capacity(number_of_elements);
inner.resize(number_of_elements, RedisData::Uninitilized);
for i in (0..number_of_elements).rev() {
let (next_el, new_rest) = utf8_to_redis_data(rest)?;
rest = new_rest;
inner[i] = next_el;
}
Ok((RedisData::RedisArray(inner), rest))
}
impl<'a> TryFrom<RedisData<'a>> for &'a str {
type Error = RedisParseErr;
fn try_from(val: RedisDataType<'a>) -> Result<Self, Self::Error> {
fn try_from(val: RedisData<'a>) -> Result<Self, Self::Error> {
match val {
RedisDataType::BulkString(inner) => Ok(inner),
RedisData::BulkString(inner) => Ok(inner),
_ => Err(IncorrectRedisType),
}
}
}
impl<'a> RedisStructuredText<'a> {
pub fn try_into_redis_message(self) -> Result<RedisMessage<'a>, RedisParseErr> {
if let RedisDataType::RedisArray(mut redis_strings) = self.parsed_reply {
let command = redis_strings.pop().expect("TODO").try_into()?;
impl<'a> TryFrom<RedisStructuredText<'a>> for RedisParseOutput<'a> {
type Error = RedisParseErr;
fn try_from(input: RedisStructuredText<'a>) -> Result<RedisParseOutput<'a>, Self::Error> {
if let RedisData::RedisArray(mut redis_strings) = input.structured_txt {
let command = redis_strings.pop().expect("TODO").try_into()?;
dbg!(&command);
match command {
// subscription statuses look like:
// $14\r\ntimeline:local\r\n
// :47\r\n
"subscribe" | "unsubscribe" => panic!("TODO: skip"),
"subscribe" | "unsubscribe" => Ok(NonMsg(input.leftover_input)),
// Messages look like;
// $10\r\ntimeline:4\r\n
// $1386\r\n{\"event\":\"update\",\"payload\"...\"queued_at\":1569623342825}\r\n
"message" => Ok(RedisMessage {
"message" => Ok(Msg(RedisMsg {
timeline_txt: redis_strings.pop().expect("TODO").try_into()?,
event_txt: redis_strings.pop().expect("TODO").try_into()?,
leftover_input: self.leftover_input,
}),
leftover_input: input.leftover_input,
})),
_cmd => Err(Incomplete)?,
}
} else {
@ -214,47 +212,24 @@ impl<'a> RedisStructuredText<'a> {
}
}
type HashtagCache = LruCache<String, i64>;
impl<'a> RedisMessage<'a> {
pub fn parse_timeline(
&self,
cache: &mut HashtagCache,
namespace: &Option<String>,
) -> Result<Timeline, RedisParseErr> {
match Timeline::from_redis_raw_timeline(self.timeline_txt, cache, namespace) {
Ok(timeline) => Ok(timeline),
Err(TimelineErr::RedisNamespaceMismatch) => todo!(),
Err(TimelineErr::InvalidInput) => todo!(),
}
}
pub fn parse_event(&self) -> Result<Event, RedisParseErr> {
Ok(serde_json::from_str(self.event_txt).expect("TODO"))
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn parse_redis_int() -> Result<(), RedisParseErr> {
let mut buffer = Vec::new();
let input = "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$45\r\n{\"event\":\"announcement.delete\",\"payload\":\"5\"}\r\n";
let input = "*3\r\n$9\r\nsubscribe\r\n$15\r\ntimeline:public\r\n:1\r\n";
let r_txt = RedisBytes(input.as_bytes()).into_redis_utf8();
assert_eq!(r_txt.valid_utf8, "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$45\r\n{\"event\":\"announcement.delete\",\"payload\":\"5\"}\r\n");
let r_structured = r_txt.try_into_redis_structured_text()?;
let r_txt = RedisUtf8::from(input.as_bytes());
assert_eq!(
r_structured.parsed_reply,
RedisArray(vec![
BulkString(&"{\"event\":\"announcement.delete\",\"payload\":\"5\"}"),
BulkString(&"timeline:308"),
BulkString(&"message"),
])
r_txt.valid_utf8,
"*3\r\n$9\r\nsubscribe\r\n$15\r\ntimeline:public\r\n:1\r\n"
);
let r_msg = r_structured.try_into_redis_message()?;
buffer.push(r_msg.leftover_input.as_leftover_bytes());
let r_msg: RedisMsg = r_txt.try_into()?;
assert!(r_msg.leftover_input.valid_utf8.is_empty());
assert!(r_msg.leftover_input.leftover_bytes.is_empty());
assert_eq!(r_msg.timeline_txt, "timeline:308");
assert_eq!(
r_msg.event_txt,