From eda52c20b1298fe8d7477c394d32d1cec985d1d8 Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Thu, 19 Mar 2020 20:54:23 -0400 Subject: [PATCH] Add additional info logging (#98) --- Cargo.toml | 2 +- src/config/deployment_cfg.rs | 2 +- src/config/postgres_cfg.rs | 4 +--- src/config/redis_cfg.rs | 5 ++-- src/main.rs | 23 ++++++++++--------- src/parse_client_request/mod.rs | 2 +- src/parse_client_request/sse.rs | 2 +- .../{user => subscription}/mock_postgres.rs | 0 .../{user => subscription}/mod.rs | 2 +- .../{user => subscription}/postgres.rs | 2 +- .../{user => subscription}/stdin | 0 src/parse_client_request/ws.rs | 2 +- src/redis_to_client_stream/client_agent.rs | 6 ++--- src/redis_to_client_stream/message.rs | 19 ++++----------- src/redis_to_client_stream/mod.rs | 12 +++++----- .../receiver/message_queues.rs | 2 +- src/redis_to_client_stream/receiver/mod.rs | 4 ++-- 17 files changed, 39 insertions(+), 50 deletions(-) rename src/parse_client_request/{user => subscription}/mock_postgres.rs (100%) rename src/parse_client_request/{user => subscription}/mod.rs (99%) rename src/parse_client_request/{user => subscription}/postgres.rs (99%) rename src/parse_client_request/{user => subscription}/stdin (100%) diff --git a/Cargo.toml b/Cargo.toml index f3253a7..8796ed8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.6.2" +version = "0.6.3" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/src/config/deployment_cfg.rs b/src/config/deployment_cfg.rs index cf7dfb4..d94a79e 100644 --- a/src/config/deployment_cfg.rs +++ b/src/config/deployment_cfg.rs @@ -25,7 +25,7 @@ impl DeploymentConfig<'_> { cors: Cors::default(), }; cfg.env = cfg.env.maybe_update(env.get("RUST_ENV")); - log::warn!("Using deployment configuration:\n {:#?}", &cfg); + log::info!("Using deployment configuration:\n {:#?}", &cfg); cfg } } diff --git a/src/config/postgres_cfg.rs b/src/config/postgres_cfg.rs index fdbf27e..f109925 100644 --- a/src/config/postgres_cfg.rs +++ b/src/config/postgres_cfg.rs @@ -56,9 +56,7 @@ impl PostgresConfig { port: PgPort::default().maybe_update(env.get("DB_PORT")), ssl_mode: PgSslMode::default().maybe_update(env.get("DB_SSLMODE")), }; - - log::warn!("Postgres configuration:\n{:#?}", &cfg); - + log::info!("Postgres configuration:\n{:#?}", &cfg); cfg } diff --git a/src/config/redis_cfg.rs b/src/config/redis_cfg.rs index 0dd1e87..f78a0fb 100644 --- a/src/config/redis_cfg.rs +++ b/src/config/redis_cfg.rs @@ -43,8 +43,7 @@ impl EnvVar { impl RedisConfig { const USER_SET_WARNING: &'static str = "Redis user specified, but Redis did not ask for a username. Ignoring it."; - const DB_SET_WARNING: &'static str = - r"Redis database specified, but PubSub connections do not use databases. + const DB_SET_WARNING: &'static str = r"Redis database specified, but PubSub connections do not use databases. For similar functionality, you may wish to set a REDIS_NAMESPACE"; pub fn from_env(env: EnvVar) -> Self { @@ -69,7 +68,7 @@ For similar functionality, you may wish to set a REDIS_NAMESPACE"; if cfg.user.is_some() { log::warn!("{}", Self::USER_SET_WARNING); } - log::warn!("Redis configuration:\n{:#?},", &cfg); + log::info!("Redis configuration:\n{:#?},", &cfg); cfg } } diff --git a/src/main.rs b/src/main.rs index ab263d8..1e4fcc0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use flodgatt::{ config, err, - parse_client_request::{sse, user, ws}, + parse_client_request::{sse, subscription, ws}, redis_to_client_stream::{self, ClientAgent}, }; use std::{collections::HashMap, env, fs, net, os::unix::fs::PermissionsExt}; @@ -18,7 +18,7 @@ fn main() { let env_vars = config::EnvVar::new(env_vars_map); pretty_env_logger::init(); - log::warn!( + log::info!( "Flodgatt recognized the following environmental variables:{}", env_vars.clone() ); @@ -26,24 +26,25 @@ fn main() { let cfg = config::DeploymentConfig::from_env(env_vars.clone()); let postgres_cfg = config::PostgresConfig::from_env(env_vars.clone()); - let pg_pool = user::PgPool::new(postgres_cfg); + let pg_pool = subscription::PgPool::new(postgres_cfg); let client_agent_sse = ClientAgent::blank(redis_cfg, pg_pool.clone()); let client_agent_ws = client_agent_sse.clone_with_shared_receiver(); - log::warn!("Streaming server initialized and ready to accept connections"); + log::info!("Streaming server initialized and ready to accept connections"); // Server Sent Events let sse_update_interval = *cfg.ws_interval; let sse_routes = sse::extract_user_or_reject(pg_pool.clone()) .and(warp::sse()) .map( - move |user: user::Subscription, sse_connection_to_client: warp::sse::Sse| { - log::info!("Incoming SSE request"); + move |subscription: subscription::Subscription, + sse_connection_to_client: warp::sse::Sse| { + log::info!("Incoming SSE request for {:?}", subscription.timeline); // Create a new ClientAgent let mut client_agent = client_agent_sse.clone_with_shared_receiver(); // Assign ClientAgent to generate stream of updates for the user/timeline pair - client_agent.init_for_user(user); + client_agent.init_for_user(subscription); // send the updates through the SSE connection redis_to_client_stream::send_updates_to_sse( client_agent, @@ -60,12 +61,12 @@ fn main() { let websocket_routes = ws::extract_user_and_token_or_reject(pg_pool.clone()) .and(warp::ws::ws2()) .map( - move |user: user::Subscription, token: Option, ws: Ws2| { - log::info!("Incoming websocket request"); + move |subscription: subscription::Subscription, token: Option, ws: Ws2| { + log::info!("Incoming websocket request for {:?}", subscription.timeline); // Create a new ClientAgent let mut client_agent = client_agent_ws.clone_with_shared_receiver(); // Assign that agent to generate a stream of updates for the user/timeline pair - client_agent.init_for_user(user); + client_agent.init_for_user(subscription); // send the updates through the WS connection (along with the User's access_token // which is sent for security) @@ -91,7 +92,7 @@ fn main() { let health = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK"); if let Some(socket) = &*cfg.unix_socket { - log::warn!("Using Unix socket {}", socket); + log::info!("Using Unix socket {}", socket); fs::remove_file(socket).unwrap_or_default(); let incoming = UnixListener::bind(socket).unwrap().incoming(); diff --git a/src/parse_client_request/mod.rs b/src/parse_client_request/mod.rs index b55fd8d..c999c0e 100644 --- a/src/parse_client_request/mod.rs +++ b/src/parse_client_request/mod.rs @@ -1,5 +1,5 @@ //! Parse the client request and return a (possibly authenticated) `User` pub mod query; pub mod sse; -pub mod user; +pub mod subscription; pub mod ws; diff --git a/src/parse_client_request/sse.rs b/src/parse_client_request/sse.rs index f863a60..279ea7a 100644 --- a/src/parse_client_request/sse.rs +++ b/src/parse_client_request/sse.rs @@ -1,7 +1,7 @@ //! Filters for all the endpoints accessible for Server Sent Event updates use super::{ query::{self, Query}, - user::{PgPool, Subscription}, + subscription::{PgPool, Subscription}, }; use warp::{filters::BoxedFilter, path, Filter}; #[allow(dead_code)] diff --git a/src/parse_client_request/user/mock_postgres.rs b/src/parse_client_request/subscription/mock_postgres.rs similarity index 100% rename from src/parse_client_request/user/mock_postgres.rs rename to src/parse_client_request/subscription/mock_postgres.rs diff --git a/src/parse_client_request/user/mod.rs b/src/parse_client_request/subscription/mod.rs similarity index 99% rename from src/parse_client_request/user/mod.rs rename to src/parse_client_request/subscription/mod.rs index 8cdff77..9017c81 100644 --- a/src/parse_client_request/user/mod.rs +++ b/src/parse_client_request/subscription/mod.rs @@ -135,7 +135,7 @@ impl Timeline { false => Err(custom("Error: Missing access token"))?, }, other => { - log::warn!("Client attempted to subscribe to: `{}`", other); + log::warn!("Request for nonexistent endpoint: `{}`", other); Err(custom("Error: Nonexistent endpoint"))? } }) diff --git a/src/parse_client_request/user/postgres.rs b/src/parse_client_request/subscription/postgres.rs similarity index 99% rename from src/parse_client_request/user/postgres.rs rename to src/parse_client_request/subscription/postgres.rs index 9b80708..9353efa 100644 --- a/src/parse_client_request/user/postgres.rs +++ b/src/parse_client_request/subscription/postgres.rs @@ -1,7 +1,7 @@ //! Postgres queries use crate::{ config, - parse_client_request::user::{Scope, UserData}, + parse_client_request::subscription::{Scope, UserData}, }; use ::postgres; use r2d2_postgres::PostgresConnectionManager; diff --git a/src/parse_client_request/user/stdin b/src/parse_client_request/subscription/stdin similarity index 100% rename from src/parse_client_request/user/stdin rename to src/parse_client_request/subscription/stdin diff --git a/src/parse_client_request/ws.rs b/src/parse_client_request/ws.rs index 560530e..1aac83f 100644 --- a/src/parse_client_request/ws.rs +++ b/src/parse_client_request/ws.rs @@ -1,7 +1,7 @@ //! Filters for the WebSocket endpoint use super::{ query::{self, Query}, - user::{PgPool, Subscription}, + subscription::{PgPool, Subscription}, }; use warp::{filters::BoxedFilter, path, Filter}; diff --git a/src/redis_to_client_stream/client_agent.rs b/src/redis_to_client_stream/client_agent.rs index ae81e09..4ff069d 100644 --- a/src/redis_to_client_stream/client_agent.rs +++ b/src/redis_to_client_stream/client_agent.rs @@ -18,13 +18,12 @@ use super::{message::Message, receiver::Receiver}; use crate::{ config, - parse_client_request::user::{PgPool, Stream::Public, Subscription, Timeline}, + parse_client_request::subscription::{PgPool, Stream::Public, Subscription, Timeline}, }; use futures::{ Async::{self, NotReady, Ready}, Poll, }; - use std::sync; use tokio::io::Error; use uuid::Uuid; @@ -34,7 +33,7 @@ use uuid::Uuid; pub struct ClientAgent { receiver: sync::Arc>, id: uuid::Uuid, - subscription: Subscription, + pub subscription: Subscription, } impl ClientAgent { @@ -97,6 +96,7 @@ impl futures::stream::Stream for ClientAgent { }; if start_time.elapsed().as_millis() > 1 { log::warn!("Polling the Receiver took: {:?}", start_time.elapsed()); + log::info!("Longer polling yielded: {:#?}", &result); }; let allowed_langs = &self.subscription.allowed_langs; diff --git a/src/redis_to_client_stream/message.rs b/src/redis_to_client_stream/message.rs index 31b7d76..bc35e1e 100644 --- a/src/redis_to_client_stream/message.rs +++ b/src/redis_to_client_stream/message.rs @@ -1,5 +1,4 @@ use crate::log_fatal; -use log::{log_enabled, Level}; use serde_json::Value; use std::{collections::HashSet, string::String}; use strum_macros::Display; @@ -50,11 +49,8 @@ impl Message { .unwrap_or_else(|| log_fatal!("Could not process `payload` in {:?}", json)) .to_string(), )), - unexpected_event => { - log::warn!( - "Received an unexpected `event` type from Redis: {}", - unexpected_event - ); + other => { + log::warn!("Received unexpected `event` from Redis: {}", other); Self::UnknownEvent(event.to_string(), json["payload"].clone()) } } @@ -96,14 +92,9 @@ impl Status { const REJECT: bool = true; let reject_and_maybe_log = |toot_language| { - if log_enabled!(Level::Info) { - log::info!( - "Language `{toot_language}` is not in list `{allowed_langs:?}`", - toot_language = toot_language, - allowed_langs = allowed_langs - ); - log::info!("Filtering out toot from `{}`", &self.0["account"]["acct"],); - } + log::info!("Filtering out toot from `{}`", &self.0["account"]["acct"]); + log::info!("Toot language: `{}`", toot_language); + log::info!("Recipient's allowed languages: `{:?}`", allowed_langs); REJECT }; if allowed_langs.is_empty() { diff --git a/src/redis_to_client_stream/mod.rs b/src/redis_to_client_stream/mod.rs index ea1902a..1f9d20f 100644 --- a/src/redis_to_client_stream/mod.rs +++ b/src/redis_to_client_stream/mod.rs @@ -3,7 +3,6 @@ pub mod client_agent; pub mod message; pub mod receiver; pub mod redis; - pub use client_agent::ClientAgent; use futures::{future::Future, stream::Stream, Async}; use log; @@ -40,6 +39,7 @@ pub fn send_updates_to_ws( update_interval: time::Duration, ) -> impl futures::future::Future { let (ws_tx, mut ws_rx) = socket.split(); + let timeline = client_agent.subscription.timeline; // Create a pipe let (tx, rx) = futures::sync::mpsc::unbounded(); @@ -62,16 +62,16 @@ pub fn send_updates_to_ws( Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true), Ok(Async::Ready(None)) => { // TODO: consider whether we should manually drop closed connections here - log::info!("Client closed WebSocket connection"); + log::info!("Client closed WebSocket connection for {:?}", timeline); futures::future::ok(false) } Err(e) if e.to_string() == "IO error: Broken pipe (os error 32)" => { // no err, just closed Unix socket - log::info!("Client closed WebSocket connection"); + log::info!("Client closed WebSocket connection for {:?}", timeline); futures::future::ok(false) } Err(e) => { - log::warn!("Error in TL {}", e); + log::warn!("Error in {:?}: {}", timeline, e); futures::future::ok(false) } }); @@ -98,8 +98,8 @@ pub fn send_updates_to_ws( }) .then(move |result| { // TODO: consider whether we should manually drop closed connections here - log::info!("WebSocket connection closed."); + log::info!("WebSocket connection for {:?} closed.", timeline); result }) - .map_err(move |e| log::warn!("Error sending to user: {}", e)) + .map_err(move |e| log::warn!("Error sending to {:?}: {}", timeline, e)) } diff --git a/src/redis_to_client_stream/receiver/message_queues.rs b/src/redis_to_client_stream/receiver/message_queues.rs index 853e1a1..670bc45 100644 --- a/src/redis_to_client_stream/receiver/message_queues.rs +++ b/src/redis_to_client_stream/receiver/message_queues.rs @@ -1,4 +1,4 @@ -use crate::parse_client_request::user::Timeline; +use crate::parse_client_request::subscription::Timeline; use serde_json::Value; use std::{collections, fmt, time}; use uuid::Uuid; diff --git a/src/redis_to_client_stream/receiver/mod.rs b/src/redis_to_client_stream/receiver/mod.rs index aeafdd1..5b142d3 100644 --- a/src/redis_to_client_stream/receiver/mod.rs +++ b/src/redis_to_client_stream/receiver/mod.rs @@ -5,7 +5,7 @@ mod message_queues; use crate::{ config::{self, RedisInterval}, log_fatal, - parse_client_request::user::{self, postgres, PgPool, Timeline}, + parse_client_request::subscription::{self, postgres, PgPool, Timeline}, pubsub_cmd, redis_to_client_stream::redis::{redis_cmd, RedisConn, RedisStream}, }; @@ -92,7 +92,7 @@ impl Receiver { } fn if_hashtag_timeline_get_hashtag_name(&mut self, timeline: Timeline) -> Option { - use user::Stream::*; + use subscription::Stream::*; if let Timeline(Hashtag(id), _, _) = timeline { let cached_tag = self.cache.id_to_hashtag.get(&id).map(String::from); let tag = match cached_tag {