2019-05-01 00:41:13 +02:00
|
|
|
//! Streaming server for Mastodon
|
|
|
|
//!
|
|
|
|
//!
|
|
|
|
//! This server provides live, streaming updates for Mastodon clients. Specifically, when a server
|
|
|
|
//! is running this server, Mastodon clients can use either Server Sent Events or WebSockets to
|
|
|
|
//! connect to the server with the API described [in the public API
|
|
|
|
//! documentation](https://docs.joinmastodon.org/api/streaming/)
|
|
|
|
//!
|
|
|
|
//! # Notes on data flow
|
|
|
|
//! * **Client Request → Warp**:
|
2019-05-09 17:52:05 +02:00
|
|
|
//! Warp filters for valid requests and parses request data. Based on that data, it generates a `User`
|
|
|
|
//! representing the client that made the request. The `User` is authenticated, if appropriate. Warp
|
|
|
|
//! repeatedly polls the StreamManager for information relevant to the User.
|
2019-05-01 00:41:13 +02:00
|
|
|
//!
|
|
|
|
//! * **Warp → StreamManager**:
|
2019-05-09 17:52:05 +02:00
|
|
|
//! A new `StreamManager` is created for each request. The `StreamManager` exists to manage concurrent
|
|
|
|
//! access to the (single) `Receiver`, which it can access behind an `Arc<Mutex>`. The `StreamManager`
|
|
|
|
//! polls the `Receiver` for any updates relevant to the current client. If there are updates, the
|
|
|
|
//! `StreamManager` filters them with the client's filters and passes any matching updates up to Warp.
|
|
|
|
//! The `StreamManager` is also responsible for sending `subscribe` commands to Redis (via the
|
|
|
|
//! `Receiver`) when necessary.
|
2019-05-01 00:41:13 +02:00
|
|
|
//!
|
|
|
|
//! * **StreamManager → Receiver**:
|
|
|
|
//! The Receiver receives data from Redis and stores it in a series of queues (one for each
|
|
|
|
//! StreamManager). When (asynchronously) polled by the StreamManager, it sends back the messages
|
|
|
|
//! relevant to that StreamManager and removes them from the queue.
|
|
|
|
|
|
|
|
pub mod error;
|
|
|
|
pub mod query;
|
|
|
|
pub mod receiver;
|
2019-05-10 07:47:29 +02:00
|
|
|
pub mod redis_cmd;
|
2019-05-01 00:41:13 +02:00
|
|
|
pub mod stream;
|
|
|
|
pub mod timeline;
|
|
|
|
pub mod user;
|
2019-05-09 17:52:05 +02:00
|
|
|
pub mod ws;
|
2019-05-10 12:22:26 +02:00
|
|
|
use dotenv::dotenv;
|
2019-04-15 20:22:44 +02:00
|
|
|
use futures::stream::Stream;
|
2019-05-10 07:47:29 +02:00
|
|
|
use futures::Async;
|
2019-04-30 15:44:51 +02:00
|
|
|
use receiver::Receiver;
|
2019-05-10 12:22:26 +02:00
|
|
|
use std::env;
|
|
|
|
use std::net::SocketAddr;
|
2019-04-28 23:28:57 +02:00
|
|
|
use stream::StreamManager;
|
2019-07-04 15:33:50 +02:00
|
|
|
use user::{Method, Scope, User};
|
2019-05-09 17:52:05 +02:00
|
|
|
use warp::path;
|
2019-05-01 00:41:13 +02:00
|
|
|
use warp::Filter as WarpFilter;
|
2019-02-19 20:29:32 +01:00
|
|
|
|
2019-04-15 20:22:44 +02:00
|
|
|
fn main() {
|
2019-04-18 16:10:01 +02:00
|
|
|
pretty_env_logger::init();
|
2019-05-10 12:22:26 +02:00
|
|
|
dotenv().ok();
|
2019-02-11 18:58:51 +01:00
|
|
|
|
2019-05-09 17:52:05 +02:00
|
|
|
let redis_updates = StreamManager::new(Receiver::new());
|
2019-05-10 07:47:29 +02:00
|
|
|
let redis_updates_sse = redis_updates.blank_copy();
|
|
|
|
let redis_updates_ws = redis_updates.blank_copy();
|
2019-05-01 00:41:13 +02:00
|
|
|
|
2019-05-09 17:52:05 +02:00
|
|
|
let routes = any_of!(
|
|
|
|
// GET /api/v1/streaming/user/notification [private; notification filter]
|
|
|
|
timeline::user_notifications(),
|
|
|
|
// GET /api/v1/streaming/user [private; language filter]
|
|
|
|
timeline::user(),
|
|
|
|
// GET /api/v1/streaming/public/local?only_media=true [public; language filter]
|
|
|
|
timeline::public_local_media(),
|
|
|
|
// GET /api/v1/streaming/public?only_media=true [public; language filter]
|
|
|
|
timeline::public_media(),
|
|
|
|
// GET /api/v1/streaming/public/local [public; language filter]
|
|
|
|
timeline::public_local(),
|
|
|
|
// GET /api/v1/streaming/public [public; language filter]
|
|
|
|
timeline::public(),
|
|
|
|
// GET /api/v1/streaming/direct [private; *no* filter]
|
|
|
|
timeline::direct(),
|
|
|
|
// GET /api/v1/streaming/hashtag?tag=:hashtag [public; no filter]
|
|
|
|
timeline::hashtag(),
|
|
|
|
// GET /api/v1/streaming/hashtag/local?tag=:hashtag [public; no filter]
|
|
|
|
timeline::hashtag_local(),
|
|
|
|
// GET /api/v1/streaming/list?list=:list_id [private; no filter]
|
|
|
|
timeline::list()
|
|
|
|
)
|
|
|
|
.untuple_one()
|
|
|
|
.and(warp::sse())
|
2019-05-10 07:47:29 +02:00
|
|
|
.map(move |timeline: String, user: User, sse: warp::sse::Sse| {
|
|
|
|
let mut redis_stream = redis_updates_sse.configure_copy(&timeline, user);
|
|
|
|
let event_stream = tokio::timer::Interval::new(
|
|
|
|
std::time::Instant::now(),
|
|
|
|
std::time::Duration::from_millis(100),
|
|
|
|
)
|
|
|
|
.filter_map(move |_| match redis_stream.poll() {
|
|
|
|
Ok(Async::Ready(Some(json_value))) => Some((
|
|
|
|
warp::sse::event(json_value["event"].clone().to_string()),
|
|
|
|
warp::sse::data(json_value["payload"].clone()),
|
|
|
|
)),
|
|
|
|
_ => None,
|
|
|
|
});
|
|
|
|
sse.reply(warp::sse::keep(event_stream, None))
|
|
|
|
})
|
2019-05-09 17:52:05 +02:00
|
|
|
.with(warp::reply::with::header("Connection", "keep-alive"))
|
|
|
|
.recover(error::handle_errors);
|
2019-04-28 23:28:57 +02:00
|
|
|
|
2019-05-10 07:47:29 +02:00
|
|
|
//let redis_updates_ws = StreamManager::new(Receiver::new());
|
2019-05-09 05:02:01 +02:00
|
|
|
let websocket = path!("api" / "v1" / "streaming")
|
2019-07-04 15:33:50 +02:00
|
|
|
.and(Scope::Public.get_access_token(Method::WS))
|
2019-05-09 05:02:01 +02:00
|
|
|
.and_then(|token| User::from_access_token(token, Scope::Public))
|
|
|
|
.and(warp::query())
|
|
|
|
.and(query::Media::to_filter())
|
|
|
|
.and(query::Hashtag::to_filter())
|
|
|
|
.and(query::List::to_filter())
|
|
|
|
.and(warp::ws2())
|
|
|
|
.and_then(
|
2019-05-10 07:47:29 +02:00
|
|
|
move |mut user: User,
|
|
|
|
q: query::Stream,
|
|
|
|
m: query::Media,
|
|
|
|
h: query::Hashtag,
|
|
|
|
l: query::List,
|
|
|
|
ws: warp::ws::Ws2| {
|
2019-05-09 05:02:01 +02:00
|
|
|
let unauthorized = Err(warp::reject::custom("Error: Invalid Access Token"));
|
|
|
|
let timeline = match q.stream.as_ref() {
|
|
|
|
// Public endpoints:
|
|
|
|
tl @ "public" | tl @ "public:local" if m.is_truthy() => format!("{}:media", tl),
|
2019-05-09 17:52:05 +02:00
|
|
|
tl @ "public:media" | tl @ "public:local:media" => tl.to_string(),
|
|
|
|
tl @ "public" | tl @ "public:local" => tl.to_string(),
|
2019-05-09 05:02:01 +02:00
|
|
|
// User
|
|
|
|
"user" if user.id == -1 => return unauthorized,
|
|
|
|
"user" => format!("{}", user.id),
|
|
|
|
"user:notification" => {
|
|
|
|
user = user.with_notification_filter();
|
|
|
|
format!("{}", user.id)
|
2019-04-28 23:28:57 +02:00
|
|
|
}
|
2019-05-09 05:02:01 +02:00
|
|
|
// Hashtag endpoints:
|
|
|
|
// TODO: handle missing query
|
|
|
|
tl @ "hashtag" | tl @ "hashtag:local" => format!("{}:{}", tl, h.tag),
|
|
|
|
// List endpoint:
|
|
|
|
// TODO: handle missing query
|
|
|
|
"list" if user.authorized_for_list(l.list).is_err() => return unauthorized,
|
|
|
|
"list" => format!("list:{}", l.list),
|
|
|
|
// Direct endpoint:
|
|
|
|
"direct" if user.id == -1 => return unauthorized,
|
2019-05-09 17:52:05 +02:00
|
|
|
"direct" => "direct".to_string(),
|
2019-05-09 05:02:01 +02:00
|
|
|
// Other endpoints don't exist:
|
|
|
|
_ => return Err(warp::reject::custom("Error: Nonexistent WebSocket query")),
|
|
|
|
};
|
2019-05-10 07:47:29 +02:00
|
|
|
let stream = redis_updates_ws.configure_copy(&timeline, user);
|
2019-05-09 05:02:01 +02:00
|
|
|
|
2019-05-10 07:47:29 +02:00
|
|
|
Ok(ws.on_upgrade(move |socket| ws::send_replies(socket, stream)))
|
2019-05-09 05:02:01 +02:00
|
|
|
},
|
2019-07-04 15:33:50 +02:00
|
|
|
)
|
|
|
|
.map(|reply| {
|
|
|
|
warp::reply::with_header(
|
|
|
|
reply,
|
|
|
|
"sec-websocket-protocol",
|
|
|
|
"LhbVOxKckgqyMg3nDLaEu5vgqY6Yzc9Pk1w8_yKQwS8",
|
|
|
|
)
|
|
|
|
});
|
2019-05-09 05:02:01 +02:00
|
|
|
|
2019-05-10 12:22:26 +02:00
|
|
|
let address: SocketAddr = env::var("SERVER_ADDR")
|
|
|
|
.unwrap_or("127.0.0.1:4000".to_owned())
|
|
|
|
.parse()
|
|
|
|
.expect("static string");
|
|
|
|
warp::serve(websocket.or(routes)).run(address);
|
2019-02-11 09:45:14 +01:00
|
|
|
}
|