Improve pubsub implementation [WIP]

Daniel Sockwell 2020-04-08 18:13:49 -04:00
parent 92c0e5dd10
commit e1be0db974
6 changed files with 301 additions and 161 deletions

View File

@@ -43,8 +43,8 @@ stub_status = []
production = []
[profile.release]
lto = "fat"
panic = "abort"
codegen-units = 1
#lto = "fat"
#panic = "abort"
#codegen-units = 1

View File

@@ -5,7 +5,10 @@ use flodgatt::{
redis_to_client_stream::{EventStream, Receiver},
};
use std::{env, fs, net::SocketAddr, os::unix::fs::PermissionsExt};
use tokio::{net::UnixListener, sync::watch};
use tokio::{
net::UnixListener,
sync::{mpsc, watch},
};
use warp::{http::StatusCode, path, ws::Ws2, Filter, Rejection};
fn main() {
@@ -24,8 +27,10 @@ fn main() {
let cfg = DeploymentConfig::from_env(env_vars);
let pg_pool = PgPool::new(postgres_cfg);
let (tx, rx) = watch::channel((Timeline::empty(), Event::EventNotReady));
let receiver = Receiver::try_from(redis_cfg, tx)
let (event_tx, event_rx) = watch::channel((Timeline::empty(), Event::Ping));
let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel();
let poll_freq = *redis_cfg.polling_interval;
let receiver = Receiver::try_from(redis_cfg, event_tx, subscribe_rx)
.unwrap_or_else(|e| {
log::error!("{}\nFlodgatt shutting down...", e);
std::process::exit(1);
@@ -35,7 +40,7 @@ fn main() {
// Server Sent Events
let sse_receiver = receiver.clone();
let sse_rx = rx.clone();
let sse_rx = event_rx.clone();
let whitelist_mode = *cfg.whitelist_mode;
let sse_routes = Subscription::from_sse_query(pg_pool.clone(), whitelist_mode)
.and(warp::sse())
@@ -75,8 +80,8 @@ fn main() {
log::error!("Could not subscribe to the Redis channel: {}", e)
});
}
let ws_rx = rx.clone();
let ws_subscribe_tx = subscribe_tx.clone();
let ws_rx = event_rx.clone();
let token = subscription
.clone()
.access_token
@@ -85,7 +90,9 @@ fn main() {
// send the updates through the WS connection
// (along with the User's access_token which is sent for security)
(
ws.on_upgrade(move |s| EventStream::send_to_ws(s, subscription, ws_rx)),
ws.on_upgrade(move |s| {
EventStream::send_to_ws(s, subscription, ws_rx, ws_subscribe_tx)
}),
token,
)
})
@@ -98,14 +105,12 @@ fn main() {
#[cfg(feature = "stub_status")]
let status_endpoints = {
let (r1, r2, r3) = (receiver.clone(), receiver.clone(), receiver.clone());
let (r1, r3) = (receiver.clone(), receiver.clone());
warp::path!("api" / "v1" / "streaming" / "health")
.map(|| "OK")
.or(warp::path!("api" / "v1" / "streaming" / "status")
.and(warp::path::end())
.map(move || r1.lock().expect("TODO").count_connections()))
.or(warp::path!("api" / "v1" / "streaming" / "status" / "queue")
.map(move || r2.lock().expect("TODO").queue_length()))
.or(
warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline")
.map(move || r3.lock().expect("TODO").list_connections()),
@@ -114,11 +119,12 @@ fn main() {
#[cfg(not(feature = "stub_status"))]
let status_endpoints = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK");
let receiver = receiver.clone();
std::thread::spawn(move || loop {
std::thread::sleep(std::time::Duration::from_millis(1000));
receiver.lock().unwrap().poll_broadcast();
});
// let receiver_old = receiver.clone();
// // TODO make vvvv a green thread
// std::thread::spawn(move || loop {
// std::thread::sleep(std::time::Duration::from_millis(10000));
// receiver_old.lock().unwrap().poll_broadcast();
// });
if let Some(socket) = &*cfg.unix_socket {
log::info!("Using Unix socket {}", socket);
@@ -146,7 +152,25 @@ fn main() {
)
.run_incoming(incoming);
} else {
use futures::{future::lazy, stream::Stream as _Stream};
use std::time::Instant;
let server_addr = SocketAddr::new(*cfg.address, *cfg.port);
warp::serve(ws_routes.or(sse_routes).with(cors).or(status_endpoints)).run(server_addr);
let receiver = receiver.clone();
tokio::run(lazy(move || {
let receiver = receiver.clone();
tokio::spawn(lazy(move || {
tokio::timer::Interval::new(Instant::now(), poll_freq)
.map_err(|e| log::error!("{}", e))
.for_each(move |_| {
let receiver = receiver.clone();
receiver.lock().expect("TODO").poll_broadcast();
Ok(())
})
}));
warp::serve(ws_routes.or(sse_routes).with(cors).or(status_endpoints)).bind(server_addr)
}));
};
}
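
Note: the changes above replace the old poll-once-a-second OS thread with two channels plus a tokio timer. A `watch` channel fans each `(Timeline, Event)` broadcast out to every client task, and an unbounded `mpsc` channel carries feedback the other way: a client that can no longer write to its socket sends its timeline back so the `Receiver` can unsubscribe. A minimal, self-contained sketch of that topology under the same tokio 0.1 / futures 0.1 APIs the diff uses; the integer timelines and `send_to_socket` helper are illustrative stand-ins, not Flodgatt code:

use futures::{future::lazy, stream::Stream};
use tokio::sync::{mpsc, watch};

// Stand-in for the real WebSocket write.
fn send_to_socket(_tl: u32, _payload: &str) -> Result<(), ()> {
    Ok(())
}

fn main() {
    // watch: one producer, many consumers; each consumer observes the
    // most recent broadcast value (slow consumers may skip older ones).
    let (mut event_tx, event_rx) = watch::channel((0u32, String::from("{}")));
    // mpsc: many producers (one per client), one consumer (the hub).
    let (drop_tx, mut drop_rx) = mpsc::unbounded_channel::<u32>();

    tokio::run(lazy(move || {
        let mut drop_tx = drop_tx.clone();
        // A client task: consume broadcasts until the socket dies, then
        // report back and end this stream by returning Err(()).
        tokio::spawn(event_rx.map_err(|_| ()).for_each(move |(tl, payload)| {
            if send_to_socket(tl, &payload).is_err() {
                drop_tx.try_send(tl).ok();
                return Err(());
            }
            Ok(())
        }));
        // The hub side: broadcast events and drain disconnect reports
        // (in the diff this happens inside `Receiver::poll_broadcast`).
        event_tx.broadcast((1, "event".into())).ok();
        let _ = drop_rx.poll();
        Ok(())
    }));
}

Because a `watch` channel retains only the latest value, a client task that falls behind observes the newest event rather than every intermediate one; the per-client `MsgQueue`s removed later in this commit offered stronger delivery at the cost of the bookkeeping this refactor drops.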

View File

@@ -21,7 +21,8 @@ pub struct Status {
uri: String,
created_at: String,
account: Account,
content: String,
// TODO remove pub
pub content: String,
visibility: Visibility,
sensitive: bool,
spoiler_text: String,
@@ -40,6 +41,7 @@ pub struct Status {
poll: Option<Poll>,
card: Option<Card>,
language: Option<String>,
text: Option<String>,
// ↓↓↓ Only for authorized users
favourited: Option<bool>,

View File

@@ -11,7 +11,7 @@ use std::string::String;
pub enum Event {
TypeSafe(CheckedEvent),
Dynamic(DynamicEvent),
EventNotReady,
Ping,
}
impl Event {
@@ -38,7 +38,7 @@ impl Event {
CheckedEvent::FiltersChanged => "filters_changed",
},
Self::Dynamic(dyn_event) => &dyn_event.event,
Self::EventNotReady => panic!("event_name() called on EventNotReady"),
Self::Ping => panic!("event_name() called on EventNotReady"),
})
}
@@ -56,7 +56,7 @@ impl Event {
FiltersChanged => None,
},
Self::Dynamic(dyn_event) => Some(dyn_event.payload.to_string()),
Self::EventNotReady => panic!("payload() called on EventNotReady"),
Self::Ping => panic!("payload() called on EventNotReady"),
}
}
}
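
Note: renaming `EventNotReady` to `Ping` turns the placeholder variant into a usable keep-alive: the WebSocket handler below writes it out as an empty JSON object, and a failed write on that heartbeat is how a dead client gets detected. A toy sketch of the convention (this simplified `Event` is illustrative, not the real enum):

// Simplified stand-in for the real Event enum.
enum Event {
    TypeSafe(String),
    Dynamic(String),
    Ping,
}

// Turn an event into the text frame a client should receive; the
// empty JSON object doubles as the heartbeat.
fn to_ws_text(event: &Event) -> String {
    match event {
        Event::Ping => "{}".to_string(),
        Event::TypeSafe(json) | Event::Dynamic(json) => json.clone(),
    }
}

fn main() {
    assert_eq!(to_ws_text(&Event::Ping), "{}");
}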

View File

@@ -4,7 +4,7 @@ use crate::parse_client_request::{Subscription, Timeline};
use futures::{future::Future, stream::Stream};
use log;
use std::time::Duration;
use tokio::sync::watch;
use tokio::sync::{mpsc, watch};
use warp::{
reply::Reply,
sse::Sse,
@@ -17,20 +17,22 @@ impl EventStream {
pub fn send_to_ws(
ws: WebSocket,
subscription: Subscription,
ws_rx: watch::Receiver<(Timeline, Event)>,
event_rx: watch::Receiver<(Timeline, Event)>,
mut subscribe_tx: mpsc::UnboundedSender<Timeline>,
) -> impl Future<Item = (), Error = ()> {
let (transmit_to_ws, _receive_from_ws) = ws.split();
let target_timeline = subscription.timeline;
let allowed_langs = subscription.allowed_langs;
let user_langs = subscription.allowed_langs;
let blocks = subscription.blocks;
// Create a pipe
let (tx, rx) = futures::sync::mpsc::unbounded();
let (ws_tx, ws_rx) = futures::sync::mpsc::unbounded();
// Send one end of it to a different green thread and tell that end to forward
// whatever it gets on to the WebSocket client
warp::spawn(
rx.map_err(|()| -> warp::Error { unreachable!() })
ws_rx
.map_err(|()| -> warp::Error { unreachable!() })
.forward(transmit_to_ws)
.map(|_r| ())
.map_err(|e| match e.to_string().as_ref() {
@@ -39,43 +41,164 @@ impl EventStream {
}),
);
return ws_rx
.for_each(move |(timeline, event)| {
if target_timeline == timeline {
log::info!("Got event for {:?}", timeline);
use crate::messages::{CheckedEvent::Update, Event::*};
use crate::parse_client_request::Stream::Public;
match event {
TypeSafe(Update { payload, queued_at }) => match timeline {
Timeline(Public, _, _) if payload.language_not(&allowed_langs) => (),
_ if payload.involves_any(&blocks) => (),
// TODO filter vvvv
_ => tx
.unbounded_send(Message::text(
TypeSafe(Update { payload, queued_at }).to_json_string(),
))
.expect("TODO"),
},
TypeSafe(non_update) => tx
.unbounded_send(Message::text(TypeSafe(non_update).to_json_string()))
.expect("TODO"),
Dynamic(event) if event.event == "update" => match timeline {
Timeline(Public, _, _) if event.language_not(&allowed_langs) => (),
_ if event.involves_any(&blocks) => (),
// TODO filter vvvv
_ => tx
.unbounded_send(Message::text(Dynamic(event).to_json_string()))
.expect("TODO"),
},
Dynamic(non_update) => tx
.unbounded_send(Message::text(Dynamic(non_update).to_json_string()))
.expect("TODO"),
EventNotReady => panic!("TODO"),
}
event_rx.map_err(|_| ()).for_each(move |(tl, event)| {
if target_timeline == tl {
use crate::messages::{CheckedEvent::Update, Event::*};
use crate::parse_client_request::Stream::Public;
log::info!("Got event for {:?}", tl);
if let Event::TypeSafe(Update { payload, .. }) = event.clone() {
log::info!("{:?}", &payload.content);
}
Ok(())
})
.map_err(|_| ());
match event {
Ping => match ws_tx.unbounded_send(Message::text("{}")) {
Ok(_) => Ok(()),
Err(_) => {
subscribe_tx.try_send(tl).expect("TODO");
Err(())
}
},
TypeSafe(Update { payload, queued_at }) => match tl {
Timeline(Public, _, _) if payload.language_not(&user_langs) => Ok(()),
_ if payload.involves_any(&blocks) => Ok(()),
_ => match ws_tx.unbounded_send(Message::text(
TypeSafe(Update { payload, queued_at }).to_json_string(),
)) {
Ok(_) => Ok(()),
Err(_) => {
subscribe_tx.try_send(tl).expect("TODO");
Err(())
}
},
},
TypeSafe(non_update) => match ws_tx
.unbounded_send(Message::text(TypeSafe(non_update).to_json_string()))
{
Ok(_) => Ok(()),
Err(_) => {
subscribe_tx.try_send(tl).expect("TODO");
Err(())
}
},
Dynamic(event) if event.event == "update" => match tl {
Timeline(Public, _, _) if event.language_not(&user_langs) => Ok(()),
_ if event.involves_any(&blocks) => Ok(()),
_ => match ws_tx
.unbounded_send(Message::text(Dynamic(event).to_json_string()))
{
Ok(_) => Ok(()),
Err(_) => {
subscribe_tx.try_send(tl).expect("TODO");
Err(())
}
},
},
Dynamic(non_update) => match ws_tx
.unbounded_send(Message::text(Dynamic(non_update).to_json_string()))
{
Ok(_) => Ok(()),
Err(_) => {
subscribe_tx.try_send(tl).expect("TODO");
Err(())
}
},
}
} else {
if let Event::Ping = event {
match ws_tx.unbounded_send(Message::text("{}")) {
Ok(_) => Ok(()),
Err(_) => {
subscribe_tx.try_send(target_timeline).expect("TODO");
Err(())
}
}
} else {
Ok(())
}
}
})
// event_rx
// .take_while(move |(tl, event)| {
// let (tl, event) = (*tl, event.clone());
// if target_timeline == tl {
// log::info!("Got event for {:?}", tl);
// use crate::messages::{CheckedEvent::Update, Event::*};
// use crate::parse_client_request::Stream::Public;
// match event {
// Ping => match ws_tx.unbounded_send(Message::text("{}")) {
// Ok(_) => futures::future::ok(true),
// Err(_) => {
// subscribe_tx.try_send(tl).expect("TODO");
// futures::future::ok(false)
// }
// },
// TypeSafe(Update { payload, queued_at }) => match tl {
// Timeline(Public, _, _) if payload.language_not(&user_langs) => {
// futures::future::ok(true)
// }
// _ if payload.involves_any(&blocks) => futures::future::ok(true),
// _ => match ws_tx.unbounded_send(Message::text(
// TypeSafe(Update { payload, queued_at }).to_json_string(),
// )) {
// Ok(_) => futures::future::ok(true),
// Err(_) => {
// subscribe_tx.try_send(tl).expect("TODO");
// futures::future::ok(false)
// }
// },
// },
// TypeSafe(non_update) => match ws_tx
// .unbounded_send(Message::text(TypeSafe(non_update).to_json_string()))
// {
// Ok(_) => futures::future::ok(true),
// Err(_) => {
// subscribe_tx.try_send(tl).expect("TODO");
// futures::future::ok(false)
// }
// },
// Dynamic(event) if event.event == "update" => match tl {
// Timeline(Public, _, _) if event.language_not(&user_langs) => {
// futures::future::ok(true)
// }
// _ if event.involves_any(&blocks) => futures::future::ok(true),
// _ => match ws_tx
// .unbounded_send(Message::text(Dynamic(event).to_json_string()))
// {
// Ok(_) => futures::future::ok(true),
// Err(_) => {
// subscribe_tx.try_send(tl).expect("TODO");
// futures::future::ok(false)
// }
// },
// },
// Dynamic(non_update) => match ws_tx
// .unbounded_send(Message::text(Dynamic(non_update).to_json_string()))
// {
// Ok(_) => futures::future::ok(true),
// Err(_) => {
// subscribe_tx.try_send(tl).expect("TODO");
// futures::future::ok(false)
// }
// },
// }
// } else {
// if let Event::Ping = event {
// match ws_tx.unbounded_send(Message::text("{}")) {
// Ok(_) => futures::future::ok(true),
// Err(_) => {
// subscribe_tx.try_send(target_timeline).expect("TODO");
// futures::future::ok(false)
// }
// }
// } else {
// futures::future::ok(true)
// }
// }
// })
// .for_each(|_| Ok(()))
// .map_err(|_| ())
// .map(|_| ())
}
pub fn send_to_sse(
@@ -96,7 +219,6 @@ impl EventStream {
TypeSafe(Update { payload, queued_at }) => match timeline {
Timeline(Public, _, _) if payload.language_not(&allowed_langs) => None,
_ if payload.involves_any(&blocks) => None,
// TODO filter vvvv
_ => {
let event =
Event::TypeSafe(CheckedEvent::Update { payload, queued_at });
@@ -116,22 +238,16 @@ impl EventStream {
Dynamic(event) if event.event == "update" => match timeline {
Timeline(Public, _, _) if event.language_not(&allowed_langs) => None,
_ if event.involves_any(&blocks) => None,
// TODO filter vvvv
_ => {
let event = Event::Dynamic(event);
Some((
warp::sse::event(event.event_name()),
warp::sse::data(event.payload().unwrap_or_else(String::new)),
))
}
_ => Some((
warp::sse::event(event.event),
warp::sse::data(event.payload.to_string()),
)),
},
Dynamic(non_update) => {
let event = Event::Dynamic(non_update);
Some((
warp::sse::event(event.event_name()),
warp::sse::data(event.payload().unwrap_or_else(String::new)),
))
}
Dynamic(non_update) => Some((
warp::sse::event(non_update.event),
warp::sse::data(non_update.payload.to_string()),
)),
// TODO: Fix for Ping
Ping => panic!("TODO"),
}
} else {
@@ -147,3 +263,49 @@ impl EventStream {
)
}
}
// if target_timeline == tl {
// log::info!("Got event for {:?}", tl);
// use crate::messages::{CheckedEvent::Update, Event::*};
// use crate::parse_client_request::Stream::Public;
// match event {
// TypeSafe(Update { payload, queued_at }) => match tl {
// Timeline(Public, _, _) if payload.language_not(&user_langs) => Ok(()),
// _ if payload.involves_any(&blocks) => Ok(()),
// _ => Ok(ws_tx
// .unbounded_send(Message::text(
// TypeSafe(Update { payload, queued_at }).to_json_string(),
// ))
// .unwrap_or_else(|_| subscribe_tx.try_send(tl).expect("TODO"))),
// },
// TypeSafe(non_update) => Ok(ws_tx
// .unbounded_send(Message::text(TypeSafe(non_update).to_json_string()))
// .unwrap_or_else(|_| subscribe_tx.try_send(tl).expect("TODO"))),
// Dynamic(event) if event.event == "update" => match tl {
// Timeline(Public, _, _) if event.language_not(&user_langs) => Ok(()),
// _ if event.involves_any(&blocks) => Ok(()),
// _ => Ok(ws_tx
// .unbounded_send(Message::text(Dynamic(event).to_json_string()))
// .unwrap_or_else(|_| subscribe_tx.try_send(tl).expect("TODO"))),
// },
// Dynamic(non_update) => Ok(ws_tx
// .unbounded_send(Message::text(Dynamic(non_update).to_json_string()))
// .unwrap_or_else(|_| subscribe_tx.try_send(tl).expect("TODO"))),
// Ping => Ok(match ws_tx.unbounded_send(Message::text("{}")) {
// Ok(_) => (),
// Err(_) => {
// subscribe_tx.try_send(tl).expect("TODO");
// }
// }),
// }
// } else {
// if let Event::Ping = event {
// Ok(ws_tx
// .unbounded_send(Message::text("{}"))
// .unwrap_or_else(|_| {
// subscribe_tx.try_send(target_timeline).expect("TODO")
// }))
// } else {
// Ok(())
// }
// }
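
Note: the live `for_each` version and the commented-out `take_while` draft above share one pattern worth calling out: the client task signals its own teardown through the combinator's return value. Returning `Err(())` from `for_each` (or `false` from `take_while`) resolves the spawned future and frees the task, after the failed send has been reported over `subscribe_tx`. A reduced sketch of that pattern, assuming futures 0.1 and an illustrative `try_send` helper:

use futures::{future, stream::Stream, Future};

// Stand-in for ws_tx.unbounded_send(...): false means the client is gone.
fn try_send(_msg: &str) -> bool {
    true
}

// Consume events until the client disconnects; the Err(()) arm ends
// the `for_each` future and, with it, the spawned task.
fn run_client<S>(events: S) -> impl Future<Item = (), Error = ()>
where
    S: Stream<Item = String, Error = ()>,
{
    events.for_each(|msg| {
        if try_send(&msg) {
            future::ok(())
        } else {
            future::err(()) // the notify-then-exit step would go here
        }
    })
}

fn main() {
    let events = futures::stream::iter_ok::<_, ()>(vec![String::from("{}")]);
    tokio::run(run_client(events));
}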

View File

@@ -2,10 +2,7 @@
//! polled by the correct `ClientAgent`. Also manages subscriptions and
//! unsubscriptions to/from Redis.
mod err;
mod message_queues;
pub use err::ReceiverErr;
pub use message_queues::{MessageQueues, MsgQueue};
use super::redis::{redis_connection::RedisCmd, RedisConn};
@@ -15,12 +12,9 @@ use crate::{
parse_client_request::{Stream, Subscription, Timeline},
};
use {
futures::{Async, Poll},
hashbrown::HashMap,
tokio::sync::watch,
uuid::Uuid,
};
use futures::{Async, Stream as _Stream};
use hashbrown::HashMap;
use tokio::sync::{mpsc, watch};
use std::{
result,
@@ -34,11 +28,11 @@ type Result<T> = result::Result<T, ReceiverErr>;
#[derive(Debug)]
pub struct Receiver {
redis_connection: RedisConn,
redis_poll_interval: Duration,
redis_polled_at: Instant,
pub msg_queues: MessageQueues,
clients_per_timeline: HashMap<Timeline, i32>,
channel: watch::Sender<(Timeline, Event)>,
tx: watch::Sender<(Timeline, Event)>,
rx: mpsc::UnboundedReceiver<Timeline>,
ping_time: Instant,
time: Instant,
}
impl Receiver {
@@ -46,18 +40,16 @@ impl Receiver {
/// active subscriptions).
pub fn try_from(
redis_cfg: config::RedisConfig,
sender: watch::Sender<(Timeline, Event)>,
tx: watch::Sender<(Timeline, Event)>,
rx: mpsc::UnboundedReceiver<Timeline>,
) -> Result<Self> {
let redis_poll_interval = *redis_cfg.polling_interval;
let redis_connection = RedisConn::new(redis_cfg)?;
Ok(Self {
redis_polled_at: Instant::now(),
redis_poll_interval,
redis_connection,
msg_queues: MessageQueues(HashMap::new()),
redis_connection: RedisConn::new(redis_cfg)?,
clients_per_timeline: HashMap::new(),
channel: sender,
tx,
rx,
ping_time: Instant::now(),
time: Instant::now(),
})
}
@@ -65,15 +57,12 @@ impl Receiver {
Arc::new(Mutex::new(self))
}
/// Assigns the `Receiver` a new timeline to monitor and runs other
/// first-time setup.
pub fn add_subscription(&mut self, subscription: &Subscription) -> Result<()> {
let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline);
if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (tag, tl) {
self.redis_connection.update_cache(hashtag, id);
};
self.msg_queues.insert(subscription.id, MsgQueue::new(tl));
let number_of_subscriptions = self
.clients_per_timeline
@@ -85,13 +74,11 @@
if *number_of_subscriptions == 1 {
self.redis_connection.send_cmd(Subscribe, &tl)?
};
log::info!("Started stream for {:?}", tl);
Ok(())
}
pub fn remove_subscription(&mut self, subscription: &Subscription) -> Result<()> {
let tl = subscription.timeline;
self.msg_queues.remove(&subscription.id);
pub fn remove_subscription(&mut self, tl: Timeline) -> Result<()> {
let number_of_subscriptions = self
.clients_per_timeline
.entry(tl)
@@ -108,58 +95,33 @@
self.redis_connection.send_cmd(Unsubscribe, &tl)?;
self.clients_per_timeline.remove_entry(&tl);
};
log::info!("Ended stream for {:?}", tl);
Ok(())
}
pub fn poll_broadcast(&mut self) {
loop {
log::info!("{:?}", self.time.elapsed());
while let Ok(Async::Ready(Some(tl))) = self.rx.poll() {
self.remove_subscription(tl).expect("TODO");
}
if self.ping_time.elapsed() > Duration::from_secs(30) {
self.ping_time = Instant::now();
self.tx
.broadcast((Timeline::empty(), Event::Ping))
.expect("TODO");
} else {
match self.redis_connection.poll_redis() {
Ok(Async::NotReady) => break,
Ok(Async::NotReady) => (),
Ok(Async::Ready(Some((timeline, event)))) => {
self.channel.broadcast((timeline, event)).expect("TODO");
self.tx.broadcast((timeline, event)).expect("TODO");
}
Ok(Async::Ready(None)) => (), // subscription cmd or msg for other namespace
Err(_err) => panic!("TODO"),
}
}
}
/// Returns the oldest message in the `ClientAgent`'s queue (if any).
///
/// Note: This method does **not** poll Redis every time, because polling
/// Redis is significantly more time-consuming than simply returning the
/// message already in a queue. Thus, we only poll Redis if it has not
/// been polled lately.
pub fn poll_for(&mut self, id: Uuid) -> Poll<Option<Event>, ReceiverErr> {
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
log::info!("Polling Redis");
loop {
match self.redis_connection.poll_redis() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some((timeline, event)))) => {
self.msg_queues
.values_mut()
.filter(|msg_queue| msg_queue.timeline == timeline)
.for_each(|msg_queue| {
msg_queue.messages.push_back(event.clone());
});
}
Ok(Async::Ready(None)) => (), // subscription cmd or msg for other namespace
Err(err) => Err(err)?,
}
}
self.redis_polled_at = Instant::now();
}
// If the `msg_queue` being polled has any new messages, return the first (oldest) one
let msg_q = self.msg_queues.get_mut(&id).ok_or(ReceiverErr::InvalidId)?;
let res = match msg_q.messages.pop_front() {
Some(event) => Ok(Async::Ready(Some(event))),
None => Ok(Async::NotReady),
};
res
self.time = Instant::now();
}
pub fn count_connections(&self) -> String {
@@ -182,14 +144,4 @@ impl Receiver {
})
.collect()
}
pub fn queue_length(&self) -> String {
format!(
"Longest MessageQueue: {}",
self.msg_queues
.0
.values()
.fold(0, |acc, el| acc.max(el.messages.len()))
)
}
}
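
Note: with `MessageQueues`, `poll_for`, and `queue_length` gone, the `Receiver`'s remaining per-client state is the `clients_per_timeline` map, whose counts drive the Redis commands: the first subscriber to a timeline triggers `SUBSCRIBE`, the last departure triggers `UNSUBSCRIBE`. A reduced sketch of that bookkeeping (string keys and boolean returns stand in for the real `Timeline` and `Result` types):

use hashbrown::HashMap;

struct Counts {
    clients_per_timeline: HashMap<String, i32>,
}

impl Counts {
    // True when this is the first client for `tl`: send SUBSCRIBE.
    fn add(&mut self, tl: &str) -> bool {
        let n = self
            .clients_per_timeline
            .entry(tl.to_string())
            .and_modify(|n| *n += 1)
            .or_insert(1);
        *n == 1
    }

    // True when the last client for `tl` has left: send UNSUBSCRIBE.
    fn remove(&mut self, tl: &str) -> bool {
        let n = self
            .clients_per_timeline
            .entry(tl.to_string())
            .and_modify(|n| *n -= 1)
            .or_insert(0);
        if *n <= 0 {
            self.clients_per_timeline.remove(tl);
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut counts = Counts { clients_per_timeline: HashMap::new() };
    assert!(counts.add("public"));     // first client: SUBSCRIBE
    assert!(!counts.add("public"));    // second client: no-op
    assert!(!counts.remove("public")); // one client remains
    assert!(counts.remove("public"));  // last one gone: UNSUBSCRIBE
}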