diff --git a/src/main.rs b/src/main.rs index 3d9f191..8afcc13 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,7 +31,7 @@ fn main() -> Result<(), Error> { .map(move |subscription: Subscription, sse: warp::sse::Sse| { log::info!("Incoming SSE request for {:?}", subscription.timeline); let mut manager = sse_manager.lock().unwrap_or_else(RedisManager::recover); - let (event_tx, event_rx) = mpsc::unbounded_channel(); + let (event_tx, event_rx) = mpsc::channel(5); manager.subscribe(&subscription, event_tx); let sse_stream = SseStream::new(subscription); sse_stream.send_events(sse, event_rx) @@ -46,7 +46,7 @@ fn main() -> Result<(), Error> { .map(move |subscription: Subscription, ws: Ws2| { log::info!("Incoming websocket request for {:?}", subscription.timeline); let mut manager = ws_manager.lock().unwrap_or_else(RedisManager::recover); - let (event_tx, event_rx) = mpsc::unbounded_channel(); + let (event_tx, event_rx) = mpsc::channel(5); manager.subscribe(&subscription, event_tx); let token = subscription.access_token.clone().unwrap_or_default(); // token sent for security let ws_stream = WsStream::new(subscription); diff --git a/src/response/redis/manager.rs b/src/response/redis/manager.rs index eed4507..ffdc6b0 100644 --- a/src/response/redis/manager.rs +++ b/src/response/redis/manager.rs @@ -15,14 +15,14 @@ use futures::Async; use hashbrown::HashMap; use std::sync::{Arc, Mutex, MutexGuard, PoisonError}; use std::time::{Duration, Instant}; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::Sender; type Result = std::result::Result; /// The item that streams from Redis and is polled by the `ClientAgent` pub struct Manager { redis_connection: RedisConn, - timelines: HashMap>>, + timelines: HashMap>>, ping_time: Instant, channel_id: u32, } @@ -43,7 +43,7 @@ impl Manager { Arc::new(Mutex::new(self)) } - pub fn subscribe(&mut self, subscription: &Subscription, channel: UnboundedSender) { + pub fn subscribe(&mut self, subscription: &Subscription, channel: Sender) { let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline); if let (Some(hashtag), Some(id)) = (tag, tl.tag()) { self.redis_connection.update_cache(hashtag, id); diff --git a/src/response/stream/sse.rs b/src/response/stream/sse.rs index 973c04c..9ee5bd2 100644 --- a/src/response/stream/sse.rs +++ b/src/response/stream/sse.rs @@ -3,11 +3,11 @@ use crate::request::Subscription; use futures::stream::Stream; use std::time::Duration; -use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::mpsc::Receiver; use warp::reply::Reply; use warp::sse::Sse as WarpSse; -type EventRx = UnboundedReceiver; +type EventRx = Receiver; pub struct Sse(Subscription); diff --git a/src/response/stream/ws.rs b/src/response/stream/ws.rs index e4d38e1..b90f001 100644 --- a/src/response/stream/ws.rs +++ b/src/response/stream/ws.rs @@ -3,10 +3,10 @@ use crate::request::Subscription; use futures::future::Future; use futures::stream::Stream; -use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{self, Receiver, UnboundedSender}; use warp::ws::{Message, WebSocket}; -type EventRx = UnboundedReceiver; +type EventRx = Receiver; type MsgTx = UnboundedSender; pub struct Ws(Subscription);