diff --git a/src/main.rs b/src/main.rs index 58673f7..2d676f1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,15 +8,17 @@ //! //! # Notes on data flow //! * **Client Request → Warp**: -//! Warp filters for valid requests and parses request data. Based on that data, it repeatedly polls -//! the StreamManager +//! Warp filters for valid requests and parses request data. Based on that data, it generates a `User` +//! representing the client that made the request. The `User` is authenticated, if appropriate. Warp +//! repeatedly polls the StreamManager for information relevant to the User. //! //! * **Warp → StreamManager**: -//! The StreamManager consults a hash table to see if there is a currently open PubSub channel. If -//! there is, it uses that channel; if not, it (synchronously) sends a subscribe command to Redis. -//! The StreamManager polls the Receiver, providing info about which StreamManager it is that is -//! doing the polling. The stream manager is also responsible for monitoring the hash table to see -//! if it should unsubscribe from any channels and, if necessary, sending the unsubscribe command. +//! A new `StreamManager` is created for each request. The `StreamManager` exists to manage concurrent +//! access to the (single) `Receiver`, which it can access behind an `Arc`. The `StreamManager` +//! polles the `Receiver` for any updates relvant to the current client. If there are updates, the +//! `StreamManager` filters them with the client's filters and passes any matching updates up to Warp. +//! The `StreamManager` is also responsible for sending `subscribe` commands to Redis (via the +//! `Receiver`) when necessary. //! //! * **StreamManger → Receiver**: //! The Receiver receives data from Redis and stores it in a series of queues (one for each @@ -29,71 +31,62 @@ pub mod receiver; pub mod stream; pub mod timeline; pub mod user; +pub mod ws; use futures::stream::Stream; use receiver::Receiver; use stream::StreamManager; -use user::{Filter, User}; +use user::{Scope, User}; +use warp::path; use warp::Filter as WarpFilter; fn main() { pretty_env_logger::init(); - // let redis_updates = StreamManager::new(Receiver::new()); + let redis_updates = StreamManager::new(Receiver::new()); - // let routes = any_of!( - // // GET /api/v1/streaming/user/notification [private; notification filter] - // timeline::user_notifications(), - // // GET /api/v1/streaming/user [private; language filter] - // timeline::user(), - // // GET /api/v1/streaming/public/local?only_media=true [public; language filter] - // timeline::public_local_media(), - // // GET /api/v1/streaming/public?only_media=true [public; language filter] - // timeline::public_media(), - // // GET /api/v1/streaming/public/local [public; language filter] - // timeline::public_local(), - // // GET /api/v1/streaming/public [public; language filter] - // timeline::public(), - // // GET /api/v1/streaming/direct [private; *no* filter] - // timeline::direct(), - // // GET /api/v1/streaming/hashtag?tag=:hashtag [public; no filter] - // timeline::hashtag(), - // // GET /api/v1/streaming/hashtag/local?tag=:hashtag [public; no filter] - // timeline::hashtag_local(), - // // GET /api/v1/streaming/list?list=:list_id [private; no filter] - // timeline::list() - // ) - // .untuple_one() - // .and(warp::sse()) - // .and(warp::any().map(move || redis_updates.new_copy())) - // .map( - // |timeline: String, user: User, sse: warp::sse::Sse, mut event_stream: StreamManager| { - // dbg!(&event_stream); - // event_stream.add(&timeline, &user); - // sse.reply(warp::sse::keep( - // event_stream.filter_map(move |item| { - // let payload = item["payload"].clone(); - // let event = item["event"].clone().to_string(); - // let toot_lang = payload["language"].as_str().expect("redis str").to_string(); - // let user_langs = user.langs.clone(); + let routes = any_of!( + // GET /api/v1/streaming/user/notification [private; notification filter] + timeline::user_notifications(), + // GET /api/v1/streaming/user [private; language filter] + timeline::user(), + // GET /api/v1/streaming/public/local?only_media=true [public; language filter] + timeline::public_local_media(), + // GET /api/v1/streaming/public?only_media=true [public; language filter] + timeline::public_media(), + // GET /api/v1/streaming/public/local [public; language filter] + timeline::public_local(), + // GET /api/v1/streaming/public [public; language filter] + timeline::public(), + // GET /api/v1/streaming/direct [private; *no* filter] + timeline::direct(), + // GET /api/v1/streaming/hashtag?tag=:hashtag [public; no filter] + timeline::hashtag(), + // GET /api/v1/streaming/hashtag/local?tag=:hashtag [public; no filter] + timeline::hashtag_local(), + // GET /api/v1/streaming/list?list=:list_id [private; no filter] + timeline::list() + ) + .untuple_one() + .and(warp::sse()) + .and(warp::any().map(move || redis_updates.new_copy())) + .map( + |timeline: String, user: User, sse: warp::sse::Sse, mut event_stream: StreamManager| { + dbg!(&event_stream); + event_stream.add(&timeline, &user); + event_stream.set_user(user.clone()); + sse.reply(warp::sse::keep( + event_stream.filter_map(move |item| { + let payload = item["payload"].clone(); + let event = item["event"].clone().to_string(); + Some((warp::sse::event(event), warp::sse::data(payload))) + }), + None, + )) + }, + ) + .with(warp::reply::with::header("Connection", "keep-alive")) + .recover(error::handle_errors); - // match (&user.filter, user_langs) { - // (Filter::Notification, _) if event != "notification" => None, - // (Filter::Language, Some(ref langs)) if !langs.contains(&toot_lang) => None, - // _ => Some((warp::sse::event(event), warp::sse::data(payload))), - // } - // }), - // None, - // )) - // }, - // ) - // .with(warp::reply::with::header("Connection", "keep-alive")) - // .recover(error::handle_errors); - - use futures::future::Future; - use futures::sink::Sink; - use futures::Async; - use user::Scope; - use warp::path; let redis_updates_ws = StreamManager::new(Receiver::new()); let websocket = path!("api" / "v1" / "streaming") .and(Scope::Public.get_access_token()) @@ -103,10 +96,7 @@ fn main() { .and(query::Hashtag::to_filter()) .and(query::List::to_filter()) .and(warp::ws2()) - .and(warp::any().map(move || { - println!("Getting StreamManager.new_copy()"); - redis_updates_ws.new_copy() - })) + .and(warp::any().map(move || redis_updates_ws.new_copy())) .and_then( |mut user: User, q: query::Stream, @@ -115,13 +105,12 @@ fn main() { l: query::List, ws: warp::ws::Ws2, mut stream: StreamManager| { - println!("DING"); let unauthorized = Err(warp::reject::custom("Error: Invalid Access Token")); let timeline = match q.stream.as_ref() { // Public endpoints: tl @ "public" | tl @ "public:local" if m.is_truthy() => format!("{}:media", tl), - tl @ "public:media" | tl @ "public:local:media" => format!("{}", tl), - tl @ "public" | tl @ "public:local" => format!("{}", tl), + tl @ "public:media" | tl @ "public:local:media" => tl.to_string(), + tl @ "public" | tl @ "public:local" => tl.to_string(), // User "user" if user.id == -1 => return unauthorized, "user" => format!("{}", user.id), @@ -138,119 +127,16 @@ fn main() { "list" => format!("list:{}", l.list), // Direct endpoint: "direct" if user.id == -1 => return unauthorized, - "direct" => format!("direct"), + "direct" => "direct".to_string(), // Other endpoints don't exist: _ => return Err(warp::reject::custom("Error: Nonexistent WebSocket query")), }; stream.add(&timeline, &user); stream.set_user(user); - dbg!(&stream); - Ok(ws.on_upgrade(move |socket| handle_ws(socket, stream))) + Ok(ws.on_upgrade(move |socket| ws::handle_ws(socket, stream))) }, ); - fn handle_ws( - socket: warp::ws::WebSocket, - mut stream: StreamManager, - ) -> impl futures::future::Future { - let (mut tx, rx) = futures::sync::mpsc::unbounded(); - let (ws_tx, mut ws_rx) = socket.split(); - // let event_stream = stream - // .map(move |value| warp::ws::Message::text(value.to_string())) - // .map_err(|_| unreachable!()); - warp::spawn( - rx.map_err(|()| -> warp::Error { unreachable!() }) - .forward(ws_tx) - .map_err(|_| ()) - .map(|_r| ()), - ); - let event_stream = tokio::timer::Interval::new( - std::time::Instant::now(), - std::time::Duration::from_secs(10), - ) - .take_while(move |_| { - if ws_rx.poll().is_err() { - println!("Need to close WS"); - futures::future::ok(false) - } else { - // println!("We can still send to WS"); - futures::future::ok(true) - } - }); - - event_stream - .for_each(move |_json_value| { - // println!("For each triggered"); - if let Ok(Async::Ready(Some(json_value))) = stream.poll() { - let msg = warp::ws::Message::text(json_value.to_string()); - tx.unbounded_send(msg).unwrap(); - }; - Ok(()) - }) - .then(|msg| { - println!("Done with stream"); - msg - }) - .map_err(|e| { - println!("{}", e); - }) - } - - let log = warp::any().map(|| { - println!("----got request----"); - warp::reply() - }); - warp::serve(websocket.or(log)).run(([127, 0, 0, 1], 3030)); + warp::serve(websocket.or(routes)).run(([127, 0, 0, 1], 4000)); } - -// loop { -// //println!("Awake"); -// match stream.poll() { -// Err(_) | Ok(Async::Ready(None)) => { -// eprintln!("Breaking out of poll loop due to an error"); -// break; -// } -// Ok(Async::NotReady) => (), -// Ok(Async::Ready(Some(item))) => { -// let user_langs = user.langs.clone(); -// let copy = item.clone(); -// let event = copy["event"].as_str().unwrap(); -// let copy = item.clone(); -// let payload = copy["payload"].to_string(); -// let copy = item.clone(); -// let toot_lang = copy["payload"]["language"] -// .as_str() -// .expect("redis str") -// .to_string(); - -// println!("sending: {:?}", &payload); -// match (&user.filter, user_langs) { -// (Filter::Notification, _) if event != "notification" => continue, -// (Filter::Language, Some(ref langs)) if !langs.contains(&toot_lang) => { -// continue; -// } -// _ => match tx.unbounded_send(warp::ws::Message::text( -// json!( -// {"event": event, -// "payload": payload,} -// ) -// .to_string(), -// )) { -// Ok(()) => println!("Sent OK"), -// Err(e) => { -// println!("Couldn't send: {}", e); -// } -// }, -// } -// } -// }; -// if ws_rx.poll().is_err() { -// println!("Need to close WS"); -// break; -// } else { -// println!("We can still send to WS"); -// } -// std::thread::sleep(std::time::Duration::from_millis(2000)); -// //println!("Asleep"); -// } diff --git a/src/query.rs b/src/query.rs index 4ec5d15..8b43d71 100644 --- a/src/query.rs +++ b/src/query.rs @@ -10,7 +10,7 @@ pub struct Media { impl Media { pub fn to_filter() -> BoxedFilter<(Self,)> { warp::query() - .or(warp::any().map(|| Self::default())) + .or(warp::any().map(Self::default)) .unify() .boxed() } @@ -25,7 +25,7 @@ pub struct Hashtag { impl Hashtag { pub fn to_filter() -> BoxedFilter<(Self,)> { warp::query() - .or(warp::any().map(|| Self::default())) + .or(warp::any().map(Self::default)) .unify() .boxed() } @@ -37,7 +37,7 @@ pub struct List { impl List { pub fn to_filter() -> BoxedFilter<(Self,)> { warp::query() - .or(warp::any().map(|| Self::default())) + .or(warp::any().map(Self::default)) .unify() .boxed() } diff --git a/src/receiver.rs b/src/receiver.rs index 456f7f3..aa91547 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -41,13 +41,18 @@ pub struct Receiver { msg_queue: HashMap, subscribed_timelines: HashMap, } +impl Default for Receiver { + fn default() -> Self { + Self::new() + } +} impl Receiver { pub fn new() -> Self { - let stream = TcpStream::connect("127.0.0.1:6379").unwrap(); + let stream = TcpStream::connect("127.0.0.1:6379").expect("Can connect to Redis"); stream .set_read_timeout(Some(Duration::from_millis(10))) - .unwrap(); + .expect("Can set read timeout for Redis connection"); Self { stream, tl: String::new(), @@ -73,13 +78,11 @@ impl Receiver { .or_insert(1); let mut timelines_with_dropped_clients = Vec::new(); - self.msg_queue.retain(|id, msg_queue| { + self.msg_queue.retain(|_id, msg_queue| { if msg_queue.last_polled_at.elapsed() > Duration::from_secs(30) { timelines_with_dropped_clients.push(msg_queue.redis_channel.clone()); - println!("Dropping: {}", id); false } else { - println!("Retaining: {}", id); true } }); @@ -99,7 +102,6 @@ impl Receiver { self.stream .write_all(&subscribe_cmd) .expect("Can subscribe to Redis"); - println!("Done subscribing"); } /// Send an unsubscribe command to the Redis PubSub pub fn unsubscribe(&mut self, tl: &str) { @@ -137,11 +139,11 @@ impl Stream for Receiver { } } } - dbg!(&self); + let timeline = self.tl.clone(); if let Some(value) = self .msg_queue .entry(polled_by) - .or_insert(MsgQueue::new(self.tl.clone())) + .or_insert_with(|| MsgQueue::new(timeline)) .messages .pop_front() { diff --git a/src/stream.rs b/src/stream.rs index d4c30f0..57d88a2 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -4,9 +4,7 @@ use crate::user::User; use futures::stream::Stream; use futures::{Async, Poll}; use serde_json::Value; -use std::collections::HashMap; use std::sync::{Arc, Mutex}; -use std::time::Instant; use tokio::io::Error; use uuid::Uuid; @@ -14,7 +12,6 @@ use uuid::Uuid; #[derive(Clone, Debug)] pub struct StreamManager { receiver: Arc>, - //subscriptions: Arc>>, id: uuid::Uuid, current_user: Option, } @@ -22,7 +19,6 @@ impl StreamManager { pub fn new(reciever: Receiver) -> Self { StreamManager { receiver: Arc::new(Mutex::new(reciever)), - // subscriptions: Arc::new(Mutex::new(HashMap::new())), id: Uuid::new_v4(), current_user: None, } @@ -39,13 +35,9 @@ impl StreamManager { /// /// `.add()` also unsubscribes from any channels that no longer have clients pub fn add(&mut self, timeline: &str, _user: &User) { - println!("ADD lock"); - let mut receiver = self.receiver.lock().unwrap(); + let mut receiver = self.receiver.lock().expect("No panic in other threads"); receiver.set_manager_id(self.id); receiver.subscribe(timeline); - dbg!(&receiver); - - println!("ADD unlock"); } pub fn set_user(&mut self, user: User) { @@ -62,13 +54,16 @@ impl Stream for StreamManager { fn poll(&mut self) -> Poll, Self::Error> { let mut receiver = self.receiver.lock().expect("No other thread panic"); receiver.set_manager_id(self.id); - let result = match receiver.poll() { + match receiver.poll() { Ok(Async::Ready(Some(value))) => { - let user = self.clone().current_user.unwrap(); + let user = self + .clone() + .current_user + .expect("Previously set current user"); let user_langs = user.langs.clone(); let copy = value.clone(); - let event = copy["event"].as_str().unwrap(); + let event = copy["event"].as_str().expect("Redis string"); let copy = value.clone(); let payload = copy["payload"].to_string(); let copy = value.clone(); @@ -77,7 +72,6 @@ impl Stream for StreamManager { .expect("redis str") .to_string(); - println!("sending: {:?}", &payload); match (&user.filter, user_langs) { (Filter::Notification, _) if event != "notification" => Ok(Async::NotReady), (Filter::Language, Some(ref langs)) if !langs.contains(&toot_lang) => { @@ -93,34 +87,6 @@ impl Stream for StreamManager { Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::NotReady) => Ok(Async::NotReady), Err(e) => Err(e), - }; - // dbg!(&result); - result + } } } - -// CUT FROM .add -// let mut subscriptions = self.subscriptions.lo ck().expect("No other thread panic"); -// subscriptions -// .entry(timeline.to_string()) -// .or_insert_with(|| { -// println!("Inserting TL: {}", &timeline); -//***** // -// Instant::now() -// }); - -// self.current_stream = timeline.to_string(); -// // Unsubscribe from that haven't been polled in the last 30 seconds -// let channels = subscriptions.clone(); -// let channels_to_unsubscribe = channels -// .iter() -// .filter(|(_, time)| time.elapsed().as_secs() > 30); -// for (channel, _) in channels_to_unsubscribe { -//***** // receiver.unsubscribe(&channel); -// } -// // Update our map of streams -// *subscriptions = channels -// .clone() -// .into_iter() -// .filter(|(_, time)| time.elapsed().as_secs() < 30) -// .collect(); diff --git a/src/timeline.rs b/src/timeline.rs index 38b4e6e..541ac2b 100644 --- a/src/timeline.rs +++ b/src/timeline.rs @@ -191,10 +191,10 @@ mod tests { access_token )) .filter(&user()) - .unwrap(); + .expect("in test"); let expected_user = - User::from_access_token(access_token.clone(), user::Scope::Private).unwrap(); + User::from_access_token(access_token.clone(), user::Scope::Private).expect("in test"); assert_eq!(actual_timeline, "1"); assert_eq!(actual_user, expected_user); @@ -204,9 +204,10 @@ mod tests { .path("/api/v1/streaming/user") .header("Authorization", format!("Bearer: {}", access_token.clone())) .filter(&user()) - .unwrap(); + .expect("in test"); - let expected_user = User::from_access_token(access_token, user::Scope::Private).unwrap(); + let expected_user = + User::from_access_token(access_token, user::Scope::Private).expect("in test"); assert_eq!(actual_timeline, "1"); assert_eq!(actual_user, expected_user); @@ -240,10 +241,10 @@ mod tests { access_token )) .filter(&user_notifications()) - .unwrap(); + .expect("in test"); let expected_user = User::from_access_token(access_token.clone(), user::Scope::Private) - .unwrap() + .expect("in test") .with_notification_filter(); assert_eq!(actual_timeline, "1"); @@ -254,10 +255,10 @@ mod tests { .path("/api/v1/streaming/user/notification") .header("Authorization", format!("Bearer: {}", access_token.clone())) .filter(&user_notifications()) - .unwrap(); + .expect("in test"); let expected_user = User::from_access_token(access_token, user::Scope::Private) - .unwrap() + .expect("in test") .with_notification_filter(); assert_eq!(actual_timeline, "1"); @@ -268,7 +269,7 @@ mod tests { let value = warp::test::request() .path("/api/v1/streaming/public") .filter(&public()) - .unwrap(); + .expect("in test"); assert_eq!(value.0, "public".to_string()); assert_eq!(value.1, User::public().with_language_filter()); @@ -279,7 +280,7 @@ mod tests { let value = warp::test::request() .path("/api/v1/streaming/public?only_media=true") .filter(&public_media()) - .unwrap(); + .expect("in test"); assert_eq!(value.0, "public:media".to_string()); assert_eq!(value.1, User::public().with_language_filter()); @@ -287,7 +288,7 @@ mod tests { let value = warp::test::request() .path("/api/v1/streaming/public?only_media=1") .filter(&public_media()) - .unwrap(); + .expect("in test"); assert_eq!(value.0, "public:media".to_string()); assert_eq!(value.1, User::public().with_language_filter()); @@ -298,7 +299,7 @@ mod tests { let value = warp::test::request() .path("/api/v1/streaming/public/local") .filter(&public_local()) - .unwrap(); + .expect("in test"); assert_eq!(value.0, "public:local".to_string()); assert_eq!(value.1, User::public().with_language_filter()); @@ -309,7 +310,7 @@ mod tests { let value = warp::test::request() .path("/api/v1/streaming/public/local?only_media=true") .filter(&public_local_media()) - .unwrap(); + .expect("in test"); assert_eq!(value.0, "public:local:media".to_string()); assert_eq!(value.1, User::public().with_language_filter()); @@ -317,7 +318,7 @@ mod tests { let value = warp::test::request() .path("/api/v1/streaming/public/local?only_media=1") .filter(&public_local_media()) - .unwrap(); + .expect("in test"); assert_eq!(value.0, "public:local:media".to_string()); assert_eq!(value.1, User::public().with_language_filter()); @@ -351,10 +352,10 @@ mod tests { access_token )) .filter(&direct()) - .unwrap(); + .expect("in test"); let expected_user = - User::from_access_token(access_token.clone(), user::Scope::Private).unwrap(); + User::from_access_token(access_token.clone(), user::Scope::Private).expect("in test"); assert_eq!(actual_timeline, "direct:1"); assert_eq!(actual_user, expected_user); @@ -364,9 +365,10 @@ mod tests { .path("/api/v1/streaming/direct") .header("Authorization", format!("Bearer: {}", access_token.clone())) .filter(&direct()) - .unwrap(); + .expect("in test"); - let expected_user = User::from_access_token(access_token, user::Scope::Private).unwrap(); + let expected_user = + User::from_access_token(access_token, user::Scope::Private).expect("in test"); assert_eq!(actual_timeline, "direct:1"); assert_eq!(actual_user, expected_user); @@ -377,7 +379,7 @@ mod tests { let value = warp::test::request() .path("/api/v1/streaming/hashtag?tag=a") .filter(&hashtag()) - .unwrap(); + .expect("in test"); assert_eq!(value.0, "hashtag:a".to_string()); assert_eq!(value.1, User::public()); @@ -388,7 +390,7 @@ mod tests { let value = warp::test::request() .path("/api/v1/streaming/hashtag/local?tag=a") .filter(&hashtag_local()) - .unwrap(); + .expect("in test"); assert_eq!(value.0, "hashtag:a:local".to_string()); assert_eq!(value.1, User::public()); @@ -408,10 +410,10 @@ mod tests { access_token, list_id, )) .filter(&list()) - .unwrap(); + .expect("in test"); let expected_user = - User::from_access_token(access_token.clone(), user::Scope::Private).unwrap(); + User::from_access_token(access_token.clone(), user::Scope::Private).expect("in test"); assert_eq!(actual_timeline, "list:1"); assert_eq!(actual_user, expected_user); @@ -421,9 +423,10 @@ mod tests { .path("/api/v1/streaming/list?list=1") .header("Authorization", format!("Bearer: {}", access_token.clone())) .filter(&list()) - .unwrap(); + .expect("in test"); - let expected_user = User::from_access_token(access_token, user::Scope::Private).unwrap(); + let expected_user = + User::from_access_token(access_token, user::Scope::Private).expect("in test"); assert_eq!(actual_timeline, "list:1"); assert_eq!(actual_user, expected_user); @@ -452,7 +455,7 @@ mod tests { "SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1", &[&list_number], ) - .unwrap(); + .expect("in test"); assert_eq!( rows.len(), diff --git a/src/user.rs b/src/user.rs index f752f8e..073ad49 100644 --- a/src/user.rs +++ b/src/user.rs @@ -10,7 +10,7 @@ pub fn connect_to_postgres() -> postgres::Connection { "postgres://dsock@localhost/mastodon_development", postgres::TlsMode::None, ) - .unwrap() + .expect("Can connect to local Postgres") } /// The filters that can be applied to toots after they come from Redis @@ -32,7 +32,6 @@ pub struct User { impl User { /// Create a user from the access token supplied in the header or query paramaters pub fn from_access_token(token: String, scope: Scope) -> Result { - println!("Getting user"); let conn = connect_to_postgres(); let result = &conn .query( @@ -129,19 +128,9 @@ pub enum Scope { } impl Scope { pub fn get_access_token(self) -> warp::filters::BoxedFilter<(String,)> { - println!("Getting access token"); - let token_from_header = - warp::header::header::("authorization").map(|auth: String| { - println!( - "Got token_from_header: {}", - auth.split(' ').nth(1).unwrap_or("invalid").to_string() - ); - auth.split(' ').nth(1).unwrap_or("invalid").to_string() - }); - let token_from_query = warp::query().map(|q: query::Auth| { - println!("Got token_from_query: {}", &q.access_token); - q.access_token - }); + let token_from_header = warp::header::header::("authorization") + .map(|auth: String| auth.split(' ').nth(1).unwrap_or("invalid").to_string()); + let token_from_query = warp::query().map(|q: query::Auth| q.access_token); let public = warp::any().map(|| "no access token".to_string()); match self { diff --git a/src/ws.rs b/src/ws.rs new file mode 100644 index 0000000..5ed494b --- /dev/null +++ b/src/ws.rs @@ -0,0 +1,42 @@ +use crate::stream::StreamManager; +use futures::future::Future; +use futures::stream::Stream; +use futures::Async; + +pub fn handle_ws( + socket: warp::ws::WebSocket, + mut stream: StreamManager, +) -> impl futures::future::Future { + let (tx, rx) = futures::sync::mpsc::unbounded(); + let (ws_tx, mut ws_rx) = socket.split(); + warp::spawn( + rx.map_err(|()| -> warp::Error { unreachable!() }) + .forward(ws_tx) + .map_err(|_| ()) + .map(|_r| ()), + ); + let event_stream = tokio::timer::Interval::new( + std::time::Instant::now(), + std::time::Duration::from_millis(100), + ) + .take_while(move |_| { + if ws_rx.poll().is_err() { + futures::future::ok(false) + } else { + futures::future::ok(true) + } + }); + + event_stream + .for_each(move |_json_value| { + if let Ok(Async::Ready(Some(json_value))) = stream.poll() { + let msg = warp::ws::Message::text(json_value.to_string()); + if !tx.is_closed() { + tx.unbounded_send(msg).expect("No send error"); + } + }; + Ok(()) + }) + .then(|msg| msg) + .map_err(|e| println!("{}", e)) +}