From 160e31a20fcfadf36e77e198ec9bc5eb9a2d4022 Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Wed, 15 Apr 2020 17:20:30 -0400 Subject: [PATCH] Add temporary perf metrics --- src/main.rs | 18 +++++++++++++----- src/response/redis/connection.rs | 10 +++++++++- src/response/redis/msg.rs | 2 -- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/src/main.rs b/src/main.rs index aef8519..f1f3e3c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -78,14 +78,23 @@ fn main() -> Result<(), FatalErr> { .allow_methods(cfg.cors.allowed_methods) .allow_headers(cfg.cors.allowed_headers); + // use futures::future::Future; let streaming_server = move || { let manager = shared_manager.clone(); let stream = Interval::new(Instant::now(), poll_freq) + // .take(1200) .map_err(|e| log::error!("{}", e)) - .for_each(move |_| { - let mut manager = manager.lock().unwrap_or_else(redis::Manager::recover); - manager.poll_broadcast().map_err(FatalErr::log) - }); + .for_each( + move |_| { + let mut manager = manager.lock().unwrap_or_else(redis::Manager::recover); + manager.poll_broadcast().map_err(FatalErr::log) + }, // ).and_then(|_| { + // log::info!("shutting down!"); + // std::process::exit(0); + // futures::future::ok(()) + // } + ); + warp::spawn(lazy(move || stream)); warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err)) }; @@ -95,7 +104,6 @@ fn main() -> Result<(), FatalErr> { fs::remove_file(socket).unwrap_or_default(); let incoming = UnixListener::bind(socket)?.incoming(); fs::set_permissions(socket, PermissionsExt::from_mode(0o666))?; - tokio::run(lazy(|| streaming_server().serve_incoming(incoming))); } else { let server_addr = SocketAddr::new(*cfg.address, *cfg.port); diff --git a/src/response/redis/connection.rs b/src/response/redis/connection.rs index f4b2dd1..dc50f37 100644 --- a/src/response/redis/connection.rs +++ b/src/response/redis/connection.rs @@ -58,7 +58,7 @@ impl RedisConn { Err(_) => break, }; if first_read { - size = 2000; + size = 5000; buffer = vec![0_u8; size]; first_read = false; } @@ -115,6 +115,14 @@ impl RedisConn { Timeline(Stream::Hashtag(id), _, _) => self.tag_name_cache.get(id), _non_hashtag_timeline => None, }; + log::info!( + "RedisConn.redis_input size: {}\n\ + RedisConn.redis_input capacity: {}\n\ + RedisConn.redis_input length: {}", + std::mem::size_of_val(&self.redis_input), + self.redis_input.capacity(), + self.redis_input.len() + ); let tl = timeline.to_redis_raw_timeline(hashtag)?; let (primary_cmd, secondary_cmd) = cmd.into_sendable(&tl); diff --git a/src/response/redis/msg.rs b/src/response/redis/msg.rs index 0a592d5..cf9f336 100644 --- a/src/response/redis/msg.rs +++ b/src/response/redis/msg.rs @@ -36,8 +36,6 @@ pub enum RedisParseOutput<'a> { NonMsg(&'a str), } -// TODO -- should this impl Iterator? - #[derive(Debug, Clone, PartialEq)] pub struct RedisMsg<'a> { pub timeline_txt: &'a str,