Implement basic Server Sent Events with Warp

This commit rolls up several days of work that took place in a side
repo, which was originally intended as a test/proof-of-concept but
grew into a full implementation.

This implementation switches to Warp as the framework for the primary
web server, in large part because Warp offers built-in support for SSE
along with several other advantages.  It relies on a custom Redis
interface built on Tokio, because the current Redis Rust crate does not
support async PubSub.  Using a custom interface should also be faster,
since it contains no logic for anything other than PubSub, which is all
we need.
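For context, the custom interface speaks the Redis wire protocol (RESP)
directly over a raw TCP connection, so subscribing amounts to writing a
two-element RESP array.  A minimal sketch of that framing, mirroring the
command built in src/pubsub.rs below (the channel name is only an
example):

    // Sketch of the RESP framing used to subscribe to a pubsub channel.
    // A SUBSCRIBE command is a two-element RESP array: the command name
    // ("subscribe", 9 bytes) followed by the channel name.
    fn resp_subscribe_cmd(channel: &str) -> String {
        format!(
            "*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n",
            channel.len(),
            channel
        )
    }

    // resp_subscribe_cmd("timeline:public") yields
    // "*2\r\n$9\r\nsubscribe\r\n$15\r\ntimeline:public\r\n"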

Finally, note that the SSE support is currently as feature-complete as
it can be without the Postgres interface: all of the endpoints are
present and accessible on localhost.  However, none of the endpoints
have authentication yet, and the endpoints that should provide
user-specific data (e.g., `/api/v1/streaming/user`) are currently
hard-coded to provide data for user 1 (the admin).  Other than those
limitations, the SSE implementation is feature-complete.
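For manual testing, any SSE-capable client can be pointed at the
endpoints directly.  As an illustration only (this client is not part of
this commit and assumes the reqwest crate with its blocking feature
enabled), a minimal sketch that prints the raw event stream from the
public timeline:

    // Hypothetical test client (not part of this commit): connects to the
    // local server started by `warp::serve(...).run(([127, 0, 0, 1], 3030))`
    // and prints the raw Server-Sent Events stream line by line.
    use std::io::{BufRead, BufReader};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let res = reqwest::blocking::get("http://127.0.0.1:3030/api/v1/streaming/public")?;
        for line in BufReader::new(res).lines() {
            // Each SSE message arrives as `event: ...` / `data: ...` lines.
            println!("{}", line?);
        }
        Ok(())
    }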
Daniel Sockwell 2019-04-15 14:22:44 -04:00
parent 36c2734c86
commit 23eaa4a270
4 changed files with 307 additions and 90 deletions

src/main.old.rs (new file, 100 lines)
@@ -0,0 +1,100 @@
#[macro_use]
extern crate envconfig_derive;
mod api;
mod common;
mod env;
mod middleware;
use actix::prelude::*;
use actix_redis::RedisActor;
use actix_web::{http::header, middleware::cors::Cors, server, App, HttpResponse};
use env::{RedisConfig, ServerConfig};
use env_logger::Builder;
use envconfig::Envconfig;
use log::info;
use std::net::SocketAddr;
const ENV_LOG_VARIABLE: &str = "STREAMING_API_LOG";
#[derive(Clone)]
pub struct AppState {
redis: Addr<RedisActor>,
}
fn main() {
Builder::from_env(ENV_LOG_VARIABLE).init();
info!("starting streaming api server");
let server_cfg = ServerConfig::init().expect("failed to obtain server environment");
let redis_cfg = RedisConfig::init().expect("failed to obtain redis environment");
let sys = System::new("streaming-api-server");
let redis_addr = RedisActor::start(format!("{}:{}", redis_cfg.host, redis_cfg.port));
let app_state = AppState {
redis: redis_addr.clone(),
};
server::new(move || endpoints(&app_state))
.bind(SocketAddr::new(server_cfg.address, server_cfg.port))
.unwrap()
.start();
sys.run();
}
fn endpoints(app_state: &AppState) -> App<AppState> {
use api::http;
use api::ws;
App::with_state(app_state.clone())
.prefix("/api/v1")
.resource("/streaming", |r| r.with(ws::index))
.resource("/streaming/health", |r| {
r.middleware(cors_middleware());
r.get().f(|_| HttpResponse::Ok())
})
.resource("/streaming/user", |r| {
r.middleware(cors_middleware());
r.get().with(http::user::index)
})
.resource("/streaming/public", |r| {
r.middleware(cors_middleware());
r.get().with(http::public::index)
})
.resource("/streaming/public/local", |r| {
r.middleware(cors_middleware());
r.get().with(http::public::local)
})
.resource("/streaming/direct", |r| {
r.middleware(cors_middleware());
r.get().with(http::direct::index)
})
.resource("/streaming/hashtag", |r| {
r.middleware(cors_middleware());
r.get().with(http::hashtag::index)
})
.resource("/streaming/hashtag/local", |r| {
r.middleware(cors_middleware());
r.get().with(http::hashtag::local)
})
.resource("/streaming/list", |r| {
r.middleware(cors_middleware());
r.get().with(http::list::index)
})
}
fn cors_middleware() -> Cors {
Cors::build()
.allowed_origin("*")
.allowed_methods(vec!["GET", "OPTIONS"])
.allowed_headers(vec![
header::AUTHORIZATION,
header::ACCEPT,
header::CACHE_CONTROL,
])
.finish()
}

src/main.rs
@@ -1,100 +1,127 @@
#[macro_use]
extern crate envconfig_derive;
mod api;
mod common;
mod env;
mod middleware;
use actix::prelude::*;
use actix_redis::RedisActor;
use actix_web::{http::header, middleware::cors::Cors, server, App, HttpResponse};
use env::{RedisConfig, ServerConfig};
use env_logger::Builder;
use envconfig::Envconfig;
use log::info;
use std::net::SocketAddr;
const ENV_LOG_VARIABLE: &str = "STREAMING_API_LOG";
#[derive(Clone)]
pub struct AppState {
redis: Addr<RedisActor>,
}
mod pubsub;
mod query;
use futures::stream::Stream;
use warp::{path, Filter};
fn main() {
Builder::from_env(ENV_LOG_VARIABLE).init();
use warp::path;
let base = path!("api" / "v1" / "streaming");
info!("starting streaming api server");
// GET /api/v1/streaming/user
let user_timeline = base
.and(path("user"))
.and(path::end())
// TODO get user id from postgres
.map(|| pubsub::stream_from("1".to_string()));
let server_cfg = ServerConfig::init().expect("failed to obtain server environment");
let redis_cfg = RedisConfig::init().expect("failed to obtain redis environment");
// GET /api/v1/streaming/user/notification
let user_timeline_notifications = base
.and(path!("user" / "notification"))
.and(path::end())
// TODO get user id from postgres
.map(|| {
let full_stream = pubsub::stream_from("1".to_string());
// TODO: filter stream to just have notifications
full_stream
});
let sys = System::new("streaming-api-server");
// GET /api/v1/streaming/public
let public_timeline = base
.and(path("public"))
.and(path::end())
.map(|| pubsub::stream_from("public".to_string()));
let redis_addr = RedisActor::start(format!("{}:{}", redis_cfg.host, redis_cfg.port));
// GET /api/v1/streaming/public?only_media=true
let public_timeline_media = base
.and(path("public"))
.and(warp::query())
.and(path::end())
.map(|q: query::Media| {
if q.only_media == "1" || q.only_media == "true" {
pubsub::stream_from("public:media".to_string())
} else {
pubsub::stream_from("public".to_string())
}
});
let app_state = AppState {
redis: redis_addr.clone(),
};
// GET /api/v1/streaming/public/local
let local_timeline = base
.and(path!("public" / "local"))
.and(path::end())
.map(|| pubsub::stream_from("public:local".to_string()));
server::new(move || endpoints(&app_state))
.bind(SocketAddr::new(server_cfg.address, server_cfg.port))
.unwrap()
.start();
// GET /api/v1/streaming/public/local?only_media=true
let local_timeline_media = base
.and(path!("public" / "local"))
.and(warp::query())
.and(path::end())
.map(|q: query::Media| {
if q.only_media == "1" || q.only_media == "true" {
pubsub::stream_from("public:local:media".to_string())
} else {
pubsub::stream_from("public:local".to_string())
}
});
sys.run();
}
fn endpoints(app_state: &AppState) -> App<AppState> {
use api::http;
use api::ws;
App::with_state(app_state.clone())
.prefix("/api/v1")
.resource("/streaming", |r| r.with(ws::index))
.resource("/streaming/health", |r| {
r.middleware(cors_middleware());
r.get().f(|_| HttpResponse::Ok())
})
.resource("/streaming/user", |r| {
r.middleware(cors_middleware());
r.get().with(http::user::index)
})
.resource("/streaming/public", |r| {
r.middleware(cors_middleware());
r.get().with(http::public::index)
})
.resource("/streaming/public/local", |r| {
r.middleware(cors_middleware());
r.get().with(http::public::local)
})
.resource("/streaming/direct", |r| {
r.middleware(cors_middleware());
r.get().with(http::direct::index)
})
.resource("/streaming/hashtag", |r| {
r.middleware(cors_middleware());
r.get().with(http::hashtag::index)
})
.resource("/streaming/hashtag/local", |r| {
r.middleware(cors_middleware());
r.get().with(http::hashtag::local)
})
.resource("/streaming/list", |r| {
r.middleware(cors_middleware());
r.get().with(http::list::index)
})
}
fn cors_middleware() -> Cors {
Cors::build()
.allowed_origin("*")
.allowed_methods(vec!["GET", "OPTIONS"])
.allowed_headers(vec![
header::AUTHORIZATION,
header::ACCEPT,
header::CACHE_CONTROL,
])
.finish()
// GET /api/v1/streaming/direct
let direct_timeline = base
.and(path("direct"))
.and(path::end())
// TODO get user id from postgres
.map(|| pubsub::stream_from("direct:1".to_string()));
// GET /api/v1/streaming/hashtag?tag=:hashtag
let hashtag_timeline = base
.and(path("hashtag"))
.and(warp::query())
.and(path::end())
.map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}", q.tag)));
// GET /api/v1/streaming/hashtag/local?tag=:hashtag
let hashtag_timeline_local = base
.and(path!("hashtag" / "local"))
.and(warp::query())
.and(path::end())
.map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}:local", q.tag)));
// GET /api/v1/streaming/list?list=:list_id
let list_timeline = base
.and(path("list"))
.and(warp::query())
.and(path::end())
.map(|q: query::List| pubsub::stream_from(format!("list:{}", q.list)));
let routes = user_timeline
.or(user_timeline_notifications)
.unify()
.or(public_timeline_media)
.unify()
.or(public_timeline)
.unify()
.or(local_timeline_media)
.unify()
.or(local_timeline)
.unify()
.or(direct_timeline)
.unify()
.or(hashtag_timeline)
.unify()
.or(hashtag_timeline_local)
.unify()
.or(list_timeline)
.unify()
.and_then(|event_stream| event_stream)
.and(warp::sse())
.map(|event_stream: pubsub::Receiver, sse: warp::sse::Sse| {
sse.reply(warp::sse::keep(
event_stream.map(|item| {
let payload = item["payload"].clone();
let event = item["event"].clone();
(warp::sse::event(event), warp::sse::data(payload))
}),
None,
))
});
warp::serve(routes).run(([127, 0, 0, 1], 3030));
}

src/pubsub.rs (new file, 76 lines)
@@ -0,0 +1,76 @@
use futures::{Async, Future, Poll};
use regex::Regex;
use serde_json::Value;
use tokio::io::{AsyncRead, AsyncWrite, Error, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use warp::Stream;
pub struct Receiver {
rx: ReadHalf<TcpStream>,
}
impl Stream for Receiver {
type Item = Value;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
let mut buffer = vec![0u8; 3000];
while let Async::Ready(num_bytes_read) = self.rx.poll_read(&mut buffer)? {
let re = Regex::new(r"(?x)(?P<json>\{.*\})").unwrap();
if let Some(cap) = re.captures(&String::from_utf8_lossy(&buffer[..num_bytes_read])) {
let json_string = cap["json"].to_string();
let json: Value = serde_json::from_str(&json_string.clone())?;
return Ok(Async::Ready(Some(json)));
}
return Ok(Async::NotReady);
}
return Ok(Async::NotReady);
}
}
struct Sender {
tx: WriteHalf<TcpStream>,
channel: String,
}
impl Future for Sender {
type Item = ();
type Error = Box<Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
println!("Subscribing to {}", &self.channel);
let subscribe_cmd = format!(
"*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n",
self.channel.len(),
self.channel
);
let mut buffer = subscribe_cmd.as_bytes();
self.tx.poll_write(&mut buffer)?;
return Ok(Async::NotReady);
}
}
fn get_socket() -> impl Future<Item = TcpStream, Error = Box<Error>> {
let address = "127.0.0.1:6379".parse().expect("Unable to parse address");
let connection = TcpStream::connect(&address);
connection
.and_then(|socket| Ok(socket))
.map_err(|e| Box::new(e))
}
fn send_subscribe_cmd(tx: WriteHalf<TcpStream>, channel: String) {
let sender = Sender { tx, channel };
tokio::spawn(sender.map_err(|e| eprintln!("{}", e)));
}
pub fn stream_from(
timeline: String,
) -> impl Future<Item = Receiver, Error = warp::reject::Rejection> {
get_socket()
.and_then(move |socket| {
let (rx, tx) = socket.split();
send_subscribe_cmd(tx, format!("timeline:{}", timeline));
let stream_of_data_from_redis = Receiver { rx };
Ok(stream_of_data_from_redis)
})
.and_then(|stream| Ok(stream))
.map_err(|e| warp::reject::custom(e))
}

src/query.rs (new file, 14 lines)
@@ -0,0 +1,14 @@
use serde_derive::Deserialize;
#[derive(Deserialize)]
pub struct Media {
pub only_media: String,
}
#[derive(Deserialize)]
pub struct Hashtag {
pub tag: String,
}
#[derive(Deserialize)]
pub struct List {
pub list: String,
}