use flodgatt::config; use flodgatt::request::{Handler, Subscription}; use flodgatt::response::{RedisManager, SseStream, WsStream}; use flodgatt::Error; use futures::future::lazy; use futures::stream::Stream as _; use std::fs; use std::net::SocketAddr; use std::os::unix::fs::PermissionsExt; use std::time::Instant; use tokio::net::UnixListener; use tokio::sync::mpsc; use tokio::timer::Interval; use warp::ws::Ws2; use warp::Filter; fn main() -> Result<(), Error> { config::merge_dotenv()?; pretty_env_logger::try_init_timed()?; let (postgres_cfg, redis_cfg, cfg) = config::from_env(dotenv::vars().collect())?; let poll_freq = *redis_cfg.polling_interval; let request = Handler::new(&postgres_cfg, *cfg.whitelist_mode)?; let shared_manager = RedisManager::try_from(&redis_cfg)?.into_arc(); // Server Sent Events let sse_manager = shared_manager.clone(); let sse = request .sse_subscription() .and(warp::sse()) .map(move |subscription: Subscription, sse: warp::sse::Sse| { log::info!("Incoming SSE request for {:?}", subscription.timeline); let mut manager = sse_manager.lock().unwrap_or_else(RedisManager::recover); let (event_tx, event_rx) = mpsc::channel(10); manager.subscribe(&subscription, event_tx); let sse_stream = SseStream::new(subscription); sse_stream.send_events(sse, event_rx) }) .with(warp::reply::with::header("Connection", "keep-alive")); // WebSocket let ws_manager = shared_manager.clone(); let ws = request .ws_subscription() .and(warp::ws::ws2()) .map(move |subscription: Subscription, ws: Ws2| { log::info!("Incoming websocket request for {:?}", subscription.timeline); let mut manager = ws_manager.lock().unwrap_or_else(RedisManager::recover); let (event_tx, event_rx) = mpsc::channel(10); manager.subscribe(&subscription, event_tx); let token = subscription.access_token.clone().unwrap_or_default(); // token sent for security let ws_stream = WsStream::new(subscription); ( ws.on_upgrade(move |ws| ws_stream.send_to(ws, event_rx)), token, ) }) .map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token)); #[cfg(feature = "stub_status")] #[rustfmt::skip] let status = { let (r1, r2, r3) = (shared_manager.clone(), shared_manager.clone(), shared_manager.clone()); request.health().map(|| "OK") .or(request.status() .map(move || r1.lock().unwrap_or_else(RedisManager::recover).count())) .or(request.status_backpresure() .map(move || r2.lock().unwrap_or_else(RedisManager::recover).backpresure())) .or(request.status_per_timeline() .map(move || r3.lock().unwrap_or_else(RedisManager::recover).list())) }; #[cfg(not(feature = "stub_status"))] let status = request.health().map(|| "OK"); let cors = warp::cors() .allow_any_origin() .allow_methods(cfg.cors.allowed_methods) .allow_headers(cfg.cors.allowed_headers); let streaming_server = move || { let manager = shared_manager.clone(); let stream = Interval::new(Instant::now(), poll_freq) .map_err(|e| log::error!("{}", e)) .for_each(move |_| { match manager .lock() .unwrap_or_else(RedisManager::recover) .send_msgs() { Err(e) => Ok(log::error!("{}", e)), Ok(_) => Ok(()), } }); warp::spawn(lazy(move || stream)); warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err)) }; if let Some(socket) = &*cfg.unix_socket { log::info!("Using Unix socket {}", socket); fs::remove_file(socket).unwrap_or_default(); let incoming = UnixListener::bind(socket)?.incoming(); fs::set_permissions(socket, PermissionsExt::from_mode(0o666))?; tokio::run(lazy(|| streaming_server().serve_incoming(incoming))); } else { let server_addr = SocketAddr::new(*cfg.address, *cfg.port); tokio::run(lazy(move || streaming_server().bind(server_addr))); } Err(Error::Unrecoverable) // only reached if poll_broadcast encounters an unrecoverable error }