Rircd/src/state.rs

159 lines
5.1 KiB
Rust

use eyre::Result;
use async_trait::async_trait;
use tokio::task::spawn_local;
use std::time::Instant;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::mpsc;
use crate::event::EventRaw;
use crate::event::{EventKind,Event,EventResponse,InternalEvent};
use crate::endpoint::Endpoint;
use crate::irc_endpoint::IrcEndpoint;
pub struct EndpointReceiverPair {
endpoint: Arc<dyn Endpoint>,
receiver: Option<mpsc::Receiver<Event>>,
}
pub struct RircdState {
endpoints: Vec<EndpointReceiverPair>,
//to determine program runtime
creation_timestamp: Option<Instant>,
plainlog: Option<()>,
sqlite_log: Option<()>,
postgres_log: Option<()>,
}
impl RircdState {
pub fn new() -> Result<Self> {
//TODO remove (for testing)
let ep_pairs: Vec<EndpointReceiverPair> = vec![
EndpointReceiverPair {
endpoint: Arc::new(IrcEndpoint {} ),
receiver: None,
},
];
//TODO impl
Ok(RircdState {
endpoints: ep_pairs,
creation_timestamp: Some(Instant::now()),
plainlog: None,
sqlite_log: None,
postgres_log: None,
})
}
//TODO impl
pub fn from_config(config: String) -> Result<Self> {
Self::new()
}
pub async fn broadcast_event(self: &Arc<Self>, event: EventRaw) -> Result<()> {
//TODO impl V
//let couldnt_send_to = Vec::new();
//buffer that holds names/ids of the endpoints
//where the event couldn't be send to +Reason<>
let mut endpoint_responses: Vec<mpsc::Receiver<EventResponse>>= Vec::new();
//self.endpoints.iter().filter(|ep| ep.is_active())
// .then(|ep| ep.
Ok(())
}
pub async fn run(mut self) -> Result<()> {
//TODO choose good default/custom chan size
let (event_sender,mut event_receiver) = mpsc::channel::<Event>(64);
//try to start each endpoint
self.endpoints.iter().filter(|ep| ep.endpoint.is_active()).for_each(|endpoint| {
let endpoint_name = endpoint.endpoint.name();
let (success_status_send,success_status_recv) = oneshot::channel::<Result<mpsc::Sender<Event>>>();
tokio::spawn(endpoint.endpoint.clone().listen(success_status_send, event_sender.to_owned()));
//TODO color err msg
//TODO paralelize and sync at the end
futures::executor::block_on(async move {
println!("starting {} | {}", endpoint_name,
match success_status_recv.await {
Ok(_) => {
let self = self;
format!("SUCCEEDED |")
},
Err(e) => format!("FAILED | <{}>", e),
});
});
});
//TODO Err() if not at least one Endpoint is running because
//if none is running there is no way to activate further (user locked out)
//> who needs a daemon that does nothing and where nothing can be configured to be done
//////////////////////////////////////////////
//
// T O D O
//
// Add a mechanism to on received
// Event::Message's to only send
// them to ALL other endpoints
// except to that where it originated
// from TO PREVENT ECHOING
//
// Eg over a .send_from field
// eg:
//
// match event.kind {
// Message(msg) => {
// for ep_rc in endpoints_receivers {
// if ep_rc != msg.from {
// ep_rc.send(msg)
// }
// }
// },
// }
//
// ^^^ FOR THIS maybe having a wrapper around (R) from
// the endpoint_receivers[R] is required. That contains
// a ref to the endpoint or sth ... idk
//
//////////////////////////////////////////////
loop {
println!("received event");
tokio::select! {
event = event_receiver.recv() => {
//TODO don't unwrap() I guess
let event = event.unwrap();
//EVENT MATCHING
match event.kind {
//INTERNAL
EventKind::Internal(ev) => {
match ev {
InternalEvent::Shutdown => {
//TODO shutdown all endpoints
break;
},
_ => {},
}
assert!( event.result_sender.send(EventResponse::Success).is_ok());
},
//EXTERNAL
EventKind::Message => {
},
};
},
};
}
Ok(())
}
}