mirror of https://github.com/mastodon/flodgatt
Improve performance of EventStream
This commit changes the EventStream so no longer polls client WebSocket connections to see if it should clean up the connection. Instead, it cleans up the connection whenever it attempts to send a ping or a message through the connection and receives an error indicating that the client has disconnected. As a result, client connections aren't cleaned up quite as quickly, but overall sys CPU time should be dramatically improved.
This commit is contained in:
parent
ebd0da7fe5
commit
a385434882
|
@ -56,6 +56,14 @@ impl ClientAgent {
|
||||||
.unwrap_or_else(|e| log::error!("Could not subscribe to the Redis channel: {}", e))
|
.unwrap_or_else(|e| log::error!("Could not subscribe to the Redis channel: {}", e))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn disconnect(&self) -> futures::future::FutureResult<bool, tokio::timer::Error> {
|
||||||
|
let mut receiver = self.lock_receiver();
|
||||||
|
receiver
|
||||||
|
.remove_subscription(&self.subscription)
|
||||||
|
.unwrap_or_else(|e| log::error!("Could not unsubscribe from: {}", e));
|
||||||
|
futures::future::ok(false)
|
||||||
|
}
|
||||||
|
|
||||||
fn lock_receiver(&self) -> MutexGuard<Receiver> {
|
fn lock_receiver(&self) -> MutexGuard<Receiver> {
|
||||||
match self.receiver.lock() {
|
match self.receiver.lock() {
|
||||||
Ok(inner) => inner,
|
Ok(inner) => inner,
|
||||||
|
|
|
@ -17,17 +17,17 @@ impl EventStream {
|
||||||
mut client_agent: ClientAgent,
|
mut client_agent: ClientAgent,
|
||||||
interval: Duration,
|
interval: Duration,
|
||||||
) -> impl Future<Item = (), Error = ()> {
|
) -> impl Future<Item = (), Error = ()> {
|
||||||
let (ws_tx, mut ws_rx) = ws.split();
|
let (transmit_to_ws, _receive_from_ws) = ws.split();
|
||||||
let timeline = client_agent.subscription.timeline;
|
let timeline = client_agent.subscription.timeline;
|
||||||
|
|
||||||
// Create a pipe
|
// Create a pipe
|
||||||
let (tx, rx) = futures::sync::mpsc::unbounded();
|
let (tx, rx) = futures::sync::mpsc::unbounded();
|
||||||
|
|
||||||
// Send one end of it to a different thread and tell that end to forward whatever it gets
|
// Send one end of it to a different thread and tell that end to forward whatever it gets
|
||||||
// on to the websocket client
|
// on to the WebSocket client
|
||||||
warp::spawn(
|
warp::spawn(
|
||||||
rx.map_err(|()| -> warp::Error { unreachable!() })
|
rx.map_err(|()| -> warp::Error { unreachable!() })
|
||||||
.forward(ws_tx)
|
.forward(transmit_to_ws)
|
||||||
.map(|_r| ())
|
.map(|_r| ())
|
||||||
.map_err(|e| match e.to_string().as_ref() {
|
.map_err(|e| match e.to_string().as_ref() {
|
||||||
"IO error: Broken pipe (os error 32)" => (), // just closed unix socket
|
"IO error: Broken pipe (os error 32)" => (), // just closed unix socket
|
||||||
|
@ -35,50 +35,42 @@ impl EventStream {
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Yield new events for as long as the client is still connected
|
|
||||||
let event_stream =
|
|
||||||
tokio::timer::Interval::new(Instant::now(), interval).take_while(move |_| {
|
|
||||||
match ws_rx.poll() {
|
|
||||||
Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true),
|
|
||||||
Ok(Async::Ready(None)) => {
|
|
||||||
log::info!("Client closed WebSocket connection for {:?}", timeline);
|
|
||||||
futures::future::ok(false)
|
|
||||||
}
|
|
||||||
Err(e) if e.to_string() == "IO error: Broken pipe (os error 32)" => {
|
|
||||||
// no err, just closed Unix socket
|
|
||||||
log::info!("Client closed WebSocket connection for {:?}", timeline);
|
|
||||||
futures::future::ok(false)
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
log::warn!("Error in {:?}: {}", timeline, e);
|
|
||||||
futures::future::ok(false)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut last_ping_time = Instant::now();
|
let mut last_ping_time = Instant::now();
|
||||||
// Every time you get an event from that stream, send it through the pipe
|
tokio::timer::Interval::new(Instant::now(), interval)
|
||||||
event_stream
|
.take_while(move |_| {
|
||||||
.for_each(move |_instant| {
|
// Right now, we do not need to see if we have any messages _from_ the
|
||||||
|
// WebSocket connection because the API doesn't support clients sending
|
||||||
|
// commands via the WebSocket. However, if the [stream multiplexing API
|
||||||
|
// change](github.com/tootsuite/flodgatt/issues/121) is implemented, we'll
|
||||||
|
// need to receive messages from the client. If so, we'll need a
|
||||||
|
// `receive_from_ws.poll() call here (or later)`
|
||||||
|
|
||||||
match client_agent.poll() {
|
match client_agent.poll() {
|
||||||
Ok(Async::Ready(Some(msg))) => {
|
Ok(Async::Ready(Some(msg))) => {
|
||||||
tx.unbounded_send(Message::text(msg.to_json_string()))
|
match tx.unbounded_send(Message::text(msg.to_json_string())) {
|
||||||
.unwrap_or_else(|e| {
|
Ok(_) => futures::future::ok(true),
|
||||||
log::error!("Could not send message to WebSocket: {}", e)
|
Err(_) => client_agent.disconnect(),
|
||||||
});
|
}
|
||||||
|
}
|
||||||
|
Ok(Async::Ready(None)) => {
|
||||||
|
log::info!("WebSocket ClientAgent got Ready(None)");
|
||||||
|
futures::future::ok(true)
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(None)) => log::info!("WebSocket ClientAgent got Ready(None)"),
|
|
||||||
Ok(Async::NotReady) if last_ping_time.elapsed() > Duration::from_secs(30) => {
|
Ok(Async::NotReady) if last_ping_time.elapsed() > Duration::from_secs(30) => {
|
||||||
tx.unbounded_send(Message::text("{}")).unwrap_or_else(|e| {
|
|
||||||
log::error!("Could not send ping to WebSocket: {}", e)
|
|
||||||
});
|
|
||||||
last_ping_time = Instant::now();
|
last_ping_time = Instant::now();
|
||||||
|
match tx.unbounded_send(Message::text("{}")) {
|
||||||
|
Ok(_) => futures::future::ok(true),
|
||||||
|
Err(_) => client_agent.disconnect(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Async::NotReady) => futures::future::ok(true), // no new messages; nothing to do
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("{}\n Dropping WebSocket message and continuing.", e);
|
||||||
|
futures::future::ok(true)
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => (), // no new messages; nothing to do
|
|
||||||
Err(e) => log::error!("{}\n Dropping WebSocket message and continuing.", e),
|
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
})
|
})
|
||||||
|
.for_each(move |_instant| Ok(()))
|
||||||
.then(move |result| {
|
.then(move |result| {
|
||||||
log::info!("WebSocket connection for {:?} closed.", timeline);
|
log::info!("WebSocket connection for {:?} closed.", timeline);
|
||||||
result
|
result
|
||||||
|
|
|
@ -26,35 +26,7 @@ impl MsgQueue {
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct MessageQueues(pub HashMap<Uuid, MsgQueue>);
|
pub struct MessageQueues(pub HashMap<Uuid, MsgQueue>);
|
||||||
|
|
||||||
impl MessageQueues {
|
impl MessageQueues {}
|
||||||
pub fn calculate_timelines_to_add_or_drop(&mut self, timeline: Timeline) -> Vec<Change> {
|
|
||||||
let mut timelines_to_modify = Vec::new();
|
|
||||||
|
|
||||||
timelines_to_modify.push(Change {
|
|
||||||
timeline,
|
|
||||||
in_subscriber_number: 1,
|
|
||||||
});
|
|
||||||
|
|
||||||
// self.retain(|_id, msg_queue| {
|
|
||||||
// if msg_queue.last_polled_at.elapsed() < Duration::from_secs(30) {
|
|
||||||
// true
|
|
||||||
// } else {
|
|
||||||
// let timeline = &msg_queue.timeline;
|
|
||||||
// timelines_to_modify.push(Change {
|
|
||||||
// timeline: *timeline,
|
|
||||||
// in_subscriber_number: -1,
|
|
||||||
// });
|
|
||||||
// false
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
// TODO: reimplement ^^^^
|
|
||||||
timelines_to_modify
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub struct Change {
|
|
||||||
pub timeline: Timeline,
|
|
||||||
pub in_subscriber_number: i32,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for MsgQueue {
|
impl fmt::Debug for MsgQueue {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
|
|
@ -52,10 +52,6 @@ impl Receiver {
|
||||||
|
|
||||||
/// Assigns the `Receiver` a new timeline to monitor and runs other
|
/// Assigns the `Receiver` a new timeline to monitor and runs other
|
||||||
/// first-time setup.
|
/// first-time setup.
|
||||||
///
|
|
||||||
/// Note: this method calls `subscribe_or_unsubscribe_as_needed`,
|
|
||||||
/// so Redis PubSub subscriptions are only updated when a new timeline
|
|
||||||
/// comes under management for the first time.
|
|
||||||
pub fn add_subscription(&mut self, subscription: &Subscription) -> Result<()> {
|
pub fn add_subscription(&mut self, subscription: &Subscription) -> Result<()> {
|
||||||
let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline);
|
let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline);
|
||||||
|
|
||||||
|
@ -63,7 +59,40 @@ impl Receiver {
|
||||||
self.redis_connection.update_cache(hashtag, id);
|
self.redis_connection.update_cache(hashtag, id);
|
||||||
};
|
};
|
||||||
self.msg_queues.insert(subscription.id, MsgQueue::new(tl));
|
self.msg_queues.insert(subscription.id, MsgQueue::new(tl));
|
||||||
self.subscribe_or_unsubscribe_as_needed(tl)?;
|
|
||||||
|
let number_of_subscriptions = self
|
||||||
|
.clients_per_timeline
|
||||||
|
.entry(tl)
|
||||||
|
.and_modify(|n| *n += 1)
|
||||||
|
.or_insert(1);
|
||||||
|
|
||||||
|
use RedisCmd::*;
|
||||||
|
if *number_of_subscriptions == 1 {
|
||||||
|
self.redis_connection.send_cmd(Subscribe, &tl)?
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_subscription(&mut self, subscription: &Subscription) -> Result<()> {
|
||||||
|
let tl = subscription.timeline;
|
||||||
|
self.msg_queues.remove(&subscription.id);
|
||||||
|
let number_of_subscriptions = self
|
||||||
|
.clients_per_timeline
|
||||||
|
.entry(tl)
|
||||||
|
.and_modify(|n| *n -= 1)
|
||||||
|
.or_insert_with(|| {
|
||||||
|
log::error!(
|
||||||
|
"Attempted to unsubscribe from a timeline to which you were not subscribed: {:?}",
|
||||||
|
tl
|
||||||
|
);
|
||||||
|
0
|
||||||
|
});
|
||||||
|
use RedisCmd::*;
|
||||||
|
if *number_of_subscriptions == 0 {
|
||||||
|
self.redis_connection.send_cmd(Unsubscribe, &tl)?
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,30 +164,30 @@ impl Receiver {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Drop any PubSub subscriptions that don't have active clients and check
|
// /// Drop any PubSub subscriptions that don't have active clients and check
|
||||||
/// that there's a subscription to the current one. If there isn't, then
|
// /// that there's a subscription to the current one. If there isn't, then
|
||||||
/// subscribe to it.
|
// /// subscribe to it.
|
||||||
fn subscribe_or_unsubscribe_as_needed(&mut self, tl: Timeline) -> Result<()> {
|
// fn subscribe_or_unsubscribe_as_needed(&mut self, tl: Timeline) -> Result<()> {
|
||||||
let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(tl);
|
// let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(tl);
|
||||||
|
|
||||||
// Record the lower number of clients subscribed to that channel
|
// // Record the lower number of clients subscribed to that channel
|
||||||
for change in timelines_to_modify {
|
// for change in timelines_to_modify {
|
||||||
let timeline = change.timeline;
|
// let timeline = change.timeline;
|
||||||
|
|
||||||
let count_of_subscribed_clients = self
|
// let count_of_subscribed_clients = self
|
||||||
.clients_per_timeline
|
// .clients_per_timeline
|
||||||
.entry(timeline)
|
// .entry(timeline)
|
||||||
.and_modify(|n| *n += change.in_subscriber_number)
|
// .and_modify(|n| *n += change.in_subscriber_number)
|
||||||
.or_insert_with(|| 1);
|
// .or_insert_with(|| 1);
|
||||||
|
|
||||||
// If no clients, unsubscribe from the channel
|
// // If no clients, unsubscribe from the channel
|
||||||
use RedisCmd::*;
|
// use RedisCmd::*;
|
||||||
if *count_of_subscribed_clients <= 0 {
|
// if *count_of_subscribed_clients <= 0 {
|
||||||
self.redis_connection.send_cmd(Unsubscribe, &timeline)?;
|
// self.redis_connection.send_cmd(Unsubscribe, &timeline)?;
|
||||||
} else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 {
|
// } else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 {
|
||||||
self.redis_connection.send_cmd(Subscribe, &timeline)?
|
// self.redis_connection.send_cmd(Subscribe, &timeline)?
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
Ok(())
|
// Ok(())
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue