diff --git a/src/event.rs b/src/event.rs index 7478d0a..4309d40 100644 --- a/src/event.rs +++ b/src/event.rs @@ -11,7 +11,9 @@ pub enum EventKind { Internal(InternalEvent), } - +//make this an option or create a new type because +//to the receiving endpoint the sender_name is irrelevant +//it only really matters for the routing phase from state::run() pub struct Event { pub kind: EventKind, pub result_sender: oneshot::Sender, @@ -29,3 +31,15 @@ pub enum EventResponse { //use Error type Failed(String), } + +impl Event { + pub fn new(event_raw: EventRaw) -> (Event,oneshot::Receiver) { + let (s,r) = oneshot::channel(); + (Event { + kind: event_raw.kind, + result_sender: s, + //TODO + sender_name: event_raw.sender_name, + },r) + } +} diff --git a/src/irc_endpoint.rs b/src/irc_endpoint.rs index ee16fc6..fd0eb8b 100644 --- a/src/irc_endpoint.rs +++ b/src/irc_endpoint.rs @@ -30,7 +30,16 @@ impl Endpoint for IrcEndpoint { //TODO find apropiate defeault or size let (event_sender,event_receiver) = mpsc::channel(64); + let (event_response_send, event_response_recv) = oneshot::channel(); event_state_handler_sender.send(Ok(event_sender)); + event_handler.send( + Event { + kind: EventKind::Internal(InternalEvent::Test), + result_sender: event_response_send, + sender_name: self.name(), + }).await; + + break; loop { diff --git a/src/state.rs b/src/state.rs index 32389e1..76f7539 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,20 +1,20 @@ use eyre::Result; +use futures::stream::StreamExt; use async_trait::async_trait; use tokio::task::spawn_local; use std::time::Instant; use std::sync::Arc; -use tokio::sync::oneshot; -use tokio::sync::mpsc; +use tokio::sync::{oneshot,Mutex,mpsc}; use crate::event::EventRaw; use crate::event::{EventKind,Event,EventResponse,InternalEvent}; - +use futures::stream::iter; use crate::endpoint::Endpoint; use crate::irc_endpoint::IrcEndpoint; pub struct EndpointReceiverPair { endpoint: Arc, - receiver: Option>, + sender: Option>, } pub struct RircdState { @@ -33,7 +33,7 @@ impl RircdState { let ep_pairs: Vec = vec![ EndpointReceiverPair { endpoint: Arc::new(IrcEndpoint {} ), - receiver: None, + sender: None, }, ]; @@ -57,7 +57,7 @@ impl RircdState { //let couldnt_send_to = Vec::new(); //buffer that holds names/ids of the endpoints //where the event couldn't be send to +Reason<> - let mut endpoint_responses: Vec>= Vec::new(); + let mut endpoint_responses: Vec> = Vec::new(); //self.endpoints.iter().filter(|ep| ep.is_active()) // .then(|ep| ep. @@ -70,63 +70,39 @@ impl RircdState { let (event_sender,mut event_receiver) = mpsc::channel::(64); //try to start each endpoint - self.endpoints.iter().filter(|ep| ep.endpoint.is_active()).for_each(|endpoint| { + self.endpoints.iter_mut().filter(|ep| ep.endpoint.is_active()).for_each(|endpoint| { let endpoint_name = endpoint.endpoint.name(); let (success_status_send,success_status_recv) = oneshot::channel::>>(); tokio::spawn(endpoint.endpoint.clone().listen(success_status_send, event_sender.to_owned())); + //TODO color err msg //TODO paralelize and sync at the end - futures::executor::block_on(async move { - println!("starting {} | {}", endpoint_name, - match success_status_recv.await { - Ok(_) => { - let self = self; - format!("SUCCEEDED |") - }, - Err(e) => format!("FAILED | <{}>", e), - }); - }); + println!("starting {} | {}", endpoint_name, + match futures::executor::block_on(async move { + //TODO find out when, why and if V this V can fail + success_status_recv.await.unwrap() + }) { + Ok(sender) => { + endpoint.sender = Some(sender); + format!("SUCCESS |") + }, + Err(e) => format!("FAILED | <{}>", e), + }); }); + //TODO Err() if not at least one Endpoint is running because //if none is running there is no way to activate further (user locked out) //> who needs a daemon that does nothing and where nothing can be configured to be done - - ////////////////////////////////////////////// - // - // T O D O - // - // Add a mechanism to on received - // Event::Message's to only send - // them to ALL other endpoints - // except to that where it originated - // from TO PREVENT ECHOING - // - // Eg over a .send_from field - // eg: - // - // match event.kind { - // Message(msg) => { - // for ep_rc in endpoints_receivers { - // if ep_rc != msg.from { - // ep_rc.send(msg) - // } - // } - // }, - // } - // - // ^^^ FOR THIS maybe having a wrapper around (R) from - // the endpoint_receivers[R] is required. That contains - // a ref to the endpoint or sth ... idk - // - ////////////////////////////////////////////// + + loop { println!("received event"); tokio::select! { event = event_receiver.recv() => { - //TODO don't unwrap() I guess + //TODO V find out why when and if V this can fail let event = event.unwrap(); //EVENT MATCHING @@ -136,7 +112,32 @@ impl RircdState { EventKind::Internal(ev) => { match ev { InternalEvent::Shutdown => { - //TODO shutdown all endpoints + //TODO Output=Result<> + // -> print warning if + // some thing couldn't be + // shut down and maybe do + // a "waiting for xxx to finish" + // linux init system style waiting + // idk thingy + // + // TODO uhm why not instead make a + // shutdown() function and resuse some code + iter(self.endpoints).all(|ep| async { + let (ep_ev,r) = Event::new( EventRaw { + kind: EventKind::Internal(InternalEvent::Shutdown), + sender_name: "".into() //TODO look TODO in event.rs + }); + //TODO V find out why when and if V this can fail + assert!(ep.sender.unwrap().send(ep_ev).await.is_ok()); + + println!("shutting down | {}", match r.await.unwrap() { + EventResponse::Success => format!("SUCCESS"), + //TODO impl + _ => format!("FAILED | <{}>", "reason :("), + }); + + true + }).await; break; }, _ => {},