Improve error handling

Daniel Sockwell 2020-04-09 22:01:14 -04:00
parent 1657113c58
commit 62df3a56b1
14 changed files with 164 additions and 70 deletions

View File

@@ -2,8 +2,44 @@ mod timeline;
pub use timeline::TimelineErr;
use crate::redis_to_client_stream::ReceiverErr;
use std::fmt;
pub enum FatalErr {
Err,
ReceiverErr(ReceiverErr),
}
impl FatalErr {
pub fn exit(msg: impl fmt::Display) {
eprintln!("{}", msg);
std::process::exit(1);
}
}
impl std::error::Error for FatalErr {}
impl fmt::Debug for FatalErr {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "{}", self)
}
}
impl fmt::Display for FatalErr {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Error message")
}
}
impl From<ReceiverErr> for FatalErr {
fn from(e: ReceiverErr) -> Self {
Self::ReceiverErr(e)
}
}
pub fn die_with_msg2(msg: impl fmt::Display) {
eprintln!("{}", msg);
std::process::exit(1);
}
pub fn die_with_msg(msg: impl fmt::Display) -> ! {
eprintln!("FATAL ERROR: {}", msg);
std::process::exit(1);

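The new `FatalErr` type is a conventional custom error: `Display` for the user-facing message, `std::error::Error` so it composes with the rest of the error machinery, and a `From` impl so `?` can lift a `ReceiverErr` into it automatically. A minimal, self-contained sketch of that shape (simplified names, not the crate's exact definitions):

```rust
use std::fmt;

// Hypothetical inner error, standing in for something like ReceiverErr.
#[derive(Debug)]
struct InnerErr(String);

#[derive(Debug)]
enum Fatal {
    Inner(InnerErr),
}

impl fmt::Display for Fatal {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Fatal::Inner(InnerErr(msg)) => write!(f, "fatal error: {}", msg),
        }
    }
}

impl std::error::Error for Fatal {}

// The From impl is what lets `?` convert an InnerErr into a Fatal implicitly.
impl From<InnerErr> for Fatal {
    fn from(e: InnerErr) -> Self {
        Fatal::Inner(e)
    }
}

fn connect() -> Result<(), InnerErr> {
    Err(InnerErr("redis unreachable".to_string()))
}

fn main() -> Result<(), Fatal> {
    connect()?; // the InnerErr is converted to Fatal here
    Ok(())
}
```

Because `main` returns a `Result`, a propagated error is printed (via `Debug`) and the process exits with a non-zero status — the same effect `die_with_msg` produced by hand.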
View File

@@ -1,5 +1,6 @@
use flodgatt::{
config::{DeploymentConfig, EnvVar, PostgresConfig, RedisConfig},
err::FatalErr,
messages::Event,
parse_client_request::{PgPool, Subscription, Timeline},
redis_to_client_stream::{Receiver, SseStream, WsStream},
@@ -11,7 +12,7 @@ use tokio::{
};
use warp::{http::StatusCode, path, ws::Ws2, Filter, Rejection};
fn main() {
fn main() -> Result<(), FatalErr> {
dotenv::from_filename(match env::var("ENV").ok().as_deref() {
Some("production") => ".env.production",
Some("development") | None => ".env",
@@ -30,12 +31,7 @@ fn main() {
let (event_tx, event_rx) = watch::channel((Timeline::empty(), Event::Ping));
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let poll_freq = *redis_cfg.polling_interval;
let receiver = Receiver::try_from(redis_cfg, event_tx, cmd_rx)
.unwrap_or_else(|e| {
log::error!("{}\nFlodgatt shutting down...", e);
std::process::exit(1);
})
.into_arc();
let receiver = Receiver::try_from(redis_cfg, event_tx, cmd_rx)?.into_arc();
log::info!("Streaming server initialized and ready to accept connections");
// Server Sent Events
@@ -48,19 +44,13 @@ fn main() {
move |subscription: Subscription, sse_connection_to_client: warp::sse::Sse| {
log::info!("Incoming SSE request for {:?}", subscription.timeline);
{
let mut receiver = sse_receiver.lock().expect("TODO");
let mut receiver = sse_receiver.lock().unwrap_or_else(Receiver::recover);
receiver.subscribe(&subscription).unwrap_or_else(|e| {
log::error!("Could not subscribe to the Redis channel: {}", e)
});
}
let cmd_tx = sse_cmd_tx.clone();
let sse_rx = sse_rx.clone();
// self.sse.reply(
// warp::sse::keep_alive()
// .interval(Duration::from_secs(30))
// .text("thump".to_string())
// .stream(event_stream),
// )
// send the updates through the SSE connection
SseStream::send_events(sse_connection_to_client, cmd_tx, subscription, sse_rx)
},
@@ -75,7 +65,7 @@ fn main() {
.map(move |subscription: Subscription, ws: Ws2| {
log::info!("Incoming websocket request for {:?}", subscription.timeline);
{
let mut receiver = ws_receiver.lock().expect("TODO");
let mut receiver = ws_receiver.lock().unwrap_or_else(Receiver::recover);
receiver.subscribe(&subscription).unwrap_or_else(|e| {
log::error!("Could not subscribe to the Redis channel: {}", e)
});
@@ -107,10 +97,10 @@ fn main() {
.map(|| "OK")
.or(warp::path!("api" / "v1" / "streaming" / "status")
.and(warp::path::end())
.map(move || r1.lock().expect("TODO").count_connections()))
.map(move || r1.lock().unwrap_or_else(Receiver::recover).count()))
.or(
warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline")
.map(move || r3.lock().expect("TODO").list_connections()),
.map(move || r3.lock().unwrap_or_else(Receiver::recover).list()),
)
};
#[cfg(not(feature = "stub_status"))]
@@ -149,12 +139,13 @@ fn main() {
tokio::run(lazy(move || {
let receiver = receiver.clone();
warp::spawn(lazy(move || {
tokio::timer::Interval::new(Instant::now(), poll_freq)
.map_err(|e| log::error!("{}", e))
.for_each(move |_| {
let receiver = receiver.clone();
receiver.lock().expect("TODO").poll_broadcast();
let mut receiver = receiver.lock().unwrap_or_else(Receiver::recover);
receiver.poll_broadcast().unwrap_or_else(FatalErr::exit);
Ok(())
})
}));
@@ -162,4 +153,5 @@ fn main() {
warp::serve(ws_routes.or(sse_routes).with(cors).or(status_endpoints)).bind(server_addr)
}));
};
Ok(())
}
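Taken together, the rewritten `main` settles on three recovery strategies instead of scattered `expect("TODO")` calls: propagate with `?` when startup cannot proceed (the `Receiver::try_from` line), log and keep serving when a single request's subscription fails, and exit explicitly from the spawned polling task via `FatalErr::exit`. A compressed sketch of those three shapes (hypothetical helper functions, not Flodgatt's API):

```rust
fn load_config() -> Result<String, String> {
    Ok("config".to_string())
}

fn subscribe(_cfg: &str) -> Result<(), String> {
    Err("redis channel unavailable".to_string())
}

fn poll() -> Result<(), String> {
    Ok(())
}

fn main() -> Result<(), String> {
    // 1. Propagate: without a config the server is useless, so bubble the error up.
    let config = load_config()?;

    // 2. Log and continue: one failed subscription should not take the server down.
    subscribe(&config).unwrap_or_else(|e| eprintln!("could not subscribe: {}", e));

    // 3. Exit explicitly: a spawned task has no caller to return an error to.
    poll().unwrap_or_else(|e| {
        eprintln!("FATAL ERROR: {}", e);
        std::process::exit(1);
    });

    Ok(())
}
```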

View File

@@ -1,10 +1,10 @@
use super::{emoji::Emoji, visibility::Visibility};
use super::{emoji::Emoji, id::Id, visibility::Visibility};
use serde::{Deserialize, Serialize};
#[serde(deny_unknown_fields)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub(super) struct Account {
pub id: String,
pub id: Id,
username: String,
pub acct: String,
url: String,
@@ -21,7 +21,7 @@ pub(super) struct Account {
statuses_count: i64,
followers_count: i64,
following_count: i64,
moved: Option<Box<String>>,
moved: Option<String>,
fields: Option<Vec<Field>>,
bot: Option<bool>,
source: Option<Source>,

View File

@@ -0,0 +1,47 @@
use serde::{
de::{self, Visitor},
Deserialize, Deserializer, Serialize, Serializer,
};
use std::fmt;
/// A user ID.
///
/// Internally, Mastodon IDs are i64s, but they are sent to clients as strings because
/// JavaScript numbers cannot represent every i64. This newtype serializes to/from a
/// string but keeps the i64 as the "true" value for internal use.
#[derive(Debug, Clone, PartialEq)]
pub struct Id(pub i64);
impl Serialize for Id {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&format!("{}", self.0))
}
}
impl<'de> Deserialize<'de> for Id {
fn deserialize<D>(deserializer: D) -> Result<Id, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_string(IdVisitor)
}
}
struct IdVisitor;
impl<'de> Visitor<'de> for IdVisitor {
type Value = Id;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string that can be parsed into an i64")
}
fn visit_string<E: de::Error>(self, value: String) -> Result<Self::Value, E> {
match value.parse() {
Ok(n) => Ok(Id(n)),
Err(e) => Err(E::custom(format!("could not parse: {}", e))),
}
}
}
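The doc comment explains the why; the effect is easiest to see in a standalone round-trip where the i64 crosses the wire as a JSON string. A simplified sketch of the same newtype idea (self-contained, and implementing `visit_str` so it works directly with `serde_json::from_str`; not the crate's exact code):

```rust
use serde::{de::{self, Visitor}, Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;

#[derive(Debug, PartialEq)]
struct Id(i64);

impl Serialize for Id {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        // Send the numeric ID as a string so JavaScript clients never lose precision.
        s.serialize_str(&self.0.to_string())
    }
}

struct IdVisitor;

impl<'de> Visitor<'de> for IdVisitor {
    type Value = Id;

    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("a string containing an i64")
    }

    fn visit_str<E: de::Error>(self, v: &str) -> Result<Id, E> {
        v.parse().map(Id).map_err(E::custom)
    }
}

impl<'de> Deserialize<'de> for Id {
    fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Id, D::Error> {
        d.deserialize_str(IdVisitor)
    }
}

fn main() {
    let json = serde_json::to_string(&Id(9_007_199_254_740_993)).unwrap();
    assert_eq!(json, r#""9007199254740993""#);
    let back: Id = serde_json::from_str(&json).unwrap();
    assert_eq!(back, Id(9_007_199_254_740_993));
}
```

The assertion uses 2^53 + 1, the first integer a JavaScript `Number` cannot represent exactly, which is precisely why the API ships IDs as strings.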

View File

@@ -1,9 +1,10 @@
use super::id::Id;
use serde::{Deserialize, Serialize};
#[serde(deny_unknown_fields)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub(super) struct Mention {
pub id: String,
pub id: Id,
username: String,
acct: String,
url: String,

View File

@@ -4,6 +4,7 @@ mod announcement;
mod announcement_reaction;
mod conversation;
mod emoji;
mod id;
mod mention;
mod notification;
mod status;

View File

@@ -3,10 +3,11 @@ mod attachment;
mod card;
mod poll;
use super::{account::Account, emoji::Emoji, mention::Mention, tag::Tag, visibility::Visibility};
use super::{
account::Account, emoji::Emoji, id::Id, mention::Mention, tag::Tag, visibility::Visibility,
};
use {application::Application, attachment::Attachment, card::Card, poll::Poll};
use crate::log_fatal;
use crate::parse_client_request::Blocks;
use hashbrown::HashSet;
@@ -17,7 +18,7 @@ use std::string::String;
#[serde(deny_unknown_fields)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct Status {
id: String,
id: Id,
uri: String,
created_at: String,
account: Account,
@@ -34,8 +35,8 @@ pub struct Status {
favourites_count: i64,
replies_count: i64,
url: Option<String>,
in_reply_to_id: Option<String>,
in_reply_to_account_id: Option<String>,
in_reply_to_id: Option<Id>,
in_reply_to_account_id: Option<Id>,
reblog: Option<Box<Status>>,
poll: Option<Poll>,
card: Option<Card>,
@@ -91,7 +92,7 @@ impl Status {
blocking_users,
blocked_domains,
} = blocks;
let user_id = &self.account.id.parse().expect("TODO");
let user_id = &self.account.id.0;
if blocking_users.contains(user_id) || self.involves(blocked_users) {
REJECT
@@ -105,25 +106,19 @@ impl Status {
}
fn involves(&self, blocked_users: &HashSet<i64>) -> bool {
// TODO replace vvvv with error handling
let err = |_| log_fatal!("Could not process an `id` field in {:?}", &self);
// involved_users = mentioned_users + author + replied-to user + boosted user
let mut involved_users: HashSet<i64> = self
.mentions
.iter()
.map(|mention| mention.id.parse().unwrap_or_else(err))
.collect();
let mut involved_users: HashSet<i64> =
self.mentions.iter().map(|mention| mention.id.0).collect();
// author
involved_users.insert(self.account.id.parse::<i64>().unwrap_or_else(err));
involved_users.insert(self.account.id.0);
// replied-to user
if let Some(user_id) = self.in_reply_to_account_id.clone() {
involved_users.insert(user_id.parse().unwrap_or_else(err));
involved_users.insert(user_id.0);
}
// boosted user
if let Some(boosted_status) = self.reblog.clone() {
involved_users.insert(boosted_status.account.id.parse().unwrap_or_else(err));
involved_users.insert(boosted_status.account.id.0);
}
!involved_users.is_disjoint(blocked_users)
}
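With the `Id` newtype carrying the i64 directly, the block check no longer needs `parse().expect(...)` at all; it is just set arithmetic over integer IDs. A toy illustration (made-up IDs):

```rust
use std::collections::HashSet; // the crate uses hashbrown::HashSet, which has the same API

fn main() {
    let blocked_users: HashSet<i64> = [10, 11].into_iter().collect();

    // involved users = author + mentioned users + replied-to user + boosted author
    let involved_users: HashSet<i64> = [42, 11, 7].into_iter().collect();

    // The status is filtered out as soon as the two sets share any element.
    let reject = !involved_users.is_disjoint(&blocked_users);
    assert!(reject); // user 11 is both involved and blocked
}
```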

View File

@@ -77,10 +77,10 @@ impl DynamicEvent {
involved_users.insert(user_id.parse().expect("TODO"));
}
// boosted user
let id_of_boosted_user = self.payload["reblog"]["account"]["id"]
.as_str()
.expect("TODO");
involved_users.insert(id_of_boosted_user.parse().expect("TODO"));
let id_of_boosted_user = self.payload["reblog"]["account"]["id"].as_str();
if let Some(user_id) = id_of_boosted_user {
involved_users.insert(user_id.parse().expect("TODO"));
}
!involved_users.is_disjoint(blocked_users)
}
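The rewritten branch relies on the fact that indexing into a `serde_json::Value` never panics: a missing path yields `Value::Null`, and `as_str()` then returns `None`, so non-boost statuses simply skip the blocked-author check. A small standalone check (hypothetical payload):

```rust
use serde_json::json;

fn main() {
    // A status that is not a boost: the "reblog" field is null.
    let payload = json!({ "reblog": null });

    // Indexing a null/missing path yields Value::Null, and as_str() gives None,
    // so the boosted-author branch is skipped instead of panicking.
    if let Some(id) = payload["reblog"]["account"]["id"].as_str() {
        println!("boosted author: {}", id);
    } else {
        println!("not a boost; nothing to add");
    }
}
```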

View File

@@ -4,8 +4,9 @@ mod dynamic_event;
pub use {checked_event::CheckedEvent, dynamic_event::DynamicEvent};
use crate::log_fatal;
use crate::redis_to_client_stream::ReceiverErr;
use serde::Serialize;
use std::string::String;
use std::{convert::TryFrom, string::String};
#[derive(Debug, Clone)]
pub enum Event {
@@ -61,24 +62,26 @@ impl Event {
}
}
impl From<String> for Event {
fn from(event_txt: String) -> Event {
Event::from(event_txt.as_str())
impl TryFrom<String> for Event {
type Error = ReceiverErr;
fn try_from(event_txt: String) -> Result<Event, ReceiverErr> {
Event::try_from(event_txt.as_str())
}
}
impl From<&str> for Event {
fn from(event_txt: &str) -> Event {
impl TryFrom<&str> for Event {
type Error = ReceiverErr;
fn try_from(event_txt: &str) -> Result<Event, ReceiverErr> {
match serde_json::from_str(event_txt) {
Ok(checked_event) => Event::TypeSafe(checked_event),
Ok(checked_event) => Ok(Event::TypeSafe(checked_event)),
Err(e) => {
log::error!(
"Error safely parsing Redis input. Mastodon and Flodgatt do not \
strictly conform to the same version of Mastodon's API.\n{}\
strictly conform to the same version of Mastodon's API.\n{}\n\
Forwarding Redis payload without type checking it.",
e
);
let dyn_event: DynamicEvent = serde_json::from_str(&event_txt).expect("TODO");
Event::Dynamic(dyn_event)
let dyn_event: DynamicEvent = serde_json::from_str(&event_txt)?;
Ok(Event::Dynamic(dyn_event))
}
}
}
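Moving from `From` to `TryFrom` means a malformed payload surfaces as a `Result` at the call site (the `try_into()?` calls in the Redis connection below) instead of panicking inside the conversion. A reduced sketch of the same two-stage parse-with-fallback shape (toy types, not the real `CheckedEvent`/`DynamicEvent`):

```rust
use std::convert::TryFrom;

#[derive(Debug)]
enum Event {
    Typed(u64),      // stand-in for CheckedEvent
    Dynamic(String), // stand-in for DynamicEvent
}

#[derive(Debug)]
struct ParseErr(String);

impl TryFrom<&str> for Event {
    type Error = ParseErr;

    fn try_from(txt: &str) -> Result<Event, ParseErr> {
        // Try the strict parse first, then fall back to the loose representation;
        // only if both fail does the error propagate to the caller.
        txt.parse::<u64>().map(Event::Typed).or_else(|_| {
            if txt.is_empty() {
                Err(ParseErr("empty payload".to_string()))
            } else {
                Ok(Event::Dynamic(txt.to_string()))
            }
        })
    }
}

fn main() -> Result<(), ParseErr> {
    let strict = Event::try_from("42")?;
    let loose = Event::try_from("not a number")?;
    println!("{:?} / {:?}", strict, loose);
    Ok(())
}
```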

View File

@@ -95,7 +95,9 @@ impl WsStream {
match self.ws_tx.try_send(Message::text(txt)) {
Ok(_) => Ok(()),
Err(_) => {
self.unsubscribe_tx.try_send(tl).expect("TODO");
self.unsubscribe_tx
.try_send(tl)
.unwrap_or_else(|e| log::error!("could not unsubscribe from channel: {}", e));
Err(())
}
}
@@ -150,7 +152,9 @@ impl SseStream {
}
})
.then(move |res| {
unsubscribe_tx.try_send(target_timeline).expect("TODO");
unsubscribe_tx
.try_send(target_timeline)
.unwrap_or_else(|e| log::error!("could not unsubscribe from channel: {}", e));
res
});

View File

@@ -5,7 +5,7 @@ mod redis;
pub use {
event_stream::{SseStream, WsStream},
receiver::Receiver,
receiver::{Receiver, ReceiverErr},
};
#[cfg(feature = "bench")]

View File

@@ -1,5 +1,7 @@
use super::super::redis::{RedisConnErr, RedisParseErr};
use crate::err::TimelineErr;
use crate::messages::Event;
use crate::parse_client_request::Timeline;
use serde_json;
use std::fmt;
@@ -11,8 +13,11 @@ pub enum ReceiverErr {
EventErr(serde_json::Error),
RedisParseErr(RedisParseErr),
RedisConnErr(RedisConnErr),
ChannelSendErr(tokio::sync::watch::error::SendError<(Timeline, Event)>),
}
impl std::error::Error for ReceiverErr {}
impl fmt::Display for ReceiverErr {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
use ReceiverErr::*;
@@ -25,10 +30,16 @@ impl fmt::Display for ReceiverErr {
RedisParseErr(inner) => write!(f, "{}", inner),
RedisConnErr(inner) => write!(f, "{}", inner),
TimelineErr(inner) => write!(f, "{}", inner),
ChannelSendErr(inner) => write!(f, "{}", inner),
}?;
Ok(())
}
}
impl From<tokio::sync::watch::error::SendError<(Timeline, Event)>> for ReceiverErr {
fn from(error: tokio::sync::watch::error::SendError<(Timeline, Event)>) -> Self {
Self::ChannelSendErr(error)
}
}
impl From<serde_json::Error> for ReceiverErr {
fn from(error: serde_json::Error) -> Self {

View File

@@ -18,7 +18,7 @@ use tokio::sync::{mpsc, watch};
use std::{
result,
sync::{Arc, Mutex},
sync::{Arc, Mutex, MutexGuard, PoisonError},
time::{Duration, Instant},
};
@@ -99,36 +99,40 @@ impl Receiver {
Ok(())
}
pub fn poll_broadcast(&mut self) {
pub fn poll_broadcast(&mut self) -> Result<()> {
while let Ok(Async::Ready(Some(tl))) = self.rx.poll() {
self.unsubscribe(tl).expect("TODO");
self.unsubscribe(tl)?
}
if self.ping_time.elapsed() > Duration::from_secs(30) {
self.ping_time = Instant::now();
self.tx
.broadcast((Timeline::empty(), Event::Ping))
.expect("TODO");
self.tx.broadcast((Timeline::empty(), Event::Ping))?
} else {
match self.redis_connection.poll_redis() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(Some((timeline, event)))) => {
self.tx.broadcast((timeline, event)).expect("TODO");
self.tx.broadcast((timeline, event))?
}
Ok(Async::Ready(None)) => (), // subscription cmd or msg for other namespace
Err(_err) => panic!("TODO"),
Err(err) => log::error!("{}", err), // drop msg, log err, and proceed
}
}
Ok(())
}
pub fn count_connections(&self) -> String {
pub fn recover(poisoned: PoisonError<MutexGuard<Self>>) -> MutexGuard<Self> {
log::error!("{}", &poisoned);
poisoned.into_inner()
}
pub fn count(&self) -> String {
format!(
"Current connections: {}",
self.clients_per_timeline.values().sum::<i32>()
)
}
pub fn list_connections(&self) -> String {
pub fn list(&self) -> String {
let max_len = self
.clients_per_timeline
.keys()

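`Receiver::recover` is the piece that makes all the `lock().unwrap_or_else(Receiver::recover)` calls above safe: when another thread panicked while holding the lock, the `PoisonError` still contains the guard, so it can be logged and reused instead of crashing the server. A self-contained sketch of the pattern (using `eprintln!` in place of the `log` crate):

```rust
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use std::thread;

fn recover<T>(poisoned: PoisonError<MutexGuard<T>>) -> MutexGuard<T> {
    // The data may be mid-update, but for counter-style state that is acceptable;
    // log the poisoning and hand back the guard instead of panicking.
    eprintln!("recovered from poisoned lock: {}", poisoned);
    poisoned.into_inner()
}

fn main() {
    let shared = Arc::new(Mutex::new(0_i32));

    // Poison the lock by panicking while holding it in another thread.
    let cloned = Arc::clone(&shared);
    let _ = thread::spawn(move || {
        let _guard = cloned.lock().unwrap();
        panic!("poison the mutex");
    })
    .join();

    // Without `recover`, this `lock()` would return Err and an unwrap would panic.
    let guard = shared.lock().unwrap_or_else(recover);
    println!("still readable: {}", *guard);
}
```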
View File

@@ -10,7 +10,7 @@ use crate::{
};
use std::{
convert::TryFrom,
convert::{TryFrom, TryInto},
io::{Read, Write},
net::TcpStream,
str,
@@ -92,13 +92,13 @@ impl RedisConn {
Some(ns) if msg.timeline_txt.starts_with(&format!("{}:timeline:", ns)) => {
let trimmed_tl_txt = &msg.timeline_txt[ns.len() + ":timeline:".len()..];
let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut self.tag_id_cache)?;
let event = msg.event_txt.into();
let event = msg.event_txt.try_into()?;
(Ok(Ready(Some((tl, event)))), msg.leftover_input)
}
None => {
let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..];
let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut self.tag_id_cache)?;
let event = msg.event_txt.into();
let event = msg.event_txt.try_into()?;
(Ok(Ready(Some((tl, event)))), msg.leftover_input)
}
Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input),