From e1be0db9745dbc18b40ba5f9fa0ee352eb2ec798 Mon Sep 17 00:00:00 2001 From: Daniel Sockwell Date: Wed, 8 Apr 2020 18:13:49 -0400 Subject: [PATCH] Improve pubsub implementation [WIP] --- Cargo.toml | 6 +- src/main.rs | 56 +++- .../event/checked_event/status/mod.rs | 4 +- src/messages/event/mod.rs | 6 +- src/redis_to_client_stream/event_stream.rs | 276 ++++++++++++++---- src/redis_to_client_stream/receiver/mod.rs | 114 +++----- 6 files changed, 301 insertions(+), 161 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bd642ae..e28b046 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,8 +43,8 @@ stub_status = [] production = [] [profile.release] -lto = "fat" -panic = "abort" -codegen-units = 1 +#lto = "fat" +#panic = "abort" +#codegen-units = 1 diff --git a/src/main.rs b/src/main.rs index 4435ddd..f403111 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,10 @@ use flodgatt::{ redis_to_client_stream::{EventStream, Receiver}, }; use std::{env, fs, net::SocketAddr, os::unix::fs::PermissionsExt}; -use tokio::{net::UnixListener, sync::watch}; +use tokio::{ + net::UnixListener, + sync::{mpsc, watch}, +}; use warp::{http::StatusCode, path, ws::Ws2, Filter, Rejection}; fn main() { @@ -24,8 +27,10 @@ fn main() { let cfg = DeploymentConfig::from_env(env_vars); let pg_pool = PgPool::new(postgres_cfg); - let (tx, rx) = watch::channel((Timeline::empty(), Event::EventNotReady)); - let receiver = Receiver::try_from(redis_cfg, tx) + let (event_tx, event_rx) = watch::channel((Timeline::empty(), Event::Ping)); + let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel(); + let poll_freq = *redis_cfg.polling_interval; + let receiver = Receiver::try_from(redis_cfg, event_tx, subscribe_rx) .unwrap_or_else(|e| { log::error!("{}\nFlodgatt shutting down...", e); std::process::exit(1); @@ -35,7 +40,7 @@ fn main() { // Server Sent Events let sse_receiver = receiver.clone(); - let sse_rx = rx.clone(); + let sse_rx = event_rx.clone(); let whitelist_mode = *cfg.whitelist_mode; let sse_routes = Subscription::from_sse_query(pg_pool.clone(), whitelist_mode) .and(warp::sse()) @@ -75,8 +80,8 @@ fn main() { log::error!("Could not subscribe to the Redis channel: {}", e) }); } - - let ws_rx = rx.clone(); + let ws_subscribe_tx = subscribe_tx.clone(); + let ws_rx = event_rx.clone(); let token = subscription .clone() .access_token @@ -85,7 +90,9 @@ fn main() { // send the updates through the WS connection // (along with the User's access_token which is sent for security) ( - ws.on_upgrade(move |s| EventStream::send_to_ws(s, subscription, ws_rx)), + ws.on_upgrade(move |s| { + EventStream::send_to_ws(s, subscription, ws_rx, ws_subscribe_tx) + }), token, ) }) @@ -98,14 +105,12 @@ fn main() { #[cfg(feature = "stub_status")] let status_endpoints = { - let (r1, r2, r3) = (receiver.clone(), receiver.clone(), receiver.clone()); + let (r1, r3) = (receiver.clone(), receiver.clone()); warp::path!("api" / "v1" / "streaming" / "health") .map(|| "OK") .or(warp::path!("api" / "v1" / "streaming" / "status") .and(warp::path::end()) .map(move || r1.lock().expect("TODO").count_connections())) - .or(warp::path!("api" / "v1" / "streaming" / "status" / "queue") - .map(move || r2.lock().expect("TODO").queue_length())) .or( warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline") .map(move || r3.lock().expect("TODO").list_connections()), @@ -114,11 +119,12 @@ fn main() { #[cfg(not(feature = "stub_status"))] let status_endpoints = warp::path!("api" / "v1" / "streaming" / "health").map(|| "OK"); - let receiver = receiver.clone(); - std::thread::spawn(move || loop { - std::thread::sleep(std::time::Duration::from_millis(1000)); - receiver.lock().unwrap().poll_broadcast(); - }); + // let receiver_old = receiver.clone(); + // // TODO make vvvv a green thread + // std::thread::spawn(move || loop { + // std::thread::sleep(std::time::Duration::from_millis(10000)); + // receiver_old.lock().unwrap().poll_broadcast(); + // }); if let Some(socket) = &*cfg.unix_socket { log::info!("Using Unix socket {}", socket); @@ -146,7 +152,25 @@ fn main() { ) .run_incoming(incoming); } else { + use futures::{future::lazy, stream::Stream as _Stream}; + use std::time::Instant; + let server_addr = SocketAddr::new(*cfg.address, *cfg.port); - warp::serve(ws_routes.or(sse_routes).with(cors).or(status_endpoints)).run(server_addr); + + let receiver = receiver.clone(); + tokio::run(lazy(move || { + let receiver = receiver.clone(); + tokio::spawn(lazy(move || { + tokio::timer::Interval::new(Instant::now(), poll_freq) + .map_err(|e| log::error!("{}", e)) + .for_each(move |_| { + let receiver = receiver.clone(); + receiver.lock().expect("TODO").poll_broadcast(); + Ok(()) + }) + })); + + warp::serve(ws_routes.or(sse_routes).with(cors).or(status_endpoints)).bind(server_addr) + })); }; } diff --git a/src/messages/event/checked_event/status/mod.rs b/src/messages/event/checked_event/status/mod.rs index 8656231..a88512f 100644 --- a/src/messages/event/checked_event/status/mod.rs +++ b/src/messages/event/checked_event/status/mod.rs @@ -21,7 +21,8 @@ pub struct Status { uri: String, created_at: String, account: Account, - content: String, + // TODO remove pub + pub content: String, visibility: Visibility, sensitive: bool, spoiler_text: String, @@ -40,6 +41,7 @@ pub struct Status { poll: Option, card: Option, language: Option, + text: Option, // ↓↓↓ Only for authorized users favourited: Option, diff --git a/src/messages/event/mod.rs b/src/messages/event/mod.rs index e1a8ddc..3c70b7b 100644 --- a/src/messages/event/mod.rs +++ b/src/messages/event/mod.rs @@ -11,7 +11,7 @@ use std::string::String; pub enum Event { TypeSafe(CheckedEvent), Dynamic(DynamicEvent), - EventNotReady, + Ping, } impl Event { @@ -38,7 +38,7 @@ impl Event { CheckedEvent::FiltersChanged => "filters_changed", }, Self::Dynamic(dyn_event) => &dyn_event.event, - Self::EventNotReady => panic!("event_name() called on EventNotReady"), + Self::Ping => panic!("event_name() called on EventNotReady"), }) } @@ -56,7 +56,7 @@ impl Event { FiltersChanged => None, }, Self::Dynamic(dyn_event) => Some(dyn_event.payload.to_string()), - Self::EventNotReady => panic!("payload() called on EventNotReady"), + Self::Ping => panic!("payload() called on EventNotReady"), } } } diff --git a/src/redis_to_client_stream/event_stream.rs b/src/redis_to_client_stream/event_stream.rs index 4603a3f..f25fbd4 100644 --- a/src/redis_to_client_stream/event_stream.rs +++ b/src/redis_to_client_stream/event_stream.rs @@ -4,7 +4,7 @@ use crate::parse_client_request::{Subscription, Timeline}; use futures::{future::Future, stream::Stream}; use log; use std::time::Duration; -use tokio::sync::watch; +use tokio::sync::{mpsc, watch}; use warp::{ reply::Reply, sse::Sse, @@ -17,20 +17,22 @@ impl EventStream { pub fn send_to_ws( ws: WebSocket, subscription: Subscription, - ws_rx: watch::Receiver<(Timeline, Event)>, + event_rx: watch::Receiver<(Timeline, Event)>, + mut subscribe_tx: mpsc::UnboundedSender, ) -> impl Future { let (transmit_to_ws, _receive_from_ws) = ws.split(); let target_timeline = subscription.timeline; - let allowed_langs = subscription.allowed_langs; + let user_langs = subscription.allowed_langs; let blocks = subscription.blocks; // Create a pipe - let (tx, rx) = futures::sync::mpsc::unbounded(); + let (ws_tx, ws_rx) = futures::sync::mpsc::unbounded(); // Send one end of it to a different green thread and tell that end to forward // whatever it gets on to the WebSocket client warp::spawn( - rx.map_err(|()| -> warp::Error { unreachable!() }) + ws_rx + .map_err(|()| -> warp::Error { unreachable!() }) .forward(transmit_to_ws) .map(|_r| ()) .map_err(|e| match e.to_string().as_ref() { @@ -39,43 +41,164 @@ impl EventStream { }), ); - return ws_rx - .for_each(move |(timeline, event)| { - if target_timeline == timeline { - log::info!("Got event for {:?}", timeline); - use crate::messages::{CheckedEvent::Update, Event::*}; - use crate::parse_client_request::Stream::Public; - match event { - TypeSafe(Update { payload, queued_at }) => match timeline { - Timeline(Public, _, _) if payload.language_not(&allowed_langs) => (), - _ if payload.involves_any(&blocks) => (), - // TODO filter vvvv - _ => tx - .unbounded_send(Message::text( - TypeSafe(Update { payload, queued_at }).to_json_string(), - )) - .expect("TODO"), - }, - TypeSafe(non_update) => tx - .unbounded_send(Message::text(TypeSafe(non_update).to_json_string())) - .expect("TODO"), - Dynamic(event) if event.event == "update" => match timeline { - Timeline(Public, _, _) if event.language_not(&allowed_langs) => (), - _ if event.involves_any(&blocks) => (), - // TODO filter vvvv - _ => tx - .unbounded_send(Message::text(Dynamic(event).to_json_string())) - .expect("TODO"), - }, - Dynamic(non_update) => tx - .unbounded_send(Message::text(Dynamic(non_update).to_json_string())) - .expect("TODO"), - EventNotReady => panic!("TODO"), - } + event_rx.map_err(|_| ()).for_each(move |(tl, event)| { + if target_timeline == tl { + use crate::messages::{CheckedEvent::Update, Event::*}; + use crate::parse_client_request::Stream::Public; + + log::info!("Got event for {:?}", tl); + if let Event::TypeSafe(Update { payload, .. }) = event.clone() { + log::info!("{:?}", &payload.content); } - Ok(()) - }) - .map_err(|_| ()); + match event { + Ping => match ws_tx.unbounded_send(Message::text("{}")) { + Ok(_) => Ok(()), + Err(_) => { + subscribe_tx.try_send(tl).expect("TODO"); + Err(()) + } + }, + TypeSafe(Update { payload, queued_at }) => match tl { + Timeline(Public, _, _) if payload.language_not(&user_langs) => Ok(()), + _ if payload.involves_any(&blocks) => Ok(()), + _ => match ws_tx.unbounded_send(Message::text( + TypeSafe(Update { payload, queued_at }).to_json_string(), + )) { + Ok(_) => Ok(()), + Err(_) => { + subscribe_tx.try_send(tl).expect("TODO"); + Err(()) + } + }, + }, + TypeSafe(non_update) => match ws_tx + .unbounded_send(Message::text(TypeSafe(non_update).to_json_string())) + { + Ok(_) => Ok(()), + Err(_) => { + subscribe_tx.try_send(tl).expect("TODO"); + Err(()) + } + }, + Dynamic(event) if event.event == "update" => match tl { + Timeline(Public, _, _) if event.language_not(&user_langs) => Ok(()), + _ if event.involves_any(&blocks) => Ok(()), + _ => match ws_tx + .unbounded_send(Message::text(Dynamic(event).to_json_string())) + { + Ok(_) => Ok(()), + Err(_) => { + subscribe_tx.try_send(tl).expect("TODO"); + Err(()) + } + }, + }, + Dynamic(non_update) => match ws_tx + .unbounded_send(Message::text(Dynamic(non_update).to_json_string())) + { + Ok(_) => Ok(()), + Err(_) => { + subscribe_tx.try_send(tl).expect("TODO"); + Err(()) + } + }, + } + } else { + if let Event::Ping = event { + match ws_tx.unbounded_send(Message::text("{}")) { + Ok(_) => Ok(()), + Err(_) => { + subscribe_tx.try_send(target_timeline).expect("TODO"); + Err(()) + } + } + } else { + Ok(()) + } + } + }) + + // event_rx + // .take_while(move |(tl, event)| { + // let (tl, event) = (*tl, event.clone()); + // if target_timeline == tl { + // log::info!("Got event for {:?}", tl); + // use crate::messages::{CheckedEvent::Update, Event::*}; + // use crate::parse_client_request::Stream::Public; + // match event { + // Ping => match ws_tx.unbounded_send(Message::text("{}")) { + // Ok(_) => futures::future::ok(true), + // Err(_) => { + // subscribe_tx.try_send(tl).expect("TODO"); + // futures::future::ok(false) + // } + // }, + // TypeSafe(Update { payload, queued_at }) => match tl { + // Timeline(Public, _, _) if payload.language_not(&user_langs) => { + // futures::future::ok(true) + // } + // _ if payload.involves_any(&blocks) => futures::future::ok(true), + // _ => match ws_tx.unbounded_send(Message::text( + // TypeSafe(Update { payload, queued_at }).to_json_string(), + // )) { + // Ok(_) => futures::future::ok(true), + // Err(_) => { + // subscribe_tx.try_send(tl).expect("TODO"); + // futures::future::ok(false) + // } + // }, + // }, + // TypeSafe(non_update) => match ws_tx + // .unbounded_send(Message::text(TypeSafe(non_update).to_json_string())) + // { + // Ok(_) => futures::future::ok(true), + // Err(_) => { + // subscribe_tx.try_send(tl).expect("TODO"); + // futures::future::ok(false) + // } + // }, + // Dynamic(event) if event.event == "update" => match tl { + // Timeline(Public, _, _) if event.language_not(&user_langs) => { + // futures::future::ok(true) + // } + // _ if event.involves_any(&blocks) => futures::future::ok(true), + // _ => match ws_tx + // .unbounded_send(Message::text(Dynamic(event).to_json_string())) + // { + // Ok(_) => futures::future::ok(true), + // Err(_) => { + // subscribe_tx.try_send(tl).expect("TODO"); + // futures::future::ok(false) + // } + // }, + // }, + // Dynamic(non_update) => match ws_tx + // .unbounded_send(Message::text(Dynamic(non_update).to_json_string())) + // { + // Ok(_) => futures::future::ok(true), + // Err(_) => { + // subscribe_tx.try_send(tl).expect("TODO"); + // futures::future::ok(false) + // } + // }, + // } + // } else { + // if let Event::Ping = event { + // match ws_tx.unbounded_send(Message::text("{}")) { + // Ok(_) => futures::future::ok(true), + // Err(_) => { + // subscribe_tx.try_send(target_timeline).expect("TODO"); + // futures::future::ok(false) + // } + // } + // } else { + // futures::future::ok(true) + // } + // } + // }) + // .for_each(|_| Ok(())) + // .map_err(|_| ()) + // .map(|_| ()) } pub fn send_to_sse( @@ -96,7 +219,6 @@ impl EventStream { TypeSafe(Update { payload, queued_at }) => match timeline { Timeline(Public, _, _) if payload.language_not(&allowed_langs) => None, _ if payload.involves_any(&blocks) => None, - // TODO filter vvvv _ => { let event = Event::TypeSafe(CheckedEvent::Update { payload, queued_at }); @@ -116,22 +238,16 @@ impl EventStream { Dynamic(event) if event.event == "update" => match timeline { Timeline(Public, _, _) if event.language_not(&allowed_langs) => None, _ if event.involves_any(&blocks) => None, - // TODO filter vvvv - _ => { - let event = Event::Dynamic(event); - Some(( - warp::sse::event(event.event_name()), - warp::sse::data(event.payload().unwrap_or_else(String::new)), - )) - } + _ => Some(( + warp::sse::event(event.event), + warp::sse::data(event.payload.to_string()), + )), }, - Dynamic(non_update) => { - let event = Event::Dynamic(non_update); - Some(( - warp::sse::event(event.event_name()), - warp::sse::data(event.payload().unwrap_or_else(String::new)), - )) - } + Dynamic(non_update) => Some(( + warp::sse::event(non_update.event), + warp::sse::data(non_update.payload.to_string()), + )), + // TODO: Fix for Ping EventNotReady => panic!("TODO"), } } else { @@ -147,3 +263,49 @@ impl EventStream { ) } } + +// if target_timeline == tl { +// log::info!("Got event for {:?}", tl); +// use crate::messages::{CheckedEvent::Update, Event::*}; +// use crate::parse_client_request::Stream::Public; +// match event { +// TypeSafe(Update { payload, queued_at }) => match tl { +// Timeline(Public, _, _) if payload.language_not(&user_langs) => Ok(()), +// _ if payload.involves_any(&blocks) => Ok(()), +// _ => Ok(ws_tx +// .unbounded_send(Message::text( +// TypeSafe(Update { payload, queued_at }).to_json_string(), +// )) +// .unwrap_or_else(|_| subscribe_tx.try_send(tl).expect("TODO"))), +// }, +// TypeSafe(non_update) => Ok(ws_tx +// .unbounded_send(Message::text(TypeSafe(non_update).to_json_string())) +// .unwrap_or_else(|_| subscribe_tx.try_send(tl).expect("TODO"))), +// Dynamic(event) if event.event == "update" => match tl { +// Timeline(Public, _, _) if event.language_not(&user_langs) => Ok(()), +// _ if event.involves_any(&blocks) => Ok(()), +// _ => Ok(ws_tx +// .unbounded_send(Message::text(Dynamic(event).to_json_string())) +// .unwrap_or_else(|_| subscribe_tx.try_send(tl).expect("TODO"))), +// }, +// Dynamic(non_update) => Ok(ws_tx +// .unbounded_send(Message::text(Dynamic(non_update).to_json_string())) +// .unwrap_or_else(|_| subscribe_tx.try_send(tl).expect("TODO"))), +// Ping => Ok(match ws_tx.unbounded_send(Message::text("{}")) { +// Ok(_) => (), +// Err(_) => { +// subscribe_tx.try_send(tl).expect("TODO"); +// } +// }), +// } +// } else { +// if let Event::Ping = event { +// Ok(ws_tx +// .unbounded_send(Message::text("{}")) +// .unwrap_or_else(|_| { +// subscribe_tx.try_send(target_timeline).expect("TODO") +// })) +// } else { +// Ok(()) +// } +// } diff --git a/src/redis_to_client_stream/receiver/mod.rs b/src/redis_to_client_stream/receiver/mod.rs index d487763..b883604 100644 --- a/src/redis_to_client_stream/receiver/mod.rs +++ b/src/redis_to_client_stream/receiver/mod.rs @@ -2,10 +2,7 @@ //! polled by the correct `ClientAgent`. Also manages sububscriptions and //! unsubscriptions to/from Redis. mod err; -mod message_queues; - pub use err::ReceiverErr; -pub use message_queues::{MessageQueues, MsgQueue}; use super::redis::{redis_connection::RedisCmd, RedisConn}; @@ -15,12 +12,9 @@ use crate::{ parse_client_request::{Stream, Subscription, Timeline}, }; -use { - futures::{Async, Poll}, - hashbrown::HashMap, - tokio::sync::watch, - uuid::Uuid, -}; +use futures::{Async, Stream as _Stream}; +use hashbrown::HashMap; +use tokio::sync::{mpsc, watch}; use std::{ result, @@ -34,11 +28,11 @@ type Result = result::Result; #[derive(Debug)] pub struct Receiver { redis_connection: RedisConn, - redis_poll_interval: Duration, - redis_polled_at: Instant, - pub msg_queues: MessageQueues, clients_per_timeline: HashMap, - channel: watch::Sender<(Timeline, Event)>, + tx: watch::Sender<(Timeline, Event)>, + rx: mpsc::UnboundedReceiver, + ping_time: Instant, + time: Instant, } impl Receiver { @@ -46,18 +40,16 @@ impl Receiver { /// active subscriptions). pub fn try_from( redis_cfg: config::RedisConfig, - sender: watch::Sender<(Timeline, Event)>, + tx: watch::Sender<(Timeline, Event)>, + rx: mpsc::UnboundedReceiver, ) -> Result { - let redis_poll_interval = *redis_cfg.polling_interval; - let redis_connection = RedisConn::new(redis_cfg)?; - Ok(Self { - redis_polled_at: Instant::now(), - redis_poll_interval, - redis_connection, - msg_queues: MessageQueues(HashMap::new()), + redis_connection: RedisConn::new(redis_cfg)?, clients_per_timeline: HashMap::new(), - channel: sender, + tx, + rx, + ping_time: Instant::now(), + time: Instant::now(), }) } @@ -65,15 +57,12 @@ impl Receiver { Arc::new(Mutex::new(self)) } - /// Assigns the `Receiver` a new timeline to monitor and runs other - /// first-time setup. pub fn add_subscription(&mut self, subscription: &Subscription) -> Result<()> { let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline); if let (Some(hashtag), Timeline(Stream::Hashtag(id), _, _)) = (tag, tl) { self.redis_connection.update_cache(hashtag, id); }; - self.msg_queues.insert(subscription.id, MsgQueue::new(tl)); let number_of_subscriptions = self .clients_per_timeline @@ -85,13 +74,11 @@ impl Receiver { if *number_of_subscriptions == 1 { self.redis_connection.send_cmd(Subscribe, &tl)? }; - + log::info!("Started stream for {:?}", tl); Ok(()) } - pub fn remove_subscription(&mut self, subscription: &Subscription) -> Result<()> { - let tl = subscription.timeline; - self.msg_queues.remove(&subscription.id); + pub fn remove_subscription(&mut self, tl: Timeline) -> Result<()> { let number_of_subscriptions = self .clients_per_timeline .entry(tl) @@ -108,58 +95,33 @@ impl Receiver { self.redis_connection.send_cmd(Unsubscribe, &tl)?; self.clients_per_timeline.remove_entry(&tl); }; - + log::info!("Ended stream for {:?}", tl); Ok(()) } + pub fn poll_broadcast(&mut self) { - loop { + log::info!("{:?}", self.time.elapsed()); + + while let Ok(Async::Ready(Some(tl))) = self.rx.poll() { + self.remove_subscription(tl).expect("TODO"); + } + + if self.ping_time.elapsed() > Duration::from_secs(30) { + self.ping_time = Instant::now(); + self.tx + .broadcast((Timeline::empty(), Event::Ping)) + .expect("TODO"); + } else { match self.redis_connection.poll_redis() { - Ok(Async::NotReady) => break, + Ok(Async::NotReady) => (), Ok(Async::Ready(Some((timeline, event)))) => { - self.channel.broadcast((timeline, event)).expect("TODO"); + self.tx.broadcast((timeline, event)).expect("TODO"); } Ok(Async::Ready(None)) => (), // subscription cmd or msg for other namespace Err(_err) => panic!("TODO"), } } - } - - /// Returns the oldest message in the `ClientAgent`'s queue (if any). - /// - /// Note: This method does **not** poll Redis every time, because polling - /// Redis is significantly more time consuming that simply returning the - /// message already in a queue. Thus, we only poll Redis if it has not - /// been polled lately. - pub fn poll_for(&mut self, id: Uuid) -> Poll, ReceiverErr> { - if self.redis_polled_at.elapsed() > self.redis_poll_interval { - log::info!("Polling Redis"); - loop { - match self.redis_connection.poll_redis() { - Ok(Async::NotReady) => break, - Ok(Async::Ready(Some((timeline, event)))) => { - self.msg_queues - .values_mut() - .filter(|msg_queue| msg_queue.timeline == timeline) - .for_each(|msg_queue| { - msg_queue.messages.push_back(event.clone()); - }); - } - Ok(Async::Ready(None)) => (), // subscription cmd or msg for other namespace - Err(err) => Err(err)?, - } - } - - self.redis_polled_at = Instant::now(); - } - - // If the `msg_queue` being polled has any new messages, return the first (oldest) one - let msg_q = self.msg_queues.get_mut(&id).ok_or(ReceiverErr::InvalidId)?; - let res = match msg_q.messages.pop_front() { - Some(event) => Ok(Async::Ready(Some(event))), - None => Ok(Async::NotReady), - }; - - res + self.time = Instant::now(); } pub fn count_connections(&self) -> String { @@ -182,14 +144,4 @@ impl Receiver { }) .collect() } - - pub fn queue_length(&self) -> String { - format!( - "Longest MessageQueue: {}", - self.msg_queues - .0 - .values() - .fold(0, |acc, el| acc.max(el.messages.len())) - ) - } }