Refactor the reader/dispatcher

The previous design did not handled errors correctly and was kind of
spaghetti code. With the new one, the reader and the dispatcher are
clearly separated. The filter will only exit on an error from the reader
or if EOF has been reached, any other error is displayed but does not
exit the filter, which is required by the API. If the filter must exit,
all threads are gracefully stopped.
This commit is contained in:
Rodolphe Breard 2018-12-29 20:22:37 +01:00
parent b5cfe79947
commit 24b332c615
3 changed files with 77 additions and 38 deletions

View file

@ -1,6 +1,3 @@
fn main() { fn main() {
match opensmtpd::dispatch() { opensmtpd::run();
Ok(_) => {}
Err(e) => eprintln!("Error: {}", e.as_str()),
}
} }

View file

@ -15,12 +15,8 @@ impl Error {
Error::new(&msg) Error::new(&msg)
} }
pub fn new_param(param: &str, msg: &str) -> Self { pub fn display(&self) {
Error::new(&format!("{}: {}", param, msg)) eprintln!("Error: {}", self.message);
}
pub fn as_str(&self) -> &str {
&self.message
} }
} }

View file

@ -8,33 +8,79 @@ use std::io;
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
pub fn dispatch() -> Result<(), Error> { /// Read a line from the standard input.
let mut sessions = HashMap::new(); /// Since EOF should not append, it is considered as an error.
loop { fn read() -> Result<String, Error> {
let mut input = String::new(); let mut input = String::new();
let nb = io::stdin().read_line(&mut input)?; let nb = io::stdin().read_line(&mut input)?;
if nb == 0 { match nb {
continue; 0 => Err(Error::new("end of file")),
} _ => Ok(input),
let entry = Entry::from_str(input.as_str())?; }
let channel = match sessions.get(&entry.session_id) { }
Some(c) => c,
None => { /// Dispatch the entry into its session's thread. If such thread does not
let (tx, rx) = mpsc::channel(); /// already exists, creates it.
let name = entry.session_id.to_string(); fn dispatch(
thread::Builder::new().name(name).spawn(move || { sessions: &mut HashMap<u64, (mpsc::Sender<Entry>, thread::JoinHandle<()>)>,
for e in rx.iter() { input: &str,
println!( ) -> Result<(), Error> {
"Debug: thread {}: {:?}", let entry = Entry::from_str(input)?;
thread::current().name().unwrap(), let channel = match sessions.get(&entry.session_id) {
e Some((r, _)) => r,
); None => {
} let (tx, rx) = mpsc::channel();
})?; let name = entry.session_id.to_string();
sessions.insert(entry.session_id, tx); let handle = thread::Builder::new().name(name).spawn(move || {
sessions.get(&entry.session_id).unwrap() println!("New thread: {}", thread::current().name().unwrap());
} for e in rx.iter() {
}; println!(
channel.send(entry)?; "Debug: thread {}: {:?}",
thread::current().name().unwrap(),
e
);
}
})?;
sessions.insert(entry.session_id, (tx, handle));
let (r, _) = sessions.get(&entry.session_id).unwrap();
r
}
};
channel.send(entry)?;
Ok(())
}
/// Allow each child thread to exit gracefully. First, the session table is
/// drained so all the references to the senders are dropped, which will
/// cause the receivers threads to exit. Then, we uses the join handlers in
/// order to wait for the actual exit.
fn graceful_exit_children(
sessions: &mut HashMap<u64, (mpsc::Sender<Entry>, thread::JoinHandle<()>)>,
) {
let mut handles = Vec::new();
for (_, (_, h)) in sessions.drain() {
handles.push(h);
}
for h in handles {
let _ = h.join();
}
}
/// Run the infinite loop that will read and process input from stdin.
pub fn run() {
let mut sessions = HashMap::new();
loop {
let line = match read() {
Ok(l) => l,
Err(e) => {
graceful_exit_children(&mut sessions);
e.display();
std::process::exit(1);
}
};
match dispatch(&mut sessions, &line) {
Ok(_) => {}
Err(e) => e.display(),
}
} }
} }