Add tests to Redis parsing

This commit is contained in:
Daniel Sockwell 2020-03-28 16:27:40 -04:00
parent 96760e50e5
commit 89faddfe64
5 changed files with 118 additions and 210 deletions

View File

@ -1,6 +1,6 @@
use std::fmt::Display;
use std::{error::Error, fmt};
pub fn die_with_msg(msg: impl Display) -> ! {
pub fn die_with_msg(msg: impl fmt::Display) -> ! {
eprintln!("FATAL ERROR: {}", msg);
std::process::exit(1);
}
@ -13,10 +13,31 @@ macro_rules! log_fatal {
};};
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum RedisParseErr {
Incomplete,
Unrecoverable,
InvalidNumber(std::num::ParseIntError),
NonNumericInput,
InvalidLineStart(String),
IncorrectRedisType,
}
impl fmt::Display for RedisParseErr {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "{}", match self {
Self::Incomplete => "The input from Redis does not form a complete message, likely because the input buffer filled partway through a message. Save this input and try again with additional input from Redis.".to_string(),
Self::InvalidNumber(e) => format!( "Redis input cannot be parsed: {}", e),
_ => "TODO".to_string(),
})
}
}
impl Error for RedisParseErr {}
impl From<std::num::ParseIntError> for RedisParseErr {
fn from(error: std::num::ParseIntError) -> Self {
Self::InvalidNumber(error)
}
}
#[derive(Debug)]

View File

@ -431,7 +431,7 @@ impl Status {
// use crate::{
// err::RedisParseErr,
// parse_client_request::{Content::*, Reach::*, Stream::*, Timeline},
// redis_to_client_stream::{MessageQueues, MsgQueue, RedisMsg},
// redis_to_client_stream::*,
// };
// use lru::LruCache;
// use std::collections::HashMap;
@ -936,3 +936,4 @@ impl Status {
// Ok(())
// }
// }
// TODO: Revise these tests to cover *only* the RedisMessage -> (Timeline, Event) parsing

View File

@ -7,7 +7,7 @@ mod redis;
pub use {client_agent::ClientAgent, event_stream::EventStream};
// TODO remove
pub use redis::redis_msg::{self, RedisMsg, RedisUtf8};
pub use redis::redis_msg::{self, RedisUtf8};
//#[cfg(test)]
pub use receiver::process_messages;

View File

@ -7,11 +7,10 @@ pub use message_queues::{MessageQueues, MsgQueue};
use crate::{
config,
err::{RedisParseErr, TimelineErr},
err::RedisParseErr,
messages::Event,
parse_client_request::{Stream, Timeline},
pubsub_cmd,
redis_to_client_stream::redis::redis_msg::RedisMsg,
redis_to_client_stream::redis::{redis_cmd, RedisConn},
};
use futures::{Async, Poll};
@ -170,9 +169,9 @@ impl futures::stream::Stream for Receiver {
let (cache, namespace) = (&mut self.cache.hashtag_to_id, &self.redis_namespace);
let remaining_input =
process_messages(input, cache, namespace, &mut self.msg_queues);
process_messages(input, cache, namespace, &mut self.msg_queues).expect("TODO");
self.redis_input.extend_from_slice(remaining_input);
self.redis_input.extend_from_slice(&remaining_input);
self.redis_input.extend_from_slice(extra_bytes);
}
}
@ -209,34 +208,23 @@ pub fn process_messages<'a>(
cache: &mut LruCache<String, i64>,
namespace: &Option<String>,
msg_queues: &mut MessageQueues,
) -> &'a [u8] {
let mut remaining_input = input;
use RedisMsg::*;
match RedisMsg::from_raw(&mut remaining_input) {
Ok((EventMsg { tl_txt, event_txt }, rest)) => {
use TimelineErr::*;
let timeline = match Timeline::from_redis_raw_timeline(tl_txt, cache, namespace) {
Ok(timeline) => timeline,
Err(RedisNamespaceMismatch) => todo!(),
Err(InvalidInput) => todo!(),
};
let event: Event = serde_json::from_str(&event_txt).expect("TODO");
) -> Result<Vec<u8>, RedisParseErr> {
use crate::redis_to_client_stream::redis::redis_msg::RedisBytes;
let r_msg = RedisBytes::new(input.as_bytes())
.into_redis_utf8()
.try_into_redis_structured_text()?
.try_into_redis_message()?;
for msg_queue in msg_queues.values_mut() {
if msg_queue.timeline == timeline {
msg_queue.messages.push_back(event.clone());
}
}
remaining_input = rest;
}
Ok((SubscriptionMsg, rest)) => {
remaining_input = rest;
}
Err(RedisParseErr::Incomplete) => (),
Err(RedisParseErr::Unrecoverable) => {
panic!("Failed parsing Redis msg: {}", &remaining_input)
}
};
let (timeline, event) = (
r_msg.parse_timeline(cache, namespace)?,
r_msg.parse_event()?,
);
remaining_input.as_bytes()
for msg_queue in msg_queues.values_mut() {
if msg_queue.timeline == timeline {
msg_queue.messages.push_back(event.clone());
}
}
Ok(r_msg.leftover_input.as_leftover_bytes())
}

