From 11661d2fdc330ce5739455f640aee536b7be0869 Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Wed, 2 Oct 2019 00:03:18 -0400 Subject: [PATCH] Redis config (#56) * Add most Redis config variables * Add REDIS_NAMESPACE env var * Fix Clippy lints --- Cargo.lock | 29 ++++ Cargo.toml | 1 + src/config.rs | 155 ++++++++++++++------- src/err.rs | 53 +++++++ src/lib.rs | 1 + src/main.rs | 4 +- src/parse_client_request/query.rs | 3 +- src/redis_to_client_stream/receiver.rs | 20 +-- src/redis_to_client_stream/redis_cmd.rs | 40 ++++-- src/redis_to_client_stream/redis_stream.rs | 24 +++- 10 files changed, 248 insertions(+), 82 deletions(-) create mode 100644 src/err.rs diff --git a/Cargo.lock b/Cargo.lock index b94a5ef..7cb3f5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -401,6 +401,7 @@ dependencies = [ "serde_derive 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", + "url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", "warp 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -608,6 +609,16 @@ dependencies = [ "unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "idna" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "indexmap" version = "1.0.2" @@ -879,6 +890,11 @@ name = "percent-encoding" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "percent-encoding" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "petgraph" version = "0.4.13" @@ -1748,6 +1764,16 @@ dependencies = [ "percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "url" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "urlencoding" version = "1.0.0" @@ -1952,6 +1978,7 @@ dependencies = [ "checksum hyper 0.12.28 (registry+https://github.com/rust-lang/crates.io-index)" = "e8e4606fed1c162e3a63d408c07584429f49a4f34c7176cb6cbee60e78f2372c" "checksum ident_case 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" "checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" +"checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9" "checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d" "checksum input_buffer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8e1b822cc844905551931d6f81608ed5f50a79c1078a4e2b4d42dbc7c1eedfbf" "checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08" @@ -1986,6 +2013,7 @@ dependencies = [ "checksum parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ab41b4aed082705d1056416ae4468b6ea99d52599ecf3169b00088d43113e337" "checksum parking_lot_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94c8c7923936b28d546dfd14d4472eaf34c99b14e1c973a32b3e6d4eb04298c9" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" +"checksum percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" "checksum petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "9c3659d1ee90221741f65dd128d9998311b0e40c5d3c23a62445938214abce4f" "checksum phf 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "b3da44b85f8e8dfaec21adae67f95d93244b2ecf6ad2a692320598dcc8e6dd18" "checksum phf_codegen 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "b03e85129e324ad4166b06b2c7491ae27fe3ec353af72e72cd1654c7225d517e" @@ -2081,6 +2109,7 @@ dependencies = [ "checksum unicode-width 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7007dbd421b92cc6e28410fe7362e2e0a2503394908f417b68ec8d1c364c4e20" "checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" "checksum url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a" +"checksum url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "75b414f6c464c879d7f9babf951f23bc3743fb7313c081b2e6ca719067ea9d61" "checksum urlencoding 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3df3561629a8bb4c57e5a2e4c43348d9e29c7c29d9b1c4c1f47166deca8f37ed" "checksum utf-8 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)" = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" "checksum utf8-ranges 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "796f7e48bef87609f7ade7e06495a87d5cd06c7866e6a5cbfceffc558a243737" diff --git a/Cargo.toml b/Cargo.toml index 5950a7e..fc0dd21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ uuid = { version = "0.7", features = ["v4"] } dotenv = "0.14.0" lazy_static = "1.3.0" postgres-openssl = { git = "https://github.com/sfackler/rust-postgres.git"} +url = "2.1.0" [dev-dependencies] criterion = "0.3" diff --git a/src/config.rs b/src/config.rs index 77c5bed..afff20c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,8 +4,10 @@ use dotenv::dotenv; use lazy_static::lazy_static; use log::warn; -use serde_derive::Serialize; -use std::{env, net, time}; +use std::{env, io::Write, net, time}; +use url::Url; + +use crate::{err, redis_to_client_stream::redis_cmd}; const CORS_ALLOWED_METHODS: [&str; 2] = ["GET", "OPTIONS"]; const CORS_ALLOWED_HEADERS: [&str; 3] = ["Authorization", "Accept", "Cache-Control"]; @@ -16,7 +18,11 @@ const DEFAULT_DB_NAME: &str = "mastodon_development"; const DEFAULT_DB_PORT: &str = "5432"; const DEFAULT_DB_SSLMODE: &str = "prefer"; // Redis -const DEFAULT_REDIS_ADDR: &str = "127.0.0.1:6379"; +const DEFAULT_REDIS_HOST: &str = "127.0.0.1"; +const DEFAULT_REDIS_PORT: &str = "6379"; + +const _DEFAULT_REDIS_NAMESPACE: &str = ""; +// Deployment const DEFAULT_SERVER_ADDR: &str = "127.0.0.1:4000"; const DEFAULT_SSE_UPDATE_INTERVAL: u64 = 100; @@ -31,7 +37,7 @@ fn default(var: &str, default_var: &str) -> String { env::var(var) .unwrap_or_else(|_| { warn!( - "No {} env variable set for Postgres. Using default value: {}", + "No {} env variable set. Using default value: {}", var, default_var ); default_var.to_string() @@ -40,9 +46,9 @@ fn default(var: &str, default_var: &str) -> String { } lazy_static! { - static ref POSTGRES_ADDR: String = match &env::var("POSTGRES_ADDR") { + static ref POSTGRES_ADDR: String = match &env::var("DATABASE_URL") { Ok(url) => { - warn!("DATABASE_URL env variable set. Trying to connect to Postgres with that URL instead of any values set in DB_HOST, DB_USER, DB_NAME, DB_PASS, or DB_PORT."); + warn!("DATABASE_URL env variable set. Connecting to Postgres with that URL and ignoring any values set in DB_HOST, DB_USER, DB_NAME, DB_PASS, or DB_PORT."); url.to_string() } Err(_) => { @@ -74,8 +80,60 @@ lazy_static! { } } }; - static ref REDIS_ADDR: String = env::var("REDIS_ADDR") - .unwrap_or_else(|_| DEFAULT_REDIS_ADDR.to_owned()); + static ref REDIS_ADDR: RedisConfig = match &env::var("REDIS_URL") { + Ok(url) => { + warn!(r"REDIS_URL env variable set. + Connecting to Redis with that URL and ignoring any values set in REDIS_HOST or DB_PORT."); + let url = Url::parse(url).unwrap(); + fn none_if_empty(item: &str) -> Option { + if item.is_empty() { None } else { Some(item.to_string()) } + }; + + + let user = none_if_empty(url.username()); + let mut password = url.password().as_ref().map(|str| str.to_string()); + let host = err::unwrap_or_die(url.host_str(),"Missing/invalid host in REDIS_URL"); + let port = err::unwrap_or_die(url.port(), "Missing/invalid port in REDIS_URL"); + let mut db = none_if_empty(url.path()); + let query_pairs = url.query_pairs(); + + for (key, value) in query_pairs { + match key.to_string().as_str() { + "password" => { password = Some(value.to_string());}, + "db" => { db = Some(value.to_string())} + _ => { err::die_with_msg(format!("Unsupported parameter {} in REDIS_URL.\n Flodgatt supports only `password` and `db` parameters.", key))} + } + } + RedisConfig { + user, + password, + host, + port, + db + } + } + Err(_) => { + let host = env::var("REDIS_HOST") + .unwrap_or_else(|_| default("REDIS_HOST", DEFAULT_REDIS_HOST)); + let port = env::var("REDIS_PORT") + .unwrap_or_else(|_| default("REDIS_PORT", DEFAULT_REDIS_PORT)); + RedisConfig { + user: None, + password: None, + host, + port, + db: None, + } + } + }; + pub static ref REDIS_NAMESPACE: Option = match env::var("REDIS_NAMESPACE") { + Ok(ns) => { + log::warn!("Using `{}:` as a Redis namespace.", ns); + Some(ns) + }, + _ => None + }; + pub static ref SERVER_ADDR: net::SocketAddr = env::var("SERVER_ADDR") .unwrap_or_else(|_| DEFAULT_SERVER_ADDR.to_owned()) @@ -122,59 +180,54 @@ pub fn postgres() -> postgres::Client { postgres::Client::connect(&POSTGRES_ADDR.to_string(), postgres::NoTls) .expect("Can connect to local Postgres") } - +#[derive(Default)] +struct RedisConfig { + user: Option, + password: Option, + port: String, + host: String, + db: Option, +} /// Configure Redis pub fn redis_addr() -> (net::TcpStream, net::TcpStream) { - let pubsub_connection = - net::TcpStream::connect(&REDIS_ADDR.to_string()).expect("Can connect to Redis"); + let redis = &REDIS_ADDR; + let addr = format!("{}:{}", redis.host, redis.port); + if let Some(user) = &redis.user { + log::error!( + "Username {} provided, but Redis does not need a username. Ignoring it", + user + ); + }; + let mut pubsub_connection = + net::TcpStream::connect(addr.clone()).expect("Can connect to Redis"); pubsub_connection .set_read_timeout(Some(time::Duration::from_millis(10))) .expect("Can set read timeout for Redis connection"); pubsub_connection .set_nonblocking(true) .expect("set_nonblocking call failed"); - let secondary_redis_connection = - net::TcpStream::connect(&REDIS_ADDR.to_string()).expect("Can connect to Redis"); + let mut secondary_redis_connection = + net::TcpStream::connect(addr).expect("Can connect to Redis"); secondary_redis_connection .set_read_timeout(Some(time::Duration::from_millis(10))) .expect("Can set read timeout for Redis connection"); + if let Some(password) = &REDIS_ADDR.password { + pubsub_connection + .write_all(&redis_cmd::cmd("auth", &password)) + .unwrap(); + secondary_redis_connection + .write_all(&redis_cmd::cmd("auth", password)) + .unwrap(); + } else { + warn!("No REDIS_PASSWORD set. Attempting to connect to Redis without a password. (This is correct if you are following the default setup.)"); + } + if let Some(db) = &REDIS_ADDR.db { + pubsub_connection + .write_all(&redis_cmd::cmd("SELECT", &db)) + .unwrap(); + secondary_redis_connection + .write_all(&redis_cmd::cmd("SELECT", &db)) + .unwrap(); + } (pubsub_connection, secondary_redis_connection) } - -#[derive(Serialize)] -pub struct ErrorMessage { - error: String, -} -impl ErrorMessage { - fn new(msg: impl std::fmt::Display) -> Self { - Self { - error: msg.to_string(), - } - } -} - -/// Recover from Errors by sending appropriate Warp::Rejections -pub fn handle_errors( - rejection: warp::reject::Rejection, -) -> Result { - let err_txt = match rejection.cause() { - Some(text) if text.to_string() == "Missing request header 'authorization'" => { - "Error: Missing access token".to_string() - } - Some(text) => text.to_string(), - None => "Error: Nonexistant endpoint".to_string(), - }; - let json = warp::reply::json(&ErrorMessage::new(err_txt)); - Ok(warp::reply::with_status( - json, - warp::http::StatusCode::UNAUTHORIZED, - )) -} - -pub struct CustomError {} - -impl CustomError { - pub fn unauthorized_list() -> warp::reject::Rejection { - warp::reject::custom("Error: Access to list not authorized") - } -} diff --git a/src/err.rs b/src/err.rs new file mode 100644 index 0000000..a9eff65 --- /dev/null +++ b/src/err.rs @@ -0,0 +1,53 @@ +use serde_derive::Serialize; +use std::fmt::Display; + +pub fn die_with_msg(msg: impl Display) -> ! { + eprintln!("FATAL ERROR: {}", msg); + std::process::exit(1); +} +pub fn unwrap_or_die(s: Option, msg: &str) -> String { + s.unwrap_or_else(|| { + eprintln!("FATAL ERROR: {}", msg); + std::process::exit(1) + }) + .to_string() +} + +#[derive(Serialize)] +pub struct ErrorMessage { + error: String, +} +impl ErrorMessage { + fn new(msg: impl std::fmt::Display) -> Self { + Self { + error: msg.to_string(), + } + } +} + +/// Recover from Errors by sending appropriate Warp::Rejections +pub fn handle_errors( + rejection: warp::reject::Rejection, +) -> Result { + let err_txt = match rejection.cause() { + Some(text) if text.to_string() == "Missing request header 'authorization'" => { + "Error: Missing access token".to_string() + } + Some(text) => text.to_string(), + None => "Error: Nonexistant endpoint".to_string(), + }; + let json = warp::reply::json(&ErrorMessage::new(err_txt)); + + Ok(warp::reply::with_status( + json, + warp::http::StatusCode::UNAUTHORIZED, + )) +} + +pub struct CustomError {} + +impl CustomError { + pub fn unauthorized_list() -> warp::reject::Rejection { + warp::reject::custom("Error: Access to list not authorized") + } +} diff --git a/src/lib.rs b/src/lib.rs index de167ca..a1c374a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,5 +35,6 @@ //! polls the `Receiver` and the frequency with which the `Receiver` polls Redis. //! pub mod config; +pub mod err; pub mod parse_client_request; pub mod redis_to_client_stream; diff --git a/src/main.rs b/src/main.rs index 8bd7605..0c6d242 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ use flodgatt::{ - config, + config, err, parse_client_request::{sse, user, ws}, redis_to_client_stream, redis_to_client_stream::ClientAgent, @@ -28,7 +28,7 @@ fn main() { }, ) .with(warp::reply::with::header("Connection", "keep-alive")) - .recover(config::handle_errors); + .recover(err::handle_errors); // WebSocket let websocket_routes = ws::extract_user_or_reject() diff --git a/src/parse_client_request/query.rs b/src/parse_client_request/query.rs index d283216..3c92c40 100644 --- a/src/parse_client_request/query.rs +++ b/src/parse_client_request/query.rs @@ -83,8 +83,7 @@ impl OptionalAccessToken { from_header.or(no_token).unify().boxed() } pub fn from_ws_header() -> warp::filters::BoxedFilter<(Option,)> { - let from_header = - warp::header::header::("Sec-Websocket-Protocol").map(|auth: String| Some(auth)); + let from_header = warp::header::header::("Sec-Websocket-Protocol").map(Some); let no_token = warp::any().map(|| None); from_header.or(no_token).unify().boxed() diff --git a/src/redis_to_client_stream/receiver.rs b/src/redis_to_client_stream/receiver.rs index 46e2b65..3513948 100644 --- a/src/redis_to_client_stream/receiver.rs +++ b/src/redis_to_client_stream/receiver.rs @@ -4,9 +4,8 @@ use super::{redis_cmd, redis_stream}; use crate::{config, pubsub_cmd}; use futures::{Async, Poll}; -use log::info; use serde_json::Value; -use std::{collections, io::Write, net, time}; +use std::{collections, net, time}; use tokio::io::Error; use uuid::Uuid; @@ -71,12 +70,12 @@ impl Receiver { let mut timelines_to_modify = Vec::new(); struct Change { timeline: String, - change_in_subscriber_number: i32, + in_subscriber_number: i32, } timelines_to_modify.push(Change { timeline: timeline.to_owned(), - change_in_subscriber_number: 1, + in_subscriber_number: 1, }); // Keep only message queues that have been polled recently @@ -87,7 +86,7 @@ impl Receiver { let timeline = &msg_queue.redis_channel; timelines_to_modify.push(Change { timeline: timeline.to_owned(), - change_in_subscriber_number: -1, + in_subscriber_number: -1, }); false } @@ -95,20 +94,15 @@ impl Receiver { // Record the lower number of clients subscribed to that channel for change in timelines_to_modify { - let mut need_to_subscribe = false; let count_of_subscribed_clients = self .clients_per_timeline .entry(change.timeline.clone()) - .and_modify(|n| *n += change.change_in_subscriber_number) - .or_insert_with(|| { - need_to_subscribe = true; - 1 - }); + .and_modify(|n| *n += change.in_subscriber_number) + .or_insert_with(|| 1); // If no clients, unsubscribe from the channel if *count_of_subscribed_clients <= 0 { pubsub_cmd!("unsubscribe", self, change.timeline.clone()); - } - if need_to_subscribe { + } else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 { pubsub_cmd!("subscribe", self, change.timeline.clone()); } } diff --git a/src/redis_to_client_stream/redis_cmd.rs b/src/redis_to_client_stream/redis_cmd.rs index 7591ac8..8b9eb0c 100644 --- a/src/redis_to_client_stream/redis_cmd.rs +++ b/src/redis_to_client_stream/redis_cmd.rs @@ -1,32 +1,48 @@ //! Send raw TCP commands to the Redis server -use log::info; +use crate::config; use std::fmt::Display; /// Send a subscribe or unsubscribe to the Redis PubSub channel #[macro_export] macro_rules! pubsub_cmd { ($cmd:expr, $self:expr, $tl:expr) => {{ - info!("Sending {} command to {}", $cmd, $tl); + use std::io::Write; + log::info!("Sending {} command to {}", $cmd, $tl); $self .pubsub_connection .write_all(&redis_cmd::pubsub($cmd, $tl)) .expect("Can send command to Redis"); - let new_value = if $cmd == "subscribe" { "1" } else { "0" }; + // Because we keep track of the number of clients subscribed to a channel on our end, + // we need to manually tell Redis when we have subscribed or unsubscribed + let subscription_new_number = match $cmd { + "unsubscribe" => "0", + "subscribe" => "1", + _ => panic!("Given unacceptable PUBSUB command"), + }; $self .secondary_redis_connection .write_all(&redis_cmd::set( format!("subscribed:timeline:{}", $tl), - new_value, + subscription_new_number, )) .expect("Can set Redis"); - info!("Now subscribed to: {:#?}", $self.msg_queues); + + log::info!("Now subscribed to: {:#?}", $self.msg_queues); }}; } /// Send a `SUBSCRIBE` or `UNSUBSCRIBE` command to a specific timeline pub fn pubsub(command: impl Display, timeline: impl Display) -> Vec { - let arg = format!("timeline:{}", timeline); - let command = command.to_string(); - info!("Sent {} command", &command); + let arg = match &*config::REDIS_NAMESPACE { + Some(namespace) => format!("{}:timeline:{}", namespace, timeline), + None => format!("timeline:{}", timeline), + }; + cmd(command, arg) +} + +/// Send a generic two-item command to Redis +pub fn cmd(command: impl Display, arg: impl Display) -> Vec { + let (command, arg) = (command.to_string(), arg.to_string()); + log::info!("Sent {} command", &command); format!( "*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n", cmd_length = command.len(), @@ -38,9 +54,13 @@ pub fn pubsub(command: impl Display, timeline: impl Display) -> Vec { .to_owned() } -/// Send a `SET` command +/// Send a `SET` command (used to manually unsubscribe from Redis) pub fn set(key: impl Display, value: impl Display) -> Vec { - let (key, value) = (key.to_string(), value.to_string()); + let key = match &*config::REDIS_NAMESPACE { + Some(namespace) => format!("{}:{}", namespace, key), + None => key.to_string(), + }; + let value = value.to_string(); format!( "*3\r\n$3\r\nSET\r\n${key_length}\r\n{key}\r\n${value_length}\r\n{value}\r\n", key_length = key.len(), diff --git a/src/redis_to_client_stream/redis_stream.rs b/src/redis_to_client_stream/redis_stream.rs index 9c46618..3e4cf33 100644 --- a/src/redis_to_client_stream/redis_stream.rs +++ b/src/redis_to_client_stream/redis_stream.rs @@ -1,4 +1,5 @@ use super::receiver::Receiver; +use crate::config; use futures::{Async, Poll}; use serde_json::Value; use std::io::Read; @@ -22,7 +23,16 @@ impl<'a> AsyncReadableStream<'a> { let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection); if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() { - let raw_redis_response = async_stream.to_utf8(buffer, num_bytes_read); + let raw_redis_response = async_stream.as_utf8(buffer, num_bytes_read); + if raw_redis_response.starts_with("-NOAUTH") { + eprintln!( + r"Invalid authentication for Redis. +Do you need a password? +If so, set it with the REDIS_PASSWORD environmental variable" + ); + std::process::exit(1); + } + receiver.incoming_raw_msg.push_str(&raw_redis_response); // Only act if we have a full message (end on a msg boundary) @@ -31,16 +41,22 @@ impl<'a> AsyncReadableStream<'a> { }; let mut msg = RedisMsg::from_raw(&receiver.incoming_raw_msg); + let prefix_to_skip = match &*config::REDIS_NAMESPACE { + Some(namespace) => format!("{}:timeline:", namespace), + None => "timeline:".to_string(), + }; + while !msg.raw.is_empty() { let command = msg.next_field(); match command.as_str() { "message" => { - let timeline = &msg.next_field()["timeline:".len()..]; + let timeline = &msg.next_field()[prefix_to_skip.len()..]; let msg_txt = &msg.next_field(); let msg_value: Value = match serde_json::from_str(msg_txt) { Ok(v) => v, Err(e) => panic!("Unparseable json {}\n\n{}", msg_txt, e), }; + dbg!(&timeline); for msg_queue in receiver.msg_queues.values_mut() { if msg_queue.redis_channel == timeline { msg_queue.messages.push_back(msg_value.clone()); @@ -62,12 +78,12 @@ impl<'a> AsyncReadableStream<'a> { } } - fn to_utf8(&mut self, cur_buffer: Vec, size: usize) -> String { + fn as_utf8(&mut self, cur_buffer: Vec, size: usize) -> String { String::from_utf8(cur_buffer[..size].to_vec()).unwrap_or_else(|_| { let mut new_buffer = vec![0u8; 1]; self.poll_read(&mut new_buffer).unwrap(); let buffer = ([cur_buffer, new_buffer]).concat(); - self.to_utf8(buffer, size + 1) + self.as_utf8(buffer, size + 1) }) } }