From 648bf5d2c6d80e51c670579a0013a94590f77098 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rodolphe=20Br=C3=A9ard?= Date: Sun, 9 Apr 2023 11:07:16 +0200 Subject: [PATCH] Start moving to async --- Cargo.toml | 1 + src/handshake.rs | 4 ++-- src/main.rs | 16 +++++++++------- src/message.rs | 31 ++++++++++++++++--------------- src/stdin_reader.rs | 15 ++++++++++----- 5 files changed, 38 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 13e60ff..93c055d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,3 +12,4 @@ clap = { version = "4.1.13", default-features = false, features = ["std", "deriv env_logger = { version = "0.10.0", default-features = false } log = { version = "0.4.17", default-features = false } nom = { version = "7.1.3", default-features = false } +tokio = { version = "1.27.0", default-features = false, features = ["rt-multi-thread", "io-std", "io-util", "macros", "time", "process"] } diff --git a/src/handshake.rs b/src/handshake.rs index b0c0a3d..a69989d 100644 --- a/src/handshake.rs +++ b/src/handshake.rs @@ -4,9 +4,9 @@ use crate::stdin_reader::StdinReader; pub const CONFIG_END: &[u8] = b"config|ready\n"; pub const CONFIG_TAG: &[u8] = b"config|"; -pub fn read_config(reader: &mut StdinReader) { +pub async fn read_config(reader: &mut StdinReader) { loop { - let line = reader.read_line(); + let line = reader.read_line().await; if line == CONFIG_END { log::trace!("configuration is ready"); return; diff --git a/src/main.rs b/src/main.rs index 0d61563..3685cdf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -56,25 +56,27 @@ macro_rules! log_messages { }; } -fn main() { +#[tokio::main] +async fn main() -> Result<(), Box> { match config::Config::init() { Ok(cnf) => { logs::init_log_system(&cnf); log::debug!("{cnf:?}"); - main_loop(&cnf) + main_loop(&cnf).await } Err(e) => eprintln!("{e}"), } + Ok(()) } -fn main_loop(cnf: &config::Config) { +async fn main_loop(cnf: &config::Config) { let mut reader = StdinReader::new(); let mut messages: HashMap = HashMap::new(); - handshake::read_config(&mut reader); + handshake::read_config(&mut reader).await; handshake::register_filter(); log_messages!(messages); loop { - match Entry::from_bytes(&reader.read_line()) { + match Entry::from_bytes(&reader.read_line().await) { Ok(entry) => { let msg_id = entry.get_msg_id(); match messages.get_mut(&msg_id) { @@ -84,7 +86,7 @@ fn main_loop(cnf: &config::Config) { msg.append_line(entry.get_data()); } else { log::debug!("message ready: {msg_id}"); - msg.sign_and_return(&cnf); + msg.sign_and_return(&cnf).await; messages.remove(&msg_id); log::debug!("message removed: {msg_id}"); } @@ -96,7 +98,7 @@ fn main_loop(cnf: &config::Config) { messages.insert(msg_id, msg); } else { log::debug!("empty new message: {msg_id}"); - msg.sign_and_return(&cnf); + msg.sign_and_return(&cnf).await; } } } diff --git a/src/message.rs b/src/message.rs index b79a56f..c394508 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,7 +1,7 @@ use crate::config::Config; use crate::entry::Entry; use crate::parsed_message::ParsedMessage; -use std::io::{BufWriter, Write}; +use tokio::io::{AsyncWriteExt, BufWriter}; pub const RETURN_SEP: &[u8] = b"|"; pub const RETURN_START: &[u8] = b"filter-dataline|"; @@ -51,7 +51,7 @@ impl Message { self.nb_lines } - pub fn sign_and_return(&self, cnf: &Config) { + pub async fn sign_and_return(&self, cnf: &Config) { log::trace!("content: {}", crate::display_bytes!(&self.content)); match ParsedMessage::from_bytes(&self.content) { Ok(parsed_msg) => { @@ -80,25 +80,26 @@ impl Message { log::error!("{}: unable to parse message", self.session_id); } } - self.print_msg(); + self.print_msg().await; } - fn print_msg(&self) { + async fn print_msg(&self) { let i = self.content.len() - 1; for line in self.content[0..i].split(|&b| b == b'\n') { - self.print_line(line); + self.print_line(line).await; } - self.print_line(b"."); + self.print_line(b".").await; } - fn print_line(&self, line: &[u8]) { - let mut stdout = BufWriter::new(std::io::stdout()); - stdout.write_all(RETURN_START).unwrap(); - stdout.write_all(self.session_id.as_bytes()).unwrap(); - stdout.write_all(RETURN_SEP).unwrap(); - stdout.write_all(self.token.as_bytes()).unwrap(); - stdout.write_all(RETURN_SEP).unwrap(); - stdout.write_all(line).unwrap(); - stdout.write_all(b"\n").unwrap(); + async fn print_line(&self, line: &[u8]) { + let mut stdout = BufWriter::new(tokio::io::stdout()); + stdout.write_all(RETURN_START).await.unwrap(); + stdout.write_all(self.session_id.as_bytes()).await.unwrap(); + stdout.write_all(RETURN_SEP).await.unwrap(); + stdout.write_all(self.token.as_bytes()).await.unwrap(); + stdout.write_all(RETURN_SEP).await.unwrap(); + stdout.write_all(line).await.unwrap(); + stdout.write_all(b"\n").await.unwrap(); + stdout.flush().await.unwrap(); } } diff --git a/src/stdin_reader.rs b/src/stdin_reader.rs index ba16ba9..1868173 100644 --- a/src/stdin_reader.rs +++ b/src/stdin_reader.rs @@ -1,23 +1,28 @@ use crate::display_bytes; -use std::io::{BufRead, BufReader}; +use tokio::io::{AsyncBufReadExt, BufReader}; pub struct StdinReader { - reader: BufReader, + reader: BufReader, buffer: Vec, } impl StdinReader { pub fn new() -> Self { Self { - reader: BufReader::new(std::io::stdin()), + reader: BufReader::new(tokio::io::stdin()), buffer: Vec::with_capacity(crate::DEFAULT_BUFF_SIZE), } } - pub fn read_line(&mut self) -> Vec { + pub async fn read_line(&mut self) -> Vec { self.buffer.clear(); log::trace!("reading line from stdin"); - if self.reader.read_until(b'\n', &mut self.buffer).unwrap() == 0 { + if self + .reader + .read_until(b'\n', &mut self.buffer) + .await + .unwrap() == 0 + { std::process::exit(0) } log::trace!("line read from stdin: {}", display_bytes!(self.buffer));