From 23eaa4a270546b013d13b1dfcb107bc9a3163fde Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Mon, 15 Apr 2019 14:22:44 -0400 Subject: [PATCH] Implement basic Server Sent Events with Warp MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit rolls up several days of work that took place in a side repo which was originally intended to be a test/proof-of-concept repo but that grew into a full implementation. This implementation switches to Warp as the framework for the primary web server (in large part because Warp offers built-in support for SSE and several other advantages). This implementation relies on a custom Redis interface built on Tokio (because the current Redis Rust crate does not support asnyc PubSub). Using a custom interface should also be faster, since it does not contain logic for anything other than pubsub—which is all we need. Finally, I note that the SSE support is currently as feature-complete as it is possible to be without having yet added the Postgress interface this means that all of the endpoints are present and are accessible on localhost. However, none of the endpoints have authentication yet, and the endpoints that should provide user-specific data (e.g., `/api/v1/streaming/user`) are currently hard-coded to provide data for user 1 (the admin). Other than those limitations, however, the SSE implementation is feature complete. --- src/main.old.rs | 100 +++++++++++++++++++++++ src/main.rs | 207 +++++++++++++++++++++++++++--------------------- src/pubsub.rs | 76 ++++++++++++++++++ src/query.rs | 14 ++++ 4 files changed, 307 insertions(+), 90 deletions(-) create mode 100644 src/main.old.rs create mode 100644 src/pubsub.rs create mode 100644 src/query.rs diff --git a/src/main.old.rs b/src/main.old.rs new file mode 100644 index 0000000..b573ba1 --- /dev/null +++ b/src/main.old.rs @@ -0,0 +1,100 @@ +#[macro_use] +extern crate envconfig_derive; + +mod api; +mod common; +mod env; +mod middleware; + +use actix::prelude::*; +use actix_redis::RedisActor; +use actix_web::{http::header, middleware::cors::Cors, server, App, HttpResponse}; +use env::{RedisConfig, ServerConfig}; +use env_logger::Builder; +use envconfig::Envconfig; +use log::info; +use std::net::SocketAddr; + +const ENV_LOG_VARIABLE: &str = "STREAMING_API_LOG"; + +#[derive(Clone)] +pub struct AppState { + redis: Addr, +} + +fn main() { + Builder::from_env(ENV_LOG_VARIABLE).init(); + + info!("starting streaming api server"); + + let server_cfg = ServerConfig::init().expect("failed to obtain server environment"); + let redis_cfg = RedisConfig::init().expect("failed to obtain redis environment"); + + let sys = System::new("streaming-api-server"); + + let redis_addr = RedisActor::start(format!("{}:{}", redis_cfg.host, redis_cfg.port)); + + let app_state = AppState { + redis: redis_addr.clone(), + }; + + server::new(move || endpoints(&app_state)) + .bind(SocketAddr::new(server_cfg.address, server_cfg.port)) + .unwrap() + .start(); + + sys.run(); +} + +fn endpoints(app_state: &AppState) -> App { + use api::http; + use api::ws; + + App::with_state(app_state.clone()) + .prefix("/api/v1") + .resource("/streaming", |r| r.with(ws::index)) + .resource("/streaming/health", |r| { + r.middleware(cors_middleware()); + r.get().f(|_| HttpResponse::Ok()) + }) + .resource("/streaming/user", |r| { + r.middleware(cors_middleware()); + r.get().with(http::user::index) + }) + .resource("/streaming/public", |r| { + r.middleware(cors_middleware()); + r.get().with(http::public::index) + }) + .resource("/streaming/public/local", |r| { + r.middleware(cors_middleware()); + r.get().with(http::public::local) + }) + .resource("/streaming/direct", |r| { + r.middleware(cors_middleware()); + r.get().with(http::direct::index) + }) + .resource("/streaming/hashtag", |r| { + r.middleware(cors_middleware()); + r.get().with(http::hashtag::index) + }) + .resource("/streaming/hashtag/local", |r| { + r.middleware(cors_middleware()); + r.get().with(http::hashtag::local) + }) + .resource("/streaming/list", |r| { + r.middleware(cors_middleware()); + r.get().with(http::list::index) + }) +} + +fn cors_middleware() -> Cors { + Cors::build() + .allowed_origin("*") + .allowed_methods(vec!["GET", "OPTIONS"]) + .allowed_headers(vec![ + header::AUTHORIZATION, + header::ACCEPT, + header::CACHE_CONTROL, + ]) + .finish() +} diff --git a/src/main.rs b/src/main.rs index b573ba1..e6f5179 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,100 +1,127 @@ -#[macro_use] -extern crate envconfig_derive; - -mod api; -mod common; -mod env; -mod middleware; - -use actix::prelude::*; -use actix_redis::RedisActor; -use actix_web::{http::header, middleware::cors::Cors, server, App, HttpResponse}; -use env::{RedisConfig, ServerConfig}; -use env_logger::Builder; -use envconfig::Envconfig; -use log::info; -use std::net::SocketAddr; - -const ENV_LOG_VARIABLE: &str = "STREAMING_API_LOG"; - -#[derive(Clone)] -pub struct AppState { - redis: Addr, -} +mod pubsub; +mod query; +use futures::stream::Stream; +use warp::{path, Filter}; fn main() { - Builder::from_env(ENV_LOG_VARIABLE).init(); + use warp::path; + let base = path!("api" / "v1" / "streaming"); - info!("starting streaming api server"); + // GET /api/v1/streaming/user + let user_timeline = base + .and(path("user")) + .and(path::end()) + // TODO get user id from postgress + .map(|| pubsub::stream_from("1".to_string())); - let server_cfg = ServerConfig::init().expect("failed to obtain server environment"); - let redis_cfg = RedisConfig::init().expect("failed to obtain redis environment"); + // GET /api/v1/streaming/user/notification + let user_timeline_notifications = base + .and(path!("user" / "notification")) + .and(path::end()) + // TODO get user id from postgress + .map(|| { + let full_stream = pubsub::stream_from("1".to_string()); + // TODO: filter stream to just have notifications + full_stream + }); - let sys = System::new("streaming-api-server"); + // GET /api/v1/streaming/public + let public_timeline = base + .and(path("public")) + .and(path::end()) + .map(|| pubsub::stream_from("public".to_string())); - let redis_addr = RedisActor::start(format!("{}:{}", redis_cfg.host, redis_cfg.port)); + // GET /api/v1/streaming/public?only_media=true + let public_timeline_media = base + .and(path("public")) + .and(warp::query()) + .and(path::end()) + .map(|q: query::Media| { + if q.only_media == "1" || q.only_media == "true" { + pubsub::stream_from("public:media".to_string()) + } else { + pubsub::stream_from("public".to_string()) + } + }); - let app_state = AppState { - redis: redis_addr.clone(), - }; + // GET /api/v1/streaming/public/local + let local_timeline = base + .and(path!("public" / "local")) + .and(path::end()) + .map(|| pubsub::stream_from("public:local".to_string())); - server::new(move || endpoints(&app_state)) - .bind(SocketAddr::new(server_cfg.address, server_cfg.port)) - .unwrap() - .start(); + // GET /api/v1/streaming/public/local?only_media=true + let local_timeline_media = base + .and(path!("public" / "local")) + .and(warp::query()) + .and(path::end()) + .map(|q: query::Media| { + if q.only_media == "1" || q.only_media == "true" { + pubsub::stream_from("public:local:media".to_string()) + } else { + pubsub::stream_from("public:local".to_string()) + } + }); - sys.run(); -} - -fn endpoints(app_state: &AppState) -> App { - use api::http; - use api::ws; - - App::with_state(app_state.clone()) - .prefix("/api/v1") - .resource("/streaming", |r| r.with(ws::index)) - .resource("/streaming/health", |r| { - r.middleware(cors_middleware()); - r.get().f(|_| HttpResponse::Ok()) - }) - .resource("/streaming/user", |r| { - r.middleware(cors_middleware()); - r.get().with(http::user::index) - }) - .resource("/streaming/public", |r| { - r.middleware(cors_middleware()); - r.get().with(http::public::index) - }) - .resource("/streaming/public/local", |r| { - r.middleware(cors_middleware()); - r.get().with(http::public::local) - }) - .resource("/streaming/direct", |r| { - r.middleware(cors_middleware()); - r.get().with(http::direct::index) - }) - .resource("/streaming/hashtag", |r| { - r.middleware(cors_middleware()); - r.get().with(http::hashtag::index) - }) - .resource("/streaming/hashtag/local", |r| { - r.middleware(cors_middleware()); - r.get().with(http::hashtag::local) - }) - .resource("/streaming/list", |r| { - r.middleware(cors_middleware()); - r.get().with(http::list::index) - }) -} - -fn cors_middleware() -> Cors { - Cors::build() - .allowed_origin("*") - .allowed_methods(vec!["GET", "OPTIONS"]) - .allowed_headers(vec![ - header::AUTHORIZATION, - header::ACCEPT, - header::CACHE_CONTROL, - ]) - .finish() + // GET /api/v1/streaming/direct + let direct_timeline = base + .and(path("direct")) + .and(path::end()) + // TODO get user id from postgress + .map(|| pubsub::stream_from("direct:1".to_string())); + + // GET /api/v1/streaming/hashtag?tag=:hashtag + let hashtag_timeline = base + .and(path("hashtag")) + .and(warp::query()) + .and(path::end()) + .map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}", q.tag))); + + // GET /api/v1/streaming/hashtag/local?tag=:hashtag + let hashtag_timeline_local = base + .and(path!("hashtag" / "local")) + .and(warp::query()) + .and(path::end()) + .map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}:local", q.tag))); + + // GET /api/v1/streaming/list?list=:list_id + let list_timeline = base + .and(path("list")) + .and(warp::query()) + .and(path::end()) + .map(|q: query::List| pubsub::stream_from(format!("list:{}", q.list))); + + let routes = user_timeline + .or(user_timeline_notifications) + .unify() + .or(public_timeline_media) + .unify() + .or(public_timeline) + .unify() + .or(local_timeline_media) + .unify() + .or(local_timeline) + .unify() + .or(direct_timeline) + .unify() + .or(hashtag_timeline) + .unify() + .or(hashtag_timeline_local) + .unify() + .or(list_timeline) + .unify() + .and_then(|event_stream| event_stream) + .and(warp::sse()) + .map(|event_stream: pubsub::Receiver, sse: warp::sse::Sse| { + sse.reply(warp::sse::keep( + event_stream.map(|item| { + let payload = item["payload"].clone(); + let event = item["event"].clone(); + (warp::sse::event(event), warp::sse::data(payload)) + }), + None, + )) + }); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)); } diff --git a/src/pubsub.rs b/src/pubsub.rs new file mode 100644 index 0000000..8a49ce1 --- /dev/null +++ b/src/pubsub.rs @@ -0,0 +1,76 @@ +use futures::{Async, Future, Poll}; +use regex::Regex; +use serde_json::Value; +use tokio::io::{AsyncRead, AsyncWrite, Error, ReadHalf, WriteHalf}; +use tokio::net::TcpStream; +use warp::Stream; + +pub struct Receiver { + rx: ReadHalf, +} +impl Stream for Receiver { + type Item = Value; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let mut buffer = vec![0u8; 3000]; + while let Async::Ready(num_bytes_read) = self.rx.poll_read(&mut buffer)? { + let re = Regex::new(r"(?x)(?P\{.*\})").unwrap(); + + if let Some(cap) = re.captures(&String::from_utf8_lossy(&buffer[..num_bytes_read])) { + let json_string = cap["json"].to_string(); + let json: Value = serde_json::from_str(&json_string.clone())?; + return Ok(Async::Ready(Some(json))); + } + return Ok(Async::NotReady); + } + return Ok(Async::NotReady); + } +} + +struct Sender { + tx: WriteHalf, + channel: String, +} +impl Future for Sender { + type Item = (); + type Error = Box; + fn poll(&mut self) -> Poll { + println!("Subscribing to {}", &self.channel); + let subscribe_cmd = format!( + "*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n", + self.channel.len(), + self.channel + ); + let mut buffer = subscribe_cmd.as_bytes(); + self.tx.poll_write(&mut buffer)?; + return Ok(Async::NotReady); + } +} + +fn get_socket() -> impl Future> { + let address = "127.0.0.1:6379".parse().expect("Unable to parse address"); + let connection = TcpStream::connect(&address); + connection + .and_then(|socket| Ok(socket)) + .map_err(|e| Box::new(e)) +} + +fn send_subscribe_cmd(tx: WriteHalf, channel: String) { + let sender = Sender { tx, channel }; + tokio::spawn(sender.map_err(|e| eprintln!("{}", e))); +} + +pub fn stream_from( + timeline: String, +) -> impl Future { + get_socket() + .and_then(move |socket| { + let (rx, tx) = socket.split(); + send_subscribe_cmd(tx, format!("timeline:{}", timeline)); + let stream_of_data_from_redis = Receiver { rx }; + Ok(stream_of_data_from_redis) + }) + .and_then(|stream| Ok(stream)) + .map_err(|e| warp::reject::custom(e)) +} diff --git a/src/query.rs b/src/query.rs new file mode 100644 index 0000000..42b4ea7 --- /dev/null +++ b/src/query.rs @@ -0,0 +1,14 @@ +use serde_derive::Deserialize; + +#[derive(Deserialize)] +pub struct Media { + pub only_media: String, +} +#[derive(Deserialize)] +pub struct Hashtag { + pub tag: String, +} +#[derive(Deserialize)] +pub struct List { + pub list: String, +}