diff --git a/src/main.rs b/src/main.rs index 5749e83..e209d34 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,155 +2,110 @@ mod auth; mod error; mod pubsub; mod query; +mod utils; use futures::stream::Stream; -use log::info; use pretty_env_logger; use warp::{path, Filter}; fn main() { pretty_env_logger::init(); - let base = path!("api" / "v1" / "streaming"); // GET /api/v1/streaming/user - let user_timeline = base - .and(path("user")) + let user_timeline = path!("api" / "v1" / "streaming" / "user") .and(path::end()) .and(auth::get_token()) .and_then(auth::get_account_id_from_token) - .map(|account_id: i64| { - info!("GET /api/v1/streaming/user"); - pubsub::stream_from(account_id.to_string()) - }); + .map(|account_id: i64| pubsub::stream_from(account_id.to_string())); // GET /api/v1/streaming/user/notification - let user_timeline_notifications = base - .and(path!("user" / "notification")) + let user_timeline_notifications = path!("api" / "v1" / "streaming" / "user" / "notification") .and(path::end()) .and(auth::get_token()) .and_then(auth::get_account_id_from_token) .map(|account_id: i64| { let full_stream = pubsub::stream_from(account_id.to_string()); // TODO: filter stream to just have notifications - info!("GET /api/v1/streaming/user/notification"); full_stream }); - let public_timeline = base.and(path("public")).and(path::end()).map(|| { - info!("GET /api/v1/streaming/public"); - pubsub::stream_from("public".to_string()) - }); + // GET /api/v1/streaming/public + let public_timeline = path!("api" / "v1" / "streaming" / "public") + .and(path::end()) + .map(|| pubsub::stream_from("public".to_string())); // GET /api/v1/streaming/public?only_media=true - let public_timeline_media = base - .and(path("public")) - .and(warp::query()) + let public_timeline_media = path!("api" / "v1" / "streaming" / "public") .and(path::end()) - .map(|q: query::Media| { - info!("GET /api/v1/streaming/public?only_media=true"); - if q.only_media == "1" || q.only_media == "true" { - pubsub::stream_from("public:media".to_string()) - } else { - pubsub::stream_from("public".to_string()) - } + .and(warp::query()) + .map(|q: query::Media| match q.only_media.as_ref() { + "1" | "true" => pubsub::stream_from("public:media".to_string()), + _ => pubsub::stream_from("public".to_string()), }); // GET /api/v1/streaming/public/local - let local_timeline = base - .and(path!("public" / "local")) + let local_timeline = path!("api" / "v1" / "streaming" / "public" / "local") .and(path::end()) - .map(|| { - info!("GET /api/v1/streaming/public/local"); - pubsub::stream_from("public:local".to_string()) - }); + .map(|| pubsub::stream_from("public:local".to_string())); // GET /api/v1/streaming/public/local?only_media=true - let local_timeline_media = base - .and(path!("public" / "local")) + let local_timeline_media = path!("api" / "v1" / "streaming" / "public" / "local") .and(warp::query()) .and(path::end()) - .map(|q: query::Media| { - info!("GET /api/v1/streaming/public/local?only_media=true"); - if q.only_media == "1" || q.only_media == "true" { - pubsub::stream_from("public:local:media".to_string()) - } else { - pubsub::stream_from("public:local".to_string()) - } + .map(|q: query::Media| match q.only_media.as_ref() { + "1" | "true" => pubsub::stream_from("public:local:media".to_string()), + _ => pubsub::stream_from("public:local".to_string()), }); // GET /api/v1/streaming/direct - let direct_timeline = base - .and(path("direct")) + let direct_timeline = path!("api" / "v1" / "streaming" / "direct") .and(path::end()) .and(auth::get_token()) .and_then(auth::get_account_id_from_token) - .map(|account_id: i64| { - info!("GET /api/v1/streaming/direct"); - pubsub::stream_from(format!("direct:{}", account_id)) - }); + .map(|account_id: i64| pubsub::stream_from(format!("direct:{}", account_id))); // GET /api/v1/streaming/hashtag?tag=:hashtag - let hashtag_timeline = base - .and(path("hashtag")) + let hashtag_timeline = path!("api" / "v1" / "streaming" / "hashtag") .and(warp::query()) .and(path::end()) - .map(|q: query::Hashtag| { - info!("GET /api/v1/streaming/hashtag?tag=:hashtag"); - pubsub::stream_from(format!("hashtag:{}", q.tag)) - }); + .map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}", q.tag))); // GET /api/v1/streaming/hashtag/local?tag=:hashtag - let hashtag_timeline_local = base - .and(path!("hashtag" / "local")) + let hashtag_timeline_local = path!("api" / "v1" / "streaming" / "hashtag" / "local") .and(warp::query()) .and(path::end()) - .map(|q: query::Hashtag| { - info!("GET /api/v1/streaming/hashtag/local?tag=:hashtag"); - pubsub::stream_from(format!("hashtag:{}:local", q.tag)) - }); + .map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}:local", q.tag))); // GET /api/v1/streaming/list?list=:list_id - let list_timeline = base - .and(path("list")) + let list_timeline = path!("api" / "v1" / "streaming" / "list") .and(warp::query()) .and(path::end()) - .map(|q: query::List| { - info!("GET /api/v1/streaming/list?list=:list_id"); - pubsub::stream_from(format!("list:{}", q.list)) - }); + .map(|q: query::List| pubsub::stream_from(format!("list:{}", q.list))); - let routes = user_timeline - .or(user_timeline_notifications) - .unify() - .or(public_timeline_media) - .unify() - .or(public_timeline) - .unify() - .or(local_timeline_media) - .unify() - .or(local_timeline) - .unify() - .or(direct_timeline) - .unify() - .or(hashtag_timeline) - .unify() - .or(hashtag_timeline_local) - .unify() - .or(list_timeline) - .unify() - .and_then(|event_stream| event_stream) - .and(warp::sse()) - .map(|event_stream: pubsub::Receiver, sse: warp::sse::Sse| { - sse.reply(warp::sse::keep( - event_stream.map(|item| { - let payload = item["payload"].clone(); - let event = item["event"].clone(); - (warp::sse::event(event), warp::sse::data(payload)) - }), - None, - )) - }) - .recover(error::handle_errors); + let routes = or!( + user_timeline, + user_timeline_notifications, + public_timeline_media, + public_timeline, + local_timeline_media, + local_timeline, + direct_timeline, + hashtag_timeline, + hashtag_timeline_local, + list_timeline + ) + .and_then(|event_stream| event_stream) + .and(warp::sse()) + .map(|event_stream: pubsub::Receiver, sse: warp::sse::Sse| { + sse.reply(warp::sse::keep( + event_stream.map(|item| { + let payload = item["payload"].clone(); + let event = item["event"].clone(); + (warp::sse::event(event), warp::sse::data(payload)) + }), + None, + )) + }) + .recover(error::handle_errors); - info!("starting streaming api server"); warp::serve(routes).run(([127, 0, 0, 1], 3030)); } diff --git a/src/pubsub.rs b/src/pubsub.rs index a818e26..c567a73 100644 --- a/src/pubsub.rs +++ b/src/pubsub.rs @@ -61,6 +61,7 @@ fn send_subscribe_cmd(tx: WriteHalf, channel: String) { tokio::spawn(sender.map_err(|e| eprintln!("{}", e))); } +/// Create a stream from a string. pub fn stream_from( timeline: String, ) -> impl Future { diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..8c59e43 --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,8 @@ +/// Combines multiple routes with the same return type together with +/// `or()` and `unify()` +#[macro_export] +macro_rules! or { + ($filter:expr, $($other_filter:expr),*) => { + $filter$(.or($other_filter).unify())* + }; +}