Add type safety to RedisConfig

This commit is contained in:
Daniel Sockwell 2019-10-08 20:14:05 -04:00
parent 526e9d99cb
commit fce9d4e0a7
9 changed files with 154 additions and 99 deletions

View File

@ -1,5 +1,4 @@
use super::deployment_cfg_types::*;
use std::collections::HashMap;
use super::{deployment_cfg_types::*, EnvVar};
#[derive(Debug, Default)]
pub struct DeploymentConfig<'a> {
@ -14,7 +13,7 @@ pub struct DeploymentConfig<'a> {
}
impl DeploymentConfig<'_> {
pub fn from_env(env: HashMap<String, String>) -> Self {
pub fn from_env(env: EnvVar) -> Self {
let mut cfg = Self {
env: Env::default().maybe_update(env.get("NODE_ENV")),
log_level: LogLevel::default().maybe_update(env.get("RUST_LOG")),

View File

@ -47,14 +47,14 @@ from_env_var!(
let name = WsInterval;
let default: Duration = Duration::from_millis(100);
let (env_var, allowed_values) = ("WS_FREQ", "a valid Unix Socket".to_string());
let from_str = |s| s.parse().map(|num| Duration::from_millis(num)).ok();
let from_str = |s| s.parse().map(Duration::from_millis).ok();
);
from_env_var!(
/// The time between replies sent via Server Sent Events
let name = SseInterval;
let default: Duration = Duration::from_millis(100);
let (env_var, allowed_values) = ("WS_FREQ", "a number of milliseconds".to_string());
let from_str = |s| s.parse().map(|num| Duration::from_millis(num)).ok();
let from_str = |s| s.parse().map(Duration::from_millis).ok();
);
from_env_var!(
/// The port to run Flodgatt on

View File

@ -4,9 +4,55 @@ mod postgres_cfg;
mod redis_cfg;
mod redis_cfg_types;
pub use self::{
deployment_cfg::DeploymentConfig, postgres_cfg::PostgresConfig, redis_cfg::RedisConfig,
redis_cfg_types::RedisInterval,
deployment_cfg::DeploymentConfig,
postgres_cfg::PostgresConfig,
redis_cfg::RedisConfig,
redis_cfg_types::{RedisInterval, RedisNamespace},
};
use std::collections::HashMap;
use url::Url;
pub struct EnvVar(pub HashMap<String, String>);
impl std::ops::Deref for EnvVar {
type Target = HashMap<String, String>;
fn deref(&self) -> &HashMap<String, String> {
&self.0
}
}
impl Clone for EnvVar {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl EnvVar {
fn update_with_url(mut self, url_str: &str) -> Self {
let url = Url::parse(url_str).unwrap();
let none_if_empty = |s: String| if s.is_empty() { None } else { Some(s) };
self.maybe_add_env_var("REDIS_PORT", url.port());
self.maybe_add_env_var("REDIS_PASSWORD", url.password());
self.maybe_add_env_var("REDIS_USERNAME", none_if_empty(url.username().to_string()));
self.maybe_add_env_var("REDIS_DB", none_if_empty(url.path()[1..].to_string()));
for (k, v) in url.query_pairs().into_owned() {
match k.to_string().as_str() {
"password" => self.maybe_add_env_var("REDIS_PASSWORD", Some(v.to_string())),
"db" => self.maybe_add_env_var("REDIS_DB", Some(v.to_string())),
_ => crate::err::die_with_msg(format!(
r"Unsupported parameter {} in REDIS_URL.
Flodgatt supports only `password` and `db` parameters.",
k
)),
}
}
self
}
fn maybe_add_env_var(&mut self, key: &str, maybe_value: Option<impl ToString>) {
if let Some(value) = maybe_value {
self.0.insert(key.to_string(), value.to_string());
}
}
}
#[macro_export]
macro_rules! maybe_update {

View File

@ -1,5 +1,5 @@
use super::EnvVar;
use crate::{err, maybe_update};
use std::collections::HashMap;
use url::Url;
#[derive(Debug)]
@ -30,7 +30,7 @@ fn none_if_empty(item: &str) -> Option<String> {
impl PostgresConfig {
/// Configure Postgres and return a connection
pub fn from_env(env_vars: HashMap<String, String>) -> Self {
pub fn from_env(env_vars: EnvVar) -> Self {
// use openssl::ssl::{SslConnector, SslMethod};
// use postgres_openssl::MakeTlsConnector;
// let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();

View File

@ -1,64 +1,50 @@
use super::redis_cfg_types::*;
//use crate::{err, maybe_update};
use crate::maybe_update;
use std::collections::HashMap;
//use url::Url;
use crate::config::EnvVar;
fn none_if_empty(item: &str) -> Option<String> {
Some(item).filter(|i| !i.is_empty()).map(String::from)
}
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct RedisConfig {
pub user: Option<String>,
pub user: RedisUser,
pub password: RedisPass,
pub port: RedisPort,
pub host: RedisHost,
pub db: Option<String>,
pub namespace: Option<String>,
// **NOTE**: Polling Redis is much more time consuming than polling the `Receiver`
// (on the order of 1ms rather than 50μs). Thus, changing this setting
// would be a good place to start for performance improvements at the cost
// of delaying all updates.
pub db: RedisDb,
pub namespace: RedisNamespace,
// **NOTE**: Polling Redis is much more time consuming than polling the `Receiver` (~1ms
// compared to ~50μs). Thus, changing this setting with REDIS_POLL_INTERVAL may be a good
// place to start for performance improvements at the cost of delaying all updates.
pub polling_interval: RedisInterval,
}
impl Default for RedisConfig {
fn default() -> Self {
Self {
user: None,
password: RedisPass::default(),
db: None,
port: RedisPort::default(),
host: RedisHost::default(),
namespace: None,
polling_interval: RedisInterval::default(),
}
}
}
impl RedisConfig {
pub fn from_env(env_vars: HashMap<String, String>) -> Self {
// TODO handle REDIS_URL
const USER_SET_WARNING: &'static str =
"Redis user specified, but Redis did not ask for a username. Ignoring it.";
const DB_SET_WARNING: &'static str =
r"Redis database specified, but PubSub connections do not use databases.
For similar functionality, you may wish to set a REDIS_NAMESPACE";
let mut cfg = RedisConfig::default();
cfg.host = RedisHost::default().maybe_update(env_vars.get("REDIS_HOST"));
cfg = cfg.maybe_update_namespace(env_vars.get("REDIS_NAMESPACE").map(String::from));
pub fn from_env(env: EnvVar) -> Self {
let env = match env.get("REDIS_URL").map(|s| s.clone()) {
Some(url_str) => env.update_with_url(&url_str),
None => env,
};
cfg.port = RedisPort::default().maybe_update(env_vars.get("REDIS_PORT"));
cfg.polling_interval =
RedisInterval::default().maybe_update(env_vars.get("REDIS_POLL_INTERVAL"));
cfg.password = RedisPass::default().maybe_update(env_vars.get("REDIS_PASSWORD"));
let cfg = RedisConfig {
user: RedisUser::default().maybe_update(env.get("REDIS_USER")),
password: RedisPass::default().maybe_update(env.get("REDIS_PASSWORD")),
port: RedisPort::default().maybe_update(env.get("REDIS_PORT")),
host: RedisHost::default().maybe_update(env.get("REDIS_HOST")),
db: RedisDb::default().maybe_update(env.get("REDIS_DB")),
namespace: RedisNamespace::default().maybe_update(env.get("REDIS_NAMESPACE")),
polling_interval: RedisInterval::default().maybe_update(env.get("REDIS_POLL_INTERVAL")),
};
cfg.log()
}
// maybe_update!(maybe_update_host; host: String);
// maybe_update!(maybe_update_port; port: u16);
maybe_update!(maybe_update_namespace; Some(namespace: String));
// maybe_update!(maybe_update_polling_interval; polling_interval: Duration);
fn log(self) -> Self {
log::warn!("Redis configuration:\n{:#?},", &self);
self
if cfg.db.is_some() {
log::warn!("{}", Self::DB_SET_WARNING);
}
if cfg.user.is_some() {
log::warn!("{}", Self::USER_SET_WARNING);
}
log::info!("Redis configuration:\n{:#?},", &cfg);
cfg
}
}

View File

@ -16,7 +16,6 @@ from_env_var!(
_ => s.parse().ok(),
};
);
from_env_var!(
/// The port Redis is running on
let name = RedisPort;
@ -29,7 +28,7 @@ from_env_var!(
let name = RedisInterval;
let default: Duration = Duration::from_millis(100);
let (env_var, allowed_values) = ("REDIS_POLL_INTERVAL", "a number of milliseconds".to_string());
let from_str = |s| s.parse().map(|num| Duration::from_millis(num)).ok();
let from_str = |s| s.parse().map(Duration::from_millis).ok();
);
from_env_var!(
/// The password to use for Redis
@ -38,3 +37,24 @@ from_env_var!(
let (env_var, allowed_values) = ("REDIS_PASSWORD", "any string".to_string());
let from_str = |s| Some(Some(s.to_string()));
);
from_env_var!(
/// An optional Redis Namespace
let name = RedisNamespace;
let default: Option<String> = None;
let (env_var, allowed_values) = ("REDIS_NAMESPACE", "any string".to_string());
let from_str = |s| Some(Some(s.to_string()));
);
from_env_var!(
/// A user for Redis (not supported)
let name = RedisUser;
let default: Option<String> = None;
let (env_var, allowed_values) = ("REDIS_USER", "any string".to_string());
let from_str = |s| Some(Some(s.to_string()));
);
from_env_var!(
/// The database to use with Redis (no current effect for PubSub connections)
let name = RedisDb;
let default: Option<String> = None;
let (env_var, allowed_values) = ("REDIS_DB", "any string".to_string());
let from_str = |s| Some(Some(s.to_string()));
);

View File

@ -14,11 +14,12 @@ fn main() {
Some("development") | None => ".env",
Some(_) => err::die_with_msg("Unknown ENV variable specified.\n Valid options are: `production` or `development`."),
}).ok();
let env_vars: HashMap<_, _> = dotenv::vars().collect();
let env_vars_map: HashMap<_, _> = dotenv::vars().collect();
let env_vars = config::EnvVar(env_vars_map);
pretty_env_logger::init();
let cfg = config::DeploymentConfig::from_env(env_vars.clone());
let redis_cfg = config::RedisConfig::from_env(env_vars.clone());
let cfg = config::DeploymentConfig::from_env(env_vars.clone());
let postgres_cfg = config::PostgresConfig::from_env(env_vars.clone());
let client_agent_sse = ClientAgent::blank(redis_cfg);

View File

@ -1,7 +1,11 @@
//! Receives data from Redis, sorts it by `ClientAgent`, and stores it until
//! polled by the correct `ClientAgent`. Also manages sububscriptions and
//! unsubscriptions to/from Redis.
use super::{config, config::RedisInterval, redis_cmd, redis_stream, redis_stream::RedisConn};
use super::{
config::{self, RedisInterval, RedisNamespace},
redis_cmd, redis_stream,
redis_stream::RedisConn,
};
use crate::pubsub_cmd;
use futures::{Async, Poll};
use serde_json::Value;
@ -14,7 +18,7 @@ use uuid::Uuid;
pub struct Receiver {
pub pubsub_connection: net::TcpStream,
secondary_redis_connection: net::TcpStream,
pub redis_namespace: Option<String>,
pub redis_namespace: RedisNamespace,
redis_poll_interval: RedisInterval,
redis_polled_at: time::Instant,
timeline: String,

View File

@ -1,5 +1,9 @@
use super::receiver::Receiver;
use crate::{config, config::RedisInterval, err, redis_to_client_stream::redis_cmd};
use crate::{
config::{self, RedisInterval, RedisNamespace},
err,
redis_to_client_stream::redis_cmd,
};
use futures::{Async, Poll};
use serde_json::Value;
use std::{io::Read, io::Write, net, time};
@ -8,16 +12,16 @@ use tokio::io::AsyncRead;
pub struct RedisConn {
pub primary: net::TcpStream,
pub secondary: net::TcpStream,
pub namespace: Option<String>,
pub namespace: RedisNamespace,
pub polling_interval: RedisInterval,
}
fn send_password(mut conn: net::TcpStream, password: &String) -> net::TcpStream {
fn send_password(mut conn: net::TcpStream, password: &str) -> net::TcpStream {
conn.write_all(&redis_cmd::cmd("auth", &password)).unwrap();
let mut buffer = vec![0u8; 5];
conn.read_exact(&mut buffer).unwrap();
let reply = String::from_utf8(buffer.to_vec()).unwrap();
if reply != "+OK\r\n".to_string() {
if reply != "+OK\r\n" {
err::die_with_msg(format!(
r"Incorrect Redis password. You supplied `{}`.
Please supply correct password with REDIS_PASSWORD environmental variable.",
@ -27,6 +31,11 @@ fn send_password(mut conn: net::TcpStream, password: &String) -> net::TcpStream
conn
}
fn set_db(mut conn: net::TcpStream, db: &str) -> net::TcpStream {
conn.write_all(&redis_cmd::cmd("SELECT", &db)).unwrap();
conn
}
fn send_test_ping(mut conn: net::TcpStream) -> net::TcpStream {
conn.write_all(b"PING\r\n").unwrap();
let mut buffer = vec![0u8; 7];
@ -60,39 +69,29 @@ impl RedisConn {
*redis_cfg.host, *redis_cfg.port, e,
))
};
let mut pubsub_connection = net::TcpStream::connect(addr).unwrap_or_else(conn_err);
let mut secondary_redis_connection = net::TcpStream::connect(addr).unwrap_or_else(conn_err);
if let Some(password) = redis_cfg.password.clone() {
pubsub_connection = send_password(pubsub_connection, &password);
secondary_redis_connection = send_password(secondary_redis_connection, &password)
}
pubsub_connection = send_test_ping(pubsub_connection);
secondary_redis_connection = send_test_ping(secondary_redis_connection);
pubsub_connection
.set_read_timeout(Some(time::Duration::from_millis(10)))
.expect("Can set read timeout for Redis connection");
pubsub_connection
let update_conn = |mut conn| {
if let Some(password) = redis_cfg.password.clone() {
conn = send_password(conn, &password);
}
conn = send_test_ping(conn);
conn.set_read_timeout(Some(time::Duration::from_millis(10)))
.expect("Can set read timeout for Redis connection");
if let Some(db) = &*redis_cfg.db {
conn = set_db(conn, db);
}
conn
};
let (primary_conn, secondary_conn) = (
update_conn(net::TcpStream::connect(addr).unwrap_or_else(conn_err)),
update_conn(net::TcpStream::connect(addr).unwrap_or_else(conn_err)),
);
primary_conn
.set_nonblocking(true)
.expect("set_nonblocking call failed");
secondary_redis_connection
.set_read_timeout(Some(time::Duration::from_millis(10)))
.expect("Can set read timeout for Redis connection");
if let Some(db) = redis_cfg.db {
pubsub_connection
.write_all(&redis_cmd::cmd("SELECT", &db))
.unwrap();
secondary_redis_connection
.write_all(&redis_cmd::cmd("SELECT", &db))
.unwrap();
}
Self {
primary: pubsub_connection,
secondary: secondary_redis_connection,
primary: primary_conn,
secondary: secondary_conn,
namespace: redis_cfg.namespace,
polling_interval: redis_cfg.polling_interval,
}
@ -139,7 +138,7 @@ Please update the REDIS_HOST and/or REDIS_PORT environmental variables with the
};
let mut msg = RedisMsg::from_raw(&receiver.incoming_raw_msg);
let prefix_to_skip = match &receiver.redis_namespace {
let prefix_to_skip = match &*receiver.redis_namespace {
Some(namespace) => format!("{}:timeline:", namespace),
None => "timeline:".to_string(),
};