Add /status API endpoints [WIP]

This commit is contained in:
Daniel Sockwell 2020-04-03 22:09:11 -04:00
parent 6a6537253d
commit d8f99b0ba0
2 changed files with 65 additions and 12 deletions

View File

@ -50,7 +50,7 @@ fn main() {
.with(warp::reply::with::header("Connection", "keep-alive"));
// WebSocket
let ws_receiver = sharable_receiver;
let ws_receiver = sharable_receiver.clone();
let (ws_update_interval, whitelist_mode) = (*cfg.ws_interval, *cfg.whitelist_mode);
let ws_routes = Subscription::from_ws_request(pg_pool, whitelist_mode)
.and(warp::ws::ws2())
@ -76,6 +76,16 @@ fn main() {
.allow_headers(cfg.cors.allowed_headers);
let health = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK");
let stats_receiver = sharable_receiver.clone();
let status = warp::path!("api" / "v1" / "streaming" / "status")
.and(warp::path::end())
.map(move || stats_receiver.lock().expect("TODO").count_connections());
let stats_receiver = sharable_receiver.clone();
let status_queue_len = warp::path!("api" / "v1" / "streaming" / "status" / "queue")
.map(move || stats_receiver.lock().expect("TODO").queue_length());
let stats_receiver = sharable_receiver.clone();
let status_per_timeline = warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline")
.map(move || stats_receiver.lock().expect("TODO").list_connections());
if let Some(socket) = &*cfg.unix_socket {
log::info!("Using Unix socket {}", socket);
@ -84,20 +94,32 @@ fn main() {
fs::set_permissions(socket, PermissionsExt::from_mode(0o666)).unwrap();
warp::serve(
health.or(ws_routes.or(sse_routes).with(cors).recover(|r: Rejection| {
let json_err = match r.cause() {
Some(text) if text.to_string() == "Missing request header 'authorization'" => {
warp::reply::json(&"Error: Missing access token".to_string())
}
Some(text) => warp::reply::json(&text.to_string()),
None => warp::reply::json(&"Error: Nonexistant endpoint".to_string()),
};
Ok(warp::reply::with_status(json_err, StatusCode::UNAUTHORIZED))
})),
health.or(
status.or(status_per_timeline.or(status_queue_len.or(ws_routes
.or(sse_routes)
.with(cors)
.recover(|r: Rejection| {
let json_err = match r.cause() {
Some(text)
if text.to_string() == "Missing request header 'authorization'" =>
{
warp::reply::json(&"Error: Missing access token".to_string())
}
Some(text) => warp::reply::json(&text.to_string()),
None => warp::reply::json(&"Error: Nonexistant endpoint".to_string()),
};
Ok(warp::reply::with_status(json_err, StatusCode::UNAUTHORIZED))
})))),
),
)
.run_incoming(incoming);
} else {
let server_addr = SocketAddr::new(*cfg.address, *cfg.port);
warp::serve(health.or(ws_routes.or(sse_routes).with(cors))).run(server_addr);
warp::serve(health.or(
status.or(
status_per_timeline.or(status_queue_len.or(ws_routes.or(sse_routes).with(cors))),
),
))
.run(server_addr);
};
}

View File

@ -107,6 +107,37 @@ impl Receiver {
}
}
pub fn count_connections(&self) -> String {
format!(
"Current connections: {}",
self.clients_per_timeline.values().sum::<i32>()
)
}
pub fn list_connections(&self) -> String {
let max_len = self
.clients_per_timeline
.keys()
.fold(0, |acc, el| acc.max(format!("{:?}:", el).len()));
self.clients_per_timeline
.iter()
.map(|(tl, n)| {
let tl_txt = format!("{:?}:", tl);
format!("{:>1$} {2}\n", tl_txt, max_len, n)
})
.collect()
}
pub fn queue_length(&self) -> String {
format!(
"Longest MessageQueue: {}",
self.msg_queues
.0
.values()
.fold(0, |acc, el| acc.max(el.messages.len()))
)
}
/// Drop any PubSub subscriptions that don't have active clients and check
/// that there's a subscription to the current one. If there isn't, then
/// subscribe to it.