From a8345bd234ee1aa77b87dd27985571c76be92e5b Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Tue, 27 Aug 2019 20:29:37 -0400 Subject: [PATCH] Fix bug leading to dropped messages (#34) This commit fixes an issue where messages would be dropped if multiple messages arrived from Redis at the same time. --- src/redis_to_client_stream/mod.rs | 1 + src/redis_to_client_stream/receiver.rs | 57 +++++++++++++------------- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/src/redis_to_client_stream/mod.rs b/src/redis_to_client_stream/mod.rs index b74c372..a9221a3 100644 --- a/src/redis_to_client_stream/mod.rs +++ b/src/redis_to_client_stream/mod.rs @@ -63,6 +63,7 @@ pub fn send_updates_to_ws( .for_each(move |_json_value| { if let Ok(Async::Ready(Some(json_value))) = stream.poll() { let msg = warp::ws::Message::text(json_value.to_string()); + // dbg!(&msg); tx.unbounded_send(msg).expect("No send error"); }; Ok(()) diff --git a/src/redis_to_client_stream/receiver.rs b/src/redis_to_client_stream/receiver.rs index ad6507e..a544520 100644 --- a/src/redis_to_client_stream/receiver.rs +++ b/src/redis_to_client_stream/receiver.rs @@ -4,7 +4,7 @@ use super::redis_cmd; use crate::{config, pubsub_cmd}; use futures::{Async, Poll}; -use log::{info, warn}; +use log::info; use regex::Regex; use serde_json::Value; use std::{collections, io::Read, io::Write, net, time}; @@ -21,7 +21,7 @@ pub struct Receiver { manager_id: Uuid, msg_queues: collections::HashMap, clients_per_timeline: collections::HashMap, - current_message: String, + incoming_raw_msg: String, } impl Receiver { @@ -37,7 +37,7 @@ impl Receiver { manager_id: Uuid::default(), msg_queues: collections::HashMap::new(), clients_per_timeline: collections::HashMap::new(), - current_message: String::new(), + incoming_raw_msg: String::new(), } } @@ -217,37 +217,38 @@ impl<'a> AsyncReadableStream<'a> { if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() { let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]); - receiver.current_message.push_str(raw_redis_response); - if !receiver - .current_message - .to_ascii_lowercase() - .starts_with("*3\r\n$7\r\nmessage\r\n") - { - // Not dealing with a message, nothing to do here - receiver.current_message.clear(); - } else if receiver.current_message.ends_with("}\r\n") { - warn!("Received Redis message: {}", &receiver.current_message); + receiver.incoming_raw_msg.push_str(raw_redis_response); + // Text comes in from redis as a raw stream, which could be more than one message + // and is not guaranteed to end on a message boundary. We need to break it down + // into messages. First, start by only acting if we end on a valid message boundary + if receiver.incoming_raw_msg.ends_with("}\r\n") { + // Every valid message is tagged with the string `message`. This means 3 things: + // 1) We can discard everything before the first `message` (with `skip(1)`) + // 2) We can split into separate messages by splitting on `message` + // 3) We can use a regex that discards everything after the *first* valid + // message (since the next message will have a new `message` tag) + let messages = receiver.incoming_raw_msg.as_str().split("message").skip(1); let regex = Regex::new(r"timeline:(?P.*?)\r\n\$\d+\r\n(?P.*?)\r\n") .expect("Hard-codded"); + for message in messages { + let timeline = regex.captures(message).expect("Hard-coded timeline regex") + ["timeline"] + .to_string(); - let timeline = regex - .captures(&receiver.current_message) - .expect("Hard-coded timeline regex")["timeline"] - .to_string(); - let redis_msg: Value = serde_json::from_str( - ®ex - .captures(&receiver.current_message) - .expect("Hard-coded value regex")["value"], - ) - .expect("Valid json"); - for msg_queue in receiver.msg_queues.values_mut() { - if msg_queue.redis_channel == timeline { - msg_queue.messages.push_back(redis_msg.clone()); + let redis_msg: Value = serde_json::from_str( + ®ex.captures(message).expect("Hard-coded value regex")["value"], + ) + .expect("Valid json"); + + for msg_queue in receiver.msg_queues.values_mut() { + if msg_queue.redis_channel == timeline { + msg_queue.messages.push_back(redis_msg.clone()); + } } } - // We're done here, clear `current_message` for next msg - receiver.current_message.clear(); + // We've processed this raw msg and can safely discard it + receiver.incoming_raw_msg.clear(); } } }