From 4a2d08c693b82b623a1793a2b5c0c4278fcf26ac Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Wed, 9 Oct 2019 14:46:56 -0400 Subject: [PATCH] Refactor/reorganize streaming code (#64) --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/config/redis_cfg.rs | 2 +- src/redis_to_client_stream/mod.rs | 4 +- .../receiver/message_queues.rs | 75 +++++ .../{receiver.rs => receiver/mod.rs} | 89 ++---- src/redis_to_client_stream/redis/mod.rs | 37 +++ .../{ => redis}/redis_cmd.rs | 6 +- .../redis/redis_connection.rs | 93 ++++++ src/redis_to_client_stream/redis/redis_msg.rs | 49 ++++ .../redis/redis_stream.rs | 110 +++++++ src/redis_to_client_stream/redis_stream.rs | 268 ------------------ 12 files changed, 390 insertions(+), 347 deletions(-) create mode 100644 src/redis_to_client_stream/receiver/message_queues.rs rename src/redis_to_client_stream/{receiver.rs => receiver/mod.rs} (65%) create mode 100644 src/redis_to_client_stream/redis/mod.rs rename src/redis_to_client_stream/{ => redis}/redis_cmd.rs (93%) create mode 100644 src/redis_to_client_stream/redis/redis_connection.rs create mode 100644 src/redis_to_client_stream/redis/redis_msg.rs create mode 100644 src/redis_to_client_stream/redis/redis_stream.rs delete mode 100644 src/redis_to_client_stream/redis_stream.rs diff --git a/Cargo.lock b/Cargo.lock index 0ca68d3..147dac6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -404,7 +404,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.4.0" +version = "0.4.1" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index a463b2c..ed88ed9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.4.0" +version = "0.4.1" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/src/config/redis_cfg.rs b/src/config/redis_cfg.rs index f73717f..527dadd 100644 --- a/src/config/redis_cfg.rs +++ b/src/config/redis_cfg.rs @@ -23,7 +23,7 @@ impl RedisConfig { For similar functionality, you may wish to set a REDIS_NAMESPACE"; pub fn from_env(env: EnvVar) -> Self { - let env = match env.get("REDIS_URL").map(|s| s.clone()) { + let env = match env.get("REDIS_URL").cloned() { Some(url_str) => env.update_with_url(&url_str), None => env, }; diff --git a/src/redis_to_client_stream/mod.rs b/src/redis_to_client_stream/mod.rs index 9a88858..522bae8 100644 --- a/src/redis_to_client_stream/mod.rs +++ b/src/redis_to_client_stream/mod.rs @@ -1,10 +1,8 @@ //! Stream the updates appropriate for a given `User`/`timeline` pair from Redis. pub mod client_agent; pub mod receiver; -pub mod redis_cmd; -pub mod redis_stream; +pub mod redis; -use crate::config; pub use client_agent::ClientAgent; use futures::{future::Future, stream::Stream, Async}; use log; diff --git a/src/redis_to_client_stream/receiver/message_queues.rs b/src/redis_to_client_stream/receiver/message_queues.rs new file mode 100644 index 0000000..3c556b0 --- /dev/null +++ b/src/redis_to_client_stream/receiver/message_queues.rs @@ -0,0 +1,75 @@ +use serde_json::Value; +use std::{collections, time}; +use uuid::Uuid; + +#[derive(Debug, Clone)] +pub struct MsgQueue { + pub messages: collections::VecDeque, + last_polled_at: time::Instant, + pub redis_channel: String, +} + +impl MsgQueue { + pub fn new(redis_channel: impl std::fmt::Display) -> Self { + let redis_channel = redis_channel.to_string(); + MsgQueue { + messages: collections::VecDeque::new(), + last_polled_at: time::Instant::now(), + redis_channel, + } + } +} + +#[derive(Debug)] +pub struct MessageQueues(pub collections::HashMap); + +impl MessageQueues { + pub fn update_time_for_target_queue(&mut self, id: Uuid) { + self.entry(id) + .and_modify(|queue| queue.last_polled_at = time::Instant::now()); + } + + pub fn oldest_msg_in_target_queue(&mut self, id: Uuid, timeline: String) -> Option { + self.entry(id) + .or_insert_with(|| MsgQueue::new(timeline)) + .messages + .pop_front() + } + pub fn calculate_timelines_to_add_or_drop(&mut self, timeline: String) -> Vec { + let mut timelines_to_modify = Vec::new(); + + timelines_to_modify.push(Change { + timeline: timeline.to_owned(), + in_subscriber_number: 1, + }); + self.retain(|_id, msg_queue| { + if msg_queue.last_polled_at.elapsed() < time::Duration::from_secs(30) { + true + } else { + let timeline = &msg_queue.redis_channel; + timelines_to_modify.push(Change { + timeline: timeline.to_owned(), + in_subscriber_number: -1, + }); + false + } + }); + timelines_to_modify + } +} +pub struct Change { + pub timeline: String, + pub in_subscriber_number: i32, +} + +impl std::ops::Deref for MessageQueues { + type Target = collections::HashMap; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl std::ops::DerefMut for MessageQueues { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} diff --git a/src/redis_to_client_stream/receiver.rs b/src/redis_to_client_stream/receiver/mod.rs similarity index 65% rename from src/redis_to_client_stream/receiver.rs rename to src/redis_to_client_stream/receiver/mod.rs index 25997d8..e795a77 100644 --- a/src/redis_to_client_stream/receiver.rs +++ b/src/redis_to_client_stream/receiver/mod.rs @@ -1,13 +1,14 @@ //! Receives data from Redis, sorts it by `ClientAgent`, and stores it until //! polled by the correct `ClientAgent`. Also manages sububscriptions and //! unsubscriptions to/from Redis. -use super::{ - config::{self, RedisInterval, RedisNamespace}, - redis_cmd, redis_stream, - redis_stream::RedisConn, +mod message_queues; +use crate::{ + config::{self, RedisInterval}, + pubsub_cmd, + redis_to_client_stream::redis::{redis_cmd, RedisConn, RedisStream}, }; -use crate::pubsub_cmd; use futures::{Async, Poll}; +pub use message_queues::{MessageQueues, MsgQueue}; use serde_json::Value; use std::{collections, net, time}; use tokio::io::Error; @@ -16,16 +17,14 @@ use uuid::Uuid; /// The item that streams from Redis and is polled by the `ClientAgent` #[derive(Debug)] pub struct Receiver { - pub pubsub_connection: net::TcpStream, + pub pubsub_connection: RedisStream, secondary_redis_connection: net::TcpStream, - pub redis_namespace: RedisNamespace, redis_poll_interval: RedisInterval, redis_polled_at: time::Instant, timeline: String, manager_id: Uuid, - pub msg_queues: collections::HashMap, + pub msg_queues: MessageQueues, clients_per_timeline: collections::HashMap, - pub incoming_raw_msg: String, } impl Receiver { @@ -40,18 +39,15 @@ impl Receiver { } = RedisConn::new(redis_cfg); Self { - pubsub_connection, + pubsub_connection: RedisStream::from_stream(pubsub_connection) + .with_namespace(redis_namespace), secondary_redis_connection, - redis_namespace, redis_poll_interval, redis_polled_at: time::Instant::now(), timeline: String::new(), manager_id: Uuid::default(), - msg_queues: collections::HashMap::new(), + msg_queues: MessageQueues(collections::HashMap::new()), clients_per_timeline: collections::HashMap::new(), - /// The unprocessed message from Redis, consisting of 0 or more - /// actual `messages` in the sense of updates to send. - incoming_raw_msg: String::new(), } } @@ -81,30 +77,9 @@ impl Receiver { /// subscribe to it. fn subscribe_or_unsubscribe_as_needed(&mut self, timeline: &str) { let start_time = std::time::Instant::now(); - let mut timelines_to_modify = Vec::new(); - struct Change { - timeline: String, - in_subscriber_number: i32, - } - - timelines_to_modify.push(Change { - timeline: timeline.to_owned(), - in_subscriber_number: 1, - }); - - // Keep only message queues that have been polled recently - self.msg_queues.retain(|_id, msg_queue| { - if msg_queue.last_polled_at.elapsed() < time::Duration::from_secs(30) { - true - } else { - let timeline = &msg_queue.redis_channel; - timelines_to_modify.push(Change { - timeline: timeline.to_owned(), - in_subscriber_number: -1, - }); - false - } - }); + let timelines_to_modify = self + .msg_queues + .calculate_timelines_to_add_or_drop(timeline.to_string()); // Record the lower number of clients subscribed to that channel for change in timelines_to_modify { @@ -124,10 +99,6 @@ impl Receiver { log::warn!("Sending cmd to Redis took: {:?}", start_time.elapsed()); }; } - - fn get_target_msg_queue(&mut self) -> collections::hash_map::Entry { - self.msg_queues.entry(self.manager_id) - } } /// The stream that the ClientAgent polls to learn about new messages. @@ -142,23 +113,17 @@ impl futures::stream::Stream for Receiver { /// message already in a queue. Thus, we only poll Redis if it has not /// been polled lately. fn poll(&mut self) -> Poll, Self::Error> { - let timeline = self.timeline.clone(); + let (timeline, id) = (self.timeline.clone(), self.manager_id); if self.redis_polled_at.elapsed() > *self.redis_poll_interval { - redis_stream::AsyncReadableStream::poll_redis(self); + self.pubsub_connection.poll_redis(&mut self.msg_queues); self.redis_polled_at = time::Instant::now(); } // Record current time as last polled time - self.get_target_msg_queue() - .and_modify(|msg_queue| msg_queue.last_polled_at = time::Instant::now()); + self.msg_queues.update_time_for_target_queue(id); // If the `msg_queue` being polled has any new messages, return the first (oldest) one - match self - .get_target_msg_queue() - .or_insert_with(|| MsgQueue::new(timeline.clone())) - .messages - .pop_front() - { + match self.msg_queues.oldest_msg_in_target_queue(id, timeline) { Some(value) => Ok(Async::Ready(Some(value))), _ => Ok(Async::NotReady), } @@ -170,21 +135,3 @@ impl Drop for Receiver { pubsub_cmd!("unsubscribe", self, self.timeline.clone()); } } - -#[derive(Debug, Clone)] -pub struct MsgQueue { - pub messages: collections::VecDeque, - last_polled_at: time::Instant, - pub redis_channel: String, -} - -impl MsgQueue { - fn new(redis_channel: impl std::fmt::Display) -> Self { - let redis_channel = redis_channel.to_string(); - MsgQueue { - messages: collections::VecDeque::new(), - last_polled_at: time::Instant::now(), - redis_channel, - } - } -} diff --git a/src/redis_to_client_stream/redis/mod.rs b/src/redis_to_client_stream/redis/mod.rs new file mode 100644 index 0000000..2ce67a6 --- /dev/null +++ b/src/redis_to_client_stream/redis/mod.rs @@ -0,0 +1,37 @@ +pub mod redis_cmd; +pub mod redis_connection; +pub mod redis_msg; +pub mod redis_stream; + +pub use redis_connection::RedisConn; +pub use redis_stream::RedisStream; + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn simple_redis_parse() { + let input = "*3\r\n$9\r\nSUBSCRIBE\r\n$10\r\ntimeline:1\r\n:1\r\n"; + let mut msg = redis_msg::RedisMsg::from_raw(input, "timeline".len()); + let cmd = msg.next_field(); + assert_eq!(&cmd, "SUBSCRIBE"); + let timeline = msg.next_field(); + assert_eq!(&timeline, "timeline:1"); + msg.cursor += ":1\r\n".len(); + assert_eq!(msg.cursor, input.len()); + } + + #[test] + fn realistic_redis_parse() { + let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:4\r\n$1386\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102866835379605039\",\"created_at\":\"2019-09-27T22:29:02.590Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"http://localhost:3000/users/admin/statuses/102866835379605039\",\"url\":\"http://localhost:3000/@admin/102866835379605039\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"

@susan hi

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"1\",\"username\":\"admin\",\"acct\":\"admin\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-07-04T00:21:05.890Z\",\"note\":\"

\",\"url\":\"http://localhost:3000/@admin\",\"avatar\":\"http://localhost:3000/avatars/original/missing.png\",\"avatar_static\":\"http://localhost:3000/avatars/original/missing.png\",\"header\":\"http://localhost:3000/headers/original/missing.png\",\"header_static\":\"http://localhost:3000/headers/original/missing.png\",\"followers_count\":3,\"following_count\":3,\"statuses_count\":192,\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"4\",\"username\":\"susan\",\"url\":\"http://localhost:3000/@susan\",\"acct\":\"susan\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1569623342825}\r\n"; + let mut msg = redis_msg::RedisMsg::from_raw(input, "timeline".len()); + let cmd = msg.next_field(); + assert_eq!(&cmd, "message"); + let timeline = msg.next_field(); + assert_eq!(&timeline, "timeline:4"); + let message_str = msg.next_field(); + assert_eq!(message_str, input[41..input.len() - 2]); + assert_eq!(msg.cursor, input.len()); + } +} diff --git a/src/redis_to_client_stream/redis_cmd.rs b/src/redis_to_client_stream/redis/redis_cmd.rs similarity index 93% rename from src/redis_to_client_stream/redis_cmd.rs rename to src/redis_to_client_stream/redis/redis_cmd.rs index 6a9c07a..271bbe8 100644 --- a/src/redis_to_client_stream/redis_cmd.rs +++ b/src/redis_to_client_stream/redis/redis_cmd.rs @@ -7,9 +7,11 @@ macro_rules! pubsub_cmd { ($cmd:expr, $self:expr, $tl:expr) => {{ use std::io::Write; log::info!("Sending {} command to {}", $cmd, $tl); + let namespace = $self.pubsub_connection.namespace.clone(); + $self .pubsub_connection - .write_all(&redis_cmd::pubsub($cmd, $tl, $self.redis_namespace.clone())) + .write_all(&redis_cmd::pubsub($cmd, $tl, namespace.clone())) .expect("Can send command to Redis"); // Because we keep track of the number of clients subscribed to a channel on our end, // we need to manually tell Redis when we have subscribed or unsubscribed @@ -23,7 +25,7 @@ macro_rules! pubsub_cmd { .write_all(&redis_cmd::set( format!("subscribed:timeline:{}", $tl), subscription_new_number, - $self.redis_namespace.clone(), + namespace.clone(), )) .expect("Can set Redis"); diff --git a/src/redis_to_client_stream/redis/redis_connection.rs b/src/redis_to_client_stream/redis/redis_connection.rs new file mode 100644 index 0000000..947a0d1 --- /dev/null +++ b/src/redis_to_client_stream/redis/redis_connection.rs @@ -0,0 +1,93 @@ +use super::redis_cmd; +use crate::config::{RedisConfig, RedisInterval, RedisNamespace}; +use crate::err; +use std::{io::Read, io::Write, net, time}; + +pub struct RedisConn { + pub primary: net::TcpStream, + pub secondary: net::TcpStream, + pub namespace: RedisNamespace, + pub polling_interval: RedisInterval, +} + +fn send_password(mut conn: net::TcpStream, password: &str) -> net::TcpStream { + conn.write_all(&redis_cmd::cmd("auth", &password)).unwrap(); + let mut buffer = vec![0u8; 5]; + conn.read_exact(&mut buffer).unwrap(); + let reply = String::from_utf8(buffer.to_vec()).unwrap(); + if reply != "+OK\r\n" { + err::die_with_msg(format!( + r"Incorrect Redis password. You supplied `{}`. + Please supply correct password with REDIS_PASSWORD environmental variable.", + password, + )) + }; + conn +} + +fn set_db(mut conn: net::TcpStream, db: &str) -> net::TcpStream { + conn.write_all(&redis_cmd::cmd("SELECT", &db)).unwrap(); + conn +} + +fn send_test_ping(mut conn: net::TcpStream) -> net::TcpStream { + conn.write_all(b"PING\r\n").unwrap(); + let mut buffer = vec![0u8; 7]; + conn.read_exact(&mut buffer).unwrap(); + let reply = String::from_utf8(buffer.to_vec()).unwrap(); + match reply.as_str() { + "+PONG\r\n" => (), + "-NOAUTH" => err::die_with_msg( + r"Invalid authentication for Redis. + Redis reports that it needs a password, but you did not provide one. + You can set a password with the REDIS_PASSWORD environmental variable.", + ), + "HTTP/1." => err::die_with_msg( + r"The server at REDIS_HOST and REDIS_PORT is not a Redis server. + Please update the REDIS_HOST and/or REDIS_PORT environmental variables.", + ), + _ => err::die_with_msg(format!( + "Could not connect to Redis for unknown reason. Expected `+PONG` reply but got {}", + reply + )), + }; + conn +} + +impl RedisConn { + pub fn new(redis_cfg: RedisConfig) -> Self { + let addr = net::SocketAddr::from((*redis_cfg.host, *redis_cfg.port)); + let conn_err = |e| { + err::die_with_msg(format!( + "Could not connect to Redis at {}:{}.\n Error detail: {}", + *redis_cfg.host, *redis_cfg.port, e, + )) + }; + let update_conn = |mut conn| { + if let Some(password) = redis_cfg.password.clone() { + conn = send_password(conn, &password); + } + conn = send_test_ping(conn); + conn.set_read_timeout(Some(time::Duration::from_millis(10))) + .expect("Can set read timeout for Redis connection"); + if let Some(db) = &*redis_cfg.db { + conn = set_db(conn, db); + } + conn + }; + let (primary_conn, secondary_conn) = ( + update_conn(net::TcpStream::connect(addr).unwrap_or_else(conn_err)), + update_conn(net::TcpStream::connect(addr).unwrap_or_else(conn_err)), + ); + primary_conn + .set_nonblocking(true) + .expect("set_nonblocking call failed"); + + Self { + primary: primary_conn, + secondary: secondary_conn, + namespace: redis_cfg.namespace, + polling_interval: redis_cfg.polling_interval, + } + } +} diff --git a/src/redis_to_client_stream/redis/redis_msg.rs b/src/redis_to_client_stream/redis/redis_msg.rs new file mode 100644 index 0000000..3ec0ded --- /dev/null +++ b/src/redis_to_client_stream/redis/redis_msg.rs @@ -0,0 +1,49 @@ +use serde_json::Value; + +#[derive(Debug)] +pub struct RedisMsg<'a> { + pub raw: &'a str, + pub cursor: usize, + pub prefix_len: usize, +} + +impl<'a> RedisMsg<'a> { + pub fn from_raw(raw: &'a str, prefix_len: usize) -> Self { + Self { + raw, + cursor: "*3\r\n".len(), //length of intro header + prefix_len, + } + } + /// Move the cursor from the beginning of a number through its end and return the number + pub fn process_number(&mut self) -> usize { + let (mut selected_number, selection_start) = (0, self.cursor); + while let Ok(number) = self.raw[selection_start..=self.cursor].parse::() { + self.cursor += 1; + selected_number = number; + } + selected_number + } + /// In a pubsub reply from Redis, an item can be either the name of the subscribed channel + /// or the msg payload. Either way, it follows the same format: + /// `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n` + pub fn next_field(&mut self) -> String { + self.cursor += "$".len(); + + let item_len = self.process_number(); + self.cursor += "\r\n".len(); + let item_start_position = self.cursor; + self.cursor += item_len; + let item = self.raw[item_start_position..self.cursor].to_string(); + self.cursor += "\r\n".len(); + item + } + + pub fn extract_timeline_and_message(&mut self) -> (String, Value) { + let timeline = &self.next_field()[self.prefix_len..]; + let msg_txt = self.next_field(); + let msg_value: Value = + serde_json::from_str(&msg_txt).expect("Invariant violation: Invalid JSON from Redis"); + (timeline.to_string(), msg_value) + } +} diff --git a/src/redis_to_client_stream/redis/redis_stream.rs b/src/redis_to_client_stream/redis/redis_stream.rs new file mode 100644 index 0000000..d647e27 --- /dev/null +++ b/src/redis_to_client_stream/redis/redis_stream.rs @@ -0,0 +1,110 @@ +use super::redis_msg::RedisMsg; +use crate::{config::RedisNamespace, redis_to_client_stream::receiver::MessageQueues}; +use futures::{Async, Poll}; +use std::{io::Read, net}; +use tokio::io::AsyncRead; + +#[derive(Debug)] +pub struct RedisStream { + pub inner: net::TcpStream, + incoming_raw_msg: String, + pub namespace: RedisNamespace, +} + +impl RedisStream { + pub fn from_stream(inner: net::TcpStream) -> Self { + RedisStream { + inner, + incoming_raw_msg: String::new(), + namespace: RedisNamespace(None), + } + } + pub fn with_namespace(self, namespace: RedisNamespace) -> Self { + RedisStream { namespace, ..self } + } + // Text comes in from redis as a raw stream, which could be more than one message + // and is not guaranteed to end on a message boundary. We need to break it down + // into messages. Incoming messages *are* guaranteed to be RESP arrays, + // https://redis.io/topics/protocol + /// Adds any new Redis messages to the `MsgQueue` for the appropriate `ClientAgent`. + pub fn poll_redis(&mut self, msg_queues: &mut MessageQueues) { + let mut buffer = vec![0u8; 6000]; + + if let Async::Ready(num_bytes_read) = self.poll_read(&mut buffer).unwrap() { + let raw_utf = self.as_utf8(buffer, num_bytes_read); + self.incoming_raw_msg.push_str(&raw_utf); + + // Only act if we have a full message (end on a msg boundary) + if !self.incoming_raw_msg.ends_with("}\r\n") { + return; + }; + let prefix_to_skip = match &*self.namespace { + Some(namespace) => format!("{}:timeline:", namespace), + None => "timeline:".to_string(), + }; + + let mut msg = RedisMsg::from_raw(&self.incoming_raw_msg, prefix_to_skip.len()); + + while !msg.raw.is_empty() { + let command = msg.next_field(); + match command.as_str() { + "message" => { + let (timeline, msg_value) = msg.extract_timeline_and_message(); + for msg_queue in msg_queues.values_mut() { + if msg_queue.redis_channel == timeline { + msg_queue.messages.push_back(msg_value.clone()); + } + } + } + + "subscribe" | "unsubscribe" => { + // No msg, so ignore & advance cursor to end + let _channel = msg.next_field(); + msg.cursor += ":".len(); + let _active_subscriptions = msg.process_number(); + msg.cursor += "\r\n".len(); + } + cmd => panic!("Invariant violation: {} is invalid Redis input", cmd), + }; + msg = RedisMsg::from_raw(&msg.raw[msg.cursor..], msg.prefix_len); + } + self.incoming_raw_msg.clear(); + } + } + + fn as_utf8(&mut self, cur_buffer: Vec, size: usize) -> String { + String::from_utf8(cur_buffer[..size].to_vec()).unwrap_or_else(|_| { + let mut new_buffer = vec![0u8; 1]; + self.poll_read(&mut new_buffer).unwrap(); + let buffer = ([cur_buffer, new_buffer]).concat(); + self.as_utf8(buffer, size + 1) + }) + } +} + +impl std::ops::Deref for RedisStream { + type Target = net::TcpStream; + fn deref(&self) -> &Self::Target { + &self.inner + } +} +impl std::ops::DerefMut for RedisStream { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Read for RedisStream { + fn read(&mut self, buffer: &mut [u8]) -> Result { + self.inner.read(buffer) + } +} + +impl AsyncRead for RedisStream { + fn poll_read(&mut self, buf: &mut [u8]) -> Poll { + match self.read(buf) { + Ok(t) => Ok(Async::Ready(t)), + Err(_) => Ok(Async::NotReady), + } + } +} diff --git a/src/redis_to_client_stream/redis_stream.rs b/src/redis_to_client_stream/redis_stream.rs deleted file mode 100644 index f0132d8..0000000 --- a/src/redis_to_client_stream/redis_stream.rs +++ /dev/null @@ -1,268 +0,0 @@ -use super::receiver::Receiver; -use crate::{ - config::{self, RedisInterval, RedisNamespace}, - err, - redis_to_client_stream::redis_cmd, -}; -use futures::{Async, Poll}; -use serde_json::Value; -use std::{io::Read, io::Write, net, time}; -use tokio::io::AsyncRead; - -pub struct RedisConn { - pub primary: net::TcpStream, - pub secondary: net::TcpStream, - pub namespace: RedisNamespace, - pub polling_interval: RedisInterval, -} - -fn send_password(mut conn: net::TcpStream, password: &str) -> net::TcpStream { - conn.write_all(&redis_cmd::cmd("auth", &password)).unwrap(); - let mut buffer = vec![0u8; 5]; - conn.read_exact(&mut buffer).unwrap(); - let reply = String::from_utf8(buffer.to_vec()).unwrap(); - if reply != "+OK\r\n" { - err::die_with_msg(format!( - r"Incorrect Redis password. You supplied `{}`. - Please supply correct password with REDIS_PASSWORD environmental variable.", - password, - )) - }; - conn -} - -fn set_db(mut conn: net::TcpStream, db: &str) -> net::TcpStream { - conn.write_all(&redis_cmd::cmd("SELECT", &db)).unwrap(); - conn -} - -fn send_test_ping(mut conn: net::TcpStream) -> net::TcpStream { - conn.write_all(b"PING\r\n").unwrap(); - let mut buffer = vec![0u8; 7]; - conn.read_exact(&mut buffer).unwrap(); - let reply = String::from_utf8(buffer.to_vec()).unwrap(); - match reply.as_str() { - "+PONG\r\n" => (), - "-NOAUTH" => err::die_with_msg( - r"Invalid authentication for Redis. - Redis reports that it needs a password, but you did not provide one. - You can set a password with the REDIS_PASSWORD environmental variable.", - ), - "HTTP/1." => err::die_with_msg( - r"The server at REDIS_HOST and REDIS_PORT is not a Redis server. - Please update the REDIS_HOST and/or REDIS_PORT environmental variables.", - ), - _ => err::die_with_msg(format!( - "Could not connect to Redis for unknown reason. Expected `+PONG` reply but got {}", - reply - )), - }; - conn -} - -impl RedisConn { - pub fn new(redis_cfg: config::RedisConfig) -> Self { - let addr = net::SocketAddr::from((*redis_cfg.host, *redis_cfg.port)); - let conn_err = |e| { - err::die_with_msg(format!( - "Could not connect to Redis at {}:{}.\n Error detail: {}", - *redis_cfg.host, *redis_cfg.port, e, - )) - }; - let update_conn = |mut conn| { - if let Some(password) = redis_cfg.password.clone() { - conn = send_password(conn, &password); - } - conn = send_test_ping(conn); - conn.set_read_timeout(Some(time::Duration::from_millis(10))) - .expect("Can set read timeout for Redis connection"); - if let Some(db) = &*redis_cfg.db { - conn = set_db(conn, db); - } - conn - }; - let (primary_conn, secondary_conn) = ( - update_conn(net::TcpStream::connect(addr).unwrap_or_else(conn_err)), - update_conn(net::TcpStream::connect(addr).unwrap_or_else(conn_err)), - ); - primary_conn - .set_nonblocking(true) - .expect("set_nonblocking call failed"); - - Self { - primary: primary_conn, - secondary: secondary_conn, - namespace: redis_cfg.namespace, - polling_interval: redis_cfg.polling_interval, - } - } -} - -pub struct AsyncReadableStream<'a>(&'a mut net::TcpStream); - -impl<'a> AsyncReadableStream<'a> { - pub fn new(stream: &'a mut net::TcpStream) -> Self { - Self(stream) - } - - // Text comes in from redis as a raw stream, which could be more than one message - // and is not guaranteed to end on a message boundary. We need to break it down - // into messages. Incoming messages *are* guaranteed to be RESP arrays, - // https://redis.io/topics/protocol - /// Adds any new Redis messages to the `MsgQueue` for the appropriate `ClientAgent`. - pub fn poll_redis(receiver: &mut Receiver) { - let mut buffer = vec![0u8; 6000]; - let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection); - - if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() { - let raw_redis_response = async_stream.as_utf8(buffer, num_bytes_read); - dbg!(&raw_redis_response); - if raw_redis_response.starts_with("-NOAUTH") { - err::die_with_msg( - r"Invalid authentication for Redis. -Do you need a password? -If so, set it with the REDIS_PASSWORD environmental variable.", - ); - } else if raw_redis_response.starts_with("HTTP") { - err::die_with_msg( - r"The server at REDIS_HOST and REDIS_PORT is not a Redis server. -Please update the REDIS_HOST and/or REDIS_PORT environmental variables with the correct values.", - ); - } - - receiver.incoming_raw_msg.push_str(&raw_redis_response); - - // Only act if we have a full message (end on a msg boundary) - if !receiver.incoming_raw_msg.ends_with("}\r\n") { - return; - }; - let mut msg = RedisMsg::from_raw(&receiver.incoming_raw_msg); - - let prefix_to_skip = match &*receiver.redis_namespace { - Some(namespace) => format!("{}:timeline:", namespace), - None => "timeline:".to_string(), - }; - - while !msg.raw.is_empty() { - let command = msg.next_field(); - match command.as_str() { - "message" => { - let timeline = &msg.next_field()[prefix_to_skip.len()..]; - let msg_txt = &msg.next_field(); - let msg_value: Value = match serde_json::from_str(msg_txt) { - Ok(v) => v, - Err(e) => panic!("Unparseable json {}\n\n{}", msg_txt, e), - }; - for msg_queue in receiver.msg_queues.values_mut() { - if msg_queue.redis_channel == timeline { - msg_queue.messages.push_back(msg_value.clone()); - } - } - } - "subscribe" | "unsubscribe" => { - // No msg, so ignore & advance cursor to end - let _channel = msg.next_field(); - msg.cursor += ":".len(); - let _active_subscriptions = msg.process_number(); - msg.cursor += "\r\n".len(); - } - cmd => panic!("Invariant violation: {} is invalid Redis input", cmd), - }; - msg = RedisMsg::from_raw(&msg.raw[msg.cursor..]); - } - receiver.incoming_raw_msg.clear(); - } - } - - fn as_utf8(&mut self, cur_buffer: Vec, size: usize) -> String { - String::from_utf8(cur_buffer[..size].to_vec()).unwrap_or_else(|_| { - let mut new_buffer = vec![0u8; 1]; - self.poll_read(&mut new_buffer).unwrap(); - let buffer = ([cur_buffer, new_buffer]).concat(); - self.as_utf8(buffer, size + 1) - }) - } -} - -impl<'a> Read for AsyncReadableStream<'a> { - fn read(&mut self, buffer: &mut [u8]) -> Result { - self.0.read(buffer) - } -} - -impl<'a> AsyncRead for AsyncReadableStream<'a> { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll { - match self.read(buf) { - Ok(t) => Ok(Async::Ready(t)), - Err(_) => Ok(Async::NotReady), - } - } -} - -#[derive(Debug)] -pub struct RedisMsg<'a> { - pub raw: &'a str, - pub cursor: usize, -} - -impl<'a> RedisMsg<'a> { - pub fn from_raw(raw: &'a str) -> Self { - Self { - raw, - cursor: "*3\r\n".len(), //length of intro header - } - } - /// Move the cursor from the beginning of a number through its end and return the number - pub fn process_number(&mut self) -> usize { - let (mut selected_number, selection_start) = (0, self.cursor); - while let Ok(number) = self.raw[selection_start..=self.cursor].parse::() { - self.cursor += 1; - selected_number = number; - } - selected_number - } - /// In a pubsub reply from Redis, an item can be either the name of the subscribed channel - /// or the msg payload. Either way, it follows the same format: - /// `$[LENGTH_OF_ITEM_BODY]\r\n[ITEM_BODY]\r\n` - pub fn next_field(&mut self) -> String { - self.cursor += "$".len(); - - let item_len = self.process_number(); - self.cursor += "\r\n".len(); - let item_start_position = self.cursor; - self.cursor += item_len; - let item = self.raw[item_start_position..self.cursor].to_string(); - self.cursor += "\r\n".len(); - item - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn simple_redis_parse() { - let input = "*3\r\n$9\r\nSUBSCRIBE\r\n$10\r\ntimeline:1\r\n:1\r\n"; - let mut msg = RedisMsg::from_raw(input); - let cmd = msg.next_field(); - assert_eq!(&cmd, "SUBSCRIBE"); - let timeline = msg.next_field(); - assert_eq!(&timeline, "timeline:1"); - msg.cursor += ":1\r\n".len(); - assert_eq!(msg.cursor, input.len()); - } - - #[test] - fn realistic_redis_parse() { - let input = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:4\r\n$1386\r\n{\"event\":\"update\",\"payload\":{\"id\":\"102866835379605039\",\"created_at\":\"2019-09-27T22:29:02.590Z\",\"in_reply_to_id\":null,\"in_reply_to_account_id\":null,\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"http://localhost:3000/users/admin/statuses/102866835379605039\",\"url\":\"http://localhost:3000/@admin/102866835379605039\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"content\":\"

@susan hi

\",\"reblog\":null,\"application\":{\"name\":\"Web\",\"website\":null},\"account\":{\"id\":\"1\",\"username\":\"admin\",\"acct\":\"admin\",\"display_name\":\"\",\"locked\":false,\"bot\":false,\"created_at\":\"2019-07-04T00:21:05.890Z\",\"note\":\"

\",\"url\":\"http://localhost:3000/@admin\",\"avatar\":\"http://localhost:3000/avatars/original/missing.png\",\"avatar_static\":\"http://localhost:3000/avatars/original/missing.png\",\"header\":\"http://localhost:3000/headers/original/missing.png\",\"header_static\":\"http://localhost:3000/headers/original/missing.png\",\"followers_count\":3,\"following_count\":3,\"statuses_count\":192,\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[{\"id\":\"4\",\"username\":\"susan\",\"url\":\"http://localhost:3000/@susan\",\"acct\":\"susan\"}],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null},\"queued_at\":1569623342825}\r\n"; - let mut msg = RedisMsg::from_raw(input); - let cmd = msg.next_field(); - assert_eq!(&cmd, "message"); - let timeline = msg.next_field(); - assert_eq!(&timeline, "timeline:4"); - let message_str = msg.next_field(); - assert_eq!(message_str, input[41..input.len() - 2]); - assert_eq!(msg.cursor, input.len()); - } -}