From c9bc516796ffdb504d2d4636882097a97f6ec6cf Mon Sep 17 00:00:00 2001 From: Abel Lucas Date: Mon, 3 Feb 2025 09:35:45 +0100 Subject: [PATCH] poc --- .../20250201124345_v2_job_status.up.sql | 1 + .../20250201124423_v2_job_completed.up.sql | 2 +- .../20250201124743_v2_job_queue_sync.up.sql | 6 -- ...0250201124744_v2_job_completed_sync.up.sql | 5 +- .../20250201124748_v2_migrate_from_v1.up.sql | 3 +- ...1145630_v2_queue_compatibility_view.up.sql | 2 +- ...v2_completed_job_compatibility_view.up.sql | 6 +- backend/windmill-api/src/jobs.rs | 52 ++++++------- backend/windmill-common/src/jobs.rs | 47 +++--------- backend/windmill-queue/src/jobs.rs | 64 +++++++++++++++- .../windmill-worker/src/result_processor.rs | 75 +++++++------------ backend/windmill-worker/src/worker.rs | 6 ++ backend/windmill-worker/src/worker_flow.rs | 40 +++++++--- 13 files changed, 162 insertions(+), 147 deletions(-) diff --git a/backend/migrations/20250201124345_v2_job_status.up.sql b/backend/migrations/20250201124345_v2_job_status.up.sql index c43ab1f78a82d..3ab10e4fa7dec 100644 --- a/backend/migrations/20250201124345_v2_job_status.up.sql +++ b/backend/migrations/20250201124345_v2_job_status.up.sql @@ -4,6 +4,7 @@ CREATE TABLE IF NOT EXISTS v2_job_status ( -- Flow status fields: flow_status JSONB, flow_leaf_jobs JSONB, + flow_preprocessed_args JSONB, -- Workflow as code fields: workflow_as_code_status JSONB ); diff --git a/backend/migrations/20250201124423_v2_job_completed.up.sql b/backend/migrations/20250201124423_v2_job_completed.up.sql index 350e1a725ee8b..d6756cfd7ab07 100644 --- a/backend/migrations/20250201124423_v2_job_completed.up.sql +++ b/backend/migrations/20250201124423_v2_job_completed.up.sql @@ -4,7 +4,7 @@ ALTER TABLE v2_job_completed ADD COLUMN IF NOT EXISTS status job_status, ADD COLUMN IF NOT EXISTS completed_at TIMESTAMP WITH TIME ZONE, ADD COLUMN IF NOT EXISTS worker VARCHAR(255), - ADD COLUMN IF NOT EXISTS preprocessed_args BOOL NOT NULL DEFAULT FALSE, + ADD COLUMN IF NOT EXISTS preprocessed_args JSONB, ADD COLUMN IF NOT EXISTS workflow_as_code_status JSONB, ADD COLUMN IF NOT EXISTS result_columns TEXT[], ADD COLUMN IF NOT EXISTS extras JSONB; diff --git a/backend/migrations/20250201124743_v2_job_queue_sync.up.sql b/backend/migrations/20250201124743_v2_job_queue_sync.up.sql index 6f931dca0499e..d69ba437d4b6a 100644 --- a/backend/migrations/20250201124743_v2_job_queue_sync.up.sql +++ b/backend/migrations/20250201124743_v2_job_queue_sync.up.sql @@ -149,12 +149,6 @@ CREATE OR REPLACE FUNCTION v2_job_queue_before_update() RETURNS TRIGGER AS $$ BE IF NEW.canceled_by IS NOT NULL THEN NEW.__canceled := TRUE; END IF; - -- `v2_job`: Only `args` are updated - IF NEW.__args::text IS DISTINCT FROM OLD.__args::text THEN - UPDATE v2_job - SET args = NEW.__args - WHERE id = NEW.id; - END IF; -- `v2_job_runtime`: IF NEW.__last_ping IS DISTINCT FROM OLD.__last_ping OR NEW.__mem_peak IS DISTINCT FROM OLD.__mem_peak THEN INSERT INTO v2_job_runtime (id, ping, memory_peak) diff --git a/backend/migrations/20250201124744_v2_job_completed_sync.up.sql b/backend/migrations/20250201124744_v2_job_completed_sync.up.sql index e30cb2cb612ba..5d4ba3520412d 100644 --- a/backend/migrations/20250201124744_v2_job_completed_sync.up.sql +++ b/backend/migrations/20250201124744_v2_job_completed_sync.up.sql @@ -39,7 +39,7 @@ BEGIN NEW.__tag := job.tag; NEW.__priority := job.priority; -- 1. 
`preprocessed_args` -> `flow_status._metadata.preprocessed_args` - IF NEW.preprocessed_args = true AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN + IF NEW.preprocessed_args IS NOT NULL AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN NEW.flow_status := jsonb_set( coalesce(NEW.flow_status, '{}'::JSONB), '{_metadata}', @@ -96,8 +96,7 @@ BEGIN END; -- 1. `preprocessed_args` <- `flow_status._metadata.preprocessed_args` NEW.preprocessed_args := CASE - WHEN NEW.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE - ELSE FALSE + WHEN NEW.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN NEW.__args END; -- 2. `result_columns` <- `flow_status._metadata.column_order` IF jsonb_typeof(NEW.flow_status->'_metadata'->'column_order') = 'array' AND ( diff --git a/backend/migrations/20250201124748_v2_migrate_from_v1.up.sql b/backend/migrations/20250201124748_v2_migrate_from_v1.up.sql index 00ee695ff7743..8882b21a74b49 100644 --- a/backend/migrations/20250201124748_v2_migrate_from_v1.up.sql +++ b/backend/migrations/20250201124748_v2_migrate_from_v1.up.sql @@ -10,8 +10,7 @@ SET completed_at = started_at + (interval '1 millisecond' * duration_ms), ELSE 'failure'::job_status END, preprocessed_args = CASE - WHEN flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE - ELSE FALSE + WHEN flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN __args END, result_columns = CASE WHEN jsonb_typeof(flow_status->'_metadata'->'column_order') = 'array' AND ( diff --git a/backend/migrations/20250201145630_v2_queue_compatibility_view.up.sql b/backend/migrations/20250201145630_v2_queue_compatibility_view.up.sql index 0c4801bf6ccbf..5a7719011f730 100644 --- a/backend/migrations/20250201145630_v2_queue_compatibility_view.up.sql +++ b/backend/migrations/20250201145630_v2_queue_compatibility_view.up.sql @@ -11,7 +11,7 @@ SELECT q.running, j.runnable_id AS script_hash, j.runnable_path AS script_path, - j.args, + COALESCE(s.flow_preprocessed_args, j.args) AS args, j.raw_code, q.canceled_by IS NOT NULL AS canceled, q.canceled_by, diff --git a/backend/migrations/20250201145631_v2_completed_job_compatibility_view.up.sql b/backend/migrations/20250201145631_v2_completed_job_compatibility_view.up.sql index 6d04a518ae124..6ea2c5209588c 100644 --- a/backend/migrations/20250201145631_v2_completed_job_compatibility_view.up.sql +++ b/backend/migrations/20250201145631_v2_completed_job_compatibility_view.up.sql @@ -10,7 +10,8 @@ SELECT c.status = 'success' AS success, j.runnable_id AS script_hash, j.runnable_path AS script_path, - j.args, + COALESCE(c.preprocessed_args, j.args) + AS args, c.result, FALSE AS deleted, j.raw_code, @@ -33,7 +34,8 @@ SELECT c.memory_peak AS mem_peak, j.tag, j.priority, - NULL::TEXT AS logs + NULL::TEXT AS logs, + result_columns FROM v2_job_completed c JOIN v2_job j USING (id) ; diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 4d0c242f8867c..298a34fe4cc4f 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -680,7 +680,7 @@ macro_rules! 
get_job_query {
    ("v2_as_completed_job", $($opts:tt)*) => {
        get_job_query!(
            @impl "v2_as_completed_job", ($($opts)*),
-            "duration_ms, success, result, deleted, is_skipped, result->'wm_labels' as labels, \
+            "duration_ms, success, result, result_columns, deleted, is_skipped, result->'wm_labels' as labels, \
             CASE WHEN result is null or pg_column_size(result) < 90000 THEN result ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as result",
        )
    };
@@ -2748,6 +2748,7 @@ impl<'a> From<UnifiedJob> for Job {
             script_path: uj.script_path,
             args: None,
             result: None,
+            result_columns: None,
             logs: None,
             flow_status: None,
             deleted: uj.deleted,
@@ -3552,10 +3553,9 @@ pub async fn run_wait_result_internal(
         let row = sqlx::query!(
             "SELECT
                 result AS \"result: sqlx::types::Json<Box<RawValue>>\",
-                language AS \"language: ScriptLang\",
-                flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
-                success AS \"success!\"
-            FROM v2_as_completed_job
+                result_columns,
+                status = 'success' AS \"success!\"
+            FROM v2_job_completed
             WHERE id = $1 AND workspace_id = $2",
            uuid,
            &w_id
        )
        .await?;
        if let Some(mut raw_result) = row {
            format_result(
-                raw_result.language.as_ref(),
-                raw_result.flow_status.as_ref(),
+                raw_result.result_columns.as_ref(),
                raw_result.result.as_mut(),
            );
            result = raw_result.result.map(|x| x.0);
@@ -5521,8 +5520,7 @@ async fn get_completed_job<'a>(
 #[derive(FromRow)]
 pub struct RawResult {
     pub result: Option<sqlx::types::Json<Box<RawValue>>>,
-    pub flow_status: Option<sqlx::types::Json<Box<RawValue>>>,
-    pub language: Option<ScriptLang>,
+    pub result_columns: Option<Vec<String>>,
     pub created_by: Option<String>,
 }
@@ -5541,11 +5539,11 @@ async fn get_completed_job_result(
         RawResult,
         "SELECT
             result #> $3 AS \"result: sqlx::types::Json<Box<RawValue>>\",
-            flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
-            language AS \"language: ScriptLang\",
+            result_columns,
             created_by AS \"created_by!\"
-        FROM v2_as_completed_job
-        WHERE id = $1 AND workspace_id = $2 AND ($4::text[] IS NULL OR tag = ANY($4))",
+        FROM v2_job_completed c
+        JOIN v2_job USING (id)
+        WHERE c.id = $1 AND c.workspace_id = $2 AND ($4::text[] IS NULL OR tag = ANY($4))",
         id,
         &w_id,
         json_path.split(".").collect::<Vec<_>>() as Vec<&str>,
         RawResult,
         "SELECT
             result AS \"result: sqlx::types::Json<Box<RawValue>>\",
-            flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
-            language AS \"language: ScriptLang\",
+            result_columns,
             created_by AS \"created_by!\"
-        FROM v2_as_completed_job
-        WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
+        FROM v2_job_completed c
+        JOIN v2_job USING (id)
+        WHERE c.id = $1 AND c.workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
         id,
         &w_id,
         tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>,
@@ -5612,8 +5610,7 @@ async fn get_completed_job_result(
     }

     format_result(
-        raw_result.language.as_ref(),
-        raw_result.flow_status.as_ref(),
+        raw_result.result_columns.as_ref(),
         raw_result.result.as_mut(),
     );
@@ -5685,12 +5682,13 @@ async fn get_completed_job_result_maybe(
         .flatten();
     let result_o = sqlx::query!(
         "SELECT
-            result AS \"result: sqlx::types::Json<Box<RawValue>>\", success AS \"success!\",
-            language AS \"language: ScriptLang\",
-            flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
+            result AS \"result: sqlx::types::Json<Box<RawValue>>\",
+            result_columns,
+            status = 'success' AS \"success!\",
             created_by AS \"created_by!\"
-        FROM v2_as_completed_job
-        WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
+        FROM v2_job_completed c
+        JOIN v2_job j USING (id)
+        WHERE c.id = $1 AND c.workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
         id,
         &w_id,
         tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>,
@@ -5699,11 +5697,7 @@ async fn get_completed_job_result_maybe(
     .await?;

     if let Some(mut res) = result_o {
-        format_result(
-            res.language.as_ref(),
-            res.flow_status.as_ref(),
-            res.result.as_mut(),
-        );
+        format_result(res.result_columns.as_ref(), res.result.as_mut());
         if opt_authed.is_none() && res.created_by != "anonymous" {
             return Err(Error::BadRequest(
                 "As a non logged in user, you can only see jobs ran by anonymous users".to_string(),
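Note: with column order persisted on v2_job_completed, the API handlers above stop parsing flow_status metadata and read result_columns directly, joining v2_job only for job metadata such as tag. A minimal illustrative sketch of that lookup shape, with simplified types; this is an assumption-laden stand-in (sqlx with the postgres, json and uuid features), not the patch's query! macros:

    use sqlx::PgPool;

    #[allow(dead_code)]
    #[derive(sqlx::FromRow)]
    struct CompletedResult {
        result: Option<serde_json::Value>,
        result_columns: Option<Vec<String>>,
    }

    // Completion data lives on v2_job_completed; v2_job supplies metadata.
    async fn fetch_completed(
        pool: &PgPool,
        id: uuid::Uuid,
        w_id: &str,
    ) -> sqlx::Result<Option<CompletedResult>> {
        sqlx::query_as::<_, CompletedResult>(
            "SELECT result, result_columns
             FROM v2_job_completed c
             JOIN v2_job j USING (id)
             WHERE c.id = $1 AND c.workspace_id = $2",
        )
        .bind(id)
        .bind(w_id)
        .fetch_optional(pool)
        .await
    }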
diff --git a/backend/windmill-common/src/jobs.rs b/backend/windmill-common/src/jobs.rs
index 5a784666275d0..32b30a912a439 100644
--- a/backend/windmill-common/src/jobs.rs
+++ b/backend/windmill-common/src/jobs.rs
@@ -207,6 +207,7 @@ pub struct CompletedJob {
     pub args: Option<sqlx::types::Json<HashMap<String, Box<RawValue>>>>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub result: Option<sqlx::types::Json<Box<RawValue>>>,
+    pub result_columns: Option<Vec<String>>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub logs: Option<String>,
     pub deleted: bool,
@@ -529,19 +530,9 @@ pub async fn get_payload_tag_from_prefixed_path(
     Ok((payload, tag, on_behalf_of))
 }

-#[derive(Deserialize)]
-struct FlowStatusMetadata {
-    column_order: Vec<String>,
-}
-
-#[derive(Deserialize)]
-struct FlowStatusWithMetadataOnly {
-    _metadata: FlowStatusMetadata,
-}
-
 pub fn order_columns(
     rows: Option<Vec<Box<RawValue>>>,
-    column_order: Vec<String>,
+    column_order: &Vec<String>,
 ) -> Option<Box<RawValue>> {
     if let Some(mut rows) = rows {
         if let Some(first_row) = rows.get(0) {
             if let Ok(first_row) = first_row {
                 let mut new_first_row = IndexMap::new();
                 for col in column_order {
-                    if let Some(val) = first_row.get(&col) {
+                    if let Some(val) = first_row.get(col) {
                         new_first_row.insert(col.clone(), val.clone());
                     }
                 }

 pub fn format_result(
-    language: Option<&ScriptLang>,
-    flow_status: Option<&sqlx::types::Json<Box<RawValue>>>,
+    result_columns: Option<&Vec<String>>,
     result: Option<&mut sqlx::types::Json<Box<RawValue>>>,
 ) -> () {
-    match language {
-        Some(&ScriptLang::Postgresql)
-        | Some(&ScriptLang::Mysql)
-        | Some(&ScriptLang::Snowflake)
-        | Some(&ScriptLang::Bigquery) => {
-            if let Some(Ok(flow_status)) =
-                flow_status.map(|x| serde_json::from_str::<FlowStatusWithMetadataOnly>(x.get()))
-            {
-                if let Some(result) = result {
-                    let rows = serde_json::from_str::<Vec<Box<RawValue>>>(result.get()).ok();
-                    if let Some(ordered_result) =
-                        order_columns(rows, flow_status._metadata.column_order)
-                    {
-                        *result = sqlx::types::Json(ordered_result);
-                    }
-                }
+    if let Some(result_columns) = result_columns {
+        if let Some(result) = result {
+            let rows = serde_json::from_str::<Vec<Box<RawValue>>>(result.get()).ok();
+            if let Some(ordered_result) = order_columns(rows, result_columns) {
+                *result = sqlx::types::Json(ordered_result);
             }
         }
-        _ => {}
     }
 }

 pub fn format_completed_job_result(mut cj: CompletedJob) -> CompletedJob {
-    format_result(
-        cj.language.as_ref(),
-        cj.flow_status.as_ref(),
-        cj.result.as_mut(),
-    );
-
+    format_result(cj.result_columns.as_ref(), cj.result.as_mut());
     cj
 }
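Note: format_result now receives the column list directly instead of digging it out of flow_status metadata. A self-contained sketch of the reordering order_columns performs, using IndexMap and serde_json::Value in place of the Box<RawValue> rows the real function works with; illustrative only:

    use indexmap::IndexMap;
    use serde_json::{json, Value};

    // Rebuild the first row in the requested column order; later rows are
    // left alone, since clients recover SQL column order from the first
    // row's JSON key order. Columns absent from column_order are dropped,
    // mirroring the loop above.
    fn reorder_first_row(rows: &mut [IndexMap<String, Value>], column_order: &[String]) {
        if let Some(first) = rows.first_mut() {
            let mut ordered = IndexMap::new();
            for col in column_order {
                if let Some(val) = first.get(col) {
                    ordered.insert(col.clone(), val.clone());
                }
            }
            *first = ordered;
        }
    }

    fn main() {
        let mut row = IndexMap::new();
        row.insert("b".to_string(), json!(2));
        row.insert("a".to_string(), json!(1));
        let mut rows = vec![row];
        reorder_first_row(&mut rows, &["a".to_string(), "b".to_string()]);
        let keys: Vec<&str> = rows[0].keys().map(|k| k.as_str()).collect();
        assert_eq!(keys, vec!["a", "b"]);
    }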
diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs
index c0433518e4ef8..433c7cb1ad902 100644
--- a/backend/windmill-queue/src/jobs.rs
+++ b/backend/windmill-queue/src/jobs.rs
@@ -161,6 +161,7 @@ pub async fn cancel_single_job<'c>(
                 &job_running,
                 job_running.mem_peak.unwrap_or(0),
                 Some(CanceledBy { username: Some(username.to_string()), reason: Some(reason) }),
+                None,
                 e,
                 "server",
                 false,
@@ -396,23 +397,43 @@ pub struct WrappedError {

 pub trait ValidableJson {
     fn is_valid_json(&self) -> bool;
+    fn wm_labels(&self) -> Option<Vec<String>>;
+}
+
+#[derive(serde::Deserialize)]
+struct ResultLabels {
+    wm_labels: Vec<String>,
 }

 impl ValidableJson for WrappedError {
     fn is_valid_json(&self) -> bool {
         true
     }
+
+    fn wm_labels(&self) -> Option<Vec<String>> {
+        None
+    }
 }

 impl ValidableJson for Box<RawValue> {
     fn is_valid_json(&self) -> bool {
         !self.get().is_empty()
     }
+
+    fn wm_labels(&self) -> Option<Vec<String>> {
+        serde_json::from_str::<ResultLabels>(self.get())
+            .ok()
+            .map(|r| r.wm_labels)
+    }
 }

-impl ValidableJson for Arc<Box<RawValue>> {
+impl<T: ValidableJson> ValidableJson for Arc<T> {
     fn is_valid_json(&self) -> bool {
-        !self.get().is_empty()
+        T::is_valid_json(&self)
+    }
+
+    fn wm_labels(&self) -> Option<Vec<String>> {
+        T::wm_labels(&self)
     }
 }

@@ -420,12 +441,22 @@ impl ValidableJson for serde_json::Value {
     fn is_valid_json(&self) -> bool {
         true
     }
+
+    fn wm_labels(&self) -> Option<Vec<String>> {
+        serde_json::from_value::<ResultLabels>(self.clone())
+            .ok()
+            .map(|r| r.wm_labels)
+    }
 }

 impl<T: ValidableJson> ValidableJson for Json<T> {
     fn is_valid_json(&self) -> bool {
         self.0.is_valid_json()
     }
+
+    fn wm_labels(&self) -> Option<Vec<String>> {
+        self.0.wm_labels()
+    }
 }

 pub async fn register_metric(
@@ -463,6 +494,7 @@ pub async fn add_completed_job_error(
     queued_job: &QueuedJob,
     mem_peak: i32,
     canceled_by: Option<CanceledBy>,
+    preprocessed_args: Option<HashMap<String, Box<RawValue>>>,
     e: serde_json::Value,
     _worker_name: &str,
     flow_is_done: bool,
@@ -500,6 +532,8 @@ pub async fn add_completed_job_error(
         false,
         false,
         Json(&result),
+        None,
+        preprocessed_args,
         mem_peak,
         canceled_by,
         flow_is_done,
@@ -519,6 +553,8 @@ pub async fn add_completed_job(
     success: bool,
     skipped: bool,
     result: Json<&T>,
+    result_columns: Option<Vec<String>>,
+    preprocessed_args: Option<HashMap<String, Box<RawValue>>>,
     mem_peak: i32,
     canceled_by: Option<CanceledBy>,
     flow_is_done: bool,
@@ -534,6 +570,8 @@ pub async fn add_completed_job(
         ));
     }

+    let result_columns = result_columns.as_ref();
+    let preprocessed_args = preprocessed_args.as_ref();
     let _job_id = queued_job.id;
     let (opt_uuid, _duration, _skip_downstream_error_handlers) = (|| async {
         let mut tx = db.begin().await?;
@@ -557,17 +595,20 @@ pub async fn add_completed_job(
             , started_at
             , duration_ms
             , result
+            , result_columns
             , canceled_by
             , canceled_reason
             , flow_status
             , memory_peak
             , status
+            , preprocessed_args
             )
-            VALUES ($1, $2, $3, COALESCE($12::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE($3, now()))))*1000), $5, $7, $8, $9,\
+            VALUES ($1, $2, $3, COALESCE($12::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE($3, now()))))*1000), $5, $13, $7, $8, $9,\
             $11, CASE WHEN $6::BOOL THEN 'canceled'::job_status
             WHEN $10::BOOL THEN 'skipped'::job_status
             WHEN $4::BOOL THEN 'success'::job_status
-            ELSE 'failure'::job_status END)
+            ELSE 'failure'::job_status END,
+            COALESCE($14, (SELECT flow_preprocessed_args FROM v2_job_status WHERE id = $2)))
             ON CONFLICT (id) DO UPDATE SET status = EXCLUDED.status, result = $5 RETURNING duration_ms AS \"duration_ms!\"",
             /* $1 */ queued_job.workspace_id,
             /* $2 */ queued_job.id,
             /* $10 */ skipped,
             /* $11 */ if mem_peak > 0 { Some(mem_peak) } else { None },
             /* $12 */ duration,
+            /* $13 */ result_columns as Option<&Vec<String>>,
+            /* $14 */ preprocessed_args.map(Json) as Option<Json<&HashMap<String, Box<RawValue>>>>,
         )
         .fetch_one(&mut *tx)
         .await
         .map_err(|e| Error::InternalErr(format!("Could not add completed job {job_id}: {e:#}")))?;

+        if let Some(labels) = result.wm_labels() {
+            sqlx::query!(
+                "UPDATE v2_job SET labels = (
+                    SELECT array_agg(DISTINCT all_labels)
+                    FROM unnest(coalesce(labels, ARRAY[]::TEXT[]) || $2) all_labels
+                ) WHERE id = $1",
+                job_id,
+                labels as Vec<String>
+            )
+            .execute(&mut *tx)
+            .await
+            .map_err(|e| Error::InternalErr(format!("Could not update job labels: {e:#}")))?;
+        }
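Note: the ValidableJson additions above let any result type surface its wm_labels, which add_completed_job then merges into v2_job.labels. A standalone sketch of the extraction rule the ResultLabels impls encode; labels_of is a hypothetical helper, the serde shape matches the patch:

    use serde::Deserialize;

    #[derive(Deserialize)]
    struct ResultLabels {
        wm_labels: Vec<String>,
    }

    // A result object like {"wm_labels": ["a"], ...} yields its labels;
    // any other JSON (non-object, missing field, wrong type) yields None.
    fn labels_of(result: &str) -> Option<Vec<String>> {
        serde_json::from_str::<ResultLabels>(result)
            .ok()
            .map(|r| r.wm_labels)
    }

    fn main() {
        assert_eq!(
            labels_of(r#"{"wm_labels":["nightly"],"rows":3}"#),
            Some(vec!["nightly".to_string()])
        );
        assert_eq!(labels_of("42"), None);
    }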

     if !queued_job.is_flow_step {
         if _duration > 500
diff --git a/backend/windmill-worker/src/result_processor.rs b/backend/windmill-worker/src/result_processor.rs
index 3449c2ab24cca..beb969bdf22b1 100644
--- a/backend/windmill-worker/src/result_processor.rs
+++ b/backend/windmill-worker/src/result_processor.rs
@@ -232,6 +232,8 @@ async fn send_job_completed(
     job_completed_tx: JobCompletedSender,
     job: Arc<QueuedJob>,
     result: Arc<Box<RawValue>>,
+    result_columns: Option<Vec<String>>,
+    preprocessed_args: Option<HashMap<String, Box<RawValue>>>,
     mem_peak: i32,
     canceled_by: Option<CanceledBy>,
     success: bool,
@@ -242,6 +244,8 @@ async fn send_job_completed(
     let jc = JobCompleted {
         job,
         result,
+        result_columns,
+        preprocessed_args,
         mem_peak,
         canceled_by,
         success,
@@ -272,59 +276,12 @@ pub async fn process_result(
 ) -> error::Result<bool> {
     match result {
         Ok(r) => {
-            let job = if column_order.is_some() || new_args.is_some() {
-                let mut updated_job = (*job).clone();
-                if let Some(column_order) = column_order {
-                    match updated_job.flow_status {
-                        Some(_) => {
-                            tracing::warn!("flow_status was expected to be none");
-                        }
-                        None => {
-                            updated_job.flow_status =
-                                Some(sqlx::types::Json(to_raw_value(&serde_json::json!({
-                                    "_metadata": {
-                                        "column_order": column_order
-                                    }
-                                }))));
-                        }
-                    }
-                }
-                if let Some(new_args) = new_args {
-                    match updated_job.flow_status {
-                        Some(_) => {
-                            tracing::warn!("flow_status was expected to be none");
-                        }
-                        None => {
-                            // TODO save original args somewhere
-                            // if let Some(args) = updated_job.args.as_mut() {
-                            //     args.0.remove(ENTRYPOINT_OVERRIDE);
-                            // }
-                            updated_job.flow_status =
-                                Some(sqlx::types::Json(to_raw_value(&serde_json::json!({
-                                    "_metadata": {
-                                        "preprocessed_args": true
-                                    }
-                                }))));
-                        }
-                    }
-                    // For the front-end, change the args by the preprocessed args (script only).
-                    sqlx::query!(
-                        "UPDATE v2_job SET args = $1 WHERE id = $2",
-                        Json(new_args) as Json<HashMap<String, Box<RawValue>>>,
-                        updated_job.id
-                    )
-                    .execute(db)
-                    .await?;
-                }
-                Arc::new(updated_job)
-            } else {
-                job
-            };
-
             send_job_completed(
                 job_completed_tx,
                 job,
                 r,
+                column_order,
+                new_args,
                 mem_peak,
                 canceled_by,
                 true,
@@ -370,6 +327,8 @@ pub async fn process_result(
                 job_completed_tx,
                 job,
                 Arc::new(to_raw_value(&error_value)),
+                None,
+                None,
                 mem_peak,
                 canceled_by,
                 false,
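Note: process_result no longer clones the job to stuff metadata into flow_status or rewrite v2_job.args; the column order and preprocessed args now ride along in the completion message, and error paths send None for both. A trimmed, hypothetical model of that payload change (field names mirror the patch, types assume serde_json's raw_value feature; not the actual windmill code):

    use std::{collections::HashMap, sync::Arc};
    use serde_json::value::RawValue;

    #[allow(dead_code)]
    struct Completion {
        result: Arc<Box<RawValue>>,
        result_columns: Option<Vec<String>>,
        preprocessed_args: Option<HashMap<String, Box<RawValue>>>,
        success: bool,
    }

    // Success forwards both extras; failure drops them, as in the two
    // send_job_completed call sites above.
    fn on_result(
        result: Result<Arc<Box<RawValue>>, Arc<Box<RawValue>>>,
        column_order: Option<Vec<String>>,
        new_args: Option<HashMap<String, Box<RawValue>>>,
    ) -> Completion {
        match result {
            Ok(r) => Completion {
                result: r,
                result_columns: column_order,
                preprocessed_args: new_args,
                success: true,
            },
            Err(e) => Completion {
                result: e,
                result_columns: None,
                preprocessed_args: None,
                success: false,
            },
        }
    }

    fn main() {
        let ok = serde_json::value::to_raw_value(&42).unwrap();
        let c = on_result(Ok(Arc::new(ok)), Some(vec!["a".to_string()]), None);
        assert!(c.success && c.result_columns.is_some());
    }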
@@ -442,7 +401,18 @@ pub async fn handle_receive_completed_job(
 }

 pub async fn process_completed_job(
-    JobCompleted { job, result, mem_peak, success, cached_res_path, canceled_by, duration, .. }: JobCompleted,
+    JobCompleted {
+        job,
+        result,
+        mem_peak,
+        success,
+        cached_res_path,
+        canceled_by,
+        duration,
+        result_columns,
+        preprocessed_args,
+        ..
+    }: JobCompleted,
     client: &AuthedClient,
     db: &DB,
     worker_dir: &str,
@@ -468,6 +438,8 @@ pub async fn process_completed_job(
             true,
             false,
             Json(&result),
+            result_columns,
+            preprocessed_args,
             mem_peak.to_owned(),
             canceled_by,
             false,
@@ -510,6 +482,7 @@ pub async fn process_completed_job(
             &job,
             mem_peak.to_owned(),
             canceled_by,
+            preprocessed_args,
             serde_json::from_str(result.get()).unwrap_or_else(
                 |_| json!({ "message": format!("Non serializable error: {}", result.get()) }),
             ),
@@ -580,6 +553,7 @@ pub async fn handle_job_error(
         job,
         mem_peak,
         canceled_by.clone(),
+        None,
         err.clone(),
         worker_name,
         false,
@@ -640,6 +614,7 @@ pub async fn handle_job_error(
         &parent_job,
         mem_peak,
         canceled_by.clone(),
+        None,
         e,
         worker_name,
         false,
diff --git a/backend/windmill-worker/src/worker.rs b/backend/windmill-worker/src/worker.rs
index a8167c3077465..ffeefdd3da6e7 100644
--- a/backend/windmill-worker/src/worker.rs
+++ b/backend/windmill-worker/src/worker.rs
@@ -1488,6 +1488,8 @@ pub async fn run_worker(
                     job: Arc::new(job.job),
                     success: true,
                     result: Arc::new(empty_result()),
+                    result_columns: None,
+                    preprocessed_args: None,
                     mem_peak: 0,
                     cached_res_path: None,
                     token: "".to_string(),
@@ -1884,6 +1886,8 @@ pub enum SendResult {
 pub struct JobCompleted {
     pub job: Arc<QueuedJob>,
     pub result: Arc<Box<RawValue>>,
+    pub result_columns: Option<Vec<String>>,
+    pub preprocessed_args: Option<HashMap<String, Box<RawValue>>>,
     pub mem_peak: i32,
     pub success: bool,
     pub cached_res_path: Option<String>,
@@ -2070,6 +2074,8 @@ async fn handle_queued_job(
             .send(JobCompleted {
                 job,
                 result,
+                result_columns: None,
+                preprocessed_args: None,
                 mem_peak: 0,
                 canceled_by: None,
                 success: true,
diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs
index f4b5f40f23703..677a0ba00e1b3 100644
--- a/backend/windmill-worker/src/worker_flow.rs
+++ b/backend/windmill-worker/src/worker_flow.rs
@@ -408,7 +408,7 @@ pub async fn update_flow_status_after_job_completion_internal(
     if matches!(module_step, Step::PreprocessorStep) {
         sqlx::query!(
-            "UPDATE v2_job SET args = (SELECT result FROM v2_job_completed WHERE id = $1)
+            "UPDATE v2_job_status SET flow_preprocessed_args = (SELECT result FROM v2_job_completed WHERE id = $1)
             WHERE id = $2",
             job_id_for_status,
             flow
@@ -1077,6 +1077,7 @@ pub async fn update_flow_status_after_job_completion_internal(
                     username: flow_job.canceled_by.clone(),
                     reason: flow_job.canceled_reason.clone(),
                 }),
+                None,
                 canceled_job_to_result(&flow_job),
                 worker_name,
                 true,
@@ -1106,6 +1107,8 @@ pub async fn update_flow_status_after_job_completion_internal(
                     success,
                     stop_early && skip_if_stop_early,
                     Json(&nresult),
+                    None,
+                    None,
                     0,
                     None,
                     true,
@@ -1123,6 +1126,8 @@ pub async fn update_flow_status_after_job_completion_internal(
                         |e| json!({"error": format!("Impossible to serialize error: {e:#}")}),
                     ),
                 ),
+                None,
+                None,
                 0,
                 None,
                 true,
@@ -1156,8 +1161,18 @@ pub async fn update_flow_status_after_job_completion_internal(
                     db,
                 )
                 .await;
-                let _ = add_completed_job_error(db, &flow_job, 0, None, e, worker_name, true, None)
-                    .await;
+                let _ = add_completed_job_error(
+                    db,
+                    &flow_job,
+                    0,
+                    None,
+                    None,
+                    e,
+                    worker_name,
+                    true,
+                    None,
+                )
+                .await;
                 true
             }
             Ok(_) => false,
@@ -1432,7 +1447,7 @@ pub async fn update_flow_status_in_progress(
     Ok(step)
 }

-#[derive(Debug)]
+#[derive(Debug, Copy, Clone)]
 pub enum Step {
     Step(usize),
     PreprocessorStep,
@@ -1666,14 +1681,15 @@ async fn push_next_flow_job(
         Step::FailureStep => status.failure_module.module_status.clone(),
     };

-    let fj: mappable_rc::Marc<QueuedJob> = flow_job.clone().into();
-    let arc_flow_job_args: Marc<HashMap<String, Box<RawValue>>> = Marc::map(fj, |x| {
-        if let Some(args) = &x.args {
-            &args.0
-        } else {
-            &EHM
-        }
-    });
+    let arc_flow_job_args = if let (Step::Step(0), Some(result)) = (step, last_job_result.clone()) {
+        // `args` are retrieved from preprocessor result:
+        Marc::new(serde_json::from_str(result.get())?)
+    } else {
+        Marc::map(flow_job.clone().into(), |x: &QueuedJob| match &x.args {
+            Some(args) => &args.0,
+            _ => &EHM,
+        })
+    };

     // if this is an empty module of if the module has already been completed, successfully, update the parent flow
     if flow.modules.is_empty() || matches!(status_module, FlowStatusModule::Success { .. }) {
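Note: with the preprocessor's result stored in v2_job_status.flow_preprocessed_args (and surfaced via COALESCE in the compatibility views), step 0 of a flow now deserializes its args from that result instead of the job row. An illustrative sketch of the handoff; step_zero_args is a hypothetical name, and serde_json's raw_value feature is assumed:

    use std::collections::HashMap;
    use serde_json::value::RawValue;

    // If a preprocessor ran, the first step's args come from its result;
    // otherwise the flow job's stored args are used unchanged.
    fn step_zero_args(
        preprocessor_result: Option<&RawValue>,
        job_args: HashMap<String, Box<RawValue>>,
    ) -> serde_json::Result<HashMap<String, Box<RawValue>>> {
        match preprocessor_result {
            Some(r) => serde_json::from_str(r.get()),
            None => Ok(job_args),
        }
    }

    fn main() -> serde_json::Result<()> {
        let result = serde_json::value::to_raw_value(&serde_json::json!({"x": 1}))?;
        let args = step_zero_args(Some(&*result), HashMap::new())?;
        assert!(args.contains_key("x"));
        Ok(())
    }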