diff --git a/Cargo.lock b/Cargo.lock index fa13645..ed90165 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.18" @@ -254,7 +263,11 @@ version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ + "iana-time-zone", + "js-sys", "num-traits", + "wasm-bindgen", + "windows-link", ] [[package]] @@ -314,6 +327,31 @@ dependencies = [ "libc", ] +[[package]] +name = "cron" +version = "0.0.0" +dependencies = [ + "anyhow", + "async-trait", + "base", + "chrono", + "cron 0.15.0", + "log", + "tokio", + "tucana", +] + +[[package]] +name = "cron" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740" +dependencies = [ + "chrono", + "once_cell", + "winnow", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -731,6 +769,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "iana-time-zone" +version = "0.1.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icu_collections" version = "1.5.0" @@ -2234,12 +2296,65 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -2331,6 +2446,15 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.6.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e90edd2ac1aa278a5c4599b1d89cf03074b610800f866d4026dc199d7929a28" +dependencies = [ + "memchr", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" diff --git a/Cargo.toml b/Cargo.toml index aab0735..a7aabd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["adapter/rest", "crates/base"] +members = [ "adapter/cron","adapter/rest", "crates/base"] resolver = "3" [workspace.package] @@ -22,6 +22,8 @@ anyhow = "1.0.98" prost = "0.14.0" tonic-health = "0.14.0" futures-lite = "2.6.1" +chrono = "0.4.42" +cron = "0.15.0" [workspace.dependencies.base] path = "../draco/crates/base" diff --git a/adapter/cron/Cargo.toml b/adapter/cron/Cargo.toml new file mode 100644 index 0000000..d555ac2 --- /dev/null +++ b/adapter/cron/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "cron" +version.workspace = true +edition.workspace = true + +[dependencies] +tokio = {workspace = true} +chrono = {workspace = true} +cron = {workspace = true} +base = {workspace = true} +tucana = {workspace = true} +async-trait = {workspace = true} +log = { workspace = true } +anyhow = {workspace = true} diff --git a/adapter/cron/src/main.rs b/adapter/cron/src/main.rs new file mode 100644 index 0000000..16360ed --- /dev/null +++ b/adapter/cron/src/main.rs @@ -0,0 +1,142 @@ +use async_trait::async_trait; +use base::runner::{ServerContext, ServerRunner}; +use base::store::FlowIdentifyResult; +use base::traits::{IdentifiableFlow, LoadConfig, Server}; +use chrono::{DateTime, Datelike, Timelike, Utc}; +use cron::Schedule; +use std::str::FromStr; +use tucana::shared::ValidationFlow; +use tucana::shared::value::Kind; + +#[derive(Default)] +struct Cron {} + +#[derive(Clone)] +struct CronConfig {} + +impl LoadConfig for CronConfig { + fn load() -> Self { + Self {} + } +} + +#[tokio::main] +async fn main() { + let server = Cron::default(); + let runner = ServerRunner::new(server).await.unwrap(); + runner.serve().await.unwrap(); +} + +struct Time { + now: DateTime, +} + +fn extract_flow_setting_field(flow: &ValidationFlow, name: &str) -> Option { + flow.settings + .iter() + .find(|s| s.flow_setting_id == name) + .and_then(|s| s.value.as_ref()) + .and_then(|v| v.kind.as_ref()) + .and_then(|k| match k { + Kind::StringValue(s) => Some(s.clone()), + _ => None, + }) +} + +impl IdentifiableFlow for Time { + fn identify(&self, flow: &tucana::shared::ValidationFlow) -> bool { + let Some(minute) = extract_flow_setting_field(flow, "CRON_MINUTE") else { + return false; + }; + let Some(hour) = extract_flow_setting_field(flow, "CRON_HOUR") else { + return false; + }; + let Some(dom) = extract_flow_setting_field(flow, "CRON_DAY_OF_MONTH") else { + return false; + }; + let Some(month) = extract_flow_setting_field(flow, "CRON_MONTH") else { + return false; + }; + let Some(dow) = extract_flow_setting_field(flow, "CRON_DAY_OF_WEEK") else { + return false; + }; + + let expression = format!("* {} {} {} {} {}", minute, hour, dom, month, dow); + let schedule = match Schedule::from_str(expression.as_str()) { + Ok(s) => s, + Err(e) => { + log::error!( + "Could not create schedule from expression ({}). Reason: {:?}", + expression, + e + ); + return false; + } + }; + let next = match schedule.upcoming(Utc).next() { + Some(n) => n, + None => { + log::error!("Could not find any upcomming schedules"); + return false; + } + }; + + self.now.year() == next.year() + && self.now.month() == next.month() + && self.now.day() == next.day() + && self.now.hour() == next.hour() + && self.now.minute() == next.minute() + } +} + +#[async_trait] +impl Server for Cron { + async fn init(&mut self, _ctx: &ServerContext) -> anyhow::Result<()> { + Ok(()) + } + + async fn run(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { + log::info!("Starting Cron adapter"); + let expression = "0 * * * * *"; + let schedule = Schedule::from_str(expression)?; + let pattern = "CRON.<"; + + loop { + let now = Utc::now(); + log::info!("Scheduled: {:?}", now); + if let Some(next) = schedule.upcoming(Utc).take(1).next() { + let until_next = next - now; + tokio::time::sleep(until_next.to_std()?).await; + + let time = Time { now: next }; + match ctx + .adapter_store + .get_possible_flow_match(pattern.to_string(), time) + .await + { + FlowIdentifyResult::None => { + log::debug!("No Flow identified for this schedule"); + } + FlowIdentifyResult::Single(flow) => { + log::debug!("One Flow identified for this schedule"); + ctx.adapter_store + .validate_and_execute_flow(flow, None) + .await; + } + FlowIdentifyResult::Multiple(flows) => { + log::debug!("Multiple Flows identified for this schedule"); + for flow in flows { + ctx.adapter_store + .validate_and_execute_flow(flow, None) + .await; + } + } + } + } + } + } + + async fn shutdown(&mut self, _ctx: &ServerContext) -> anyhow::Result<()> { + Ok(()) + } +}