changing endpoint storage mechanism to allow inter-Endpoint messaging

This commit is contained in:
some body 2021-09-17 07:32:02 -05:00
parent c38b3b5d1c
commit 43083d6e8d
3 changed files with 51 additions and 12 deletions

View File

@ -9,6 +9,8 @@ use crate::event::Event;
pub trait Endpoint { pub trait Endpoint {
fn name(&self) -> String; fn name(&self) -> String;
fn is_active(&self) -> bool; fn is_active(&self) -> bool;
async fn listen(self: Arc<Self>, event_sender: oneshot::Sender<Result<mpsc::Sender<Event>>>, async fn listen(self: Arc<Self>, event_sender: oneshot::Sender<Result<mpsc::Sender<Event>>>,
event_handler: mpsc::Sender<Event>); event_handler: mpsc::Sender<Event>);
} }

View File

@ -15,6 +15,12 @@ pub enum EventKind {
pub struct Event { pub struct Event {
pub kind: EventKind, pub kind: EventKind,
pub result_sender: oneshot::Sender<EventResponse>, pub result_sender: oneshot::Sender<EventResponse>,
pub sender_name: String,
}
pub struct EventRaw {
pub kind: EventKind,
pub sender_name: String,
} }
pub enum EventResponse { pub enum EventResponse {

View File

@ -5,13 +5,20 @@ use std::time::Instant;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::event::EventRaw;
use crate::event::{EventKind,Event,EventResponse,InternalEvent}; use crate::event::{EventKind,Event,EventResponse,InternalEvent};
use crate::endpoint::Endpoint; use crate::endpoint::Endpoint;
use crate::irc_endpoint::IrcEndpoint; use crate::irc_endpoint::IrcEndpoint;
pub struct EndpointReceiverPair {
endpoint: Arc<dyn Endpoint>,
receiver: Option<mpsc::Receiver<Event>>,
}
pub struct RircdState { pub struct RircdState {
endpoints: Vec<Arc<dyn Endpoint>>, endpoints: Vec<EndpointReceiverPair>,
//to determine program runtime //to determine program runtime
creation_timestamp: Option<Instant>, creation_timestamp: Option<Instant>,
@ -23,14 +30,16 @@ pub struct RircdState {
impl RircdState { impl RircdState {
pub fn new() -> Result<Self> { pub fn new() -> Result<Self> {
//TODO remove (for testing) //TODO remove (for testing)
let eps: Vec<Arc<dyn Endpoint>> = vec![ let ep_pairs: Vec<EndpointReceiverPair> = vec![
Arc::new(IrcEndpoint {} ), EndpointReceiverPair {
endpoint: Arc::new(IrcEndpoint {} ),
receiver: None,
},
]; ];
//TODO impl //TODO impl
Ok(RircdState { Ok(RircdState {
endpoints: eps, endpoints: ep_pairs,
//endpoints: vec![],
creation_timestamp: Some(Instant::now()), creation_timestamp: Some(Instant::now()),
plainlog: None, plainlog: None,
sqlite_log: None, sqlite_log: None,
@ -43,21 +52,37 @@ impl RircdState {
Self::new() Self::new()
} }
pub async fn broadcast_event(self: &Arc<Self>, event: EventRaw) -> Result<()> {
//TODO impl V
//let couldnt_send_to = Vec::new();
//buffer that holds names/ids of the endpoints
//where the event couldn't be send to +Reason<>
let mut endpoint_responses: Vec<mpsc::Receiver<EventResponse>>= Vec::new();
//self.endpoints.iter().filter(|ep| ep.is_active())
// .then(|ep| ep.
Ok(())
}
pub async fn run(mut self) -> Result<()> { pub async fn run(mut self) -> Result<()> {
//TODO choose good default/custom chan size //TODO choose good default/custom chan size
let (event_sender,mut event_receiver) = mpsc::channel::<Event>(64); let (event_sender,mut event_receiver) = mpsc::channel::<Event>(64);
//try to start each endpoint //try to start each endpoint
self.endpoints.iter().filter(|ep| ep.is_active()).for_each(|endpoint| { self.endpoints.iter().filter(|ep| ep.endpoint.is_active()).for_each(|endpoint| {
let endpoint_name = endpoint.name(); let endpoint_name = endpoint.endpoint.name();
let (success_status_send,success_status_recv) = oneshot::channel::<Result<mpsc::Sender<Event>>>(); let (success_status_send,success_status_recv) = oneshot::channel::<Result<mpsc::Sender<Event>>>();
tokio::spawn(endpoint.clone().listen(success_status_send, event_sender.to_owned())); tokio::spawn(endpoint.endpoint.clone().listen(success_status_send, event_sender.to_owned()));
//TODO color err msg //TODO color err msg
//TODO paralelize and sync at the end //TODO paralelize and sync at the end
futures::executor::block_on(async move { futures::executor::block_on(async move {
println!("starting {} | {}", endpoint_name, println!("starting {} | {}", endpoint_name,
match success_status_recv.await { match success_status_recv.await {
Ok(_) => format!("SUCCEEDED |"), Ok(_) => {
format!("SUCCEEDED |")
},
Err(e) => format!("FAILED | <{}>", e), Err(e) => format!("FAILED | <{}>", e),
}); });
}); });
@ -103,9 +128,12 @@ impl RircdState {
event = event_receiver.recv() => { event = event_receiver.recv() => {
//TODO don't unwrap() I guess //TODO don't unwrap() I guess
let event = event.unwrap(); let event = event.unwrap();
//EVENT MATCHING
match event.kind { match event.kind {
//INTERNAL
EventKind::Internal(ev) => { EventKind::Internal(ev) => {
//TODO impl TODO don't unwrap
match ev { match ev {
InternalEvent::Shutdown => { InternalEvent::Shutdown => {
//TODO shutdown all endpoints //TODO shutdown all endpoints
@ -115,8 +143,11 @@ impl RircdState {
} }
assert!( event.result_sender.send(EventResponse::Success).is_ok()); assert!( event.result_sender.send(EventResponse::Success).is_ok());
}, },
EventKind::Message => {},
_ => {}, //EXTERNAL
EventKind::Message => {
},
}; };
}, },
}; };