mirror of https://github.com/mastodon/flodgatt
Refactor RedisConn and Receiver
This commit is contained in:
parent
6f64c511d2
commit
e1909bf14f
|
@ -199,105 +199,117 @@ impl Timeline {
|
|||
pub fn from_redis_text(
|
||||
timeline: &str,
|
||||
cache: &mut LruCache<String, i64>,
|
||||
namespace: &Option<String>,
|
||||
) -> Result<Self, TimelineErr> {
|
||||
use crate::err::TimelineErr::RedisNamespaceMismatch;
|
||||
use {Content::*, Reach::*, Stream::*};
|
||||
let timeline_slice = &timeline.split(":").collect::<Vec<&str>>()[..];
|
||||
|
||||
#[rustfmt::skip]
|
||||
let (stream, reach, content) = if let Some(ns) = namespace {
|
||||
match timeline_slice {
|
||||
[n, "timeline", "public"] if n == ns => (Public, Federated, All),
|
||||
[_, "timeline", "public"]
|
||||
| ["timeline", "public"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
[n, "timeline", "public", "local"] if ns == n => (Public, Local, All),
|
||||
[_, "timeline", "public", "local"]
|
||||
| ["timeline", "public", "local"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
[n, "timeline", "public", "media"] if ns == n => (Public, Federated, Media),
|
||||
[_, "timeline", "public", "media"]
|
||||
| ["timeline", "public", "media"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
[n, "timeline", "public", "local", "media"] if ns == n => (Public, Local, Media),
|
||||
[_, "timeline", "public", "local", "media"]
|
||||
| ["timeline", "public", "local", "media"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
[n, "timeline", "hashtag", tag_name] if ns == n => {
|
||||
let tag_id = *cache
|
||||
.get(&tag_name.to_string())
|
||||
.unwrap_or_else(|| log_fatal!("No cached id for `{}`", tag_name));
|
||||
(Hashtag(tag_id), Federated, All)
|
||||
}
|
||||
[_, "timeline", "hashtag", _tag]
|
||||
| ["timeline", "hashtag", _tag] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
[n, "timeline", "hashtag", _tag, "local"] if ns == n => (Hashtag(0), Local, All),
|
||||
[_, "timeline", "hashtag", _tag, "local"]
|
||||
| ["timeline", "hashtag", _tag, "local"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
[n, "timeline", id] if ns == n => (User(id.parse().unwrap()), Federated, All),
|
||||
[_, "timeline", _id]
|
||||
| ["timeline", _id] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
[n, "timeline", id, "notification"] if ns == n =>
|
||||
(User(id.parse()?), Federated, Notification),
|
||||
|
||||
[_, "timeline", _id, "notification"]
|
||||
| ["timeline", _id, "notification"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
[n, "timeline", "list", id] if ns == n => (List(id.parse()?), Federated, All),
|
||||
[_, "timeline", "list", _id]
|
||||
| ["timeline", "list", _id] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
[n, "timeline", "direct", id] if ns == n => (Direct(id.parse()?), Federated, All),
|
||||
[_, "timeline", "direct", _id]
|
||||
| ["timeline", "direct", _id] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
[..] => log_fatal!("Unexpected channel from Redis: {:?}", timeline_slice),
|
||||
}
|
||||
} else {
|
||||
match timeline_slice {
|
||||
["timeline", "public"] => (Public, Federated, All),
|
||||
[_, "timeline", "public"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
["timeline", "public", "local"] => (Public, Local, All),
|
||||
[_, "timeline", "public", "local"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
["timeline", "public", "media"] => (Public, Federated, Media),
|
||||
|
||||
[_, "timeline", "public", "media"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
["timeline", "public", "local", "media"] => (Public, Local, Media),
|
||||
[_, "timeline", "public", "local", "media"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
["timeline", "hashtag", _tag] => (Hashtag(0), Federated, All),
|
||||
[_, "timeline", "hashtag", _tag] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
["timeline", "hashtag", _tag, "local"] => (Hashtag(0), Local, All),
|
||||
[_, "timeline", "hashtag", _tag, "local"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
["timeline", id] => (User(id.parse().unwrap()), Federated, All),
|
||||
[_, "timeline", _id] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
["timeline", id, "notification"] => {
|
||||
(User(id.parse().unwrap()), Federated, Notification)
|
||||
}
|
||||
[_, "timeline", _id, "notification"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
["timeline", "list", id] => (List(id.parse().unwrap()), Federated, All),
|
||||
[_, "timeline", "list", _id] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
["timeline", "direct", id] => (Direct(id.parse().unwrap()), Federated, All),
|
||||
[_, "timeline", "direct", _id] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// Other endpoints don't exist:
|
||||
[..] => Err(TimelineErr::InvalidInput)?,
|
||||
}
|
||||
let mut id_from_tag = |tag: &str| match cache.get(&tag.to_string()) {
|
||||
Some(id) => Ok(*id),
|
||||
None => Err(TimelineErr::InvalidInput), // TODO more specific
|
||||
};
|
||||
|
||||
Ok(Timeline(stream, reach, content))
|
||||
use {Content::*, Reach::*, Stream::*};
|
||||
Ok(match &timeline.split(":").collect::<Vec<&str>>()[..] {
|
||||
["public"] => Timeline(Public, Federated, All),
|
||||
["public", "local"] => Timeline(Public, Local, All),
|
||||
["public", "media"] => Timeline(Public, Federated, Media),
|
||||
["public", "local", "media"] => Timeline(Public, Local, Media),
|
||||
["hashtag", tag] => Timeline(Hashtag(id_from_tag(tag)?), Federated, All),
|
||||
["hashtag", tag, "local"] => Timeline(Hashtag(id_from_tag(tag)?), Local, All),
|
||||
[id] => Timeline(User(id.parse().unwrap()), Federated, All),
|
||||
[id, "notification"] => Timeline(User(id.parse().unwrap()), Federated, Notification),
|
||||
["list", id] => Timeline(List(id.parse().unwrap()), Federated, All),
|
||||
["direct", id] => Timeline(Direct(id.parse().unwrap()), Federated, All),
|
||||
// Other endpoints don't exist:
|
||||
[..] => Err(TimelineErr::InvalidInput)?,
|
||||
})
|
||||
// let (stream, reach, content) = if let Some(ns) = namespace {
|
||||
// match timeline_slice {
|
||||
// [n, "timeline", "public"] if n == ns => (Public, Federated, All),
|
||||
// [_, "timeline", "public"]
|
||||
// | ["timeline", "public"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// [n, "timeline", "public", "local"] if ns == n => (Public, Local, All),
|
||||
// [_, "timeline", "public", "local"]
|
||||
// | ["timeline", "public", "local"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// [n, "timeline", "public", "media"] if ns == n => (Public, Federated, Media),
|
||||
// [_, "timeline", "public", "media"]
|
||||
// | ["timeline", "public", "media"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// [n, "timeline", "public", "local", "media"] if ns == n => (Public, Local, Media),
|
||||
// [_, "timeline", "public", "local", "media"]
|
||||
// | ["timeline", "public", "local", "media"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// [n, "timeline", "hashtag", tag_name] if ns == n => {
|
||||
// let tag_id = *cache
|
||||
// .get(&tag_name.to_string())
|
||||
// .unwrap_or_else(|| log_fatal!("No cached id for `{}`", tag_name));
|
||||
// (Hashtag(tag_id), Federated, All)
|
||||
// }
|
||||
// [_, "timeline", "hashtag", _tag]
|
||||
// | ["timeline", "hashtag", _tag] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// [n, "timeline", "hashtag", _tag, "local"] if ns == n => (Hashtag(0), Local, All),
|
||||
// [_, "timeline", "hashtag", _tag, "local"]
|
||||
// | ["timeline", "hashtag", _tag, "local"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// [n, "timeline", id] if ns == n => (User(id.parse().unwrap()), Federated, All),
|
||||
// [_, "timeline", _id]
|
||||
// | ["timeline", _id] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// [n, "timeline", id, "notification"] if ns == n =>
|
||||
// (User(id.parse()?), Federated, Notification),
|
||||
|
||||
// [_, "timeline", _id, "notification"]
|
||||
// | ["timeline", _id, "notification"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// [n, "timeline", "list", id] if ns == n => (List(id.parse()?), Federated, All),
|
||||
// [_, "timeline", "list", _id]
|
||||
// | ["timeline", "list", _id] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// [n, "timeline", "direct", id] if ns == n => (Direct(id.parse()?), Federated, All),
|
||||
// [_, "timeline", "direct", _id]
|
||||
// | ["timeline", "direct", _id] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// [..] => log_fatal!("Unexpected channel from Redis: {:?}", timeline_slice),
|
||||
// }
|
||||
// } else {
|
||||
// match timeline_slice {
|
||||
// ["timeline", "public"] => (Public, Federated, All),
|
||||
// [_, "timeline", "public"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// ["timeline", "public", "local"] => (Public, Local, All),
|
||||
// [_, "timeline", "public", "local"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// ["timeline", "public", "media"] => (Public, Federated, Media),
|
||||
|
||||
// [_, "timeline", "public", "media"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// ["timeline", "public", "local", "media"] => (Public, Local, Media),
|
||||
// [_, "timeline", "public", "local", "media"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// ["timeline", "hashtag", _tag] => (Hashtag(0), Federated, All),
|
||||
// [_, "timeline", "hashtag", _tag] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// ["timeline", "hashtag", _tag, "local"] => (Hashtag(0), Local, All),
|
||||
// [_, "timeline", "hashtag", _tag, "local"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// ["timeline", id] => (User(id.parse().unwrap()), Federated, All),
|
||||
// [_, "timeline", _id] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// ["timeline", id, "notification"] => {
|
||||
// (User(id.parse().unwrap()), Federated, Notification)
|
||||
// }
|
||||
// [_, "timeline", _id, "notification"] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// ["timeline", "list", id] => (List(id.parse().unwrap()), Federated, All),
|
||||
// [_, "timeline", "list", _id] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// ["timeline", "direct", id] => (Direct(id.parse().unwrap()), Federated, All),
|
||||
// [_, "timeline", "direct", _id] => Err(RedisNamespaceMismatch)?,
|
||||
|
||||
// // Other endpoints don't exist:
|
||||
// [..] => Err(TimelineErr::InvalidInput)?,
|
||||
// }
|
||||
// };
|
||||
}
|
||||
fn from_query_and_user(q: &Query, user: &UserData, pool: PgPool) -> Result<Self, Rejection> {
|
||||
use {warp::reject::custom, Content::*, Reach::*, Scope::*, Stream::*};
|
||||
|
|
|
@ -7,44 +7,27 @@ pub use message_queues::{MessageQueues, MsgQueue};
|
|||
|
||||
use crate::{
|
||||
config,
|
||||
err::{RedisParseErr, TimelineErr},
|
||||
messages::Event,
|
||||
parse_client_request::{Stream, Timeline},
|
||||
pubsub_cmd,
|
||||
redis_to_client_stream::redis::{
|
||||
redis_cmd,
|
||||
redis_msg::{RedisMsg, RedisParseOutput},
|
||||
RedisConn,
|
||||
},
|
||||
redis_to_client_stream::redis::RedisConn,
|
||||
};
|
||||
use futures::{Async, Poll};
|
||||
use lru::LruCache;
|
||||
use tokio::io::AsyncRead;
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
convert::TryFrom,
|
||||
io::Read,
|
||||
net, str,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use std::{collections::HashMap, time::Instant};
|
||||
use tokio::io::Error;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// The item that streams from Redis and is polled by the `ClientAgent`
|
||||
#[derive(Debug)]
|
||||
pub struct Receiver {
|
||||
pub pubsub_connection: net::TcpStream,
|
||||
secondary_redis_connection: net::TcpStream,
|
||||
redis_poll_interval: Duration,
|
||||
redis_polled_at: Instant,
|
||||
redis_connection: RedisConn,
|
||||
timeline: Timeline,
|
||||
manager_id: Uuid,
|
||||
pub msg_queues: MessageQueues,
|
||||
clients_per_timeline: HashMap<Timeline, i32>,
|
||||
cache: Cache,
|
||||
redis_input: Vec<u8>,
|
||||
redis_namespace: Option<String>,
|
||||
// TODO remove items ^^^^^ moved to RedisConn
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -60,19 +43,10 @@ impl Receiver {
|
|||
/// Create a new `Receiver`, with its own Redis connections (but, as yet, no
|
||||
/// active subscriptions).
|
||||
pub fn new(redis_cfg: config::RedisConfig) -> Self {
|
||||
let redis_namespace = redis_cfg.namespace.clone();
|
||||
|
||||
let RedisConn {
|
||||
primary: pubsub_connection,
|
||||
secondary: secondary_redis_connection,
|
||||
polling_interval: redis_poll_interval,
|
||||
} = RedisConn::new(redis_cfg);
|
||||
let redis_connection = RedisConn::new(redis_cfg);
|
||||
|
||||
Self {
|
||||
pubsub_connection,
|
||||
secondary_redis_connection,
|
||||
redis_poll_interval,
|
||||
redis_polled_at: Instant::now(),
|
||||
redis_connection,
|
||||
timeline: Timeline::empty(),
|
||||
manager_id: Uuid::default(),
|
||||
msg_queues: MessageQueues(HashMap::new()),
|
||||
|
@ -81,8 +55,6 @@ impl Receiver {
|
|||
id_to_hashtag: LruCache::new(1000),
|
||||
hashtag_to_id: LruCache::new(1000),
|
||||
}, // should these be run-time options?
|
||||
redis_input: Vec::new(),
|
||||
redis_namespace,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -96,7 +68,8 @@ impl Receiver {
|
|||
self.timeline = tl;
|
||||
if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (hashtag, tl) {
|
||||
self.cache.id_to_hashtag.put(id, hashtag.clone());
|
||||
self.cache.hashtag_to_id.put(hashtag, id);
|
||||
self.cache.hashtag_to_id.put(hashtag.clone(), id);
|
||||
self.redis_connection.update_cache(hashtag, id);
|
||||
};
|
||||
|
||||
self.msg_queues.insert(id, MsgQueue::new(tl));
|
||||
|
@ -133,9 +106,11 @@ impl Receiver {
|
|||
|
||||
// If no clients, unsubscribe from the channel
|
||||
if *count_of_subscribed_clients <= 0 {
|
||||
pubsub_cmd!("unsubscribe", self, timeline.to_redis_raw_timeline(hashtag));
|
||||
self.redis_connection
|
||||
.send_unsubscribe_cmd(&timeline.to_redis_raw_timeline(hashtag));
|
||||
} else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 {
|
||||
pubsub_cmd!("subscribe", self, timeline.to_redis_raw_timeline(hashtag));
|
||||
self.redis_connection
|
||||
.send_subscribe_cmd(&timeline.to_redis_raw_timeline(hashtag));
|
||||
}
|
||||
}
|
||||
if start_time.elapsed().as_millis() > 1 {
|
||||
|
@ -157,72 +132,18 @@ impl futures::stream::Stream for Receiver {
|
|||
/// been polled lately.
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
let (timeline, id) = (self.timeline.clone(), self.manager_id);
|
||||
|
||||
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
|
||||
let mut buffer = vec![0u8; 6000];
|
||||
if let Ok(Async::Ready(bytes_read)) = self.poll_read(&mut buffer) {
|
||||
let (cache, ns) = (&mut self.cache.hashtag_to_id, &self.redis_namespace);
|
||||
|
||||
use TimelineErr::*;
|
||||
let mut remaining_input =
|
||||
str::from_utf8(&buffer[..bytes_read]).expect("TODO: handle partial characters");
|
||||
use RedisParseOutput::*;
|
||||
loop {
|
||||
match RedisParseOutput::try_from(remaining_input) {
|
||||
Ok(Msg(msg)) => {
|
||||
let timeline =
|
||||
match Timeline::from_redis_text(msg.timeline_txt, cache, ns) {
|
||||
Ok(timeline) => timeline,
|
||||
Err(TimelineErr::RedisNamespaceMismatch) => {
|
||||
remaining_input = msg.leftover_input;
|
||||
break;
|
||||
}
|
||||
Err(TimelineErr::InvalidInput) => {
|
||||
log::error!("{:?}\n{:?}", InvalidInput, msg);
|
||||
remaining_input = msg.leftover_input;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let event: Event = serde_json::from_str(msg.event_txt).expect("TODO");
|
||||
|
||||
for msg_queue in self.msg_queues.values_mut() {
|
||||
if msg_queue.timeline == timeline {
|
||||
msg_queue.messages.push_back(event.clone());
|
||||
}
|
||||
}
|
||||
log::info!("Got a msg from Redis for {:?}", timeline);
|
||||
remaining_input = msg.leftover_input;
|
||||
continue;
|
||||
}
|
||||
Ok(NonMsg(leftover_input)) => {
|
||||
log::info!("Got a non-msg from Redis.");
|
||||
remaining_input = leftover_input;
|
||||
continue;
|
||||
}
|
||||
Err(RedisParseErr::Incomplete) => {
|
||||
log::info!(
|
||||
"Got an incomplete msg from Redis: {:?}",
|
||||
String::from_utf8_lossy(&buffer[..bytes_read])
|
||||
);
|
||||
break;
|
||||
}
|
||||
Err(other) => {
|
||||
log::error!(
|
||||
"{:?}\nRedis input: {:?}",
|
||||
other,
|
||||
String::from_utf8_lossy(&buffer[..bytes_read])
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.redis_input
|
||||
.extend_from_slice(remaining_input.as_bytes());
|
||||
|
||||
// self.redis_input.extend_from_slice(remaining_input);
|
||||
// TODO add partial chars: ^^^^
|
||||
loop {
|
||||
match self.redis_connection.poll_redis() {
|
||||
Ok(Async::Ready(Some((timeline, event)))) => self
|
||||
.msg_queues
|
||||
.values_mut()
|
||||
.filter(|msg_queue| msg_queue.timeline == timeline)
|
||||
.for_each(|msg_queue| {
|
||||
msg_queue.messages.push_back(event.clone());
|
||||
}),
|
||||
Ok(Async::NotReady) => break,
|
||||
Ok(Async::Ready(None)) => (),
|
||||
Err(_) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -236,39 +157,3 @@ impl futures::stream::Stream for Receiver {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for Receiver {
|
||||
fn read(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||
self.pubsub_connection.read(buffer)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for Receiver {
|
||||
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
|
||||
match self.read(buf) {
|
||||
Ok(t) => Ok(Async::Ready(t)),
|
||||
Err(_) => Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// #[must_use]
|
||||
// pub fn process_messages<'a>(
|
||||
// input: RedisUtf8<'a>,
|
||||
// cache: &mut LruCache<String, i64>,
|
||||
// namespace: &Option<String>,
|
||||
// msg_queues: &mut MessageQueues,
|
||||
// ) -> Result<RedisUtf8<'a>, RedisParseErr> {
|
||||
// let r_msg = RedisMsg::try_from(RedisUtf8::from(input))?;
|
||||
// let timeline = Timeline::from_redis_text(r_msg.timeline_txt, cache, namespace)
|
||||
// .expect("TODO and handle timelines we skip");
|
||||
// let event: Event = serde_json::from_str(r_msg.event_txt).expect("TODO");
|
||||
|
||||
// for msg_queue in msg_queues.values_mut() {
|
||||
// if msg_queue.timeline == timeline {
|
||||
// msg_queue.messages.push_back(event.clone());
|
||||
// }
|
||||
// }
|
||||
|
||||
// Ok(r_msg.leftover_input)
|
||||
// }
|
||||
|
|
|
@ -10,7 +10,7 @@ macro_rules! pubsub_cmd {
|
|||
let namespace = $self.redis_namespace.clone();
|
||||
|
||||
$self
|
||||
.pubsub_connection
|
||||
.primary
|
||||
.write_all(&redis_cmd::pubsub($cmd, $tl, namespace.clone()))
|
||||
.expect("Can send command to Redis");
|
||||
// Because we keep track of the number of clients subscribed to a channel on our end,
|
||||
|
@ -21,7 +21,7 @@ macro_rules! pubsub_cmd {
|
|||
_ => panic!("Given unacceptable PUBSUB command"),
|
||||
};
|
||||
$self
|
||||
.secondary_redis_connection
|
||||
.secondary
|
||||
.write_all(&redis_cmd::set(
|
||||
format!("subscribed:{}", $tl),
|
||||
subscription_new_number,
|
||||
|
@ -29,7 +29,7 @@ macro_rules! pubsub_cmd {
|
|||
))
|
||||
.expect("Can set Redis");
|
||||
|
||||
log::info!("Now subscribed to: {:#?}", $self.msg_queues);
|
||||
// TODO: re-enable info logging >>> log::info!("Now subscribed to: {:#?}", $self.msg_queues);
|
||||
}};
|
||||
}
|
||||
/// Send a `SUBSCRIBE` or `UNSUBSCRIBE` command to a specific timeline
|
||||
|
|
|
@ -1,12 +1,138 @@
|
|||
use super::redis_cmd;
|
||||
use super::{
|
||||
redis_cmd,
|
||||
redis_msg::{RedisMsg, RedisParseOutput},
|
||||
};
|
||||
use crate::config::RedisConfig;
|
||||
use crate::err;
|
||||
use std::{io::Read, io::Write, net, time::Duration};
|
||||
use crate::err::{self, RedisParseErr};
|
||||
use crate::messages::Event;
|
||||
use crate::parse_client_request::Timeline;
|
||||
use crate::pubsub_cmd;
|
||||
use futures::{Async, Poll};
|
||||
use lru::LruCache;
|
||||
use std::{
|
||||
convert::TryFrom,
|
||||
io::Read,
|
||||
io::Write,
|
||||
net, str,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::io::AsyncRead;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RedisConn {
|
||||
pub primary: net::TcpStream,
|
||||
pub secondary: net::TcpStream,
|
||||
pub polling_interval: Duration,
|
||||
primary: net::TcpStream,
|
||||
secondary: net::TcpStream,
|
||||
redis_poll_interval: Duration,
|
||||
redis_polled_at: Instant,
|
||||
redis_namespace: Option<String>,
|
||||
cache: LruCache<String, i64>,
|
||||
redis_input: Vec<u8>, // TODO: Consider queue internal to RedisConn
|
||||
}
|
||||
|
||||
impl RedisConn {
|
||||
pub fn new(redis_cfg: RedisConfig) -> Self {
|
||||
let addr = format!("{}:{}", *redis_cfg.host, *redis_cfg.port);
|
||||
let conn_err = |e| {
|
||||
err::die_with_msg(format!(
|
||||
"Could not connect to Redis at {}:{}.\n Error detail: {}",
|
||||
*redis_cfg.host, *redis_cfg.port, e,
|
||||
))
|
||||
};
|
||||
let update_conn = |mut conn| {
|
||||
if let Some(password) = redis_cfg.password.clone() {
|
||||
conn = send_password(conn, &password);
|
||||
}
|
||||
conn = send_test_ping(conn);
|
||||
conn.set_read_timeout(Some(Duration::from_millis(10)))
|
||||
.expect("Can set read timeout for Redis connection");
|
||||
if let Some(db) = &*redis_cfg.db {
|
||||
conn = set_db(conn, db);
|
||||
}
|
||||
conn
|
||||
};
|
||||
let (primary_conn, secondary_conn) = (
|
||||
update_conn(net::TcpStream::connect(addr.clone()).unwrap_or_else(conn_err)),
|
||||
update_conn(net::TcpStream::connect(addr).unwrap_or_else(conn_err)),
|
||||
);
|
||||
primary_conn
|
||||
.set_nonblocking(true)
|
||||
.expect("set_nonblocking call failed");
|
||||
|
||||
Self {
|
||||
primary: primary_conn,
|
||||
secondary: secondary_conn,
|
||||
cache: LruCache::new(1000),
|
||||
redis_namespace: redis_cfg.namespace.clone(),
|
||||
redis_poll_interval: *redis_cfg.polling_interval,
|
||||
redis_input: Vec::new(),
|
||||
redis_polled_at: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, RedisParseErr> {
|
||||
let mut buffer = vec![0u8; 6000];
|
||||
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
|
||||
if let Ok(Async::Ready(bytes_read)) = self.poll_read(&mut buffer) {
|
||||
self.redis_input.extend_from_slice(&buffer[..bytes_read]);
|
||||
}
|
||||
}
|
||||
let input = self.redis_input.clone();
|
||||
self.redis_input.clear();
|
||||
let (input, invalid_bytes) = match str::from_utf8(&input) {
|
||||
Ok(input) => (input, None),
|
||||
Err(e) => {
|
||||
let (valid, invalid) = input.split_at(e.valid_up_to());
|
||||
(str::from_utf8(valid).expect("Guaranteed ^"), Some(invalid))
|
||||
}
|
||||
};
|
||||
|
||||
let (cache, ns) = (&mut self.cache, &self.redis_namespace);
|
||||
|
||||
use {Async::*, RedisParseOutput::*};
|
||||
|
||||
let (res, leftover) = match RedisParseOutput::try_from(input) {
|
||||
Ok(Msg(msg)) => match ns {
|
||||
Some(ns) if msg.timeline_txt.starts_with(&format!("{}:timeline:", ns)) => {
|
||||
let tl = Timeline::from_redis_text(
|
||||
&msg.timeline_txt[ns.len() + ":timeline:".len()..],
|
||||
cache,
|
||||
)
|
||||
.unwrap_or_else(|_| todo!());
|
||||
let event: Event = serde_json::from_str(msg.event_txt).expect("TODO");
|
||||
(Ok(Ready(Some((tl, event)))), msg.leftover_input)
|
||||
}
|
||||
None => {
|
||||
let tl =
|
||||
Timeline::from_redis_text(&msg.timeline_txt["timeline:".len()..], cache)
|
||||
.unwrap_or_else(|_| todo!());
|
||||
|
||||
let event: Event = serde_json::from_str(msg.event_txt).expect("TODO");
|
||||
|
||||
(Ok(Ready(Some((tl, event)))), msg.leftover_input)
|
||||
}
|
||||
Some(_non_matching_namespace) => (Ok(Ready(None)), msg.leftover_input),
|
||||
},
|
||||
Ok(NonMsg(leftover)) => (Ok(Ready(None)), leftover),
|
||||
Err(RedisParseErr::Incomplete) => (Ok(NotReady), input),
|
||||
Err(_other) => todo!(),
|
||||
};
|
||||
self.redis_input.extend_from_slice(leftover.as_bytes());
|
||||
if let Some(bytes) = invalid_bytes {
|
||||
self.redis_input.extend_from_slice(bytes);
|
||||
};
|
||||
res
|
||||
}
|
||||
|
||||
pub fn update_cache(&mut self, hashtag: String, id: i64) {
|
||||
self.cache.put(hashtag, id);
|
||||
}
|
||||
|
||||
pub fn send_unsubscribe_cmd(&mut self, timeline: &str) {
|
||||
pubsub_cmd!("unsubscribe", self, timeline);
|
||||
}
|
||||
pub fn send_subscribe_cmd(&mut self, timeline: &str) {
|
||||
pubsub_cmd!("subscribe", self, timeline);
|
||||
}
|
||||
}
|
||||
|
||||
fn send_password(mut conn: net::TcpStream, password: &str) -> net::TcpStream {
|
||||
|
@ -53,39 +179,17 @@ fn send_test_ping(mut conn: net::TcpStream) -> net::TcpStream {
|
|||
conn
|
||||
}
|
||||
|
||||
impl RedisConn {
|
||||
pub fn new(redis_cfg: RedisConfig) -> Self {
|
||||
let addr = format!("{}:{}", *redis_cfg.host, *redis_cfg.port);
|
||||
let conn_err = |e| {
|
||||
err::die_with_msg(format!(
|
||||
"Could not connect to Redis at {}:{}.\n Error detail: {}",
|
||||
*redis_cfg.host, *redis_cfg.port, e,
|
||||
))
|
||||
};
|
||||
let update_conn = |mut conn| {
|
||||
if let Some(password) = redis_cfg.password.clone() {
|
||||
conn = send_password(conn, &password);
|
||||
}
|
||||
conn = send_test_ping(conn);
|
||||
conn.set_read_timeout(Some(Duration::from_millis(10)))
|
||||
.expect("Can set read timeout for Redis connection");
|
||||
if let Some(db) = &*redis_cfg.db {
|
||||
conn = set_db(conn, db);
|
||||
}
|
||||
conn
|
||||
};
|
||||
let (primary_conn, secondary_conn) = (
|
||||
update_conn(net::TcpStream::connect(addr.clone()).unwrap_or_else(conn_err)),
|
||||
update_conn(net::TcpStream::connect(addr).unwrap_or_else(conn_err)),
|
||||
);
|
||||
primary_conn
|
||||
.set_nonblocking(true)
|
||||
.expect("set_nonblocking call failed");
|
||||
impl Read for RedisConn {
|
||||
fn read(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||
self.primary.read(buffer)
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
primary: primary_conn,
|
||||
secondary: secondary_conn,
|
||||
polling_interval: *redis_cfg.polling_interval,
|
||||
impl AsyncRead for RedisConn {
|
||||
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
|
||||
match self.read(buf) {
|
||||
Ok(t) => Ok(Async::Ready(t)),
|
||||
Err(_) => Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue