mirror of
https://github.com/mastodon/flodgatt
synced 2025-04-11 22:58:25 +02:00
Reorganize code [WIP]
This commit is contained in:
parent
a2e879ec3a
commit
829d193ec3
@ -12,7 +12,7 @@ mod tag;
|
|||||||
mod visibility;
|
mod visibility;
|
||||||
|
|
||||||
pub use announcement::Announcement;
|
pub use announcement::Announcement;
|
||||||
pub(in crate::messages::event) use announcement_reaction::AnnouncementReaction;
|
pub(in crate::event) use announcement_reaction::AnnouncementReaction;
|
||||||
pub use conversation::Conversation;
|
pub use conversation::Conversation;
|
||||||
pub use id::Id;
|
pub use id::Id;
|
||||||
pub use notification::Notification;
|
pub use notification::Notification;
|
@ -40,6 +40,6 @@
|
|||||||
|
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod err;
|
pub mod err;
|
||||||
pub mod messages;
|
pub mod event;
|
||||||
pub mod request;
|
pub mod request;
|
||||||
pub mod response;
|
pub mod response;
|
||||||
|
89
src/main.rs
89
src/main.rs
@ -1,7 +1,7 @@
|
|||||||
use flodgatt::config;
|
use flodgatt::config;
|
||||||
use flodgatt::err::FatalErr;
|
use flodgatt::err::FatalErr;
|
||||||
use flodgatt::messages::Event;
|
use flodgatt::event::Event;
|
||||||
use flodgatt::request::{self, Subscription, Timeline};
|
use flodgatt::request::{Handler, Subscription, Timeline};
|
||||||
use flodgatt::response::redis;
|
use flodgatt::response::redis;
|
||||||
use flodgatt::response::stream;
|
use flodgatt::response::stream;
|
||||||
|
|
||||||
@ -13,10 +13,8 @@ use std::time::Instant;
|
|||||||
use tokio::net::UnixListener;
|
use tokio::net::UnixListener;
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{mpsc, watch};
|
||||||
use tokio::timer::Interval;
|
use tokio::timer::Interval;
|
||||||
use warp::http::StatusCode;
|
|
||||||
use warp::path;
|
|
||||||
use warp::ws::Ws2;
|
use warp::ws::Ws2;
|
||||||
use warp::{Filter, Rejection};
|
use warp::Filter;
|
||||||
|
|
||||||
fn main() -> Result<(), FatalErr> {
|
fn main() -> Result<(), FatalErr> {
|
||||||
config::merge_dotenv()?;
|
config::merge_dotenv()?;
|
||||||
@ -27,7 +25,7 @@ fn main() -> Result<(), FatalErr> {
|
|||||||
let (event_tx, event_rx) = watch::channel((Timeline::empty(), Event::Ping));
|
let (event_tx, event_rx) = watch::channel((Timeline::empty(), Event::Ping));
|
||||||
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
|
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let request_handler = request::Handler::new(postgres_cfg, *cfg.whitelist_mode);
|
let request = Handler::new(postgres_cfg, *cfg.whitelist_mode);
|
||||||
let poll_freq = *redis_cfg.polling_interval;
|
let poll_freq = *redis_cfg.polling_interval;
|
||||||
let shared_manager = redis::Manager::try_from(redis_cfg, event_tx, cmd_rx)?.into_arc();
|
let shared_manager = redis::Manager::try_from(redis_cfg, event_tx, cmd_rx)?.into_arc();
|
||||||
|
|
||||||
@ -35,38 +33,27 @@ fn main() -> Result<(), FatalErr> {
|
|||||||
let sse_manager = shared_manager.clone();
|
let sse_manager = shared_manager.clone();
|
||||||
let (sse_rx, sse_cmd_tx) = (event_rx.clone(), cmd_tx.clone());
|
let (sse_rx, sse_cmd_tx) = (event_rx.clone(), cmd_tx.clone());
|
||||||
|
|
||||||
let sse = request_handler
|
let sse = request
|
||||||
.parse_sse_request()
|
.sse_subscription()
|
||||||
.and(warp::sse())
|
.and(warp::sse())
|
||||||
.map(
|
.map(move |subscription: Subscription, sse: warp::sse::Sse| {
|
||||||
move |subscription: Subscription, client_conn: warp::sse::Sse| {
|
log::info!("Incoming SSE request for {:?}", subscription.timeline);
|
||||||
log::info!("Incoming SSE request for {:?}", subscription.timeline);
|
let mut manager = sse_manager.lock().unwrap_or_else(redis::Manager::recover);
|
||||||
{
|
manager.subscribe(&subscription);
|
||||||
let mut manager = sse_manager.lock().unwrap_or_else(redis::Manager::recover);
|
|
||||||
manager.subscribe(&subscription);
|
|
||||||
}
|
|
||||||
|
|
||||||
stream::Sse::send_events(
|
stream::Sse::send_events(sse, sse_cmd_tx.clone(), subscription, sse_rx.clone())
|
||||||
client_conn,
|
})
|
||||||
sse_cmd_tx.clone(),
|
|
||||||
subscription,
|
|
||||||
sse_rx.clone(),
|
|
||||||
)
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.with(warp::reply::with::header("Connection", "keep-alive"));
|
.with(warp::reply::with::header("Connection", "keep-alive"));
|
||||||
|
|
||||||
// WebSocket
|
// WebSocket
|
||||||
let ws_manager = shared_manager.clone();
|
let ws_manager = shared_manager.clone();
|
||||||
let ws = request_handler
|
let ws = request
|
||||||
.parse_ws_request()
|
.ws_subscription()
|
||||||
.and(warp::ws::ws2())
|
.and(warp::ws::ws2())
|
||||||
.map(move |subscription: Subscription, ws: Ws2| {
|
.map(move |subscription: Subscription, ws: Ws2| {
|
||||||
log::info!("Incoming websocket request for {:?}", subscription.timeline);
|
log::info!("Incoming websocket request for {:?}", subscription.timeline);
|
||||||
{
|
let mut manager = ws_manager.lock().unwrap_or_else(redis::Manager::recover);
|
||||||
let mut manager = ws_manager.lock().unwrap_or_else(redis::Manager::recover);
|
manager.subscribe(&subscription);
|
||||||
manager.subscribe(&subscription);
|
|
||||||
}
|
|
||||||
let token = subscription.access_token.clone().unwrap_or_default(); // token sent for security
|
let token = subscription.access_token.clone().unwrap_or_default(); // token sent for security
|
||||||
let ws_stream = stream::Ws::new(cmd_tx.clone(), event_rx.clone(), subscription);
|
let ws_stream = stream::Ws::new(cmd_tx.clone(), event_rx.clone(), subscription);
|
||||||
|
|
||||||
@ -74,28 +61,24 @@ fn main() -> Result<(), FatalErr> {
|
|||||||
})
|
})
|
||||||
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));
|
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));
|
||||||
|
|
||||||
|
#[cfg(feature = "stub_status")]
|
||||||
|
let status = {
|
||||||
|
let (r1, r3) = (shared_manager.clone(), shared_manager.clone());
|
||||||
|
#[rustfmt::skip]
|
||||||
|
request.health().map(|| "OK")
|
||||||
|
.or(request.status()
|
||||||
|
.map(move || r1.lock().unwrap_or_else(redis::Manager::recover).count()))
|
||||||
|
.or(request.status_per_timeline()
|
||||||
|
.map(move || r3.lock().unwrap_or_else(redis::Manager::recover).list()))
|
||||||
|
};
|
||||||
|
#[cfg(not(feature = "stub_status"))]
|
||||||
|
let status = request.health().map(|| "OK");
|
||||||
|
|
||||||
let cors = warp::cors()
|
let cors = warp::cors()
|
||||||
.allow_any_origin()
|
.allow_any_origin()
|
||||||
.allow_methods(cfg.cors.allowed_methods)
|
.allow_methods(cfg.cors.allowed_methods)
|
||||||
.allow_headers(cfg.cors.allowed_headers);
|
.allow_headers(cfg.cors.allowed_headers);
|
||||||
|
|
||||||
// TODO -- extract to separate file
|
|
||||||
#[cfg(feature = "stub_status")]
|
|
||||||
let status = {
|
|
||||||
let (r1, r3) = (shared_manager.clone(), shared_manager.clone());
|
|
||||||
warp::path!("api" / "v1" / "streaming" / "health")
|
|
||||||
.map(|| "OK")
|
|
||||||
.or(warp::path!("api" / "v1" / "streaming" / "status")
|
|
||||||
.and(warp::path::end())
|
|
||||||
.map(move || r1.lock().unwrap_or_else(redis::Manager::recover).count()))
|
|
||||||
.or(
|
|
||||||
warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline")
|
|
||||||
.map(move || r3.lock().unwrap_or_else(redis::Manager::recover).list()),
|
|
||||||
)
|
|
||||||
};
|
|
||||||
#[cfg(not(feature = "stub_status"))]
|
|
||||||
let status = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK");
|
|
||||||
|
|
||||||
let streaming_server = move || {
|
let streaming_server = move || {
|
||||||
let manager = shared_manager.clone();
|
let manager = shared_manager.clone();
|
||||||
let stream = Interval::new(Instant::now(), poll_freq)
|
let stream = Interval::new(Instant::now(), poll_freq)
|
||||||
@ -106,7 +89,7 @@ fn main() -> Result<(), FatalErr> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
warp::spawn(lazy(move || stream));
|
warp::spawn(lazy(move || stream));
|
||||||
warp::serve(ws.or(sse).with(cors).or(status).recover(recover))
|
warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err))
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(socket) = &*cfg.unix_socket {
|
if let Some(socket) = &*cfg.unix_socket {
|
||||||
@ -122,15 +105,3 @@ fn main() -> Result<(), FatalErr> {
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO -- extract to separate file
|
|
||||||
fn recover(r: Rejection) -> Result<impl warp::Reply, warp::Rejection> {
|
|
||||||
let json_err = match r.cause() {
|
|
||||||
Some(text) if text.to_string() == "Missing request header 'authorization'" => {
|
|
||||||
warp::reply::json(&"Error: Missing access token".to_string())
|
|
||||||
}
|
|
||||||
Some(text) => warp::reply::json(&text.to_string()),
|
|
||||||
None => warp::reply::json(&"Error: Nonexistant endpoint".to_string()),
|
|
||||||
};
|
|
||||||
Ok(warp::reply::with_status(json_err, StatusCode::UNAUTHORIZED))
|
|
||||||
}
|
|
||||||
|
@ -1,3 +0,0 @@
|
|||||||
mod event;
|
|
||||||
|
|
||||||
pub use event::{CheckedEvent, DynEvent, Event, EventErr, EventKind, Id};
|
|
File diff suppressed because one or more lines are too long
118
src/request.rs
118
src/request.rs
@ -12,7 +12,10 @@ pub use timeline::{Content, Reach, Stream, Timeline};
|
|||||||
|
|
||||||
use self::query::Query;
|
use self::query::Query;
|
||||||
use crate::config;
|
use crate::config;
|
||||||
use warp::{filters::BoxedFilter, path, Filter};
|
use warp::filters::BoxedFilter;
|
||||||
|
use warp::http::StatusCode;
|
||||||
|
use warp::path;
|
||||||
|
use warp::{Filter, Rejection};
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod sse_test;
|
mod sse_test;
|
||||||
@ -59,39 +62,25 @@ impl Handler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn parse_ws_request(&self) -> BoxedFilter<(Subscription,)> {
|
pub fn sse_subscription(&self) -> BoxedFilter<(Subscription,)> {
|
||||||
let pg_conn = self.pg_conn.clone();
|
|
||||||
parse_ws_query()
|
|
||||||
.and(query::OptionalAccessToken::from_ws_header())
|
|
||||||
.and_then(Query::update_access_token)
|
|
||||||
.and_then(move |q| Subscription::query_postgres(q, pg_conn.clone()))
|
|
||||||
.boxed()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn parse_sse_request(&self) -> BoxedFilter<(Subscription,)> {
|
|
||||||
let pg_conn = self.pg_conn.clone();
|
let pg_conn = self.pg_conn.clone();
|
||||||
any_of!(
|
any_of!(
|
||||||
parse_sse_query!(
|
parse_sse_query!( path => "api" / "v1" / "streaming" / "user" / "notification"
|
||||||
path => "api" / "v1" / "streaming" / "user" / "notification"
|
endpoint => "user:notification" ),
|
||||||
endpoint => "user:notification" ),
|
parse_sse_query!( path => "api" / "v1" / "streaming" / "user"
|
||||||
parse_sse_query!(
|
endpoint => "user"),
|
||||||
path => "api" / "v1" / "streaming" / "user"
|
parse_sse_query!( path => "api" / "v1" / "streaming" / "public" / "local"
|
||||||
endpoint => "user"),
|
endpoint => "public:local"),
|
||||||
parse_sse_query!(
|
parse_sse_query!( path => "api" / "v1" / "streaming" / "public"
|
||||||
path => "api" / "v1" / "streaming" / "public" / "local"
|
endpoint => "public"),
|
||||||
endpoint => "public:local"),
|
parse_sse_query!( path => "api" / "v1" / "streaming" / "direct"
|
||||||
parse_sse_query!(
|
endpoint => "direct"),
|
||||||
path => "api" / "v1" / "streaming" / "public"
|
parse_sse_query!( path => "api" / "v1" / "streaming" / "hashtag" / "local"
|
||||||
endpoint => "public"),
|
endpoint => "hashtag:local"),
|
||||||
parse_sse_query!(
|
parse_sse_query!( path => "api" / "v1" / "streaming" / "hashtag"
|
||||||
path => "api" / "v1" / "streaming" / "direct"
|
endpoint => "hashtag"),
|
||||||
endpoint => "direct"),
|
parse_sse_query!( path => "api" / "v1" / "streaming" / "list"
|
||||||
parse_sse_query!(path => "api" / "v1" / "streaming" / "hashtag" / "local"
|
endpoint => "list")
|
||||||
endpoint => "hashtag:local"),
|
|
||||||
parse_sse_query!(path => "api" / "v1" / "streaming" / "hashtag"
|
|
||||||
endpoint => "hashtag"),
|
|
||||||
parse_sse_query!(path => "api" / "v1" / "streaming" / "list"
|
|
||||||
endpoint => "list")
|
|
||||||
)
|
)
|
||||||
// because SSE requests place their `access_token` in the header instead of in a query
|
// because SSE requests place their `access_token` in the header instead of in a query
|
||||||
// parameter, we need to update our Query if the header has a token
|
// parameter, we need to update our Query if the header has a token
|
||||||
@ -100,30 +89,57 @@ impl Handler {
|
|||||||
.and_then(move |q| Subscription::query_postgres(q, pg_conn.clone()))
|
.and_then(move |q| Subscription::query_postgres(q, pg_conn.clone()))
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn ws_subscription(&self) -> BoxedFilter<(Subscription,)> {
|
||||||
|
let pg_conn = self.pg_conn.clone();
|
||||||
|
parse_ws_query()
|
||||||
|
.and(query::OptionalAccessToken::from_ws_header())
|
||||||
|
.and_then(Query::update_access_token)
|
||||||
|
.and_then(move |q| Subscription::query_postgres(q, pg_conn.clone()))
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn health(&self) -> BoxedFilter<()> {
|
||||||
|
warp::path!("api" / "v1" / "streaming" / "health").boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn status(&self) -> BoxedFilter<()> {
|
||||||
|
warp::path!("api" / "v1" / "streaming" / "status")
|
||||||
|
.and(warp::path::end())
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn status_per_timeline(&self) -> BoxedFilter<()> {
|
||||||
|
warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline").boxed()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn err(r: Rejection) -> Result<impl warp::Reply, warp::Rejection> {
|
||||||
|
let json_err = match r.cause() {
|
||||||
|
Some(text) if text.to_string() == "Missing request header 'authorization'" => {
|
||||||
|
warp::reply::json(&"Error: Missing access token".to_string())
|
||||||
|
}
|
||||||
|
Some(text) => warp::reply::json(&text.to_string()),
|
||||||
|
None => warp::reply::json(&"Error: Nonexistant endpoint".to_string()),
|
||||||
|
};
|
||||||
|
Ok(warp::reply::with_status(json_err, StatusCode::UNAUTHORIZED))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_ws_query() -> BoxedFilter<(Query,)> {
|
fn parse_ws_query() -> BoxedFilter<(Query,)> {
|
||||||
|
use query::*;
|
||||||
path!("api" / "v1" / "streaming")
|
path!("api" / "v1" / "streaming")
|
||||||
.and(path::end())
|
.and(path::end())
|
||||||
.and(warp::query())
|
.and(warp::query())
|
||||||
.and(query::Auth::to_filter())
|
.and(Auth::to_filter())
|
||||||
.and(query::Media::to_filter())
|
.and(Media::to_filter())
|
||||||
.and(query::Hashtag::to_filter())
|
.and(Hashtag::to_filter())
|
||||||
.and(query::List::to_filter())
|
.and(List::to_filter())
|
||||||
.map(
|
.map(|s: Stream, a: Auth, m: Media, h: Hashtag, l: List| Query {
|
||||||
|stream: query::Stream,
|
access_token: a.access_token,
|
||||||
auth: query::Auth,
|
stream: s.stream,
|
||||||
media: query::Media,
|
media: m.is_truthy(),
|
||||||
hashtag: query::Hashtag,
|
hashtag: h.tag,
|
||||||
list: query::List| {
|
list: l.list,
|
||||||
Query {
|
})
|
||||||
access_token: auth.access_token,
|
|
||||||
stream: stream.stream,
|
|
||||||
media: media.is_truthy(),
|
|
||||||
hashtag: hashtag.tag,
|
|
||||||
list: list.list,
|
|
||||||
}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
//! Postgres queries
|
//! Postgres queries
|
||||||
use crate::config;
|
use crate::config;
|
||||||
use crate::messages::Id;
|
use crate::event::Id;
|
||||||
use crate::request::timeline::{Scope, UserData};
|
use crate::request::timeline::{Scope, UserData};
|
||||||
|
|
||||||
use ::postgres;
|
use ::postgres;
|
||||||
|
@ -8,7 +8,7 @@
|
|||||||
use super::postgres::PgPool;
|
use super::postgres::PgPool;
|
||||||
use super::query::Query;
|
use super::query::Query;
|
||||||
use super::{Content, Reach, Stream, Timeline};
|
use super::{Content, Reach, Stream, Timeline};
|
||||||
use crate::messages::Id;
|
use crate::event::Id;
|
||||||
|
|
||||||
use hashbrown::HashSet;
|
use hashbrown::HashSet;
|
||||||
|
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
use super::query::Query;
|
use super::query::Query;
|
||||||
use crate::err::TimelineErr;
|
use crate::err::TimelineErr;
|
||||||
use crate::messages::Id;
|
use crate::event::Id;
|
||||||
|
|
||||||
use hashbrown::HashSet;
|
use hashbrown::HashSet;
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
@ -12,8 +12,7 @@ pub struct Timeline(pub Stream, pub Reach, pub Content);
|
|||||||
|
|
||||||
impl Timeline {
|
impl Timeline {
|
||||||
pub fn empty() -> Self {
|
pub fn empty() -> Self {
|
||||||
use {Content::*, Reach::*, Stream::*};
|
Self(Stream::Unset, Reach::Local, Content::Notification)
|
||||||
Self(Unset, Local, Notification)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn to_redis_raw_timeline(&self, hashtag: Option<&String>) -> Result<String, TimelineErr> {
|
pub fn to_redis_raw_timeline(&self, hashtag: Option<&String>) -> Result<String, TimelineErr> {
|
||||||
@ -26,11 +25,11 @@ impl Timeline {
|
|||||||
// TODO -- would `.push_str` be faster here?
|
// TODO -- would `.push_str` be faster here?
|
||||||
Timeline(Hashtag(_id), Federated, All) => format!(
|
Timeline(Hashtag(_id), Federated, All) => format!(
|
||||||
"timeline:hashtag:{}",
|
"timeline:hashtag:{}",
|
||||||
hashtag.ok_or_else(|| TimelineErr::MissingHashtag)?
|
hashtag.ok_or(TimelineErr::MissingHashtag)?
|
||||||
),
|
),
|
||||||
Timeline(Hashtag(_id), Local, All) => format!(
|
Timeline(Hashtag(_id), Local, All) => format!(
|
||||||
"timeline:hashtag:{}:local",
|
"timeline:hashtag:{}:local",
|
||||||
hashtag.ok_or_else(|| TimelineErr::MissingHashtag)?
|
hashtag.ok_or(TimelineErr::MissingHashtag)?
|
||||||
),
|
),
|
||||||
Timeline(User(id), Federated, All) => format!("timeline:{}", id),
|
Timeline(User(id), Federated, All) => format!("timeline:{}", id),
|
||||||
Timeline(User(id), Federated, Notification) => format!("timeline:{}:notification", id),
|
Timeline(User(id), Federated, Notification) => format!("timeline:{}:notification", id),
|
||||||
@ -44,6 +43,7 @@ impl Timeline {
|
|||||||
timeline: &str,
|
timeline: &str,
|
||||||
cache: &mut LruCache<String, i64>,
|
cache: &mut LruCache<String, i64>,
|
||||||
) -> Result<Self, TimelineErr> {
|
) -> Result<Self, TimelineErr> {
|
||||||
|
// TODO -- can a combinator shorten this?
|
||||||
let mut id_from_tag = |tag: &str| match cache.get(&tag.to_string()) {
|
let mut id_from_tag = |tag: &str| match cache.get(&tag.to_string()) {
|
||||||
Some(id) => Ok(*id),
|
Some(id) => Ok(*id),
|
||||||
None => Err(TimelineErr::InvalidInput), // TODO more specific
|
None => Err(TimelineErr::InvalidInput), // TODO more specific
|
||||||
|
@ -4,7 +4,7 @@ pub use err::RedisConnErr;
|
|||||||
use super::msg::{RedisParseErr, RedisParseOutput};
|
use super::msg::{RedisParseErr, RedisParseOutput};
|
||||||
use super::ManagerErr;
|
use super::ManagerErr;
|
||||||
use crate::config::Redis;
|
use crate::config::Redis;
|
||||||
use crate::messages::Event;
|
use crate::event::Event;
|
||||||
use crate::request::{Stream, Timeline};
|
use crate::request::{Stream, Timeline};
|
||||||
use futures::{Async, Poll};
|
use futures::{Async, Poll};
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
|
@ -6,7 +6,7 @@ pub use err::ManagerErr;
|
|||||||
|
|
||||||
use super::{connection::RedisCmd, RedisConn};
|
use super::{connection::RedisCmd, RedisConn};
|
||||||
use crate::config;
|
use crate::config;
|
||||||
use crate::messages::Event;
|
use crate::event::Event;
|
||||||
use crate::request::{Stream, Subscription, Timeline};
|
use crate::request::{Stream, Subscription, Timeline};
|
||||||
|
|
||||||
use futures::{Async, Stream as _Stream};
|
use futures::{Async, Stream as _Stream};
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
use super::super::{RedisConnErr, RedisParseErr};
|
use super::super::{RedisConnErr, RedisParseErr};
|
||||||
use crate::err::TimelineErr;
|
use crate::err::TimelineErr;
|
||||||
use crate::messages::{Event, EventErr};
|
use crate::event::{Event, EventErr};
|
||||||
use crate::request::Timeline;
|
use crate::request::Timeline;
|
||||||
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
@ -1,184 +1,5 @@
|
|||||||
use crate::messages::Event;
|
pub use sse::Sse;
|
||||||
use crate::request::{Subscription, Timeline};
|
pub use ws::Ws;
|
||||||
|
|
||||||
use futures::{future::Future, stream::Stream};
|
mod sse;
|
||||||
use log;
|
mod ws;
|
||||||
use std::time::Duration;
|
|
||||||
use tokio::sync::{mpsc, watch};
|
|
||||||
use warp::{
|
|
||||||
reply::Reply,
|
|
||||||
sse::{ServerSentEvent, Sse as WarpSse},
|
|
||||||
ws::{Message, WebSocket},
|
|
||||||
};
|
|
||||||
|
|
||||||
pub struct Ws {
|
|
||||||
unsubscribe_tx: mpsc::UnboundedSender<Timeline>,
|
|
||||||
subscription: Subscription,
|
|
||||||
ws_rx: watch::Receiver<(Timeline, Event)>,
|
|
||||||
ws_tx: Option<mpsc::UnboundedSender<Message>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Ws {
|
|
||||||
pub fn new(
|
|
||||||
unsubscribe_tx: mpsc::UnboundedSender<Timeline>,
|
|
||||||
ws_rx: watch::Receiver<(Timeline, Event)>,
|
|
||||||
subscription: Subscription,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
unsubscribe_tx,
|
|
||||||
subscription,
|
|
||||||
ws_rx,
|
|
||||||
ws_tx: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn send_to(mut self, ws: WebSocket) -> impl Future<Item = (), Error = ()> {
|
|
||||||
let (transmit_to_ws, _receive_from_ws) = ws.split();
|
|
||||||
// Create a pipe
|
|
||||||
let (ws_tx, ws_rx) = mpsc::unbounded_channel();
|
|
||||||
self.ws_tx = Some(ws_tx);
|
|
||||||
|
|
||||||
// Send one end of it to a different green thread and tell that end to forward
|
|
||||||
// whatever it gets on to the WebSocket client
|
|
||||||
warp::spawn(
|
|
||||||
ws_rx
|
|
||||||
.map_err(|_| -> warp::Error { unreachable!() })
|
|
||||||
.forward(transmit_to_ws)
|
|
||||||
.map(|_r| ())
|
|
||||||
.map_err(|e| match e.to_string().as_ref() {
|
|
||||||
"IO error: Broken pipe (os error 32)" => (), // just closed unix socket
|
|
||||||
_ => log::warn!("WebSocket send error: {}", e),
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
let target_timeline = self.subscription.timeline;
|
|
||||||
let incoming_events = self.ws_rx.clone().map_err(|_| ());
|
|
||||||
|
|
||||||
incoming_events.for_each(move |(tl, event)| {
|
|
||||||
if matches!(event, Event::Ping) {
|
|
||||||
self.send_ping()
|
|
||||||
} else if target_timeline == tl {
|
|
||||||
use crate::messages::{CheckedEvent::Update, Event::*, EventKind};
|
|
||||||
use crate::request::Stream::Public;
|
|
||||||
let blocks = &self.subscription.blocks;
|
|
||||||
let allowed_langs = &self.subscription.allowed_langs;
|
|
||||||
|
|
||||||
match event {
|
|
||||||
TypeSafe(Update { payload, queued_at }) => match tl {
|
|
||||||
Timeline(Public, _, _) if payload.language_not(allowed_langs) => Ok(()),
|
|
||||||
_ if payload.involves_any(&blocks) => Ok(()),
|
|
||||||
_ => self.send_msg(TypeSafe(Update { payload, queued_at })),
|
|
||||||
},
|
|
||||||
TypeSafe(non_update) => self.send_msg(TypeSafe(non_update)),
|
|
||||||
Dynamic(dyn_event) => {
|
|
||||||
if let EventKind::Update(s) = dyn_event.kind.clone() {
|
|
||||||
match tl {
|
|
||||||
Timeline(Public, _, _) if s.language_not(allowed_langs) => Ok(()),
|
|
||||||
_ if s.involves_any(&blocks) => Ok(()),
|
|
||||||
_ => self.send_msg(Dynamic(dyn_event)),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
self.send_msg(Dynamic(dyn_event))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ping => unreachable!(), // handled pings above
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn send_ping(&mut self) -> Result<(), ()> {
|
|
||||||
self.send_txt("{}")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn send_msg(&mut self, event: Event) -> Result<(), ()> {
|
|
||||||
self.send_txt(&event.to_json_string())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn send_txt(&mut self, txt: &str) -> Result<(), ()> {
|
|
||||||
let tl = self.subscription.timeline;
|
|
||||||
match self.ws_tx.clone().ok_or(())?.try_send(Message::text(txt)) {
|
|
||||||
Ok(_) => Ok(()),
|
|
||||||
Err(_) => {
|
|
||||||
self.unsubscribe_tx
|
|
||||||
.try_send(tl)
|
|
||||||
.unwrap_or_else(|e| log::error!("could not unsubscribe from channel: {}", e));
|
|
||||||
Err(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Sse;
|
|
||||||
|
|
||||||
impl Sse {
|
|
||||||
fn reply_with(event: Event) -> Option<(impl ServerSentEvent, impl ServerSentEvent)> {
|
|
||||||
Some((
|
|
||||||
warp::sse::event(event.event_name()),
|
|
||||||
warp::sse::data(event.payload().unwrap_or_else(String::new)),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn send_events(
|
|
||||||
sse: WarpSse,
|
|
||||||
mut unsubscribe_tx: mpsc::UnboundedSender<Timeline>,
|
|
||||||
subscription: Subscription,
|
|
||||||
sse_rx: watch::Receiver<(Timeline, Event)>,
|
|
||||||
) -> impl Reply {
|
|
||||||
let target_timeline = subscription.timeline;
|
|
||||||
let allowed_langs = subscription.allowed_langs;
|
|
||||||
let blocks = subscription.blocks;
|
|
||||||
|
|
||||||
let event_stream = sse_rx
|
|
||||||
.filter(move |(timeline, _)| target_timeline == *timeline)
|
|
||||||
.filter_map(move |(timeline, event)| {
|
|
||||||
use crate::messages::{
|
|
||||||
CheckedEvent, CheckedEvent::Update, DynEvent, Event::*, EventKind,
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::request::Stream::Public;
|
|
||||||
match event {
|
|
||||||
TypeSafe(Update { payload, queued_at }) => match timeline {
|
|
||||||
Timeline(Public, _, _) if payload.language_not(&allowed_langs) => None,
|
|
||||||
_ if payload.involves_any(&blocks) => None,
|
|
||||||
_ => Self::reply_with(Event::TypeSafe(CheckedEvent::Update {
|
|
||||||
payload,
|
|
||||||
queued_at,
|
|
||||||
})),
|
|
||||||
},
|
|
||||||
TypeSafe(non_update) => Self::reply_with(Event::TypeSafe(non_update)),
|
|
||||||
Dynamic(dyn_event) => {
|
|
||||||
if let EventKind::Update(s) = dyn_event.kind {
|
|
||||||
match timeline {
|
|
||||||
Timeline(Public, _, _) if s.language_not(&allowed_langs) => None,
|
|
||||||
_ if s.involves_any(&blocks) => None,
|
|
||||||
_ => Self::reply_with(Dynamic(DynEvent {
|
|
||||||
kind: EventKind::Update(s),
|
|
||||||
..dyn_event
|
|
||||||
})),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ping => None, // pings handled automatically
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.then(move |res| {
|
|
||||||
unsubscribe_tx
|
|
||||||
.try_send(target_timeline)
|
|
||||||
.unwrap_or_else(|e| log::error!("could not unsubscribe from channel: {}", e));
|
|
||||||
res
|
|
||||||
});
|
|
||||||
|
|
||||||
sse.reply(
|
|
||||||
warp::sse::keep_alive()
|
|
||||||
.interval(Duration::from_secs(30))
|
|
||||||
.text("thump".to_string())
|
|
||||||
.stream(event_stream),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// TODO -- split WS and SSE into separate files and add misc stuff from main.rs here
|
|
||||||
|
80
src/response/stream/sse.rs
Normal file
80
src/response/stream/sse.rs
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
use crate::event::Event;
|
||||||
|
use crate::request::{Subscription, Timeline};
|
||||||
|
|
||||||
|
use futures::stream::Stream;
|
||||||
|
use log;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::sync::{mpsc, watch};
|
||||||
|
use warp::reply::Reply;
|
||||||
|
use warp::sse::{ServerSentEvent, Sse as WarpSse};
|
||||||
|
|
||||||
|
pub struct Sse;
|
||||||
|
|
||||||
|
impl Sse {
|
||||||
|
fn reply_with(event: Event) -> Option<(impl ServerSentEvent, impl ServerSentEvent)> {
|
||||||
|
Some((
|
||||||
|
warp::sse::event(event.event_name()),
|
||||||
|
warp::sse::data(event.payload().unwrap_or_else(String::new)),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send_events(
|
||||||
|
sse: WarpSse,
|
||||||
|
mut unsubscribe_tx: mpsc::UnboundedSender<Timeline>,
|
||||||
|
subscription: Subscription,
|
||||||
|
sse_rx: watch::Receiver<(Timeline, Event)>,
|
||||||
|
) -> impl Reply {
|
||||||
|
let target_timeline = subscription.timeline;
|
||||||
|
let allowed_langs = subscription.allowed_langs;
|
||||||
|
let blocks = subscription.blocks;
|
||||||
|
|
||||||
|
let event_stream = sse_rx
|
||||||
|
.filter(move |(timeline, _)| target_timeline == *timeline)
|
||||||
|
.filter_map(move |(timeline, event)| {
|
||||||
|
use crate::event::{
|
||||||
|
CheckedEvent, CheckedEvent::Update, DynEvent, Event::*, EventKind,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::request::Stream::Public;
|
||||||
|
match event {
|
||||||
|
TypeSafe(Update { payload, queued_at }) => match timeline {
|
||||||
|
Timeline(Public, _, _) if payload.language_not(&allowed_langs) => None,
|
||||||
|
_ if payload.involves_any(&blocks) => None,
|
||||||
|
_ => Self::reply_with(Event::TypeSafe(CheckedEvent::Update {
|
||||||
|
payload,
|
||||||
|
queued_at,
|
||||||
|
})),
|
||||||
|
},
|
||||||
|
TypeSafe(non_update) => Self::reply_with(Event::TypeSafe(non_update)),
|
||||||
|
Dynamic(dyn_event) => {
|
||||||
|
if let EventKind::Update(s) = dyn_event.kind {
|
||||||
|
match timeline {
|
||||||
|
Timeline(Public, _, _) if s.language_not(&allowed_langs) => None,
|
||||||
|
_ if s.involves_any(&blocks) => None,
|
||||||
|
_ => Self::reply_with(Dynamic(DynEvent {
|
||||||
|
kind: EventKind::Update(s),
|
||||||
|
..dyn_event
|
||||||
|
})),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ping => None, // pings handled automatically
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.then(move |res| {
|
||||||
|
unsubscribe_tx
|
||||||
|
.try_send(target_timeline)
|
||||||
|
.unwrap_or_else(|e| log::error!("could not unsubscribe from channel: {}", e));
|
||||||
|
res
|
||||||
|
});
|
||||||
|
|
||||||
|
sse.reply(
|
||||||
|
warp::sse::keep_alive()
|
||||||
|
.interval(Duration::from_secs(30))
|
||||||
|
.text("thump".to_string())
|
||||||
|
.stream(event_stream),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
106
src/response/stream/ws.rs
Normal file
106
src/response/stream/ws.rs
Normal file
@ -0,0 +1,106 @@
|
|||||||
|
use crate::event::Event;
|
||||||
|
use crate::request::{Subscription, Timeline};
|
||||||
|
|
||||||
|
use futures::{future::Future, stream::Stream};
|
||||||
|
use tokio::sync::{mpsc, watch};
|
||||||
|
use warp::ws::{Message, WebSocket};
|
||||||
|
|
||||||
|
pub struct Ws {
|
||||||
|
unsubscribe_tx: mpsc::UnboundedSender<Timeline>,
|
||||||
|
subscription: Subscription,
|
||||||
|
ws_rx: watch::Receiver<(Timeline, Event)>,
|
||||||
|
ws_tx: Option<mpsc::UnboundedSender<Message>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Ws {
|
||||||
|
pub fn new(
|
||||||
|
unsubscribe_tx: mpsc::UnboundedSender<Timeline>,
|
||||||
|
ws_rx: watch::Receiver<(Timeline, Event)>,
|
||||||
|
subscription: Subscription,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
unsubscribe_tx,
|
||||||
|
subscription,
|
||||||
|
ws_rx,
|
||||||
|
ws_tx: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send_to(mut self, ws: WebSocket) -> impl Future<Item = (), Error = ()> {
|
||||||
|
let (transmit_to_ws, _receive_from_ws) = ws.split();
|
||||||
|
// Create a pipe
|
||||||
|
let (ws_tx, ws_rx) = mpsc::unbounded_channel();
|
||||||
|
self.ws_tx = Some(ws_tx);
|
||||||
|
|
||||||
|
// Send one end of it to a different green thread and tell that end to forward
|
||||||
|
// whatever it gets on to the WebSocket client
|
||||||
|
warp::spawn(
|
||||||
|
ws_rx
|
||||||
|
.map_err(|_| -> warp::Error { unreachable!() })
|
||||||
|
.forward(transmit_to_ws)
|
||||||
|
.map(|_r| ())
|
||||||
|
.map_err(|e| match e.to_string().as_ref() {
|
||||||
|
"IO error: Broken pipe (os error 32)" => (), // just closed unix socket
|
||||||
|
_ => log::warn!("WebSocket send error: {}", e),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
let target_timeline = self.subscription.timeline;
|
||||||
|
let incoming_events = self.ws_rx.clone().map_err(|_| ());
|
||||||
|
|
||||||
|
incoming_events.for_each(move |(tl, event)| {
|
||||||
|
if matches!(event, Event::Ping) {
|
||||||
|
self.send_ping()
|
||||||
|
} else if target_timeline == tl {
|
||||||
|
use crate::event::{CheckedEvent::Update, Event::*, EventKind};
|
||||||
|
use crate::request::Stream::Public;
|
||||||
|
let blocks = &self.subscription.blocks;
|
||||||
|
let allowed_langs = &self.subscription.allowed_langs;
|
||||||
|
|
||||||
|
match event {
|
||||||
|
TypeSafe(Update { payload, queued_at }) => match tl {
|
||||||
|
Timeline(Public, _, _) if payload.language_not(allowed_langs) => Ok(()),
|
||||||
|
_ if payload.involves_any(&blocks) => Ok(()),
|
||||||
|
_ => self.send_msg(TypeSafe(Update { payload, queued_at })),
|
||||||
|
},
|
||||||
|
TypeSafe(non_update) => self.send_msg(TypeSafe(non_update)),
|
||||||
|
Dynamic(dyn_event) => {
|
||||||
|
if let EventKind::Update(s) = dyn_event.kind.clone() {
|
||||||
|
match tl {
|
||||||
|
Timeline(Public, _, _) if s.language_not(allowed_langs) => Ok(()),
|
||||||
|
_ if s.involves_any(&blocks) => Ok(()),
|
||||||
|
_ => self.send_msg(Dynamic(dyn_event)),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.send_msg(Dynamic(dyn_event))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ping => unreachable!(), // handled pings above
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_ping(&mut self) -> Result<(), ()> {
|
||||||
|
self.send_txt("{}")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_msg(&mut self, event: Event) -> Result<(), ()> {
|
||||||
|
self.send_txt(&event.to_json_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_txt(&mut self, txt: &str) -> Result<(), ()> {
|
||||||
|
let tl = self.subscription.timeline;
|
||||||
|
match self.ws_tx.clone().ok_or(())?.try_send(Message::text(txt)) {
|
||||||
|
Ok(_) => Ok(()),
|
||||||
|
Err(_) => {
|
||||||
|
self.unsubscribe_tx
|
||||||
|
.try_send(tl)
|
||||||
|
.unwrap_or_else(|e| log::error!("could not unsubscribe from channel: {}", e));
|
||||||
|
Err(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user