From ed75905fa36b63bdc749c4bdd5e368a87c459700 Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Wed, 11 Mar 2020 11:31:29 -0400 Subject: [PATCH] Increase verbosity of debug info (#86) * Increase verbosity of debug info * Add email field to User in tests --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/parse_client_request/sse.rs | 11 ++++++++++ .../user/mock_postgres.rs | 9 ++++---- src/parse_client_request/user/mod.rs | 10 ++++++--- src/parse_client_request/user/postgres.rs | 13 ++++++------ src/parse_client_request/ws.rs | 9 ++++++++ src/redis_to_client_stream/client_agent.rs | 8 +++---- src/redis_to_client_stream/mod.rs | 21 +++++++++++++++---- 9 files changed, 62 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5119ced..7454cd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -414,7 +414,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.4.7" +version = "0.4.8" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 6454c5b..cbdacc3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.4.7" +version = "0.4.8" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/src/parse_client_request/sse.rs b/src/parse_client_request/sse.rs index f30e598..981a187 100644 --- a/src/parse_client_request/sse.rs +++ b/src/parse_client_request/sse.rs @@ -179,6 +179,7 @@ mod test { user: User { target_timeline: "public:media".to_string(), id: -1, + email: "".to_string(), access_token: "no access token".to_string(), langs: None, scopes: OauthScope { @@ -196,6 +197,7 @@ mod test { user: User { target_timeline: "public:media".to_string(), id: -1, + email: "".to_string(), access_token: "no access token".to_string(), langs: None, scopes: OauthScope { @@ -213,6 +215,7 @@ mod test { user: User { target_timeline: "public:local".to_string(), id: -1, + email: "".to_string(), access_token: "no access token".to_string(), langs: None, scopes: OauthScope { @@ -230,6 +233,7 @@ mod test { user: User { target_timeline: "public:local:media".to_string(), id: -1, + email: "".to_string(), access_token: "no access token".to_string(), langs: None, scopes: OauthScope { @@ -247,6 +251,7 @@ mod test { user: User { target_timeline: "public:local:media".to_string(), id: -1, + email: "".to_string(), access_token: "no access token".to_string(), langs: None, scopes: OauthScope { @@ -264,6 +269,7 @@ mod test { user: User { target_timeline: "hashtag:a".to_string(), id: -1, + email: "".to_string(), access_token: "no access token".to_string(), langs: None, scopes: OauthScope { @@ -281,6 +287,7 @@ mod test { user: User { target_timeline: "hashtag:local:a".to_string(), id: -1, + email: "".to_string(), access_token: "no access token".to_string(), langs: None, scopes: OauthScope { @@ -299,6 +306,7 @@ mod test { user: User { target_timeline: "1".to_string(), id: 1, + email: "user@example.com".to_string(), access_token: "TEST_USER".to_string(), langs: None, scopes: OauthScope { @@ -316,6 +324,7 @@ mod test { user: User { target_timeline: "1".to_string(), id: 1, + email: "user@example.com".to_string(), access_token: "TEST_USER".to_string(), langs: None, scopes: OauthScope { @@ -333,6 +342,7 @@ mod test { user: User { target_timeline: "direct".to_string(), id: 1, + email: "user@example.com".to_string(), access_token: "TEST_USER".to_string(), langs: None, scopes: OauthScope { @@ -352,6 +362,7 @@ mod test { user: User { target_timeline: "list:1".to_string(), id: 1, + email: "user@example.com".to_string(), access_token: "TEST_USER".to_string(), langs: None, scopes: OauthScope { diff --git a/src/parse_client_request/user/mock_postgres.rs b/src/parse_client_request/user/mock_postgres.rs index 6446aac..6f4739d 100644 --- a/src/parse_client_request/user/mock_postgres.rs +++ b/src/parse_client_request/user/mock_postgres.rs @@ -11,10 +11,11 @@ impl PostgresPool { pub fn query_for_user_data( access_token: &str, _pg_pool: PostgresPool, -) -> (i64, Option>, Vec) { - let (user_id, lang, scopes) = if access_token == "TEST_USER" { +) -> (i64, String, Option>, Vec) { + let (user_id, email, lang, scopes) = if access_token == "TEST_USER" { ( 1, + "user@example.com".to_string(), None, vec![ "read".to_string(), @@ -23,9 +24,9 @@ pub fn query_for_user_data( ], ) } else { - (-1, None, Vec::new()) + (-1, "".to_string(), None, Vec::new()) }; - (user_id, lang, scopes) + (user_id, email, lang, scopes) } pub fn query_list_owner(list_id: i64, _pg_pool: PostgresPool) -> Option { diff --git a/src/parse_client_request/user/mod.rs b/src/parse_client_request/user/mod.rs index af7086d..6d2dedc 100644 --- a/src/parse_client_request/user/mod.rs +++ b/src/parse_client_request/user/mod.rs @@ -26,6 +26,7 @@ impl Default for Filter { #[derive(Clone, Debug, Default, PartialEq)] pub struct User { pub target_timeline: String, + pub email: String, // We only use email for logging; we could cut it for performance pub id: i64, pub access_token: String, pub scopes: OauthScope, @@ -59,26 +60,29 @@ impl From> for OauthScope { impl User { pub fn from_query(q: Query, pg_pool: PostgresPool) -> Result { - let (id, access_token, scopes, langs, logged_in) = match q.access_token.clone() { + let (id, access_token, email, scopes, langs, logged_in) = match q.access_token.clone() { None => ( -1, "no access token".to_owned(), + "".to_string(), OauthScope::default(), None, false, ), Some(token) => { - let (id, langs, scope_list) = + let (id, email, langs, scope_list) = postgres::query_for_user_data(&token, pg_pool.clone()); + if id == -1 { return Err(warp::reject::custom("Error: Invalid access token")); } let scopes = OauthScope::from(scope_list); - (id, token, scopes, langs, true) + (id, token, email, scopes, langs, true) } }; let mut user = User { id, + email, target_timeline: "PLACEHOLDER".to_string(), access_token, scopes, diff --git a/src/parse_client_request/user/postgres.rs b/src/parse_client_request/user/postgres.rs index cf3f979..6c95ed7 100644 --- a/src/parse_client_request/user/postgres.rs +++ b/src/parse_client_request/user/postgres.rs @@ -28,13 +28,13 @@ impl PostgresPool { pub fn query_for_user_data( access_token: &str, pg_pool: PostgresPool, -) -> (i64, Option>, Vec) { +) -> (i64, String, Option>, Vec) { let mut conn = pg_pool.0.get().unwrap(); let query_result = conn .query( " -SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes +SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.email, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON @@ -48,15 +48,16 @@ LIMIT 1", if !query_result.is_empty() { let only_row: &postgres::Row = query_result.get(0).unwrap(); let id: i64 = only_row.get(1); + let email: String = only_row.get(2); let scopes = only_row - .get::<_, String>(3) + .get::<_, String>(4) .split(' ') .map(|s| s.to_owned()) .collect(); - let langs: Option> = only_row.get(2); - (id, langs, scopes) + let langs: Option> = only_row.get(3); + (id, email, langs, scopes) } else { - (-1, None, Vec::new()) + (-1, "".to_string(), None, Vec::new()) } } diff --git a/src/parse_client_request/ws.rs b/src/parse_client_request/ws.rs index 2fd965e..777478c 100644 --- a/src/parse_client_request/ws.rs +++ b/src/parse_client_request/ws.rs @@ -126,6 +126,7 @@ mod test { user: User { target_timeline: "public:media".to_string(), id: -1, + email: "".to_string(), access_token: "no access token".to_string(), langs: None, scopes: OauthScope { @@ -143,6 +144,7 @@ mod test { user: User { target_timeline: "public:local".to_string(), id: -1, + email: "".to_string(), access_token: "no access token".to_string(), langs: None, scopes: OauthScope { @@ -160,6 +162,7 @@ mod test { user: User { target_timeline: "public:local:media".to_string(), id: -1, + email: "".to_string(), access_token: "no access token".to_string(), langs: None, scopes: OauthScope { @@ -177,6 +180,7 @@ mod test { user: User { target_timeline: "hashtag:a".to_string(), id: -1, + email: "".to_string(), access_token: "no access token".to_string(), langs: None, scopes: OauthScope { @@ -194,6 +198,7 @@ mod test { user: User { target_timeline: "hashtag:local:a".to_string(), id: -1, + email: "".to_string(), access_token: "no access token".to_string(), langs: None, scopes: OauthScope { @@ -212,6 +217,7 @@ mod test { user: User { target_timeline: "1".to_string(), id: 1, + email: "user@example.com".to_string(), access_token: "TEST_USER".to_string(), langs: None, scopes: OauthScope { @@ -229,6 +235,7 @@ mod test { user: User { target_timeline: "1".to_string(), id: 1, + email: "user@example.com".to_string(), access_token: "TEST_USER".to_string(), langs: None, scopes: OauthScope { @@ -246,6 +253,7 @@ mod test { user: User { target_timeline: "direct".to_string(), id: 1, + email: "user@example.com".to_string(), access_token: "TEST_USER".to_string(), langs: None, scopes: OauthScope { @@ -263,6 +271,7 @@ mod test { user: User { target_timeline: "list:1".to_string(), id: 1, + email: "user@example.com".to_string(), access_token: "TEST_USER".to_string(), langs: None, scopes: OauthScope { diff --git a/src/redis_to_client_stream/client_agent.rs b/src/redis_to_client_stream/client_agent.rs index e4e8451..c71be4e 100644 --- a/src/redis_to_client_stream/client_agent.rs +++ b/src/redis_to_client_stream/client_agent.rs @@ -28,8 +28,8 @@ use uuid::Uuid; pub struct ClientAgent { receiver: sync::Arc>, id: uuid::Uuid, - target_timeline: String, - current_user: User, + pub target_timeline: String, + pub current_user: User, } impl ClientAgent { @@ -111,7 +111,7 @@ impl futures::stream::Stream for ClientAgent { /// The message to send to the client (which might not literally be a toot in some cases). struct Toot { category: String, - payload: String, + payload: Value, language: Option, } @@ -127,7 +127,7 @@ impl Toot { Self { category, - payload: value["payload"].to_string(), + payload: value["payload"].clone(), language, } } diff --git a/src/redis_to_client_stream/mod.rs b/src/redis_to_client_stream/mod.rs index e2e5777..609843b 100644 --- a/src/redis_to_client_stream/mod.rs +++ b/src/redis_to_client_stream/mod.rs @@ -34,7 +34,7 @@ pub fn send_updates_to_sse( /// Send a stream of replies to a WebSocket client. pub fn send_updates_to_ws( socket: warp::ws::WebSocket, - mut stream: ClientAgent, + mut client_agent: ClientAgent, update_interval: time::Duration, ) -> impl futures::future::Future { let (ws_tx, mut ws_rx) = socket.split(); @@ -54,6 +54,11 @@ pub fn send_updates_to_ws( }), ); + let (tl, email, id) = ( + client_agent.current_user.target_timeline.clone(), + client_agent.current_user.email.clone(), + client_agent.current_user.id, + ); // Yield new events for as long as the client is still connected let event_stream = tokio::timer::Interval::new(time::Instant::now(), update_interval) .take_while(move |_| match ws_rx.poll() { @@ -69,18 +74,26 @@ pub fn send_updates_to_ws( futures::future::ok(false) } Err(e) => { - log::warn!("{}", e); + log::warn!("Error in TL {}\nfor user: {}({})\n{}", tl, email, id, e); futures::future::ok(false) } }); let mut time = time::Instant::now(); + let (tl, email, id) = ( + client_agent.current_user.target_timeline.clone(), + client_agent.current_user.email.clone(), + client_agent.current_user.id, + ); // Every time you get an event from that stream, send it through the pipe event_stream .for_each(move |_instant| { - if let Ok(Async::Ready(Some(json_value))) = stream.poll() { + if let Ok(Async::Ready(Some(json_value))) = client_agent.poll() { + let toot = json_value["payload"]["content"].clone(); + log::warn!("toot: {}\n in TL: {}\nuser: {}({})", toot, tl, email, id); let msg = warp::ws::Message::text(json_value.to_string()); + tx.unbounded_send(msg).expect("No send error"); }; if time.elapsed() > time::Duration::from_secs(30) { @@ -95,5 +108,5 @@ pub fn send_updates_to_ws( log::info!("WebSocket connection closed."); result }) - .map_err(move |e| log::error!("{}", e)) + .map_err(move |e| log::warn!("Error sending to user: {}\n{}", id, e)) }