From fc7feb6694f5821a303a61c6e4ff48a7d8aca5b6 Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Sat, 28 Sep 2019 12:27:45 -0400 Subject: [PATCH] Speed improvement to redis_parse (#52) --- benches/parse_redis.rs | 16 ++--- src/redis_to_client_stream/redis_stream.rs | 72 ++++++++++------------ 2 files changed, 41 insertions(+), 47 deletions(-) diff --git a/benches/parse_redis.rs b/benches/parse_redis.rs index cd5f30e..b4d0651 100644 --- a/benches/parse_redis.rs +++ b/benches/parse_redis.rs @@ -64,23 +64,23 @@ fn print_next_str(mut end: usize, input: &str) -> (usize, String) { (end, string.to_string()) } -fn parse_with_stuct(input: String) -> Vec<(String, Value)> { +fn parse_with_stuct(input: &str) -> Vec<(String, Value)> { let mut output = Vec::new(); let mut incoming_raw_msg = input; while incoming_raw_msg.len() > 0 { - let mut msg = RedisMsg::from_raw(incoming_raw_msg.clone()); - let command = msg.get_next_item(); + let mut msg = RedisMsg::from_raw(incoming_raw_msg); + let command = msg.next_item(); match command.as_str() { "message" => { - let timeline = msg.get_next_item()["timeline:".len()..].to_string(); - let message: Value = serde_json::from_str(&msg.get_next_item()).unwrap(); + let timeline = msg.next_item()["timeline:".len()..].to_string(); + let message: Value = serde_json::from_str(&msg.next_item()).unwrap(); output.push((timeline, message)); } "subscribe" | "unsubscribe" => { // This returns a confirmation. 
We don't need to do anything with it, // but we do need to advance the cursor past it - msg.get_next_item(); // name of channel (un)subscribed + msg.next_item(); // name of channel (un)subscribed msg.cursor += ":".len(); msg.process_number(); // The number of active subscriptions msg.cursor += "\r\n".len(); @@ -90,7 +90,7 @@ fn parse_with_stuct(input: String) -> Vec<(String, Value)> { cmd ), } - incoming_raw_msg = msg.raw[msg.cursor..].to_string(); + incoming_raw_msg = &msg.raw[msg.cursor..]; } output } @@ -106,7 +106,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| hand_parse(black_box(input.clone()))) }); group.bench_function("stuct parse", |b| { - b.iter(|| parse_with_stuct(black_box(input.clone()))) + b.iter(|| parse_with_stuct(black_box(&input))) }); } diff --git a/src/redis_to_client_stream/redis_stream.rs b/src/redis_to_client_stream/redis_stream.rs index 8d02a6a..6de5ab9 100644 --- a/src/redis_to_client_stream/redis_stream.rs +++ b/src/redis_to_client_stream/redis_stream.rs @@ -10,12 +10,12 @@ impl<'a> AsyncReadableStream<'a> { pub fn new(stream: &'a mut net::TcpStream) -> Self { AsyncReadableStream(stream) } - /// Polls Redis for any new messages and adds them to the `MsgQueue` for - /// the appropriate `ClientAgent`. + + /// Adds any new Redis messages to the `MsgQueue` for the appropriate `ClientAgent`. 
pub fn poll_redis(receiver: &mut Receiver) { let mut buffer = vec![0u8; 3000]; - let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection); + if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() { let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]); receiver.incoming_raw_msg.push_str(raw_redis_response); @@ -28,34 +28,33 @@ impl<'a> AsyncReadableStream<'a> { if !receiver.incoming_raw_msg.ends_with("}\r\n") { return; }; - while receiver.incoming_raw_msg.len() > 0 { - let mut msg = RedisMsg::from_raw(receiver.incoming_raw_msg.clone()); - let command = msg.get_next_item(); + let mut msg = RedisMsg::from_raw(&receiver.incoming_raw_msg); + while !msg.raw.is_empty() { + let command = msg.next_item(); match command.as_str() { "message" => { - let timeline = msg.get_next_item()["timeline:".len()..].to_string(); - let message: Value = serde_json::from_str(&msg.get_next_item()).unwrap(); + let timeline = &msg.next_item()["timeline:".len()..]; + let msg_txt = &msg.next_item(); + let msg_value: Value = serde_json::from_str(msg_txt).expect("Redis json"); for msg_queue in receiver.msg_queues.values_mut() { if msg_queue.redis_channel == timeline { - msg_queue.messages.push_back(message.clone()); + msg_queue.messages.push_back(msg_value.clone()); } } } "subscribe" | "unsubscribe" => { - // This returns a confirmation. We don't need to do anything with it, - // but we do need to advance the cursor past it - msg.get_next_item(); // name of channel (un)subscribed + // This returns a confirmation that we ignore, but must advance + // the cursor past + let _channel = msg.next_item(); msg.cursor += ":".len(); - msg.process_number(); // The number of active subscriptions + let _active_subscriptions = msg.process_number(); msg.cursor += "\r\n".len(); } - cmd => panic!( - "Invariant violation: bad Redis input. 
Got {} as a command", - cmd - ), + cmd => panic!("Invariant violation: {} is bad Redis input", cmd), } - receiver.incoming_raw_msg = msg.raw[msg.cursor..].to_string(); + msg = RedisMsg::from_raw(&msg.raw[msg.cursor..]); } + receiver.incoming_raw_msg.clear(); } } } @@ -76,12 +75,12 @@ impl<'a> AsyncRead for AsyncReadableStream<'a> { } #[derive(Default)] -pub struct RedisMsg { - pub raw: String, +pub struct RedisMsg<'a> { + pub raw: &'a str, pub cursor: usize, } -impl RedisMsg { - pub fn from_raw(raw: String) -> Self { +impl<'a> RedisMsg<'a> { + pub fn from_raw(raw: &'a str) -> Self { Self { raw, cursor: "*3\r\n".len(), //length of intro header @@ -90,22 +89,17 @@ impl RedisMsg { } /// Move the cursor from the beginning of a number through its end and return the number pub fn process_number(&mut self) -> usize { - let mut selection_end = self.cursor + 1; - let mut chars = self.raw.chars(); - chars.nth(self.cursor); - while chars.next().expect("still in str").is_digit(10) { - selection_end += 1; + let (mut selected_number, selection_start) = (0, self.cursor); + while let Ok(number) = self.raw[selection_start..self.cursor + 1].parse::() { + self.cursor += 1; + selected_number = number; } - let selected_number = self.raw[self.cursor..selection_end] - .parse::() - .expect("checked with `.is_digit(10)`"); - self.cursor = selection_end; selected_number } /// In a pubsub reply from Redis, an item can be either the name of the subscribed channel /// or the msg payload. 
Either way, it follows the same format: /// `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n` - pub fn get_next_item(&mut self) -> String { + pub fn next_item(&mut self) -> String { self.cursor += "$".len(); let item_len = self.process_number(); self.cursor += "\r\n".len(); @@ -123,10 +117,10 @@ mod test { #[test] fn simple_redis_parse() { let input = "*3\r\n$9\r\nSUBSCRIBE\r\n$10\r\ntimeline:1\r\n:1\r\n"; - let mut msg = RedisMsg::from_raw(input.to_string()); - let cmd = msg.get_next_item(); + let mut msg = RedisMsg::from_raw(input); + let cmd = msg.next_item(); assert_eq!(&cmd, "SUBSCRIBE"); - let timeline = msg.get_next_item(); + let timeline = msg.next_item(); assert_eq!(&timeline, "timeline:1"); msg.cursor += ":1\r\n".len(); assert_eq!(msg.cursor, input.len()); @@ -135,12 +129,12 @@ mod test { #[test] fn realistic_redis_parse() { let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:4\r\n$1386\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102866835379605039\",\"created_at\":\"2019-09-27T22:29:02.590Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"http://localhost:3000/users/admin/statuses/102866835379605039\",\"url\":\"http://localhost:3000/@admin/102866835379605039\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"

@susan hi

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"1\",\"username\":\"admin\",\"acct\":\"admin\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-07-04T00:21:05.890Z\",\"note\":\"

\",\"url\":\"http://localhost:3000/@admin\",\"avatar\":\"http://localhost:3000/avatars/original/missing.png\",\"avatar_static\":\"http://localhost:3000/avatars/original/missing.png\",\"header\":\"http://localhost:3000/headers/original/missing.png\",\"header_static\":\"http://localhost:3000/headers/original/missing.png\",\"followers_count\":3,\"following_count\":3,\"statuses_count\":192,\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"4\",\"username\":\"susan\",\"url\":\"http://localhost:3000/@susan\",\"acct\":\"susan\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1569623342825}\r\n"; - let mut msg = RedisMsg::from_raw(input.to_string()); - let cmd = msg.get_next_item(); + let mut msg = RedisMsg::from_raw(input); + let cmd = msg.next_item(); assert_eq!(&cmd, "message"); - let timeline = msg.get_next_item(); + let timeline = msg.next_item(); assert_eq!(&timeline, "timeline:4"); - let message_str = msg.get_next_item(); + let message_str = msg.next_item(); assert_eq!(message_str, input[41..input.len() - 2]); assert_eq!(msg.cursor, input.len()); }