diff --git a/Cargo.toml b/Cargo.toml index 69e57a8..7f3a3c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,4 +13,5 @@ env_logger = { version = "0.10.0", default-features = false } futures = { version = "0.3.28", default-features = false, features = ["std"] } log = { version = "0.4.17", default-features = false } nom = { version = "7.1.3", default-features = false } +sqlx = { version = "0.6.3", default-features = false, features = ["runtime-tokio-rustls", "macros", "migrate", "sqlite", "uuid"] } tokio = { version = "1.27.0", default-features = false, features = ["rt-multi-thread", "io-std", "io-util", "macros", "sync", "time", "process"] } diff --git a/build.rs b/build.rs index 9b6dd22..4d946a3 100644 --- a/build.rs +++ b/build.rs @@ -4,6 +4,10 @@ const VARLIBDIR_NAME: &str = "VARLIBDIR"; const VARLIBDIR_VALUE_DEFAULT: &str = "/var/lib/"; fn main() { + // trigger recompilation when a new migration is added + println!("cargo:rerun-if-changed=migrations"); + + // set the VARLIBDIR env variable if env::var(VARLIBDIR_NAME).is_err() { println!( "cargo:rustc-env={}={}", diff --git a/migrations/20230409141801_initial.sql b/migrations/20230409141801_initial.sql new file mode 100644 index 0000000..d3d840e --- /dev/null +++ b/migrations/20230409141801_initial.sql @@ -0,0 +1,10 @@ +CREATE TABLE key_db ( + selector TEXT, + sdid TEXT, + algorithm TEXT, + creation TEXT, + not_after TEXT, + revocation TEXT, + private_key TEXT, + public_key TEXT +); diff --git a/src/action.rs b/src/action.rs index b5b69cd..29d906f 100644 --- a/src/action.rs +++ b/src/action.rs @@ -1,12 +1,15 @@ use crate::config::Config; use crate::entry::read_entry; +use crate::key::key_rotation; use crate::message::Message; use crate::stdin_reader::StdinReader; +use sqlx::SqlitePool; use std::sync::Arc; use tokio::sync::RwLock; pub enum ActionResult { EndOfStream, + KeyRotation, MessageSent(String), MessageSentError(String), NewEntry(crate::entry::Entry), @@ -15,13 +18,22 @@ pub enum ActionResult { pub async fn new_action( reader_lock: Option>>, - msg_tpl: Option<(Message, &Config)>, + db_opt: Option<(&SqlitePool, &Config)>, + msg_tpl: Option, ) -> ActionResult { if let Some(reader_lock) = reader_lock { return read_entry(reader_lock).await; } - if let Some((msg, cnf)) = msg_tpl { - return msg.sign_and_return(cnf).await; + if let Some((db, cnf)) = db_opt { + match msg_tpl { + Some(msg) => { + return msg.sign_and_return(cnf).await; + } + None => { + key_rotation(db, cnf).await; + return ActionResult::KeyRotation; + } + } } ActionResult::MessageSentError("new_action: invalid parameters".to_string()) } diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..914e3b4 --- /dev/null +++ b/src/db.rs @@ -0,0 +1,17 @@ +use crate::config::Config; +use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; +use sqlx::{ConnectOptions, SqlitePool}; + +pub async fn init(cnf: &Config) -> Result { + do_init(cnf).await.map_err(|e| e.to_string()) +} + +async fn do_init(cnf: &Config) -> Result { + let mut db_options = SqliteConnectOptions::new() + .filename(cnf.key_data_base()) + .create_if_missing(true); + db_options.log_statements(log::LevelFilter::Trace); + let db_pool = SqlitePoolOptions::new().connect_with(db_options).await?; + sqlx::migrate!().run(&db_pool).await?; + Ok(db_pool) +} diff --git a/src/key.rs b/src/key.rs new file mode 100644 index 0000000..32a33df --- /dev/null +++ b/src/key.rs @@ -0,0 +1,7 @@ +use crate::config::Config; +use sqlx::SqlitePool; + +pub async fn key_rotation(db: &SqlitePool, cnf: &Config) { + use tokio::time::{sleep, Duration}; + sleep(Duration::from_secs(10)).await; +} diff --git a/src/main.rs b/src/main.rs index f7e5c2a..c92b3fc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,8 +2,10 @@ mod action; mod algorithm; mod canonicalization; mod config; +mod db; mod entry; mod handshake; +mod key; mod logs; mod message; mod parsed_message; @@ -15,6 +17,7 @@ use canonicalization::CanonicalizationType; use futures::stream::FuturesUnordered; use futures::StreamExt; use message::Message; +use sqlx::SqlitePool; use std::collections::HashMap; use std::sync::Arc; use stdin_reader::StdinReader; @@ -67,14 +70,17 @@ async fn main() -> Result<(), Box> { Ok(cnf) => { logs::init_log_system(&cnf); log::debug!("{cnf:?}"); - main_loop(&cnf).await + match db::init(&cnf).await { + Ok(pool) => main_loop(&cnf, &pool).await, + Err(e) => eprintln!("{e}"), + } } Err(e) => eprintln!("{e}"), } Ok(()) } -async fn main_loop(cnf: &config::Config) { +async fn main_loop(cnf: &config::Config, db: &SqlitePool) { let mut actions = FuturesUnordered::new(); let mut reader = StdinReader::new(); let mut messages: HashMap = HashMap::new(); @@ -82,9 +88,11 @@ async fn main_loop(cnf: &config::Config) { handshake::register_filter(); log_messages!(messages); let reader_lock = Arc::new(RwLock::new(reader)); - actions.push(new_action(Some(reader_lock.clone()), None)); + actions.push(new_action(Some(reader_lock.clone()), None, None)); + actions.push(new_action(None, Some((db, cnf)), None)); loop { - if actions.is_empty() { + log::debug!("Wat???? {}", actions.len()); + if actions.len() <= 1 { break; } if let Some(action_res) = actions.next().await { @@ -92,6 +100,9 @@ async fn main_loop(cnf: &config::Config) { ActionResult::EndOfStream => { log::debug!("end of input stream"); } + ActionResult::KeyRotation => { + actions.push(new_action(None, Some((db, cnf)), None)); + } ActionResult::MessageSent(msg_id) => { log::debug!("message removed: {msg_id}"); } @@ -108,7 +119,7 @@ async fn main_loop(cnf: &config::Config) { } else { log::debug!("message ready: {msg_id}"); if let Some(m) = messages.remove(&msg_id) { - actions.push(new_action(None, Some((m, cnf)))); + actions.push(new_action(None, Some((db, cnf)), Some(m))); } } } @@ -118,16 +129,16 @@ async fn main_loop(cnf: &config::Config) { if !entry.is_end_of_message() { messages.insert(msg_id.clone(), msg); } else { - actions.push(new_action(None, Some((msg, cnf)))); + actions.push(new_action(None, Some((db, cnf)), Some(msg))); } } } log_messages!(messages); - actions.push(new_action(Some(reader_lock.clone()), None)); + actions.push(new_action(Some(reader_lock.clone()), None, None)); } ActionResult::NewEntryError(err) => { log::error!("invalid filter line: {err}"); - actions.push(new_action(Some(reader_lock.clone()), None)); + actions.push(new_action(Some(reader_lock.clone()), None, None)); } } }