diff --git a/.gitignore b/.gitignore index 53eaa21..7519edc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /target +load_test.sh +/benches/perf_statistics **/*.rs.bk diff --git a/.travis.yml b/.travis.yml index d36f12a..be7e9b5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,10 @@ language: rust rust: - stable + - beta +jobs: + fast_finish: true cache: cargo +branches: + only: + - master diff --git a/Cargo.lock b/Cargo.lock index edb7703..e7dd862 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -406,7 +406,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.8.4" +version = "0.8.5" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -419,7 +419,6 @@ dependencies = [ "pretty_env_logger 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "r2d2 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)", "r2d2_postgres 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", - "regex 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 0a75465..6ee3fe1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.8.4" +version = "0.8.5" authors = ["Daniel Long Sockwell "] edition = "2018" @@ -28,13 +28,12 @@ hashbrown = "0.7.1" [dev-dependencies] criterion = "0.3" -regex = "1.3.2" + [[bench]] name = "parse_redis" harness = false - [features] default = [ "production" ] bench = [] @@ -43,7 +42,7 @@ production = [] [profile.release] lto = "fat" -#panic = "abort" +panic = "abort" codegen-units = 1 diff --git a/benches/parse_redis.rs b/benches/parse_redis.rs index 2ad4e61..df53fb9 100644 --- a/benches/parse_redis.rs +++ b/benches/parse_redis.rs @@ -1,8 +1,8 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use flodgatt::{ - messages::*, - parse_client_request::{Content::*, Reach::*, Stream::*, Timeline}, - redis_to_client_stream::{RedisMsg, RedisParseOutput}, + event::*, + request::{Content::*, Reach::*, Stream::*, Timeline}, + response::{RedisMsg, RedisParseOutput}, }; use lru::LruCache; use std::convert::TryFrom; @@ -19,16 +19,15 @@ fn parse_long_redis_input<'a>(input: &'a str) -> RedisMsg<'a> { fn parse_to_timeline(msg: RedisMsg) -> Timeline { let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..]; let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut LruCache::new(1000)).unwrap(); - assert_eq!(tl, Timeline(User(1), Federated, All)); + assert_eq!(tl, Timeline(User(Id(1)), Federated, All)); tl } fn parse_to_checked_event(msg: RedisMsg) -> Event { Event::TypeSafe(serde_json::from_str(msg.event_txt).unwrap()) } -fn parse_to_dyn_event(msg: RedisMsg) -> String { - let event: Event = Event::Dynamic(serde_json::from_str(msg.event_txt).unwrap()); - event.to_json_string() +fn parse_to_dyn_event(msg: RedisMsg) -> Event { + Event::Dynamic(serde_json::from_str(msg.event_txt).unwrap()) } fn redis_msg_to_event_string(msg: RedisMsg) -> String { @@ -43,16 +42,16 @@ fn criterion_benchmark(c: &mut Criterion) { let input = ONE_MESSAGE_FOR_THE_USER_TIMLINE_FROM_REDIS; let mut group = c.benchmark_group("Parse redis RESP array"); - // group.bench_function("parse redis input to RedisMsg", |b| { - // b.iter(|| black_box(parse_long_redis_input(input))) - // }); + group.bench_function("parse redis input to RedisMsg", |b| { + b.iter(|| black_box(parse_long_redis_input(input))) + }); let msg = parse_long_redis_input(input); - // group.bench_function("parse RedisMsg to Timeline", |b| { - // b.iter(|| black_box(parse_to_timeline(msg.clone()))) - // }); + group.bench_function("parse RedisMsg to Timeline", |b| { + b.iter(|| black_box(parse_to_timeline(msg.clone()))) + }); - group.bench_function("parse RedisMsg -> DynamicEvent -> JSON string", |b| { + group.bench_function("parse RedisMsg -> DynamicEvent", |b| { b.iter(|| black_box(parse_to_dyn_event(msg.clone()))) }); diff --git a/load_test.sample b/load_test.sample new file mode 100755 index 0000000..ed4c30a --- /dev/null +++ b/load_test.sample @@ -0,0 +1,20 @@ +#!/bin/sh +instance='127.0.0.1:4000' +timeline='public:local' +number_of_ws=300 + +command -v websocat >/dev/null || { echo >&2 "Install websocat with `cargo install websocat` to use this script"; exit 1; } + + +echo "Opening $number_of_ws WebSockets to $timeline" +for i in $(seq 0 $number_of_ws); do + sleep 0.1 + websocat wss://${instance}/api/v1/streaming/?stream=${timeline} --no-close > /dev/null & +done + +echo "$number_of_ws WebSocket connections established..." + +sleep 60 + +echo "Closing WebSockets" +echo "Done" diff --git a/src/main.rs b/src/main.rs index aef8519..f1f3e3c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -78,14 +78,23 @@ fn main() -> Result<(), FatalErr> { .allow_methods(cfg.cors.allowed_methods) .allow_headers(cfg.cors.allowed_headers); + // use futures::future::Future; let streaming_server = move || { let manager = shared_manager.clone(); let stream = Interval::new(Instant::now(), poll_freq) + // .take(1200) .map_err(|e| log::error!("{}", e)) - .for_each(move |_| { - let mut manager = manager.lock().unwrap_or_else(redis::Manager::recover); - manager.poll_broadcast().map_err(FatalErr::log) - }); + .for_each( + move |_| { + let mut manager = manager.lock().unwrap_or_else(redis::Manager::recover); + manager.poll_broadcast().map_err(FatalErr::log) + }, // ).and_then(|_| { + // log::info!("shutting down!"); + // std::process::exit(0); + // futures::future::ok(()) + // } + ); + warp::spawn(lazy(move || stream)); warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err)) }; @@ -95,7 +104,6 @@ fn main() -> Result<(), FatalErr> { fs::remove_file(socket).unwrap_or_default(); let incoming = UnixListener::bind(socket)?.incoming(); fs::set_permissions(socket, PermissionsExt::from_mode(0o666))?; - tokio::run(lazy(|| streaming_server().serve_incoming(incoming))); } else { let server_addr = SocketAddr::new(*cfg.address, *cfg.port); diff --git a/src/response/redis/connection.rs b/src/response/redis/connection.rs index f4b2dd1..86451fc 100644 --- a/src/response/redis/connection.rs +++ b/src/response/redis/connection.rs @@ -10,7 +10,7 @@ use crate::request::{Stream, Timeline}; use futures::{Async, Poll}; use lru::LruCache; use std::convert::{TryFrom, TryInto}; -use std::io::{Read, Write}; +use std::io::{self, Read, Write}; use std::net::TcpStream; use std::str; use std::time::Duration; @@ -25,6 +25,7 @@ pub struct RedisConn { tag_id_cache: LruCache, tag_name_cache: LruCache, redis_input: Vec, + cursor: usize, } impl RedisConn { @@ -43,34 +44,32 @@ impl RedisConn { // the tag number instead of the tag name. This would save us from dealing // with a cache here and would be consistent with how lists/users are handled. redis_namespace: redis_cfg.namespace.clone().0, - redis_input: Vec::new(), + redis_input: vec![0_u8; 5000], + cursor: 0, }; Ok(redis_conn) } pub fn poll_redis(&mut self) -> Poll, ManagerErr> { - let mut size = 100; // large enough to handle subscribe/unsubscribe notice - let (mut buffer, mut first_read) = (vec![0_u8; size], true); loop { - match self.primary.read(&mut buffer) { - Ok(n) if n != size => break self.redis_input.extend_from_slice(&buffer[..n]), - Ok(n) => self.redis_input.extend_from_slice(&buffer[..n]), - Err(_) => break, + match self.primary.read(&mut self.redis_input[self.cursor..]) { + Ok(n) => { + self.cursor += n; + if self.redis_input.len() - 1 == self.cursor { + self.redis_input.resize(self.redis_input.len() * 2, 0); + } else { + break; + } + } + Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => { + return Ok(Async::NotReady); + } + Err(e) => break log::error!("{}", e), }; - if first_read { - size = 2000; - buffer = vec![0_u8; size]; - first_read = false; - } } - if self.redis_input.is_empty() { - return Ok(Async::NotReady); - } - - // at this point, we have the raw bytes; now, parse what we can and leave the remainder - let input = self.redis_input.clone(); - self.redis_input.clear(); + // at this point, we have the raw bytes; now, parse a msg + let input = &self.redis_input[..self.cursor]; let (input, invalid_bytes) = str::from_utf8(&input) .map(|input| (input, &b""[..])) @@ -100,8 +99,26 @@ impl RedisConn { Err(RedisParseErr::Incomplete) => (Ok(NotReady), input), Err(other_parse_err) => (Err(ManagerErr::RedisParseErr(other_parse_err)), input), }; - self.redis_input.extend_from_slice(leftover.as_bytes()); - self.redis_input.extend_from_slice(invalid_bytes); + + self.cursor = [leftover.as_bytes(), invalid_bytes] + .concat() + .bytes() + .fold(0, |acc, cur| { + // TODO - make clearer and comment side-effect + self.redis_input[acc] = cur.expect("TODO"); + acc + 1 + }); + + // self.cursor = 0; + // for (i, byte) in [leftover.as_bytes(), invalid_bytes] + // .concat() + // .bytes() + // .enumerate() + // { + // self.redis_input[i] = byte.expect("TODO"); + // self.cursor += 1; + // } + res } diff --git a/src/response/redis/msg.rs b/src/response/redis/msg.rs index 0a592d5..cf9f336 100644 --- a/src/response/redis/msg.rs +++ b/src/response/redis/msg.rs @@ -36,8 +36,6 @@ pub enum RedisParseOutput<'a> { NonMsg(&'a str), } -// TODO -- should this impl Iterator? - #[derive(Debug, Clone, PartialEq)] pub struct RedisMsg<'a> { pub timeline_txt: &'a str,