Use more specific structs to represent an entry

This commit is contained in:
Rodolphe Breard 2019-07-11 22:19:09 +02:00
parent a9fb623791
commit ec5aabcf99
5 changed files with 114 additions and 44 deletions

View file

@ -8,7 +8,7 @@ fn on_event(entry: &Entry) {
#[event(LinkConnect)] #[event(LinkConnect)]
fn on_connect(entry: &Entry) { fn on_connect(entry: &Entry) {
info!("New client on session {:x}.", entry.session_id); info!("New client on session {:x}.", entry.get_session_id());
} }
fn main() { fn main() {

View file

@ -9,7 +9,7 @@ struct MyContext {
#[report(Any)] #[report(Any)]
fn on_report(ctx: &mut MyContext, entry: &Entry) { fn on_report(ctx: &mut MyContext, entry: &Entry) {
ctx.nb += 1; ctx.nb += 1;
info!("Event received: {}, {}", entry.session_id, ctx.nb); info!("Event received: {}, {}", entry.get_session_id(), ctx.nb);
} }
fn main() { fn main() {

View file

@ -16,6 +16,11 @@ use nom::sequence::preceded;
use nom::IResult; use nom::IResult;
use std::str::FromStr; use std::str::FromStr;
#[derive(Clone, Debug, PartialEq)]
enum Version {
V1,
}
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub enum Kind { pub enum Kind {
Report, Report,
@ -43,20 +48,23 @@ pub enum Event {
TxRollback, TxRollback,
ProtocolClient, ProtocolClient,
ProtocolServer, ProtocolServer,
Timeout,
FilterResponse, FilterResponse,
Timeout,
} }
impl FromStr for Event { impl FromStr for Event {
type Err = Error; type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> { fn from_str(s: &str) -> Result<Self, Self::Err> {
let s = s let s = s.to_lowercase();
.to_lowercase() let s = if !s.contains('-') {
.replace("link", "link-") s.replace("link", "link-")
.replace("tx", "tx-") .replace("tx", "tx-")
.replace("protocol", "protocol-") .replace("protocol", "protocol-")
.replace("filter", "filter-"); .replace("filter", "filter-")
} else {
s
};
let (_, evt) = parse_event(&s)?; let (_, evt) = parse_event(&s)?;
Ok(evt) Ok(evt)
} }
@ -78,8 +86,8 @@ impl ToString for Event {
Event::TxRollback => "tx-rollback", Event::TxRollback => "tx-rollback",
Event::ProtocolClient => "protocol-client", Event::ProtocolClient => "protocol-client",
Event::ProtocolServer => "protocol-server", Event::ProtocolServer => "protocol-server",
Event::Timeout => "timeout",
Event::FilterResponse => "filter-response", Event::FilterResponse => "filter-response",
Event::Timeout => "timeout",
}; };
String::from(s) String::from(s)
} }
@ -98,15 +106,9 @@ impl TimeVal {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct Entry { pub enum Entry {
pub kind: Kind, V1Report(V1Report),
pub version: u8, V1Filter(V1Filter),
pub timestamp: TimeVal,
pub subsystem: Subsystem,
pub event: Event,
pub token: Option<u64>,
pub session_id: u64,
pub params: Option<String>,
} }
impl FromStr for Entry { impl FromStr for Entry {
@ -118,6 +120,48 @@ impl FromStr for Entry {
} }
} }
impl Entry {
pub fn get_event(&self) -> Event {
match self {
Entry::V1Report(r) => r.event.to_owned(),
Entry::V1Filter(f) => f.event.to_owned(),
}
}
pub fn get_session_id(&self) -> u64 {
match self {
Entry::V1Report(r) => r.session_id,
Entry::V1Filter(f) => f.session_id,
}
}
pub fn is_disconnect(&self) -> bool {
match self {
Entry::V1Report(r) => r.event == Event::LinkDisconnect,
Entry::V1Filter(_) => false,
}
}
}
#[derive(Debug)]
pub struct V1Report {
pub timestamp: TimeVal,
pub subsystem: Subsystem,
pub event: Event,
pub session_id: u64,
pub params: Option<String>,
}
#[derive(Debug)]
pub struct V1Filter {
pub timestamp: TimeVal,
pub subsystem: Subsystem,
pub event: Event,
pub session_id: u64,
pub token: u64,
pub params: Option<String>,
}
fn separator(input: &str) -> IResult<&str, &str> { fn separator(input: &str) -> IResult<&str, &str> {
tag("|")(input) tag("|")(input)
} }
@ -129,8 +173,8 @@ fn parse_kind(input: &str) -> IResult<&str, Kind> {
))(input) ))(input)
} }
fn parse_version(input: &str) -> IResult<&str, u8> { fn parse_version(input: &str) -> IResult<&str, Version> {
map_res(digit1, |s: &str| s.parse::<u8>())(input) value(Version::V1, tag("1"))(input)
} }
fn parse_timestamp(input: &str) -> IResult<&str, TimeVal> { fn parse_timestamp(input: &str) -> IResult<&str, TimeVal> {
@ -163,8 +207,8 @@ fn parse_event(input: &str) -> IResult<&str, Event> {
value(Event::TxRollback, tag("tx-rollback")), value(Event::TxRollback, tag("tx-rollback")),
value(Event::ProtocolClient, tag("protocol-client")), value(Event::ProtocolClient, tag("protocol-client")),
value(Event::ProtocolServer, tag("protocol-server")), value(Event::ProtocolServer, tag("protocol-server")),
value(Event::Timeout, tag("timeout")),
value(Event::FilterResponse, tag("filter-response")), value(Event::FilterResponse, tag("filter-response")),
value(Event::Timeout, tag("timeout")),
))(input) ))(input)
} }
@ -181,38 +225,63 @@ fn parse_params(input: &str) -> IResult<&str, String> {
Ok((input, params.0.into_iter().collect())) Ok((input, params.0.into_iter().collect()))
} }
fn parse_entry(input: &str) -> IResult<&str, Entry> { fn parse_v1_report(input: &str) -> IResult<&str, Entry> {
let (input, kind) = parse_kind(input)?;
let (input, _) = separator(input)?;
let (input, version) = parse_version(input)?;
let (input, _) = separator(input)?;
let (input, timestamp) = parse_timestamp(input)?; let (input, timestamp) = parse_timestamp(input)?;
let (input, _) = separator(input)?; let (input, _) = separator(input)?;
let (input, subsystem) = parse_subsystem(input)?; let (input, subsystem) = parse_subsystem(input)?;
let (input, _) = separator(input)?; let (input, _) = separator(input)?;
let (input, event) = parse_event(input)?; let (input, event) = parse_event(input)?;
let (input, token) = if kind == Kind::Filter {
let (input, _) = separator(input)?;
let (input, token) = parse_token(input)?;
(input, Some(token))
} else {
(input, None)
};
let (input, _) = separator(input)?; let (input, _) = separator(input)?;
let (input, session_id) = parse_session_id(input)?; let (input, session_id) = parse_session_id(input)?;
let (input, params) = opt(preceded(separator, parse_params))(input)?; let (input, params) = opt(preceded(separator, parse_params))(input)?;
if params.is_none() { if params.is_none() {
let _ = line_ending(input)?; let _ = line_ending(input)?;
} }
let entry = Entry { let report = V1Report {
kind,
version,
timestamp, timestamp,
subsystem, subsystem,
event, event,
token,
session_id, session_id,
params, params,
}; };
Ok((input, Entry::V1Report(report)))
}
fn parse_v1_filter(input: &str) -> IResult<&str, Entry> {
let (input, timestamp) = parse_timestamp(input)?;
let (input, _) = separator(input)?;
let (input, subsystem) = parse_subsystem(input)?;
let (input, _) = separator(input)?;
let (input, event) = parse_event(input)?;
let (input, _) = separator(input)?;
let (input, token) = parse_token(input)?;
let (input, _) = separator(input)?;
let (input, session_id) = parse_session_id(input)?;
let (input, params) = opt(preceded(separator, parse_params))(input)?;
if params.is_none() {
let _ = line_ending(input)?;
}
let filter = V1Filter {
timestamp,
subsystem,
event,
session_id,
token,
params,
};
Ok((input, Entry::V1Filter(filter)))
}
fn parse_entry(input: &str) -> IResult<&str, Entry> {
let (input, kind) = parse_kind(input)?;
let (input, _) = separator(input)?;
let (input, version) = parse_version(input)?;
let (input, _) = separator(input)?;
let (input, entry) = match version {
Version::V1 => match kind {
Kind::Report => parse_v1_report(input)?,
Kind::Filter => parse_v1_filter(input)?,
},
};
Ok((input, entry)) Ok((input, entry))
} }

