Use procedural macros to define events
The construction of an EventHandler object should not be directly done by the client. Instead, it is easier to use procedural macro to automatize the process, hence exposing a nice and simple interface. Such use of procedural macros requires to crate an additional crate.
This commit is contained in:
parent
ccda4b1517
commit
789455668c
17 changed files with 1190 additions and 76 deletions
200
opensmtpd/src/entry.rs
Normal file
200
opensmtpd/src/entry.rs
Normal file
|
@ -0,0 +1,200 @@
|
|||
use crate::errors::Error;
|
||||
use nom::{alt, alt_complete, call, complete, cond, do_parse, error_position, map_res, named, opt,
|
||||
tag, take_until, take_while};
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum Kind {
|
||||
Report,
|
||||
Filter,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum Subsystem {
|
||||
SmtpIn,
|
||||
SmtpOut,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum Event {
|
||||
LinkConnect,
|
||||
LinkDisconnect,
|
||||
LinkIdentify,
|
||||
LinkTls,
|
||||
TxBegin,
|
||||
TxMail,
|
||||
TxRcpt,
|
||||
TxEnvelope,
|
||||
TxData,
|
||||
TxCommit,
|
||||
TxRollback,
|
||||
ProtocolClient,
|
||||
ProtocolServer,
|
||||
Timeout,
|
||||
FilterResponse,
|
||||
}
|
||||
|
||||
impl Event {
|
||||
pub fn from_str(s: &str) -> Result<Event, Error> {
|
||||
let s = s.to_lowercase()
|
||||
.replace("link", "link-")
|
||||
.replace("tx", "tx-")
|
||||
.replace("protocol", "protocol-")
|
||||
.replace("filter", "filter-");
|
||||
let (_, evt) = parse_event(&s)?;
|
||||
Ok(evt)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TimeVal {
|
||||
pub sec: i64,
|
||||
pub usec: i64,
|
||||
}
|
||||
|
||||
impl TimeVal {
|
||||
pub fn to_string(&self) -> String {
|
||||
format!("{}.{}", self.sec, self.usec)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Entry {
|
||||
pub kind: Kind,
|
||||
pub version: u8,
|
||||
pub timestamp: TimeVal,
|
||||
pub subsystem: Subsystem,
|
||||
pub event: Event,
|
||||
pub token: Option<u64>,
|
||||
pub session_id: u64,
|
||||
pub params: Option<String>,
|
||||
}
|
||||
|
||||
impl Entry {
|
||||
pub fn from_str(entry: &str) -> Result<Entry, Error> {
|
||||
let (_, res) = parse_entry(entry)?;
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
fn is_ascii_digit(c: char) -> bool {
|
||||
c.is_ascii_digit()
|
||||
}
|
||||
|
||||
fn is_ascii_digit_or_neg(c: char) -> bool {
|
||||
c.is_ascii_digit() || c == '-'
|
||||
}
|
||||
|
||||
fn is_ascii_hexdigit(c: char) -> bool {
|
||||
c.is_ascii_hexdigit()
|
||||
}
|
||||
|
||||
fn to_u8(s: &str) -> Result<u8, std::num::ParseIntError> {
|
||||
s.parse()
|
||||
}
|
||||
|
||||
fn to_i64(s: &str) -> Result<i64, std::num::ParseIntError> {
|
||||
s.parse()
|
||||
}
|
||||
|
||||
fn to_u64_hex(s: &str) -> Result<u64, std::num::ParseIntError> {
|
||||
u64::from_str_radix(s, 16)
|
||||
}
|
||||
|
||||
named!(parse_i64<&str, i64>,
|
||||
map_res!(take_while!(is_ascii_digit_or_neg), to_i64)
|
||||
);
|
||||
|
||||
named!(parse_u64_hex<&str, u64>,
|
||||
map_res!(take_while!(is_ascii_hexdigit), to_u64_hex)
|
||||
);
|
||||
|
||||
named!(parse_kind<&str, Kind>,
|
||||
alt_complete!(
|
||||
tag!("report") => { |_| Kind::Report } |
|
||||
tag!("filter") => { |_| Kind::Filter }
|
||||
)
|
||||
);
|
||||
|
||||
named!(parse_version<&str, u8>,
|
||||
map_res!(take_while!(is_ascii_digit), to_u8)
|
||||
);
|
||||
|
||||
named!(parse_timestamp<&str, TimeVal>,
|
||||
do_parse!(
|
||||
sec: parse_i64 >>
|
||||
tag!(".") >>
|
||||
usec: parse_i64 >>
|
||||
(TimeVal { sec, usec})
|
||||
)
|
||||
);
|
||||
|
||||
named!(parse_subsystem<&str, Subsystem>,
|
||||
alt_complete! (
|
||||
tag!("smtp-in") => { |_| Subsystem::SmtpIn } |
|
||||
tag!("smtp-out") => { |_| Subsystem::SmtpOut }
|
||||
)
|
||||
);
|
||||
|
||||
named!(parse_event<&str, Event>,
|
||||
alt_complete!(
|
||||
tag!("link-connect") => { |_| Event::LinkConnect } |
|
||||
tag!("link-disconnect") => { |_| Event::LinkDisconnect } |
|
||||
tag!("link-identify") => { |_| Event::LinkIdentify } |
|
||||
tag!("link-tls") => { |_| Event::LinkTls } |
|
||||
tag!("tx-begin") => { |_| Event::TxBegin } |
|
||||
tag!("tx-mail") => { |_| Event::TxMail } |
|
||||
tag!("tx-rcpt") => { |_| Event::TxRcpt } |
|
||||
tag!("tx-envelope") => { |_| Event::TxEnvelope } |
|
||||
tag!("tx-data") => { |_| Event::TxData } |
|
||||
tag!("tx-commit") => { |_| Event::TxCommit } |
|
||||
tag!("tx-rollback") => { |_| Event::TxRollback } |
|
||||
tag!("protocol-client") => { |_| Event::ProtocolClient } |
|
||||
tag!("protocol-server") => { |_| Event::ProtocolServer } |
|
||||
tag!("timeout") => { |_| Event::Timeout } |
|
||||
tag!("filter-response") => { |_| Event::FilterResponse }
|
||||
)
|
||||
);
|
||||
|
||||
named!(parse_token<&str, u64>,
|
||||
do_parse!(
|
||||
token: parse_u64_hex >>
|
||||
tag!("|") >>
|
||||
(token)
|
||||
)
|
||||
);
|
||||
|
||||
named!(parse_params<&str, String>,
|
||||
do_parse!(
|
||||
tag!("|") >>
|
||||
s: take_until!("\n") >>
|
||||
(s.to_string())
|
||||
)
|
||||
);
|
||||
|
||||
named!(parse_entry<&str, Entry>,
|
||||
do_parse!(
|
||||
kind: parse_kind >>
|
||||
tag!("|") >>
|
||||
version: parse_version >>
|
||||
tag!("|") >>
|
||||
timestamp: parse_timestamp >>
|
||||
tag!("|") >>
|
||||
subsystem: parse_subsystem >>
|
||||
tag!("|") >>
|
||||
event: parse_event >>
|
||||
tag!("|") >>
|
||||
token: cond!(kind == Kind::Filter, parse_token) >>
|
||||
session_id: parse_u64_hex >>
|
||||
params: opt!(parse_params) >>
|
||||
(Entry {
|
||||
kind,
|
||||
version,
|
||||
timestamp,
|
||||
subsystem,
|
||||
event,
|
||||
token,
|
||||
session_id,
|
||||
params,
|
||||
})
|
||||
)
|
||||
);
|
50
opensmtpd/src/errors.rs
Normal file
50
opensmtpd/src/errors.rs
Normal file
|
@ -0,0 +1,50 @@
|
|||
use crate::entry::Entry;
|
||||
use crate::event_handlers::EventHandler;
|
||||
use std::fmt;
|
||||
|
||||
pub struct Error {
|
||||
message: String,
|
||||
}
|
||||
|
||||
impl Error {
|
||||
pub fn new(msg: &str) -> Self {
|
||||
Error {
|
||||
message: msg.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", self.message)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for Error {
|
||||
fn from(error: std::io::Error) -> Self {
|
||||
Error::new(&format!("IO error: {}", error))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<nom::Err<&str>> for Error {
|
||||
fn from(error: nom::Err<&str>) -> Self {
|
||||
let msg = match error {
|
||||
nom::Err::Incomplete(_) => "not enough data".to_string(),
|
||||
nom::Err::Error(c) => format!("{:?}", c),
|
||||
nom::Err::Failure(c) => format!("{:?}", c),
|
||||
};
|
||||
Error::new(&format!("Parsing error: {}", msg))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::sync::mpsc::SendError<Entry>> for Error {
|
||||
fn from(error: std::sync::mpsc::SendError<Entry>) -> Self {
|
||||
Error::new(&format!("IO error: {}", error))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::sync::mpsc::SendError<EventHandler>> for Error {
|
||||
fn from(error: std::sync::mpsc::SendError<EventHandler>) -> Self {
|
||||
Error::new(&format!("IO error: {}", error))
|
||||
}
|
||||
}
|
51
opensmtpd/src/event_handlers.rs
Normal file
51
opensmtpd/src/event_handlers.rs
Normal file
|
@ -0,0 +1,51 @@
|
|||
use crate::entry::{Entry, Event};
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum MatchEvent {
|
||||
Evt(Vec<Event>),
|
||||
All,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct EventHandler {
|
||||
event: MatchEvent,
|
||||
callback: (fn(&Entry) -> bool),
|
||||
}
|
||||
|
||||
impl EventHandler {
|
||||
fn get_events_from_string(event_str: &String) -> MatchEvent {
|
||||
let mut events = Vec::new();
|
||||
for name in event_str.split(" , ") {
|
||||
match name {
|
||||
"Any" | "All" => {
|
||||
return MatchEvent::All;
|
||||
}
|
||||
_ => match Event::from_str(name) {
|
||||
Ok(e) => {
|
||||
events.push(e);
|
||||
}
|
||||
Err(_) => {}
|
||||
},
|
||||
}
|
||||
}
|
||||
MatchEvent::Evt(events)
|
||||
}
|
||||
|
||||
pub fn new(event_str: String, callback: (fn(&Entry) -> bool)) -> Self {
|
||||
EventHandler {
|
||||
event: EventHandler::get_events_from_string(&event_str),
|
||||
callback,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_callable(&self, event: Event) -> bool {
|
||||
match &self.event {
|
||||
MatchEvent::All => true,
|
||||
MatchEvent::Evt(v) => v.contains(&event),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn call(&self, entry: &Entry) -> bool {
|
||||
(self.callback)(entry)
|
||||
}
|
||||
}
|
152
opensmtpd/src/lib.rs
Normal file
152
opensmtpd/src/lib.rs
Normal file
|
@ -0,0 +1,152 @@
|
|||
mod entry;
|
||||
mod errors;
|
||||
mod event_handlers;
|
||||
|
||||
use log::{debug, error, warn};
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
|
||||
pub use crate::entry::{Entry, Event};
|
||||
pub use crate::errors::Error;
|
||||
pub use crate::event_handlers::{EventHandler, MatchEvent};
|
||||
pub use opensmtpd_derive::event;
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! handlers {
|
||||
( $( $x:expr ),* ) => {
|
||||
{
|
||||
let mut temp_vec = Vec::new();
|
||||
$(
|
||||
temp_vec.push(($x)());
|
||||
)*
|
||||
temp_vec
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
struct SessionHandler {
|
||||
entry_rx: mpsc::Receiver<Entry>,
|
||||
event_handlers: Vec<EventHandler>,
|
||||
}
|
||||
|
||||
impl SessionHandler {
|
||||
fn new(entry_rx: mpsc::Receiver<Entry>, handlers_rx: mpsc::Receiver<EventHandler>) -> Self {
|
||||
debug!(
|
||||
"New thread for session {}",
|
||||
thread::current().name().unwrap()
|
||||
);
|
||||
let mut event_handlers = Vec::new();
|
||||
for h in handlers_rx.iter() {
|
||||
debug!("Event handler registered");
|
||||
event_handlers.push(h);
|
||||
}
|
||||
SessionHandler {
|
||||
entry_rx,
|
||||
event_handlers,
|
||||
}
|
||||
}
|
||||
|
||||
fn read_entries(&self) {
|
||||
for e in self.entry_rx.iter() {
|
||||
for h in self.event_handlers.iter() {
|
||||
if h.is_callable(e.event.clone()) {
|
||||
h.call(&e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SmtpIn {
|
||||
sessions: HashMap<u64, (mpsc::Sender<Entry>, thread::JoinHandle<()>)>,
|
||||
event_handlers: Vec<EventHandler>,
|
||||
}
|
||||
|
||||
impl SmtpIn {
|
||||
/// Read a line from the standard input.
|
||||
/// Since EOF should not append, it is considered as an error.
|
||||
fn read() -> Result<String, Error> {
|
||||
let mut input = String::new();
|
||||
let nb = io::stdin().read_line(&mut input)?;
|
||||
match nb {
|
||||
0 => Err(Error::new("end of file")),
|
||||
_ => Ok(input),
|
||||
}
|
||||
}
|
||||
|
||||
/// Dispatch the entry into its session's thread. If such thread does not
|
||||
/// already exists, creates it.
|
||||
fn dispatch(&mut self, input: &str) -> Result<(), Error> {
|
||||
let entry = Entry::from_str(input)?;
|
||||
let id = entry.session_id;
|
||||
let disconnect = entry.event == Event::LinkDisconnect;
|
||||
let channel = match self.sessions.get(&id) {
|
||||
Some((r, _)) => r,
|
||||
None => {
|
||||
let (handlers_tx, handlers_rx) = mpsc::channel();
|
||||
let (entry_tx, entry_rx) = mpsc::channel();
|
||||
let name = entry.session_id.to_string();
|
||||
let handle = thread::Builder::new().name(name).spawn(move || {
|
||||
SessionHandler::new(entry_rx, handlers_rx).read_entries();
|
||||
})?;
|
||||
for h in self.event_handlers.iter() {
|
||||
handlers_tx.send(h.clone())?;
|
||||
}
|
||||
self.sessions.insert(entry.session_id, (entry_tx, handle));
|
||||
let (r, _) = self.sessions.get(&entry.session_id).unwrap();
|
||||
r
|
||||
}
|
||||
};
|
||||
channel.send(entry)?;
|
||||
if disconnect {
|
||||
let _ = self.sessions.remove(&id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Allow each child thread to exit gracefully. First, the session table is
|
||||
/// drained so all the references to the senders are dropped, which will
|
||||
/// cause the receivers threads to exit. Then, we uses the join handlers in
|
||||
/// order to wait for the actual exit.
|
||||
fn graceful_exit_children(&mut self) {
|
||||
let mut handles = Vec::new();
|
||||
for (_, (_, h)) in self.sessions.drain() {
|
||||
handles.push(h);
|
||||
}
|
||||
for h in handles {
|
||||
let _ = h.join();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new() -> Self {
|
||||
SmtpIn {
|
||||
sessions: HashMap::new(),
|
||||
event_handlers: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn event_handlers(&mut self, handlers: Vec<EventHandler>) -> &mut Self {
|
||||
self.event_handlers = handlers.clone();
|
||||
self
|
||||
}
|
||||
|
||||
/// Run the infinite loop that will read and process input from stdin.
|
||||
pub fn run(&mut self) {
|
||||
loop {
|
||||
let line = match SmtpIn::read() {
|
||||
Ok(l) => l,
|
||||
Err(e) => {
|
||||
self.graceful_exit_children();
|
||||
error!("{}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
match self.dispatch(&line) {
|
||||
Ok(_) => {}
|
||||
Err(e) => warn!("{}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Reference in a new issue