From 45639f18c028753ca7631339ca7c564ea5bb30c0 Mon Sep 17 00:00:00 2001 From: Rodolphe Breard Date: Tue, 17 Sep 2019 16:45:04 +0200 Subject: [PATCH] Refactor the library Threads are a bad idea because for now the filter API is not guaranteed to be state-less. The interface is now synchronous, which should be enough for most filters. The refactoring brought other changes, the most important being the concept of modular input sources and output destination and the complete rewrite of the procedural macro. --- README.md | 4 +- opensmtpd-derive/Cargo.toml | 6 +- opensmtpd-derive/src/lib.rs | 194 +++++++++++++------ opensmtpd/Cargo.toml | 14 +- opensmtpd/examples/dummy.rs | 19 -- opensmtpd/examples/echo.rs | 18 ++ opensmtpd/examples/session_event_counter.rs | 6 +- opensmtpd/src/entry.rs | 38 ++-- opensmtpd/src/errors.rs | 26 ++- opensmtpd/src/event_handlers.rs | 96 ---------- opensmtpd/src/handler.rs | 65 +++++++ opensmtpd/src/{response.rs => input.rs} | 10 +- opensmtpd/src/input/stdin.rs | 66 +++++++ opensmtpd/src/lib.rs | 197 +++++++------------- opensmtpd/src/output.rs | 19 ++ opensmtpd/src/output/null.rs | 25 +++ opensmtpd/src/output/stdout.rs | 33 ++++ opensmtpd/src/session_handler.rs | 50 ----- 18 files changed, 486 insertions(+), 400 deletions(-) delete mode 100644 opensmtpd/examples/dummy.rs create mode 100644 opensmtpd/examples/echo.rs delete mode 100644 opensmtpd/src/event_handlers.rs create mode 100644 opensmtpd/src/handler.rs rename opensmtpd/src/{response.rs => input.rs} (68%) create mode 100644 opensmtpd/src/input/stdin.rs create mode 100644 opensmtpd/src/output.rs create mode 100644 opensmtpd/src/output/null.rs create mode 100644 opensmtpd/src/output/stdout.rs delete mode 100644 opensmtpd/src/session_handler.rs diff --git a/README.md b/README.md index ea34ce4..9878e82 100644 --- a/README.md +++ b/README.md @@ -7,12 +7,10 @@ Rust binding for [OpenSMTPD] filters. This is a work in progress, the API is **not** stabilized yet. -- [ ] Thread-pool - [x] Reports - [ ] Filters -- [x] Session-level context -- [ ] Pool-level context - [ ] Filter-level context +- [ ] Session-level context [OpenSMTPD]: https://www.opensmtpd.org/ diff --git a/opensmtpd-derive/Cargo.toml b/opensmtpd-derive/Cargo.toml index 937e317..672047f 100644 --- a/opensmtpd-derive/Cargo.toml +++ b/opensmtpd-derive/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "opensmtpd_derive" -version = "0.1.0" +version = "0.2.0" authors = ["Rodolphe Bréard "] edition = "2018" description = "Interface for OpenSMTPD filters" @@ -15,5 +15,5 @@ include = ["src/**/*", "Cargo.toml", "LICENSE-*.txt"] proc-macro = true [dependencies] -quote = "0.6" -syn = { version = "0.15", features = ["full"] } +quote = "1.0" +syn = { version = "1.0", features = ["full", "extra-traits"] } diff --git a/opensmtpd-derive/src/lib.rs b/opensmtpd-derive/src/lib.rs index 8c32abe..8dc5a58 100644 --- a/opensmtpd-derive/src/lib.rs +++ b/opensmtpd-derive/src/lib.rs @@ -10,70 +10,152 @@ extern crate proc_macro; use proc_macro::TokenStream; use quote::quote; -use syn::{parse_macro_input, ItemFn}; +use syn::parse::{Parse, ParseStream}; +use syn::punctuated::Punctuated; +use syn::{parenthesized, parse_macro_input, ExprArray, Ident, ItemFn, Result, Token, TypePath}; -fn get_type( - params: &syn::punctuated::Punctuated, -) -> Result<(Box, syn::Type), ()> { - match params.iter().count() { - 1 => { - let ctx = Box::new(syn::Type::Verbatim(syn::TypeVerbatim { - tts: quote! { - opensmtpd::NoContext - }, - })); - let cb = syn::Type::Verbatim(syn::TypeVerbatim { - tts: quote! { opensmtpd::Callback::NoCtx }, - }); - Ok((ctx, cb)) - } - 2 => match params.iter().next().unwrap() { - syn::FnArg::Captured(ref a) => match &a.ty { - syn::Type::Reference(r) => { - let cb = match r.mutability { - Some(_) => syn::Type::Verbatim(syn::TypeVerbatim { - tts: quote! { opensmtpd::Callback::CtxMut }, - }), - None => syn::Type::Verbatim(syn::TypeVerbatim { - tts: quote! { opensmtpd::Callback::Ctx }, - }), - }; - Ok((r.elem.clone(), cb)) - } - _ => Err(()), - }, - _ => Err(()), - }, - _ => Err(()), +#[derive(Debug)] +struct OpenSmtpdAttributes { + version: Ident, + subsystem: Ident, + events: Punctuated, +} + +impl Parse for OpenSmtpdAttributes { + fn parse(input: ParseStream) -> Result { + let version = input.parse()?; + let _: Token![,] = input.parse()?; + let subsystem = input.parse()?; + let _: Token![,] = input.parse()?; + let _match: Token![match] = input.parse()?; + let content; + let _ = parenthesized!(content in input); + let events = content.parse_terminated(Ident::parse)?; + Ok(OpenSmtpdAttributes { + version, + subsystem, + events, + }) } } -#[proc_macro_attribute] -pub fn event(attr: TokenStream, input: TokenStream) -> TokenStream { - let attr = attr.to_string(); - let item = parse_macro_input!(input as ItemFn); - let fn_name = &item.ident; - let fn_params = &item.decl.inputs; - let (ctx_type, callback_type) = match get_type(fn_params) { - Ok(t) => t, - Err(_e) => { - panic!(); +impl OpenSmtpdAttributes { + fn get_version(&self) -> String { + format!( + "opensmtpd::entry::Version::{}", + self.version.to_string().to_uppercase() + ) + } + + fn get_subsystem(&self) -> String { + let subsystem = match self.subsystem.to_string().as_str() { + "smtp_in" => "SmtpIn", + "smtp_out" => "SmtpOut", + _ => "", + }; + format!("opensmtpd::entry::Subsystem::{}", subsystem) + } + + fn get_events(&self) -> String { + let events = if self + .events + .iter() + .find(|&e| e.to_string().to_lowercase().as_str() == "all") + .is_some() + { + let lst = [ + "LinkAuth", + "LinkConnect", + "LinkDisconnect", + "LinkIdentify", + "LinkReset", + "LinkTls", + "TxBegin", + "TxMail", + "TxRcpt", + "TxEnvelope", + "TxData", + "TxCommit", + "TxRollback", + "ProtocolClient", + "ProtocolServer", + "FilterResponse", + "Timeout", + ]; + lst.iter() + .map(|e| format!("opensmtpd::entry::Event::{}", e)) + .collect::>() + } else { + self.events + .iter() + .map(|e| { + let name = match e.to_string().as_str() { + "link_auth" => "LinkAuth", + "link_connect" => "LinkConnect", + "link_disconnect" => "LinkDisconnect", + "link_identify" => "LinkIdentify", + "link_reset" => "LinkReset", + "link_tls" => "LinkTls", + "tx_begin" => "TxBegin", + "tx_mail" => "TxMail", + "tx_rcpt" => "TxRcpt", + "tx_envelope" => "TxEnvelope", + "tx_data" => "TxData", + "tx_commit" => "TxCommit", + "tx_rollback" => "TxRollback", + "protocol_client" => "ProtocolClient", + "protocol_server" => "ProtocolServer", + "filter_response" => "FilterResponse", + "timeout" => "Timeout", + _ => "", + }; + format!("opensmtpd::entry::Event::{}", name) + }) + .collect::>() + }; + format!("[{}]", events.join(", ")) + } +} + +macro_rules! parse_item { + ($item: expr, $type: ty) => { + match syn::parse_str::<$type>($item) { + Ok(i) => i, + Err(e) => { + return TokenStream::from(e.to_compile_error()); + } } }; - let fn_body = &item.block; - let fn_output = &item.decl.output; - let output = quote! { - fn #fn_name() -> opensmtpd::EventHandler<#ctx_type> { - opensmtpd::EventHandler::new( - #attr, - #callback_type(|#fn_params| #fn_output #fn_body) - ) - } - }; - output.into() } #[proc_macro_attribute] pub fn report(attr: TokenStream, input: TokenStream) -> TokenStream { - event(attr, input) + let attr = parse_macro_input!(attr as OpenSmtpdAttributes); + let version = parse_item!(&attr.get_version(), TypePath); + let subsystem = parse_item!(&attr.get_subsystem(), TypePath); + let events = parse_item!(&attr.get_events(), ExprArray); + let item = parse_macro_input!(input as ItemFn); + let fn_name = &item.sig.ident; + let fn_params = &item.sig.inputs; + let fn_body = &item.block; + let output = quote! { + fn #fn_name() -> opensmtpd::Handler { + opensmtpd::Handler::new( + #version, + opensmtpd::entry::Kind::Report, + #subsystem, + &#events, + |_output: &mut dyn opensmtpd::output::FilterOutput, entry: &opensmtpd::entry::Entry,| { + // TODO: look at `item.sig.output` and adapt the calling scheme. + // example: if no return, add `Ok(())`. + // https://docs.rs/syn/1.0.5/syn/struct.Signature.html + let inner_fn = |#fn_params| -> Result<(), String> { + #fn_body + }; + inner_fn(entry) + }, + ) + } + }; + output.into() } diff --git a/opensmtpd/Cargo.toml b/opensmtpd/Cargo.toml index 010b423..17cc838 100644 --- a/opensmtpd/Cargo.toml +++ b/opensmtpd/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "opensmtpd" -version = "0.1.0" +version = "0.2.0" authors = ["Rodolphe Bréard "] edition = "2018" description = "Interface for OpenSMTPD filters" @@ -14,12 +14,12 @@ include = ["src/**/*", "Cargo.toml", "LICENSE-*.txt"] [dependencies] log = {version = "0.4", features = ["std"]} nom = "5.0" -opensmtpd_derive = { path = "../opensmtpd-derive", version = "0.1" } +opensmtpd_derive = { path = "../opensmtpd-derive", version = "0.2" } [[example]] -name = "dummy" -path = "examples/dummy.rs" +name = "echo" +path = "examples/echo.rs" -[[example]] -name = "counter" -path = "examples/session_event_counter.rs" +#[[example]] +#name = "counter" +#path = "examples/session_event_counter.rs" diff --git a/opensmtpd/examples/dummy.rs b/opensmtpd/examples/dummy.rs deleted file mode 100644 index a01eb14..0000000 --- a/opensmtpd/examples/dummy.rs +++ /dev/null @@ -1,19 +0,0 @@ -use log::{debug, info, Level}; -use opensmtpd::{event, handlers, Entry, SmtpIn, SmtpdLogger}; - -#[event(Any)] -fn on_event(entry: &Entry) { - debug!("Event received: {:?}", entry); -} - -#[event(LinkConnect)] -fn on_connect(entry: &Entry) { - info!("New client on session {:x}.", entry.get_session_id()); -} - -fn main() { - let _ = SmtpdLogger::new().set_level(Level::Debug).init(); - SmtpIn::new() - .event_handlers(handlers!(on_event, on_connect)) - .run(); -} diff --git a/opensmtpd/examples/echo.rs b/opensmtpd/examples/echo.rs new file mode 100644 index 0000000..14a0ff8 --- /dev/null +++ b/opensmtpd/examples/echo.rs @@ -0,0 +1,18 @@ +use opensmtpd::entry::Entry; +use opensmtpd::{report, simple_filter}; + +#[report(v1, smtp_in, match(all))] +fn echo_handler(entry: &Entry) -> Result<(), String> { + log::info!("TEST ENTRY: {:?}", entry); + Ok(()) +} + +#[report(v1, smtp_in, match(link_disconnect))] +fn test(entry: &Entry) { + log::info!("HAZ LINK DISCONNECT: {:?}", entry); + Ok(()) // TODO: REMOVE ME! +} + +fn main() { + simple_filter!(echo_handler, test); +} diff --git a/opensmtpd/examples/session_event_counter.rs b/opensmtpd/examples/session_event_counter.rs index 90d54c4..3dd93cb 100644 --- a/opensmtpd/examples/session_event_counter.rs +++ b/opensmtpd/examples/session_event_counter.rs @@ -1,5 +1,4 @@ -use log::{info, Level}; -use opensmtpd::{handlers, report, Entry, SmtpIn, SmtpdLogger}; +use opensmtpd::{report, simple_filter}; #[derive(Clone, Default)] struct MyContext { @@ -13,6 +12,5 @@ fn on_report(ctx: &mut MyContext, entry: &Entry) { } fn main() { - let _ = SmtpdLogger::new().set_level(Level::Debug).init(); - SmtpIn::new().event_handlers(handlers!(on_report)).run(); + simple_filter!(vec![on_report]); } diff --git a/opensmtpd/src/entry.rs b/opensmtpd/src/entry.rs index e3ed036..901b3cf 100644 --- a/opensmtpd/src/entry.rs +++ b/opensmtpd/src/entry.rs @@ -8,31 +8,32 @@ use crate::errors::Error; use nom::branch::alt; -use nom::bytes::complete::{tag, take_till}; -use nom::character::complete::{digit1, hex_digit1, line_ending}; +use nom::bytes::streaming::{tag, take_till}; +use nom::character::streaming::{digit1, hex_digit1, line_ending}; use nom::combinator::{map_res, value}; use nom::multi::many0; +use nom::Err::Incomplete; use nom::IResult; use std::str::FromStr; -#[derive(Clone, Debug, PartialEq)] -enum Version { +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum Version { V1, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub enum Kind { Report, Filter, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub enum Subsystem { SmtpIn, SmtpOut, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, Eq, Hash, PartialEq)] pub enum Event { LinkAuth, LinkConnect, @@ -114,16 +115,17 @@ pub enum Entry { V1Filter(V1Filter), } -impl FromStr for Entry { - type Err = Error; - - fn from_str(entry: &str) -> Result { - let (_, res) = parse_entry(entry)?; - Ok(res) - } -} - impl Entry { + pub fn new(entry: &str) -> Result<(String, Option), Error> { + match parse_entry(entry) { + Ok((remainder, entry)) => Ok((remainder.to_string(), Some(entry))), + Err(e) => match e { + Incomplete(_) => Ok((String::new(), None)), + _ => Err(e.into()), + }, + } + } + pub fn get_event(&self) -> Event { match self { Entry::V1Report(r) => r.event.to_owned(), @@ -244,7 +246,7 @@ fn parse_v1_report(input: &str) -> IResult<&str, Entry> { let (input, _) = separator(input)?; let (input, session_id) = parse_session_id(input)?; let (input, params) = many0(parse_param)(input)?; - let _ = line_ending(input)?; + let (input, _) = line_ending(input)?; let report = V1Report { timestamp, subsystem, @@ -266,7 +268,7 @@ fn parse_v1_filter(input: &str) -> IResult<&str, Entry> { let (input, _) = separator(input)?; let (input, token) = parse_token(input)?; let (input, params) = many0(parse_param)(input)?; - let _ = line_ending(input)?; + let (input, _) = line_ending(input)?; let filter = V1Filter { timestamp, subsystem, diff --git a/opensmtpd/src/errors.rs b/opensmtpd/src/errors.rs index 1ba6b8d..bb7b240 100644 --- a/opensmtpd/src/errors.rs +++ b/opensmtpd/src/errors.rs @@ -6,8 +6,6 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use crate::entry::Entry; -use crate::event_handlers::EventHandler; use std::fmt; pub struct Error { @@ -34,6 +32,18 @@ impl From for Error { } } +impl From for Error { + fn from(error: std::string::String) -> Self { + Error { message: error } + } +} + +impl From for Error { + fn from(error: std::str::Utf8Error) -> Self { + Error::new(&format!("UTF8 error: {}", error)) + } +} + impl From for Error { fn from(error: log::SetLoggerError) -> Self { Error::new(&format!("Logger error: {}", error)) @@ -50,15 +60,3 @@ impl From> for Error { Error::new(&format!("Parsing error: {}", msg)) } } - -impl From> for Error { - fn from(error: std::sync::mpsc::SendError) -> Self { - Error::new(&format!("IO error: {}", error)) - } -} - -impl From>> for Error { - fn from(error: std::sync::mpsc::SendError>) -> Self { - Error::new(&format!("IO error: {}", error)) - } -} diff --git a/opensmtpd/src/event_handlers.rs b/opensmtpd/src/event_handlers.rs deleted file mode 100644 index 361eb97..0000000 --- a/opensmtpd/src/event_handlers.rs +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright (c) 2019 Rodolphe Bréard -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use crate::entry::{Entry, Event}; -use std::str::FromStr; - -#[derive(Clone, Debug, PartialEq)] -pub enum MatchEvent { - Evt(Vec), - All, -} - -#[derive(Clone)] -pub enum Callback { - NoCtx(fn(&Entry)), - Ctx(fn(&T, &Entry)), - CtxMut(fn(&mut T, &Entry)), -} - -#[derive(Clone)] -pub struct EventHandler { - pub(crate) event: MatchEvent, - callback: Callback, -} - -fn get_events_from_string(event_str: &str) -> MatchEvent { - let mut events = Vec::new(); - for name in event_str.split(" , ") { - match name { - "Any" | "All" => { - return MatchEvent::All; - } - _ => { - if let Ok(e) = Event::from_str(name) { - events.push(e); - } - } - } - } - MatchEvent::Evt(events) -} - -impl EventHandler { - pub fn new(event_str: &str, callback: Callback) -> Self { - EventHandler { - event: get_events_from_string(event_str), - callback, - } - } - - pub fn is_callable(&self, event: &Event) -> bool { - match &self.event { - MatchEvent::All => true, - MatchEvent::Evt(v) => v.contains(&event), - } - } - - pub fn call(&self, entry: &Entry, context: &mut T) { - match self.callback { - Callback::NoCtx(f) => f(entry), - Callback::Ctx(f) => f(context, entry), - Callback::CtxMut(f) => f(context, entry), - }; - } -} - -#[cfg(test)] -mod test { - use crate::*; - - #[test] - fn test_eventhandler_build_noctx() { - EventHandler::new("Any", Callback::NoCtx::(|_entry: &Entry| {})); - } - - #[test] - fn test_eventhandler_build_ctx() { - EventHandler::new( - "Any", - Callback::Ctx(|_context: &NoContext, _entry: &Entry| {}), - ); - } - - #[test] - fn test_eventhandler_build_ctxmut() { - EventHandler::new( - "Any", - Callback::CtxMut(|_context: &mut NoContext, _entry: &Entry| {}), - ); - } -} diff --git a/opensmtpd/src/handler.rs b/opensmtpd/src/handler.rs new file mode 100644 index 0000000..cb41b81 --- /dev/null +++ b/opensmtpd/src/handler.rs @@ -0,0 +1,65 @@ +// Copyright (c) 2019 Rodolphe Bréard +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use crate::entry::{Entry, Event, Kind, Subsystem, Version}; +use crate::errors::Error; +use crate::output::FilterOutput; +use std::collections::HashSet; + +macro_rules! handle { + ($self: ident, $obj: ident, $version: expr, $kind: expr, $entry: ident, $output: ident) => {{ + if $self.version == $version + && $self.kind == $kind + && $self.subsystem == $obj.subsystem + && $self.events.contains(&$obj.event) + { + ($self.action)($output, $entry)?; + } + Ok(()) + }}; +} + +type Callback = fn(&mut dyn FilterOutput, &Entry) -> Result<(), String>; + +#[derive(Clone)] +pub struct Handler { + version: Version, + kind: Kind, + subsystem: Subsystem, + events: HashSet, + action: Callback, +} + +impl Handler { + pub fn new( + version: Version, + kind: Kind, + subsystem: Subsystem, + events: &[Event], + action: Callback, + ) -> Self { + Handler { + version, + kind, + subsystem, + events: events.iter().cloned().collect(), + action, + } + } + + pub fn send(&self, entry: &Entry, output: &mut dyn FilterOutput) -> Result<(), Error> { + match entry { + Entry::V1Report(report) => { + handle!(self, report, Version::V1, Kind::Report, entry, output) + } + Entry::V1Filter(filter) => { + handle!(self, filter, Version::V1, Kind::Filter, entry, output) + } + } + } +} diff --git a/opensmtpd/src/response.rs b/opensmtpd/src/input.rs similarity index 68% rename from opensmtpd/src/response.rs rename to opensmtpd/src/input.rs index a0a75df..ba3b057 100644 --- a/opensmtpd/src/response.rs +++ b/opensmtpd/src/input.rs @@ -6,6 +6,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -pub enum Response { - None, +use crate::entry::Entry; +use crate::errors::Error; + +pub trait FilterInput { + fn next(&mut self) -> Result; } + +mod stdin; +pub use stdin::StdIn; diff --git a/opensmtpd/src/input/stdin.rs b/opensmtpd/src/input/stdin.rs new file mode 100644 index 0000000..bd483b3 --- /dev/null +++ b/opensmtpd/src/input/stdin.rs @@ -0,0 +1,66 @@ +// Copyright (c) 2019 Rodolphe Bréard +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use crate::entry::Entry; +use crate::errors::Error; +use crate::input::FilterInput; +use std::default::Default; +use std::io::{self, Read}; +use std::str; + +const BUFFER_SIZE: usize = 4096; + +pub struct StdIn { + buffer: [u8; BUFFER_SIZE], + stdin: io::Stdin, + input: String, +} + +impl Default for StdIn { + fn default() -> Self { + StdIn { + buffer: [0; BUFFER_SIZE], + stdin: io::stdin(), + input: String::new(), + } + } +} + +impl FilterInput for StdIn { + fn next(&mut self) -> Result { + let mut force_read = false; + loop { + if force_read || self.input.is_empty() { + // Read stdin in self.buffer + self.buffer.copy_from_slice(&[0; BUFFER_SIZE]); + let len = self.stdin.read(&mut self.buffer)?; + if len == 0 { + continue; + } + // Put the buffer's content in self.input + self.input += match self.buffer.iter().position(|&x| x == 0) { + Some(i) => str::from_utf8(&self.buffer[..i]), + None => str::from_utf8(&self.buffer), + }?; + } + // Try to build an entry from self.input + let (remainder, entry_opt) = Entry::new(&self.input)?; + match entry_opt { + // We have at least one entry. + Some(entry) => { + self.input = remainder; + return Ok(entry); + } + // The data is incomplete, no entry could be built. + None => { + force_read = true; + } + }; + } + } +} diff --git a/opensmtpd/src/lib.rs b/opensmtpd/src/lib.rs index 763a5f1..d204577 100644 --- a/opensmtpd/src/lib.rs +++ b/opensmtpd/src/lib.rs @@ -6,160 +6,101 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -mod entry; mod errors; -mod event_handlers; +mod handler; mod logger; -mod response; -mod session_handler; -use crate::session_handler::SessionHandler; -use log::{error, warn}; -use std::collections::HashMap; -use std::io; -use std::str::FromStr; -use std::sync::mpsc; -use std::thread; +pub mod entry; +pub mod input; +pub mod output; + +use log; +use std::default::Default; -pub use crate::entry::{Entry, Event}; pub use crate::errors::Error; -pub use crate::event_handlers::{Callback, EventHandler, MatchEvent}; +pub use crate::handler::Handler; pub use crate::logger::SmtpdLogger; -pub use crate::response::Response; -pub use opensmtpd_derive::{event, report}; +pub use opensmtpd_derive::report; #[macro_export] -macro_rules! handlers { - ( $( $x:expr ),* ) => { - { - let mut temp_vec = Vec::new(); - $( - temp_vec.push(($x)()); - )* - temp_vec - } +macro_rules! simple_filter { + ($( $x:expr ),*) => { + opensmtpd::simple_filter_log_level!(log::Level::Info $(,$x)*); }; } -#[derive(Clone, Default)] -pub struct NoContext; - -#[derive(Default)] -pub struct SmtpIn { - sessions: HashMap, thread::JoinHandle<()>)>, - event_handlers: Vec>, +#[macro_export] +macro_rules! simple_filter_log_level { + ($log_level: expr, $( $x:expr ),*) => { + let mut handlers = Vec::new(); + $( + handlers.push(($x)()); + )*; + let _ = opensmtpd::SmtpdLogger::new() + .set_level($log_level) + .init(); + opensmtpd::Filter::::default().set_handlers(&handlers).register_events().run(); + }; } -impl SmtpIn { - fn register_events(&self) { - let mut evts = Vec::new(); - for eh in self.event_handlers.iter() { - match eh.event { - MatchEvent::Evt(ref v) => { - for e in v.iter() { - evts.push(e); - } - } - MatchEvent::All => { - println!("register|report|smtp-in|*"); - evts.clear(); - break; - } - } - } - evts.dedup(); - for e in evts.iter() { - println!("register|report|smtp-in|{}", e.to_string()); - } - // TODO: register filters - // println!("register|filter|smtp-in|{}", "name"); - println!("register|ready"); - } +pub struct Filter +where + I: crate::input::FilterInput + Default, + O: crate::output::FilterOutput + Default, +{ + input: I, + output: O, + handlers: Vec, +} - /// Read a line from the standard input. - /// Since EOF should not append, it is considered as an error. - fn read(&self) -> Result { - let mut input = String::new(); - let nb = io::stdin().read_line(&mut input)?; - match nb { - 0 => Err(Error::new("end of file")), - _ => Ok(input), +impl Default for Filter +where + I: crate::input::FilterInput + Default, + O: crate::output::FilterOutput + Default, +{ + fn default() -> Self { + Filter { + input: I::default(), + output: O::default(), + handlers: Vec::new(), } } +} - /// Dispatch the entry into its session's thread. If such thread does not - /// already exists, creates it. - fn dispatch(&mut self, input: &str) -> Result<(), Error> { - let entry = Entry::from_str(input)?; - let id = entry.get_session_id(); - let disconnect = entry.is_disconnect(); - let channel = match self.sessions.get(&id) { - Some((r, _)) => r, - None => { - let (handlers_tx, handlers_rx) = mpsc::channel(); - let (entry_tx, entry_rx) = mpsc::channel(); - let name = entry.get_session_id().to_string(); - let handle = thread::Builder::new().name(name).spawn(move || { - SessionHandler::new(entry_rx, &handlers_rx).read_entries(); - })?; - for h in self.event_handlers.iter() { - handlers_tx.send(h.clone())?; - } - self.sessions - .insert(entry.get_session_id(), (entry_tx, handle)); - let (r, _) = &self.sessions[&entry.get_session_id()]; - r - } - }; - channel.send(entry)?; - if disconnect { - let _ = self.sessions.remove(&id); - } - Ok(()) - } - - /// Allow each child thread to exit gracefully. First, the session table is - /// drained so all the references to the senders are dropped, which will - /// cause the receivers threads to exit. Then, we uses the join handlers in - /// order to wait for the actual exit. - fn graceful_exit_children(&mut self) { - let mut handles = Vec::new(); - for (_, (_, h)) in self.sessions.drain() { - handles.push(h); - } - for h in handles { - let _ = h.join(); - } - } - - pub fn new() -> Self { - SmtpIn { - sessions: HashMap::new(), - event_handlers: Vec::new(), - } - } - - pub fn event_handlers(&mut self, handlers: Vec>) -> &mut Self { - self.event_handlers = handlers.to_owned(); +impl Filter +where + I: crate::input::FilterInput + Default, + O: crate::output::FilterOutput + Default, +{ + pub fn set_handlers(&mut self, handlers: &[Handler]) -> &mut Self { + self.handlers = handlers.to_vec(); + self + } + + pub fn register_events(&mut self) -> &mut Self { + // TODO: use self.output to register events self } - /// Run the infinite loop that will read and process input from stdin. pub fn run(&mut self) { - self.register_events(); loop { - let line = match self.read() { - Ok(l) => l, + match self.input.next() { + Ok(entry) => { + log::debug!("{:?}", entry); + for h in self.handlers.iter() { + match h.send(&entry, &mut self.output) { + Ok(_) => {} + Err(e) => { + log::warn!("Warning: {}", e); + } + }; + } + } Err(e) => { - self.graceful_exit_children(); - error!("{}", e); + log::error!("Error: {}", e); std::process::exit(1); } }; - match self.dispatch(&line) { - Ok(_) => {} - Err(e) => warn!("{}", e), - } } } } diff --git a/opensmtpd/src/output.rs b/opensmtpd/src/output.rs new file mode 100644 index 0000000..5a60636 --- /dev/null +++ b/opensmtpd/src/output.rs @@ -0,0 +1,19 @@ +// Copyright (c) 2019 Rodolphe Bréard +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use crate::errors::Error; + +pub trait FilterOutput { + fn send(&mut self, msg: &str) -> Result<(), Error>; +} + +mod null; +mod stdout; + +pub use null::NullOutput; +pub use stdout::{StdErr, StdOut}; diff --git a/opensmtpd/src/output/null.rs b/opensmtpd/src/output/null.rs new file mode 100644 index 0000000..acfcaf4 --- /dev/null +++ b/opensmtpd/src/output/null.rs @@ -0,0 +1,25 @@ +// Copyright (c) 2019 Rodolphe Bréard +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use crate::errors::Error; +use crate::output::FilterOutput; +use std::default::Default; + +pub struct NullOutput {} + +impl Default for NullOutput { + fn default() -> Self { + NullOutput {} + } +} + +impl FilterOutput for NullOutput { + fn send(&mut self, _msg: &str) -> Result<(), Error> { + Ok(()) + } +} diff --git a/opensmtpd/src/output/stdout.rs b/opensmtpd/src/output/stdout.rs new file mode 100644 index 0000000..9d2dd3e --- /dev/null +++ b/opensmtpd/src/output/stdout.rs @@ -0,0 +1,33 @@ +// Copyright (c) 2019 Rodolphe Bréard +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use crate::errors::Error; +use crate::output::FilterOutput; +use std::default::Default; + +macro_rules! new_stdout { + ($name: ident, $out: ident) => { + pub struct $name {} + + impl Default for $name { + fn default() -> Self { + $name {} + } + } + + impl FilterOutput for $name { + fn send(&mut self, msg: &str) -> Result<(), Error> { + $out!("{}", msg); + Ok(()) + } + } + }; +} + +new_stdout!(StdOut, println); +new_stdout!(StdErr, eprintln); diff --git a/opensmtpd/src/session_handler.rs b/opensmtpd/src/session_handler.rs deleted file mode 100644 index 90eff57..0000000 --- a/opensmtpd/src/session_handler.rs +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (c) 2019 Rodolphe Bréard -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use crate::entry::Entry; -use crate::event_handlers::EventHandler; -use log::debug; -use std::sync::mpsc; -use std::thread; - -pub struct SessionHandler { - entry_rx: mpsc::Receiver, - event_handlers: Vec>, -} - -impl SessionHandler { - pub fn new( - entry_rx: mpsc::Receiver, - handlers_rx: &mpsc::Receiver>, - ) -> Self { - debug!( - "New thread for session {}", - thread::current().name().unwrap() - ); - let mut event_handlers = Vec::new(); - for h in handlers_rx.iter() { - debug!("Event handler registered"); - event_handlers.push(h); - } - SessionHandler { - entry_rx, - event_handlers, - } - } - - pub fn read_entries(&self) { - let mut context: T = Default::default(); - for e in self.entry_rx.iter() { - for h in self.event_handlers.iter() { - if h.is_callable(&e.get_event()) { - h.call(&e, &mut context); - } - } - } - } -}