Initial [WIP] implementation

This initial implementation works to send messages but does not yet
handle unsubscribing properly.
This commit is contained in:
Daniel Sockwell 2020-04-23 14:59:39 -04:00
parent 91186fb9f7
commit 96f2d6ca36
4 changed files with 80 additions and 31 deletions

View File

@ -22,6 +22,7 @@ fn main() -> Result<(), Error> {
// Create channels to communicate between threads // Create channels to communicate between threads
let (event_tx, event_rx) = watch::channel((Timeline::empty(), Event::Ping)); let (event_tx, event_rx) = watch::channel((Timeline::empty(), Event::Ping));
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let request = Handler::new(&postgres_cfg, *cfg.whitelist_mode)?; let request = Handler::new(&postgres_cfg, *cfg.whitelist_mode)?;
@ -36,7 +37,8 @@ fn main() -> Result<(), Error> {
.map(move |subscription: Subscription, sse: warp::sse::Sse| { .map(move |subscription: Subscription, sse: warp::sse::Sse| {
log::info!("Incoming SSE request for {:?}", subscription.timeline); log::info!("Incoming SSE request for {:?}", subscription.timeline);
let mut manager = sse_manager.lock().unwrap_or_else(RedisManager::recover); let mut manager = sse_manager.lock().unwrap_or_else(RedisManager::recover);
manager.subscribe(&subscription); let (event_tx_2, _event_rx_2) = mpsc::unbounded_channel();
manager.subscribe(&subscription, event_tx_2);
SseStream::send_events(sse, sse_cmd_tx.clone(), subscription, sse_rx.clone()) SseStream::send_events(sse, sse_cmd_tx.clone(), subscription, sse_rx.clone())
}) })
@ -50,11 +52,15 @@ fn main() -> Result<(), Error> {
.map(move |subscription: Subscription, ws: Ws2| { .map(move |subscription: Subscription, ws: Ws2| {
log::info!("Incoming websocket request for {:?}", subscription.timeline); log::info!("Incoming websocket request for {:?}", subscription.timeline);
let mut manager = ws_manager.lock().unwrap_or_else(RedisManager::recover); let mut manager = ws_manager.lock().unwrap_or_else(RedisManager::recover);
manager.subscribe(&subscription); let (event_tx_2, event_rx_2) = mpsc::unbounded_channel();
manager.subscribe(&subscription, event_tx_2);
let token = subscription.access_token.clone().unwrap_or_default(); // token sent for security let token = subscription.access_token.clone().unwrap_or_default(); // token sent for security
let ws_stream = WsStream::new(cmd_tx.clone(), event_rx.clone(), subscription); let ws_stream = WsStream::new(cmd_tx.clone(), subscription);
(ws.on_upgrade(move |ws| ws_stream.send_to(ws)), token) (
ws.on_upgrade(move |ws| ws_stream.send_to(ws, event_rx_2)),
token,
)
}) })
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token)); .map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));

View File

