Add language and notification filters

This commit is contained in:
Daniel Sockwell 2019-04-19 17:06:29 -04:00
parent 4b39009556
commit 5011abaf77
5 changed files with 135 additions and 94 deletions

View File

@ -1,52 +0,0 @@
use super::query;
use postgres;
use warp::Filter;
pub fn get_token() -> warp::filters::BoxedFilter<(String,)> {
let token_from_header = warp::header::header::<String>("authorization")
.map(|auth: String| auth.split(' ').nth(1).unwrap_or("invalid").to_string());
let token_from_query = warp::query().map(|q: query::Auth| q.access_token);
token_from_query.or(token_from_header).unify().boxed()
}
pub fn get_account_id_from_token(token: String) -> Result<i64, warp::reject::Rejection> {
if let Ok(account_id) = get_account_id(token) {
Ok(account_id)
} else {
Err(warp::reject::custom("Error: Invalid access token"))
}
}
fn conn() -> postgres::Connection {
postgres::Connection::connect(
"postgres://dsock@localhost/mastodon_development",
postgres::TlsMode::None,
)
.unwrap()
}
pub fn get_account_id(token: String) -> Result<i64, ()> {
let conn = conn();
let result = &conn
.query(
"
SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages
FROM
oauth_access_tokens
INNER JOIN users ON
oauth_access_tokens.resource_owner_id = users.id
WHERE oauth_access_tokens.token = $1
AND oauth_access_tokens.revoked_at IS NULL
LIMIT 1",
&[&token],
)
.expect("Hard-coded query will return Some([0 or more rows])");
if !result.is_empty() {
let only_row = result.get(0);
let account_id: i64 = only_row.get(1);
Ok(account_id)
} else {
Err(())
}
}

View File

