From eaa308b3ebc21063b1f327a919b56cb582276e6f Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Mon, 13 Apr 2020 11:34:38 -0400 Subject: [PATCH] Refacto RedisConn [WIP] --- src/main.rs | 2 +- src/response/redis/connection.rs | 62 ++++++++++++++++++-------------- src/response/redis/manager.rs | 4 +-- 3 files changed, 37 insertions(+), 31 deletions(-) diff --git a/src/main.rs b/src/main.rs index 816d2d5..c8be66d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ use flodgatt::request::{Handler, Subscription, Timeline}; use flodgatt::response::redis; use flodgatt::response::stream; -use futures::{future::lazy, stream::Stream as _Stream}; +use futures::{future::lazy, stream::Stream as _}; use std::fs; use std::net::SocketAddr; use std::os::unix::fs::PermissionsExt; diff --git a/src/response/redis/connection.rs b/src/response/redis/connection.rs index e955c7f..d47ce89 100644 --- a/src/response/redis/connection.rs +++ b/src/response/redis/connection.rs @@ -1,12 +1,13 @@ mod err; pub use err::RedisConnErr; -use super::msg::{RedisParseErr, RedisParseOutput}; +use super::msg::{RedisMsg, RedisParseErr, RedisParseOutput}; use super::ManagerErr; use crate::config::Redis; use crate::event::Event; use crate::request::{Stream, Timeline}; -use futures::{Async, Poll}; + +use futures::Async; use lru::LruCache; use std::convert::{TryFrom, TryInto}; use std::io::{Read, Write}; @@ -15,6 +16,7 @@ use std::str; use std::time::Duration; type Result = std::result::Result; +type Poll = futures::Poll, ManagerErr>; #[derive(Debug)] pub struct RedisConn { @@ -46,15 +48,12 @@ impl RedisConn { Ok(redis_conn) } - pub fn poll_redis(&mut self) -> Poll, ManagerErr> { + pub fn poll_redis(&mut self) -> Poll { let mut size = 100; // large enough to handle subscribe/unsubscribe notice let (mut buffer, mut first_read) = (vec![0u8; size], true); loop { match self.primary.read(&mut buffer) { - Ok(n) if n != size => { - self.redis_input.extend_from_slice(&buffer[..n]); - break; - } + Ok(n) if n != size => break self.redis_input.extend_from_slice(&buffer[..n]), Ok(n) => self.redis_input.extend_from_slice(&buffer[..n]), Err(_) => break, }; @@ -68,6 +67,7 @@ impl RedisConn { if self.redis_input.is_empty() { return Ok(Async::NotReady); } + let input = self.redis_input.clone(); self.redis_input.clear(); @@ -82,16 +82,12 @@ impl RedisConn { let (res, leftover) = match RedisParseOutput::try_from(input) { Ok(Msg(msg)) => match &self.redis_namespace { Some(ns) if msg.timeline_txt.starts_with(&format!("{}:timeline:", ns)) => { - let trimmed_tl_txt = &msg.timeline_txt[ns.len() + ":timeline:".len()..]; - let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut self.tag_id_cache)?; - let event = msg.event_txt.try_into()?; - (Ok(Ready(Some((tl, event)))), msg.leftover_input) + let trimmed_tl = &msg.timeline_txt[ns.len() + ":timeline:".len()..]; + (self.into_tl_event(trimmed_tl, &msg)?, msg.leftover_input) } None => { - let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..]; - let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut self.tag_id_cache)?; - let event = msg.event_txt.try_into()?; - (Ok(Ready(Some((tl, event)))), msg.leftover_input) + let trimmed_tl = &msg.timeline_txt["timeline:".len()..]; + (self.into_tl_event(trimmed_tl, &msg)?, msg.leftover_input) } Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input), }, @@ -104,6 +100,12 @@ impl RedisConn { res } + fn into_tl_event<'a>(&mut self, tl: &'a str, msg: &'a RedisMsg) -> Result { + let tl = Timeline::from_redis_text(tl, &mut self.tag_id_cache)?; + let event = msg.event_txt.try_into().expect("TODO"); + Ok(Ok(Async::Ready(Some((tl, event))))) + } + pub fn update_cache(&mut self, hashtag: String, id: i64) { self.tag_id_cache.put(hashtag.clone(), id); self.tag_name_cache.put(id, hashtag); @@ -116,18 +118,9 @@ impl RedisConn { }; let tl = timeline.to_redis_raw_timeline(hashtag)?; - let (primary_cmd, secondary_cmd) = match cmd { - RedisCmd::Subscribe => ( - format!("*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n", tl.len(), tl), - format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n$1\r\n1\r\n", tl.len(), tl), - ), - RedisCmd::Unsubscribe => ( - format!("*2\r\n$11\r\nunsubscribe\r\n${}\r\n{}\r\n", tl.len(), tl), - format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n$1\r\n0\r\n", tl.len(), tl), - ), - }; - self.primary.write_all(&primary_cmd.as_bytes())?; - self.secondary.write_all(&secondary_cmd.as_bytes())?; + let (primary_cmd, secondary_cmd) = cmd.into_sendable(&tl); + self.primary.write_all(&primary_cmd)?; + self.secondary.write_all(&secondary_cmd)?; Ok(()) } @@ -174,3 +167,18 @@ pub enum RedisCmd { Subscribe, Unsubscribe, } + +impl RedisCmd { + pub fn into_sendable(&self, tl: &String) -> (Vec, Vec) { + match self { + RedisCmd::Subscribe => ( + format!("*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n", tl.len(), tl).into_bytes(), + format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n$1\r\n1\r\n", tl.len(), tl).into_bytes(), + ), + RedisCmd::Unsubscribe => ( + format!("*2\r\n$11\r\nunsubscribe\r\n${}\r\n{}\r\n", tl.len(), tl).into_bytes(), + format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n$1\r\n0\r\n", tl.len(), tl).into_bytes(), + ), + } + } +} diff --git a/src/response/redis/manager.rs b/src/response/redis/manager.rs index 4d45018..c0fc6d8 100644 --- a/src/response/redis/manager.rs +++ b/src/response/redis/manager.rs @@ -28,9 +28,8 @@ pub struct Manager { } impl Manager { - /// Create a new `Receiver`, with its own Redis connections (but, as yet, no + /// Create a new `Manager`, with its own Redis connections (but, as yet, no /// active subscriptions). - pub fn try_from( redis_cfg: config::Redis, tx: watch::Sender<(Timeline, Event)>, @@ -38,7 +37,6 @@ impl Manager { ) -> Result { Ok(Self { redis_connection: RedisConn::new(redis_cfg)?, - clients_per_timeline: HashMap::new(), tx, rx,