mirror of https://github.com/mastodon/flodgatt
Minor performance tune (#127)
* Tweak release profile & micro optimizations * Replace std HashMap with hashbrown::HashMap The hashbrown::HashMap is faster than the std::collections::HashMap, though it does not protect as well against malicious hash collisions (e.g., in a DDoS). Since we don't expose the hashing externally, we should switch to the faster implementation.
This commit is contained in:
parent
cd82e2d34c
commit
fa8b695129
|
@ -8,6 +8,14 @@ dependencies = [
|
||||||
"const-random 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
"const-random 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ahash"
|
||||||
|
version = "0.3.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
dependencies = [
|
||||||
|
"const-random 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "aho-corasick"
|
name = "aho-corasick"
|
||||||
version = "0.7.6"
|
version = "0.7.6"
|
||||||
|
@ -44,6 +52,11 @@ name = "autocfg"
|
||||||
version = "0.1.7"
|
version = "0.1.7"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "autocfg"
|
||||||
|
version = "1.0.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "backtrace"
|
name = "backtrace"
|
||||||
version = "0.3.15"
|
version = "0.3.15"
|
||||||
|
@ -62,7 +75,7 @@ name = "backtrace-sys"
|
||||||
version = "0.1.28"
|
version = "0.1.28"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"cc 1.0.36 (registry+https://github.com/rust-lang/crates.io-index)",
|
"cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)",
|
"libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -163,7 +176,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cc"
|
name = "cc"
|
||||||
version = "1.0.36"
|
version = "1.0.50"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -440,11 +453,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flodgatt"
|
name = "flodgatt"
|
||||||
version = "0.7.0"
|
version = "0.7.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
|
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"hashbrown 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"lru 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
"lru 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"postgres 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"postgres 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
@ -650,6 +664,15 @@ dependencies = [
|
||||||
"autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
"autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hashbrown"
|
||||||
|
version = "0.7.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
dependencies = [
|
||||||
|
"ahash 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "headers"
|
name = "headers"
|
||||||
version = "0.2.1"
|
version = "0.2.1"
|
||||||
|
@ -1051,7 +1074,7 @@ version = "0.9.49"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
"autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"cc 1.0.36 (registry+https://github.com/rust-lang/crates.io-index)",
|
"cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)",
|
"libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"pkg-config 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
|
"pkg-config 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"vcpkg 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
"vcpkg 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
@ -2378,11 +2401,13 @@ dependencies = [
|
||||||
|
|
||||||
[metadata]
|
[metadata]
|
||||||
"checksum ahash 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "6f33b5018f120946c1dcf279194f238a9f146725593ead1c08fa47ff22b0b5d3"
|
"checksum ahash 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "6f33b5018f120946c1dcf279194f238a9f146725593ead1c08fa47ff22b0b5d3"
|
||||||
|
"checksum ahash 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0989268a37e128d4d7a8028f1c60099430113fdbc70419010601ce51a228e4fe"
|
||||||
"checksum aho-corasick 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "58fb5e95d83b38284460a5fda7d6470aa0b8844d283a0b614b8535e880800d2d"
|
"checksum aho-corasick 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "58fb5e95d83b38284460a5fda7d6470aa0b8844d283a0b614b8535e880800d2d"
|
||||||
"checksum antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5"
|
"checksum antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5"
|
||||||
"checksum arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "92c7fb76bc8826a8b33b4ee5bb07a247a81e76764ab4d55e8f73e3a4d8808c71"
|
"checksum arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "92c7fb76bc8826a8b33b4ee5bb07a247a81e76764ab4d55e8f73e3a4d8808c71"
|
||||||
"checksum atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9a7d5b8723950951411ee34d271d99dddcc2035a16ab25310ea2c8cfd4369652"
|
"checksum atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9a7d5b8723950951411ee34d271d99dddcc2035a16ab25310ea2c8cfd4369652"
|
||||||
"checksum autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2"
|
"checksum autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2"
|
||||||
|
"checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
|
||||||
"checksum backtrace 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "f106c02a3604afcdc0df5d36cc47b44b55917dbaf3d808f71c163a0ddba64637"
|
"checksum backtrace 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "f106c02a3604afcdc0df5d36cc47b44b55917dbaf3d808f71c163a0ddba64637"
|
||||||
"checksum backtrace-sys 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)" = "797c830ac25ccc92a7f8a7b9862bde440715531514594a6154e3d4a54dd769b6"
|
"checksum backtrace-sys 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)" = "797c830ac25ccc92a7f8a7b9862bde440715531514594a6154e3d4a54dd769b6"
|
||||||
"checksum base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0b25d992356d2eb0ed82172f5248873db5560c4721f564b13cb5193bda5e668e"
|
"checksum base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0b25d992356d2eb0ed82172f5248873db5560c4721f564b13cb5193bda5e668e"
|
||||||
|
@ -2398,7 +2423,7 @@ dependencies = [
|
||||||
"checksum bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "10004c15deb332055f7a4a208190aed362cf9a7c2f6ab70a305fba50e1105f38"
|
"checksum bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "10004c15deb332055f7a4a208190aed362cf9a7c2f6ab70a305fba50e1105f38"
|
||||||
"checksum c2-chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7d64d04786e0f528460fc884753cf8dddcc466be308f6026f8e355c41a0e4101"
|
"checksum c2-chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7d64d04786e0f528460fc884753cf8dddcc466be308f6026f8e355c41a0e4101"
|
||||||
"checksum cast 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "926013f2860c46252efceabb19f4a6b308197505082c609025aa6706c011d427"
|
"checksum cast 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "926013f2860c46252efceabb19f4a6b308197505082c609025aa6706c011d427"
|
||||||
"checksum cc 1.0.36 (registry+https://github.com/rust-lang/crates.io-index)" = "a0c56216487bb80eec9c4516337b2588a4f2a2290d72a1416d930e4dcdb0c90d"
|
"checksum cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)" = "95e28fa049fda1c330bcf9d723be7663a899c4679724b34c81e9f5a326aab8cd"
|
||||||
"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
|
"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
|
||||||
"checksum chrono 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "77d81f58b7301084de3b958691458a53c3f7e0b1d702f77e550b6a88e3a88abe"
|
"checksum chrono 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "77d81f58b7301084de3b958691458a53c3f7e0b1d702f77e550b6a88e3a88abe"
|
||||||
"checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9"
|
"checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9"
|
||||||
|
@ -2450,6 +2475,7 @@ dependencies = [
|
||||||
"checksum getrandom 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "473a1265acc8ff1e808cd0a1af8cee3c2ee5200916058a2ca113c29f2d903571"
|
"checksum getrandom 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "473a1265acc8ff1e808cd0a1af8cee3c2ee5200916058a2ca113c29f2d903571"
|
||||||
"checksum h2 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "85ab6286db06040ddefb71641b50017c06874614001a134b423783e2db2920bd"
|
"checksum h2 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "85ab6286db06040ddefb71641b50017c06874614001a134b423783e2db2920bd"
|
||||||
"checksum hashbrown 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8e6073d0ca812575946eb5f35ff68dbe519907b25c42530389ff946dc84c6ead"
|
"checksum hashbrown 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8e6073d0ca812575946eb5f35ff68dbe519907b25c42530389ff946dc84c6ead"
|
||||||
|
"checksum hashbrown 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "479e9d9a1a3f8c489868a935b557ab5710e3e223836da2ecd52901d88935cb56"
|
||||||
"checksum headers 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dc6e2e51d356081258ef05ff4c648138b5d3fe64b7300aaad3b820554a2b7fb6"
|
"checksum headers 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dc6e2e51d356081258ef05ff4c648138b5d3fe64b7300aaad3b820554a2b7fb6"
|
||||||
"checksum headers-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "51ae5b0b5417559ee1d2733b21d33b0868ae9e406bd32eb1a51d613f66ed472a"
|
"checksum headers-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "51ae5b0b5417559ee1d2733b21d33b0868ae9e406bd32eb1a51d613f66ed472a"
|
||||||
"checksum headers-derive 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "97c462e8066bca4f0968ddf8d12de64c40f2c2187b3b9a2fa994d06e8ad444a9"
|
"checksum headers-derive 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "97c462e8066bca4f0968ddf8d12de64c40f2c2187b3b9a2fa994d06e8ad444a9"
|
||||||
|
|
|
@ -25,6 +25,7 @@ r2d2_postgres = "0.16.0"
|
||||||
r2d2 = "0.8.8"
|
r2d2 = "0.8.8"
|
||||||
lru = "0.4.3"
|
lru = "0.4.3"
|
||||||
urlencoding = "1.0.0"
|
urlencoding = "1.0.0"
|
||||||
|
hashbrown = "0.7.1"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
criterion = "0.3"
|
criterion = "0.3"
|
||||||
|
@ -40,3 +41,10 @@ default = [ "production" ]
|
||||||
bench = []
|
bench = []
|
||||||
stub_status = []
|
stub_status = []
|
||||||
production = []
|
production = []
|
||||||
|
|
||||||
|
[profile.release]
|
||||||
|
lto = "fat"
|
||||||
|
panic = "abort"
|
||||||
|
codegen-units = 1
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use std::{collections::HashMap, fmt};
|
use hashbrown::HashMap;
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
pub struct EnvVar(pub HashMap<String, String>);
|
pub struct EnvVar(pub HashMap<String, String>);
|
||||||
impl std::ops::Deref for EnvVar {
|
impl std::ops::Deref for EnvVar {
|
||||||
|
|
|
@ -9,9 +9,10 @@ use {application::Application, attachment::Attachment, card::Card, poll::Poll};
|
||||||
use crate::log_fatal;
|
use crate::log_fatal;
|
||||||
use crate::parse_client_request::Blocks;
|
use crate::parse_client_request::Blocks;
|
||||||
|
|
||||||
|
use hashbrown::HashSet;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::boxed::Box;
|
use std::boxed::Box;
|
||||||
use std::{collections::HashSet, string::String};
|
use std::string::String;
|
||||||
|
|
||||||
#[serde(deny_unknown_fields)]
|
#[serde(deny_unknown_fields)]
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use crate::parse_client_request::Blocks;
|
use crate::parse_client_request::Blocks;
|
||||||
|
use hashbrown::HashSet;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::collections::HashSet;
|
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
|
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
|
||||||
pub struct DynamicEvent {
|
pub struct DynamicEvent {
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use super::*;
|
use super::*;
|
||||||
use std::collections::HashMap;
|
use hashbrown::HashMap;
|
||||||
|
|
||||||
#[serde(rename_all = "snake_case")]
|
#[serde(rename_all = "snake_case")]
|
||||||
#[derive(Deserialize, Debug, Clone, PartialEq)]
|
#[derive(Deserialize, Debug, Clone, PartialEq)]
|
||||||
|
|
|
@ -4,8 +4,8 @@ use crate::{
|
||||||
parse_client_request::subscription::{Scope, UserData},
|
parse_client_request::subscription::{Scope, UserData},
|
||||||
};
|
};
|
||||||
use ::postgres;
|
use ::postgres;
|
||||||
|
use hashbrown::HashSet;
|
||||||
use r2d2_postgres::PostgresConnectionManager;
|
use r2d2_postgres::PostgresConnectionManager;
|
||||||
use std::collections::HashSet;
|
|
||||||
use warp::reject::Rejection;
|
use warp::reject::Rejection;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
|
|
@ -9,8 +9,8 @@ use super::postgres::PgPool;
|
||||||
use super::query::Query;
|
use super::query::Query;
|
||||||
use crate::err::TimelineErr;
|
use crate::err::TimelineErr;
|
||||||
use crate::log_fatal;
|
use crate::log_fatal;
|
||||||
|
use hashbrown::HashSet;
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
use std::collections::HashSet;
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
use warp::reject::Rejection;
|
use warp::reject::Rejection;
|
||||||
|
|
||||||
|
@ -59,6 +59,13 @@ pub struct Subscription {
|
||||||
pub access_token: Option<String>,
|
pub access_token: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Default, Debug, PartialEq)]
|
||||||
|
pub struct Blocks {
|
||||||
|
pub blocked_domains: HashSet<String>,
|
||||||
|
pub blocked_users: HashSet<i64>,
|
||||||
|
pub blocking_users: HashSet<i64>,
|
||||||
|
}
|
||||||
|
|
||||||
impl Default for Subscription {
|
impl Default for Subscription {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
@ -301,14 +308,6 @@ pub enum Scope {
|
||||||
Lists,
|
Lists,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Default, Debug, PartialEq)]
|
|
||||||
pub struct Blocks {
|
|
||||||
pub blocked_domains: HashSet<String>,
|
|
||||||
pub blocked_users: HashSet<i64>,
|
|
||||||
pub blocking_users: HashSet<i64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
|
||||||
pub struct UserData {
|
pub struct UserData {
|
||||||
pub id: i64,
|
pub id: i64,
|
||||||
pub allowed_langs: HashSet<String>,
|
pub allowed_langs: HashSet<String>,
|
||||||
|
|
|
@ -95,7 +95,7 @@ impl futures::stream::Stream for ClientAgent {
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
let result = {
|
let result = {
|
||||||
let mut receiver = self.lock_receiver();
|
let mut receiver = self.lock_receiver();
|
||||||
receiver.poll_for(self.subscription.id, self.subscription.timeline)
|
receiver.poll_for(self.subscription.id)
|
||||||
};
|
};
|
||||||
|
|
||||||
let timeline = &self.subscription.timeline;
|
let timeline = &self.subscription.timeline;
|
||||||
|
@ -105,6 +105,8 @@ impl futures::stream::Stream for ClientAgent {
|
||||||
|
|
||||||
use crate::messages::{CheckedEvent::Update, Event::*};
|
use crate::messages::{CheckedEvent::Update, Event::*};
|
||||||
match result {
|
match result {
|
||||||
|
Ok(NotReady) => Ok(NotReady),
|
||||||
|
Ok(Ready(None)) => Ok(Ready(None)),
|
||||||
Ok(Async::Ready(Some(event))) => match event {
|
Ok(Async::Ready(Some(event))) => match event {
|
||||||
TypeSafe(Update { payload, queued_at }) => match timeline {
|
TypeSafe(Update { payload, queued_at }) => match timeline {
|
||||||
Timeline(Public, _, _) if payload.language_not(allowed_langs) => block,
|
Timeline(Public, _, _) if payload.language_not(allowed_langs) => block,
|
||||||
|
@ -119,9 +121,6 @@ impl futures::stream::Stream for ClientAgent {
|
||||||
},
|
},
|
||||||
Dynamic(non_update) => send(Dynamic(non_update)),
|
Dynamic(non_update) => send(Dynamic(non_update)),
|
||||||
},
|
},
|
||||||
|
|
||||||
Ok(Ready(None)) => Ok(Ready(None)),
|
|
||||||
Ok(NotReady) => Ok(NotReady),
|
|
||||||
Err(e) => Err(e),
|
Err(e) => Err(e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,30 +44,32 @@ impl EventStream {
|
||||||
// change](github.com/tootsuite/flodgatt/issues/121) is implemented, we'll
|
// change](github.com/tootsuite/flodgatt/issues/121) is implemented, we'll
|
||||||
// need to receive messages from the client. If so, we'll need a
|
// need to receive messages from the client. If so, we'll need a
|
||||||
// `receive_from_ws.poll() call here (or later)`
|
// `receive_from_ws.poll() call here (or later)`
|
||||||
|
|
||||||
match client_agent.poll() {
|
match client_agent.poll() {
|
||||||
|
Ok(Async::NotReady) => {
|
||||||
|
if last_ping_time.elapsed() > Duration::from_secs(30) {
|
||||||
|
last_ping_time = Instant::now();
|
||||||
|
match tx.unbounded_send(Message::text("{}")) {
|
||||||
|
Ok(_) => futures::future::ok(true),
|
||||||
|
Err(_) => client_agent.disconnect(),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
futures::future::ok(true)
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok(Async::Ready(Some(msg))) => {
|
Ok(Async::Ready(Some(msg))) => {
|
||||||
match tx.unbounded_send(Message::text(msg.to_json_string())) {
|
match tx.unbounded_send(Message::text(msg.to_json_string())) {
|
||||||
Ok(_) => futures::future::ok(true),
|
Ok(_) => futures::future::ok(true),
|
||||||
Err(_) => client_agent.disconnect(),
|
Err(_) => client_agent.disconnect(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(None)) => {
|
|
||||||
log::info!("WebSocket ClientAgent got Ready(None)");
|
|
||||||
futures::future::ok(true)
|
|
||||||
}
|
|
||||||
Ok(Async::NotReady) if last_ping_time.elapsed() > Duration::from_secs(30) => {
|
|
||||||
last_ping_time = Instant::now();
|
|
||||||
match tx.unbounded_send(Message::text("{}")) {
|
|
||||||
Ok(_) => futures::future::ok(true),
|
|
||||||
Err(_) => client_agent.disconnect(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(Async::NotReady) => futures::future::ok(true), // no new messages; nothing to do
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::error!("{}\n Dropping WebSocket message and continuing.", e);
|
log::error!("{}\n Dropping WebSocket message and continuing.", e);
|
||||||
futures::future::ok(true)
|
futures::future::ok(true)
|
||||||
}
|
}
|
||||||
|
Ok(Async::Ready(None)) => {
|
||||||
|
log::info!("WebSocket ClientAgent got Ready(None)");
|
||||||
|
futures::future::ok(true)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.for_each(move |_instant| Ok(()))
|
.for_each(move |_instant| Ok(()))
|
||||||
|
|
|
@ -6,6 +6,7 @@ use std::fmt;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum ReceiverErr {
|
pub enum ReceiverErr {
|
||||||
|
InvalidId,
|
||||||
TimelineErr(TimelineErr),
|
TimelineErr(TimelineErr),
|
||||||
EventErr(serde_json::Error),
|
EventErr(serde_json::Error),
|
||||||
RedisParseErr(RedisParseErr),
|
RedisParseErr(RedisParseErr),
|
||||||
|
@ -16,6 +17,10 @@ impl fmt::Display for ReceiverErr {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||||
use ReceiverErr::*;
|
use ReceiverErr::*;
|
||||||
match self {
|
match self {
|
||||||
|
InvalidId => write!(
|
||||||
|
f,
|
||||||
|
"Attempted to get messages for a subscription that had not been set up."
|
||||||
|
),
|
||||||
EventErr(inner) => write!(f, "{}", inner),
|
EventErr(inner) => write!(f, "{}", inner),
|
||||||
RedisParseErr(inner) => write!(f, "{}", inner),
|
RedisParseErr(inner) => write!(f, "{}", inner),
|
||||||
RedisConnErr(inner) => write!(f, "{}", inner),
|
RedisConnErr(inner) => write!(f, "{}", inner),
|
||||||
|
|
|
@ -1,10 +1,8 @@
|
||||||
use crate::messages::Event;
|
use crate::messages::Event;
|
||||||
use crate::parse_client_request::Timeline;
|
use crate::parse_client_request::Timeline;
|
||||||
|
|
||||||
use std::{
|
use hashbrown::HashMap;
|
||||||
collections::{HashMap, VecDeque},
|
use std::{collections::VecDeque, fmt};
|
||||||
fmt,
|
|
||||||
};
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|
|
@ -15,13 +15,17 @@ use crate::{
|
||||||
parse_client_request::{Stream, Subscription, Timeline},
|
parse_client_request::{Stream, Subscription, Timeline},
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::{Async, Poll};
|
use {
|
||||||
|
futures::{Async, Poll},
|
||||||
|
hashbrown::HashMap,
|
||||||
|
uuid::Uuid,
|
||||||
|
};
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
|
||||||
result,
|
result,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
type Result<T> = result::Result<T, ReceiverErr>;
|
type Result<T> = result::Result<T, ReceiverErr>;
|
||||||
|
|
||||||
|
@ -29,6 +33,8 @@ type Result<T> = result::Result<T, ReceiverErr>;
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Receiver {
|
pub struct Receiver {
|
||||||
redis_connection: RedisConn,
|
redis_connection: RedisConn,
|
||||||
|
redis_poll_interval: Duration,
|
||||||
|
redis_polled_at: Instant,
|
||||||
pub msg_queues: MessageQueues,
|
pub msg_queues: MessageQueues,
|
||||||
clients_per_timeline: HashMap<Timeline, i32>,
|
clients_per_timeline: HashMap<Timeline, i32>,
|
||||||
}
|
}
|
||||||
|
@ -37,9 +43,12 @@ impl Receiver {
|
||||||
/// Create a new `Receiver`, with its own Redis connections (but, as yet, no
|
/// Create a new `Receiver`, with its own Redis connections (but, as yet, no
|
||||||
/// active subscriptions).
|
/// active subscriptions).
|
||||||
pub fn try_from(redis_cfg: config::RedisConfig) -> Result<Self> {
|
pub fn try_from(redis_cfg: config::RedisConfig) -> Result<Self> {
|
||||||
|
let redis_poll_interval = *redis_cfg.polling_interval;
|
||||||
let redis_connection = RedisConn::new(redis_cfg)?;
|
let redis_connection = RedisConn::new(redis_cfg)?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
|
redis_polled_at: Instant::now(),
|
||||||
|
redis_poll_interval,
|
||||||
redis_connection,
|
redis_connection,
|
||||||
msg_queues: MessageQueues(HashMap::new()),
|
msg_queues: MessageQueues(HashMap::new()),
|
||||||
clients_per_timeline: HashMap::new(),
|
clients_per_timeline: HashMap::new(),
|
||||||
|
@ -103,35 +112,38 @@ impl Receiver {
|
||||||
/// Redis is significantly more time consuming that simply returning the
|
/// Redis is significantly more time consuming that simply returning the
|
||||||
/// message already in a queue. Thus, we only poll Redis if it has not
|
/// message already in a queue. Thus, we only poll Redis if it has not
|
||||||
/// been polled lately.
|
/// been polled lately.
|
||||||
pub fn poll_for(&mut self, id: Uuid, timeline: Timeline) -> Poll<Option<Event>, ReceiverErr> {
|
pub fn poll_for(&mut self, id: Uuid) -> Poll<Option<Event>, ReceiverErr> {
|
||||||
loop {
|
// let (t1, mut polled_redis) = (Instant::now(), false);
|
||||||
match self.redis_connection.poll_redis() {
|
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
|
||||||
Ok(Async::Ready(Some((timeline, event)))) => {
|
loop {
|
||||||
self.msg_queues
|
match self.redis_connection.poll_redis() {
|
||||||
.values_mut()
|
Ok(Async::NotReady) => break,
|
||||||
.filter(|msg_queue| msg_queue.timeline == timeline)
|
Ok(Async::Ready(Some((timeline, event)))) => {
|
||||||
.for_each(|msg_queue| {
|
self.msg_queues
|
||||||
msg_queue.messages.push_back(event.clone());
|
.values_mut()
|
||||||
});
|
.filter(|msg_queue| msg_queue.timeline == timeline)
|
||||||
|
.for_each(|msg_queue| {
|
||||||
|
msg_queue.messages.push_back(event.clone());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(Async::Ready(None)) => (), // subscription cmd or msg for other namespace
|
||||||
|
Err(err) => Err(err)?,
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => break,
|
|
||||||
Ok(Async::Ready(None)) => (),
|
|
||||||
Err(err) => Err(err)?,
|
|
||||||
}
|
}
|
||||||
|
// polled_redis = true;
|
||||||
|
self.redis_polled_at = Instant::now();
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the `msg_queue` being polled has any new messages, return the first (oldest) one
|
// If the `msg_queue` being polled has any new messages, return the first (oldest) one
|
||||||
match self.msg_queues.get_mut(&id) {
|
let msg_q = self.msg_queues.get_mut(&id).ok_or(ReceiverErr::InvalidId)?;
|
||||||
Some(msg_q) => match msg_q.messages.pop_front() {
|
let res = match msg_q.messages.pop_front() {
|
||||||
Some(event) => Ok(Async::Ready(Some(event))),
|
Some(event) => Ok(Async::Ready(Some(event))),
|
||||||
None => Ok(Async::NotReady),
|
None => Ok(Async::NotReady),
|
||||||
},
|
};
|
||||||
None => {
|
// if !polled_redis {
|
||||||
log::error!("Polled a MsgQueue that had not been set up. Setting it up now.");
|
// log::info!("poll_for in {:?}", t1.elapsed());
|
||||||
self.msg_queues.insert(id, MsgQueue::new(timeline));
|
// }
|
||||||
Ok(Async::NotReady)
|
res
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn count_connections(&self) -> String {
|
pub fn count_connections(&self) -> String {
|
||||||
|
|
|
@ -14,7 +14,7 @@ use std::{
|
||||||
io::{Read, Write},
|
io::{Read, Write},
|
||||||
net::TcpStream,
|
net::TcpStream,
|
||||||
str,
|
str,
|
||||||
time::{Duration, Instant},
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::{Async, Poll};
|
use futures::{Async, Poll};
|
||||||
|
@ -26,8 +26,6 @@ type Result<T> = std::result::Result<T, RedisConnErr>;
|
||||||
pub struct RedisConn {
|
pub struct RedisConn {
|
||||||
primary: TcpStream,
|
primary: TcpStream,
|
||||||
secondary: TcpStream,
|
secondary: TcpStream,
|
||||||
redis_poll_interval: Duration,
|
|
||||||
redis_polled_at: Instant,
|
|
||||||
redis_namespace: Option<String>,
|
redis_namespace: Option<String>,
|
||||||
tag_id_cache: LruCache<String, i64>,
|
tag_id_cache: LruCache<String, i64>,
|
||||||
tag_name_cache: LruCache<i64, String>,
|
tag_name_cache: LruCache<i64, String>,
|
||||||
|
@ -49,21 +47,32 @@ impl RedisConn {
|
||||||
// the tag number instead of the tag name. This would save us from dealing
|
// the tag number instead of the tag name. This would save us from dealing
|
||||||
// with a cache here and would be consistent with how lists/users are handled.
|
// with a cache here and would be consistent with how lists/users are handled.
|
||||||
redis_namespace: redis_cfg.namespace.clone(),
|
redis_namespace: redis_cfg.namespace.clone(),
|
||||||
redis_poll_interval: *redis_cfg.polling_interval,
|
|
||||||
redis_input: Vec::new(),
|
redis_input: Vec::new(),
|
||||||
redis_polled_at: Instant::now(),
|
|
||||||
};
|
};
|
||||||
Ok(redis_conn)
|
Ok(redis_conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, ReceiverErr> {
|
pub fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, ReceiverErr> {
|
||||||
let mut buffer = vec![0u8; 6000];
|
let mut size = 100; // large enough to handle subscribe/unsubscribe notice
|
||||||
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
|
let (mut buffer, mut first_read) = (vec![0u8; size], true);
|
||||||
if let Ok(bytes_read) = self.primary.read(&mut buffer) {
|
loop {
|
||||||
self.redis_input.extend_from_slice(&buffer[..bytes_read]);
|
match self.primary.read(&mut buffer) {
|
||||||
|
Ok(n) if n != size => {
|
||||||
|
self.redis_input.extend_from_slice(&buffer[..n]);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(n) => {
|
||||||
|
self.redis_input.extend_from_slice(&buffer[..n]);
|
||||||
|
}
|
||||||
|
Err(_) => break,
|
||||||
};
|
};
|
||||||
self.redis_polled_at = Instant::now();
|
if first_read {
|
||||||
|
size = 2000;
|
||||||
|
buffer = vec![0u8; size];
|
||||||
|
first_read = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.redis_input.is_empty() {
|
if self.redis_input.is_empty() {
|
||||||
return Ok(Async::NotReady);
|
return Ok(Async::NotReady);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue