From 92c0e5dd10393f93f8440421906b0dbc4fa91bb2 Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Tue, 7 Apr 2020 21:06:38 -0400 Subject: [PATCH] Implement Flodgatt pubsub --- src/main.rs | 59 ++++++-- src/messages/event/mod.rs | 3 + src/redis_to_client_stream/event_stream.rs | 167 +++++++++++++-------- src/redis_to_client_stream/mod.rs | 3 +- src/redis_to_client_stream/receiver/mod.rs | 28 +++- 5 files changed, 172 insertions(+), 88 deletions(-) diff --git a/src/main.rs b/src/main.rs index 74b6702..4435ddd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,11 @@ use flodgatt::{ config::{DeploymentConfig, EnvVar, PostgresConfig, RedisConfig}, - parse_client_request::{PgPool, Subscription}, - redis_to_client_stream::{ClientAgent, EventStream, Receiver}, + messages::Event, + parse_client_request::{PgPool, Subscription, Timeline}, + redis_to_client_stream::{EventStream, Receiver}, }; use std::{env, fs, net::SocketAddr, os::unix::fs::PermissionsExt}; -use tokio::net::UnixListener; +use tokio::{net::UnixListener, sync::watch}; use warp::{http::StatusCode, path, ws::Ws2, Filter, Rejection}; fn main() { @@ -23,8 +24,8 @@ fn main() { let cfg = DeploymentConfig::from_env(env_vars); let pg_pool = PgPool::new(postgres_cfg); - - let receiver = Receiver::try_from(redis_cfg) + let (tx, rx) = watch::channel((Timeline::empty(), Event::EventNotReady)); + let receiver = Receiver::try_from(redis_cfg, tx) .unwrap_or_else(|e| { log::error!("{}\nFlodgatt shutting down...", e); std::process::exit(1); @@ -34,38 +35,58 @@ fn main() { // Server Sent Events let sse_receiver = receiver.clone(); - let (sse_interval, whitelist_mode) = (*cfg.sse_interval, *cfg.whitelist_mode); + let sse_rx = rx.clone(); + let whitelist_mode = *cfg.whitelist_mode; let sse_routes = Subscription::from_sse_query(pg_pool.clone(), whitelist_mode) .and(warp::sse()) .map( move |subscription: Subscription, sse_connection_to_client: warp::sse::Sse| { log::info!("Incoming SSE request for {:?}", subscription.timeline); - let mut client_agent = ClientAgent::new(sse_receiver.clone(), &subscription); - client_agent.subscribe(); + { + let mut receiver = sse_receiver.lock().expect("TODO"); + receiver + .add_subscription(&subscription) + .unwrap_or_else(|e| { + log::error!("Could not subscribe to the Redis channel: {}", e) + }); + } + + let sse_rx = sse_rx.clone(); // send the updates through the SSE connection - EventStream::send_to_sse(client_agent, sse_connection_to_client, sse_interval) + EventStream::send_to_sse(sse_connection_to_client, subscription, sse_rx) }, ) .with(warp::reply::with::header("Connection", "keep-alive")); // WebSocket let ws_receiver = receiver.clone(); - let (ws_update_interval, whitelist_mode) = (*cfg.ws_interval, *cfg.whitelist_mode); + + let whitelist_mode = *cfg.whitelist_mode; let ws_routes = Subscription::from_ws_request(pg_pool, whitelist_mode) .and(warp::ws::ws2()) .map(move |subscription: Subscription, ws: Ws2| { log::info!("Incoming websocket request for {:?}", subscription.timeline); - let mut client_agent = ClientAgent::new(ws_receiver.clone(), &subscription); - client_agent.subscribe(); + { + let mut receiver = ws_receiver.lock().expect("TODO"); + receiver + .add_subscription(&subscription) + .unwrap_or_else(|e| { + log::error!("Could not subscribe to the Redis channel: {}", e) + }); + } + + let ws_rx = rx.clone(); + let token = subscription + .clone() + .access_token + .unwrap_or_else(String::new); // send the updates through the WS connection // (along with the User's access_token which is sent for security) ( - ws.on_upgrade(move |s| { - EventStream::send_to_ws(s, client_agent, ws_update_interval) - }), - subscription.access_token.unwrap_or_else(String::new), + ws.on_upgrade(move |s| EventStream::send_to_ws(s, subscription, ws_rx)), + token, ) }) .map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token)); @@ -93,6 +114,12 @@ fn main() { #[cfg(not(feature = "stub_status"))] let status_endpoints = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK"); + let receiver = receiver.clone(); + std::thread::spawn(move || loop { + std::thread::sleep(std::time::Duration::from_millis(1000)); + receiver.lock().unwrap().poll_broadcast(); + }); + if let Some(socket) = &*cfg.unix_socket { log::info!("Using Unix socket {}", socket); fs::remove_file(socket).unwrap_or_default(); diff --git a/src/messages/event/mod.rs b/src/messages/event/mod.rs index 133ffa2..e1a8ddc 100644 --- a/src/messages/event/mod.rs +++ b/src/messages/event/mod.rs @@ -11,6 +11,7 @@ use std::string::String; pub enum Event { TypeSafe(CheckedEvent), Dynamic(DynamicEvent), + EventNotReady, } impl Event { @@ -37,6 +38,7 @@ impl Event { CheckedEvent::FiltersChanged => "filters_changed", }, Self::Dynamic(dyn_event) => &dyn_event.event, + Self::EventNotReady => panic!("event_name() called on EventNotReady"), }) } @@ -54,6 +56,7 @@ impl Event { FiltersChanged => None, }, Self::Dynamic(dyn_event) => Some(dyn_event.payload.to_string()), + Self::EventNotReady => panic!("payload() called on EventNotReady"), } } } diff --git a/src/redis_to_client_stream/event_stream.rs b/src/redis_to_client_stream/event_stream.rs index 8c28ecb..4603a3f 100644 --- a/src/redis_to_client_stream/event_stream.rs +++ b/src/redis_to_client_stream/event_stream.rs @@ -1,8 +1,10 @@ -use super::ClientAgent; +use crate::messages::Event; +use crate::parse_client_request::{Subscription, Timeline}; -use futures::{future::Future, stream::Stream, Async}; +use futures::{future::Future, stream::Stream}; use log; -use std::time::{Duration, Instant}; +use std::time::Duration; +use tokio::sync::watch; use warp::{ reply::Reply, sse::Sse, @@ -14,17 +16,19 @@ impl EventStream { /// Send a stream of replies to a WebSocket client. pub fn send_to_ws( ws: WebSocket, - mut client_agent: ClientAgent, - interval: Duration, + subscription: Subscription, + ws_rx: watch::Receiver<(Timeline, Event)>, ) -> impl Future { let (transmit_to_ws, _receive_from_ws) = ws.split(); - let timeline = client_agent.subscription.timeline; + let target_timeline = subscription.timeline; + let allowed_langs = subscription.allowed_langs; + let blocks = subscription.blocks; // Create a pipe let (tx, rx) = futures::sync::mpsc::unbounded(); - // Send one end of it to a different thread and tell that end to forward whatever it gets - // on to the WebSocket client + // Send one end of it to a different green thread and tell that end to forward + // whatever it gets on to the WebSocket client warp::spawn( rx.map_err(|()| -> warp::Error { unreachable!() }) .forward(transmit_to_ws) @@ -35,70 +39,105 @@ impl EventStream { }), ); - let mut last_ping_time = Instant::now(); - tokio::timer::Interval::new(Instant::now(), interval) - .take_while(move |_| { - // Right now, we do not need to see if we have any messages _from_ the - // WebSocket connection because the API doesn't support clients sending - // commands via the WebSocket. However, if the [stream multiplexing API - // change](github.com/tootsuite/flodgatt/issues/121) is implemented, we'll - // need to receive messages from the client. If so, we'll need a - // `receive_from_ws.poll() call here (or later)` - match client_agent.poll() { - Ok(Async::NotReady) => { - if last_ping_time.elapsed() > Duration::from_secs(30) { - last_ping_time = Instant::now(); - match tx.unbounded_send(Message::text("{}")) { - Ok(_) => futures::future::ok(true), - Err(_) => client_agent.disconnect(), - } - } else { - futures::future::ok(true) - } - } - Ok(Async::Ready(Some(msg))) => { - match tx.unbounded_send(Message::text(msg.to_json_string())) { - Ok(_) => futures::future::ok(true), - Err(_) => client_agent.disconnect(), - } - } - Err(e) => { - log::error!("{}\n Dropping WebSocket message and continuing.", e); - futures::future::ok(true) - } - Ok(Async::Ready(None)) => { - log::info!("WebSocket ClientAgent got Ready(None)"); - futures::future::ok(true) + return ws_rx + .for_each(move |(timeline, event)| { + if target_timeline == timeline { + log::info!("Got event for {:?}", timeline); + use crate::messages::{CheckedEvent::Update, Event::*}; + use crate::parse_client_request::Stream::Public; + match event { + TypeSafe(Update { payload, queued_at }) => match timeline { + Timeline(Public, _, _) if payload.language_not(&allowed_langs) => (), + _ if payload.involves_any(&blocks) => (), + // TODO filter vvvv + _ => tx + .unbounded_send(Message::text( + TypeSafe(Update { payload, queued_at }).to_json_string(), + )) + .expect("TODO"), + }, + TypeSafe(non_update) => tx + .unbounded_send(Message::text(TypeSafe(non_update).to_json_string())) + .expect("TODO"), + Dynamic(event) if event.event == "update" => match timeline { + Timeline(Public, _, _) if event.language_not(&allowed_langs) => (), + _ if event.involves_any(&blocks) => (), + // TODO filter vvvv + _ => tx + .unbounded_send(Message::text(Dynamic(event).to_json_string())) + .expect("TODO"), + }, + Dynamic(non_update) => tx + .unbounded_send(Message::text(Dynamic(non_update).to_json_string())) + .expect("TODO"), + EventNotReady => panic!("TODO"), } } + Ok(()) }) - .for_each(move |_instant| Ok(())) - .then(move |result| { - log::info!("WebSocket connection for {:?} closed.", timeline); - result - }) - .map_err(move |e| log::warn!("Error sending to {:?}: {}", timeline, e)) + .map_err(|_| ()); } - pub fn send_to_sse(mut client_agent: ClientAgent, sse: Sse, interval: Duration) -> impl Reply { - let event_stream = - tokio::timer::Interval::new(Instant::now(), interval).filter_map(move |_| { - match client_agent.poll() { - Ok(Async::Ready(Some(event))) => Some(( - warp::sse::event(event.event_name()), - warp::sse::data(event.payload().unwrap_or_else(String::new)), - )), - Ok(Async::Ready(None)) => { - log::info!("SSE ClientAgent got Ready(None)"); - None + pub fn send_to_sse( + sse: Sse, + subscription: Subscription, + sse_rx: watch::Receiver<(Timeline, Event)>, + ) -> impl Reply { + let target_timeline = subscription.timeline; + let allowed_langs = subscription.allowed_langs; + let blocks = subscription.blocks; + + let event_stream = sse_rx.filter_map(move |(timeline, event)| { + if target_timeline == timeline { + log::info!("Got event for {:?}", timeline); + use crate::messages::{CheckedEvent, CheckedEvent::Update, Event::*}; + use crate::parse_client_request::Stream::Public; + match event { + TypeSafe(Update { payload, queued_at }) => match timeline { + Timeline(Public, _, _) if payload.language_not(&allowed_langs) => None, + _ if payload.involves_any(&blocks) => None, + // TODO filter vvvv + _ => { + let event = + Event::TypeSafe(CheckedEvent::Update { payload, queued_at }); + Some(( + warp::sse::event(event.event_name()), + warp::sse::data(event.payload().unwrap_or_else(String::new)), + )) + } + }, + TypeSafe(non_update) => { + let event = Event::TypeSafe(non_update); + Some(( + warp::sse::event(event.event_name()), + warp::sse::data(event.payload().unwrap_or_else(String::new)), + )) } - Ok(Async::NotReady) => None, - Err(e) => { - log::error!("{}\n Dropping SSE message and continuing.", e); - None + Dynamic(event) if event.event == "update" => match timeline { + Timeline(Public, _, _) if event.language_not(&allowed_langs) => None, + _ if event.involves_any(&blocks) => None, + // TODO filter vvvv + _ => { + let event = Event::Dynamic(event); + Some(( + warp::sse::event(event.event_name()), + warp::sse::data(event.payload().unwrap_or_else(String::new)), + )) + } + }, + Dynamic(non_update) => { + let event = Event::Dynamic(non_update); + Some(( + warp::sse::event(event.event_name()), + warp::sse::data(event.payload().unwrap_or_else(String::new)), + )) } + EventNotReady => panic!("TODO"), } - }); + } else { + None + } + }); sse.reply( warp::sse::keep_alive() diff --git a/src/redis_to_client_stream/mod.rs b/src/redis_to_client_stream/mod.rs index 3f419d1..78ced45 100644 --- a/src/redis_to_client_stream/mod.rs +++ b/src/redis_to_client_stream/mod.rs @@ -1,10 +1,9 @@ //! Stream the updates appropriate for a given `User`/`timeline` pair from Redis. -mod client_agent; mod event_stream; mod receiver; mod redis; -pub use {client_agent::ClientAgent, event_stream::EventStream, receiver::Receiver}; +pub use {event_stream::EventStream, receiver::Receiver}; #[cfg(feature = "bench")] pub use redis::redis_msg::{RedisMsg, RedisParseOutput}; diff --git a/src/redis_to_client_stream/receiver/mod.rs b/src/redis_to_client_stream/receiver/mod.rs index 2df1cbc..d487763 100644 --- a/src/redis_to_client_stream/receiver/mod.rs +++ b/src/redis_to_client_stream/receiver/mod.rs @@ -18,6 +18,7 @@ use crate::{ use { futures::{Async, Poll}, hashbrown::HashMap, + tokio::sync::watch, uuid::Uuid, }; @@ -37,12 +38,16 @@ pub struct Receiver { redis_polled_at: Instant, pub msg_queues: MessageQueues, clients_per_timeline: HashMap, + channel: watch::Sender<(Timeline, Event)>, } impl Receiver { /// Create a new `Receiver`, with its own Redis connections (but, as yet, no /// active subscriptions). - pub fn try_from(redis_cfg: config::RedisConfig) -> Result { + pub fn try_from( + redis_cfg: config::RedisConfig, + sender: watch::Sender<(Timeline, Event)>, + ) -> Result { let redis_poll_interval = *redis_cfg.polling_interval; let redis_connection = RedisConn::new(redis_cfg)?; @@ -52,6 +57,7 @@ impl Receiver { redis_connection, msg_queues: MessageQueues(HashMap::new()), clients_per_timeline: HashMap::new(), + channel: sender, }) } @@ -105,6 +111,18 @@ impl Receiver { Ok(()) } + pub fn poll_broadcast(&mut self) { + loop { + match self.redis_connection.poll_redis() { + Ok(Async::NotReady) => break, + Ok(Async::Ready(Some((timeline, event)))) => { + self.channel.broadcast((timeline, event)).expect("TODO"); + } + Ok(Async::Ready(None)) => (), // subscription cmd or msg for other namespace + Err(_err) => panic!("TODO"), + } + } + } /// Returns the oldest message in the `ClientAgent`'s queue (if any). /// @@ -113,8 +131,8 @@ impl Receiver { /// message already in a queue. Thus, we only poll Redis if it has not /// been polled lately. pub fn poll_for(&mut self, id: Uuid) -> Poll, ReceiverErr> { - // let (t1, mut polled_redis) = (Instant::now(), false); if self.redis_polled_at.elapsed() > self.redis_poll_interval { + log::info!("Polling Redis"); loop { match self.redis_connection.poll_redis() { Ok(Async::NotReady) => break, @@ -130,7 +148,7 @@ impl Receiver { Err(err) => Err(err)?, } } - // polled_redis = true; + self.redis_polled_at = Instant::now(); } @@ -140,9 +158,7 @@ impl Receiver { Some(event) => Ok(Async::Ready(Some(event))), None => Ok(Async::NotReady), }; - // if !polled_redis { - // log::info!("poll_for in {:?}", t1.elapsed()); - // } + res }