Adjust module privacy

This commit is contained in:
Daniel Sockwell 2020-04-17 21:56:31 -04:00
parent 4a456c6e90
commit 0e21b189bd
21 changed files with 130 additions and 131 deletions

View File

@ -4,7 +4,7 @@ pub(crate) use redis_cfg::Redis;
use deployment_cfg::Deployment; use deployment_cfg::Deployment;
use self::environmental_variables::EnvVar; use self::environmental_variables::EnvVar;
use super::err::FatalErr; use super::err::Error;
use hashbrown::HashMap; use hashbrown::HashMap;
use std::env; use std::env;
@ -16,13 +16,13 @@ mod postgres_cfg_types;
mod redis_cfg; mod redis_cfg;
mod redis_cfg_types; mod redis_cfg_types;
type Result<T> = std::result::Result<T, FatalErr>; type Result<T> = std::result::Result<T, Error>;
pub fn merge_dotenv() -> Result<()> { pub fn merge_dotenv() -> Result<()> {
let env_file = match env::var("ENV").ok().as_deref() { let env_file = match env::var("ENV").ok().as_deref() {
Some("production") => ".env.production", Some("production") => ".env.production",
Some("development") | None => ".env", Some("development") | None => ".env",
Some(v) => Err(FatalErr::config("ENV", v, "`production` or `development`"))?, Some(v) => Err(Error::config("ENV", v, "`production` or `development`"))?,
}; };
let res = dotenv::from_filename(env_file); let res = dotenv::from_filename(env_file);

View File

@ -1,5 +1,5 @@
use super::{deployment_cfg_types::*, EnvVar}; use super::{deployment_cfg_types::*, EnvVar};
use crate::err::FatalErr; use crate::err::Error;
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct Deployment<'a> { pub struct Deployment<'a> {
@ -13,7 +13,7 @@ pub struct Deployment<'a> {
} }
impl Deployment<'_> { impl Deployment<'_> {
pub(crate) fn from_env(env: &EnvVar) -> Result<Self, FatalErr> { pub(crate) fn from_env(env: &EnvVar) -> Result<Self, Error> {
let mut cfg = Self { let mut cfg = Self {
env: Env::default().maybe_update(env.get("NODE_ENV"))?, env: Env::default().maybe_update(env.get("NODE_ENV"))?,
log_level: LogLevel::default().maybe_update(env.get("RUST_LOG"))?, log_level: LogLevel::default().maybe_update(env.get("RUST_LOG"))?,

View File

@ -109,11 +109,11 @@ macro_rules! from_env_var {
pub(crate) fn maybe_update( pub(crate) fn maybe_update(
self, self,
var: Option<&String>, var: Option<&String>,
) -> Result<Self, crate::err::FatalErr> { ) -> Result<Self, crate::err::Error> {
Ok(match var { Ok(match var {
Some(empty_string) if empty_string.is_empty() => Self::default(), Some(empty_string) if empty_string.is_empty() => Self::default(),
Some(value) => Self(Self::inner_from_str(value).ok_or_else(|| { Some(value) => Self(Self::inner_from_str(value).ok_or_else(|| {
crate::err::FatalErr::config($env_var, value, $allowed_values) crate::err::Error::config($env_var, value, $allowed_values)
})?), })?),
None => self, None => self,
}) })

View File

@ -1,10 +1,10 @@
use super::{postgres_cfg_types::*, EnvVar}; use super::{postgres_cfg_types::*, EnvVar};
use crate::err::FatalErr; use crate::err::Error;
use url::Url; use url::Url;
use urlencoding; use urlencoding;
type Result<T> = std::result::Result<T, FatalErr>; type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Postgres { pub struct Postgres {
@ -27,7 +27,7 @@ impl EnvVar {
"password" => self.maybe_add_env_var("DB_PASS", Some(v.to_string())), "password" => self.maybe_add_env_var("DB_PASS", Some(v.to_string())),
"host" => self.maybe_add_env_var("DB_HOST", Some(v.to_string())), "host" => self.maybe_add_env_var("DB_HOST", Some(v.to_string())),
"sslmode" => self.maybe_add_env_var("DB_SSLMODE", Some(v.to_string())), "sslmode" => self.maybe_add_env_var("DB_SSLMODE", Some(v.to_string())),
_ => Err(FatalErr::config( _ => Err(Error::config(
"POSTGRES_URL", "POSTGRES_URL",
&k, &k,
"a URL with parameters `password`, `user`, `host`, and `sslmode` only", "a URL with parameters `password`, `user`, `host`, and `sslmode` only",

View File

@ -1,10 +1,10 @@
use super::redis_cfg_types::*; use super::redis_cfg_types::*;
use super::EnvVar; use super::EnvVar;
use crate::err::FatalErr; use crate::err::Error;
use url::Url; use url::Url;
type Result<T> = std::result::Result<T, FatalErr>; type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct Redis { pub struct Redis {
@ -33,7 +33,7 @@ impl EnvVar {
match k.to_string().as_str() { match k.to_string().as_str() {
"password" => self.maybe_add_env_var("REDIS_PASSWORD", Some(v.to_string())), "password" => self.maybe_add_env_var("REDIS_PASSWORD", Some(v.to_string())),
"db" => self.maybe_add_env_var("REDIS_DB", Some(v.to_string())), "db" => self.maybe_add_env_var("REDIS_DB", Some(v.to_string())),
_ => Err(FatalErr::config( _ => Err(Error::config(
"REDIS_URL", "REDIS_URL",
&k, &k,
"a URL with parameters `password`, `db`, only", "a URL with parameters `password`, `db`, only",

View File

@ -1,4 +1,4 @@
use crate::from_env_var; use crate::from_env_var; //macro
use std::time::Duration; use std::time::Duration;
//use std::{fmt, net::IpAddr, os::unix::net::UnixListener, str::FromStr, time::Duration}; //use std::{fmt, net::IpAddr, os::unix::net::UnixListener, str::FromStr, time::Duration};
//use strum_macros::{EnumString, EnumVariantNames}; //use strum_macros::{EnumString, EnumVariantNames};

View File

@ -1,11 +1,11 @@
use crate::request::RequestErr; use crate::request;
use crate::response::ManagerErr; use crate::response;
use std::fmt; use std::fmt;
pub enum FatalErr { pub enum Error {
ReceiverErr(ManagerErr), ReceiverErr(response::Error),
Logger(log::SetLoggerError), Logger(log::SetLoggerError),
Postgres(RequestErr), Postgres(request::Error),
Unrecoverable, Unrecoverable,
StdIo(std::io::Error), StdIo(std::io::Error),
// config errs // config errs
@ -14,7 +14,7 @@ pub enum FatalErr {
ConfigErr(String), ConfigErr(String),
} }
impl FatalErr { impl Error {
pub fn log(msg: impl fmt::Display) { pub fn log(msg: impl fmt::Display) {
eprintln!("{}", msg); eprintln!("{}", msg);
} }
@ -27,16 +27,16 @@ impl FatalErr {
} }
} }
impl std::error::Error for FatalErr {} impl std::error::Error for Error {}
impl fmt::Debug for FatalErr { impl fmt::Debug for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "{}", self) write!(f, "{}", self)
} }
} }
impl fmt::Display for FatalErr { impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
use FatalErr::*; use Error::*;
write!( write!(
f, f,
"{}", "{}",
@ -54,33 +54,33 @@ impl fmt::Display for FatalErr {
} }
} }
impl From<RequestErr> for FatalErr { impl From<request::Error> for Error {
fn from(e: RequestErr) -> Self { fn from(e: request::Error) -> Self {
Self::Postgres(e) Self::Postgres(e)
} }
} }
impl From<ManagerErr> for FatalErr { impl From<response::Error> for Error {
fn from(e: ManagerErr) -> Self { fn from(e: response::Error) -> Self {
Self::ReceiverErr(e) Self::ReceiverErr(e)
} }
} }
impl From<urlencoding::FromUrlEncodingError> for FatalErr { impl From<urlencoding::FromUrlEncodingError> for Error {
fn from(e: urlencoding::FromUrlEncodingError) -> Self { fn from(e: urlencoding::FromUrlEncodingError) -> Self {
Self::UrlEncoding(e) Self::UrlEncoding(e)
} }
} }
impl From<url::ParseError> for FatalErr { impl From<url::ParseError> for Error {
fn from(e: url::ParseError) -> Self { fn from(e: url::ParseError) -> Self {
Self::UrlParse(e) Self::UrlParse(e)
} }
} }
impl From<std::io::Error> for FatalErr { impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self { fn from(e: std::io::Error) -> Self {
Self::StdIo(e) Self::StdIo(e)
} }
} }
impl From<log::SetLoggerError> for FatalErr { impl From<log::SetLoggerError> for Error {
fn from(e: log::SetLoggerError) -> Self { fn from(e: log::SetLoggerError) -> Self {
Self::Logger(e) Self::Logger(e)
} }

View File

@ -39,8 +39,10 @@
#![allow(clippy::try_err, clippy::match_bool)] #![allow(clippy::try_err, clippy::match_bool)]
//#![allow(clippy::large_enum_variant)] //#![allow(clippy::large_enum_variant)]
pub use err::Error;
pub mod config; pub mod config;
pub mod err; mod err;
pub mod event; pub mod event;
pub mod request; pub mod request;
pub mod response; pub mod response;

View File

@ -1,9 +1,9 @@
use flodgatt::config; use flodgatt::config;
use flodgatt::err::FatalErr;
use flodgatt::event::Event; use flodgatt::event::Event;
use flodgatt::request::{Handler, Subscription, Timeline}; use flodgatt::request::{Handler, Subscription, Timeline};
use flodgatt::response::redis; use flodgatt::response::redis::Manager;
use flodgatt::response::stream; use flodgatt::response::stream;
use flodgatt::Error;
use futures::{future::lazy, stream::Stream as _}; use futures::{future::lazy, stream::Stream as _};
use std::fs; use std::fs;
@ -16,7 +16,7 @@ use tokio::timer::Interval;
use warp::ws::Ws2; use warp::ws::Ws2;
use warp::Filter; use warp::Filter;
fn main() -> Result<(), FatalErr> { fn main() -> Result<(), Error> {
config::merge_dotenv()?; config::merge_dotenv()?;
pretty_env_logger::try_init()?; pretty_env_logger::try_init()?;
let (postgres_cfg, redis_cfg, cfg) = config::from_env(dotenv::vars().collect())?; let (postgres_cfg, redis_cfg, cfg) = config::from_env(dotenv::vars().collect())?;
@ -27,7 +27,7 @@ fn main() -> Result<(), FatalErr> {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let request = Handler::new(&postgres_cfg, *cfg.whitelist_mode)?; let request = Handler::new(&postgres_cfg, *cfg.whitelist_mode)?;
let shared_manager = redis::Manager::try_from(&redis_cfg, event_tx, cmd_rx)?.into_arc(); let shared_manager = Manager::try_from(&redis_cfg, event_tx, cmd_rx)?.into_arc();
// Server Sent Events // Server Sent Events
let sse_manager = shared_manager.clone(); let sse_manager = shared_manager.clone();
@ -37,7 +37,7 @@ fn main() -> Result<(), FatalErr> {
.and(warp::sse()) .and(warp::sse())
.map(move |subscription: Subscription, sse: warp::sse::Sse| { .map(move |subscription: Subscription, sse: warp::sse::Sse| {
log::info!("Incoming SSE request for {:?}", subscription.timeline); log::info!("Incoming SSE request for {:?}", subscription.timeline);
let mut manager = sse_manager.lock().unwrap_or_else(redis::Manager::recover); let mut manager = sse_manager.lock().unwrap_or_else(Manager::recover);
manager.subscribe(&subscription); manager.subscribe(&subscription);
stream::Sse::send_events(sse, sse_cmd_tx.clone(), subscription, sse_rx.clone()) stream::Sse::send_events(sse, sse_cmd_tx.clone(), subscription, sse_rx.clone())
@ -51,7 +51,7 @@ fn main() -> Result<(), FatalErr> {
.and(warp::ws::ws2()) .and(warp::ws::ws2())
.map(move |subscription: Subscription, ws: Ws2| { .map(move |subscription: Subscription, ws: Ws2| {
log::info!("Incoming websocket request for {:?}", subscription.timeline); log::info!("Incoming websocket request for {:?}", subscription.timeline);
let mut manager = ws_manager.lock().unwrap_or_else(redis::Manager::recover); let mut manager = ws_manager.lock().unwrap_or_else(Manager::recover);
manager.subscribe(&subscription); manager.subscribe(&subscription);
let token = subscription.access_token.clone().unwrap_or_default(); // token sent for security let token = subscription.access_token.clone().unwrap_or_default(); // token sent for security
let ws_stream = stream::Ws::new(cmd_tx.clone(), event_rx.clone(), subscription); let ws_stream = stream::Ws::new(cmd_tx.clone(), event_rx.clone(), subscription);
@ -78,22 +78,14 @@ fn main() -> Result<(), FatalErr> {
.allow_methods(cfg.cors.allowed_methods) .allow_methods(cfg.cors.allowed_methods)
.allow_headers(cfg.cors.allowed_headers); .allow_headers(cfg.cors.allowed_headers);
// use futures::future::Future;
let streaming_server = move || { let streaming_server = move || {
let manager = shared_manager.clone(); let manager = shared_manager.clone();
let stream = Interval::new(Instant::now(), poll_freq) let stream = Interval::new(Instant::now(), poll_freq)
// .take(1200)
.map_err(|e| log::error!("{}", e)) .map_err(|e| log::error!("{}", e))
.for_each( .for_each(move |_| {
move |_| { let mut manager = manager.lock().unwrap_or_else(Manager::recover);
let mut manager = manager.lock().unwrap_or_else(redis::Manager::recover); manager.poll_broadcast().map_err(Error::log)
manager.poll_broadcast().map_err(FatalErr::log) });
}, // ).and_then(|_| {
// log::info!("shutting down!");
// std::process::exit(0);
// futures::future::ok(())
// }
);
warp::spawn(lazy(move || stream)); warp::spawn(lazy(move || stream));
warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err)) warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err))
@ -109,5 +101,5 @@ fn main() -> Result<(), FatalErr> {
let server_addr = SocketAddr::new(*cfg.address, *cfg.port); let server_addr = SocketAddr::new(*cfg.address, *cfg.port);
tokio::run(lazy(move || streaming_server().bind(server_addr))); tokio::run(lazy(move || streaming_server().bind(server_addr)));
} }
Err(FatalErr::Unrecoverable) // on get here if there's an unrecoverable error in poll_broadcast. Err(Error::Unrecoverable) // only get here if there's an unrecoverable error in poll_broadcast.
} }

View File

@ -1,21 +1,22 @@
//! Parse the client request and return a Subscription //! Parse the client request and return a Subscription
mod postgres; mod postgres;
mod query; mod query;
pub mod timeline; mod timeline;
mod err; pub mod err;
mod subscription; mod subscription;
pub(crate) use self::err::RequestErr;
pub(crate) use self::postgres::PgPool; pub(crate) use self::postgres::PgPool;
pub(crate) use err::Error;
pub(crate) use subscription::Blocks; pub(crate) use subscription::Blocks;
pub use subscription::Subscription; pub use subscription::Subscription;
pub(crate) use timeline::Stream;
pub use timeline::Timeline; pub use timeline::Timeline;
pub(crate) use timeline::{Content, Reach, Stream, TimelineErr}; use timeline::{Content, Reach};
use self::query::Query; use self::query::Query;
use crate::config; use crate::config::Postgres;
use warp::filters::BoxedFilter; use warp::filters::BoxedFilter;
use warp::http::StatusCode; use warp::http::StatusCode;
use warp::path; use warp::path;
@ -26,7 +27,7 @@ mod sse_test;
#[cfg(test)] #[cfg(test)]
mod ws_test; mod ws_test;
type Result<T> = std::result::Result<T, err::RequestErr>; type Result<T> = std::result::Result<T, err::Error>;
/// Helper macro to match on the first of any of the provided filters /// Helper macro to match on the first of any of the provided filters
macro_rules! any_of { macro_rules! any_of {
@ -62,7 +63,7 @@ pub struct Handler {
} }
impl Handler { impl Handler {
pub fn new(postgres_cfg: &config::Postgres, whitelist_mode: bool) -> Result<Self> { pub fn new(postgres_cfg: &Postgres, whitelist_mode: bool) -> Result<Self> {
Ok(Self { Ok(Self {
pg_conn: PgPool::new(postgres_cfg, whitelist_mode)?, pg_conn: PgPool::new(postgres_cfg, whitelist_mode)?,
}) })

View File

@ -1,15 +1,15 @@
use std::fmt; use std::fmt;
#[derive(Debug)] #[derive(Debug)]
pub enum RequestErr { pub enum Error {
PgPool(r2d2::Error), PgPool(r2d2::Error),
Pg(postgres::Error), Pg(postgres::Error),
} }
impl std::error::Error for RequestErr {} impl std::error::Error for Error {}
impl fmt::Display for RequestErr { impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
use RequestErr::*; use Error::*;
let msg = match self { let msg = match self {
PgPool(e) => format!("{}", e), PgPool(e) => format!("{}", e),
Pg(e) => format!("{}", e), Pg(e) => format!("{}", e),
@ -18,13 +18,40 @@ impl fmt::Display for RequestErr {
} }
} }
impl From<r2d2::Error> for RequestErr { impl From<r2d2::Error> for Error {
fn from(e: r2d2::Error) -> Self { fn from(e: r2d2::Error) -> Self {
Self::PgPool(e) Self::PgPool(e)
} }
} }
impl From<postgres::Error> for RequestErr { impl From<postgres::Error> for Error {
fn from(e: postgres::Error) -> Self { fn from(e: postgres::Error) -> Self {
Self::Pg(e) Self::Pg(e)
} }
} }
// TODO make Timeline & TimelineErr their own top-level module
#[derive(Debug)]
pub enum Timeline {
MissingHashtag,
InvalidInput,
BadTag,
}
impl std::error::Error for Timeline {}
impl From<std::num::ParseIntError> for Timeline {
fn from(_error: std::num::ParseIntError) -> Self {
Self::InvalidInput
}
}
impl fmt::Display for Timeline {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
use Timeline::*;
let msg = match self {
InvalidInput => "The timeline text from Redis could not be parsed into a supported timeline. TODO: add incoming timeline text",
MissingHashtag => "Attempted to send a hashtag timeline without supplying a tag name",
BadTag => "No hashtag exists with the specified hashtag ID"
};
write!(f, "{}", msg)
}
}

View File

@ -15,7 +15,7 @@ pub struct PgPool {
whitelist_mode: bool, whitelist_mode: bool,
} }
type Result<T> = std::result::Result<T, err::RequestErr>; type Result<T> = std::result::Result<T, err::Error>;
type Rejectable<T> = std::result::Result<T, warp::Rejection>; type Rejectable<T> = std::result::Result<T, warp::Rejection>;
impl PgPool { impl PgPool {

View File

@ -1,5 +1,5 @@
pub(crate) use self::err::TimelineErr;
pub(crate) use self::inner::{Content, Reach, Scope, Stream, UserData}; pub(crate) use self::inner::{Content, Reach, Scope, Stream, UserData};
use super::err::Timeline as Error;
use super::query::Query; use super::query::Query;
use lru::LruCache; use lru::LruCache;
@ -8,7 +8,7 @@ use warp::reject::Rejection;
mod err; mod err;
mod inner; mod inner;
type Result<T> = std::result::Result<T, TimelineErr>; type Result<T> = std::result::Result<T, Error>;
#[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)] #[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)]
pub struct Timeline(pub(crate) Stream, pub(crate) Reach, pub(crate) Content); pub struct Timeline(pub(crate) Stream, pub(crate) Reach, pub(crate) Content);
@ -20,7 +20,7 @@ impl Timeline {
pub(crate) fn to_redis_raw_timeline(&self, hashtag: Option<&String>) -> Result<String> { pub(crate) fn to_redis_raw_timeline(&self, hashtag: Option<&String>) -> Result<String> {
// TODO -- does this need to account for namespaces? // TODO -- does this need to account for namespaces?
use {Content::*, Reach::*, Stream::*, TimelineErr::*}; use {Content::*, Error::*, Reach::*, Stream::*};
Ok(match self { Ok(match self {
Timeline(Public, Federated, All) => "timeline:public".to_string(), Timeline(Public, Federated, All) => "timeline:public".to_string(),
@ -42,7 +42,7 @@ impl Timeline {
} }
Timeline(List(id), Federated, All) => ["timeline:list:", &id.to_string()].concat(), Timeline(List(id), Federated, All) => ["timeline:list:", &id.to_string()].concat(),
Timeline(Direct(id), Federated, All) => ["timeline:direct:", &id.to_string()].concat(), Timeline(Direct(id), Federated, All) => ["timeline:direct:", &id.to_string()].concat(),
Timeline(_one, _two, _three) => Err(TimelineErr::InvalidInput)?, Timeline(_one, _two, _three) => Err(Error::InvalidInput)?,
}) })
} }
@ -50,7 +50,7 @@ impl Timeline {
timeline: &str, timeline: &str,
cache: &mut LruCache<String, i64>, cache: &mut LruCache<String, i64>,
) -> Result<Self> { ) -> Result<Self> {
use {Content::*, Reach::*, Stream::*, TimelineErr::*}; use {Content::*, Error::*, Reach::*, Stream::*};
let mut tag_id = |t: &str| cache.get(&t.to_string()).map_or(Err(BadTag), |id| Ok(*id)); let mut tag_id = |t: &str| cache.get(&t.to_string()).map_or(Err(BadTag), |id| Ok(*id));
Ok(match &timeline.split(':').collect::<Vec<&str>>()[..] { Ok(match &timeline.split(':').collect::<Vec<&str>>()[..] {

View File

@ -1,28 +1 @@
use std::fmt;
#[derive(Debug)]
pub enum TimelineErr {
MissingHashtag,
InvalidInput,
BadTag,
}
impl std::error::Error for TimelineErr {}
impl From<std::num::ParseIntError> for TimelineErr {
fn from(_error: std::num::ParseIntError) -> Self {
Self::InvalidInput
}
}
impl fmt::Display for TimelineErr {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
use TimelineErr::*;
let msg = match self {
InvalidInput => "The timeline text from Redis could not be parsed into a supported timeline. TODO: add incoming timeline text",
MissingHashtag => "Attempted to send a hashtag timeline without supplying a tag name",
BadTag => "No hashtag exists with the specified hashtag ID"
};
write!(f, "{}", msg)
}
}

View File

@ -1,4 +1,4 @@
use super::TimelineErr; use super::Error;
use crate::event::Id; use crate::event::Id;
use hashbrown::HashSet; use hashbrown::HashSet;
@ -36,18 +36,18 @@ pub(crate) enum Scope {
} }
impl TryFrom<&str> for Scope { impl TryFrom<&str> for Scope {
type Error = TimelineErr; type Error = Error;
fn try_from(s: &str) -> Result<Self, TimelineErr> { fn try_from(s: &str) -> Result<Self, Error> {
match s { match s {
"read" => Ok(Scope::Read), "read" => Ok(Scope::Read),
"read:statuses" => Ok(Scope::Statuses), "read:statuses" => Ok(Scope::Statuses),
"read:notifications" => Ok(Scope::Notifications), "read:notifications" => Ok(Scope::Notifications),
"read:lists" => Ok(Scope::Lists), "read:lists" => Ok(Scope::Lists),
"write" | "follow" => Err(TimelineErr::InvalidInput), // ignore write scopes "write" | "follow" => Err(Error::InvalidInput), // ignore write scopes
unexpected => { unexpected => {
log::warn!("Ignoring unknown scope `{}`", unexpected); log::warn!("Ignoring unknown scope `{}`", unexpected);
Err(TimelineErr::InvalidInput) Err(Error::InvalidInput)
} }
} }
} }

View File

@ -3,7 +3,7 @@
pub mod redis; pub mod redis;
pub mod stream; pub mod stream;
pub(crate) use redis::ManagerErr; pub(crate) use redis::Error;
#[cfg(feature = "bench")] #[cfg(feature = "bench")]
pub use redis::msg::{RedisMsg, RedisParseOutput}; pub use redis::msg::{RedisMsg, RedisParseOutput};

View File

@ -2,18 +2,20 @@ mod connection;
mod manager; mod manager;
mod msg; mod msg;
pub(crate) use connection::{RedisConn, RedisConnErr}; pub(self) use connection::RedisConn;
pub(crate) use manager::Error;
pub use manager::Manager; pub use manager::Manager;
pub(crate) use manager::ManagerErr;
pub(crate) use msg::RedisParseErr;
pub(crate) enum RedisCmd { use connection::RedisConnErr;
use msg::RedisParseErr;
enum RedisCmd {
Subscribe, Subscribe,
Unsubscribe, Unsubscribe,
} }
impl RedisCmd { impl RedisCmd {
pub(crate) fn into_sendable(self, tl: &str) -> (Vec<u8>, Vec<u8>) { fn into_sendable(self, tl: &str) -> (Vec<u8>, Vec<u8>) {
match self { match self {
RedisCmd::Subscribe => ( RedisCmd::Subscribe => (
[ [

View File

@ -2,7 +2,8 @@ mod err;
pub(crate) use err::RedisConnErr; pub(crate) use err::RedisConnErr;
use super::msg::{RedisParseErr, RedisParseOutput}; use super::msg::{RedisParseErr, RedisParseOutput};
use super::{ManagerErr, RedisCmd}; use super::Error as ManagerErr;
use super::RedisCmd;
use crate::config::Redis; use crate::config::Redis;
use crate::event::Event; use crate::event::Event;
use crate::request::{Stream, Timeline}; use crate::request::{Stream, Timeline};
@ -18,7 +19,7 @@ use std::time::Duration;
type Result<T> = std::result::Result<T, RedisConnErr>; type Result<T> = std::result::Result<T, RedisConnErr>;
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct RedisConn { pub(super) struct RedisConn {
primary: TcpStream, primary: TcpStream,
secondary: TcpStream, secondary: TcpStream,
redis_namespace: Option<String>, redis_namespace: Option<String>,
@ -29,7 +30,7 @@ pub(crate) struct RedisConn {
} }
impl RedisConn { impl RedisConn {
pub(crate) fn new(redis_cfg: &Redis) -> Result<Self> { pub(super) fn new(redis_cfg: &Redis) -> Result<Self> {
let addr = [&*redis_cfg.host, ":", &*redis_cfg.port.to_string()].concat(); let addr = [&*redis_cfg.host, ":", &*redis_cfg.port.to_string()].concat();
let conn = Self::new_connection(&addr, redis_cfg.password.as_ref())?; let conn = Self::new_connection(&addr, redis_cfg.password.as_ref())?;
@ -50,7 +51,7 @@ impl RedisConn {
Ok(redis_conn) Ok(redis_conn)
} }
pub(crate) fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, ManagerErr> { pub(super) fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, ManagerErr> {
loop { loop {
match self.primary.read(&mut self.redis_input[self.cursor..]) { match self.primary.read(&mut self.redis_input[self.cursor..]) {
Ok(n) => { Ok(n) => {
@ -111,7 +112,7 @@ impl RedisConn {
res res
} }
pub(crate) fn update_cache(&mut self, hashtag: String, id: i64) { pub(super) fn update_cache(&mut self, hashtag: String, id: i64) {
self.tag_id_cache.put(hashtag.clone(), id); self.tag_id_cache.put(hashtag.clone(), id);
self.tag_name_cache.put(id, hashtag); self.tag_name_cache.put(id, hashtag);
} }

View File

@ -1,4 +1,4 @@
use crate::request::TimelineErr; use crate::request;
use std::fmt; use std::fmt;
#[derive(Debug)] #[derive(Debug)]
@ -9,11 +9,11 @@ pub enum RedisConnErr {
IncorrectPassword(String), IncorrectPassword(String),
MissingPassword, MissingPassword,
NotRedis(String), NotRedis(String),
TimelineErr(TimelineErr), TimelineErr(request::err::Timeline),
} }
impl RedisConnErr { impl RedisConnErr {
pub(crate) fn with_addr<T: AsRef<str>>(address: T, inner: std::io::Error) -> Self { pub(super) fn with_addr<T: AsRef<str>>(address: T, inner: std::io::Error) -> Self {
Self::ConnectionErr { Self::ConnectionErr {
addr: address.as_ref().to_string(), addr: address.as_ref().to_string(),
inner, inner,
@ -57,8 +57,8 @@ impl fmt::Display for RedisConnErr {
} }
} }
impl From<TimelineErr> for RedisConnErr { impl From<request::err::Timeline> for RedisConnErr {
fn from(e: TimelineErr) -> RedisConnErr { fn from(e: request::err::Timeline) -> RedisConnErr {
RedisConnErr::TimelineErr(e) RedisConnErr::TimelineErr(e)
} }
} }

View File

@ -2,7 +2,7 @@
//! polled by the correct `ClientAgent`. Also manages sububscriptions and //! polled by the correct `ClientAgent`. Also manages sububscriptions and
//! unsubscriptions to/from Redis. //! unsubscriptions to/from Redis.
mod err; mod err;
pub(crate) use err::ManagerErr; pub(crate) use err::Error;
use super::{RedisCmd, RedisConn}; use super::{RedisCmd, RedisConn};
use crate::config; use crate::config;
@ -15,7 +15,7 @@ use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
type Result<T> = std::result::Result<T, ManagerErr>; type Result<T> = std::result::Result<T, Error>;
/// The item that streams from Redis and is polled by the `ClientAgent` /// The item that streams from Redis and is polled by the `ClientAgent`
#[derive(Debug)] #[derive(Debug)]

View File

@ -1,10 +1,11 @@
use super::super::{RedisConnErr, RedisParseErr}; use super::super::{RedisConnErr, RedisParseErr};
use crate::event::{Event, EventErr}; use crate::event::{Event, EventErr};
use crate::request::{Timeline, TimelineErr}; use crate::request::err::Timeline as TimelineErr;
use crate::request::Timeline;
use std::fmt; use std::fmt;
#[derive(Debug)] #[derive(Debug)]
pub enum ManagerErr { pub enum Error {
InvalidId, InvalidId,
TimelineErr(TimelineErr), TimelineErr(TimelineErr),
EventErr(EventErr), EventErr(EventErr),
@ -13,11 +14,11 @@ pub enum ManagerErr {
ChannelSendErr(tokio::sync::watch::error::SendError<(Timeline, Event)>), ChannelSendErr(tokio::sync::watch::error::SendError<(Timeline, Event)>),
} }
impl std::error::Error for ManagerErr {} impl std::error::Error for Error {}
impl fmt::Display for ManagerErr { impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
use ManagerErr::*; use Error::*;
match self { match self {
InvalidId => write!( InvalidId => write!(
f, f,
@ -33,31 +34,31 @@ impl fmt::Display for ManagerErr {
} }
} }
impl From<tokio::sync::watch::error::SendError<(Timeline, Event)>> for ManagerErr { impl From<tokio::sync::watch::error::SendError<(Timeline, Event)>> for Error {
fn from(error: tokio::sync::watch::error::SendError<(Timeline, Event)>) -> Self { fn from(error: tokio::sync::watch::error::SendError<(Timeline, Event)>) -> Self {
Self::ChannelSendErr(error) Self::ChannelSendErr(error)
} }
} }
impl From<EventErr> for ManagerErr { impl From<EventErr> for Error {
fn from(error: EventErr) -> Self { fn from(error: EventErr) -> Self {
Self::EventErr(error) Self::EventErr(error)
} }
} }
impl From<RedisConnErr> for ManagerErr { impl From<RedisConnErr> for Error {
fn from(e: RedisConnErr) -> Self { fn from(e: RedisConnErr) -> Self {
Self::RedisConnErr(e) Self::RedisConnErr(e)
} }
} }
impl From<TimelineErr> for ManagerErr { impl From<TimelineErr> for Error {
fn from(e: TimelineErr) -> Self { fn from(e: TimelineErr) -> Self {
Self::TimelineErr(e) Self::TimelineErr(e)
} }
} }
impl From<RedisParseErr> for ManagerErr { impl From<RedisParseErr> for Error {
fn from(e: RedisParseErr) -> Self { fn from(e: RedisParseErr) -> Self {
Self::RedisParseErr(e) Self::RedisParseErr(e)
} }