Share a single Redis connection

This commit revises the code structure to share a single connection
to Redis (with multiple subscriptions on that connection) rather than
mutiple connections (each with one subscription).  It also simplifies the code based on that change.
This commit is contained in:
Daniel Sockwell 2019-04-30 09:44:51 -04:00
parent 9a3544acfb
commit 62db7ae0ff
4 changed files with 165 additions and 207 deletions

View File

@ -1,10 +1,11 @@
mod error;
mod pubsub;
mod query;
mod receiver;
mod stream;
mod user;
mod utils;
use futures::stream::Stream;
use receiver::Receiver;
use stream::StreamManager;
use user::{Filter, Scope, User};
use warp::{path, Filter as WarpFilter};
@ -91,7 +92,7 @@ fn main() {
.and(path::end())
.map(|list: i64, user: User| (format!("list:{}", list), user.with_no_filter()));
let redis_updates = StreamManager::new();
let redis_updates = StreamManager::new(Receiver::new());
let routes = or!(
user_timeline,
user_timeline_notifications,

View File

@ -1,102 +0,0 @@
use crate::stream;
use crate::user::User;
use futures::{Async, Future, Poll};
use log::info;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{thread, time};
use tokio::net::TcpStream;
use warp::Stream;
pub static OPEN_CONNECTIONS: AtomicUsize = AtomicUsize::new(0);
pub static MAX_CONNECTIONS: AtomicUsize = AtomicUsize::new(400);
pub struct RedisCmd {
resp_cmd: String,
}
impl RedisCmd {
fn new(cmd: impl std::fmt::Display, arg: impl std::fmt::Display) -> Self {
let (cmd, arg) = (cmd.to_string(), arg.to_string());
let resp_cmd = format!(
"*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n",
cmd_length = cmd.len(),
cmd = cmd,
arg_length = arg.len(),
arg = arg
);
Self { resp_cmd }
}
pub fn subscribe_to_timeline(timeline: &str) -> String {
let channel = format!("timeline:{}", timeline);
let subscribe = RedisCmd::new("subscribe", &channel);
info!("Subscribing to {}", &channel);
subscribe.resp_cmd
}
pub fn unsubscribe_from_timeline(timeline: &str) -> String {
let channel = format!("timeline:{}", timeline);
let unsubscribe = RedisCmd::new("unsubscribe", &channel);
info!("Unsubscribing from {}", &channel);
unsubscribe.resp_cmd
}
}
use tokio::net::tcp::ConnectFuture;
struct Socket {
connect: ConnectFuture,
tx: tokio::sync::mpsc::Sender<TcpStream>,
}
impl Socket {
fn new(address: impl std::fmt::Display, tx: tokio::sync::mpsc::Sender<TcpStream>) -> Self {
let address = address
.to_string()
.parse()
.expect("Unable to parse address");
let connect = TcpStream::connect(&address);
Self { connect, tx }
}
}
impl Future for Socket {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.connect.poll() {
Ok(Async::Ready(socket)) => {
self.tx.clone().try_send(socket).expect("Socket created");
Ok(Async::Ready(()))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
info!("failed to connect: {}", e);
Ok(Async::Ready(()))
}
}
}
}
pub struct PubSub {}
impl PubSub {
pub fn from(timeline: impl std::fmt::Display, user: &User) -> stream::Receiver {
while OPEN_CONNECTIONS.load(Ordering::Relaxed) > MAX_CONNECTIONS.load(Ordering::Relaxed) {
thread::sleep(time::Duration::from_millis(1000));
}
let new_connections = OPEN_CONNECTIONS.fetch_add(1, Ordering::Relaxed) + 1;
info!("{} connection(s) now open", new_connections);
let (tx, mut rx) = tokio::sync::mpsc::channel(5);
let socket = Socket::new("127.0.0.1:6379", tx);
tokio::spawn(futures::future::lazy(move || socket));
let socket = loop {
if let Ok(Async::Ready(Some(msg))) = rx.poll() {
break msg;
}
thread::sleep(time::Duration::from_millis(100));
};
let timeline = timeline.to_string();
let stream_of_data_from_redis = stream::Receiver::new(socket, timeline, user);
stream_of_data_from_redis
}
}

125
src/receiver.rs Normal file
View File

@ -0,0 +1,125 @@
use crate::user::User;
use futures::stream::Stream;
use futures::{Async, Poll};
use log::info;
use regex::Regex;
use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use tokio::io::{AsyncRead, Error};
use uuid::Uuid;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::Duration;
#[derive(Debug)]
pub struct Receiver {
stream: TcpStream,
tl: String,
pub user: User,
polled_by: Uuid,
msg_queue: HashMap<Uuid, VecDeque<Value>>,
}
impl Receiver {
pub fn new() -> Self {
let stream = TcpStream::connect("127.0.0.1:6379").unwrap();
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.unwrap();
Self {
stream,
tl: String::new(),
user: User::public(),
polled_by: Uuid::new_v4(),
msg_queue: HashMap::new(),
}
}
pub fn set_polled_by(&mut self, id: Uuid) -> &Self {
self.polled_by = id;
self
}
pub fn subscribe(&mut self, tl: &str) {
let subscribe_cmd = redis_cmd_from("subscribe", &tl);
info!("Subscribing to {}", &tl);
self.stream
.write(&subscribe_cmd)
.expect("Can subscribe to Redis");
}
pub fn unsubscribe(&mut self, tl: &str) {
let unsubscribe_cmd = redis_cmd_from("unsubscribe", &tl);
info!("Subscribing to {}", &tl);
self.stream
.write(&unsubscribe_cmd)
.expect("Can unsubscribe from Redis");
}
}
impl Stream for Receiver {
type Item = Value;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
let mut buffer = vec![0u8; 3000];
let polled_by = self.polled_by;
self.msg_queue
.entry(polled_by)
.or_insert_with(VecDeque::new);
info!("Being polled by StreamManager with uuid: {}", polled_by);
let mut async_stream = AsyncReadableStream(&mut self.stream);
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer)? {
// capture everything between `{` and `}` as potential JSON
// TODO: figure out if `(?x)` is needed
let re = Regex::new(r"(?P<json>\{.*\})").expect("Valid hard-coded regex");
if let Some(cap) = re.captures(&String::from_utf8_lossy(&buffer[..num_bytes_read])) {
let json: Value = serde_json::from_str(&cap["json"].to_string().clone())?;
for value in self.msg_queue.values_mut() {
value.push_back(json.clone());
}
}
}
if let Some(value) = self.msg_queue.entry(polled_by).or_default().pop_front() {
Ok(Async::Ready(Some(value)))
} else {
Ok(Async::NotReady)
}
}
}
impl Drop for Receiver {
fn drop(&mut self) {
let timeline = self.tl.clone();
self.unsubscribe(&timeline);
}
}
struct AsyncReadableStream<'a>(&'a mut TcpStream);
impl<'a> Read for AsyncReadableStream<'a> {
fn read(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> {
self.0.read(buffer)
}
}
impl<'a> AsyncRead for AsyncReadableStream<'a> {
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
match self.read(buf) {
Ok(t) => Ok(Async::Ready(t)),
Err(_) => Ok(Async::NotReady),
}
}
}
fn redis_cmd_from(cmd: impl std::fmt::Display, timeline: impl std::fmt::Display) -> Vec<u8> {
let (cmd, arg) = (cmd.to_string(), format!("timeline:{}", timeline));
format!(
"*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n",
cmd_length = cmd.len(),
cmd = cmd,
arg_length = arg.len(),
arg = arg
)
.as_bytes()
.to_owned()
}

View File

@ -1,31 +1,26 @@
use crate::pubsub;
use crate::pubsub::PubSub;
use crate::receiver::Receiver;
use crate::user::User;
use futures::stream::Stream;
use futures::{Async, Poll};
use log::info;
use regex::Regex;
use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::Ordering;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::io::{AsyncRead, AsyncWrite, Error, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio::io::Error;
use uuid::Uuid;
#[derive(Clone)]
pub struct StreamManager {
recv: Arc<Mutex<HashMap<String, Receiver>>>,
last_polled: Arc<Mutex<HashMap<String, Instant>>>,
receiver: Arc<Mutex<Receiver>>,
subscriptions: Arc<Mutex<HashMap<String, Instant>>>,
current_stream: String,
id: uuid::Uuid,
}
impl StreamManager {
pub fn new() -> Self {
pub fn new(reciever: Receiver) -> Self {
StreamManager {
recv: Arc::new(Mutex::new(HashMap::new())),
last_polled: Arc::new(Mutex::new(HashMap::new())),
receiver: Arc::new(Mutex::new(reciever)),
subscriptions: Arc::new(Mutex::new(HashMap::new())),
current_stream: String::new(),
id: Uuid::new_v4(),
}
@ -36,25 +31,32 @@ impl StreamManager {
StreamManager { id, ..self.clone() }
}
pub fn add(&mut self, timeline: &String, user: &User) -> &Self {
let mut streams = self.recv.lock().expect("No other thread panic");
streams
.entry(timeline.clone())
.or_insert_with(|| PubSub::from(&timeline, &user));
let mut last_polled = self.last_polled.lock().expect("No other thread panic");
last_polled.insert(timeline.clone(), Instant::now());
// Drop any streams that haven't been polled in the last 30 seconds
last_polled
.clone()
.iter()
.filter(|(_, time)| time.elapsed().as_secs() > 30)
.for_each(|(key, _)| {
last_polled.remove(key);
streams.remove(key);
pub fn add(&mut self, timeline: &str, _user: &User) -> &Self {
let mut subscriptions = self.subscriptions.lock().expect("No other thread panic");
let mut receiver = self.receiver.lock().unwrap();
subscriptions
.entry(timeline.to_string())
.or_insert_with(|| {
receiver.subscribe(timeline);
Instant::now()
});
self.current_stream = timeline.clone();
// Unsubscribe from that haven't been polled in the last 30 seconds
let channels = subscriptions.clone();
let channels_to_unsubscribe = channels
.iter()
.filter(|(_, time)| time.elapsed().as_secs() > 30);
for (channel, _) in channels_to_unsubscribe {
receiver.unsubscribe(&channel);
}
// Update our map of streams
*subscriptions = channels
.clone()
.into_iter()
.filter(|(_, time)| time.elapsed().as_secs() > 30)
.collect();
self.current_stream = timeline.to_string();
self
}
}
@ -63,15 +65,14 @@ impl Stream for StreamManager {
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut last_polled = self.last_polled.lock().expect("No other thread panic");
let mut subscriptions = self.subscriptions.lock().expect("No other thread panic");
let target_stream = self.current_stream.clone();
last_polled.insert(target_stream.clone(), Instant::now());
subscriptions.insert(target_stream.clone(), Instant::now());
let mut streams = self.recv.lock().expect("No other thread panic");
let shared_conn = streams.get_mut(&target_stream).expect("known key");
shared_conn.set_polled_by(self.id);
let mut receiver = self.receiver.lock().expect("No other thread panic");
receiver.set_polled_by(self.id);
match shared_conn.poll() {
match receiver.poll() {
Ok(Async::Ready(Some(value))) => Ok(Async::Ready(Some(value))),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
@ -79,70 +80,3 @@ impl Stream for StreamManager {
}
}
}
#[derive(Debug)]
pub struct Receiver {
rx: ReadHalf<TcpStream>,
tx: WriteHalf<TcpStream>,
tl: String,
pub user: User,
polled_by: Uuid,
msg_queue: HashMap<Uuid, VecDeque<Value>>,
}
impl Receiver {
pub fn new(socket: TcpStream, tl: String, user: &User) -> Self {
let (rx, mut tx) = socket.split();
tx.poll_write(pubsub::RedisCmd::subscribe_to_timeline(&tl).as_bytes())
.expect("Can subscribe to Redis");
Self {
rx,
tx,
tl,
user: user.clone(),
polled_by: Uuid::new_v4(),
msg_queue: HashMap::new(),
}
}
pub fn set_polled_by(&mut self, id: Uuid) -> &Self {
self.polled_by = id;
self
}
}
impl Stream for Receiver {
type Item = Value;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
let mut buffer = vec![0u8; 3000];
let polled_by = self.polled_by;
self.msg_queue.entry(polled_by).or_insert(VecDeque::new());
info!("Being polled by StreamManager with uuid: {}", polled_by);
if let Async::Ready(num_bytes_read) = self.rx.poll_read(&mut buffer)? {
// capture everything between `{` and `}` as potential JSON
// TODO: figure out if `(?x)` is needed
let re = Regex::new(r"(?P<json>\{.*\})").expect("Valid hard-coded regex");
if let Some(cap) = re.captures(&String::from_utf8_lossy(&buffer[..num_bytes_read])) {
let json: Value = serde_json::from_str(&cap["json"].to_string().clone())?;
for value in self.msg_queue.values_mut() {
value.push_back(json.clone());
}
}
}
if let Some(value) = self.msg_queue.entry(polled_by).or_default().pop_front() {
Ok(Async::Ready(Some(value)))
} else {
Ok(Async::NotReady)
}
}
}
impl Drop for Receiver {
fn drop(&mut self) {
let channel = format!("timeline:{}", self.tl);
self.tx
.poll_write(pubsub::RedisCmd::unsubscribe_from_timeline(&channel).as_bytes())
.expect("Can unsubscribe from Redis");
let open_connections = pubsub::OPEN_CONNECTIONS.fetch_sub(1, Ordering::Relaxed) - 1;
info!("Receiver dropped. {} connection(s) open", open_connections);
}
}