Clean up Receiver's use of Timeline

This commit is contained in:
Daniel Sockwell 2020-03-18 15:43:44 -04:00
parent bef5cfa45b
commit 3ca81f71e9
2 changed files with 54 additions and 41 deletions

View File

@ -54,7 +54,7 @@ impl Timeline {
use {Content::*, Reach::*, Stream::*};
Self(Unset, Local, Notification)
}
pub fn from_redis_channel(raw_timeline: &str, hashtag: Option<i64>) -> Self {
pub fn from_redis_str(raw_timeline: &str, hashtag: Option<i64>) -> Self {
use {Content::*, Reach::*, Stream::*};
match raw_timeline.split(':').collect::<Vec<&str>>()[..] {
["public"] => Timeline(Public, Federated, All),
@ -70,7 +70,7 @@ impl Timeline {
[..] => log_fatal!("Unexpected channel from Redis: {}", raw_timeline),
}
}
pub fn to_redis_channel(&self, hashtag: Option<&String>) -> String {
pub fn to_redis_str(&self, hashtag: Option<&String>) -> String {
use {Content::*, Reach::*, Stream::*};
match self {
Timeline(User(id), Federated, All) => format!("timeline:{}", id),

View File

@ -4,6 +4,7 @@
mod message_queues;
use crate::{
config::{self, RedisInterval},
log_fatal,
parse_client_request::user::{self, postgres, PgPool, Timeline},
pubsub_cmd,
redis_to_client_stream::redis::{redis_cmd, RedisConn, RedisStream},
@ -64,7 +65,7 @@ impl Receiver {
manager_id: Uuid::default(),
msg_queues: MessageQueues(HashMap::new()),
clients_per_timeline: HashMap::new(),
cache: Cache::new(100),
cache: Cache::new(1000), // should this be a run-time option?
pool,
}
}
@ -90,6 +91,26 @@ impl Receiver {
self.timeline = timeline;
}
fn if_hashtag_timeline_get_hashtag_name(&mut self, timeline: Timeline) -> Option<String> {
use user::Stream::*;
if let Timeline(Hashtag(id), _, _) = timeline {
let cached_tag = self.cache.id_to_hashtag.get(&id).map(String::from);
let tag = match cached_tag {
Some(tag) => tag,
None => {
let new_tag =
postgres::select_hashtag_name(&id, self.pool.clone()).expect("TODO");
self.cache.hashtag_to_id.put(new_tag.clone(), id);
self.cache.id_to_hashtag.put(id, new_tag.clone());
new_tag.to_string()
}
};
Some(tag)
} else {
None
}
}
/// Drop any PubSub subscriptions that don't have active clients and check
/// that there's a subscription to the current one. If there isn't, then
/// subscribe to it.
@ -99,42 +120,21 @@ impl Receiver {
// Record the lower number of clients subscribed to that channel
for change in timelines_to_modify {
let timeline = change.timeline;
let opt_hashtag = self.if_hashtag_timeline_get_hashtag_name(timeline);
let opt_hashtag = opt_hashtag.as_ref();
let count_of_subscribed_clients = self
.clients_per_timeline
.entry(change.timeline.clone())
.entry(timeline)
.and_modify(|n| *n += change.in_subscriber_number)
.or_insert_with(|| 1);
use user::Stream::*;
let hashtag = match timeline {
Timeline(Hashtag(id), _, _) => {
// TODO clean this up
let maybe_tag = self.cache.id_to_hashtag.get(&id).map(String::from);
let pool = self.pool.clone();
let tag = maybe_tag.unwrap_or_else(|| {
let tag = &postgres::select_hashtag_name(&id, pool).expect("TODO");
tag.to_string()
});
self.cache.hashtag_to_id.put(tag.clone(), id);
self.cache.id_to_hashtag.put(id, tag.clone());
Some(tag)
}
_ => None,
};
// If no clients, unsubscribe from the channel
if *count_of_subscribed_clients <= 0 {
pubsub_cmd!(
"unsubscribe",
self,
change.timeline.to_redis_channel(hashtag.as_ref())
);
pubsub_cmd!("unsubscribe", self, timeline.to_redis_str(opt_hashtag));
} else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 {
pubsub_cmd!(
"subscribe",
self,
change.timeline.to_redis_channel(hashtag.as_ref())
);
pubsub_cmd!("subscribe", self, timeline.to_redis_str(opt_hashtag));
}
}
if start_time.elapsed().as_millis() > 1 {
@ -155,21 +155,25 @@ impl futures::stream::Stream for Receiver {
/// message already in a queue. Thus, we only poll Redis if it has not
/// been polled lately.
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
let start_time = time::Instant::now();
let (timeline, id) = (self.timeline.clone(), self.manager_id);
if self.redis_polled_at.elapsed() > *self.redis_poll_interval {
for (raw_timeline, msg_value) in self.pubsub_connection.poll_redis() {
let hashtag = if raw_timeline.starts_with("hashtag") {
log::warn!("Found a hashtag in: {:?}", start_time.elapsed());
let tag_name = raw_timeline.split(':').nth(1).expect("TODO").to_string();
log::warn!("Got the tag name in: {:?}", start_time.elapsed());
let tag_id = *self.cache.hashtag_to_id.get(&tag_name).expect("TODO");
log::warn!("Got the cached id in: {:?}", start_time.elapsed());
let tag_name = raw_timeline
.split(':')
.nth(1)
.unwrap_or_else(|| log_fatal!("No hashtag found in `{}`", raw_timeline))
.to_string();
let tag_id = *self
.cache
.hashtag_to_id
.get(&tag_name)
.unwrap_or_else(|| log_fatal!("No cached id for `{}`", tag_name));
Some(tag_id)
} else {
None
};
let timeline = Timeline::from_redis_channel(&raw_timeline, hashtag);
let timeline = Timeline::from_redis_str(&raw_timeline, hashtag);
for msg_queue in self.msg_queues.values_mut() {
if msg_queue.timeline == timeline {
msg_queue.messages.push_back(msg_value.clone());
@ -192,8 +196,17 @@ impl futures::stream::Stream for Receiver {
impl Drop for Receiver {
fn drop(&mut self) {
// TODO fix for hashtags:
let hashtag = None;
pubsub_cmd!("unsubscribe", self, self.timeline.to_redis_channel(hashtag));
log::warn!(
"\
##########################################################
# #
# Drop triggered #
# (see below) #
##########################################################"
);
dbg!(&self);
let opt_hashtag = self.if_hashtag_timeline_get_hashtag_name(self.timeline);
let opt_hashtag = opt_hashtag.as_ref();
pubsub_cmd!("unsubscribe", self, self.timeline.to_redis_str(opt_hashtag));
}
}