Read and parse incoming entries then dispatch them into session threads

This commit is contained in:
Rodolphe Breard 2018-12-29 16:56:56 +01:00
parent 010951c884
commit f57a201431
7 changed files with 227 additions and 5 deletions

View file

@ -5,3 +5,4 @@ authors = ["Rodolphe Breard <rodolphe@what.tf>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
nom = "^4.1"

6
examples/dummy.rs Normal file
View file

@ -0,0 +1,6 @@
fn main() {
match opensmtpd::dispatch() {
Ok(_) => {}
Err(e) => eprintln!("Error: {}", e.as_str()),
}
}

View file

@ -0,0 +1,4 @@
report|1|1544130229|smtp-in|tx-mail|4b0148c60f798628|fc08ce7d|<owner-hackers+M85937=gilles=poolp.org@openbsd.org>|ok
report|1|1544130229|smtp-in|tx-mail|0f3004c08c82d33e|fc08ce7d|<owner-hackers+M85937=gilles=poolp.org@openbsd.org>|ok
report|1|1544130229|smtp-in|tx-rcpt|4b0148c60f798628|fc08ce7d|<gilles@poolp.org>|ok
report|1|1544130229|smtp-in|tx-rcpt|0f3004c08c82d33e|fc08ce7d|<gilles@poolp.org>|ok

View file

@ -0,0 +1,2 @@
report|1|1544130229|smtp-in|tx-mail|0f3004c08c82d33e|fc08ce7d|<owner-hackers+M85937=gilles=poolp.org@openbsd.org>|ok
report|1|1544130229|smtp-in|tx-rcpt|0f3004c08c82d33e|fc08ce7d|<gilles@poolp.org>|ok

128
src/entry.rs Normal file
View file

@ -0,0 +1,128 @@
use crate::errors::Error;
use nom::{alt, call, cond, do_parse, error_position, map_res, named, tag, take_until, take_while};
#[derive(Debug, PartialEq)]
pub enum Kind {
Report,
Filter,
}
#[derive(Debug)]
pub enum Subsystem {
SmtpIn,
}
#[derive(Debug)]
pub enum Event {
TxMail,
TxRcpt,
}
#[derive(Debug)]
pub struct Entry {
pub kind: Kind,
pub version: u8,
pub timestamp: u64,
pub subsystem: Subsystem,
pub event: Event,
pub token: Option<u64>,
pub session_id: u64,
pub params: String,
}
fn is_ascii_digit(c: char) -> bool {
c.is_ascii_digit()
}
fn is_ascii_hexdigit(c: char) -> bool {
c.is_ascii_hexdigit()
}
fn to_u8(s: &str) -> Result<u8, std::num::ParseIntError> {
s.parse()
}
fn to_u64(s: &str) -> Result<u64, std::num::ParseIntError> {
s.parse()
}
fn to_u64_hex(s: &str) -> Result<u64, std::num::ParseIntError> {
u64::from_str_radix(s, 16)
}
named!(parse_kind<&str, Kind>,
alt!(
tag!("report") => { |_| Kind::Report } |
tag!("filter") => { |_| Kind::Filter }
)
);
named!(parse_subsystem<&str, Subsystem>,
alt! (
tag!("smtp-in") => { |_| Subsystem::SmtpIn }
)
);
named!(parse_event<&str, Event>,
alt!(
tag!("tx-mail") => { |_| Event::TxMail } |
tag!("tx-rcpt") => { |_| Event::TxRcpt }
)
);
named!(parse_version<&str, u8>,
map_res!(take_while!(is_ascii_digit), to_u8)
);
named!(parse_u64<&str, u64>,
map_res!(take_while!(is_ascii_digit), to_u64)
);
named!(parse_u64_hex<&str, u64>,
map_res!(take_while!(is_ascii_hexdigit), to_u64_hex)
);
named!(parse_token<&str, u64>,
do_parse!(
token: parse_u64_hex >>
tag!("|") >>
(token)
)
);
named!(
parse_entry<&str, Entry>,
do_parse!(
kind: parse_kind >>
tag!("|") >>
version: parse_version >>
tag!("|") >>
timestamp: parse_u64 >>
tag!("|") >>
subsystem: parse_subsystem >>
tag!("|") >>
event: parse_event >>
tag!("|") >>
token: cond!(kind == Kind::Filter, parse_token) >>
session_id: parse_u64_hex >>
tag!("|") >>
params: take_until!("\n") >>
(Entry {
kind,
version,
timestamp,
subsystem,
event,
token,
session_id,
params: params.to_string(),
})
)
);
impl Entry {
pub fn from_str(entry: &str) -> Result<Entry, Error> {
let (_, res) = parse_entry(entry)?;
Ok(res)
}
}

48
src/errors.rs Normal file
View file

@ -0,0 +1,48 @@
use crate::entry::Entry;
pub struct Error {
message: String,
}
impl Error {
pub fn new(msg: &str) -> Self {
Error {
message: msg.to_string(),
}
}
pub fn from_string(msg: &String) -> Self {
Error::new(&msg)
}
pub fn new_param(param: &str, msg: &str) -> Self {
Error::new(&format!("{}: {}", param, msg))
}
pub fn as_str(&self) -> &str {
&self.message
}
}
impl From<std::io::Error> for Error {
fn from(error: std::io::Error) -> Self {
Error::from_string(&format!("IO error: {}", error))
}
}
impl From<nom::Err<&str>> for Error {
fn from(error: nom::Err<&str>) -> Self {
let msg = match error {
nom::Err::Incomplete(_) => "not enough data".to_string(),
nom::Err::Error(c) => format!("{:?}", c),
nom::Err::Failure(c) => format!("{:?}", c),
};
Error::from_string(&format!("Parsing error: {}", msg))
}
}
impl From<std::sync::mpsc::SendError<Entry>> for Error {
fn from(error: std::sync::mpsc::SendError<Entry>) -> Self {
Error::from_string(&format!("IO error: {}", error))
}
}

View file

@ -1,7 +1,40 @@
#[cfg(test)] mod entry;
mod tests { mod errors;
#[test]
fn it_works() { use crate::entry::Entry;
assert_eq!(2 + 2, 4); use crate::errors::Error;
use std::collections::HashMap;
use std::io;
use std::sync::mpsc;
use std::thread;
pub fn dispatch() -> Result<(), Error> {
let mut sessions = HashMap::new();
loop {
let mut input = String::new();
let nb = io::stdin().read_line(&mut input)?;
if nb == 0 {
continue;
}
let entry = Entry::from_str(input.as_str())?;
let channel = match sessions.get(&entry.session_id) {
Some(c) => c,
None => {
let (tx, rx) = mpsc::channel();
let name = entry.session_id.to_string();
thread::Builder::new().name(name).spawn(move || {
for e in rx.iter() {
println!(
"Debug: thread {}: {:?}",
thread::current().name().unwrap(),
e
);
}
})?;
sessions.insert(entry.session_id, tx);
sessions.get(&entry.session_id).unwrap()
}
};
channel.send(entry)?;
} }
} }