diff --git a/Cargo.lock b/Cargo.lock index bb10ac9..d5dd39c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -440,7 +440,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.6.8" +version = "0.7.0" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 9d6740b..627174d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.6.8" +version = "0.7.0" authors = ["Daniel Long Sockwell "] edition = "2018" @@ -38,4 +38,5 @@ harness = false [features] default = [ "production" ] bench = [] +stub_status = [] production = [] diff --git a/src/main.rs b/src/main.rs index 2535f46..74b6702 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,7 +24,7 @@ fn main() { let pg_pool = PgPool::new(postgres_cfg); - let sharable_receiver = Receiver::try_from(redis_cfg) + let receiver = Receiver::try_from(redis_cfg) .unwrap_or_else(|e| { log::error!("{}\nFlodgatt shutting down...", e); std::process::exit(1); @@ -33,7 +33,7 @@ fn main() { log::info!("Streaming server initialized and ready to accept connections"); // Server Sent Events - let sse_receiver = sharable_receiver.clone(); + let sse_receiver = receiver.clone(); let (sse_interval, whitelist_mode) = (*cfg.sse_interval, *cfg.whitelist_mode); let sse_routes = Subscription::from_sse_query(pg_pool.clone(), whitelist_mode) .and(warp::sse()) @@ -50,7 +50,7 @@ fn main() { .with(warp::reply::with::header("Connection", "keep-alive")); // WebSocket - let ws_receiver = sharable_receiver; + let ws_receiver = receiver.clone(); let (ws_update_interval, whitelist_mode) = (*cfg.ws_interval, *cfg.whitelist_mode); let ws_routes = Subscription::from_ws_request(pg_pool, whitelist_mode) .and(warp::ws::ws2()) @@ -75,7 +75,23 @@ fn main() { .allow_methods(cfg.cors.allowed_methods) .allow_headers(cfg.cors.allowed_headers); - let health = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK"); + #[cfg(feature = "stub_status")] + let status_endpoints = { + let (r1, r2, r3) = (receiver.clone(), receiver.clone(), receiver.clone()); + warp::path!("api" / "v1" / "streaming" / "health") + .map(|| "OK") + .or(warp::path!("api" / "v1" / "streaming" / "status") + .and(warp::path::end()) + .map(move || r1.lock().expect("TODO").count_connections())) + .or(warp::path!("api" / "v1" / "streaming" / "status" / "queue") + .map(move || r2.lock().expect("TODO").queue_length())) + .or( + warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline") + .map(move || r3.lock().expect("TODO").list_connections()), + ) + }; + #[cfg(not(feature = "stub_status"))] + let status_endpoints = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK"); if let Some(socket) = &*cfg.unix_socket { log::info!("Using Unix socket {}", socket); @@ -84,20 +100,26 @@ fn main() { fs::set_permissions(socket, PermissionsExt::from_mode(0o666)).unwrap(); warp::serve( - health.or(ws_routes.or(sse_routes).with(cors).recover(|r: Rejection| { - let json_err = match r.cause() { - Some(text) if text.to_string() == "Missing request header 'authorization'" => { - warp::reply::json(&"Error: Missing access token".to_string()) - } - Some(text) => warp::reply::json(&text.to_string()), - None => warp::reply::json(&"Error: Nonexistant endpoint".to_string()), - }; - Ok(warp::reply::with_status(json_err, StatusCode::UNAUTHORIZED)) - })), + ws_routes + .or(sse_routes) + .with(cors) + .or(status_endpoints) + .recover(|r: Rejection| { + let json_err = match r.cause() { + Some(text) + if text.to_string() == "Missing request header 'authorization'" => + { + warp::reply::json(&"Error: Missing access token".to_string()) + } + Some(text) => warp::reply::json(&text.to_string()), + None => warp::reply::json(&"Error: Nonexistant endpoint".to_string()), + }; + Ok(warp::reply::with_status(json_err, StatusCode::UNAUTHORIZED)) + }), ) .run_incoming(incoming); } else { let server_addr = SocketAddr::new(*cfg.address, *cfg.port); - warp::serve(health.or(ws_routes.or(sse_routes).with(cors))).run(server_addr); + warp::serve(ws_routes.or(sse_routes).with(cors).or(status_endpoints)).run(server_addr); }; } diff --git a/src/redis_to_client_stream/receiver/mod.rs b/src/redis_to_client_stream/receiver/mod.rs index 3e2e07b..8abdaea 100644 --- a/src/redis_to_client_stream/receiver/mod.rs +++ b/src/redis_to_client_stream/receiver/mod.rs @@ -107,6 +107,37 @@ impl Receiver { } } + pub fn count_connections(&self) -> String { + format!( + "Current connections: {}", + self.clients_per_timeline.values().sum::() + ) + } + + pub fn list_connections(&self) -> String { + let max_len = self + .clients_per_timeline + .keys() + .fold(0, |acc, el| acc.max(format!("{:?}:", el).len())); + self.clients_per_timeline + .iter() + .map(|(tl, n)| { + let tl_txt = format!("{:?}:", tl); + format!("{:>1$} {2}\n", tl_txt, max_len, n) + }) + .collect() + } + + pub fn queue_length(&self) -> String { + format!( + "Longest MessageQueue: {}", + self.msg_queues + .0 + .values() + .fold(0, |acc, el| acc.max(el.messages.len())) + ) + } + /// Drop any PubSub subscriptions that don't have active clients and check /// that there's a subscription to the current one. If there isn't, then /// subscribe to it.