diff --git a/Cargo.lock b/Cargo.lock index df55fd2..afe23e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -308,6 +308,16 @@ name = "cfg-if" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "chrono" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-integer 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "clap" version = "2.32.0" @@ -947,6 +957,19 @@ dependencies = [ "version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "num-integer" +version = "0.1.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-traits 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "num-traits" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "num_cpus" version = "1.10.0" @@ -1029,6 +1052,16 @@ dependencies = [ "unicase 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "pretty_env_logger" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "proc-macro2" version = "0.4.27" @@ -1062,6 +1095,7 @@ dependencies = [ "envconfig_derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "pretty_env_logger 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "redis-async 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2098,6 +2132,7 @@ dependencies = [ "checksum bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "40ade3d27603c2cb345eb0912aec461a6dec7e06a4ae48589904e808335c7afa" "checksum cc 1.0.29 (registry+https://github.com/rust-lang/crates.io-index)" = "4390a3b5f4f6bce9c1d0c00128379df433e53777fdd30e92f16a529332baec4e" "checksum cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "082bb9b28e00d3c9d39cc03e64ce4cea0f1bb9b3fde493f0cbc008472d22bdf4" +"checksum chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "45912881121cb26fad7c38c17ba7daa18764771836b34fab7d3fbd93ed633878" "checksum clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b957d88f4b6a63b9d70d5f454ac8011819c6efa7727858f458ab71c756ce2d3e" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" "checksum cookie 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1465f8134efa296b4c19db34d909637cb2bf0f7aaf21299e23e18fa29ac557cf" @@ -2171,6 +2206,8 @@ dependencies = [ "checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88" "checksum nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "2f9667ddcc6cc8a43afc9b7917599d7216aa09c463919ea32c59ed6cac8bc945" "checksum nom 4.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b30adc557058ce00c9d0d7cb3c6e0b5bc6f36e2e2eabe74b0ba726d194abd588" +"checksum num-integer 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)" = "e83d528d2677f0518c570baf2b7abdcf0cd2d248860b68507bdcb3e91d4c0cea" +"checksum num-traits 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "0b3a5d7cc97d6d30d8b9bc8fa19bf45349ffe46241e8816f50f62f6d6aaabee1" "checksum num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1a23f0ed30a54abaa0c7e83b1d2d87ada7c3c23078d1d87815af3e3b6385fbba" "checksum opaque-debug 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "93f5bb2e8e8dec81642920ccff6b61f1eb94fa3020c5a325c9851ff604152409" "checksum owning_ref 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "49a4b8ea2179e6a2e27411d3bca09ca6dd630821cf6894c6c7c8467a8ee7ef13" @@ -2181,6 +2218,7 @@ dependencies = [ "checksum phf_codegen 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "b03e85129e324ad4166b06b2c7491ae27fe3ec353af72e72cd1654c7225d517e" "checksum phf_generator 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "09364cc93c159b8b06b1f4dd8a4398984503483891b0c26b867cf431fb132662" "checksum phf_shared 0.7.24 (registry+https://github.com/rust-lang/crates.io-index)" = "234f71a15de2288bcb7e3b6515828d22af7ec8598ee6d24c3b526fa0a80b67a0" +"checksum pretty_env_logger 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "df8b3f4e0475def7d9c2e5de8e5a1306949849761e107b360d03e98eafaffd61" "checksum proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)" = "4d317f9caece796be1980837fd5cb3dfec5613ebdb04ad0956deea83ce168915" "checksum quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9274b940887ce9addde99c4eee6b5c44cc494b182b97e73dc8ffdcb3397fd3f0" "checksum quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)" = "cdd8e04bd9c52e0342b406469d494fcb033be4bdbe5c606016defbb1681411e1" diff --git a/Cargo.toml b/Cargo.toml index 3a5cff7..adeaf15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ edition = "2018" actix-web = "0.7.18" structopt = "0.2.14" log = "0.4.6" -env_logger = "0.6.0" actix = "0.7.9" actix-redis = "0.5.1" redis-async = "0.4.4" @@ -24,6 +23,7 @@ regex = "1.1.5" serde_json = "1.0.39" serde_derive = "1.0.90" serde = "1.0.90" +pretty_env_logger = "0.3.0" [features] default = [ "production" ] diff --git a/src/main.rs b/src/main.rs index e6f5179..1c25b6e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,12 @@ mod pubsub; mod query; use futures::stream::Stream; +use log::info; +use pretty_env_logger; use warp::{path, Filter}; fn main() { - use warp::path; + pretty_env_logger::init(); let base = path!("api" / "v1" / "streaming"); // GET /api/v1/streaming/user @@ -12,7 +14,10 @@ fn main() { .and(path("user")) .and(path::end()) // TODO get user id from postgress - .map(|| pubsub::stream_from("1".to_string())); + .map(|| { + info!("GET /api/v1/streaming/user"); + pubsub::stream_from("1".to_string()) + }); // GET /api/v1/streaming/user/notification let user_timeline_notifications = base @@ -22,14 +27,14 @@ fn main() { .map(|| { let full_stream = pubsub::stream_from("1".to_string()); // TODO: filter stream to just have notifications + info!("GET /api/v1/streaming/user/notification"); full_stream }); - // GET /api/v1/streaming/public - let public_timeline = base - .and(path("public")) - .and(path::end()) - .map(|| pubsub::stream_from("public".to_string())); + let public_timeline = base.and(path("public")).and(path::end()).map(|| { + info!("GET /api/v1/streaming/public"); + pubsub::stream_from("public".to_string()) + }); // GET /api/v1/streaming/public?only_media=true let public_timeline_media = base @@ -37,6 +42,7 @@ fn main() { .and(warp::query()) .and(path::end()) .map(|q: query::Media| { + info!("GET /api/v1/streaming/public?only_media=true"); if q.only_media == "1" || q.only_media == "true" { pubsub::stream_from("public:media".to_string()) } else { @@ -48,7 +54,10 @@ fn main() { let local_timeline = base .and(path!("public" / "local")) .and(path::end()) - .map(|| pubsub::stream_from("public:local".to_string())); + .map(|| { + info!("GET /api/v1/streaming/public/local"); + pubsub::stream_from("public:local".to_string()) + }); // GET /api/v1/streaming/public/local?only_media=true let local_timeline_media = base @@ -56,6 +65,7 @@ fn main() { .and(warp::query()) .and(path::end()) .map(|q: query::Media| { + info!("GET /api/v1/streaming/public/local?only_media=true"); if q.only_media == "1" || q.only_media == "true" { pubsub::stream_from("public:local:media".to_string()) } else { @@ -68,28 +78,40 @@ fn main() { .and(path("direct")) .and(path::end()) // TODO get user id from postgress - .map(|| pubsub::stream_from("direct:1".to_string())); + .map(|| { + info!("GET /api/v1/streaming/direct"); + pubsub::stream_from("direct:1".to_string()) + }); // GET /api/v1/streaming/hashtag?tag=:hashtag let hashtag_timeline = base .and(path("hashtag")) .and(warp::query()) .and(path::end()) - .map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}", q.tag))); + .map(|q: query::Hashtag| { + info!("GET /api/v1/streaming/hashtag?tag=:hashtag"); + pubsub::stream_from(format!("hashtag:{}", q.tag)) + }); // GET /api/v1/streaming/hashtag/local?tag=:hashtag let hashtag_timeline_local = base .and(path!("hashtag" / "local")) .and(warp::query()) .and(path::end()) - .map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}:local", q.tag))); + .map(|q: query::Hashtag| { + info!("GET /api/v1/streaming/hashtag/local?tag=:hashtag"); + pubsub::stream_from(format!("hashtag:{}:local", q.tag)) + }); // GET /api/v1/streaming/list?list=:list_id let list_timeline = base .and(path("list")) .and(warp::query()) .and(path::end()) - .map(|q: query::List| pubsub::stream_from(format!("list:{}", q.list))); + .map(|q: query::List| { + info!("GET /api/v1/streaming/list?list=:list_id"); + pubsub::stream_from(format!("list:{}", q.list)) + }); let routes = user_timeline .or(user_timeline_notifications) @@ -123,5 +145,6 @@ fn main() { )) }); + info!("starting streaming api server"); warp::serve(routes).run(([127, 0, 0, 1], 3030)); } diff --git a/src/pubsub.rs b/src/pubsub.rs index 583bb69..a818e26 100644 --- a/src/pubsub.rs +++ b/src/pubsub.rs @@ -1,4 +1,5 @@ use futures::{Async, Future, Poll}; +use log::{debug, info}; use regex::Regex; use serde_json::Value; use tokio::io::{AsyncRead, AsyncWrite, Error, ReadHalf, WriteHalf}; @@ -18,6 +19,7 @@ impl Stream for Receiver { let re = Regex::new(r"(?x)(?P\{.*\})").unwrap(); if let Some(cap) = re.captures(&String::from_utf8_lossy(&buffer[..num_bytes_read])) { + debug!("{}", &cap["json"]); let json_string = cap["json"].to_string(); let json: Value = serde_json::from_str(&json_string.clone())?; return Ok(Async::Ready(Some(json))); @@ -36,7 +38,7 @@ impl Future for Sender { type Item = (); type Error = Box; fn poll(&mut self) -> Poll { - println!("Subscribing to {}", &self.channel); + info!("Subscribing to {}", &self.channel); let subscribe_cmd = format!( "*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n", self.channel.len(),