Add additional parsing tests

This commit is contained in:
Daniel Sockwell 2020-03-25 16:44:23 -04:00
parent 562f239b55
commit 99f71ab269
10 changed files with 712 additions and 216 deletions

View File

@ -1,12 +1,12 @@
[package]
name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.6.4"
version = "0.6.5"
authors = ["Daniel Long Sockwell <daniel@codesections.com>", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018"
[dependencies]
log = "0.4.6"
log = { version = "0.4.6", features = ["release_max_level_info"] }
futures = "0.1.26"
tokio = "0.1.19"
warp = { git = "https://github.com/seanmonstar/warp.git"}

View File

@ -88,7 +88,7 @@ mod flodgatt_parse_event {
pub fn setup() -> MessageQueues {
let mut queues_map = HashMap::new();
let id = Uuid::default();
let timeline = Timeline::from_redis_str("1", None);
let timeline = Timeline::from_redis_raw_timeline("1", None);
queues_map.insert(id, MsgQueue::new(timeline));
let queues = MessageQueues(queues_map);
queues
@ -101,7 +101,7 @@ mod flodgatt_parse_event {
id: Uuid,
timeline: Timeline,
) -> Event {
redis_stream::process_messages(input, &mut None, &mut cache, &mut queues);
redis_stream::process_messages(input, &mut None, &mut cache, &mut queues).unwrap();
queues
.oldest_msg_in_target_queue(id, timeline)
.expect("In test")
@ -238,7 +238,7 @@ mod flodgatt_parse_value {
"message" => {
let (raw_timeline, msg_value) = msg.extract_raw_timeline_and_message();
let hashtag = hashtag_from_timeline(&raw_timeline, hashtag_id_cache);
let timeline = Timeline::from_redis_str(&raw_timeline, hashtag);
let timeline = Timeline::from_redis_raw_timeline(&raw_timeline, hashtag);
for msg_queue in queues.values_mut() {
if msg_queue.timeline == timeline {
msg_queue.messages.push_back(msg_value.clone());
@ -282,7 +282,7 @@ mod flodgatt_parse_value {
let cache: LruCache<String, i64> = LruCache::new(1000);
let mut queues_map = HashMap::new();
let id = Uuid::default();
let timeline = Timeline::from_redis_str("1", None);
let timeline = Timeline::from_redis_raw_timeline("1", None);
queues_map.insert(id, MsgQueue::new(timeline));
let queues = MessageQueues(queues_map);
(cache, queues, id, timeline)
@ -303,12 +303,12 @@ mod flodgatt_parse_value {
}
fn criterion_benchmark(c: &mut Criterion) {
let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS.to_string();
let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS.to_string(); //INPUT.to_string();
let mut group = c.benchmark_group("Parse redis RESP array");
group.bench_function("parse to Value with a regex", |b| {
b.iter(|| regex_parse::to_json_value(black_box(input.clone())))
});
// group.bench_function("parse to Value with a regex", |b| {
// b.iter(|| regex_parse::to_json_value(black_box(input.clone())))
// });
group.bench_function("parse to Value inline", |b| {
b.iter(|| parse_inline::to_json_value(black_box(input.clone())))
});

File diff suppressed because one or more lines are too long

View File

@ -57,7 +57,7 @@ impl Timeline {
Self(Unset, Local, Notification)
}
pub fn to_redis_str(&self, hashtag: Option<&String>) -> String {
pub fn to_redis_raw_timeline(&self, hashtag: Option<&String>) -> String {
use {Content::*, Reach::*, Stream::*};
match self {
Timeline(Public, Federated, All) => "timeline:public".into(),
@ -82,7 +82,7 @@ impl Timeline {
}
}
}
pub fn from_redis_str(raw_timeline: &str, hashtag: Option<i64>) -> Self {
pub fn from_redis_raw_timeline(raw_timeline: &str, hashtag: Option<i64>) -> Self {
use {Content::*, Reach::*, Stream::*};
match raw_timeline.split(':').collect::<Vec<&str>>()[..] {
["public"] => Timeline(Public, Federated, All),

View File

@ -89,25 +89,13 @@ impl futures::stream::Stream for ClientAgent {
/// replies with `Ok(NotReady)`. The `ClientAgent` bubbles up any
/// errors from the underlying data structures.
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
use std::time::Instant;
let start_time = Instant::now();
let result = {
let mut receiver = self
.receiver
.lock()
.expect("ClientAgent: No other thread panic");
let get_lock = Instant::now();
receiver.configure_for_polling(self.id, self.subscription.timeline);
let res = receiver.poll();
if start_time.elapsed().as_millis() > 1 {
log::warn!(
"Polling the Receiver took: {:?}\n({:?} waiting for lock)",
start_time.elapsed(),
get_lock.elapsed()
);
log::info!("Longer polling yielded: {:#?}", &res);
};
res
receiver.poll()
};
let allowed_langs = &self.subscription.allowed_langs;
@ -118,12 +106,17 @@ impl futures::stream::Stream for ClientAgent {
use Event::*;
match result {
Ok(Async::Ready(Some(event))) => match event {
Update { payload: status } => match self.subscription.timeline {
Update {
payload: status, ..
} => match self.subscription.timeline {
_ if status.involves_blocked_user(blocked_users) => block,
_ if status.from_blocked_domain(blocked_domains) => block,
_ if status.from_blocking_user(blocking_users) => block,
Timeline(Public, _, _) if status.language_not_allowed(allowed_langs) => block,
_ => send(Update { payload: status }),
_ => send(Update {
payload: status,
queued_at: None,
}),
},
Notification { .. }
| Conversation { .. }

View File

@ -50,10 +50,13 @@ impl MessageQueues {
}
pub fn oldest_msg_in_target_queue(&mut self, id: Uuid, timeline: Timeline) -> Option<Event> {
self.entry(id)
.or_insert_with(|| MsgQueue::new(timeline))
.messages
.pop_front()
let msg_qs_entry = self.entry(id);
let mut inserted_tl = false;
let msg_q = msg_qs_entry.or_insert_with(|| {
inserted_tl = true;
MsgQueue::new(timeline)
});
msg_q.messages.pop_front()
}
pub fn calculate_timelines_to_add_or_drop(&mut self, timeline: Timeline) -> Vec<Change> {
let mut timelines_to_modify = Vec::new();

View File

@ -121,8 +121,8 @@ impl Receiver {
// Record the lower number of clients subscribed to that channel
for change in timelines_to_modify {
let timeline = change.timeline;
let opt_hashtag = self.if_hashtag_timeline_get_hashtag_name(timeline);
let opt_hashtag = opt_hashtag.as_ref();
let hashtag = self.if_hashtag_timeline_get_hashtag_name(timeline);
let hashtag = hashtag.as_ref();
let count_of_subscribed_clients = self
.clients_per_timeline
@ -132,9 +132,9 @@ impl Receiver {
// If no clients, unsubscribe from the channel
if *count_of_subscribed_clients <= 0 {
pubsub_cmd!("unsubscribe", self, timeline.to_redis_str(opt_hashtag));
pubsub_cmd!("unsubscribe", self, timeline.to_redis_raw_timeline(hashtag));
} else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 {
pubsub_cmd!("subscribe", self, timeline.to_redis_str(opt_hashtag));
pubsub_cmd!("subscribe", self, timeline.to_redis_raw_timeline(hashtag));
}
}
if start_time.elapsed().as_millis() > 1 {
@ -156,6 +156,7 @@ impl futures::stream::Stream for Receiver {
/// been polled lately.
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let (timeline, id) = (self.timeline.clone(), self.manager_id);
if self.redis_polled_at.elapsed() > *self.redis_poll_interval {
self.pubsub_connection
.poll_redis(&mut self.cache.hashtag_to_id, &mut self.msg_queues);

View File

@ -1,49 +1,118 @@
use crate::{log_fatal, messages::Event};
//! Methods for parsing input in the small subset of the Redis Serialization Protocol we
//! support.
//!
//! Every message Flodgatt receives from Redis is a Redis Array; the elements in the array
//! will be either Bulk Strings or Integers (as Redis defines those terms). See the
//! [Redis protocol documentation](https://redis.io/topics/protocol) for details. A raw
//! message might look slightly like this (simplified, with line breaks added between
//! fields):
//!
//! ```text
//! *3\r\n
//! $7\r\nmessage\r\n
//! $10\r\ntimeline:4\r\n
//! $1386\r\n{\"event\":\"update\",\"payload\"...\"queued_at\":1569623342825}\r\n
//! ```
//!
//! Read that as: an array with three elements: the first element is a bulk string with
//! seven characters, the second is a bulk string with ten characters, and the third is a
//! bulk string with 1,386 characters.
use crate::{log_fatal, messages::Event, parse_client_request::subscription::Timeline};
use lru::LruCache;
type Parser<'a, Item> = Result<(Item, &'a str), ParseErr>;
#[derive(Debug)]
pub struct RedisMsg<'a> {
pub raw: &'a str,
pub cursor: usize,
pub prefix_len: usize,
pub enum ParseErr {
Incomplete,
Unrecoverable,
}
use ParseErr::*;
/// A message that has been parsed from an incoming raw message from Redis.
#[derive(Debug, Clone)]
pub enum RedisMsg {
EventMsg(Timeline, Event),
SubscriptionMsg,
}
impl<'a> RedisMsg<'a> {
pub fn from_raw(raw: &'a str, prefix_len: usize) -> Self {
Self {
raw,
cursor: "*3\r\n".len(), //length of intro header
prefix_len,
}
}
/// Move the cursor from the beginning of a number through its end and return the number
pub fn process_number(&mut self) -> usize {
let (mut selected_number, selection_start) = (0, self.cursor);
while let Ok(number) = self.raw[selection_start..=self.cursor].parse::<usize>() {
self.cursor += 1;
selected_number = number;
}
selected_number
}
/// In a pubsub reply from Redis, an item can be either the name of the subscribed channel
/// or the msg payload. Either way, it follows the same format:
/// `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n`
pub fn next_field(&mut self) -> String {
self.cursor += "$".len();
type Hashtags = LruCache<String, i64>;
impl RedisMsg {
pub fn from_raw<'a>(input: &'a str, cache: &mut Hashtags, prefix: usize) -> Parser<'a, Self> {
// No need to parse the Redis Array header, just skip it
let input = input.get("*3\r\n".len()..).ok_or(Incomplete)?;
let (command, rest) = parse_redis_bulk_string(&input)?;
match command {
"message" => {
                // Messages look like:
// $10\r\ntimeline:4\r\n
// $1386\r\n{\"event\":\"update\",\"payload\"...\"queued_at\":1569623342825}\r\n
let (raw_timeline, rest) = parse_redis_bulk_string(&rest)?;
let (msg_txt, rest) = parse_redis_bulk_string(&rest)?;
let item_len = self.process_number();
self.cursor += "\r\n".len();
let item_start_position = self.cursor;
self.cursor += item_len;
let item = self.raw[item_start_position..self.cursor].to_string();
self.cursor += "\r\n".len();
item
}
pub fn extract_raw_timeline_and_message(&mut self) -> (String, Event) {
let timeline = &self.next_field()[self.prefix_len..];
let msg_txt = self.next_field();
let msg_value: Event = serde_json::from_str(&msg_txt)
.unwrap_or_else(|_| log_fatal!("Invalid JSON from Redis: {:?}", &msg_txt));
(timeline.to_string(), msg_value)
let raw_timeline = &raw_timeline.get(prefix..).ok_or(Unrecoverable)?;
let event: Event = serde_json::from_str(&msg_txt).unwrap();
let hashtag = hashtag_from_timeline(&raw_timeline, cache);
let timeline = Timeline::from_redis_raw_timeline(&raw_timeline, hashtag);
Ok((Self::EventMsg(timeline, event), rest))
}
"subscribe" | "unsubscribe" => {
// subscription statuses look like:
// $14\r\ntimeline:local\r\n
// :47\r\n
let (_raw_timeline, rest) = parse_redis_bulk_string(&rest)?;
let (_number_of_subscriptions, rest) = parse_redis_int(&rest)?;
Ok((Self::SubscriptionMsg, &rest))
}
_cmd => Err(Incomplete)?,
}
}
}
/// Parse a Redis bulk string and return the content of that string and the unparsed remainder.
///
/// All bulk strings have the format `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n`
/// Parse a Redis bulk string and return the content of that string and the unparsed remainder.
///
/// All bulk strings have the format `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n`
///
/// Returns `Err(Incomplete)` when the buffer ends before the full string (and its
/// trailing `\r\n`) has arrived, so the caller can retry once more data is read.
fn parse_redis_bulk_string(input: &str) -> Parser<&str> {
    let input = &input.get("$".len()..).ok_or(Incomplete)?;
    let (field_len, rest) = parse_redis_length(input)?;
    let field_content = rest.get(..field_len).ok_or(Incomplete)?;
    // Checked slicing here too: `&rest[field_len + "\r\n".len()..]` would panic on a
    // message truncated mid-field, but a partial read is recoverable (`Incomplete`).
    let rest = rest.get(field_len + "\r\n".len()..).ok_or(Incomplete)?;
    Ok((field_content, rest))
}
/// Parse a Redis integer reply (`:[NUMBER]\r\n`), returning the number and the
/// unparsed remainder of the input.
fn parse_redis_int(input: &str) -> Parser<usize> {
    let after_colon = input.get(":".len()..).ok_or(Incomplete)?;
    let (value, after_digits) = parse_number_at(after_colon)?;
    let rest = after_digits.get("\r\n".len()..).ok_or(Incomplete)?;
    Ok((value, rest))
}
/// Return the value of a Redis length (for an array or bulk string) and the unparsed remainder
fn parse_redis_length(input: &str) -> Parser<usize> {
    let (length, after_digits) = parse_number_at(input)?;
    match after_digits.get("\r\n".len()..) {
        Some(rest) => Ok((length, rest)),
        None => Err(Incomplete),
    }
}
/// Parse the run of decimal digits at the start of `input`; return the parsed number
/// and the text that follows it.
fn parse_number_at(input: &str) -> Parser<usize> {
    // A number with no terminator after it cannot be validated, so treat it as fatal.
    let digits_end = match input.chars().position(|c| !c.is_numeric()) {
        Some(position) => position,
        None => return Err(Unrecoverable),
    };
    let number = input[..digits_end].parse().map_err(|_| Unrecoverable)?;
    let rest = input.get(digits_end..).ok_or(Incomplete)?;
    Ok((number, rest))
}
/// Look up the cached numeric id for a hashtag timeline; returns `None` for any
/// timeline that is not a hashtag timeline.
fn hashtag_from_timeline(raw_timeline: &str, hashtag_id_cache: &mut Hashtags) -> Option<i64> {
    if !raw_timeline.starts_with("hashtag") {
        return None;
    }
    // Hashtag timelines look like `hashtag:NAME`; the name is the second segment.
    let tag_name = match raw_timeline.split(':').nth(1) {
        Some(name) => name.to_string(),
        None => log_fatal!("No hashtag found in `{}`", raw_timeline),
    };
    match hashtag_id_cache.get(&tag_name) {
        Some(tag_id) => Some(*tag_id),
        None => log_fatal!("No cached id for `{}`", tag_name),
    }
}

View File

@ -1,11 +1,10 @@
use super::redis_msg::RedisMsg;
use super::redis_msg::{ParseErr, RedisMsg};
use crate::config::RedisNamespace;
use crate::log_fatal;
use crate::parse_client_request::subscription::Timeline;
use crate::redis_to_client_stream::receiver::MessageQueues;
use futures::{Async, Poll};
use lru::LruCache;
use std::{io::Read, net};
use std::{error::Error, io::Read, net};
use tokio::io::AsyncRead;
#[derive(Debug)]
@ -26,10 +25,10 @@ impl RedisStream {
pub fn with_namespace(self, namespace: RedisNamespace) -> Self {
RedisStream { namespace, ..self }
}
// Text comes in from redis as a raw stream, which could be more than one message
// and is not guaranteed to end on a message boundary. We need to break it down
// into messages. Incoming messages *are* guaranteed to be RESP arrays,
// https://redis.io/topics/protocol
// Text comes in from redis as a raw stream, which could be more than one message and
// is not guaranteed to end on a message boundary. We need to break it down into
// messages. Incoming messages *are* guaranteed to be RESP arrays (though still not
// guaranteed to end at an array boundary). See https://redis.io/topics/protocol
/// Adds any new Redis messages to the `MsgQueue` for the appropriate `ClientAgent`.
pub fn poll_redis(
&mut self,
@ -39,8 +38,20 @@ impl RedisStream {
let mut buffer = vec![0u8; 6000];
if let Ok(Async::Ready(num_bytes_read)) = self.poll_read(&mut buffer) {
let raw_utf = self.as_utf8(buffer, num_bytes_read);
process_messages(raw_utf, &*self.namespace, hashtag_to_id_cache, queues);
self.incoming_raw_msg.clear();
self.incoming_raw_msg.push_str(&raw_utf);
match process_messages(
self.incoming_raw_msg.clone(),
&mut self.namespace.0,
hashtag_to_id_cache,
queues,
) {
Ok(None) => self.incoming_raw_msg.clear(),
Ok(Some(msg_fragment)) => self.incoming_raw_msg = msg_fragment,
Err(e) => {
log::error!("{}", e);
log_fatal!("Could not process RedisStream: {:?}", &self);
}
}
}
}
@ -54,77 +65,37 @@ impl RedisStream {
}
}
type HashtagCache = LruCache<String, i64>;
pub fn process_messages(
raw_utf: String,
namespace: &Option<String>,
hashtag_id_cache: &mut LruCache<String, i64>,
raw_msg: String,
namespace: &mut Option<String>,
cache: &mut HashtagCache,
queues: &mut MessageQueues,
) {
// Only act if we have a full message (end on a msg boundary)
if !raw_utf.ends_with("}\r\n") {
return;
};
let prefix_to_skip = match namespace {
Some(namespace) => format!("{}:timeline:", namespace),
None => "timeline:".to_string(),
) -> Result<Option<String>, Box<dyn Error>> {
let prefix_len = match namespace {
Some(namespace) => format!("{}:timeline:", namespace).len(),
None => "timeline:".len(),
};
let mut msg = RedisMsg::from_raw(&raw_utf, prefix_to_skip.len());
while !msg.raw.is_empty() {
let command = msg.next_field();
process_msg(command, &mut msg, hashtag_id_cache, queues);
msg = RedisMsg::from_raw(&msg.raw[msg.cursor..], msg.prefix_len);
}
}
pub fn process_msg(
command: String,
msg: &mut RedisMsg,
hashtag_id_cache: &mut LruCache<String, i64>,
queues: &mut MessageQueues,
) {
match command.as_str() {
"message" => {
let (raw_timeline, msg_value) = msg.extract_raw_timeline_and_message();
let hashtag = hashtag_from_timeline(&raw_timeline, hashtag_id_cache);
let timeline = Timeline::from_redis_str(&raw_timeline, hashtag);
for msg_queue in queues.values_mut() {
if msg_queue.timeline == timeline {
msg_queue.messages.push_back(msg_value.clone());
let mut input = raw_msg.as_str();
loop {
let rest = match RedisMsg::from_raw(&input, cache, prefix_len) {
Ok((RedisMsg::EventMsg(timeline, event), rest)) => {
for msg_queue in queues.values_mut() {
if msg_queue.timeline == timeline {
msg_queue.messages.push_back(event.clone());
}
}
rest
}
}
"subscribe" | "unsubscribe" => {
// No msg, so ignore & advance cursor to end
let _channel = msg.next_field();
msg.cursor += ":".len();
let _active_subscriptions = msg.process_number();
msg.cursor += "\r\n".len();
}
cmd => panic!("Invariant violation: {} is unexpected Redis output", cmd),
};
}
fn hashtag_from_timeline(
raw_timeline: &str,
hashtag_id_cache: &mut LruCache<String, i64>,
) -> Option<i64> {
if raw_timeline.starts_with("hashtag") {
let tag_name = raw_timeline
.split(':')
.nth(1)
.unwrap_or_else(|| log_fatal!("No hashtag found in `{}`", raw_timeline))
.to_string();
let tag_id = *hashtag_id_cache
.get(&tag_name)
.unwrap_or_else(|| log_fatal!("No cached id for `{}`", tag_name));
Some(tag_id)
} else {
None
Ok((RedisMsg::SubscriptionMsg, rest)) => rest,
Err(ParseErr::Incomplete) => break,
Err(ParseErr::Unrecoverable) => log_fatal!("Failed parsing Redis msg: {}", &input),
};
input = rest
}
Ok(Some(input.to_string()))
}
impl std::ops::Deref for RedisStream {

53
rustc --explain E0106 Normal file
View File

@ -0,0 +1,53 @@
This error indicates that a lifetime is missing from a type. If it is an error
inside a function signature, the problem may be with failing to adhere to the
lifetime elision rules (see below).
Erroneous code examples:
```
struct Foo1 { x: &bool }
// ^ expected lifetime parameter
struct Foo2<'a> { x: &'a bool } // correct
struct Bar1 { x: Foo2 }
// ^^^^ expected lifetime parameter
struct Bar2<'a> { x: Foo2<'a> } // correct
enum Baz1 { A(u8), B(&bool), }
// ^ expected lifetime parameter
enum Baz2<'a> { A(u8), B(&'a bool), } // correct
type MyStr1 = &str;
// ^ expected lifetime parameter
type MyStr2<'a> = &'a str; // correct
```
Lifetime elision is a special, limited kind of inference for lifetimes in
function signatures which allows you to leave out lifetimes in certain cases.
For more background on lifetime elision see [the book][book-le].
The lifetime elision rules require that any function signature with an elided
output lifetime must either have:
- exactly one input lifetime
- or, multiple input lifetimes, but the function must also be a method with a
`&self` or `&mut self` receiver
In the first case, the output lifetime is inferred to be the same as the unique
input lifetime. In the second case, the lifetime is instead inferred to be the
same as the lifetime on `&self` or `&mut self`.
Here are some examples of elision errors:
```
// error, no input lifetimes
fn foo() -> &str { }
// error, `x` and `y` have distinct lifetimes inferred
fn bar(x: &str, y: &str) -> &str { }
// error, `y`'s lifetime is inferred to be distinct from `x`'s
fn baz<'a>(x: &'a str, y: &str) -> &str { }
```
[book-le]: https://doc.rust-lang.org/book/ch10-03-lifetime-syntax.html#lifetime-elision