Refacto RedisConn [WIP]

This commit is contained in:
Daniel Sockwell 2020-04-13 11:34:38 -04:00
parent 829d193ec3
commit eaa308b3eb
3 changed files with 37 additions and 31 deletions

View File

@ -5,7 +5,7 @@ use flodgatt::request::{Handler, Subscription, Timeline};
use flodgatt::response::redis;
use flodgatt::response::stream;
use futures::{future::lazy, stream::Stream as _Stream};
use futures::{future::lazy, stream::Stream as _};
use std::fs;
use std::net::SocketAddr;
use std::os::unix::fs::PermissionsExt;

View File

@ -1,12 +1,13 @@
mod err;
pub use err::RedisConnErr;
use super::msg::{RedisParseErr, RedisParseOutput};
use super::msg::{RedisMsg, RedisParseErr, RedisParseOutput};
use super::ManagerErr;
use crate::config::Redis;
use crate::event::Event;
use crate::request::{Stream, Timeline};
use futures::{Async, Poll};
use futures::Async;
use lru::LruCache;
use std::convert::{TryFrom, TryInto};
use std::io::{Read, Write};
@ -15,6 +16,7 @@ use std::str;
use std::time::Duration;
type Result<T> = std::result::Result<T, RedisConnErr>;
type Poll = futures::Poll<Option<(Timeline, Event)>, ManagerErr>;
#[derive(Debug)]
pub struct RedisConn {
@ -46,15 +48,12 @@ impl RedisConn {
Ok(redis_conn)
}
pub fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, ManagerErr> {
pub fn poll_redis(&mut self) -> Poll {
let mut size = 100; // large enough to handle subscribe/unsubscribe notice
let (mut buffer, mut first_read) = (vec![0u8; size], true);
loop {
match self.primary.read(&mut buffer) {
Ok(n) if n != size => {
self.redis_input.extend_from_slice(&buffer[..n]);
break;
}
Ok(n) if n != size => break self.redis_input.extend_from_slice(&buffer[..n]),
Ok(n) => self.redis_input.extend_from_slice(&buffer[..n]),
Err(_) => break,
};
@ -68,6 +67,7 @@ impl RedisConn {
if self.redis_input.is_empty() {
return Ok(Async::NotReady);
}
let input = self.redis_input.clone();
self.redis_input.clear();
@ -82,16 +82,12 @@ impl RedisConn {
let (res, leftover) = match RedisParseOutput::try_from(input) {
Ok(Msg(msg)) => match &self.redis_namespace {
Some(ns) if msg.timeline_txt.starts_with(&format!("{}:timeline:", ns)) => {
let trimmed_tl_txt = &msg.timeline_txt[ns.len() + ":timeline:".len()..];
let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut self.tag_id_cache)?;
let event = msg.event_txt.try_into()?;
(Ok(Ready(Some((tl, event)))), msg.leftover_input)
let trimmed_tl = &msg.timeline_txt[ns.len() + ":timeline:".len()..];
(self.into_tl_event(trimmed_tl, &msg)?, msg.leftover_input)
}
None => {
let trimmed_tl_txt = &msg.timeline_txt["timeline:".len()..];
let tl = Timeline::from_redis_text(trimmed_tl_txt, &mut self.tag_id_cache)?;
let event = msg.event_txt.try_into()?;
(Ok(Ready(Some((tl, event)))), msg.leftover_input)
let trimmed_tl = &msg.timeline_txt["timeline:".len()..];
(self.into_tl_event(trimmed_tl, &msg)?, msg.leftover_input)
}
Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input),
},
@ -104,6 +100,12 @@ impl RedisConn {
res
}
fn into_tl_event<'a>(&mut self, tl: &'a str, msg: &'a RedisMsg) -> Result<Poll> {
let tl = Timeline::from_redis_text(tl, &mut self.tag_id_cache)?;
let event = msg.event_txt.try_into().expect("TODO");
Ok(Ok(Async::Ready(Some((tl, event)))))
}
pub fn update_cache(&mut self, hashtag: String, id: i64) {
self.tag_id_cache.put(hashtag.clone(), id);
self.tag_name_cache.put(id, hashtag);
@ -116,18 +118,9 @@ impl RedisConn {
};
let tl = timeline.to_redis_raw_timeline(hashtag)?;
let (primary_cmd, secondary_cmd) = match cmd {
RedisCmd::Subscribe => (
format!("*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n", tl.len(), tl),
format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n$1\r\n1\r\n", tl.len(), tl),
),
RedisCmd::Unsubscribe => (
format!("*2\r\n$11\r\nunsubscribe\r\n${}\r\n{}\r\n", tl.len(), tl),
format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n$1\r\n0\r\n", tl.len(), tl),
),
};
self.primary.write_all(&primary_cmd.as_bytes())?;
self.secondary.write_all(&secondary_cmd.as_bytes())?;
let (primary_cmd, secondary_cmd) = cmd.into_sendable(&tl);
self.primary.write_all(&primary_cmd)?;
self.secondary.write_all(&secondary_cmd)?;
Ok(())
}
@ -174,3 +167,18 @@ pub enum RedisCmd {
Subscribe,
Unsubscribe,
}
impl RedisCmd {
pub fn into_sendable(&self, tl: &String) -> (Vec<u8>, Vec<u8>) {
match self {
RedisCmd::Subscribe => (
format!("*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n", tl.len(), tl).into_bytes(),
format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n$1\r\n1\r\n", tl.len(), tl).into_bytes(),
),
RedisCmd::Unsubscribe => (
format!("*2\r\n$11\r\nunsubscribe\r\n${}\r\n{}\r\n", tl.len(), tl).into_bytes(),
format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n$1\r\n0\r\n", tl.len(), tl).into_bytes(),
),
}
}
}

View File

@ -28,9 +28,8 @@ pub struct Manager {
}
impl Manager {
/// Create a new `Receiver`, with its own Redis connections (but, as yet, no
/// Create a new `Manager`, with its own Redis connections (but, as yet, no
/// active subscriptions).
pub fn try_from(
redis_cfg: config::Redis,
tx: watch::Sender<(Timeline, Event)>,
@ -38,7 +37,6 @@ impl Manager {
) -> Result<Self> {
Ok(Self {
redis_connection: RedisConn::new(redis_cfg)?,
clients_per_timeline: HashMap::new(),
tx,
rx,