This code should have been included with the previous PR
This commit is contained in:
Daniel Sockwell 2019-04-21 09:31:16 -04:00
parent e1257146cd
commit 4832f59f2f
2 changed files with 35 additions and 24 deletions

View File

@ -112,6 +112,9 @@ fn main() {
let payload = item["payload"].clone();
let event = item["event"].to_string().clone();
let toot_lang = item["language"].to_string().clone();
println!("ding");
match &user.filter {
Filter::Notification if event != "notification" => None,
Filter::Language if !user.langs.contains(&toot_lang) => None,
@ -121,6 +124,7 @@ fn main() {
None,
))
})
.with(warp::reply::with::header("Connection", "keep-alive"))
.recover(error::handle_errors);
warp::serve(routes).run(([127, 0, 0, 1], 3030));

View File

@ -9,8 +9,30 @@ use warp::Stream;
pub struct Receiver {
rx: ReadHalf<TcpStream>,
tx: WriteHalf<TcpStream>,
timeline: String,
pub user: User,
}
impl Receiver {
fn new(socket: TcpStream, timeline: String, user: User) -> Self {
let (rx, mut tx) = socket.split();
let channel = format!("timeline:{}", timeline);
info!("Subscribing to {}", &channel);
let subscribe_cmd = format!(
"*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n",
channel.len(),
channel
);
let buffer = subscribe_cmd.as_bytes();
tx.poll_write(&buffer).unwrap();
Self {
rx,
tx,
timeline,
user,
}
}
}
impl Stream for Receiver {
type Item = Value;
type Error = Error;
@ -31,24 +53,16 @@ impl Stream for Receiver {
Ok(Async::NotReady)
}
}
struct Sender {
tx: WriteHalf<TcpStream>,
channel: String,
}
impl Future for Sender {
type Item = ();
type Error = Box<Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
info!("Subscribing to {}", &self.channel);
let subscribe_cmd = format!(
impl Drop for Receiver {
fn drop(&mut self) {
let channel = format!("timeline:{}", self.timeline);
let unsubscribe_cmd = format!(
"*2\r\n$9\r\nsubscribe\r\n${}\r\n{}\r\n",
self.channel.len(),
self.channel
channel.len(),
channel
);
let buffer = subscribe_cmd.as_bytes();
self.tx.poll_write(&buffer)?;
Ok(Async::NotReady)
self.tx.poll_write(unsubscribe_cmd.as_bytes()).unwrap();
println!("Receiver got dropped!");
}
}
@ -58,20 +72,13 @@ fn get_socket() -> impl Future<Item = TcpStream, Error = Box<Error>> {
connection.and_then(Ok).map_err(Box::new)
}
fn send_subscribe_cmd(tx: WriteHalf<TcpStream>, channel: String) {
let sender = Sender { tx, channel };
tokio::spawn(sender.map_err(|e| eprintln!("{}", e)));
}
pub fn stream_from(
timeline: String,
user: User,
) -> impl Future<Item = Receiver, Error = warp::reject::Rejection> {
get_socket()
.and_then(move |socket| {
let (rx, tx) = socket.split();
send_subscribe_cmd(tx, format!("timeline:{}", timeline));
let stream_of_data_from_redis = Receiver { rx, user };
let stream_of_data_from_redis = Receiver::new(socket, timeline, user);
Ok(stream_of_data_from_redis)
})
.map_err(warp::reject::custom)