mirror of https://github.com/mastodon/flodgatt
Unicode fix (#54)
* Increase detail of error logging; bump version * Fix panic caused by splitting a message inside a Unicode char
This commit is contained in:
parent
d347b8e2dc
commit
5b663d110e
|
@ -386,7 +386,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flodgatt"
|
name = "flodgatt"
|
||||||
version = "0.3.1"
|
version = "0.3.4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
[package]
|
[package]
|
||||||
name = "flodgatt"
|
name = "flodgatt"
|
||||||
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
|
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
|
||||||
version = "0.3.1"
|
version = "0.3.4"
|
||||||
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
|
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
|
|
|
@ -70,17 +70,17 @@ fn parse_with_stuct(input: &str) -> Vec<(String, Value)> {
|
||||||
|
|
||||||
while incoming_raw_msg.len() > 0 {
|
while incoming_raw_msg.len() > 0 {
|
||||||
let mut msg = RedisMsg::from_raw(incoming_raw_msg);
|
let mut msg = RedisMsg::from_raw(incoming_raw_msg);
|
||||||
let command = msg.next_item();
|
let command = msg.next_field();
|
||||||
match command.as_str() {
|
match command.as_str() {
|
||||||
"message" => {
|
"message" => {
|
||||||
let timeline = msg.next_item()["timeline:".len()..].to_string();
|
let timeline = msg.next_field()["timeline:".len()..].to_string();
|
||||||
let message: Value = serde_json::from_str(&msg.next_item()).unwrap();
|
let message: Value = serde_json::from_str(&msg.next_field()).unwrap();
|
||||||
output.push((timeline, message));
|
output.push((timeline, message));
|
||||||
}
|
}
|
||||||
"subscribe" | "unsubscribe" => {
|
"subscribe" | "unsubscribe" => {
|
||||||
// This returns a confirmation. We don't need to do anything with it,
|
// This returns a confirmation. We don't need to do anything with it,
|
||||||
// but we do need to advance the cursor past it
|
// but we do need to advance the cursor past it
|
||||||
msg.next_item(); // name of channel (un)subscribed
|
msg.next_field(); // name of channel (un)subscribed
|
||||||
msg.cursor += ":".len();
|
msg.cursor += ":".len();
|
||||||
msg.process_number(); // The number of active subscriptions
|
msg.process_number(); // The number of active subscriptions
|
||||||
msg.cursor += "\r\n".len();
|
msg.cursor += "\r\n".len();
|
||||||
|
|
|
@ -6,36 +6,41 @@ use std::net;
|
||||||
use tokio::io::AsyncRead;
|
use tokio::io::AsyncRead;
|
||||||
|
|
||||||
pub struct AsyncReadableStream<'a>(&'a mut net::TcpStream);
|
pub struct AsyncReadableStream<'a>(&'a mut net::TcpStream);
|
||||||
|
|
||||||
impl<'a> AsyncReadableStream<'a> {
|
impl<'a> AsyncReadableStream<'a> {
|
||||||
pub fn new(stream: &'a mut net::TcpStream) -> Self {
|
pub fn new(stream: &'a mut net::TcpStream) -> Self {
|
||||||
AsyncReadableStream(stream)
|
Self(stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Text comes in from redis as a raw stream, which could be more than one message
|
||||||
|
// and is not guaranteed to end on a message boundary. We need to break it down
|
||||||
|
// into messages. Incoming messages *are* guaranteed to be RESP arrays,
|
||||||
|
// https://redis.io/topics/protocol
|
||||||
/// Adds any new Redis messages to the `MsgQueue` for the appropriate `ClientAgent`.
|
/// Adds any new Redis messages to the `MsgQueue` for the appropriate `ClientAgent`.
|
||||||
pub fn poll_redis(receiver: &mut Receiver) {
|
pub fn poll_redis(receiver: &mut Receiver) {
|
||||||
let mut buffer = vec![0u8; 3000];
|
let mut buffer = vec![0u8; 6000];
|
||||||
let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection);
|
let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection);
|
||||||
|
|
||||||
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() {
|
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() {
|
||||||
let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]);
|
let raw_redis_response = async_stream.to_utf8(buffer, num_bytes_read);
|
||||||
receiver.incoming_raw_msg.push_str(raw_redis_response);
|
receiver.incoming_raw_msg.push_str(&raw_redis_response);
|
||||||
// Text comes in from redis as a raw stream, which could be more than one message
|
|
||||||
// and is not guaranteed to end on a message boundary. We need to break it down
|
|
||||||
// into messages. Incoming messages *are* guaranteed to be RESP arrays,
|
|
||||||
// https://redis.io/topics/protocol
|
|
||||||
|
|
||||||
// Only act if we have a full message (end on a msg boundary)
|
// Only act if we have a full message (end on a msg boundary)
|
||||||
if !receiver.incoming_raw_msg.ends_with("}\r\n") {
|
if !receiver.incoming_raw_msg.ends_with("}\r\n") {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
let mut msg = RedisMsg::from_raw(&receiver.incoming_raw_msg);
|
let mut msg = RedisMsg::from_raw(&receiver.incoming_raw_msg);
|
||||||
|
|
||||||
while !msg.raw.is_empty() {
|
while !msg.raw.is_empty() {
|
||||||
let command = msg.next_item();
|
let command = msg.next_field();
|
||||||
match command.as_str() {
|
match command.as_str() {
|
||||||
"message" => {
|
"message" => {
|
||||||
let timeline = &msg.next_item()["timeline:".len()..];
|
let timeline = &msg.next_field()["timeline:".len()..];
|
||||||
let msg_txt = &msg.next_item();
|
let msg_txt = &msg.next_field();
|
||||||
let msg_value: Value = serde_json::from_str(msg_txt).expect("Redis json");
|
let msg_value: Value = match serde_json::from_str(msg_txt) {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => panic!("Unparseable json {}\n\n{}", msg_txt, e),
|
||||||
|
};
|
||||||
for msg_queue in receiver.msg_queues.values_mut() {
|
for msg_queue in receiver.msg_queues.values_mut() {
|
||||||
if msg_queue.redis_channel == timeline {
|
if msg_queue.redis_channel == timeline {
|
||||||
msg_queue.messages.push_back(msg_value.clone());
|
msg_queue.messages.push_back(msg_value.clone());
|
||||||
|
@ -43,20 +48,28 @@ impl<'a> AsyncReadableStream<'a> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"subscribe" | "unsubscribe" => {
|
"subscribe" | "unsubscribe" => {
|
||||||
// This returns a confirmation that we ignore, but must advance
|
// No msg, so ignore & advance cursor to end
|
||||||
// the cursor past
|
let _channel = msg.next_field();
|
||||||
let _channel = msg.next_item();
|
|
||||||
msg.cursor += ":".len();
|
msg.cursor += ":".len();
|
||||||
let _active_subscriptions = msg.process_number();
|
let _active_subscriptions = msg.process_number();
|
||||||
msg.cursor += "\r\n".len();
|
msg.cursor += "\r\n".len();
|
||||||
}
|
}
|
||||||
cmd => panic!("Invariant violation: {} is bad Redis input", cmd),
|
cmd => panic!("Invariant violation: {} is invalid Redis input", cmd),
|
||||||
}
|
};
|
||||||
msg = RedisMsg::from_raw(&msg.raw[msg.cursor..]);
|
msg = RedisMsg::from_raw(&msg.raw[msg.cursor..]);
|
||||||
}
|
}
|
||||||
receiver.incoming_raw_msg.clear();
|
receiver.incoming_raw_msg.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn to_utf8(&mut self, cur_buffer: Vec<u8>, size: usize) -> String {
|
||||||
|
String::from_utf8(cur_buffer[..size].to_vec()).unwrap_or_else(|_| {
|
||||||
|
let mut new_buffer = vec![0u8; 1];
|
||||||
|
self.poll_read(&mut new_buffer).unwrap();
|
||||||
|
let buffer = ([cur_buffer, new_buffer]).concat();
|
||||||
|
self.to_utf8(buffer, size + 1)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Read for AsyncReadableStream<'a> {
|
impl<'a> Read for AsyncReadableStream<'a> {
|
||||||
|
@ -74,23 +87,23 @@ impl<'a> AsyncRead for AsyncReadableStream<'a> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Debug)]
|
||||||
pub struct RedisMsg<'a> {
|
pub struct RedisMsg<'a> {
|
||||||
pub raw: &'a str,
|
pub raw: &'a str,
|
||||||
pub cursor: usize,
|
pub cursor: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> RedisMsg<'a> {
|
impl<'a> RedisMsg<'a> {
|
||||||
pub fn from_raw(raw: &'a str) -> Self {
|
pub fn from_raw(raw: &'a str) -> Self {
|
||||||
Self {
|
Self {
|
||||||
raw,
|
raw,
|
||||||
cursor: "*3\r\n".len(), //length of intro header
|
cursor: "*3\r\n".len(), //length of intro header
|
||||||
..Self::default()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/// Move the cursor from the beginning of a number through its end and return the number
|
/// Move the cursor from the beginning of a number through its end and return the number
|
||||||
pub fn process_number(&mut self) -> usize {
|
pub fn process_number(&mut self) -> usize {
|
||||||
let (mut selected_number, selection_start) = (0, self.cursor);
|
let (mut selected_number, selection_start) = (0, self.cursor);
|
||||||
while let Ok(number) = self.raw[selection_start..self.cursor + 1].parse::<usize>() {
|
while let Ok(number) = self.raw[selection_start..=self.cursor].parse::<usize>() {
|
||||||
self.cursor += 1;
|
self.cursor += 1;
|
||||||
selected_number = number;
|
selected_number = number;
|
||||||
}
|
}
|
||||||
|
@ -99,8 +112,9 @@ impl<'a> RedisMsg<'a> {
|
||||||
/// In a pubsub reply from Redis, an item can be either the name of the subscribed channel
|
/// In a pubsub reply from Redis, an item can be either the name of the subscribed channel
|
||||||
/// or the msg payload. Either way, it follows the same format:
|
/// or the msg payload. Either way, it follows the same format:
|
||||||
/// `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n`
|
/// `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n`
|
||||||
pub fn next_item(&mut self) -> String {
|
pub fn next_field(&mut self) -> String {
|
||||||
self.cursor += "$".len();
|
self.cursor += "$".len();
|
||||||
|
|
||||||
let item_len = self.process_number();
|
let item_len = self.process_number();
|
||||||
self.cursor += "\r\n".len();
|
self.cursor += "\r\n".len();
|
||||||
let item_start_position = self.cursor;
|
let item_start_position = self.cursor;
|
||||||
|
@ -110,6 +124,7 @@ impl<'a> RedisMsg<'a> {
|
||||||
item
|
item
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -118,9 +133,9 @@ mod test {
|
||||||
fn simple_redis_parse() {
|
fn simple_redis_parse() {
|
||||||
let input = "*3\r\n$9\r\nSUBSCRIBE\r\n$10\r\ntimeline:1\r\n:1\r\n";
|
let input = "*3\r\n$9\r\nSUBSCRIBE\r\n$10\r\ntimeline:1\r\n:1\r\n";
|
||||||
let mut msg = RedisMsg::from_raw(input);
|
let mut msg = RedisMsg::from_raw(input);
|
||||||
let cmd = msg.next_item();
|
let cmd = msg.next_field();
|
||||||
assert_eq!(&cmd, "SUBSCRIBE");
|
assert_eq!(&cmd, "SUBSCRIBE");
|
||||||
let timeline = msg.next_item();
|
let timeline = msg.next_field();
|
||||||
assert_eq!(&timeline, "timeline:1");
|
assert_eq!(&timeline, "timeline:1");
|
||||||
msg.cursor += ":1\r\n".len();
|
msg.cursor += ":1\r\n".len();
|
||||||
assert_eq!(msg.cursor, input.len());
|
assert_eq!(msg.cursor, input.len());
|
||||||
|
@ -130,11 +145,11 @@ mod test {
|
||||||
fn realistic_redis_parse() {
|
fn realistic_redis_parse() {
|
||||||
let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:4\r\n$1386\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102866835379605039\",\"created_at\":\"2019-09-27T22:29:02.590Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"http://localhost:3000/users/admin/statuses/102866835379605039\",\"url\":\"http://localhost:3000/@admin/102866835379605039\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"<p><span class=\\\"h-card\\\"><a href=\\\"http://localhost:3000/@susan\\\" class=\\\"u-url mention\\\">@<span>susan</span></a></span> hi</p>\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"1\",\"username\":\"admin\",\"acct\":\"admin\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-07-04T00:21:05.890Z\",\"note\":\"<p></p>\",\"url\":\"http://localhost:3000/@admin\",\"avatar\":\"http://localhost:3000/avatars/original/missing.png\",\"avatar_static\":\"http://localhost:3000/avatars/original/missing.png\",\"header\":\"http://localhost:3000/headers/original/missing.png\",\"header_static\":\"http://localhost:3000/headers/original/missing.png\",\"followers_count\":3,\"following_count\":3,\"statuses_count\":192,\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"4\",\"username\":\"susan\",\"url\":\"http://localhost:3000/@susan\",\"acct\":\"susan\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1569623342825}\r\n";
|
let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:4\r\n$1386\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102866835379605039\",\"created_at\":\"2019-09-27T22:29:02.590Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"http://localhost:3000/users/admin/statuses/102866835379605039\",\"url\":\"http://localhost:3000/@admin/102866835379605039\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"<p><span class=\\\"h-card\\\"><a href=\\\"http://localhost:3000/@susan\\\" class=\\\"u-url mention\\\">@<span>susan</span></a></span> hi</p>\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"1\",\"username\":\"admin\",\"acct\":\"admin\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-07-04T00:21:05.890Z\",\"note\":\"<p></p>\",\"url\":\"http://localhost:3000/@admin\",\"avatar\":\"http://localhost:3000/avatars/original/missing.png\",\"avatar_static\":\"http://localhost:3000/avatars/original/missing.png\",\"header\":\"http://localhost:3000/headers/original/missing.png\",\"header_static\":\"http://localhost:3000/headers/original/missing.png\",\"followers_count\":3,\"following_count\":3,\"statuses_count\":192,\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"4\",\"username\":\"susan\",\"url\":\"http://localhost:3000/@susan\",\"acct\":\"susan\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1569623342825}\r\n";
|
||||||
let mut msg = RedisMsg::from_raw(input);
|
let mut msg = RedisMsg::from_raw(input);
|
||||||
let cmd = msg.next_item();
|
let cmd = msg.next_field();
|
||||||
assert_eq!(&cmd, "message");
|
assert_eq!(&cmd, "message");
|
||||||
let timeline = msg.next_item();
|
let timeline = msg.next_field();
|
||||||
assert_eq!(&timeline, "timeline:4");
|
assert_eq!(&timeline, "timeline:4");
|
||||||
let message_str = msg.next_item();
|
let message_str = msg.next_field();
|
||||||
assert_eq!(message_str, input[41..input.len() - 2]);
|
assert_eq!(message_str, input[41..input.len() - 2]);
|
||||||
assert_eq!(msg.cursor, input.len());
|
assert_eq!(msg.cursor, input.len());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue