mirror of https://github.com/mastodon/flodgatt
Hotfix of previous commit
This commit is contained in:
parent
66553408fb
commit
2221723c32
|
@ -22,8 +22,7 @@ pub(super) struct RedisConn {
|
|||
// TODO: eventually, it might make sense to have Mastodon publish to timelines with
|
||||
// the tag number instead of the tag name. This would save us from dealing
|
||||
// with a cache here and would be consistent with how lists/users are handled.
|
||||
pub(super) tag_id_cache: LruCache<String, i64>,
|
||||
tag_name_cache: LruCache<i64, String>,
|
||||
pub(super) tag_name_cache: LruCache<i64, String>,
|
||||
pub(super) input: Vec<u8>,
|
||||
}
|
||||
|
||||
|
@ -37,7 +36,6 @@ impl RedisConn {
|
|||
Ok(Self {
|
||||
primary: conn,
|
||||
secondary: Self::new_connection(&addr, redis_cfg.password.as_ref())?,
|
||||
tag_id_cache: LruCache::new(1000),
|
||||
tag_name_cache: LruCache::new(1000),
|
||||
namespace: redis_cfg.namespace.clone().0,
|
||||
input: vec![0; 4096 * 4],
|
||||
|
@ -62,11 +60,6 @@ impl RedisConn {
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) fn update_cache(&mut self, hashtag: String, id: i64) {
|
||||
self.tag_id_cache.put(hashtag.clone(), id);
|
||||
self.tag_name_cache.put(id, hashtag);
|
||||
}
|
||||
|
||||
pub(crate) fn send_cmd(&mut self, cmd: RedisCmd, timelines: &[Timeline]) -> Result<()> {
|
||||
let namespace = self.namespace.take();
|
||||
let timelines: Result<Vec<String>> = timelines
|
||||
|
|
|
@ -13,6 +13,7 @@ pub(self) use super::EventErr;
|
|||
|
||||
use futures::{Async, Poll, Stream};
|
||||
use hashbrown::{HashMap, HashSet};
|
||||
use lru::LruCache;
|
||||
use std::convert::{TryFrom, TryInto};
|
||||
use std::str;
|
||||
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
|
||||
|
@ -29,6 +30,7 @@ pub struct Manager {
|
|||
ping_time: Instant,
|
||||
channel_id: u32,
|
||||
unread_idx: (usize, usize),
|
||||
tag_id_cache: LruCache<String, i64>,
|
||||
}
|
||||
|
||||
impl Stream for Manager {
|
||||
|
@ -47,27 +49,20 @@ impl Stream for Manager {
|
|||
let mut unread = str::from_utf8(input).unwrap_or_else(|e| {
|
||||
str::from_utf8(input.split_at(e.valid_up_to()).0).expect("guaranteed by `split_at`")
|
||||
});
|
||||
let tag_id_cache = &mut self.redis_conn.tag_id_cache;
|
||||
let redis_namespace = &self.redis_conn.namespace;
|
||||
|
||||
while !unread.is_empty() {
|
||||
use RedisParseOutput::*;
|
||||
match RedisParseOutput::try_from(unread) {
|
||||
Ok(Msg(msg)) => {
|
||||
let tl_matching_ns = match redis_namespace {
|
||||
Some(ns) if msg.timeline_txt.starts_with(ns) => {
|
||||
Some(&msg.timeline_txt[ns.len() + ":timeline:".len()..])
|
||||
}
|
||||
None => Some(&msg.timeline_txt["timeline:".len()..]),
|
||||
Some(_non_matching_ns) => None,
|
||||
};
|
||||
|
||||
if let Some(trimmed_tl) = tl_matching_ns {
|
||||
let tl = Timeline::from_redis_text(trimmed_tl, tag_id_cache)?;
|
||||
// If we get a message and it matches the redis_namespace, get the msg's
|
||||
// Event and send it to all channels matching the msg's Timeline
|
||||
if let Some(tl) = msg.timeline_matching_ns(&self.redis_conn.namespace) {
|
||||
let tl = Timeline::from_redis_text(tl, &mut self.tag_id_cache)?;
|
||||
let event: Arc<Event> = Arc::new(msg.event_txt.try_into()?);
|
||||
if let Some(channels) = self.timelines.get_mut(&tl) {
|
||||
for (_id, channel) in channels {
|
||||
for channel in channels.values_mut() {
|
||||
if let Ok(Async::NotReady) = channel.poll_ready() {
|
||||
log::warn!("{:?} channel full", tl);
|
||||
log::warn!("{:?} channel full\ncan't send:{:?}", tl, event);
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
let _ = channel.try_send(event.clone()); // err just means channel will be closed
|
||||
|
@ -78,33 +73,38 @@ impl Stream for Manager {
|
|||
}
|
||||
Ok(NonMsg(leftover_input)) => unread = leftover_input,
|
||||
Err(RedisParseErr::Incomplete) => {
|
||||
if self.unread_idx.0 == 0 {
|
||||
// msg already first; no copying needed
|
||||
} else if self.unread_idx.0 >= (self.unread_idx.1 - self.unread_idx.0) {
|
||||
let (read, unread) = self.redis_conn.input[..self.unread_idx.1]
|
||||
.split_at_mut(self.unread_idx.0);
|
||||
for (i, b) in unread.iter().enumerate() {
|
||||
read[i] = *b;
|
||||
}
|
||||
} else {
|
||||
// Less efficient, but should never occur in production
|
||||
log::warn!("Moving partial input requires heap allocation");
|
||||
self.redis_conn.input =
|
||||
self.redis_conn.input[self.unread_idx.0..].into();
|
||||
}
|
||||
self.unread_idx = (0, self.unread_idx.1 - self.unread_idx.0);
|
||||
self.copy_partial_msg();
|
||||
unread = "";
|
||||
}
|
||||
Err(e) => Err(Error::RedisParseErr(e, unread.to_string()))?,
|
||||
};
|
||||
self.unread_idx.0 = self.unread_idx.1 - unread.len();
|
||||
}
|
||||
if self.unread_idx.0 == self.unread_idx.1 {
|
||||
self.unread_idx = (0, 0)
|
||||
}
|
||||
}
|
||||
Ok(Async::Ready(Some(())))
|
||||
}
|
||||
}
|
||||
|
||||
impl Manager {
|
||||
fn copy_partial_msg(&mut self) {
|
||||
if self.unread_idx.0 == 0 {
|
||||
// msg already first; no copying needed
|
||||
} else if self.unread_idx.0 >= (self.unread_idx.1 - self.unread_idx.0) {
|
||||
let (read, unread) =
|
||||
self.redis_conn.input[..self.unread_idx.1].split_at_mut(self.unread_idx.0);
|
||||
for (i, b) in unread.iter().enumerate() {
|
||||
read[i] = *b;
|
||||
}
|
||||
} else {
|
||||
// Less efficient, but should never occur in production
|
||||
log::warn!("Moving partial input requires heap allocation");
|
||||
self.redis_conn.input = self.redis_conn.input[self.unread_idx.0..].into();
|
||||
}
|
||||
self.unread_idx = (0, self.unread_idx.1 - self.unread_idx.0);
|
||||
}
|
||||
/// Create a new `Manager`, with its own Redis connections (but no active subscriptions).
|
||||
pub fn try_from(redis_cfg: &config::Redis) -> Result<Self> {
|
||||
Ok(Self {
|
||||
|
@ -113,6 +113,7 @@ impl Manager {
|
|||
ping_time: Instant::now(),
|
||||
channel_id: 0,
|
||||
unread_idx: (0, 0),
|
||||
tag_id_cache: LruCache::new(1000),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -123,7 +124,8 @@ impl Manager {
|
|||
pub fn subscribe(&mut self, subscription: &Subscription, channel: EventChannel) {
|
||||
let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline);
|
||||
if let (Some(hashtag), Some(id)) = (tag, tl.tag()) {
|
||||
self.redis_conn.update_cache(hashtag, id);
|
||||
self.tag_id_cache.put(hashtag.clone(), id);
|
||||
self.redis_conn.tag_name_cache.put(id, hashtag);
|
||||
};
|
||||
|
||||
let channels = self.timelines.entry(tl).or_default();
|
||||
|
|
|
@ -39,6 +39,18 @@ pub(crate) struct RedisMsg<'a> {
|
|||
pub(crate) leftover_input: &'a str,
|
||||
}
|
||||
|
||||
impl<'a> RedisMsg<'a> {
|
||||
pub(super) fn timeline_matching_ns(&self, namespace: &Option<String>) -> Option<&str> {
|
||||
match namespace {
|
||||
Some(ns) if self.timeline_txt.starts_with(ns) => {
|
||||
Some(&self.timeline_txt[ns.len() + ":timeline:".len()..])
|
||||
}
|
||||
None => Some(&self.timeline_txt["timeline:".len()..]),
|
||||
Some(_non_matching_ns) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> TryFrom<&'a str> for RedisParseOutput<'a> {
|
||||
type Error = RedisParseErr;
|
||||
fn try_from(utf8: &'a str) -> Result<RedisParseOutput<'a>, Self::Error> {
|
||||
|
|
Loading…
Reference in New Issue