diff --git a/src/response/redis/connection.rs b/src/response/redis/connection.rs index c7edbf4..8bf0697 100644 --- a/src/response/redis/connection.rs +++ b/src/response/redis/connection.rs @@ -22,8 +22,7 @@ pub(super) struct RedisConn { // TODO: eventually, it might make sense to have Mastodon publish to timelines with // the tag number instead of the tag name. This would save us from dealing // with a cache here and would be consistent with how lists/users are handled. - pub(super) tag_id_cache: LruCache, - tag_name_cache: LruCache, + pub(super) tag_name_cache: LruCache, pub(super) input: Vec, } @@ -37,7 +36,6 @@ impl RedisConn { Ok(Self { primary: conn, secondary: Self::new_connection(&addr, redis_cfg.password.as_ref())?, - tag_id_cache: LruCache::new(1000), tag_name_cache: LruCache::new(1000), namespace: redis_cfg.namespace.clone().0, input: vec![0; 4096 * 4], @@ -62,11 +60,6 @@ impl RedisConn { } } - pub(super) fn update_cache(&mut self, hashtag: String, id: i64) { - self.tag_id_cache.put(hashtag.clone(), id); - self.tag_name_cache.put(id, hashtag); - } - pub(crate) fn send_cmd(&mut self, cmd: RedisCmd, timelines: &[Timeline]) -> Result<()> { let namespace = self.namespace.take(); let timelines: Result> = timelines diff --git a/src/response/redis/manager.rs b/src/response/redis/manager.rs index 8a4f9b6..27ded0c 100644 --- a/src/response/redis/manager.rs +++ b/src/response/redis/manager.rs @@ -13,6 +13,7 @@ pub(self) use super::EventErr; use futures::{Async, Poll, Stream}; use hashbrown::{HashMap, HashSet}; +use lru::LruCache; use std::convert::{TryFrom, TryInto}; use std::str; use std::sync::{Arc, Mutex, MutexGuard, PoisonError}; @@ -29,6 +30,7 @@ pub struct Manager { ping_time: Instant, channel_id: u32, unread_idx: (usize, usize), + tag_id_cache: LruCache, } impl Stream for Manager { @@ -47,27 +49,20 @@ impl Stream for Manager { let mut unread = str::from_utf8(input).unwrap_or_else(|e| { str::from_utf8(input.split_at(e.valid_up_to()).0).expect("guaranteed by `split_at`") }); - let tag_id_cache = &mut self.redis_conn.tag_id_cache; - let redis_namespace = &self.redis_conn.namespace; + while !unread.is_empty() { use RedisParseOutput::*; match RedisParseOutput::try_from(unread) { Ok(Msg(msg)) => { - let tl_matching_ns = match redis_namespace { - Some(ns) if msg.timeline_txt.starts_with(ns) => { - Some(&msg.timeline_txt[ns.len() + ":timeline:".len()..]) - } - None => Some(&msg.timeline_txt["timeline:".len()..]), - Some(_non_matching_ns) => None, - }; - - if let Some(trimmed_tl) = tl_matching_ns { - let tl = Timeline::from_redis_text(trimmed_tl, tag_id_cache)?; + // If we get a message and it matches the redis_namespace, get the msg's + // Event and send it to all channels matching the msg's Timeline + if let Some(tl) = msg.timeline_matching_ns(&self.redis_conn.namespace) { + let tl = Timeline::from_redis_text(tl, &mut self.tag_id_cache)?; let event: Arc = Arc::new(msg.event_txt.try_into()?); if let Some(channels) = self.timelines.get_mut(&tl) { - for (_id, channel) in channels { + for channel in channels.values_mut() { if let Ok(Async::NotReady) = channel.poll_ready() { - log::warn!("{:?} channel full", tl); + log::warn!("{:?} channel full\ncan't send:{:?}", tl, event); return Ok(Async::NotReady); } let _ = channel.try_send(event.clone()); // err just means channel will be closed @@ -78,33 +73,38 @@ impl Stream for Manager { } Ok(NonMsg(leftover_input)) => unread = leftover_input, Err(RedisParseErr::Incomplete) => { - if self.unread_idx.0 == 0 { - // msg already first; no copying needed - } else if self.unread_idx.0 >= (self.unread_idx.1 - self.unread_idx.0) { - let (read, unread) = self.redis_conn.input[..self.unread_idx.1] - .split_at_mut(self.unread_idx.0); - for (i, b) in unread.iter().enumerate() { - read[i] = *b; - } - } else { - // Less efficient, but should never occur in production - log::warn!("Moving partial input requires heap allocation"); - self.redis_conn.input = - self.redis_conn.input[self.unread_idx.0..].into(); - } - self.unread_idx = (0, self.unread_idx.1 - self.unread_idx.0); + self.copy_partial_msg(); unread = ""; } Err(e) => Err(Error::RedisParseErr(e, unread.to_string()))?, }; self.unread_idx.0 = self.unread_idx.1 - unread.len(); } + if self.unread_idx.0 == self.unread_idx.1 { + self.unread_idx = (0, 0) + } } Ok(Async::Ready(Some(()))) } } impl Manager { + fn copy_partial_msg(&mut self) { + if self.unread_idx.0 == 0 { + // msg already first; no copying needed + } else if self.unread_idx.0 >= (self.unread_idx.1 - self.unread_idx.0) { + let (read, unread) = + self.redis_conn.input[..self.unread_idx.1].split_at_mut(self.unread_idx.0); + for (i, b) in unread.iter().enumerate() { + read[i] = *b; + } + } else { + // Less efficient, but should never occur in production + log::warn!("Moving partial input requires heap allocation"); + self.redis_conn.input = self.redis_conn.input[self.unread_idx.0..].into(); + } + self.unread_idx = (0, self.unread_idx.1 - self.unread_idx.0); + } /// Create a new `Manager`, with its own Redis connections (but no active subscriptions). pub fn try_from(redis_cfg: &config::Redis) -> Result { Ok(Self { @@ -113,6 +113,7 @@ impl Manager { ping_time: Instant::now(), channel_id: 0, unread_idx: (0, 0), + tag_id_cache: LruCache::new(1000), }) } @@ -123,7 +124,8 @@ impl Manager { pub fn subscribe(&mut self, subscription: &Subscription, channel: EventChannel) { let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline); if let (Some(hashtag), Some(id)) = (tag, tl.tag()) { - self.redis_conn.update_cache(hashtag, id); + self.tag_id_cache.put(hashtag.clone(), id); + self.redis_conn.tag_name_cache.put(id, hashtag); }; let channels = self.timelines.entry(tl).or_default(); diff --git a/src/response/redis/msg.rs b/src/response/redis/msg.rs index 8af186b..50b4ae0 100644 --- a/src/response/redis/msg.rs +++ b/src/response/redis/msg.rs @@ -39,6 +39,18 @@ pub(crate) struct RedisMsg<'a> { pub(crate) leftover_input: &'a str, } +impl<'a> RedisMsg<'a> { + pub(super) fn timeline_matching_ns(&self, namespace: &Option) -> Option<&str> { + match namespace { + Some(ns) if self.timeline_txt.starts_with(ns) => { + Some(&self.timeline_txt[ns.len() + ":timeline:".len()..]) + } + None => Some(&self.timeline_txt["timeline:".len()..]), + Some(_non_matching_ns) => None, + } + } +} + impl<'a> TryFrom<&'a str> for RedisParseOutput<'a> { type Error = RedisParseErr; fn try_from(utf8: &'a str) -> Result, Self::Error> {