Rircd/src/state.rs

169 lines
6.4 KiB
Rust

use eyre::Result;
use futures::stream::StreamExt;
use async_trait::async_trait;
use tokio::task::spawn_local;
use std::time::Instant;
use std::sync::Arc;
use tokio::sync::{oneshot,Mutex,mpsc};
use crate::event::EventRaw;
use crate::event::{EventKind,Event,EventResponse,InternalEvent};
use futures::stream::iter;
use crate::endpoint::Endpoint;
use crate::irc_endpoint::IrcEndpoint;
/// Couples an endpoint with the channel used to push events to it.
pub struct EndpointReceiverPair {
/// The endpoint itself, shared as a reference-counted trait object.
endpoint: Arc<dyn Endpoint>,
/// Sending half of the endpoint's event channel. `None` until
/// `RircdState::run` stores the sender the endpoint reports after a
/// successful start.
sender: Option<mpsc::Sender<Event>>,
}
/// State of the daemon: the known endpoints, the creation time and
/// placeholders for log sinks.
pub struct RircdState {
/// Every known endpoint together with its (optional) event sender.
endpoints: Vec<EndpointReceiverPair>,
/// Moment this state was created; kept to determine the program runtime.
creation_timestamp: Option<Instant>,
/// Placeholder (`Option<()>`), always `None` in `new`.
/// NOTE(review): presumably a future plain-text log sink — confirm.
plainlog: Option<()>,
/// Placeholder (`Option<()>`), always `None` in `new`.
/// NOTE(review): presumably a future SQLite log sink — confirm.
sqlite_log: Option<()>,
/// Placeholder (`Option<()>`), always `None` in `new`.
/// NOTE(review): presumably a future PostgreSQL log sink — confirm.
postgres_log: Option<()>,
}
impl RircdState {
pub fn new() -> Result<Self> {
//TODO remove (for testing)
let ep_pairs: Vec<EndpointReceiverPair> = vec![
EndpointReceiverPair {
endpoint: Arc::new(IrcEndpoint {} ),
sender: None,
},
];
//TODO impl
Ok(RircdState {
endpoints: ep_pairs,
creation_timestamp: Some(Instant::now()),
plainlog: None,
sqlite_log: None,
postgres_log: None,
})
}
/// Creates a state from a configuration string.
///
/// Not implemented yet: `config` is currently ignored and the result is
/// exactly what `Self::new()` returns.
//TODO impl
pub fn from_config(config: String) -> Result<Self> {
Self::new()
}
/// Stub for sending an event to the endpoints.
///
/// Not implemented yet: `event` is not used, nothing is sent, and the
/// function always returns `Ok(())`.
pub async fn broadcast_event(self: &Arc<Self>, event: EventRaw) -> Result<()> {
//TODO impl V
//let couldnt_send_to = Vec::new();
//buffer that holds the names/ids of the endpoints
//the event couldn't be sent to, plus the reason (+Reason<>)
// Currently unused: meant to collect the response receivers of the endpoints.
let mut endpoint_responses: Vec<mpsc::Receiver<EventResponse>> = Vec::new();
//sketch of the intended implementation:
//self.endpoints.iter().filter(|ep| ep.is_active())
// .then(|ep| ep.
Ok(())
}
pub async fn run(mut self) -> Result<()> {
//TODO choose good default/custom chan size
let (event_sender,mut event_receiver) = mpsc::channel::<Event>(64);
//try to start each endpoint
self.endpoints.iter_mut().filter(|ep| ep.endpoint.is_active()).for_each(|endpoint| {
let endpoint_name = endpoint.endpoint.name();
let (success_status_send,success_status_recv) = oneshot::channel::<Result<mpsc::Sender<Event>>>();
tokio::spawn(endpoint.endpoint.clone().listen(success_status_send, event_sender.to_owned()));
//TODO color err msg
//TODO paralelize and sync at the end
println!("starting {} | {}", endpoint_name,
match futures::executor::block_on(async move {
//TODO find out when, why and if V this V can fail
success_status_recv.await.unwrap()
}) {
Ok(sender) => {
endpoint.sender = Some(sender);
format!("SUCCESS |")
},
Err(e) => format!("FAILED | <{}>", e),
});
});
//TODO Err() if not at least one Endpoint is running because
//if none is running there is no way to activate further (user locked out)
//> who needs a daemon that does nothing and where nothing can be configured to be done
loop {
println!("received event");
tokio::select! {
event = event_receiver.recv() => {
//TODO V find out why when and if V this can fail
let event = event.unwrap();
//EVENT MATCHING
match &event.kind {
//INTERNAL
EventKind::Internal(ev) => {
match ev {
InternalEvent::Shutdown => {
//TODO Output=Result<>
// -> print warning if
// some thing couldn't be
// shut down and maybe do
// a "waiting for xxx to finish"
// linux init system style waiting
// idk thingy
//
// TODO uhm why not instead make a
// shutdown() function and resuse some code
iter(self.endpoints).all(|ep| async {
let (ep_ev,r) = Event::new( EventRaw {
kind: EventKind::Internal(InternalEvent::Shutdown),
sender_name: "".into() //TODO look TODO in event.rs
});
//TODO V find out why when and if V this can fail
assert!(ep.sender.unwrap().send(ep_ev).await.is_ok());
println!("shutting down | {}", match r.await.unwrap() {
EventResponse::Success => format!("SUCCESS"),
//TODO impl
_ => format!("FAILED | <{}>", "reason :("),
});
true
}).await;
break;
},
_ => {},
}
assert!( event.result_sender.send(EventResponse::Success).is_ok());
},
//MESSAGE
EventKind::Message => {
//TODO maybe notify sender if one
//of the endpoints isn't connected
//or just write to log or do nothing depending
//on the chosen configuration
iter(&self.endpoints).all(|ep| async {
if ep.endpoint.name() == event.sender_name {
ep.sender.unwrap().send(event);
}
false
}).await;
},
};
},
};
}
Ok(())
}
}