mirror of https://github.com/mastodon/flodgatt
Solve SendErrors (#47)
This commit solves the SendErrors that were triggered by attempting to use a WebSocket connection after it had been closed by the client
This commit is contained in:
parent
11163237bc
commit
0dec8c4124
|
@ -302,7 +302,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flodgatt"
|
name = "flodgatt"
|
||||||
version = "0.3.0"
|
version = "0.3.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
|
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
[package]
|
[package]
|
||||||
name = "flodgatt"
|
name = "flodgatt"
|
||||||
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
|
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
|
||||||
version = "0.3.0"
|
version = "0.3.1"
|
||||||
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
|
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ fn main() {
|
||||||
// WebSocket
|
// WebSocket
|
||||||
let websocket_routes = ws::extract_user_or_reject()
|
let websocket_routes = ws::extract_user_or_reject()
|
||||||
.and(warp::ws::ws2())
|
.and(warp::ws::ws2())
|
||||||
.and_then(move |user: user::User, ws: Ws2| {
|
.map(move |user: user::User, ws: Ws2| {
|
||||||
let token = user.access_token.clone();
|
let token = user.access_token.clone();
|
||||||
// Create a new ClientAgent
|
// Create a new ClientAgent
|
||||||
let mut client_agent = client_agent_ws.clone_with_shared_receiver();
|
let mut client_agent = client_agent_ws.clone_with_shared_receiver();
|
||||||
|
@ -41,12 +41,13 @@ fn main() {
|
||||||
client_agent.init_for_user(user);
|
client_agent.init_for_user(user);
|
||||||
// send the updates through the WS connection (along with the User's access_token
|
// send the updates through the WS connection (along with the User's access_token
|
||||||
// which is sent for security)
|
// which is sent for security)
|
||||||
Ok::<_, warp::Rejection>((
|
|
||||||
|
(
|
||||||
ws.on_upgrade(move |socket| {
|
ws.on_upgrade(move |socket| {
|
||||||
redis_to_client_stream::send_updates_to_ws(socket, client_agent)
|
redis_to_client_stream::send_updates_to_ws(socket, client_agent)
|
||||||
}),
|
}),
|
||||||
token,
|
token,
|
||||||
))
|
)
|
||||||
})
|
})
|
||||||
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));
|
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));
|
||||||
|
|
||||||
|
|
|
@ -44,29 +44,41 @@ pub fn send_updates_to_ws(
|
||||||
warp::spawn(
|
warp::spawn(
|
||||||
rx.map_err(|()| -> warp::Error { unreachable!() })
|
rx.map_err(|()| -> warp::Error { unreachable!() })
|
||||||
.forward(ws_tx)
|
.forward(ws_tx)
|
||||||
.map_err(|_| ())
|
.map(|_r| ())
|
||||||
.map(|_r| ()),
|
.map_err(|e| eprintln!("websocket send error: {}", e)),
|
||||||
);
|
);
|
||||||
|
|
||||||
// For as long as the client is still connected, yeild a new event every 100 ms
|
// Yield new events for as long as the client is still connected
|
||||||
let event_stream = tokio::timer::Interval::new(
|
let event_stream = tokio::timer::Interval::new(
|
||||||
time::Instant::now(),
|
time::Instant::now(),
|
||||||
time::Duration::from_millis(*config::WS_UPDATE_INTERVAL),
|
time::Duration::from_millis(*config::WS_UPDATE_INTERVAL),
|
||||||
)
|
)
|
||||||
.take_while(move |_| match ws_rx.poll() {
|
.take_while(move |_| match ws_rx.poll() {
|
||||||
Ok(Async::Ready(None)) => futures::future::ok(false),
|
Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true),
|
||||||
_ => futures::future::ok(true),
|
Ok(Async::Ready(None)) => {
|
||||||
|
// TODO: consider whether we should manually drop closed connections here
|
||||||
|
log::info!("Client closed WebSocket connection");
|
||||||
|
futures::future::ok(false)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("{}", e);
|
||||||
|
futures::future::ok(false)
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Every time you get an event from that stream, send it through the pipe
|
// Every time you get an event from that stream, send it through the pipe
|
||||||
event_stream
|
event_stream
|
||||||
.for_each(move |_json_value| {
|
.for_each(move |_instant| {
|
||||||
if let Ok(Async::Ready(Some(json_value))) = stream.poll() {
|
if let Ok(Async::Ready(Some(json_value))) = stream.poll() {
|
||||||
let msg = warp::ws::Message::text(json_value.to_string());
|
let msg = warp::ws::Message::text(json_value.to_string());
|
||||||
tx.unbounded_send(msg).expect("No send error");
|
tx.unbounded_send(msg).expect("No send error");
|
||||||
};
|
};
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
.then(|msg| msg)
|
.then(move |result| {
|
||||||
.map_err(|e| log::error!("{}", e))
|
// TODO: consider whether we should manually drop closed connections here
|
||||||
|
log::info!("WebSocket connection closed.");
|
||||||
|
result
|
||||||
|
})
|
||||||
|
.map_err(move |e| log::error!("{}", e))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue