diff --git a/Cargo.lock b/Cargo.lock
index 2e0478df..1732b3d4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1491,6 +1491,8 @@ dependencies = [
  "kafka",
  "quick-xml",
  "regex",
+ "rove",
+ "rove_connector",
  "serde",
  "test-case",
  "thiserror",
diff --git a/ingestion/Cargo.toml b/ingestion/Cargo.toml
index c98585b5..7bb2ee58 100644
--- a/ingestion/Cargo.toml
+++ b/ingestion/Cargo.toml
@@ -20,6 +20,8 @@ futures.workspace = true
 kafka.workspace = true
 quick-xml.workspace = true
 regex.workspace = true
+rove.workspace = true
+rove_connector = { path = "../rove_connector" }
 serde.workspace = true
 thiserror.workspace = true
 tokio.workspace = true
diff --git a/ingestion/src/kldata.rs b/ingestion/src/kldata.rs
index 827a536b..42b4f7ab 100644
--- a/ingestion/src/kldata.rs
+++ b/ingestion/src/kldata.rs
@@ -366,6 +366,7 @@ pub async fn filter_and_label_kldata<'a>(
             data.push(Datum {
                 timeseries_id,
+                param_id: param.id,
                 timestamp: in_datum.timestamp,
                 value: in_datum.value,
             });
diff --git a/ingestion/src/lib.rs b/ingestion/src/lib.rs
index 78622cc6..a969e7ab 100644
--- a/ingestion/src/lib.rs
+++ b/ingestion/src/lib.rs
@@ -73,6 +73,7 @@ struct IngestorState {
     db_pool: PgConnectionPool,
     param_conversions: ParamConversions, // converts param codes to element ids
     permit_tables: Arc<RwLock<(ParamPermitTable, StationPermitTable)>>,
+    qc_scheduler: Arc<rove::Scheduler<'static>>,
 }
 
 impl FromRef<IngestorState> for PgConnectionPool {
@@ -93,6 +94,12 @@ impl FromRef<IngestorState> for Arc<RwLock<(ParamPermitTable, StationPermitTable)>> {
     }
 }
 
+impl FromRef<IngestorState> for Arc<rove::Scheduler<'static>> {
+    fn from_ref(state: &IngestorState) -> Arc<rove::Scheduler<'static>> {
+        state.qc_scheduler.clone()
+    }
+}
+
 /// Represents the different Data types observation can have
 #[derive(Debug, PartialEq)]
 pub enum ObsType<'a> {
@@ -103,6 +110,8 @@ pub enum ObsType<'a> {
 /// Generic container for a piece of data ready to be inserted into the DB
 pub struct Datum<'a> {
     timeseries_id: i32,
+    // needed for QC
+    param_id: i32,
     timestamp: DateTime<Utc>,
     value: ObsType<'a>,
 }
@@ -110,7 +119,7 @@ pub struct Datum<'a> {
 pub type Data<'a> = Vec<Datum<'a>>;
 
 // TODO: benchmark insertion of scalar and non-scalar together vs separately?
-pub async fn insert_data(data: Data<'_>, conn: &mut PooledPgConn<'_>) -> Result<(), Error> {
+pub async fn insert_data(data: &Data<'_>, conn: &mut PooledPgConn<'_>) -> Result<(), Error> {
     // TODO: the conflict resolution on this query is an imperfect solution, and needs improvement
     //
     // I learned from Søren that obsinn and kvalobs organise updates and deletions by sending new
@@ -170,6 +179,10 @@ pub async fn insert_data(data: &Data<'_>, conn: &mut PooledPgConn<'_>) -> Result<(), Error> {
     Ok(())
 }
 
+pub async fn qc_data(data: &Data<'_>, scheduler: &rove::Scheduler<'static>) -> Result<(), Error> {
+    todo!()
+}
+
 pub mod kldata;
 use kldata::{filter_and_label_kldata, parse_kldata};
 
@@ -193,6 +206,7 @@ async fn handle_kldata(
     State(pool): State<PgConnectionPool>,
     State(param_conversions): State<ParamConversions>,
     State(permit_table): State<Arc<RwLock<(ParamPermitTable, StationPermitTable)>>>,
+    State(qc_scheduler): State<Arc<rove::Scheduler<'static>>>,
     body: String,
 ) -> Json<KldataResp> {
     let result: Result<usize, Error> = async {
@@ -204,7 +218,9 @@ async fn handle_kldata(
             filter_and_label_kldata(obsinn_chunk, &mut conn, param_conversions, permit_table)
                 .await?;
 
-        insert_data(data, &mut conn).await?;
+        insert_data(&data, &mut conn).await?;
+
+        qc_data(&data, &qc_scheduler).await?;
 
         Ok(message_id)
     }
@@ -255,10 +271,13 @@ pub async fn run(
     db_pool: PgConnectionPool,
     param_conversion_path: &str,
     permit_tables: Arc<RwLock<(ParamPermitTable, StationPermitTable)>>,
+    qc_scheduler: rove::Scheduler<'static>,
 ) -> Result<(), Box<dyn std::error::Error>> {
     // set up param conversion map
     let param_conversions = get_conversions(param_conversion_path)?;
 
+    let qc_scheduler = Arc::new(qc_scheduler);
+
     // build our application with a single route
     let app = Router::new()
         .route("/kldata", post(handle_kldata))
@@ -266,6 +285,7 @@ pub async fn run(
             db_pool,
             param_conversions,
             permit_tables,
+            qc_scheduler,
         });
 
     // run our app with hyper, listening globally on port 3001
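Two details in the lib.rs changes above are worth calling out: `insert_data` now takes `&Data<'_>` instead of owning its input, so the same vector can be handed to the new `qc_data` stub after insertion, and `Datum` gains a `param_id` field so QC can treat each parameter differently. `qc_data` itself is still `todo!()` (as written, any request reaching it panics that handler's task), but the sketch below shows one way the new `param_id` could be used once the stub is filled in. It is a self-contained illustration with stand-in types; the per-parameter grouping and the example param ids are assumptions, not code from this changeset.

```rust
use std::collections::HashMap;

// Stand-in for the crate's `Datum<'a>`, reduced to the fields relevant here.
struct Datum {
    timeseries_id: i32,
    param_id: i32, // the field this diff adds, "needed for QC"
    value: f32,
}

// Group freshly ingested observations by param_id, so each parameter can be
// routed to its own QC pipeline (e.g. air temperature checks vs precipitation
// checks).
fn group_by_param(data: &[Datum]) -> HashMap<i32, Vec<&Datum>> {
    let mut groups: HashMap<i32, Vec<&Datum>> = HashMap::new();
    for datum in data {
        groups.entry(datum.param_id).or_default().push(datum);
    }
    groups
}

fn main() {
    let data = vec![
        Datum { timeseries_id: 1, param_id: 211, value: 21.5 },
        Datum { timeseries_id: 2, param_id: 211, value: 19.0 },
        Datum { timeseries_id: 3, param_id: 106, value: 0.2 },
    ];
    // Two distinct parameters means two QC batches.
    assert_eq!(group_by_param(&data).len(), 2);
}
```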
diff --git a/ingestion/src/main.rs b/ingestion/src/main.rs
index a1c1cfeb..3ad678bc 100644
--- a/ingestion/src/main.rs
+++ b/ingestion/src/main.rs
@@ -1,5 +1,13 @@
 use bb8_postgres::PostgresConnectionManager;
-use std::sync::{Arc, RwLock};
+use rove::{
+    data_switch::{DataConnector, DataSwitch},
+    load_pipelines,
+};
+use rove_connector::Connector;
+use std::{
+    collections::HashMap,
+    sync::{Arc, RwLock},
+};
 use tokio_postgres::NoTls;
 
 use lard_ingestion::permissions;
@@ -23,6 +31,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let permit_tables = Arc::new(RwLock::new(permissions::fetch_permits().await?));
     let background_permit_tables = permit_tables.clone();
 
+    // Set up postgres connection pool
+    let manager =
+        PostgresConnectionManager::new_from_stringlike(std::env::var("LARD_CONN_STRING")?, NoTls)?;
+    let db_pool = bb8::Pool::builder().build(manager).await?;
+
+    // QC system
+    let scheduler = rove::Scheduler::new(
+        load_pipelines("").unwrap(),
+        DataSwitch::new(HashMap::from([(
+            "lard",
+            Box::new(Connector {
+                pool: db_pool.clone(),
+            }) as Box<dyn DataConnector + Send>,
+        )])),
+    );
+
     println!("Spawing task to fetch permissions from StInfoSys...");
     // background task to refresh permit tables every 30 mins
     tokio::task::spawn(async move {
@@ -42,11 +66,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         }
     });
 
-    // Set up postgres connection pool
-    let manager =
-        PostgresConnectionManager::new_from_stringlike(std::env::var("LARD_CONN_STRING")?, NoTls)?;
-    let db_pool = bb8::Pool::builder().build(manager).await?;
-
     // Spawn kvkafka reader
     #[cfg(feature = "kafka_prod")]
     {
@@ -60,5 +79,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     // Set up and run our server + database
     println!("Ingestion server started!");
-    lard_ingestion::run(db_pool, PARAMCONV, permit_tables).await
+    lard_ingestion::run(db_pool, PARAMCONV, permit_tables, scheduler).await
 }
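One wiring note on the main.rs changes: the postgres pool setup moves ahead of the permissions task because the QC scheduler needs a handle to the same pool, and the `DataSwitch` registers the `rove_connector::Connector` under the name "lard", the id rove uses to look up a data source when running checks. The `load_pipelines("").unwrap()` call reads like placeholder wiring. A slightly more defensive variant, reusing only calls already shown in this diff, might look like the following (the pipeline directory name and the helper itself are illustrative assumptions, not part of the changeset):

```rust
use std::collections::HashMap;

use rove::{
    data_switch::{DataConnector, DataSwitch},
    load_pipelines,
};
use rove_connector::Connector;

type PgPool = bb8::Pool<bb8_postgres::PostgresConnectionManager<tokio_postgres::NoTls>>;

// Hypothetical helper: the same construction as in main() above, but with the
// pipeline directory passed in and a clearer failure message at startup.
fn build_qc_scheduler(db_pool: PgPool, pipeline_dir: &str) -> rove::Scheduler<'static> {
    rove::Scheduler::new(
        // expect() instead of unwrap(), so a missing or malformed pipeline
        // directory fails loudly with some context
        load_pipelines(pipeline_dir).expect("failed to load QC pipelines"),
        DataSwitch::new(HashMap::from([(
            // the source id the scheduler will resolve to LARD's own data
            "lard",
            Box::new(Connector { pool: db_pool }) as Box<dyn DataConnector + Send>,
        )])),
    )
}
```

Keeping this in a helper would also make it easy to point an integration test at a test pipeline directory, or to swap in a mock connector.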