mirror of https://github.com/mastodon/flodgatt
Finish substantive work for Redis error handling
This commit is contained in:
parent
7fc19c33b3
commit
d5528aaf0c
|
@ -1,4 +1,4 @@
|
|||
//use std::{error::Error, fmt};
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum TimelineErr {
|
||||
|
@ -11,3 +11,14 @@ impl From<std::num::ParseIntError> for TimelineErr {
|
|||
Self::InvalidInput
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for TimelineErr {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
use TimelineErr::*;
|
||||
let msg = match self {
|
||||
RedisNamespaceMismatch => "TODO: Cut this error",
|
||||
InvalidInput => "The timeline text from Redis could not be parsed into a supported timeline. TODO: add incoming timeline text"
|
||||
};
|
||||
write!(f, "{}", msg)
|
||||
}
|
||||
}
|
||||
|
|
89
src/main.rs
89
src/main.rs
|
@ -1,11 +1,11 @@
|
|||
use flodgatt::{
|
||||
config::{DeploymentConfig, EnvVar, PostgresConfig, RedisConfig},
|
||||
parse_client_request::{PgPool, Subscription},
|
||||
redis_to_client_stream::{ClientAgent, EventStream},
|
||||
redis_to_client_stream::{ClientAgent, EventStream, Receiver},
|
||||
};
|
||||
use std::{collections::HashMap, env, fs, net, os::unix::fs::PermissionsExt};
|
||||
use std::{env, fs, net::SocketAddr, os::unix::fs::PermissionsExt};
|
||||
use tokio::net::UnixListener;
|
||||
use warp::{path, ws::Ws2, Filter};
|
||||
use warp::{http::StatusCode, path, ws::Ws2, Filter, Rejection};
|
||||
|
||||
fn main() {
|
||||
dotenv::from_filename(match env::var("ENV").ok().as_ref().map(String::as_str) {
|
||||
|
@ -14,36 +14,35 @@ fn main() {
|
|||
Some(unsupported) => EnvVar::err("ENV", unsupported, "`production` or `development`"),
|
||||
})
|
||||
.ok();
|
||||
let env_vars_map: HashMap<_, _> = dotenv::vars().collect();
|
||||
let env_vars = EnvVar::new(env_vars_map);
|
||||
let env_vars = EnvVar::new(dotenv::vars().collect());
|
||||
pretty_env_logger::init();
|
||||
log::info!("Environmental variables Flodgatt received: {}", &env_vars);
|
||||
|
||||
log::info!(
|
||||
"Flodgatt recognized the following environmental variables:{}",
|
||||
env_vars.clone()
|
||||
);
|
||||
let postgres_cfg = PostgresConfig::from_env(env_vars.clone());
|
||||
let redis_cfg = RedisConfig::from_env(env_vars.clone());
|
||||
let cfg = DeploymentConfig::from_env(env_vars.clone());
|
||||
|
||||
let postgres_cfg = PostgresConfig::from_env(env_vars.clone());
|
||||
let pg_pool = PgPool::new(postgres_cfg);
|
||||
|
||||
let client_agent_sse = ClientAgent::blank(redis_cfg);
|
||||
let client_agent_ws = client_agent_sse.clone_with_shared_receiver();
|
||||
|
||||
let sharable_receiver = Receiver::try_from(redis_cfg)
|
||||
.unwrap_or_else(|e| {
|
||||
log::error!("{}\nFlodgatt shutting down...", e);
|
||||
std::process::exit(1);
|
||||
})
|
||||
.into_arc();
|
||||
log::info!("Streaming server initialized and ready to accept connections");
|
||||
|
||||
// Server Sent Events
|
||||
let sse_receiver = sharable_receiver.clone();
|
||||
let (sse_interval, whitelist_mode) = (*cfg.sse_interval, *cfg.whitelist_mode);
|
||||
let sse_routes = Subscription::from_sse_query(pg_pool.clone(), whitelist_mode)
|
||||
.and(warp::sse())
|
||||
.map(
|
||||
move |subscription: Subscription, sse_connection_to_client: warp::sse::Sse| {
|
||||
log::info!("Incoming SSE request for {:?}", subscription.timeline);
|
||||
// Create a new ClientAgent
|
||||
let mut client_agent = client_agent_sse.clone_with_shared_receiver();
|
||||
// Assign ClientAgent to generate stream of updates for the user/timeline pair
|
||||
client_agent.init_for_user(subscription);
|
||||
let mut client_agent = ClientAgent::new(sse_receiver.clone(), &subscription);
|
||||
client_agent.subscribe();
|
||||
|
||||
// send the updates through the SSE connection
|
||||
EventStream::to_sse(client_agent, sse_connection_to_client, sse_interval)
|
||||
},
|
||||
|
@ -51,24 +50,20 @@ fn main() {
|
|||
.with(warp::reply::with::header("Connection", "keep-alive"));
|
||||
|
||||
// WebSocket
|
||||
let ws_receiver = sharable_receiver.clone();
|
||||
let (ws_update_interval, whitelist_mode) = (*cfg.ws_interval, *cfg.whitelist_mode);
|
||||
let websocket_routes = Subscription::from_ws_request(pg_pool.clone(), whitelist_mode)
|
||||
let ws_routes = Subscription::from_ws_request(pg_pool.clone(), whitelist_mode)
|
||||
.and(warp::ws::ws2())
|
||||
.map(move |subscription: Subscription, ws: Ws2| {
|
||||
log::info!("Incoming websocket request for {:?}", subscription.timeline);
|
||||
let mut client_agent = ClientAgent::new(ws_receiver.clone(), &subscription);
|
||||
client_agent.subscribe();
|
||||
|
||||
let token = subscription.access_token.clone();
|
||||
// Create a new ClientAgent
|
||||
let mut client_agent = client_agent_ws.clone_with_shared_receiver();
|
||||
// Assign that agent to generate a stream of updates for the user/timeline pair
|
||||
client_agent.init_for_user(subscription);
|
||||
// send the updates through the WS connection (along with the User's access_token
|
||||
// which is sent for security)
|
||||
// send the updates through the WS connection
|
||||
// (along with the User's access_token which is sent for security)
|
||||
(
|
||||
ws.on_upgrade(move |socket| {
|
||||
EventStream::to_ws(socket, client_agent, ws_update_interval)
|
||||
}),
|
||||
token.unwrap_or_else(String::new),
|
||||
ws.on_upgrade(move |s| EventStream::to_ws(s, client_agent, ws_update_interval)),
|
||||
subscription.access_token.unwrap_or_else(String::new),
|
||||
)
|
||||
})
|
||||
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));
|
||||
|
@ -84,33 +79,23 @@ fn main() {
|
|||
log::info!("Using Unix socket {}", socket);
|
||||
fs::remove_file(socket).unwrap_or_default();
|
||||
let incoming = UnixListener::bind(socket).unwrap().incoming();
|
||||
|
||||
fs::set_permissions(socket, PermissionsExt::from_mode(0o666)).unwrap();
|
||||
|
||||
warp::serve(
|
||||
health.or(websocket_routes.or(sse_routes).with(cors).recover(
|
||||
|rejection: warp::reject::Rejection| {
|
||||
let err_txt = match rejection.cause() {
|
||||
Some(text)
|
||||
if text.to_string() == "Missing request header 'authorization'" =>
|
||||
{
|
||||
"Error: Missing access token".to_string()
|
||||
}
|
||||
Some(text) => text.to_string(),
|
||||
None => "Error: Nonexistant endpoint".to_string(),
|
||||
};
|
||||
let json = warp::reply::json(&err_txt);
|
||||
|
||||
Ok(warp::reply::with_status(
|
||||
json,
|
||||
warp::http::StatusCode::UNAUTHORIZED,
|
||||
))
|
||||
},
|
||||
)),
|
||||
health.or(ws_routes.or(sse_routes).with(cors).recover(|r: Rejection| {
|
||||
let json_err = match r.cause() {
|
||||
Some(text) if text.to_string() == "Missing request header 'authorization'" => {
|
||||
warp::reply::json(&"Error: Missing access token".to_string())
|
||||
}
|
||||
Some(text) => warp::reply::json(&text.to_string()),
|
||||
None => warp::reply::json(&"Error: Nonexistant endpoint".to_string()),
|
||||
};
|
||||
Ok(warp::reply::with_status(json_err, StatusCode::UNAUTHORIZED))
|
||||
})),
|
||||
)
|
||||
.run_incoming(incoming);
|
||||
} else {
|
||||
let server_addr = net::SocketAddr::new(*cfg.address, cfg.port.0);
|
||||
warp::serve(health.or(websocket_routes.or(sse_routes).with(cors))).run(server_addr);
|
||||
}
|
||||
let server_addr = SocketAddr::new(*cfg.address, *cfg.port);
|
||||
warp::serve(health.or(ws_routes.or(sse_routes).with(cors))).run(server_addr);
|
||||
};
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ use crate::err::TimelineErr;
|
|||
use crate::log_fatal;
|
||||
use lru::LruCache;
|
||||
use std::collections::HashSet;
|
||||
use uuid::Uuid;
|
||||
use warp::reject::Rejection;
|
||||
|
||||
use super::query;
|
||||
|
@ -50,6 +51,7 @@ macro_rules! parse_sse_query {
|
|||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct Subscription {
|
||||
pub id: Uuid,
|
||||
pub timeline: Timeline,
|
||||
pub allowed_langs: HashSet<String>,
|
||||
pub blocks: Blocks,
|
||||
|
@ -60,6 +62,7 @@ pub struct Subscription {
|
|||
impl Default for Subscription {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
id: Uuid::new_v4(),
|
||||
timeline: Timeline(Stream::Unset, Reach::Local, Content::Notification),
|
||||
allowed_langs: HashSet::new(),
|
||||
blocks: Blocks::default(),
|
||||
|
@ -123,6 +126,7 @@ impl Subscription {
|
|||
};
|
||||
|
||||
Ok(Subscription {
|
||||
id: Uuid::new_v4(),
|
||||
timeline,
|
||||
allowed_langs: user.allowed_langs,
|
||||
blocks: Blocks {
|
||||
|
|
|
@ -15,9 +15,8 @@
|
|||
//! Because `StreamManagers` are lightweight data structures that do not directly
|
||||
//! communicate with Redis, it we create a new `ClientAgent` for
|
||||
//! each new client connection (each in its own thread).use super::{message::Message, receiver::Receiver}
|
||||
use super::{receiver::Receiver, redis::RedisConnErr};
|
||||
use super::receiver::{Receiver, ReceiverErr};
|
||||
use crate::{
|
||||
config,
|
||||
messages::Event,
|
||||
parse_client_request::{Stream::Public, Subscription, Timeline},
|
||||
};
|
||||
|
@ -25,33 +24,20 @@ use futures::{
|
|||
Async::{self, NotReady, Ready},
|
||||
Poll,
|
||||
};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use uuid::Uuid;
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
|
||||
/// Struct for managing all Redis streams.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ClientAgent {
|
||||
receiver: Arc<Mutex<Receiver>>,
|
||||
id: Uuid,
|
||||
pub subscription: Subscription,
|
||||
}
|
||||
|
||||
impl ClientAgent {
|
||||
/// Create a new `ClientAgent` with no shared data.
|
||||
pub fn blank(redis_cfg: config::RedisConfig) -> Self {
|
||||
pub fn new(receiver: Arc<Mutex<Receiver>>, subscription: &Subscription) -> Self {
|
||||
ClientAgent {
|
||||
receiver: Arc::new(Mutex::new(Receiver::new(redis_cfg))),
|
||||
id: Uuid::default(),
|
||||
subscription: Subscription::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Clones the `ClientAgent`, sharing the `Receiver`.
|
||||
pub fn clone_with_shared_receiver(&self) -> Self {
|
||||
Self {
|
||||
receiver: self.receiver.clone(),
|
||||
id: self.id,
|
||||
subscription: self.subscription.clone(),
|
||||
receiver,
|
||||
subscription: subscription.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -63,25 +49,32 @@ impl ClientAgent {
|
|||
/// a different user, the `Receiver` is responsible for figuring
|
||||
/// that out and avoiding duplicated connections. Thus, it is safe to
|
||||
/// use this method for each new client connection.
|
||||
pub fn init_for_user(&mut self, subscription: Subscription) {
|
||||
use std::time::Instant;
|
||||
self.id = Uuid::new_v4();
|
||||
self.subscription = subscription;
|
||||
let start_time = Instant::now();
|
||||
let mut receiver = self.receiver.lock().expect("No thread panic (stream.rs)");
|
||||
receiver.manage_new_timeline(
|
||||
self.id,
|
||||
self.subscription.timeline,
|
||||
self.subscription.hashtag_name.clone(),
|
||||
);
|
||||
log::info!("init_for_user had lock for: {:?}", start_time.elapsed());
|
||||
pub fn subscribe(&mut self) {
|
||||
let mut receiver = self.lock_receiver();
|
||||
receiver
|
||||
.add_subscription(&self.subscription)
|
||||
.unwrap_or_else(|e| log::error!("Could not subscribe to the Redis channel: {}", e))
|
||||
}
|
||||
|
||||
fn lock_receiver(&self) -> MutexGuard<Receiver> {
|
||||
match self.receiver.lock() {
|
||||
Ok(inner) => inner,
|
||||
Err(e) => {
|
||||
log::error!(
|
||||
"Another thread crashed: {}\n
|
||||
Attempting to continue, possibly with invalid data",
|
||||
e
|
||||
);
|
||||
e.into_inner()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The stream that the `ClientAgent` manages. `Poll` is the only method implemented.
|
||||
impl futures::stream::Stream for ClientAgent {
|
||||
type Item = Event;
|
||||
type Error = RedisConnErr;
|
||||
type Error = ReceiverErr;
|
||||
|
||||
/// Checks for any new messages that should be sent to the client.
|
||||
///
|
||||
|
@ -93,11 +86,8 @@ impl futures::stream::Stream for ClientAgent {
|
|||
/// errors from the underlying data structures.
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
let result = {
|
||||
let mut receiver = self
|
||||
.receiver
|
||||
.lock()
|
||||
.expect("ClientAgent: No other thread panic");
|
||||
receiver.poll_for(self.id, self.subscription.timeline)
|
||||
let mut receiver = self.lock_receiver();
|
||||
receiver.poll_for(self.subscription.id, self.subscription.timeline)
|
||||
};
|
||||
|
||||
let allowed_langs = &self.subscription.allowed_langs;
|
||||
|
@ -131,7 +121,7 @@ impl futures::stream::Stream for ClientAgent {
|
|||
},
|
||||
Ok(Ready(None)) => Ok(Ready(None)),
|
||||
Ok(NotReady) => Ok(NotReady),
|
||||
Err(_e) => todo!("Handle err gracefully"),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ use warp::{
|
|||
sse::Sse,
|
||||
ws::{Message, WebSocket},
|
||||
};
|
||||
|
||||
pub struct EventStream;
|
||||
|
||||
impl EventStream {
|
||||
|
@ -32,7 +31,7 @@ impl EventStream {
|
|||
.map(|_r| ())
|
||||
.map_err(|e| match e.to_string().as_ref() {
|
||||
"IO error: Broken pipe (os error 32)" => (), // just closed unix socket
|
||||
_ => log::warn!("websocket send error: {}", e),
|
||||
_ => log::warn!("WebSocket send error: {}", e),
|
||||
}),
|
||||
);
|
||||
|
||||
|
@ -42,7 +41,6 @@ impl EventStream {
|
|||
match ws_rx.poll() {
|
||||
Ok(Async::NotReady) | Ok(Async::Ready(Some(_))) => futures::future::ok(true),
|
||||
Ok(Async::Ready(None)) => {
|
||||
// TODO: consider whether we should manually drop closed connections here
|
||||
log::info!("Client closed WebSocket connection for {:?}", timeline);
|
||||
futures::future::ok(false)
|
||||
}
|
||||
|
@ -58,27 +56,35 @@ impl EventStream {
|
|||
}
|
||||
});
|
||||
|
||||
let mut time = Instant::now();
|
||||
let mut last_ping_time = Instant::now();
|
||||
// Every time you get an event from that stream, send it through the pipe
|
||||
event_stream
|
||||
.for_each(move |_instant| {
|
||||
if let Ok(Async::Ready(Some(msg))) = client_agent.poll() {
|
||||
tx.unbounded_send(Message::text(msg.to_json_string()))
|
||||
.expect("No send error");
|
||||
};
|
||||
if time.elapsed() > Duration::from_secs(30) {
|
||||
tx.unbounded_send(Message::text("{}")).expect("Can ping");
|
||||
time = Instant::now();
|
||||
match client_agent.poll() {
|
||||
Ok(Async::Ready(Some(msg))) => tx
|
||||
.unbounded_send(Message::text(msg.to_json_string()))
|
||||
.unwrap_or_else(|e| {
|
||||
log::error!("Could not send message to WebSocket: {}", e)
|
||||
}),
|
||||
Ok(Async::Ready(None)) => log::info!("WebSocket ClientAgent got Ready(None)"),
|
||||
Ok(Async::NotReady) if last_ping_time.elapsed() > Duration::from_secs(30) => {
|
||||
tx.unbounded_send(Message::text("{}")).unwrap_or_else(|e| {
|
||||
log::error!("Could not send ping to WebSocket: {}", e)
|
||||
});
|
||||
last_ping_time = Instant::now();
|
||||
}
|
||||
Ok(Async::NotReady) => (), // no new messages; nothing to do
|
||||
Err(e) => log::error!("{}\n Dropping WebSocket message and continuing.", e),
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.then(move |result| {
|
||||
// TODO: consider whether we should manually drop closed connections here
|
||||
log::info!("WebSocket connection for {:?} closed.", timeline);
|
||||
result
|
||||
})
|
||||
.map_err(move |e| log::warn!("Error sending to {:?}: {}", timeline, e))
|
||||
}
|
||||
|
||||
pub fn to_sse(mut client_agent: ClientAgent, sse: Sse, interval: Duration) -> impl Reply {
|
||||
let event_stream =
|
||||
tokio::timer::Interval::new(Instant::now(), interval).filter_map(move |_| {
|
||||
|
@ -87,7 +93,15 @@ impl EventStream {
|
|||
warp::sse::event(event.event_name()),
|
||||
warp::sse::data(event.payload().unwrap_or_else(String::new)),
|
||||
)),
|
||||
_ => None,
|
||||
Ok(Async::Ready(None)) => {
|
||||
log::info!("SSE ClientAgent got Ready(None)");
|
||||
None
|
||||
}
|
||||
Ok(Async::NotReady) => None,
|
||||
Err(e) => {
|
||||
log::error!("{}\n Dropping SSE message and continuing.", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -12,6 +12,6 @@ pub use redis::redis_msg;
|
|||
//#[cfg(test)]
|
||||
//pub use receiver::process_messages;
|
||||
//#[cfg(test)]
|
||||
pub use receiver::{MessageQueues, MsgQueue};
|
||||
pub use receiver::{MessageQueues, MsgQueue, Receiver, ReceiverErr};
|
||||
//#[cfg(test)]
|
||||
//pub use redis::redis_msg::{RedisMsg, RedisUtf8};
|
||||
|
|
|
@ -2,6 +2,7 @@ use super::super::{redis::RedisConnErr, redis_msg::RedisParseErr};
|
|||
use crate::err::TimelineErr;
|
||||
|
||||
use serde_json;
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ReceiverErr {
|
||||
|
@ -11,6 +12,19 @@ pub enum ReceiverErr {
|
|||
RedisConnErr(RedisConnErr),
|
||||
}
|
||||
|
||||
impl fmt::Display for ReceiverErr {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
use ReceiverErr::*;
|
||||
match self {
|
||||
EventErr(inner) => write!(f, "{}", inner),
|
||||
RedisParseErr(inner) => write!(f, "{}", inner),
|
||||
RedisConnErr(inner) => write!(f, "{}", inner),
|
||||
TimelineErr(inner) => write!(f, "{}", inner),
|
||||
}?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for ReceiverErr {
|
||||
fn from(error: serde_json::Error) -> Self {
|
||||
Self::EventErr(error)
|
||||
|
|
|
@ -12,37 +12,42 @@ use super::redis::{redis_connection::RedisCmd, RedisConn};
|
|||
use crate::{
|
||||
config,
|
||||
messages::Event,
|
||||
parse_client_request::{Stream, Timeline},
|
||||
parse_client_request::{Stream, Subscription, Timeline},
|
||||
};
|
||||
|
||||
use futures::{Async, Poll};
|
||||
use std::collections::HashMap;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
result,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
type Result<T> = result::Result<T, ReceiverErr>;
|
||||
|
||||
/// The item that streams from Redis and is polled by the `ClientAgent`
|
||||
#[derive(Debug)]
|
||||
pub struct Receiver {
|
||||
redis_connection: RedisConn,
|
||||
pub msg_queues: MessageQueues,
|
||||
clients_per_timeline: HashMap<Timeline, i32>,
|
||||
// hashtag_cache: LruCache<i64, String>,
|
||||
// TODO: eventually, it might make sense to have Mastodon publish to timelines with
|
||||
// the tag number instead of the tag name. This would save us from dealing
|
||||
// with a cache here and would be consistent with how lists/users are handled.
|
||||
}
|
||||
|
||||
impl Receiver {
|
||||
/// Create a new `Receiver`, with its own Redis connections (but, as yet, no
|
||||
/// active subscriptions).
|
||||
pub fn new(redis_cfg: config::RedisConfig) -> Self {
|
||||
let redis_connection = RedisConn::new(redis_cfg).expect("TODO");
|
||||
pub fn try_from(redis_cfg: config::RedisConfig) -> Result<Self> {
|
||||
let redis_connection = RedisConn::new(redis_cfg)?;
|
||||
|
||||
Self {
|
||||
Ok(Self {
|
||||
redis_connection,
|
||||
msg_queues: MessageQueues(HashMap::new()),
|
||||
clients_per_timeline: HashMap::new(),
|
||||
// should this be a run-time option?
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn into_arc(self) -> Arc<Mutex<Self>> {
|
||||
Arc::new(Mutex::new(self))
|
||||
}
|
||||
|
||||
/// Assigns the `Receiver` a new timeline to monitor and runs other
|
||||
|
@ -51,13 +56,15 @@ impl Receiver {
|
|||
/// Note: this method calls `subscribe_or_unsubscribe_as_needed`,
|
||||
/// so Redis PubSub subscriptions are only updated when a new timeline
|
||||
/// comes under management for the first time.
|
||||
pub fn manage_new_timeline(&mut self, id: Uuid, tl: Timeline, hashtag: Option<String>) {
|
||||
if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (hashtag, tl) {
|
||||
pub fn add_subscription(&mut self, subscription: &Subscription) -> Result<()> {
|
||||
let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline);
|
||||
|
||||
if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (tag, tl) {
|
||||
self.redis_connection.update_cache(hashtag, id);
|
||||
};
|
||||
|
||||
self.msg_queues.insert(id, MsgQueue::new(tl));
|
||||
self.subscribe_or_unsubscribe_as_needed(tl);
|
||||
self.msg_queues.insert(subscription.id, MsgQueue::new(tl));
|
||||
self.subscribe_or_unsubscribe_as_needed(tl)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the oldest message in the `ClientAgent`'s queue (if any).
|
||||
|
@ -102,8 +109,8 @@ impl Receiver {
|
|||
/// Drop any PubSub subscriptions that don't have active clients and check
|
||||
/// that there's a subscription to the current one. If there isn't, then
|
||||
/// subscribe to it.
|
||||
fn subscribe_or_unsubscribe_as_needed(&mut self, timeline: Timeline) {
|
||||
let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(timeline);
|
||||
fn subscribe_or_unsubscribe_as_needed(&mut self, tl: Timeline) -> Result<()> {
|
||||
let timelines_to_modify = self.msg_queues.calculate_timelines_to_add_or_drop(tl);
|
||||
|
||||
// Record the lower number of clients subscribed to that channel
|
||||
for change in timelines_to_modify {
|
||||
|
@ -118,14 +125,11 @@ impl Receiver {
|
|||
// If no clients, unsubscribe from the channel
|
||||
use RedisCmd::*;
|
||||
if *count_of_subscribed_clients <= 0 {
|
||||
self.redis_connection
|
||||
.send_cmd(Unsubscribe, &timeline)
|
||||
.expect("TODO");
|
||||
self.redis_connection.send_cmd(Unsubscribe, &timeline)?;
|
||||
} else if *count_of_subscribed_clients == 1 && change.in_subscriber_number == 1 {
|
||||
self.redis_connection
|
||||
.send_cmd(Subscribe, &timeline)
|
||||
.expect("TODO");
|
||||
self.redis_connection.send_cmd(Subscribe, &timeline)?
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
pub mod redis_cmd;
|
||||
pub mod redis_connection;
|
||||
pub mod redis_msg;
|
||||
|
||||
|
|
|
@ -3,8 +3,8 @@ use std::fmt;
|
|||
#[derive(Debug)]
|
||||
pub enum RedisConnErr {
|
||||
ConnectionErr { addr: String, inner: std::io::Error },
|
||||
// TODO ^^^^ better name?
|
||||
UnknownRedisErr(String),
|
||||
InvalidRedisReply(String),
|
||||
UnknownRedisErr(std::io::Error),
|
||||
IncorrectPassword(String),
|
||||
MissingPassword,
|
||||
NotRedis(String),
|
||||
|
@ -28,10 +28,13 @@ impl fmt::Display for RedisConnErr {
|
|||
Connection Error: {}",
|
||||
addr, inner
|
||||
),
|
||||
UnknownRedisErr(unexpected_reply) => format!(
|
||||
"Could not connect to Redis for an unknown reason. Expected `+PONG` reply but got `{}`",
|
||||
InvalidRedisReply(unexpected_reply) => format!(
|
||||
"Received and unexpected reply from Redis. Expected `+PONG` reply but got `{}`",
|
||||
unexpected_reply
|
||||
),
|
||||
UnknownRedisErr(io_err) => {
|
||||
format!("Unexpected failure communicating with Redis: {}", io_err)
|
||||
}
|
||||
IncorrectPassword(attempted_password) => format!(
|
||||
"Incorrect Redis password. You supplied `{}`.\n \
|
||||
Please supply correct password with REDIS_PASSWORD environmental variable.",
|
||||
|
@ -51,48 +54,8 @@ impl fmt::Display for RedisConnErr {
|
|||
}
|
||||
}
|
||||
|
||||
// die_with_msg(format!(
|
||||
// r"Incorrect Redis password. You supplied `{}`.
|
||||
// Please supply correct password with REDIS_PASSWORD environmental variable.",
|
||||
// password,
|
||||
// ))
|
||||
|
||||
// impl fmt::Display for RedisParseErr {
|
||||
// fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
// use RedisParseErr::*;
|
||||
// let msg = match self {
|
||||
// Incomplete => "The input from Redis does not form a complete message, likely because \
|
||||
// the input buffer filled partway through a message. Save this input \
|
||||
// and try again with additional input from Redis."
|
||||
// .to_string(),
|
||||
// InvalidNumber(parse_int_err) => format!(
|
||||
// "Redis indicated that an item would be a number, but it could not be parsed: {}",
|
||||
// parse_int_err
|
||||
// ),
|
||||
|
||||
// InvalidLineStart(line_start_char) => format!(
|
||||
// "A line from Redis started with `{}`, which is not a valid character to indicate \
|
||||
// the type of the Redis line.",
|
||||
// line_start_char
|
||||
// ),
|
||||
// InvalidLineEnd => "A Redis line ended before expected line length".to_string(),
|
||||
// IncorrectRedisType => "Received a Redis type that is not supported in this context. \
|
||||
// Flodgatt expects each message from Redis to be a Redis array \
|
||||
// consisting of bulk strings or integers."
|
||||
// .to_string(),
|
||||
// MissingField => "Redis input was missing a field Flodgatt expected (e.g., a `message` \
|
||||
// without a payload line)"
|
||||
// .to_string(),
|
||||
// UnsupportedTimeline => {
|
||||
// "The raw timeline received from Redis could not be parsed into a \
|
||||
// supported timeline"
|
||||
// .to_string()
|
||||
// }
|
||||
// UnsupportedEvent(e) => format!(
|
||||
// "The event text from Redis could not be parsed into a valid event: {}",
|
||||
// e
|
||||
// ),
|
||||
// };
|
||||
// write!(f, "{}", msg)
|
||||
// }
|
||||
// }
|
||||
impl From<std::io::Error> for RedisConnErr {
|
||||
fn from(e: std::io::Error) -> RedisConnErr {
|
||||
RedisConnErr::UnknownRedisErr(e)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,8 @@ use std::{
|
|||
use futures::{Async, Poll};
|
||||
use lru::LruCache;
|
||||
|
||||
type Result<T> = std::result::Result<T, RedisConnErr>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RedisConn {
|
||||
primary: TcpStream,
|
||||
|
@ -33,32 +35,38 @@ pub struct RedisConn {
|
|||
}
|
||||
|
||||
impl RedisConn {
|
||||
pub fn new(redis_cfg: RedisConfig) -> Result<Self, RedisConnErr> {
|
||||
pub fn new(redis_cfg: RedisConfig) -> Result<Self> {
|
||||
let addr = format!("{}:{}", *redis_cfg.host, *redis_cfg.port);
|
||||
let conn = Self::new_connection(&addr, &redis_cfg.password.as_ref())?;
|
||||
conn.set_nonblocking(true)
|
||||
.map_err(|e| RedisConnErr::with_addr(&addr, e))?;
|
||||
|
||||
Ok(Self {
|
||||
let redis_conn = Self {
|
||||
primary: conn,
|
||||
secondary: Self::new_connection(&addr, &redis_cfg.password.as_ref())?,
|
||||
tag_id_cache: LruCache::new(1000),
|
||||
tag_name_cache: LruCache::new(1000),
|
||||
// TODO: eventually, it might make sense to have Mastodon publish to timelines with
|
||||
// the tag number instead of the tag name. This would save us from dealing
|
||||
// with a cache here and would be consistent with how lists/users are handled.
|
||||
redis_namespace: redis_cfg.namespace.clone(),
|
||||
redis_poll_interval: *redis_cfg.polling_interval,
|
||||
redis_input: Vec::new(),
|
||||
redis_polled_at: Instant::now(),
|
||||
})
|
||||
};
|
||||
|
||||
Ok(redis_conn)
|
||||
}
|
||||
|
||||
pub fn poll_redis(&mut self) -> Poll<Option<(Timeline, Event)>, ReceiverErr> {
|
||||
let mut buffer = vec![0u8; 6000];
|
||||
if self.redis_polled_at.elapsed() > self.redis_poll_interval {
|
||||
match self.primary.read(&mut buffer) {
|
||||
Ok(bytes_read) => self.redis_input.extend_from_slice(&buffer[..bytes_read]),
|
||||
Err(e) => log::error!("Error polling Redis: {}\nRetrying...", e),
|
||||
if let Ok(bytes_read) = self.primary.read(&mut buffer) {
|
||||
self.redis_input.extend_from_slice(&buffer[..bytes_read]);
|
||||
}
|
||||
}
|
||||
if self.redis_input.is_empty() {
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
let input = self.redis_input.clone();
|
||||
self.redis_input.clear();
|
||||
|
||||
|
@ -100,7 +108,7 @@ impl RedisConn {
|
|||
self.tag_name_cache.put(id, hashtag);
|
||||
}
|
||||
|
||||
fn new_connection(addr: &String, pass: &Option<&String>) -> Result<TcpStream, RedisConnErr> {
|
||||
fn new_connection(addr: &String, pass: &Option<&String>) -> Result<TcpStream> {
|
||||
match TcpStream::connect(&addr) {
|
||||
Ok(mut conn) => {
|
||||
if let Some(password) = pass {
|
||||
|
@ -115,7 +123,7 @@ impl RedisConn {
|
|||
Err(e) => Err(RedisConnErr::with_addr(&addr, e)),
|
||||
}
|
||||
}
|
||||
fn auth_connection(conn: &mut TcpStream, addr: &str, pass: &str) -> Result<(), RedisConnErr> {
|
||||
fn auth_connection(conn: &mut TcpStream, addr: &str, pass: &str) -> Result<()> {
|
||||
conn.write_all(&format!("*2\r\n$4\r\nauth\r\n${}\r\n{}\r\n", pass.len(), pass).as_bytes())
|
||||
.map_err(|e| RedisConnErr::with_addr(&addr, e))?;
|
||||
let mut buffer = vec![0u8; 5];
|
||||
|
@ -129,7 +137,7 @@ impl RedisConn {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn validate_connection(conn: &mut TcpStream, addr: &str) -> Result<(), RedisConnErr> {
|
||||
fn validate_connection(conn: &mut TcpStream, addr: &str) -> Result<()> {
|
||||
conn.write_all(b"PING\r\n")
|
||||
.map_err(|e| RedisConnErr::with_addr(&addr, e))?;
|
||||
let mut buffer = vec![0u8; 7];
|
||||
|
@ -140,11 +148,11 @@ impl RedisConn {
|
|||
"+PONG\r\n" => Ok(()),
|
||||
"-NOAUTH" => Err(RedisConnErr::MissingPassword),
|
||||
"HTTP/1." => Err(RedisConnErr::NotRedis(addr.to_string())),
|
||||
_ => Err(RedisConnErr::UnknownRedisErr(reply.to_string())),
|
||||
_ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send_cmd(&mut self, cmd: RedisCmd, timeline: &Timeline) -> Result<(), RedisConnErr> {
|
||||
pub fn send_cmd(&mut self, cmd: RedisCmd, timeline: &Timeline) -> Result<()> {
|
||||
let hashtag = match timeline {
|
||||
Timeline(Stream::Hashtag(id), _, _) => self.tag_name_cache.get(id),
|
||||
_non_hashtag_timeline => None,
|
||||
|
@ -161,12 +169,8 @@ impl RedisConn {
|
|||
format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n$1\r\n0\r\n", tl.len(), tl),
|
||||
),
|
||||
};
|
||||
self.secondary
|
||||
.write_all(&primary_cmd.as_bytes())
|
||||
.expect("TODO");
|
||||
self.secondary
|
||||
.write_all(&secondary_cmd.as_bytes())
|
||||
.expect("TODO");
|
||||
self.primary.write_all(&primary_cmd.as_bytes())?;
|
||||
self.secondary.write_all(&secondary_cmd.as_bytes())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue