Update documentation and restructure code

This commit is contained in:
Daniel Sockwell 2019-07-08 15:21:02 -04:00
parent d6ae45b292
commit 866f3ee34d
12 changed files with 182 additions and 117 deletions

1
Cargo.lock generated
View File

@ -819,6 +819,7 @@ version = "0.1.0"
dependencies = [
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"postgres 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)",
"pretty_env_logger 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -18,6 +18,7 @@ pretty_env_logger = "0.3.0"
postgres = "0.15.2"
uuid = { version = "0.7", features = ["v4"] }
dotenv = "0.14.0"
lazy_static = "1.3.0"
[features]
default = [ "production" ]

View File

@ -2,31 +2,63 @@
A WIP blazingly fast drop-in replacement for the Mastodon streaming api server.
## Current status
The streaming server is very much a work in progress. It is currently missing essential features including support for SSL, CORS, and separate development/production environments. However, it has reached the point where it is usable/testable in a localhost development environment and I would greatly appreciate any testing, bug reports, or other feedback you could provide.
The streaming server is currently a work in progress. However, it is now testable and, if
configured properly, would theoretically be usable in production—though production use is
not advisable until we have completed further testing. I would greatly appreciate any testing,
bug reports, or other feedback you could provide.
## Installation
Installing the WIP version requires the Rust toolchain (the released version will be available as a pre-compiled binary). To install, clone this repository and run `cargo build` (to build the server) `cargo run` (to both build and run the server), or `cargo build --release` (to build the server with release optimizations).
Installing the WIP version requires the Rust toolchain (the released version will be available
as a pre-compiled binary). To install, clone this repository and run `cargo build` (to build
the server) `cargo run` (to both build and run the server), or `cargo build --release` (to
build the server with release optimizations).
## Connection to Mastodon
The streaming server expects to connect to a running development version of Mastodon built off of the `master` branch. Specifically, it needs to connect to both the Postgres database (to authenticate users) and to the Redis database. You should run Mastodon in whatever way you normally do and configure the streaming server to connect to the appropriate databases.
The streaming server expects to connect to a running development version of Mastodon built off of
the `master` branch. Specifically, it needs to connect to both the Postgres database (to
authenticate users) and to the Redis database. You should run Mastodon in whatever way you
normally do and configure the streaming server to connect to the appropriate databases.
## Configuring
You may edit the (currently limited) configuration variables in the `.env` file. Note that, by default, this server is configured to run on port 4000. This allows for easy testing with the development version of Mastodon (which, by default, is configured to communicate with a streaming server running on `localhost:4000`). However, it also conflicts with the current/Node.js version of Mastodon's streaming server, which runs on the same port. Thus, to test this server, you should disable the other streaming server or move it to a non-conflicting port.
You may edit the configuration variables in the `config.rs` module. You can also overwrite the
default config variables in the `.env` file. Note that, by default, this server is configured
to run on port 4000. This allows for easy testing with the development version of Mastodon
(which, by default, is configured to communicate with a streaming server running on
`localhost:4000`). However, it also conflicts with the current/Node.js version of Mastodon's
streaming server, which runs on the same port. Thus, to test this server, you should disable
the Node streaming server or move it to a non-conflicting port.
## Documentation
Build documentation with `cargo doc --open`, which will build the Markdown docs and open them in your browser. Please consult those docs for a description of the code structure/organization.
Build documentation with `cargo doc --open`, which will build the Markdown docs and open them
in your browser. Please consult those docs for a detailed description of the code
structure/organization. The documentation also contains additional notes about data flow and
options for configuration.
## Running
As noted above, you can run the server with `cargo run`. Alternatively, if you built the sever using `cargo build` or `cargo build --release`, you can run the executable produced in the `target/build/debug` folder or the `target/build/release` folder.
As noted above, you can run the server with `cargo run`. Alternatively, if you built the sever
using `cargo build` or `cargo build --release`, you can run the executable produced in the
`target/build/debug` folder or the `target/build/release` folder.
## Unit and (limited) integration tests
You can run basic unit test of the public Server Sent Event endpoints with `cargo test`. You can run integration tests of the authenticated SSE endpoints (which require a Postgres connection) with `cargo test -- --ignored`.
You can run basic unit test of the public Server Sent Event endpoints with `cargo test`. You can
run integration tests of the authenticated SSE endpoints (which require a Postgres connection)
with `cargo test -- --ignored`.
## Manual testing
Once the streaming server is running, you can also test it manually. You can test it using a browser connected to the relevant Mastodon development server. Or you can test the SSE endpoints with `curl`, PostMan, or any other HTTP client. Similarly, you can test the WebSocket endpoints with `websocat` or any other WebSocket client.
Once the streaming server is running, you can also test it manually. You can test it using a
browser connected to the relevant Mastodon development server. Or you can test the SSE endpoints
with `curl`, PostMan, or any other HTTP client. Similarly, you can test the WebSocket endpoints
with `websocat` or any other WebSocket client.
## Memory/CPU usage
Note that memory usage is higher when running the development version of the streaming server (the one generated with `cargo run` or `cargo build`). If you are interested in measuring RAM or CPU usage, you should likely run `cargo build --release` and test the release version of the executable.
Note that memory usage is higher when running the development version of the streaming server (the
one generated with `cargo run` or `cargo build`). If you are interested in measuring RAM or CPU
usage, you should likely run `cargo build --release` and test the release version of the executable.
## Load testing
I have not yet found a good way to test the streaming server under load. I have experimented with using `artillery` or other load-testing utilities. However, every utility I am familiar with or have found is built around either HTTP requests or WebSocket connections in which the client sends messages. I have not found a good solution to test receiving SSEs or WebSocket connections where the client does not transmit data after establishing the connection. If you are aware of a good way to do load testing, please let me know.
I have not yet found a good way to test the streaming server under load. I have experimented with
using `artillery` or other load-testing utilities. However, every utility I am familiar with or
have found is built around either HTTP requests or WebSocket connections in which the client sends
messages. I have not found a good solution to test receiving SSEs or WebSocket connections where
the client does not transmit data after establishing the connection. If you are aware of a good
way to do load testing, please let me know.

View File

@ -1,5 +1,8 @@
//! Configuration settings and custom errors for servers and databases
//! Configuration defaults. All settings with the prefix of `DEFAULT_` can be overridden
//! by an environmental variable of the same name without that prefix (either by setting
//! the variable at runtime or in the `.env` file)
use dotenv::dotenv;
use lazy_static::lazy_static;
use log::warn;
use serde_derive::Serialize;
use std::{env, net, time};
@ -10,10 +13,46 @@ const DEFAULT_POSTGRES_ADDR: &str = "postgres://@localhost/mastodon_development"
const DEFAULT_REDIS_ADDR: &str = "127.0.0.1:6379";
const DEFAULT_SERVER_ADDR: &str = "127.0.0.1:4000";
/// The frequency with which the StreamAgent will poll for updates to send via SSE
pub const DEFAULT_SSE_UPDATE_INTERVAL: u64 = 100;
pub const DEFAULT_WS_UPDATE_INTERVAL: u64 = 100;
pub const DEFAULT_REDIS_POLL_INTERVAL: u64 = 100;
const DEFAULT_SSE_UPDATE_INTERVAL: u64 = 100;
const DEFAULT_WS_UPDATE_INTERVAL: u64 = 100;
const DEFAULT_REDIS_POLL_INTERVAL: u64 = 100;
lazy_static! {
static ref POSTGRES_ADDR: String = env::var("POSTGRESS_ADDR").unwrap_or_else(|_| {
let mut postgres_addr = DEFAULT_POSTGRES_ADDR.to_string();
postgres_addr.insert_str(11,
&env::var("USER").unwrap_or_else(|_| {
warn!("No USER env variable set. Connecting to Postgress with default `postgres` user");
"postgres".to_string()
}).as_str()
);
postgres_addr
});
static ref REDIS_ADDR: String = env::var("REDIS_ADDR").unwrap_or_else(|_| DEFAULT_REDIS_ADDR.to_owned());
pub static ref SERVER_ADDR: net::SocketAddr = env::var("SERVER_ADDR")
.unwrap_or_else(|_| DEFAULT_SERVER_ADDR.to_owned())
.parse()
.expect("static string");
/// Interval, in ms, at which the `ClientAgent` polls the `Receiver` for updates to send via SSE.
pub static ref SSE_UPDATE_INTERVAL: u64 = env::var("SSE_UPDATE_INTERVAL")
.map(|s| s.parse().expect("Valid config"))
.unwrap_or(DEFAULT_SSE_UPDATE_INTERVAL);
/// Interval, in ms, at which the `ClientAgent` polls the `Receiver` for updates to send via WS.
pub static ref WS_UPDATE_INTERVAL: u64 = env::var("WS_UPDATE_INTERVAL")
.map(|s| s.parse().expect("Valid config"))
.unwrap_or(DEFAULT_WS_UPDATE_INTERVAL);
/// Interval, in ms, at which the `Receiver` polls Redis.
/// **NOTE**: Polling Redis is much more time consuming than polling the `Receiver`
/// (on the order of 10ms rather than 50μs). Thus, changing this setting
/// would be a good place to start for performance improvements at the cost
/// of delaying all updates.
pub static ref REDIS_POLL_INTERVAL: u64 = env::var("REDIS_POLL_INTERVAL")
.map(|s| s.parse().expect("Valid config"))
.unwrap_or(DEFAULT_REDIS_POLL_INTERVAL);
}
/// Configure CORS for the API server
pub fn cross_origin_resource_sharing() -> warp::filters::cors::Cors {
@ -31,42 +70,25 @@ pub fn logging_and_env() {
/// Configure Postgres and return a connection
pub fn postgres() -> postgres::Connection {
let postgres_addr = env::var("POSTGRESS_ADDR").unwrap_or_else(|_| {
let mut postgres_addr = DEFAULT_POSTGRES_ADDR.to_string();
postgres_addr.insert_str(11,
&env::var("USER").unwrap_or_else(|_| {
warn!("No USER env variable set. Connecting to Postgress with default `postgres` user");
"postgres".to_string()
}).as_str()
);
postgres_addr
});
postgres::Connection::connect(postgres_addr, postgres::TlsMode::None)
postgres::Connection::connect(POSTGRES_ADDR.to_string(), postgres::TlsMode::None)
.expect("Can connect to local Postgres")
}
/// Configure Redis
pub fn redis_addr() -> (net::TcpStream, net::TcpStream) {
let redis_addr = env::var("REDIS_ADDR").unwrap_or_else(|_| DEFAULT_REDIS_ADDR.to_owned());
let pubsub_connection = net::TcpStream::connect(&redis_addr).expect("Can connect to Redis");
let pubsub_connection =
net::TcpStream::connect(&REDIS_ADDR.to_string()).expect("Can connect to Redis");
pubsub_connection
.set_read_timeout(Some(time::Duration::from_millis(10)))
.expect("Can set read timeout for Redis connection");
let secondary_redis_connection =
net::TcpStream::connect(&redis_addr).expect("Can connect to Redis");
net::TcpStream::connect(&REDIS_ADDR.to_string()).expect("Can connect to Redis");
secondary_redis_connection
.set_read_timeout(Some(time::Duration::from_millis(10)))
.expect("Can set read timeout for Redis connection");
(pubsub_connection, secondary_redis_connection)
}
pub fn socket_address() -> net::SocketAddr {
env::var("SERVER_ADDR")
.unwrap_or_else(|_| DEFAULT_SERVER_ADDR.to_owned())
.parse()
.expect("static string")
}
#[derive(Serialize)]
pub struct ErrorMessage {
error: String,

View File

@ -6,26 +6,37 @@
//! connect to the server with the API described [in Mastodon's public API
//! documentation](https://docs.joinmastodon.org/api/streaming/).
//!
//! # Notes on data flow
//! * **Client Request → Warp**:
//! Warp filters for valid requests and parses request data. Based on that data, it generates a `User`
//! representing the client that made the request with data from the client's request and from
//! Postgres. The `User` is authenticated, if appropriate. Warp //! repeatedly polls the
//! ClientAgent for information relevant to the User.
//! # Data Flow
//! * **Parsing the client request**
//! When the client request first comes in, it is parsed based on the endpoint it targets (for
//! server sent events), its query parameters, and its headers (for WebSocket). Based on this
//! data, we authenticate the user, retrieve relevant user data from Postgres, and determine the
//! timeline targeted by the request. Successfully parsing the client request results in generating
//! a `User` and target `timeline` for the request. If any requests are invalid/not authorized, we
//! reject them in this stage.
//! * **Streaming update from Redis to the client**:
//! After the user request is parsed, we pass the `User` and `timeline` data on to the
//! `ClientAgent`. The `ClientAgent` is responsible for communicating the user's request to the
//! `Receiver`, polling the `Receiver` for any updates, and then for wording those updates on to the
//! client. The `Receiver`, in tern, is responsible for managing the Redis subscriptions,
//! periodically polling Redis, and sorting the replies from Redis into queues for when it is polled
//! by the `ClientAgent`.
//!
//! * **Warp → ClientAgent**:
//! A new `ClientAgent` is created for each request. The `ClientAgent` exists to manage concurrent
//! access to the (single) `Receiver`, which it can access behind an `Arc<Mutex>`. The `ClientAgent`
//! polls the `Receiver` for any updates relevant to the current client. If there are updates, the
//! `ClientAgent` filters them with the client's filters and passes any matching updates up to Warp.
//! The `ClientAgent` is also responsible for sending `subscribe` commands to Redis (via the
//! `Receiver`) when necessary.
//! # Concurrency
//! The `Receiver` is created when the server is first initialized, and there is only one
//! `Receiver`. Thus, the `Receiver` is a potential bottleneck. On the other hand, each
//! client request results in a new green thread, which spawns its own `ClientAgent`. Thus,
//! their will be many `ClientAgent`s polling a single `Receiver`. Accordingly, it is very
//! important that polling the `Receiver` remain as fast as possible. It is less important
//! that the `Receiver`'s poll of Redis be fast, since there will only ever be one
//! `Receiver`.
//!
//! # Configuration
//! By default, the server uses config values from the `config.rs` module; these values can be
//! overwritten with environmental variables or in the `.env` file. The most important settings
//! for performance control the frequency with which the `ClientAgent` polls the `Receiver` and
//! the frequency with which the `Receiver` polls Redis.
//!
//! * **ClientAgent → Receiver**:
//! The Receiver receives data from Redis and stores it in a series of queues (one for each
//! ClientAgent). When (asynchronously) polled by the ClientAgent, it sends back the messages
//! relevant to that ClientAgent and removes them from the queue.
pub mod config;
pub mod parse_client_request;
pub mod redis_to_client_stream;

View File

@ -110,7 +110,6 @@ fn main() {
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));
let cors = config::cross_origin_resource_sharing();
let address = config::socket_address();
warp::serve(websocket_routes.or(sse_routes).with(cors)).run(address);
warp::serve(websocket_routes.or(sse_routes).with(cors)).run(*config::SERVER_ADDR);
}

View File

@ -1,3 +1,4 @@
//! Parse the client request and return a 'timeline' and a (maybe authenticated) `User`
pub mod query;
pub mod sse;
pub mod user;

View File

@ -1,4 +1,4 @@
//! WebSocket functionality
//! Filters for the WebSocket endpoint
use super::{
query,
user::{Scope, User},

View File

@ -4,7 +4,7 @@
//! The `ClientAgent`'s interface is very simple. All you can do with it is:
//! * Create a totally new `ClientAgent` with no shared data;
//! * Clone an existing `ClientAgent`, sharing the `Receiver`;
//! * to manage an new timeline/user pair; or
//! * Manage an new timeline/user pair; or
//! * Poll an existing `ClientAgent` to see if there are any new messages
//! for clients
//!
@ -18,6 +18,7 @@
use super::receiver::Receiver;
use crate::parse_client_request::user::User;
use futures::{Async, Poll};
use log;
use serde_json::{json, Value};
use std::{sync, time};
use tokio::io::Error;
@ -94,7 +95,7 @@ impl futures::stream::Stream for ClientAgent {
};
if start_time.elapsed() > time::Duration::from_millis(20) {
println!("Polling took: {:?}", start_time.elapsed());
log::warn!("Polling took: {:?}", start_time.elapsed());
}
match result {
Ok(Async::Ready(Some(value))) => {

View File

@ -1,3 +1,4 @@
//! Stream the updates appropriate for a given `User`/`timeline` pair from Redis.
pub mod client_agent;
pub mod receiver;
pub mod redis_cmd;
@ -5,18 +6,17 @@ pub mod redis_cmd;
use crate::config;
pub use client_agent::ClientAgent;
use futures::{future::Future, stream::Stream, Async};
use std::{env, time};
use log;
use std::time;
/// Send a stream of replies to a Server Sent Events client.
pub fn send_updates_to_sse(
mut client_agent: ClientAgent,
connection: warp::sse::Sse,
) -> impl warp::reply::Reply {
let sse_update_interval = env::var("SSE_UPDATE_INTERVAL")
.map(|s| s.parse().expect("Valid config"))
.unwrap_or(config::DEFAULT_SSE_UPDATE_INTERVAL);
let event_stream = tokio::timer::Interval::new(
time::Instant::now(),
time::Duration::from_millis(sse_update_interval),
time::Duration::from_millis(*config::SSE_UPDATE_INTERVAL),
)
.filter_map(move |_| match client_agent.poll() {
Ok(Async::Ready(Some(json_value))) => Some((
@ -29,7 +29,7 @@ pub fn send_updates_to_sse(
connection.reply(warp::sse::keep(event_stream, None))
}
/// Send a stream of replies to a WebSocket client
/// Send a stream of replies to a WebSocket client.
pub fn send_updates_to_ws(
socket: warp::ws::WebSocket,
mut stream: ClientAgent,
@ -49,12 +49,9 @@ pub fn send_updates_to_ws(
);
// For as long as the client is still connected, yeild a new event every 100 ms
let ws_update_interval = env::var("WS_UPDATE_INTERVAL")
.map(|s| s.parse().expect("Valid config"))
.unwrap_or(config::DEFAULT_WS_UPDATE_INTERVAL);
let event_stream = tokio::timer::Interval::new(
time::Instant::now(),
time::Duration::from_millis(ws_update_interval),
time::Duration::from_millis(*config::WS_UPDATE_INTERVAL),
)
.take_while(move |_| match ws_rx.poll() {
Ok(Async::Ready(None)) => futures::future::ok(false),
@ -71,5 +68,5 @@ pub fn send_updates_to_ws(
Ok(())
})
.then(|msg| msg)
.map_err(|e| println!("{}", e))
.map_err(|e| log::error!("{}", e))
}

View File

@ -7,11 +7,11 @@ use futures::{Async, Poll};
use log::info;
use regex::Regex;
use serde_json::Value;
use std::{collections, env, io::Read, io::Write, net, time};
use std::{collections, io::Read, io::Write, net, time};
use tokio::io::{AsyncRead, Error};
use uuid::Uuid;
/// The item that streams from Redis and is polled by the `StreamManager`
/// The item that streams from Redis and is polled by the `ClientAgent`
#[derive(Debug)]
pub struct Receiver {
pubsub_connection: net::TcpStream,
@ -53,7 +53,7 @@ impl Receiver {
self.subscribe_or_unsubscribe_as_needed(timeline);
}
/// Set the `Receiver`'s manager_id and target_timeline fields to the approprate
/// Set the `Receiver`'s manager_id and target_timeline fields to the appropriate
/// value to be polled by the current `StreamManager`.
pub fn configure_for_polling(&mut self, manager_id: Uuid, timeline: &str) {
self.manager_id = manager_id;
@ -102,43 +102,14 @@ impl Receiver {
});
// If no clients, unsubscribe from the channel
if *count_of_subscribed_clients <= 0 {
info!("Sent unsubscribe command");
pubsub_cmd!("unsubscribe", self, change.timeline.clone());
}
if need_to_subscribe {
info!("Sent subscribe command");
pubsub_cmd!("subscribe", self, change.timeline.clone());
}
}
}
/// Polls Redis for any new messages and adds them to the `MsgQueue` for
/// the appropriate `ClientAgent`.
fn poll_redis(&mut self) {
let mut buffer = vec![0u8; 3000];
// Add any incoming messages to the back of the relevant `msg_queues`
// NOTE: This could be more/other than the `msg_queue` currently being polled
let mut async_stream = AsyncReadableStream::new(&mut self.pubsub_connection);
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() {
let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]);
// capture everything between `{` and `}` as potential JSON
let json_regex = Regex::new(r"(?P<json>\{.*\})").expect("Hard-coded");
// capture the timeline so we know which queues to add it to
let timeline_regex = Regex::new(r"timeline:(?P<timeline>.*?)\r").expect("Hard-codded");
if let Some(result) = json_regex.captures(raw_redis_response) {
let timeline =
timeline_regex.captures(raw_redis_response).unwrap()["timeline"].to_string();
let msg: Value = serde_json::from_str(&result["json"].to_string().clone()).unwrap();
for msg_queue in self.msg_queues.values_mut() {
if msg_queue.redis_channel == timeline {
msg_queue.messages.push_back(msg.clone());
}
}
}
}
}
fn log_number_of_msgs_in_queue(&self) {
let messages_waiting = self
.msg_queues
@ -153,6 +124,10 @@ impl Receiver {
_ => log::info!("{} messages waiting in the queue", messages_waiting),
}
}
fn get_target_msg_queue(&mut self) -> collections::hash_map::Entry<Uuid, MsgQueue> {
self.msg_queues.entry(self.manager_id)
}
}
impl Default for Receiver {
@ -175,24 +150,20 @@ impl futures::stream::Stream for Receiver {
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
let timeline = self.timeline.clone();
let redis_poll_interval = env::var("REDIS_POLL_INTERVAL")
.map(|s| s.parse().expect("Valid config"))
.unwrap_or(config::DEFAULT_REDIS_POLL_INTERVAL);
if self.redis_polled_at.elapsed() > time::Duration::from_millis(redis_poll_interval) {
self.poll_redis();
if self.redis_polled_at.elapsed()
> time::Duration::from_millis(*config::REDIS_POLL_INTERVAL)
{
AsyncReadableStream::poll_redis(self);
self.redis_polled_at = time::Instant::now();
}
// Record current time as last polled time
self.msg_queues
.entry(self.manager_id)
self.get_target_msg_queue()
.and_modify(|msg_queue| msg_queue.last_polled_at = time::Instant::now());
// If the `msg_queue` being polled has any new messages, return the first (oldest) one
match self
.msg_queues
.entry(self.manager_id)
.get_target_msg_queue()
.or_insert_with(|| MsgQueue::new(timeline.clone()))
.messages
.pop_front()
@ -214,13 +185,13 @@ impl Drop for Receiver {
#[derive(Debug, Clone)]
struct MsgQueue {
pub messages: collections::VecDeque<Value>,
pub last_polled_at: time::Instant,
pub redis_channel: String,
messages: collections::VecDeque<Value>,
last_polled_at: time::Instant,
redis_channel: String,
}
impl MsgQueue {
pub fn new(redis_channel: impl std::fmt::Display) -> Self {
fn new(redis_channel: impl std::fmt::Display) -> Self {
let redis_channel = redis_channel.to_string();
MsgQueue {
messages: collections::VecDeque::new(),
@ -232,9 +203,36 @@ impl MsgQueue {
struct AsyncReadableStream<'a>(&'a mut net::TcpStream);
impl<'a> AsyncReadableStream<'a> {
pub fn new(stream: &'a mut net::TcpStream) -> Self {
fn new(stream: &'a mut net::TcpStream) -> Self {
AsyncReadableStream(stream)
}
/// Polls Redis for any new messages and adds them to the `MsgQueue` for
/// the appropriate `ClientAgent`.
fn poll_redis(receiver: &mut Receiver) {
let mut buffer = vec![0u8; 3000];
// Add any incoming messages to the back of the relevant `msg_queues`
// NOTE: This could be more/other than the `msg_queue` currently being polled
let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection);
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() {
let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]);
// capture everything between `{` and `}` as potential JSON
let json_regex = Regex::new(r"(?P<json>\{.*\})").expect("Hard-coded");
// capture the timeline so we know which queues to add it to
let timeline_regex = Regex::new(r"timeline:(?P<timeline>.*?)\r").expect("Hard-codded");
if let Some(result) = json_regex.captures(raw_redis_response) {
let timeline =
timeline_regex.captures(raw_redis_response).unwrap()["timeline"].to_string();
let msg: Value = serde_json::from_str(&result["json"].to_string().clone()).unwrap();
for msg_queue in receiver.msg_queues.values_mut() {
if msg_queue.redis_channel == timeline {
msg_queue.messages.push_back(msg.clone());
}
}
}
}
}
}
impl<'a> Read for AsyncReadableStream<'a> {

View File

@ -1,4 +1,5 @@
//! Send raw TCP commands to the Redis server
use log::info;
use std::fmt::Display;
/// Send a subscribe or unsubscribe to the Redis PubSub channel
@ -25,6 +26,7 @@ macro_rules! pubsub_cmd {
pub fn pubsub(command: impl Display, timeline: impl Display) -> Vec<u8> {
let arg = format!("timeline:{}", timeline);
let command = command.to_string();
info!("Sent {} command", &command);
format!(
"*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n",
cmd_length = command.len(),