Fix bug leading to dropped messages ()

This commit fixes an issue where messages would be dropped if
multiple messages arrived from Redis at the same time.
This commit is contained in:
Daniel Sockwell 2019-08-27 20:29:37 -04:00 committed by GitHub
parent 161da3f456
commit a8345bd234
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 28 deletions
src/redis_to_client_stream

View File

@ -63,6 +63,7 @@ pub fn send_updates_to_ws(
.for_each(move |_json_value| {
if let Ok(Async::Ready(Some(json_value))) = stream.poll() {
let msg = warp::ws::Message::text(json_value.to_string());
// dbg!(&msg);
tx.unbounded_send(msg).expect("No send error");
};
Ok(())

View File

@ -4,7 +4,7 @@
use super::redis_cmd;
use crate::{config, pubsub_cmd};
use futures::{Async, Poll};
use log::{info, warn};
use log::info;
use regex::Regex;
use serde_json::Value;
use std::{collections, io::Read, io::Write, net, time};
@ -21,7 +21,7 @@ pub struct Receiver {
manager_id: Uuid,
msg_queues: collections::HashMap<Uuid, MsgQueue>,
clients_per_timeline: collections::HashMap<String, i32>,
current_message: String,
incoming_raw_msg: String,
}
impl Receiver {
@ -37,7 +37,7 @@ impl Receiver {
manager_id: Uuid::default(),
msg_queues: collections::HashMap::new(),
clients_per_timeline: collections::HashMap::new(),
current_message: String::new(),
incoming_raw_msg: String::new(),
}
}
@ -217,37 +217,38 @@ impl<'a> AsyncReadableStream<'a> {
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() {
let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]);
receiver.current_message.push_str(raw_redis_response);
if !receiver
.current_message
.to_ascii_lowercase()
.starts_with("*3\r\n$7\r\nmessage\r\n")
{
// Not dealing with a message, nothing to do here
receiver.current_message.clear();
} else if receiver.current_message.ends_with("}\r\n") {
warn!("Received Redis message: {}", &receiver.current_message);
receiver.incoming_raw_msg.push_str(raw_redis_response);
// Text comes in from redis as a raw stream, which could be more than one message
// and is not guaranteed to end on a message boundary. We need to break it down
// into messages. First, start by only acting if we end on a valid message boundary
if receiver.incoming_raw_msg.ends_with("}\r\n") {
// Every valid message is tagged with the string `message`. This means 3 things:
// 1) We can discard everything before the first `message` (with `skip(1)`)
// 2) We can split into separate messages by splitting on `message`
// 3) We can use a regex that discards everything after the *first* valid
// message (since the next message will have a new `message` tag)
let messages = receiver.incoming_raw_msg.as_str().split("message").skip(1);
let regex =
Regex::new(r"timeline:(?P<timeline>.*?)\r\n\$\d+\r\n(?P<value>.*?)\r\n")
.expect("Hard-codded");
for message in messages {
let timeline = regex.captures(message).expect("Hard-coded timeline regex")
["timeline"]
.to_string();
let timeline = regex
.captures(&receiver.current_message)
.expect("Hard-coded timeline regex")["timeline"]
.to_string();
let redis_msg: Value = serde_json::from_str(
&regex
.captures(&receiver.current_message)
.expect("Hard-coded value regex")["value"],
)
.expect("Valid json");
for msg_queue in receiver.msg_queues.values_mut() {
if msg_queue.redis_channel == timeline {
msg_queue.messages.push_back(redis_msg.clone());
let redis_msg: Value = serde_json::from_str(
&regex.captures(message).expect("Hard-coded value regex")["value"],
)
.expect("Valid json");
for msg_queue in receiver.msg_queues.values_mut() {
if msg_queue.redis_channel == timeline {
msg_queue.messages.push_back(redis_msg.clone());
}
}
}
// We're done here, clear `current_message` for next msg
receiver.current_message.clear();
// We've processed this raw msg and can safely discard it
receiver.incoming_raw_msg.clear();
}
}
}