diff --git a/Cargo.lock b/Cargo.lock index cab72f7..477ecee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -51,6 +51,29 @@ dependencies = [ "trust-dns-resolver 0.10.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "actix-redis" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "actix 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)", + "actix-web 0.7.18 (registry+https://github.com/rust-lang/crates.io-index)", + "backoff 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "cookie 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", + "redis-async 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.38 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "actix-web" version = "0.7.18" @@ -162,6 +185,14 @@ name = "autocfg" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "backoff" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand 0.3.23 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "backtrace" version = "0.3.13" @@ -897,10 +928,34 @@ name = "ragequit" version = "0.1.0" dependencies = [ "actix 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)", + "actix-redis 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 0.7.18 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "redis-async 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "structopt 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", + "uuid 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1019,6 +1074,21 @@ dependencies = [ "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "redis-async" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "redox_syscall" version = "0.1.51" @@ -1751,6 +1821,7 @@ dependencies = [ [metadata] "checksum actix 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)" = "6c616db5fa4b0c40702fb75201c2af7f8aa8f3a2e2c1dda3b0655772aa949666" "checksum actix-net 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "8bebfbe6629e0131730746718c9e032b58f02c6ce06ed7c982b9fef6c8545acd" +"checksum actix-redis 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1f28e1fe2385c03b04ea744d9935cea6011f45ac32d72c480a6b62596a717ec2" "checksum actix-web 0.7.18 (registry+https://github.com/rust-lang/crates.io-index)" = "e9f33c941e5e69a58a6bfef33853228042ed3799fc4b5a4923a36a85776fb690" "checksum actix_derive 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4300e9431455322ae393d43a2ba1ef96b8080573c0fc23b196219efedfb6ba69" "checksum adler32 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7e522997b529f05601e05166c07ed17789691f562762c7f3b987263d2dedee5c" @@ -1760,6 +1831,7 @@ dependencies = [ "checksum arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "92c7fb76bc8826a8b33b4ee5bb07a247a81e76764ab4d55e8f73e3a4d8808c71" "checksum atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9a7d5b8723950951411ee34d271d99dddcc2035a16ab25310ea2c8cfd4369652" "checksum autocfg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a6d640bee2da49f60a4068a7fae53acde8982514ab7bae8b8cea9e88cbcfd799" +"checksum backoff 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b7bd67d02cc9dfe9bb1891cb6b4f0169f53cdf0a78b07276ab2141452aaf5789" "checksum backtrace 0.3.13 (registry+https://github.com/rust-lang/crates.io-index)" = "b5b493b66e03090ebc4343eb02f94ff944e0cbc9ac6571491d170ba026741eb5" "checksum backtrace-sys 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)" = "797c830ac25ccc92a7f8a7b9862bde440715531514594a6154e3d4a54dd769b6" "checksum base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0b25d992356d2eb0ed82172f5248873db5560c4721f564b13cb5193bda5e668e" @@ -1847,6 +1919,8 @@ dependencies = [ "checksum proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)" = "4d317f9caece796be1980837fd5cb3dfec5613ebdb04ad0956deea83ce168915" "checksum quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9274b940887ce9addde99c4eee6b5c44cc494b182b97e73dc8ffdcb3397fd3f0" "checksum quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)" = "cdd8e04bd9c52e0342b406469d494fcb033be4bdbe5c606016defbb1681411e1" +"checksum rand 0.3.23 (registry+https://github.com/rust-lang/crates.io-index)" = "64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c" +"checksum rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" "checksum rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c618c47cd3ebd209790115ab837de41425723956ad3ce2e6a7f09890947cacb9" "checksum rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca" "checksum rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef" @@ -1859,6 +1933,7 @@ dependencies = [ "checksum rand_pcg 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "086bd09a33c7044e56bb44d5bdde5a60e7f119a9e95b0775f545de759a32fe05" "checksum rand_xorshift 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c" "checksum rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +"checksum redis-async 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "892130a1d228f10cb177561ad9cd0fec994e324ed50b6b309a738de0ccd37690" "checksum redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)" = "423e376fffca3dfa06c9e9790a9ccd282fafb3cc6e6397d01dbf64f9bacc6b85" "checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76" "checksum regex 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "37e7cbbd370869ce2e8dff25c7018702d10b21a20ef7135316f8daecd6c25b7f" diff --git a/Cargo.toml b/Cargo.toml index 0aef9bc..d5c5012 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,6 @@ structopt = "0.2.14" log = "0.4.6" env_logger = "0.6.0" actix = "0.7.9" +actix-redis = "0.5.1" +redis-async = "0.4.4" +uuid = "0.7.2" diff --git a/src/api/http/direct.rs b/src/api/http/direct.rs index 1560950..191fa9f 100644 --- a/src/api/http/direct.rs +++ b/src/api/http/direct.rs @@ -1,5 +1,6 @@ +use crate::AppState; use actix_web::{HttpRequest, Responder}; -pub fn index(_req: HttpRequest) -> impl Responder { +pub fn index(_req: HttpRequest) -> impl Responder { "OMG! It works!" } diff --git a/src/api/http/hashtag.rs b/src/api/http/hashtag.rs index 68dc8ee..594d6a8 100644 --- a/src/api/http/hashtag.rs +++ b/src/api/http/hashtag.rs @@ -1,9 +1,10 @@ +use crate::AppState; use actix_web::{HttpRequest, Responder}; -pub fn index(_req: HttpRequest) -> impl Responder { +pub fn index(_req: HttpRequest) -> impl Responder { "OMG! It works!" } -pub fn local(_req: HttpRequest) -> impl Responder { +pub fn local(_req: HttpRequest) -> impl Responder { "OMG! It works!" } diff --git a/src/api/http/list.rs b/src/api/http/list.rs index 1560950..191fa9f 100644 --- a/src/api/http/list.rs +++ b/src/api/http/list.rs @@ -1,5 +1,6 @@ +use crate::AppState; use actix_web::{HttpRequest, Responder}; -pub fn index(_req: HttpRequest) -> impl Responder { +pub fn index(_req: HttpRequest) -> impl Responder { "OMG! It works!" } diff --git a/src/api/http/public.rs b/src/api/http/public.rs index 68dc8ee..594d6a8 100644 --- a/src/api/http/public.rs +++ b/src/api/http/public.rs @@ -1,9 +1,10 @@ +use crate::AppState; use actix_web::{HttpRequest, Responder}; -pub fn index(_req: HttpRequest) -> impl Responder { +pub fn index(_req: HttpRequest) -> impl Responder { "OMG! It works!" } -pub fn local(_req: HttpRequest) -> impl Responder { +pub fn local(_req: HttpRequest) -> impl Responder { "OMG! It works!" } diff --git a/src/api/http/user.rs b/src/api/http/user.rs index 1560950..191fa9f 100644 --- a/src/api/http/user.rs +++ b/src/api/http/user.rs @@ -1,5 +1,6 @@ +use crate::AppState; use actix_web::{HttpRequest, Responder}; -pub fn index(_req: HttpRequest) -> impl Responder { +pub fn index(_req: HttpRequest) -> impl Responder { "OMG! It works!" } diff --git a/src/api/ws/mod.rs b/src/api/ws/mod.rs index 1fc9884..f5a12fe 100644 --- a/src/api/ws/mod.rs +++ b/src/api/ws/mod.rs @@ -1,28 +1,38 @@ -use actix::{Actor, StreamHandler}; +use crate::{common::HEARTBEAT_INTERVAL_SECONDS, AppState}; +use actix::{Actor, AsyncContext, StreamHandler}; +use actix_redis::{Command, RespValue}; use actix_web::{ws, HttpRequest, Responder}; -use log::debug; +use log::{debug, info}; +use std::time::Duration; /// Define http actor struct WebsocketActor; impl Actor for WebsocketActor { - type Context = ws::WebsocketContext; + type Context = ws::WebsocketContext; } /// Handler for ws::Message message impl StreamHandler for WebsocketActor { + fn started(&mut self, ctx: &mut Self::Context) { + ctx.run_interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS), |_, inner_ctx| { + inner_ctx.ping("0123456789abcdef"); + }); + } + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { debug!("Message {:?}", msg); + let red = ctx.state().redis.send(Command(RespValue::SimpleString("GET".into()))); + match msg { - ws::Message::Ping(msg) => ctx.pong(&msg), + ws::Message::Pong(msg) => debug!("{}", msg), ws::Message::Text(text) => ctx.text(text), - ws::Message::Binary(bin) => ctx.binary(bin), _ => (), } } } -pub fn index(req: HttpRequest) -> impl Responder { +pub fn index(req: HttpRequest) -> impl Responder { ws::start(&req, WebsocketActor) } diff --git a/src/common.rs b/src/common.rs new file mode 100644 index 0000000..e2d7462 --- /dev/null +++ b/src/common.rs @@ -0,0 +1 @@ +pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 30; diff --git a/src/main.rs b/src/main.rs index 2a38bd9..bb14657 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,10 @@ mod api; +mod common; +mod middleware; -use actix_web::{server, App}; +use actix::prelude::*; +use actix_redis::RedisActor; +use actix_web::{http::header, middleware::cors::Cors, server, App}; use env_logger::Builder; use log::info; use std::net::SocketAddr; @@ -14,6 +18,11 @@ struct Opt { port: u16, } +#[derive(Clone)] +pub struct AppState { + redis: Addr, +} + fn main() { Builder::from_env(ENV_LOG_VARIABLE).init(); @@ -23,21 +32,47 @@ fn main() { let addr: SocketAddr = ([127, 0, 0, 1], args.port).into(); - use api::{http, ws}; + let sys = System::new("streaming-api-server"); - server::new(|| { - App::new() - .resource("/api/v1/streaming/user", |r| r.with(http::user::index)) - .resource("/api/v1/streaming/public", |r| r.with(http::public::index)) - .resource("/api/v1/streaming/public/local", |r| r.with(http::public::local)) - .resource("/api/v1/streaming/direct", |r| r.with(http::direct::index)) - .resource("/api/v1/streaming/hashtag", |r| r.with(http::hashtag::index)) - .resource("/api/v1/streaming/hashtag/local", |r| r.with(http::hashtag::local)) - .resource("/api/v1/streaming/list", |r| r.with(http::list::index)) - .resource("/api/v1/streaming", |r| r.with(ws::index)) - }) - .bind(addr) - .unwrap() - .shutdown_timeout(10) - .run(); + let redis_addr = RedisActor::start("127.0.0.1:6379"); + + let app_state = AppState { + redis: redis_addr.clone(), + }; + + server::new(move || vec![ws_endpoints(&app_state), http_endpoints(&app_state)]) + .bind(addr) + .unwrap() + .start(); + + sys.run(); +} + +fn http_endpoints(app_state: &AppState) -> App { + use api::http; + + App::with_state(app_state.clone()) + .middleware(cors_middleware()) + .prefix("/api/v1") + .resource("/streaming/user", |r| r.with(http::user::index)) + .resource("/streaming/public", |r| r.with(http::public::index)) + .resource("/streaming/public/local", |r| r.with(http::public::local)) + .resource("/streaming/direct", |r| r.with(http::direct::index)) + .resource("/streaming/hashtag", |r| r.with(http::hashtag::index)) + .resource("/streaming/hashtag/local", |r| r.with(http::hashtag::local)) + .resource("/streaming/list", |r| r.with(http::list::index)) +} + +fn ws_endpoints(app_state: &AppState) -> App { + use api::ws; + + App::with_state(app_state.clone()).resource("/api/v1/streaming", |r| r.with(ws::index)) +} + +fn cors_middleware() -> Cors { + Cors::build() + .allowed_origin("*") + .allowed_methods(vec!["GET", "OPTIONS"]) + .allowed_headers(vec![header::AUTHORIZATION, header::ACCEPT, header::CACHE_CONTROL]) + .finish() } diff --git a/src/middleware/auth.rs b/src/middleware/auth.rs new file mode 100644 index 0000000..ec98135 --- /dev/null +++ b/src/middleware/auth.rs @@ -0,0 +1,21 @@ +use crate::AppState; +use actix_web::{ + error::Result, + http::header::AUTHORIZATION, + middleware::{Middleware, Started}, + HttpRequest, HttpResponse, +}; + +pub struct Auth; + +impl Middleware for Auth { + fn start(&self, req: &HttpRequest) -> Result { + let res = req + .headers() + .get(AUTHORIZATION) + .map(|bearer| Started::Done) + .unwrap_or_else(|| Started::Response(HttpResponse::Unauthorized().finish())); + + Ok(res) + } +} diff --git a/src/middleware/mod.rs b/src/middleware/mod.rs new file mode 100644 index 0000000..5696e21 --- /dev/null +++ b/src/middleware/mod.rs @@ -0,0 +1 @@ +pub mod auth; \ No newline at end of file