Finish /status API endpoints

This PR enables compiling Flodgatt with the `stub_status` feature.
When compiled with `stub_status`, Flodgatt has 3 new API endpoints:
/api/v1/streaming/status, /api/v1/streaming/status/per_timeline, and
/api/v1/streaming/status/queue.  The first endpoint lists the total
number of connections, the second lists the number of connections per
timeline, and the third lists the length of the longest queue of
unsent messages (which should be low or zero when Flodgatt is
functioning normally).

Note that the number of _connections_ is not equal to the number of
connected _clients_.  If a user is viewing the local timeline, they
would have at least two connections: one for the local timeline, and
one for their user timeline.  Other users could have even more
connections.

I decided to make the status endpoints an option you enable at compile
time rather than at run time for three reasons:

  * It keeps the API of the default version of Flodgatt 100%
    compatible with the Node server's API;

  * I don't beleive it's an option Flodgatt adminstrators will want to
    toggle on and off frequently.

  * Using a compile time option ensures that there is zero runtime
    cost when the option is disabled.  (The runtime cost should be
    negligible either way, but there is value in being 100% sure that
    the cost can be eliminated.)

However, I'm happy to make it a runtime option instead if other think
that would be helpful.
This commit is contained in:
Daniel Sockwell 2020-04-05 10:29:52 -04:00
parent d8f99b0ba0
commit 00a9576c2f
3 changed files with 40 additions and 39 deletions

2
Cargo.lock generated
View File

@ -440,7 +440,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "flodgatt"
version = "0.6.8"
version = "0.7.0"
dependencies = [
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -1,7 +1,7 @@
[package]
name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.6.8"
version = "0.7.0"
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018"
@ -38,4 +38,5 @@ harness = false
[features]
default = [ "production" ]
bench = []
stub_status = []
production = []

View File

@ -24,7 +24,7 @@ fn main() {
let pg_pool = PgPool::new(postgres_cfg);
let sharable_receiver = Receiver::try_from(redis_cfg)
let receiver = Receiver::try_from(redis_cfg)
.unwrap_or_else(|e| {
log::error!("{}\nFlodgatt shutting down...", e);
std::process::exit(1);
@ -33,7 +33,7 @@ fn main() {
log::info!("Streaming server initialized and ready to accept connections");
// Server Sent Events
let sse_receiver = sharable_receiver.clone();
let sse_receiver = receiver.clone();
let (sse_interval, whitelist_mode) = (*cfg.sse_interval, *cfg.whitelist_mode);
let sse_routes = Subscription::from_sse_query(pg_pool.clone(), whitelist_mode)
.and(warp::sse())
@ -50,7 +50,7 @@ fn main() {
.with(warp::reply::with::header("Connection", "keep-alive"));
// WebSocket
let ws_receiver = sharable_receiver.clone();
let ws_receiver = receiver.clone();
let (ws_update_interval, whitelist_mode) = (*cfg.ws_interval, *cfg.whitelist_mode);
let ws_routes = Subscription::from_ws_request(pg_pool, whitelist_mode)
.and(warp::ws::ws2())
@ -75,17 +75,23 @@ fn main() {
.allow_methods(cfg.cors.allowed_methods)
.allow_headers(cfg.cors.allowed_headers);
let health = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK");
let stats_receiver = sharable_receiver.clone();
let status = warp::path!("api" / "v1" / "streaming" / "status")
.and(warp::path::end())
.map(move || stats_receiver.lock().expect("TODO").count_connections());
let stats_receiver = sharable_receiver.clone();
let status_queue_len = warp::path!("api" / "v1" / "streaming" / "status" / "queue")
.map(move || stats_receiver.lock().expect("TODO").queue_length());
let stats_receiver = sharable_receiver.clone();
let status_per_timeline = warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline")
.map(move || stats_receiver.lock().expect("TODO").list_connections());
#[cfg(feature = "stub_status")]
let status_endpoints = {
let (r1, r2, r3) = (receiver.clone(), receiver.clone(), receiver.clone());
warp::path!("api" / "v1" / "streaming" / "health")
.map(|| "OK")
.or(warp::path!("api" / "v1" / "streaming" / "status")
.and(warp::path::end())
.map(move || r1.lock().expect("TODO").count_connections()))
.or(warp::path!("api" / "v1" / "streaming" / "status" / "queue")
.map(move || r2.lock().expect("TODO").queue_length()))
.or(
warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline")
.map(move || r3.lock().expect("TODO").list_connections()),
)
};
#[cfg(not(feature = "stub_status"))]
let status_endpoints = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK");
if let Some(socket) = &*cfg.unix_socket {
log::info!("Using Unix socket {}", socket);
@ -94,32 +100,26 @@ fn main() {
fs::set_permissions(socket, PermissionsExt::from_mode(0o666)).unwrap();
warp::serve(
health.or(
status.or(status_per_timeline.or(status_queue_len.or(ws_routes
.or(sse_routes)
.with(cors)
.recover(|r: Rejection| {
let json_err = match r.cause() {
Some(text)
if text.to_string() == "Missing request header 'authorization'" =>
{
warp::reply::json(&"Error: Missing access token".to_string())
}
Some(text) => warp::reply::json(&text.to_string()),
None => warp::reply::json(&"Error: Nonexistant endpoint".to_string()),
};
Ok(warp::reply::with_status(json_err, StatusCode::UNAUTHORIZED))
})))),
),
ws_routes
.or(sse_routes)
.with(cors)
.or(status_endpoints)
.recover(|r: Rejection| {
let json_err = match r.cause() {
Some(text)
if text.to_string() == "Missing request header 'authorization'" =>
{
warp::reply::json(&"Error: Missing access token".to_string())
}
Some(text) => warp::reply::json(&text.to_string()),
None => warp::reply::json(&"Error: Nonexistant endpoint".to_string()),
};
Ok(warp::reply::with_status(json_err, StatusCode::UNAUTHORIZED))
}),
)
.run_incoming(incoming);
} else {
let server_addr = SocketAddr::new(*cfg.address, *cfg.port);
warp::serve(health.or(
status.or(
status_per_timeline.or(status_queue_len.or(ws_routes.or(sse_routes).with(cors))),
),
))
.run(server_addr);
warp::serve(ws_routes.or(sse_routes).with(cors).or(status_endpoints)).run(server_addr);
};
}