From 43083d6e8ddce521dd69a53e2be8da2e0f543c14 Mon Sep 17 00:00:00 2001 From: some body Date: Fri, 17 Sep 2021 07:32:02 -0500 Subject: [PATCH] changing endpoint storage mechanism to allow inter-Endpoint messaging --- src/endpoint.rs | 2 ++ src/event.rs | 6 ++++++ src/state.rs | 55 ++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 51 insertions(+), 12 deletions(-) diff --git a/src/endpoint.rs b/src/endpoint.rs index 2763c59..16a6a8f 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -9,6 +9,8 @@ use crate::event::Event; pub trait Endpoint { fn name(&self) -> String; fn is_active(&self) -> bool; + async fn listen(self: Arc, event_sender: oneshot::Sender>>, event_handler: mpsc::Sender); + } diff --git a/src/event.rs b/src/event.rs index 0efe008..7478d0a 100644 --- a/src/event.rs +++ b/src/event.rs @@ -15,6 +15,12 @@ pub enum EventKind { pub struct Event { pub kind: EventKind, pub result_sender: oneshot::Sender, + pub sender_name: String, +} + +pub struct EventRaw { + pub kind: EventKind, + pub sender_name: String, } pub enum EventResponse { diff --git a/src/state.rs b/src/state.rs index f5dcd5b..cd19c6b 100644 --- a/src/state.rs +++ b/src/state.rs @@ -5,13 +5,20 @@ use std::time::Instant; use std::sync::Arc; use tokio::sync::oneshot; use tokio::sync::mpsc; +use crate::event::EventRaw; use crate::event::{EventKind,Event,EventResponse,InternalEvent}; use crate::endpoint::Endpoint; use crate::irc_endpoint::IrcEndpoint; + +pub struct EndpointReceiverPair { + endpoint: Arc, + receiver: Option>, +} + pub struct RircdState { - endpoints: Vec>, + endpoints: Vec, //to determine program runtime creation_timestamp: Option, @@ -23,14 +30,16 @@ pub struct RircdState { impl RircdState { pub fn new() -> Result { //TODO remove (for testing) - let eps: Vec> = vec![ - Arc::new(IrcEndpoint {} ), + let ep_pairs: Vec = vec![ + EndpointReceiverPair { + endpoint: Arc::new(IrcEndpoint {} ), + receiver: None, + }, ]; //TODO impl Ok(RircdState { - endpoints: eps, - //endpoints: vec![], + endpoints: ep_pairs, creation_timestamp: Some(Instant::now()), plainlog: None, sqlite_log: None, @@ -43,21 +52,37 @@ impl RircdState { Self::new() } + pub async fn broadcast_event(self: &Arc, event: EventRaw) -> Result<()> { + //TODO impl V + //let couldnt_send_to = Vec::new(); + //buffer that holds names/ids of the endpoints + //where the event couldn't be send to +Reason<> + let mut endpoint_responses: Vec>= Vec::new(); + + //self.endpoints.iter().filter(|ep| ep.is_active()) + // .then(|ep| ep. + + Ok(()) + } + pub async fn run(mut self) -> Result<()> { //TODO choose good default/custom chan size let (event_sender,mut event_receiver) = mpsc::channel::(64); //try to start each endpoint - self.endpoints.iter().filter(|ep| ep.is_active()).for_each(|endpoint| { - let endpoint_name = endpoint.name(); + self.endpoints.iter().filter(|ep| ep.endpoint.is_active()).for_each(|endpoint| { + let endpoint_name = endpoint.endpoint.name(); let (success_status_send,success_status_recv) = oneshot::channel::>>(); - tokio::spawn(endpoint.clone().listen(success_status_send, event_sender.to_owned())); + tokio::spawn(endpoint.endpoint.clone().listen(success_status_send, event_sender.to_owned())); //TODO color err msg //TODO paralelize and sync at the end futures::executor::block_on(async move { println!("starting {} | {}", endpoint_name, match success_status_recv.await { - Ok(_) => format!("SUCCEEDED |"), + Ok(_) => { + + format!("SUCCEEDED |") + }, Err(e) => format!("FAILED | <{}>", e), }); }); @@ -103,9 +128,12 @@ impl RircdState { event = event_receiver.recv() => { //TODO don't unwrap() I guess let event = event.unwrap(); + + //EVENT MATCHING match event.kind { + + //INTERNAL EventKind::Internal(ev) => { - //TODO impl TODO don't unwrap match ev { InternalEvent::Shutdown => { //TODO shutdown all endpoints @@ -115,8 +143,11 @@ impl RircdState { } assert!( event.result_sender.send(EventResponse::Success).is_ok()); }, - EventKind::Message => {}, - _ => {}, + + //EXTERNAL + EventKind::Message => { + + }, }; }, };