@ -25,6 +25,7 @@ pub struct Manager {
redis_connection: RedisConn, redis_connection: RedisConn,
clients_per_timeline: HashMap<Timeline, i32>, clients_per_timeline: HashMap<Timeline, i32>,
tx: watch::Sender<(Timeline, Event)>, tx: watch::Sender<(Timeline, Event)>,
timelines: HashMap<Timeline, Vec<mpsc::UnboundedSender<Event>>>,
rx: mpsc::UnboundedReceiver<Timeline>, rx: mpsc::UnboundedReceiver<Timeline>,
ping_time: Instant, ping_time: Instant,
} }
@ -40,6 +41,7 @@ impl Manager {
Ok(Self { Ok(Self {
redis_connection: RedisConn::new(redis_cfg)?, redis_connection: RedisConn::new(redis_cfg)?,
clients_per_timeline: HashMap::new(), clients_per_timeline: HashMap::new(),
timelines: HashMap::new(),
tx, tx,
rx, rx,
ping_time: Instant::now(), ping_time: Instant::now(),
@ -50,12 +52,21 @@ impl Manager {
Arc::new(Mutex::new(self)) Arc::new(Mutex::new(self))
} }
pub fn subscribe(&mut self, subscription: &Subscription) { pub fn subscribe(
&mut self,
subscription: &Subscription,
channel: mpsc::UnboundedSender<Event>,
) {
let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline); let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline);
if let (Some(hashtag), Some(id)) = (tag, tl.tag()) { if let (Some(hashtag), Some(id)) = (tag, tl.tag()) {
self.redis_connection.update_cache(hashtag, id); self.redis_connection.update_cache(hashtag, id);
}; };
self.timelines
.entry(tl)
.and_modify(|vec| vec.push(channel.clone()))
.or_insert_with(|| vec![channel]);
let number_of_subscriptions = self let number_of_subscriptions = self
.clients_per_timeline .clients_per_timeline
.entry(tl) .entry(tl)
@ -70,7 +81,16 @@ impl Manager {
}; };
} }
pub(crate) fn unsubscribe(&mut self, tl: Timeline) -> Result<()> { pub(crate) fn unsubscribe(
&mut self,
tl: Timeline,
_target_channel: mpsc::UnboundedSender<Event>,
) -> Result<()> {
let channels = self.timelines.get(&tl).expect("TODO");
for (_i, _channel) in channels.iter().enumerate() {
// TODO - find alternate implementation
}
let number_of_subscriptions = self let number_of_subscriptions = self
.clients_per_timeline .clients_per_timeline
.entry(tl) .entry(tl)
@ -92,22 +112,40 @@ impl Manager {
} }
pub fn poll_broadcast(&mut self) -> Result<()> { pub fn poll_broadcast(&mut self) -> Result<()> {
while let Ok(Async::Ready(Some(tl))) = self.rx.poll() { // while let Ok(Async::Ready(Some(tl))) = self.rx.poll() {
self.unsubscribe(tl)? // self.unsubscribe(tl)?
} // }
let mut completed_timelines = Vec::new();
if self.ping_time.elapsed() > Duration::from_secs(30) { if self.ping_time.elapsed() > Duration::from_secs(30) {
self.ping_time = Instant::now(); self.ping_time = Instant::now();
self.tx.broadcast((Timeline::empty(), Event::Ping))? for (timeline, channels) in self.timelines.iter_mut() {
} else { for channel in channels.iter_mut() {
match self.redis_connection.poll_redis() { match channel.try_send(Event::Ping) {
Ok(Async::NotReady) | Ok(Async::Ready(None)) => (), // None = cmd or msg for other namespace Ok(_) => (),
Ok(Async::Ready(Some((timeline, event)))) => { Err(_) => completed_timelines.push((*timeline, channel.clone())),
self.tx.broadcast((timeline, event))? }
} }
}
};
loop {
match self.redis_connection.poll_redis() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some((timeline, event)))) => {
for channel in self.timelines.get_mut(&timeline).ok_or(Error::InvalidId)? {
match channel.try_send(event.clone()) {
Ok(_) => (),
Err(_) => completed_timelines.push((timeline, channel.clone())),
}
}
}
Ok(Async::Ready(None)) => (), // None = cmd or msg for other namespace
Err(err) => log::error!("{}", err), // drop msg, log err, and proceed Err(err) => log::error!("{}", err), // drop msg, log err, and proceed
} }
} }
for (tl, channel) in completed_timelines {
self.unsubscribe(tl, channel)?;
}
Ok(()) Ok(())
} }

View File

