mirror of https://github.com/mastodon/flodgatt
Revised WebSocket implementation
This commit is contained in:
parent
54ad55e0c0
commit
8ae9bbfac5
|
@ -25,7 +25,6 @@ pub fn handle_errors(
|
|||
None => "Error: Nonexistant endpoint".to_string(),
|
||||
};
|
||||
let json = warp::reply::json(&ErrorMessage::new(err_txt));
|
||||
println!("REJECTED!");
|
||||
Ok(warp::reply::with_status(
|
||||
json,
|
||||
warp::http::StatusCode::UNAUTHORIZED,
|
||||
|
|
56
src/main.rs
56
src/main.rs
|
@ -28,11 +28,13 @@
|
|||
pub mod error;
|
||||
pub mod query;
|
||||
pub mod receiver;
|
||||
pub mod redis_cmd;
|
||||
pub mod stream;
|
||||
pub mod timeline;
|
||||
pub mod user;
|
||||
pub mod ws;
|
||||
use futures::stream::Stream;
|
||||
use futures::Async;
|
||||
use receiver::Receiver;
|
||||
use stream::StreamManager;
|
||||
use user::{Scope, User};
|
||||
|
@ -43,6 +45,8 @@ fn main() {
|
|||
pretty_env_logger::init();
|
||||
|
||||
let redis_updates = StreamManager::new(Receiver::new());
|
||||
let redis_updates_sse = redis_updates.blank_copy();
|
||||
let redis_updates_ws = redis_updates.blank_copy();
|
||||
|
||||
let routes = any_of!(
|
||||
// GET /api/v1/streaming/user/notification [private; notification filter]
|
||||
|
@ -68,26 +72,25 @@ fn main() {
|
|||
)
|
||||
.untuple_one()
|
||||
.and(warp::sse())
|
||||
.and(warp::any().map(move || redis_updates.new_copy()))
|
||||
.map(
|
||||
|timeline: String, user: User, sse: warp::sse::Sse, mut event_stream: StreamManager| {
|
||||
dbg!(&event_stream);
|
||||
event_stream.add(&timeline, &user);
|
||||
event_stream.set_user(user.clone());
|
||||
sse.reply(warp::sse::keep(
|
||||
event_stream.filter_map(move |item| {
|
||||
let payload = item["payload"].clone();
|
||||
let event = item["event"].clone().to_string();
|
||||
Some((warp::sse::event(event), warp::sse::data(payload)))
|
||||
}),
|
||||
None,
|
||||
))
|
||||
},
|
||||
)
|
||||
.map(move |timeline: String, user: User, sse: warp::sse::Sse| {
|
||||
let mut redis_stream = redis_updates_sse.configure_copy(&timeline, user);
|
||||
let event_stream = tokio::timer::Interval::new(
|
||||
std::time::Instant::now(),
|
||||
std::time::Duration::from_millis(100),
|
||||
)
|
||||
.filter_map(move |_| match redis_stream.poll() {
|
||||
Ok(Async::Ready(Some(json_value))) => Some((
|
||||
warp::sse::event(json_value["event"].clone().to_string()),
|
||||
warp::sse::data(json_value["payload"].clone()),
|
||||
)),
|
||||
_ => None,
|
||||
});
|
||||
sse.reply(warp::sse::keep(event_stream, None))
|
||||
})
|
||||
.with(warp::reply::with::header("Connection", "keep-alive"))
|
||||
.recover(error::handle_errors);
|
||||
|
||||
let redis_updates_ws = StreamManager::new(Receiver::new());
|
||||
//let redis_updates_ws = StreamManager::new(Receiver::new());
|
||||
let websocket = path!("api" / "v1" / "streaming")
|
||||
.and(Scope::Public.get_access_token())
|
||||
.and_then(|token| User::from_access_token(token, Scope::Public))
|
||||
|
@ -96,15 +99,13 @@ fn main() {
|
|||
.and(query::Hashtag::to_filter())
|
||||
.and(query::List::to_filter())
|
||||
.and(warp::ws2())
|
||||
.and(warp::any().map(move || redis_updates_ws.new_copy()))
|
||||
.and_then(
|
||||
|mut user: User,
|
||||
q: query::Stream,
|
||||
m: query::Media,
|
||||
h: query::Hashtag,
|
||||
l: query::List,
|
||||
ws: warp::ws::Ws2,
|
||||
mut stream: StreamManager| {
|
||||
move |mut user: User,
|
||||
q: query::Stream,
|
||||
m: query::Media,
|
||||
h: query::Hashtag,
|
||||
l: query::List,
|
||||
ws: warp::ws::Ws2| {
|
||||
let unauthorized = Err(warp::reject::custom("Error: Invalid Access Token"));
|
||||
let timeline = match q.stream.as_ref() {
|
||||
// Public endpoints:
|
||||
|
@ -131,10 +132,9 @@ fn main() {
|
|||
// Other endpoints don't exist:
|
||||
_ => return Err(warp::reject::custom("Error: Nonexistent WebSocket query")),
|
||||
};
|
||||
let stream = redis_updates_ws.configure_copy(&timeline, user);
|
||||
|
||||
stream.add(&timeline, &user);
|
||||
stream.set_user(user);
|
||||
Ok(ws.on_upgrade(move |socket| ws::handle_ws(socket, stream)))
|
||||
Ok(ws.on_upgrade(move |socket| ws::send_replies(socket, stream)))
|
||||
},
|
||||
);
|
||||
|
||||
|
|
159
src/receiver.rs
159
src/receiver.rs
|
@ -1,4 +1,5 @@
|
|||
//! Interfacing with Redis and stream the results on to the `StreamManager`
|
||||
use crate::redis_cmd;
|
||||
use crate::user::User;
|
||||
use futures::stream::Stream;
|
||||
use futures::{Async, Poll};
|
||||
|
@ -6,13 +7,11 @@ use log::info;
|
|||
use regex::Regex;
|
||||
use serde_json::Value;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use tokio::io::{AsyncRead, Error};
|
||||
use uuid::Uuid;
|
||||
|
||||
use std::io::{Read, Write};
|
||||
use std::net::TcpStream;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::io::{AsyncRead, Error};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct MsgQueue {
|
||||
|
@ -34,12 +33,13 @@ impl MsgQueue {
|
|||
/// The item that streams from Redis and is polled by the `StreamManger`
|
||||
#[derive(Debug)]
|
||||
pub struct Receiver {
|
||||
stream: TcpStream,
|
||||
pubsub_connection: TcpStream,
|
||||
secondary_redis_connection: TcpStream,
|
||||
tl: String,
|
||||
pub user: User,
|
||||
manager_id: Uuid,
|
||||
msg_queue: HashMap<Uuid, MsgQueue>,
|
||||
subscribed_timelines: HashMap<String, i32>,
|
||||
msg_queues: HashMap<Uuid, MsgQueue>,
|
||||
clients_per_timeline: HashMap<String, i32>,
|
||||
}
|
||||
impl Default for Receiver {
|
||||
fn default() -> Self {
|
||||
|
@ -48,69 +48,97 @@ impl Default for Receiver {
|
|||
}
|
||||
impl Receiver {
|
||||
pub fn new() -> Self {
|
||||
let stream = TcpStream::connect("127.0.0.1:6379").expect("Can connect to Redis");
|
||||
|
||||
stream
|
||||
let pubsub_connection = TcpStream::connect("127.0.0.1:6379").expect("Can connect to Redis");
|
||||
pubsub_connection
|
||||
.set_read_timeout(Some(Duration::from_millis(10)))
|
||||
.expect("Can set read timeout for Redis connection");
|
||||
let secondary_redis_connection =
|
||||
TcpStream::connect("127.0.0.1:6379").expect("Can connect to Redis");
|
||||
secondary_redis_connection
|
||||
.set_read_timeout(Some(Duration::from_millis(10)))
|
||||
.expect("Can set read timeout for Redis connection");
|
||||
Self {
|
||||
stream,
|
||||
pubsub_connection,
|
||||
secondary_redis_connection,
|
||||
tl: String::new(),
|
||||
user: User::public(),
|
||||
manager_id: Uuid::new_v4(),
|
||||
msg_queue: HashMap::new(),
|
||||
subscribed_timelines: HashMap::new(),
|
||||
msg_queues: HashMap::new(),
|
||||
clients_per_timeline: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the `StreamManager` that is currently polling the `Receiver`
|
||||
pub fn set_manager_id(&mut self, id: Uuid) {
|
||||
pub fn update(&mut self, id: Uuid, timeline: impl std::fmt::Display) {
|
||||
self.manager_id = id;
|
||||
self.tl = timeline.to_string();
|
||||
}
|
||||
/// Send a subscribe command to the Redis PubSub and check if any subscriptions should be dropped
|
||||
pub fn subscribe(&mut self, tl: &str) {
|
||||
|
||||
/// Send a subscribe command to the Redis PubSub (if needed)
|
||||
pub fn maybe_subscribe(&mut self, tl: &str) {
|
||||
info!("Subscribing to {}", &tl);
|
||||
|
||||
let manager_id = self.manager_id;
|
||||
self.msg_queue.insert(manager_id, MsgQueue::new(tl));
|
||||
self.subscribed_timelines
|
||||
self.msg_queues.insert(manager_id, MsgQueue::new(tl));
|
||||
let current_clients = self
|
||||
.clients_per_timeline
|
||||
.entry(tl.to_string())
|
||||
.and_modify(|n| *n += 1)
|
||||
.or_insert(1);
|
||||
|
||||
let mut timelines_with_dropped_clients = Vec::new();
|
||||
self.msg_queue.retain(|_id, msg_queue| {
|
||||
if msg_queue.last_polled_at.elapsed() > Duration::from_secs(30) {
|
||||
timelines_with_dropped_clients.push(msg_queue.redis_channel.clone());
|
||||
false
|
||||
} else {
|
||||
if *current_clients == 1 {
|
||||
let subscribe_cmd = redis_cmd::pubsub("subscribe", tl);
|
||||
self.pubsub_connection
|
||||
.write_all(&subscribe_cmd)
|
||||
.expect("Can subscribe to Redis");
|
||||
let set_subscribed_cmd = redis_cmd::set(format!("subscribed:timeline:{}", tl), "1");
|
||||
self.secondary_redis_connection
|
||||
.write_all(&set_subscribed_cmd)
|
||||
.expect("Can set Redis");
|
||||
info!("Now subscribed to: {:#?}", &self.msg_queues);
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop any PubSub subscriptions that don't have active clients
|
||||
pub fn unsubscribe_from_empty_channels(&mut self) {
|
||||
let mut timelines_with_fewer_clients = Vec::new();
|
||||
|
||||
// Keep only message queues that have been polled recently
|
||||
self.msg_queues.retain(|_id, msg_queue| {
|
||||
if msg_queue.last_polled_at.elapsed() < Duration::from_secs(30) {
|
||||
true
|
||||
} else {
|
||||
timelines_with_fewer_clients.push(msg_queue.redis_channel.clone());
|
||||
false
|
||||
}
|
||||
});
|
||||
|
||||
for timeline in timelines_with_dropped_clients {
|
||||
// Record the lower number of clients subscribed to that channel
|
||||
for timeline in timelines_with_fewer_clients {
|
||||
let count_of_subscribed_clients = self
|
||||
.subscribed_timelines
|
||||
.clients_per_timeline
|
||||
.entry(timeline.clone())
|
||||
.and_modify(|n| *n -= 1)
|
||||
.or_insert(0);
|
||||
// If no clients, unsubscribe from the channel
|
||||
if *count_of_subscribed_clients <= 0 {
|
||||
self.unsubscribe(&timeline);
|
||||
}
|
||||
}
|
||||
|
||||
let subscribe_cmd = redis_cmd_from("subscribe", &tl);
|
||||
self.stream
|
||||
.write_all(&subscribe_cmd)
|
||||
.expect("Can subscribe to Redis");
|
||||
}
|
||||
|
||||
/// Send an unsubscribe command to the Redis PubSub
|
||||
pub fn unsubscribe(&mut self, tl: &str) {
|
||||
let unsubscribe_cmd = redis_cmd_from("unsubscribe", &tl);
|
||||
let unsubscribe_cmd = redis_cmd::pubsub("unsubscribe", tl);
|
||||
info!("Unsubscribing from {}", &tl);
|
||||
self.stream
|
||||
self.pubsub_connection
|
||||
.write_all(&unsubscribe_cmd)
|
||||
.expect("Can unsubscribe from Redis");
|
||||
println!("Done unsubscribing");
|
||||
let set_subscribed_cmd = redis_cmd::set(format!("subscribed:timeline:{}", tl), "0");
|
||||
self.secondary_redis_connection
|
||||
.write_all(&set_subscribed_cmd)
|
||||
.expect("Can set Redis");
|
||||
info!("Now subscribed only to: {:#?}", &self.msg_queues);
|
||||
}
|
||||
}
|
||||
impl Stream for Receiver {
|
||||
|
@ -119,40 +147,50 @@ impl Stream for Receiver {
|
|||
|
||||
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
|
||||
let mut buffer = vec![0u8; 3000];
|
||||
let polled_by = self.manager_id;
|
||||
self.msg_queue
|
||||
.entry(polled_by)
|
||||
info!("Being polled by: {}", self.manager_id);
|
||||
let timeline = self.tl.clone();
|
||||
|
||||
// Record current time as last polled time
|
||||
self.msg_queues
|
||||
.entry(self.manager_id)
|
||||
.and_modify(|msg_queue| msg_queue.last_polled_at = Instant::now());
|
||||
info!("Being polled by StreamManager with uuid: {}", polled_by);
|
||||
|
||||
let mut async_stream = AsyncReadableStream(&mut self.stream);
|
||||
|
||||
// Add any incomming messages to the back of the relevant `msg_queues`
|
||||
// NOTE: This could be more/other than the `msg_queue` currently being polled
|
||||
let mut async_stream = AsyncReadableStream(&mut self.pubsub_connection);
|
||||
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer)? {
|
||||
let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]);
|
||||
// capture everything between `{` and `}` as potential JSON
|
||||
// TODO: figure out if `(?x)` is needed
|
||||
let re = Regex::new(r"(?P<json>\{.*\})").expect("Valid hard-coded regex");
|
||||
let json_regex = Regex::new(r"(?P<json>\{.*\})").expect("Hard-coded");
|
||||
// capture the timeline so we know which queues to add it to
|
||||
let timeline_regex = Regex::new(r"timeline:(?P<timeline>.*?)\r").expect("Hard-codded");
|
||||
if let Some(result) = json_regex.captures(raw_redis_response) {
|
||||
let timeline =
|
||||
timeline_regex.captures(raw_redis_response).unwrap()["timeline"].to_string();
|
||||
|
||||
if let Some(cap) = re.captures(&String::from_utf8_lossy(&buffer[..num_bytes_read])) {
|
||||
let json: Value = serde_json::from_str(&cap["json"].to_string().clone())?;
|
||||
for msg_queue in self.msg_queue.values_mut() {
|
||||
msg_queue.messages.push_back(json.clone());
|
||||
let msg: Value = serde_json::from_str(&result["json"].to_string().clone())?;
|
||||
for msg_queue in self.msg_queues.values_mut() {
|
||||
if msg_queue.redis_channel == timeline {
|
||||
msg_queue.messages.push_back(msg.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let timeline = self.tl.clone();
|
||||
if let Some(value) = self
|
||||
.msg_queue
|
||||
.entry(polled_by)
|
||||
|
||||
// If the `msg_queue` being polled has any new messages, return the first (oldest) one
|
||||
match self
|
||||
.msg_queues
|
||||
.entry(self.manager_id)
|
||||
.or_insert_with(|| MsgQueue::new(timeline))
|
||||
.messages
|
||||
.pop_front()
|
||||
{
|
||||
Ok(Async::Ready(Some(value)))
|
||||
} else {
|
||||
Ok(Async::NotReady)
|
||||
Some(value) => Ok(Async::Ready(Some(value))),
|
||||
_ => Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Receiver {
|
||||
fn drop(&mut self) {
|
||||
let timeline = self.tl.clone();
|
||||
|
@ -176,16 +214,3 @@ impl<'a> AsyncRead for AsyncReadableStream<'a> {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn redis_cmd_from(cmd: impl std::fmt::Display, timeline: impl std::fmt::Display) -> Vec<u8> {
|
||||
let (cmd, arg) = (cmd.to_string(), format!("timeline:{}", timeline));
|
||||
format!(
|
||||
"*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n",
|
||||
cmd_length = cmd.len(),
|
||||
cmd = cmd,
|
||||
arg_length = arg.len(),
|
||||
arg = arg
|
||||
)
|
||||
.as_bytes()
|
||||
.to_owned()
|
||||
}
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
//! Send raw TCP commands to the Redis server
|
||||
use std::fmt::Display;
|
||||
|
||||
/// Send a `SUBSCRIBE` or `UNSUBSCRIBE` command to a specific timeline
|
||||
pub fn pubsub(command: impl Display, timeline: impl Display) -> Vec<u8> {
|
||||
let arg = format!("timeline:{}", timeline);
|
||||
let command = command.to_string();
|
||||
format!(
|
||||
"*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n",
|
||||
cmd_length = command.len(),
|
||||
cmd = command,
|
||||
arg_length = arg.len(),
|
||||
arg = arg
|
||||
)
|
||||
.as_bytes()
|
||||
.to_owned()
|
||||
}
|
||||
|
||||
/// Send a `SET` command
|
||||
pub fn set(key: impl Display, value: impl Display) -> Vec<u8> {
|
||||
let (key, value) = (key.to_string(), value.to_string());
|
||||
format!(
|
||||
"*3\r\n$3\r\nSET\r\n${key_length}\r\n{key}\r\n${value_length}\r\n{value}\r\n",
|
||||
key_length = key.len(),
|
||||
key = key,
|
||||
value_length = value.len(),
|
||||
value = value
|
||||
)
|
||||
.as_bytes()
|
||||
.to_owned()
|
||||
}
|
|
@ -1,8 +1,9 @@
|
|||
//! Manage all existing Redis PubSub connection
|
||||
use crate::receiver::Receiver;
|
||||
use crate::user::User;
|
||||
use crate::user::{Filter, User};
|
||||
use futures::stream::Stream;
|
||||
use futures::{Async, Poll};
|
||||
use serde_json::json;
|
||||
use serde_json::Value;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::io::Error;
|
||||
|
@ -13,39 +14,37 @@ use uuid::Uuid;
|
|||
pub struct StreamManager {
|
||||
receiver: Arc<Mutex<Receiver>>,
|
||||
id: uuid::Uuid,
|
||||
target_timeline: String,
|
||||
current_user: Option<User>,
|
||||
}
|
||||
impl StreamManager {
|
||||
pub fn new(reciever: Receiver) -> Self {
|
||||
StreamManager {
|
||||
receiver: Arc::new(Mutex::new(reciever)),
|
||||
id: Uuid::new_v4(),
|
||||
id: Uuid::default(),
|
||||
target_timeline: String::new(),
|
||||
current_user: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Clone the StreamManager with a new unique id
|
||||
pub fn new_copy(&self) -> Self {
|
||||
/// Create a blank StreamManager copy
|
||||
pub fn blank_copy(&self) -> Self {
|
||||
StreamManager { ..self.clone() }
|
||||
}
|
||||
/// Create a StreamManager copy with a new unique id manage subscriptions
|
||||
pub fn configure_copy(&self, timeline: &String, user: User) -> Self {
|
||||
let id = Uuid::new_v4();
|
||||
StreamManager { id, ..self.clone() }
|
||||
}
|
||||
|
||||
/// Subscribe to a channel if not already subscribed
|
||||
///
|
||||
///
|
||||
/// `.add()` also unsubscribes from any channels that no longer have clients
|
||||
pub fn add(&mut self, timeline: &str, _user: &User) {
|
||||
let mut receiver = self.receiver.lock().expect("No panic in other threads");
|
||||
receiver.set_manager_id(self.id);
|
||||
receiver.subscribe(timeline);
|
||||
}
|
||||
|
||||
pub fn set_user(&mut self, user: User) {
|
||||
self.current_user = Some(user);
|
||||
receiver.update(id, timeline);
|
||||
receiver.maybe_subscribe(timeline);
|
||||
StreamManager {
|
||||
id,
|
||||
current_user: Some(user),
|
||||
target_timeline: timeline.clone(),
|
||||
..self.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
use crate::user::Filter;
|
||||
use serde_json::json;
|
||||
|
||||
impl Stream for StreamManager {
|
||||
type Item = Value;
|
||||
|
@ -53,7 +52,7 @@ impl Stream for StreamManager {
|
|||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
let mut receiver = self.receiver.lock().expect("No other thread panic");
|
||||
receiver.set_manager_id(self.id);
|
||||
receiver.update(self.id, &self.target_timeline.clone());
|
||||
match receiver.poll() {
|
||||
Ok(Async::Ready(Some(value))) => {
|
||||
let user = self
|
||||
|
@ -77,7 +76,6 @@ impl Stream for StreamManager {
|
|||
(Filter::Language, Some(ref langs)) if !langs.contains(&toot_lang) => {
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
|
||||
_ => Ok(Async::Ready(Some(json!(
|
||||
{"event": event,
|
||||
"payload": payload,}
|
||||
|
|
|
@ -59,7 +59,7 @@ LIMIT 1",
|
|||
filter: Filter::None,
|
||||
})
|
||||
} else if let Scope::Public = scope {
|
||||
println!("Granting public access");
|
||||
info!("Granting public access to non-authenticated client");
|
||||
Ok(User {
|
||||
id: -1,
|
||||
langs: None,
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
|
|
@ -1,9 +1,11 @@
|
|||
//! WebSocket-specific functionality
|
||||
use crate::stream::StreamManager;
|
||||
use futures::future::Future;
|
||||
use futures::stream::Stream;
|
||||
use futures::Async;
|
||||
|
||||
pub fn handle_ws(
|
||||
/// Send a stream of replies to a WebSocket client
|
||||
pub fn send_replies(
|
||||
socket: warp::ws::WebSocket,
|
||||
mut stream: StreamManager,
|
||||
) -> impl futures::future::Future<Item = (), Error = ()> {
|
||||
|
|
Loading…
Reference in New Issue