diff --git a/Cargo.lock b/Cargo.lock index fda536c..df45e16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -302,7 +302,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.3.0" +version = "0.3.1" dependencies = [ "dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index c5b0ba1..96a46ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.3.0" +version = "0.3.1" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/src/main.rs b/src/main.rs index 79e73ba..8bd7605 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,7 +33,7 @@ fn main() { // WebSocket let websocket_routes = ws::extract_user_or_reject() .and(warp::ws::ws2()) - .and_then(move |user: user::User, ws: Ws2| { + .map(move |user: user::User, ws: Ws2| { let token = user.access_token.clone(); // Create a new ClientAgent let mut client_agent = client_agent_ws.clone_with_shared_receiver(); @@ -41,12 +41,13 @@ fn main() { client_agent.init_for_user(user); // send the updates through the WS connection (along with the User's access_token // which is sent for security) - Ok::<_, warp::Rejection>(( + + ( ws.on_upgrade(move |socket| { redis_to_client_stream::send_updates_to_ws(socket, client_agent) }), token, - )) + ) }) .map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token)); diff --git a/src/redis_to_client_stream/mod.rs b/src/redis_to_client_stream/mod.rs index b74c372..29160d6 100644 --- a/src/redis_to_client_stream/mod.rs +++ b/src/redis_to_client_stream/mod.rs @@ -44,29 +44,41 @@ pub fn send_updates_to_ws( warp::spawn( rx.map_err(|()| -> warp::Error { unreachable!() }) .forward(ws_tx) - .map_err(|_| ()) - .map(|_r| ()), + .map(|_r| ()) + .map_err(|e| eprintln!("websocket send error: {}", e)), ); - // For as long as the client is still connected, yeild a new event every 100 ms + // Yield new events for as long as the client is still connected let event_stream = tokio::timer::Interval::new( time::Instant::now(), time::Duration::from_millis(*config::WS_UPDATE_INTERVAL), ) .take_while(move |_| match ws_rx.poll() { - Ok(Async::Ready(None)) => futures::future::ok(false), - _ => futures::future::ok(true), + Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true), + Ok(Async::Ready(None)) => { + // TODO: consider whether we should manually drop closed connections here + log::info!("Client closed WebSocket connection"); + futures::future::ok(false) + } + Err(e) => { + log::warn!("{}", e); + futures::future::ok(false) + } }); // Every time you get an event from that stream, send it through the pipe event_stream - .for_each(move |_json_value| { + .for_each(move |_instant| { if let Ok(Async::Ready(Some(json_value))) = stream.poll() { let msg = warp::ws::Message::text(json_value.to_string()); tx.unbounded_send(msg).expect("No send error"); }; Ok(()) }) - .then(|msg| msg) - .map_err(|e| log::error!("{}", e)) + .then(move |result| { + // TODO: consider whether we should manually drop closed connections here + log::info!("WebSocket connection closed."); + result + }) + .map_err(move |e| log::error!("{}", e)) }