Store each Redis message as a growable String (#30)

This commit stores each message received from Redis as a growable string
rather than in a buffer of fixed size.  This allows the server to
receive messages of any length.
This commit is contained in:
Daniel Sockwell 2019-07-11 15:10:56 -04:00 committed by GitHub
parent 1925710e8e
commit 2fca5ec327
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 14 additions and 7 deletions

View File

@ -21,6 +21,7 @@ pub struct Receiver {
manager_id: Uuid,
msg_queues: collections::HashMap<Uuid, MsgQueue>,
clients_per_timeline: collections::HashMap<String, i32>,
current_message: String,
}
impl Receiver {
@ -36,6 +37,7 @@ impl Receiver {
manager_id: Uuid::default(),
msg_queues: collections::HashMap::new(),
clients_per_timeline: collections::HashMap::new(),
current_message: String::new(),
}
}
@ -210,29 +212,32 @@ impl<'a> AsyncReadableStream<'a> {
/// the appropriate `ClientAgent`.
fn poll_redis(receiver: &mut Receiver) {
let mut buffer = vec![0u8; 3000];
// Add any incoming messages to the back of the relevant `msg_queues`
// NOTE: This could be more/other than the `msg_queue` currently being polled
let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection);
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() {
let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]);
warn!("redis: {}", &raw_redis_response);
if raw_redis_response
receiver.current_message.push_str(raw_redis_response);
if !receiver
.current_message
.to_ascii_lowercase()
.starts_with("$7\r\nmessage\r\n")
.starts_with("*3\r\n$7\r\nmessage\r\n")
{
// Not dealing with a message, nothing to do here
receiver.current_message.clear();
} else if receiver.current_message.ends_with("}\r\n") {
warn!("Received Redis message: {}", &receiver.current_message);
let regex =
Regex::new(r"timeline:(?P<timeline>.*?)\r\n\$\d+\r\n(?P<value>.*?)\r\n")
.expect("Hard-codded");
let timeline = regex
.captures(raw_redis_response)
.captures(&receiver.current_message)
.expect("Hard-coded timeline regex")["timeline"]
.to_string();
let redis_msg: Value = serde_json::from_str(
&regex
.captures(raw_redis_response)
.captures(&receiver.current_message)
.expect("Hard-coded value regex")["value"],
)
.expect("Valid json");
@ -241,6 +246,8 @@ impl<'a> AsyncReadableStream<'a> {
msg_queue.messages.push_back(redis_msg.clone());
}
}
// We're done here, clear `current_message` for next msg
receiver.current_message.clear();
}
}
}