Tweak release profile & micro optimizations

This commit is contained in:
Daniel Sockwell 2020-04-06 17:45:21 -04:00
parent d23cc40bea
commit f87cde6600
8 changed files with 92 additions and 64 deletions

10
Cargo.lock generated
View File

@ -62,7 +62,7 @@ name = "backtrace-sys"
version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cc 1.0.36 (registry+https://github.com/rust-lang/crates.io-index)",
"cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -163,7 +163,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "cc"
version = "1.0.36"
version = "1.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@ -440,7 +440,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "flodgatt"
version = "0.7.0"
version = "0.7.1"
dependencies = [
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1051,7 +1051,7 @@ version = "0.9.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"cc 1.0.36 (registry+https://github.com/rust-lang/crates.io-index)",
"cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)",
"pkg-config 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
"vcpkg 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2398,7 +2398,7 @@ dependencies = [
"checksum bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "10004c15deb332055f7a4a208190aed362cf9a7c2f6ab70a305fba50e1105f38"
"checksum c2-chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7d64d04786e0f528460fc884753cf8dddcc466be308f6026f8e355c41a0e4101"
"checksum cast 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "926013f2860c46252efceabb19f4a6b308197505082c609025aa6706c011d427"
"checksum cc 1.0.36 (registry+https://github.com/rust-lang/crates.io-index)" = "a0c56216487bb80eec9c4516337b2588a4f2a2290d72a1416d930e4dcdb0c90d"
"checksum cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)" = "95e28fa049fda1c330bcf9d723be7663a899c4679724b34c81e9f5a326aab8cd"
"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
"checksum chrono 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "77d81f58b7301084de3b958691458a53c3f7e0b1d702f77e550b6a88e3a88abe"
"checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9"

View File

@ -40,3 +40,8 @@ default = [ "production" ]
bench = []
stub_status = []
production = []
[profile.release]
lto = "fat"
panic = "abort"

View File

@ -59,6 +59,13 @@ pub struct Subscription {
pub access_token: Option<String>,
}
#[derive(Clone, Default, Debug, PartialEq)]
pub struct Blocks {
pub blocked_domains: HashSet<String>,
pub blocked_users: HashSet<i64>,
pub blocking_users: HashSet<i64>,
}
impl Default for Subscription {
fn default() -> Self {
Self {
@ -301,14 +308,6 @@ pub enum Scope {
Lists,
}
#[derive(Clone, Default, Debug, PartialEq)]
pub struct Blocks {
pub blocked_domains: HashSet<String>,
pub blocked_users: HashSet<i64>,
pub blocking_users: HashSet<i64>,
}
#[derive(Clone, Debug, PartialEq)]
pub struct UserData {
pub id: i64,
pub allowed_langs: HashSet<String>,

View File

@ -95,7 +95,7 @@ impl futures::stream::Stream for ClientAgent {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let result = {
let mut receiver = self.lock_receiver();
receiver.poll_for(self.subscription.id, self.subscription.timeline)
receiver.poll_for(self.subscription.id)
};
let timeline = &self.subscription.timeline;
@ -105,6 +105,8 @@ impl futures::stream::Stream for ClientAgent {
use crate::messages::{CheckedEvent::Update, Event::*};
match result {
Ok(NotReady) => Ok(NotReady),
Ok(Ready(None)) => Ok(Ready(None)),
Ok(Async::Ready(Some(event))) => match event {
TypeSafe(Update { payload, queued_at }) => match timeline {
Timeline(Public, _, _) if payload.language_not(allowed_langs) => block,
@ -119,9 +121,6 @@ impl futures::stream::Stream for ClientAgent {
},
Dynamic(non_update) => send(Dynamic(non_update)),
},
Ok(Ready(None)) => Ok(Ready(None)),
Ok(NotReady) => Ok(NotReady),
Err(e) => Err(e),
}
}

View File

@ -44,30 +44,32 @@ impl EventStream {
// change](github.com/tootsuite/flodgatt/issues/121) is implemented, we'll
// need to receive messages from the client. If so, we'll need a
// `receive_from_ws.poll() call here (or later)`
match client_agent.poll() {
Ok(Async::NotReady) => {
if last_ping_time.elapsed() > Duration::from_secs(30) {
last_ping_time = Instant::now();
match tx.unbounded_send(Message::text("{}")) {
Ok(_) => futures::future::ok(true),
Err(_) => client_agent.disconnect(),
}
} else {
futures::future::ok(true)
}
}
Ok(Async::Ready(Some(msg))) => {
match tx.unbounded_send(Message::text(msg.to_json_string())) {
Ok(_) => futures::future::ok(true),
Err(_) => client_agent.disconnect(),
}
}
Ok(Async::Ready(None)) => {
log::info!("WebSocket ClientAgent got Ready(None)");
futures::future::ok(true)
}
Ok(Async::NotReady) if last_ping_time.elapsed() > Duration::from_secs(30) => {
last_ping_time = Instant::now();
match tx.unbounded_send(Message::text("{}")) {
Ok(_) => futures::future::ok(true),
Err(_) => client_agent.disconnect(),
}
}
Ok(Async::NotReady) => futures::future::ok(true), // no new messages; nothing to do
Err(e) => {
log::error!("{}\n Dropping WebSocket message and continuing.", e);
futures::future::ok(true)
}
Ok(Async::Ready(None)) => {
log::info!("WebSocket ClientAgent got Ready(None)");
futures::future::ok(true)
}
}
})
.for_each(move |_instant| Ok(()))

View File

@ -6,6 +6,7 @@ use std::fmt;
#[derive(Debug)]
pub enum ReceiverErr {
InvalidId,
TimelineErr(TimelineErr),
EventErr(serde_json::Error),
RedisParseErr(RedisParseErr),
@ -16,6 +17,10 @@ impl fmt::Display for ReceiverErr {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
use ReceiverErr::*;
match self {
InvalidId => write!(
f,
"Attempted to get messages for a subscription that had not been set up."
),
EventErr(inner) => write!(f, "{}", inner),
RedisParseErr(inner) => write!(f, "{}", inner),
RedisConnErr(inner) => write!(f, "{}", inner),

View File

@ -20,6 +20,7 @@ use std::{
collections::HashMap,
result,
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use uuid::Uuid;
@ -29,6 +30,8 @@ type Result<T> = result::Result<T, ReceiverErr>;
#[derive(Debug)]
pub struct Receiver {
redis_connection: RedisConn,
redis_poll_interval: Duration,
redis_polled_at: Instant,
pub msg_queues: MessageQueues,
clients_per_timeline: HashMap<Timeline, i32>,
}
@ -37,9 +40,12 @@ impl Receiver {
/// Create a new `Receiver`, with its own Redis connections (but, as yet, no
/// active subscriptions).
pub fn try_from(redis_cfg: config::RedisConfig) -> Result<Self> {
let redis_poll_interval = *redis_cfg.polling_interval;
let redis_connection = RedisConn::new(redis_cfg)?;
Ok(Self {
redis_polled_at: Instant::now(),
redis_poll_interval,
redis_connection,
msg_queues: MessageQueues(HashMap::new()),
clients_per_timeline: HashMap::new(),
@ -103,35 +109,38 @@ impl Receiver {
/// Redis is significantly more time consuming that simply returning the
/// message already in a queue. Thus, we only poll Redis if it has not
/// been polled lately.
pub fn poll_for(&mut self, id: Uuid, timeline: Timeline) -> Poll<Option<Event>, ReceiverErr> {
loop {
match self.redis_connection.poll_redis() {
Ok(Async::Ready(Some((timeline, event)))) => {
self.msg_queues
.values_mut()
.filter(|msg_queue| msg_queue.timeline == timeline)
.for_each(|msg_queue| {
msg_queue.messages.push_back(event.clone());
});
pub fn poll_for(&mut self, id: Uuid) -> Poll<Option<Event>, ReceiverErr> {
// let (t1, mut polled_redis) = (Instant::now(), false);
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
loop {
match self.redis_connection.poll_redis() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some((timeline, event)))) => {
self.msg_queues
.values_mut()
.filter(|msg_queue| msg_queue.timeline == timeline)
.for_each(|msg_queue| {
msg_queue.messages.push_back(event.clone());
});
}
Ok(Async::Ready(None)) => (), // subscription cmd or msg for other namespace
Err(err) => Err(err)?,
}
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) => (),
Err(err) => Err(err)?,
}
// polled_redis = true;
self.redis_polled_at = Instant::now();
}
// If the `msg_queue` being polled has any new messages, return the first (oldest) one
match self.msg_queues.get_mut(&id) {
Some(msg_q) => match msg_q.messages.pop_front() {
Some(event) => Ok(Async::Ready(Some(event))),
None => Ok(Async::NotReady),
},
None => {
log::error!("Polled a MsgQueue that had not been set up. Setting it up now.");
self.msg_queues.insert(id, MsgQueue::new(timeline));
Ok(Async::NotReady)
}
}
let msg_q = self.msg_queues.get_mut(&id).ok_or(ReceiverErr::InvalidId)?;
let res = match msg_q.messages.pop_front() {
Some(event) => Ok(Async::Ready(Some(event))),
None => Ok(Async::NotReady),
};
// if !polled_redis {
// log::info!("poll_for in {:?}", t1.elapsed());
// }
res
}
pub fn count_connections(&self) -> String {

View File

@ -14,7 +14,7 @@ use std::{
io::{Read, Write},
net::TcpStream,
str,
time::{Duration, Instant},
time::Duration,
};
use futures::{Async, Poll};
@ -26,8 +26,6 @@ type Result<T> = std::result::Result<T, RedisConnErr>;
pub struct RedisConn {
primary: TcpStream,
secondary: TcpStream,
redis_poll_interval: Duration,
redis_polled_at: Instant,
redis_namespace: Option<String>,
tag_id_cache: LruCache<String, i64>,
tag_name_cache: LruCache<i64, String>,
@ -49,21 +47,32 @@ impl RedisConn {
// the tag number instead of the tag name. This would save us from dealing
// with a cache here and would be consistent with how lists/users are handled.
redis_namespace: redis_cfg.namespace.clone(),
redis_poll_interval: *redis_cfg.polling_interval,
redis_input: Vec::new(),
redis_polled_at: Instant::now(),
};
Ok(redis_conn)
}
pub fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, ReceiverErr> {
let mut buffer = vec![0u8; 6000];
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
if let Ok(bytes_read) = self.primary.read(&mut buffer) {
self.redis_input.extend_from_slice(&buffer[..bytes_read]);
let mut size = 100; // large enough to handle subscribe/unsubscribe notice
let (mut buffer, mut first_read) = (vec![0u8; size], true);
loop {
match self.primary.read(&mut buffer) {
Ok(n) if n != size => {
self.redis_input.extend_from_slice(&buffer[..n]);
break;
}
Ok(n) => {
self.redis_input.extend_from_slice(&buffer[..n]);
}
Err(_) => break,
};
self.redis_polled_at = Instant::now();
if first_read {
size = 2000;
buffer = vec![0u8; size];
first_read = false;
}
}
if self.redis_input.is_empty() {
return Ok(Async::NotReady);
}