Skip to content

Commit

Permalink
ingestion: plumbing for QC and stub qc_data implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
intarga committed Dec 11, 2024
1 parent 01c6fbe commit 938f74d
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 9 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ futures.workspace = true
kafka.workspace = true
quick-xml.workspace = true
regex.workspace = true
rove.workspace = true
rove_connector = { path = "../rove_connector" }
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
Expand Down
1 change: 1 addition & 0 deletions ingestion/src/kldata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ pub async fn filter_and_label_kldata<'a>(

data.push(Datum {
timeseries_id,
param_id: param.id,
timestamp: in_datum.timestamp,
value: in_datum.value,
});
Expand Down
24 changes: 22 additions & 2 deletions ingestion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ struct IngestorState {
db_pool: PgConnectionPool,
param_conversions: ParamConversions, // converts param codes to element ids
permit_tables: Arc<RwLock<(ParamPermitTable, StationPermitTable)>>,
qc_scheduler: Arc<rove::Scheduler<'static>>,
}

impl FromRef<IngestorState> for PgConnectionPool {
Expand All @@ -93,6 +94,12 @@ impl FromRef<IngestorState> for Arc<RwLock<(ParamPermitTable, StationPermitTable
}
}

/// Lets axum `State` extractors pull the shared QC scheduler handle out of
/// [`IngestorState`]; cloning the `Arc` only bumps the refcount.
impl FromRef<IngestorState> for Arc<rove::Scheduler<'static>> {
    fn from_ref(state: &IngestorState) -> Arc<rove::Scheduler<'static>> {
        Arc::clone(&state.qc_scheduler)
    }
}

/// Represents the different Data types observation can have
#[derive(Debug, PartialEq)]
pub enum ObsType<'a> {
Expand All @@ -103,14 +110,16 @@ pub enum ObsType<'a> {
/// Generic container for a piece of data ready to be inserted into the DB
pub struct Datum<'a> {
    // Id of the timeseries this value belongs to (assigned during labelling)
    timeseries_id: i32,
    // Parameter id of the observed element; needed for QC
    param_id: i32,
    // Observation timestamp (UTC)
    timestamp: DateTime<Utc>,
    // The observed value; see [`ObsType`] for the possible representations
    value: ObsType<'a>,
}

pub type Data<'a> = Vec<Datum<'a>>;

// TODO: benchmark insertion of scalar and non-scalar together vs separately?
pub async fn insert_data(data: Data<'_>, conn: &mut PooledPgConn<'_>) -> Result<(), Error> {
pub async fn insert_data(data: &Data<'_>, conn: &mut PooledPgConn<'_>) -> Result<(), Error> {
// TODO: the conflict resolution on this query is an imperfect solution, and needs improvement
//
// I learned from Søren that obsinn and kvalobs organise updates and deletions by sending new
Expand Down Expand Up @@ -170,6 +179,10 @@ pub async fn insert_data(data: Data<'_>, conn: &mut PooledPgConn<'_>) -> Result<
Ok(())
}

/// Run quality control checks on a batch of data via the rove scheduler.
///
/// NOTE(review): deliberate stub — calling this panics via `todo!()`, so the
/// `handle_kldata` ingestion path that invokes it will panic until this is
/// implemented. The `data` and `scheduler` arguments are currently unused.
pub async fn qc_data(data: &Data<'_>, scheduler: &rove::Scheduler<'static>) -> Result<(), Error> {
    todo!()
}

pub mod kldata;
use kldata::{filter_and_label_kldata, parse_kldata};

Expand All @@ -193,6 +206,7 @@ async fn handle_kldata(
State(pool): State<PgConnectionPool>,
State(param_conversions): State<ParamConversions>,
State(permit_table): State<Arc<RwLock<(ParamPermitTable, StationPermitTable)>>>,
State(qc_scheduler): State<Arc<rove::Scheduler<'static>>>,
body: String,
) -> Json<KldataResp> {
let result: Result<usize, Error> = async {
Expand All @@ -204,7 +218,9 @@ async fn handle_kldata(
filter_and_label_kldata(obsinn_chunk, &mut conn, param_conversions, permit_table)
.await?;

insert_data(data, &mut conn).await?;
insert_data(&data, &mut conn).await?;

qc_data(&data, &qc_scheduler).await?;

Ok(message_id)
}
Expand Down Expand Up @@ -255,17 +271,21 @@ pub async fn run(
db_pool: PgConnectionPool,
param_conversion_path: &str,
permit_tables: Arc<RwLock<(ParamPermitTable, StationPermitTable)>>,
qc_scheduler: rove::Scheduler<'static>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// set up param conversion map
let param_conversions = get_conversions(param_conversion_path)?;

let qc_scheduler = Arc::new(qc_scheduler);

// build our application with a single route
let app = Router::new()
.route("/kldata", post(handle_kldata))
.with_state(IngestorState {
db_pool,
param_conversions,
permit_tables,
qc_scheduler,
});

// run our app with hyper, listening globally on port 3001
Expand Down
33 changes: 26 additions & 7 deletions ingestion/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
use bb8_postgres::PostgresConnectionManager;
use std::sync::{Arc, RwLock};
use rove::{
data_switch::{DataConnector, DataSwitch},
load_pipelines,
};
use rove_connector::Connector;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use tokio_postgres::NoTls;

use lard_ingestion::permissions;
Expand All @@ -23,6 +31,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let permit_tables = Arc::new(RwLock::new(permissions::fetch_permits().await?));
let background_permit_tables = permit_tables.clone();

// Set up postgres connection pool
let manager =
PostgresConnectionManager::new_from_stringlike(std::env::var("LARD_CONN_STRING")?, NoTls)?;
let db_pool = bb8::Pool::builder().build(manager).await?;

// QC system
let scheduler = rove::Scheduler::new(
load_pipelines("").unwrap(),
DataSwitch::new(HashMap::from([(
"lard",
Box::new(Connector {
pool: db_pool.clone(),
}) as Box<dyn DataConnector + Send>,
)])),
);

println!("Spawing task to fetch permissions from StInfoSys...");
// background task to refresh permit tables every 30 mins
tokio::task::spawn(async move {
Expand All @@ -42,11 +66,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}
});

// Set up postgres connection pool
let manager =
PostgresConnectionManager::new_from_stringlike(std::env::var("LARD_CONN_STRING")?, NoTls)?;
let db_pool = bb8::Pool::builder().build(manager).await?;

// Spawn kvkafka reader
#[cfg(feature = "kafka_prod")]
{
Expand All @@ -60,5 +79,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

// Set up and run our server + database
println!("Ingestion server started!");
lard_ingestion::run(db_pool, PARAMCONV, permit_tables).await
lard_ingestion::run(db_pool, PARAMCONV, permit_tables, scheduler).await
}

0 comments on commit 938f74d

Please sign in to comment.