mirror of https://github.com/mastodon/flodgatt
Redis config (#56)
* Add most Redis config variables * Add REDIS_NAMESPACE env var * Fix Clippy lints
This commit is contained in:
parent
5b663d110e
commit
11661d2fdc
|
@ -401,6 +401,7 @@ dependencies = [
|
||||||
"serde_derive 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)",
|
"serde_derive 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)",
|
"serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)",
|
"tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
"uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"warp 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
|
"warp 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
@ -608,6 +609,16 @@ dependencies = [
|
||||||
"unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
"unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "idna"
|
||||||
|
version = "0.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
dependencies = [
|
||||||
|
"matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "indexmap"
|
name = "indexmap"
|
||||||
version = "1.0.2"
|
version = "1.0.2"
|
||||||
|
@ -879,6 +890,11 @@ name = "percent-encoding"
|
||||||
version = "1.0.1"
|
version = "1.0.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "percent-encoding"
|
||||||
|
version = "2.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "petgraph"
|
name = "petgraph"
|
||||||
version = "0.4.13"
|
version = "0.4.13"
|
||||||
|
@ -1748,6 +1764,16 @@ dependencies = [
|
||||||
"percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
"percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "url"
|
||||||
|
version = "2.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
dependencies = [
|
||||||
|
"idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "urlencoding"
|
name = "urlencoding"
|
||||||
version = "1.0.0"
|
version = "1.0.0"
|
||||||
|
@ -1952,6 +1978,7 @@ dependencies = [
|
||||||
"checksum hyper 0.12.28 (registry+https://github.com/rust-lang/crates.io-index)" = "e8e4606fed1c162e3a63d408c07584429f49a4f34c7176cb6cbee60e78f2372c"
|
"checksum hyper 0.12.28 (registry+https://github.com/rust-lang/crates.io-index)" = "e8e4606fed1c162e3a63d408c07584429f49a4f34c7176cb6cbee60e78f2372c"
|
||||||
"checksum ident_case 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
|
"checksum ident_case 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
|
||||||
"checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e"
|
"checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e"
|
||||||
|
"checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9"
|
||||||
"checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d"
|
"checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d"
|
||||||
"checksum input_buffer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8e1b822cc844905551931d6f81608ed5f50a79c1078a4e2b4d42dbc7c1eedfbf"
|
"checksum input_buffer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8e1b822cc844905551931d6f81608ed5f50a79c1078a4e2b4d42dbc7c1eedfbf"
|
||||||
"checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08"
|
"checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08"
|
||||||
|
@ -1986,6 +2013,7 @@ dependencies = [
|
||||||
"checksum parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ab41b4aed082705d1056416ae4468b6ea99d52599ecf3169b00088d43113e337"
|
"checksum parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ab41b4aed082705d1056416ae4468b6ea99d52599ecf3169b00088d43113e337"
|
||||||
"checksum parking_lot_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94c8c7923936b28d546dfd14d4472eaf34c99b14e1c973a32b3e6d4eb04298c9"
|
"checksum parking_lot_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94c8c7923936b28d546dfd14d4472eaf34c99b14e1c973a32b3e6d4eb04298c9"
|
||||||
"checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831"
|
"checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831"
|
||||||
|
"checksum percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
|
||||||
"checksum petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "9c3659d1ee90221741f65dd128d9998311b0e40c5d3c23a62445938214abce4f"
|
"checksum petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "9c3659d1ee90221741f65dd128d9998311b0e40c5d3c23a62445938214abce4f"
|
||||||
"checksum phf 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "b3da44b85f8e8dfaec21adae67f95d93244b2ecf6ad2a692320598dcc8e6dd18"
|
"checksum phf 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "b3da44b85f8e8dfaec21adae67f95d93244b2ecf6ad2a692320598dcc8e6dd18"
|
||||||
"checksum phf_codegen 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "b03e85129e324ad4166b06b2c7491ae27fe3ec353af72e72cd1654c7225d517e"
|
"checksum phf_codegen 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "b03e85129e324ad4166b06b2c7491ae27fe3ec353af72e72cd1654c7225d517e"
|
||||||
|
@ -2081,6 +2109,7 @@ dependencies = [
|
||||||
"checksum unicode-width 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7007dbd421b92cc6e28410fe7362e2e0a2503394908f417b68ec8d1c364c4e20"
|
"checksum unicode-width 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7007dbd421b92cc6e28410fe7362e2e0a2503394908f417b68ec8d1c364c4e20"
|
||||||
"checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc"
|
"checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc"
|
||||||
"checksum url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a"
|
"checksum url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a"
|
||||||
|
"checksum url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "75b414f6c464c879d7f9babf951f23bc3743fb7313c081b2e6ca719067ea9d61"
|
||||||
"checksum urlencoding 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3df3561629a8bb4c57e5a2e4c43348d9e29c7c29d9b1c4c1f47166deca8f37ed"
|
"checksum urlencoding 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3df3561629a8bb4c57e5a2e4c43348d9e29c7c29d9b1c4c1f47166deca8f37ed"
|
||||||
"checksum utf-8 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)" = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7"
|
"checksum utf-8 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)" = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7"
|
||||||
"checksum utf8-ranges 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "796f7e48bef87609f7ade7e06495a87d5cd06c7866e6a5cbfceffc558a243737"
|
"checksum utf8-ranges 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "796f7e48bef87609f7ade7e06495a87d5cd06c7866e6a5cbfceffc558a243737"
|
||||||
|
|
|
@ -20,6 +20,7 @@ uuid = { version = "0.7", features = ["v4"] }
|
||||||
dotenv = "0.14.0"
|
dotenv = "0.14.0"
|
||||||
lazy_static = "1.3.0"
|
lazy_static = "1.3.0"
|
||||||
postgres-openssl = { git = "https://github.com/sfackler/rust-postgres.git"}
|
postgres-openssl = { git = "https://github.com/sfackler/rust-postgres.git"}
|
||||||
|
url = "2.1.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
criterion = "0.3"
|
criterion = "0.3"
|
||||||
|
|
155
src/config.rs
155
src/config.rs
|
@ -4,8 +4,10 @@
|
||||||
use dotenv::dotenv;
|
use dotenv::dotenv;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use log::warn;
|
use log::warn;
|
||||||
use serde_derive::Serialize;
|
use std::{env, io::Write, net, time};
|
||||||
use std::{env, net, time};
|
use url::Url;
|
||||||
|
|
||||||
|
use crate::{err, redis_to_client_stream::redis_cmd};
|
||||||
|
|
||||||
const CORS_ALLOWED_METHODS: [&str; 2] = ["GET", "OPTIONS"];
|
const CORS_ALLOWED_METHODS: [&str; 2] = ["GET", "OPTIONS"];
|
||||||
const CORS_ALLOWED_HEADERS: [&str; 3] = ["Authorization", "Accept", "Cache-Control"];
|
const CORS_ALLOWED_HEADERS: [&str; 3] = ["Authorization", "Accept", "Cache-Control"];
|
||||||
|
@ -16,7 +18,11 @@ const DEFAULT_DB_NAME: &str = "mastodon_development";
|
||||||
const DEFAULT_DB_PORT: &str = "5432";
|
const DEFAULT_DB_PORT: &str = "5432";
|
||||||
const DEFAULT_DB_SSLMODE: &str = "prefer";
|
const DEFAULT_DB_SSLMODE: &str = "prefer";
|
||||||
// Redis
|
// Redis
|
||||||
const DEFAULT_REDIS_ADDR: &str = "127.0.0.1:6379";
|
const DEFAULT_REDIS_HOST: &str = "127.0.0.1";
|
||||||
|
const DEFAULT_REDIS_PORT: &str = "6379";
|
||||||
|
|
||||||
|
const _DEFAULT_REDIS_NAMESPACE: &str = "";
|
||||||
|
// Deployment
|
||||||
const DEFAULT_SERVER_ADDR: &str = "127.0.0.1:4000";
|
const DEFAULT_SERVER_ADDR: &str = "127.0.0.1:4000";
|
||||||
|
|
||||||
const DEFAULT_SSE_UPDATE_INTERVAL: u64 = 100;
|
const DEFAULT_SSE_UPDATE_INTERVAL: u64 = 100;
|
||||||
|
@ -31,7 +37,7 @@ fn default(var: &str, default_var: &str) -> String {
|
||||||
env::var(var)
|
env::var(var)
|
||||||
.unwrap_or_else(|_| {
|
.unwrap_or_else(|_| {
|
||||||
warn!(
|
warn!(
|
||||||
"No {} env variable set for Postgres. Using default value: {}",
|
"No {} env variable set. Using default value: {}",
|
||||||
var, default_var
|
var, default_var
|
||||||
);
|
);
|
||||||
default_var.to_string()
|
default_var.to_string()
|
||||||
|
@ -40,9 +46,9 @@ fn default(var: &str, default_var: &str) -> String {
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref POSTGRES_ADDR: String = match &env::var("POSTGRES_ADDR") {
|
static ref POSTGRES_ADDR: String = match &env::var("DATABASE_URL") {
|
||||||
Ok(url) => {
|
Ok(url) => {
|
||||||
warn!("DATABASE_URL env variable set. Trying to connect to Postgres with that URL instead of any values set in DB_HOST, DB_USER, DB_NAME, DB_PASS, or DB_PORT.");
|
warn!("DATABASE_URL env variable set. Connecting to Postgres with that URL and ignoring any values set in DB_HOST, DB_USER, DB_NAME, DB_PASS, or DB_PORT.");
|
||||||
url.to_string()
|
url.to_string()
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
|
@ -74,8 +80,60 @@ lazy_static! {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
static ref REDIS_ADDR: String = env::var("REDIS_ADDR")
|
static ref REDIS_ADDR: RedisConfig = match &env::var("REDIS_URL") {
|
||||||
.unwrap_or_else(|_| DEFAULT_REDIS_ADDR.to_owned());
|
Ok(url) => {
|
||||||
|
warn!(r"REDIS_URL env variable set.
|
||||||
|
Connecting to Redis with that URL and ignoring any values set in REDIS_HOST or DB_PORT.");
|
||||||
|
let url = Url::parse(url).unwrap();
|
||||||
|
fn none_if_empty(item: &str) -> Option<String> {
|
||||||
|
if item.is_empty() { None } else { Some(item.to_string()) }
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
let user = none_if_empty(url.username());
|
||||||
|
let mut password = url.password().as_ref().map(|str| str.to_string());
|
||||||
|
let host = err::unwrap_or_die(url.host_str(),"Missing/invalid host in REDIS_URL");
|
||||||
|
let port = err::unwrap_or_die(url.port(), "Missing/invalid port in REDIS_URL");
|
||||||
|
let mut db = none_if_empty(url.path());
|
||||||
|
let query_pairs = url.query_pairs();
|
||||||
|
|
||||||
|
for (key, value) in query_pairs {
|
||||||
|
match key.to_string().as_str() {
|
||||||
|
"password" => { password = Some(value.to_string());},
|
||||||
|
"db" => { db = Some(value.to_string())}
|
||||||
|
_ => { err::die_with_msg(format!("Unsupported parameter {} in REDIS_URL.\n Flodgatt supports only `password` and `db` parameters.", key))}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
RedisConfig {
|
||||||
|
user,
|
||||||
|
password,
|
||||||
|
host,
|
||||||
|
port,
|
||||||
|
db
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
let host = env::var("REDIS_HOST")
|
||||||
|
.unwrap_or_else(|_| default("REDIS_HOST", DEFAULT_REDIS_HOST));
|
||||||
|
let port = env::var("REDIS_PORT")
|
||||||
|
.unwrap_or_else(|_| default("REDIS_PORT", DEFAULT_REDIS_PORT));
|
||||||
|
RedisConfig {
|
||||||
|
user: None,
|
||||||
|
password: None,
|
||||||
|
host,
|
||||||
|
port,
|
||||||
|
db: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
pub static ref REDIS_NAMESPACE: Option<String> = match env::var("REDIS_NAMESPACE") {
|
||||||
|
Ok(ns) => {
|
||||||
|
log::warn!("Using `{}:` as a Redis namespace.", ns);
|
||||||
|
Some(ns)
|
||||||
|
},
|
||||||
|
_ => None
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
pub static ref SERVER_ADDR: net::SocketAddr = env::var("SERVER_ADDR")
|
pub static ref SERVER_ADDR: net::SocketAddr = env::var("SERVER_ADDR")
|
||||||
.unwrap_or_else(|_| DEFAULT_SERVER_ADDR.to_owned())
|
.unwrap_or_else(|_| DEFAULT_SERVER_ADDR.to_owned())
|
||||||
|
@ -122,59 +180,54 @@ pub fn postgres() -> postgres::Client {
|
||||||
postgres::Client::connect(&POSTGRES_ADDR.to_string(), postgres::NoTls)
|
postgres::Client::connect(&POSTGRES_ADDR.to_string(), postgres::NoTls)
|
||||||
.expect("Can connect to local Postgres")
|
.expect("Can connect to local Postgres")
|
||||||
}
|
}
|
||||||
|
#[derive(Default)]
|
||||||
|
struct RedisConfig {
|
||||||
|
user: Option<String>,
|
||||||
|
password: Option<String>,
|
||||||
|
port: String,
|
||||||
|
host: String,
|
||||||
|
db: Option<String>,
|
||||||
|
}
|
||||||
/// Configure Redis
|
/// Configure Redis
|
||||||
pub fn redis_addr() -> (net::TcpStream, net::TcpStream) {
|
pub fn redis_addr() -> (net::TcpStream, net::TcpStream) {
|
||||||
let pubsub_connection =
|
let redis = &REDIS_ADDR;
|
||||||
net::TcpStream::connect(&REDIS_ADDR.to_string()).expect("Can connect to Redis");
|
let addr = format!("{}:{}", redis.host, redis.port);
|
||||||
|
if let Some(user) = &redis.user {
|
||||||
|
log::error!(
|
||||||
|
"Username {} provided, but Redis does not need a username. Ignoring it",
|
||||||
|
user
|
||||||
|
);
|
||||||
|
};
|
||||||
|
let mut pubsub_connection =
|
||||||
|
net::TcpStream::connect(addr.clone()).expect("Can connect to Redis");
|
||||||
pubsub_connection
|
pubsub_connection
|
||||||
.set_read_timeout(Some(time::Duration::from_millis(10)))
|
.set_read_timeout(Some(time::Duration::from_millis(10)))
|
||||||
.expect("Can set read timeout for Redis connection");
|
.expect("Can set read timeout for Redis connection");
|
||||||
pubsub_connection
|
pubsub_connection
|
||||||
.set_nonblocking(true)
|
.set_nonblocking(true)
|
||||||
.expect("set_nonblocking call failed");
|
.expect("set_nonblocking call failed");
|
||||||
let secondary_redis_connection =
|
let mut secondary_redis_connection =
|
||||||
net::TcpStream::connect(&REDIS_ADDR.to_string()).expect("Can connect to Redis");
|
net::TcpStream::connect(addr).expect("Can connect to Redis");
|
||||||
secondary_redis_connection
|
secondary_redis_connection
|
||||||
.set_read_timeout(Some(time::Duration::from_millis(10)))
|
.set_read_timeout(Some(time::Duration::from_millis(10)))
|
||||||
.expect("Can set read timeout for Redis connection");
|
.expect("Can set read timeout for Redis connection");
|
||||||
|
if let Some(password) = &REDIS_ADDR.password {
|
||||||
|
pubsub_connection
|
||||||
|
.write_all(&redis_cmd::cmd("auth", &password))
|
||||||
|
.unwrap();
|
||||||
|
secondary_redis_connection
|
||||||
|
.write_all(&redis_cmd::cmd("auth", password))
|
||||||
|
.unwrap();
|
||||||
|
} else {
|
||||||
|
warn!("No REDIS_PASSWORD set. Attempting to connect to Redis without a password. (This is correct if you are following the default setup.)");
|
||||||
|
}
|
||||||
|
if let Some(db) = &REDIS_ADDR.db {
|
||||||
|
pubsub_connection
|
||||||
|
.write_all(&redis_cmd::cmd("SELECT", &db))
|
||||||
|
.unwrap();
|
||||||
|
secondary_redis_connection
|
||||||
|
.write_all(&redis_cmd::cmd("SELECT", &db))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
(pubsub_connection, secondary_redis_connection)
|
(pubsub_connection, secondary_redis_connection)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
|
||||||
pub struct ErrorMessage {
|
|
||||||
error: String,
|
|
||||||
}
|
|
||||||
impl ErrorMessage {
|
|
||||||
fn new(msg: impl std::fmt::Display) -> Self {
|
|
||||||
Self {
|
|
||||||
error: msg.to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Recover from Errors by sending appropriate Warp::Rejections
|
|
||||||
pub fn handle_errors(
|
|
||||||
rejection: warp::reject::Rejection,
|
|
||||||
) -> Result<impl warp::Reply, warp::reject::Rejection> {
|
|
||||||
let err_txt = match rejection.cause() {
|
|
||||||
Some(text) if text.to_string() == "Missing request header 'authorization'" => {
|
|
||||||
"Error: Missing access token".to_string()
|
|
||||||
}
|
|
||||||
Some(text) => text.to_string(),
|
|
||||||
None => "Error: Nonexistant endpoint".to_string(),
|
|
||||||
};
|
|
||||||
let json = warp::reply::json(&ErrorMessage::new(err_txt));
|
|
||||||
Ok(warp::reply::with_status(
|
|
||||||
json,
|
|
||||||
warp::http::StatusCode::UNAUTHORIZED,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct CustomError {}
|
|
||||||
|
|
||||||
impl CustomError {
|
|
||||||
pub fn unauthorized_list() -> warp::reject::Rejection {
|
|
||||||
warp::reject::custom("Error: Access to list not authorized")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
use serde_derive::Serialize;
|
||||||
|
use std::fmt::Display;
|
||||||
|
|
||||||
|
pub fn die_with_msg(msg: impl Display) -> ! {
|
||||||
|
eprintln!("FATAL ERROR: {}", msg);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
pub fn unwrap_or_die(s: Option<impl Display>, msg: &str) -> String {
|
||||||
|
s.unwrap_or_else(|| {
|
||||||
|
eprintln!("FATAL ERROR: {}", msg);
|
||||||
|
std::process::exit(1)
|
||||||
|
})
|
||||||
|
.to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
pub struct ErrorMessage {
|
||||||
|
error: String,
|
||||||
|
}
|
||||||
|
impl ErrorMessage {
|
||||||
|
fn new(msg: impl std::fmt::Display) -> Self {
|
||||||
|
Self {
|
||||||
|
error: msg.to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Recover from Errors by sending appropriate Warp::Rejections
|
||||||
|
pub fn handle_errors(
|
||||||
|
rejection: warp::reject::Rejection,
|
||||||
|
) -> Result<impl warp::Reply, warp::reject::Rejection> {
|
||||||
|
let err_txt = match rejection.cause() {
|
||||||
|
Some(text) if text.to_string() == "Missing request header 'authorization'" => {
|
||||||
|
"Error: Missing access token".to_string()
|
||||||
|
}
|
||||||
|
Some(text) => text.to_string(),
|
||||||
|
None => "Error: Nonexistant endpoint".to_string(),
|
||||||
|
};
|
||||||
|
let json = warp::reply::json(&ErrorMessage::new(err_txt));
|
||||||
|
|
||||||
|
Ok(warp::reply::with_status(
|
||||||
|
json,
|
||||||
|
warp::http::StatusCode::UNAUTHORIZED,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct CustomError {}
|
||||||
|
|
||||||
|
impl CustomError {
|
||||||
|
pub fn unauthorized_list() -> warp::reject::Rejection {
|
||||||
|
warp::reject::custom("Error: Access to list not authorized")
|
||||||
|
}
|
||||||
|
}
|
|
@ -35,5 +35,6 @@
|
||||||
//! polls the `Receiver` and the frequency with which the `Receiver` polls Redis.
|
//! polls the `Receiver` and the frequency with which the `Receiver` polls Redis.
|
||||||
//!
|
//!
|
||||||
pub mod config;
|
pub mod config;
|
||||||
|
pub mod err;
|
||||||
pub mod parse_client_request;
|
pub mod parse_client_request;
|
||||||
pub mod redis_to_client_stream;
|
pub mod redis_to_client_stream;
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use flodgatt::{
|
use flodgatt::{
|
||||||
config,
|
config, err,
|
||||||
parse_client_request::{sse, user, ws},
|
parse_client_request::{sse, user, ws},
|
||||||
redis_to_client_stream,
|
redis_to_client_stream,
|
||||||
redis_to_client_stream::ClientAgent,
|
redis_to_client_stream::ClientAgent,
|
||||||
|
@ -28,7 +28,7 @@ fn main() {
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.with(warp::reply::with::header("Connection", "keep-alive"))
|
.with(warp::reply::with::header("Connection", "keep-alive"))
|
||||||
.recover(config::handle_errors);
|
.recover(err::handle_errors);
|
||||||
|
|
||||||
// WebSocket
|
// WebSocket
|
||||||
let websocket_routes = ws::extract_user_or_reject()
|
let websocket_routes = ws::extract_user_or_reject()
|
||||||
|
|
|
@ -83,8 +83,7 @@ impl OptionalAccessToken {
|
||||||
from_header.or(no_token).unify().boxed()
|
from_header.or(no_token).unify().boxed()
|
||||||
}
|
}
|
||||||
pub fn from_ws_header() -> warp::filters::BoxedFilter<(Option<String>,)> {
|
pub fn from_ws_header() -> warp::filters::BoxedFilter<(Option<String>,)> {
|
||||||
let from_header =
|
let from_header = warp::header::header::<String>("Sec-Websocket-Protocol").map(Some);
|
||||||
warp::header::header::<String>("Sec-Websocket-Protocol").map(|auth: String| Some(auth));
|
|
||||||
let no_token = warp::any().map(|| None);
|
let no_token = warp::any().map(|| None);
|
||||||
|
|
||||||
from_header.or(no_token).unify().boxed()
|
from_header.or(no_token).unify().boxed()
|
||||||
|
|
|
@ -4,9 +4,8 @@
|
||||||
use super::{redis_cmd, redis_stream};
|
use super::{redis_cmd, redis_stream};
|
||||||
use crate::{config, pubsub_cmd};
|
use crate::{config, pubsub_cmd};
|
||||||
use futures::{Async, Poll};
|
use futures::{Async, Poll};
|
||||||
use log::info;
|
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::{collections, io::Write, net, time};
|
use std::{collections, net, time};
|
||||||
use tokio::io::Error;
|
use tokio::io::Error;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
@ -71,12 +70,12 @@ impl Receiver {
|
||||||
let mut timelines_to_modify = Vec::new();
|
let mut timelines_to_modify = Vec::new();
|
||||||
struct Change {
|
struct Change {
|
||||||
timeline: String,
|
timeline: String,
|
||||||
change_in_subscriber_number: i32,
|
in_subscriber_number: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
timelines_to_modify.push(Change {
|
timelines_to_modify.push(Change {
|
||||||
timeline: timeline.to_owned(),
|
timeline: timeline.to_owned(),
|
||||||
change_in_subscriber_number: 1,
|
in_subscriber_number: 1,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Keep only message queues that have been polled recently
|
// Keep only message queues that have been polled recently
|
||||||
|
@ -87,7 +86,7 @@ impl Receiver {
|
||||||
let timeline = &msg_queue.redis_channel;
|
let timeline = &msg_queue.redis_channel;
|
||||||
timelines_to_modify.push(Change {
|
timelines_to_modify.push(Change {
|
||||||
timeline: timeline.to_owned(),
|
timeline: timeline.to_owned(),
|
||||||
change_in_subscriber_number: -1,
|
in_subscriber_number: -1,
|
||||||
});
|
});
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
@ -95,20 +94,15 @@ impl Receiver {
|
||||||
|
|
||||||
// Record the lower number of clients subscribed to that channel
|
// Record the lower number of clients subscribed to that channel
|
||||||
for change in timelines_to_modify {
|
for change in timelines_to_modify {
|
||||||
let mut need_to_subscribe = false;
|
|
||||||
let count_of_subscribed_clients = self
|
let count_of_subscribed_clients = self
|
||||||
.clients_per_timeline
|
.clients_per_timeline
|
||||||
.entry(change.timeline.clone())
|
.entry(change.timeline.clone())
|
||||||
.and_modify(|n| *n += change.change_in_subscriber_number)
|
.and_modify(|n| *n += change.in_subscriber_number)
|
||||||
.or_insert_with(|| {
|
.or_insert_with(|| 1);
|
||||||
need_to_subscribe = true;
|
|
||||||
1
|
|
||||||
});
|
|
||||||
// If no clients, unsubscribe from the channel
|
// If no clients, unsubscribe from the channel
|
||||||
if *count_of_subscribed_clients <= 0 {
|
if *count_of_subscribed_clients <= 0 {
|
||||||
pubsub_cmd!("unsubscribe", self, change.timeline.clone());
|
pubsub_cmd!("unsubscribe", self, change.timeline.clone());
|
||||||
}
|
} else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 {
|
||||||
if need_to_subscribe {
|
|
||||||
pubsub_cmd!("subscribe", self, change.timeline.clone());
|
pubsub_cmd!("subscribe", self, change.timeline.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,32 +1,48 @@
|
||||||
//! Send raw TCP commands to the Redis server
|
//! Send raw TCP commands to the Redis server
|
||||||
use log::info;
|
use crate::config;
|
||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
|
|
||||||
/// Send a subscribe or unsubscribe to the Redis PubSub channel
|
/// Send a subscribe or unsubscribe to the Redis PubSub channel
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! pubsub_cmd {
|
macro_rules! pubsub_cmd {
|
||||||
($cmd:expr, $self:expr, $tl:expr) => {{
|
($cmd:expr, $self:expr, $tl:expr) => {{
|
||||||
info!("Sending {} command to {}", $cmd, $tl);
|
use std::io::Write;
|
||||||
|
log::info!("Sending {} command to {}", $cmd, $tl);
|
||||||
$self
|
$self
|
||||||
.pubsub_connection
|
.pubsub_connection
|
||||||
.write_all(&redis_cmd::pubsub($cmd, $tl))
|
.write_all(&redis_cmd::pubsub($cmd, $tl))
|
||||||
.expect("Can send command to Redis");
|
.expect("Can send command to Redis");
|
||||||
let new_value = if $cmd == "subscribe" { "1" } else { "0" };
|
// Because we keep track of the number of clients subscribed to a channel on our end,
|
||||||
|
// we need to manually tell Redis when we have subscribed or unsubscribed
|
||||||
|
let subscription_new_number = match $cmd {
|
||||||
|
"unsubscribe" => "0",
|
||||||
|
"subscribe" => "1",
|
||||||
|
_ => panic!("Given unacceptable PUBSUB command"),
|
||||||
|
};
|
||||||
$self
|
$self
|
||||||
.secondary_redis_connection
|
.secondary_redis_connection
|
||||||
.write_all(&redis_cmd::set(
|
.write_all(&redis_cmd::set(
|
||||||
format!("subscribed:timeline:{}", $tl),
|
format!("subscribed:timeline:{}", $tl),
|
||||||
new_value,
|
subscription_new_number,
|
||||||
))
|
))
|
||||||
.expect("Can set Redis");
|
.expect("Can set Redis");
|
||||||
info!("Now subscribed to: {:#?}", $self.msg_queues);
|
|
||||||
|
log::info!("Now subscribed to: {:#?}", $self.msg_queues);
|
||||||
}};
|
}};
|
||||||
}
|
}
|
||||||
/// Send a `SUBSCRIBE` or `UNSUBSCRIBE` command to a specific timeline
|
/// Send a `SUBSCRIBE` or `UNSUBSCRIBE` command to a specific timeline
|
||||||
pub fn pubsub(command: impl Display, timeline: impl Display) -> Vec<u8> {
|
pub fn pubsub(command: impl Display, timeline: impl Display) -> Vec<u8> {
|
||||||
let arg = format!("timeline:{}", timeline);
|
let arg = match &*config::REDIS_NAMESPACE {
|
||||||
let command = command.to_string();
|
Some(namespace) => format!("{}:timeline:{}", namespace, timeline),
|
||||||
info!("Sent {} command", &command);
|
None => format!("timeline:{}", timeline),
|
||||||
|
};
|
||||||
|
cmd(command, arg)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a generic two-item command to Redis
|
||||||
|
pub fn cmd(command: impl Display, arg: impl Display) -> Vec<u8> {
|
||||||
|
let (command, arg) = (command.to_string(), arg.to_string());
|
||||||
|
log::info!("Sent {} command", &command);
|
||||||
format!(
|
format!(
|
||||||
"*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n",
|
"*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n",
|
||||||
cmd_length = command.len(),
|
cmd_length = command.len(),
|
||||||
|
@ -38,9 +54,13 @@ pub fn pubsub(command: impl Display, timeline: impl Display) -> Vec<u8> {
|
||||||
.to_owned()
|
.to_owned()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a `SET` command
|
/// Send a `SET` command (used to manually unsubscribe from Redis)
|
||||||
pub fn set(key: impl Display, value: impl Display) -> Vec<u8> {
|
pub fn set(key: impl Display, value: impl Display) -> Vec<u8> {
|
||||||
let (key, value) = (key.to_string(), value.to_string());
|
let key = match &*config::REDIS_NAMESPACE {
|
||||||
|
Some(namespace) => format!("{}:{}", namespace, key),
|
||||||
|
None => key.to_string(),
|
||||||
|
};
|
||||||
|
let value = value.to_string();
|
||||||
format!(
|
format!(
|
||||||
"*3\r\n$3\r\nSET\r\n${key_length}\r\n{key}\r\n${value_length}\r\n{value}\r\n",
|
"*3\r\n$3\r\nSET\r\n${key_length}\r\n{key}\r\n${value_length}\r\n{value}\r\n",
|
||||||
key_length = key.len(),
|
key_length = key.len(),
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use super::receiver::Receiver;
|
use super::receiver::Receiver;
|
||||||
|
use crate::config;
|
||||||
use futures::{Async, Poll};
|
use futures::{Async, Poll};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
|
@ -22,7 +23,16 @@ impl<'a> AsyncReadableStream<'a> {
|
||||||
let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection);
|
let mut async_stream = AsyncReadableStream::new(&mut receiver.pubsub_connection);
|
||||||
|
|
||||||
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() {
|
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() {
|
||||||
let raw_redis_response = async_stream.to_utf8(buffer, num_bytes_read);
|
let raw_redis_response = async_stream.as_utf8(buffer, num_bytes_read);
|
||||||
|
if raw_redis_response.starts_with("-NOAUTH") {
|
||||||
|
eprintln!(
|
||||||
|
r"Invalid authentication for Redis.
|
||||||
|
Do you need a password?
|
||||||
|
If so, set it with the REDIS_PASSWORD environmental variable"
|
||||||
|
);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
receiver.incoming_raw_msg.push_str(&raw_redis_response);
|
receiver.incoming_raw_msg.push_str(&raw_redis_response);
|
||||||
|
|
||||||
// Only act if we have a full message (end on a msg boundary)
|
// Only act if we have a full message (end on a msg boundary)
|
||||||
|
@ -31,16 +41,22 @@ impl<'a> AsyncReadableStream<'a> {
|
||||||
};
|
};
|
||||||
let mut msg = RedisMsg::from_raw(&receiver.incoming_raw_msg);
|
let mut msg = RedisMsg::from_raw(&receiver.incoming_raw_msg);
|
||||||
|
|
||||||
|
let prefix_to_skip = match &*config::REDIS_NAMESPACE {
|
||||||
|
Some(namespace) => format!("{}:timeline:", namespace),
|
||||||
|
None => "timeline:".to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
while !msg.raw.is_empty() {
|
while !msg.raw.is_empty() {
|
||||||
let command = msg.next_field();
|
let command = msg.next_field();
|
||||||
match command.as_str() {
|
match command.as_str() {
|
||||||
"message" => {
|
"message" => {
|
||||||
let timeline = &msg.next_field()["timeline:".len()..];
|
let timeline = &msg.next_field()[prefix_to_skip.len()..];
|
||||||
let msg_txt = &msg.next_field();
|
let msg_txt = &msg.next_field();
|
||||||
let msg_value: Value = match serde_json::from_str(msg_txt) {
|
let msg_value: Value = match serde_json::from_str(msg_txt) {
|
||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(e) => panic!("Unparseable json {}\n\n{}", msg_txt, e),
|
Err(e) => panic!("Unparseable json {}\n\n{}", msg_txt, e),
|
||||||
};
|
};
|
||||||
|
dbg!(&timeline);
|
||||||
for msg_queue in receiver.msg_queues.values_mut() {
|
for msg_queue in receiver.msg_queues.values_mut() {
|
||||||
if msg_queue.redis_channel == timeline {
|
if msg_queue.redis_channel == timeline {
|
||||||
msg_queue.messages.push_back(msg_value.clone());
|
msg_queue.messages.push_back(msg_value.clone());
|
||||||
|
@ -62,12 +78,12 @@ impl<'a> AsyncReadableStream<'a> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn to_utf8(&mut self, cur_buffer: Vec<u8>, size: usize) -> String {
|
fn as_utf8(&mut self, cur_buffer: Vec<u8>, size: usize) -> String {
|
||||||
String::from_utf8(cur_buffer[..size].to_vec()).unwrap_or_else(|_| {
|
String::from_utf8(cur_buffer[..size].to_vec()).unwrap_or_else(|_| {
|
||||||
let mut new_buffer = vec![0u8; 1];
|
let mut new_buffer = vec![0u8; 1];
|
||||||
self.poll_read(&mut new_buffer).unwrap();
|
self.poll_read(&mut new_buffer).unwrap();
|
||||||
let buffer = ([cur_buffer, new_buffer]).concat();
|
let buffer = ([cur_buffer, new_buffer]).concat();
|
||||||
self.to_utf8(buffer, size + 1)
|
self.as_utf8(buffer, size + 1)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue