diff --git a/benches/parse_redis.rs b/benches/parse_redis.rs index d05d911..cd5f30e 100644 --- a/benches/parse_redis.rs +++ b/benches/parse_redis.rs @@ -2,6 +2,7 @@ use criterion::black_box; use criterion::criterion_group; use criterion::criterion_main; use criterion::Criterion; +use flodgatt::redis_to_client_stream::redis_stream::RedisMsg; use regex::Regex; use serde_json::Value; @@ -62,6 +63,38 @@ fn print_next_str(mut end: usize, input: &str) -> (usize, String) { let string = &input[start..end]; (end, string.to_string()) } + +fn parse_with_stuct(input: String) -> Vec<(String, Value)> { + let mut output = Vec::new(); + let mut incoming_raw_msg = input; + + while incoming_raw_msg.len() > 0 { + let mut msg = RedisMsg::from_raw(incoming_raw_msg.clone()); + let command = msg.get_next_item(); + match command.as_str() { + "message" => { + let timeline = msg.get_next_item()["timeline:".len()..].to_string(); + let message: Value = serde_json::from_str(&msg.get_next_item()).unwrap(); + output.push((timeline, message)); + } + "subscribe" | "unsubscribe" => { + // This returns a confirmation. We don't need to do anything with it, + // but we do need to advance the cursor past it + msg.get_next_item(); // name of channel (un)subscribed + msg.cursor += ":".len(); + msg.process_number(); // The number of active subscriptions + msg.cursor += "\r\n".len(); + } + cmd => panic!( + "Invariant violation: bad Redis input. Got {} as a command", + cmd + ), + } + incoming_raw_msg = msg.raw[msg.cursor..].to_string(); + } + output +} + fn criterion_benchmark(c: &mut Criterion) { let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:1\r\n$3790\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102775370117886890\",\"created_at\":\"2019-09-11T18:42:19.000Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"unlisted\",\"language\":\"en\",\"uri\":\"https://mastodon.host/users/federationbot/statuses/102775346916917099\",\"url\":\"https://mastodon.host/@federationbot/102775346916917099\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"

Trending tags:
#neverforget
#4styles
#newpipe
#uber
#mercredifiction

\",\"reblog\":null,\"account\":{\"id\":\"78\",\"username\":\"federationbot\",\"acct\":\"federationbot@mastodon.host\",\"display_name\":\"Federation Bot\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-09-10T15:04:25.559Z\",\"note\":\"

Hello, I am mastodon.host official semi bot.

Follow me if you want to have some updates on the view of the fediverse from here ( I only post unlisted ).

I also randomly boost one of my followers toot every hour !

If you don\'t feel confortable with me following you, tell me: unfollow and I\'ll do it :)

If you want me to follow you, just tell me follow !

If you want automatic follow for new users on your instance and you are an instance admin, contact me !

Other commands are private :)

\",\"url\":\"https://mastodon.host/@federationbot\",\"avatar\":\"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863\",\"avatar_static\":\"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":16636,\"following_count\":179532,\"statuses_count\":50554,\"emojis\":[],\"fields\":[{\"name\":\"More stats\",\"value\":\"https://mastodon.host/stats.html\",\"verified_at\":null},{\"name\":\"More infos\",\"value\":\"https://mastodon.host/about/more\",\"verified_at\":null},{\"name\":\"Owner/Friend\",\"value\":\"@gled\",\"verified_at\":null}]},\"media_attachments\":[],\"mentions\":[],\"tags\":[{\"name\":\"4styles\",\"url\":\"https://instance.codesections.com/tags/4styles\"},{\"name\":\"neverforget\",\"url\":\"https://instance.codesections.com/tags/neverforget\"},{\"name\":\"mercredifiction\",\"url\":\"https://instance.codesections.com/tags/mercredifiction\"},{\"name\":\"uber\",\"url\":\"https://instance.codesections.com/tags/uber\"},{\"name\":\"newpipe\",\"url\":\"https://instance.codesections.com/tags/newpipe\"}],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1568227693541}\r\n".to_string(); @@ -72,6 +105,9 @@ fn criterion_benchmark(c: &mut Criterion) { group.bench_function("hand parse", |b| { b.iter(|| hand_parse(black_box(input.clone()))) }); + group.bench_function("stuct parse", |b| { + b.iter(|| parse_with_stuct(black_box(input.clone()))) + }); } criterion_group!(benches, criterion_benchmark); diff --git a/src/redis_to_client_stream/mod.rs b/src/redis_to_client_stream/mod.rs index 29160d6..3040bf5 100644 --- a/src/redis_to_client_stream/mod.rs +++ b/src/redis_to_client_stream/mod.rs @@ -2,6 +2,7 @@ pub mod client_agent; pub mod receiver; pub mod redis_cmd; +pub mod redis_stream; use crate::config; pub use client_agent::ClientAgent; diff --git a/src/redis_to_client_stream/receiver.rs b/src/redis_to_client_stream/receiver.rs index 111d3cd..55bab4e 100644 --- a/src/redis_to_client_stream/receiver.rs +++ b/src/redis_to_client_stream/receiver.rs @@ -1,27 +1,26 @@ //! Receives data from Redis, sorts it by `ClientAgent`, and stores it until //! polled by the correct `ClientAgent`. Also manages sububscriptions and //! unsubscriptions to/from Redis. -use super::redis_cmd; +use super::{redis_cmd, redis_stream}; use crate::{config, pubsub_cmd}; use futures::{Async, Poll}; use log::info; -use regex::Regex; use serde_json::Value; -use std::{collections, io::Read, io::Write, net, time}; -use tokio::io::{AsyncRead, Error}; +use std::{collections, io::Write, net, time}; +use tokio::io::Error; use uuid::Uuid; /// The item that streams from Redis and is polled by the `ClientAgent` #[derive(Debug)] pub struct Receiver { - pubsub_connection: net::TcpStream, + pub pubsub_connection: net::TcpStream, secondary_redis_connection: net::TcpStream, redis_polled_at: time::Instant, timeline: String, manager_id: Uuid, - msg_queues: collections::HashMap, + pub msg_queues: collections::HashMap, clients_per_timeline: collections::HashMap, - incoming_raw_msg: String, + pub incoming_raw_msg: String, } impl Receiver { @@ -157,7 +156,7 @@ impl futures::stream::Stream for Receiver { if self.redis_polled_at.elapsed() > time::Duration::from_millis(*config::REDIS_POLL_INTERVAL) { - AsyncReadableStream::poll_redis(self); + redis_stream::AsyncReadableStream::poll_redis(self); self.redis_polled_at = time::Instant::now(); } @@ -188,10 +187,10 @@ impl Drop for Receiver { } #[derive(Debug, Clone)] -struct MsgQueue { - messages: collections::VecDeque, +pub struct MsgQueue { + pub messages: collections::VecDeque, last_polled_at: time::Instant, - redis_channel: String, + pub redis_channel: String, } impl MsgQueue { @@ -204,69 +203,3 @@ impl MsgQueue { } } } - -struct AsyncReadableStream<'a>(&'a mut net::TcpStream); -impl<'a> AsyncReadableStream<'a> { - fn new(stream: &'a mut net::TcpStream) -> Self { - AsyncReadableStream(stream) - } - /// Polls Redis for any new messages and adds them to the `MsgQueue` for - /// the appropriate `ClientAgent`. - fn poll_redis(receiver: &mut Receiver) { - let mut buffer = vec![0u8; 3000]; - - let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection); - if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() { - let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]); - dbg!(&raw_redis_response); - receiver.incoming_raw_msg.push_str(raw_redis_response); - // Text comes in from redis as a raw stream, which could be more than one message - // and is not guaranteed to end on a message boundary. We need to break it down - // into messages. First, start by only acting if we end on a valid message boundary - if receiver.incoming_raw_msg.ends_with("}\r\n") { - // Every valid message is tagged with the string `message`. This means 3 things: - // 1) We can discard everything before the first `message` (with `skip(1)`) - // 2) We can split into separate messages by splitting on `message` - // 3) We can use a regex that discards everything after the *first* valid - // message (since the next message will have a new `message` tag) - let messages = receiver.incoming_raw_msg.as_str().split("message").skip(1); - let regex = - Regex::new(r"timeline:(?P.*?)\r\n\$\d+\r\n(?P.*?)\r\n") - .expect("Hard-codded"); - for message in messages { - let timeline = regex.captures(message).expect("Hard-coded timeline regex") - ["timeline"] - .to_string(); - - let redis_msg: Value = serde_json::from_str( - ®ex.captures(message).expect("Hard-coded value regex")["value"], - ) - .expect("Valid json"); - - for msg_queue in receiver.msg_queues.values_mut() { - if msg_queue.redis_channel == timeline { - msg_queue.messages.push_back(redis_msg.clone()); - } - } - } - // We've processed this raw msg and can safely discard it - receiver.incoming_raw_msg.clear(); - } - } - } -} - -impl<'a> Read for AsyncReadableStream<'a> { - fn read(&mut self, buffer: &mut [u8]) -> Result { - self.0.read(buffer) - } -} - -impl<'a> AsyncRead for AsyncReadableStream<'a> { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll { - match self.read(buf) { - Ok(t) => Ok(Async::Ready(t)), - Err(_) => Ok(Async::NotReady), - } - } -} diff --git a/src/redis_to_client_stream/redis_stream.rs b/src/redis_to_client_stream/redis_stream.rs new file mode 100644 index 0000000..8d02a6a --- /dev/null +++ b/src/redis_to_client_stream/redis_stream.rs @@ -0,0 +1,147 @@ +use super::receiver::Receiver; +use futures::{Async, Poll}; +use serde_json::Value; +use std::io::Read; +use std::net; +use tokio::io::AsyncRead; + +pub struct AsyncReadableStream<'a>(&'a mut net::TcpStream); +impl<'a> AsyncReadableStream<'a> { + pub fn new(stream: &'a mut net::TcpStream) -> Self { + AsyncReadableStream(stream) + } + /// Polls Redis for any new messages and adds them to the `MsgQueue` for + /// the appropriate `ClientAgent`. + pub fn poll_redis(receiver: &mut Receiver) { + let mut buffer = vec![0u8; 3000]; + + let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection); + if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() { + let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]); + receiver.incoming_raw_msg.push_str(raw_redis_response); + // Text comes in from redis as a raw stream, which could be more than one message + // and is not guaranteed to end on a message boundary. We need to break it down + // into messages. Incoming messages *are* guaranteed to be RESP arrays, + // https://redis.io/topics/protocol + + // Only act if we have a full message (end on a msg boundary) + if !receiver.incoming_raw_msg.ends_with("}\r\n") { + return; + }; + while receiver.incoming_raw_msg.len() > 0 { + let mut msg = RedisMsg::from_raw(receiver.incoming_raw_msg.clone()); + let command = msg.get_next_item(); + match command.as_str() { + "message" => { + let timeline = msg.get_next_item()["timeline:".len()..].to_string(); + let message: Value = serde_json::from_str(&msg.get_next_item()).unwrap(); + for msg_queue in receiver.msg_queues.values_mut() { + if msg_queue.redis_channel == timeline { + msg_queue.messages.push_back(message.clone()); + } + } + } + "subscribe" | "unsubscribe" => { + // This returns a confirmation. We don't need to do anything with it, + // but we do need to advance the cursor past it + msg.get_next_item(); // name of channel (un)subscribed + msg.cursor += ":".len(); + msg.process_number(); // The number of active subscriptions + msg.cursor += "\r\n".len(); + } + cmd => panic!( + "Invariant violation: bad Redis input. Got {} as a command", + cmd + ), + } + receiver.incoming_raw_msg = msg.raw[msg.cursor..].to_string(); + } + } + } +} + +impl<'a> Read for AsyncReadableStream<'a> { + fn read(&mut self, buffer: &mut [u8]) -> Result { + self.0.read(buffer) + } +} + +impl<'a> AsyncRead for AsyncReadableStream<'a> { + fn poll_read(&mut self, buf: &mut [u8]) -> Poll { + match self.read(buf) { + Ok(t) => Ok(Async::Ready(t)), + Err(_) => Ok(Async::NotReady), + } + } +} + +#[derive(Default)] +pub struct RedisMsg { + pub raw: String, + pub cursor: usize, +} +impl RedisMsg { + pub fn from_raw(raw: String) -> Self { + Self { + raw, + cursor: "*3\r\n".len(), //length of intro header + ..Self::default() + } + } + /// Move the cursor from the beginning of a number through its end and return the number + pub fn process_number(&mut self) -> usize { + let mut selection_end = self.cursor + 1; + let mut chars = self.raw.chars(); + chars.nth(self.cursor); + while chars.next().expect("still in str").is_digit(10) { + selection_end += 1; + } + let selected_number = self.raw[self.cursor..selection_end] + .parse::() + .expect("checked with `.is_digit(10)`"); + self.cursor = selection_end; + selected_number + } + /// In a pubsub reply from Redis, an item can be either the name of the subscribed channel + /// or the msg payload. Either way, it follows the same format: + /// `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n` + pub fn get_next_item(&mut self) -> String { + self.cursor += "$".len(); + let item_len = self.process_number(); + self.cursor += "\r\n".len(); + let item_start_position = self.cursor; + self.cursor += item_len; + let item = self.raw[item_start_position..self.cursor].to_string(); + self.cursor += "\r\n".len(); + item + } +} +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn simple_redis_parse() { + let input = "*3\r\n$9\r\nSUBSCRIBE\r\n$10\r\ntimeline:1\r\n:1\r\n"; + let mut msg = RedisMsg::from_raw(input.to_string()); + let cmd = msg.get_next_item(); + assert_eq!(&cmd, "SUBSCRIBE"); + let timeline = msg.get_next_item(); + assert_eq!(&timeline, "timeline:1"); + msg.cursor += ":1\r\n".len(); + assert_eq!(msg.cursor, input.len()); + } + + #[test] + fn realistic_redis_parse() { + let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:4\r\n$1386\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102866835379605039\",\"created_at\":\"2019-09-27T22:29:02.590Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"http://localhost:3000/users/admin/statuses/102866835379605039\",\"url\":\"http://localhost:3000/@admin/102866835379605039\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"

@susan hi

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"1\",\"username\":\"admin\",\"acct\":\"admin\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-07-04T00:21:05.890Z\",\"note\":\"

\",\"url\":\"http://localhost:3000/@admin\",\"avatar\":\"http://localhost:3000/avatars/original/missing.png\",\"avatar_static\":\"http://localhost:3000/avatars/original/missing.png\",\"header\":\"http://localhost:3000/headers/original/missing.png\",\"header_static\":\"http://localhost:3000/headers/original/missing.png\",\"followers_count\":3,\"following_count\":3,\"statuses_count\":192,\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"4\",\"username\":\"susan\",\"url\":\"http://localhost:3000/@susan\",\"acct\":\"susan\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1569623342825}\r\n"; + let mut msg = RedisMsg::from_raw(input.to_string()); + let cmd = msg.get_next_item(); + assert_eq!(&cmd, "message"); + let timeline = msg.get_next_item(); + assert_eq!(&timeline, "timeline:4"); + let message_str = msg.get_next_item(); + assert_eq!(message_str, input[41..input.len() - 2]); + assert_eq!(msg.cursor, input.len()); + } +}