View File

@ -9,8 +9,10 @@
//!
//! ```text
//! *3\r\n
//! $7\r\nmessage\r\n
//! $10\r\ntimeline:4\r\n
//! $7\r\n
//! message\r\n
//! $10\r\n
//! timeline:4\r\n
//! $1386\r\n{\"event\":\"update\",\"payload\"...\"queued_at\":1569623342825}\r\n
//! ```
//!
@ -24,38 +26,34 @@ use crate::{
parse_client_request::Timeline,
};
use lru::LruCache;
use std::str;
use std::{
convert::{TryFrom, TryInto},
str,
};
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct RedisBytes<'a>(&'a [u8]);
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct RedisUtf8<'a> {
valid_utf8: &'a str,
leftover_bytes: RedisBytes<'a>,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct RedisStructuredText<'a> {
parsed_reply: RedisDataType<'a>,
leftover_input: RedisUtf8<'a>,
}
#[derive(Debug, Clone)]
pub struct RedisMessage<'a> {
timeline_txt: &'a str,
event_txt: &'a str,
leftover_input: RedisUtf8<'a>,
}
#[derive(Debug, Clone)]
pub struct RedisParsed<'a> {
pub timeline: Timeline,
pub event: Event,
pub leftover_input: RedisUtf8<'a>,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct RedisMessage<'a> {
timeline_txt: &'a str,
event_txt: &'a str,
pub leftover_input: RedisUtf8<'a>,
}
#[derive(Debug, Clone, PartialEq)]
enum RedisDataType<'a> {
RedisArray(Vec<RedisDataType<'a>>),
BulkString(&'a str),
@ -91,6 +89,7 @@ impl<'a> From<RedisBytes<'a>> for RedisUtf8<'a> {
}
use RedisDataType::*;
use RedisParseErr::*;
type RedisParser<'a, Item> = Result<Item, RedisParseErr>;
impl<'a> RedisUtf8<'a> {
pub fn try_into_redis_structured_text(self) -> Result<RedisStructuredText<'a>, RedisParseErr> {
@ -108,77 +107,83 @@ impl<'a> RedisUtf8<'a> {
}
}
fn parse_number_at(input: &'a str) -> RedisParser<(usize, &'a str)> {
let number_len = input
fn after_newline_at(s: &'a str, start: usize) -> RedisParser<'a, &'a str> {
Ok(s.get(start + "\r\n".len()..).ok_or(Incomplete)?)
}
fn parse_number_at(s: &'a str) -> RedisParser<(usize, &'a str)> {
let len = s
.chars()
.position(|c| !c.is_numeric())
.ok_or(Unrecoverable)?;
let number = input[..number_len].parse().map_err(|_| Unrecoverable)?;
let rest = input.get(number_len..).ok_or(Incomplete)?;
Ok((number, rest))
.ok_or(NonNumericInput)?;
// TODO: Test how this error looks when triggered. Consider adding new variant
Ok((s[..len].parse()?, Self::after_newline_at(s, len)?))
}
/// Parse a Redis bulk string and return the content of that string and the unparsed remainder.
///
/// All bulk strings have the format `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n`
fn parse_redis_bulk_string(input: &'a str) -> RedisParser<RedisStructuredText> {
let (field_len, rest) = parse_redis_length(input)?;
fn parse_redis_bulk_string(s: &'a str) -> RedisParser<RedisStructuredText> {
let (field_len, rest) = Self::parse_number_at(s)?;
let field_content = rest.get(..field_len).ok_or(Incomplete)?;
Ok(RedisStructuredText {
parsed_reply: BulkString(field_content),
leftover_input: RedisUtf8::from_str(&rest[field_len + "\r\n".len()..]),
leftover_input: RedisUtf8::from_str(Self::after_newline_at(&rest, field_len)?),
})
}
fn parse_redis_int(input: &'a str) -> RedisParser<RedisStructuredText> {
let (number, rest_with_newline) = parse_number_at(input)?;
let leftover_utf8 =
RedisUtf8::from_str(rest_with_newline.get("\r\n".len()..).ok_or(Incomplete)?);
fn parse_redis_int(s: &'a str) -> RedisParser<RedisStructuredText> {
let (number, rest) = Self::parse_number_at(s)?;
let leftover_utf8 = RedisUtf8::from_str(Self::after_newline_at(rest, 0)?);
Ok(RedisStructuredText {
parsed_reply: Integer(number),
leftover_input: leftover_utf8,
})
}
fn parse_redis_array(input: &'a str) -> RedisParser<RedisStructuredText> {
let (number_of_elements, rest) = Self::parse_number_at(input)?;
fn parse_redis_array(s: &'a str) -> RedisParser<RedisStructuredText> {
let (number_of_elements, rest) = Self::parse_number_at(s)?;
let mut str_left_to_parse = RedisUtf8::from_str(rest);
let mut inner = Vec::with_capacity(number_of_elements);
let mut leftover_utf8 = RedisUtf8::from_str(rest.get("\r\n".len()..).ok_or(Incomplete)?);
inner.resize(number_of_elements, RedisDataType::Uninitilized);
for i in (0..number_of_elements).rev() {
let next_el = Self::new_try_from(leftover_utf8.valid_utf8)?;
leftover_utf8 = next_el.leftover_input;
let next_el = Self::new_try_from(str_left_to_parse.valid_utf8)?;
str_left_to_parse = next_el.leftover_input;
inner[i] = next_el.parsed_reply;
}
Ok(RedisStructuredText {
parsed_reply: RedisDataType::RedisArray(inner),
leftover_input: leftover_utf8,
leftover_input: str_left_to_parse,
})
}
fn new_try_from(input: &'a str) -> Result<RedisStructuredText, RedisParseErr> {
if input.len() < 4 {
fn new_try_from(s: &'a str) -> Result<RedisStructuredText, RedisParseErr> {
if s.len() < 4 {
Err(Incomplete)?
};
let (first_char, input) = input.split_at(1);
let (first_char, s) = s.split_at(1);
match first_char {
":" => Self::parse_redis_int(input),
"$" => Self::parse_redis_bulk_string(input),
"*" => Self::parse_redis_array(input),
_ => panic!("TODO: Error for unimplemented"),
":" => Self::parse_redis_int(s),
"$" => Self::parse_redis_bulk_string(s),
"*" => Self::parse_redis_array(s),
e => Err(InvalidLineStart(format!(
"Encountered invalid initial character `{}` in line `{}`",
e, s
))),
}
}
}
use std::convert::{TryFrom, TryInto};
impl<'a> TryFrom<RedisDataType<'a>> for &'a str {
type Error = RedisParseErr;
fn try_from(val: RedisDataType<'a>) -> Result<Self, Self::Error> {
match val {
RedisDataType::BulkString(inner) => Ok(inner),
_ => Err(Unrecoverable),
_ => Err(IncorrectRedisType),
}
}
}
@ -207,47 +212,6 @@ impl<'a> RedisStructuredText<'a> {
panic!("TODO");
}
}
pub fn try_into_parsed(
self,
cache: &mut LruCache<String, i64>,
namespace: &Option<String>,
) -> Result<RedisParsed<'a>, RedisParseErr> {
if let RedisDataType::RedisArray(mut redis_strings) = self.parsed_reply {
let command = redis_strings.pop().expect("TODO").try_into()?;
match command {
"subscribe" | "unsubscribe" => {
// subscription statuses look like:
// $14\r\ntimeline:local\r\n
// :47\r\n
panic!("TODO: skip");
}
"message" => {
// Messages look like;
// $10\r\ntimeline:4\r\n
// $1386\r\n{\"event\":\"update\",\"payload\"...\"queued_at\":1569623342825}\r\n
let tl_txt = redis_strings.pop().expect("TODO").try_into()?;
let event_txt = redis_strings.pop().expect("TODO").try_into()?;
let timeline = match Timeline::from_redis_raw_timeline(tl_txt, cache, namespace)
{
Ok(timeline) => timeline,
Err(RedisNamespaceMismatch) => todo!(),
Err(InvalidInput) => todo!(),
};
let event: Event = serde_json::from_str(event_txt).expect("TODO");
Ok(RedisParsed {
timeline,
event,
leftover_input: self.leftover_input,
})
}
_cmd => Err(Incomplete)?,
}
} else {
panic!("TODO");
}
}
}
type HashtagCache = LruCache<String, i64>;
@ -268,79 +232,6 @@ impl<'a> RedisMessage<'a> {
}
}
type Parser<'a, Item> = Result<(Item, &'a str), RedisParseErr>;
/// A message that has been parsed from an incoming raw message from Redis.
#[derive(Debug, Clone)]
pub enum RedisMsg<'a> {
EventMsg { tl_txt: &'a str, event_txt: &'a str },
SubscriptionMsg,
}
use RedisParseErr::*;
impl<'a> RedisMsg<'a> {
pub fn from_raw(input: &'a str) -> Parser<'a, Self> {
// No need to parse the Redis Array header, just skip it
let input = input.get("*3\r\n".len()..).ok_or(Incomplete)?;
let (command, rest) = parse_redis_bulk_string(&input)?;
match command {
"subscribe" | "unsubscribe" => {
// subscription statuses look like:
// $14\r\ntimeline:local\r\n
// :47\r\n
let (_raw_timeline, rest) = parse_redis_bulk_string(&rest)?;
let (_number_of_subscriptions, rest) = parse_redis_int(&rest)?;
Ok((RedisMsg::SubscriptionMsg, &rest))
}
"message" => {
// Messages look like;
// $10\r\ntimeline:4\r\n
// $1386\r\n{\"event\":\"update\",\"payload\"...\"queued_at\":1569623342825}\r\n
let (tl_txt, rest) = parse_redis_bulk_string(&rest)?;
let (event_txt, rest) = parse_redis_bulk_string(&rest)?;
Ok((RedisMsg::EventMsg { tl_txt, event_txt }, rest))
}
_cmd => Err(Incomplete)?,
}
}
}
/// Parse a Redis bulk string and return the content of that string and the unparsed remainder.
///
/// All bulk strings have the format `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n`
fn parse_redis_bulk_string(input: &str) -> Parser<&str> {
let input = &input.get("$".len()..).ok_or(Incomplete)?;
let (field_len, rest) = parse_redis_length(input)?;
let field_content = rest.get(..field_len).ok_or(Incomplete)?;
Ok((field_content, &rest[field_len + "\r\n".len()..]))
}
fn parse_redis_int(input: &str) -> Parser<usize> {
let input = &input.get(":".len()..).ok_or(Incomplete)?;
let (number, rest_with_newline) = parse_number_at(input)?;
let rest = &rest_with_newline.get("\r\n".len()..).ok_or(Incomplete)?;
Ok((number, rest))
}
/// Return the value of a Redis length (for an array or bulk string) and the unparsed remainder
fn parse_redis_length(input: &str) -> Parser<usize> {
let (number, rest_with_newline) = parse_number_at(input)?;
let rest = &rest_with_newline.get("\r\n".len()..).ok_or(Incomplete)?;
Ok((number, rest))
}
fn parse_number_at(input: &str) -> Parser<usize> {
let number_len = input
.chars()
.position(|c| !c.is_numeric())
.ok_or(Unrecoverable)?;
let number = input[..number_len].parse().map_err(|_| Unrecoverable)?;
let rest = &input.get(number_len..).ok_or(Incomplete)?;
Ok((number, rest))
}
#[cfg(test)]
mod test {
use super::*;
@ -350,17 +241,24 @@ mod test {
let mut buffer = Vec::new();
let input = "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$45\r\n{\"event\":\"announcement.delete\",\"payload\":\"5\"}\r\n";
let mut cache = LruCache::new(1000);
let r_msg = RedisBytes(input.as_bytes())
.into_redis_utf8()
.try_into_redis_structured_text()?
.try_into_redis_message()?;
let r_txt = RedisBytes(input.as_bytes()).into_redis_utf8();
assert_eq!(r_txt.valid_utf8, "*3\r\n$7\r\nmessage\r\n$12\r\ntimeline:308\r\n$45\r\n{\"event\":\"announcement.delete\",\"payload\":\"5\"}\r\n");
let r_structured = r_txt.try_into_redis_structured_text()?;
assert_eq!(
r_structured.parsed_reply,
RedisArray(vec![
BulkString(&"{\"event\":\"announcement.delete\",\"payload\":\"5\"}"),
BulkString(&"timeline:308"),
BulkString(&"message"),
])
);
let r_msg = r_structured.try_into_redis_message()?;
buffer.push(r_msg.leftover_input.as_leftover_bytes());
let (timeline, event) = (
r_msg.parse_timeline(&mut cache, &None)?,
r_msg.parse_event()?,
assert_eq!(r_msg.timeline_txt, "timeline:308");
assert_eq!(
r_msg.event_txt,
"{\"event\":\"announcement.delete\",\"payload\":\"5\"}"
);
Ok(())