added Event::Shutdown

This commit is contained in:
some body 2021-09-17 11:25:25 -05:00
parent 999964f257
commit e7b30d1753
3 changed files with 73 additions and 49 deletions

View File

@ -11,7 +11,9 @@ pub enum EventKind {
Internal(InternalEvent),
}
//make this an option or create a new type because
//to the receiving endpoint the sender_name is irrelevant
//it only really matters for the routing phase from state::run()
pub struct Event {
pub kind: EventKind,
pub result_sender: oneshot::Sender<EventResponse>,
@ -29,3 +31,15 @@ pub enum EventResponse {
//use Error type
Failed(String),
}
impl Event {
    /// Builds an [`Event`] from its raw form, wiring up a fresh oneshot
    /// channel so the receiving endpoint can report an `EventResponse` back.
    ///
    /// Returns the event (which carries the sending half) together with the
    /// receiving half the caller awaits for the outcome.
    pub fn new(event_raw: EventRaw) -> (Event, oneshot::Receiver<EventResponse>) {
        let (response_tx, response_rx) = oneshot::channel();
        let event = Event {
            kind: event_raw.kind,
            result_sender: response_tx,
            // TODO: sender_name only matters during routing; consider making
            // it optional or a dedicated routing type.
            sender_name: event_raw.sender_name,
        };
        (event, response_rx)
    }
}

View File

@ -30,7 +30,16 @@ impl Endpoint for IrcEndpoint {
//TODO find appropriate default or size
let (event_sender,event_receiver) = mpsc::channel(64);
let (event_response_send, event_response_recv) = oneshot::channel();
event_state_handler_sender.send(Ok(event_sender));
event_handler.send(
Event {
kind: EventKind::Internal(InternalEvent::Test),
result_sender: event_response_send,
sender_name: self.name(),
}).await;
break;
loop {

View File

@ -1,20 +1,20 @@
use eyre::Result;
use futures::stream::StreamExt;
use async_trait::async_trait;
use tokio::task::spawn_local;
use std::time::Instant;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::mpsc;
use tokio::sync::{oneshot,Mutex,mpsc};
use crate::event::EventRaw;
use crate::event::{EventKind,Event,EventResponse,InternalEvent};
use futures::stream::iter;
use crate::endpoint::Endpoint;
use crate::irc_endpoint::IrcEndpoint;
// Couples an endpoint with the channel halves used to route `Event`s to it.
// NOTE(review): both `receiver` and `sender` appear here; it looks like only
// one of them is meant to survive (sender is the one assigned after a
// successful listen) — confirm against the endpoint startup code.
pub struct EndpointReceiverPair {
// Shared, dynamically-dispatched handle to the endpoint implementation.
endpoint: Arc<dyn Endpoint>,
// Receiving half for events destined to this endpoint; None until wired up — TODO confirm
receiver: Option<mpsc::Receiver<Event>>,
// Sending half handed back by the endpoint once it starts successfully; None until then.
sender: Option<mpsc::Sender<Event>>,
}
pub struct RircdState {
@ -33,7 +33,7 @@ impl RircdState {
let ep_pairs: Vec<EndpointReceiverPair> = vec![
EndpointReceiverPair {
endpoint: Arc::new(IrcEndpoint {} ),
receiver: None,
sender: None,
},
];
@ -57,7 +57,7 @@ impl RircdState {
//let couldnt_send_to = Vec::new();
//buffer that holds names/ids of the endpoints
//where the event couldn't be send to +Reason<>
let mut endpoint_responses: Vec<mpsc::Receiver<EventResponse>>= Vec::new();
let mut endpoint_responses: Vec<mpsc::Receiver<EventResponse>> = Vec::new();
//self.endpoints.iter().filter(|ep| ep.is_active())
// .then(|ep| ep.
@ -70,63 +70,39 @@ impl RircdState {
let (event_sender,mut event_receiver) = mpsc::channel::<Event>(64);
//try to start each endpoint
self.endpoints.iter().filter(|ep| ep.endpoint.is_active()).for_each(|endpoint| {
self.endpoints.iter_mut().filter(|ep| ep.endpoint.is_active()).for_each(|endpoint| {
let endpoint_name = endpoint.endpoint.name();
let (success_status_send,success_status_recv) = oneshot::channel::<Result<mpsc::Sender<Event>>>();
tokio::spawn(endpoint.endpoint.clone().listen(success_status_send, event_sender.to_owned()));
//TODO color err msg
//TODO parallelize and sync at the end
futures::executor::block_on(async move {
println!("starting {} | {}", endpoint_name,
match success_status_recv.await {
Ok(_) => {
let self = self;
format!("SUCCEEDED |")
},
Err(e) => format!("FAILED | <{}>", e),
});
});
println!("starting {} | {}", endpoint_name,
match futures::executor::block_on(async move {
//TODO find out when, why and if V this V can fail
success_status_recv.await.unwrap()
}) {
Ok(sender) => {
endpoint.sender = Some(sender);
format!("SUCCESS |")
},
Err(e) => format!("FAILED | <{}>", e),
});
});
//TODO Err() if not at least one Endpoint is running because
//if none is running there is no way to activate further (user locked out)
//> who needs a daemon that does nothing and where nothing can be configured to be done
//////////////////////////////////////////////
//
// T O D O
//
// Add a mechanism to on received
// Event::Message's to only send
// them to ALL other endpoints
// except to that where it originated
// from TO PREVENT ECHOING
//
// Eg over a .send_from field
// eg:
//
// match event.kind {
// Message(msg) => {
// for ep_rc in endpoints_receivers {
// if ep_rc != msg.from {
// ep_rc.send(msg)
// }
// }
// },
// }
//
// ^^^ FOR THIS maybe having a wrapper around (R) from
// the endpoint_receivers[R] is required. That contains
// a ref to the endpoint or sth ... idk
//
//////////////////////////////////////////////
loop {
println!("received event");
tokio::select! {
event = event_receiver.recv() => {
//TODO don't unwrap() I guess
//TODO V find out why when and if V this can fail
let event = event.unwrap();
//EVENT MATCHING
@ -136,7 +112,32 @@ impl RircdState {
EventKind::Internal(ev) => {
match ev {
InternalEvent::Shutdown => {
//TODO shutdown all endpoints
//TODO Output=Result<>
// -> print warning if
// some thing couldn't be
// shut down and maybe do
// a "waiting for xxx to finish"
// linux init system style waiting
// idk thingy
//
// TODO uhm why not instead make a
// shutdown() function and reuse some code
iter(self.endpoints).all(|ep| async {
let (ep_ev,r) = Event::new( EventRaw {
kind: EventKind::Internal(InternalEvent::Shutdown),
sender_name: "".into() //TODO look TODO in event.rs
});
//TODO V find out why when and if V this can fail
assert!(ep.sender.unwrap().send(ep_ev).await.is_ok());
println!("shutting down | {}", match r.await.unwrap() {
EventResponse::Success => format!("SUCCESS"),
//TODO impl
_ => format!("FAILED | <{}>", "reason :("),
});
true
}).await;
break;
},
_ => {},