Merge pull request #12 from tootsuite/connection-pool

Connection pool
This commit is contained in:
Daniel Sockwell 2019-04-29 08:50:47 -04:00 committed by GitHub
commit 9a3544acfb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 622 additions and 455 deletions

661
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -12,12 +12,11 @@ log = "0.4.6"
actix = "0.7.9"
actix-redis = "0.5.1"
redis-async = "0.4.4"
uuid = "0.7.2"
envconfig = "0.5.0"
envconfig_derive = "0.5.0"
whoami = "0.4.1"
futures = "0.1.26"
tokio = "0.1.18"
tokio = "0.1.19"
warp = "0.1.15"
regex = "1.1.5"
serde_json = "1.0.39"
@ -25,6 +24,7 @@ serde_derive = "1.0.90"
serde = "1.0.90"
pretty_env_logger = "0.3.0"
postgres = "0.15.2"
uuid = { version = "0.7", features = ["v4"] }
[features]
default = [ "production" ]

View File

@ -1,7 +1,15 @@
# RageQuit
A blazingly fast drop-in replacement for the Mastodon streaming api server
## Usage
```shell
$ ragequit --port 4002 # Default port is 3666
```
## Notes on data flow
The current structure of the app is as follows:
Client Request --> Warp
Warp filters for valid requests and parses request data. Based on that data, it repeatedly polls the StreamManager
Warp --> StreamManager
The StreamManager consults a hash table to see if there is a currently open PubSub channel. If there is, it uses that channel; if not, it creates a new channel using the methods in `pubsub.rs`. Either way, it ends up with a Receiver to poll. The StreamManager polls the Receiver, providing info about which StreamManager it is that is doing the polling. The stream manager is also responsible for monitoring the hash table to see if it contains any Receivers that no longer have active clients; if it does, the StreamManager removes them from the hash table (which causes them to be dropped from memory and causes the PubSub connection to be closed).
StreamManger --> Receiver
The Receiver receives data from Redis and stores it in a series of queues (one for each StreamManager). When polled by the StreamManager, it sends back the messages relevant to that StreamManager and removes them from the queue.

View File

@ -1,11 +1,11 @@
mod error;
mod pubsub;
mod query;
mod stream;
mod user;
mod utils;
use futures::stream::Stream;
use pretty_env_logger;
use pubsub::stream_from;
use stream::StreamManager;
use user::{Filter, Scope, User};
use warp::{path, Filter as WarpFilter};
@ -17,21 +17,21 @@ fn main() {
.and(path::end())
.and(user::get_access_token(Scope::Private))
.and_then(|token| user::get_account(token, Scope::Private))
.map(|user: User| stream_from(user.id.to_string(), user));
.map(|user: User| (user.id.to_string(), user));
// GET /api/v1/streaming/user/notification [private; notification filter]
let user_timeline_notifications = path!("api" / "v1" / "streaming" / "user" / "notification")
.and(path::end())
.and(user::get_access_token(Scope::Private))
.and_then(|token| user::get_account(token, Scope::Private))
.map(|user: User| stream_from(user.id.to_string(), user.with_notification_filter()));
.map(|user: User| (user.id.to_string(), user.with_notification_filter()));
// GET /api/v1/streaming/public [public; language filter]
let public_timeline = path!("api" / "v1" / "streaming" / "public")
.and(path::end())
.and(user::get_access_token(user::Scope::Public))
.and_then(|token| user::get_account(token, Scope::Public))
.map(|user: User| stream_from("public".into(), user.with_language_filter()));
.map(|user: User| ("public".to_owned(), user.with_language_filter()));
// GET /api/v1/streaming/public?only_media=true [public; language filter]
let public_timeline_media = path!("api" / "v1" / "streaming" / "public")
@ -40,8 +40,8 @@ fn main() {
.and_then(|token| user::get_account(token, Scope::Public))
.and(warp::query())
.map(|user: User, q: query::Media| match q.only_media.as_ref() {
"1" | "true" => stream_from("public:media".into(), user.with_language_filter()),
_ => stream_from("public".into(), user.with_language_filter()),
"1" | "true" => ("public:media".to_owned(), user.with_language_filter()),
_ => ("public".to_owned(), user.with_language_filter()),
});
// GET /api/v1/streaming/public/local [public; language filter]
@ -49,7 +49,7 @@ fn main() {
.and(path::end())
.and(user::get_access_token(user::Scope::Public))
.and_then(|token| user::get_account(token, Scope::Public))
.map(|user: User| stream_from("public:local".into(), user.with_language_filter()));
.map(|user: User| ("public:local".to_owned(), user.with_language_filter()));
// GET /api/v1/streaming/public/local?only_media=true [public; language filter]
let local_timeline_media = path!("api" / "v1" / "streaming" / "public" / "local")
@ -58,8 +58,8 @@ fn main() {
.and(warp::query())
.and(path::end())
.map(|user: User, q: query::Media| match q.only_media.as_ref() {
"1" | "true" => stream_from("public:local:media".into(), user.with_language_filter()),
_ => stream_from("public:local".into(), user.with_language_filter()),
"1" | "true" => ("public:local:media".to_owned(), user.with_language_filter()),
_ => ("public:local".to_owned(), user.with_language_filter()),
});
// GET /api/v1/streaming/direct [private; *no* filter]
@ -67,30 +67,31 @@ fn main() {
.and(path::end())
.and(user::get_access_token(Scope::Private))
.and_then(|token| user::get_account(token, Scope::Private))
.map(|user: User| stream_from(format!("direct:{}", user.id), user.with_no_filter()));
.map(|user: User| (format!("direct:{}", user.id), user.with_no_filter()));
// GET /api/v1/streaming/hashtag?tag=:hashtag [public; no filter]
let hashtag_timeline = path!("api" / "v1" / "streaming" / "hashtag")
.and(warp::query())
.and(path::end())
.map(|q: query::Hashtag| stream_from(format!("hashtag:{}", q.tag), User::public()));
.map(|q: query::Hashtag| (format!("hashtag:{}", q.tag), User::public()));
// GET /api/v1/streaming/hashtag/local?tag=:hashtag [public; no filter]
let hashtag_timeline_local = path!("api" / "v1" / "streaming" / "hashtag" / "local")
.and(warp::query())
.and(path::end())
.map(|q: query::Hashtag| stream_from(format!("hashtag:{}:local", q.tag), User::public()));
.map(|q: query::Hashtag| (format!("hashtag:{}:local", q.tag), User::public()));
// GET /api/v1/streaming/list?list=:list_id [private; no filter]
let list_timeline = path!("api" / "v1" / "streaming" / "list")
.and(user::get_access_token(Scope::Private))
.and_then(|token| user::get_account(token, Scope::Private))
.and(warp::query())
.and_then(|user: User, q: query::List| user.is_authorized_for_list(q.list))
.and_then(|user: User, q: query::List| (user.is_authorized_for_list(q.list), Ok(user)))
.untuple_one()
.and(path::end())
.map(|list: i64, user: User| stream_from(format!("list:{}", list), user.with_no_filter()));
.map(|list: i64, user: User| (format!("list:{}", list), user.with_no_filter()));
let redis_updates = StreamManager::new();
let routes = or!(
user_timeline,
user_timeline_notifications,
@ -103,27 +104,29 @@ fn main() {
hashtag_timeline_local,
list_timeline
)
.and_then(|event_stream| event_stream)
.untuple_one()
.and(warp::sse())
.map(|event_stream: pubsub::Receiver, sse: warp::sse::Sse| {
let user = event_stream.user.clone();
sse.reply(warp::sse::keep(
event_stream.filter_map(move |item| {
let payload = item["payload"].clone();
let event = item["event"].to_string().clone();
let toot_lang = item["language"].to_string().clone();
.and(warp::any().map(move || redis_updates.new_copy()))
.map(
|timeline: String, user: User, sse: warp::sse::Sse, mut event_stream: StreamManager| {
event_stream.add(&timeline, &user);
sse.reply(warp::sse::keep(
event_stream.filter_map(move |item| {
let payload = item["payload"].clone();
let event = item["event"].clone().to_string();
let toot_lang = payload["language"].as_str().expect("redis str").to_string();
let user_langs = user.langs.clone();
println!("ding");
match &user.filter {
Filter::Notification if event != "notification" => None,
Filter::Language if !user.langs.contains(&toot_lang) => None,
_ => Some((warp::sse::event(event), warp::sse::data(payload))),
}
}),
None,
))
})
match (&user.filter, user_langs) {
(Filter::Notification, _) if event != "notification" => None,
(Filter::Language, Some(ref langs)) if !langs.contains(&toot_lang) => None,
_ => Some((warp::sse::event(event), warp::sse::data(payload))),
}
}),
None,
))
},
)
.with(warp::reply::with::header("Connection", "keep-alive"))
.recover(error::handle_errors);

View File

@ -1,85 +1,102 @@
use crate::stream;
use crate::user::User;
use futures::{Async, Future, Poll};
use log::{debug, info};
use regex::Regex;
use serde_json::Value;
use tokio::io::{AsyncRead, AsyncWrite, Error, ReadHalf, WriteHalf};
use log::info;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{thread, time};
use tokio::net::TcpStream;
use warp::Stream;
pub struct Receiver {
rx: ReadHalf<TcpStream>,
tx: WriteHalf<TcpStream>,
timeline: String,
pub user: User,
pub static OPEN_CONNECTIONS: AtomicUsize = AtomicUsize::new(0);
pub static MAX_CONNECTIONS: AtomicUsize = AtomicUsize::new(400);
pub struct RedisCmd {
resp_cmd: String,
}
impl Receiver {
fn new(socket: TcpStream, timeline: String, user: User) -> Self {
let (rx, mut tx) = socket.split();
impl RedisCmd {
fn new(cmd: impl std::fmt::Display, arg: impl std::fmt::Display) -> Self {
let (cmd, arg) = (cmd.to_string(), arg.to_string());
let resp_cmd = format!(
"*2\r\n${cmd_length}\r\n{cmd}\r\n${arg_length}\r\n{arg}\r\n",
cmd_length = cmd.len(),
cmd = cmd,
arg_length = arg.len(),
arg = arg
);
Self { resp_cmd }
}
pub fn subscribe_to_timeline(timeline: &str) -> String {
let channel = format!("timeline:{}", timeline);
let subscribe = RedisCmd::new("subscribe", &channel);
info!("Subscribing to {}", &channel);
let subscribe_cmd = format!(
"*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n",
channel.len(),
channel
);
let buffer = subscribe_cmd.as_bytes();
tx.poll_write(&buffer).unwrap();
Self {
rx,
tx,
timeline,
user,
}
subscribe.resp_cmd
}
pub fn unsubscribe_from_timeline(timeline: &str) -> String {
let channel = format!("timeline:{}", timeline);
let unsubscribe = RedisCmd::new("unsubscribe", &channel);
info!("Unsubscribing from {}", &channel);
unsubscribe.resp_cmd
}
}
impl Stream for Receiver {
type Item = Value;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
let mut buffer = vec![0u8; 3000];
if let Async::Ready(num_bytes_read) = self.rx.poll_read(&mut buffer)? {
let re = Regex::new(r"(?x)(?P<json>\{.*\})").unwrap();
use tokio::net::tcp::ConnectFuture;
struct Socket {
connect: ConnectFuture,
tx: tokio::sync::mpsc::Sender<TcpStream>,
}
impl Socket {
fn new(address: impl std::fmt::Display, tx: tokio::sync::mpsc::Sender<TcpStream>) -> Self {
let address = address
.to_string()
.parse()
.expect("Unable to parse address");
let connect = TcpStream::connect(&address);
Self { connect, tx }
}
}
impl Future for Socket {
type Item = ();
type Error = ();
if let Some(cap) = re.captures(&String::from_utf8_lossy(&buffer[..num_bytes_read])) {
debug!("{}", &cap["json"]);
let json_string = cap["json"].to_string();
let json: Value = serde_json::from_str(&json_string.clone())?;
return Ok(Async::Ready(Some(json)));
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.connect.poll() {
Ok(Async::Ready(socket)) => {
self.tx.clone().try_send(socket).expect("Socket created");
Ok(Async::Ready(()))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
info!("failed to connect: {}", e);
Ok(Async::Ready(()))
}
return Ok(Async::NotReady);
}
Ok(Async::NotReady)
}
}
impl Drop for Receiver {
fn drop(&mut self) {
let channel = format!("timeline:{}", self.timeline);
let unsubscribe_cmd = format!(
"*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n",
channel.len(),
channel
);
self.tx.poll_write(unsubscribe_cmd.as_bytes()).unwrap();
println!("Receiver got dropped!");
}
}
fn get_socket() -> impl Future<Item = TcpStream, Error = Box<Error>> {
let address = "127.0.0.1:6379".parse().expect("Unable to parse address");
let connection = TcpStream::connect(&address);
connection.and_then(Ok).map_err(Box::new)
}
pub struct PubSub {}
pub fn stream_from(
timeline: String,
user: User,
) -> impl Future<Item = Receiver, Error = warp::reject::Rejection> {
get_socket()
.and_then(move |socket| {
let stream_of_data_from_redis = Receiver::new(socket, timeline, user);
Ok(stream_of_data_from_redis)
})
.map_err(warp::reject::custom)
impl PubSub {
pub fn from(timeline: impl std::fmt::Display, user: &User) -> stream::Receiver {
while OPEN_CONNECTIONS.load(Ordering::Relaxed) > MAX_CONNECTIONS.load(Ordering::Relaxed) {
thread::sleep(time::Duration::from_millis(1000));
}
let new_connections = OPEN_CONNECTIONS.fetch_add(1, Ordering::Relaxed) + 1;
info!("{} connection(s) now open", new_connections);
let (tx, mut rx) = tokio::sync::mpsc::channel(5);
let socket = Socket::new("127.0.0.1:6379", tx);
tokio::spawn(futures::future::lazy(move || socket));
let socket = loop {
if let Ok(Async::Ready(Some(msg))) = rx.poll() {
break msg;
}
thread::sleep(time::Duration::from_millis(100));
};
let timeline = timeline.to_string();
let stream_of_data_from_redis = stream::Receiver::new(socket, timeline, user);
stream_of_data_from_redis
}
}

View File

@ -1,18 +1,18 @@
use serde_derive::Deserialize;
#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
pub struct Media {
pub only_media: String,
}
#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
pub struct Hashtag {
pub tag: String,
}
#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
pub struct List {
pub list: i64,
}
#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
pub struct Auth {
pub access_token: String,
}

148
src/stream.rs Normal file
View File

@ -0,0 +1,148 @@
use crate::pubsub;
use crate::pubsub::PubSub;
use crate::user::User;
use futures::stream::Stream;
use futures::{Async, Poll};
use log::info;
use regex::Regex;
use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::io::{AsyncRead, AsyncWrite, Error, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use uuid::Uuid;
#[derive(Clone)]
pub struct StreamManager {
recv: Arc<Mutex<HashMap<String, Receiver>>>,
last_polled: Arc<Mutex<HashMap<String, Instant>>>,
current_stream: String,
id: uuid::Uuid,
}
impl StreamManager {
pub fn new() -> Self {
StreamManager {
recv: Arc::new(Mutex::new(HashMap::new())),
last_polled: Arc::new(Mutex::new(HashMap::new())),
current_stream: String::new(),
id: Uuid::new_v4(),
}
}
pub fn new_copy(&self) -> Self {
let id = Uuid::new_v4();
StreamManager { id, ..self.clone() }
}
pub fn add(&mut self, timeline: &String, user: &User) -> &Self {
let mut streams = self.recv.lock().expect("No other thread panic");
streams
.entry(timeline.clone())
.or_insert_with(|| PubSub::from(&timeline, &user));
let mut last_polled = self.last_polled.lock().expect("No other thread panic");
last_polled.insert(timeline.clone(), Instant::now());
// Drop any streams that haven't been polled in the last 30 seconds
last_polled
.clone()
.iter()
.filter(|(_, time)| time.elapsed().as_secs() > 30)
.for_each(|(key, _)| {
last_polled.remove(key);
streams.remove(key);
});
self.current_stream = timeline.clone();
self
}
}
impl Stream for StreamManager {
type Item = Value;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut last_polled = self.last_polled.lock().expect("No other thread panic");
let target_stream = self.current_stream.clone();
last_polled.insert(target_stream.clone(), Instant::now());
let mut streams = self.recv.lock().expect("No other thread panic");
let shared_conn = streams.get_mut(&target_stream).expect("known key");
shared_conn.set_polled_by(self.id);
match shared_conn.poll() {
Ok(Async::Ready(Some(value))) => Ok(Async::Ready(Some(value))),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
}
}
}
#[derive(Debug)]
pub struct Receiver {
rx: ReadHalf<TcpStream>,
tx: WriteHalf<TcpStream>,
tl: String,
pub user: User,
polled_by: Uuid,
msg_queue: HashMap<Uuid, VecDeque<Value>>,
}
impl Receiver {
pub fn new(socket: TcpStream, tl: String, user: &User) -> Self {
let (rx, mut tx) = socket.split();
tx.poll_write(pubsub::RedisCmd::subscribe_to_timeline(&tl).as_bytes())
.expect("Can subscribe to Redis");
Self {
rx,
tx,
tl,
user: user.clone(),
polled_by: Uuid::new_v4(),
msg_queue: HashMap::new(),
}
}
pub fn set_polled_by(&mut self, id: Uuid) -> &Self {
self.polled_by = id;
self
}
}
impl Stream for Receiver {
type Item = Value;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Value>, Self::Error> {
let mut buffer = vec![0u8; 3000];
let polled_by = self.polled_by;
self.msg_queue.entry(polled_by).or_insert(VecDeque::new());
info!("Being polled by StreamManager with uuid: {}", polled_by);
if let Async::Ready(num_bytes_read) = self.rx.poll_read(&mut buffer)? {
// capture everything between `{` and `}` as potential JSON
// TODO: figure out if `(?x)` is needed
let re = Regex::new(r"(?P<json>\{.*\})").expect("Valid hard-coded regex");
if let Some(cap) = re.captures(&String::from_utf8_lossy(&buffer[..num_bytes_read])) {
let json: Value = serde_json::from_str(&cap["json"].to_string().clone())?;
for value in self.msg_queue.values_mut() {
value.push_back(json.clone());
}
}
}
if let Some(value) = self.msg_queue.entry(polled_by).or_default().pop_front() {
Ok(Async::Ready(Some(value)))
} else {
Ok(Async::NotReady)
}
}
}
impl Drop for Receiver {
fn drop(&mut self) {
let channel = format!("timeline:{}", self.tl);
self.tx
.poll_write(pubsub::RedisCmd::unsubscribe_from_timeline(&channel).as_bytes())
.expect("Can unsubscribe from Redis");
let open_connections = pubsub::OPEN_CONNECTIONS.fetch_sub(1, Ordering::Relaxed) - 1;
info!("Receiver dropped. {} connection(s) open", open_connections);
}
}

View File

@ -23,17 +23,17 @@ fn conn() -> postgres::Connection {
)
.unwrap()
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum Filter {
None,
Language,
Notification,
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct User {
pub id: i64,
pub langs: Vec<String>,
pub langs: Option<Vec<String>>,
pub logged_in: bool,
pub filter: Filter,
}
@ -56,7 +56,7 @@ impl User {
..self
}
}
pub fn is_authorized_for_list(self, list: i64) -> Result<(i64, User), warp::reject::Rejection> {
pub fn is_authorized_for_list(&self, list: i64) -> Result<i64, warp::reject::Rejection> {
let conn = conn();
// For the Postgres query, `id` = list number; `account_id` = user.id
let rows = &conn
@ -68,7 +68,7 @@ impl User {
if !rows.is_empty() {
let id_of_account_that_owns_the_list: i64 = rows.get(0).get(1);
if id_of_account_that_owns_the_list == self.id {
return Ok((list, self));
return Ok(list);
}
};
@ -77,7 +77,7 @@ impl User {
pub fn public() -> Self {
User {
id: -1,
langs: Vec::new(),
langs: None,
logged_in: false,
filter: Filter::None,
}
@ -107,17 +107,19 @@ LIMIT 1",
if !result.is_empty() {
let only_row = result.get(0);
let id: i64 = only_row.get(1);
let langs: Vec<String> = only_row.get(2);
let langs: Option<Vec<String>> = only_row.get(2);
println!("Granting logged-in access");
Ok(User {
id: id,
id,
langs,
logged_in: true,
filter: Filter::None,
})
} else if let Scope::Public = scope {
println!("Granting public access");
Ok(User {
id: -1,
langs: Vec::new(),
langs: None,
logged_in: false,
filter: Filter::None,
})