mirror of https://github.com/mastodon/flodgatt
Fix CPU issue caused by blocking code (#53)
* Fix blocking read * Update logging for long polling
This commit is contained in:
parent
fc7feb6694
commit
d347b8e2dc
|
@ -130,6 +130,9 @@ pub fn redis_addr() -> (net::TcpStream, net::TcpStream) {
|
||||||
pubsub_connection
|
pubsub_connection
|
||||||
.set_read_timeout(Some(time::Duration::from_millis(10)))
|
.set_read_timeout(Some(time::Duration::from_millis(10)))
|
||||||
.expect("Can set read timeout for Redis connection");
|
.expect("Can set read timeout for Redis connection");
|
||||||
|
pubsub_connection
|
||||||
|
.set_nonblocking(true)
|
||||||
|
.expect("set_nonblocking call failed");
|
||||||
let secondary_redis_connection =
|
let secondary_redis_connection =
|
||||||
net::TcpStream::connect(&REDIS_ADDR.to_string()).expect("Can connect to Redis");
|
net::TcpStream::connect(&REDIS_ADDR.to_string()).expect("Can connect to Redis");
|
||||||
secondary_redis_connection
|
secondary_redis_connection
|
||||||
|
|
|
@ -18,9 +18,8 @@
|
||||||
use super::receiver::Receiver;
|
use super::receiver::Receiver;
|
||||||
use crate::parse_client_request::user::User;
|
use crate::parse_client_request::user::User;
|
||||||
use futures::{Async, Poll};
|
use futures::{Async, Poll};
|
||||||
use log;
|
|
||||||
use serde_json::{json, Value};
|
use serde_json::{json, Value};
|
||||||
use std::{sync, time};
|
use std::sync;
|
||||||
use tokio::io::Error;
|
use tokio::io::Error;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
@ -84,21 +83,17 @@ impl futures::stream::Stream for ClientAgent {
|
||||||
/// replies with `Ok(NotReady)`. The `ClientAgent` bubles up any
|
/// replies with `Ok(NotReady)`. The `ClientAgent` bubles up any
|
||||||
/// errors from the underlying data structures.
|
/// errors from the underlying data structures.
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
let start_time = time::Instant::now();
|
let start_time = std::time::Instant::now();
|
||||||
let result = {
|
let result = {
|
||||||
let before_locking_receiver = time::Instant::now();
|
|
||||||
let mut receiver = self
|
let mut receiver = self
|
||||||
.receiver
|
.receiver
|
||||||
.lock()
|
.lock()
|
||||||
.expect("ClientAgent: No other thread panic");
|
.expect("ClientAgent: No other thread panic");
|
||||||
let before_configuring_receiver = time::Instant::now();
|
|
||||||
receiver.configure_for_polling(self.id, &self.target_timeline.clone());
|
receiver.configure_for_polling(self.id, &self.target_timeline.clone());
|
||||||
let before_polling_receiver = time::Instant::now();
|
receiver.poll()
|
||||||
let result = receiver.poll();
|
};
|
||||||
if start_time.elapsed() > time::Duration::from_millis(20) {
|
if start_time.elapsed().as_millis() > 1 {
|
||||||
log::warn!("Polling TOTAL time: {:?}\n since poll function: {:?}\n since configuring: {:?}\n since locking: {:?}", start_time.elapsed(), before_polling_receiver.elapsed(), before_configuring_receiver.elapsed(), before_locking_receiver.elapsed());
|
log::warn!("Polling the Receiver took: {:?}", start_time.elapsed());
|
||||||
}
|
|
||||||
result
|
|
||||||
};
|
};
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
|
|
|
@ -67,6 +67,7 @@ impl Receiver {
|
||||||
/// that there's a subscription to the current one. If there isn't, then
|
/// that there's a subscription to the current one. If there isn't, then
|
||||||
/// subscribe to it.
|
/// subscribe to it.
|
||||||
fn subscribe_or_unsubscribe_as_needed(&mut self, timeline: &str) {
|
fn subscribe_or_unsubscribe_as_needed(&mut self, timeline: &str) {
|
||||||
|
let start_time = std::time::Instant::now();
|
||||||
let mut timelines_to_modify = Vec::new();
|
let mut timelines_to_modify = Vec::new();
|
||||||
struct Change {
|
struct Change {
|
||||||
timeline: String,
|
timeline: String,
|
||||||
|
@ -111,21 +112,9 @@ impl Receiver {
|
||||||
pubsub_cmd!("subscribe", self, change.timeline.clone());
|
pubsub_cmd!("subscribe", self, change.timeline.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
if start_time.elapsed().as_millis() > 1 {
|
||||||
|
log::warn!("Sending cmd to Redis took: {:?}", start_time.elapsed());
|
||||||
fn log_number_of_msgs_in_queue(&self) {
|
};
|
||||||
let messages_waiting = self
|
|
||||||
.msg_queues
|
|
||||||
.get(&self.manager_id)
|
|
||||||
.expect("Guaranteed by match block")
|
|
||||||
.messages
|
|
||||||
.len();
|
|
||||||
match messages_waiting {
|
|
||||||
number if number > 10 => {
|
|
||||||
log::error!("{} messages waiting in the queue", messages_waiting)
|
|
||||||
}
|
|
||||||
_ => log::info!("{} messages waiting in the queue", messages_waiting),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_target_msg_queue(&mut self) -> collections::hash_map::Entry<Uuid, MsgQueue> {
|
fn get_target_msg_queue(&mut self) -> collections::hash_map::Entry<Uuid, MsgQueue> {
|
||||||
|
@ -152,7 +141,6 @@ impl futures::stream::Stream for Receiver {
|
||||||
/// been polled lately.
|
/// been polled lately.
|
||||||
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
|
||||||
let timeline = self.timeline.clone();
|
let timeline = self.timeline.clone();
|
||||||
|
|
||||||
if self.redis_polled_at.elapsed()
|
if self.redis_polled_at.elapsed()
|
||||||
> time::Duration::from_millis(*config::REDIS_POLL_INTERVAL)
|
> time::Duration::from_millis(*config::REDIS_POLL_INTERVAL)
|
||||||
{
|
{
|
||||||
|
@ -171,10 +159,7 @@ impl futures::stream::Stream for Receiver {
|
||||||
.messages
|
.messages
|
||||||
.pop_front()
|
.pop_front()
|
||||||
{
|
{
|
||||||
Some(value) => {
|
Some(value) => Ok(Async::Ready(Some(value))),
|
||||||
self.log_number_of_msgs_in_queue();
|
|
||||||
Ok(Async::Ready(Some(value)))
|
|
||||||
}
|
|
||||||
_ => Ok(Async::NotReady),
|
_ => Ok(Async::NotReady),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue