Merge pull request #14 from tootsuite/sharedsocket

Share a single Redis connection
This commit is contained in:
Daniel Sockwell 2019-04-30 10:07:37 -04:00 committed by GitHub
commit ae08218c0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 167 additions and 209 deletions

View File

@ -9,7 +9,7 @@ Client Request --> Warp
Warp filters for valid requests and parses request data. Based on that data, it repeatedly polls the StreamManager
Warp --> StreamManager
The StreamManager consults a hash table to see if there is a currently open PubSub channel. If there is, it uses that channel; if not, it creates a new channel using the methods in `pubsub.rs`. Either way, it ends up with a Receiver to poll. The StreamManager polls the Receiver, providing info about which StreamManager it is that is doing the polling. The stream manager is also responsible for monitoring the hash table to see if it contains any Receivers that no longer have active clients; if it does, the StreamManager removes them from the hash table (which causes them to be dropped from memory and causes the PubSub connection to be closed).
The StreamManager consults a hash table to see if there is a currently open PubSub channel. If there is, it uses that channel; if not, it (synchronously) sends a subscribe command to Redis. The StreamManager polls the Receiver, providing info about which StreamManager it is that is doing the polling. The stream manager is also responsible for monitoring the hash table to see if it should unsubscribe from any channels and, if necessary, sending the unsubscribe command.
StreamManger --> Receiver
The Receiver receives data from Redis and stores it in a series of queues (one for each StreamManager). When polled by the StreamManager, it sends back the messages relevant to that StreamManager and removes them from the queue.
The Receiver receives data from Redis and stores it in a series of queues (one for each StreamManager). When (asynchronously) polled by the StreamManager, it sends back the messages relevant to that StreamManager and removes them from the queue.

View File

