Finish error handling improvements

This commit is contained in:
Daniel Sockwell 2020-04-14 18:40:33 -04:00
parent 3280c12fe1
commit e100f5760a
20 changed files with 187 additions and 149 deletions

View File

2
Cargo.lock generated
View File

@ -406,7 +406,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "flodgatt"
version = "0.8.3"
version = "0.8.4"
dependencies = [
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -1,7 +1,7 @@
[package]
name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.8.3"
version = "0.8.4"
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018"

View File

@ -1,7 +1,7 @@
pub use {deployment_cfg::Deployment, postgres_cfg::Postgres, redis_cfg::Redis};
use self::environmental_variables::EnvVar;
use super::err;
use super::err::FatalErr;
use hashbrown::HashMap;
use std::env;
@ -13,21 +13,45 @@ mod postgres_cfg_types;
mod redis_cfg;
mod redis_cfg_types;
pub fn merge_dotenv() -> Result<(), err::FatalErr> {
dotenv::from_filename(match env::var("ENV").ok().as_deref() {
type Result<T> = std::result::Result<T, FatalErr>;
pub fn merge_dotenv() -> Result<()> {
let env_file = match env::var("ENV").ok().as_deref() {
Some("production") => ".env.production",
Some("development") | None => ".env",
Some(_unsupported) => Err(err::FatalErr::Unknown)?, // TODO make more specific
})?;
Some(v) => Err(FatalErr::config("ENV", v, "`production` or `development`"))?,
};
let res = dotenv::from_filename(env_file);
if let Ok(log_level) = env::var("RUST_LOG") {
if res.is_err() && ["warn", "info", "trace", "debug"].contains(&log_level.as_str()) {
eprintln!(
" WARN: could not load environmental variables from {:?}\n\
{:8}Are you in the right directory? Proceeding with variables from the environment.",
env::current_dir().unwrap_or_else(|_|"./".into()).join(env_file), ""
);
}
}
Ok(())
}
pub fn from_env<'a>(env_vars: HashMap<String, String>) -> (Postgres, Redis, Deployment<'a>) {
#[allow(clippy::implicit_hasher)]
pub fn from_env<'a>(
env_vars: HashMap<String, String>,
) -> Result<(Postgres, Redis, Deployment<'a>)> {
let env_vars = EnvVar::new(env_vars);
log::info!("Environmental variables Flodgatt received: {}", &env_vars);
(
Postgres::from_env(env_vars.clone()),
Redis::from_env(env_vars.clone()),
Deployment::from_env(env_vars.clone()),
)
log::info!(
"Flodgatt received the following environmental variables:{}",
&env_vars
);
let pg_cfg = Postgres::from_env(env_vars.clone())?;
log::info!("Configuration for {:#?}", &pg_cfg);
let redis_cfg = Redis::from_env(env_vars.clone())?;
log::info!("Configuration for {:#?},", &redis_cfg);
let deployment_cfg = Deployment::from_env(&env_vars)?;
log::info!("Configuration for {:#?}", &deployment_cfg);
Ok((pg_cfg, redis_cfg, deployment_cfg))
}

View File

@ -1,4 +1,5 @@
use super::{deployment_cfg_types::*, EnvVar};
use crate::err::FatalErr;
#[derive(Debug, Default)]
pub struct Deployment<'a> {
@ -14,20 +15,19 @@ pub struct Deployment<'a> {
}
impl Deployment<'_> {
pub fn from_env(env: EnvVar) -> Self {
pub fn from_env(env: &EnvVar) -> Result<Self, FatalErr> {
let mut cfg = Self {
env: Env::default().maybe_update(env.get("NODE_ENV")),
log_level: LogLevel::default().maybe_update(env.get("RUST_LOG")),
address: FlodgattAddr::default().maybe_update(env.get("BIND")),
port: Port::default().maybe_update(env.get("PORT")),
unix_socket: Socket::default().maybe_update(env.get("SOCKET")),
sse_interval: SseInterval::default().maybe_update(env.get("SSE_FREQ")),
ws_interval: WsInterval::default().maybe_update(env.get("WS_FREQ")),
whitelist_mode: WhitelistMode::default().maybe_update(env.get("WHITELIST_MODE")),
env: Env::default().maybe_update(env.get("NODE_ENV"))?,
log_level: LogLevel::default().maybe_update(env.get("RUST_LOG"))?,
address: FlodgattAddr::default().maybe_update(env.get("BIND"))?,
port: Port::default().maybe_update(env.get("PORT"))?,
unix_socket: Socket::default().maybe_update(env.get("SOCKET"))?,
sse_interval: SseInterval::default().maybe_update(env.get("SSE_FREQ"))?,
ws_interval: WsInterval::default().maybe_update(env.get("WS_FREQ"))?,
whitelist_mode: WhitelistMode::default().maybe_update(env.get("WHITELIST_MODE"))?,
cors: Cors::default(),
};
cfg.env = cfg.env.maybe_update(env.get("RUST_ENV"));
log::info!("Using deployment configuration:\n {:#?}", &cfg);
cfg
cfg.env = cfg.env.maybe_update(env.get("RUST_ENV"))?;
Ok(cfg)
}
}

