diff --git a/README.md b/README.md index ea34ce4..9878e82 100644 --- a/README.md +++ b/README.md @@ -7,12 +7,10 @@ Rust binding for [OpenSMTPD] filters. This is a work in progress, the API is **not** stabilized yet. -- [ ] Thread-pool - [x] Reports - [ ] Filters -- [x] Session-level context -- [ ] Pool-level context - [ ] Filter-level context +- [ ] Session-level context [OpenSMTPD]: https://www.opensmtpd.org/ diff --git a/opensmtpd-derive/Cargo.toml b/opensmtpd-derive/Cargo.toml index 937e317..672047f 100644 --- a/opensmtpd-derive/Cargo.toml +++ b/opensmtpd-derive/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "opensmtpd_derive" -version = "0.1.0" +version = "0.2.0" authors = ["Rodolphe Bréard "] edition = "2018" description = "Interface for OpenSMTPD filters" @@ -15,5 +15,5 @@ include = ["src/**/*", "Cargo.toml", "LICENSE-*.txt"] proc-macro = true [dependencies] -quote = "0.6" -syn = { version = "0.15", features = ["full"] } +quote = "1.0" +syn = { version = "1.0", features = ["full", "extra-traits"] } diff --git a/opensmtpd-derive/src/lib.rs b/opensmtpd-derive/src/lib.rs index 8c32abe..8dc5a58 100644 --- a/opensmtpd-derive/src/lib.rs +++ b/opensmtpd-derive/src/lib.rs @@ -10,70 +10,152 @@ extern crate proc_macro; use proc_macro::TokenStream; use quote::quote; -use syn::{parse_macro_input, ItemFn}; +use syn::parse::{Parse, ParseStream}; +use syn::punctuated::Punctuated; +use syn::{parenthesized, parse_macro_input, ExprArray, Ident, ItemFn, Result, Token, TypePath}; -fn get_type( - params: &syn::punctuated::Punctuated, -) -> Result<(Box, syn::Type), ()> { - match params.iter().count() { - 1 => { - let ctx = Box::new(syn::Type::Verbatim(syn::TypeVerbatim { - tts: quote! { - opensmtpd::NoContext - }, - })); - let cb = syn::Type::Verbatim(syn::TypeVerbatim { - tts: quote! { opensmtpd::Callback::NoCtx }, - }); - Ok((ctx, cb)) - } - 2 => match params.iter().next().unwrap() { - syn::FnArg::Captured(ref a) => match &a.ty { - syn::Type::Reference(r) => { - let cb = match r.mutability { - Some(_) => syn::Type::Verbatim(syn::TypeVerbatim { - tts: quote! { opensmtpd::Callback::CtxMut }, - }), - None => syn::Type::Verbatim(syn::TypeVerbatim { - tts: quote! { opensmtpd::Callback::Ctx }, - }), - }; - Ok((r.elem.clone(), cb)) - } - _ => Err(()), - }, - _ => Err(()), - }, - _ => Err(()), +#[derive(Debug)] +struct OpenSmtpdAttributes { + version: Ident, + subsystem: Ident, + events: Punctuated, +} + +impl Parse for OpenSmtpdAttributes { + fn parse(input: ParseStream) -> Result { + let version = input.parse()?; + let _: Token![,] = input.parse()?; + let subsystem = input.parse()?; + let _: Token![,] = input.parse()?; + let _match: Token![match] = input.parse()?; + let content; + let _ = parenthesized!(content in input); + let events = content.parse_terminated(Ident::parse)?; + Ok(OpenSmtpdAttributes { + version, + subsystem, + events, + }) } } -#[proc_macro_attribute] -pub fn event(attr: TokenStream, input: TokenStream) -> TokenStream { - let attr = attr.to_string(); - let item = parse_macro_input!(input as ItemFn); - let fn_name = &item.ident; - let fn_params = &item.decl.inputs; - let (ctx_type, callback_type) = match get_type(fn_params) { - Ok(t) => t, - Err(_e) => { - panic!(); +impl OpenSmtpdAttributes { + fn get_version(&self) -> String { + format!( + "opensmtpd::entry::Version::{}", + self.version.to_string().to_uppercase() + ) + } + + fn get_subsystem(&self) -> String { + let subsystem = match self.subsystem.to_string().as_str() { + "smtp_in" => "SmtpIn", + "smtp_out" => "SmtpOut", + _ => "", + }; + format!("opensmtpd::entry::Subsystem::{}", subsystem) + } + + fn get_events(&self) -> String { + let events = if self + .events + .iter() + .find(|&e| e.to_string().to_lowercase().as_str() == "all") + .is_some() + { + let lst = [ + "LinkAuth", + "LinkConnect", + "LinkDisconnect", + "LinkIdentify", + "LinkReset", + "LinkTls", + "TxBegin", + "TxMail", + "TxRcpt", + "TxEnvelope", + "TxData", + "TxCommit", + "TxRollback", + "ProtocolClient", + "ProtocolServer", + "FilterResponse", + "Timeout", + ]; + lst.iter() + .map(|e| format!("opensmtpd::entry::Event::{}", e)) + .collect::>() + } else { + self.events + .iter() + .map(|e| { + let name = match e.to_string().as_str() { + "link_auth" => "LinkAuth", + "link_connect" => "LinkConnect", + "link_disconnect" => "LinkDisconnect", + "link_identify" => "LinkIdentify", + "link_reset" => "LinkReset", + "link_tls" => "LinkTls", + "tx_begin" => "TxBegin", + "tx_mail" => "TxMail", + "tx_rcpt" => "TxRcpt", + "tx_envelope" => "TxEnvelope", + "tx_data" => "TxData", + "tx_commit" => "TxCommit", + "tx_rollback" => "TxRollback", + "protocol_client" => "ProtocolClient", + "protocol_server" => "ProtocolServer", + "filter_response" => "FilterResponse", + "timeout" => "Timeout", + _ => "", + }; + format!("opensmtpd::entry::Event::{}", name) + }) + .collect::>() + }; + format!("[{}]", events.join(", ")) + } +} + +macro_rules! parse_item { + ($item: expr, $type: ty) => { + match syn::parse_str::<$type>($item) { + Ok(i) => i, + Err(e) => { + return TokenStream::from(e.to_compile_error()); + } } }; - let fn_body = &item.block; - let fn_output = &item.decl.output; - let output = quote! { - fn #fn_name() -> opensmtpd::EventHandler<#ctx_type> { - opensmtpd::EventHandler::new( - #attr, - #callback_type(|#fn_params| #fn_output #fn_body) - ) - } - }; - output.into() } #[proc_macro_attribute] pub fn report(attr: TokenStream, input: TokenStream) -> TokenStream { - event(attr, input) + let attr = parse_macro_input!(attr as OpenSmtpdAttributes); + let version = parse_item!(&attr.get_version(), TypePath); + let subsystem = parse_item!(&attr.get_subsystem(), TypePath); + let events = parse_item!(&attr.get_events(), ExprArray); + let item = parse_macro_input!(input as ItemFn); + let fn_name = &item.sig.ident; + let fn_params = &item.sig.inputs; + let fn_body = &item.block; + let output = quote! { + fn #fn_name() -> opensmtpd::Handler { + opensmtpd::Handler::new( + #version, + opensmtpd::entry::Kind::Report, + #subsystem, + &#events, + |_output: &mut dyn opensmtpd::output::FilterOutput, entry: &opensmtpd::entry::Entry,| { + // TODO: look at `item.sig.output` and adapt the calling scheme. + // example: if no return, add `Ok(())`. + // https://docs.rs/syn/1.0.5/syn/struct.Signature.html + let inner_fn = |#fn_params| -> Result<(), String> { + #fn_body + }; + inner_fn(entry) + }, + ) + } + }; + output.into() } diff --git a/opensmtpd/Cargo.toml b/opensmtpd/Cargo.toml index 010b423..17cc838 100644 --- a/opensmtpd/Cargo.toml +++ b/opensmtpd/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "opensmtpd" -version = "0.1.0" +version = "0.2.0" authors = ["Rodolphe Bréard "] edition = "2018" description = "Interface for OpenSMTPD filters" @@ -14,12 +14,12 @@ include = ["src/**/*", "Cargo.toml", "LICENSE-*.txt"] [dependencies] log = {version = "0.4", features = ["std"]} nom = "5.0" -opensmtpd_derive = { path = "../opensmtpd-derive", version = "0.1" } +opensmtpd_derive = { path = "../opensmtpd-derive", version = "0.2" } [[example]] -name = "dummy" -path = "examples/dummy.rs" +name = "echo" +path = "examples/echo.rs" -[[example]] -name = "counter" -path = "examples/session_event_counter.rs" +#[[example]] +#name = "counter" +#path = "examples/session_event_counter.rs" diff --git a/opensmtpd/examples/dummy.rs b/opensmtpd/examples/dummy.rs deleted file mode 100644 index a01eb14..0000000 --- a/opensmtpd/examples/dummy.rs +++ /dev/null @@ -1,19 +0,0 @@ -use log::{debug, info, Level}; -use opensmtpd::{event, handlers, Entry, SmtpIn, SmtpdLogger}; - -#[event(Any)] -fn on_event(entry: &Entry) { - debug!("Event received: {:?}", entry); -} - -#[event(LinkConnect)] -fn on_connect(entry: &Entry) { - info!("New client on session {:x}.", entry.get_session_id()); -} - -fn main() { - let _ = SmtpdLogger::new().set_level(Level::Debug).init(); - SmtpIn::new() - .event_handlers(handlers!(on_event, on_connect)) - .run(); -} diff --git a/opensmtpd/examples/echo.rs b/opensmtpd/examples/echo.rs new file mode 100644 index 0000000..14a0ff8 --- /dev/null +++ b/opensmtpd/examples/echo.rs @@ -0,0 +1,18 @@ +use opensmtpd::entry::Entry; +use opensmtpd::{report, simple_filter}; + +#[report(v1, smtp_in, match(all))] +fn echo_handler(entry: &Entry) -> Result<(), String> { + log::info!("TEST ENTRY: {:?}", entry); + Ok(()) +} + +#[report(v1, smtp_in, match(link_disconnect))] +fn test(entry: &Entry) { + log::info!("HAZ LINK DISCONNECT: {:?}", entry); + Ok(()) // TODO: REMOVE ME! +} + +fn main() { + simple_filter!(echo_handler, test); +} diff --git a/opensmtpd/examples/session_event_counter.rs b/opensmtpd/examples/session_event_counter.rs index 90d54c4..3dd93cb 100644 --- a/opensmtpd/examples/session_event_counter.rs +++ b/opensmtpd/examples/session_event_counter.rs @@ -1,5 +1,4 @@ -use log::{info, Level}; -use opensmtpd::{handlers, report, Entry, SmtpIn, SmtpdLogger}; +use opensmtpd::{report, simple_filter}; #[derive(Clone, Default)] struct MyContext { @@ -13,6 +12,5 @@ fn on_report(ctx: &mut MyContext, entry: &Entry) { } fn main() { - let _ = SmtpdLogger::new().set_level(Level::Debug).init(); - SmtpIn::new().event_handlers(handlers!(on_report)).run(); + simple_filter!(vec![on_report]); } diff --git a/opensmtpd/src/entry.rs b/opensmtpd/src/entry.rs index e3ed036..901b3cf 100644 --- a/opensmtpd/src/entry.rs +++ b/opensmtpd/src/entry.rs @@ -8,31 +8,32 @@ use crate::errors::Error; use nom::branch::alt; -use nom::bytes::complete::{tag, take_till}; -use nom::character::complete::{digit1, hex_digit1, line_ending}; +use nom::bytes::streaming::{tag, take_till}; +use nom::character::streaming::{digit1, hex_digit1, line_ending}; use nom::combinator::{map_res, value}; use nom::multi::many0; +use nom::Err::Incomplete; use nom::IResult; use std::str::FromStr; -#[derive(Clone, Debug, PartialEq)] -enum Version { +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum Version { V1, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub enum Kind { Report, Filter, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub enum Subsystem { SmtpIn, SmtpOut, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, Eq, Hash, PartialEq)] pub enum Event { LinkAuth, LinkConnect, @@ -114,16 +115,17 @@ pub enum Entry { V1Filter(V1Filter), } -impl FromStr for Entry { - type Err = Error; - - fn from_str(entry: &str) -> Result { - let (_, res) = parse_entry(entry)?; - Ok(res) - } -} - impl Entry { + pub fn new(entry: &str) -> Result<(String, Option), Error> { + match parse_entry(entry) { + Ok((remainder, entry)) => Ok((remainder.to_string(), Some(entry))), + Err(e) => match e { + Incomplete(_) => Ok((String::new(), None)), + _ => Err(e.into()), + }, + } + } + pub fn get_event(&self) -> Event { match self { Entry::V1Report(r) => r.event.to_owned(), @@ -244,7 +246,7 @@ fn parse_v1_report(input: &str) -> IResult<&str, Entry> { let (input, _) = separator(input)?; let (input, session_id) = parse_session_id(input)?; let (input, params) = many0(parse_param)(input)?; - let _ = line_ending(input)?; + let (input, _) = line_ending(input)?; let report = V1Report { timestamp, subsystem, @@ -266,7 +268,7 @@ fn parse_v1_filter(input: &str) -> IResult<&str, Entry> { let (input, _) = separator(input)?; let (input, token) = parse_token(input)?; let (input, params) = many0(parse_param)(input)?; - let _ = line_ending(input)?; + let (input, _) = line_ending(input)?; let filter = V1Filter { timestamp, subsystem, diff --git a/opensmtpd/src/errors.rs b/opensmtpd/src/errors.rs index 1ba6b8d..bb7b240 100644 --- a/opensmtpd/src/errors.rs +++ b/opensmtpd/src/errors.rs @@ -6,8 +6,6 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use crate::entry::Entry; -use crate::event_handlers::EventHandler; use std::fmt; pub struct Error { @@ -34,6 +32,18 @@ impl From for Error { } } +impl From for Error { + fn from(error: std::string::String) -> Self { + Error { message: error } + } +} + +impl From for Error { + fn from(error: std::str::Utf8Error) -> Self { + Error::new(&format!("UTF8 error: {}", error)) + } +} + impl From for Error { fn from(error: log::SetLoggerError) -> Self { Error::new(&format!("Logger error: {}", error)) @@ -50,15 +60,3 @@ impl From> for Error { Error::new(&format!("Parsing error: {}", msg)) } } - -impl From> for Error { - fn from(error: std::sync::mpsc::SendError) -> Self { - Error::new(&format!("IO error: {}", error)) - } -} - -impl From>> for Error { - fn from(error: std::sync::mpsc::SendError>) -> Self { - Error::new(&format!("IO error: {}", error)) - } -} diff --git a/opensmtpd/src/event_handlers.rs b/opensmtpd/src/event_handlers.rs deleted file mode 100644 index 361eb97..0000000 --- a/opensmtpd/src/event_handlers.rs +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright (c) 2019 Rodolphe Bréard -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use crate::entry::{Entry, Event}; -use std::str::FromStr; - -#[derive(Clone, Debug, PartialEq)] -pub enum MatchEvent { - Evt(Vec), - All, -} - -#[derive(Clone)] -pub enum Callback { - NoCtx(fn(&Entry)), - Ctx(fn(&T, &Entry)), - CtxMut(fn(&mut T, &Entry)), -} - -#[derive(Clone)] -pub struct EventHandler { - pub(crate) event: MatchEvent, - callback: Callback, -} - -fn get_events_from_string(event_str: &str) -> MatchEvent { - let mut events = Vec::new(); - for name in event_str.split(" , ") { - match name { - "Any" | "All" => { - return MatchEvent::All; - } - _ => { - if let Ok(e) = Event::from_str(name) { - events.push(e); - } - } - } - } - MatchEvent::Evt(events) -} - -impl EventHandler { - pub fn new(event_str: &str, callback: Callback) -> Self { - EventHandler { - event: get_events_from_string(event_str), - callback, - } - } - - pub fn is_callable(&self, event: &Event) -> bool { - match &self.event { - MatchEvent::All => true, - MatchEvent::Evt(v) => v.contains(&event), - } - } - - pub fn call(&self, entry: &Entry, context: &mut T) { - match self.callback { - Callback::NoCtx(f) => f(entry), - Callback::Ctx(f) => f(context, entry), - Callback::CtxMut(f) => f(context, entry), - }; - } -} - -#[cfg(test)] -mod test { - use crate::*; - - #[test] - fn test_eventhandler_build_noctx() { - EventHandler::new("Any", Callback::NoCtx::(|_entry: &Entry| {})); - } - - #[test] - fn test_eventhandler_build_ctx() { - EventHandler::new( - "Any", - Callback::Ctx(|_context: &NoContext, _entry: &Entry| {}), - ); - } - - #[test] - fn test_eventhandler_build_ctxmut() { - EventHandler::new( - "Any", - Callback::CtxMut(|_context: &mut NoContext, _entry: &Entry| {}), - ); - } -} diff --git a/opensmtpd/src/handler.rs b/opensmtpd/src/handler.rs new file mode 100644 index 0000000..cb41b81 --- /dev/null +++ b/opensmtpd/src/handler.rs @@ -0,0 +1,65 @@ +// Copyright (c) 2019 Rodolphe Bréard +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use crate::entry::{Entry, Event, Kind, Subsystem, Version}; +use crate::errors::Error; +use crate::output::FilterOutput; +use std::collections::HashSet; + +macro_rules! handle { + ($self: ident, $obj: ident, $version: expr, $kind: expr, $entry: ident, $output: ident) => {{ + if $self.version == $version + && $self.kind == $kind + && $self.subsystem == $obj.subsystem + && $self.events.contains(&$obj.event) + { + ($self.action)($output, $entry)?; + } + Ok(()) + }}; +} + +type Callback = fn(&mut dyn FilterOutput, &Entry) -> Result<(), String>; + +#[derive(Clone)] +pub struct Handler { + version: Version, + kind: Kind, + subsystem: Subsystem, + events: HashSet, + action: Callback, +} + +impl Handler { + pub fn new( + version: Version, + kind: Kind, + subsystem: Subsystem, + events: &[Event], + action: Callback, + ) -> Self { + Handler { + version, + kind, + subsystem, + events: events.iter().cloned().collect(), + action, + } + } + + pub fn send(&self, entry: &Entry, output: &mut dyn FilterOutput) -> Result<(), Error> { + match entry { + Entry::V1Report(report) => { + handle!(self, report, Version::V1, Kind::Report, entry, output) + } + Entry::V1Filter(filter) => { + handle!(self, filter, Version::V1, Kind::Filter, entry, output) + } + } + } +} diff --git a/opensmtpd/src/response.rs b/opensmtpd/src/input.rs similarity index 68% rename from opensmtpd/src/response.rs rename to opensmtpd/src/input.rs index a0a75df..ba3b057 100644 --- a/opensmtpd/src/response.rs +++ b/opensmtpd/src/input.rs @@ -6,6 +6,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -pub enum Response { - None, +use crate::entry::Entry; +use crate::errors::Error; + +pub trait FilterInput { + fn next(&mut self) -> Result; } + +mod stdin; +pub use stdin::StdIn; diff --git a/opensmtpd/src/input/stdin.rs b/opensmtpd/src/input/stdin.rs new file mode 100644 index 0000000..bd483b3 --- /dev/null +++ b/opensmtpd/src/input/stdin.rs @@ -0,0 +1,66 @@ +// Copyright (c) 2019 Rodolphe Bréard +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use crate::entry::Entry; +use crate::errors::Error; +use crate::input::FilterInput; +use std::default::Default; +use std::io::{self, Read}; +use std::str; + +const BUFFER_SIZE: usize = 4096; + +pub struct StdIn { + buffer: [u8; BUFFER_SIZE], + stdin: io::Stdin, + input: String, +} + +impl Default for StdIn { + fn default() -> Self { + StdIn { + buffer: [0; BUFFER_SIZE], + stdin: io::stdin(), + input: String::new(), + } + } +} + +impl FilterInput for StdIn { + fn next(&mut self) -> Result { + let mut force_read = false; + loop { + if force_read || self.input.is_empty() { + // Read stdin in self.buffer + self.buffer.copy_from_slice(&[0; BUFFER_SIZE]); + let len = self.stdin.read(&mut self.buffer)?; + if len == 0 { + continue; + } + // Put the buffer's content in self.input + self.input += match self.buffer.iter().position(|&x| x == 0) { + Some(i) => str::from_utf8(&self.buffer[..i]), + None => str::from_utf8(&self.buffer), + }?; + } + // Try to build an entry from self.input + let (remainder, entry_opt) = Entry::new(&self.input)?; + match entry_opt { + // We have at least one entry. + Some(entry) => { + self.input = remainder; + return Ok(entry); + } + // The data is incomplete, no entry could be built. + None => { + force_read = true; + } + }; + } + } +} diff --git a/opensmtpd/src/lib.rs b/opensmtpd/src/lib.rs index 763a5f1..d204577 100644 --- a/opensmtpd/src/lib.rs +++ b/opensmtpd/src/lib.rs @@ -6,160 +6,101 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -mod entry; mod errors; -mod event_handlers; +mod handler; mod logger; -mod response; -mod session_handler; -use crate::session_handler::SessionHandler; -use log::{error, warn}; -use std::collections::HashMap; -use std::io; -use std::str::FromStr; -use std::sync::mpsc; -use std::thread; +pub mod entry; +pub mod input; +pub mod output; + +use log; +use std::default::Default; -pub use crate::entry::{Entry, Event}; pub use crate::errors::Error; -pub use crate::event_handlers::{Callback, EventHandler, MatchEvent}; +pub use crate::handler::Handler; pub use crate::logger::SmtpdLogger; -pub use crate::response::Response; -pub use opensmtpd_derive::{event, report}; +pub use opensmtpd_derive::report; #[macro_export] -macro_rules! handlers { - ( $( $x:expr ),* ) => { - { - let mut temp_vec = Vec::new(); - $( - temp_vec.push(($x)()); - )* - temp_vec - } +macro_rules! simple_filter { + ($( $x:expr ),*) => { + opensmtpd::simple_filter_log_level!(log::Level::Info $(,$x)*); }; } -#[derive(Clone, Default)] -pub struct NoContext; - -#[derive(Default)] -pub struct SmtpIn { - sessions: HashMap, thread::JoinHandle<()>)>, - event_handlers: Vec>, +#[macro_export] +macro_rules! simple_filter_log_level { + ($log_level: expr, $( $x:expr ),*) => { + let mut handlers = Vec::new(); + $( + handlers.push(($x)()); + )*; + let _ = opensmtpd::SmtpdLogger::new() + .set_level($log_level) + .init(); + opensmtpd::Filter::::default().set_handlers(&handlers).register_events().run(); + }; } -impl SmtpIn { - fn register_events(&self) { - let mut evts = Vec::new(); - for eh in self.event_handlers.iter() { - match eh.event { - MatchEvent::Evt(ref v) => { - for e in v.iter() { - evts.push(e); - } - } - MatchEvent::All => { - println!("register|report|smtp-in|*"); - evts.clear(); - break; - } - } - } - evts.dedup(); - for e in evts.iter() { - println!("register|report|smtp-in|{}", e.to_string()); - } - // TODO: register filters - // println!("register|filter|smtp-in|{}", "name"); - println!("register|ready"); - } +pub struct Filter +where + I: crate::input::FilterInput + Default, + O: crate::output::FilterOutput + Default, +{ + input: I, + output: O, + handlers: Vec, +} - /// Read a line from the standard input. - /// Since EOF should not append, it is considered as an error. - fn read(&self) -> Result { - let mut input = String::new(); - let nb = io::stdin().read_line(&mut input)?; - match nb { - 0 => Err(Error::new("end of file")), - _ => Ok(input), +impl Default for Filter +where + I: crate::input::FilterInput + Default, + O: crate::output::FilterOutput + Default, +{ + fn default() -> Self { + Filter { + input: I::default(), + output: O::default(), + handlers: Vec::new(), } } +} - /// Dispatch the entry into its session's thread. If such thread does not - /// already exists, creates it. - fn dispatch(&mut self, input: &str) -> Result<(), Error> { - let entry = Entry::from_str(input)?; - let id = entry.get_session_id(); - let disconnect = entry.is_disconnect(); - let channel = match self.sessions.get(&id) { - Some((r, _)) => r, - None => { - let (handlers_tx, handlers_rx) = mpsc::channel(); - let (entry_tx, entry_rx) = mpsc::channel(); - let name = entry.get_session_id().to_string(); - let handle = thread::Builder::new().name(name).spawn(move || { - SessionHandler::new(entry_rx, &handlers_rx).read_entries(); - })?; - for h in self.event_handlers.iter() { - handlers_tx.send(h.clone())?; - } - self.sessions - .insert(entry.get_session_id(), (entry_tx, handle)); - let (r, _) = &self.sessions[&entry.get_session_id()]; - r - } - }; - channel.send(entry)?; - if disconnect { - let _ = self.sessions.remove(&id); - } - Ok(()) - } - - /// Allow each child thread to exit gracefully. First, the session table is - /// drained so all the references to the senders are dropped, which will - /// cause the receivers threads to exit. Then, we uses the join handlers in - /// order to wait for the actual exit. - fn graceful_exit_children(&mut self) { - let mut handles = Vec::new(); - for (_, (_, h)) in self.sessions.drain() { - handles.push(h); - } - for h in handles { - let _ = h.join(); - } - } - - pub fn new() -> Self { - SmtpIn { - sessions: HashMap::new(), - event_handlers: Vec::new(), - } - } - - pub fn event_handlers(&mut self, handlers: Vec>) -> &mut Self { - self.event_handlers = handlers.to_owned(); +impl Filter +where + I: crate::input::FilterInput + Default, + O: crate::output::FilterOutput + Default, +{ + pub fn set_handlers(&mut self, handlers: &[Handler]) -> &mut Self { + self.handlers = handlers.to_vec(); + self + } + + pub fn register_events(&mut self) -> &mut Self { + // TODO: use self.output to register events self } - /// Run the infinite loop that will read and process input from stdin. pub fn run(&mut self) { - self.register_events(); loop { - let line = match self.read() { - Ok(l) => l, + match self.input.next() { + Ok(entry) => { + log::debug!("{:?}", entry); + for h in self.handlers.iter() { + match h.send(&entry, &mut self.output) { + Ok(_) => {} + Err(e) => { + log::warn!("Warning: {}", e); + } + }; + } + } Err(e) => { - self.graceful_exit_children(); - error!("{}", e); + log::error!("Error: {}", e); std::process::exit(1); } }; - match self.dispatch(&line) { - Ok(_) => {} - Err(e) => warn!("{}", e), - } } } } diff --git a/opensmtpd/src/output.rs b/opensmtpd/src/output.rs new file mode 100644 index 0000000..5a60636 --- /dev/null +++ b/opensmtpd/src/output.rs @@ -0,0 +1,19 @@ +// Copyright (c) 2019 Rodolphe Bréard +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use crate::errors::Error; + +pub trait FilterOutput { + fn send(&mut self, msg: &str) -> Result<(), Error>; +} + +mod null; +mod stdout; + +pub use null::NullOutput; +pub use stdout::{StdErr, StdOut}; diff --git a/opensmtpd/src/output/null.rs b/opensmtpd/src/output/null.rs new file mode 100644 index 0000000..acfcaf4 --- /dev/null +++ b/opensmtpd/src/output/null.rs @@ -0,0 +1,25 @@ +// Copyright (c) 2019 Rodolphe Bréard +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use crate::errors::Error; +use crate::output::FilterOutput; +use std::default::Default; + +pub struct NullOutput {} + +impl Default for NullOutput { + fn default() -> Self { + NullOutput {} + } +} + +impl FilterOutput for NullOutput { + fn send(&mut self, _msg: &str) -> Result<(), Error> { + Ok(()) + } +} diff --git a/opensmtpd/src/output/stdout.rs b/opensmtpd/src/output/stdout.rs new file mode 100644 index 0000000..9d2dd3e --- /dev/null +++ b/opensmtpd/src/output/stdout.rs @@ -0,0 +1,33 @@ +// Copyright (c) 2019 Rodolphe Bréard +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use crate::errors::Error; +use crate::output::FilterOutput; +use std::default::Default; + +macro_rules! new_stdout { + ($name: ident, $out: ident) => { + pub struct $name {} + + impl Default for $name { + fn default() -> Self { + $name {} + } + } + + impl FilterOutput for $name { + fn send(&mut self, msg: &str) -> Result<(), Error> { + $out!("{}", msg); + Ok(()) + } + } + }; +} + +new_stdout!(StdOut, println); +new_stdout!(StdErr, eprintln); diff --git a/opensmtpd/src/session_handler.rs b/opensmtpd/src/session_handler.rs deleted file mode 100644 index 90eff57..0000000 --- a/opensmtpd/src/session_handler.rs +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (c) 2019 Rodolphe Bréard -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use crate::entry::Entry; -use crate::event_handlers::EventHandler; -use log::debug; -use std::sync::mpsc; -use std::thread; - -pub struct SessionHandler { - entry_rx: mpsc::Receiver, - event_handlers: Vec>, -} - -impl SessionHandler { - pub fn new( - entry_rx: mpsc::Receiver, - handlers_rx: &mpsc::Receiver>, - ) -> Self { - debug!( - "New thread for session {}", - thread::current().name().unwrap() - ); - let mut event_handlers = Vec::new(); - for h in handlers_rx.iter() { - debug!("Event handler registered"); - event_handlers.push(h); - } - SessionHandler { - entry_rx, - event_handlers, - } - } - - pub fn read_entries(&self) { - let mut context: T = Default::default(); - for e in self.entry_rx.iter() { - for h in self.event_handlers.iter() { - if h.is_callable(&e.get_event()) { - h.call(&e, &mut context); - } - } - } - } -}