diff --git a/Cargo.lock b/Cargo.lock index d5dd39c..0318b3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,14 @@ dependencies = [ "const-random 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ahash" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "const-random 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "aho-corasick" version = "0.7.6" @@ -44,6 +52,11 @@ name = "autocfg" version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "autocfg" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "backtrace" version = "0.3.15" @@ -62,7 +75,7 @@ name = "backtrace-sys" version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "cc 1.0.36 (registry+https://github.com/rust-lang/crates.io-index)", + "cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -163,7 +176,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "cc" -version = "1.0.36" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -440,11 +453,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.7.0" +version = "0.7.1" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", + "hashbrown 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "lru 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "postgres 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -650,6 +664,15 @@ dependencies = [ "autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "hashbrown" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "ahash 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "headers" version = "0.2.1" @@ -1051,7 +1074,7 @@ version = "0.9.49" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", - "cc 1.0.36 (registry+https://github.com/rust-lang/crates.io-index)", + "cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)", "pkg-config 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", "vcpkg 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2378,11 +2401,13 @@ dependencies = [ [metadata] "checksum ahash 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "6f33b5018f120946c1dcf279194f238a9f146725593ead1c08fa47ff22b0b5d3" +"checksum ahash 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0989268a37e128d4d7a8028f1c60099430113fdbc70419010601ce51a228e4fe" "checksum aho-corasick 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "58fb5e95d83b38284460a5fda7d6470aa0b8844d283a0b614b8535e880800d2d" "checksum antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5" "checksum arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "92c7fb76bc8826a8b33b4ee5bb07a247a81e76764ab4d55e8f73e3a4d8808c71" "checksum atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9a7d5b8723950951411ee34d271d99dddcc2035a16ab25310ea2c8cfd4369652" "checksum autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2" +"checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" "checksum backtrace 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "f106c02a3604afcdc0df5d36cc47b44b55917dbaf3d808f71c163a0ddba64637" "checksum backtrace-sys 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)" = "797c830ac25ccc92a7f8a7b9862bde440715531514594a6154e3d4a54dd769b6" "checksum base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0b25d992356d2eb0ed82172f5248873db5560c4721f564b13cb5193bda5e668e" @@ -2398,7 +2423,7 @@ dependencies = [ "checksum bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "10004c15deb332055f7a4a208190aed362cf9a7c2f6ab70a305fba50e1105f38" "checksum c2-chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7d64d04786e0f528460fc884753cf8dddcc466be308f6026f8e355c41a0e4101" "checksum cast 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "926013f2860c46252efceabb19f4a6b308197505082c609025aa6706c011d427" -"checksum cc 1.0.36 (registry+https://github.com/rust-lang/crates.io-index)" = "a0c56216487bb80eec9c4516337b2588a4f2a2290d72a1416d930e4dcdb0c90d" +"checksum cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)" = "95e28fa049fda1c330bcf9d723be7663a899c4679724b34c81e9f5a326aab8cd" "checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" "checksum chrono 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "77d81f58b7301084de3b958691458a53c3f7e0b1d702f77e550b6a88e3a88abe" "checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9" @@ -2450,6 +2475,7 @@ dependencies = [ "checksum getrandom 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "473a1265acc8ff1e808cd0a1af8cee3c2ee5200916058a2ca113c29f2d903571" "checksum h2 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "85ab6286db06040ddefb71641b50017c06874614001a134b423783e2db2920bd" "checksum hashbrown 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8e6073d0ca812575946eb5f35ff68dbe519907b25c42530389ff946dc84c6ead" +"checksum hashbrown 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "479e9d9a1a3f8c489868a935b557ab5710e3e223836da2ecd52901d88935cb56" "checksum headers 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dc6e2e51d356081258ef05ff4c648138b5d3fe64b7300aaad3b820554a2b7fb6" "checksum headers-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "51ae5b0b5417559ee1d2733b21d33b0868ae9e406bd32eb1a51d613f66ed472a" "checksum headers-derive 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "97c462e8066bca4f0968ddf8d12de64c40f2c2187b3b9a2fa994d06e8ad444a9" diff --git a/Cargo.toml b/Cargo.toml index aaee961..bd642ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ r2d2_postgres = "0.16.0" r2d2 = "0.8.8" lru = "0.4.3" urlencoding = "1.0.0" +hashbrown = "0.7.1" [dev-dependencies] criterion = "0.3" @@ -40,3 +41,10 @@ default = [ "production" ] bench = [] stub_status = [] production = [] + +[profile.release] +lto = "fat" +panic = "abort" +codegen-units = 1 + + diff --git a/src/config/environmental_variables.rs b/src/config/environmental_variables.rs index 6588765..960ce7c 100644 --- a/src/config/environmental_variables.rs +++ b/src/config/environmental_variables.rs @@ -1,4 +1,5 @@ -use std::{collections::HashMap, fmt}; +use hashbrown::HashMap; +use std::fmt; pub struct EnvVar(pub HashMap); impl std::ops::Deref for EnvVar { diff --git a/src/messages/event/checked_event/status/mod.rs b/src/messages/event/checked_event/status/mod.rs index 427bc17..8656231 100644 --- a/src/messages/event/checked_event/status/mod.rs +++ b/src/messages/event/checked_event/status/mod.rs @@ -9,9 +9,10 @@ use {application::Application, attachment::Attachment, card::Card, poll::Poll}; use crate::log_fatal; use crate::parse_client_request::Blocks; +use hashbrown::HashSet; use serde::{Deserialize, Serialize}; use std::boxed::Box; -use std::{collections::HashSet, string::String}; +use std::string::String; #[serde(deny_unknown_fields)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] diff --git a/src/messages/event/dynamic_event.rs b/src/messages/event/dynamic_event.rs index 45ed3ba..9bff2fd 100644 --- a/src/messages/event/dynamic_event.rs +++ b/src/messages/event/dynamic_event.rs @@ -1,7 +1,7 @@ use crate::parse_client_request::Blocks; +use hashbrown::HashSet; use serde::{Deserialize, Serialize}; use serde_json::Value; -use std::collections::HashSet; #[derive(Deserialize, Serialize, Debug, Clone, PartialEq)] pub struct DynamicEvent { diff --git a/src/messages/test.rs b/src/messages/test.rs index 12b0190..c6d82ae 100644 --- a/src/messages/test.rs +++ b/src/messages/test.rs @@ -1,5 +1,5 @@ use super::*; -use std::collections::HashMap; +use hashbrown::HashMap; #[serde(rename_all = "snake_case")] #[derive(Deserialize, Debug, Clone, PartialEq)] diff --git a/src/parse_client_request/postgres.rs b/src/parse_client_request/postgres.rs index 4bb1348..d3120cf 100644 --- a/src/parse_client_request/postgres.rs +++ b/src/parse_client_request/postgres.rs @@ -4,8 +4,8 @@ use crate::{ parse_client_request::subscription::{Scope, UserData}, }; use ::postgres; +use hashbrown::HashSet; use r2d2_postgres::PostgresConnectionManager; -use std::collections::HashSet; use warp::reject::Rejection; #[derive(Clone, Debug)] diff --git a/src/parse_client_request/subscription.rs b/src/parse_client_request/subscription.rs index 790a6ab..783ee3c 100644 --- a/src/parse_client_request/subscription.rs +++ b/src/parse_client_request/subscription.rs @@ -9,8 +9,8 @@ use super::postgres::PgPool; use super::query::Query; use crate::err::TimelineErr; use crate::log_fatal; +use hashbrown::HashSet; use lru::LruCache; -use std::collections::HashSet; use uuid::Uuid; use warp::reject::Rejection; @@ -59,6 +59,13 @@ pub struct Subscription { pub access_token: Option, } +#[derive(Clone, Default, Debug, PartialEq)] +pub struct Blocks { + pub blocked_domains: HashSet, + pub blocked_users: HashSet, + pub blocking_users: HashSet, +} + impl Default for Subscription { fn default() -> Self { Self { @@ -301,14 +308,6 @@ pub enum Scope { Lists, } -#[derive(Clone, Default, Debug, PartialEq)] -pub struct Blocks { - pub blocked_domains: HashSet, - pub blocked_users: HashSet, - pub blocking_users: HashSet, -} - -#[derive(Clone, Debug, PartialEq)] pub struct UserData { pub id: i64, pub allowed_langs: HashSet, diff --git a/src/redis_to_client_stream/client_agent.rs b/src/redis_to_client_stream/client_agent.rs index 1a729ed..a9681e0 100644 --- a/src/redis_to_client_stream/client_agent.rs +++ b/src/redis_to_client_stream/client_agent.rs @@ -95,7 +95,7 @@ impl futures::stream::Stream for ClientAgent { fn poll(&mut self) -> Poll, Self::Error> { let result = { let mut receiver = self.lock_receiver(); - receiver.poll_for(self.subscription.id, self.subscription.timeline) + receiver.poll_for(self.subscription.id) }; let timeline = &self.subscription.timeline; @@ -105,6 +105,8 @@ impl futures::stream::Stream for ClientAgent { use crate::messages::{CheckedEvent::Update, Event::*}; match result { + Ok(NotReady) => Ok(NotReady), + Ok(Ready(None)) => Ok(Ready(None)), Ok(Async::Ready(Some(event))) => match event { TypeSafe(Update { payload, queued_at }) => match timeline { Timeline(Public, _, _) if payload.language_not(allowed_langs) => block, @@ -119,9 +121,6 @@ impl futures::stream::Stream for ClientAgent { }, Dynamic(non_update) => send(Dynamic(non_update)), }, - - Ok(Ready(None)) => Ok(Ready(None)), - Ok(NotReady) => Ok(NotReady), Err(e) => Err(e), } } diff --git a/src/redis_to_client_stream/event_stream.rs b/src/redis_to_client_stream/event_stream.rs index 9f56c84..8c28ecb 100644 --- a/src/redis_to_client_stream/event_stream.rs +++ b/src/redis_to_client_stream/event_stream.rs @@ -44,30 +44,32 @@ impl EventStream { // change](github.com/tootsuite/flodgatt/issues/121) is implemented, we'll // need to receive messages from the client. If so, we'll need a // `receive_from_ws.poll() call here (or later)` - match client_agent.poll() { + Ok(Async::NotReady) => { + if last_ping_time.elapsed() > Duration::from_secs(30) { + last_ping_time = Instant::now(); + match tx.unbounded_send(Message::text("{}")) { + Ok(_) => futures::future::ok(true), + Err(_) => client_agent.disconnect(), + } + } else { + futures::future::ok(true) + } + } Ok(Async::Ready(Some(msg))) => { match tx.unbounded_send(Message::text(msg.to_json_string())) { Ok(_) => futures::future::ok(true), Err(_) => client_agent.disconnect(), } } - Ok(Async::Ready(None)) => { - log::info!("WebSocket ClientAgent got Ready(None)"); - futures::future::ok(true) - } - Ok(Async::NotReady) if last_ping_time.elapsed() > Duration::from_secs(30) => { - last_ping_time = Instant::now(); - match tx.unbounded_send(Message::text("{}")) { - Ok(_) => futures::future::ok(true), - Err(_) => client_agent.disconnect(), - } - } - Ok(Async::NotReady) => futures::future::ok(true), // no new messages; nothing to do Err(e) => { log::error!("{}\n Dropping WebSocket message and continuing.", e); futures::future::ok(true) } + Ok(Async::Ready(None)) => { + log::info!("WebSocket ClientAgent got Ready(None)"); + futures::future::ok(true) + } } }) .for_each(move |_instant| Ok(())) diff --git a/src/redis_to_client_stream/receiver/err.rs b/src/redis_to_client_stream/receiver/err.rs index b3723e1..4921145 100644 --- a/src/redis_to_client_stream/receiver/err.rs +++ b/src/redis_to_client_stream/receiver/err.rs @@ -6,6 +6,7 @@ use std::fmt; #[derive(Debug)] pub enum ReceiverErr { + InvalidId, TimelineErr(TimelineErr), EventErr(serde_json::Error), RedisParseErr(RedisParseErr), @@ -16,6 +17,10 @@ impl fmt::Display for ReceiverErr { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { use ReceiverErr::*; match self { + InvalidId => write!( + f, + "Attempted to get messages for a subscription that had not been set up." + ), EventErr(inner) => write!(f, "{}", inner), RedisParseErr(inner) => write!(f, "{}", inner), RedisConnErr(inner) => write!(f, "{}", inner), diff --git a/src/redis_to_client_stream/receiver/message_queues.rs b/src/redis_to_client_stream/receiver/message_queues.rs index 9044460..ace23d8 100644 --- a/src/redis_to_client_stream/receiver/message_queues.rs +++ b/src/redis_to_client_stream/receiver/message_queues.rs @@ -1,10 +1,8 @@ use crate::messages::Event; use crate::parse_client_request::Timeline; -use std::{ - collections::{HashMap, VecDeque}, - fmt, -}; +use hashbrown::HashMap; +use std::{collections::VecDeque, fmt}; use uuid::Uuid; #[derive(Clone)] diff --git a/src/redis_to_client_stream/receiver/mod.rs b/src/redis_to_client_stream/receiver/mod.rs index 2229f2f..2df1cbc 100644 --- a/src/redis_to_client_stream/receiver/mod.rs +++ b/src/redis_to_client_stream/receiver/mod.rs @@ -15,13 +15,17 @@ use crate::{ parse_client_request::{Stream, Subscription, Timeline}, }; -use futures::{Async, Poll}; +use { + futures::{Async, Poll}, + hashbrown::HashMap, + uuid::Uuid, +}; + use std::{ - collections::HashMap, result, sync::{Arc, Mutex}, + time::{Duration, Instant}, }; -use uuid::Uuid; type Result = result::Result; @@ -29,6 +33,8 @@ type Result = result::Result; #[derive(Debug)] pub struct Receiver { redis_connection: RedisConn, + redis_poll_interval: Duration, + redis_polled_at: Instant, pub msg_queues: MessageQueues, clients_per_timeline: HashMap, } @@ -37,9 +43,12 @@ impl Receiver { /// Create a new `Receiver`, with its own Redis connections (but, as yet, no /// active subscriptions). pub fn try_from(redis_cfg: config::RedisConfig) -> Result { + let redis_poll_interval = *redis_cfg.polling_interval; let redis_connection = RedisConn::new(redis_cfg)?; Ok(Self { + redis_polled_at: Instant::now(), + redis_poll_interval, redis_connection, msg_queues: MessageQueues(HashMap::new()), clients_per_timeline: HashMap::new(), @@ -103,35 +112,38 @@ impl Receiver { /// Redis is significantly more time consuming that simply returning the /// message already in a queue. Thus, we only poll Redis if it has not /// been polled lately. - pub fn poll_for(&mut self, id: Uuid, timeline: Timeline) -> Poll, ReceiverErr> { - loop { - match self.redis_connection.poll_redis() { - Ok(Async::Ready(Some((timeline, event)))) => { - self.msg_queues - .values_mut() - .filter(|msg_queue| msg_queue.timeline == timeline) - .for_each(|msg_queue| { - msg_queue.messages.push_back(event.clone()); - }); + pub fn poll_for(&mut self, id: Uuid) -> Poll, ReceiverErr> { + // let (t1, mut polled_redis) = (Instant::now(), false); + if self.redis_polled_at.elapsed() > self.redis_poll_interval { + loop { + match self.redis_connection.poll_redis() { + Ok(Async::NotReady) => break, + Ok(Async::Ready(Some((timeline, event)))) => { + self.msg_queues + .values_mut() + .filter(|msg_queue| msg_queue.timeline == timeline) + .for_each(|msg_queue| { + msg_queue.messages.push_back(event.clone()); + }); + } + Ok(Async::Ready(None)) => (), // subscription cmd or msg for other namespace + Err(err) => Err(err)?, } - Ok(Async::NotReady) => break, - Ok(Async::Ready(None)) => (), - Err(err) => Err(err)?, } + // polled_redis = true; + self.redis_polled_at = Instant::now(); } // If the `msg_queue` being polled has any new messages, return the first (oldest) one - match self.msg_queues.get_mut(&id) { - Some(msg_q) => match msg_q.messages.pop_front() { - Some(event) => Ok(Async::Ready(Some(event))), - None => Ok(Async::NotReady), - }, - None => { - log::error!("Polled a MsgQueue that had not been set up. Setting it up now."); - self.msg_queues.insert(id, MsgQueue::new(timeline)); - Ok(Async::NotReady) - } - } + let msg_q = self.msg_queues.get_mut(&id).ok_or(ReceiverErr::InvalidId)?; + let res = match msg_q.messages.pop_front() { + Some(event) => Ok(Async::Ready(Some(event))), + None => Ok(Async::NotReady), + }; + // if !polled_redis { + // log::info!("poll_for in {:?}", t1.elapsed()); + // } + res } pub fn count_connections(&self) -> String { diff --git a/src/redis_to_client_stream/redis/redis_connection/mod.rs b/src/redis_to_client_stream/redis/redis_connection/mod.rs index bab23ca..f34bc8b 100644 --- a/src/redis_to_client_stream/redis/redis_connection/mod.rs +++ b/src/redis_to_client_stream/redis/redis_connection/mod.rs @@ -14,7 +14,7 @@ use std::{ io::{Read, Write}, net::TcpStream, str, - time::{Duration, Instant}, + time::Duration, }; use futures::{Async, Poll}; @@ -26,8 +26,6 @@ type Result = std::result::Result; pub struct RedisConn { primary: TcpStream, secondary: TcpStream, - redis_poll_interval: Duration, - redis_polled_at: Instant, redis_namespace: Option, tag_id_cache: LruCache, tag_name_cache: LruCache, @@ -49,21 +47,32 @@ impl RedisConn { // the tag number instead of the tag name. This would save us from dealing // with a cache here and would be consistent with how lists/users are handled. redis_namespace: redis_cfg.namespace.clone(), - redis_poll_interval: *redis_cfg.polling_interval, redis_input: Vec::new(), - redis_polled_at: Instant::now(), }; Ok(redis_conn) } pub fn poll_redis(&mut self) -> Poll, ReceiverErr> { - let mut buffer = vec![0u8; 6000]; - if self.redis_polled_at.elapsed() > self.redis_poll_interval { - if let Ok(bytes_read) = self.primary.read(&mut buffer) { - self.redis_input.extend_from_slice(&buffer[..bytes_read]); + let mut size = 100; // large enough to handle subscribe/unsubscribe notice + let (mut buffer, mut first_read) = (vec![0u8; size], true); + loop { + match self.primary.read(&mut buffer) { + Ok(n) if n != size => { + self.redis_input.extend_from_slice(&buffer[..n]); + break; + } + Ok(n) => { + self.redis_input.extend_from_slice(&buffer[..n]); + } + Err(_) => break, }; - self.redis_polled_at = Instant::now(); + if first_read { + size = 2000; + buffer = vec![0u8; size]; + first_read = false; + } } + if self.redis_input.is_empty() { return Ok(Async::NotReady); }