1
0
mirror of https://github.com/mastodon/flodgatt synced 2025-04-11 22:58:25 +02:00

Minor refactoring

Improved clarity, concision, and ergonomics of the codebase.
This commit is contained in:
Daniel Sockwell 2019-04-19 12:16:03 -04:00
parent 6746514f9a
commit ff957bd407
3 changed files with 61 additions and 97 deletions

View File

@ -2,141 +2,97 @@ mod auth;
mod error; mod error;
mod pubsub; mod pubsub;
mod query; mod query;
mod utils;
use futures::stream::Stream; use futures::stream::Stream;
use log::info;
use pretty_env_logger; use pretty_env_logger;
use warp::{path, Filter}; use warp::{path, Filter};
fn main() { fn main() {
pretty_env_logger::init(); pretty_env_logger::init();
let base = path!("api" / "v1" / "streaming");
// GET /api/v1/streaming/user // GET /api/v1/streaming/user
let user_timeline = base let user_timeline = path!("api" / "v1" / "streaming" / "user")
.and(path("user"))
.and(path::end()) .and(path::end())
.and(auth::get_token()) .and(auth::get_token())
.and_then(auth::get_account_id_from_token) .and_then(auth::get_account_id_from_token)
.map(|account_id: i64| { .map(|account_id: i64| pubsub::stream_from(account_id.to_string()));
info!("GET /api/v1/streaming/user");
pubsub::stream_from(account_id.to_string())
});
// GET /api/v1/streaming/user/notification // GET /api/v1/streaming/user/notification
let user_timeline_notifications = base let user_timeline_notifications = path!("api" / "v1" / "streaming" / "user" / "notification")
.and(path!("user" / "notification"))
.and(path::end()) .and(path::end())
.and(auth::get_token()) .and(auth::get_token())
.and_then(auth::get_account_id_from_token) .and_then(auth::get_account_id_from_token)
.map(|account_id: i64| { .map(|account_id: i64| {
let full_stream = pubsub::stream_from(account_id.to_string()); let full_stream = pubsub::stream_from(account_id.to_string());
// TODO: filter stream to just have notifications // TODO: filter stream to just have notifications
info!("GET /api/v1/streaming/user/notification");
full_stream full_stream
}); });
let public_timeline = base.and(path("public")).and(path::end()).map(|| { // GET /api/v1/streaming/public
info!("GET /api/v1/streaming/public"); let public_timeline = path!("api" / "v1" / "streaming" / "public")
pubsub::stream_from("public".to_string()) .and(path::end())
}); .map(|| pubsub::stream_from("public".to_string()));
// GET /api/v1/streaming/public?only_media=true // GET /api/v1/streaming/public?only_media=true
let public_timeline_media = base let public_timeline_media = path!("api" / "v1" / "streaming" / "public")
.and(path("public"))
.and(warp::query())
.and(path::end()) .and(path::end())
.map(|q: query::Media| { .and(warp::query())
info!("GET /api/v1/streaming/public?only_media=true"); .map(|q: query::Media| match q.only_media.as_ref() {
if q.only_media == "1" || q.only_media == "true" { "1" | "true" => pubsub::stream_from("public:media".to_string()),
pubsub::stream_from("public:media".to_string()) _ => pubsub::stream_from("public".to_string()),
} else {
pubsub::stream_from("public".to_string())
}
}); });
// GET /api/v1/streaming/public/local // GET /api/v1/streaming/public/local
let local_timeline = base let local_timeline = path!("api" / "v1" / "streaming" / "public" / "local")
.and(path!("public" / "local"))
.and(path::end()) .and(path::end())
.map(|| { .map(|| pubsub::stream_from("public:local".to_string()));
info!("GET /api/v1/streaming/public/local");
pubsub::stream_from("public:local".to_string())
});
// GET /api/v1/streaming/public/local?only_media=true // GET /api/v1/streaming/public/local?only_media=true
let local_timeline_media = base let local_timeline_media = path!("api" / "v1" / "streaming" / "public" / "local")
.and(path!("public" / "local"))
.and(warp::query()) .and(warp::query())
.and(path::end()) .and(path::end())
.map(|q: query::Media| { .map(|q: query::Media| match q.only_media.as_ref() {
info!("GET /api/v1/streaming/public/local?only_media=true"); "1" | "true" => pubsub::stream_from("public:local:media".to_string()),
if q.only_media == "1" || q.only_media == "true" { _ => pubsub::stream_from("public:local".to_string()),
pubsub::stream_from("public:local:media".to_string())
} else {
pubsub::stream_from("public:local".to_string())
}
}); });
// GET /api/v1/streaming/direct // GET /api/v1/streaming/direct
let direct_timeline = base let direct_timeline = path!("api" / "v1" / "streaming" / "direct")
.and(path("direct"))
.and(path::end()) .and(path::end())
.and(auth::get_token()) .and(auth::get_token())
.and_then(auth::get_account_id_from_token) .and_then(auth::get_account_id_from_token)
.map(|account_id: i64| { .map(|account_id: i64| pubsub::stream_from(format!("direct:{}", account_id)));
info!("GET /api/v1/streaming/direct");
pubsub::stream_from(format!("direct:{}", account_id))
});
// GET /api/v1/streaming/hashtag?tag=:hashtag // GET /api/v1/streaming/hashtag?tag=:hashtag
let hashtag_timeline = base let hashtag_timeline = path!("api" / "v1" / "streaming" / "hashtag")
.and(path("hashtag"))
.and(warp::query()) .and(warp::query())
.and(path::end()) .and(path::end())
.map(|q: query::Hashtag| { .map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}", q.tag)));
info!("GET /api/v1/streaming/hashtag?tag=:hashtag");
pubsub::stream_from(format!("hashtag:{}", q.tag))
});
// GET /api/v1/streaming/hashtag/local?tag=:hashtag // GET /api/v1/streaming/hashtag/local?tag=:hashtag
let hashtag_timeline_local = base let hashtag_timeline_local = path!("api" / "v1" / "streaming" / "hashtag" / "local")
.and(path!("hashtag" / "local"))
.and(warp::query()) .and(warp::query())
.and(path::end()) .and(path::end())
.map(|q: query::Hashtag| { .map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}:local", q.tag)));
info!("GET /api/v1/streaming/hashtag/local?tag=:hashtag");
pubsub::stream_from(format!("hashtag:{}:local", q.tag))
});
// GET /api/v1/streaming/list?list=:list_id // GET /api/v1/streaming/list?list=:list_id
let list_timeline = base let list_timeline = path!("api" / "v1" / "streaming" / "list")
.and(path("list"))
.and(warp::query()) .and(warp::query())
.and(path::end()) .and(path::end())
.map(|q: query::List| { .map(|q: query::List| pubsub::stream_from(format!("list:{}", q.list)));
info!("GET /api/v1/streaming/list?list=:list_id");
pubsub::stream_from(format!("list:{}", q.list))
});
let routes = user_timeline let routes = or!(
.or(user_timeline_notifications) user_timeline,
.unify() user_timeline_notifications,
.or(public_timeline_media) public_timeline_media,
.unify() public_timeline,
.or(public_timeline) local_timeline_media,
.unify() local_timeline,
.or(local_timeline_media) direct_timeline,
.unify() hashtag_timeline,
.or(local_timeline) hashtag_timeline_local,
.unify() list_timeline
.or(direct_timeline) )
.unify()
.or(hashtag_timeline)
.unify()
.or(hashtag_timeline_local)
.unify()
.or(list_timeline)
.unify()
.and_then(|event_stream| event_stream) .and_then(|event_stream| event_stream)
.and(warp::sse()) .and(warp::sse())
.map(|event_stream: pubsub::Receiver, sse: warp::sse::Sse| { .map(|event_stream: pubsub::Receiver, sse: warp::sse::Sse| {
@ -151,6 +107,5 @@ fn main() {
}) })
.recover(error::handle_errors); .recover(error::handle_errors);
info!("starting streaming api server");
warp::serve(routes).run(([127, 0, 0, 1], 3030)); warp::serve(routes).run(([127, 0, 0, 1], 3030));
} }

View File

@ -61,6 +61,7 @@ fn send_subscribe_cmd(tx: WriteHalf<TcpStream>, channel: String) {
tokio::spawn(sender.map_err(|e| eprintln!("{}", e))); tokio::spawn(sender.map_err(|e| eprintln!("{}", e)));
} }
/// Create a stream from a string.
pub fn stream_from( pub fn stream_from(
timeline: String, timeline: String,
) -> impl Future<Item = Receiver, Error = warp::reject::Rejection> { ) -> impl Future<Item = Receiver, Error = warp::reject::Rejection> {

8
src/utils.rs Normal file
View File

@ -0,0 +1,8 @@
/// Combines multiple routes with the same return type together with
/// `or()` and `unify()`
#[macro_export]
macro_rules! or {
($filter:expr, $($other_filter:expr),*) => {
$filter$(.or($other_filter).unify())*
};
}