2019-07-10 04:20:11 +02:00
|
|
|
use log::{log_enabled, Level};
|
2019-07-06 02:08:50 +02:00
|
|
|
use ragequit::{
|
2019-07-08 13:31:42 +02:00
|
|
|
any_of, config,
|
|
|
|
parse_client_request::{sse, user, ws},
|
|
|
|
redis_to_client_stream,
|
|
|
|
redis_to_client_stream::ClientAgent,
|
2019-07-06 02:08:50 +02:00
|
|
|
};
|
2019-07-08 13:31:42 +02:00
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
use warp::{ws::Ws2, Filter as WarpFilter};
|
2019-02-19 20:29:32 +01:00
|
|
|
|
2019-04-15 20:22:44 +02:00
|
|
|
fn main() {
|
2019-07-06 02:08:50 +02:00
|
|
|
config::logging_and_env();
|
2019-07-08 13:31:42 +02:00
|
|
|
let client_agent_sse = ClientAgent::blank();
|
|
|
|
let client_agent_ws = client_agent_sse.clone_with_shared_receiver();
|
2019-05-01 00:41:13 +02:00
|
|
|
|
2019-07-10 04:20:11 +02:00
|
|
|
if log_enabled!(Level::Warn) {
|
|
|
|
println!("Streaming server initialized and ready to accept connections");
|
|
|
|
};
|
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
// Server Sent Events
|
2019-07-08 13:31:42 +02:00
|
|
|
//
|
|
|
|
// For SSE, the API requires users to use different endpoints, so we first filter based on
|
|
|
|
// the endpoint. Using that endpoint determine the `timeline` the user is requesting,
|
|
|
|
// the scope for that `timeline`, and authenticate the `User` if they provided a token.
|
2019-07-06 02:08:50 +02:00
|
|
|
let sse_routes = any_of!(
|
2019-05-09 17:52:05 +02:00
|
|
|
// GET /api/v1/streaming/user/notification [private; notification filter]
|
2019-07-08 13:31:42 +02:00
|
|
|
sse::Request::user_notifications(),
|
2019-05-09 17:52:05 +02:00
|
|
|
// GET /api/v1/streaming/user [private; language filter]
|
2019-07-08 13:31:42 +02:00
|
|
|
sse::Request::user(),
|
2019-05-09 17:52:05 +02:00
|
|
|
// GET /api/v1/streaming/public/local?only_media=true [public; language filter]
|
2019-07-08 13:31:42 +02:00
|
|
|
sse::Request::public_local_media(),
|
2019-05-09 17:52:05 +02:00
|
|
|
// GET /api/v1/streaming/public?only_media=true [public; language filter]
|
2019-07-08 13:31:42 +02:00
|
|
|
sse::Request::public_media(),
|
2019-05-09 17:52:05 +02:00
|
|
|
// GET /api/v1/streaming/public/local [public; language filter]
|
2019-07-08 13:31:42 +02:00
|
|
|
sse::Request::public_local(),
|
2019-05-09 17:52:05 +02:00
|
|
|
// GET /api/v1/streaming/public [public; language filter]
|
2019-07-08 13:31:42 +02:00
|
|
|
sse::Request::public(),
|
2019-05-09 17:52:05 +02:00
|
|
|
// GET /api/v1/streaming/direct [private; *no* filter]
|
2019-07-08 13:31:42 +02:00
|
|
|
sse::Request::direct(),
|
2019-05-09 17:52:05 +02:00
|
|
|
// GET /api/v1/streaming/hashtag?tag=:hashtag [public; no filter]
|
2019-07-08 13:31:42 +02:00
|
|
|
sse::Request::hashtag(),
|
2019-05-09 17:52:05 +02:00
|
|
|
// GET /api/v1/streaming/hashtag/local?tag=:hashtag [public; no filter]
|
2019-07-08 13:31:42 +02:00
|
|
|
sse::Request::hashtag_local(),
|
2019-05-09 17:52:05 +02:00
|
|
|
// GET /api/v1/streaming/list?list=:list_id [private; no filter]
|
2019-07-08 13:31:42 +02:00
|
|
|
sse::Request::list()
|
2019-05-09 17:52:05 +02:00
|
|
|
)
|
|
|
|
.untuple_one()
|
|
|
|
.and(warp::sse())
|
2019-07-08 13:31:42 +02:00
|
|
|
.map(
|
|
|
|
move |timeline: String, user: user::User, sse_connection_to_client: warp::sse::Sse| {
|
|
|
|
// Create a new ClientAgent
|
|
|
|
let mut client_agent = client_agent_sse.clone_with_shared_receiver();
|
|
|
|
// Assign that agent to generate a stream of updates for the user/timeline pair
|
|
|
|
client_agent.init_for_user(&timeline, user);
|
|
|
|
// send the updates through the SSE connection
|
|
|
|
redis_to_client_stream::send_updates_to_sse(client_agent, sse_connection_to_client)
|
|
|
|
},
|
|
|
|
)
|
2019-05-09 17:52:05 +02:00
|
|
|
.with(warp::reply::with::header("Connection", "keep-alive"))
|
2019-07-08 13:31:42 +02:00
|
|
|
.recover(config::handle_errors);
|
2019-04-28 23:28:57 +02:00
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
// WebSocket
|
2019-07-08 13:31:42 +02:00
|
|
|
//
|
|
|
|
// For WS, the API specifies a single endpoint, so we extract the User/timeline pair
|
|
|
|
// directy from the query
|
|
|
|
let websocket_routes = ws::extract_user_and_query()
|
|
|
|
.and_then(move |mut user: user::User, q: ws::Query, ws: Ws2| {
|
|
|
|
let token = user.access_token.clone();
|
2019-07-06 02:08:50 +02:00
|
|
|
let read_scope = user.scopes.clone();
|
2019-07-08 13:31:42 +02:00
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
let timeline = match q.stream.as_ref() {
|
|
|
|
// Public endpoints:
|
|
|
|
tl @ "public" | tl @ "public:local" if q.media => format!("{}:media", tl),
|
|
|
|
tl @ "public:media" | tl @ "public:local:media" => tl.to_string(),
|
|
|
|
tl @ "public" | tl @ "public:local" => tl.to_string(),
|
|
|
|
// Hashtag endpoints:
|
|
|
|
tl @ "hashtag" | tl @ "hashtag:local" => format!("{}:{}", tl, q.hashtag),
|
|
|
|
// Private endpoints: User
|
|
|
|
"user" if user.logged_in && (read_scope.all || read_scope.statuses) => {
|
|
|
|
format!("{}", user.id)
|
|
|
|
}
|
|
|
|
"user:notification" if user.logged_in && (read_scope.all || read_scope.notify) => {
|
2019-07-08 13:31:42 +02:00
|
|
|
user = user.set_filter(user::Filter::Notification);
|
2019-07-06 02:08:50 +02:00
|
|
|
format!("{}", user.id)
|
|
|
|
}
|
|
|
|
// List endpoint:
|
|
|
|
"list" if user.owns_list(q.list) && (read_scope.all || read_scope.lists) => {
|
|
|
|
format!("list:{}", q.list)
|
|
|
|
}
|
|
|
|
// Direct endpoint:
|
|
|
|
"direct" if user.logged_in && (read_scope.all || read_scope.statuses) => {
|
|
|
|
"direct".to_string()
|
|
|
|
}
|
|
|
|
// Reject unathorized access attempts for private endpoints
|
|
|
|
"user" | "user:notification" | "direct" | "list" => {
|
|
|
|
return Err(warp::reject::custom("Error: Invalid Access Token"))
|
|
|
|
}
|
|
|
|
// Other endpoints don't exist:
|
|
|
|
_ => return Err(warp::reject::custom("Error: Nonexistent WebSocket query")),
|
|
|
|
};
|
2019-05-09 05:02:01 +02:00
|
|
|
|
2019-07-08 13:31:42 +02:00
|
|
|
// Create a new ClientAgent
|
|
|
|
let mut client_agent = client_agent_ws.clone_with_shared_receiver();
|
|
|
|
// Assign that agent to generate a stream of updates for the user/timeline pair
|
|
|
|
client_agent.init_for_user(&timeline, user);
|
|
|
|
// send the updates through the WS connection (along with the User's access_token
|
|
|
|
// which is sent for security)
|
2019-07-06 02:08:50 +02:00
|
|
|
Ok((
|
2019-07-08 13:31:42 +02:00
|
|
|
ws.on_upgrade(move |socket| {
|
|
|
|
redis_to_client_stream::send_updates_to_ws(socket, client_agent)
|
|
|
|
}),
|
2019-07-06 02:08:50 +02:00
|
|
|
token,
|
|
|
|
))
|
|
|
|
})
|
2019-07-04 16:57:15 +02:00
|
|
|
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));
|
2019-05-09 05:02:01 +02:00
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
let cors = config::cross_origin_resource_sharing();
|
|
|
|
|
2019-07-08 21:21:02 +02:00
|
|
|
warp::serve(websocket_routes.or(sse_routes).with(cors)).run(*config::SERVER_ADDR);
|
2019-02-11 09:45:14 +01:00
|
|
|
}
|