Remove use of last_polled_time [WIP]

This commit stops removing subscriptions based on their last polled
time to test the impact of this change on CPU use.  This is a WIP
because it does not yet remove subscriptions in any other way, which
(if deployed in production) would cause a memory leak – memory use
would grow with each new subscription and would never be reduced as
clients end their subscriptions.
This commit is contained in:
Daniel Sockwell 2020-04-05 11:29:19 -04:00
parent d2e0a01baf
commit 63fb29320d
3 changed files with 25 additions and 30 deletions

View File

@ -34,6 +34,10 @@ regex = "1.3.2"
name = "parse_redis"
harness = false
[[bench]]
name = "bench_instant"
harness = false
[features]
default = [ "production" ]

View File

@ -4,7 +4,6 @@ use crate::parse_client_request::Timeline;
use std::{
collections::{HashMap, VecDeque},
fmt,
time::{Duration, Instant},
};
use uuid::Uuid;
@ -12,20 +11,16 @@ use uuid::Uuid;
pub struct MsgQueue {
pub timeline: Timeline,
pub messages: VecDeque<Event>,
last_polled_at: Instant,
}
impl MsgQueue {
pub fn new(timeline: Timeline) -> Self {
MsgQueue {
messages: VecDeque::new(),
last_polled_at: Instant::now(),
timeline,
}
}
pub fn update_polled_at_time(&mut self) {
self.last_polled_at = Instant::now();
}
}
#[derive(Debug)]
@ -39,18 +34,20 @@ impl MessageQueues {
timeline,
in_subscriber_number: 1,
});
self.retain(|_id, msg_queue| {
if msg_queue.last_polled_at.elapsed() < Duration::from_secs(30) {
true
} else {
let timeline = &msg_queue.timeline;
timelines_to_modify.push(Change {
timeline: *timeline,
in_subscriber_number: -1,
});
false
}
});
// self.retain(|_id, msg_queue| {
// if msg_queue.last_polled_at.elapsed() < Duration::from_secs(30) {
// true
// } else {
// let timeline = &msg_queue.timeline;
// timelines_to_modify.push(Change {
// timeline: *timeline,
// in_subscriber_number: -1,
// });
// false
// }
// });
// TODO: reimplement ^^^^
timelines_to_modify
}
}
@ -66,12 +63,9 @@ impl fmt::Debug for MsgQueue {
"\
MsgQueue {{
timeline: {:?},
messages: {:?},
last_polled_at: {:?} ago,
messages: {:?},
}}",
self.timeline,
self.messages,
self.last_polled_at.elapsed(),
self.timeline, self.messages,
)
}
}

View File

@ -92,13 +92,10 @@ impl Receiver {
// If the `msg_queue` being polled has any new messages, return the first (oldest) one
match self.msg_queues.get_mut(&id) {
Some(msg_q) => {
msg_q.update_polled_at_time();
match msg_q.messages.pop_front() {
Some(event) => Ok(Async::Ready(Some(event))),
None => Ok(Async::NotReady),
}
}
Some(msg_q) => match msg_q.messages.pop_front() {
Some(event) => Ok(Async::Ready(Some(event))),
None => Ok(Async::NotReady),
},
None => {
log::error!("Polled a MsgQueue that had not been set up. Setting it up now.");
self.msg_queues.insert(id, MsgQueue::new(timeline));