use flodgatt::{
    config,
    parse_client_request::{sse, user, ws},
    redis_to_client_stream,
    redis_to_client_stream::ClientAgent,
};
use log::warn;
use warp::{ws::Ws2, Filter as WarpFilter};

fn main() {
|
2019-07-06 02:08:50 +02:00
|
|
|
config::logging_and_env();
|
2019-07-08 13:31:42 +02:00
|
|
|
let client_agent_sse = ClientAgent::blank();
|
|
|
|
let client_agent_ws = client_agent_sse.clone_with_shared_receiver();
|
2019-05-01 00:41:13 +02:00
|
|
|
|
2019-09-09 20:23:48 +02:00
|
|
|
warn!("Streaming server initialized and ready to accept connections");
|
2019-07-10 04:20:11 +02:00
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
// Server Sent Events
|
2019-09-09 19:06:24 +02:00
|
|
|
let sse_routes = sse::extract_user_or_reject()
|
2019-09-05 03:48:29 +02:00
|
|
|
.and(warp::sse())
|
|
|
|
.map(
|
2019-09-09 19:06:24 +02:00
|
|
|
move |user: user::User, sse_connection_to_client: warp::sse::Sse| {
|
2019-09-05 03:48:29 +02:00
|
|
|
// Create a new ClientAgent
|
|
|
|
let mut client_agent = client_agent_sse.clone_with_shared_receiver();
|
2019-09-09 19:06:24 +02:00
|
|
|
// Assign ClientAgent to generate stream of updates for the user/timeline pair
|
|
|
|
client_agent.init_for_user(user);
|
2019-09-05 03:48:29 +02:00
|
|
|
// send the updates through the SSE connection
|
|
|
|
redis_to_client_stream::send_updates_to_sse(client_agent, sse_connection_to_client)
|
|
|
|
},
|
|
|
|
)
|
|
|
|
.with(warp::reply::with::header("Connection", "keep-alive"))
|
|
|
|
.recover(config::handle_errors);
|
2019-04-28 23:28:57 +02:00
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
// WebSocket
|
2019-09-09 19:06:24 +02:00
|
|
|
let websocket_routes = ws::extract_user_or_reject()
|
|
|
|
.and(warp::ws::ws2())
|
|
|
|
.and_then(move |user: user::User, ws: Ws2| {
|
2019-07-08 13:31:42 +02:00
|
|
|
let token = user.access_token.clone();
|
|
|
|
// Create a new ClientAgent
|
|
|
|
let mut client_agent = client_agent_ws.clone_with_shared_receiver();
|
|
|
|
// Assign that agent to generate a stream of updates for the user/timeline pair
|
2019-09-09 19:06:24 +02:00
|
|
|
client_agent.init_for_user(user);
|
2019-07-08 13:31:42 +02:00
|
|
|
// send the updates through the WS connection (along with the User's access_token
|
|
|
|
// which is sent for security)
|
2019-09-09 19:06:24 +02:00
|
|
|
Ok::<_, warp::Rejection>((
|
2019-07-08 13:31:42 +02:00
|
|
|
ws.on_upgrade(move |socket| {
|
|
|
|
redis_to_client_stream::send_updates_to_ws(socket, client_agent)
|
|
|
|
}),
|
2019-07-06 02:08:50 +02:00
|
|
|
token,
|
|
|
|
))
|
|
|
|
})
|
2019-07-04 16:57:15 +02:00
|
|
|
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));
|
2019-05-09 05:02:01 +02:00
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
let cors = config::cross_origin_resource_sharing();
|
|
|
|
|
2019-07-08 21:21:02 +02:00
|
|
|
warp::serve(websocket_routes.or(sse_routes).with(cors)).run(*config::SERVER_ADDR);
|
2019-02-11 09:45:14 +01:00
|
|
|
}
|