Finish watch_channel implementation

Daniel Sockwell 2020-04-09 12:47:41 -04:00
parent f7780f55e5
commit 592a7fc9d2
13 changed files with 165 additions and 506 deletions

Cargo.lock generated
View File

@ -453,7 +453,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "flodgatt"
version = "0.7.1"
version = "0.8.0"
dependencies = [
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -1,7 +1,7 @@
[package]
name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.7.1"
version = "0.8.0"
authors = ["Daniel Long Sockwell <daniel@codesections.com>", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018"

View File

@ -34,6 +34,9 @@
//! most important settings for performance control the frequency with which the `ClientAgent`
//! polls the `Receiver` and the frequency with which the `Receiver` polls Redis.
//!
#![allow(clippy::try_err, clippy::match_bool)]
pub mod config;
pub mod err;
pub mod messages;

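The doc comment above singles out the two polling frequencies as the main performance knobs. As a minimal sketch of what such a knob drives, here is a recurring poll loop under the tokio 0.1 / futures 0.1 APIs this crate uses; the `poll_freq` value and the loop body are illustrative placeholders, not Flodgatt's actual configuration:

use futures::{future::lazy, Stream};
use std::time::{Duration, Instant};

fn main() {
    // Placeholder value; Flodgatt reads its polling interval from the environment.
    let poll_freq = Duration::from_millis(100);
    tokio::run(lazy(move || {
        tokio::timer::Interval::new(Instant::now(), poll_freq)
            .map_err(|e| eprintln!("timer error: {}", e))
            .for_each(move |_tick| {
                // This is where the Receiver would poll Redis and broadcast events.
                Ok(())
            })
    }));
}
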
View File

@ -2,7 +2,7 @@ use flodgatt::{
config::{DeploymentConfig, EnvVar, PostgresConfig, RedisConfig},
messages::Event,
parse_client_request::{PgPool, Subscription, Timeline},
redis_to_client_stream::{EventStream, Receiver},
redis_to_client_stream::{Receiver, SseStream, WsStream},
};
use std::{env, fs, net::SocketAddr, os::unix::fs::PermissionsExt};
use tokio::{
@ -28,9 +28,9 @@ fn main() {
let pg_pool = PgPool::new(postgres_cfg);
let (event_tx, event_rx) = watch::channel((Timeline::empty(), Event::Ping));
let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel();
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let poll_freq = *redis_cfg.polling_interval;
let receiver = Receiver::try_from(redis_cfg, event_tx, subscribe_rx)
let receiver = Receiver::try_from(redis_cfg, event_tx, cmd_rx)
.unwrap_or_else(|e| {
log::error!("{}\nFlodgatt shutting down...", e);
std::process::exit(1);
@ -40,7 +40,7 @@ fn main() {
// Server Sent Events
let sse_receiver = receiver.clone();
let sse_rx = event_rx.clone();
let (sse_rx, sse_cmd_tx) = (event_rx.clone(), cmd_tx.clone());
let whitelist_mode = *cfg.whitelist_mode;
let sse_routes = Subscription::from_sse_query(pg_pool.clone(), whitelist_mode)
.and(warp::sse())
@ -49,24 +49,26 @@ fn main() {
log::info!("Incoming SSE request for {:?}", subscription.timeline);
{
let mut receiver = sse_receiver.lock().expect("TODO");
receiver
.add_subscription(&subscription)
.unwrap_or_else(|e| {
log::error!("Could not subscribe to the Redis channel: {}", e)
});
receiver.subscribe(&subscription).unwrap_or_else(|e| {
log::error!("Could not subscribe to the Redis channel: {}", e)
});
}
let cmd_tx = sse_cmd_tx.clone();
let sse_rx = sse_rx.clone();
// self.sse.reply(
// warp::sse::keep_alive()
// .interval(Duration::from_secs(30))
// .text("thump".to_string())
// .stream(event_stream),
// )
// send the updates through the SSE connection
EventStream::send_to_sse(sse_connection_to_client, subscription, sse_rx)
SseStream::send_events(sse_connection_to_client, cmd_tx, subscription, sse_rx)
},
)
.with(warp::reply::with::header("Connection", "keep-alive"));
// WebSocket
let ws_receiver = receiver.clone();
let whitelist_mode = *cfg.whitelist_mode;
let ws_routes = Subscription::from_ws_request(pg_pool, whitelist_mode)
.and(warp::ws::ws2())
@ -74,25 +76,20 @@ fn main() {
log::info!("Incoming websocket request for {:?}", subscription.timeline);
{
let mut receiver = ws_receiver.lock().expect("TODO");
receiver
.add_subscription(&subscription)
.unwrap_or_else(|e| {
log::error!("Could not subscribe to the Redis channel: {}", e)
});
receiver.subscribe(&subscription).unwrap_or_else(|e| {
log::error!("Could not subscribe to the Redis channel: {}", e)
});
}
let ws_subscribe_tx = subscribe_tx.clone();
let cmd_tx = cmd_tx.clone();
let ws_rx = event_rx.clone();
let token = subscription
.clone()
.access_token
.unwrap_or_else(String::new);
// send the updates through the WS connection
// (along with the User's access_token which is sent for security)
// send the updates through the WS connection (along with the access_token, for security)
(
ws.on_upgrade(move |s| {
EventStream::send_to_ws(s, subscription, ws_rx, ws_subscribe_tx)
}),
ws.on_upgrade(move |ws| WsStream::new(ws, cmd_tx, subscription).send_events(ws_rx)),
token,
)
})
@ -119,13 +116,6 @@ fn main() {
#[cfg(not(feature = "stub_status"))]
let status_endpoints = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK");
// let receiver_old = receiver.clone();
// // TODO make vvvv a green thread
// std::thread::spawn(move || loop {
// std::thread::sleep(std::time::Duration::from_millis(10000));
// receiver_old.lock().unwrap().poll_broadcast();
// });
if let Some(socket) = &*cfg.unix_socket {
log::info!("Using Unix socket {}", socket);
fs::remove_file(socket).unwrap_or_default();
@ -157,10 +147,9 @@ fn main() {
let server_addr = SocketAddr::new(*cfg.address, *cfg.port);
let receiver = receiver.clone();
tokio::run(lazy(move || {
let receiver = receiver.clone();
tokio::spawn(lazy(move || {
warp::spawn(lazy(move || {
tokio::timer::Interval::new(Instant::now(), poll_freq)
.map_err(|e| log::error!("{}", e))
.for_each(move |_| {

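The wiring above is the heart of this commit: one `watch::channel` carries `(Timeline, Event)` pairs from the `Receiver` to every client handler, while a separate `mpsc` command channel (`cmd_tx`/`cmd_rx`) flows back toward the `Receiver` for unsubscribe requests. A minimal, self-contained sketch of the watch-channel fan-out under tokio 0.1, with simplified stand-in types rather than Flodgatt's own:

use futures::{future::lazy, Stream};
use tokio::sync::watch;

fn main() {
    tokio::run(lazy(|| {
        // Stand-in for watch::channel((Timeline::empty(), Event::Ping)).
        let (mut tx, rx) = watch::channel((0u32, String::from("ping")));

        // Each client clones the receiver and keeps only events for its own timeline.
        let my_timeline = 1u32;
        let client = rx.clone().map_err(|_| ()).for_each(move |(tl, event)| {
            if tl == my_timeline {
                println!("client got: {}", event);
            }
            Ok(())
        });
        tokio::spawn(client);

        // The single writer broadcasts; every cloned receiver sees the latest value.
        let _ = tx.broadcast((1, String::from("new status")));
        Ok(())
    }));
}
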
View File

@ -21,8 +21,7 @@ pub struct Status {
uri: String,
created_at: String,
account: Account,
// TODO remove pub
pub content: String,
content: String,
visibility: Visibility,
sensitive: bool,
spoiler_text: String,
@ -87,16 +86,14 @@ impl Status {
pub fn involves_any(&self, blocks: &Blocks) -> bool {
const ALLOW: bool = false;
const REJECT: bool = true;
let Blocks {
blocked_users,
blocking_users,
blocked_domains,
} = blocks;
let user_id = &self.account.id.parse().expect("TODO");
if !self.calculate_involved_users().is_disjoint(blocked_users) {
REJECT
} else if blocking_users.contains(&self.account.id.parse().expect("TODO")) {
if blocking_users.contains(user_id) || self.involves(blocked_users) {
REJECT
} else {
let full_username = &self.account.acct;
@ -107,7 +104,7 @@ impl Status {
}
}
fn calculate_involved_users(&self) -> HashSet<i64> {
fn involves(&self, blocked_users: &HashSet<i64>) -> bool {
// TODO replace vvvv with error handling
let err = |_| log_fatal!("Could not process an `id` field in {:?}", &self);
@ -128,6 +125,6 @@ impl Status {
if let Some(boosted_status) = self.reblog.clone() {
involved_users.insert(boosted_status.account.id.parse().unwrap_or_else(err));
}
involved_users
!involved_users.is_disjoint(blocked_users)
}
}

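The refactor above folds the old `calculate_involved_users` plus the `is_disjoint` check at its call site into a single `involves` predicate. A standalone sketch of that rule, with simplified, assumed types in place of Flodgatt's `Status` and `Blocks`:

use std::collections::HashSet;

// Assumed stand-in for the real Status struct.
struct Status {
    author_id: i64,
    mentioned_ids: Vec<i64>,
}

impl Status {
    /// Returns true (REJECT) if the author or anyone mentioned is blocked.
    fn involves(&self, blocked_users: &HashSet<i64>) -> bool {
        let mut involved: HashSet<i64> = self.mentioned_ids.iter().copied().collect();
        involved.insert(self.author_id);
        !involved.is_disjoint(blocked_users)
    }
}

fn main() {
    let blocked: HashSet<i64> = [42].iter().copied().collect();
    let status = Status { author_id: 7, mentioned_ids: vec![42, 9] };
    assert!(status.involves(&blocked)); // mentions a blocked user, so it is filtered out
}
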
View File

@ -23,7 +23,7 @@ impl DynamicEvent {
match self.payload["language"].as_str() {
Some(toot_language) if allowed_langs.contains(toot_language) => ALLOW,
None => ALLOW, // If toot language is unknown, toot is always allowed
Some(empty) if empty == &String::new() => ALLOW,
Some(empty) if empty == String::new() => ALLOW,
Some(_toot_language) => REJECT,
}
}
@ -45,12 +45,10 @@ impl DynamicEvent {
blocked_domains,
} = blocks;
let user_id = self.payload["account"]["id"].as_str().expect("TODO");
let id = self.payload["account"]["id"].as_str().expect("TODO");
let username = self.payload["account"]["acct"].as_str().expect("TODO");
if !self.calculate_involved_users().is_disjoint(blocked_users) {
REJECT
} else if blocking_users.contains(&user_id.parse().expect("TODO")) {
if self.involves(blocked_users) || blocking_users.contains(&id.parse().expect("TODO")) {
REJECT
} else {
let full_username = &username;
@ -60,9 +58,11 @@ impl DynamicEvent {
}
}
}
fn calculate_involved_users(&self) -> HashSet<i64> {
// involved_users = mentioned_users + author + replied-to user + boosted user
fn involves(&self, blocked_users: &HashSet<i64>) -> bool {
// mentions
let mentions = self.payload["mentions"].as_array().expect("TODO");
// involved_users = mentioned_users + author + replied-to user + boosted user
let mut involved_users: HashSet<i64> = mentions
.iter()
.map(|mention| mention["id"].as_str().expect("TODO").parse().expect("TODO"))
@ -73,16 +73,15 @@ impl DynamicEvent {
involved_users.insert(author_id.parse::<i64>().expect("TODO"));
// replied-to user
let replied_to_user = self.payload["in_reply_to_account_id"].as_str();
if let Some(user_id) = replied_to_user.clone() {
if let Some(user_id) = replied_to_user {
involved_users.insert(user_id.parse().expect("TODO"));
}
// boosted user
let id_of_boosted_user = self.payload["reblog"]["account"]["id"]
.as_str()
.expect("TODO");
involved_users.insert(id_of_boosted_user.parse().expect("TODO"));
involved_users
!involved_users.is_disjoint(blocked_users)
}
}

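For context, the language check this file performs a few lines above the hunk can be read in isolation: a toot passes when its language is in the user's allowed set, unknown, or empty. A sketch over a raw serde_json payload, with assumed field names:

use serde_json::json;
use std::collections::HashSet;

fn language_not(payload: &serde_json::Value, allowed_langs: &HashSet<String>) -> bool {
    const ALLOW: bool = false;
    const REJECT: bool = true;
    match payload["language"].as_str() {
        Some(lang) if allowed_langs.contains(lang) => ALLOW,
        None => ALLOW,     // unknown language is always allowed
        Some("") => ALLOW, // an empty language string is treated as unknown
        Some(_other_language) => REJECT,
    }
}

fn main() {
    let allowed: HashSet<String> = ["en".to_string()].iter().cloned().collect();
    assert!(!language_not(&json!({ "language": "en" }), &allowed));
    assert!(language_not(&json!({ "language": "de" }), &allowed));
}
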
View File

@ -218,7 +218,7 @@ impl Timeline {
};
use {Content::*, Reach::*, Stream::*};
Ok(match &timeline.split(":").collect::<Vec<&str>>()[..] {
Ok(match &timeline.split(':').collect::<Vec<&str>>()[..] {
["public"] => Timeline(Public, Federated, All),
["public", "local"] => Timeline(Public, Local, All),
["public", "media"] => Timeline(Public, Federated, Media),

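The change above is just the clippy-preferred `split(':')` over `split(":")`, but the surrounding parse is a useful pattern on its own: split the timeline name on ':' and match on the resulting slice. A small sketch with assumed, simplified variants in place of the real `Timeline` type:

// Assumed stand-in for Timeline(Stream, Reach, Content).
#[derive(Debug, PartialEq)]
enum Tl {
    PublicFederated,
    PublicLocal,
    PublicMedia,
    Unknown,
}

fn parse_timeline(timeline: &str) -> Tl {
    match &timeline.split(':').collect::<Vec<&str>>()[..] {
        ["public"] => Tl::PublicFederated,
        ["public", "local"] => Tl::PublicLocal,
        ["public", "media"] => Tl::PublicMedia,
        _ => Tl::Unknown,
    }
}

fn main() {
    assert_eq!(parse_timeline("public:local"), Tl::PublicLocal);
    assert_eq!(parse_timeline("public"), Tl::PublicFederated);
}
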
View File

@ -1,127 +0,0 @@
//! Provides an interface between the `Warp` filters and the underlying
//! mechanics of talking with Redis/managing multiple threads.
//!
//! The `ClientAgent`'s interface is very simple. All you can do with it is:
//! * Create a totally new `ClientAgent` with no shared data;
//! * Clone an existing `ClientAgent`, sharing the `Receiver`;
//! * Manage a new timeline/user pair; or
//! * Poll an existing `ClientAgent` to see if there are any new messages
//! for clients
//!
//! When you poll the `ClientAgent`, it is responsible for polling internal data
//! structures, getting any updates from Redis, and then filtering out any updates
//! that should be excluded by relevant filters.
//!
//! Because `StreamManager`s are lightweight data structures that do not directly
//! communicate with Redis, we create a new `ClientAgent` for
//! each new client connection (each in its own thread).
use super::receiver::{Receiver, ReceiverErr};
use crate::{
messages::Event,
parse_client_request::{Stream::Public, Subscription, Timeline},
};
use futures::{
Async::{self, NotReady, Ready},
Poll,
};
use std::sync::{Arc, Mutex, MutexGuard};
/// Struct for managing all Redis streams.
#[derive(Clone, Debug)]
pub struct ClientAgent {
receiver: Arc<Mutex<Receiver>>,
pub subscription: Subscription,
}
impl ClientAgent {
pub fn new(receiver: Arc<Mutex<Receiver>>, subscription: &Subscription) -> Self {
ClientAgent {
receiver,
subscription: subscription.clone(),
}
}
/// Initializes the `ClientAgent` with a unique ID associated with a specific user's
/// subscription. Also passes values to the `Receiver` for its initialization.
///
/// Note that this *may or may not* result in a new Redis connection.
/// If the server has already subscribed to the timeline on behalf of
/// a different user, the `Receiver` is responsible for figuring
/// that out and avoiding duplicated connections. Thus, it is safe to
/// use this method for each new client connection.
pub fn subscribe(&mut self) {
let mut receiver = self.lock_receiver();
receiver
.add_subscription(&self.subscription)
.unwrap_or_else(|e| log::error!("Could not subscribe to the Redis channel: {}", e))
}
pub fn disconnect(&self) -> futures::future::FutureResult<bool, tokio::timer::Error> {
let mut receiver = self.lock_receiver();
receiver
.remove_subscription(&self.subscription)
.unwrap_or_else(|e| log::error!("Could not unsubscribe from: {}", e));
futures::future::ok(false)
}
fn lock_receiver(&self) -> MutexGuard<Receiver> {
match self.receiver.lock() {
Ok(inner) => inner,
Err(e) => {
log::error!(
"Another thread crashed: {}\n
Attempting to continue, possibly with invalid data",
e
);
e.into_inner()
}
}
}
}
/// The stream that the `ClientAgent` manages. `Poll` is the only method implemented.
impl futures::stream::Stream for ClientAgent {
type Item = Event;
type Error = ReceiverErr;
/// Checks for any new messages that should be sent to the client.
///
/// The `ClientAgent` polls the `Receiver` and replies
/// with `Ok(Ready(Some(Value)))` if there is a new message to send to
/// the client. If there is no new message or if the new message should be
/// filtered out based on one of the user's filters, then the `ClientAgent`
/// replies with `Ok(NotReady)`. The `ClientAgent` bubbles up any
/// errors from the underlying data structures.
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let result = {
let mut receiver = self.lock_receiver();
receiver.poll_for(self.subscription.id)
};
let timeline = &self.subscription.timeline;
let allowed_langs = &self.subscription.allowed_langs;
let blocks = &self.subscription.blocks;
let (send, block) = (|msg| Ok(Ready(Some(msg))), Ok(NotReady));
use crate::messages::{CheckedEvent::Update, Event::*};
match result {
Ok(NotReady) => Ok(NotReady),
Ok(Ready(None)) => Ok(Ready(None)),
Ok(Async::Ready(Some(event))) => match event {
TypeSafe(Update { payload, queued_at }) => match timeline {
Timeline(Public, _, _) if payload.language_not(allowed_langs) => block,
_ if payload.involves_any(blocks) => block,
_ => send(TypeSafe(Update { payload, queued_at })),
},
TypeSafe(non_update) => send(Event::TypeSafe(non_update)),
Dynamic(event) if event.event == "update" => match timeline {
Timeline(Public, _, _) if event.language_not(allowed_langs) => block,
_ if event.involves_any(blocks) => block,
_ => send(Dynamic(event)),
},
Dynamic(non_update) => send(Dynamic(non_update)),
},
Err(e) => Err(e),
}
}
}

View File

@ -7,32 +7,31 @@ use std::time::Duration;
use tokio::sync::{mpsc, watch};
use warp::{
reply::Reply,
sse::Sse,
sse::{ServerSentEvent, Sse},
ws::{Message, WebSocket},
};
pub struct EventStream;
impl EventStream {
/// Send a stream of replies to a WebSocket client.
pub fn send_to_ws(
pub struct WsStream {
ws_tx: mpsc::UnboundedSender<Message>,
unsubscribe_tx: mpsc::UnboundedSender<Timeline>,
subscription: Subscription,
}
impl WsStream {
pub fn new(
ws: WebSocket,
unsubscribe_tx: mpsc::UnboundedSender<Timeline>,
subscription: Subscription,
event_rx: watch::Receiver<(Timeline, Event)>,
mut subscribe_tx: mpsc::UnboundedSender<Timeline>,
) -> impl Future<Item = (), Error = ()> {
) -> Self {
let (transmit_to_ws, _receive_from_ws) = ws.split();
let target_timeline = subscription.timeline;
let user_langs = subscription.allowed_langs;
let blocks = subscription.blocks;
// Create a pipe
let (ws_tx, ws_rx) = futures::sync::mpsc::unbounded();
let (ws_tx, ws_rx) = mpsc::unbounded_channel();
// Send one end of it to a different green thread and tell that end to forward
// whatever it gets on to the WebSocket client
warp::spawn(
ws_rx
.map_err(|()| -> warp::Error { unreachable!() })
.map_err(|_| -> warp::Error { unreachable!() })
.forward(transmit_to_ws)
.map(|_r| ())
.map_err(|e| match e.to_string().as_ref() {
@ -40,169 +39,82 @@ impl EventStream {
_ => log::warn!("WebSocket send error: {}", e),
}),
);
event_rx.map_err(|_| ()).for_each(move |(tl, event)| {
if target_timeline == tl {
use crate::messages::{CheckedEvent::Update, Event::*};
use crate::parse_client_request::Stream::Public;
log::info!("Got event for {:?}", tl);
if let Event::TypeSafe(Update { payload, .. }) = event.clone() {
log::info!("{:?}", &payload.content);
}
match event {
Ping => match ws_tx.unbounded_send(Message::text("{}")) {
Ok(_) => Ok(()),
Err(_) => {
subscribe_tx.try_send(tl).expect("TODO");
Err(())
}
},
TypeSafe(Update { payload, queued_at }) => match tl {
Timeline(Public, _, _) if payload.language_not(&user_langs) => Ok(()),
_ if payload.involves_any(&blocks) => Ok(()),
_ => match ws_tx.unbounded_send(Message::text(
TypeSafe(Update { payload, queued_at }).to_json_string(),
)) {
Ok(_) => Ok(()),
Err(_) => {
subscribe_tx.try_send(tl).expect("TODO");
Err(())
}
},
},
TypeSafe(non_update) => match ws_tx
.unbounded_send(Message::text(TypeSafe(non_update).to_json_string()))
{
Ok(_) => Ok(()),
Err(_) => {
subscribe_tx.try_send(tl).expect("TODO");
Err(())
}
},
Dynamic(event) if event.event == "update" => match tl {
Timeline(Public, _, _) if event.language_not(&user_langs) => Ok(()),
_ if event.involves_any(&blocks) => Ok(()),
_ => match ws_tx
.unbounded_send(Message::text(Dynamic(event).to_json_string()))
{
Ok(_) => Ok(()),
Err(_) => {
subscribe_tx.try_send(tl).expect("TODO");
Err(())
}
},
},
Dynamic(non_update) => match ws_tx
.unbounded_send(Message::text(Dynamic(non_update).to_json_string()))
{
Ok(_) => Ok(()),
Err(_) => {
subscribe_tx.try_send(tl).expect("TODO");
Err(())
}
},
}
} else {
if let Event::Ping = event {
match ws_tx.unbounded_send(Message::text("{}")) {
Ok(_) => Ok(()),
Err(_) => {
subscribe_tx.try_send(target_timeline).expect("TODO");
Err(())
}
}
} else {
Ok(())
}
}
})
// _rx
// .take_while(move |(tl, event)| {
// let (tl, event) = (*tl, event.clone());
// if target_timeline == tl {
// log::info!("Got event for {:?}", tl);
// use crate::messages::{CheckedEvent::Update, Event::*};
// use crate::parse_client_request::Stream::Public;
// match event {
// Ping => match ws_tx.unbounded_send(Message::text("{}")) {
// Ok(_) => futures::future::ok(true),
// Err(_) => {
// subscribe_tx.try_send(tl).expect("TODO");
// futures::future::ok(false)
// }
// },
// TypeSafe(Update { payload, queued_at }) => match tl {
// Timeline(Public, _, _) if payload.language_not(&user_langs) => {
// futures::future::ok(true)
// }
// _ if payload.involves_any(&blocks) => futures::future::ok(true),
// _ => match ws_tx.unbounded_send(Message::text(
// TypeSafe(Update { payload, queued_at }).to_json_string(),
// )) {
// Ok(_) => futures::future::ok(true),
// Err(_) => {
// subscribe_tx.try_send(tl).expect("TODO");
// futures::future::ok(false)
// }
// },
// },
// TypeSafe(non_update) => match ws_tx
// .unbounded_send(Message::text(TypeSafe(non_update).to_json_string()))
// {
// Ok(_) => futures::future::ok(true),
// Err(_) => {
// subscribe_tx.try_send(tl).expect("TODO");
// futures::future::ok(false)
// }
// },
// Dynamic(event) if event.event == "update" => match tl {
// Timeline(Public, _, _) if event.language_not(&user_langs) => {
// futures::future::ok(true)
// }
// _ if event.involves_any(&blocks) => futures::future::ok(true),
// _ => match ws_tx
// .unbounded_send(Message::text(Dynamic(event).to_json_string()))
// {
// Ok(_) => futures::future::ok(true),
// Err(_) => {
// subscribe_tx.try_send(tl).expect("TODO");
// futures::future::ok(false)
// }
// },
// },
// Dynamic(non_update) => match ws_tx
// .unbounded_send(Message::text(Dynamic(non_update).to_json_string()))
// {
// Ok(_) => futures::future::ok(true),
// Err(_) => {
// subscribe_tx.try_send(tl).expect("TODO");
// futures::future::ok(false)
// }
// },
// }
// } else {
// if let Event::Ping = event {
// match ws_tx.unbounded_send(Message::text("{}")) {
// Ok(_) => futures::future::ok(true),
// Err(_) => {
// subscribe_tx.try_send(target_timeline).expect("TODO");
// futures::future::ok(false)
// }
// }
// } else {
// futures::future::ok(true)
// }
// }
// })
// .for_each(|_| Ok(()))
// .map_err(|_| ())
// .map(|_| ())
Self {
ws_tx,
unsubscribe_tx,
subscription,
}
}
pub fn send_to_sse(
pub fn send_events(
mut self,
event_rx: watch::Receiver<(Timeline, Event)>,
) -> impl Future<Item = (), Error = ()> {
let target_timeline = self.subscription.timeline;
event_rx.map_err(|_| ()).for_each(move |(tl, event)| {
if matches!(event, Event::Ping) {
self.send_ping()
} else if target_timeline == tl {
use crate::messages::{CheckedEvent::Update, Event::*};
use crate::parse_client_request::Stream::Public;
let blocks = &self.subscription.blocks;
let allowed_langs = &self.subscription.allowed_langs;
match event {
TypeSafe(Update { payload, queued_at }) => match tl {
Timeline(Public, _, _) if payload.language_not(allowed_langs) => Ok(()),
_ if payload.involves_any(&blocks) => Ok(()),
_ => self.send_msg(TypeSafe(Update { payload, queued_at })),
},
TypeSafe(non_update) => self.send_msg(TypeSafe(non_update)),
Dynamic(event) if event.event == "update" => match tl {
Timeline(Public, _, _) if event.language_not(allowed_langs) => Ok(()),
_ if event.involves_any(&blocks) => Ok(()),
_ => self.send_msg(Dynamic(event)),
},
Dynamic(non_update) => self.send_msg(Dynamic(non_update)),
Ping => unreachable!(), // pings are handled above
}
} else {
Ok(())
}
})
}
fn send_ping(&mut self) -> Result<(), ()> {
self.send_txt("{}")
}
fn send_msg(&mut self, event: Event) -> Result<(), ()> {
self.send_txt(&event.to_json_string())
}
fn send_txt(&mut self, txt: &str) -> Result<(), ()> {
let tl = self.subscription.timeline;
match self.ws_tx.try_send(Message::text(txt)) {
Ok(_) => Ok(()),
Err(_) => {
self.unsubscribe_tx.try_send(tl).expect("TODO");
Err(())
}
}
}
}
pub struct SseStream {}
impl SseStream {
fn reply_with(event: Event) -> Option<(impl ServerSentEvent, impl ServerSentEvent)> {
Some((
warp::sse::event(event.event_name()),
warp::sse::data(event.payload().unwrap_or_else(String::new)),
))
}
pub fn send_events(
sse: Sse,
mut unsubscribe_tx: mpsc::UnboundedSender<Timeline>,
subscription: Subscription,
sse_rx: watch::Receiver<(Timeline, Event)>,
) -> impl Reply {
@ -210,50 +122,37 @@ impl EventStream {
let allowed_langs = subscription.allowed_langs;
let blocks = subscription.blocks;
let event_stream = sse_rx.filter_map(move |(timeline, event)| {
if target_timeline == timeline {
log::info!("Got event for {:?}", timeline);
use crate::messages::{CheckedEvent, CheckedEvent::Update, Event::*};
use crate::parse_client_request::Stream::Public;
match event {
TypeSafe(Update { payload, queued_at }) => match timeline {
Timeline(Public, _, _) if payload.language_not(&allowed_langs) => None,
_ if payload.involves_any(&blocks) => None,
_ => {
let event =
Event::TypeSafe(CheckedEvent::Update { payload, queued_at });
Some((
warp::sse::event(event.event_name()),
warp::sse::data(event.payload().unwrap_or_else(String::new)),
))
}
},
TypeSafe(non_update) => {
let event = Event::TypeSafe(non_update);
Some((
warp::sse::event(event.event_name()),
warp::sse::data(event.payload().unwrap_or_else(String::new)),
))
let event_stream = sse_rx
.filter_map(move |(timeline, event)| {
if target_timeline == timeline {
use crate::messages::{CheckedEvent, CheckedEvent::Update, Event::*};
use crate::parse_client_request::Stream::Public;
match event {
TypeSafe(Update { payload, queued_at }) => match timeline {
Timeline(Public, _, _) if payload.language_not(&allowed_langs) => None,
_ if payload.involves_any(&blocks) => None,
_ => Self::reply_with(Event::TypeSafe(CheckedEvent::Update {
payload,
queued_at,
})),
},
TypeSafe(non_update) => Self::reply_with(Event::TypeSafe(non_update)),
Dynamic(event) if event.event == "update" => match timeline {
Timeline(Public, _, _) if event.language_not(&allowed_langs) => None,
_ if event.involves_any(&blocks) => None,
_ => Self::reply_with(Event::Dynamic(event)),
},
Dynamic(non_update) => Self::reply_with(Event::Dynamic(non_update)),
Ping => None, // pings handled automatically
}
Dynamic(event) if event.event == "update" => match timeline {
Timeline(Public, _, _) if event.language_not(&allowed_langs) => None,
_ if event.involves_any(&blocks) => None,
_ => Some((
warp::sse::event(event.event),
warp::sse::data(event.payload.to_string()),
)),
},
Dynamic(non_update) => Some((
warp::sse::event(non_update.event),
warp::sse::data(non_update.payload.to_string()),
)),
// TODO: Fix for Ping
EventNotReady => panic!("TODO"),
} else {
None
}
} else {
None
}
});
})
.then(move |res| {
unsubscribe_tx.try_send(target_timeline).expect("TODO");
res
});
sse.reply(
warp::sse::keep_alive()
@ -263,49 +162,3 @@ impl EventStream {
)
}
}
// if target_timeline == tl {
// log::info!("Got event for {:?}", tl);
// use crate::messages::{CheckedEvent::Update, Event::*};
// use crate::parse_client_request::Stream::Public;
// match event {
// TypeSafe(Update { payload, queued_at }) => match tl {
// Timeline(Public, _, _) if payload.language_not(&user_langs) => Ok(()),
// _ if payload.involves_any(&blocks) => Ok(()),
// _ => Ok(ws_tx
// .unbounded_send(Message::text(
// TypeSafe(Update { payload, queued_at }).to_json_string(),
// ))
// .unwrap_or_else(|_| subscribe_tx.try_send(tl).expect("TODO"))),
// },
// TypeSafe(non_update) => Ok(ws_tx
// .unbounded_send(Message::text(TypeSafe(non_update).to_json_string()))
// .unwrap_or_else(|_| subscribe_tx.try_send(tl).expect("TODO"))),
// Dynamic(event) if event.event == "update" => match tl {
// Timeline(Public, _, _) if event.language_not(&user_langs) => Ok(()),
// _ if event.involves_any(&blocks) => Ok(()),
// _ => Ok(ws_tx
// .unbounded_send(Message::text(Dynamic(event).to_json_string()))
// .unwrap_or_else(|_| subscribe_tx.try_send(tl).expect("TODO"))),
// },
// Dynamic(non_update) => Ok(ws_tx
// .unbounded_send(Message::text(Dynamic(non_update).to_json_string()))
// .unwrap_or_else(|_| subscribe_tx.try_send(tl).expect("TODO"))),
// Ping => Ok(match ws_tx.unbounded_send(Message::text("{}")) {
// Ok(_) => (),
// Err(_) => {
// subscribe_tx.try_send(tl).expect("TODO");
// }
// }),
// }
// } else {
// if let Event::Ping = event {
// Ok(ws_tx
// .unbounded_send(Message::text("{}"))
// .unwrap_or_else(|_| {
// subscribe_tx.try_send(target_timeline).expect("TODO")
// }))
// } else {
// Ok(())
// }
// }

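The new `WsStream::new` keeps the pattern the comments above describe: create a pipe, hand the receiving end to a spawned task that forwards everything into the WebSocket sink, and hold on to the sending end so `send_events` can simply `try_send` into it (and unsubscribe if that fails). A minimal sketch of that pipe-and-forward shape, using futures 0.1 channels and a second channel as a stand-in for warp's WebSocket sink:

use futures::{future::lazy, sync::mpsc, Future, Sink, Stream};

fn main() {
    tokio::run(lazy(|| {
        // The pipe: the handler keeps tx, and rx is forwarded to the client connection.
        let (tx, rx) = mpsc::unbounded::<String>();

        // Stand-in for warp's WebSocket sink.
        let (sink_tx, sink_rx) = mpsc::unbounded::<String>();
        let fake_ws_sink = sink_tx.sink_map_err(|_| ());

        // One spawned task per client forwards whatever lands in the pipe.
        tokio::spawn(rx.forward(fake_ws_sink).map(|_| ()));
        tokio::spawn(sink_rx.for_each(|msg| {
            println!("would go out over the WebSocket: {}", msg);
            Ok(())
        }));

        // send_events() reduces to this: push a message into the pipe.
        tx.unbounded_send("{}".to_string()).expect("forwarding task is alive");
        Ok(())
    }));
}
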
View File

@ -3,7 +3,10 @@ mod event_stream;
mod receiver;
mod redis;
pub use {event_stream::EventStream, receiver::Receiver};
pub use {
event_stream::{SseStream, WsStream},
receiver::Receiver,
};
#[cfg(feature = "bench")]
pub use redis::redis_msg::{RedisMsg, RedisParseOutput};

View File

@ -1,53 +0,0 @@
use crate::messages::Event;
use crate::parse_client_request::Timeline;
use hashbrown::HashMap;
use std::{collections::VecDeque, fmt};
use uuid::Uuid;
#[derive(Clone)]
pub struct MsgQueue {
pub timeline: Timeline,
pub messages: VecDeque<Event>,
}
impl MsgQueue {
pub fn new(timeline: Timeline) -> Self {
MsgQueue {
messages: VecDeque::new(),
timeline,
}
}
}
#[derive(Debug)]
pub struct MessageQueues(pub HashMap<Uuid, MsgQueue>);
impl MessageQueues {}
impl fmt::Debug for MsgQueue {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"\
MsgQueue {{
timeline: {:?},
messages: {:?},
}}",
self.timeline, self.messages,
)
}
}
impl std::ops::Deref for MessageQueues {
type Target = HashMap<Uuid, MsgQueue>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for MessageQueues {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

View File

@ -32,7 +32,6 @@ pub struct Receiver {
tx: watch::Sender<(Timeline, Event)>,
rx: mpsc::UnboundedReceiver<Timeline>,
ping_time: Instant,
time: Instant,
}
impl Receiver {
@ -51,7 +50,6 @@ impl Receiver {
tx,
rx,
ping_time: Instant::now(),
time: Instant::now(),
})
}
@ -59,7 +57,7 @@ impl Receiver {
Arc::new(Mutex::new(self))
}
pub fn add_subscription(&mut self, subscription: &Subscription) -> Result<()> {
pub fn subscribe(&mut self, subscription: &Subscription) -> Result<()> {
let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline);
if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (tag, tl) {
@ -80,7 +78,7 @@ impl Receiver {
Ok(())
}
pub fn remove_subscription(&mut self, tl: Timeline) -> Result<()> {
pub fn unsubscribe(&mut self, tl: Timeline) -> Result<()> {
let number_of_subscriptions = self
.clients_per_timeline
.entry(tl)
@ -102,10 +100,8 @@ impl Receiver {
}
pub fn poll_broadcast(&mut self) {
log::info!("{:?}", self.time.elapsed());
while let Ok(Async::Ready(Some(tl))) = self.rx.poll() {
self.remove_subscription(tl).expect("TODO");
self.unsubscribe(tl).expect("TODO");
}
if self.ping_time.elapsed() > Duration::from_secs(30) {
@ -123,7 +119,6 @@ impl Receiver {
Err(_err) => panic!("TODO"),
}
}
self.time = Instant::now();
}
pub fn count_connections(&self) -> String {

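The renamed `subscribe`/`unsubscribe` methods above amount to a reference count per timeline: only the first subscriber should open a Redis subscription, and only the last unsubscriber should close it. A simplified sketch, with assumed field and method names:

use std::collections::HashMap;

struct Receiver {
    clients_per_timeline: HashMap<String, usize>,
}

impl Receiver {
    fn subscribe(&mut self, tl: &str) {
        let count = self.clients_per_timeline.entry(tl.to_string()).or_insert(0);
        *count += 1;
        if *count == 1 {
            println!("SUBSCRIBE {}", tl); // first client: open the Redis subscription
        }
    }

    fn unsubscribe(&mut self, tl: &str) {
        if let Some(count) = self.clients_per_timeline.get_mut(tl) {
            *count = count.saturating_sub(1);
            if *count == 0 {
                self.clients_per_timeline.remove(tl);
                println!("UNSUBSCRIBE {}", tl); // last client gone: close it
            }
        }
    }
}

fn main() {
    let mut receiver = Receiver { clients_per_timeline: HashMap::new() };
    receiver.subscribe("timeline:public");
    receiver.subscribe("timeline:public"); // second client: no extra Redis command
    receiver.unsubscribe("timeline:public");
    receiver.unsubscribe("timeline:public"); // last client: UNSUBSCRIBE is sent
}
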
View File

@ -80,7 +80,7 @@ impl RedisConn {
self.redis_input.clear();
let (input, invalid_bytes) = str::from_utf8(&input)
.map(|input| (input, "".as_bytes()))
.map(|input| (input, &b""[..]))
.unwrap_or_else(|e| {
let (valid, invalid) = input.split_at(e.valid_up_to());
(str::from_utf8(valid).expect("Guaranteed by ^^^^"), invalid)
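
The hunk above swaps `"".as_bytes()` for the equivalent `&b""[..]` in the code that tolerates a Redis read ending in the middle of a UTF-8 character: keep the valid prefix as a `&str` and carry the trailing bytes over to the next read. A standalone sketch of that handling:

use std::str;

fn split_valid(input: &[u8]) -> (&str, &[u8]) {
    str::from_utf8(input)
        .map(|valid| (valid, &b""[..]))
        .unwrap_or_else(|e| {
            let (valid, rest) = input.split_at(e.valid_up_to());
            (str::from_utf8(valid).expect("valid up to the split point"), rest)
        })
}

fn main() {
    // "é" is two bytes (0xC3 0xA9); pretend the read stopped after the first byte.
    let chunk = [b'o', b'k', 0xC3];
    let (text, leftover) = split_valid(&chunk);
    assert_eq!(text, "ok");
    assert_eq!(leftover, &[0xC3u8][..]);
}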