View file

@ -91,22 +91,23 @@ impl<T: Clone + Default + 'static> SmtpIn<T> {
/// already exists, creates it. /// already exists, creates it.
fn dispatch(&mut self, input: &str) -> Result<(), Error> { fn dispatch(&mut self, input: &str) -> Result<(), Error> {
let entry = Entry::from_str(input)?; let entry = Entry::from_str(input)?;
let id = entry.session_id; let id = entry.get_session_id();
let disconnect = entry.event == Event::LinkDisconnect; let disconnect = entry.is_disconnect();
let channel = match self.sessions.get(&id) { let channel = match self.sessions.get(&id) {
Some((r, _)) => r, Some((r, _)) => r,
None => { None => {
let (handlers_tx, handlers_rx) = mpsc::channel(); let (handlers_tx, handlers_rx) = mpsc::channel();
let (entry_tx, entry_rx) = mpsc::channel(); let (entry_tx, entry_rx) = mpsc::channel();
let name = entry.session_id.to_string(); let name = entry.get_session_id().to_string();
let handle = thread::Builder::new().name(name).spawn(move || { let handle = thread::Builder::new().name(name).spawn(move || {
SessionHandler::new(entry_rx, &handlers_rx).read_entries(); SessionHandler::new(entry_rx, &handlers_rx).read_entries();
})?; })?;
for h in self.event_handlers.iter() { for h in self.event_handlers.iter() {
handlers_tx.send(h.clone())?; handlers_tx.send(h.clone())?;
} }
self.sessions.insert(entry.session_id, (entry_tx, handle)); self.sessions
let (r, _) = &self.sessions[&entry.session_id]; .insert(entry.get_session_id(), (entry_tx, handle));
let (r, _) = &self.sessions[&entry.get_session_id()];
r r
} }
}; };

View file

@ -41,7 +41,7 @@ impl<T: Clone + Default> SessionHandler<T> {
let mut context: T = Default::default(); let mut context: T = Default::default();
for e in self.entry_rx.iter() { for e in self.entry_rx.iter() {
for h in self.event_handlers.iter() { for h in self.event_handlers.iter() {
if h.is_callable(&e.event) { if h.is_callable(&e.get_event()) {
h.call(&e, &mut context); h.call(&e, &mut context);
} }
} }