mirror of https://github.com/mastodon/flodgatt
commit
839f04a2e6
File diff suppressed because it is too large
Load Diff
11
Cargo.toml
11
Cargo.toml
|
@ -2,19 +2,11 @@
|
|||
name = "ragequit"
|
||||
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
|
||||
version = "0.1.0"
|
||||
authors = ["Julian Laubstein <contact@julianlaubstein.de>"]
|
||||
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
actix-web = "0.7.18"
|
||||
structopt = "0.2.14"
|
||||
log = "0.4.6"
|
||||
actix = "0.7.9"
|
||||
actix-redis = "0.5.1"
|
||||
redis-async = "0.4.4"
|
||||
envconfig = "0.5.0"
|
||||
envconfig_derive = "0.5.0"
|
||||
whoami = "0.4.1"
|
||||
futures = "0.1.26"
|
||||
tokio = "0.1.19"
|
||||
warp = "0.1.15"
|
||||
|
@ -25,6 +17,7 @@ serde = "1.0.90"
|
|||
pretty_env_logger = "0.3.0"
|
||||
postgres = "0.15.2"
|
||||
uuid = { version = "0.7", features = ["v4"] }
|
||||
dotenv = "0.14.0"
|
||||
|
||||
[features]
|
||||
default = [ "production" ]
|
||||
|
|
30
README.md
30
README.md
|
@ -1,4 +1,32 @@
|
|||
# RageQuit
|
||||
# Mastodon Streaming Server
|
||||
A WIP blazingly fast drop-in replacement for the Mastodon streaming api server.
|
||||
|
||||
## Current status
|
||||
The streaming server is very much a work in progress. It is currently missing essential features including support for SSL, CORS, and separate development/production environments. However, it has reached the point where it is usable/testable in a localhost development environment and I would greatly appreciate any testing, bug reports, or other feedback you could provide.
|
||||
|
||||
## Installation
|
||||
Installing the WIP version requires the Rust toolchain (the released version will be available as a pre-compiled binary). To install, clone this repository and run `cargo build` (to build the server) `cargo run` (to both build and run the server), or `cargo build --release` (to build the server with release optimizations).
|
||||
|
||||
## Connection to Mastodon
|
||||
The streaming server expects to connect to a running development version of Mastodon built off of the `master` branch. Specifically, it needs to connect to both the Postgres database (to authenticate users) and to the Redis database. You should run Mastodon in whatever way you normally do and configure the streaming server to connect to the appropriate databases.
|
||||
|
||||
## Configuring
|
||||
You may edit the (currently limited) configuration variables in the `.env` file. Note that, by default, this server is configured to run on port 4000. This allows for easy testing with the development version of Mastodon (which, by default, is configured to communicate with a streaming server running on `localhost:4000`). However, it also conflicts with the current/Node.js version of Mastodon's streaming server, which runs on the same port. Thus, to test this server, you should disable the other streaming server or move it to a non-conflicting port.
|
||||
|
||||
## Documentation
|
||||
Build documentation with `cargo doc --open`, which will build the Markdown docs and open them in your browser. Please consult those docs for a description of the code structure/organization.
|
||||
|
||||
## Running
|
||||
As noted above, you can run the server with `cargo run`. Alternatively, if you built the sever using `cargo build` or `cargo build --release`, you can run the executable produced in the `target/build/debug` folder or the `target/build/release` folder.
|
||||
|
||||
## Unit and (limited) integration tests
|
||||
You can run basic unit test of the public Server Sent Event endpoints with `cargo test`. You can run integration tests of the authenticated SSE endpoints (which require a Postgres connection) with `cargo test -- --ignored`.
|
||||
|
||||
## Manual testing
|
||||
Once the streaming server is running, you can also test it manually. You can test it using a browser connected to the relevant Mastodon development server. Or you can test the SSE endpoints with `curl`, PostMan, or any other HTTP client. Similarly, you can test the WebSocket endpoints with `websocat` or any other WebSocket client.
|
||||
|
||||
## Memory/CPU usage
|
||||
Note that memory usage is higher when running the development version of the streaming server (the one generated with `cargo run` or `cargo build`). If you are interested in measuring RAM or CPU usage, you should likely run `cargo build --release` and test the release version of the executable.
|
||||
|
||||
## Load testing
|
||||
I have not yet found a good way to test the streaming server under load. I have experimented with using `artillery` or other load-testing utilities. However, every utility I am familiar with or have found is built around either HTTP requests or WebSocket connections in which the client sends messages. I have not found a good solution to test receiving SSEs or WebSocket connections where the client does not transmit data after establishing the connection. If you are aware of a good way to do load testing, please let me know.
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
# Uncomment any of the variables below to customize your enviornment
|
||||
|
||||
#SERVER_ADDR=
|
||||
#REDIS_ADDR=
|
||||
#POSTGRES_ADDR=
|
|
@ -1,6 +0,0 @@
|
|||
use crate::AppState;
|
||||
use actix_web::{HttpRequest, Responder};
|
||||
|
||||
pub fn index(_req: HttpRequest<AppState>) -> impl Responder {
|
||||
"placeholder response from direct::index"
|
||||
}
|
|
@ -1,10 +0,0 @@
|
|||
use crate::AppState;
|
||||
use actix_web::{HttpRequest, Responder};
|
||||
|
||||
pub fn index(_req: HttpRequest<AppState>) -> impl Responder {
|
||||
"placeholder response from hashtag::index"
|
||||
}
|
||||
|
||||
pub fn local(_req: HttpRequest<AppState>) -> impl Responder {
|
||||
"placeholder response from hashtag::local"
|
||||
}
|
|
@ -1,6 +0,0 @@
|
|||
use crate::AppState;
|
||||
use actix_web::{HttpRequest, Responder};
|
||||
|
||||
pub fn index(_req: HttpRequest<AppState>) -> impl Responder {
|
||||
"placeholder response from list::index"
|
||||
}
|
|
@ -1,5 +0,0 @@
|
|||
pub mod direct;
|
||||
pub mod hashtag;
|
||||
pub mod list;
|
||||
pub mod public;
|
||||
pub mod user;
|
|
@ -1,10 +0,0 @@
|
|||
use crate::AppState;
|
||||
use actix_web::{HttpRequest, Responder};
|
||||
|
||||
pub fn index(_req: HttpRequest<AppState>) -> impl Responder {
|
||||
"placeholder response from public::index"
|
||||
}
|
||||
|
||||
pub fn local(_req: HttpRequest<AppState>) -> impl Responder {
|
||||
"placeholder response from public::local"
|
||||
}
|
|
@ -1,6 +0,0 @@
|
|||
use crate::AppState;
|
||||
use actix_web::{HttpRequest, Responder};
|
||||
|
||||
pub fn index(_req: HttpRequest<AppState>) -> impl Responder {
|
||||
"placeholder response from user::index"
|
||||
}
|
|
@ -1,2 +0,0 @@
|
|||
pub mod http;
|
||||
pub mod ws;
|
|
@ -1,46 +0,0 @@
|
|||
use crate::{common::HEARTBEAT_INTERVAL_SECONDS, AppState};
|
||||
use actix::{Actor, AsyncContext, StreamHandler};
|
||||
use actix_redis::{Command, RespValue};
|
||||
use actix_web::{ws, HttpRequest, Responder};
|
||||
use log::{debug, info};
|
||||
use std::time::Duration;
|
||||
|
||||
/// Define http actor
|
||||
struct WebsocketActor;
|
||||
|
||||
impl Actor for WebsocketActor {
|
||||
type Context = ws::WebsocketContext<Self, AppState>;
|
||||
}
|
||||
|
||||
/// Handler for ws::Message message
|
||||
impl StreamHandler<ws::Message, ws::ProtocolError> for WebsocketActor {
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
ctx.run_interval(
|
||||
Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS),
|
||||
|_, inner_ctx| {
|
||||
inner_ctx.ping("Ping from StreamHandler");
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// This appears to be an echo server based on the actix_web documentation
|
||||
// We won't actually need echo functionality in the final server
|
||||
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
|
||||
debug!("Message {:?}", msg);
|
||||
|
||||
let red = ctx
|
||||
.state()
|
||||
.redis
|
||||
.send(Command(RespValue::SimpleString("GET".into())));
|
||||
|
||||
match msg {
|
||||
ws::Message::Pong(msg) => debug!("matched: {}", msg),
|
||||
ws::Message::Text(text) => ctx.text(text),
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn index(req: HttpRequest<AppState>) -> impl Responder {
|
||||
ws::start(&req, WebsocketActor)
|
||||
}
|
|
@ -1 +0,0 @@
|
|||
pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 30;
|
60
src/env.rs
60
src/env.rs
|
@ -1,60 +0,0 @@
|
|||
use envconfig::Envconfig;
|
||||
use std::net::IpAddr;
|
||||
|
||||
/// Returns the current users username.
|
||||
/// TODO: Find a way to do this cross-platform
|
||||
pub fn current_user() -> String {
|
||||
whoami::username()
|
||||
}
|
||||
|
||||
#[cfg(feature = "production")]
|
||||
#[derive(Envconfig)]
|
||||
/// Production DB configuration
|
||||
pub struct DbConfig {
|
||||
#[envconfig(from = "DB_USER", default = "mastodon")]
|
||||
user: String,
|
||||
#[envconfig(from = "DB_PASS", default = "")]
|
||||
password: String,
|
||||
#[envconfig(from = "DB_NAME", default = "mastodon_production")]
|
||||
database: String,
|
||||
#[envconfig(from = "DB_HOST", default = "localhost")]
|
||||
host: String,
|
||||
#[envconfig(from = "DB_PORT", default = "5432")]
|
||||
port: u16,
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "production"))]
|
||||
#[derive(Envconfig)]
|
||||
/// Development DB configuration
|
||||
pub struct DbConfig {
|
||||
#[envconfig(from = "DB_USER", default = current_user())]
|
||||
pub user: String,
|
||||
#[envconfig(from = "DB_PASS", default = "")]
|
||||
pub password: String,
|
||||
#[envconfig(from = "DB_NAME", default = "mastodon_development")]
|
||||
pub database: String,
|
||||
#[envconfig(from = "DB_HOST", default = "localhost")]
|
||||
pub host: String,
|
||||
#[envconfig(from = "DB_PORT", default = "5432")]
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
#[derive(Envconfig)]
|
||||
pub struct ServerConfig {
|
||||
#[envconfig(from = "BIND", default = "0.0.0.0")]
|
||||
pub address: IpAddr,
|
||||
#[envconfig(from = "PORT", default = "4000")]
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
#[derive(Envconfig)]
|
||||
pub struct RedisConfig {
|
||||
#[envconfig(from = "REDIS_HOST", default = "127.0.0.1")]
|
||||
pub host: IpAddr,
|
||||
#[envconfig(from = "REDIS_PORT", default = "6379")]
|
||||
pub port: u16,
|
||||
#[envconfig(from = "REDIS_DB", default = "0")]
|
||||
pub db: u16,
|
||||
#[envconfig(from = "REDIS_PASSWORD", default = "")]
|
||||
pub password: String,
|
||||
}
|
100
src/main.old.rs
100
src/main.old.rs
|
@ -1,100 +0,0 @@
|
|||
#[macro_use]
|
||||
extern crate envconfig_derive;
|
||||
|
||||
mod api;
|
||||
mod common;
|
||||
mod env;
|
||||
mod middleware;
|
||||
|
||||
use actix::prelude::*;
|
||||
use actix_redis::RedisActor;
|
||||
use actix_web::{http::header, middleware::cors::Cors, server, App, HttpResponse};
|
||||
use env::{RedisConfig, ServerConfig};
|
||||
use env_logger::Builder;
|
||||
use envconfig::Envconfig;
|
||||
use log::info;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
const ENV_LOG_VARIABLE: &str = "STREAMING_API_LOG";
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
redis: Addr<RedisActor>,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
Builder::from_env(ENV_LOG_VARIABLE).init();
|
||||
|
||||
info!("starting streaming api server");
|
||||
|
||||
let server_cfg = ServerConfig::init().expect("failed to obtain server environment");
|
||||
let redis_cfg = RedisConfig::init().expect("failed to obtain redis environment");
|
||||
|
||||
let sys = System::new("streaming-api-server");
|
||||
|
||||
let redis_addr = RedisActor::start(format!("{}:{}", redis_cfg.host, redis_cfg.port));
|
||||
|
||||
let app_state = AppState {
|
||||
redis: redis_addr.clone(),
|
||||
};
|
||||
|
||||
server::new(move || endpoints(&app_state))
|
||||
.bind(SocketAddr::new(server_cfg.address, server_cfg.port))
|
||||
.unwrap()
|
||||
.start();
|
||||
|
||||
sys.run();
|
||||
}
|
||||
|
||||
fn endpoints(app_state: &AppState) -> App<AppState> {
|
||||
use api::http;
|
||||
use api::ws;
|
||||
|
||||
App::with_state(app_state.clone())
|
||||
.prefix("/api/v1")
|
||||
.resource("/streaming", |r| r.with(ws::index))
|
||||
.resource("/streaming/health", |r| {
|
||||
r.middleware(cors_middleware());
|
||||
r.get().f(|_| HttpResponse::Ok())
|
||||
})
|
||||
.resource("/streaming/user", |r| {
|
||||
r.middleware(cors_middleware());
|
||||
r.get().with(http::user::index)
|
||||
})
|
||||
.resource("/streaming/public", |r| {
|
||||
r.middleware(cors_middleware());
|
||||
r.get().with(http::public::index)
|
||||
})
|
||||
.resource("/streaming/public/local", |r| {
|
||||
r.middleware(cors_middleware());
|
||||
r.get().with(http::public::local)
|
||||
})
|
||||
.resource("/streaming/direct", |r| {
|
||||
r.middleware(cors_middleware());
|
||||
r.get().with(http::direct::index)
|
||||
})
|
||||
.resource("/streaming/hashtag", |r| {
|
||||
r.middleware(cors_middleware());
|
||||
r.get().with(http::hashtag::index)
|
||||
})
|
||||
.resource("/streaming/hashtag/local", |r| {
|
||||
r.middleware(cors_middleware());
|
||||
r.get().with(http::hashtag::local)
|
||||
})
|
||||
.resource("/streaming/list", |r| {
|
||||
r.middleware(cors_middleware());
|
||||
r.get().with(http::list::index)
|
||||
})
|
||||
}
|
||||
|
||||
fn cors_middleware() -> Cors {
|
||||
Cors::build()
|
||||
.allowed_origin("*")
|
||||
.allowed_methods(vec!["GET", "OPTIONS"])
|
||||
.allowed_headers(vec![
|
||||
header::AUTHORIZATION,
|
||||
header::ACCEPT,
|
||||
header::CACHE_CONTROL,
|
||||
])
|
||||
.finish()
|
||||
}
|
10
src/main.rs
10
src/main.rs
|
@ -33,9 +33,12 @@ pub mod stream;
|
|||
pub mod timeline;
|
||||
pub mod user;
|
||||
pub mod ws;
|
||||
use dotenv::dotenv;
|
||||
use futures::stream::Stream;
|
||||
use futures::Async;
|
||||
use receiver::Receiver;
|
||||
use std::env;
|
||||
use std::net::SocketAddr;
|
||||
use stream::StreamManager;
|
||||
use user::{Scope, User};
|
||||
use warp::path;
|
||||
|
@ -43,6 +46,7 @@ use warp::Filter as WarpFilter;
|
|||
|
||||
fn main() {
|
||||
pretty_env_logger::init();
|
||||
dotenv().ok();
|
||||
|
||||
let redis_updates = StreamManager::new(Receiver::new());
|
||||
let redis_updates_sse = redis_updates.blank_copy();
|
||||
|
@ -138,5 +142,9 @@ fn main() {
|
|||
},
|
||||
);
|
||||
|
||||
warp::serve(websocket.or(routes)).run(([127, 0, 0, 1], 4000));
|
||||
let address: SocketAddr = env::var("SERVER_ADDR")
|
||||
.unwrap_or("127.0.0.1:4000".to_owned())
|
||||
.parse()
|
||||
.expect("static string");
|
||||
warp::serve(websocket.or(routes)).run(address);
|
||||
}
|
||||
|
|
|
@ -1,21 +0,0 @@
|
|||
use crate::AppState;
|
||||
use actix_web::{
|
||||
error::Result,
|
||||
http::header::AUTHORIZATION,
|
||||
middleware::{Middleware, Started},
|
||||
HttpRequest, HttpResponse,
|
||||
};
|
||||
|
||||
pub struct Auth;
|
||||
|
||||
impl Middleware<AppState> for Auth {
|
||||
fn start(&self, req: &HttpRequest<AppState>) -> Result<Started> {
|
||||
let res = req
|
||||
.headers()
|
||||
.get(AUTHORIZATION)
|
||||
.map(|bearer| Started::Done)
|
||||
.unwrap_or_else(|| Started::Response(HttpResponse::Unauthorized().finish()));
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
}
|
|
@ -1 +0,0 @@
|
|||
pub mod auth;
|
|
@ -7,6 +7,7 @@ use log::info;
|
|||
use regex::Regex;
|
||||
use serde_json::Value;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::env;
|
||||
use std::io::{Read, Write};
|
||||
use std::net::TcpStream;
|
||||
use std::time::{Duration, Instant};
|
||||
|
@ -48,12 +49,13 @@ impl Default for Receiver {
|
|||
}
|
||||
impl Receiver {
|
||||
pub fn new() -> Self {
|
||||
let pubsub_connection = TcpStream::connect("127.0.0.1:6379").expect("Can connect to Redis");
|
||||
let redis_addr = env::var("REDIS_ADDR").unwrap_or("127.0.0.1:6379".to_string());
|
||||
let pubsub_connection = TcpStream::connect(&redis_addr).expect("Can connect to Redis");
|
||||
pubsub_connection
|
||||
.set_read_timeout(Some(Duration::from_millis(10)))
|
||||
.expect("Can set read timeout for Redis connection");
|
||||
let secondary_redis_connection =
|
||||
TcpStream::connect("127.0.0.1:6379").expect("Can connect to Redis");
|
||||
TcpStream::connect(&redis_addr).expect("Can connect to Redis");
|
||||
secondary_redis_connection
|
||||
.set_read_timeout(Some(Duration::from_millis(10)))
|
||||
.expect("Can set read timeout for Redis connection");
|
||||
|
|
12
src/user.rs
12
src/user.rs
|
@ -2,15 +2,17 @@
|
|||
use crate::{any_of, query};
|
||||
use log::info;
|
||||
use postgres;
|
||||
use std::env;
|
||||
use warp::Filter as WarpFilter;
|
||||
|
||||
/// (currently hardcoded to localhost)
|
||||
pub fn connect_to_postgres() -> postgres::Connection {
|
||||
postgres::Connection::connect(
|
||||
"postgres://dsock@localhost/mastodon_development",
|
||||
postgres::TlsMode::None,
|
||||
)
|
||||
.expect("Can connect to local Postgres")
|
||||
let postgres_addr = env::var("POSTGRESS_ADDR").unwrap_or(format!(
|
||||
"postgres://{}@localhost/mastodon_development",
|
||||
env::var("USER").expect("User env var should exist")
|
||||
));
|
||||
postgres::Connection::connect(postgres_addr, postgres::TlsMode::None)
|
||||
.expect("Can connect to local Postgres")
|
||||
}
|
||||
|
||||
/// The filters that can be applied to toots after they come from Redis
|
||||
|
|
Loading…
Reference in New Issue