mirror of https://github.com/mastodon/flodgatt
Parse redis messages (#51)
* Initial version of Redis parsing fix * Refactor parsing for more maintainability * Add benchmarks for new parsing strategy * Add tests for parsing
This commit is contained in:
parent
ceb38c2689
commit
2ac08c5316
|
@ -2,6 +2,7 @@ use criterion::black_box;
|
||||||
use criterion::criterion_group;
|
use criterion::criterion_group;
|
||||||
use criterion::criterion_main;
|
use criterion::criterion_main;
|
||||||
use criterion::Criterion;
|
use criterion::Criterion;
|
||||||
|
use flodgatt::redis_to_client_stream::redis_stream::RedisMsg;
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
|
@ -62,6 +63,38 @@ fn print_next_str(mut end: usize, input: &str) -> (usize, String) {
|
||||||
let string = &input[start..end];
|
let string = &input[start..end];
|
||||||
(end, string.to_string())
|
(end, string.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn parse_with_stuct(input: String) -> Vec<(String, Value)> {
|
||||||
|
let mut output = Vec::new();
|
||||||
|
let mut incoming_raw_msg = input;
|
||||||
|
|
||||||
|
while incoming_raw_msg.len() > 0 {
|
||||||
|
let mut msg = RedisMsg::from_raw(incoming_raw_msg.clone());
|
||||||
|
let command = msg.get_next_item();
|
||||||
|
match command.as_str() {
|
||||||
|
"message" => {
|
||||||
|
let timeline = msg.get_next_item()["timeline:".len()..].to_string();
|
||||||
|
let message: Value = serde_json::from_str(&msg.get_next_item()).unwrap();
|
||||||
|
output.push((timeline, message));
|
||||||
|
}
|
||||||
|
"subscribe" | "unsubscribe" => {
|
||||||
|
// This returns a confirmation. We don't need to do anything with it,
|
||||||
|
// but we do need to advance the cursor past it
|
||||||
|
msg.get_next_item(); // name of channel (un)subscribed
|
||||||
|
msg.cursor += ":".len();
|
||||||
|
msg.process_number(); // The number of active subscriptions
|
||||||
|
msg.cursor += "\r\n".len();
|
||||||
|
}
|
||||||
|
cmd => panic!(
|
||||||
|
"Invariant violation: bad Redis input. Got {} as a command",
|
||||||
|
cmd
|
||||||
|
),
|
||||||
|
}
|
||||||
|
incoming_raw_msg = msg.raw[msg.cursor..].to_string();
|
||||||
|
}
|
||||||
|
output
|
||||||
|
}
|
||||||
|
|
||||||
fn criterion_benchmark(c: &mut Criterion) {
|
fn criterion_benchmark(c: &mut Criterion) {
|
||||||
let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:1\r\n$3790\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102775370117886890\",\"created_at\":\"2019-09-11T18:42:19.000Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"unlisted\",\"language\":\"en\",\"uri\":\"https://mastodon.host/users/federationbot/statuses/102775346916917099\",\"url\":\"https://mastodon.host/@federationbot/102775346916917099\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"<p>Trending tags:<br><a href=\\\"https://mastodon.host/tags/neverforget\\\" class=\\\"mention hashtag\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\">#<span>neverforget</span></a><br><a href=\\\"https://mastodon.host/tags/4styles\\\" class=\\\"mention hashtag\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\">#<span>4styles</span></a><br><a href=\\\"https://mastodon.host/tags/newpipe\\\" class=\\\"mention hashtag\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\">#<span>newpipe</span></a><br><a href=\\\"https://mastodon.host/tags/uber\\\" class=\\\"mention hashtag\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\">#<span>uber</span></a><br><a href=\\\"https://mastodon.host/tags/mercredifiction\\\" class=\\\"mention hashtag\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\">#<span>mercredifiction</span></a></p>\",\"reblog\":null,\"account\":{\"id\":\"78\",\"username\":\"federationbot\",\"acct\":\"federationbot@mastodon.host\",\"display_name\":\"Federation Bot\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-09-10T15:04:25.559Z\",\"note\":\"<p>Hello, I am mastodon.host official semi bot.</p><p>Follow me if you want to have some updates on the view of the fediverse from here ( I only post unlisted ). </p><p>I also randomly boost one of my followers toot every hour !</p><p>If you don\'t feel confortable with me following you, tell me: unfollow and I\'ll do it :)</p><p>If you want me to follow you, just tell me follow ! </p><p>If you want automatic follow for new users on your instance and you are an instance admin, contact me !</p><p>Other commands are private :)</p>\",\"url\":\"https://mastodon.host/@federationbot\",\"avatar\":\"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863\",\"avatar_static\":\"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":16636,\"following_count\":179532,\"statuses_count\":50554,\"emojis\":[],\"fields\":[{\"name\":\"More stats\",\"value\":\"<a href=\\\"https://mastodon.host/stats.html\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\"><span class=\\\"invisible\\\">https://</span><span class=\\\"\\\">mastodon.host/stats.html</span><span class=\\\"invisible\\\"></span></a>\",\"verified_at\":null},{\"name\":\"More infos\",\"value\":\"<a href=\\\"https://mastodon.host/about/more\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\"><span class=\\\"invisible\\\">https://</span><span class=\\\"\\\">mastodon.host/about/more</span><span class=\\\"invisible\\\"></span></a>\",\"verified_at\":null},{\"name\":\"Owner/Friend\",\"value\":\"<span class=\\\"h-card\\\"><a href=\\\"https://mastodon.host/@gled\\\" class=\\\"u-url mention\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\">@<span>gled</span></a></span>\",\"verified_at\":null}]},\"media_attachments\":[],\"mentions\":[],\"tags\":[{\"name\":\"4styles\",\"url\":\"https://instance.codesections.com/tags/4styles\"},{\"name\":\"neverforget\",\"url\":\"https://instance.codesections.com/tags/neverforget\"},{\"name\":\"mercredifiction\",\"url\":\"https://instance.codesections.com/tags/mercredifiction\"},{\"name\":\"uber\",\"url\":\"https://instance.codesections.com/tags/uber\"},{\"name\":\"newpipe\",\"url\":\"https://instance.codesections.com/tags/newpipe\"}],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1568227693541}\r\n".to_string();
|
let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:1\r\n$3790\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102775370117886890\",\"created_at\":\"2019-09-11T18:42:19.000Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"unlisted\",\"language\":\"en\",\"uri\":\"https://mastodon.host/users/federationbot/statuses/102775346916917099\",\"url\":\"https://mastodon.host/@federationbot/102775346916917099\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"<p>Trending tags:<br><a href=\\\"https://mastodon.host/tags/neverforget\\\" class=\\\"mention hashtag\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\">#<span>neverforget</span></a><br><a href=\\\"https://mastodon.host/tags/4styles\\\" class=\\\"mention hashtag\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\">#<span>4styles</span></a><br><a href=\\\"https://mastodon.host/tags/newpipe\\\" class=\\\"mention hashtag\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\">#<span>newpipe</span></a><br><a href=\\\"https://mastodon.host/tags/uber\\\" class=\\\"mention hashtag\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\">#<span>uber</span></a><br><a href=\\\"https://mastodon.host/tags/mercredifiction\\\" class=\\\"mention hashtag\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\">#<span>mercredifiction</span></a></p>\",\"reblog\":null,\"account\":{\"id\":\"78\",\"username\":\"federationbot\",\"acct\":\"federationbot@mastodon.host\",\"display_name\":\"Federation Bot\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-09-10T15:04:25.559Z\",\"note\":\"<p>Hello, I am mastodon.host official semi bot.</p><p>Follow me if you want to have some updates on the view of the fediverse from here ( I only post unlisted ). </p><p>I also randomly boost one of my followers toot every hour !</p><p>If you don\'t feel confortable with me following you, tell me: unfollow and I\'ll do it :)</p><p>If you want me to follow you, just tell me follow ! </p><p>If you want automatic follow for new users on your instance and you are an instance admin, contact me !</p><p>Other commands are private :)</p>\",\"url\":\"https://mastodon.host/@federationbot\",\"avatar\":\"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863\",\"avatar_static\":\"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863\",\"header\":\"https://instance.codesections.com/headers/original/missing.png\",\"header_static\":\"https://instance.codesections.com/headers/original/missing.png\",\"followers_count\":16636,\"following_count\":179532,\"statuses_count\":50554,\"emojis\":[],\"fields\":[{\"name\":\"More stats\",\"value\":\"<a href=\\\"https://mastodon.host/stats.html\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\"><span class=\\\"invisible\\\">https://</span><span class=\\\"\\\">mastodon.host/stats.html</span><span class=\\\"invisible\\\"></span></a>\",\"verified_at\":null},{\"name\":\"More infos\",\"value\":\"<a href=\\\"https://mastodon.host/about/more\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\"><span class=\\\"invisible\\\">https://</span><span class=\\\"\\\">mastodon.host/about/more</span><span class=\\\"invisible\\\"></span></a>\",\"verified_at\":null},{\"name\":\"Owner/Friend\",\"value\":\"<span class=\\\"h-card\\\"><a href=\\\"https://mastodon.host/@gled\\\" class=\\\"u-url mention\\\" rel=\\\"nofollow noopener\\\" target=\\\"_blank\\\">@<span>gled</span></a></span>\",\"verified_at\":null}]},\"media_attachments\":[],\"mentions\":[],\"tags\":[{\"name\":\"4styles\",\"url\":\"https://instance.codesections.com/tags/4styles\"},{\"name\":\"neverforget\",\"url\":\"https://instance.codesections.com/tags/neverforget\"},{\"name\":\"mercredifiction\",\"url\":\"https://instance.codesections.com/tags/mercredifiction\"},{\"name\":\"uber\",\"url\":\"https://instance.codesections.com/tags/uber\"},{\"name\":\"newpipe\",\"url\":\"https://instance.codesections.com/tags/newpipe\"}],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1568227693541}\r\n".to_string();
|
||||||
|
|
||||||
|
@ -72,6 +105,9 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||||
group.bench_function("hand parse", |b| {
|
group.bench_function("hand parse", |b| {
|
||||||
b.iter(|| hand_parse(black_box(input.clone())))
|
b.iter(|| hand_parse(black_box(input.clone())))
|
||||||
});
|
});
|
||||||
|
group.bench_function("stuct parse", |b| {
|
||||||
|
b.iter(|| parse_with_stuct(black_box(input.clone())))
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
criterion_group!(benches, criterion_benchmark);
|
criterion_group!(benches, criterion_benchmark);
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
pub mod client_agent;
|
pub mod client_agent;
|
||||||
pub mod receiver;
|
pub mod receiver;
|
||||||
pub mod redis_cmd;
|
pub mod redis_cmd;
|
||||||
|
pub mod redis_stream;
|
||||||
|
|
||||||
use crate::config;
|
use crate::config;
|
||||||
pub use client_agent::ClientAgent;
|
pub use client_agent::ClientAgent;
|
||||||
|
|
|
@ -1,27 +1,26 @@
|
||||||
//! Receives data from Redis, sorts it by `ClientAgent`, and stores it until
|
//! Receives data from Redis, sorts it by `ClientAgent`, and stores it until
|
||||||
//! polled by the correct `ClientAgent`. Also manages sububscriptions and
|
//! polled by the correct `ClientAgent`. Also manages sububscriptions and
|
||||||
//! unsubscriptions to/from Redis.
|
//! unsubscriptions to/from Redis.
|
||||||
use super::redis_cmd;
|
use super::{redis_cmd, redis_stream};
|
||||||
use crate::{config, pubsub_cmd};
|
use crate::{config, pubsub_cmd};
|
||||||
use futures::{Async, Poll};
|
use futures::{Async, Poll};
|
||||||
use log::info;
|
use log::info;
|
||||||
use regex::Regex;
|
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::{collections, io::Read, io::Write, net, time};
|
use std::{collections, io::Write, net, time};
|
||||||
use tokio::io::{AsyncRead, Error};
|
use tokio::io::Error;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
/// The item that streams from Redis and is polled by the `ClientAgent`
|
/// The item that streams from Redis and is polled by the `ClientAgent`
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Receiver {
|
pub struct Receiver {
|
||||||
pubsub_connection: net::TcpStream,
|
pub pubsub_connection: net::TcpStream,
|
||||||
secondary_redis_connection: net::TcpStream,
|
secondary_redis_connection: net::TcpStream,
|
||||||
redis_polled_at: time::Instant,
|
redis_polled_at: time::Instant,
|
||||||
timeline: String,
|
timeline: String,
|
||||||
manager_id: Uuid,
|
manager_id: Uuid,
|
||||||
msg_queues: collections::HashMap<Uuid, MsgQueue>,
|
pub msg_queues: collections::HashMap<Uuid, MsgQueue>,
|
||||||
clients_per_timeline: collections::HashMap<String, i32>,
|
clients_per_timeline: collections::HashMap<String, i32>,
|
||||||
incoming_raw_msg: String,
|
pub incoming_raw_msg: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Receiver {
|
impl Receiver {
|
||||||
|
@ -157,7 +156,7 @@ impl futures::stream::Stream for Receiver {
|
||||||
if self.redis_polled_at.elapsed()
|
if self.redis_polled_at.elapsed()
|
||||||
> time::Duration::from_millis(*config::REDIS_POLL_INTERVAL)
|
> time::Duration::from_millis(*config::REDIS_POLL_INTERVAL)
|
||||||
{
|
{
|
||||||
AsyncReadableStream::poll_redis(self);
|
redis_stream::AsyncReadableStream::poll_redis(self);
|
||||||
self.redis_polled_at = time::Instant::now();
|
self.redis_polled_at = time::Instant::now();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,10 +187,10 @@ impl Drop for Receiver {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
struct MsgQueue {
|
pub struct MsgQueue {
|
||||||
messages: collections::VecDeque<Value>,
|
pub messages: collections::VecDeque<Value>,
|
||||||
last_polled_at: time::Instant,
|
last_polled_at: time::Instant,
|
||||||
redis_channel: String,
|
pub redis_channel: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MsgQueue {
|
impl MsgQueue {
|
||||||
|
@ -204,69 +203,3 @@ impl MsgQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct AsyncReadableStream<'a>(&'a mut net::TcpStream);
|
|
||||||
impl<'a> AsyncReadableStream<'a> {
|
|
||||||
fn new(stream: &'a mut net::TcpStream) -> Self {
|
|
||||||
AsyncReadableStream(stream)
|
|
||||||
}
|
|
||||||
/// Polls Redis for any new messages and adds them to the `MsgQueue` for
|
|
||||||
/// the appropriate `ClientAgent`.
|
|
||||||
fn poll_redis(receiver: &mut Receiver) {
|
|
||||||
let mut buffer = vec![0u8; 3000];
|
|
||||||
|
|
||||||
let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection);
|
|
||||||
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() {
|
|
||||||
let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]);
|
|
||||||
dbg!(&raw_redis_response);
|
|
||||||
receiver.incoming_raw_msg.push_str(raw_redis_response);
|
|
||||||
// Text comes in from redis as a raw stream, which could be more than one message
|
|
||||||
// and is not guaranteed to end on a message boundary. We need to break it down
|
|
||||||
// into messages. First, start by only acting if we end on a valid message boundary
|
|
||||||
if receiver.incoming_raw_msg.ends_with("}\r\n") {
|
|
||||||
// Every valid message is tagged with the string `message`. This means 3 things:
|
|
||||||
// 1) We can discard everything before the first `message` (with `skip(1)`)
|
|
||||||
// 2) We can split into separate messages by splitting on `message`
|
|
||||||
// 3) We can use a regex that discards everything after the *first* valid
|
|
||||||
// message (since the next message will have a new `message` tag)
|
|
||||||
let messages = receiver.incoming_raw_msg.as_str().split("message").skip(1);
|
|
||||||
let regex =
|
|
||||||
Regex::new(r"timeline:(?P<timeline>.*?)\r\n\$\d+\r\n(?P<value>.*?)\r\n")
|
|
||||||
.expect("Hard-codded");
|
|
||||||
for message in messages {
|
|
||||||
let timeline = regex.captures(message).expect("Hard-coded timeline regex")
|
|
||||||
["timeline"]
|
|
||||||
.to_string();
|
|
||||||
|
|
||||||
let redis_msg: Value = serde_json::from_str(
|
|
||||||
®ex.captures(message).expect("Hard-coded value regex")["value"],
|
|
||||||
)
|
|
||||||
.expect("Valid json");
|
|
||||||
|
|
||||||
for msg_queue in receiver.msg_queues.values_mut() {
|
|
||||||
if msg_queue.redis_channel == timeline {
|
|
||||||
msg_queue.messages.push_back(redis_msg.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// We've processed this raw msg and can safely discard it
|
|
||||||
receiver.incoming_raw_msg.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> Read for AsyncReadableStream<'a> {
|
|
||||||
fn read(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> {
|
|
||||||
self.0.read(buffer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> AsyncRead for AsyncReadableStream<'a> {
|
|
||||||
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
|
|
||||||
match self.read(buf) {
|
|
||||||
Ok(t) => Ok(Async::Ready(t)),
|
|
||||||
Err(_) => Ok(Async::NotReady),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,147 @@
|
||||||
|
use super::receiver::Receiver;
|
||||||
|
use futures::{Async, Poll};
|
||||||
|
use serde_json::Value;
|
||||||
|
use std::io::Read;
|
||||||
|
use std::net;
|
||||||
|
use tokio::io::AsyncRead;
|
||||||
|
|
||||||
|
pub struct AsyncReadableStream<'a>(&'a mut net::TcpStream);
|
||||||
|
impl<'a> AsyncReadableStream<'a> {
|
||||||
|
pub fn new(stream: &'a mut net::TcpStream) -> Self {
|
||||||
|
AsyncReadableStream(stream)
|
||||||
|
}
|
||||||
|
/// Polls Redis for any new messages and adds them to the `MsgQueue` for
|
||||||
|
/// the appropriate `ClientAgent`.
|
||||||
|
pub fn poll_redis(receiver: &mut Receiver) {
|
||||||
|
let mut buffer = vec![0u8; 3000];
|
||||||
|
|
||||||
|
let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection);
|
||||||
|
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() {
|
||||||
|
let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]);
|
||||||
|
receiver.incoming_raw_msg.push_str(raw_redis_response);
|
||||||
|
// Text comes in from redis as a raw stream, which could be more than one message
|
||||||
|
// and is not guaranteed to end on a message boundary. We need to break it down
|
||||||
|
// into messages. Incoming messages *are* guaranteed to be RESP arrays,
|
||||||
|
// https://redis.io/topics/protocol
|
||||||
|
|
||||||
|
// Only act if we have a full message (end on a msg boundary)
|
||||||
|
if !receiver.incoming_raw_msg.ends_with("}\r\n") {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
while receiver.incoming_raw_msg.len() > 0 {
|
||||||
|
let mut msg = RedisMsg::from_raw(receiver.incoming_raw_msg.clone());
|
||||||
|
let command = msg.get_next_item();
|
||||||
|
match command.as_str() {
|
||||||
|
"message" => {
|
||||||
|
let timeline = msg.get_next_item()["timeline:".len()..].to_string();
|
||||||
|
let message: Value = serde_json::from_str(&msg.get_next_item()).unwrap();
|
||||||
|
for msg_queue in receiver.msg_queues.values_mut() {
|
||||||
|
if msg_queue.redis_channel == timeline {
|
||||||
|
msg_queue.messages.push_back(message.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"subscribe" | "unsubscribe" => {
|
||||||
|
// This returns a confirmation. We don't need to do anything with it,
|
||||||
|
// but we do need to advance the cursor past it
|
||||||
|
msg.get_next_item(); // name of channel (un)subscribed
|
||||||
|
msg.cursor += ":".len();
|
||||||
|
msg.process_number(); // The number of active subscriptions
|
||||||
|
msg.cursor += "\r\n".len();
|
||||||
|
}
|
||||||
|
cmd => panic!(
|
||||||
|
"Invariant violation: bad Redis input. Got {} as a command",
|
||||||
|
cmd
|
||||||
|
),
|
||||||
|
}
|
||||||
|
receiver.incoming_raw_msg = msg.raw[msg.cursor..].to_string();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Read for AsyncReadableStream<'a> {
|
||||||
|
fn read(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||||
|
self.0.read(buffer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> AsyncRead for AsyncReadableStream<'a> {
|
||||||
|
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
|
||||||
|
match self.read(buf) {
|
||||||
|
Ok(t) => Ok(Async::Ready(t)),
|
||||||
|
Err(_) => Ok(Async::NotReady),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct RedisMsg {
|
||||||
|
pub raw: String,
|
||||||
|
pub cursor: usize,
|
||||||
|
}
|
||||||
|
impl RedisMsg {
|
||||||
|
pub fn from_raw(raw: String) -> Self {
|
||||||
|
Self {
|
||||||
|
raw,
|
||||||
|
cursor: "*3\r\n".len(), //length of intro header
|
||||||
|
..Self::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/// Move the cursor from the beginning of a number through its end and return the number
|
||||||
|
pub fn process_number(&mut self) -> usize {
|
||||||
|
let mut selection_end = self.cursor + 1;
|
||||||
|
let mut chars = self.raw.chars();
|
||||||
|
chars.nth(self.cursor);
|
||||||
|
while chars.next().expect("still in str").is_digit(10) {
|
||||||
|
selection_end += 1;
|
||||||
|
}
|
||||||
|
let selected_number = self.raw[self.cursor..selection_end]
|
||||||
|
.parse::<usize>()
|
||||||
|
.expect("checked with `.is_digit(10)`");
|
||||||
|
self.cursor = selection_end;
|
||||||
|
selected_number
|
||||||
|
}
|
||||||
|
/// In a pubsub reply from Redis, an item can be either the name of the subscribed channel
|
||||||
|
/// or the msg payload. Either way, it follows the same format:
|
||||||
|
/// `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n`
|
||||||
|
pub fn get_next_item(&mut self) -> String {
|
||||||
|
self.cursor += "$".len();
|
||||||
|
let item_len = self.process_number();
|
||||||
|
self.cursor += "\r\n".len();
|
||||||
|
let item_start_position = self.cursor;
|
||||||
|
self.cursor += item_len;
|
||||||
|
let item = self.raw[item_start_position..self.cursor].to_string();
|
||||||
|
self.cursor += "\r\n".len();
|
||||||
|
item
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn simple_redis_parse() {
|
||||||
|
let input = "*3\r\n$9\r\nSUBSCRIBE\r\n$10\r\ntimeline:1\r\n:1\r\n";
|
||||||
|
let mut msg = RedisMsg::from_raw(input.to_string());
|
||||||
|
let cmd = msg.get_next_item();
|
||||||
|
assert_eq!(&cmd, "SUBSCRIBE");
|
||||||
|
let timeline = msg.get_next_item();
|
||||||
|
assert_eq!(&timeline, "timeline:1");
|
||||||
|
msg.cursor += ":1\r\n".len();
|
||||||
|
assert_eq!(msg.cursor, input.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn realistic_redis_parse() {
|
||||||
|
let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:4\r\n$1386\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102866835379605039\",\"created_at\":\"2019-09-27T22:29:02.590Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"http://localhost:3000/users/admin/statuses/102866835379605039\",\"url\":\"http://localhost:3000/@admin/102866835379605039\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"<p><span class=\\\"h-card\\\"><a href=\\\"http://localhost:3000/@susan\\\" class=\\\"u-url mention\\\">@<span>susan</span></a></span> hi</p>\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"1\",\"username\":\"admin\",\"acct\":\"admin\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-07-04T00:21:05.890Z\",\"note\":\"<p></p>\",\"url\":\"http://localhost:3000/@admin\",\"avatar\":\"http://localhost:3000/avatars/original/missing.png\",\"avatar_static\":\"http://localhost:3000/avatars/original/missing.png\",\"header\":\"http://localhost:3000/headers/original/missing.png\",\"header_static\":\"http://localhost:3000/headers/original/missing.png\",\"followers_count\":3,\"following_count\":3,\"statuses_count\":192,\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"4\",\"username\":\"susan\",\"url\":\"http://localhost:3000/@susan\",\"acct\":\"susan\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1569623342825}\r\n";
|
||||||
|
let mut msg = RedisMsg::from_raw(input.to_string());
|
||||||
|
let cmd = msg.get_next_item();
|
||||||
|
assert_eq!(&cmd, "message");
|
||||||
|
let timeline = msg.get_next_item();
|
||||||
|
assert_eq!(&timeline, "timeline:4");
|
||||||
|
let message_str = msg.get_next_item();
|
||||||
|
assert_eq!(message_str, input[41..input.len() - 2]);
|
||||||
|
assert_eq!(msg.cursor, input.len());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue