Merge pull request #16 from tootsuite/WebSocket

WebSocket streaming
Daniel Sockwell 2019-05-10 02:03:38 -04:00 committed by GitHub
commit 1e9358f95d
9 changed files with 429 additions and 168 deletions


@ -8,15 +8,17 @@
//!
//! # Notes on data flow
//! * **Client Request → Warp**:
//! Warp filters for valid requests and parses request data. Based on that data, it repeatedly polls
//! the StreamManager
//! Warp filters for valid requests and parses request data. Based on that data, it generates a `User`
//! representing the client that made the request. The `User` is authenticated, if appropriate. Warp
//! repeatedly polls the StreamManager for information relevant to the User.
//!
//! * **Warp → StreamManager**:
//! The StreamManager consults a hash table to see if there is a currently open PubSub channel. If
//! there is, it uses that channel; if not, it (synchronously) sends a subscribe command to Redis.
//! The StreamManager polls the Receiver, providing info about which StreamManager it is that is
//! doing the polling. The stream manager is also responsible for monitoring the hash table to see
//! if it should unsubscribe from any channels and, if necessary, sending the unsubscribe command.
//! A new `StreamManager` is created for each request. The `StreamManager` exists to manage concurrent
//! access to the (single) `Receiver`, which it can access behind an `Arc<Mutex>`. The `StreamManager`
//! polls the `Receiver` for any updates relevant to the current client. If there are updates, the
//! `StreamManager` filters them with the client's filters and passes any matching updates up to Warp.
//! The `StreamManager` is also responsible for sending `subscribe` commands to Redis (via the
//! `Receiver`) when necessary.
//!
//! * **StreamManager → Receiver**:
//! The Receiver receives data from Redis and stores it in a series of queues (one for each
@ -26,19 +28,25 @@
pub mod error;
pub mod query;
pub mod receiver;
pub mod redis_cmd;
pub mod stream;
pub mod timeline;
pub mod user;
pub mod ws;
use futures::stream::Stream;
use futures::Async;
use receiver::Receiver;
use stream::StreamManager;
use user::{Filter, User};
use user::{Scope, User};
use warp::path;
use warp::Filter as WarpFilter;
fn main() {
pretty_env_logger::init();
let redis_updates = StreamManager::new(Receiver::new());
let redis_updates_sse = redis_updates.blank_copy();
let redis_updates_ws = redis_updates.blank_copy();
let routes = any_of!(
// GET /api/v1/streaming/user/notification [private; notification filter]
@ -64,29 +72,71 @@ fn main() {
)
.untuple_one()
.and(warp::sse())
.and(warp::any().map(move || redis_updates.new_copy()))
.map(
|timeline: String, user: User, sse: warp::sse::Sse, mut event_stream: StreamManager| {
event_stream.add(&timeline, &user);
sse.reply(warp::sse::keep(
event_stream.filter_map(move |item| {
let payload = item["payload"].clone();
let event = item["event"].clone().to_string();
let toot_lang = payload["language"].as_str().expect("redis str").to_string();
let user_langs = user.langs.clone();
match (&user.filter, user_langs) {
(Filter::Notification, _) if event != "notification" => None,
(Filter::Language, Some(ref langs)) if !langs.contains(&toot_lang) => None,
_ => Some((warp::sse::event(event), warp::sse::data(payload))),
}
}),
None,
))
},
)
.map(move |timeline: String, user: User, sse: warp::sse::Sse| {
let mut redis_stream = redis_updates_sse.configure_copy(&timeline, user);
let event_stream = tokio::timer::Interval::new(
std::time::Instant::now(),
std::time::Duration::from_millis(100),
)
.filter_map(move |_| match redis_stream.poll() {
Ok(Async::Ready(Some(json_value))) => Some((
warp::sse::event(json_value["event"].clone().to_string()),
warp::sse::data(json_value["payload"].clone()),
)),
_ => None,
});
sse.reply(warp::sse::keep(event_stream, None))
})
.with(warp::reply::with::header("Connection", "keep-alive"))
.recover(error::handle_errors);
warp::serve(routes).run(([127, 0, 0, 1], 3030));
//let redis_updates_ws = StreamManager::new(Receiver::new());
let websocket = path!("api" / "v1" / "streaming")
.and(Scope::Public.get_access_token())
.and_then(|token| User::from_access_token(token, Scope::Public))
.and(warp::query())
.and(query::Media::to_filter())
.and(query::Hashtag::to_filter())
.and(query::List::to_filter())
.and(warp::ws2())
.and_then(
move |mut user: User,
q: query::Stream,
m: query::Media,
h: query::Hashtag,
l: query::List,
ws: warp::ws::Ws2| {
let unauthorized = Err(warp::reject::custom("Error: Invalid Access Token"));
let timeline = match q.stream.as_ref() {
// Public endpoints:
tl @ "public" | tl @ "public:local" if m.is_truthy() => format!("{}:media", tl),
tl @ "public:media" | tl @ "public:local:media" => tl.to_string(),
tl @ "public" | tl @ "public:local" => tl.to_string(),
// User
"user" if user.id == -1 => return unauthorized,
"user" => format!("{}", user.id),
"user:notification" => {
user = user.with_notification_filter();
format!("{}", user.id)
}
// Hashtag endpoints:
// TODO: handle missing query
tl @ "hashtag" | tl @ "hashtag:local" => format!("{}:{}", tl, h.tag),
// List endpoint:
// TODO: handle missing query
"list" if user.authorized_for_list(l.list).is_err() => return unauthorized,
"list" => format!("list:{}", l.list),
// Direct endpoint:
"direct" if user.id == -1 => return unauthorized,
"direct" => "direct".to_string(),
// Other endpoints don't exist:
_ => return Err(warp::reject::custom("Error: Nonexistent WebSocket query")),
};
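// Illustrative mappings produced by the match above (editor's examples, not
// part of this commit): `?stream=public&only_media=1` maps to "public:media",
// `?stream=hashtag&tag=rust` to "hashtag:rust", `?stream=list&list=7` to
// "list:7" (when authorized), and `?stream=user` to the user's id, e.g. "1".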
let stream = redis_updates_ws.configure_copy(&timeline, user);
Ok(ws.on_upgrade(move |socket| ws::send_replies(socket, stream)))
},
);
warp::serve(websocket.or(routes)).run(([127, 0, 0, 1], 4000));
}


@ -1,19 +1,66 @@
//! Validate query params with type checking
use serde_derive::Deserialize;
use warp::filters::BoxedFilter;
use warp::Filter as WarpFilter;
#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Default)]
pub struct Media {
pub only_media: String,
}
#[derive(Deserialize, Debug)]
impl Media {
pub fn to_filter() -> BoxedFilter<(Self,)> {
warp::query()
.or(warp::any().map(Self::default))
.unify()
.boxed()
}
pub fn is_truthy(&self) -> bool {
self.only_media == "true" || self.only_media == "1"
}
}
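// Editor's sketch (not part of this commit) of the fallback behaviour: with no
// query string, `to_filter()` falls back to `Media::default()`, whose
// `only_media` field is the empty string, so `is_truthy()` is false.
#[cfg(test)]
mod media_default_sketch {
use super::*;
#[test]
fn default_media_is_not_truthy() {
let media = Media::default();
assert_eq!(media.only_media, "");
assert!(!media.is_truthy());
}
}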
#[derive(Deserialize, Debug, Default)]
pub struct Hashtag {
pub tag: String,
}
#[derive(Deserialize, Debug)]
impl Hashtag {
pub fn to_filter() -> BoxedFilter<(Self,)> {
warp::query()
.or(warp::any().map(Self::default))
.unify()
.boxed()
}
}
#[derive(Deserialize, Debug, Default)]
pub struct List {
pub list: i64,
}
impl List {
pub fn to_filter() -> BoxedFilter<(Self,)> {
warp::query()
.or(warp::any().map(Self::default))
.unify()
.boxed()
}
}
#[derive(Deserialize, Debug)]
pub struct Auth {
pub access_token: String,
}
#[derive(Deserialize, Debug)]
pub struct Stream {
pub stream: String,
}
impl ToString for Stream {
fn to_string(&self) -> String {
format!("{:?}", self)
}
}
pub fn optional_media_query() -> BoxedFilter<(Media,)> {
warp::query()
.or(warp::any().map(|| Media {
only_media: "false".to_owned(),
}))
.unify()
.boxed()
}


@ -1,4 +1,5 @@
//! Interface with Redis and stream the results on to the `StreamManager`
use crate::redis_cmd;
use crate::user::User;
use futures::stream::Stream;
use futures::{Async, Poll};
@ -6,57 +7,138 @@ use log::info;
use regex::Regex;
use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, Error};
use uuid::Uuid;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::Duration;
#[derive(Debug)]
struct MsgQueue {
messages: VecDeque<Value>,
last_polled_at: Instant,
redis_channel: String,
}
impl MsgQueue {
fn new(redis_channel: impl std::fmt::Display) -> Self {
let redis_channel = redis_channel.to_string();
MsgQueue {
messages: VecDeque::new(),
last_polled_at: Instant::now(),
redis_channel,
}
}
}
/// The item that streams from Redis and is polled by the `StreamManager`
#[derive(Debug)]
pub struct Receiver {
stream: TcpStream,
pubsub_connection: TcpStream,
secondary_redis_connection: TcpStream,
tl: String,
pub user: User,
polled_by: Uuid,
msg_queue: HashMap<Uuid, VecDeque<Value>>,
manager_id: Uuid,
msg_queues: HashMap<Uuid, MsgQueue>,
clients_per_timeline: HashMap<String, i32>,
}
impl Default for Receiver {
fn default() -> Self {
Self::new()
}
}
impl Receiver {
pub fn new() -> Self {
let stream = TcpStream::connect("127.0.0.1:6379").unwrap();
stream
let pubsub_connection = TcpStream::connect("127.0.0.1:6379").expect("Can connect to Redis");
pubsub_connection
.set_read_timeout(Some(Duration::from_millis(10)))
.unwrap();
.expect("Can set read timeout for Redis connection");
let secondary_redis_connection =
TcpStream::connect("127.0.0.1:6379").expect("Can connect to Redis");
secondary_redis_connection
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("Can set read timeout for Redis connection");
Self {
stream,
pubsub_connection,
secondary_redis_connection,
tl: String::new(),
user: User::public(),
polled_by: Uuid::new_v4(),
msg_queue: HashMap::new(),
manager_id: Uuid::new_v4(),
msg_queues: HashMap::new(),
clients_per_timeline: HashMap::new(),
}
}
/// Update the `StreamManager` that is currently polling the `Receiver`
pub fn set_polled_by(&mut self, id: Uuid) -> &Self {
self.polled_by = id;
self
pub fn update(&mut self, id: Uuid, timeline: impl std::fmt::Display) {
self.manager_id = id;
self.tl = timeline.to_string();
}
/// Send a subscribe command to the Redis PubSub
pub fn subscribe(&mut self, tl: &str) {
let subscribe_cmd = redis_cmd_from("subscribe", &tl);
/// Send a subscribe command to the Redis PubSub (if needed)
pub fn maybe_subscribe(&mut self, tl: &str) {
info!("Subscribing to {}", &tl);
self.stream
.write_all(&subscribe_cmd)
.expect("Can subscribe to Redis");
let manager_id = self.manager_id;
self.msg_queues.insert(manager_id, MsgQueue::new(tl));
let current_clients = self
.clients_per_timeline
.entry(tl.to_string())
.and_modify(|n| *n += 1)
.or_insert(1);
if *current_clients == 1 {
let subscribe_cmd = redis_cmd::pubsub("subscribe", tl);
self.pubsub_connection
.write_all(&subscribe_cmd)
.expect("Can subscribe to Redis");
let set_subscribed_cmd = redis_cmd::set(format!("subscribed:timeline:{}", tl), "1");
self.secondary_redis_connection
.write_all(&set_subscribed_cmd)
.expect("Can set Redis");
info!("Now subscribed to: {:#?}", &self.msg_queues);
}
}
/// Drop any PubSub subscriptions that don't have active clients
pub fn unsubscribe_from_empty_channels(&mut self) {
let mut timelines_with_fewer_clients = Vec::new();
// Keep only message queues that have been polled recently
self.msg_queues.retain(|_id, msg_queue| {
if msg_queue.last_polled_at.elapsed() < Duration::from_secs(30) {
true
} else {
timelines_with_fewer_clients.push(msg_queue.redis_channel.clone());
false
}
});
// Record the reduced number of clients subscribed to each of those channels
for timeline in timelines_with_fewer_clients {
let count_of_subscribed_clients = self
.clients_per_timeline
.entry(timeline.clone())
.and_modify(|n| *n -= 1)
.or_insert(0);
// If no clients, unsubscribe from the channel
if *count_of_subscribed_clients <= 0 {
self.unsubscribe(&timeline);
}
}
}
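// Editor's walk-through of the reference counting in `maybe_subscribe` and
// `unsubscribe_from_empty_channels` above:
//   client A opens "public"             -> clients_per_timeline["public"] == 1, SUBSCRIBE sent
//   client B opens "public"             -> count == 2, no second SUBSCRIBE
//   A's queue unpolled for > 30 seconds -> count drops to 1, still subscribed
//   B's queue unpolled for > 30 seconds -> count drops to 0, UNSUBSCRIBE sent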
/// Send an unsubscribe command to the Redis PubSub
pub fn unsubscribe(&mut self, tl: &str) {
let unsubscribe_cmd = redis_cmd_from("unsubscribe", &tl);
info!("Subscribing to {}", &tl);
self.stream
let unsubscribe_cmd = redis_cmd::pubsub("unsubscribe", tl);
info!("Unsubscribing from {}", &tl);
self.pubsub_connection
.write_all(&unsubscribe_cmd)
.expect("Can unsubscribe from Redis");
let set_subscribed_cmd = redis_cmd::set(format!("subscribed:timeline:{}", tl), "0");
self.secondary_redis_connection
.write_all(&set_subscribed_cmd)
.expect("Can set Redis");
info!("Now subscribed only to: {:#?}", &self.msg_queues);
}
}
impl Stream for Receiver {
@ -65,33 +147,50 @@ impl Stream for Receiver {
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
let mut buffer = vec![0u8; 3000];
let polled_by = self.polled_by;
self.msg_queue
.entry(polled_by)
.or_insert_with(VecDeque::new);
info!("Being polled by StreamManager with uuid: {}", polled_by);
info!("Being polled by: {}", self.manager_id);
let timeline = self.tl.clone();
let mut async_stream = AsyncReadableStream(&mut self.stream);
// Record current time as last polled time
self.msg_queues
.entry(self.manager_id)
.and_modify(|msg_queue| msg_queue.last_polled_at = Instant::now());
// Add any incoming messages to the back of the relevant `msg_queues`
// NOTE: This could be more/other than the `msg_queue` currently being polled
let mut async_stream = AsyncReadableStream(&mut self.pubsub_connection);
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer)? {
let raw_redis_response = &String::from_utf8_lossy(&buffer[..num_bytes_read]);
// capture everything between `{` and `}` as potential JSON
// TODO: figure out if `(?x)` is needed
let re = Regex::new(r"(?P<json>\{.*\})").expect("Valid hard-coded regex");
let json_regex = Regex::new(r"(?P<json>\{.*\})").expect("Hard-coded");
// capture the timeline so we know which queues to add it to
let timeline_regex = Regex::new(r"timeline:(?P<timeline>.*?)\r").expect("Hard-coded");
if let Some(result) = json_regex.captures(raw_redis_response) {
let timeline =
timeline_regex.captures(raw_redis_response).unwrap()["timeline"].to_string();
if let Some(cap) = re.captures(&String::from_utf8_lossy(&buffer[..num_bytes_read])) {
let json: Value = serde_json::from_str(&cap["json"].to_string().clone())?;
for value in self.msg_queue.values_mut() {
value.push_back(json.clone());
let msg: Value = serde_json::from_str(&result["json"].to_string().clone())?;
for msg_queue in self.msg_queues.values_mut() {
if msg_queue.redis_channel == timeline {
msg_queue.messages.push_back(msg.clone());
}
}
}
}
if let Some(value) = self.msg_queue.entry(polled_by).or_default().pop_front() {
Ok(Async::Ready(Some(value)))
} else {
Ok(Async::NotReady)
// If the `msg_queue` being polled has any new messages, return the first (oldest) one
match self
.msg_queues
.entry(self.manager_id)
.or_insert_with(|| MsgQueue::new(timeline))
.messages
.pop_front()
{
Some(value) => Ok(Async::Ready(Some(value))),
_ => Ok(Async::NotReady),
}
}
}
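// Editor's sketch (not part of this commit) of the parsing above: a pushed
// PubSub frame arrives over the TCP connection roughly as shown below (RESP
// framing assumed), and the two regexes pull out the timeline and the JSON.
#[cfg(test)]
mod parsing_sketch {
use regex::Regex;
#[test]
fn regexes_extract_timeline_and_json() {
let raw = "*3\r\n$7\r\nmessage\r\n$10\r\ntimeline:1\r\n$46\r\n{\"event\":\"update\",\"payload\":{\"language\":\"en\"}}\r\n";
let json_regex = Regex::new(r"(?P<json>\{.*\})").expect("Hard-coded");
let timeline_regex = Regex::new(r"timeline:(?P<timeline>.*?)\r").expect("Hard-coded");
assert_eq!(&timeline_regex.captures(raw).expect("matches")["timeline"], "1");
assert_eq!(
&json_regex.captures(raw).expect("matches")["json"],
"{\"event\":\"update\",\"payload\":{\"language\":\"en\"}}"
);
}
}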
impl Drop for Receiver {
fn drop(&mut self) {
let timeline = self.tl.clone();
@ -115,16 +214,3 @@ impl<'a> AsyncRead for AsyncReadableStream<'a> {
}
}
}
fn redis_cmd_from(cmd: impl std::fmt::Display, timeline: impl std::fmt::Display) -> Vec<u8> {
let (cmd, arg) = (cmd.to_string(), format!("timeline:{}", timeline));
format!(
"*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n",
cmd_length = cmd.len(),
cmd = cmd,
arg_length = arg.len(),
arg = arg
)
.as_bytes()
.to_owned()
}

src/redis_cmd.rs (new file, 31 lines)

@ -0,0 +1,31 @@
//! Send raw TCP commands to the Redis server
use std::fmt::Display;
/// Build a `SUBSCRIBE` or `UNSUBSCRIBE` command for a specific timeline
pub fn pubsub(command: impl Display, timeline: impl Display) -> Vec<u8> {
let arg = format!("timeline:{}", timeline);
let command = command.to_string();
format!(
"*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n",
cmd_length = command.len(),
cmd = command,
arg_length = arg.len(),
arg = arg
)
.as_bytes()
.to_owned()
}
/// Build a `SET` command
pub fn set(key: impl Display, value: impl Display) -> Vec<u8> {
let (key, value) = (key.to_string(), value.to_string());
format!(
"*3\r\n$3\r\nSET\r\n${key_length}\r\n{key}\r\n${value_length}\r\n{value}\r\n",
key_length = key.len(),
key = key,
value_length = value.len(),
value = value
)
.as_bytes()
.to_owned()
}
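// Editor's sketch (not part of this commit) showing the exact byte strings the
// helpers above produce for a sample timeline and key:
#[cfg(test)]
mod resp_sketch {
use super::*;
#[test]
fn pubsub_and_set_build_resp_commands() {
assert_eq!(
String::from_utf8(pubsub("subscribe", "1")).expect("valid utf8"),
"*2\r\n$9\r\nsubscribe\r\n$10\r\ntimeline:1\r\n"
);
assert_eq!(
String::from_utf8(set("subscribed:timeline:1", "1")).expect("valid utf8"),
"*3\r\n$3\r\nSET\r\n$21\r\nsubscribed:timeline:1\r\n$1\r\n1\r\n"
);
}
}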


@ -1,86 +1,87 @@
//! Manage all existing Redis PubSub connections
use crate::receiver::Receiver;
use crate::user::User;
use crate::user::{Filter, User};
use futures::stream::Stream;
use futures::{Async, Poll};
use serde_json::json;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::io::Error;
use uuid::Uuid;
/// Struct for managing all Redis streams
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct StreamManager {
receiver: Arc<Mutex<Receiver>>,
subscriptions: Arc<Mutex<HashMap<String, Instant>>>,
current_stream: String,
id: uuid::Uuid,
target_timeline: String,
current_user: Option<User>,
}
impl StreamManager {
pub fn new(receiver: Receiver) -> Self {
StreamManager {
receiver: Arc::new(Mutex::new(receiver)),
subscriptions: Arc::new(Mutex::new(HashMap::new())),
current_stream: String::new(),
id: Uuid::new_v4(),
id: Uuid::default(),
target_timeline: String::new(),
current_user: None,
}
}
/// Clone the StreamManager with a new unique id
pub fn new_copy(&self) -> Self {
/// Create a blank StreamManager copy
pub fn blank_copy(&self) -> Self {
StreamManager { ..self.clone() }
}
/// Create a StreamManager copy with a new unique id to manage subscriptions
pub fn configure_copy(&self, timeline: &String, user: User) -> Self {
let id = Uuid::new_v4();
StreamManager { id, ..self.clone() }
}
/// Subscribe to a channel if not already subscribed
///
///
/// `.add()` also unsubscribes from any channels that no longer have clients
pub fn add(&mut self, timeline: &str, _user: &User) -> &Self {
let mut subscriptions = self.subscriptions.lock().expect("No other thread panic");
let mut receiver = self.receiver.lock().unwrap();
subscriptions
.entry(timeline.to_string())
.or_insert_with(|| {
receiver.subscribe(timeline);
Instant::now()
});
// Unsubscribe from channels that haven't been polled in the last 30 seconds
let channels = subscriptions.clone();
let channels_to_unsubscribe = channels
.iter()
.filter(|(_, time)| time.elapsed().as_secs() > 30);
for (channel, _) in channels_to_unsubscribe {
receiver.unsubscribe(&channel);
let mut receiver = self.receiver.lock().expect("No panic in other threads");
receiver.update(id, timeline);
receiver.maybe_subscribe(timeline);
StreamManager {
id,
current_user: Some(user),
target_timeline: timeline.clone(),
..self.clone()
}
// Update our map of streams
*subscriptions = channels
.clone()
.into_iter()
.filter(|(_, time)| time.elapsed().as_secs() > 30)
.collect();
self.current_stream = timeline.to_string();
self
}
}
impl Stream for StreamManager {
type Item = Value;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut subscriptions = self.subscriptions.lock().expect("No other thread panic");
let target_stream = self.current_stream.clone();
subscriptions.insert(target_stream.clone(), Instant::now());
let mut receiver = self.receiver.lock().expect("No other thread panic");
receiver.set_polled_by(self.id);
receiver.update(self.id, &self.target_timeline.clone());
match receiver.poll() {
Ok(Async::Ready(Some(value))) => Ok(Async::Ready(Some(value))),
Ok(Async::Ready(Some(value))) => {
let user = self
.clone()
.current_user
.expect("Previously set current user");
let user_langs = user.langs.clone();
let copy = value.clone();
let event = copy["event"].as_str().expect("Redis string");
let copy = value.clone();
let payload = copy["payload"].to_string();
let copy = value.clone();
let toot_lang = copy["payload"]["language"]
.as_str()
.expect("redis str")
.to_string();
match (&user.filter, user_langs) {
(Filter::Notification, _) if event != "notification" => Ok(Async::NotReady),
(Filter::Language, Some(ref langs)) if !langs.contains(&toot_lang) => {
Ok(Async::NotReady)
}
_ => Ok(Async::Ready(Some(json!(
{"event": event,
"payload": payload,}
)))),
}
}
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),


@ -142,7 +142,7 @@ pub fn list() -> BoxedFilter<TimelineUser> {
.and(Scope::Private.get_access_token())
.and_then(|token| User::from_access_token(token, Scope::Private))
.and(warp::query())
.and_then(|user: User, q: query::List| (user.is_authorized_for_list(q.list), Ok(user)))
.and_then(|user: User, q: query::List| (user.authorized_for_list(q.list), Ok(user)))
.untuple_one()
.and(path::end())
.map(|list: i64, user: User| (format!("list:{}", list), user.with_no_filter()))
@ -191,10 +191,10 @@ mod tests {
access_token
))
.filter(&user())
.unwrap();
.expect("in test");
let expected_user =
User::from_access_token(access_token.clone(), user::Scope::Private).unwrap();
User::from_access_token(access_token.clone(), user::Scope::Private).expect("in test");
assert_eq!(actual_timeline, "1");
assert_eq!(actual_user, expected_user);
@ -204,9 +204,10 @@ mod tests {
.path("/api/v1/streaming/user")
.header("Authorization", format!("Bearer: {}", access_token.clone()))
.filter(&user())
.unwrap();
.expect("in test");
let expected_user = User::from_access_token(access_token, user::Scope::Private).unwrap();
let expected_user =
User::from_access_token(access_token, user::Scope::Private).expect("in test");
assert_eq!(actual_timeline, "1");
assert_eq!(actual_user, expected_user);
@ -240,10 +241,10 @@ mod tests {
access_token
))
.filter(&user_notifications())
.unwrap();
.expect("in test");
let expected_user = User::from_access_token(access_token.clone(), user::Scope::Private)
.unwrap()
.expect("in test")
.with_notification_filter();
assert_eq!(actual_timeline, "1");
@ -254,10 +255,10 @@ mod tests {
.path("/api/v1/streaming/user/notification")
.header("Authorization", format!("Bearer: {}", access_token.clone()))
.filter(&user_notifications())
.unwrap();
.expect("in test");
let expected_user = User::from_access_token(access_token, user::Scope::Private)
.unwrap()
.expect("in test")
.with_notification_filter();
assert_eq!(actual_timeline, "1");
@ -268,7 +269,7 @@ mod tests {
let value = warp::test::request()
.path("/api/v1/streaming/public")
.filter(&public())
.unwrap();
.expect("in test");
assert_eq!(value.0, "public".to_string());
assert_eq!(value.1, User::public().with_language_filter());
@ -279,7 +280,7 @@ mod tests {
let value = warp::test::request()
.path("/api/v1/streaming/public?only_media=true")
.filter(&public_media())
.unwrap();
.expect("in test");
assert_eq!(value.0, "public:media".to_string());
assert_eq!(value.1, User::public().with_language_filter());
@ -287,7 +288,7 @@ mod tests {
let value = warp::test::request()
.path("/api/v1/streaming/public?only_media=1")
.filter(&public_media())
.unwrap();
.expect("in test");
assert_eq!(value.0, "public:media".to_string());
assert_eq!(value.1, User::public().with_language_filter());
@ -298,7 +299,7 @@ mod tests {
let value = warp::test::request()
.path("/api/v1/streaming/public/local")
.filter(&public_local())
.unwrap();
.expect("in test");
assert_eq!(value.0, "public:local".to_string());
assert_eq!(value.1, User::public().with_language_filter());
@ -309,7 +310,7 @@ mod tests {
let value = warp::test::request()
.path("/api/v1/streaming/public/local?only_media=true")
.filter(&public_local_media())
.unwrap();
.expect("in test");
assert_eq!(value.0, "public:local:media".to_string());
assert_eq!(value.1, User::public().with_language_filter());
@ -317,7 +318,7 @@ mod tests {
let value = warp::test::request()
.path("/api/v1/streaming/public/local?only_media=1")
.filter(&public_local_media())
.unwrap();
.expect("in test");
assert_eq!(value.0, "public:local:media".to_string());
assert_eq!(value.1, User::public().with_language_filter());
@ -351,10 +352,10 @@ mod tests {
access_token
))
.filter(&direct())
.unwrap();
.expect("in test");
let expected_user =
User::from_access_token(access_token.clone(), user::Scope::Private).unwrap();
User::from_access_token(access_token.clone(), user::Scope::Private).expect("in test");
assert_eq!(actual_timeline, "direct:1");
assert_eq!(actual_user, expected_user);
@ -364,9 +365,10 @@ mod tests {
.path("/api/v1/streaming/direct")
.header("Authorization", format!("Bearer: {}", access_token.clone()))
.filter(&direct())
.unwrap();
.expect("in test");
let expected_user = User::from_access_token(access_token, user::Scope::Private).unwrap();
let expected_user =
User::from_access_token(access_token, user::Scope::Private).expect("in test");
assert_eq!(actual_timeline, "direct:1");
assert_eq!(actual_user, expected_user);
@ -377,7 +379,7 @@ mod tests {
let value = warp::test::request()
.path("/api/v1/streaming/hashtag?tag=a")
.filter(&hashtag())
.unwrap();
.expect("in test");
assert_eq!(value.0, "hashtag:a".to_string());
assert_eq!(value.1, User::public());
@ -388,7 +390,7 @@ mod tests {
let value = warp::test::request()
.path("/api/v1/streaming/hashtag/local?tag=a")
.filter(&hashtag_local())
.unwrap();
.expect("in test");
assert_eq!(value.0, "hashtag:a:local".to_string());
assert_eq!(value.1, User::public());
@ -408,10 +410,10 @@ mod tests {
access_token, list_id,
))
.filter(&list())
.unwrap();
.expect("in test");
let expected_user =
User::from_access_token(access_token.clone(), user::Scope::Private).unwrap();
User::from_access_token(access_token.clone(), user::Scope::Private).expect("in test");
assert_eq!(actual_timeline, "list:1");
assert_eq!(actual_user, expected_user);
@ -421,9 +423,10 @@ mod tests {
.path("/api/v1/streaming/list?list=1")
.header("Authorization", format!("Bearer: {}", access_token.clone()))
.filter(&list())
.unwrap();
.expect("in test");
let expected_user = User::from_access_token(access_token, user::Scope::Private).unwrap();
let expected_user =
User::from_access_token(access_token, user::Scope::Private).expect("in test");
assert_eq!(actual_timeline, "list:1");
assert_eq!(actual_user, expected_user);
@ -452,7 +455,7 @@ mod tests {
"SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1",
&[&list_number],
)
.unwrap();
.expect("in test");
assert_eq!(
rows.len(),


@ -10,7 +10,7 @@ pub fn connect_to_postgres() -> postgres::Connection {
"postgres://dsock@localhost/mastodon_development",
postgres::TlsMode::None,
)
.unwrap()
.expect("Can connect to local Postgres")
}
/// The filters that can be applied to toots after they come from Redis
@ -59,7 +59,7 @@ LIMIT 1",
filter: Filter::None,
})
} else if let Scope::Public = scope {
info!("Granting public access");
info!("Granting public access to non-authenticated client");
Ok(User {
id: -1,
langs: None,
@ -92,7 +92,7 @@ LIMIT 1",
}
}
/// Determine whether the User is authorised for a specified list
pub fn is_authorized_for_list(&self, list: i64) -> Result<i64, warp::reject::Rejection> {
pub fn authorized_for_list(&self, list: i64) -> Result<i64, warp::reject::Rejection> {
let conn = connect_to_postgres();
// For the Postgres query, `id` = list number; `account_id` = user.id
let rows = &conn


@ -1 +0,0 @@

src/ws.rs (new file, 44 lines)

@ -0,0 +1,44 @@
//! WebSocket-specific functionality
use crate::stream::StreamManager;
use futures::future::Future;
use futures::stream::Stream;
use futures::Async;
/// Send a stream of replies to a WebSocket client
pub fn send_replies(
socket: warp::ws::WebSocket,
mut stream: StreamManager,
) -> impl futures::future::Future<Item = (), Error = ()> {
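// An unbounded channel decouples producing replies from writing to the socket:
// `rx` is forwarded to the WebSocket sink on its own task (spawned below), so
// the polling stream can push messages through `tx` without blocking.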
let (tx, rx) = futures::sync::mpsc::unbounded();
let (ws_tx, mut ws_rx) = socket.split();
warp::spawn(
rx.map_err(|()| -> warp::Error { unreachable!() })
.forward(ws_tx)
.map_err(|_| ())
.map(|_r| ()),
);
let event_stream = tokio::timer::Interval::new(
std::time::Instant::now(),
std::time::Duration::from_millis(100),
)
.take_while(move |_| {
if ws_rx.poll().is_err() {
futures::future::ok(false)
} else {
futures::future::ok(true)
}
});
event_stream
.for_each(move |_json_value| {
if let Ok(Async::Ready(Some(json_value))) = stream.poll() {
let msg = warp::ws::Message::text(json_value.to_string());
if !tx.is_closed() {
tx.unbounded_send(msg).expect("No send error");
}
};
Ok(())
})
.then(|msg| msg)
.map_err(|e| println!("{}", e))
}