Initial work to support structured errors

This commit is contained in:
Daniel Sockwell 2020-03-31 12:57:46 -04:00
parent 81b454c88c
commit fa73089f59
11 changed files with 235 additions and 184 deletions

View File

@ -1,75 +0,0 @@
use std::{error::Error, fmt};
pub fn die_with_msg(msg: impl fmt::Display) -> ! {
eprintln!("FATAL ERROR: {}", msg);
std::process::exit(1);
}
#[macro_export]
macro_rules! log_fatal {
($str:expr, $var:expr) => {{
log::error!($str, $var);
panic!();
};};
}
#[derive(Debug)]
pub enum RedisParseErr {
Incomplete,
InvalidNumber(std::num::ParseIntError),
NonNumericInput,
InvalidLineStart(String),
InvalidLineEnd,
IncorrectRedisType,
MissingField,
UnsupportedTimeline,
UnsupportedEvent(serde_json::Error),
}
impl fmt::Display for RedisParseErr {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "{}", match self {
Self::Incomplete => "The input from Redis does not form a complete message, likely because the input buffer filled partway through a message. Save this input and try again with additional input from Redis.".to_string(),
Self::InvalidNumber(e) => format!( "Redis input cannot be parsed: {}", e),
Self::NonNumericInput => "Received non-numeric input when expecting a Redis number".to_string(),
Self::InvalidLineStart(s) => format!("Got `{}` as a line start from Redis", s),
Self::InvalidLineEnd => "Redis input ended before promised length".to_string(),
Self::IncorrectRedisType => "Received a non-array when expecting a Redis array".to_string(),
Self::MissingField => "Redis input was missing a required field".to_string(),
Self::UnsupportedTimeline => "The raw timeline received from Redis could not be parsed into a supported timeline".to_string(),
Self::UnsupportedEvent(e) => format!("The event text from Redis could not be parsed into a valid event: {}", e)
})
}
}
impl Error for RedisParseErr {}
impl From<std::num::ParseIntError> for RedisParseErr {
fn from(error: std::num::ParseIntError) -> Self {
Self::InvalidNumber(error)
}
}
impl From<serde_json::Error> for RedisParseErr {
fn from(error: serde_json::Error) -> Self {
Self::UnsupportedEvent(error)
}
}
impl From<TimelineErr> for RedisParseErr {
fn from(_: TimelineErr) -> Self {
Self::UnsupportedTimeline
}
}
#[derive(Debug)]
pub enum TimelineErr {
RedisNamespaceMismatch,
InvalidInput,
}
impl From<std::num::ParseIntError> for TimelineErr {
fn from(_error: std::num::ParseIntError) -> Self {
Self::InvalidInput
}
}

18
src/err/mod.rs Normal file
View File

@ -0,0 +1,18 @@
mod timeline;
pub use timeline::TimelineErr;
use std::fmt;
pub fn die_with_msg(msg: impl fmt::Display) -> ! {
eprintln!("FATAL ERROR: {}", msg);
std::process::exit(1);
}
#[macro_export]
macro_rules! log_fatal {
($str:expr, $var:expr) => {{
log::error!($str, $var);
panic!();
};};
}

13
src/err/timeline.rs Normal file
View File

@ -0,0 +1,13 @@
//use std::{error::Error, fmt};
#[derive(Debug)]
pub enum TimelineErr {
RedisNamespaceMismatch,
InvalidInput,
}
impl From<std::num::ParseIntError> for TimelineErr {
fn from(_error: std::num::ParseIntError) -> Self {
Self::InvalidInput
}
}

View File

