diff --git a/Cargo.lock b/Cargo.lock index c23a034..6356baf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -416,7 +416,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.9.0" +version = "0.9.1" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index d48ef39..7df2294 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.9.0" +version = "0.9.1" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/src/response/redis/manager.rs b/src/response/redis/manager.rs index ba84c76..3c7f5cc 100644 --- a/src/response/redis/manager.rs +++ b/src/response/redis/manager.rs @@ -59,7 +59,7 @@ impl Manager { } pub(crate) fn unsubscribe(&mut self, tl: &mut Timeline, id: &Uuid) -> Result<()> { - let channels = self.timelines.get_mut(tl).expect("TODO"); + let channels = self.timelines.get_mut(tl).ok_or(Error::InvalidId)?; channels.remove(id); if channels.len() == 0 { @@ -71,10 +71,8 @@ impl Manager { } pub fn poll_broadcast(&mut self) -> Result<()> { - let (start, mut sent) = (std::time::Instant::now(), false); let mut completed_timelines = Vec::new(); if self.ping_time.elapsed() > Duration::from_secs(30) { - log::info!("Ping!"); self.ping_time = Instant::now(); for (timeline, channels) in self.timelines.iter_mut() { for (uuid, channel) in channels.iter_mut() { @@ -89,13 +87,9 @@ impl Manager { match self.redis_connection.poll_redis() { Ok(Async::NotReady) => break, Ok(Async::Ready(Some((tl, event)))) => { - for (uuid, channel) in self.timelines.get_mut(&tl).ok_or(Error::InvalidId)? { - log::info!("Sending real event for {:?}", tl); - sent = true; - match channel.try_send(event.clone()) { - Ok(_) => (), - Err(_) => completed_timelines.push((tl, *uuid)), - } + for (uuid, tx) in self.timelines.get_mut(&tl).ok_or(Error::InvalidId)? { + tx.try_send(event.clone()) + .unwrap_or_else(|_| completed_timelines.push((tl, *uuid))) } } Ok(Async::Ready(None)) => (), // cmd or msg for other namespace @@ -106,9 +100,6 @@ impl Manager { for (tl, channel) in completed_timelines.iter_mut() { self.unsubscribe(tl, &channel)?; } - if sent { - log::info!("{:?}", start.elapsed()) - } Ok(()) } diff --git a/src/response/redis/manager/err.rs b/src/response/redis/manager/err.rs index 4ef4653..9f7dcad 100644 --- a/src/response/redis/manager/err.rs +++ b/src/response/redis/manager/err.rs @@ -6,6 +6,7 @@ use std::fmt; #[derive(Debug)] pub enum Error { InvalidId, + TimelineErr(TimelineErr), EventErr(EventErr), RedisParseErr(RedisParseErr), @@ -22,7 +23,7 @@ impl fmt::Display for Error { match self { InvalidId => write!( f, - "Attempted to get messages for a subscription that had not been set up." + "tried to access a timeline/channel subscription that does not exist" ), EventErr(inner) => write!(f, "{}", inner), RedisParseErr(inner) => write!(f, "{}", inner), diff --git a/src/response/stream/ws.rs b/src/response/stream/ws.rs index 5a77b3a..e4d38e1 100644 --- a/src/response/stream/ws.rs +++ b/src/response/stream/ws.rs @@ -1,7 +1,8 @@ use super::{Event, Payload}; use crate::request::Subscription; -use futures::{future::Future, stream::Stream}; +use futures::future::Future; +use futures::stream::Stream; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use warp::ws::{Message, WebSocket}; @@ -31,9 +32,7 @@ impl Ws { .map(|_r| ()) .map_err(|e| { match e.to_string().as_ref() { - "IO error: Broken pipe (os error 32)" => { - log::info!("Client connection lost") - } // just closed unix socket + "IO error: Broken pipe (os error 32)" => (), // just closed unix socket _ => log::warn!("WebSocket send error: {}", e), } }),