mirror of https://github.com/mastodon/flodgatt
Use Timeline type instead of String
This commit is contained in:
parent
7dafa834c1
commit
bef5cfa45b
|
@ -1,5 +1,13 @@
|
|||
# This file is automatically @generated by Cargo.
|
||||
# It is not intended for manual editing.
|
||||
[[package]]
|
||||
name = "ahash"
|
||||
version = "0.2.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"const-random 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aho-corasick"
|
||||
version = "0.7.6"
|
||||
|
@ -33,7 +41,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "autocfg"
|
||||
version = "0.1.2"
|
||||
version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
|
@ -41,7 +49,7 @@ name = "backtrace"
|
|||
version = "0.3.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"autocfg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"backtrace-sys 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -192,6 +200,24 @@ dependencies = [
|
|||
"bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const-random"
|
||||
version = "0.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"const-random-macro 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const-random-macro"
|
||||
version = "0.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"getrandom 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "criterion"
|
||||
version = "0.3.0"
|
||||
|
@ -420,6 +446,7 @@ dependencies = [
|
|||
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lru 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"postgres 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"postgres-openssl 0.2.0-rc.1 (git+https://github.com/sfackler/rust-postgres.git)",
|
||||
"pretty_env_logger 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -613,6 +640,15 @@ dependencies = [
|
|||
"tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.6.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"ahash 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "headers"
|
||||
version = "0.2.1"
|
||||
|
@ -817,6 +853,14 @@ dependencies = [
|
|||
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lru"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"hashbrown 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "matches"
|
||||
version = "0.1.8"
|
||||
|
@ -957,7 +1001,7 @@ name = "num-integer"
|
|||
version = "0.1.40"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"autocfg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"num-traits 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
|
@ -966,7 +1010,7 @@ name = "num-traits"
|
|||
version = "0.2.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"autocfg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1005,7 +1049,7 @@ name = "openssl-sys"
|
|||
version = "0.9.49"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"autocfg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"cc 1.0.36 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"pkg-config 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -1315,7 +1359,7 @@ name = "rand"
|
|||
version = "0.6.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"autocfg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -1345,7 +1389,7 @@ name = "rand_chacha"
|
|||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"autocfg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
|
@ -1440,7 +1484,7 @@ name = "rand_pcg"
|
|||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"autocfg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
|
@ -2334,11 +2378,12 @@ dependencies = [
|
|||
]
|
||||
|
||||
[metadata]
|
||||
"checksum ahash 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "6f33b5018f120946c1dcf279194f238a9f146725593ead1c08fa47ff22b0b5d3"
|
||||
"checksum aho-corasick 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "58fb5e95d83b38284460a5fda7d6470aa0b8844d283a0b614b8535e880800d2d"
|
||||
"checksum antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5"
|
||||
"checksum arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "92c7fb76bc8826a8b33b4ee5bb07a247a81e76764ab4d55e8f73e3a4d8808c71"
|
||||
"checksum atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9a7d5b8723950951411ee34d271d99dddcc2035a16ab25310ea2c8cfd4369652"
|
||||
"checksum autocfg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a6d640bee2da49f60a4068a7fae53acde8982514ab7bae8b8cea9e88cbcfd799"
|
||||
"checksum autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2"
|
||||
"checksum backtrace 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "f106c02a3604afcdc0df5d36cc47b44b55917dbaf3d808f71c163a0ddba64637"
|
||||
"checksum backtrace-sys 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)" = "797c830ac25ccc92a7f8a7b9862bde440715531514594a6154e3d4a54dd769b6"
|
||||
"checksum base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0b25d992356d2eb0ed82172f5248873db5560c4721f564b13cb5193bda5e668e"
|
||||
|
@ -2359,6 +2404,8 @@ dependencies = [
|
|||
"checksum chrono 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "77d81f58b7301084de3b958691458a53c3f7e0b1d702f77e550b6a88e3a88abe"
|
||||
"checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9"
|
||||
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
|
||||
"checksum const-random 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a"
|
||||
"checksum const-random-macro 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a"
|
||||
"checksum criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "938703e165481c8d612ea3479ac8342e5615185db37765162e762ec3523e2fc6"
|
||||
"checksum criterion-plot 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "eccdc6ce8bbe352ca89025bee672aa6d24f4eb8c53e3a8b5d1bc58011da072a2"
|
||||
"checksum crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b18cd2e169ad86297e6bc0ad9aa679aee9daa4f19e8163860faf7c164e4f5a71"
|
||||
|
@ -2403,6 +2450,7 @@ dependencies = [
|
|||
"checksum generic-array 0.13.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0ed1e761351b56f54eb9dcd0cfaca9fd0daecf93918e1cfc01c8a3d26ee7adcd"
|
||||
"checksum getrandom 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "473a1265acc8ff1e808cd0a1af8cee3c2ee5200916058a2ca113c29f2d903571"
|
||||
"checksum h2 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "85ab6286db06040ddefb71641b50017c06874614001a134b423783e2db2920bd"
|
||||
"checksum hashbrown 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8e6073d0ca812575946eb5f35ff68dbe519907b25c42530389ff946dc84c6ead"
|
||||
"checksum headers 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dc6e2e51d356081258ef05ff4c648138b5d3fe64b7300aaad3b820554a2b7fb6"
|
||||
"checksum headers-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "51ae5b0b5417559ee1d2733b21d33b0868ae9e406bd32eb1a51d613f66ed472a"
|
||||
"checksum headers-derive 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "97c462e8066bca4f0968ddf8d12de64c40f2c2187b3b9a2fa994d06e8ad444a9"
|
||||
|
@ -2426,6 +2474,7 @@ dependencies = [
|
|||
"checksum lock_api 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "79b2de95ecb4691949fea4716ca53cdbcfccb2c612e19644a8bad05edcf9f47b"
|
||||
"checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b"
|
||||
"checksum log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c84ec4b527950aa83a329754b01dbe3f58361d1c5efacd1f6d68c494d08a17c6"
|
||||
"checksum lru 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0609345ddee5badacf857d4f547e0e5a2e987db77085c24cd887f73573a04237"
|
||||
"checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
|
||||
"checksum md5 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e6bcd6433cff03a4bfc3d9834d504467db1f1cf6d0ea765d37d330249ed629d"
|
||||
"checksum md5 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
|
||||
|
|
|
@ -23,6 +23,7 @@ strum = "0.16.0"
|
|||
strum_macros = "0.16.0"
|
||||
r2d2_postgres = "0.16.0"
|
||||
r2d2 = "0.8.8"
|
||||
lru = "0.4.3"
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = "0.3"
|
||||
|
|
48
src/main.rs
48
src/main.rs
|
@ -26,11 +26,11 @@ fn main() {
|
|||
let cfg = config::DeploymentConfig::from_env(env_vars.clone());
|
||||
|
||||
let postgres_cfg = config::PostgresConfig::from_env(env_vars.clone());
|
||||
|
||||
let client_agent_sse = ClientAgent::blank(redis_cfg);
|
||||
let client_agent_ws = client_agent_sse.clone_with_shared_receiver();
|
||||
let pg_pool = user::PgPool::new(postgres_cfg);
|
||||
|
||||
let client_agent_sse = ClientAgent::blank(redis_cfg, pg_pool.clone());
|
||||
let client_agent_ws = client_agent_sse.clone_with_shared_receiver();
|
||||
|
||||
log::warn!("Streaming server initialized and ready to accept connections");
|
||||
|
||||
// Server Sent Events
|
||||
|
@ -38,7 +38,7 @@ fn main() {
|
|||
let sse_routes = sse::extract_user_or_reject(pg_pool.clone())
|
||||
.and(warp::sse())
|
||||
.map(
|
||||
move |user: user::User, sse_connection_to_client: warp::sse::Sse| {
|
||||
move |user: user::Subscription, sse_connection_to_client: warp::sse::Sse| {
|
||||
log::info!("Incoming SSE request");
|
||||
// Create a new ClientAgent
|
||||
let mut client_agent = client_agent_sse.clone_with_shared_receiver();
|
||||
|
@ -59,26 +59,28 @@ fn main() {
|
|||
let ws_update_interval = *cfg.ws_interval;
|
||||
let websocket_routes = ws::extract_user_and_token_or_reject(pg_pool.clone())
|
||||
.and(warp::ws::ws2())
|
||||
.map(move |user: user::User, token: Option<String>, ws: Ws2| {
|
||||
log::info!("Incoming websocket request");
|
||||
// Create a new ClientAgent
|
||||
let mut client_agent = client_agent_ws.clone_with_shared_receiver();
|
||||
// Assign that agent to generate a stream of updates for the user/timeline pair
|
||||
client_agent.init_for_user(user);
|
||||
// send the updates through the WS connection (along with the User's access_token
|
||||
// which is sent for security)
|
||||
.map(
|
||||
move |user: user::Subscription, token: Option<String>, ws: Ws2| {
|
||||
log::info!("Incoming websocket request");
|
||||
// Create a new ClientAgent
|
||||
let mut client_agent = client_agent_ws.clone_with_shared_receiver();
|
||||
// Assign that agent to generate a stream of updates for the user/timeline pair
|
||||
client_agent.init_for_user(user);
|
||||
// send the updates through the WS connection (along with the User's access_token
|
||||
// which is sent for security)
|
||||
|
||||
(
|
||||
ws.on_upgrade(move |socket| {
|
||||
redis_to_client_stream::send_updates_to_ws(
|
||||
socket,
|
||||
client_agent,
|
||||
ws_update_interval,
|
||||
)
|
||||
}),
|
||||
token.unwrap_or_else(String::new),
|
||||
)
|
||||
})
|
||||
(
|
||||
ws.on_upgrade(move |socket| {
|
||||
redis_to_client_stream::send_updates_to_ws(
|
||||
socket,
|
||||
client_agent,
|
||||
ws_update_interval,
|
||||
)
|
||||
}),
|
||||
token.unwrap_or_else(String::new),
|
||||
)
|
||||
},
|
||||
)
|
||||
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));
|
||||
|
||||
let cors = warp::cors()
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
//! Filters for all the endpoints accessible for Server Sent Event updates
|
||||
use super::{
|
||||
query::{self, Query},
|
||||
user::{PgPool, User},
|
||||
user::{PgPool, Subscription},
|
||||
};
|
||||
use warp::{filters::BoxedFilter, path, Filter};
|
||||
#[allow(dead_code)]
|
||||
type TimelineUser = ((String, User),);
|
||||
type TimelineUser = ((String, Subscription),);
|
||||
|
||||
/// Helper macro to match on the first of any of the provided filters
|
||||
macro_rules! any_of {
|
||||
|
@ -39,7 +39,7 @@ macro_rules! parse_query {
|
|||
.boxed()
|
||||
};
|
||||
}
|
||||
pub fn extract_user_or_reject(pg_pool: PgPool) -> BoxedFilter<(User,)> {
|
||||
pub fn extract_user_or_reject(pg_pool: PgPool) -> BoxedFilter<(Subscription,)> {
|
||||
any_of!(
|
||||
parse_query!(
|
||||
path => "api" / "v1" / "streaming" / "user" / "notification"
|
||||
|
@ -67,7 +67,7 @@ pub fn extract_user_or_reject(pg_pool: PgPool) -> BoxedFilter<(User,)> {
|
|||
// parameter, we need to update our Query if the header has a token
|
||||
.and(query::OptionalAccessToken::from_sse_header())
|
||||
.and_then(Query::update_access_token)
|
||||
.and_then(move |q| User::from_query(q, pg_pool.clone()))
|
||||
.and_then(move |q| Subscription::from_query(q, pg_pool.clone()))
|
||||
.boxed()
|
||||
}
|
||||
|
||||
|
@ -176,8 +176,8 @@ mod test {
|
|||
|
||||
test_public_endpoint!(public_media_true {
|
||||
endpoint: "/api/v1/streaming/public?only_media=true",
|
||||
user: User {
|
||||
target_timeline: "public:media".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "public:media".to_string(),
|
||||
id: -1,
|
||||
email: "".to_string(),
|
||||
access_token: "".to_string(),
|
||||
|
@ -195,8 +195,8 @@ mod test {
|
|||
});
|
||||
test_public_endpoint!(public_media_1 {
|
||||
endpoint: "/api/v1/streaming/public?only_media=1",
|
||||
user: User {
|
||||
target_timeline: "public:media".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "public:media".to_string(),
|
||||
id: -1,
|
||||
email: "".to_string(),
|
||||
access_token: "".to_string(),
|
||||
|
@ -214,8 +214,8 @@ mod test {
|
|||
});
|
||||
test_public_endpoint!(public_local {
|
||||
endpoint: "/api/v1/streaming/public/local",
|
||||
user: User {
|
||||
target_timeline: "public:local".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "public:local".to_string(),
|
||||
id: -1,
|
||||
email: "".to_string(),
|
||||
access_token: "".to_string(),
|
||||
|
@ -233,8 +233,8 @@ mod test {
|
|||
});
|
||||
test_public_endpoint!(public_local_media_true {
|
||||
endpoint: "/api/v1/streaming/public/local?only_media=true",
|
||||
user: User {
|
||||
target_timeline: "public:local:media".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "public:local:media".to_string(),
|
||||
id: -1,
|
||||
email: "".to_string(),
|
||||
access_token: "".to_string(),
|
||||
|
@ -252,8 +252,8 @@ mod test {
|
|||
});
|
||||
test_public_endpoint!(public_local_media_1 {
|
||||
endpoint: "/api/v1/streaming/public/local?only_media=1",
|
||||
user: User {
|
||||
target_timeline: "public:local:media".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "public:local:media".to_string(),
|
||||
id: -1,
|
||||
email: "".to_string(),
|
||||
access_token: "".to_string(),
|
||||
|
@ -271,8 +271,8 @@ mod test {
|
|||
});
|
||||
test_public_endpoint!(hashtag {
|
||||
endpoint: "/api/v1/streaming/hashtag?tag=a",
|
||||
user: User {
|
||||
target_timeline: "hashtag:a".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "hashtag:a".to_string(),
|
||||
id: -1,
|
||||
email: "".to_string(),
|
||||
access_token: "".to_string(),
|
||||
|
@ -290,8 +290,8 @@ mod test {
|
|||
});
|
||||
test_public_endpoint!(hashtag_local {
|
||||
endpoint: "/api/v1/streaming/hashtag/local?tag=a",
|
||||
user: User {
|
||||
target_timeline: "hashtag:local:a".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "hashtag:local:a".to_string(),
|
||||
id: -1,
|
||||
email: "".to_string(),
|
||||
access_token: "".to_string(),
|
||||
|
@ -310,8 +310,8 @@ mod test {
|
|||
|
||||
test_private_endpoint!(user {
|
||||
endpoint: "/api/v1/streaming/user",
|
||||
user: User {
|
||||
target_timeline: "1".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "1".to_string(),
|
||||
id: 1,
|
||||
email: "user@example.com".to_string(),
|
||||
access_token: "TEST_USER".to_string(),
|
||||
|
@ -329,8 +329,8 @@ mod test {
|
|||
});
|
||||
test_private_endpoint!(user_notification {
|
||||
endpoint: "/api/v1/streaming/user/notification",
|
||||
user: User {
|
||||
target_timeline: "1".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "1".to_string(),
|
||||
id: 1,
|
||||
email: "user@example.com".to_string(),
|
||||
access_token: "TEST_USER".to_string(),
|
||||
|
@ -348,8 +348,8 @@ mod test {
|
|||
});
|
||||
test_private_endpoint!(direct {
|
||||
endpoint: "/api/v1/streaming/direct",
|
||||
user: User {
|
||||
target_timeline: "direct".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "direct".to_string(),
|
||||
id: 1,
|
||||
email: "user@example.com".to_string(),
|
||||
access_token: "TEST_USER".to_string(),
|
||||
|
@ -369,8 +369,8 @@ mod test {
|
|||
test_private_endpoint!(list_valid_list {
|
||||
endpoint: "/api/v1/streaming/list",
|
||||
query: "list=1",
|
||||
user: User {
|
||||
target_timeline: "list:1".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "list:1".to_string(),
|
||||
id: 1,
|
||||
email: "user@example.com".to_string(),
|
||||
access_token: "TEST_USER".to_string(),
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
//! Mock Postgres connection (for use in unit testing)
|
||||
use super::{OauthScope, User};
|
||||
use super::{OauthScope, Subscription};
|
||||
use std::collections::HashSet;
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -10,8 +10,11 @@ impl PgPool {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn select_user(access_token: &str, _pg_pool: PgPool) -> Result<User, warp::reject::Rejection> {
|
||||
let mut user = User::default();
|
||||
pub fn select_user(
|
||||
access_token: &str,
|
||||
_pg_pool: PgPool,
|
||||
) -> Result<Subscription, warp::reject::Rejection> {
|
||||
let mut user = Subscription::default();
|
||||
if access_token == "TEST_USER" {
|
||||
user.id = 1;
|
||||
user.logged_in = true;
|
||||
|
|
|
@ -4,15 +4,162 @@ mod mock_postgres;
|
|||
#[cfg(test)]
|
||||
use mock_postgres as postgres;
|
||||
#[cfg(not(test))]
|
||||
mod postgres;
|
||||
pub mod postgres;
|
||||
pub use self::postgres::PgPool;
|
||||
use super::query::Query;
|
||||
use crate::log_fatal;
|
||||
use std::collections::HashSet;
|
||||
use warp::reject::Rejection;
|
||||
|
||||
/// The User (with data read from Postgres)
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct Subscription {
|
||||
pub timeline: Timeline,
|
||||
pub allowed_langs: HashSet<String>,
|
||||
pub blocks: Blocks,
|
||||
}
|
||||
|
||||
impl Default for Subscription {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
timeline: Timeline(Stream::Unset, Reach::Local, Content::Notification),
|
||||
allowed_langs: HashSet::new(),
|
||||
blocks: Blocks::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Subscription {
|
||||
pub fn from_query(q: Query, pool: PgPool) -> Result<Self, Rejection> {
|
||||
let user = match q.access_token.clone() {
|
||||
Some(token) => postgres::select_user(&token, pool.clone())?,
|
||||
None => UserData::public(),
|
||||
};
|
||||
Ok(Subscription {
|
||||
timeline: Timeline::from_query_and_user(&q, &user, pool.clone())?,
|
||||
allowed_langs: user.allowed_langs,
|
||||
blocks: Blocks {
|
||||
user_blocks: postgres::select_user_blocks(user.id, pool.clone()),
|
||||
domain_blocks: postgres::select_domain_blocks(user.id, pool.clone()),
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)]
|
||||
pub struct Timeline(pub Stream, pub Reach, pub Content);
|
||||
|
||||
impl Timeline {
|
||||
pub fn empty() -> Self {
|
||||
use {Content::*, Reach::*, Stream::*};
|
||||
Self(Unset, Local, Notification)
|
||||
}
|
||||
pub fn from_redis_channel(raw_timeline: &str, hashtag: Option<i64>) -> Self {
|
||||
use {Content::*, Reach::*, Stream::*};
|
||||
match raw_timeline.split(':').collect::<Vec<&str>>()[..] {
|
||||
["public"] => Timeline(Public, Federated, All),
|
||||
["public", "local"] => Timeline(Public, Local, All),
|
||||
["public", "media"] => Timeline(Public, Federated, Media),
|
||||
["public", "local", "media"] => Timeline(Public, Local, Media),
|
||||
["hashtag", _tag] => Timeline(Hashtag(hashtag.unwrap()), Federated, All),
|
||||
["hashtag", _tag, "local"] => Timeline(Hashtag(hashtag.unwrap()), Local, All),
|
||||
[id] => Timeline(User(id.parse().unwrap()), Local, All),
|
||||
["list", id] => Timeline(List(id.parse().unwrap()), Federated, All),
|
||||
["direct", id] => Timeline(Direct(id.parse().unwrap()), Federated, All),
|
||||
// Other endpoints don't exist:
|
||||
[..] => log_fatal!("Unexpected channel from Redis: {}", raw_timeline),
|
||||
}
|
||||
}
|
||||
pub fn to_redis_channel(&self, hashtag: Option<&String>) -> String {
|
||||
use {Content::*, Reach::*, Stream::*};
|
||||
match self {
|
||||
Timeline(User(id), Federated, All) => format!("timeline:{}", id),
|
||||
Timeline(User(id), Federated, Notification) => format!("timeline:{}:notification", id),
|
||||
Timeline(List(id), Federated, All) => format!("timeline:list:{}", id),
|
||||
Timeline(Direct(id), Federated, All) => format!("timeline:direct:{}", id),
|
||||
Timeline(Hashtag(id), Federated, All) => format!(
|
||||
"timeline:hashtag:{}",
|
||||
hashtag.unwrap_or_else(|| log_fatal!("Did not supply a name for hashtag #{}", id))
|
||||
),
|
||||
Timeline(Hashtag(id), Local, All) => format!(
|
||||
"timeline:hashtag:{}:local",
|
||||
hashtag.unwrap_or_else(|| log_fatal!("Did not supply a name for hashtag #{}", id))
|
||||
),
|
||||
Timeline(Public, Federated, Media) => "timeline:public:media".into(),
|
||||
Timeline(Public, Local, All) => "timeline:public:local".into(),
|
||||
|
||||
Timeline(Public, Federated, All) => "timeline:public".into(),
|
||||
Timeline(one, _two, _three) => {
|
||||
log_fatal!("Supposedly impossible timeline reached: {:?}", one)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn from_query_and_user(q: &Query, user: &UserData, pool: PgPool) -> Result<Self, Rejection> {
|
||||
use {warp::reject::custom, Content::*, Reach::*, Scope::*, Stream::*};
|
||||
let id_from_hashtag = || postgres::select_list_id(&q.hashtag, pool.clone());
|
||||
let user_owns_list = || postgres::user_owns_list(user.id, q.list, pool.clone());
|
||||
|
||||
Ok(match q.stream.as_ref() {
|
||||
"user" => match user.scopes.contains(&Statuses) {
|
||||
true => Timeline(User(user.id), Federated, All),
|
||||
false => Err(custom("Error: Missing access token"))?,
|
||||
},
|
||||
"user:notification" => match user.scopes.contains(&Statuses) {
|
||||
true => Timeline(User(user.id), Federated, Notification),
|
||||
false => Err(custom("Error: Missing access token"))?,
|
||||
},
|
||||
"list" => match user.scopes.contains(&Lists) && user_owns_list() {
|
||||
true => Timeline(List(q.list), Federated, All),
|
||||
false => Err(warp::reject::custom("Error: Missing access token"))?,
|
||||
},
|
||||
"direct" => match user.scopes.contains(&Statuses) {
|
||||
true => Timeline(Direct(user.id), Federated, All),
|
||||
false => Err(custom("Error: Missing access token"))?,
|
||||
},
|
||||
"hashtag" => Timeline(Hashtag(id_from_hashtag()?), Federated, All),
|
||||
"hashtag:local" => Timeline(Hashtag(id_from_hashtag()?), Local, All),
|
||||
"public" => match q.media {
|
||||
true => Timeline(Public, Federated, Media),
|
||||
false => Timeline(Public, Federated, All),
|
||||
},
|
||||
"public:local" => match q.media {
|
||||
true => Timeline(Public, Local, All),
|
||||
false => Timeline(Public, Local, All),
|
||||
},
|
||||
"public:media" => Timeline(Public, Federated, Media),
|
||||
"public:local:media" => Timeline(Public, Local, Media),
|
||||
other => {
|
||||
log::warn!("Client attempted to subscribe to: `{}`", other);
|
||||
Err(custom("Error: Nonexistent endpoint"))?
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
#[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)]
|
||||
pub enum Stream {
|
||||
User(i64),
|
||||
List(i64),
|
||||
Direct(i64),
|
||||
Hashtag(i64),
|
||||
Public,
|
||||
Unset,
|
||||
}
|
||||
#[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)]
|
||||
pub enum Reach {
|
||||
Local,
|
||||
Federated,
|
||||
}
|
||||
#[derive(Clone, Debug, Copy, Eq, Hash, PartialEq)]
|
||||
pub enum Content {
|
||||
All,
|
||||
Media,
|
||||
Notification,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||
pub enum Scope {
|
||||
All,
|
||||
Read,
|
||||
Statuses,
|
||||
Notifications,
|
||||
Lists,
|
||||
|
@ -24,86 +171,59 @@ pub struct Blocks {
|
|||
pub user_blocks: HashSet<i64>,
|
||||
}
|
||||
|
||||
/// The User (with data read from Postgres)
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct User {
|
||||
pub target_timeline: String,
|
||||
pub id: i64,
|
||||
pub scopes: HashSet<Scope>,
|
||||
pub logged_in: bool,
|
||||
pub allowed_langs: HashSet<String>,
|
||||
pub blocks: Blocks,
|
||||
pub struct UserData {
|
||||
id: i64,
|
||||
allowed_langs: HashSet<String>,
|
||||
scopes: HashSet<Scope>,
|
||||
}
|
||||
|
||||
impl Default for User {
|
||||
fn default() -> Self {
|
||||
impl UserData {
|
||||
fn public() -> Self {
|
||||
Self {
|
||||
id: -1,
|
||||
scopes: HashSet::new(),
|
||||
logged_in: false,
|
||||
target_timeline: String::new(),
|
||||
allowed_langs: HashSet::new(),
|
||||
blocks: Blocks::default(),
|
||||
scopes: HashSet::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl User {
|
||||
pub fn from_query(q: Query, pool: PgPool) -> Result<Self, Rejection> {
|
||||
let token = q.access_token.clone();
|
||||
let mut user: User = match token {
|
||||
None => User::default(),
|
||||
Some(token) => postgres::select_user(&token, pool.clone())?,
|
||||
};
|
||||
// fn set_timeline_and_filter(self, q: Query, pool: PgPool) -> Result<Self, Rejection> {
|
||||
// let (read_scope, f) = (self.scopes.clone(), self.allowed_langs.clone());
|
||||
// use Scope::*;
|
||||
// let (filter, target_timeline) = match q.stream.as_ref() {
|
||||
// // Public endpoints:
|
||||
// tl @ "public" | tl @ "public:local" if q.media => (f, format!("{}:media", tl)),
|
||||
// tl @ "public:media" | tl @ "public:local:media" => (f, tl.to_string()),
|
||||
// tl @ "public" | tl @ "public:local" => (f, tl.to_string()),
|
||||
|
||||
user = user.set_timeline_and_filter(q, pool.clone())?;
|
||||
user.blocks.user_blocks = postgres::select_user_blocks(user.id, pool.clone());
|
||||
user.blocks.domain_blocks = postgres::select_domain_blocks(user.id, pool.clone());
|
||||
log::info!("Creating user: {:#?}", user);
|
||||
Ok(user)
|
||||
}
|
||||
|
||||
fn set_timeline_and_filter(self, q: Query, pool: PgPool) -> Result<Self, Rejection> {
|
||||
let (read_scope, f) = (self.scopes.clone(), self.allowed_langs.clone());
|
||||
use Scope::*;
|
||||
let (filter, target_timeline) = match q.stream.as_ref() {
|
||||
// Public endpoints:
|
||||
tl @ "public" | tl @ "public:local" if q.media => (f, format!("{}:media", tl)),
|
||||
tl @ "public:media" | tl @ "public:local:media" => (f, tl.to_string()),
|
||||
tl @ "public" | tl @ "public:local" => (f, tl.to_string()),
|
||||
|
||||
// Hashtag endpoints:
|
||||
tl @ "hashtag" | tl @ "hashtag:local" => (f, format!("{}:{}", tl, q.hashtag)),
|
||||
// Private endpoints: User:
|
||||
"user" if self.logged_in && read_scope.contains(&Statuses) => {
|
||||
(HashSet::new(), format!("{}", self.id))
|
||||
}
|
||||
"user:notification" if self.logged_in && read_scope.contains(&Notifications) => {
|
||||
(HashSet::new(), format!("{}", self.id))
|
||||
}
|
||||
// List endpoint:
|
||||
"list" if self.owns_list(q.list, pool) && read_scope.contains(&Lists) => {
|
||||
(HashSet::new(), format!("list:{}", q.list))
|
||||
}
|
||||
// Direct endpoint:
|
||||
"direct" if self.logged_in && read_scope.contains(&Statuses) => {
|
||||
(HashSet::new(), "direct".to_string())
|
||||
}
|
||||
// Reject unathorized access attempts for private endpoints
|
||||
"user" | "user:notification" | "direct" | "list" => {
|
||||
return Err(warp::reject::custom("Error: Missing access token"))
|
||||
}
|
||||
// Other endpoints don't exist:
|
||||
_ => return Err(warp::reject::custom("Error: Nonexistent endpoint")),
|
||||
};
|
||||
Ok(Self {
|
||||
target_timeline,
|
||||
allowed_langs: filter,
|
||||
..self
|
||||
})
|
||||
}
|
||||
|
||||
fn owns_list(&self, list: i64, pool: PgPool) -> bool {
|
||||
postgres::user_owns_list(self.id, list, pool)
|
||||
}
|
||||
}
|
||||
// // Hashtag endpoints:
|
||||
// tl @ "hashtag" | tl @ "hashtag:local" => (f, format!("{}:{}", tl, q.hashtag)),
|
||||
// // Private endpoints: User:
|
||||
// "user" if self.logged_in && read_scope.contains(&Statuses) => {
|
||||
// (HashSet::new(), format!("{}", self.id))
|
||||
// }
|
||||
// "user:notification" if self.logged_in && read_scope.contains(&Notifications) => {
|
||||
// (HashSet::new(), format!("{}", self.id))
|
||||
// }
|
||||
// // List endpoint:
|
||||
// "list" if self.owns_list(q.list, pool) && read_scope.contains(&Lists) => {
|
||||
// (HashSet::new(), format!("list:{}", q.list))
|
||||
// }
|
||||
// // Direct endpoint:
|
||||
// "direct" if self.logged_in && read_scope.contains(&Statuses) => {
|
||||
// (HashSet::new(), "direct".to_string())
|
||||
// }
|
||||
// // Reject unathorized access attempts for private endpoints
|
||||
// "user" | "user:notification" | "direct" | "list" => {
|
||||
// return Err(warp::reject::custom("Error: Missing access token"))
|
||||
// }
|
||||
// // Other endpoints don't exist:
|
||||
// _ => return Err(warp::reject::custom("Error: Nonexistent endpoint")),
|
||||
// };
|
||||
// Ok(Self {
|
||||
// target_timeline,
|
||||
// allowed_langs: filter,
|
||||
// ..self
|
||||
// })
|
||||
// }
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
//! Postgres queries
|
||||
use crate::{
|
||||
config,
|
||||
parse_client_request::user::{Scope, User},
|
||||
parse_client_request::user::{Scope, UserData},
|
||||
};
|
||||
use ::postgres;
|
||||
use r2d2_postgres::PostgresConnectionManager;
|
||||
use std::collections::HashSet;
|
||||
use warp::reject::Rejection;
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PgPool(pub r2d2::Pool<PostgresConnectionManager<postgres::NoTls>>);
|
||||
impl PgPool {
|
||||
pub fn new(pg_cfg: config::PostgresConfig) -> Self {
|
||||
|
@ -30,12 +30,8 @@ impl PgPool {
|
|||
}
|
||||
}
|
||||
|
||||
/// Build a user based on the result of querying Postgres with the access token
|
||||
///
|
||||
/// This does _not_ set the timeline, filter, or blocks fields. Use the various `User`
|
||||
/// methods to do so. In general, this function shouldn't be needed outside `User`.
|
||||
pub fn select_user(access_token: &str, pg_pool: PgPool) -> Result<User, Rejection> {
|
||||
let mut conn = pg_pool.0.get().unwrap();
|
||||
pub fn select_user(token: &str, pool: PgPool) -> Result<UserData, Rejection> {
|
||||
let mut conn = pool.0.get().unwrap();
|
||||
let query_rows = conn
|
||||
.query(
|
||||
"
|
||||
|
@ -47,15 +43,21 @@ oauth_access_tokens.resource_owner_id = users.id
|
|||
WHERE oauth_access_tokens.token = $1
|
||||
AND oauth_access_tokens.revoked_at IS NULL
|
||||
LIMIT 1",
|
||||
&[&access_token.to_owned()],
|
||||
&[&token.to_owned()],
|
||||
)
|
||||
.expect("Hard-coded query will return Some([0 or more rows])");
|
||||
if let Some(result_columns) = query_rows.get(0) {
|
||||
let id = result_columns.get(1);
|
||||
let allowed_langs = result_columns
|
||||
.try_get::<_, Vec<_>>(2)
|
||||
.unwrap_or_else(|_| Vec::new())
|
||||
.into_iter()
|
||||
.collect();
|
||||
let mut scopes: HashSet<Scope> = result_columns
|
||||
.get::<_, String>(3)
|
||||
.split(' ')
|
||||
.filter_map(|scope| match scope {
|
||||
"read" => Some(Scope::All),
|
||||
"read" => Some(Scope::Read),
|
||||
"read:statuses" => Some(Scope::Statuses),
|
||||
"read:notifications" => Some(Scope::Notifications),
|
||||
"read:lists" => Some(Scope::Lists),
|
||||
|
@ -65,30 +67,62 @@ LIMIT 1",
|
|||
}
|
||||
})
|
||||
.collect();
|
||||
if scopes.remove(&Scope::All) {
|
||||
// We don't need to separately track read auth - it's just all three others
|
||||
if scopes.remove(&Scope::Read) {
|
||||
scopes.insert(Scope::Statuses);
|
||||
scopes.insert(Scope::Notifications);
|
||||
scopes.insert(Scope::Lists);
|
||||
}
|
||||
let mut allowed_langs = HashSet::new();
|
||||
if let Ok(langs_vec) = result_columns.try_get::<_, Vec<String>>(2) {
|
||||
for lang in langs_vec {
|
||||
allowed_langs.insert(lang);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(User {
|
||||
id: result_columns.get(1),
|
||||
scopes,
|
||||
logged_in: true,
|
||||
Ok(UserData {
|
||||
id,
|
||||
allowed_langs,
|
||||
..User::default()
|
||||
scopes,
|
||||
})
|
||||
} else {
|
||||
Err(warp::reject::custom("Error: Invalid access token"))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn select_list_id(tag_name: &String, pg_pool: PgPool) -> Result<i64, Rejection> {
|
||||
let mut conn = pg_pool.0.get().unwrap();
|
||||
// For the Postgres query, `id` = list number; `account_id` = user.id
|
||||
let rows = &conn
|
||||
.query(
|
||||
"
|
||||
SELECT id
|
||||
FROM tags
|
||||
WHERE name = $1
|
||||
LIMIT 1",
|
||||
&[&tag_name],
|
||||
)
|
||||
.expect("Hard-coded query will return Some([0 or more rows])");
|
||||
|
||||
match rows.get(0) {
|
||||
Some(row) => Ok(row.get(0)),
|
||||
None => Err(warp::reject::custom("Error: Hashtag does not exist.")),
|
||||
}
|
||||
}
|
||||
pub fn select_hashtag_name(tag_id: &i64, pg_pool: PgPool) -> Result<String, Rejection> {
|
||||
let mut conn = pg_pool.0.get().unwrap();
|
||||
// For the Postgres query, `id` = list number; `account_id` = user.id
|
||||
let rows = &conn
|
||||
.query(
|
||||
"
|
||||
SELECT name
|
||||
FROM tags
|
||||
WHERE id = $1
|
||||
LIMIT 1",
|
||||
&[&tag_id],
|
||||
)
|
||||
.expect("Hard-coded query will return Some([0 or more rows])");
|
||||
|
||||
match rows.get(0) {
|
||||
Some(row) => Ok(row.get(0)),
|
||||
None => Err(warp::reject::custom("Error: Hashtag does not exist.")),
|
||||
}
|
||||
}
|
||||
|
||||
/// Query Postgres for everyone the user has blocked or muted
|
||||
///
|
||||
/// **NOTE**: because we check this when the user connects, it will not include any blocks
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//! Filters for the WebSocket endpoint
|
||||
use super::{
|
||||
query::{self, Query},
|
||||
user::{PgPool, User},
|
||||
user::{PgPool, Subscription},
|
||||
};
|
||||
use warp::{filters::BoxedFilter, path, Filter};
|
||||
|
||||
|
@ -32,11 +32,13 @@ fn parse_query() -> BoxedFilter<(Query,)> {
|
|||
.boxed()
|
||||
}
|
||||
|
||||
pub fn extract_user_and_token_or_reject(pg_pool: PgPool) -> BoxedFilter<(User, Option<String>)> {
|
||||
pub fn extract_user_and_token_or_reject(
|
||||
pg_pool: PgPool,
|
||||
) -> BoxedFilter<(Subscription, Option<String>)> {
|
||||
parse_query()
|
||||
.and(query::OptionalAccessToken::from_ws_header())
|
||||
.and_then(Query::update_access_token)
|
||||
.and_then(move |q| User::from_query(q, pg_pool.clone()))
|
||||
.and_then(move |q| Subscription::from_query(q, pg_pool.clone()))
|
||||
.and(query::OptionalAccessToken::from_ws_header())
|
||||
.boxed()
|
||||
}
|
||||
|
@ -124,8 +126,8 @@ mod test {
|
|||
|
||||
test_public_endpoint!(public_media {
|
||||
endpoint: "/api/v1/streaming?stream=public:media",
|
||||
user: User {
|
||||
target_timeline: "public:media".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "public:media".to_string(),
|
||||
id: -1,
|
||||
email: "".to_string(),
|
||||
access_token: "".to_string(),
|
||||
|
@ -143,8 +145,8 @@ mod test {
|
|||
});
|
||||
test_public_endpoint!(public_local {
|
||||
endpoint: "/api/v1/streaming?stream=public:local",
|
||||
user: User {
|
||||
target_timeline: "public:local".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "public:local".to_string(),
|
||||
id: -1,
|
||||
email: "".to_string(),
|
||||
access_token: "".to_string(),
|
||||
|
@ -162,8 +164,8 @@ mod test {
|
|||
});
|
||||
test_public_endpoint!(public_local_media {
|
||||
endpoint: "/api/v1/streaming?stream=public:local:media",
|
||||
user: User {
|
||||
target_timeline: "public:local:media".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "public:local:media".to_string(),
|
||||
id: -1,
|
||||
email: "".to_string(),
|
||||
access_token: "".to_string(),
|
||||
|
@ -181,8 +183,8 @@ mod test {
|
|||
});
|
||||
test_public_endpoint!(hashtag {
|
||||
endpoint: "/api/v1/streaming?stream=hashtag&tag=a",
|
||||
user: User {
|
||||
target_timeline: "hashtag:a".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "hashtag:a".to_string(),
|
||||
id: -1,
|
||||
email: "".to_string(),
|
||||
access_token: "".to_string(),
|
||||
|
@ -200,8 +202,8 @@ mod test {
|
|||
});
|
||||
test_public_endpoint!(hashtag_local {
|
||||
endpoint: "/api/v1/streaming?stream=hashtag:local&tag=a",
|
||||
user: User {
|
||||
target_timeline: "hashtag:local:a".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "hashtag:local:a".to_string(),
|
||||
id: -1,
|
||||
email: "".to_string(),
|
||||
access_token: "".to_string(),
|
||||
|
@ -220,8 +222,8 @@ mod test {
|
|||
|
||||
test_private_endpoint!(user {
|
||||
endpoint: "/api/v1/streaming?stream=user",
|
||||
user: User {
|
||||
target_timeline: "1".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "1".to_string(),
|
||||
id: 1,
|
||||
email: "user@example.com".to_string(),
|
||||
access_token: "TEST_USER".to_string(),
|
||||
|
@ -239,8 +241,8 @@ mod test {
|
|||
});
|
||||
test_private_endpoint!(user_notification {
|
||||
endpoint: "/api/v1/streaming?stream=user:notification",
|
||||
user: User {
|
||||
target_timeline: "1".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "1".to_string(),
|
||||
id: 1,
|
||||
email: "user@example.com".to_string(),
|
||||
access_token: "TEST_USER".to_string(),
|
||||
|
@ -258,8 +260,8 @@ mod test {
|
|||
});
|
||||
test_private_endpoint!(direct {
|
||||
endpoint: "/api/v1/streaming?stream=direct",
|
||||
user: User {
|
||||
target_timeline: "direct".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "direct".to_string(),
|
||||
id: 1,
|
||||
email: "user@example.com".to_string(),
|
||||
access_token: "TEST_USER".to_string(),
|
||||
|
@ -277,8 +279,8 @@ mod test {
|
|||
});
|
||||
test_private_endpoint!(list_valid_list {
|
||||
endpoint: "/api/v1/streaming?stream=list&list=1",
|
||||
user: User {
|
||||
target_timeline: "list:1".to_string(),
|
||||
user: Subscription {
|
||||
timeline: "list:1".to_string(),
|
||||
id: 1,
|
||||
email: "user@example.com".to_string(),
|
||||
access_token: "TEST_USER".to_string(),
|
||||
|
|
|
@ -16,7 +16,10 @@
|
|||
//! communicate with Redis, it we create a new `ClientAgent` for
|
||||
//! each new client connection (each in its own thread).use super::{message::Message, receiver::Receiver}
|
||||
use super::{message::Message, receiver::Receiver};
|
||||
use crate::{config, parse_client_request::user::User};
|
||||
use crate::{
|
||||
config,
|
||||
parse_client_request::user::{PgPool, Subscription},
|
||||
};
|
||||
use futures::{
|
||||
Async::{self, NotReady, Ready},
|
||||
Poll,
|
||||
|
@ -31,18 +34,17 @@ use uuid::Uuid;
|
|||
pub struct ClientAgent {
|
||||
receiver: sync::Arc<sync::Mutex<Receiver>>,
|
||||
id: uuid::Uuid,
|
||||
pub target_timeline: String,
|
||||
pub current_user: User,
|
||||
// pub current_timeline: String,
|
||||
subscription: Subscription,
|
||||
}
|
||||
|
||||
impl ClientAgent {
|
||||
/// Create a new `ClientAgent` with no shared data.
|
||||
pub fn blank(redis_cfg: config::RedisConfig) -> Self {
|
||||
pub fn blank(redis_cfg: config::RedisConfig, pg_pool: PgPool) -> Self {
|
||||
ClientAgent {
|
||||
receiver: sync::Arc::new(sync::Mutex::new(Receiver::new(redis_cfg))),
|
||||
receiver: sync::Arc::new(sync::Mutex::new(Receiver::new(redis_cfg, pg_pool))),
|
||||
id: Uuid::default(),
|
||||
target_timeline: String::new(),
|
||||
current_user: User::default(),
|
||||
subscription: Subscription::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -51,24 +53,23 @@ impl ClientAgent {
|
|||
Self {
|
||||
receiver: self.receiver.clone(),
|
||||
id: self.id,
|
||||
target_timeline: self.target_timeline.clone(),
|
||||
current_user: self.current_user.clone(),
|
||||
subscription: self.subscription.clone(),
|
||||
}
|
||||
}
|
||||
/// Initializes the `ClientAgent` with a unique ID, a `User`, and the target timeline.
|
||||
/// Also passes values to the `Receiver` for it's initialization.
|
||||
|
||||
/// Initializes the `ClientAgent` with a unique ID associated with a specific user's
|
||||
/// subscription. Also passes values to the `Receiver` for it's initialization.
|
||||
///
|
||||
/// Note that this *may or may not* result in a new Redis connection.
|
||||
/// If the server has already subscribed to the timeline on behalf of
|
||||
/// a different user, the `Receiver` is responsible for figuring
|
||||
/// that out and avoiding duplicated connections. Thus, it is safe to
|
||||
/// use this method for each new client connection.
|
||||
pub fn init_for_user(&mut self, user: User) {
|
||||
pub fn init_for_user(&mut self, subscription: Subscription) {
|
||||
self.id = Uuid::new_v4();
|
||||
self.target_timeline = user.target_timeline.to_owned();
|
||||
self.current_user = user;
|
||||
self.subscription = subscription;
|
||||
let mut receiver = self.receiver.lock().expect("No thread panic (stream.rs)");
|
||||
receiver.manage_new_timeline(self.id, &self.target_timeline);
|
||||
receiver.manage_new_timeline(self.id, self.subscription.timeline);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -92,14 +93,14 @@ impl futures::stream::Stream for ClientAgent {
|
|||
.receiver
|
||||
.lock()
|
||||
.expect("ClientAgent: No other thread panic");
|
||||
receiver.configure_for_polling(self.id, &self.target_timeline.clone());
|
||||
receiver.configure_for_polling(self.id, self.subscription.timeline);
|
||||
receiver.poll()
|
||||
};
|
||||
if start_time.elapsed().as_millis() > 1 {
|
||||
log::warn!("Polling the Receiver took: {:?}", start_time.elapsed());
|
||||
};
|
||||
|
||||
let (allowed_langs, blocks) = (&self.current_user.allowed_langs, &self.current_user.blocks);
|
||||
let (allowed_langs, blocks) = (&self.subscription.allowed_langs, &self.subscription.blocks);
|
||||
let (blocked_users, blocked_domains) = (&blocks.user_blocks, &blocks.domain_blocks);
|
||||
let (send, block) = (|msg| Ok(Ready(Some(msg))), Ok(NotReady));
|
||||
use Message::*;
|
||||
|
|
|
@ -56,10 +56,6 @@ pub fn send_updates_to_ws(
|
|||
}),
|
||||
);
|
||||
|
||||
let (tl, id) = (
|
||||
client_agent.current_user.target_timeline.clone(),
|
||||
client_agent.current_user.id,
|
||||
);
|
||||
// Yield new events for as long as the client is still connected
|
||||
let event_stream = tokio::timer::Interval::new(time::Instant::now(), update_interval)
|
||||
.take_while(move |_| match ws_rx.poll() {
|
||||
|
@ -75,7 +71,7 @@ pub fn send_updates_to_ws(
|
|||
futures::future::ok(false)
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Error in TL {}\nfor user: #{}\n{}", tl, id, e);
|
||||
log::warn!("Error in TL {}", e);
|
||||
futures::future::ok(false)
|
||||
}
|
||||
});
|
||||
|
@ -105,5 +101,5 @@ pub fn send_updates_to_ws(
|
|||
log::info!("WebSocket connection closed.");
|
||||
result
|
||||
})
|
||||
.map_err(move |e| log::warn!("Error sending to user: {}\n{}", id, e))
|
||||
.map_err(move |e| log::warn!("Error sending to user: {}", e))
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use crate::parse_client_request::user::Timeline;
|
||||
use serde_json::Value;
|
||||
use std::{collections, time};
|
||||
use uuid::Uuid;
|
||||
|
@ -6,16 +7,15 @@ use uuid::Uuid;
|
|||
pub struct MsgQueue {
|
||||
pub messages: collections::VecDeque<Value>,
|
||||
last_polled_at: time::Instant,
|
||||
pub redis_channel: String,
|
||||
pub timeline: Timeline,
|
||||
}
|
||||
|
||||
impl MsgQueue {
|
||||
pub fn new(redis_channel: impl std::fmt::Display) -> Self {
|
||||
let redis_channel = redis_channel.to_string();
|
||||
pub fn new(timeline: Timeline) -> Self {
|
||||
MsgQueue {
|
||||
messages: collections::VecDeque::new(),
|
||||
last_polled_at: time::Instant::now(),
|
||||
redis_channel,
|
||||
timeline,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -29,26 +29,26 @@ impl MessageQueues {
|
|||
.and_modify(|queue| queue.last_polled_at = time::Instant::now());
|
||||
}
|
||||
|
||||
pub fn oldest_msg_in_target_queue(&mut self, id: Uuid, timeline: String) -> Option<Value> {
|
||||
pub fn oldest_msg_in_target_queue(&mut self, id: Uuid, timeline: Timeline) -> Option<Value> {
|
||||
self.entry(id)
|
||||
.or_insert_with(|| MsgQueue::new(timeline))
|
||||
.messages
|
||||
.pop_front()
|
||||
}
|
||||
pub fn calculate_timelines_to_add_or_drop(&mut self, timeline: String) -> Vec<Change> {
|
||||
pub fn calculate_timelines_to_add_or_drop(&mut self, timeline: Timeline) -> Vec<Change> {
|
||||
let mut timelines_to_modify = Vec::new();
|
||||
|
||||
timelines_to_modify.push(Change {
|
||||
timeline: timeline.to_owned(),
|
||||
timeline,
|
||||
in_subscriber_number: 1,
|
||||
});
|
||||
self.retain(|_id, msg_queue| {
|
||||
if msg_queue.last_polled_at.elapsed() < time::Duration::from_secs(30) {
|
||||
true
|
||||
} else {
|
||||
let timeline = &msg_queue.redis_channel;
|
||||
let timeline = &msg_queue.timeline;
|
||||
timelines_to_modify.push(Change {
|
||||
timeline: timeline.to_owned(),
|
||||
timeline: *timeline,
|
||||
in_subscriber_number: -1,
|
||||
});
|
||||
false
|
||||
|
@ -58,7 +58,7 @@ impl MessageQueues {
|
|||
}
|
||||
}
|
||||
pub struct Change {
|
||||
pub timeline: String,
|
||||
pub timeline: Timeline,
|
||||
pub in_subscriber_number: i32,
|
||||
}
|
||||
|
||||
|
|
|
@ -4,13 +4,15 @@
|
|||
mod message_queues;
|
||||
use crate::{
|
||||
config::{self, RedisInterval},
|
||||
parse_client_request::user::{self, postgres, PgPool, Timeline},
|
||||
pubsub_cmd,
|
||||
redis_to_client_stream::redis::{redis_cmd, RedisConn, RedisStream},
|
||||
};
|
||||
use futures::{Async, Poll};
|
||||
use lru::LruCache;
|
||||
pub use message_queues::{MessageQueues, MsgQueue};
|
||||
use serde_json::Value;
|
||||
use std::{collections, net, time};
|
||||
use std::{collections::HashMap, net, time};
|
||||
use tokio::io::Error;
|
||||
use uuid::Uuid;
|
||||
|
||||
|
@ -21,16 +23,30 @@ pub struct Receiver {
|
|||
secondary_redis_connection: net::TcpStream,
|
||||
redis_poll_interval: RedisInterval,
|
||||
redis_polled_at: time::Instant,
|
||||
timeline: String,
|
||||
timeline: Timeline,
|
||||
manager_id: Uuid,
|
||||
pub msg_queues: MessageQueues,
|
||||
clients_per_timeline: collections::HashMap<String, i32>,
|
||||
clients_per_timeline: HashMap<Timeline, i32>,
|
||||
cache: Cache,
|
||||
pool: PgPool,
|
||||
}
|
||||
#[derive(Debug)]
|
||||
struct Cache {
|
||||
id_to_hashtag: LruCache<i64, String>,
|
||||
hashtag_to_id: LruCache<String, i64>,
|
||||
}
|
||||
impl Cache {
|
||||
fn new(size: usize) -> Self {
|
||||
Self {
|
||||
id_to_hashtag: LruCache::new(size),
|
||||
hashtag_to_id: LruCache::new(size),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Receiver {
|
||||
/// Create a new `Receiver`, with its own Redis connections (but, as yet, no
|
||||
/// active subscriptions).
|
||||
pub fn new(redis_cfg: config::RedisConfig) -> Self {
|
||||
pub fn new(redis_cfg: config::RedisConfig, pool: PgPool) -> Self {
|
||||
let RedisConn {
|
||||
primary: pubsub_connection,
|
||||
secondary: secondary_redis_connection,
|
||||
|
@ -44,10 +60,12 @@ impl Receiver {
|
|||
secondary_redis_connection,
|
||||
redis_poll_interval,
|
||||
redis_polled_at: time::Instant::now(),
|
||||
timeline: String::new(),
|
||||
timeline: Timeline::empty(),
|
||||
manager_id: Uuid::default(),
|
||||
msg_queues: MessageQueues(collections::HashMap::new()),
|
||||
clients_per_timeline: collections::HashMap::new(),
|
||||
msg_queues: MessageQueues(HashMap::new()),
|
||||
clients_per_timeline: HashMap::new(),
|
||||
cache: Cache::new(100),
|
||||
pool,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -57,9 +75,9 @@ impl Receiver {
|
|||
/// Note: this method calls `subscribe_or_unsubscribe_as_needed`,
|
||||
/// so Redis PubSub subscriptions are only updated when a new timeline
|
||||
/// comes under management for the first time.
|
||||
pub fn manage_new_timeline(&mut self, manager_id: Uuid, timeline: &str) {
|
||||
pub fn manage_new_timeline(&mut self, manager_id: Uuid, timeline: Timeline) {
|
||||
self.manager_id = manager_id;
|
||||
self.timeline = timeline.to_string();
|
||||
self.timeline = timeline;
|
||||
self.msg_queues
|
||||
.insert(self.manager_id, MsgQueue::new(timeline));
|
||||
self.subscribe_or_unsubscribe_as_needed(timeline);
|
||||
|
@ -67,19 +85,17 @@ impl Receiver {
|
|||
|
||||
/// Set the `Receiver`'s manager_id and target_timeline fields to the appropriate
|
||||
/// value to be polled by the current `StreamManager`.
|
||||
pub fn configure_for_polling(&mut self, manager_id: Uuid, timeline: &str) {
|
||||
pub fn configure_for_polling(&mut self, manager_id: Uuid, timeline: Timeline) {
|
||||
self.manager_id = manager_id;
|
||||
self.timeline = timeline.to_string();
|
||||
self.timeline = timeline;
|
||||
}
|
||||
|
||||
/// Drop any PubSub subscriptions that don't have active clients and check
|
||||
/// that there's a subscription to the current one. If there isn't, then
|
||||
/// subscribe to it.
|
||||
fn subscribe_or_unsubscribe_as_needed(&mut self, timeline: &str) {
|
||||
fn subscribe_or_unsubscribe_as_needed(&mut self, timeline: Timeline) {
|
||||
let start_time = std::time::Instant::now();
|
||||
let timelines_to_modify = self
|
||||
.msg_queues
|
||||
.calculate_timelines_to_add_or_drop(timeline.to_string());
|
||||
let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(timeline);
|
||||
|
||||
// Record the lower number of clients subscribed to that channel
|
||||
for change in timelines_to_modify {
|
||||
|
@ -88,11 +104,37 @@ impl Receiver {
|
|||
.entry(change.timeline.clone())
|
||||
.and_modify(|n| *n += change.in_subscriber_number)
|
||||
.or_insert_with(|| 1);
|
||||
use user::Stream::*;
|
||||
let hashtag = match timeline {
|
||||
Timeline(Hashtag(id), _, _) => {
|
||||
// TODO clean this up
|
||||
let maybe_tag = self.cache.id_to_hashtag.get(&id).map(String::from);
|
||||
let pool = self.pool.clone();
|
||||
let tag = maybe_tag.unwrap_or_else(|| {
|
||||
let tag = &postgres::select_hashtag_name(&id, pool).expect("TODO");
|
||||
|
||||
tag.to_string()
|
||||
});
|
||||
self.cache.hashtag_to_id.put(tag.clone(), id);
|
||||
self.cache.id_to_hashtag.put(id, tag.clone());
|
||||
|
||||
Some(tag)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
// If no clients, unsubscribe from the channel
|
||||
if *count_of_subscribed_clients <= 0 {
|
||||
pubsub_cmd!("unsubscribe", self, change.timeline.clone());
|
||||
pubsub_cmd!(
|
||||
"unsubscribe",
|
||||
self,
|
||||
change.timeline.to_redis_channel(hashtag.as_ref())
|
||||
);
|
||||
} else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 {
|
||||
pubsub_cmd!("subscribe", self, change.timeline.clone());
|
||||
pubsub_cmd!(
|
||||
"subscribe",
|
||||
self,
|
||||
change.timeline.to_redis_channel(hashtag.as_ref())
|
||||
);
|
||||
}
|
||||
}
|
||||
if start_time.elapsed().as_millis() > 1 {
|
||||
|
@ -113,9 +155,27 @@ impl futures::stream::Stream for Receiver {
|
|||
/// message already in a queue. Thus, we only poll Redis if it has not
|
||||
/// been polled lately.
|
||||
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
|
||||
let start_time = time::Instant::now();
|
||||
let (timeline, id) = (self.timeline.clone(), self.manager_id);
|
||||
if self.redis_polled_at.elapsed() > *self.redis_poll_interval {
|
||||
self.pubsub_connection.poll_redis(&mut self.msg_queues);
|
||||
for (raw_timeline, msg_value) in self.pubsub_connection.poll_redis() {
|
||||
let hashtag = if raw_timeline.starts_with("hashtag") {
|
||||
log::warn!("Found a hashtag in: {:?}", start_time.elapsed());
|
||||
let tag_name = raw_timeline.split(':').nth(1).expect("TODO").to_string();
|
||||
log::warn!("Got the tag name in: {:?}", start_time.elapsed());
|
||||
let tag_id = *self.cache.hashtag_to_id.get(&tag_name).expect("TODO");
|
||||
log::warn!("Got the cached id in: {:?}", start_time.elapsed());
|
||||
Some(tag_id)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let timeline = Timeline::from_redis_channel(&raw_timeline, hashtag);
|
||||
for msg_queue in self.msg_queues.values_mut() {
|
||||
if msg_queue.timeline == timeline {
|
||||
msg_queue.messages.push_back(msg_value.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
self.redis_polled_at = time::Instant::now();
|
||||
}
|
||||
|
||||
|
@ -132,6 +192,8 @@ impl futures::stream::Stream for Receiver {
|
|||
|
||||
impl Drop for Receiver {
|
||||
fn drop(&mut self) {
|
||||
pubsub_cmd!("unsubscribe", self, self.timeline.clone());
|
||||
// TODO fix for hashtags:
|
||||
let hashtag = None;
|
||||
pubsub_cmd!("unsubscribe", self, self.timeline.to_redis_channel(hashtag));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ macro_rules! pubsub_cmd {
|
|||
$self
|
||||
.secondary_redis_connection
|
||||
.write_all(&redis_cmd::set(
|
||||
format!("subscribed:timeline:{}", $tl),
|
||||
format!("subscribed:{}", $tl),
|
||||
subscription_new_number,
|
||||
namespace.clone(),
|
||||
))
|
||||
|
@ -35,8 +35,8 @@ macro_rules! pubsub_cmd {
|
|||
/// Send a `SUBSCRIBE` or `UNSUBSCRIBE` command to a specific timeline
|
||||
pub fn pubsub(command: impl Display, timeline: impl Display, ns: Option<String>) -> Vec<u8> {
|
||||
let arg = match ns {
|
||||
Some(namespace) => format!("{}:timeline:{}", namespace, timeline),
|
||||
None => format!("timeline:{}", timeline),
|
||||
Some(namespace) => format!("{}:{}", namespace, timeline),
|
||||
None => format!("{}", timeline),
|
||||
};
|
||||
cmd(command, arg)
|
||||
}
|
||||
|
|
|
@ -39,7 +39,7 @@ impl<'a> RedisMsg<'a> {
|
|||
item
|
||||
}
|
||||
|
||||
pub fn extract_timeline_and_message(&mut self) -> (String, Value) {
|
||||
pub fn extract_raw_timeline_and_message(&mut self) -> (String, Value) {
|
||||
let timeline = &self.next_field()[self.prefix_len..];
|
||||
let msg_txt = self.next_field();
|
||||
let msg_value: Value =
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use super::redis_msg::RedisMsg;
|
||||
use crate::{config::RedisNamespace, redis_to_client_stream::receiver::MessageQueues};
|
||||
use crate::config::RedisNamespace;
|
||||
use futures::{Async, Poll};
|
||||
use serde_json::Value;
|
||||
use std::{io::Read, net};
|
||||
use tokio::io::AsyncRead;
|
||||
|
||||
|
@ -27,8 +28,9 @@ impl RedisStream {
|
|||
// into messages. Incoming messages *are* guaranteed to be RESP arrays,
|
||||
// https://redis.io/topics/protocol
|
||||
/// Adds any new Redis messages to the `MsgQueue` for the appropriate `ClientAgent`.
|
||||
pub fn poll_redis(&mut self, msg_queues: &mut MessageQueues) {
|
||||
pub fn poll_redis(&mut self) -> Vec<(String, Value)> {
|
||||
let mut buffer = vec![0u8; 6000];
|
||||
let mut messages = Vec::new();
|
||||
|
||||
if let Async::Ready(num_bytes_read) = self.poll_read(&mut buffer).unwrap() {
|
||||
let raw_utf = self.as_utf8(buffer, num_bytes_read);
|
||||
|
@ -36,7 +38,7 @@ impl RedisStream {
|
|||
|
||||
// Only act if we have a full message (end on a msg boundary)
|
||||
if !self.incoming_raw_msg.ends_with("}\r\n") {
|
||||
return;
|
||||
return messages;
|
||||
};
|
||||
let prefix_to_skip = match &*self.namespace {
|
||||
Some(namespace) => format!("{}:timeline:", namespace),
|
||||
|
@ -49,12 +51,8 @@ impl RedisStream {
|
|||
let command = msg.next_field();
|
||||
match command.as_str() {
|
||||
"message" => {
|
||||
let (timeline, msg_value) = msg.extract_timeline_and_message();
|
||||
for msg_queue in msg_queues.values_mut() {
|
||||
if msg_queue.redis_channel == timeline {
|
||||
msg_queue.messages.push_back(msg_value.clone());
|
||||
}
|
||||
}
|
||||
let (raw_timeline, msg_value) = msg.extract_raw_timeline_and_message();
|
||||
messages.push((raw_timeline, msg_value));
|
||||
}
|
||||
|
||||
"subscribe" | "unsubscribe" => {
|
||||
|
@ -64,12 +62,13 @@ impl RedisStream {
|
|||
let _active_subscriptions = msg.process_number();
|
||||
msg.cursor += "\r\n".len();
|
||||
}
|
||||
cmd => panic!("Invariant violation: {} is invalid Redis input", cmd),
|
||||
cmd => panic!("Invariant violation: {} is unexpected Redis output", cmd),
|
||||
};
|
||||
msg = RedisMsg::from_raw(&msg.raw[msg.cursor..], msg.prefix_len);
|
||||
}
|
||||
self.incoming_raw_msg.clear();
|
||||
}
|
||||
messages
|
||||
}
|
||||
|
||||
fn as_utf8(&mut self, cur_buffer: Vec<u8>, size: usize) -> String {
|
||||
|
|
Loading…
Reference in New Issue