flodgatt/src/redis_to_client_stream/receiver.rs

187 lines
6.8 KiB
Rust
Raw Normal View History

2019-07-08 13:31:42 +02:00
//! Receives data from Redis, sorts it by `ClientAgent`, and stores it until
//! polled by the correct `ClientAgent`. Also manages subscriptions and
//! unsubscriptions to/from Redis.
2019-10-04 00:02:23 +02:00
use super::{config, redis_cmd, redis_stream, redis_stream::RedisConn};
use crate::pubsub_cmd;
use futures::{Async, Poll};
use serde_json::Value;
use std::{collections, net, time};
use tokio::io::Error;
2019-05-10 07:47:29 +02:00
use uuid::Uuid;
/// The item that streams from Redis and is polled by the `ClientAgent`
#[derive(Debug)]
pub struct Receiver {
pub pubsub_connection: net::TcpStream,
2019-07-06 02:08:50 +02:00
secondary_redis_connection: net::TcpStream,
pub redis_namespace: Option<String>,
2019-10-04 00:02:23 +02:00
redis_poll_interval: time::Duration,
2019-07-08 13:31:42 +02:00
redis_polled_at: time::Instant,
timeline: String,
manager_id: Uuid,
pub msg_queues: collections::HashMap<Uuid, MsgQueue>,
2019-07-06 02:08:50 +02:00
clients_per_timeline: collections::HashMap<String, i32>,
pub incoming_raw_msg: String,
2019-05-09 17:52:05 +02:00
}
2019-07-06 02:08:50 +02:00
impl Receiver {
2019-07-06 02:08:50 +02:00
/// Create a new `Receiver`, with its own Redis connections (but, as yet, no
/// active subscriptions).
2019-10-04 00:02:23 +02:00
pub fn new(redis_cfg: config::RedisConfig) -> Self {
let RedisConn {
primary: pubsub_connection,
secondary: secondary_redis_connection,
namespace: redis_namespace,
2019-10-04 00:02:23 +02:00
polling_interval: redis_poll_interval,
} = RedisConn::new(redis_cfg);
Self {
2019-05-10 07:47:29 +02:00
pubsub_connection,
secondary_redis_connection,
redis_namespace,
2019-10-04 00:02:23 +02:00
redis_poll_interval,
2019-07-08 13:31:42 +02:00
redis_polled_at: time::Instant::now(),
timeline: String::new(),
2019-07-06 02:08:50 +02:00
manager_id: Uuid::default(),
msg_queues: collections::HashMap::new(),
clients_per_timeline: collections::HashMap::new(),
/// The unprocessed message from Redis, consisting of 0 or more
/// actual `messages` in the sense of updates to send.
incoming_raw_msg: String::new(),
}
}
2019-05-10 07:47:29 +02:00
2019-07-06 02:08:50 +02:00
/// Assigns the `Receiver` a new timeline to monitor and runs other
/// first-time setup.
///
2019-07-08 13:31:42 +02:00
/// Note: this method calls `subscribe_or_unsubscribe_as_needed`,
2019-07-06 02:08:50 +02:00
/// so Redis PubSub subscriptions are only updated when a new timeline
/// comes under management for the first time.
pub fn manage_new_timeline(&mut self, manager_id: Uuid, timeline: &str) {
self.manager_id = manager_id;
2019-07-08 13:31:42 +02:00
self.timeline = timeline.to_string();
self.msg_queues
2019-07-06 02:08:50 +02:00
.insert(self.manager_id, MsgQueue::new(timeline));
self.subscribe_or_unsubscribe_as_needed(timeline);
}
2019-05-10 07:47:29 +02:00
/// Set the `Receiver`'s manager_id and target_timeline fields to the appropriate
2019-07-06 02:08:50 +02:00
/// value to be polled by the current `StreamManager`.
pub fn configure_for_polling(&mut self, manager_id: Uuid, timeline: &str) {
self.manager_id = manager_id;
2019-07-08 13:31:42 +02:00
self.timeline = timeline.to_string();
2019-05-10 07:47:29 +02:00
}
2019-07-06 02:08:50 +02:00
/// Drop any PubSub subscriptions that don't have active clients and check
/// that there's a subscription to the current one. If there isn't, then
/// subscribe to it.
2019-07-08 13:31:42 +02:00
fn subscribe_or_unsubscribe_as_needed(&mut self, timeline: &str) {
let start_time = std::time::Instant::now();
2019-07-06 02:08:50 +02:00
let mut timelines_to_modify = Vec::new();
2019-07-08 13:31:42 +02:00
struct Change {
timeline: String,
in_subscriber_number: i32,
2019-07-08 13:31:42 +02:00
}
timelines_to_modify.push(Change {
timeline: timeline.to_owned(),
in_subscriber_number: 1,
2019-07-08 13:31:42 +02:00
});
2019-05-10 07:47:29 +02:00
// Keep only message queues that have been polled recently
self.msg_queues.retain(|_id, msg_queue| {
2019-07-06 02:08:50 +02:00
if msg_queue.last_polled_at.elapsed() < time::Duration::from_secs(30) {
true
2019-05-10 07:47:29 +02:00
} else {
2019-07-08 13:31:42 +02:00
let timeline = &msg_queue.redis_channel;
timelines_to_modify.push(Change {
timeline: timeline.to_owned(),
in_subscriber_number: -1,
2019-07-08 13:31:42 +02:00
});
2019-05-10 07:47:29 +02:00
false
}
});
2019-05-10 07:47:29 +02:00
// Record the lower number of clients subscribed to that channel
2019-07-08 13:31:42 +02:00
for change in timelines_to_modify {
let count_of_subscribed_clients = self
2019-05-10 07:47:29 +02:00
.clients_per_timeline
2019-07-08 13:31:42 +02:00
.entry(change.timeline.clone())
.and_modify(|n| *n += change.in_subscriber_number)
.or_insert_with(|| 1);
2019-05-10 07:47:29 +02:00
// If no clients, unsubscribe from the channel
if *count_of_subscribed_clients <= 0 {
2019-07-08 13:31:42 +02:00
pubsub_cmd!("unsubscribe", self, change.timeline.clone());
} else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 {
2019-07-08 13:31:42 +02:00
pubsub_cmd!("subscribe", self, change.timeline.clone());
}
}
if start_time.elapsed().as_millis() > 1 {
log::warn!("Sending cmd to Redis took: {:?}", start_time.elapsed());
};
2019-07-06 02:08:50 +02:00
}
fn get_target_msg_queue(&mut self) -> collections::hash_map::Entry<Uuid, MsgQueue> {
self.msg_queues.entry(self.manager_id)
}
2019-07-06 02:08:50 +02:00
}
2019-07-08 13:31:42 +02:00
/// The stream that the ClientAgent polls to learn about new messages.
2019-07-06 02:08:50 +02:00
impl futures::stream::Stream for Receiver {
type Item = Value;
type Error = Error;
2019-07-08 13:31:42 +02:00
/// Returns the oldest message in the `ClientAgent`'s queue (if any).
///
/// Note: This method does **not** poll Redis every time, because polling
/// Redis is signifiantly more time consuming that simply returning the
/// message already in a queue. Thus, we only poll Redis if it has not
/// been polled lately.
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
2019-07-08 13:31:42 +02:00
let timeline = self.timeline.clone();
2019-10-04 00:02:23 +02:00
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
redis_stream::AsyncReadableStream::poll_redis(self);
2019-07-08 13:31:42 +02:00
self.redis_polled_at = time::Instant::now();
}
2019-05-10 07:47:29 +02:00
// Record current time as last polled time
self.get_target_msg_queue()
2019-07-06 02:08:50 +02:00
.and_modify(|msg_queue| msg_queue.last_polled_at = time::Instant::now());
2019-05-10 07:47:29 +02:00
// If the `msg_queue` being polled has any new messages, return the first (oldest) one
match self
.get_target_msg_queue()
2019-07-06 02:08:50 +02:00
.or_insert_with(|| MsgQueue::new(timeline.clone()))
.messages
.pop_front()
{
Some(value) => Ok(Async::Ready(Some(value))),
2019-05-10 07:47:29 +02:00
_ => Ok(Async::NotReady),
}
}
}
2019-05-10 07:47:29 +02:00
impl Drop for Receiver {
fn drop(&mut self) {
2019-07-08 13:31:42 +02:00
pubsub_cmd!("unsubscribe", self, self.timeline.clone());
2019-07-06 02:08:50 +02:00
}
}
#[derive(Debug, Clone)]
pub struct MsgQueue {
pub messages: collections::VecDeque<Value>,
last_polled_at: time::Instant,
pub redis_channel: String,
2019-07-06 02:08:50 +02:00
}
impl MsgQueue {
fn new(redis_channel: impl std::fmt::Display) -> Self {
2019-07-06 02:08:50 +02:00
let redis_channel = redis_channel.to_string();
MsgQueue {
messages: collections::VecDeque::new(),
last_polled_at: time::Instant::now(),
redis_channel,
}
}
}