diff --git a/Cargo.toml b/Cargo.toml index 40c92d2..6c52410 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,3 +5,4 @@ authors = ["Rodolphe Breard "] edition = "2018" [dependencies] +nom = "^4.1" diff --git a/examples/dummy.rs b/examples/dummy.rs new file mode 100644 index 0000000..b905a67 --- /dev/null +++ b/examples/dummy.rs @@ -0,0 +1,6 @@ +fn main() { + match opensmtpd::dispatch() { + Ok(_) => {} + Err(e) => eprintln!("Error: {}", e.as_str()), + } +} diff --git a/examples/samples/dual_session.txt b/examples/samples/dual_session.txt new file mode 100644 index 0000000..762ebce --- /dev/null +++ b/examples/samples/dual_session.txt @@ -0,0 +1,4 @@ +report|1|1544130229|smtp-in|tx-mail|4b0148c60f798628|fc08ce7d||ok +report|1|1544130229|smtp-in|tx-mail|0f3004c08c82d33e|fc08ce7d||ok +report|1|1544130229|smtp-in|tx-rcpt|4b0148c60f798628|fc08ce7d||ok +report|1|1544130229|smtp-in|tx-rcpt|0f3004c08c82d33e|fc08ce7d||ok diff --git a/examples/samples/single_session.txt b/examples/samples/single_session.txt new file mode 100644 index 0000000..ee5fd76 --- /dev/null +++ b/examples/samples/single_session.txt @@ -0,0 +1,2 @@ +report|1|1544130229|smtp-in|tx-mail|0f3004c08c82d33e|fc08ce7d||ok +report|1|1544130229|smtp-in|tx-rcpt|0f3004c08c82d33e|fc08ce7d||ok diff --git a/src/entry.rs b/src/entry.rs new file mode 100644 index 0000000..6865d39 --- /dev/null +++ b/src/entry.rs @@ -0,0 +1,128 @@ +use crate::errors::Error; +use nom::{alt, call, cond, do_parse, error_position, map_res, named, tag, take_until, take_while}; + +#[derive(Debug, PartialEq)] +pub enum Kind { + Report, + Filter, +} + +#[derive(Debug)] +pub enum Subsystem { + SmtpIn, +} + +#[derive(Debug)] +pub enum Event { + TxMail, + TxRcpt, +} + +#[derive(Debug)] +pub struct Entry { + pub kind: Kind, + pub version: u8, + pub timestamp: u64, + pub subsystem: Subsystem, + pub event: Event, + pub token: Option, + pub session_id: u64, + pub params: String, +} + +fn is_ascii_digit(c: char) -> bool { + c.is_ascii_digit() +} + +fn is_ascii_hexdigit(c: char) -> bool { + c.is_ascii_hexdigit() +} + +fn to_u8(s: &str) -> Result { + s.parse() +} + +fn to_u64(s: &str) -> Result { + s.parse() +} + +fn to_u64_hex(s: &str) -> Result { + u64::from_str_radix(s, 16) +} + +named!(parse_kind<&str, Kind>, + alt!( + tag!("report") => { |_| Kind::Report } | + tag!("filter") => { |_| Kind::Filter } + ) +); + +named!(parse_subsystem<&str, Subsystem>, + alt! ( + tag!("smtp-in") => { |_| Subsystem::SmtpIn } + ) +); + +named!(parse_event<&str, Event>, + alt!( + tag!("tx-mail") => { |_| Event::TxMail } | + tag!("tx-rcpt") => { |_| Event::TxRcpt } + ) +); + +named!(parse_version<&str, u8>, + map_res!(take_while!(is_ascii_digit), to_u8) +); + +named!(parse_u64<&str, u64>, + map_res!(take_while!(is_ascii_digit), to_u64) +); + +named!(parse_u64_hex<&str, u64>, + map_res!(take_while!(is_ascii_hexdigit), to_u64_hex) +); + +named!(parse_token<&str, u64>, + do_parse!( + token: parse_u64_hex >> + tag!("|") >> + (token) + ) +); + +named!( + parse_entry<&str, Entry>, + do_parse!( + kind: parse_kind >> + tag!("|") >> + version: parse_version >> + tag!("|") >> + timestamp: parse_u64 >> + tag!("|") >> + subsystem: parse_subsystem >> + tag!("|") >> + event: parse_event >> + tag!("|") >> + token: cond!(kind == Kind::Filter, parse_token) >> + session_id: parse_u64_hex >> + tag!("|") >> + params: take_until!("\n") >> + (Entry { + kind, + version, + timestamp, + subsystem, + event, + token, + session_id, + params: params.to_string(), + }) + ) +); + +impl Entry { + pub fn from_str(entry: &str) -> Result { + let (_, res) = parse_entry(entry)?; + Ok(res) + } +} diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 0000000..63a49b1 --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,48 @@ +use crate::entry::Entry; + +pub struct Error { + message: String, +} + +impl Error { + pub fn new(msg: &str) -> Self { + Error { + message: msg.to_string(), + } + } + + pub fn from_string(msg: &String) -> Self { + Error::new(&msg) + } + + pub fn new_param(param: &str, msg: &str) -> Self { + Error::new(&format!("{}: {}", param, msg)) + } + + pub fn as_str(&self) -> &str { + &self.message + } +} + +impl From for Error { + fn from(error: std::io::Error) -> Self { + Error::from_string(&format!("IO error: {}", error)) + } +} + +impl From> for Error { + fn from(error: nom::Err<&str>) -> Self { + let msg = match error { + nom::Err::Incomplete(_) => "not enough data".to_string(), + nom::Err::Error(c) => format!("{:?}", c), + nom::Err::Failure(c) => format!("{:?}", c), + }; + Error::from_string(&format!("Parsing error: {}", msg)) + } +} + +impl From> for Error { + fn from(error: std::sync::mpsc::SendError) -> Self { + Error::from_string(&format!("IO error: {}", error)) + } +} diff --git a/src/lib.rs b/src/lib.rs index 31e1bb2..5e5ffff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,40 @@ -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); +mod entry; +mod errors; + +use crate::entry::Entry; +use crate::errors::Error; +use std::collections::HashMap; +use std::io; +use std::sync::mpsc; +use std::thread; + +pub fn dispatch() -> Result<(), Error> { + let mut sessions = HashMap::new(); + loop { + let mut input = String::new(); + let nb = io::stdin().read_line(&mut input)?; + if nb == 0 { + continue; + } + let entry = Entry::from_str(input.as_str())?; + let channel = match sessions.get(&entry.session_id) { + Some(c) => c, + None => { + let (tx, rx) = mpsc::channel(); + let name = entry.session_id.to_string(); + thread::Builder::new().name(name).spawn(move || { + for e in rx.iter() { + println!( + "Debug: thread {}: {:?}", + thread::current().name().unwrap(), + e + ); + } + })?; + sessions.insert(entry.session_id, tx); + sessions.get(&entry.session_id).unwrap() + } + }; + channel.send(entry)?; } }