From ebe9aeccbceca65818a5e617ef42b09904b39987 Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Wed, 11 Mar 2020 13:26:11 -0400 Subject: [PATCH] Hotfix for v0.4.8 (#87) Fixes a regression in v0.4.8 that caused some JSON output to not be properly escaped, leading it to not correctly display in the browser. --- src/redis_to_client_stream/client_agent.rs | 38 +++++++++------------- src/redis_to_client_stream/mod.rs | 20 ++++++++---- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/src/redis_to_client_stream/client_agent.rs b/src/redis_to_client_stream/client_agent.rs index c71be4e..00aab56 100644 --- a/src/redis_to_client_stream/client_agent.rs +++ b/src/redis_to_client_stream/client_agent.rs @@ -18,7 +18,7 @@ use super::receiver::Receiver; use crate::{config, parse_client_request::user::User}; use futures::{Async, Poll}; -use serde_json::{json, Value}; +use serde_json::Value; use std::sync; use tokio::io::Error; use uuid::Uuid; @@ -71,7 +71,7 @@ impl ClientAgent { /// The stream that the `ClientAgent` manages. `Poll` is the only method implemented. impl futures::stream::Stream for ClientAgent { - type Item = Value; + type Item = Toot; type Error = Error; /// Checks for any new messages that should be sent to the client. @@ -102,17 +102,18 @@ impl futures::stream::Stream for ClientAgent { let toot = Toot::from_json(value); toot.filter(&user) } - Ok(inner_value) => Ok(inner_value), + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => Ok(Async::NotReady), Err(e) => Err(e), } } } /// The message to send to the client (which might not literally be a toot in some cases). -struct Toot { - category: String, - payload: Value, - language: Option, +pub struct Toot { + pub category: String, + pub payload: Value, + pub language: Option, } impl Toot { @@ -132,29 +133,20 @@ impl Toot { } } - /// Convert a `Toot` to JSON inside an Option. - fn to_optional_json(&self) -> Option { - Some(json!( - {"event": self.category, - "payload": self.payload,} - )) - } - /// Filter out any `Toot`'s that fail the provided filter. - fn filter(&self, user: &User) -> Result>, Error> { + fn filter(self, user: &User) -> Result>, Error> { let toot = self; - let (send_msg, skip_msg) = ( - Ok(Async::Ready(toot.to_optional_json())), - Ok(Async::NotReady), - ); + let category = toot.category.clone(); + let toot_language = &toot.language.clone().expect("Valid lanugage"); + let (send_msg, skip_msg) = (Ok(Async::Ready(Some(toot))), Ok(Async::NotReady)); - if toot.category == "update" { + if category == "update" { use crate::parse_client_request::user::Filter; - let toot_language = &toot.language.clone().expect("Valid lanugage"); + match &user.filter { Filter::NoFilter => send_msg, - Filter::Notification if toot.category == "notification" => send_msg, + Filter::Notification if category == "notification" => send_msg, // If not, skip it Filter::Notification => skip_msg, Filter::Language if user.langs.is_none() => send_msg, diff --git a/src/redis_to_client_stream/mod.rs b/src/redis_to_client_stream/mod.rs index 609843b..95b0a64 100644 --- a/src/redis_to_client_stream/mod.rs +++ b/src/redis_to_client_stream/mod.rs @@ -6,6 +6,7 @@ pub mod redis; pub use client_agent::ClientAgent; use futures::{future::Future, stream::Stream, Async}; use log; +use serde_json::json; use std::time; /// Send a stream of replies to a Server Sent Events client. @@ -16,9 +17,9 @@ pub fn send_updates_to_sse( ) -> impl warp::reply::Reply { let event_stream = tokio::timer::Interval::new(time::Instant::now(), update_interval) .filter_map(move |_| match client_agent.poll() { - Ok(Async::Ready(Some(json_value))) => Some(( - warp::sse::event(json_value["event"].clone().to_string()), - warp::sse::data(json_value["payload"].clone()), + Ok(Async::Ready(Some(toot))) => Some(( + warp::sse::event(toot.category), + warp::sse::data(toot.payload), )), _ => None, }); @@ -89,10 +90,15 @@ pub fn send_updates_to_ws( // Every time you get an event from that stream, send it through the pipe event_stream .for_each(move |_instant| { - if let Ok(Async::Ready(Some(json_value))) = client_agent.poll() { - let toot = json_value["payload"]["content"].clone(); - log::warn!("toot: {}\n in TL: {}\nuser: {}({})", toot, tl, email, id); - let msg = warp::ws::Message::text(json_value.to_string()); + if let Ok(Async::Ready(Some(toot))) = client_agent.poll() { + let txt = &toot.payload["content"]; + log::warn!("toot: {}\n in TL: {}\nuser: {}({})", txt, tl, email, id); + + let msg = warp::ws::Message::text( + json!({"event": toot.category, + "payload": toot.payload.to_string()}) + .to_string(), + ); tx.unbounded_send(msg).expect("No send error"); };