View File

@ -17,7 +17,7 @@ from_env_var!(
from_env_var!(
/// The address to run Flodgatt on
let name = FlodgattAddr;
let default: IpAddr = IpAddr::V4("127.0.0.1".parse().expect("hardcoded"));
let default: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
let (env_var, allowed_values) = ("BIND", "a valid address (e.g., 127.0.0.1)");
let from_str = |s| match s {
"localhost" => Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),

View File

@ -25,17 +25,6 @@ impl EnvVar {
self.0.insert(key.to_string(), value.to_string());
}
}
pub fn err(env_var: &str, supplied_value: &str, allowed_values: &str) -> ! {
log::error!(
r"{var} is set to `{value}`, which is invalid.
{var} must be {allowed_vals}.",
var = env_var,
value = supplied_value,
allowed_vals = allowed_values
);
std::process::exit(1);
}
}
impl fmt::Display for EnvVar {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@ -98,7 +87,7 @@ macro_rules! from_env_var {
#[derive(Clone)]
pub struct $name(pub $type);
impl std::fmt::Debug for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self.0)
}
}
@ -117,14 +106,14 @@ macro_rules! from_env_var {
fn inner_from_str($arg: &str) -> Option<$type> {
$body
}
pub fn maybe_update(self, var: Option<&String>) -> Self {
match var {
pub fn maybe_update(self, var: Option<&String>) -> Result<Self, crate::err::FatalErr> {
Ok(match var {
Some(empty_string) if empty_string.is_empty() => Self::default(),
Some(value) => Self(Self::inner_from_str(value).unwrap_or_else(|| {
crate::config::EnvVar::err($env_var, value, $allowed_values)
})),
Some(value) => Self(Self::inner_from_str(value).ok_or_else(|| {
crate::err::FatalErr::config($env_var, value, $allowed_values)
})?),
None => self,
}
})
}
}
};

View File

