2019-07-08 13:31:42 +02:00
|
|
|
//! Receives data from Redis, sorts it by `ClientAgent`, and stores it until
|
|
|
|
//! polled by the correct `ClientAgent`. Also manages subscriptions and
|
|
|
|
//! unsubscriptions to/from Redis.
|
|
|
|
use super::redis_cmd;
|
|
|
|
use crate::{config, pubsub_cmd};
|
2019-04-30 15:44:51 +02:00
|
|
|
use futures::{Async, Poll};
|
|
|
|
use log::info;
|
|
|
|
use regex::Regex;
|
|
|
|
use serde_json::Value;
|
2019-07-08 13:31:42 +02:00
|
|
|
use std::{collections, env, io::Read, io::Write, net, time};
|
2019-05-10 07:47:29 +02:00
|
|
|
use tokio::io::{AsyncRead, Error};
|
|
|
|
use uuid::Uuid;
|
2019-05-09 05:02:01 +02:00
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
/// The item that streams from Redis and is polled by the `StreamManager`
#[derive(Debug)]
pub struct Receiver {
    // TCP connection kept in Redis PubSub mode; polled for incoming messages
    pubsub_connection: net::TcpStream,
    // Second Redis connection from `config::redis_addr()`; presumably for
    // commands that can't run on a connection in PubSub mode — TODO confirm
    secondary_redis_connection: net::TcpStream,
    // When Redis was last polled; used by `Stream::poll` to rate-limit polling
    redis_polled_at: time::Instant,
    // The timeline currently configured for polling
    timeline: String,
    // Id of the `ClientAgent`/manager currently polling this `Receiver`
    manager_id: Uuid,
    // One pending-message queue per manager id
    msg_queues: collections::HashMap<Uuid, MsgQueue>,
    // Subscriber count per timeline; drives subscribe/unsubscribe decisions
    clients_per_timeline: collections::HashMap<String, i32>,
}
|
2019-07-06 02:08:50 +02:00
|
|
|
|
2019-04-30 15:44:51 +02:00
|
|
|
impl Receiver {
|
2019-07-06 02:08:50 +02:00
|
|
|
/// Create a new `Receiver`, with its own Redis connections (but, as yet, no
|
|
|
|
/// active subscriptions).
|
2019-04-30 15:44:51 +02:00
|
|
|
pub fn new() -> Self {
|
2019-07-06 02:08:50 +02:00
|
|
|
let (pubsub_connection, secondary_redis_connection) = config::redis_addr();
|
2019-04-30 15:44:51 +02:00
|
|
|
Self {
|
2019-05-10 07:47:29 +02:00
|
|
|
pubsub_connection,
|
|
|
|
secondary_redis_connection,
|
2019-07-08 13:31:42 +02:00
|
|
|
redis_polled_at: time::Instant::now(),
|
|
|
|
timeline: String::new(),
|
2019-07-06 02:08:50 +02:00
|
|
|
manager_id: Uuid::default(),
|
|
|
|
msg_queues: collections::HashMap::new(),
|
|
|
|
clients_per_timeline: collections::HashMap::new(),
|
2019-04-30 15:44:51 +02:00
|
|
|
}
|
|
|
|
}
|
2019-05-10 07:47:29 +02:00
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
/// Assigns the `Receiver` a new timeline to monitor and runs other
|
|
|
|
/// first-time setup.
|
|
|
|
///
|
2019-07-08 13:31:42 +02:00
|
|
|
/// Note: this method calls `subscribe_or_unsubscribe_as_needed`,
|
2019-07-06 02:08:50 +02:00
|
|
|
/// so Redis PubSub subscriptions are only updated when a new timeline
|
|
|
|
/// comes under management for the first time.
|
|
|
|
pub fn manage_new_timeline(&mut self, manager_id: Uuid, timeline: &str) {
|
|
|
|
self.manager_id = manager_id;
|
2019-07-08 13:31:42 +02:00
|
|
|
self.timeline = timeline.to_string();
|
|
|
|
self.msg_queues
|
2019-07-06 02:08:50 +02:00
|
|
|
.insert(self.manager_id, MsgQueue::new(timeline));
|
|
|
|
self.subscribe_or_unsubscribe_as_needed(timeline);
|
2019-04-30 15:44:51 +02:00
|
|
|
}
|
2019-05-10 07:47:29 +02:00
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
/// Set the `Receiver`'s manager_id and target_timeline fields to the approprate
|
|
|
|
/// value to be polled by the current `StreamManager`.
|
|
|
|
pub fn configure_for_polling(&mut self, manager_id: Uuid, timeline: &str) {
|
|
|
|
self.manager_id = manager_id;
|
2019-07-08 13:31:42 +02:00
|
|
|
self.timeline = timeline.to_string();
|
2019-05-10 07:47:29 +02:00
|
|
|
}
|
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
/// Drop any PubSub subscriptions that don't have active clients and check
|
|
|
|
/// that there's a subscription to the current one. If there isn't, then
|
|
|
|
/// subscribe to it.
|
2019-07-08 13:31:42 +02:00
|
|
|
fn subscribe_or_unsubscribe_as_needed(&mut self, timeline: &str) {
|
2019-07-06 02:08:50 +02:00
|
|
|
let mut timelines_to_modify = Vec::new();
|
2019-07-08 13:31:42 +02:00
|
|
|
struct Change {
|
|
|
|
timeline: String,
|
|
|
|
change_in_subscriber_number: i32,
|
|
|
|
}
|
|
|
|
|
|
|
|
timelines_to_modify.push(Change {
|
|
|
|
timeline: timeline.to_owned(),
|
|
|
|
change_in_subscriber_number: 1,
|
|
|
|
});
|
2019-05-10 07:47:29 +02:00
|
|
|
|
|
|
|
// Keep only message queues that have been polled recently
|
|
|
|
self.msg_queues.retain(|_id, msg_queue| {
|
2019-07-06 02:08:50 +02:00
|
|
|
if msg_queue.last_polled_at.elapsed() < time::Duration::from_secs(30) {
|
2019-05-09 05:02:01 +02:00
|
|
|
true
|
2019-05-10 07:47:29 +02:00
|
|
|
} else {
|
2019-07-08 13:31:42 +02:00
|
|
|
let timeline = &msg_queue.redis_channel;
|
|
|
|
timelines_to_modify.push(Change {
|
|
|
|
timeline: timeline.to_owned(),
|
|
|
|
change_in_subscriber_number: -1,
|
|
|
|
});
|
2019-05-10 07:47:29 +02:00
|
|
|
false
|
2019-05-09 05:02:01 +02:00
|
|
|
}
|
|
|
|
});
|
|
|
|
|
2019-05-10 07:47:29 +02:00
|
|
|
// Record the lower number of clients subscribed to that channel
|
2019-07-08 13:31:42 +02:00
|
|
|
for change in timelines_to_modify {
|
2019-07-06 02:08:50 +02:00
|
|
|
let mut need_to_subscribe = false;
|
2019-05-09 05:02:01 +02:00
|
|
|
let count_of_subscribed_clients = self
|
2019-05-10 07:47:29 +02:00
|
|
|
.clients_per_timeline
|
2019-07-08 13:31:42 +02:00
|
|
|
.entry(change.timeline.clone())
|
|
|
|
.and_modify(|n| *n += change.change_in_subscriber_number)
|
2019-07-06 02:08:50 +02:00
|
|
|
.or_insert_with(|| {
|
|
|
|
need_to_subscribe = true;
|
|
|
|
1
|
|
|
|
});
|
2019-05-10 07:47:29 +02:00
|
|
|
// If no clients, unsubscribe from the channel
|
2019-05-09 05:02:01 +02:00
|
|
|
if *count_of_subscribed_clients <= 0 {
|
2019-07-06 02:08:50 +02:00
|
|
|
info!("Sent unsubscribe command");
|
2019-07-08 13:31:42 +02:00
|
|
|
pubsub_cmd!("unsubscribe", self, change.timeline.clone());
|
2019-07-06 02:08:50 +02:00
|
|
|
}
|
|
|
|
if need_to_subscribe {
|
|
|
|
info!("Sent subscribe command");
|
2019-07-08 13:31:42 +02:00
|
|
|
pubsub_cmd!("subscribe", self, change.timeline.clone());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Polls Redis for any new messages and adds them to the `MsgQueue` for
|
|
|
|
/// the appropriate `ClientAgent`.
|
|
|
|
fn poll_redis(&mut self) {
|
|
|
|
let mut buffer = vec![0u8; 3000];
|
|
|
|
// Add any incoming messages to the back of the relevant `msg_queues`
|
|
|
|
// NOTE: This could be more/other than the `msg_queue` currently being polled
|
|
|
|
let mut async_stream = AsyncReadableStream::new(&mut self.pubsub_connection);
|
|
|
|
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer).unwrap() {
|
|
|
|
let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]);
|
|
|
|
// capture everything between `{` and `}` as potential JSON
|
|
|
|
let json_regex = Regex::new(r"(?P<json>\{.*\})").expect("Hard-coded");
|
|
|
|
// capture the timeline so we know which queues to add it to
|
|
|
|
let timeline_regex = Regex::new(r"timeline:(?P<timeline>.*?)\r").expect("Hard-codded");
|
|
|
|
if let Some(result) = json_regex.captures(raw_redis_response) {
|
|
|
|
let timeline =
|
|
|
|
timeline_regex.captures(raw_redis_response).unwrap()["timeline"].to_string();
|
|
|
|
|
|
|
|
let msg: Value = serde_json::from_str(&result["json"].to_string().clone()).unwrap();
|
|
|
|
for msg_queue in self.msg_queues.values_mut() {
|
|
|
|
if msg_queue.redis_channel == timeline {
|
|
|
|
msg_queue.messages.push_back(msg.clone());
|
|
|
|
}
|
|
|
|
}
|
2019-05-09 05:02:01 +02:00
|
|
|
}
|
|
|
|
}
|
2019-04-30 15:44:51 +02:00
|
|
|
}
|
2019-05-10 07:47:29 +02:00
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
fn log_number_of_msgs_in_queue(&self) {
|
|
|
|
let messages_waiting = self
|
|
|
|
.msg_queues
|
|
|
|
.get(&self.manager_id)
|
|
|
|
.expect("Guaranteed by match block")
|
|
|
|
.messages
|
|
|
|
.len();
|
|
|
|
match messages_waiting {
|
|
|
|
number if number > 10 => {
|
|
|
|
log::error!("{} messages waiting in the queue", messages_waiting)
|
|
|
|
}
|
|
|
|
_ => log::info!("{} messages waiting in the queue", messages_waiting),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2019-07-08 13:31:42 +02:00
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
impl Default for Receiver {
|
|
|
|
fn default() -> Self {
|
|
|
|
Receiver::new()
|
2019-04-30 15:44:51 +02:00
|
|
|
}
|
|
|
|
}
|
2019-07-06 02:08:50 +02:00
|
|
|
|
2019-07-08 13:31:42 +02:00
|
|
|
/// The stream that the ClientAgent polls to learn about new messages.
impl futures::stream::Stream for Receiver {
    type Item = Value;
    type Error = Error;

    /// Returns the oldest message in the `ClientAgent`'s queue (if any).
    ///
    /// Note: This method does **not** poll Redis every time, because polling
    /// Redis is significantly more time consuming than simply returning the
    /// message already in a queue. Thus, we only poll Redis if it has not
    /// been polled lately.
    fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
        let timeline = self.timeline.clone();

        // Poll interval comes from the environment (re-read on every call),
        // falling back to the compiled-in default when the var is unset.
        // NOTE(review): an invalid REDIS_POLL_INTERVAL value panics here.
        let redis_poll_interval = env::var("REDIS_POLL_INTERVAL")
            .map(|s| s.parse().expect("Valid config"))
            .unwrap_or(config::DEFAULT_REDIS_POLL_INTERVAL);

        // Only touch Redis when the interval has elapsed since the last poll
        if self.redis_polled_at.elapsed() > time::Duration::from_millis(redis_poll_interval) {
            self.poll_redis();
            self.redis_polled_at = time::Instant::now();
        }

        // Record current time as last polled time
        self.msg_queues
            .entry(self.manager_id)
            .and_modify(|msg_queue| msg_queue.last_polled_at = time::Instant::now());

        // If the `msg_queue` being polled has any new messages, return the first (oldest) one
        match self
            .msg_queues
            .entry(self.manager_id)
            .or_insert_with(|| MsgQueue::new(timeline.clone()))
            .messages
            .pop_front()
        {
            Some(value) => {
                self.log_number_of_msgs_in_queue();
                Ok(Async::Ready(Some(value)))
            }
            _ => Ok(Async::NotReady),
        }
    }
}
|
2019-05-10 07:47:29 +02:00
|
|
|
|
2019-04-30 15:44:51 +02:00
|
|
|
impl Drop for Receiver {
    fn drop(&mut self) {
        // Stop listening to the current timeline before the Redis connections
        // are closed.  NOTE(review): `pubsub_cmd!` presumably writes the
        // UNSUBSCRIBE command over this Receiver's connection — see the macro's
        // definition to confirm.
        pubsub_cmd!("unsubscribe", self, self.timeline.clone());
    }
}
|
|
|
|
|
|
|
|
/// Per-client queue of messages received from Redis, plus the bookkeeping
/// needed to expire it and to route incoming messages to it.
#[derive(Debug, Clone)]
struct MsgQueue {
    // Messages waiting to be returned the next time this queue is polled
    pub messages: collections::VecDeque<Value>,
    // Updated on each poll; queues idle for 30s are dropped by
    // `subscribe_or_unsubscribe_as_needed`
    pub last_polled_at: time::Instant,
    // The Redis channel (timeline) whose messages feed this queue
    pub redis_channel: String,
}
|
|
|
|
|
|
|
|
impl MsgQueue {
|
|
|
|
pub fn new(redis_channel: impl std::fmt::Display) -> Self {
|
|
|
|
let redis_channel = redis_channel.to_string();
|
|
|
|
MsgQueue {
|
|
|
|
messages: collections::VecDeque::new(),
|
|
|
|
last_polled_at: time::Instant::now(),
|
|
|
|
redis_channel,
|
|
|
|
}
|
2019-04-30 15:44:51 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-07-06 02:08:50 +02:00
|
|
|
// Newtype over a borrowed `TcpStream` so it can be read through the
// `Read`/`AsyncRead` interfaces implemented below.
struct AsyncReadableStream<'a>(&'a mut net::TcpStream);
|
|
|
|
impl<'a> AsyncReadableStream<'a> {
|
|
|
|
pub fn new(stream: &'a mut net::TcpStream) -> Self {
|
|
|
|
AsyncReadableStream(stream)
|
|
|
|
}
|
|
|
|
}
|
2019-04-30 15:44:51 +02:00
|
|
|
|
|
|
|
impl<'a> Read for AsyncReadableStream<'a> {
|
|
|
|
fn read(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> {
|
|
|
|
self.0.read(buffer)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> AsyncRead for AsyncReadableStream<'a> {
|
|
|
|
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
|
|
|
|
match self.read(buf) {
|
|
|
|
Ok(t) => Ok(Async::Ready(t)),
|
|
|
|
Err(_) => Ok(Async::NotReady),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|