From 2096e1348bdfaeff40f171d670ce1753e39bc93b Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Thu, 19 Mar 2020 13:20:48 -0400 Subject: [PATCH] Add support for announcement event type (#96) --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/redis_to_client_stream/client_agent.rs | 2 + src/redis_to_client_stream/message.rs | 52 +++++++++++++++++++--- 4 files changed, 49 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ba99a7..1433f7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -440,7 +440,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "flodgatt" -version = "0.6.1" +version = "0.6.2" dependencies = [ "criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "dotenv 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 1fc7b75..f3253a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "flodgatt" description = "A blazingly fast drop-in replacement for the Mastodon streaming api server" -version = "0.6.1" +version = "0.6.2" authors = ["Daniel Long Sockwell "] edition = "2018" diff --git a/src/redis_to_client_stream/client_agent.rs b/src/redis_to_client_stream/client_agent.rs index f5fb56c..ee1fe4f 100644 --- a/src/redis_to_client_stream/client_agent.rs +++ b/src/redis_to_client_stream/client_agent.rs @@ -117,6 +117,8 @@ impl futures::stream::Stream for ClientAgent { Conversation(notification) => send(Conversation(notification)), Delete(status_id) => send(Delete(status_id)), FiltersChanged => send(FiltersChanged), + Announcement(content) => send(Announcement(content)), + UnknownEvent(event, payload) => send(UnknownEvent(event, payload)), }, Ok(Ready(None)) => Ok(Ready(None)), Ok(NotReady) => Ok(NotReady), diff --git a/src/redis_to_client_stream/message.rs b/src/redis_to_client_stream/message.rs index ad35d18..31b7d76 100644 --- a/src/redis_to_client_stream/message.rs +++ b/src/redis_to_client_stream/message.rs @@ -11,13 +11,23 @@ pub enum Message { Notification(Value), Delete(String), FiltersChanged, + Announcement(AnnouncementType), + UnknownEvent(String, Value), } #[derive(Debug, Clone)] pub struct Status(Value); +#[derive(Debug, Clone)] +pub enum AnnouncementType { + New(Value), + Delete(String), + Reaction(Value), +} + impl Message { pub fn from_json(json: Value) -> Self { + use AnnouncementType::*; let event = json["event"] .as_str() .unwrap_or_else(|| log_fatal!("Could not process `event` in {:?}", json)); @@ -32,20 +42,48 @@ impl Message { .to_string(), ), "filters_changed" => Self::FiltersChanged, - unsupported_event => log_fatal!( - "Received an unsupported `event` type from Redis: {}", - unsupported_event - ), + "announcement" => Self::Announcement(New(json["payload"].clone())), + "announcement.reaction" => Self::Announcement(Reaction(json["payload"].clone())), + "announcement.delete" => Self::Announcement(Delete( + json["payload"] + .as_str() + .unwrap_or_else(|| log_fatal!("Could not process `payload` in {:?}", json)) + .to_string(), + )), + unexpected_event => { + log::warn!( + "Received an unexpected `event` type from Redis: {}", + unexpected_event + ); + Self::UnknownEvent(event.to_string(), json["payload"].clone()) + } } } pub fn event(&self) -> String { - format!("{}", self).to_lowercase() + use AnnouncementType::*; + match self { + Self::Update(_) => "update", + Self::Conversation(_) => "conversation", + Self::Notification(_) => "notification", + Self::Announcement(New(_)) => "announcement", + Self::Announcement(Reaction(_)) => "announcement.reaction", + Self::UnknownEvent(event, _) => &event, + Self::Delete(_) => "delete", + Self::Announcement(Delete(_)) => "announcement.delete", + Self::FiltersChanged => "filters_changed", + } + .to_string() } pub fn payload(&self) -> String { + use AnnouncementType::*; match self { - Self::Delete(id) => id.clone(), Self::Update(status) => status.0.to_string(), - Self::Conversation(value) | Self::Notification(value) => value.to_string(), + Self::Conversation(value) + | Self::Notification(value) + | Self::Announcement(New(value)) + | Self::Announcement(Reaction(value)) + | Self::UnknownEvent(_, value) => value.to_string(), + Self::Delete(id) | Self::Announcement(Delete(id)) => id.clone(), Self::FiltersChanged => "".to_string(), } }