Speed improvment to redis_parse (#52)

This commit is contained in:
Daniel Sockwell 2019-09-28 12:27:45 -04:00 committed by GitHub
parent 2ac08c5316
commit fc7feb6694
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 47 deletions

View File

@ -64,23 +64,23 @@ fn print_next_str(mut end: usize, input: &str) -> (usize, String) {
(end, string.to_string()) (end, string.to_string())
} }
fn parse_with_stuct(input: String) -> Vec<(String, Value)> { fn parse_with_stuct(input: &str) -> Vec<(String, Value)> {
let mut output = Vec::new(); let mut output = Vec::new();
let mut incoming_raw_msg = input; let mut incoming_raw_msg = input;
while incoming_raw_msg.len() > 0 { while incoming_raw_msg.len() > 0 {
let mut msg = RedisMsg::from_raw(incoming_raw_msg.clone()); let mut msg = RedisMsg::from_raw(incoming_raw_msg);
let command = msg.get_next_item(); let command = msg.next_item();
match command.as_str() { match command.as_str() {
"message" => { "message" => {
let timeline = msg.get_next_item()["timeline:".len()..].to_string(); let timeline = msg.next_item()["timeline:".len()..].to_string();
let message: Value = serde_json::from_str(&msg.get_next_item()).unwrap(); let message: Value = serde_json::from_str(&msg.next_item()).unwrap();
output.push((timeline, message)); output.push((timeline, message));
} }
"subscribe" | "unsubscribe" => { "subscribe" | "unsubscribe" => {
// This returns a confirmation. We don't need to do anything with it, // This returns a confirmation. We don't need to do anything with it,
// but we do need to advance the cursor past it // but we do need to advance the cursor past it
msg.get_next_item(); // name of channel (un)subscribed msg.next_item(); // name of channel (un)subscribed
msg.cursor += ":".len(); msg.cursor += ":".len();
msg.process_number(); // The number of active subscriptions msg.process_number(); // The number of active subscriptions
msg.cursor += "\r\n".len(); msg.cursor += "\r\n".len();
@ -90,7 +90,7 @@ fn parse_with_stuct(input: String) -> Vec<(String, Value)> {
cmd cmd
), ),
} }
incoming_raw_msg = msg.raw[msg.cursor..].to_string(); incoming_raw_msg = &msg.raw[msg.cursor..];
} }
output output
} }
@ -106,7 +106,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| hand_parse(black_box(input.clone()))) b.iter(|| hand_parse(black_box(input.clone())))
}); });
group.bench_function("stuct parse", |b| { group.bench_function("stuct parse", |b| {
b.iter(|| parse_with_stuct(black_box(input.clone()))) b.iter(|| parse_with_stuct(black_box(&input)))
}); });
} }

View File

@ -10,12 +10,12 @@ impl<'a> AsyncReadableStream<'a> {
pub fn new(stream: &'a mut net::TcpStream) -> Self { pub fn new(stream: &'a mut net::TcpStream) -> Self {
AsyncReadableStream(stream) AsyncReadableStream(stream)
} }
/// Polls Redis for any new messages and adds them to the `MsgQueue` for
/// the appropriate `ClientAgent`. /// Adds any new Redis messages to the `MsgQueue` for the appropriate `ClientAgent`.
pub fn poll_redis(receiver: &mut Receiver) { pub fn poll_redis(receiver: &mut Receiver) {
let mut buffer = vec![0u8; 3000]; let mut buffer = vec![0u8; 3000];
let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection); let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection);
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() { if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() {
let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]); let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]);
receiver.incoming_raw_msg.push_str(raw_redis_response); receiver.incoming_raw_msg.push_str(raw_redis_response);
@ -28,34 +28,33 @@ impl<'a> AsyncReadableStream<'a> {
if !receiver.incoming_raw_msg.ends_with("}\r\n") { if !receiver.incoming_raw_msg.ends_with("}\r\n") {
return; return;
}; };
while receiver.incoming_raw_msg.len() > 0 { let mut msg = RedisMsg::from_raw(&receiver.incoming_raw_msg);
let mut msg = RedisMsg::from_raw(receiver.incoming_raw_msg.clone()); while !msg.raw.is_empty() {
let command = msg.get_next_item(); let command = msg.next_item();
match command.as_str() { match command.as_str() {
"message" => { "message" => {
let timeline = msg.get_next_item()["timeline:".len()..].to_string(); let timeline = &msg.next_item()["timeline:".len()..];
let message: Value = serde_json::from_str(&msg.get_next_item()).unwrap(); let msg_txt = &msg.next_item();
let msg_value: Value = serde_json::from_str(msg_txt).expect("Redis json");
for msg_queue in receiver.msg_queues.values_mut() { for msg_queue in receiver.msg_queues.values_mut() {
if msg_queue.redis_channel == timeline { if msg_queue.redis_channel == timeline {
msg_queue.messages.push_back(message.clone()); msg_queue.messages.push_back(msg_value.clone());
} }
} }
} }
"subscribe" | "unsubscribe" => { "subscribe" | "unsubscribe" => {
// This returns a confirmation. We don't need to do anything with it, // This returns a confirmation that we ignore, but must advance
// but we do need to advance the cursor past it // the cursor past
msg.get_next_item(); // name of channel (un)subscribed let _channel = msg.next_item();
msg.cursor += ":".len(); msg.cursor += ":".len();
msg.process_number(); // The number of active subscriptions let _active_subscriptions = msg.process_number();
msg.cursor += "\r\n".len(); msg.cursor += "\r\n".len();
} }
cmd => panic!( cmd => panic!("Invariant violation: {} is bad Redis input", cmd),
"Invariant violation: bad Redis input. Got {} as a command",
cmd
),
} }
receiver.incoming_raw_msg = msg.raw[msg.cursor..].to_string(); msg = RedisMsg::from_raw(&msg.raw[msg.cursor..]);
} }
receiver.incoming_raw_msg.clear();
} }
} }
} }
@ -76,12 +75,12 @@ impl<'a> AsyncRead for AsyncReadableStream<'a> {
} }
#[derive(Default)] #[derive(Default)]
pub struct RedisMsg { pub struct RedisMsg<'a> {
pub raw: String, pub raw: &'a str,
pub cursor: usize, pub cursor: usize,
} }
impl RedisMsg { impl<'a> RedisMsg<'a> {
pub fn from_raw(raw: String) -> Self { pub fn from_raw(raw: &'a str) -> Self {
Self { Self {
raw, raw,
cursor: "*3\r\n".len(), //length of intro header cursor: "*3\r\n".len(), //length of intro header
@ -90,22 +89,17 @@ impl RedisMsg {
} }
/// Move the cursor from the beginning of a number through its end and return the number /// Move the cursor from the beginning of a number through its end and return the number
pub fn process_number(&mut self) -> usize { pub fn process_number(&mut self) -> usize {
let mut selection_end = self.cursor + 1; let (mut selected_number, selection_start) = (0, self.cursor);
let mut chars = self.raw.chars(); while let Ok(number) = self.raw[selection_start..self.cursor + 1].parse::<usize>() {
chars.nth(self.cursor); self.cursor += 1;
while chars.next().expect("still in str").is_digit(10) { selected_number = number;
selection_end += 1;
} }
let selected_number = self.raw[self.cursor..selection_end]
.parse::<usize>()
.expect("checked with `.is_digit(10)`");
self.cursor = selection_end;
selected_number selected_number
} }
/// In a pubsub reply from Redis, an item can be either the name of the subscribed channel /// In a pubsub reply from Redis, an item can be either the name of the subscribed channel
/// or the msg payload. Either way, it follows the same format: /// or the msg payload. Either way, it follows the same format:
/// `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n` /// `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n`
pub fn get_next_item(&mut self) -> String { pub fn next_item(&mut self) -> String {
self.cursor += "$".len(); self.cursor += "$".len();
let item_len = self.process_number(); let item_len = self.process_number();
self.cursor += "\r\n".len(); self.cursor += "\r\n".len();
@ -123,10 +117,10 @@ mod test {
#[test] #[test]
fn simple_redis_parse() { fn simple_redis_parse() {
let input = "*3\r\n$9\r\nSUBSCRIBE\r\n$10\r\ntimeline:1\r\n:1\r\n"; let input = "*3\r\n$9\r\nSUBSCRIBE\r\n$10\r\ntimeline:1\r\n:1\r\n";
let mut msg = RedisMsg::from_raw(input.to_string()); let mut msg = RedisMsg::from_raw(input);
let cmd = msg.get_next_item(); let cmd = msg.next_item();
assert_eq!(&cmd, "SUBSCRIBE"); assert_eq!(&cmd, "SUBSCRIBE");
let timeline = msg.get_next_item(); let timeline = msg.next_item();
assert_eq!(&timeline, "timeline:1"); assert_eq!(&timeline, "timeline:1");
msg.cursor += ":1\r\n".len(); msg.cursor += ":1\r\n".len();
assert_eq!(msg.cursor, input.len()); assert_eq!(msg.cursor, input.len());
@ -135,12 +129,12 @@ mod test {
#[test] #[test]
fn realistic_redis_parse() { fn realistic_redis_parse() {
let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:4\r\n$1386\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102866835379605039\",\"created_at\":\"2019-09-27T22:29:02.590Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"http://localhost:3000/users/admin/statuses/102866835379605039\",\"url\":\"http://localhost:3000/@admin/102866835379605039\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"<p><span class=\\\"h-card\\\"><a href=\\\"http://localhost:3000/@susan\\\" class=\\\"u-url mention\\\">@<span>susan</span></a></span> hi</p>\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"1\",\"username\":\"admin\",\"acct\":\"admin\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-07-04T00:21:05.890Z\",\"note\":\"<p></p>\",\"url\":\"http://localhost:3000/@admin\",\"avatar\":\"http://localhost:3000/avatars/original/missing.png\",\"avatar_static\":\"http://localhost:3000/avatars/original/missing.png\",\"header\":\"http://localhost:3000/headers/original/missing.png\",\"header_static\":\"http://localhost:3000/headers/original/missing.png\",\"followers_count\":3,\"following_count\":3,\"statuses_count\":192,\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"4\",\"username\":\"susan\",\"url\":\"http://localhost:3000/@susan\",\"acct\":\"susan\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1569623342825}\r\n"; let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:4\r\n$1386\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102866835379605039\",\"created_at\":\"2019-09-27T22:29:02.590Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"http://localhost:3000/users/admin/statuses/102866835379605039\",\"url\":\"http://localhost:3000/@admin/102866835379605039\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"<p><span class=\\\"h-card\\\"><a href=\\\"http://localhost:3000/@susan\\\" class=\\\"u-url mention\\\">@<span>susan</span></a></span> hi</p>\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"1\",\"username\":\"admin\",\"acct\":\"admin\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-07-04T00:21:05.890Z\",\"note\":\"<p></p>\",\"url\":\"http://localhost:3000/@admin\",\"avatar\":\"http://localhost:3000/avatars/original/missing.png\",\"avatar_static\":\"http://localhost:3000/avatars/original/missing.png\",\"header\":\"http://localhost:3000/headers/original/missing.png\",\"header_static\":\"http://localhost:3000/headers/original/missing.png\",\"followers_count\":3,\"following_count\":3,\"statuses_count\":192,\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"4\",\"username\":\"susan\",\"url\":\"http://localhost:3000/@susan\",\"acct\":\"susan\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1569623342825}\r\n";
let mut msg = RedisMsg::from_raw(input.to_string()); let mut msg = RedisMsg::from_raw(input);
let cmd = msg.get_next_item(); let cmd = msg.next_item();
assert_eq!(&cmd, "message"); assert_eq!(&cmd, "message");
let timeline = msg.get_next_item(); let timeline = msg.next_item();
assert_eq!(&timeline, "timeline:4"); assert_eq!(&timeline, "timeline:4");
let message_str = msg.get_next_item(); let message_str = msg.next_item();
assert_eq!(message_str, input[41..input.len() - 2]); assert_eq!(message_str, input[41..input.len() - 2]);
assert_eq!(msg.cursor, input.len()); assert_eq!(msg.cursor, input.len());
} }