diff --git a/Cargo.lock b/Cargo.lock index 43a4204..b94a5ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -386,7 +386,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.3.1" +version = "0.3.4" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 0a54719..5950a7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.3.1" +version = "0.3.4" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/benches/parse_redis.rs b/benches/parse_redis.rs index b4d0651..088eac0 100644 --- a/benches/parse_redis.rs +++ b/benches/parse_redis.rs @@ -70,17 +70,17 @@ fn parse_with_stuct(input: &str) -> Vec<(String, Value)> { while incoming_raw_msg.len() > 0 { let mut msg = RedisMsg::from_raw(incoming_raw_msg); - let command = msg.next_item(); + let command = msg.next_field(); match command.as_str() { "message" => { - let timeline = msg.next_item()["timeline:".len()..].to_string(); - let message: Value = serde_json::from_str(&msg.next_item()).unwrap(); + let timeline = msg.next_field()["timeline:".len()..].to_string(); + let message: Value = serde_json::from_str(&msg.next_field()).unwrap(); output.push((timeline, message)); } "subscribe" | "unsubscribe" => { // This returns a confirmation. We don't need to do anything with it, // but we do need to advance the cursor past it - msg.next_item(); // name of channel (un)subscribed + msg.next_field(); // name of channel (un)subscribed msg.cursor += ":".len(); msg.process_number(); // The number of active subscriptions msg.cursor += "\r\n".len(); diff --git a/src/redis_to_client_stream/redis_stream.rs b/src/redis_to_client_stream/redis_stream.rs index 6de5ab9..9c46618 100644 --- a/src/redis_to_client_stream/redis_stream.rs +++ b/src/redis_to_client_stream/redis_stream.rs @@ -6,36 +6,41 @@ use std::net; use tokio::io::AsyncRead; pub struct AsyncReadableStream<'a>(&'a mut net::TcpStream); + impl<'a> AsyncReadableStream<'a> { pub fn new(stream: &'a mut net::TcpStream) -> Self { - AsyncReadableStream(stream) + Self(stream) } + // Text comes in from redis as a raw stream, which could be more than one message + // and is not guaranteed to end on a message boundary. We need to break it down + // into messages. Incoming messages *are* guaranteed to be RESP arrays, + // https://redis.io/topics/protocol /// Adds any new Redis messages to the `MsgQueue` for the appropriate `ClientAgent`. pub fn poll_redis(receiver: &mut Receiver) { - let mut buffer = vec![0u8; 3000]; + let mut buffer = vec![0u8; 6000]; let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection); if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() { - let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]); - receiver.incoming_raw_msg.push_str(raw_redis_response); - // Text comes in from redis as a raw stream, which could be more than one message - // and is not guaranteed to end on a message boundary. We need to break it down - // into messages. Incoming messages *are* guaranteed to be RESP arrays, - // https://redis.io/topics/protocol + let raw_redis_response = async_stream.to_utf8(buffer, num_bytes_read); + receiver.incoming_raw_msg.push_str(&raw_redis_response); // Only act if we have a full message (end on a msg boundary) if !receiver.incoming_raw_msg.ends_with("}\r\n") { return; }; let mut msg = RedisMsg::from_raw(&receiver.incoming_raw_msg); + while !msg.raw.is_empty() { - let command = msg.next_item(); + let command = msg.next_field(); match command.as_str() { "message" => { - let timeline = &msg.next_item()["timeline:".len()..]; - let msg_txt = &msg.next_item(); - let msg_value: Value = serde_json::from_str(msg_txt).expect("Redis json"); + let timeline = &msg.next_field()["timeline:".len()..]; + let msg_txt = &msg.next_field(); + let msg_value: Value = match serde_json::from_str(msg_txt) { + Ok(v) => v, + Err(e) => panic!("Unparseable json {}\n\n{}", msg_txt, e), + }; for msg_queue in receiver.msg_queues.values_mut() { if msg_queue.redis_channel == timeline { msg_queue.messages.push_back(msg_value.clone()); @@ -43,20 +48,28 @@ impl<'a> AsyncReadableStream<'a> { } } "subscribe" | "unsubscribe" => { - // This returns a confirmation that we ignore, but must advance - // the cursor past - let _channel = msg.next_item(); + // No msg, so ignore & advance cursor to end + let _channel = msg.next_field(); msg.cursor += ":".len(); let _active_subscriptions = msg.process_number(); msg.cursor += "\r\n".len(); } - cmd => panic!("Invariant violation: {} is bad Redis input", cmd), - } + cmd => panic!("Invariant violation: {} is invalid Redis input", cmd), + }; msg = RedisMsg::from_raw(&msg.raw[msg.cursor..]); } receiver.incoming_raw_msg.clear(); } } + + fn to_utf8(&mut self, cur_buffer: Vec, size: usize) -> String { + String::from_utf8(cur_buffer[..size].to_vec()).unwrap_or_else(|_| { + let mut new_buffer = vec![0u8; 1]; + self.poll_read(&mut new_buffer).unwrap(); + let buffer = ([cur_buffer, new_buffer]).concat(); + self.to_utf8(buffer, size + 1) + }) + } } impl<'a> Read for AsyncReadableStream<'a> { @@ -74,23 +87,23 @@ impl<'a> AsyncRead for AsyncReadableStream<'a> { } } -#[derive(Default)] +#[derive(Debug)] pub struct RedisMsg<'a> { pub raw: &'a str, pub cursor: usize, } + impl<'a> RedisMsg<'a> { pub fn from_raw(raw: &'a str) -> Self { Self { raw, cursor: "*3\r\n".len(), //length of intro header - ..Self::default() } } /// Move the cursor from the beginning of a number through its end and return the number pub fn process_number(&mut self) -> usize { let (mut selected_number, selection_start) = (0, self.cursor); - while let Ok(number) = self.raw[selection_start..self.cursor + 1].parse::() { + while let Ok(number) = self.raw[selection_start..=self.cursor].parse::() { self.cursor += 1; selected_number = number; } @@ -99,8 +112,9 @@ impl<'a> RedisMsg<'a> { /// In a pubsub reply from Redis, an item can be either the name of the subscribed channel /// or the msg payload. Either way, it follows the same format: /// `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n` - pub fn next_item(&mut self) -> String { + pub fn next_field(&mut self) -> String { self.cursor += "$".len(); + let item_len = self.process_number(); self.cursor += "\r\n".len(); let item_start_position = self.cursor; @@ -110,6 +124,7 @@ impl<'a> RedisMsg<'a> { item } } + #[cfg(test)] mod test { use super::*; @@ -118,9 +133,9 @@ mod test { fn simple_redis_parse() { let input = "*3\r\n$9\r\nSUBSCRIBE\r\n$10\r\ntimeline:1\r\n:1\r\n"; let mut msg = RedisMsg::from_raw(input); - let cmd = msg.next_item(); + let cmd = msg.next_field(); assert_eq!(&cmd, "SUBSCRIBE"); - let timeline = msg.next_item(); + let timeline = msg.next_field(); assert_eq!(&timeline, "timeline:1"); msg.cursor += ":1\r\n".len(); assert_eq!(msg.cursor, input.len()); @@ -130,11 +145,11 @@ mod test { fn realistic_redis_parse() { let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:4\r\n$1386\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102866835379605039\",\"created_at\":\"2019-09-27T22:29:02.590Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"http://localhost:3000/users/admin/statuses/102866835379605039\",\"url\":\"http://localhost:3000/@admin/102866835379605039\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"

@susan hi

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"1\",\"username\":\"admin\",\"acct\":\"admin\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-07-04T00:21:05.890Z\",\"note\":\"

\",\"url\":\"http://localhost:3000/@admin\",\"avatar\":\"http://localhost:3000/avatars/original/missing.png\",\"avatar_static\":\"http://localhost:3000/avatars/original/missing.png\",\"header\":\"http://localhost:3000/headers/original/missing.png\",\"header_static\":\"http://localhost:3000/headers/original/missing.png\",\"followers_count\":3,\"following_count\":3,\"statuses_count\":192,\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"4\",\"username\":\"susan\",\"url\":\"http://localhost:3000/@susan\",\"acct\":\"susan\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1569623342825}\r\n"; let mut msg = RedisMsg::from_raw(input); - let cmd = msg.next_item(); + let cmd = msg.next_field(); assert_eq!(&cmd, "message"); - let timeline = msg.next_item(); + let timeline = msg.next_field(); assert_eq!(&timeline, "timeline:4"); - let message_str = msg.next_item(); + let message_str = msg.next_field(); assert_eq!(message_str, input[41..input.len() - 2]); assert_eq!(msg.cursor, input.len()); }