From d347b8e2dc1722235e57f4e029028a7d6ec4d13b Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Sat, 28 Sep 2019 17:57:37 -0400 Subject: [PATCH] Fix CPU issue caused by blocking code (#53) * Fix blocking read * Update logging for long polling --- src/config.rs | 3 +++ src/redis_to_client_stream/client_agent.rs | 17 ++++++--------- src/redis_to_client_stream/receiver.rs | 25 +++++----------------- 3 files changed, 14 insertions(+), 31 deletions(-) diff --git a/src/config.rs b/src/config.rs index 7d4fc19..77c5bed 100644 --- a/src/config.rs +++ b/src/config.rs @@ -130,6 +130,9 @@ pub fn redis_addr() -> (net::TcpStream, net::TcpStream) { pubsub_connection .set_read_timeout(Some(time::Duration::from_millis(10))) .expect("Can set read timeout for Redis connection"); + pubsub_connection + .set_nonblocking(true) + .expect("set_nonblocking call failed"); let secondary_redis_connection = net::TcpStream::connect(&REDIS_ADDR.to_string()).expect("Can connect to Redis"); secondary_redis_connection diff --git a/src/redis_to_client_stream/client_agent.rs b/src/redis_to_client_stream/client_agent.rs index d1e32cf..fdf0404 100644 --- a/src/redis_to_client_stream/client_agent.rs +++ b/src/redis_to_client_stream/client_agent.rs @@ -18,9 +18,8 @@ use super::receiver::Receiver; use crate::parse_client_request::user::User; use futures::{Async, Poll}; -use log; use serde_json::{json, Value}; -use std::{sync, time}; +use std::sync; use tokio::io::Error; use uuid::Uuid; @@ -84,21 +83,17 @@ impl futures::stream::Stream for ClientAgent { /// replies with `Ok(NotReady)`. The `ClientAgent` bubles up any /// errors from the underlying data structures. fn poll(&mut self) -> Poll, Self::Error> { - let start_time = time::Instant::now(); + let start_time = std::time::Instant::now(); let result = { - let before_locking_receiver = time::Instant::now(); let mut receiver = self .receiver .lock() .expect("ClientAgent: No other thread panic"); - let before_configuring_receiver = time::Instant::now(); receiver.configure_for_polling(self.id, &self.target_timeline.clone()); - let before_polling_receiver = time::Instant::now(); - let result = receiver.poll(); - if start_time.elapsed() > time::Duration::from_millis(20) { - log::warn!("Polling TOTAL time: {:?}\n since poll function: {:?}\n since configuring: {:?}\n since locking: {:?}", start_time.elapsed(), before_polling_receiver.elapsed(), before_configuring_receiver.elapsed(), before_locking_receiver.elapsed()); - } - result + receiver.poll() + }; + if start_time.elapsed().as_millis() > 1 { + log::warn!("Polling the Receiver took: {:?}", start_time.elapsed()); }; match result { diff --git a/src/redis_to_client_stream/receiver.rs b/src/redis_to_client_stream/receiver.rs index 55bab4e..46e2b65 100644 --- a/src/redis_to_client_stream/receiver.rs +++ b/src/redis_to_client_stream/receiver.rs @@ -67,6 +67,7 @@ impl Receiver { /// that there's a subscription to the current one. If there isn't, then /// subscribe to it. fn subscribe_or_unsubscribe_as_needed(&mut self, timeline: &str) { + let start_time = std::time::Instant::now(); let mut timelines_to_modify = Vec::new(); struct Change { timeline: String, @@ -111,21 +112,9 @@ impl Receiver { pubsub_cmd!("subscribe", self, change.timeline.clone()); } } - } - - fn log_number_of_msgs_in_queue(&self) { - let messages_waiting = self - .msg_queues - .get(&self.manager_id) - .expect("Guaranteed by match block") - .messages - .len(); - match messages_waiting { - number if number > 10 => { - log::error!("{} messages waiting in the queue", messages_waiting) - } - _ => log::info!("{} messages waiting in the queue", messages_waiting), - } + if start_time.elapsed().as_millis() > 1 { + log::warn!("Sending cmd to Redis took: {:?}", start_time.elapsed()); + }; } fn get_target_msg_queue(&mut self) -> collections::hash_map::Entry { @@ -152,7 +141,6 @@ impl futures::stream::Stream for Receiver { /// been polled lately. fn poll(&mut self) -> Poll, Self::Error> { let timeline = self.timeline.clone(); - if self.redis_polled_at.elapsed() > time::Duration::from_millis(*config::REDIS_POLL_INTERVAL) { @@ -171,10 +159,7 @@ impl futures::stream::Stream for Receiver { .messages .pop_front() { - Some(value) => { - self.log_number_of_msgs_in_queue(); - Ok(Async::Ready(Some(value))) - } + Some(value) => Ok(Async::Ready(Some(value))), _ => Ok(Async::NotReady), } }