diff --git a/examples/dummy.rs b/examples/dummy.rs index dec43fa..58707ab 100644 --- a/examples/dummy.rs +++ b/examples/dummy.rs @@ -1,6 +1,7 @@ use env_logger::{Builder, Env}; +use opensmtpd::SmtpIn; fn main() { Builder::from_env(Env::default().default_filter_or("debug")).init(); - opensmtpd::run(); + SmtpIn::new().run(); } diff --git a/src/entry.rs b/src/entry.rs index fcc6e26..f6134aa 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -1,8 +1,6 @@ use crate::errors::Error; -use nom::{ - alt, alt_complete, call, complete, cond, do_parse, error_position, map_res, named, tag, - take_until, take_while, -}; +use nom::{alt, alt_complete, call, complete, cond, do_parse, error_position, map_res, named, tag, + take_until, take_while}; #[derive(Debug, PartialEq)] pub enum Kind { diff --git a/src/lib.rs b/src/lib.rs index 098a700..0be2f4f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,78 +9,84 @@ use std::io; use std::sync::mpsc; use std::thread; -/// Read a line from the standard input. -/// Since EOF should not append, it is considered as an error. -fn read() -> Result { - let mut input = String::new(); - let nb = io::stdin().read_line(&mut input)?; - match nb { - 0 => Err(Error::new("end of file")), - _ => Ok(input), - } +pub struct SmtpIn { + sessions: HashMap, thread::JoinHandle<()>)>, } -/// Dispatch the entry into its session's thread. If such thread does not -/// already exists, creates it. -fn dispatch( - sessions: &mut HashMap, thread::JoinHandle<()>)>, - input: &str, -) -> Result<(), Error> { - let entry = Entry::from_str(input)?; - let channel = match sessions.get(&entry.session_id) { - Some((r, _)) => r, - None => { - let (tx, rx) = mpsc::channel(); - let name = entry.session_id.to_string(); - let handle = thread::Builder::new().name(name).spawn(move || { - debug!( - "New thread for session {}", - thread::current().name().unwrap() - ); - for e in rx.iter() { - debug!("thread {}: {:?}", thread::current().name().unwrap(), e); - } - })?; - sessions.insert(entry.session_id, (tx, handle)); - let (r, _) = sessions.get(&entry.session_id).unwrap(); - r +impl SmtpIn { + /// Read a line from the standard input. + /// Since EOF should not append, it is considered as an error. + fn read() -> Result { + let mut input = String::new(); + let nb = io::stdin().read_line(&mut input)?; + match nb { + 0 => Err(Error::new("end of file")), + _ => Ok(input), } - }; - channel.send(entry)?; - Ok(()) -} - -/// Allow each child thread to exit gracefully. First, the session table is -/// drained so all the references to the senders are dropped, which will -/// cause the receivers threads to exit. Then, we uses the join handlers in -/// order to wait for the actual exit. -fn graceful_exit_children( - sessions: &mut HashMap, thread::JoinHandle<()>)>, -) { - let mut handles = Vec::new(); - for (_, (_, h)) in sessions.drain() { - handles.push(h); } - for h in handles { - let _ = h.join(); - } -} -/// Run the infinite loop that will read and process input from stdin. -pub fn run() { - let mut sessions = HashMap::new(); - loop { - let line = match read() { - Ok(l) => l, - Err(e) => { - graceful_exit_children(&mut sessions); - error!("{}", e); - std::process::exit(1); + /// Dispatch the entry into its session's thread. If such thread does not + /// already exists, creates it. + fn dispatch(&mut self, input: &str) -> Result<(), Error> { + let entry = Entry::from_str(input)?; + let channel = match self.sessions.get(&entry.session_id) { + Some((r, _)) => r, + None => { + let (tx, rx) = mpsc::channel(); + let name = entry.session_id.to_string(); + let handle = thread::Builder::new().name(name).spawn(move || { + debug!( + "New thread for session {}", + thread::current().name().unwrap() + ); + for e in rx.iter() { + debug!("thread {}: {:?}", thread::current().name().unwrap(), e); + } + })?; + self.sessions.insert(entry.session_id, (tx, handle)); + let (r, _) = self.sessions.get(&entry.session_id).unwrap(); + r } }; - match dispatch(&mut sessions, &line) { - Ok(_) => {} - Err(e) => warn!("{}", e), + channel.send(entry)?; + Ok(()) + } + + /// Allow each child thread to exit gracefully. First, the session table is + /// drained so all the references to the senders are dropped, which will + /// cause the receivers threads to exit. Then, we uses the join handlers in + /// order to wait for the actual exit. + fn graceful_exit_children(&mut self) { + let mut handles = Vec::new(); + for (_, (_, h)) in self.sessions.drain() { + handles.push(h); + } + for h in handles { + let _ = h.join(); + } + } + + pub fn new() -> Self { + SmtpIn { + sessions: HashMap::new(), + } + } + + /// Run the infinite loop that will read and process input from stdin. + pub fn run(&mut self) { + loop { + let line = match SmtpIn::read() { + Ok(l) => l, + Err(e) => { + self.graceful_exit_children(); + error!("{}", e); + std::process::exit(1); + } + }; + match self.dispatch(&line) { + Ok(_) => {} + Err(e) => warn!("{}", e), + } } } }