mirror of https://github.com/mastodon/flodgatt
commit
4b39009556
149
src/main.rs
149
src/main.rs
|
@ -2,155 +2,110 @@ mod auth;
|
|||
mod error;
|
||||
mod pubsub;
|
||||
mod query;
|
||||
mod utils;
|
||||
use futures::stream::Stream;
|
||||
use log::info;
|
||||
use pretty_env_logger;
|
||||
use warp::{path, Filter};
|
||||
|
||||
fn main() {
|
||||
pretty_env_logger::init();
|
||||
let base = path!("api" / "v1" / "streaming");
|
||||
|
||||
// GET /api/v1/streaming/user
|
||||
let user_timeline = base
|
||||
.and(path("user"))
|
||||
let user_timeline = path!("api" / "v1" / "streaming" / "user")
|
||||
.and(path::end())
|
||||
.and(auth::get_token())
|
||||
.and_then(auth::get_account_id_from_token)
|
||||
.map(|account_id: i64| {
|
||||
info!("GET /api/v1/streaming/user");
|
||||
pubsub::stream_from(account_id.to_string())
|
||||
});
|
||||
.map(|account_id: i64| pubsub::stream_from(account_id.to_string()));
|
||||
|
||||
// GET /api/v1/streaming/user/notification
|
||||
let user_timeline_notifications = base
|
||||
.and(path!("user" / "notification"))
|
||||
let user_timeline_notifications = path!("api" / "v1" / "streaming" / "user" / "notification")
|
||||
.and(path::end())
|
||||
.and(auth::get_token())
|
||||
.and_then(auth::get_account_id_from_token)
|
||||
.map(|account_id: i64| {
|
||||
let full_stream = pubsub::stream_from(account_id.to_string());
|
||||
// TODO: filter stream to just have notifications
|
||||
info!("GET /api/v1/streaming/user/notification");
|
||||
full_stream
|
||||
});
|
||||
|
||||
let public_timeline = base.and(path("public")).and(path::end()).map(|| {
|
||||
info!("GET /api/v1/streaming/public");
|
||||
pubsub::stream_from("public".to_string())
|
||||
});
|
||||
// GET /api/v1/streaming/public
|
||||
let public_timeline = path!("api" / "v1" / "streaming" / "public")
|
||||
.and(path::end())
|
||||
.map(|| pubsub::stream_from("public".to_string()));
|
||||
|
||||
// GET /api/v1/streaming/public?only_media=true
|
||||
let public_timeline_media = base
|
||||
.and(path("public"))
|
||||
.and(warp::query())
|
||||
let public_timeline_media = path!("api" / "v1" / "streaming" / "public")
|
||||
.and(path::end())
|
||||
.map(|q: query::Media| {
|
||||
info!("GET /api/v1/streaming/public?only_media=true");
|
||||
if q.only_media == "1" || q.only_media == "true" {
|
||||
pubsub::stream_from("public:media".to_string())
|
||||
} else {
|
||||
pubsub::stream_from("public".to_string())
|
||||
}
|
||||
.and(warp::query())
|
||||
.map(|q: query::Media| match q.only_media.as_ref() {
|
||||
"1" | "true" => pubsub::stream_from("public:media".to_string()),
|
||||
_ => pubsub::stream_from("public".to_string()),
|
||||
});
|
||||
|
||||
// GET /api/v1/streaming/public/local
|
||||
let local_timeline = base
|
||||
.and(path!("public" / "local"))
|
||||
let local_timeline = path!("api" / "v1" / "streaming" / "public" / "local")
|
||||
.and(path::end())
|
||||
.map(|| {
|
||||
info!("GET /api/v1/streaming/public/local");
|
||||
pubsub::stream_from("public:local".to_string())
|
||||
});
|
||||
.map(|| pubsub::stream_from("public:local".to_string()));
|
||||
|
||||
// GET /api/v1/streaming/public/local?only_media=true
|
||||
let local_timeline_media = base
|
||||
.and(path!("public" / "local"))
|
||||
let local_timeline_media = path!("api" / "v1" / "streaming" / "public" / "local")
|
||||
.and(warp::query())
|
||||
.and(path::end())
|
||||
.map(|q: query::Media| {
|
||||
info!("GET /api/v1/streaming/public/local?only_media=true");
|
||||
if q.only_media == "1" || q.only_media == "true" {
|
||||
pubsub::stream_from("public:local:media".to_string())
|
||||
} else {
|
||||
pubsub::stream_from("public:local".to_string())
|
||||
}
|
||||
.map(|q: query::Media| match q.only_media.as_ref() {
|
||||
"1" | "true" => pubsub::stream_from("public:local:media".to_string()),
|
||||
_ => pubsub::stream_from("public:local".to_string()),
|
||||
});
|
||||
|
||||
// GET /api/v1/streaming/direct
|
||||
let direct_timeline = base
|
||||
.and(path("direct"))
|
||||
let direct_timeline = path!("api" / "v1" / "streaming" / "direct")
|
||||
.and(path::end())
|
||||
.and(auth::get_token())
|
||||
.and_then(auth::get_account_id_from_token)
|
||||
.map(|account_id: i64| {
|
||||
info!("GET /api/v1/streaming/direct");
|
||||
pubsub::stream_from(format!("direct:{}", account_id))
|
||||
});
|
||||
.map(|account_id: i64| pubsub::stream_from(format!("direct:{}", account_id)));
|
||||
|
||||
// GET /api/v1/streaming/hashtag?tag=:hashtag
|
||||
let hashtag_timeline = base
|
||||
.and(path("hashtag"))
|
||||
let hashtag_timeline = path!("api" / "v1" / "streaming" / "hashtag")
|
||||
.and(warp::query())
|
||||
.and(path::end())
|
||||
.map(|q: query::Hashtag| {
|
||||
info!("GET /api/v1/streaming/hashtag?tag=:hashtag");
|
||||
pubsub::stream_from(format!("hashtag:{}", q.tag))
|
||||
});
|
||||
.map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}", q.tag)));
|
||||
|
||||
// GET /api/v1/streaming/hashtag/local?tag=:hashtag
|
||||
let hashtag_timeline_local = base
|
||||
.and(path!("hashtag" / "local"))
|
||||
let hashtag_timeline_local = path!("api" / "v1" / "streaming" / "hashtag" / "local")
|
||||
.and(warp::query())
|
||||
.and(path::end())
|
||||
.map(|q: query::Hashtag| {
|
||||
info!("GET /api/v1/streaming/hashtag/local?tag=:hashtag");
|
||||
pubsub::stream_from(format!("hashtag:{}:local", q.tag))
|
||||
});
|
||||
.map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}:local", q.tag)));
|
||||
|
||||
// GET /api/v1/streaming/list?list=:list_id
|
||||
let list_timeline = base
|
||||
.and(path("list"))
|
||||
let list_timeline = path!("api" / "v1" / "streaming" / "list")
|
||||
.and(warp::query())
|
||||
.and(path::end())
|
||||
.map(|q: query::List| {
|
||||
info!("GET /api/v1/streaming/list?list=:list_id");
|
||||
pubsub::stream_from(format!("list:{}", q.list))
|
||||
});
|
||||
.map(|q: query::List| pubsub::stream_from(format!("list:{}", q.list)));
|
||||
|
||||
let routes = user_timeline
|
||||
.or(user_timeline_notifications)
|
||||
.unify()
|
||||
.or(public_timeline_media)
|
||||
.unify()
|
||||
.or(public_timeline)
|
||||
.unify()
|
||||
.or(local_timeline_media)
|
||||
.unify()
|
||||
.or(local_timeline)
|
||||
.unify()
|
||||
.or(direct_timeline)
|
||||
.unify()
|
||||
.or(hashtag_timeline)
|
||||
.unify()
|
||||
.or(hashtag_timeline_local)
|
||||
.unify()
|
||||
.or(list_timeline)
|
||||
.unify()
|
||||
.and_then(|event_stream| event_stream)
|
||||
.and(warp::sse())
|
||||
.map(|event_stream: pubsub::Receiver, sse: warp::sse::Sse| {
|
||||
sse.reply(warp::sse::keep(
|
||||
event_stream.map(|item| {
|
||||
let payload = item["payload"].clone();
|
||||
let event = item["event"].clone();
|
||||
(warp::sse::event(event), warp::sse::data(payload))
|
||||
}),
|
||||
None,
|
||||
))
|
||||
})
|
||||
.recover(error::handle_errors);
|
||||
let routes = or!(
|
||||
user_timeline,
|
||||
user_timeline_notifications,
|
||||
public_timeline_media,
|
||||
public_timeline,
|
||||
local_timeline_media,
|
||||
local_timeline,
|
||||
direct_timeline,
|
||||
hashtag_timeline,
|
||||
hashtag_timeline_local,
|
||||
list_timeline
|
||||
)
|
||||
.and_then(|event_stream| event_stream)
|
||||
.and(warp::sse())
|
||||
.map(|event_stream: pubsub::Receiver, sse: warp::sse::Sse| {
|
||||
sse.reply(warp::sse::keep(
|
||||
event_stream.map(|item| {
|
||||
let payload = item["payload"].clone();
|
||||
let event = item["event"].clone();
|
||||
(warp::sse::event(event), warp::sse::data(payload))
|
||||
}),
|
||||
None,
|
||||
))
|
||||
})
|
||||
.recover(error::handle_errors);
|
||||
|
||||
info!("starting streaming api server");
|
||||
warp::serve(routes).run(([127, 0, 0, 1], 3030));
|
||||
}
|
||||
|
|
|
@ -61,6 +61,7 @@ fn send_subscribe_cmd(tx: WriteHalf<TcpStream>, channel: String) {
|
|||
tokio::spawn(sender.map_err(|e| eprintln!("{}", e)));
|
||||
}
|
||||
|
||||
/// Create a stream from a string.
|
||||
pub fn stream_from(
|
||||
timeline: String,
|
||||
) -> impl Future<Item = Receiver, Error = warp::reject::Rejection> {
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
/// Combines multiple routes with the same return type together with
|
||||
/// `or()` and `unify()`
|
||||
#[macro_export]
|
||||
macro_rules! or {
|
||||
($filter:expr, $($other_filter:expr),*) => {
|
||||
$filter$(.or($other_filter).unify())*
|
||||
};
|
||||
}
|
Loading…
Reference in New Issue