@ -1,7 +1,11 @@
use super::{postgres_cfg_types::*, EnvVar};
use crate::err::FatalErr;
use url::Url;
use urlencoding;
type Result<T> = std::result::Result<T, FatalErr>;
#[derive(Debug, Clone)]
pub struct Postgres {
pub user: PgUser,
@ -13,8 +17,8 @@ pub struct Postgres {
}
impl EnvVar {
fn update_with_postgres_url(mut self, url_str: &str) -> Self {
let url = Url::parse(url_str).unwrap();
fn update_with_postgres_url(mut self, url_str: &str) -> Result<Self> {
let url = Url::parse(url_str)?;
let none_if_empty = |s: String| if s.is_empty() { None } else { Some(s) };
for (k, v) in url.query_pairs().into_owned() {
@ -23,11 +27,11 @@ impl EnvVar {
"password" => self.maybe_add_env_var("DB_PASS", Some(v.to_string())),
"host" => self.maybe_add_env_var("DB_HOST", Some(v.to_string())),
"sslmode" => self.maybe_add_env_var("DB_SSLMODE", Some(v.to_string())),
_ => crate::err::die_with_msg(format!(
r"Unsupported parameter {} in POSTGRES_URL
Flodgatt supports only `password`, `user`, `host`, and `sslmode` parameters",
k
)),
_ => Err(FatalErr::config(
"POSTGRES_URL",
&k,
"a URL with parameters `password`, `user`, `host`, and `sslmode` only",
))?,
}
}
@ -35,42 +39,39 @@ impl EnvVar {
self.maybe_add_env_var("DB_PASS", url.password());
self.maybe_add_env_var(
"DB_HOST",
url.host().map(|h| {
urlencoding::decode(&h.to_string()).expect("Non-Unicode text in hostname")
}),
url.host()
.map(|host| urlencoding::decode(&host.to_string()))
.transpose()?,
);
self.maybe_add_env_var("DB_USER", none_if_empty(url.username().to_string()));
self.maybe_add_env_var("DB_NAME", none_if_empty(url.path()[1..].to_string()));
self
Ok(self)
}
}
impl Postgres {
/// Configure Postgres and return a connection
pub fn from_env(env: EnvVar) -> Self {
pub fn from_env(env: EnvVar) -> Result<Self> {
let env = match env.get("DATABASE_URL").cloned() {
Some(url_str) => env.update_with_postgres_url(&url_str),
Some(url_str) => env.update_with_postgres_url(&url_str)?,
None => env,
};
let cfg = Self {
user: PgUser::default().maybe_update(env.get("DB_USER")),
host: PgHost::default().maybe_update(env.get("DB_HOST")),
password: PgPass::default().maybe_update(env.get("DB_PASS")),
database: PgDatabase::default().maybe_update(env.get("DB_NAME")),
port: PgPort::default().maybe_update(env.get("DB_PORT")),
ssl_mode: PgSslMode::default().maybe_update(env.get("DB_SSLMODE")),
user: PgUser::default().maybe_update(env.get("DB_USER"))?,
host: PgHost::default().maybe_update(env.get("DB_HOST"))?,
password: PgPass::default().maybe_update(env.get("DB_PASS"))?,
database: PgDatabase::default().maybe_update(env.get("DB_NAME"))?,
port: PgPort::default().maybe_update(env.get("DB_PORT"))?,
ssl_mode: PgSslMode::default().maybe_update(env.get("DB_SSLMODE"))?,
};
log::info!("Postgres configuration:\n{:#?}", &cfg);
cfg
Ok(cfg)
}
// // use openssl::ssl::{SslConnector, SslMethod};
// // use postgres_openssl::MakeTlsConnector;
// // let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
// // builder.set_ca_file("/etc/ssl/cert.pem").unwrap();
// // let mut builder = SslConnector::builder(SslMethod::tls())?;
// // builder.set_ca_file("/etc/ssl/cert.pem")?;
// // let connector = MakeTlsConnector::new(builder.build());
// // TODO: add TLS support, remove `NoTls`
}

View File

@ -1,7 +1,11 @@
use super::redis_cfg_types::*;
use crate::config::EnvVar;
use super::EnvVar;
use crate::err::FatalErr;
use url::Url;
type Result<T> = std::result::Result<T, FatalErr>;
#[derive(Debug, Default)]
pub struct Redis {
pub user: RedisUser,
@ -17,8 +21,8 @@ pub struct Redis {
}
impl EnvVar {
fn update_with_redis_url(mut self, url_str: &str) -> Self {
let url = Url::parse(url_str).unwrap();
fn update_with_redis_url(mut self, url_str: &str) -> Result<Self> {
let url = Url::parse(url_str)?;
let none_if_empty = |s: String| if s.is_empty() { None } else { Some(s) };
self.maybe_add_env_var("REDIS_PORT", url.port());
@ -29,14 +33,14 @@ impl EnvVar {
match k.to_string().as_str() {
"password" => self.maybe_add_env_var("REDIS_PASSWORD", Some(v.to_string())),
"db" => self.maybe_add_env_var("REDIS_DB", Some(v.to_string())),
_ => crate::err::die_with_msg(format!(
r"Unsupported parameter {} in REDIS_URL.
Flodgatt supports only `password` and `db` parameters.",
k
)),
_ => Err(FatalErr::config(
"REDIS_URL",
&k,
"a URL with parameters `password`, `db`, only",
))?,
}
}
self
Ok(self)
}
}
@ -46,20 +50,20 @@ impl Redis {
const DB_SET_WARNING: &'static str = r"Redis database specified, but PubSub connections do not use databases.
For similar functionality, you may wish to set a REDIS_NAMESPACE";
pub fn from_env(env: EnvVar) -> Self {
pub fn from_env(env: EnvVar) -> Result<Self> {
let env = match env.get("REDIS_URL").cloned() {
Some(url_str) => env.update_with_redis_url(&url_str),
Some(url_str) => env.update_with_redis_url(&url_str)?,
None => env,
};
let cfg = Redis {
user: RedisUser::default().maybe_update(env.get("REDIS_USER")),
password: RedisPass::default().maybe_update(env.get("REDIS_PASSWORD")),
port: RedisPort::default().maybe_update(env.get("REDIS_PORT")),
host: RedisHost::default().maybe_update(env.get("REDIS_HOST")),
db: RedisDb::default().maybe_update(env.get("REDIS_DB")),
namespace: RedisNamespace::default().maybe_update(env.get("REDIS_NAMESPACE")),
polling_interval: RedisInterval::default().maybe_update(env.get("REDIS_FREQ")),
user: RedisUser::default().maybe_update(env.get("REDIS_USER"))?,
password: RedisPass::default().maybe_update(env.get("REDIS_PASSWORD"))?,
port: RedisPort::default().maybe_update(env.get("REDIS_PORT"))?,
host: RedisHost::default().maybe_update(env.get("REDIS_HOST"))?,
db: RedisDb::default().maybe_update(env.get("REDIS_DB"))?,
namespace: RedisNamespace::default().maybe_update(env.get("REDIS_NAMESPACE"))?,
polling_interval: RedisInterval::default().maybe_update(env.get("REDIS_FREQ"))?,
};
if cfg.db.is_some() {
@ -68,7 +72,6 @@ For similar functionality, you may wish to set a REDIS_NAMESPACE";
if cfg.user.is_some() {
log::warn!("{}", Self::USER_SET_WARNING);
}
log::info!("Redis configuration:\n{:#?},", &cfg);
cfg
Ok(cfg)
}
}

View File

@ -3,17 +3,27 @@ use crate::response::ManagerErr;
use std::fmt;
pub enum FatalErr {
Unknown,
ReceiverErr(ManagerErr),
DotEnv(dotenv::Error),
Logger(log::SetLoggerError),
Postgres(RequestErr),
Unrecoverable,
StdIo(std::io::Error),
// config errs
UrlParse(url::ParseError),
UrlEncoding(urlencoding::FromUrlEncodingError),
ConfigErr(String),
}
impl FatalErr {
pub fn exit(msg: impl fmt::Display) {
pub fn log(msg: impl fmt::Display) {
eprintln!("{}", msg);
std::process::exit(1);
}
pub fn config<T: fmt::Display>(var: T, value: T, allowed_vals: T) -> Self {
Self::ConfigErr(format!(
"{0} is set to `{1}`, which is invalid.\n{3:7}{0} must be {2}.",
var, value, allowed_vals, ""
))
}
}
@ -31,11 +41,14 @@ impl fmt::Display for FatalErr {
f,
"{}",
match self {
Unknown => "Flodgatt encountered an unknown, unrecoverable error".into(),
ReceiverErr(e) => format!("{}", e),
Logger(e) => format!("{}", e),
DotEnv(e) => format!("Could not load specified environmental file: {}", e),
Postgres(e) => format!("Could not connect to Postgres: {}", e),
StdIo(e) => format!("{}", e),
Postgres(e) => format!("could not connect to Postgres.\n{:7}{}", "", e),
ConfigErr(e) => e.to_string(),
UrlParse(e) => format!("could parse Postgres URL.\n{:7}{}", "", e),
UrlEncoding(e) => format!("could not parse POSTGRES_URL.\n{:7}{:?}", "", e),
Unrecoverable => "Flodgatt will now shut down.".into(),
}
)
}
@ -47,26 +60,28 @@ impl From<RequestErr> for FatalErr {
}
}
impl From<dotenv::Error> for FatalErr {
fn from(e: dotenv::Error) -> Self {
Self::DotEnv(e)
}
}
impl From<ManagerErr> for FatalErr {
fn from(e: ManagerErr) -> Self {
Self::ReceiverErr(e)
}
}
impl From<urlencoding::FromUrlEncodingError> for FatalErr {
fn from(e: urlencoding::FromUrlEncodingError) -> Self {
Self::UrlEncoding(e)
}
}
impl From<url::ParseError> for FatalErr {
fn from(e: url::ParseError) -> Self {
Self::UrlParse(e)
}
}
impl From<std::io::Error> for FatalErr {
fn from(e: std::io::Error) -> Self {
Self::StdIo(e)
}
}
impl From<log::SetLoggerError> for FatalErr {
fn from(e: log::SetLoggerError) -> Self {
Self::Logger(e)
}
}
// TODO delete vvvv when postgres_cfg.rs has better error handling
pub fn die_with_msg(msg: impl fmt::Display) -> ! {
eprintln!("FATAL ERROR: {}", msg);
std::process::exit(1);
}

View File

@ -43,7 +43,7 @@ type Result<T> = std::result::Result<T, EventErr>;
impl DynEvent {
pub fn set_update(self) -> Result<Self> {
if self.event == "update" {
let kind = EventKind::Update(DynStatus::new(self.payload.clone())?);
let kind = EventKind::Update(DynStatus::new(&self.payload.clone())?);
Ok(Self { kind, ..self })
} else {
Ok(self)
@ -52,7 +52,7 @@ impl DynEvent {
}
impl DynStatus {
pub fn new(payload: Value) -> Result<Self> {
pub fn new(payload: &Value) -> Result<Self> {
use EventErr::*;
Ok(Self {
@ -61,7 +61,7 @@ impl DynStatus {
.as_str()
.ok_or(DynParse)?
.to_string(),
language: payload["language"].as_str().map(|s| s.to_string()),
language: payload["language"].as_str().map(String::from),
mentioned_users: HashSet::new(),
replied_to_user: Id::try_from(&payload["in_reply_to_account_id"]).ok(),
boosted_user: Id::try_from(&payload["reblog"]["account"]["id"]).ok(),

View File

@ -37,6 +37,7 @@
//#![warn(clippy::pedantic)]
#![allow(clippy::try_err, clippy::match_bool)]
//#![allow(clippy::large_enum_variant)]
pub mod config;
pub mod err;

View File

@ -19,20 +19,19 @@ use warp::Filter;
fn main() -> Result<(), FatalErr> {
config::merge_dotenv()?;
pretty_env_logger::try_init()?;
let (postgres_cfg, redis_cfg, cfg) = config::from_env(dotenv::vars().collect());
let (postgres_cfg, redis_cfg, cfg) = config::from_env(dotenv::vars().collect())?;
let poll_freq = *redis_cfg.polling_interval;
// Create channels to communicate between threads
let (event_tx, event_rx) = watch::channel((Timeline::empty(), Event::Ping));
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let request = Handler::new(postgres_cfg, *cfg.whitelist_mode)?;
let poll_freq = *redis_cfg.polling_interval;
let shared_manager = redis::Manager::try_from(redis_cfg, event_tx, cmd_rx)?.into_arc();
let request = Handler::new(&postgres_cfg, *cfg.whitelist_mode)?;
let shared_manager = redis::Manager::try_from(&redis_cfg, event_tx, cmd_rx)?.into_arc();
// Server Sent Events
let sse_manager = shared_manager.clone();
let (sse_rx, sse_cmd_tx) = (event_rx.clone(), cmd_tx.clone());
let sse = request
.sse_subscription()
.and(warp::sse())
@ -85,8 +84,7 @@ fn main() -> Result<(), FatalErr> {
.map_err(|e| log::error!("{}", e))
.for_each(move |_| {
let mut manager = manager.lock().unwrap_or_else(redis::Manager::recover);
manager.poll_broadcast().unwrap_or_else(FatalErr::exit);
Ok(())
manager.poll_broadcast().map_err(FatalErr::log)
});
warp::spawn(lazy(move || stream));
warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err))
@ -95,13 +93,13 @@ fn main() -> Result<(), FatalErr> {
if let Some(socket) = &*cfg.unix_socket {
log::info!("Using Unix socket {}", socket);
fs::remove_file(socket).unwrap_or_default();
let incoming = UnixListener::bind(socket).expect("TODO").incoming();
fs::set_permissions(socket, PermissionsExt::from_mode(0o666)).expect("TODO");
let incoming = UnixListener::bind(socket)?.incoming();
fs::set_permissions(socket, PermissionsExt::from_mode(0o666))?;
tokio::run(lazy(|| streaming_server().serve_incoming(incoming)));
} else {
let server_addr = SocketAddr::new(*cfg.address, *cfg.port);
tokio::run(lazy(move || streaming_server().bind(server_addr)));
}
Ok(())
Err(FatalErr::Unrecoverable) // on get here if there's an unrecoverable error in poll_broadcast.
}

View File

@ -60,7 +60,7 @@ pub struct Handler {
}
impl Handler {
pub fn new(postgres_cfg: config::Postgres, whitelist_mode: bool) -> Result<Self> {
pub fn new(postgres_cfg: &config::Postgres, whitelist_mode: bool) -> Result<Self> {
Ok(Self {
pg_conn: PgPool::new(postgres_cfg, whitelist_mode)?,
})

View File

@ -3,6 +3,7 @@ use std::fmt;
pub enum RequestErr {
Unknown,
PgPool(r2d2::Error),
Pg(postgres::Error),
}
impl std::error::Error for RequestErr {}
@ -13,6 +14,7 @@ impl fmt::Display for RequestErr {
let msg = match self {
Unknown => "Encountered an unrecoverable error related to handling a request".into(),
PgPool(e) => format!("{}", e),
Pg(e) => format!("{}", e),
};
write!(f, "{}", msg)
}
@ -23,3 +25,8 @@ impl From<r2d2::Error> for RequestErr {
Self::PgPool(e)
}
}
impl From<postgres::Error> for RequestErr {
fn from(e: postgres::Error) -> Self {
Self::Pg(e)
}
}

View File

@ -19,7 +19,7 @@ type Result<T> = std::result::Result<T, err::RequestErr>;
type Rejectable<T> = std::result::Result<T, warp::Rejection>;
impl PgPool {
pub fn new(pg_cfg: config::Postgres, whitelist_mode: bool) -> Result<Self> {
pub fn new(pg_cfg: &config::Postgres, whitelist_mode: bool) -> Result<Self> {
let mut cfg = postgres::Config::new();
cfg.user(&pg_cfg.user)
.host(&*pg_cfg.host.to_string())
@ -29,6 +29,8 @@ impl PgPool {
cfg.password(password);
};
cfg.connect(postgres::NoTls)?; // Test connection, letting us immediately exit with an error
// when Postgres isn't running instead of timing out below
let manager = PostgresConnectionManager::new(cfg, postgres::NoTls);
let pool = r2d2::Pool::builder().max_size(10).build(manager)?;

View File

@ -12,7 +12,7 @@ pub enum RedisCmd {
}
impl RedisCmd {
pub fn into_sendable(&self, tl: &String) -> (Vec<u8>, Vec<u8>) {
pub fn into_sendable(self, tl: &str) -> (Vec<u8>, Vec<u8>) {
match self {
RedisCmd::Subscribe => (
format!("*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n", tl.len(), tl).into_bytes(),

View File

@ -28,7 +28,7 @@ pub struct RedisConn {
}
impl RedisConn {
pub fn new(redis_cfg: Redis) -> Result<Self> {
pub fn new(redis_cfg: &Redis) -> Result<Self> {
let addr = format!("{}:{}", *redis_cfg.host, *redis_cfg.port);
let conn = Self::new_connection(&addr, redis_cfg.password.as_ref())?;
conn.set_nonblocking(true)
@ -49,7 +49,7 @@ impl RedisConn {
pub fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, ManagerErr> {
let mut size = 100; // large enough to handle subscribe/unsubscribe notice
let (mut buffer, mut first_read) = (vec![0u8; size], true);
let (mut buffer, mut first_read) = (vec![0_u8; size], true);
loop {
match self.primary.read(&mut buffer) {
Ok(n) if n != size => break self.redis_input.extend_from_slice(&buffer[..n]),

View File

@ -31,7 +31,7 @@ impl Manager {
/// Create a new `Manager`, with its own Redis connections (but, as yet, no
/// active subscriptions).
pub fn try_from(
redis_cfg: config::Redis,
redis_cfg: &config::Redis,
tx: watch::Sender<(Timeline, Event)>,
rx: mpsc::UnboundedReceiver<Timeline>,
) -> Result<Self> {
@ -99,11 +99,10 @@ impl Manager {
self.tx.broadcast((Timeline::empty(), Event::Ping))?
} else {
match self.redis_connection.poll_redis() {
Ok(Async::NotReady) => (),
Ok(Async::NotReady) | Ok(Async::Ready(None)) => (), // None = cmd or msg for other namespace
Ok(Async::Ready(Some((timeline, event)))) => {
self.tx.broadcast((timeline, event))?
}
Ok(Async::Ready(None)) => (), // subscription cmd or msg for other namespace
Err(err) => log::error!("{}", err), // drop msg, log err, and proceed
}
}

View File

@ -39,9 +39,11 @@ impl Ws {
.map_err(|_| -> warp::Error { unreachable!() })
.forward(transmit_to_ws)
.map(|_r| ())
.map_err(|e| match e.to_string().as_ref() {
"IO error: Broken pipe (os error 32)" => (), // just closed unix socket
_ => log::warn!("WebSocket send error: {}", e),
.map_err(|e| {
match e.to_string().as_ref() {
"IO error: Broken pipe (os error 32)" => (), // just closed unix socket
_ => log::warn!("WebSocket send error: {}", e),
}
}),
);
@ -50,7 +52,7 @@ impl Ws {
incoming_events.for_each(move |(tl, event)| {
if matches!(event, Event::Ping) {
self.send_msg(event)
self.send_msg(&event)
} else if target_timeline == tl {
use crate::event::{CheckedEvent::Update, Event::*, EventKind};
use crate::request::Stream::Public;
@ -61,18 +63,18 @@ impl Ws {
TypeSafe(Update { payload, queued_at }) => match tl {
Timeline(Public, _, _) if payload.language_not(allowed_langs) => Ok(()),
_ if payload.involves_any(&blocks) => Ok(()),
_ => self.send_msg(TypeSafe(Update { payload, queued_at })),
_ => self.send_msg(&TypeSafe(Update { payload, queued_at })),
},
TypeSafe(non_update) => self.send_msg(TypeSafe(non_update)),
TypeSafe(non_update) => self.send_msg(&TypeSafe(non_update)),
Dynamic(dyn_event) => {
if let EventKind::Update(s) = dyn_event.kind.clone() {
match tl {
Timeline(Public, _, _) if s.language_not(allowed_langs) => Ok(()),
_ if s.involves_any(&blocks) => Ok(()),
_ => self.send_msg(Dynamic(dyn_event)),
_ => self.send_msg(&Dynamic(dyn_event)),
}
} else {
self.send_msg(Dynamic(dyn_event))
self.send_msg(&Dynamic(dyn_event))
}
}
Ping => unreachable!(), // handled pings above
@ -83,17 +85,14 @@ impl Ws {
})
}
fn send_msg(&mut self, event: Event) -> Result<(), ()> {
fn send_msg(&mut self, event: &Event) -> Result<(), ()> {
let txt = &event.to_json_string();
let tl = self.subscription.timeline;
match self.ws_tx.clone().ok_or(())?.try_send(Message::text(txt)) {
Ok(_) => Ok(()),
Err(_) => {
self.unsubscribe_tx
.try_send(tl)
.unwrap_or_else(|e| log::error!("could not unsubscribe from channel: {}", e));
Err(())
}
}
let mut channel = self.ws_tx.clone().ok_or(())?;
channel.try_send(Message::text(txt)).map_err(|_| {
self.unsubscribe_tx
.try_send(tl)
.unwrap_or_else(|e| log::error!("could not unsubscribe from channel: {}", e));
})
}
}