@ -11,6 +11,7 @@ pub enum Error {
RedisParseErr(RedisParseErr), RedisParseErr(RedisParseErr),
RedisConnErr(RedisConnErr), RedisConnErr(RedisConnErr),
ChannelSendErr(tokio::sync::watch::error::SendError<(Timeline, Event)>), ChannelSendErr(tokio::sync::watch::error::SendError<(Timeline, Event)>),
ChannelSendErr2(tokio::sync::mpsc::error::UnboundedTrySendError<Event>),
} }
impl std::error::Error for Error {} impl std::error::Error for Error {}
@ -28,6 +29,7 @@ impl fmt::Display for Error {
RedisConnErr(inner) => write!(f, "{}", inner), RedisConnErr(inner) => write!(f, "{}", inner),
TimelineErr(inner) => write!(f, "{}", inner), TimelineErr(inner) => write!(f, "{}", inner),
ChannelSendErr(inner) => write!(f, "{}", inner), ChannelSendErr(inner) => write!(f, "{}", inner),
ChannelSendErr2(inner) => write!(f, "{}", inner),
}?; }?;
Ok(()) Ok(())
} }
@ -38,6 +40,11 @@ impl From<tokio::sync::watch::error::SendError<(Timeline, Event)>> for Error {
Self::ChannelSendErr(error) Self::ChannelSendErr(error)
} }
} }
impl From<tokio::sync::mpsc::error::UnboundedTrySendError<Event>> for Error {
fn from(error: tokio::sync::mpsc::error::UnboundedTrySendError<Event>) -> Self {
Self::ChannelSendErr2(error)
}
}
impl From<EventErr> for Error { impl From<EventErr> for Error {
fn from(error: EventErr) -> Self { fn from(error: EventErr) -> Self {

View File

@ -2,7 +2,7 @@ use super::{Event, Payload};
use crate::request::{Subscription, Timeline}; use crate::request::{Subscription, Timeline};
use futures::{future::Future, stream::Stream}; use futures::{future::Future, stream::Stream};
use tokio::sync::{mpsc, watch}; use tokio::sync::mpsc;
use warp::ws::{Message, WebSocket}; use warp::ws::{Message, WebSocket};
type Result<T> = std::result::Result<T, ()>; type Result<T> = std::result::Result<T, ()>;
@ -10,25 +10,27 @@ type Result<T> = std::result::Result<T, ()>;
pub struct Ws { pub struct Ws {
unsubscribe_tx: mpsc::UnboundedSender<Timeline>, unsubscribe_tx: mpsc::UnboundedSender<Timeline>,
subscription: Subscription, subscription: Subscription,
ws_rx: watch::Receiver<(Timeline, Event)>,
ws_tx: Option<mpsc::UnboundedSender<Message>>, ws_tx: Option<mpsc::UnboundedSender<Message>>,
} }
impl Ws { impl Ws {
pub fn new( pub fn new(
unsubscribe_tx: mpsc::UnboundedSender<Timeline>, unsubscribe_tx: mpsc::UnboundedSender<Timeline>,
ws_rx: watch::Receiver<(Timeline, Event)>,
subscription: Subscription, subscription: Subscription,
) -> Self { ) -> Self {
Self { Self {
unsubscribe_tx, unsubscribe_tx,
subscription, subscription,
ws_rx,
ws_tx: None, ws_tx: None,
} }
} }
pub fn send_to(mut self, ws: WebSocket) -> impl Future<Item = (), Error = ()> { pub fn send_to(
mut self,
ws: WebSocket,
incoming_events: mpsc::UnboundedReceiver<Event>,
) -> impl Future<Item = (), Error = ()> {
let (transmit_to_ws, _receive_from_ws) = ws.split(); let (transmit_to_ws, _receive_from_ws) = ws.split();
// Create a pipe // Create a pipe
let (ws_tx, ws_rx) = mpsc::unbounded_channel(); let (ws_tx, ws_rx) = mpsc::unbounded_channel();
@ -49,29 +51,25 @@ impl Ws {
}), }),
); );
let target_timeline = self.subscription.timeline; incoming_events.map_err(|_| ()).for_each(move |event| {
let incoming_events = self.ws_rx.clone().map_err(|_| ());
incoming_events.for_each(move |(tl, event)| {
//TODO log::info!("{:?}, {:?}", &tl, &event);
if matches!(event, Event::Ping) { if matches!(event, Event::Ping) {
self.send_msg(&event)? self.send_msg(&event)?
} else if target_timeline == tl { } else {
match (event.update_payload(), event.dyn_update_payload()) { match (event.update_payload(), event.dyn_update_payload()) {
(Some(update), _) => self.send_or_filter(tl, &event, update)?, (Some(update), _) => self.send_or_filter(&event, update)?,
(None, None) => self.send_msg(&event)?, // send all non-updates (None, None) => self.send_msg(&event)?, // send all non-updates
(_, Some(dyn_update)) => self.send_or_filter(tl, &event, dyn_update)?, (_, Some(dyn_update)) => self.send_or_filter(&event, dyn_update)?,
} }
} }
Ok(()) Ok(())
}) })
} }
fn send_or_filter(&mut self, tl: Timeline, event: &Event, update: &impl Payload) -> Result<()> { fn send_or_filter(&mut self, event: &Event, update: &impl Payload) -> Result<()> {
let (blocks, allowed_langs) = (&self.subscription.blocks, &self.subscription.allowed_langs); let (blocks, allowed_langs) = (&self.subscription.blocks, &self.subscription.allowed_langs);
const SKIP: Result<()> = Ok(()); const SKIP: Result<()> = Ok(());
match tl { match self.subscription.timeline {
tl if tl.is_public() tl if tl.is_public()
&& !update.language_unset() && !update.language_unset()
&& !allowed_langs.is_empty() && !allowed_langs.is_empty()