Add the first draft of an event handler system

This commit is contained in:
Rodolphe Breard 2019-01-05 13:00:09 +01:00
parent a6b9d18374
commit 2b87c9c3c3
4 changed files with 95 additions and 13 deletions

View file

@ -1,7 +1,14 @@
use log::debug;
use env_logger::{Builder, Env};
use opensmtpd::SmtpIn;
use opensmtpd::{Entry, EventHandler, MatchEvent, SmtpIn};
fn on_event(entry: &Entry) -> bool {
debug!("Event received: {:?}", entry);
true
}
fn main() {
Builder::from_env(Env::default().default_filter_or("debug")).init();
SmtpIn::new().run();
let h = vec![EventHandler::new(MatchEvent::All, on_event)];
SmtpIn::new().event_handlers(h).run();
}

View file

@ -1,4 +1,5 @@
use crate::entry::Entry;
use crate::event_handlers::EventHandler;
use std::fmt;
pub struct Error {
@ -41,3 +42,9 @@ impl From<std::sync::mpsc::SendError<Entry>> for Error {
Error::new(&format!("IO error: {}", error))
}
}
impl From<std::sync::mpsc::SendError<EventHandler>> for Error {
fn from(error: std::sync::mpsc::SendError<EventHandler>) -> Self {
Error::new(&format!("IO error: {}", error))
}
}

30
src/event_handlers.rs Normal file
View file

@ -0,0 +1,30 @@
use crate::entry::{Entry, Event};
#[derive(Clone, Debug, PartialEq)]
pub enum MatchEvent {
Evt(Vec<Event>),
All,
}
#[derive(Clone)]
pub struct EventHandler {
event: MatchEvent,
callback: (fn(&Entry) -> bool),
}
impl EventHandler {
pub fn new(event: MatchEvent, callback: (fn(&Entry) -> bool)) -> Self {
EventHandler { event, callback }
}
pub fn is_callable(&self, event: Event) -> bool {
match &self.event {
MatchEvent::All => true,
MatchEvent::Evt(v) => v.contains(&event),
}
}
pub fn call(&self, entry: &Entry) -> bool {
(self.callback)(entry)
}
}

View file

@ -1,16 +1,50 @@
mod entry;
mod errors;
mod event_handlers;
use crate::entry::Entry;
use crate::errors::Error;
use log::{debug, error, warn};
use std::collections::HashMap;
use std::io;
use std::sync::mpsc;
use std::thread;
pub use crate::entry::{Entry, Event};
pub use crate::errors::Error;
pub use crate::event_handlers::{EventHandler, MatchEvent};
struct SessionHandler {
entry_rx: mpsc::Receiver<Entry>,
event_handlers: Vec<EventHandler>,
}
impl SessionHandler {
fn new(entry_rx: mpsc::Receiver<Entry>, handlers_rx: mpsc::Receiver<EventHandler>) -> Self {
debug!("New thread for session {}", thread::current().name().unwrap());
let mut event_handlers = Vec::new();
for h in handlers_rx.iter() {
debug!("Event handler registered");
event_handlers.push(h);
}
SessionHandler {
entry_rx,
event_handlers,
}
}
fn read_entries(&self) {
for e in self.entry_rx.iter() {
for h in self.event_handlers.iter() {
if h.is_callable(e.event.clone()) {
h.call(&e);
}
}
}
}
}
pub struct SmtpIn {
sessions: HashMap<u64, (mpsc::Sender<Entry>, thread::JoinHandle<()>)>,
event_handlers: Vec<EventHandler>,
}
impl SmtpIn {
@ -32,18 +66,16 @@ impl SmtpIn {
let channel = match self.sessions.get(&entry.session_id) {
Some((r, _)) => r,
None => {
let (tx, rx) = mpsc::channel();
let (handlers_tx, handlers_rx) = mpsc::channel();
let (entry_tx, entry_rx) = mpsc::channel();
let name = entry.session_id.to_string();
let handle = thread::Builder::new().name(name).spawn(move || {
debug!(
"New thread for session {}",
thread::current().name().unwrap()
);
for e in rx.iter() {
debug!("thread {}: {:?}", thread::current().name().unwrap(), e);
}
SessionHandler::new(entry_rx, handlers_rx).read_entries();
})?;
self.sessions.insert(entry.session_id, (tx, handle));
for h in self.event_handlers.iter() {
handlers_tx.send(h.clone())?;
}
self.sessions.insert(entry.session_id, (entry_tx, handle));
let (r, _) = self.sessions.get(&entry.session_id).unwrap();
r
}
@ -69,9 +101,15 @@ impl SmtpIn {
pub fn new() -> Self {
SmtpIn {
sessions: HashMap::new(),
event_handlers: Vec::new(),
}
}
pub fn event_handlers(&mut self, handlers: Vec<EventHandler>) -> &mut Self {
self.event_handlers = handlers.clone();
self
}
/// Run the infinite loop that will read and process input from stdin.
pub fn run(&mut self) {
loop {