@ -15,10 +15,9 @@
//! Because `StreamManagers` are lightweight data structures that do not directly
//! communicate with Redis, it we create a new `ClientAgent` for
//! each new client connection (each in its own thread).use super::{message::Message, receiver::Receiver}
use super::receiver::Receiver;
use super::{receiver::Receiver, redis::RedisConnErr};
use crate::{
config,
err::RedisParseErr,
messages::Event,
parse_client_request::{Stream::Public, Subscription, Timeline},
};
@ -82,7 +81,7 @@ impl ClientAgent {
/// The stream that the `ClientAgent` manages. `Poll` is the only method implemented.
impl futures::stream::Stream for ClientAgent {
type Item = Event;
type Error = RedisParseErr;
type Error = RedisConnErr;
/// Checks for any new messages that should be sent to the client.
///
@ -98,8 +97,7 @@ impl futures::stream::Stream for ClientAgent {
.receiver
.lock()
.expect("ClientAgent: No other thread panic");
receiver.configure_for_polling(self.id, self.subscription.timeline);
receiver.poll()
receiver.poll_for(self.id, self.subscription.timeline)
};
let allowed_langs = &self.subscription.allowed_langs;
@ -107,6 +105,7 @@ impl futures::stream::Stream for ClientAgent {
let blocking_users = &self.subscription.blocks.blocking_users;
let blocked_domains = &self.subscription.blocks.blocked_domains;
let (send, block) = (|msg| Ok(Ready(Some(msg))), Ok(NotReady));
use Event::*;
match result {
Ok(Async::Ready(Some(event))) => match event {

View File

@ -1,5 +1,6 @@
use crate::messages::Event;
use crate::parse_client_request::Timeline;
use std::{
collections::{HashMap, VecDeque},
fmt,
@ -13,22 +14,6 @@ pub struct MsgQueue {
pub messages: VecDeque<Event>,
last_polled_at: Instant,
}
impl fmt::Debug for MsgQueue {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"\
MsgQueue {{
timeline: {:?},
messages: {:?},
last_polled_at: {:?},
}}",
self.timeline,
self.messages,
self.last_polled_at.elapsed(),
)
}
}
impl MsgQueue {
pub fn new(timeline: Timeline) -> Self {
@ -38,26 +23,15 @@ impl MsgQueue {
timeline,
}
}
pub fn update_polled_at_time(&mut self) {
self.last_polled_at = Instant::now();
}
}
#[derive(Debug)]
pub struct MessageQueues(pub HashMap<Uuid, MsgQueue>);
impl MessageQueues {
pub fn update_time_for_target_queue(&mut self, id: Uuid) {
self.entry(id)
.and_modify(|queue| queue.last_polled_at = Instant::now());
}
pub fn oldest_msg_in_target_queue(&mut self, id: Uuid, timeline: Timeline) -> Option<Event> {
let msg_qs_entry = self.entry(id);
let mut inserted_tl = false;
let msg_q = msg_qs_entry.or_insert_with(|| {
inserted_tl = true;
MsgQueue::new(timeline)
});
msg_q.messages.pop_front()
}
pub fn calculate_timelines_to_add_or_drop(&mut self, timeline: Timeline) -> Vec<Change> {
let mut timelines_to_modify = Vec::new();
@ -85,6 +59,23 @@ pub struct Change {
pub in_subscriber_number: i32,
}
impl fmt::Debug for MsgQueue {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"\
MsgQueue {{
timeline: {:?},
messages: {:?},
last_polled_at: {:?} ago,
}}",
self.timeline,
self.messages,
self.last_polled_at.elapsed(),
)
}
}
impl std::ops::Deref for MessageQueues {
type Target = HashMap<Uuid, MsgQueue>;
fn deref(&self) -> &Self::Target {

View File

@ -5,24 +5,23 @@ mod message_queues;
pub use message_queues::{MessageQueues, MsgQueue};
use super::redis::{RedisConn, RedisConnErr};
use crate::{
config,
err::RedisParseErr,
messages::Event,
parse_client_request::{Stream, Timeline},
redis_to_client_stream::redis::RedisConn,
};
use futures::{Async, Poll};
use lru::LruCache;
use std::{collections::HashMap, time::Instant};
use std::collections::HashMap;
use uuid::Uuid;
/// The item that streams from Redis and is polled by the `ClientAgent`
#[derive(Debug)]
pub struct Receiver {
redis_connection: RedisConn,
timeline: Timeline,
manager_id: Uuid,
pub msg_queues: MessageQueues,
clients_per_timeline: HashMap<Timeline, i32>,
hashtag_cache: LruCache<i64, String>,
@ -39,8 +38,6 @@ impl Receiver {
Self {
redis_connection,
timeline: Timeline::empty(),
manager_id: Uuid::default(),
msg_queues: MessageQueues(HashMap::new()),
clients_per_timeline: HashMap::new(),
hashtag_cache: LruCache::new(1000),
@ -55,7 +52,6 @@ impl Receiver {
/// so Redis PubSub subscriptions are only updated when a new timeline
/// comes under management for the first time.
pub fn manage_new_timeline(&mut self, id: Uuid, tl: Timeline, hashtag: Option<String>) {
self.timeline = tl;
if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (hashtag, tl) {
self.hashtag_cache.put(id, hashtag.clone());
self.redis_connection.update_cache(hashtag, id);
@ -65,18 +61,49 @@ impl Receiver {
self.subscribe_or_unsubscribe_as_needed(tl);
}
/// Set the `Receiver`'s manager_id and target_timeline fields to the appropriate
/// value to be polled by the current `StreamManager`.
pub fn configure_for_polling(&mut self, manager_id: Uuid, timeline: Timeline) {
self.manager_id = manager_id;
self.timeline = timeline;
/// Returns the oldest message in the `ClientAgent`'s queue (if any).
///
/// Note: This method does **not** poll Redis every time, because polling
/// Redis is significantly more time consuming that simply returning the
/// message already in a queue. Thus, we only poll Redis if it has not
/// been polled lately.
pub fn poll_for(&mut self, id: Uuid, timeline: Timeline) -> Poll<Option<Event>, RedisConnErr> {
loop {
match self.redis_connection.poll_redis() {
Ok(Async::Ready(Some((timeline, event)))) => self
.msg_queues
.values_mut()
.filter(|msg_queue| msg_queue.timeline == timeline)
.for_each(|msg_queue| {
msg_queue.messages.push_back(event.clone());
}),
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) => (),
Err(err) => Err(err)?,
}
}
// If the `msg_queue` being polled has any new messages, return the first (oldest) one
match self.msg_queues.get_mut(&id) {
Some(msg_q) => {
msg_q.update_polled_at_time();
match msg_q.messages.pop_front() {
Some(event) => Ok(Async::Ready(Some(event))),
None => Ok(Async::NotReady),
}
}
None => {
log::error!("Polled a MsgQueue that had not been set up. Setting it up now.");
self.msg_queues.insert(id, MsgQueue::new(timeline));
Ok(Async::NotReady)
}
}
}
/// Drop any PubSub subscriptions that don't have active clients and check
/// that there's a subscription to the current one. If there isn't, then
/// subscribe to it.
fn subscribe_or_unsubscribe_as_needed(&mut self, timeline: Timeline) {
let start_time = Instant::now();
let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(timeline);
// Record the lower number of clients subscribed to that channel
@ -102,47 +129,5 @@ impl Receiver {
.send_subscribe_cmd(&timeline.to_redis_raw_timeline(hashtag));
}
}
if start_time.elapsed().as_millis() > 1 {
log::warn!("Sending cmd to Redis took: {:?}", start_time.elapsed());
};
}
}
/// The stream that the ClientAgent polls to learn about new messages.
impl futures::stream::Stream for Receiver {
type Item = Event;
type Error = RedisParseErr;
/// Returns the oldest message in the `ClientAgent`'s queue (if any).
///
/// Note: This method does **not** poll Redis every time, because polling
/// Redis is significantly more time consuming that simply returning the
/// message already in a queue. Thus, we only poll Redis if it has not
/// been polled lately.
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let (timeline, id) = (self.timeline.clone(), self.manager_id);
loop {
match self.redis_connection.poll_redis() {
Ok(Async::Ready(Some((timeline, event)))) => self
.msg_queues
.values_mut()
.filter(|msg_queue| msg_queue.timeline == timeline)
.for_each(|msg_queue| {
msg_queue.messages.push_back(event.clone());
}),
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) => (),
Err(err) => Err(err)?,
}
}
// Record current time as last polled time
self.msg_queues.update_time_for_target_queue(id);
// If the `msg_queue` being polled has any new messages, return the first (oldest) one
match self.msg_queues.oldest_msg_in_target_queue(id, timeline) {
Some(value) => Ok(Async::Ready(Some(value))),
_ => Ok(Async::NotReady),
}
}
}

View File

@ -2,4 +2,4 @@ pub mod redis_cmd;
pub mod redis_connection;
pub mod redis_msg;
pub use redis_connection::RedisConn;
pub use redis_connection::{RedisConn, RedisConnErr};

View File

@ -0,0 +1,67 @@
use super::super::redis_msg::RedisParseErr;
use crate::err::TimelineErr;
#[derive(Debug)]
pub enum RedisConnErr {
TimelineErr(TimelineErr),
EventErr(serde_json::Error),
RedisParseErr(RedisParseErr),
}
impl From<serde_json::Error> for RedisConnErr {
fn from(error: serde_json::Error) -> Self {
Self::EventErr(error)
}
}
impl From<TimelineErr> for RedisConnErr {
fn from(e: TimelineErr) -> Self {
Self::TimelineErr(e)
}
}
impl From<RedisParseErr> for RedisConnErr {
fn from(e: RedisParseErr) -> Self {
Self::RedisParseErr(e)
}
}
// impl fmt::Display for RedisParseErr {
// fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
// use RedisParseErr::*;
// let msg = match self {
// Incomplete => "The input from Redis does not form a complete message, likely because \
// the input buffer filled partway through a message. Save this input \
// and try again with additional input from Redis."
// .to_string(),
// InvalidNumber(parse_int_err) => format!(
// "Redis indicated that an item would be a number, but it could not be parsed: {}",
// parse_int_err
// ),
// InvalidLineStart(line_start_char) => format!(
// "A line from Redis started with `{}`, which is not a valid character to indicate \
// the type of the Redis line.",
// line_start_char
// ),
// InvalidLineEnd => "A Redis line ended before expected line length".to_string(),
// IncorrectRedisType => "Received a Redis type that is not supported in this context. \
// Flodgatt expects each message from Redis to be a Redis array \
// consisting of bulk strings or integers."
// .to_string(),
// MissingField => "Redis input was missing a field Flodgatt expected (e.g., a `message` \
// without a payload line)"
// .to_string(),
// UnsupportedTimeline => {
// "The raw timeline received from Redis could not be parsed into a \
// supported timeline"
// .to_string()
// }
// UnsupportedEvent(e) => format!(
// "The event text from Redis could not be parsed into a valid event: {}",
// e
// ),
// };
// write!(f, "{}", msg)
// }
// }

View File

@ -1,11 +1,14 @@
use super::{redis_cmd, redis_msg::RedisParseOutput};
mod err;
use super::{
redis_cmd,
redis_msg::{RedisParseErr, RedisParseOutput},
};
use crate::{
config::RedisConfig,
err::{self, RedisParseErr},
messages::Event,
parse_client_request::Timeline,
config::RedisConfig, err::die_with_msg, messages::Event, parse_client_request::Timeline,
pubsub_cmd,
};
pub use err::RedisConnErr;
use futures::{Async, Poll};
use lru::LruCache;
@ -33,7 +36,7 @@ impl RedisConn {
pub fn new(redis_cfg: RedisConfig) -> Self {
let addr = format!("{}:{}", *redis_cfg.host, *redis_cfg.port);
let conn_err = |e| {
err::die_with_msg(format!(
die_with_msg(format!(
"Could not connect to Redis at {}:{}.\n Error detail: {}",
*redis_cfg.host, *redis_cfg.port, e,
))
@ -69,7 +72,7 @@ impl RedisConn {
}
}
pub fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, RedisParseErr> {
pub fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, RedisConnErr> {
let mut buffer = vec![0u8; 6000];
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
if let Ok(Async::Ready(bytes_read)) = self.poll_read(&mut buffer) {
@ -110,7 +113,7 @@ impl RedisConn {
},
Ok(NonMsg(leftover)) => (Ok(Ready(None)), leftover),
Err(RedisParseErr::Incomplete) => (Ok(NotReady), input),
Err(other) => (Err(other), input),
Err(other_parse_err) => (Err(RedisConnErr::RedisParseErr(other_parse_err)), input),
};
self.redis_input.extend_from_slice(leftover.as_bytes());
self.redis_input.extend_from_slice(invalid_bytes);
@ -135,7 +138,7 @@ fn send_password(mut conn: net::TcpStream, password: &str) -> net::TcpStream {
conn.read_exact(&mut buffer).unwrap();
let reply = String::from_utf8(buffer.to_vec()).unwrap();
if reply != "+OK\r\n" {
err::die_with_msg(format!(
die_with_msg(format!(
r"Incorrect Redis password. You supplied `{}`.
Please supply correct password with REDIS_PASSWORD environmental variable.",
password,
@ -156,16 +159,16 @@ fn send_test_ping(mut conn: net::TcpStream) -> net::TcpStream {
let reply = String::from_utf8(buffer.to_vec()).unwrap();
match reply.as_str() {
"+PONG\r\n" => (),
"-NOAUTH" => err::die_with_msg(
"-NOAUTH" => die_with_msg(
r"Invalid authentication for Redis.
Redis reports that it needs a password, but you did not provide one.
You can set a password with the REDIS_PASSWORD environmental variable.",
),
"HTTP/1." => err::die_with_msg(
"HTTP/1." => die_with_msg(
r"The server at REDIS_HOST and REDIS_PORT is not a Redis server.
Please update the REDIS_HOST and/or REDIS_PORT environmental variables.",
),
_ => err::die_with_msg(format!(
_ => die_with_msg(format!(
"Could not connect to Redis for unknown reason. Expected `+PONG` reply but got {}",
reply
)),

View File

@ -0,0 +1,50 @@
use std::{error::Error, fmt};
#[derive(Debug)]
pub enum RedisParseErr {
Incomplete,
InvalidNumber(std::num::ParseIntError),
InvalidLineStart(String),
InvalidLineEnd,
IncorrectRedisType,
MissingField,
}
impl fmt::Display for RedisParseErr {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
use RedisParseErr::*;
let msg = match self {
Incomplete => "The input from Redis does not form a complete message, likely because \
the input buffer filled partway through a message. Save this input \
and try again with additional input from Redis."
.to_string(),
InvalidNumber(parse_int_err) => format!(
"Redis indicated that an item would be a number, but it could not be parsed: {}",
parse_int_err
),
InvalidLineStart(line_start_char) => format!(
"A line from Redis started with `{}`, which is not a valid character to indicate \
the type of the Redis line.",
line_start_char
),
InvalidLineEnd => "A Redis line ended before expected line length".to_string(),
IncorrectRedisType => "Received a Redis type that is not supported in this context. \
Flodgatt expects each message from Redis to be a Redis array \
consisting of bulk strings or integers."
.to_string(),
MissingField => "Redis input was missing a field Flodgatt expected (e.g., a `message` \
without a payload line)"
.to_string(),
};
write!(f, "{}", msg)
}
}
impl Error for RedisParseErr {}
impl From<std::num::ParseIntError> for RedisParseErr {
fn from(error: std::num::ParseIntError) -> Self {
Self::InvalidNumber(error)
}
}

View File

@ -20,8 +20,11 @@
//! three characters, the second is a bulk string with ten characters, and the third is a
//! bulk string with 1,386 characters.
mod err;
pub use err::RedisParseErr;
use self::RedisParseOutput::*;
use crate::err::RedisParseErr;
use std::{
convert::{TryFrom, TryInto},
str,
@ -93,10 +96,7 @@ fn after_newline_at<'a>(s: &'a str, start: usize) -> RedisParser<'a, &'a str> {
}
fn parse_number_at<'a>(s: &'a str) -> RedisParser<(usize, &'a str)> {
let len = s
.chars()
.position(|c| !c.is_numeric())
.ok_or(NonNumericInput)?;
let len = s.chars().position(|c| !c.is_numeric()).ok_or(Incomplete)?;
Ok((s[..len].parse()?, after_newline_at(s, len)?))
}