Refactor the library

Threads are a bad idea because for now the filter API is not guaranteed
to be state-less. The interface is now synchronous, which should be
enough for most filters.
The refactoring brought other changes, the most important being the
concept of modular input sources and output destination and the complete
rewrite of the procedural macro.
This commit is contained in:
Rodolphe Breard 2019-09-17 16:45:04 +02:00
parent 988f028c23
commit 45639f18c0
18 changed files with 486 additions and 400 deletions

View file

@ -6,160 +6,101 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
mod entry;
mod errors;
mod event_handlers;
mod handler;
mod logger;
mod response;
mod session_handler;
use crate::session_handler::SessionHandler;
use log::{error, warn};
use std::collections::HashMap;
use std::io;
use std::str::FromStr;
use std::sync::mpsc;
use std::thread;
pub mod entry;
pub mod input;
pub mod output;
use log;
use std::default::Default;
pub use crate::entry::{Entry, Event};
pub use crate::errors::Error;
pub use crate::event_handlers::{Callback, EventHandler, MatchEvent};
pub use crate::handler::Handler;
pub use crate::logger::SmtpdLogger;
pub use crate::response::Response;
pub use opensmtpd_derive::{event, report};
pub use opensmtpd_derive::report;
#[macro_export]
macro_rules! handlers {
( $( $x:expr ),* ) => {
{
let mut temp_vec = Vec::new();
$(
temp_vec.push(($x)());
)*
temp_vec
}
macro_rules! simple_filter {
($( $x:expr ),*) => {
opensmtpd::simple_filter_log_level!(log::Level::Info $(,$x)*);
};
}
#[derive(Clone, Default)]
pub struct NoContext;
#[derive(Default)]
pub struct SmtpIn<T> {
sessions: HashMap<u64, (mpsc::Sender<Entry>, thread::JoinHandle<()>)>,
event_handlers: Vec<EventHandler<T>>,
#[macro_export]
macro_rules! simple_filter_log_level {
($log_level: expr, $( $x:expr ),*) => {
let mut handlers = Vec::new();
$(
handlers.push(($x)());
)*;
let _ = opensmtpd::SmtpdLogger::new()
.set_level($log_level)
.init();
opensmtpd::Filter::<opensmtpd::input::StdIn, opensmtpd::output::StdOut>::default().set_handlers(&handlers).register_events().run();
};
}
impl<T: Clone + Default + 'static> SmtpIn<T> {
fn register_events(&self) {
let mut evts = Vec::new();
for eh in self.event_handlers.iter() {
match eh.event {
MatchEvent::Evt(ref v) => {
for e in v.iter() {
evts.push(e);
}
}
MatchEvent::All => {
println!("register|report|smtp-in|*");
evts.clear();
break;
}
}
}
evts.dedup();
for e in evts.iter() {
println!("register|report|smtp-in|{}", e.to_string());
}
// TODO: register filters
// println!("register|filter|smtp-in|{}", "name");
println!("register|ready");
}
pub struct Filter<I, O>
where
I: crate::input::FilterInput + Default,
O: crate::output::FilterOutput + Default,
{
input: I,
output: O,
handlers: Vec<Handler>,
}
/// Read a line from the standard input.
/// Since EOF should not append, it is considered as an error.
fn read(&self) -> Result<String, Error> {
let mut input = String::new();
let nb = io::stdin().read_line(&mut input)?;
match nb {
0 => Err(Error::new("end of file")),
_ => Ok(input),
impl<I, O> Default for Filter<I, O>
where
I: crate::input::FilterInput + Default,
O: crate::output::FilterOutput + Default,
{
fn default() -> Self {
Filter {
input: I::default(),
output: O::default(),
handlers: Vec::new(),
}
}
}
/// Dispatch the entry into its session's thread. If such thread does not
/// already exists, creates it.
fn dispatch(&mut self, input: &str) -> Result<(), Error> {
let entry = Entry::from_str(input)?;
let id = entry.get_session_id();
let disconnect = entry.is_disconnect();
let channel = match self.sessions.get(&id) {
Some((r, _)) => r,
None => {
let (handlers_tx, handlers_rx) = mpsc::channel();
let (entry_tx, entry_rx) = mpsc::channel();
let name = entry.get_session_id().to_string();
let handle = thread::Builder::new().name(name).spawn(move || {
SessionHandler::new(entry_rx, &handlers_rx).read_entries();
})?;
for h in self.event_handlers.iter() {
handlers_tx.send(h.clone())?;
}
self.sessions
.insert(entry.get_session_id(), (entry_tx, handle));
let (r, _) = &self.sessions[&entry.get_session_id()];
r
}
};
channel.send(entry)?;
if disconnect {
let _ = self.sessions.remove(&id);
}
Ok(())
}
/// Allow each child thread to exit gracefully. First, the session table is
/// drained so all the references to the senders are dropped, which will
/// cause the receivers threads to exit. Then, we uses the join handlers in
/// order to wait for the actual exit.
fn graceful_exit_children(&mut self) {
let mut handles = Vec::new();
for (_, (_, h)) in self.sessions.drain() {
handles.push(h);
}
for h in handles {
let _ = h.join();
}
}
pub fn new() -> Self {
SmtpIn {
sessions: HashMap::new(),
event_handlers: Vec::new(),
}
}
pub fn event_handlers(&mut self, handlers: Vec<EventHandler<T>>) -> &mut Self {
self.event_handlers = handlers.to_owned();
impl<I, O> Filter<I, O>
where
I: crate::input::FilterInput + Default,
O: crate::output::FilterOutput + Default,
{
pub fn set_handlers(&mut self, handlers: &[Handler]) -> &mut Self {
self.handlers = handlers.to_vec();
self
}
pub fn register_events(&mut self) -> &mut Self {
// TODO: use self.output to register events
self
}
/// Run the infinite loop that will read and process input from stdin.
pub fn run(&mut self) {
self.register_events();
loop {
let line = match self.read() {
Ok(l) => l,
match self.input.next() {
Ok(entry) => {
log::debug!("{:?}", entry);
for h in self.handlers.iter() {
match h.send(&entry, &mut self.output) {
Ok(_) => {}
Err(e) => {
log::warn!("Warning: {}", e);
}
};
}
}
Err(e) => {
self.graceful_exit_children();
error!("{}", e);
log::error!("Error: {}", e);
std::process::exit(1);
}
};
match self.dispatch(&line) {
Ok(_) => {}
Err(e) => warn!("{}", e),
}
}
}
}