mirror of https://github.com/mastodon/flodgatt
Update logging (#85)
* Change "Incoming" log msgs from Warn to Info * Stop logging err when unix socket closed * Bump version to 0.4.7
This commit is contained in:
parent
ac75cb54af
commit
405b5e88e5
|
@ -414,7 +414,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flodgatt"
|
name = "flodgatt"
|
||||||
version = "0.4.6"
|
version = "0.4.7"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
[package]
|
[package]
|
||||||
name = "flodgatt"
|
name = "flodgatt"
|
||||||
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
|
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
|
||||||
version = "0.4.6"
|
version = "0.4.7"
|
||||||
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
|
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
|
|
10
src/main.rs
10
src/main.rs
|
@ -3,7 +3,6 @@ use flodgatt::{
|
||||||
parse_client_request::{sse, user, ws},
|
parse_client_request::{sse, user, ws},
|
||||||
redis_to_client_stream::{self, ClientAgent},
|
redis_to_client_stream::{self, ClientAgent},
|
||||||
};
|
};
|
||||||
use log::warn;
|
|
||||||
use std::{collections::HashMap, env, fs, net, os::unix::fs::PermissionsExt};
|
use std::{collections::HashMap, env, fs, net, os::unix::fs::PermissionsExt};
|
||||||
use tokio::net::UnixListener;
|
use tokio::net::UnixListener;
|
||||||
use warp::{path, ws::Ws2, Filter};
|
use warp::{path, ws::Ws2, Filter};
|
||||||
|
@ -19,7 +18,7 @@ fn main() {
|
||||||
let env_vars = config::EnvVar::new(env_vars_map);
|
let env_vars = config::EnvVar::new(env_vars_map);
|
||||||
pretty_env_logger::init();
|
pretty_env_logger::init();
|
||||||
|
|
||||||
warn!(
|
log::warn!(
|
||||||
"Flodgatt recognized the following environmental variables:{}",
|
"Flodgatt recognized the following environmental variables:{}",
|
||||||
env_vars.clone()
|
env_vars.clone()
|
||||||
);
|
);
|
||||||
|
@ -32,7 +31,7 @@ fn main() {
|
||||||
let client_agent_ws = client_agent_sse.clone_with_shared_receiver();
|
let client_agent_ws = client_agent_sse.clone_with_shared_receiver();
|
||||||
let pg_pool = user::PostgresPool::new(postgres_cfg);
|
let pg_pool = user::PostgresPool::new(postgres_cfg);
|
||||||
|
|
||||||
warn!("Streaming server initialized and ready to accept connections");
|
log::warn!("Streaming server initialized and ready to accept connections");
|
||||||
|
|
||||||
// Server Sent Events
|
// Server Sent Events
|
||||||
let sse_update_interval = *cfg.ws_interval;
|
let sse_update_interval = *cfg.ws_interval;
|
||||||
|
@ -40,6 +39,7 @@ fn main() {
|
||||||
.and(warp::sse())
|
.and(warp::sse())
|
||||||
.map(
|
.map(
|
||||||
move |user: user::User, sse_connection_to_client: warp::sse::Sse| {
|
move |user: user::User, sse_connection_to_client: warp::sse::Sse| {
|
||||||
|
log::info!("Incoming SSE request");
|
||||||
// Create a new ClientAgent
|
// Create a new ClientAgent
|
||||||
let mut client_agent = client_agent_sse.clone_with_shared_receiver();
|
let mut client_agent = client_agent_sse.clone_with_shared_receiver();
|
||||||
// Assign ClientAgent to generate stream of updates for the user/timeline pair
|
// Assign ClientAgent to generate stream of updates for the user/timeline pair
|
||||||
|
@ -60,7 +60,7 @@ fn main() {
|
||||||
let websocket_routes = ws::extract_user_or_reject(pg_pool.clone())
|
let websocket_routes = ws::extract_user_or_reject(pg_pool.clone())
|
||||||
.and(warp::ws::ws2())
|
.and(warp::ws::ws2())
|
||||||
.map(move |user: user::User, ws: Ws2| {
|
.map(move |user: user::User, ws: Ws2| {
|
||||||
warn!("Incoming request");
|
log::info!("Incoming websocket request");
|
||||||
let token = user.access_token.clone();
|
let token = user.access_token.clone();
|
||||||
// Create a new ClientAgent
|
// Create a new ClientAgent
|
||||||
let mut client_agent = client_agent_ws.clone_with_shared_receiver();
|
let mut client_agent = client_agent_ws.clone_with_shared_receiver();
|
||||||
|
@ -90,7 +90,7 @@ fn main() {
|
||||||
let health = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK");
|
let health = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK");
|
||||||
|
|
||||||
if let Some(socket) = &*cfg.unix_socket {
|
if let Some(socket) = &*cfg.unix_socket {
|
||||||
warn!("Using Unix socket {}", socket);
|
log::warn!("Using Unix socket {}", socket);
|
||||||
fs::remove_file(socket).unwrap_or_default();
|
fs::remove_file(socket).unwrap_or_default();
|
||||||
let incoming = UnixListener::bind(socket).unwrap().incoming();
|
let incoming = UnixListener::bind(socket).unwrap().incoming();
|
||||||
|
|
||||||
|
|
|
@ -48,7 +48,10 @@ pub fn send_updates_to_ws(
|
||||||
rx.map_err(|()| -> warp::Error { unreachable!() })
|
rx.map_err(|()| -> warp::Error { unreachable!() })
|
||||||
.forward(ws_tx)
|
.forward(ws_tx)
|
||||||
.map(|_r| ())
|
.map(|_r| ())
|
||||||
.map_err(|e| eprintln!("websocket send error: {}", e)),
|
.map_err(|e| match e.to_string().as_ref() {
|
||||||
|
"IO error: Broken pipe (os error 32)" => (), // just closed unix socket
|
||||||
|
_ => log::warn!("websocket send error: {}", e),
|
||||||
|
}),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Yield new events for as long as the client is still connected
|
// Yield new events for as long as the client is still connected
|
||||||
|
@ -60,6 +63,11 @@ pub fn send_updates_to_ws(
|
||||||
log::info!("Client closed WebSocket connection");
|
log::info!("Client closed WebSocket connection");
|
||||||
futures::future::ok(false)
|
futures::future::ok(false)
|
||||||
}
|
}
|
||||||
|
Err(e) if e.to_string() == "IO error: Broken pipe (os error 32)" => {
|
||||||
|
// no err, just closed Unix socket
|
||||||
|
log::info!("Client closed WebSocket connection");
|
||||||
|
futures::future::ok(false)
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::warn!("{}", e);
|
log::warn!("{}", e);
|
||||||
futures::future::ok(false)
|
futures::future::ok(false)
|
||||||
|
|
Loading…
Reference in New Issue