Start moving to async

This commit is contained in:
Rodolphe Bréard 2023-04-09 11:07:16 +02:00
parent d4f92bc430
commit 648bf5d2c6
5 changed files with 38 additions and 29 deletions

View file

@ -12,3 +12,4 @@ clap = { version = "4.1.13", default-features = false, features = ["std", "deriv
env_logger = { version = "0.10.0", default-features = false }
log = { version = "0.4.17", default-features = false }
nom = { version = "7.1.3", default-features = false }
tokio = { version = "1.27.0", default-features = false, features = ["rt-multi-thread", "io-std", "io-util", "macros", "time", "process"] }

View file

@ -4,9 +4,9 @@ use crate::stdin_reader::StdinReader;
pub const CONFIG_END: &[u8] = b"config|ready\n";
pub const CONFIG_TAG: &[u8] = b"config|";
pub fn read_config(reader: &mut StdinReader) {
pub async fn read_config(reader: &mut StdinReader) {
loop {
let line = reader.read_line();
let line = reader.read_line().await;
if line == CONFIG_END {
log::trace!("configuration is ready");
return;

View file

@ -56,25 +56,27 @@ macro_rules! log_messages {
};
}
fn main() {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
match config::Config::init() {
Ok(cnf) => {
logs::init_log_system(&cnf);
log::debug!("{cnf:?}");
main_loop(&cnf)
main_loop(&cnf).await
}
Err(e) => eprintln!("{e}"),
}
Ok(())
}
fn main_loop(cnf: &config::Config) {
async fn main_loop(cnf: &config::Config) {
let mut reader = StdinReader::new();
let mut messages: HashMap<String, Message> = HashMap::new();
handshake::read_config(&mut reader);
handshake::read_config(&mut reader).await;
handshake::register_filter();
log_messages!(messages);
loop {
match Entry::from_bytes(&reader.read_line()) {
match Entry::from_bytes(&reader.read_line().await) {
Ok(entry) => {
let msg_id = entry.get_msg_id();
match messages.get_mut(&msg_id) {
@ -84,7 +86,7 @@ fn main_loop(cnf: &config::Config) {
msg.append_line(entry.get_data());
} else {
log::debug!("message ready: {msg_id}");
msg.sign_and_return(&cnf);
msg.sign_and_return(&cnf).await;
messages.remove(&msg_id);
log::debug!("message removed: {msg_id}");
}
@ -96,7 +98,7 @@ fn main_loop(cnf: &config::Config) {
messages.insert(msg_id, msg);
} else {
log::debug!("empty new message: {msg_id}");
msg.sign_and_return(&cnf);
msg.sign_and_return(&cnf).await;
}
}
}

View file

@ -1,7 +1,7 @@
use crate::config::Config;
use crate::entry::Entry;
use crate::parsed_message::ParsedMessage;
use std::io::{BufWriter, Write};
use tokio::io::{AsyncWriteExt, BufWriter};
pub const RETURN_SEP: &[u8] = b"|";
pub const RETURN_START: &[u8] = b"filter-dataline|";
@ -51,7 +51,7 @@ impl Message {
self.nb_lines
}
pub fn sign_and_return(&self, cnf: &Config) {
pub async fn sign_and_return(&self, cnf: &Config) {
log::trace!("content: {}", crate::display_bytes!(&self.content));
match ParsedMessage::from_bytes(&self.content) {
Ok(parsed_msg) => {
@ -80,25 +80,26 @@ impl Message {
log::error!("{}: unable to parse message", self.session_id);
}
}
self.print_msg();
self.print_msg().await;
}
fn print_msg(&self) {
async fn print_msg(&self) {
let i = self.content.len() - 1;
for line in self.content[0..i].split(|&b| b == b'\n') {
self.print_line(line);
self.print_line(line).await;
}
self.print_line(b".");
self.print_line(b".").await;
}
fn print_line(&self, line: &[u8]) {
let mut stdout = BufWriter::new(std::io::stdout());
stdout.write_all(RETURN_START).unwrap();
stdout.write_all(self.session_id.as_bytes()).unwrap();
stdout.write_all(RETURN_SEP).unwrap();
stdout.write_all(self.token.as_bytes()).unwrap();
stdout.write_all(RETURN_SEP).unwrap();
stdout.write_all(line).unwrap();
stdout.write_all(b"\n").unwrap();
async fn print_line(&self, line: &[u8]) {
let mut stdout = BufWriter::new(tokio::io::stdout());
stdout.write_all(RETURN_START).await.unwrap();
stdout.write_all(self.session_id.as_bytes()).await.unwrap();
stdout.write_all(RETURN_SEP).await.unwrap();
stdout.write_all(self.token.as_bytes()).await.unwrap();
stdout.write_all(RETURN_SEP).await.unwrap();
stdout.write_all(line).await.unwrap();
stdout.write_all(b"\n").await.unwrap();
stdout.flush().await.unwrap();
}
}

View file

@ -1,23 +1,28 @@
use crate::display_bytes;
use std::io::{BufRead, BufReader};
use tokio::io::{AsyncBufReadExt, BufReader};
pub struct StdinReader {
reader: BufReader<std::io::Stdin>,
reader: BufReader<tokio::io::Stdin>,
buffer: Vec<u8>,
}
impl StdinReader {
pub fn new() -> Self {
Self {
reader: BufReader::new(std::io::stdin()),
reader: BufReader::new(tokio::io::stdin()),
buffer: Vec::with_capacity(crate::DEFAULT_BUFF_SIZE),
}
}
pub fn read_line(&mut self) -> Vec<u8> {
pub async fn read_line(&mut self) -> Vec<u8> {
self.buffer.clear();
log::trace!("reading line from stdin");
if self.reader.read_until(b'\n', &mut self.buffer).unwrap() == 0 {
if self
.reader
.read_until(b'\n', &mut self.buffer)
.await
.unwrap() == 0
{
std::process::exit(0)
}
log::trace!("line read from stdin: {}", display_bytes!(self.buffer));