flodgatt/src/redis_to_client_stream/client_agent.rs

129 lines
5.5 KiB
Rust
Raw Normal View History

2019-07-08 13:31:42 +02:00
//! Provides an interface between the `Warp` filters and the underlying
//! mechanics of talking with Redis/managing multiple threads.
//!
//! The `ClientAgent`'s interface is very simple. All you can do with it is:
//! * Create a totally new `ClientAgent` with no shared data;
//! * Clone an existing `ClientAgent`, sharing the `Receiver`;
//! * Manage a new timeline/user pair; or
//! * Poll an existing `ClientAgent` to see if there are any new messages
//!   for clients
//!
//! When you poll the `ClientAgent`, it is responsible for polling internal data
//! structures, getting any updates from Redis, and then filtering out any updates
//! that should be excluded by relevant filters.
//!
//! Because `ClientAgent`s are lightweight data structures that do not directly
//! communicate with Redis, we create a new `ClientAgent` for
//! each new client connection (each in its own thread).
use super::{message::Message, receiver::Receiver};
use crate::{
config,
parse_client_request::user::{PgPool, Subscription},
};
use futures::{
Async::{self, NotReady, Ready},
Poll,
};
use std::sync;
2019-07-06 02:08:50 +02:00
use tokio::io::Error;
use uuid::Uuid;
/// Struct for managing all Redis streams.
2019-10-04 00:02:23 +02:00
#[derive(Clone, Debug)]
2019-07-08 13:31:42 +02:00
pub struct ClientAgent {
2019-07-06 02:08:50 +02:00
receiver: sync::Arc<sync::Mutex<Receiver>>,
id: uuid::Uuid,
// pub current_timeline: String,
subscription: Subscription,
2019-07-06 02:08:50 +02:00
}
2019-07-08 13:31:42 +02:00
impl ClientAgent {
/// Create a new `ClientAgent` with no shared data.
pub fn blank(redis_cfg: config::RedisConfig, pg_pool: PgPool) -> Self {
2019-07-08 13:31:42 +02:00
ClientAgent {
receiver: sync::Arc::new(sync::Mutex::new(Receiver::new(redis_cfg, pg_pool))),
2019-07-06 02:08:50 +02:00
id: Uuid::default(),
subscription: Subscription::default(),
2019-07-06 02:08:50 +02:00
}
}
2019-07-08 13:31:42 +02:00
/// Clones the `ClientAgent`, sharing the `Receiver`.
pub fn clone_with_shared_receiver(&self) -> Self {
Self {
receiver: self.receiver.clone(),
id: self.id,
subscription: self.subscription.clone(),
2019-07-08 13:31:42 +02:00
}
}
/// Initializes the `ClientAgent` with a unique ID associated with a specific user's
/// subscription. Also passes values to the `Receiver` for it's initialization.
2019-07-06 02:08:50 +02:00
///
/// Note that this *may or may not* result in a new Redis connection.
/// If the server has already subscribed to the timeline on behalf of
2019-07-08 13:31:42 +02:00
/// a different user, the `Receiver` is responsible for figuring
2019-07-06 02:08:50 +02:00
/// that out and avoiding duplicated connections. Thus, it is safe to
/// use this method for each new client connection.
pub fn init_for_user(&mut self, subscription: Subscription) {
2019-07-08 13:31:42 +02:00
self.id = Uuid::new_v4();
self.subscription = subscription;
2019-07-06 02:08:50 +02:00
let mut receiver = self.receiver.lock().expect("No thread panic (stream.rs)");
receiver.manage_new_timeline(self.id, self.subscription.timeline);
2019-07-06 02:08:50 +02:00
}
}
2019-07-08 13:31:42 +02:00
/// The stream that the `ClientAgent` manages. `Poll` is the only method implemented.
impl futures::stream::Stream for ClientAgent {
type Item = Message;
2019-07-06 02:08:50 +02:00
type Error = Error;
/// Checks for any new messages that should be sent to the client.
///
2019-07-08 13:31:42 +02:00
/// The `ClientAgent` polls the `Receiver` and replies
/// with `Ok(Ready(Some(Value)))` if there is a new message to send to
2019-07-06 02:08:50 +02:00
/// the client. If there is no new message or if the new message should be
2019-07-08 13:31:42 +02:00
/// filtered out based on one of the user's filters, then the `ClientAgent`
/// replies with `Ok(NotReady)`. The `ClientAgent` bubles up any
2019-07-06 02:08:50 +02:00
/// errors from the underlying data structures.
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let start_time = std::time::Instant::now();
2019-07-06 02:08:50 +02:00
let result = {
let mut receiver = self
.receiver
.lock()
2019-07-08 13:31:42 +02:00
.expect("ClientAgent: No other thread panic");
receiver.configure_for_polling(self.id, self.subscription.timeline);
receiver.poll()
};
if start_time.elapsed().as_millis() > 1 {
log::warn!("Polling the Receiver took: {:?}", start_time.elapsed());
2019-07-06 02:08:50 +02:00
};
2019-07-08 13:31:42 +02:00
let allowed_langs = &self.subscription.allowed_langs;
let blocked_users = &self.subscription.blocks.blocked_users;
let blocking_users = &self.subscription.blocks.blocking_users;
let blocked_domains = &self.subscription.blocks.blocked_domains;
let (send, block) = (|msg| Ok(Ready(Some(msg))), Ok(NotReady));
use Message::*;
2019-07-08 13:31:42 +02:00
match result {
Ok(Async::Ready(Some(json))) => match Message::from_json(json) {
Update(status) if status.language_not_allowed(allowed_langs) => block,
Update(status) if status.involves_blocked_user(blocked_users) => block,
Update(status) if status.from_blocked_domain(blocked_domains) => block,
Update(status) if status.from_blocking_user(blocking_users) => block,
Update(status) => send(Update(status)),
Notification(notification) => send(Notification(notification)),
Conversation(notification) => send(Conversation(notification)),
Delete(status_id) => send(Delete(status_id)),
FiltersChanged => send(FiltersChanged),
Announcement(content) => send(Announcement(content)),
UnknownEvent(event, payload) => send(UnknownEvent(event, payload)),
},
Ok(Ready(None)) => Ok(Ready(None)),
Ok(NotReady) => Ok(NotReady),
2019-07-06 02:08:50 +02:00
Err(e) => Err(e),
2019-07-08 13:31:42 +02:00
}
2019-07-06 02:08:50 +02:00
}
}