diff --git a/Cargo.lock b/Cargo.lock index 1f4de48..5119ced 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -414,7 +414,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.4.6" +version = "0.4.7" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 991dca3..6454c5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.4.6" +version = "0.4.7" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/src/main.rs b/src/main.rs index fd6688f..843bed3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,6 @@ use flodgatt::{ parse_client_request::{sse, user, ws}, redis_to_client_stream::{self, ClientAgent}, }; -use log::warn; use std::{collections::HashMap, env, fs, net, os::unix::fs::PermissionsExt}; use tokio::net::UnixListener; use warp::{path, ws::Ws2, Filter}; @@ -19,7 +18,7 @@ fn main() { let env_vars = config::EnvVar::new(env_vars_map); pretty_env_logger::init(); - warn!( + log::warn!( "Flodgatt recognized the following environmental variables:{}", env_vars.clone() ); @@ -32,7 +31,7 @@ fn main() { let client_agent_ws = client_agent_sse.clone_with_shared_receiver(); let pg_pool = user::PostgresPool::new(postgres_cfg); - warn!("Streaming server initialized and ready to accept connections"); + log::warn!("Streaming server initialized and ready to accept connections"); // Server Sent Events let sse_update_interval = *cfg.ws_interval; @@ -40,6 +39,7 @@ fn main() { .and(warp::sse()) .map( move |user: user::User, sse_connection_to_client: warp::sse::Sse| { + log::info!("Incoming SSE request"); // Create a new ClientAgent let mut client_agent = client_agent_sse.clone_with_shared_receiver(); // Assign ClientAgent to generate stream of updates for the user/timeline pair @@ -60,7 +60,7 @@ fn main() { let websocket_routes = ws::extract_user_or_reject(pg_pool.clone()) .and(warp::ws::ws2()) .map(move |user: user::User, ws: Ws2| { - warn!("Incoming request"); + log::info!("Incoming websocket request"); let token = user.access_token.clone(); // Create a new ClientAgent let mut client_agent = client_agent_ws.clone_with_shared_receiver(); @@ -90,7 +90,7 @@ fn main() { let health = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK"); if let Some(socket) = &*cfg.unix_socket { - warn!("Using Unix socket {}", socket); + log::warn!("Using Unix socket {}", socket); fs::remove_file(socket).unwrap_or_default(); let incoming = UnixListener::bind(socket).unwrap().incoming(); diff --git a/src/redis_to_client_stream/mod.rs b/src/redis_to_client_stream/mod.rs index ce9fe68..e2e5777 100644 --- a/src/redis_to_client_stream/mod.rs +++ b/src/redis_to_client_stream/mod.rs @@ -48,7 +48,10 @@ pub fn send_updates_to_ws( rx.map_err(|()| -> warp::Error { unreachable!() }) .forward(ws_tx) .map(|_r| ()) - .map_err(|e| eprintln!("websocket send error: {}", e)), + .map_err(|e| match e.to_string().as_ref() { + "IO error: Broken pipe (os error 32)" => (), // just closed unix socket + _ => log::warn!("websocket send error: {}", e), + }), ); // Yield new events for as long as the client is still connected @@ -60,6 +63,11 @@ pub fn send_updates_to_ws( log::info!("Client closed WebSocket connection"); futures::future::ok(false) } + Err(e) if e.to_string() == "IO error: Broken pipe (os error 32)" => { + // no err, just closed Unix socket + log::info!("Client closed WebSocket connection"); + futures::future::ok(false) + } Err(e) => { log::warn!("{}", e); futures::future::ok(false)