mirror of https://github.com/mastodon/flodgatt
Merge pull request #9 from tootsuite/filter-notification
Add language and notification filters
This commit is contained in:
commit
fc7108c84a
52
src/auth.rs
52
src/auth.rs
|
@ -1,52 +0,0 @@
|
|||
use super::query;
|
||||
use postgres;
|
||||
use warp::Filter;
|
||||
|
||||
pub fn get_token() -> warp::filters::BoxedFilter<(String,)> {
|
||||
let token_from_header = warp::header::header::<String>("authorization")
|
||||
.map(|auth: String| auth.split(' ').nth(1).unwrap_or("invalid").to_string());
|
||||
|
||||
let token_from_query = warp::query().map(|q: query::Auth| q.access_token);
|
||||
token_from_query.or(token_from_header).unify().boxed()
|
||||
}
|
||||
|
||||
pub fn get_account_id_from_token(token: String) -> Result<i64, warp::reject::Rejection> {
|
||||
if let Ok(account_id) = get_account_id(token) {
|
||||
Ok(account_id)
|
||||
} else {
|
||||
Err(warp::reject::custom("Error: Invalid access token"))
|
||||
}
|
||||
}
|
||||
|
||||
fn conn() -> postgres::Connection {
|
||||
postgres::Connection::connect(
|
||||
"postgres://dsock@localhost/mastodon_development",
|
||||
postgres::TlsMode::None,
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub fn get_account_id(token: String) -> Result<i64, ()> {
|
||||
let conn = conn();
|
||||
let result = &conn
|
||||
.query(
|
||||
"
|
||||
SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages
|
||||
FROM
|
||||
oauth_access_tokens
|
||||
INNER JOIN users ON
|
||||
oauth_access_tokens.resource_owner_id = users.id
|
||||
WHERE oauth_access_tokens.token = $1
|
||||
AND oauth_access_tokens.revoked_at IS NULL
|
||||
LIMIT 1",
|
||||
&[&token],
|
||||
)
|
||||
.expect("Hard-coded query will return Some([0 or more rows])");
|
||||
if !result.is_empty() {
|
||||
let only_row = result.get(0);
|
||||
let account_id: i64 = only_row.get(1);
|
||||
Ok(account_id)
|
||||
} else {
|
||||
Err(())
|
||||
}
|
||||
}
|
|
@ -19,7 +19,7 @@ pub fn handle_errors(
|
|||
"Error: Missing access token".to_string()
|
||||
}
|
||||
Some(text) => text.to_string(),
|
||||
None => "Unknown server error".to_string(),
|
||||
None => "Error: Nonexistant endpoint".to_string(),
|
||||
};
|
||||
let json = warp::reply::json(&ErrorMessage::new(err_txt));
|
||||
Ok(warp::reply::with_status(
|
||||
|
|
93
src/main.rs
93
src/main.rs
|
@ -1,85 +1,94 @@
|
|||
mod auth;
|
||||
mod error;
|
||||
mod pubsub;
|
||||
mod query;
|
||||
mod user;
|
||||
mod utils;
|
||||
use futures::stream::Stream;
|
||||
use pretty_env_logger;
|
||||
use warp::{path, Filter};
|
||||
use pubsub::{stream_from, Filter};
|
||||
use user::{Scope, User};
|
||||
use warp::{path, Filter as WarpFilter};
|
||||
|
||||
fn main() {
|
||||
pretty_env_logger::init();
|
||||
|
||||
// GET /api/v1/streaming/user
|
||||
// GET /api/v1/streaming/user [private; language filter]
|
||||
let user_timeline = path!("api" / "v1" / "streaming" / "user")
|
||||
.and(path::end())
|
||||
.and(auth::get_token())
|
||||
.and_then(auth::get_account_id_from_token)
|
||||
.map(|account_id: i64| pubsub::stream_from(account_id.to_string()));
|
||||
.and(user::get_access_token(Scope::Private))
|
||||
.and_then(|token| user::get_account(token, Scope::Private))
|
||||
.map(|user: User| stream_from(user.id, Filter::None));
|
||||
|
||||
// GET /api/v1/streaming/user/notification
|
||||
// GET /api/v1/streaming/user/notification [private; notification filter]
|
||||
let user_timeline_notifications = path!("api" / "v1" / "streaming" / "user" / "notification")
|
||||
.and(path::end())
|
||||
.and(auth::get_token())
|
||||
.and_then(auth::get_account_id_from_token)
|
||||
.map(|account_id: i64| {
|
||||
let full_stream = pubsub::stream_from(account_id.to_string());
|
||||
// TODO: filter stream to just have notifications
|
||||
full_stream
|
||||
});
|
||||
.and(user::get_access_token(Scope::Private))
|
||||
.and_then(|token| user::get_account(token, Scope::Private))
|
||||
.map(|user: User| stream_from(user.id, Filter::Notification));
|
||||
|
||||
// GET /api/v1/streaming/public
|
||||
// GET /api/v1/streaming/public [public; language filter]
|
||||
let public_timeline = path!("api" / "v1" / "streaming" / "public")
|
||||
.and(path::end())
|
||||
.map(|| pubsub::stream_from("public".to_string()));
|
||||
.and(user::get_access_token(user::Scope::Public))
|
||||
.and_then(|token| user::get_account(token, Scope::Public))
|
||||
.map(|user: User| stream_from("public".into(), Filter::Language(user.langs)));
|
||||
|
||||
// GET /api/v1/streaming/public?only_media=true
|
||||
// GET /api/v1/streaming/public?only_media=true [public; language filter]
|
||||
let public_timeline_media = path!("api" / "v1" / "streaming" / "public")
|
||||
.and(path::end())
|
||||
.and(user::get_access_token(user::Scope::Public))
|
||||
.and_then(|token| user::get_account(token, Scope::Public))
|
||||
.and(warp::query())
|
||||
.map(|q: query::Media| match q.only_media.as_ref() {
|
||||
"1" | "true" => pubsub::stream_from("public:media".to_string()),
|
||||
_ => pubsub::stream_from("public".to_string()),
|
||||
.map(|user: User, q: query::Media| match q.only_media.as_ref() {
|
||||
"1" | "true" => stream_from("public:media".into(), Filter::Language(user.langs)),
|
||||
_ => stream_from("public".into(), Filter::Language(user.langs)),
|
||||
});
|
||||
|
||||
// GET /api/v1/streaming/public/local
|
||||
// GET /api/v1/streaming/public/local [public; language filter]
|
||||
let local_timeline = path!("api" / "v1" / "streaming" / "public" / "local")
|
||||
.and(path::end())
|
||||
.map(|| pubsub::stream_from("public:local".to_string()));
|
||||
.and(user::get_access_token(user::Scope::Public))
|
||||
.and_then(|token| user::get_account(token, Scope::Public))
|
||||
.map(|user: User| stream_from("public:local".into(), Filter::Language(user.langs)));
|
||||
|
||||
// GET /api/v1/streaming/public/local?only_media=true
|
||||
// GET /api/v1/streaming/public/local?only_media=true [public; language filter]
|
||||
let local_timeline_media = path!("api" / "v1" / "streaming" / "public" / "local")
|
||||
.and(user::get_access_token(user::Scope::Public))
|
||||
.and_then(|token| user::get_account(token, Scope::Public))
|
||||
.and(warp::query())
|
||||
.and(path::end())
|
||||
.map(|q: query::Media| match q.only_media.as_ref() {
|
||||
"1" | "true" => pubsub::stream_from("public:local:media".to_string()),
|
||||
_ => pubsub::stream_from("public:local".to_string()),
|
||||
.map(|user: User, q: query::Media| match q.only_media.as_ref() {
|
||||
"1" | "true" => stream_from("public:local:media".into(), Filter::Language(user.langs)),
|
||||
_ => stream_from("public:local".into(), Filter::None),
|
||||
});
|
||||
|
||||
// GET /api/v1/streaming/direct
|
||||
// GET /api/v1/streaming/direct [private; *no* filter]
|
||||
let direct_timeline = path!("api" / "v1" / "streaming" / "direct")
|
||||
.and(path::end())
|
||||
.and(auth::get_token())
|
||||
.and_then(auth::get_account_id_from_token)
|
||||
.map(|account_id: i64| pubsub::stream_from(format!("direct:{}", account_id)));
|
||||
.and(user::get_access_token(Scope::Private))
|
||||
.and_then(|token| user::get_account(token, Scope::Private))
|
||||
.map(|account: User| stream_from(format!("direct:{}", account.id), Filter::None));
|
||||
|
||||
// GET /api/v1/streaming/hashtag?tag=:hashtag
|
||||
// GET /api/v1/streaming/hashtag?tag=:hashtag [public; no filter]
|
||||
let hashtag_timeline = path!("api" / "v1" / "streaming" / "hashtag")
|
||||
.and(warp::query())
|
||||
.and(path::end())
|
||||
.map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}", q.tag)));
|
||||
.map(|q: query::Hashtag| stream_from(format!("hashtag:{}", q.tag), Filter::None));
|
||||
|
||||
// GET /api/v1/streaming/hashtag/local?tag=:hashtag
|
||||
// GET /api/v1/streaming/hashtag/local?tag=:hashtag [public; no filter]
|
||||
let hashtag_timeline_local = path!("api" / "v1" / "streaming" / "hashtag" / "local")
|
||||
.and(warp::query())
|
||||
.and(path::end())
|
||||
.map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}:local", q.tag)));
|
||||
.map(|q: query::Hashtag| stream_from(format!("hashtag:{}:local", q.tag), Filter::None));
|
||||
|
||||
// GET /api/v1/streaming/list?list=:list_id
|
||||
// GET /api/v1/streaming/list?list=:list_id [private; no filter]
|
||||
let list_timeline = path!("api" / "v1" / "streaming" / "list")
|
||||
.and(user::get_access_token(Scope::Private))
|
||||
.and_then(|token| user::get_account(token, Scope::Private))
|
||||
.and(warp::query())
|
||||
.and(path::end())
|
||||
.map(|q: query::List| pubsub::stream_from(format!("list:{}", q.list)));
|
||||
// TODO: filter down to lists the user can access
|
||||
.map(|_user: User, q: query::List| stream_from(format!("list:{}", q.list), Filter::None));
|
||||
|
||||
let routes = or!(
|
||||
user_timeline,
|
||||
|
@ -96,11 +105,17 @@ fn main() {
|
|||
.and_then(|event_stream| event_stream)
|
||||
.and(warp::sse())
|
||||
.map(|event_stream: pubsub::Receiver, sse: warp::sse::Sse| {
|
||||
let filter = event_stream.filter.clone();
|
||||
sse.reply(warp::sse::keep(
|
||||
event_stream.map(|item| {
|
||||
event_stream.filter_map(move |item| {
|
||||
let payload = item["payload"].clone();
|
||||
let event = item["event"].clone();
|
||||
(warp::sse::event(event), warp::sse::data(payload))
|
||||
let event = item["event"].to_string().clone();
|
||||
let lang = item["language"].to_string().clone();
|
||||
match filter {
|
||||
Filter::Notification if event != "notification" => None,
|
||||
Filter::Language(ref vec) if !vec.contains(&lang) => None,
|
||||
_ => Some((warp::sse::event(event), warp::sse::data(payload))),
|
||||
}
|
||||
}),
|
||||
None,
|
||||
))
|
||||
|
|
|
@ -6,8 +6,15 @@ use tokio::io::{AsyncRead, AsyncWrite, Error, ReadHalf, WriteHalf};
|
|||
use tokio::net::TcpStream;
|
||||
use warp::Stream;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum Filter {
|
||||
None,
|
||||
Language(Vec<String>),
|
||||
Notification,
|
||||
}
|
||||
pub struct Receiver {
|
||||
rx: ReadHalf<TcpStream>,
|
||||
pub filter: Filter,
|
||||
}
|
||||
impl Stream for Receiver {
|
||||
type Item = Value;
|
||||
|
@ -64,14 +71,14 @@ fn send_subscribe_cmd(tx: WriteHalf<TcpStream>, channel: String) {
|
|||
/// Create a stream from a string.
|
||||
pub fn stream_from(
|
||||
timeline: String,
|
||||
filter: Filter,
|
||||
) -> impl Future<Item = Receiver, Error = warp::reject::Rejection> {
|
||||
get_socket()
|
||||
.and_then(move |socket| {
|
||||
let (rx, tx) = socket.split();
|
||||
send_subscribe_cmd(tx, format!("timeline:{}", timeline));
|
||||
let stream_of_data_from_redis = Receiver { rx };
|
||||
let stream_of_data_from_redis = Receiver { rx, filter };
|
||||
Ok(stream_of_data_from_redis)
|
||||
})
|
||||
.and_then(Ok)
|
||||
.map_err(warp::reject::custom)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
use crate::{or, query};
|
||||
use postgres;
|
||||
use warp::Filter;
|
||||
|
||||
pub fn get_access_token(scope: Scope) -> warp::filters::BoxedFilter<(String,)> {
|
||||
let token_from_header = warp::header::header::<String>("authorization")
|
||||
.map(|auth: String| auth.split(' ').nth(1).unwrap_or("invalid").to_string());
|
||||
let token_from_query = warp::query().map(|q: query::Auth| q.access_token);
|
||||
let public = warp::any().map(|| "no access token".to_string());
|
||||
|
||||
match scope {
|
||||
// if they're trying to access a private scope without an access token, reject the request
|
||||
Scope::Private => or!(token_from_query, token_from_header).boxed(),
|
||||
// if they're trying to access a public scope without an access token, proceed
|
||||
Scope::Public => or!(token_from_query, token_from_header, public).boxed(),
|
||||
}
|
||||
}
|
||||
|
||||
fn conn() -> postgres::Connection {
|
||||
postgres::Connection::connect(
|
||||
"postgres://dsock@localhost/mastodon_development",
|
||||
postgres::TlsMode::None,
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub struct User {
|
||||
pub id: String,
|
||||
pub langs: Vec<String>,
|
||||
pub logged_in: bool,
|
||||
}
|
||||
|
||||
pub enum Scope {
|
||||
Public,
|
||||
Private,
|
||||
}
|
||||
pub fn get_account(token: String, scope: Scope) -> Result<User, warp::reject::Rejection> {
|
||||
let conn = conn();
|
||||
let result = &conn
|
||||
.query(
|
||||
"
|
||||
SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages
|
||||
FROM
|
||||
oauth_access_tokens
|
||||
INNER JOIN users ON
|
||||
oauth_access_tokens.resource_owner_id = users.id
|
||||
WHERE oauth_access_tokens.token = $1
|
||||
AND oauth_access_tokens.revoked_at IS NULL
|
||||
LIMIT 1",
|
||||
&[&token],
|
||||
)
|
||||
.expect("Hard-coded query will return Some([0 or more rows])");
|
||||
if !result.is_empty() {
|
||||
let only_row = result.get(0);
|
||||
let id: i64 = only_row.get(1);
|
||||
let langs: Vec<String> = only_row.get(2);
|
||||
Ok(User {
|
||||
id: id.to_string(),
|
||||
langs,
|
||||
logged_in: true,
|
||||
})
|
||||
} else if let Scope::Public = scope {
|
||||
Ok(User {
|
||||
id: String::new(),
|
||||
langs: Vec::new(),
|
||||
logged_in: false,
|
||||
})
|
||||
} else {
|
||||
Err(warp::reject::custom("Error: Invalid access token"))
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue