Use replace unbounded channels with bounded

This also slightly reduces memory use
This commit is contained in:
Daniel Sockwell 2020-04-24 09:17:26 -04:00
parent d20ae8a73c
commit 57359194e9
4 changed files with 9 additions and 9 deletions

View File

@ -31,7 +31,7 @@ fn main() -> Result<(), Error> {
.map(move |subscription: Subscription, sse: warp::sse::Sse| {
log::info!("Incoming SSE request for {:?}", subscription.timeline);
let mut manager = sse_manager.lock().unwrap_or_else(RedisManager::recover);
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (event_tx, event_rx) = mpsc::channel(5);
manager.subscribe(&subscription, event_tx);
let sse_stream = SseStream::new(subscription);
sse_stream.send_events(sse, event_rx)
@ -46,7 +46,7 @@ fn main() -> Result<(), Error> {
.map(move |subscription: Subscription, ws: Ws2| {
log::info!("Incoming websocket request for {:?}", subscription.timeline);
let mut manager = ws_manager.lock().unwrap_or_else(RedisManager::recover);
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (event_tx, event_rx) = mpsc::channel(5);
manager.subscribe(&subscription, event_tx);
let token = subscription.access_token.clone().unwrap_or_default(); // token sent for security
let ws_stream = WsStream::new(subscription);

View File

@ -15,14 +15,14 @@ use futures::Async;
use hashbrown::HashMap;
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use std::time::{Duration, Instant};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::Sender;
type Result<T> = std::result::Result<T, Error>;
/// The item that streams from Redis and is polled by the `ClientAgent`
pub struct Manager {
redis_connection: RedisConn,
timelines: HashMap<Timeline, HashMap<u32, UnboundedSender<Event>>>,
timelines: HashMap<Timeline, HashMap<u32, Sender<Event>>>,
ping_time: Instant,
channel_id: u32,
}
@ -43,7 +43,7 @@ impl Manager {
Arc::new(Mutex::new(self))
}
pub fn subscribe(&mut self, subscription: &Subscription, channel: UnboundedSender<Event>) {
pub fn subscribe(&mut self, subscription: &Subscription, channel: Sender<Event>) {
let (tag, tl) = (subscription.hashtag_name.clone(), subscription.timeline);
if let (Some(hashtag), Some(id)) = (tag, tl.tag()) {
self.redis_connection.update_cache(hashtag, id);

View File

@ -3,11 +3,11 @@ use crate::request::Subscription;
use futures::stream::Stream;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::Receiver;
use warp::reply::Reply;
use warp::sse::Sse as WarpSse;
type EventRx = UnboundedReceiver<Event>;
type EventRx = Receiver<Event>;
pub struct Sse(Subscription);

View File

@ -3,10 +3,10 @@ use crate::request::Subscription;
use futures::future::Future;
use futures::stream::Stream;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::{self, Receiver, UnboundedSender};
use warp::ws::{Message, WebSocket};
type EventRx = UnboundedReceiver<Event>;
type EventRx = Receiver<Event>;
type MsgTx = UnboundedSender<Message>;
pub struct Ws(Subscription);