From d8f99b0ba0f3c15198072cd4e47655de33f7f8d8 Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Fri, 3 Apr 2020 22:09:11 -0400 Subject: [PATCH] Add /status API endpoints [WIP] --- src/main.rs | 46 ++++++++++++++++------ src/redis_to_client_stream/receiver/mod.rs | 31 +++++++++++++++ 2 files changed, 65 insertions(+), 12 deletions(-) diff --git a/src/main.rs b/src/main.rs index 2535f46..d8b5107 100644 --- a/src/main.rs +++ b/src/main.rs @@ -50,7 +50,7 @@ fn main() { .with(warp::reply::with::header("Connection", "keep-alive")); // WebSocket - let ws_receiver = sharable_receiver; + let ws_receiver = sharable_receiver.clone(); let (ws_update_interval, whitelist_mode) = (*cfg.ws_interval, *cfg.whitelist_mode); let ws_routes = Subscription::from_ws_request(pg_pool, whitelist_mode) .and(warp::ws::ws2()) @@ -76,6 +76,16 @@ fn main() { .allow_headers(cfg.cors.allowed_headers); let health = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK"); + let stats_receiver = sharable_receiver.clone(); + let status = warp::path!("api" / "v1" / "streaming" / "status") + .and(warp::path::end()) + .map(move || stats_receiver.lock().expect("TODO").count_connections()); + let stats_receiver = sharable_receiver.clone(); + let status_queue_len = warp::path!("api" / "v1" / "streaming" / "status" / "queue") + .map(move || stats_receiver.lock().expect("TODO").queue_length()); + let stats_receiver = sharable_receiver.clone(); + let status_per_timeline = warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline") + .map(move || stats_receiver.lock().expect("TODO").list_connections()); if let Some(socket) = &*cfg.unix_socket { log::info!("Using Unix socket {}", socket); @@ -84,20 +94,32 @@ fn main() { fs::set_permissions(socket, PermissionsExt::from_mode(0o666)).unwrap(); warp::serve( - health.or(ws_routes.or(sse_routes).with(cors).recover(|r: Rejection| { - let json_err = match r.cause() { - Some(text) if text.to_string() == "Missing request header 'authorization'" => { - warp::reply::json(&"Error: Missing access token".to_string()) - } - Some(text) => warp::reply::json(&text.to_string()), - None => warp::reply::json(&"Error: Nonexistant endpoint".to_string()), - }; - Ok(warp::reply::with_status(json_err, StatusCode::UNAUTHORIZED)) - })), + health.or( + status.or(status_per_timeline.or(status_queue_len.or(ws_routes + .or(sse_routes) + .with(cors) + .recover(|r: Rejection| { + let json_err = match r.cause() { + Some(text) + if text.to_string() == "Missing request header 'authorization'" => + { + warp::reply::json(&"Error: Missing access token".to_string()) + } + Some(text) => warp::reply::json(&text.to_string()), + None => warp::reply::json(&"Error: Nonexistant endpoint".to_string()), + }; + Ok(warp::reply::with_status(json_err, StatusCode::UNAUTHORIZED)) + })))), + ), ) .run_incoming(incoming); } else { let server_addr = SocketAddr::new(*cfg.address, *cfg.port); - warp::serve(health.or(ws_routes.or(sse_routes).with(cors))).run(server_addr); + warp::serve(health.or( + status.or( + status_per_timeline.or(status_queue_len.or(ws_routes.or(sse_routes).with(cors))), + ), + )) + .run(server_addr); }; } diff --git a/src/redis_to_client_stream/receiver/mod.rs b/src/redis_to_client_stream/receiver/mod.rs index 3e2e07b..8abdaea 100644 --- a/src/redis_to_client_stream/receiver/mod.rs +++ b/src/redis_to_client_stream/receiver/mod.rs @@ -107,6 +107,37 @@ impl Receiver { } } + pub fn count_connections(&self) -> String { + format!( + "Current connections: {}", + self.clients_per_timeline.values().sum::() + ) + } + + pub fn list_connections(&self) -> String { + let max_len = self + .clients_per_timeline + .keys() + .fold(0, |acc, el| acc.max(format!("{:?}:", el).len())); + self.clients_per_timeline + .iter() + .map(|(tl, n)| { + let tl_txt = format!("{:?}:", tl); + format!("{:>1$} {2}\n", tl_txt, max_len, n) + }) + .collect() + } + + pub fn queue_length(&self) -> String { + format!( + "Longest MessageQueue: {}", + self.msg_queues + .0 + .values() + .fold(0, |acc, el| acc.max(el.messages.len())) + ) + } + /// Drop any PubSub subscriptions that don't have active clients and check /// that there's a subscription to the current one. If there isn't, then /// subscribe to it.