diff --git a/src/main.rs b/src/main.rs index eb6a819..0dead6b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,11 @@ mod error; -mod pubsub; mod query; +mod receiver; mod stream; mod user; mod utils; use futures::stream::Stream; +use receiver::Receiver; use stream::StreamManager; use user::{Filter, Scope, User}; use warp::{path, Filter as WarpFilter}; @@ -91,7 +92,7 @@ fn main() { .and(path::end()) .map(|list: i64, user: User| (format!("list:{}", list), user.with_no_filter())); - let redis_updates = StreamManager::new(); + let redis_updates = StreamManager::new(Receiver::new()); let routes = or!( user_timeline, user_timeline_notifications, diff --git a/src/pubsub.rs b/src/pubsub.rs deleted file mode 100644 index 7b4fa94..0000000 --- a/src/pubsub.rs +++ /dev/null @@ -1,102 +0,0 @@ -use crate::stream; -use crate::user::User; -use futures::{Async, Future, Poll}; -use log::info; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::{thread, time}; -use tokio::net::TcpStream; -use warp::Stream; - -pub static OPEN_CONNECTIONS: AtomicUsize = AtomicUsize::new(0); -pub static MAX_CONNECTIONS: AtomicUsize = AtomicUsize::new(400); - -pub struct RedisCmd { - resp_cmd: String, -} -impl RedisCmd { - fn new(cmd: impl std::fmt::Display, arg: impl std::fmt::Display) -> Self { - let (cmd, arg) = (cmd.to_string(), arg.to_string()); - let resp_cmd = format!( - "*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n", - cmd_length = cmd.len(), - cmd = cmd, - arg_length = arg.len(), - arg = arg - ); - Self { resp_cmd } - } - pub fn subscribe_to_timeline(timeline: &str) -> String { - let channel = format!("timeline:{}", timeline); - let subscribe = RedisCmd::new("subscribe", &channel); - info!("Subscribing to {}", &channel); - subscribe.resp_cmd - } - pub fn unsubscribe_from_timeline(timeline: &str) -> String { - let channel = format!("timeline:{}", timeline); - let unsubscribe = RedisCmd::new("unsubscribe", &channel); - info!("Unsubscribing from {}", &channel); - unsubscribe.resp_cmd - } -} - -use tokio::net::tcp::ConnectFuture; -struct Socket { - connect: ConnectFuture, - tx: tokio::sync::mpsc::Sender, -} -impl Socket { - fn new(address: impl std::fmt::Display, tx: tokio::sync::mpsc::Sender) -> Self { - let address = address - .to_string() - .parse() - .expect("Unable to parse address"); - let connect = TcpStream::connect(&address); - Self { connect, tx } - } -} -impl Future for Socket { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll { - match self.connect.poll() { - Ok(Async::Ready(socket)) => { - self.tx.clone().try_send(socket).expect("Socket created"); - Ok(Async::Ready(())) - } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => { - info!("failed to connect: {}", e); - Ok(Async::Ready(())) - } - } - } -} - -pub struct PubSub {} - -impl PubSub { - pub fn from(timeline: impl std::fmt::Display, user: &User) -> stream::Receiver { - while OPEN_CONNECTIONS.load(Ordering::Relaxed) > MAX_CONNECTIONS.load(Ordering::Relaxed) { - thread::sleep(time::Duration::from_millis(1000)); - } - let new_connections = OPEN_CONNECTIONS.fetch_add(1, Ordering::Relaxed) + 1; - info!("{} connection(s) now open", new_connections); - - let (tx, mut rx) = tokio::sync::mpsc::channel(5); - let socket = Socket::new("127.0.0.1:6379", tx); - - tokio::spawn(futures::future::lazy(move || socket)); - - let socket = loop { - if let Ok(Async::Ready(Some(msg))) = rx.poll() { - break msg; - } - thread::sleep(time::Duration::from_millis(100)); - }; - - let timeline = timeline.to_string(); - let stream_of_data_from_redis = stream::Receiver::new(socket, timeline, user); - stream_of_data_from_redis - } -} diff --git a/src/receiver.rs b/src/receiver.rs new file mode 100644 index 0000000..f3d400f --- /dev/null +++ b/src/receiver.rs @@ -0,0 +1,125 @@ +use crate::user::User; +use futures::stream::Stream; +use futures::{Async, Poll}; +use log::info; +use regex::Regex; +use serde_json::Value; +use std::collections::{HashMap, VecDeque}; +use tokio::io::{AsyncRead, Error}; +use uuid::Uuid; + +use std::io::{Read, Write}; +use std::net::TcpStream; +use std::time::Duration; + +#[derive(Debug)] +pub struct Receiver { + stream: TcpStream, + tl: String, + pub user: User, + polled_by: Uuid, + msg_queue: HashMap>, +} +impl Receiver { + pub fn new() -> Self { + let stream = TcpStream::connect("127.0.0.1:6379").unwrap(); + + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .unwrap(); + Self { + stream, + tl: String::new(), + user: User::public(), + polled_by: Uuid::new_v4(), + msg_queue: HashMap::new(), + } + } + pub fn set_polled_by(&mut self, id: Uuid) -> &Self { + self.polled_by = id; + self + } + pub fn subscribe(&mut self, tl: &str) { + let subscribe_cmd = redis_cmd_from("subscribe", &tl); + info!("Subscribing to {}", &tl); + self.stream + .write(&subscribe_cmd) + .expect("Can subscribe to Redis"); + } + pub fn unsubscribe(&mut self, tl: &str) { + let unsubscribe_cmd = redis_cmd_from("unsubscribe", &tl); + info!("Subscribing to {}", &tl); + self.stream + .write(&unsubscribe_cmd) + .expect("Can unsubscribe from Redis"); + } +} +impl Stream for Receiver { + type Item = Value; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let mut buffer = vec![0u8; 3000]; + let polled_by = self.polled_by; + self.msg_queue + .entry(polled_by) + .or_insert_with(VecDeque::new); + info!("Being polled by StreamManager with uuid: {}", polled_by); + + let mut async_stream = AsyncReadableStream(&mut self.stream); + + if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer)? { + // capture everything between `{` and `}` as potential JSON + // TODO: figure out if `(?x)` is needed + let re = Regex::new(r"(?P\{.*\})").expect("Valid hard-coded regex"); + + if let Some(cap) = re.captures(&String::from_utf8_lossy(&buffer[..num_bytes_read])) { + let json: Value = serde_json::from_str(&cap["json"].to_string().clone())?; + for value in self.msg_queue.values_mut() { + value.push_back(json.clone()); + } + } + } + if let Some(value) = self.msg_queue.entry(polled_by).or_default().pop_front() { + Ok(Async::Ready(Some(value))) + } else { + Ok(Async::NotReady) + } + } +} +impl Drop for Receiver { + fn drop(&mut self) { + let timeline = self.tl.clone(); + self.unsubscribe(&timeline); + } +} + +struct AsyncReadableStream<'a>(&'a mut TcpStream); + +impl<'a> Read for AsyncReadableStream<'a> { + fn read(&mut self, buffer: &mut [u8]) -> Result { + self.0.read(buffer) + } +} + +impl<'a> AsyncRead for AsyncReadableStream<'a> { + fn poll_read(&mut self, buf: &mut [u8]) -> Poll { + match self.read(buf) { + Ok(t) => Ok(Async::Ready(t)), + Err(_) => Ok(Async::NotReady), + } + } +} + +fn redis_cmd_from(cmd: impl std::fmt::Display, timeline: impl std::fmt::Display) -> Vec { + let (cmd, arg) = (cmd.to_string(), format!("timeline:{}", timeline)); + format!( + "*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n", + cmd_length = cmd.len(), + cmd = cmd, + arg_length = arg.len(), + arg = arg + ) + .as_bytes() + .to_owned() +} diff --git a/src/stream.rs b/src/stream.rs index 0e03342..55c295c 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,31 +1,26 @@ -use crate::pubsub; -use crate::pubsub::PubSub; +use crate::receiver::Receiver; use crate::user::User; use futures::stream::Stream; use futures::{Async, Poll}; -use log::info; -use regex::Regex; use serde_json::Value; -use std::collections::{HashMap, VecDeque}; -use std::sync::atomic::Ordering; +use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time::Instant; -use tokio::io::{AsyncRead, AsyncWrite, Error, ReadHalf, WriteHalf}; -use tokio::net::TcpStream; +use tokio::io::Error; use uuid::Uuid; #[derive(Clone)] pub struct StreamManager { - recv: Arc>>, - last_polled: Arc>>, + receiver: Arc>, + subscriptions: Arc>>, current_stream: String, id: uuid::Uuid, } impl StreamManager { - pub fn new() -> Self { + pub fn new(reciever: Receiver) -> Self { StreamManager { - recv: Arc::new(Mutex::new(HashMap::new())), - last_polled: Arc::new(Mutex::new(HashMap::new())), + receiver: Arc::new(Mutex::new(reciever)), + subscriptions: Arc::new(Mutex::new(HashMap::new())), current_stream: String::new(), id: Uuid::new_v4(), } @@ -36,25 +31,32 @@ impl StreamManager { StreamManager { id, ..self.clone() } } - pub fn add(&mut self, timeline: &String, user: &User) -> &Self { - let mut streams = self.recv.lock().expect("No other thread panic"); - streams - .entry(timeline.clone()) - .or_insert_with(|| PubSub::from(&timeline, &user)); - let mut last_polled = self.last_polled.lock().expect("No other thread panic"); - last_polled.insert(timeline.clone(), Instant::now()); - - // Drop any streams that haven't been polled in the last 30 seconds - last_polled - .clone() - .iter() - .filter(|(_, time)| time.elapsed().as_secs() > 30) - .for_each(|(key, _)| { - last_polled.remove(key); - streams.remove(key); + pub fn add(&mut self, timeline: &str, _user: &User) -> &Self { + let mut subscriptions = self.subscriptions.lock().expect("No other thread panic"); + let mut receiver = self.receiver.lock().unwrap(); + subscriptions + .entry(timeline.to_string()) + .or_insert_with(|| { + receiver.subscribe(timeline); + Instant::now() }); - self.current_stream = timeline.clone(); + // Unsubscribe from that haven't been polled in the last 30 seconds + let channels = subscriptions.clone(); + let channels_to_unsubscribe = channels + .iter() + .filter(|(_, time)| time.elapsed().as_secs() > 30); + for (channel, _) in channels_to_unsubscribe { + receiver.unsubscribe(&channel); + } + // Update our map of streams + *subscriptions = channels + .clone() + .into_iter() + .filter(|(_, time)| time.elapsed().as_secs() > 30) + .collect(); + + self.current_stream = timeline.to_string(); self } } @@ -63,15 +65,14 @@ impl Stream for StreamManager { type Error = Error; fn poll(&mut self) -> Poll, Self::Error> { - let mut last_polled = self.last_polled.lock().expect("No other thread panic"); + let mut subscriptions = self.subscriptions.lock().expect("No other thread panic"); let target_stream = self.current_stream.clone(); - last_polled.insert(target_stream.clone(), Instant::now()); + subscriptions.insert(target_stream.clone(), Instant::now()); - let mut streams = self.recv.lock().expect("No other thread panic"); - let shared_conn = streams.get_mut(&target_stream).expect("known key"); - shared_conn.set_polled_by(self.id); + let mut receiver = self.receiver.lock().expect("No other thread panic"); + receiver.set_polled_by(self.id); - match shared_conn.poll() { + match receiver.poll() { Ok(Async::Ready(Some(value))) => Ok(Async::Ready(Some(value))), Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::NotReady) => Ok(Async::NotReady), @@ -79,70 +80,3 @@ impl Stream for StreamManager { } } } - -#[derive(Debug)] -pub struct Receiver { - rx: ReadHalf, - tx: WriteHalf, - tl: String, - pub user: User, - polled_by: Uuid, - msg_queue: HashMap>, -} -impl Receiver { - pub fn new(socket: TcpStream, tl: String, user: &User) -> Self { - let (rx, mut tx) = socket.split(); - tx.poll_write(pubsub::RedisCmd::subscribe_to_timeline(&tl).as_bytes()) - .expect("Can subscribe to Redis"); - Self { - rx, - tx, - tl, - user: user.clone(), - polled_by: Uuid::new_v4(), - msg_queue: HashMap::new(), - } - } - pub fn set_polled_by(&mut self, id: Uuid) -> &Self { - self.polled_by = id; - self - } -} -impl Stream for Receiver { - type Item = Value; - type Error = Error; - - fn poll(&mut self) -> Poll, Self::Error> { - let mut buffer = vec![0u8; 3000]; - let polled_by = self.polled_by; - self.msg_queue.entry(polled_by).or_insert(VecDeque::new()); - info!("Being polled by StreamManager with uuid: {}", polled_by); - if let Async::Ready(num_bytes_read) = self.rx.poll_read(&mut buffer)? { - // capture everything between `{` and `}` as potential JSON - // TODO: figure out if `(?x)` is needed - let re = Regex::new(r"(?P\{.*\})").expect("Valid hard-coded regex"); - - if let Some(cap) = re.captures(&String::from_utf8_lossy(&buffer[..num_bytes_read])) { - let json: Value = serde_json::from_str(&cap["json"].to_string().clone())?; - for value in self.msg_queue.values_mut() { - value.push_back(json.clone()); - } - } - } - if let Some(value) = self.msg_queue.entry(polled_by).or_default().pop_front() { - Ok(Async::Ready(Some(value))) - } else { - Ok(Async::NotReady) - } - } -} -impl Drop for Receiver { - fn drop(&mut self) { - let channel = format!("timeline:{}", self.tl); - self.tx - .poll_write(pubsub::RedisCmd::unsubscribe_from_timeline(&channel).as_bytes()) - .expect("Can unsubscribe from Redis"); - let open_connections = pubsub::OPEN_CONNECTIONS.fetch_sub(1, Ordering::Relaxed) - 1; - info!("Receiver dropped. {} connection(s) open", open_connections); - } -}