mirror of https://github.com/mastodon/flodgatt
Apply clippy lints
This commit is contained in:
parent
d5528aaf0c
commit
9920b20354
|
@ -440,7 +440,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flodgatt"
|
name = "flodgatt"
|
||||||
version = "0.6.6"
|
version = "0.6.7"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
[package]
|
[package]
|
||||||
name = "flodgatt"
|
name = "flodgatt"
|
||||||
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
|
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
|
||||||
version = "0.6.6"
|
version = "0.6.7"
|
||||||
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
|
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
|
|
|
@ -63,7 +63,7 @@ impl fmt::Display for EnvVar {
|
||||||
]
|
]
|
||||||
.iter()
|
.iter()
|
||||||
{
|
{
|
||||||
if let Some(value) = self.get(&env_var.to_string()) {
|
if let Some(value) = self.get(&(*env_var).to_string()) {
|
||||||
result = format!("{}\n {}: {}", result, env_var, value)
|
result = format!("{}\n {}: {}", result, env_var, value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
14
src/main.rs
14
src/main.rs
|
@ -8,7 +8,7 @@ use tokio::net::UnixListener;
|
||||||
use warp::{http::StatusCode, path, ws::Ws2, Filter, Rejection};
|
use warp::{http::StatusCode, path, ws::Ws2, Filter, Rejection};
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
dotenv::from_filename(match env::var("ENV").ok().as_ref().map(String::as_str) {
|
dotenv::from_filename(match env::var("ENV").ok().as_deref() {
|
||||||
Some("production") => ".env.production",
|
Some("production") => ".env.production",
|
||||||
Some("development") | None => ".env",
|
Some("development") | None => ".env",
|
||||||
Some(unsupported) => EnvVar::err("ENV", unsupported, "`production` or `development`"),
|
Some(unsupported) => EnvVar::err("ENV", unsupported, "`production` or `development`"),
|
||||||
|
@ -20,7 +20,7 @@ fn main() {
|
||||||
|
|
||||||
let postgres_cfg = PostgresConfig::from_env(env_vars.clone());
|
let postgres_cfg = PostgresConfig::from_env(env_vars.clone());
|
||||||
let redis_cfg = RedisConfig::from_env(env_vars.clone());
|
let redis_cfg = RedisConfig::from_env(env_vars.clone());
|
||||||
let cfg = DeploymentConfig::from_env(env_vars.clone());
|
let cfg = DeploymentConfig::from_env(env_vars);
|
||||||
|
|
||||||
let pg_pool = PgPool::new(postgres_cfg);
|
let pg_pool = PgPool::new(postgres_cfg);
|
||||||
|
|
||||||
|
@ -44,15 +44,15 @@ fn main() {
|
||||||
client_agent.subscribe();
|
client_agent.subscribe();
|
||||||
|
|
||||||
// send the updates through the SSE connection
|
// send the updates through the SSE connection
|
||||||
EventStream::to_sse(client_agent, sse_connection_to_client, sse_interval)
|
EventStream::send_to_sse(client_agent, sse_connection_to_client, sse_interval)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.with(warp::reply::with::header("Connection", "keep-alive"));
|
.with(warp::reply::with::header("Connection", "keep-alive"));
|
||||||
|
|
||||||
// WebSocket
|
// WebSocket
|
||||||
let ws_receiver = sharable_receiver.clone();
|
let ws_receiver = sharable_receiver;
|
||||||
let (ws_update_interval, whitelist_mode) = (*cfg.ws_interval, *cfg.whitelist_mode);
|
let (ws_update_interval, whitelist_mode) = (*cfg.ws_interval, *cfg.whitelist_mode);
|
||||||
let ws_routes = Subscription::from_ws_request(pg_pool.clone(), whitelist_mode)
|
let ws_routes = Subscription::from_ws_request(pg_pool, whitelist_mode)
|
||||||
.and(warp::ws::ws2())
|
.and(warp::ws::ws2())
|
||||||
.map(move |subscription: Subscription, ws: Ws2| {
|
.map(move |subscription: Subscription, ws: Ws2| {
|
||||||
log::info!("Incoming websocket request for {:?}", subscription.timeline);
|
log::info!("Incoming websocket request for {:?}", subscription.timeline);
|
||||||
|
@ -62,7 +62,9 @@ fn main() {
|
||||||
// send the updates through the WS connection
|
// send the updates through the WS connection
|
||||||
// (along with the User's access_token which is sent for security)
|
// (along with the User's access_token which is sent for security)
|
||||||
(
|
(
|
||||||
ws.on_upgrade(move |s| EventStream::to_ws(s, client_agent, ws_update_interval)),
|
ws.on_upgrade(move |s| {
|
||||||
|
EventStream::send_to_ws(s, client_agent, ws_update_interval)
|
||||||
|
}),
|
||||||
subscription.access_token.unwrap_or_else(String::new),
|
subscription.access_token.unwrap_or_else(String::new),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
|
@ -305,10 +305,11 @@ pub struct Notification {
|
||||||
status: Option<Status>,
|
status: Option<Status>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[serde(rename_all = "lowercase", deny_unknown_fields)]
|
#[serde(rename_all = "snake_case", deny_unknown_fields)]
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||||
enum NotificationType {
|
enum NotificationType {
|
||||||
Follow,
|
Follow,
|
||||||
|
FollowRequest, // Undocumented
|
||||||
Mention,
|
Mention,
|
||||||
Reblog,
|
Reblog,
|
||||||
Favourite,
|
Favourite,
|
||||||
|
|
|
@ -83,7 +83,7 @@ LIMIT 1",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn select_hashtag_id(self, tag_name: &String) -> Result<i64, Rejection> {
|
pub fn select_hashtag_id(self, tag_name: &str) -> Result<i64, Rejection> {
|
||||||
let mut conn = self.0.get().unwrap();
|
let mut conn = self.0.get().unwrap();
|
||||||
let rows = &conn
|
let rows = &conn
|
||||||
.query(
|
.query(
|
||||||
|
|
|
@ -132,7 +132,7 @@ impl Subscription {
|
||||||
blocks: Blocks {
|
blocks: Blocks {
|
||||||
blocking_users: pool.clone().select_blocking_users(user.id),
|
blocking_users: pool.clone().select_blocking_users(user.id),
|
||||||
blocked_users: pool.clone().select_blocked_users(user.id),
|
blocked_users: pool.clone().select_blocked_users(user.id),
|
||||||
blocked_domains: pool.clone().select_blocked_domains(user.id),
|
blocked_domains: pool.select_blocked_domains(user.id),
|
||||||
},
|
},
|
||||||
hashtag_name,
|
hashtag_name,
|
||||||
access_token: q.access_token,
|
access_token: q.access_token,
|
||||||
|
|
|
@ -12,7 +12,7 @@ pub struct EventStream;
|
||||||
|
|
||||||
impl EventStream {
|
impl EventStream {
|
||||||
/// Send a stream of replies to a WebSocket client.
|
/// Send a stream of replies to a WebSocket client.
|
||||||
pub fn to_ws(
|
pub fn send_to_ws(
|
||||||
ws: WebSocket,
|
ws: WebSocket,
|
||||||
mut client_agent: ClientAgent,
|
mut client_agent: ClientAgent,
|
||||||
interval: Duration,
|
interval: Duration,
|
||||||
|
@ -85,7 +85,7 @@ impl EventStream {
|
||||||
.map_err(move |e| log::warn!("Error sending to {:?}: {}", timeline, e))
|
.map_err(move |e| log::warn!("Error sending to {:?}: {}", timeline, e))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn to_sse(mut client_agent: ClientAgent, sse: Sse, interval: Duration) -> impl Reply {
|
pub fn send_to_sse(mut client_agent: ClientAgent, sse: Sse, interval: Duration) -> impl Reply {
|
||||||
let event_stream =
|
let event_stream =
|
||||||
tokio::timer::Interval::new(Instant::now(), interval).filter_map(move |_| {
|
tokio::timer::Interval::new(Instant::now(), interval).filter_map(move |_| {
|
||||||
match client_agent.poll() {
|
match client_agent.poll() {
|
||||||
|
|
|
@ -4,14 +4,4 @@ mod event_stream;
|
||||||
mod receiver;
|
mod receiver;
|
||||||
mod redis;
|
mod redis;
|
||||||
|
|
||||||
pub use {client_agent::ClientAgent, event_stream::EventStream};
|
pub use {client_agent::ClientAgent, event_stream::EventStream, receiver::Receiver};
|
||||||
|
|
||||||
// TODO remove
|
|
||||||
pub use redis::redis_msg;
|
|
||||||
|
|
||||||
//#[cfg(test)]
|
|
||||||
//pub use receiver::process_messages;
|
|
||||||
//#[cfg(test)]
|
|
||||||
pub use receiver::{MessageQueues, MsgQueue, Receiver, ReceiverErr};
|
|
||||||
//#[cfg(test)]
|
|
||||||
//pub use redis::redis_msg::{RedisMsg, RedisUtf8};
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use super::super::{redis::RedisConnErr, redis_msg::RedisParseErr};
|
use super::super::redis::{RedisConnErr, RedisParseErr};
|
||||||
use crate::err::TimelineErr;
|
use crate::err::TimelineErr;
|
||||||
|
|
||||||
use serde_json;
|
use serde_json;
|
||||||
|
|
|
@ -2,3 +2,4 @@ pub mod redis_connection;
|
||||||
pub mod redis_msg;
|
pub mod redis_msg;
|
||||||
|
|
||||||
pub use redis_connection::{RedisConn, RedisConnErr};
|
pub use redis_connection::{RedisConn, RedisConnErr};
|
||||||
|
pub use redis_msg::RedisParseErr;
|
||||||
|
|
|
@ -37,12 +37,12 @@ pub struct RedisConn {
|
||||||
impl RedisConn {
|
impl RedisConn {
|
||||||
pub fn new(redis_cfg: RedisConfig) -> Result<Self> {
|
pub fn new(redis_cfg: RedisConfig) -> Result<Self> {
|
||||||
let addr = format!("{}:{}", *redis_cfg.host, *redis_cfg.port);
|
let addr = format!("{}:{}", *redis_cfg.host, *redis_cfg.port);
|
||||||
let conn = Self::new_connection(&addr, &redis_cfg.password.as_ref())?;
|
let conn = Self::new_connection(&addr, redis_cfg.password.as_ref())?;
|
||||||
conn.set_nonblocking(true)
|
conn.set_nonblocking(true)
|
||||||
.map_err(|e| RedisConnErr::with_addr(&addr, e))?;
|
.map_err(|e| RedisConnErr::with_addr(&addr, e))?;
|
||||||
let redis_conn = Self {
|
let redis_conn = Self {
|
||||||
primary: conn,
|
primary: conn,
|
||||||
secondary: Self::new_connection(&addr, &redis_cfg.password.as_ref())?,
|
secondary: Self::new_connection(&addr, redis_cfg.password.as_ref())?,
|
||||||
tag_id_cache: LruCache::new(1000),
|
tag_id_cache: LruCache::new(1000),
|
||||||
tag_name_cache: LruCache::new(1000),
|
tag_name_cache: LruCache::new(1000),
|
||||||
// TODO: eventually, it might make sense to have Mastodon publish to timelines with
|
// TODO: eventually, it might make sense to have Mastodon publish to timelines with
|
||||||
|
@ -53,7 +53,6 @@ impl RedisConn {
|
||||||
redis_input: Vec::new(),
|
redis_input: Vec::new(),
|
||||||
redis_polled_at: Instant::now(),
|
redis_polled_at: Instant::now(),
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(redis_conn)
|
Ok(redis_conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +107,7 @@ impl RedisConn {
|
||||||
self.tag_name_cache.put(id, hashtag);
|
self.tag_name_cache.put(id, hashtag);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_connection(addr: &String, pass: &Option<&String>) -> Result<TcpStream> {
|
fn new_connection(addr: &str, pass: Option<&String>) -> Result<TcpStream> {
|
||||||
match TcpStream::connect(&addr) {
|
match TcpStream::connect(&addr) {
|
||||||
Ok(mut conn) => {
|
Ok(mut conn) => {
|
||||||
if let Some(password) = pass {
|
if let Some(password) = pass {
|
||||||
|
|
|
@ -87,7 +87,7 @@ fn utf8_to_redis_data<'a>(s: &'a str) -> Result<(RedisData, &'a str), RedisParse
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn after_newline_at<'a>(s: &'a str, start: usize) -> RedisParser<'a, &'a str> {
|
fn after_newline_at(s: &str, start: usize) -> RedisParser<&str> {
|
||||||
let s = s.get(start..).ok_or(Incomplete)?;
|
let s = s.get(start..).ok_or(Incomplete)?;
|
||||||
if !s.starts_with("\r\n") {
|
if !s.starts_with("\r\n") {
|
||||||
return Err(RedisParseErr::InvalidLineEnd);
|
return Err(RedisParseErr::InvalidLineEnd);
|
||||||
|
|
Loading…
Reference in New Issue