Implement Flodgatt pubsub

This commit is contained in:
Daniel Sockwell 2020-04-07 21:06:38 -04:00
parent 17afe4697d
commit 92c0e5dd10
5 changed files with 172 additions and 88 deletions

View File

@ -1,10 +1,11 @@
use flodgatt::{
config::{DeploymentConfig, EnvVar, PostgresConfig, RedisConfig},
parse_client_request::{PgPool, Subscription},
redis_to_client_stream::{ClientAgent, EventStream, Receiver},
messages::Event,
parse_client_request::{PgPool, Subscription, Timeline},
redis_to_client_stream::{EventStream, Receiver},
};
use std::{env, fs, net::SocketAddr, os::unix::fs::PermissionsExt};
use tokio::net::UnixListener;
use tokio::{net::UnixListener, sync::watch};
use warp::{http::StatusCode, path, ws::Ws2, Filter, Rejection};
fn main() {
@ -23,8 +24,8 @@ fn main() {
let cfg = DeploymentConfig::from_env(env_vars);
let pg_pool = PgPool::new(postgres_cfg);
let receiver = Receiver::try_from(redis_cfg)
let (tx, rx) = watch::channel((Timeline::empty(), Event::EventNotReady));
let receiver = Receiver::try_from(redis_cfg, tx)
.unwrap_or_else(|e| {
log::error!("{}\nFlodgatt shutting down...", e);
std::process::exit(1);
@ -34,38 +35,58 @@ fn main() {
// Server Sent Events
let sse_receiver = receiver.clone();
let (sse_interval, whitelist_mode) = (*cfg.sse_interval, *cfg.whitelist_mode);
let sse_rx = rx.clone();
let whitelist_mode = *cfg.whitelist_mode;
let sse_routes = Subscription::from_sse_query(pg_pool.clone(), whitelist_mode)
.and(warp::sse())
.map(
move |subscription: Subscription, sse_connection_to_client: warp::sse::Sse| {
log::info!("Incoming SSE request for {:?}", subscription.timeline);
let mut client_agent = ClientAgent::new(sse_receiver.clone(), &subscription);
client_agent.subscribe();
{
let mut receiver = sse_receiver.lock().expect("TODO");
receiver
.add_subscription(&subscription)
.unwrap_or_else(|e| {
log::error!("Could not subscribe to the Redis channel: {}", e)
});
}
let sse_rx = sse_rx.clone();
// send the updates through the SSE connection
EventStream::send_to_sse(client_agent, sse_connection_to_client, sse_interval)
EventStream::send_to_sse(sse_connection_to_client, subscription, sse_rx)
},
)
.with(warp::reply::with::header("Connection", "keep-alive"));
// WebSocket
let ws_receiver = receiver.clone();
let (ws_update_interval, whitelist_mode) = (*cfg.ws_interval, *cfg.whitelist_mode);
let whitelist_mode = *cfg.whitelist_mode;
let ws_routes = Subscription::from_ws_request(pg_pool, whitelist_mode)
.and(warp::ws::ws2())
.map(move |subscription: Subscription, ws: Ws2| {
log::info!("Incoming websocket request for {:?}", subscription.timeline);
let mut client_agent = ClientAgent::new(ws_receiver.clone(), &subscription);
client_agent.subscribe();
{
let mut receiver = ws_receiver.lock().expect("TODO");
receiver
.add_subscription(&subscription)
.unwrap_or_else(|e| {
log::error!("Could not subscribe to the Redis channel: {}", e)
});
}
let ws_rx = rx.clone();
let token = subscription
.clone()
.access_token
.unwrap_or_else(String::new);
// send the updates through the WS connection
// (along with the User's access_token which is sent for security)
(
ws.on_upgrade(move |s| {
EventStream::send_to_ws(s, client_agent, ws_update_interval)
}),
subscription.access_token.unwrap_or_else(String::new),
ws.on_upgrade(move |s| EventStream::send_to_ws(s, subscription, ws_rx)),
token,
)
})
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));
@ -93,6 +114,12 @@ fn main() {
#[cfg(not(feature = "stub_status"))]
let status_endpoints = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK");
let receiver = receiver.clone();
std::thread::spawn(move || loop {
std::thread::sleep(std::time::Duration::from_millis(1000));
receiver.lock().unwrap().poll_broadcast();
});
if let Some(socket) = &*cfg.unix_socket {
log::info!("Using Unix socket {}", socket);
fs::remove_file(socket).unwrap_or_default();

View File

@ -11,6 +11,7 @@ use std::string::String;
pub enum Event {
TypeSafe(CheckedEvent),
Dynamic(DynamicEvent),
EventNotReady,
}
impl Event {
@ -37,6 +38,7 @@ impl Event {
CheckedEvent::FiltersChanged => "filters_changed",
},
Self::Dynamic(dyn_event) => &dyn_event.event,
Self::EventNotReady => panic!("event_name() called on EventNotReady"),
})
}
@ -54,6 +56,7 @@ impl Event {
FiltersChanged => None,
},
Self::Dynamic(dyn_event) => Some(dyn_event.payload.to_string()),
Self::EventNotReady => panic!("payload() called on EventNotReady"),
}
}
}

View File

@ -1,8 +1,10 @@
use super::ClientAgent;
use crate::messages::Event;
use crate::parse_client_request::{Subscription, Timeline};
use futures::{future::Future, stream::Stream, Async};
use futures::{future::Future, stream::Stream};
use log;
use std::time::{Duration, Instant};
use std::time::Duration;
use tokio::sync::watch;
use warp::{
reply::Reply,
sse::Sse,
@ -14,17 +16,19 @@ impl EventStream {
/// Send a stream of replies to a WebSocket client.
pub fn send_to_ws(
ws: WebSocket,
mut client_agent: ClientAgent,
interval: Duration,
subscription: Subscription,
ws_rx: watch::Receiver<(Timeline, Event)>,
) -> impl Future<Item = (), Error = ()> {
let (transmit_to_ws, _receive_from_ws) = ws.split();
let timeline = client_agent.subscription.timeline;
let target_timeline = subscription.timeline;
let allowed_langs = subscription.allowed_langs;
let blocks = subscription.blocks;
// Create a pipe
let (tx, rx) = futures::sync::mpsc::unbounded();
// Send one end of it to a different thread and tell that end to forward whatever it gets
// on to the WebSocket client
// Send one end of it to a different green thread and tell that end to forward
// whatever it gets on to the WebSocket client
warp::spawn(
rx.map_err(|()| -> warp::Error { unreachable!() })
.forward(transmit_to_ws)
@ -35,70 +39,105 @@ impl EventStream {
}),
);
let mut last_ping_time = Instant::now();
tokio::timer::Interval::new(Instant::now(), interval)
.take_while(move |_| {
// Right now, we do not need to see if we have any messages _from_ the
// WebSocket connection because the API doesn't support clients sending
// commands via the WebSocket. However, if the [stream multiplexing API
// change](github.com/tootsuite/flodgatt/issues/121) is implemented, we'll
// need to receive messages from the client. If so, we'll need a
// `receive_from_ws.poll() call here (or later)`
match client_agent.poll() {
Ok(Async::NotReady) => {
if last_ping_time.elapsed() > Duration::from_secs(30) {
last_ping_time = Instant::now();
match tx.unbounded_send(Message::text("{}")) {
Ok(_) => futures::future::ok(true),
Err(_) => client_agent.disconnect(),
}
} else {
futures::future::ok(true)
}
}
Ok(Async::Ready(Some(msg))) => {
match tx.unbounded_send(Message::text(msg.to_json_string())) {
Ok(_) => futures::future::ok(true),
Err(_) => client_agent.disconnect(),
}
}
Err(e) => {
log::error!("{}\n Dropping WebSocket message and continuing.", e);
futures::future::ok(true)
}
Ok(Async::Ready(None)) => {
log::info!("WebSocket ClientAgent got Ready(None)");
futures::future::ok(true)
return ws_rx
.for_each(move |(timeline, event)| {
if target_timeline == timeline {
log::info!("Got event for {:?}", timeline);
use crate::messages::{CheckedEvent::Update, Event::*};
use crate::parse_client_request::Stream::Public;
match event {
TypeSafe(Update { payload, queued_at }) => match timeline {
Timeline(Public, _, _) if payload.language_not(&allowed_langs) => (),
_ if payload.involves_any(&blocks) => (),
// TODO filter vvvv
_ => tx
.unbounded_send(Message::text(
TypeSafe(Update { payload, queued_at }).to_json_string(),
))
.expect("TODO"),
},
TypeSafe(non_update) => tx
.unbounded_send(Message::text(TypeSafe(non_update).to_json_string()))
.expect("TODO"),
Dynamic(event) if event.event == "update" => match timeline {
Timeline(Public, _, _) if event.language_not(&allowed_langs) => (),
_ if event.involves_any(&blocks) => (),
// TODO filter vvvv
_ => tx
.unbounded_send(Message::text(Dynamic(event).to_json_string()))
.expect("TODO"),
},
Dynamic(non_update) => tx
.unbounded_send(Message::text(Dynamic(non_update).to_json_string()))
.expect("TODO"),
EventNotReady => panic!("TODO"),
}
}
Ok(())
})
.for_each(move |_instant| Ok(()))
.then(move |result| {
log::info!("WebSocket connection for {:?} closed.", timeline);
result
})
.map_err(move |e| log::warn!("Error sending to {:?}: {}", timeline, e))
.map_err(|_| ());
}
pub fn send_to_sse(mut client_agent: ClientAgent, sse: Sse, interval: Duration) -> impl Reply {
let event_stream =
tokio::timer::Interval::new(Instant::now(), interval).filter_map(move |_| {
match client_agent.poll() {
Ok(Async::Ready(Some(event))) => Some((
warp::sse::event(event.event_name()),
warp::sse::data(event.payload().unwrap_or_else(String::new)),
)),
Ok(Async::Ready(None)) => {
log::info!("SSE ClientAgent got Ready(None)");
None
pub fn send_to_sse(
sse: Sse,
subscription: Subscription,
sse_rx: watch::Receiver<(Timeline, Event)>,
) -> impl Reply {
let target_timeline = subscription.timeline;
let allowed_langs = subscription.allowed_langs;
let blocks = subscription.blocks;
let event_stream = sse_rx.filter_map(move |(timeline, event)| {
if target_timeline == timeline {
log::info!("Got event for {:?}", timeline);
use crate::messages::{CheckedEvent, CheckedEvent::Update, Event::*};
use crate::parse_client_request::Stream::Public;
match event {
TypeSafe(Update { payload, queued_at }) => match timeline {
Timeline(Public, _, _) if payload.language_not(&allowed_langs) => None,
_ if payload.involves_any(&blocks) => None,
// TODO filter vvvv
_ => {
let event =
Event::TypeSafe(CheckedEvent::Update { payload, queued_at });
Some((
warp::sse::event(event.event_name()),
warp::sse::data(event.payload().unwrap_or_else(String::new)),
))
}
},
TypeSafe(non_update) => {
let event = Event::TypeSafe(non_update);
Some((
warp::sse::event(event.event_name()),
warp::sse::data(event.payload().unwrap_or_else(String::new)),
))
}
Ok(Async::NotReady) => None,
Err(e) => {
log::error!("{}\n Dropping SSE message and continuing.", e);
None
Dynamic(event) if event.event == "update" => match timeline {
Timeline(Public, _, _) if event.language_not(&allowed_langs) => None,
_ if event.involves_any(&blocks) => None,
// TODO filter vvvv
_ => {
let event = Event::Dynamic(event);
Some((
warp::sse::event(event.event_name()),
warp::sse::data(event.payload().unwrap_or_else(String::new)),
))
}
},
Dynamic(non_update) => {
let event = Event::Dynamic(non_update);
Some((
warp::sse::event(event.event_name()),
warp::sse::data(event.payload().unwrap_or_else(String::new)),
))
}
EventNotReady => panic!("TODO"),
}
});
} else {
None
}
});
sse.reply(
warp::sse::keep_alive()

View File

@ -1,10 +1,9 @@
//! Stream the updates appropriate for a given `User`/`timeline` pair from Redis.
mod client_agent;
mod event_stream;
mod receiver;
mod redis;
pub use {client_agent::ClientAgent, event_stream::EventStream, receiver::Receiver};
pub use {event_stream::EventStream, receiver::Receiver};
#[cfg(feature = "bench")]
pub use redis::redis_msg::{RedisMsg, RedisParseOutput};

View File

@ -18,6 +18,7 @@ use crate::{
use {
futures::{Async, Poll},
hashbrown::HashMap,
tokio::sync::watch,
uuid::Uuid,
};
@ -37,12 +38,16 @@ pub struct Receiver {
redis_polled_at: Instant,
pub msg_queues: MessageQueues,
clients_per_timeline: HashMap<Timeline, i32>,
channel: watch::Sender<(Timeline, Event)>,
}
impl Receiver {
/// Create a new `Receiver`, with its own Redis connections (but, as yet, no
/// active subscriptions).
pub fn try_from(redis_cfg: config::RedisConfig) -> Result<Self> {
pub fn try_from(
redis_cfg: config::RedisConfig,
sender: watch::Sender<(Timeline, Event)>,
) -> Result<Self> {
let redis_poll_interval = *redis_cfg.polling_interval;
let redis_connection = RedisConn::new(redis_cfg)?;
@ -52,6 +57,7 @@ impl Receiver {
redis_connection,
msg_queues: MessageQueues(HashMap::new()),
clients_per_timeline: HashMap::new(),
channel: sender,
})
}
@ -105,6 +111,18 @@ impl Receiver {
Ok(())
}
pub fn poll_broadcast(&mut self) {
loop {
match self.redis_connection.poll_redis() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some((timeline, event)))) => {
self.channel.broadcast((timeline, event)).expect("TODO");
}
Ok(Async::Ready(None)) => (), // subscription cmd or msg for other namespace
Err(_err) => panic!("TODO"),
}
}
}
/// Returns the oldest message in the `ClientAgent`'s queue (if any).
///
@ -113,8 +131,8 @@ impl Receiver {
/// message already in a queue. Thus, we only poll Redis if it has not
/// been polled lately.
pub fn poll_for(&mut self, id: Uuid) -> Poll<Option<Event>, ReceiverErr> {
// let (t1, mut polled_redis) = (Instant::now(), false);
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
log::info!("Polling Redis");
loop {
match self.redis_connection.poll_redis() {
Ok(Async::NotReady) => break,
@ -130,7 +148,7 @@ impl Receiver {
Err(err) => Err(err)?,
}
}
// polled_redis = true;
self.redis_polled_at = Instant::now();
}
@ -140,9 +158,7 @@ impl Receiver {
Some(event) => Ok(Async::Ready(Some(event))),
None => Ok(Async::NotReady),
};
// if !polled_redis {
// log::info!("poll_for in {:?}", t1.elapsed());
// }
res
}