diff --git a/Cargo.lock b/Cargo.lock index d3c4761..0cf4dd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -416,7 +416,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.9.3" +version = "0.9.4" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -437,7 +437,6 @@ dependencies = [ "tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", "url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "urlencoding 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "warp 0.1.20 (git+https://github.com/seanmonstar/warp.git)", ] @@ -2224,14 +2223,6 @@ name = "utf-8" version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "uuid" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "vcpkg" version = "0.2.7" @@ -2598,7 +2589,6 @@ dependencies = [ "checksum url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "75b414f6c464c879d7f9babf951f23bc3743fb7313c081b2e6ca719067ea9d61" "checksum urlencoding 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3df3561629a8bb4c57e5a2e4c43348d9e29c7c29d9b1c4c1f47166deca8f37ed" "checksum utf-8 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)" = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" -"checksum uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11" "checksum vcpkg 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "33dd455d0f96e90a75803cfeb7f948768c08d70a6de9a8d2362461935698bf95" "checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd" "checksum walkdir 2.2.9 (registry+https://github.com/rust-lang/crates.io-index)" = "9658c94fa8b940eab2250bd5a457f9c48b748420d71293b165c8cdbe2f55f71e" diff --git a/Cargo.toml b/Cargo.toml index 7437aee..d7e1d43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.9.3" +version = "0.9.4" authors = ["Daniel Long Sockwell "] edition = "2018" @@ -25,7 +25,6 @@ r2d2 = "0.8.8" lru = "0.4.3" urlencoding = "1.0.0" hashbrown = "0.7.1" -uuid = { version = "0.8.1", features = ["v4"] } [dev-dependencies] criterion = "0.3" diff --git a/src/main.rs b/src/main.rs index a3c6b21..3d151c1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,7 +16,7 @@ use warp::Filter; fn main() -> Result<(), Error> { config::merge_dotenv()?; - pretty_env_logger::try_init()?; + pretty_env_logger::try_init_timed()?; let (postgres_cfg, redis_cfg, cfg) = config::from_env(dotenv::vars().collect())?; let poll_freq = *redis_cfg.polling_interval; diff --git a/src/response/redis/connection.rs b/src/response/redis/connection.rs index acdc794..49c8cfe 100644 --- a/src/response/redis/connection.rs +++ b/src/response/redis/connection.rs @@ -133,7 +133,6 @@ impl RedisConn { // (Documented in [PR #3278](https://github.com/tootsuite/mastodon/pull/3278)) // Question: why can't the Puma server just use NUMSUB for this? self.secondary.write_all(&secondary_cmd)?; - log::info!("Sent {}", String::from_utf8_lossy(&secondary_cmd)); Ok(()) } diff --git a/src/response/redis/manager.rs b/src/response/redis/manager.rs index a179115..4a3d3f1 100644 --- a/src/response/redis/manager.rs +++ b/src/response/redis/manager.rs @@ -12,7 +12,7 @@ use crate::request::{Subscription, Timeline}; pub(self) use super::EventErr; use futures::Async; -use hashbrown::HashMap; +use hashbrown::{HashMap, HashSet}; use std::sync::{Arc, Mutex, MutexGuard, PoisonError}; use std::time::{Duration, Instant}; use tokio::sync::mpsc::Sender; @@ -28,8 +28,7 @@ pub struct Manager { } impl Manager { - /// Create a new `Manager`, with its own Redis connections (but, as yet, no - /// active subscriptions). + /// Create a new `Manager`, with its own Redis connections (but no active subscriptions). pub fn try_from(redis_cfg: &config::Redis) -> Result { Ok(Self { redis_connection: RedisConn::new(redis_cfg)?, @@ -60,16 +59,14 @@ impl Manager { }; } - pub(crate) fn unsubscribe(&mut self, tl: &mut Timeline) -> Result<()> { + pub(crate) fn unsubscribe(&mut self, tl: &Timeline) -> Result<()> { self.redis_connection.send_cmd(RedisCmd::Unsubscribe, &tl)?; self.timelines.remove(&tl); - - log::info!("Ended stream for {:?}", tl); - Ok(()) + Ok(log::info!("Ended stream for {:?}", tl)) } pub fn poll_broadcast(&mut self) -> Result<()> { - let mut completed_timelines = Vec::new(); + let mut completed_timelines = HashSet::new(); let log_send_err = |tl, e| Some(log::error!("cannot send to {:?}: {}", tl, e)).is_some(); if self.ping_time.elapsed() > Duration::from_secs(30) { @@ -80,8 +77,16 @@ impl Manager { Err(e) if !e.is_closed() => log_send_err(*tl, e), Err(_) => false, }); + + // NOTE: this takes two cycles to close a connection after the client + // times out: on the first cycle, this fn sends the Event to the + // response::Ws thread without any error, but that thread encounters an + // error sending to the client and ends. On the *second* cycle, this fn + // gets the error it's waiting on to clean up the connection. This isn't + // ideal, but is harmless, since the only reason we haven't cleaned up the + // connection is that no messages are being sent to that client. if channels.is_empty() { - completed_timelines.push(*tl); + completed_timelines.insert(*tl); } } }; @@ -98,7 +103,7 @@ impl Manager { Err(_) => false, }); if channels.is_empty() { - completed_timelines.push(tl); + completed_timelines.insert(tl); } } Ok(Async::Ready(None)) => (), // cmd or msg for other namespace @@ -106,7 +111,7 @@ impl Manager { } } - for tl in &mut completed_timelines { + for tl in &mut completed_timelines.iter() { self.unsubscribe(tl)?; } Ok(()) @@ -135,6 +140,9 @@ impl Manager { let tl_txt = format!("{:?}:", tl); format!("{:>1$} {2}\n", tl_txt, max_len, channel_map.len()) }) + .chain(std::iter::once( + "\n*may include recently disconnected clients".to_string(), + )) .collect() } } diff --git a/src/response/stream/ws.rs b/src/response/stream/ws.rs index 515e572..e5b1227 100644 --- a/src/response/stream/ws.rs +++ b/src/response/stream/ws.rs @@ -4,11 +4,10 @@ use crate::request::Subscription; use futures::future::Future; use futures::stream::Stream; use std::sync::Arc; -use tokio::sync::mpsc::{self, Receiver, UnboundedSender}; +use tokio::sync::mpsc::Receiver; use warp::ws::{Message, WebSocket}; type EventRx = Receiver>; -type MsgTx = UnboundedSender; pub struct Ws(Subscription); @@ -23,45 +22,37 @@ impl Ws { event_rx: EventRx, ) -> impl Future { let (transmit_to_ws, _receive_from_ws) = ws.split(); - // Create a pipe, send one end of it to a different green thread and tell that end - // to forward to the WebSocket client - let (mut ws_tx, ws_rx) = mpsc::unbounded_channel(); - warp::spawn( - ws_rx - .map_err(|_| -> warp::Error { unreachable!() }) - .forward(transmit_to_ws) - .map(|_r| ()) - .map_err(|e| { - match e.to_string().as_ref() { - "IO error: Broken pipe (os error 32)" => (), // just closed unix socket - _ => log::warn!("WebSocket send error: {}", e), + event_rx + .filter_map(move |event| { + if matches!(*event, Event::Ping) { + Some(Message::text(&event.to_json_string())) + } else { + match (event.update_payload(), event.dyn_update_payload()) { + (Some(update), _) if !self.filtered(update) => { + Some(Message::text(&event.to_json_string())) + } + (None, None) => Some(Message::text(&event.to_json_string())), // send all non-updates + (_, Some(dyn_update)) if !self.filtered(dyn_update) => { + Some(Message::text(&event.to_json_string())) + } + _ => None, } - }), - ); - - event_rx.map_err(|_| ()).for_each(move |event| { - if matches!(*event, Event::Ping) { - send_msg(&event, &mut ws_tx)? - } else { - match (event.update_payload(), event.dyn_update_payload()) { - (Some(update), _) => self.send_or_filter(&event, update, &mut ws_tx), - (None, None) => send_msg(&event, &mut ws_tx), // send all non-updates - (_, Some(dyn_update)) => self.send_or_filter(&event, dyn_update, &mut ws_tx), - }? - } - Ok(()) - }) + } + }) + .map_err(|_| -> warp::Error { unreachable!() }) + .forward(transmit_to_ws) + .map(|_r| ()) + // ignore errors that indicate normal disconnects. TODO - once we upgrade our + // Warp version, we should stop matching on text, which is fragile. + .map_err(|e| match e.to_string().as_ref() { + "IO error: Broken pipe (os error 32)" + | "IO error: Connection reset by peer (os error 104)" => (), + e => log::warn!("WebSocket send error: {}", e), + }) } - - fn send_or_filter( - &mut self, - event: &Event, - update: &impl Payload, - mut ws_tx: &mut MsgTx, - ) -> Result<(), ()> { + fn filtered(&mut self, update: &impl Payload) -> bool { let (blocks, allowed_langs) = (&self.0.blocks, &self.0.allowed_langs); - - let skip = |reason, tl| Ok(log::info!("{:?} msg skipped - {}", tl, reason)); + let skip = |msg| Some(log::info!("{:?} msg skipped - {}", self.0.timeline, msg)).is_some(); match self.0.timeline { tl if tl.is_public() @@ -69,23 +60,14 @@ impl Ws { && !allowed_langs.is_empty() && !allowed_langs.contains(&update.language()) => { - skip("disallowed language", tl) + skip("disallowed language") } - - tl if !blocks.blocked_users.is_disjoint(&update.involved_users()) => { - skip("involves blocked user", tl) + _ if !blocks.blocked_users.is_disjoint(&update.involved_users()) => { + skip("involves blocked user") } - tl if blocks.blocking_users.contains(update.author()) => skip("from blocking user", tl), - tl if blocks.blocked_domains.contains(update.sent_from()) => { - skip("from blocked domain", tl) - } - _ => Ok(send_msg(event, &mut ws_tx)?), + _ if blocks.blocking_users.contains(update.author()) => skip("from blocking user"), + _ if blocks.blocked_domains.contains(update.sent_from()) => skip("from blocked domain"), + _ => false, } } } - -fn send_msg(event: &Event, ws_tx: &mut MsgTx) -> Result<(), ()> { - ws_tx - .try_send(Message::text(&event.to_json_string())) - .map_err(|_| log::info!("WebSocket connection closed")) -}