mirror of https://github.com/mastodon/flodgatt
Increase verbosity of debug info (#86)
* Increase verbosity of debug info * Add email field to User in tests
This commit is contained in:
parent
405b5e88e5
commit
ed75905fa3
|
@ -414,7 +414,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flodgatt"
|
name = "flodgatt"
|
||||||
version = "0.4.7"
|
version = "0.4.8"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
[package]
|
[package]
|
||||||
name = "flodgatt"
|
name = "flodgatt"
|
||||||
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
|
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
|
||||||
version = "0.4.7"
|
version = "0.4.8"
|
||||||
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
|
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
|
|
|
@ -179,6 +179,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "public:media".to_string(),
|
target_timeline: "public:media".to_string(),
|
||||||
id: -1,
|
id: -1,
|
||||||
|
email: "".to_string(),
|
||||||
access_token: "no access token".to_string(),
|
access_token: "no access token".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -196,6 +197,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "public:media".to_string(),
|
target_timeline: "public:media".to_string(),
|
||||||
id: -1,
|
id: -1,
|
||||||
|
email: "".to_string(),
|
||||||
access_token: "no access token".to_string(),
|
access_token: "no access token".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -213,6 +215,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "public:local".to_string(),
|
target_timeline: "public:local".to_string(),
|
||||||
id: -1,
|
id: -1,
|
||||||
|
email: "".to_string(),
|
||||||
access_token: "no access token".to_string(),
|
access_token: "no access token".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -230,6 +233,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "public:local:media".to_string(),
|
target_timeline: "public:local:media".to_string(),
|
||||||
id: -1,
|
id: -1,
|
||||||
|
email: "".to_string(),
|
||||||
access_token: "no access token".to_string(),
|
access_token: "no access token".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -247,6 +251,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "public:local:media".to_string(),
|
target_timeline: "public:local:media".to_string(),
|
||||||
id: -1,
|
id: -1,
|
||||||
|
email: "".to_string(),
|
||||||
access_token: "no access token".to_string(),
|
access_token: "no access token".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -264,6 +269,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "hashtag:a".to_string(),
|
target_timeline: "hashtag:a".to_string(),
|
||||||
id: -1,
|
id: -1,
|
||||||
|
email: "".to_string(),
|
||||||
access_token: "no access token".to_string(),
|
access_token: "no access token".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -281,6 +287,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "hashtag:local:a".to_string(),
|
target_timeline: "hashtag:local:a".to_string(),
|
||||||
id: -1,
|
id: -1,
|
||||||
|
email: "".to_string(),
|
||||||
access_token: "no access token".to_string(),
|
access_token: "no access token".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -299,6 +306,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "1".to_string(),
|
target_timeline: "1".to_string(),
|
||||||
id: 1,
|
id: 1,
|
||||||
|
email: "user@example.com".to_string(),
|
||||||
access_token: "TEST_USER".to_string(),
|
access_token: "TEST_USER".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -316,6 +324,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "1".to_string(),
|
target_timeline: "1".to_string(),
|
||||||
id: 1,
|
id: 1,
|
||||||
|
email: "user@example.com".to_string(),
|
||||||
access_token: "TEST_USER".to_string(),
|
access_token: "TEST_USER".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -333,6 +342,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "direct".to_string(),
|
target_timeline: "direct".to_string(),
|
||||||
id: 1,
|
id: 1,
|
||||||
|
email: "user@example.com".to_string(),
|
||||||
access_token: "TEST_USER".to_string(),
|
access_token: "TEST_USER".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -352,6 +362,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "list:1".to_string(),
|
target_timeline: "list:1".to_string(),
|
||||||
id: 1,
|
id: 1,
|
||||||
|
email: "user@example.com".to_string(),
|
||||||
access_token: "TEST_USER".to_string(),
|
access_token: "TEST_USER".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
|
|
@ -11,10 +11,11 @@ impl PostgresPool {
|
||||||
pub fn query_for_user_data(
|
pub fn query_for_user_data(
|
||||||
access_token: &str,
|
access_token: &str,
|
||||||
_pg_pool: PostgresPool,
|
_pg_pool: PostgresPool,
|
||||||
) -> (i64, Option<Vec<String>>, Vec<String>) {
|
) -> (i64, String, Option<Vec<String>>, Vec<String>) {
|
||||||
let (user_id, lang, scopes) = if access_token == "TEST_USER" {
|
let (user_id, email, lang, scopes) = if access_token == "TEST_USER" {
|
||||||
(
|
(
|
||||||
1,
|
1,
|
||||||
|
"user@example.com".to_string(),
|
||||||
None,
|
None,
|
||||||
vec![
|
vec![
|
||||||
"read".to_string(),
|
"read".to_string(),
|
||||||
|
@ -23,9 +24,9 @@ pub fn query_for_user_data(
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
(-1, None, Vec::new())
|
(-1, "".to_string(), None, Vec::new())
|
||||||
};
|
};
|
||||||
(user_id, lang, scopes)
|
(user_id, email, lang, scopes)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn query_list_owner(list_id: i64, _pg_pool: PostgresPool) -> Option<i64> {
|
pub fn query_list_owner(list_id: i64, _pg_pool: PostgresPool) -> Option<i64> {
|
||||||
|
|
|
@ -26,6 +26,7 @@ impl Default for Filter {
|
||||||
#[derive(Clone, Debug, Default, PartialEq)]
|
#[derive(Clone, Debug, Default, PartialEq)]
|
||||||
pub struct User {
|
pub struct User {
|
||||||
pub target_timeline: String,
|
pub target_timeline: String,
|
||||||
|
pub email: String, // We only use email for logging; we could cut it for performance
|
||||||
pub id: i64,
|
pub id: i64,
|
||||||
pub access_token: String,
|
pub access_token: String,
|
||||||
pub scopes: OauthScope,
|
pub scopes: OauthScope,
|
||||||
|
@ -59,26 +60,29 @@ impl From<Vec<String>> for OauthScope {
|
||||||
|
|
||||||
impl User {
|
impl User {
|
||||||
pub fn from_query(q: Query, pg_pool: PostgresPool) -> Result<Self, Rejection> {
|
pub fn from_query(q: Query, pg_pool: PostgresPool) -> Result<Self, Rejection> {
|
||||||
let (id, access_token, scopes, langs, logged_in) = match q.access_token.clone() {
|
let (id, access_token, email, scopes, langs, logged_in) = match q.access_token.clone() {
|
||||||
None => (
|
None => (
|
||||||
-1,
|
-1,
|
||||||
"no access token".to_owned(),
|
"no access token".to_owned(),
|
||||||
|
"".to_string(),
|
||||||
OauthScope::default(),
|
OauthScope::default(),
|
||||||
None,
|
None,
|
||||||
false,
|
false,
|
||||||
),
|
),
|
||||||
Some(token) => {
|
Some(token) => {
|
||||||
let (id, langs, scope_list) =
|
let (id, email, langs, scope_list) =
|
||||||
postgres::query_for_user_data(&token, pg_pool.clone());
|
postgres::query_for_user_data(&token, pg_pool.clone());
|
||||||
|
|
||||||
if id == -1 {
|
if id == -1 {
|
||||||
return Err(warp::reject::custom("Error: Invalid access token"));
|
return Err(warp::reject::custom("Error: Invalid access token"));
|
||||||
}
|
}
|
||||||
let scopes = OauthScope::from(scope_list);
|
let scopes = OauthScope::from(scope_list);
|
||||||
(id, token, scopes, langs, true)
|
(id, token, email, scopes, langs, true)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let mut user = User {
|
let mut user = User {
|
||||||
id,
|
id,
|
||||||
|
email,
|
||||||
target_timeline: "PLACEHOLDER".to_string(),
|
target_timeline: "PLACEHOLDER".to_string(),
|
||||||
access_token,
|
access_token,
|
||||||
scopes,
|
scopes,
|
||||||
|
|
|
@ -28,13 +28,13 @@ impl PostgresPool {
|
||||||
pub fn query_for_user_data(
|
pub fn query_for_user_data(
|
||||||
access_token: &str,
|
access_token: &str,
|
||||||
pg_pool: PostgresPool,
|
pg_pool: PostgresPool,
|
||||||
) -> (i64, Option<Vec<String>>, Vec<String>) {
|
) -> (i64, String, Option<Vec<String>>, Vec<String>) {
|
||||||
let mut conn = pg_pool.0.get().unwrap();
|
let mut conn = pg_pool.0.get().unwrap();
|
||||||
|
|
||||||
let query_result = conn
|
let query_result = conn
|
||||||
.query(
|
.query(
|
||||||
"
|
"
|
||||||
SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes
|
SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.email, users.chosen_languages, oauth_access_tokens.scopes
|
||||||
FROM
|
FROM
|
||||||
oauth_access_tokens
|
oauth_access_tokens
|
||||||
INNER JOIN users ON
|
INNER JOIN users ON
|
||||||
|
@ -48,15 +48,16 @@ LIMIT 1",
|
||||||
if !query_result.is_empty() {
|
if !query_result.is_empty() {
|
||||||
let only_row: &postgres::Row = query_result.get(0).unwrap();
|
let only_row: &postgres::Row = query_result.get(0).unwrap();
|
||||||
let id: i64 = only_row.get(1);
|
let id: i64 = only_row.get(1);
|
||||||
|
let email: String = only_row.get(2);
|
||||||
let scopes = only_row
|
let scopes = only_row
|
||||||
.get::<_, String>(3)
|
.get::<_, String>(4)
|
||||||
.split(' ')
|
.split(' ')
|
||||||
.map(|s| s.to_owned())
|
.map(|s| s.to_owned())
|
||||||
.collect();
|
.collect();
|
||||||
let langs: Option<Vec<String>> = only_row.get(2);
|
let langs: Option<Vec<String>> = only_row.get(3);
|
||||||
(id, langs, scopes)
|
(id, email, langs, scopes)
|
||||||
} else {
|
} else {
|
||||||
(-1, None, Vec::new())
|
(-1, "".to_string(), None, Vec::new())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -126,6 +126,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "public:media".to_string(),
|
target_timeline: "public:media".to_string(),
|
||||||
id: -1,
|
id: -1,
|
||||||
|
email: "".to_string(),
|
||||||
access_token: "no access token".to_string(),
|
access_token: "no access token".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -143,6 +144,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "public:local".to_string(),
|
target_timeline: "public:local".to_string(),
|
||||||
id: -1,
|
id: -1,
|
||||||
|
email: "".to_string(),
|
||||||
access_token: "no access token".to_string(),
|
access_token: "no access token".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -160,6 +162,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "public:local:media".to_string(),
|
target_timeline: "public:local:media".to_string(),
|
||||||
id: -1,
|
id: -1,
|
||||||
|
email: "".to_string(),
|
||||||
access_token: "no access token".to_string(),
|
access_token: "no access token".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -177,6 +180,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "hashtag:a".to_string(),
|
target_timeline: "hashtag:a".to_string(),
|
||||||
id: -1,
|
id: -1,
|
||||||
|
email: "".to_string(),
|
||||||
access_token: "no access token".to_string(),
|
access_token: "no access token".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -194,6 +198,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "hashtag:local:a".to_string(),
|
target_timeline: "hashtag:local:a".to_string(),
|
||||||
id: -1,
|
id: -1,
|
||||||
|
email: "".to_string(),
|
||||||
access_token: "no access token".to_string(),
|
access_token: "no access token".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -212,6 +217,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "1".to_string(),
|
target_timeline: "1".to_string(),
|
||||||
id: 1,
|
id: 1,
|
||||||
|
email: "user@example.com".to_string(),
|
||||||
access_token: "TEST_USER".to_string(),
|
access_token: "TEST_USER".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -229,6 +235,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "1".to_string(),
|
target_timeline: "1".to_string(),
|
||||||
id: 1,
|
id: 1,
|
||||||
|
email: "user@example.com".to_string(),
|
||||||
access_token: "TEST_USER".to_string(),
|
access_token: "TEST_USER".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -246,6 +253,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "direct".to_string(),
|
target_timeline: "direct".to_string(),
|
||||||
id: 1,
|
id: 1,
|
||||||
|
email: "user@example.com".to_string(),
|
||||||
access_token: "TEST_USER".to_string(),
|
access_token: "TEST_USER".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
@ -263,6 +271,7 @@ mod test {
|
||||||
user: User {
|
user: User {
|
||||||
target_timeline: "list:1".to_string(),
|
target_timeline: "list:1".to_string(),
|
||||||
id: 1,
|
id: 1,
|
||||||
|
email: "user@example.com".to_string(),
|
||||||
access_token: "TEST_USER".to_string(),
|
access_token: "TEST_USER".to_string(),
|
||||||
langs: None,
|
langs: None,
|
||||||
scopes: OauthScope {
|
scopes: OauthScope {
|
||||||
|
|
|
@ -28,8 +28,8 @@ use uuid::Uuid;
|
||||||
pub struct ClientAgent {
|
pub struct ClientAgent {
|
||||||
receiver: sync::Arc<sync::Mutex<Receiver>>,
|
receiver: sync::Arc<sync::Mutex<Receiver>>,
|
||||||
id: uuid::Uuid,
|
id: uuid::Uuid,
|
||||||
target_timeline: String,
|
pub target_timeline: String,
|
||||||
current_user: User,
|
pub current_user: User,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ClientAgent {
|
impl ClientAgent {
|
||||||
|
@ -111,7 +111,7 @@ impl futures::stream::Stream for ClientAgent {
|
||||||
/// The message to send to the client (which might not literally be a toot in some cases).
|
/// The message to send to the client (which might not literally be a toot in some cases).
|
||||||
struct Toot {
|
struct Toot {
|
||||||
category: String,
|
category: String,
|
||||||
payload: String,
|
payload: Value,
|
||||||
language: Option<String>,
|
language: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,7 +127,7 @@ impl Toot {
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
category,
|
category,
|
||||||
payload: value["payload"].to_string(),
|
payload: value["payload"].clone(),
|
||||||
language,
|
language,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ pub fn send_updates_to_sse(
|
||||||
/// Send a stream of replies to a WebSocket client.
|
/// Send a stream of replies to a WebSocket client.
|
||||||
pub fn send_updates_to_ws(
|
pub fn send_updates_to_ws(
|
||||||
socket: warp::ws::WebSocket,
|
socket: warp::ws::WebSocket,
|
||||||
mut stream: ClientAgent,
|
mut client_agent: ClientAgent,
|
||||||
update_interval: time::Duration,
|
update_interval: time::Duration,
|
||||||
) -> impl futures::future::Future<Item = (), Error = ()> {
|
) -> impl futures::future::Future<Item = (), Error = ()> {
|
||||||
let (ws_tx, mut ws_rx) = socket.split();
|
let (ws_tx, mut ws_rx) = socket.split();
|
||||||
|
@ -54,6 +54,11 @@ pub fn send_updates_to_ws(
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let (tl, email, id) = (
|
||||||
|
client_agent.current_user.target_timeline.clone(),
|
||||||
|
client_agent.current_user.email.clone(),
|
||||||
|
client_agent.current_user.id,
|
||||||
|
);
|
||||||
// Yield new events for as long as the client is still connected
|
// Yield new events for as long as the client is still connected
|
||||||
let event_stream = tokio::timer::Interval::new(time::Instant::now(), update_interval)
|
let event_stream = tokio::timer::Interval::new(time::Instant::now(), update_interval)
|
||||||
.take_while(move |_| match ws_rx.poll() {
|
.take_while(move |_| match ws_rx.poll() {
|
||||||
|
@ -69,18 +74,26 @@ pub fn send_updates_to_ws(
|
||||||
futures::future::ok(false)
|
futures::future::ok(false)
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::warn!("{}", e);
|
log::warn!("Error in TL {}\nfor user: {}({})\n{}", tl, email, id, e);
|
||||||
futures::future::ok(false)
|
futures::future::ok(false)
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut time = time::Instant::now();
|
let mut time = time::Instant::now();
|
||||||
|
|
||||||
|
let (tl, email, id) = (
|
||||||
|
client_agent.current_user.target_timeline.clone(),
|
||||||
|
client_agent.current_user.email.clone(),
|
||||||
|
client_agent.current_user.id,
|
||||||
|
);
|
||||||
// Every time you get an event from that stream, send it through the pipe
|
// Every time you get an event from that stream, send it through the pipe
|
||||||
event_stream
|
event_stream
|
||||||
.for_each(move |_instant| {
|
.for_each(move |_instant| {
|
||||||
if let Ok(Async::Ready(Some(json_value))) = stream.poll() {
|
if let Ok(Async::Ready(Some(json_value))) = client_agent.poll() {
|
||||||
|
let toot = json_value["payload"]["content"].clone();
|
||||||
|
log::warn!("toot: {}\n in TL: {}\nuser: {}({})", toot, tl, email, id);
|
||||||
let msg = warp::ws::Message::text(json_value.to_string());
|
let msg = warp::ws::Message::text(json_value.to_string());
|
||||||
|
|
||||||
tx.unbounded_send(msg).expect("No send error");
|
tx.unbounded_send(msg).expect("No send error");
|
||||||
};
|
};
|
||||||
if time.elapsed() > time::Duration::from_secs(30) {
|
if time.elapsed() > time::Duration::from_secs(30) {
|
||||||
|
@ -95,5 +108,5 @@ pub fn send_updates_to_ws(
|
||||||
log::info!("WebSocket connection closed.");
|
log::info!("WebSocket connection closed.");
|
||||||
result
|
result
|
||||||
})
|
})
|
||||||
.map_err(move |e| log::error!("{}", e))
|
.map_err(move |e| log::warn!("Error sending to user: {}\n{}", id, e))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue