flodgatt/src/main.rs

98 lines
4.4 KiB
Rust
Raw Normal View History

use log::{log_enabled, Level};
2019-07-06 02:08:50 +02:00
use ragequit::{
config,
2019-07-08 13:31:42 +02:00
parse_client_request::{sse, user, ws},
redis_to_client_stream,
redis_to_client_stream::ClientAgent,
2019-07-06 02:08:50 +02:00
};
use warp::{ws::Ws2, Filter as WarpFilter};
2019-02-19 20:29:32 +01:00
fn main() {
2019-07-06 02:08:50 +02:00
config::logging_and_env();
2019-07-08 13:31:42 +02:00
let client_agent_sse = ClientAgent::blank();
let client_agent_ws = client_agent_sse.clone_with_shared_receiver();
if log_enabled!(Level::Warn) {
println!("Streaming server initialized and ready to accept connections");
};
2019-07-06 02:08:50 +02:00
// Server Sent Events
2019-07-08 13:31:42 +02:00
//
// For SSE, the API requires users to use different endpoints, so we first filter based on
// the endpoint. Using that endpoint determine the `timeline` the user is requesting,
// the scope for that `timeline`, and authenticate the `User` if they provided a token.
let sse_routes = sse::filter_incomming_request()
.and(warp::sse())
.map(
move |timeline: String, user: user::User, sse_connection_to_client: warp::sse::Sse| {
// Create a new ClientAgent
let mut client_agent = client_agent_sse.clone_with_shared_receiver();
// Assign that agent to generate a stream of updates for the user/timeline pair
client_agent.init_for_user(&timeline, user);
// send the updates through the SSE connection
redis_to_client_stream::send_updates_to_sse(client_agent, sse_connection_to_client)
},
)
.with(warp::reply::with::header("Connection", "keep-alive"))
.recover(config::handle_errors);
2019-07-06 02:08:50 +02:00
// WebSocket
2019-07-08 13:31:42 +02:00
//
// For WS, the API specifies a single endpoint, so we extract the User/timeline pair
// directy from the query
let websocket_routes = ws::extract_user_and_query()
.and_then(move |mut user: user::User, q: ws::Query, ws: Ws2| {
let token = user.access_token.clone();
2019-07-06 02:08:50 +02:00
let read_scope = user.scopes.clone();
2019-07-08 13:31:42 +02:00
2019-07-06 02:08:50 +02:00
let timeline = match q.stream.as_ref() {
// Public endpoints:
tl @ "public" | tl @ "public:local" if q.media => format!("{}:media", tl),
tl @ "public:media" | tl @ "public:local:media" => tl.to_string(),
tl @ "public" | tl @ "public:local" => tl.to_string(),
// Hashtag endpoints:
tl @ "hashtag" | tl @ "hashtag:local" => format!("{}:{}", tl, q.hashtag),
// Private endpoints: User
"user" if user.logged_in && (read_scope.all || read_scope.statuses) => {
format!("{}", user.id)
}
"user:notification" if user.logged_in && (read_scope.all || read_scope.notify) => {
2019-07-08 13:31:42 +02:00
user = user.set_filter(user::Filter::Notification);
2019-07-06 02:08:50 +02:00
format!("{}", user.id)
}
// List endpoint:
"list" if user.owns_list(q.list) && (read_scope.all || read_scope.lists) => {
format!("list:{}", q.list)
}
// Direct endpoint:
"direct" if user.logged_in && (read_scope.all || read_scope.statuses) => {
"direct".to_string()
}
// Reject unathorized access attempts for private endpoints
"user" | "user:notification" | "direct" | "list" => {
return Err(warp::reject::custom("Error: Invalid Access Token"))
}
// Other endpoints don't exist:
_ => return Err(warp::reject::custom("Error: Nonexistent WebSocket query")),
};
2019-07-08 13:31:42 +02:00
// Create a new ClientAgent
let mut client_agent = client_agent_ws.clone_with_shared_receiver();
// Assign that agent to generate a stream of updates for the user/timeline pair
client_agent.init_for_user(&timeline, user);
// send the updates through the WS connection (along with the User's access_token
// which is sent for security)
2019-07-06 02:08:50 +02:00
Ok((
2019-07-08 13:31:42 +02:00
ws.on_upgrade(move |socket| {
redis_to_client_stream::send_updates_to_ws(socket, client_agent)
}),
2019-07-06 02:08:50 +02:00
token,
))
})
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));
2019-07-06 02:08:50 +02:00
let cors = config::cross_origin_resource_sharing();
warp::serve(websocket_routes.or(sse_routes).with(cors)).run(*config::SERVER_ADDR);
2019-02-11 09:45:14 +01:00
}