@ -19,7 +19,7 @@ pub fn handle_errors(
"Error: Missing access token".to_string()
}
Some(text) => text.to_string(),
None => "Unknown server error".to_string(),
None => "Error: Nonexistant endpoint".to_string(),
};
let json = warp::reply::json(&ErrorMessage::new(err_txt));
Ok(warp::reply::with_status(

View File

@ -1,85 +1,94 @@
mod auth;
mod error;
mod pubsub;
mod query;
mod user;
mod utils;
use futures::stream::Stream;
use pretty_env_logger;
use warp::{path, Filter};
use pubsub::{stream_from, Filter};
use user::{Scope, User};
use warp::{path, Filter as WarpFilter};
fn main() {
pretty_env_logger::init();
// GET /api/v1/streaming/user
// GET /api/v1/streaming/user [private; language filter]
let user_timeline = path!("api" / "v1" / "streaming" / "user")
.and(path::end())
.and(auth::get_token())
.and_then(auth::get_account_id_from_token)
.map(|account_id: i64| pubsub::stream_from(account_id.to_string()));
.and(user::get_access_token(Scope::Private))
.and_then(|token| user::get_account(token, Scope::Private))
.map(|user: User| stream_from(user.id, Filter::None));
// GET /api/v1/streaming/user/notification
// GET /api/v1/streaming/user/notification [private; notification filter]
let user_timeline_notifications = path!("api" / "v1" / "streaming" / "user" / "notification")
.and(path::end())
.and(auth::get_token())
.and_then(auth::get_account_id_from_token)
.map(|account_id: i64| {
let full_stream = pubsub::stream_from(account_id.to_string());
// TODO: filter stream to just have notifications
full_stream
});
.and(user::get_access_token(Scope::Private))
.and_then(|token| user::get_account(token, Scope::Private))
.map(|user: User| stream_from(user.id, Filter::Notification));
// GET /api/v1/streaming/public
// GET /api/v1/streaming/public [public; language filter]
let public_timeline = path!("api" / "v1" / "streaming" / "public")
.and(path::end())
.map(|| pubsub::stream_from("public".to_string()));
.and(user::get_access_token(user::Scope::Public))
.and_then(|token| user::get_account(token, Scope::Public))
.map(|user: User| stream_from("public".into(), Filter::Language(user.langs)));
// GET /api/v1/streaming/public?only_media=true
// GET /api/v1/streaming/public?only_media=true [public; language filter]
let public_timeline_media = path!("api" / "v1" / "streaming" / "public")
.and(path::end())
.and(user::get_access_token(user::Scope::Public))
.and_then(|token| user::get_account(token, Scope::Public))
.and(warp::query())
.map(|q: query::Media| match q.only_media.as_ref() {
"1" | "true" => pubsub::stream_from("public:media".to_string()),
_ => pubsub::stream_from("public".to_string()),
.map(|user: User, q: query::Media| match q.only_media.as_ref() {
"1" | "true" => stream_from("public:media".into(), Filter::Language(user.langs)),
_ => stream_from("public".into(), Filter::Language(user.langs)),
});
// GET /api/v1/streaming/public/local
// GET /api/v1/streaming/public/local [public; language filter]
let local_timeline = path!("api" / "v1" / "streaming" / "public" / "local")
.and(path::end())
.map(|| pubsub::stream_from("public:local".to_string()));
.and(user::get_access_token(user::Scope::Public))
.and_then(|token| user::get_account(token, Scope::Public))
.map(|user: User| stream_from("public:local".into(), Filter::Language(user.langs)));
// GET /api/v1/streaming/public/local?only_media=true
// GET /api/v1/streaming/public/local?only_media=true [public; language filter]
let local_timeline_media = path!("api" / "v1" / "streaming" / "public" / "local")
.and(user::get_access_token(user::Scope::Public))
.and_then(|token| user::get_account(token, Scope::Public))
.and(warp::query())
.and(path::end())
.map(|q: query::Media| match q.only_media.as_ref() {
"1" | "true" => pubsub::stream_from("public:local:media".to_string()),
_ => pubsub::stream_from("public:local".to_string()),
.map(|user: User, q: query::Media| match q.only_media.as_ref() {
"1" | "true" => stream_from("public:local:media".into(), Filter::Language(user.langs)),
_ => stream_from("public:local".into(), Filter::None),
});
// GET /api/v1/streaming/direct
// GET /api/v1/streaming/direct [private; *no* filter]
let direct_timeline = path!("api" / "v1" / "streaming" / "direct")
.and(path::end())
.and(auth::get_token())
.and_then(auth::get_account_id_from_token)
.map(|account_id: i64| pubsub::stream_from(format!("direct:{}", account_id)));
.and(user::get_access_token(Scope::Private))
.and_then(|token| user::get_account(token, Scope::Private))
.map(|account: User| stream_from(format!("direct:{}", account.id), Filter::None));
// GET /api/v1/streaming/hashtag?tag=:hashtag
// GET /api/v1/streaming/hashtag?tag=:hashtag [public; no filter]
let hashtag_timeline = path!("api" / "v1" / "streaming" / "hashtag")
.and(warp::query())
.and(path::end())
.map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}", q.tag)));
.map(|q: query::Hashtag| stream_from(format!("hashtag:{}", q.tag), Filter::None));
// GET /api/v1/streaming/hashtag/local?tag=:hashtag
// GET /api/v1/streaming/hashtag/local?tag=:hashtag [public; no filter]
let hashtag_timeline_local = path!("api" / "v1" / "streaming" / "hashtag" / "local")
.and(warp::query())
.and(path::end())
.map(|q: query::Hashtag| pubsub::stream_from(format!("hashtag:{}:local", q.tag)));
.map(|q: query::Hashtag| stream_from(format!("hashtag:{}:local", q.tag), Filter::None));
// GET /api/v1/streaming/list?list=:list_id
// GET /api/v1/streaming/list?list=:list_id [private; no filter]
let list_timeline = path!("api" / "v1" / "streaming" / "list")
.and(user::get_access_token(Scope::Private))
.and_then(|token| user::get_account(token, Scope::Private))
.and(warp::query())
.and(path::end())
.map(|q: query::List| pubsub::stream_from(format!("list:{}", q.list)));
// TODO: filter down to lists the user can access
.map(|_user: User, q: query::List| stream_from(format!("list:{}", q.list), Filter::None));
let routes = or!(
user_timeline,
@ -96,11 +105,17 @@ fn main() {
.and_then(|event_stream| event_stream)
.and(warp::sse())
.map(|event_stream: pubsub::Receiver, sse: warp::sse::Sse| {
let filter = event_stream.filter.clone();
sse.reply(warp::sse::keep(
event_stream.map(|item| {
event_stream.filter_map(move |item| {
let payload = item["payload"].clone();
let event = item["event"].clone();
(warp::sse::event(event), warp::sse::data(payload))
let event = item["event"].to_string().clone();
let lang = item["language"].to_string().clone();
match filter {
Filter::Notification if event != "notification" => None,
Filter::Language(ref vec) if !vec.contains(&lang) => None,
_ => Some((warp::sse::event(event), warp::sse::data(payload))),
}
}),
None,
))

View File

@ -6,8 +6,15 @@ use tokio::io::{AsyncRead, AsyncWrite, Error, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use warp::Stream;
#[derive(Clone)]
pub enum Filter {
None,
Language(Vec<String>),
Notification,
}
pub struct Receiver {
rx: ReadHalf<TcpStream>,
pub filter: Filter,
}
impl Stream for Receiver {
type Item = Value;
@ -64,14 +71,14 @@ fn send_subscribe_cmd(tx: WriteHalf<TcpStream>, channel: String) {
/// Create a stream from a string.
pub fn stream_from(
timeline: String,
filter: Filter,
) -> impl Future<Item = Receiver, Error = warp::reject::Rejection> {
get_socket()
.and_then(move |socket| {
let (rx, tx) = socket.split();
send_subscribe_cmd(tx, format!("timeline:{}", timeline));
let stream_of_data_from_redis = Receiver { rx };
let stream_of_data_from_redis = Receiver { rx, filter };
Ok(stream_of_data_from_redis)
})
.and_then(Ok)
.map_err(warp::reject::custom)
}

71
src/user.rs Normal file
View File

@ -0,0 +1,71 @@
use crate::{or, query};
use postgres;
use warp::Filter;
pub fn get_access_token(scope: Scope) -> warp::filters::BoxedFilter<(String,)> {
let token_from_header = warp::header::header::<String>("authorization")
.map(|auth: String| auth.split(' ').nth(1).unwrap_or("invalid").to_string());
let token_from_query = warp::query().map(|q: query::Auth| q.access_token);
let public = warp::any().map(|| "no access token".to_string());
match scope {
// if they're trying to access a private scope without an access token, reject the request
Scope::Private => or!(token_from_query, token_from_header).boxed(),
// if they're trying to access a public scope without an access token, proceed
Scope::Public => or!(token_from_query, token_from_header, public).boxed(),
}
}
fn conn() -> postgres::Connection {
postgres::Connection::connect(
"postgres://dsock@localhost/mastodon_development",
postgres::TlsMode::None,
)
.unwrap()
}
pub struct User {
pub id: String,
pub langs: Vec<String>,
pub logged_in: bool,
}
pub enum Scope {
Public,
Private,
}
pub fn get_account(token: String, scope: Scope) -> Result<User, warp::reject::Rejection> {
let conn = conn();
let result = &conn
.query(
"
SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages
FROM
oauth_access_tokens
INNER JOIN users ON
oauth_access_tokens.resource_owner_id = users.id
WHERE oauth_access_tokens.token = $1
AND oauth_access_tokens.revoked_at IS NULL
LIMIT 1",
&[&token],
)
.expect("Hard-coded query will return Some([0 or more rows])");
if !result.is_empty() {
let only_row = result.get(0);
let id: i64 = only_row.get(1);
let langs: Vec<String> = only_row.get(2);
Ok(User {
id: id.to_string(),
langs,
logged_in: true,
})
} else if let Scope::Public = scope {
Ok(User {
id: String::new(),
langs: Vec::new(),
logged_in: false,
})
} else {
Err(warp::reject::custom("Error: Invalid access token"))
}
}