@ -1,10 +1,11 @@
mod error;
mod pubsub;
mod query;
mod receiver;
mod stream;
mod user;
mod utils;
use futures::stream::Stream;
use receiver::Receiver;
use stream::StreamManager;
use user::{Filter, Scope, User};
use warp::{path, Filter as WarpFilter};
@ -91,7 +92,7 @@ fn main() {
.and(path::end())
.map(|list: i64, user: User| (format!("list:{}", list), user.with_no_filter()));
let redis_updates = StreamManager::new();
let redis_updates = StreamManager::new(Receiver::new());
let routes = or!(
user_timeline,
user_timeline_notifications,

View File

@ -1,102 +0,0 @@
use crate::stream;
use crate::user::User;
use futures::{Async, Future, Poll};
use log::info;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{thread, time};
use tokio::net::TcpStream;
use warp::Stream;
pub static OPEN_CONNECTIONS: AtomicUsize = AtomicUsize::new(0);
pub static MAX_CONNECTIONS: AtomicUsize = AtomicUsize::new(400);
pub struct RedisCmd {
resp_cmd: String,
}
impl RedisCmd {
fn new(cmd: impl std::fmt::Display, arg: impl std::fmt::Display) -> Self {
let (cmd, arg) = (cmd.to_string(), arg.to_string());
let resp_cmd = format!(
"*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n",
cmd_length = cmd.len(),
cmd = cmd,
arg_length = arg.len(),
arg = arg
);
Self { resp_cmd }
}
pub fn subscribe_to_timeline(timeline: &str) -> String {
let channel = format!("timeline:{}", timeline);
let subscribe = RedisCmd::new("subscribe", &channel);
info!("Subscribing to {}", &channel);
subscribe.resp_cmd
}
pub fn unsubscribe_from_timeline(timeline: &str) -> String {
let channel = format!("timeline:{}", timeline);
let unsubscribe = RedisCmd::new("unsubscribe", &channel);
info!("Unsubscribing from {}", &channel);
unsubscribe.resp_cmd
}
}
use tokio::net::tcp::ConnectFuture;
struct Socket {
connect: ConnectFuture,
tx: tokio::sync::mpsc::Sender<TcpStream>,
}
impl Socket {
fn new(address: impl std::fmt::Display, tx: tokio::sync::mpsc::Sender<TcpStream>) -> Self {
let address = address
.to_string()
.parse()
.expect("Unable to parse address");
let connect = TcpStream::connect(&address);
Self { connect, tx }
}
}
impl Future for Socket {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.connect.poll() {
Ok(Async::Ready(socket)) => {
self.tx.clone().try_send(socket).expect("Socket created");
Ok(Async::Ready(()))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
info!("failed to connect: {}", e);
Ok(Async::Ready(()))
}
}
}
}
pub struct PubSub {}
impl PubSub {
pub fn from(timeline: impl std::fmt::Display, user: &User) -> stream::Receiver {
while OPEN_CONNECTIONS.load(Ordering::Relaxed) > MAX_CONNECTIONS.load(Ordering::Relaxed) {
thread::sleep(time::Duration::from_millis(1000));
}
let new_connections = OPEN_CONNECTIONS.fetch_add(1, Ordering::Relaxed) + 1;
info!("{} connection(s) now open", new_connections);
let (tx, mut rx) = tokio::sync::mpsc::channel(5);
let socket = Socket::new("127.0.0.1:6379", tx);
tokio::spawn(futures::future::lazy(move || socket));
let socket = loop {
if let Ok(Async::Ready(Some(msg))) = rx.poll() {
break msg;
}
thread::sleep(time::Duration::from_millis(100));
};
let timeline = timeline.to_string();
let stream_of_data_from_redis = stream::Receiver::new(socket, timeline, user);
stream_of_data_from_redis
}
}

125
src/receiver.rs Normal file
View File

@ -0,0 +1,125 @@
use crate::user::User;
use futures::stream::Stream;
use futures::{Async, Poll};
use log::info;
use regex::Regex;
use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use tokio::io::{AsyncRead, Error};
use uuid::Uuid;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::Duration;
#[derive(Debug)]
pub struct Receiver {
stream: TcpStream,
tl: String,
pub user: User,
polled_by: Uuid,
msg_queue: HashMap<Uuid, VecDeque<Value>>,
}
impl Receiver {
pub fn new() -> Self {
let stream = TcpStream::connect("127.0.0.1:6379").unwrap();
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.unwrap();
Self {
stream,
tl: String::new(),
user: User::public(),
polled_by: Uuid::new_v4(),
msg_queue: HashMap::new(),
}
}
pub fn set_polled_by(&mut self, id: Uuid) -> &Self {
self.polled_by = id;
self
}
pub fn subscribe(&mut self, tl: &str) {
let subscribe_cmd = redis_cmd_from("subscribe", &tl);
info!("Subscribing to {}", &tl);
self.stream
.write(&subscribe_cmd)
.expect("Can subscribe to Redis");
}
pub fn unsubscribe(&mut self, tl: &str) {
let unsubscribe_cmd = redis_cmd_from("unsubscribe", &tl);
info!("Subscribing to {}", &tl);
self.stream
.write(&unsubscribe_cmd)
.expect("Can unsubscribe from Redis");
}
}
impl Stream for Receiver {
type Item = Value;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
let mut buffer = vec![0u8; 3000];
let polled_by = self.polled_by;
self.msg_queue
.entry(polled_by)
.or_insert_with(VecDeque::new);
info!("Being polled by StreamManager with uuid: {}", polled_by);
let mut async_stream = AsyncReadableStream(&mut self.stream);
if let Async::Ready(num_bytes_read) = async_stream.poll_read(&mut buffer)? {
// capture everything between `{` and `}` as potential JSON
// TODO: figure out if `(?x)` is needed
let re = Regex::new(r"(?P<json>\{.*\})").expect("Valid hard-coded regex");
if let Some(cap) = re.captures(&String::from_utf8_lossy(&buffer[..num_bytes_read])) {
let json: Value = serde_json::from_str(&cap["json"].to_string().clone())?;
for value in self.msg_queue.values_mut() {
value.push_back(json.clone());
}
}
}
if let Some(value) = self.msg_queue.entry(polled_by).or_default().pop_front() {
Ok(Async::Ready(Some(value)))
} else {
Ok(Async::NotReady)
}
}
}
impl Drop for Receiver {
fn drop(&mut self) {
let timeline = self.tl.clone();
self.unsubscribe(&timeline);
}
}
struct AsyncReadableStream<'a>(&'a mut TcpStream);
impl<'a> Read for AsyncReadableStream<'a> {
fn read(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> {
self.0.read(buffer)
}
}
impl<'a> AsyncRead for AsyncReadableStream<'a> {
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
match self.read(buf) {
Ok(t) => Ok(Async::Ready(t)),
Err(_) => Ok(Async::NotReady),
}
}
}
fn redis_cmd_from(cmd: impl std::fmt::Display, timeline: impl std::fmt::Display) -> Vec<u8> {
let (cmd, arg) = (cmd.to_string(), format!("timeline:{}", timeline));
format!(
"*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n",
cmd_length = cmd.len(),
cmd = cmd,
arg_length = arg.len(),
arg = arg
)
.as_bytes()
.to_owned()
}

View File

@ -1,31 +1,26 @@
use crate::pubsub;
use crate::pubsub::PubSub;
use crate::receiver::Receiver;
use crate::user::User;
use futures::stream::Stream;
use futures::{Async, Poll};
use log::info;
use regex::Regex;
use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::Ordering;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::io::{AsyncRead, AsyncWrite, Error, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio::io::Error;
use uuid::Uuid;
#[derive(Clone)]
pub struct StreamManager {
recv: Arc<Mutex<HashMap<String, Receiver>>>,
last_polled: Arc<Mutex<HashMap<String, Instant>>>,
receiver: Arc<Mutex<Receiver>>,
subscriptions: Arc<Mutex<HashMap<String, Instant>>>,
current_stream: String,
id: uuid::Uuid,
}
impl StreamManager {
pub fn new() -> Self {
pub fn new(reciever: Receiver) -> Self {
StreamManager {
recv: Arc::new(Mutex::new(HashMap::new())),
last_polled: Arc::new(Mutex::new(HashMap::new())),
receiver: Arc::new(Mutex::new(reciever)),
subscriptions: Arc::new(Mutex::new(HashMap::new())),
current_stream: String::new(),
id: Uuid::new_v4(),
}
@ -36,25 +31,32 @@ impl StreamManager {
StreamManager { id, ..self.clone() }
}
pub fn add(&mut self, timeline: &String, user: &User) -> &Self {
let mut streams = self.recv.lock().expect("No other thread panic");
streams
.entry(timeline.clone())
.or_insert_with(|| PubSub::from(&timeline, &user));
let mut last_polled = self.last_polled.lock().expect("No other thread panic");
last_polled.insert(timeline.clone(), Instant::now());
// Drop any streams that haven't been polled in the last 30 seconds
last_polled
.clone()
.iter()
.filter(|(_, time)| time.elapsed().as_secs() > 30)
.for_each(|(key, _)| {
last_polled.remove(key);
streams.remove(key);
pub fn add(&mut self, timeline: &str, _user: &User) -> &Self {
let mut subscriptions = self.subscriptions.lock().expect("No other thread panic");
let mut receiver = self.receiver.lock().unwrap();
subscriptions
.entry(timeline.to_string())
.or_insert_with(|| {
receiver.subscribe(timeline);
Instant::now()
});
self.current_stream = timeline.clone();
// Unsubscribe from that haven't been polled in the last 30 seconds
let channels = subscriptions.clone();
let channels_to_unsubscribe = channels
.iter()
.filter(|(_, time)| time.elapsed().as_secs() > 30);
for (channel, _) in channels_to_unsubscribe {
receiver.unsubscribe(&channel);
}
// Update our map of streams
*subscriptions = channels
.clone()
.into_iter()
.filter(|(_, time)| time.elapsed().as_secs() > 30)
.collect();
self.current_stream = timeline.to_string();
self
}
}
@ -63,15 +65,14 @@ impl Stream for StreamManager {
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut last_polled = self.last_polled.lock().expect("No other thread panic");
let mut subscriptions = self.subscriptions.lock().expect("No other thread panic");
let target_stream = self.current_stream.clone();
last_polled.insert(target_stream.clone(), Instant::now());
subscriptions.insert(target_stream.clone(), Instant::now());
let mut streams = self.recv.lock().expect("No other thread panic");
let shared_conn = streams.get_mut(&target_stream).expect("known key");
shared_conn.set_polled_by(self.id);
let mut receiver = self.receiver.lock().expect("No other thread panic");
receiver.set_polled_by(self.id);
match shared_conn.poll() {
match receiver.poll() {
Ok(Async::Ready(Some(value))) => Ok(Async::Ready(Some(value))),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
@ -79,70 +80,3 @@ impl Stream for StreamManager {
}
}
}
#[derive(Debug)]
pub struct Receiver {
rx: ReadHalf<TcpStream>,
tx: WriteHalf<TcpStream>,
tl: String,
pub user: User,
polled_by: Uuid,
msg_queue: HashMap<Uuid, VecDeque<Value>>,
}
impl Receiver {
pub fn new(socket: TcpStream, tl: String, user: &User) -> Self {
let (rx, mut tx) = socket.split();
tx.poll_write(pubsub::RedisCmd::subscribe_to_timeline(&tl).as_bytes())
.expect("Can subscribe to Redis");
Self {
rx,
tx,
tl,
user: user.clone(),
polled_by: Uuid::new_v4(),
msg_queue: HashMap::new(),
}
}
pub fn set_polled_by(&mut self, id: Uuid) -> &Self {
self.polled_by = id;
self
}
}
impl Stream for Receiver {
type Item = Value;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
let mut buffer = vec![0u8; 3000];
let polled_by = self.polled_by;
self.msg_queue.entry(polled_by).or_insert(VecDeque::new());
info!("Being polled by StreamManager with uuid: {}", polled_by);
if let Async::Ready(num_bytes_read) = self.rx.poll_read(&mut buffer)? {
// capture everything between `{` and `}` as potential JSON
// TODO: figure out if `(?x)` is needed
let re = Regex::new(r"(?P<json>\{.*\})").expect("Valid hard-coded regex");
if let Some(cap) = re.captures(&String::from_utf8_lossy(&buffer[..num_bytes_read])) {
let json: Value = serde_json::from_str(&cap["json"].to_string().clone())?;
for value in self.msg_queue.values_mut() {
value.push_back(json.clone());
}
}
}
if let Some(value) = self.msg_queue.entry(polled_by).or_default().pop_front() {
Ok(Async::Ready(Some(value)))
} else {
Ok(Async::NotReady)
}
}
}
impl Drop for Receiver {
fn drop(&mut self) {
let channel = format!("timeline:{}", self.tl);
self.tx
.poll_write(pubsub::RedisCmd::unsubscribe_from_timeline(&channel).as_bytes())
.expect("Can unsubscribe from Redis");
let open_connections = pubsub::OPEN_CONNECTIONS.fetch_sub(1, Ordering::Relaxed) - 1;
info!("Receiver dropped. {} connection(s) open", open_connections);
}
}