Use a builder instead of a raw function

This pattern will, in the future, allow the registration of events
handlers and context objects.
This commit is contained in:
Rodolphe Breard 2019-01-03 20:16:23 +01:00
parent c25dfb253a
commit 98e4beadd3
3 changed files with 75 additions and 70 deletions

View file

@ -1,6 +1,7 @@
use env_logger::{Builder, Env}; use env_logger::{Builder, Env};
use opensmtpd::SmtpIn;
fn main() { fn main() {
Builder::from_env(Env::default().default_filter_or("debug")).init(); Builder::from_env(Env::default().default_filter_or("debug")).init();
opensmtpd::run(); SmtpIn::new().run();
} }

View file

@ -1,8 +1,6 @@
use crate::errors::Error; use crate::errors::Error;
use nom::{ use nom::{alt, alt_complete, call, complete, cond, do_parse, error_position, map_res, named, tag,
alt, alt_complete, call, complete, cond, do_parse, error_position, map_res, named, tag, take_until, take_while};
take_until, take_while,
};
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum Kind { pub enum Kind {

View file

@ -9,78 +9,84 @@ use std::io;
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
/// Read a line from the standard input. pub struct SmtpIn {
/// Since EOF should not append, it is considered as an error. sessions: HashMap<u64, (mpsc::Sender<Entry>, thread::JoinHandle<()>)>,
fn read() -> Result<String, Error> {
let mut input = String::new();
let nb = io::stdin().read_line(&mut input)?;
match nb {
0 => Err(Error::new("end of file")),
_ => Ok(input),
}
} }
/// Dispatch the entry into its session's thread. If such thread does not impl SmtpIn {
/// already exists, creates it. /// Read a line from the standard input.
fn dispatch( /// Since EOF should not append, it is considered as an error.
sessions: &mut HashMap<u64, (mpsc::Sender<Entry>, thread::JoinHandle<()>)>, fn read() -> Result<String, Error> {
input: &str, let mut input = String::new();
) -> Result<(), Error> { let nb = io::stdin().read_line(&mut input)?;
let entry = Entry::from_str(input)?; match nb {
let channel = match sessions.get(&entry.session_id) { 0 => Err(Error::new("end of file")),
Some((r, _)) => r, _ => Ok(input),
None => {
let (tx, rx) = mpsc::channel();
let name = entry.session_id.to_string();
let handle = thread::Builder::new().name(name).spawn(move || {
debug!(
"New thread for session {}",
thread::current().name().unwrap()
);
for e in rx.iter() {
debug!("thread {}: {:?}", thread::current().name().unwrap(), e);
}
})?;
sessions.insert(entry.session_id, (tx, handle));
let (r, _) = sessions.get(&entry.session_id).unwrap();
r
} }
};
channel.send(entry)?;
Ok(())
}
/// Allow each child thread to exit gracefully. First, the session table is
/// drained so all the references to the senders are dropped, which will
/// cause the receivers threads to exit. Then, we uses the join handlers in
/// order to wait for the actual exit.
fn graceful_exit_children(
sessions: &mut HashMap<u64, (mpsc::Sender<Entry>, thread::JoinHandle<()>)>,
) {
let mut handles = Vec::new();
for (_, (_, h)) in sessions.drain() {
handles.push(h);
} }
for h in handles {
let _ = h.join();
}
}
/// Run the infinite loop that will read and process input from stdin. /// Dispatch the entry into its session's thread. If such thread does not
pub fn run() { /// already exists, creates it.
let mut sessions = HashMap::new(); fn dispatch(&mut self, input: &str) -> Result<(), Error> {
loop { let entry = Entry::from_str(input)?;
let line = match read() { let channel = match self.sessions.get(&entry.session_id) {
Ok(l) => l, Some((r, _)) => r,
Err(e) => { None => {
graceful_exit_children(&mut sessions); let (tx, rx) = mpsc::channel();
error!("{}", e); let name = entry.session_id.to_string();
std::process::exit(1); let handle = thread::Builder::new().name(name).spawn(move || {
debug!(
"New thread for session {}",
thread::current().name().unwrap()
);
for e in rx.iter() {
debug!("thread {}: {:?}", thread::current().name().unwrap(), e);
}
})?;
self.sessions.insert(entry.session_id, (tx, handle));
let (r, _) = self.sessions.get(&entry.session_id).unwrap();
r
} }
}; };
match dispatch(&mut sessions, &line) { channel.send(entry)?;
Ok(_) => {} Ok(())
Err(e) => warn!("{}", e), }
/// Allow each child thread to exit gracefully. First, the session table is
/// drained so all the references to the senders are dropped, which will
/// cause the receivers threads to exit. Then, we uses the join handlers in
/// order to wait for the actual exit.
fn graceful_exit_children(&mut self) {
let mut handles = Vec::new();
for (_, (_, h)) in self.sessions.drain() {
handles.push(h);
}
for h in handles {
let _ = h.join();
}
}
pub fn new() -> Self {
SmtpIn {
sessions: HashMap::new(),
}
}
/// Run the infinite loop that will read and process input from stdin.
pub fn run(&mut self) {
loop {
let line = match SmtpIn::read() {
Ok(l) => l,
Err(e) => {
self.graceful_exit_children();
error!("{}", e);
std::process::exit(1);
}
};
match self.dispatch(&line) {
Ok(_) => {}
Err(e) => warn!("{}", e),
}
} }
} }
} }