From 5150e8e787364a0b7858948fd9598d7a8a41bc4b Mon Sep 17 00:00:00 2001 From: Abel Lucas Date: Mon, 3 Feb 2025 18:57:26 +0100 Subject: [PATCH] fixup! backend: use v2 tables through views where possible (v2 phase 3) --- .../20250201124423_v2_job_completed.down.sql | 1 - .../20250201124423_v2_job_completed.up.sql | 1 - .../migrations/20250201124431_v2_job.down.sql | 1 + .../migrations/20250201124431_v2_job.up.sql | 1 + .../20250201124743_v2_job_queue_sync.up.sql | 14 +-- ...0250201124744_v2_job_completed_sync.up.sql | 21 ++--- .../20250201124748_v2_migrate_from_v1.up.sql | 18 ++-- ...5629_v2_job_completed_constraints.down.sql | 2 + ...1145630_v2_queue_compatibility_view.up.sql | 3 +- ...v2_completed_job_compatibility_view.up.sql | 3 +- backend/windmill-api/src/jobs.rs | 56 +++++------ backend/windmill-common/src/jobs.rs | 49 +++------- backend/windmill-common/src/worker.rs | 5 +- backend/windmill-queue/src/jobs.rs | 79 +++++++++++++++- backend/windmill-worker/src/bun_executor.rs | 18 ++-- backend/windmill-worker/src/common.rs | 10 -- backend/windmill-worker/src/deno_executor.rs | 17 ++-- backend/windmill-worker/src/php_executor.rs | 9 +- .../windmill-worker/src/python_executor.rs | 29 +++--- .../windmill-worker/src/result_processor.rs | 94 +++++++++---------- backend/windmill-worker/src/worker.rs | 3 + backend/windmill-worker/src/worker_flow.rs | 20 ++-- 22 files changed, 245 insertions(+), 209 deletions(-) diff --git a/backend/migrations/20250201124423_v2_job_completed.down.sql b/backend/migrations/20250201124423_v2_job_completed.down.sql index 34fd3c9a3f9b9..40eb26c745358 100644 --- a/backend/migrations/20250201124423_v2_job_completed.down.sql +++ b/backend/migrations/20250201124423_v2_job_completed.down.sql @@ -3,7 +3,6 @@ ALTER TABLE v2_job_completed DROP COLUMN status CASCADE, DROP COLUMN completed_at CASCADE, DROP COLUMN worker CASCADE, - DROP COLUMN preprocessed_args CASCADE, DROP COLUMN workflow_as_code_status CASCADE, DROP COLUMN result_columns CASCADE, DROP COLUMN extras CASCADE; diff --git a/backend/migrations/20250201124423_v2_job_completed.up.sql b/backend/migrations/20250201124423_v2_job_completed.up.sql index 350e1a725ee8b..90ec6664aba41 100644 --- a/backend/migrations/20250201124423_v2_job_completed.up.sql +++ b/backend/migrations/20250201124423_v2_job_completed.up.sql @@ -4,7 +4,6 @@ ALTER TABLE v2_job_completed ADD COLUMN IF NOT EXISTS status job_status, ADD COLUMN IF NOT EXISTS completed_at TIMESTAMP WITH TIME ZONE, ADD COLUMN IF NOT EXISTS worker VARCHAR(255), - ADD COLUMN IF NOT EXISTS preprocessed_args BOOL NOT NULL DEFAULT FALSE, ADD COLUMN IF NOT EXISTS workflow_as_code_status JSONB, ADD COLUMN IF NOT EXISTS result_columns TEXT[], ADD COLUMN IF NOT EXISTS extras JSONB; diff --git a/backend/migrations/20250201124431_v2_job.down.sql b/backend/migrations/20250201124431_v2_job.down.sql index 604aff9ecdae2..3adc4918aa024 100644 --- a/backend/migrations/20250201124431_v2_job.down.sql +++ b/backend/migrations/20250201124431_v2_job.down.sql @@ -22,6 +22,7 @@ ALTER TABLE v2_job DROP COLUMN flow_innermost_root_job CASCADE, DROP COLUMN trigger CASCADE, DROP COLUMN trigger_kind CASCADE, + DROP COLUMN preprocessed CASCADE, DROP COLUMN same_worker CASCADE, DROP COLUMN visible_to_owner CASCADE, DROP COLUMN concurrent_limit CASCADE, diff --git a/backend/migrations/20250201124431_v2_job.up.sql b/backend/migrations/20250201124431_v2_job.up.sql index 43556d36252b9..29ff7f88f7ea1 100644 --- a/backend/migrations/20250201124431_v2_job.up.sql +++ 
b/backend/migrations/20250201124431_v2_job.up.sql @@ -20,6 +20,7 @@ ALTER TABLE v2_job ADD COLUMN IF NOT EXISTS flow_innermost_root_job UUID, ADD COLUMN IF NOT EXISTS trigger VARCHAR(255), ADD COLUMN IF NOT EXISTS trigger_kind job_trigger_kind, + ADD COLUMN IF NOT EXISTS preprocessed BOOLEAN, ADD COLUMN IF NOT EXISTS same_worker BOOLEAN DEFAULT FALSE NOT NULL, ADD COLUMN IF NOT EXISTS visible_to_owner BOOLEAN DEFAULT TRUE NOT NULL, ADD COLUMN IF NOT EXISTS concurrent_limit INTEGER, diff --git a/backend/migrations/20250201124743_v2_job_queue_sync.up.sql b/backend/migrations/20250201124743_v2_job_queue_sync.up.sql index 6f931dca0499e..a0aa0c3197e75 100644 --- a/backend/migrations/20250201124743_v2_job_queue_sync.up.sql +++ b/backend/migrations/20250201124743_v2_job_queue_sync.up.sql @@ -53,7 +53,7 @@ BEGIN NEW.__args = jsonb_set( coalesce(NEW.__args, '{}'::JSONB), '{_ENTRYPOINT_OVERRIDE}', - job.script_entrypoint_override + to_jsonb(job.script_entrypoint_override) ); END IF; RETURN NEW; @@ -74,7 +74,7 @@ CREATE OR REPLACE FUNCTION v2_job_queue_after_insert() RETURNS TRIGGER AS $$ BEG kind, runnable_id, runnable_path, parent_job, script_lang, script_entrypoint_override, flow_step, flow_step_id, flow_innermost_root_job, - trigger, trigger_kind, + trigger, trigger_kind, preprocessed, tag, same_worker, visible_to_owner, concurrent_limit, concurrency_time_window_s, cache_ttl, timeout, priority, args, pre_run_error, raw_code, raw_lock, raw_flow @@ -84,6 +84,7 @@ CREATE OR REPLACE FUNCTION v2_job_queue_after_insert() RETURNS TRIGGER AS $$ BEG NEW.__language, NEW.__args->>'_ENTRYPOINT_OVERRIDE', NULL, NEW.__flow_step_id, NEW.__root_job, NEW.__schedule_path, CASE WHEN NEW.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, + CASE WHEN NEW.__args->>'wm_trigger' IS NOT NULL THEN FALSE END, NEW.tag, NEW.__same_worker, NEW.__visible_to_owner, NEW.__concurrent_limit, NEW.__concurrency_time_window_s, NEW.__cache_ttl, NEW.__timeout, NEW.priority, NEW.__args, NEW.__pre_run_error, @@ -107,6 +108,7 @@ CREATE OR REPLACE FUNCTION v2_job_queue_after_insert() RETURNS TRIGGER AS $$ BEG trigger_kind = EXCLUDED.trigger_kind, tag = EXCLUDED.tag, same_worker = EXCLUDED.same_worker, + preprocessed = EXCLUDED.preprocessed, visible_to_owner = EXCLUDED.visible_to_owner, concurrent_limit = EXCLUDED.concurrent_limit, concurrency_time_window_s = EXCLUDED.concurrency_time_window_s, @@ -150,9 +152,9 @@ CREATE OR REPLACE FUNCTION v2_job_queue_before_update() RETURNS TRIGGER AS $$ BE NEW.__canceled := TRUE; END IF; -- `v2_job`: Only `args` are updated - IF NEW.__args::text IS DISTINCT FROM OLD.__args::text THEN + IF NEW.__args::TEXT IS DISTINCT FROM OLD.__args::TEXT THEN UPDATE v2_job - SET args = NEW.__args + SET args = NEW.__args, preprocessed = CASE WHEN preprocessed = FALSE THEN TRUE END WHERE id = NEW.id; END IF; -- `v2_job_runtime`: @@ -165,8 +167,8 @@ CREATE OR REPLACE FUNCTION v2_job_queue_before_update() RETURNS TRIGGER AS $$ BE ; END IF; -- `v2_job_status`: - IF NEW.__flow_status::text IS DISTINCT FROM OLD.__flow_status::text OR - NEW.__leaf_jobs::text IS DISTINCT FROM OLD.__leaf_jobs::text THEN + IF NEW.__flow_status::TEXT IS DISTINCT FROM OLD.__flow_status::TEXT OR + NEW.__leaf_jobs::TEXT IS DISTINCT FROM OLD.__leaf_jobs::TEXT THEN IF NEW.__is_flow_step = false AND NEW.__parent_job IS NOT NULL THEN -- `workflow_as_code`: INSERT INTO v2_job_status (id, workflow_as_code_status) diff --git a/backend/migrations/20250201124744_v2_job_completed_sync.up.sql 
b/backend/migrations/20250201124744_v2_job_completed_sync.up.sql index e30cb2cb612ba..a9773022340bc 100644 --- a/backend/migrations/20250201124744_v2_job_completed_sync.up.sql +++ b/backend/migrations/20250201124744_v2_job_completed_sync.up.sql @@ -8,9 +8,8 @@ DECLARE job v2_job; DECLARE final_labels TEXT[]; BEGIN -- New columns synchronization: - -- 1. `preprocessed_args` <-> `flow_status._metadata.preprocessed_args` - -- 2. `result_columns` <-> `flow_status._metadata.column_order` - -- 3. `v2_job.labels` <-> `result.wm_labels` + -- 1. `result_columns` <-> `flow_status._metadata.column_order` + -- 2. `v2_job.labels` <-> `result.wm_labels` IF NEW.__created_by IS NULL THEN -- v2 -> v1 -- When inserting to `v2_job_completed` from `v2` code, set `v1` columns: @@ -38,8 +37,7 @@ BEGIN NEW.__visible_to_owner := job.visible_to_owner; NEW.__tag := job.tag; NEW.__priority := job.priority; - -- 1. `preprocessed_args` -> `flow_status._metadata.preprocessed_args` - IF NEW.preprocessed_args = true AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN + IF job.preprocessed = TRUE AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN NEW.flow_status := jsonb_set( coalesce(NEW.flow_status, '{}'::JSONB), '{_metadata}', @@ -50,7 +48,7 @@ BEGIN ) ); END IF; - -- 2. `result_columns` -> `flow_status._metadata.column_order` + -- 1. `result_columns` -> `flow_status._metadata.column_order` IF NEW.result_columns IS NOT NULL AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN NEW.flow_status := jsonb_set( coalesce(NEW.flow_status, '{}'::JSONB), @@ -62,7 +60,7 @@ BEGIN ) ); END IF; - -- 3. `v2_job.labels` -> `result.wm_labels` + -- 2. `v2_job.labels` -> `result.wm_labels` IF job.labels IS NOT NULL AND (NEW.result IS NULL OR jsonb_typeof(NEW.result) = 'object') THEN IF jsonb_typeof(NEW.result->'wm_labels') = 'array' AND ( SELECT bool_and(jsonb_typeof(elem) = 'string') @@ -94,19 +92,14 @@ BEGIN WHEN NEW.__success THEN 'success'::job_status ELSE 'failure'::job_status END; - -- 1. `preprocessed_args` <- `flow_status._metadata.preprocessed_args` - NEW.preprocessed_args := CASE - WHEN NEW.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE - ELSE FALSE - END; - -- 2. `result_columns` <- `flow_status._metadata.column_order` + -- 1. `result_columns` <- `flow_status._metadata.column_order` IF jsonb_typeof(NEW.flow_status->'_metadata'->'column_order') = 'array' AND ( SELECT bool_and(jsonb_typeof(elem) = 'string') FROM jsonb_array_elements(NEW.flow_status->'_metadata'->'column_order') AS elem ) THEN NEW.result_columns := translate(NEW.flow_status->'_metadata'->>'column_order', '[]', '{}')::TEXT[]; END IF; - -- 3. `v2_job.labels` <- `result.wm_labels` + -- 2. 
`v2_job.labels` <- `result.wm_labels` IF jsonb_typeof(NEW.result->'wm_labels') = 'array' AND ( SELECT bool_and(jsonb_typeof(elem) = 'string') FROM jsonb_array_elements(NEW.result->'wm_labels') AS elem diff --git a/backend/migrations/20250201124748_v2_migrate_from_v1.up.sql b/backend/migrations/20250201124748_v2_migrate_from_v1.up.sql index 00ee695ff7743..68242e4915050 100644 --- a/backend/migrations/20250201124748_v2_migrate_from_v1.up.sql +++ b/backend/migrations/20250201124748_v2_migrate_from_v1.up.sql @@ -9,10 +9,6 @@ SET completed_at = started_at + (interval '1 millisecond' * duration_ms), WHEN __success THEN 'success'::job_status ELSE 'failure'::job_status END, - preprocessed_args = CASE - WHEN flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE - ELSE FALSE - END, result_columns = CASE WHEN jsonb_typeof(flow_status->'_metadata'->'column_order') = 'array' AND ( SELECT bool_and(jsonb_typeof(elem) = 'string') @@ -27,7 +23,7 @@ INSERT INTO v2_job ( kind, runnable_id, runnable_path, parent_job, script_lang, script_entrypoint_override, flow_step_id, flow_innermost_root_job, - trigger, trigger_kind, + trigger, trigger_kind, preprocessed, tag, same_worker, visible_to_owner, concurrent_limit, concurrency_time_window_s, cache_ttl, timeout, priority, args, pre_run_error, raw_code, raw_lock, raw_flow @@ -37,6 +33,7 @@ INSERT INTO v2_job ( __language, __args->>'_ENTRYPOINT_OVERRIDE', __flow_step_id, __root_job, __schedule_path, CASE WHEN __schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, + CASE WHEN __args->>'wm_trigger' IS NOT NULL THEN FALSE END, tag, __same_worker, __visible_to_owner, __concurrent_limit, __concurrency_time_window_s, __cache_ttl, __timeout, priority, __args, __pre_run_error, @@ -49,7 +46,7 @@ INSERT INTO v2_job ( id, workspace_id, created_at, created_by, permissioned_as, permissioned_as_email, kind, runnable_id, runnable_path, parent_job, script_lang, script_entrypoint_override, - trigger, trigger_kind, + trigger, trigger_kind, preprocessed, tag, visible_to_owner, priority, args, raw_code, raw_lock, raw_flow, @@ -59,6 +56,10 @@ INSERT INTO v2_job ( __job_kind, __script_hash, __script_path, __parent_job, __language, __args->>'_ENTRYPOINT_OVERRIDE', __schedule_path, CASE WHEN __schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, + CASE + WHEN flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE + WHEN __args->>'wm_trigger' IS NOT NULL THEN FALSE + END, __tag, __visible_to_owner, __priority, __args, __raw_code, __raw_lock, __raw_flow, @@ -92,6 +93,7 @@ UPDATE v2_job SET flow_innermost_root_job = v2_job_queue.__root_job, trigger = v2_job_queue.__schedule_path, trigger_kind = CASE WHEN v2_job_queue.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, + preprocessed = CASE WHEN v2_job_queue.__args->>'wm_trigger' IS NOT NULL THEN FALSE END, tag = v2_job_queue.tag, same_worker = v2_job_queue.__same_worker, visible_to_owner = v2_job_queue.__visible_to_owner, @@ -122,6 +124,10 @@ UPDATE v2_job SET script_entrypoint_override = v2_job_completed.__args->>'_ENTRYPOINT_OVERRIDE', trigger = v2_job_completed.__schedule_path, trigger_kind = CASE WHEN v2_job_completed.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, + preprocessed = CASE + WHEN v2_job_completed.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE + WHEN v2_job_completed.__args->>'wm_trigger' IS NOT NULL THEN FALSE + END, tag = v2_job_completed.__tag, visible_to_owner = v2_job_completed.__visible_to_owner, 
priority = v2_job_completed.__priority, diff --git a/backend/migrations/20250201145629_v2_job_completed_constraints.down.sql b/backend/migrations/20250201145629_v2_job_completed_constraints.down.sql index 928a5feb5a44a..bf4a3994701fa 100644 --- a/backend/migrations/20250201145629_v2_job_completed_constraints.down.sql +++ b/backend/migrations/20250201145629_v2_job_completed_constraints.down.sql @@ -1,4 +1,6 @@ -- Add down migration script here +LOCK TABLE v2_job_completed IN ACCESS EXCLUSIVE MODE; +UPDATE v2_job_completed SET started_at = completed_at WHERE started_at IS NULL; ALTER TABLE v2_job_completed ALTER COLUMN status DROP NOT NULL; ALTER TABLE v2_job_completed ALTER COLUMN completed_at DROP DEFAULT; ALTER TABLE v2_job_completed ALTER COLUMN completed_at DROP NOT NULL; diff --git a/backend/migrations/20250201145630_v2_queue_compatibility_view.up.sql b/backend/migrations/20250201145630_v2_queue_compatibility_view.up.sql index 0c4801bf6ccbf..4fb27be8d1924 100644 --- a/backend/migrations/20250201145630_v2_queue_compatibility_view.up.sql +++ b/backend/migrations/20250201145630_v2_queue_compatibility_view.up.sql @@ -42,7 +42,8 @@ SELECT j.flow_step_id, j.cache_ttl, j.priority, - NULL::TEXT AS logs + NULL::TEXT AS logs, + j.script_entrypoint_override FROM v2_job_queue q JOIN v2_job j USING (id) LEFT JOIN v2_job_runtime r USING (id) diff --git a/backend/migrations/20250201145631_v2_completed_job_compatibility_view.up.sql b/backend/migrations/20250201145631_v2_completed_job_compatibility_view.up.sql index 6d04a518ae124..d4c013397f8dd 100644 --- a/backend/migrations/20250201145631_v2_completed_job_compatibility_view.up.sql +++ b/backend/migrations/20250201145631_v2_completed_job_compatibility_view.up.sql @@ -33,7 +33,8 @@ SELECT c.memory_peak AS mem_peak, j.tag, j.priority, - NULL::TEXT AS logs + NULL::TEXT AS logs, + result_columns FROM v2_job_completed c JOIN v2_job j USING (id) ; diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 4d0c242f8867c..4c89054304c2e 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -680,7 +680,7 @@ macro_rules! get_job_query { ("v2_as_completed_job", $($opts:tt)*) => { get_job_query!( @impl "v2_as_completed_job", ($($opts)*), - "duration_ms, success, result, deleted, is_skipped, result->'wm_labels' as labels, \ + "duration_ms, success, result, result_columns, deleted, is_skipped, result->'wm_labels' as labels, \ CASE WHEN result is null or pg_column_size(result) < 90000 THEN result ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as result", ) }; @@ -688,7 +688,8 @@ macro_rules! 
get_job_query { get_job_query!( @impl "v2_as_queue", ($($opts)*), "scheduled_for, running, last_ping, suspend, suspend_until, same_worker, pre_run_error, visible_to_owner, \ - root_job, leaf_jobs, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id, cache_ttl", + root_job, leaf_jobs, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id, cache_ttl,\ + script_entrypoint_override", ) }; (@impl $table:literal, (with_logs: $with_logs:expr, $($rest:tt)*), $additional_fields:literal, $($args:tt)*) => { @@ -2748,6 +2749,7 @@ impl<'a> From<UnifiedJob> for Job { script_path: uj.script_path, args: None, result: None, + result_columns: None, logs: None, flow_status: None, deleted: uj.deleted, @@ -2794,6 +2796,7 @@ impl<'a> From<UnifiedJob> for Job { permissioned_as: uj.permissioned_as, is_flow_step: uj.is_flow_step, language: uj.language, + script_entrypoint_override: None, same_worker: false, pre_run_error: None, email: uj.email, @@ -3552,10 +3555,9 @@ pub async fn run_wait_result_internal( let row = sqlx::query!( "SELECT result AS \"result: sqlx::types::Json<Box<RawValue>>\", - language AS \"language: ScriptLang\", - flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\", - success AS \"success!\" - FROM v2_as_completed_job + result_columns, + status = 'success' AS \"success!\" + FROM v2_job_completed WHERE id = $1 AND workspace_id = $2", uuid, &w_id ) .await?; if let Some(mut raw_result) = row { format_result( - raw_result.language.as_ref(), - raw_result.flow_status.as_ref(), + raw_result.result_columns.as_ref(), raw_result.result.as_mut(), ); result = raw_result.result.map(|x| x.0); @@ -5521,8 +5522,7 @@ async fn get_completed_job<'a>( #[derive(FromRow)] pub struct RawResult { pub result: Option<sqlx::types::Json<Box<RawValue>>>, - pub flow_status: Option<sqlx::types::Json<Box<RawValue>>>, - pub language: Option<ScriptLang>, + pub result_columns: Option<Vec<String>>, pub created_by: Option<String>, } @@ -5541,11 +5541,11 @@ async fn get_completed_job_result( RawResult, "SELECT result #> $3 AS \"result: sqlx::types::Json<Box<RawValue>>\", - flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\", - language AS \"language: ScriptLang\", + result_columns, created_by AS \"created_by!\" - FROM v2_as_completed_job - WHERE id = $1 AND workspace_id = $2 AND ($4::text[] IS NULL OR tag = ANY($4))", + FROM v2_job_completed c + JOIN v2_job USING (id) + WHERE c.id = $1 AND c.workspace_id = $2 AND ($4::text[] IS NULL OR tag = ANY($4))", id, &w_id, json_path.split(".").collect::<Vec<_>>() as Vec<&str>, @@ -5558,11 +5558,11 @@ async fn get_completed_job_result( RawResult, "SELECT result AS \"result: sqlx::types::Json<Box<RawValue>>\", - flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\", - language AS \"language: ScriptLang\", + result_columns, created_by AS \"created_by!\" - FROM v2_as_completed_job - WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))", + FROM v2_job_completed c + JOIN v2_job USING (id) + WHERE c.id = $1 AND c.workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))", id, &w_id, tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>, @@ -5612,8 +5612,7 @@ async fn get_completed_job_result( } format_result( - raw_result.language.as_ref(), - raw_result.flow_status.as_ref(), + raw_result.result_columns.as_ref(), raw_result.result.as_mut(), ); @@ -5685,12 +5684,13 @@ async fn get_completed_job_result_maybe( .flatten(); let result_o = sqlx::query!( "SELECT - result AS \"result: sqlx::types::Json<Box<RawValue>>\", success AS \"success!\", - language AS \"language: ScriptLang\", - flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\", + result AS \"result: sqlx::types::Json<Box<RawValue>>\", 
+ result_columns, + status = 'success' AS \"success!\", created_by AS \"created_by!\" - FROM v2_as_completed_job - WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))", + FROM v2_job_completed c + JOIN v2_job j USING (id) + WHERE c.id = $1 AND c.workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))", id, &w_id, tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>, ) .await?; if let Some(mut res) = result_o { - format_result( - res.language.as_ref(), - res.flow_status.as_ref(), - res.result.as_mut(), - ); + format_result(res.result_columns.as_ref(), res.result.as_mut()); if opt_authed.is_none() && res.created_by != "anonymous" { return Err(Error::BadRequest( "As a non logged in user, you can only see jobs ran by anonymous users".to_string(), diff --git a/backend/windmill-common/src/jobs.rs b/backend/windmill-common/src/jobs.rs index 5a784666275d0..be06cdd22199c 100644 --- a/backend/windmill-common/src/jobs.rs +++ b/backend/windmill-common/src/jobs.rs @@ -71,6 +71,7 @@ pub struct QueuedJob { pub script_hash: Option<ScriptHash>, #[serde(skip_serializing_if = "Option::is_none")] pub script_path: Option<String>, + pub script_entrypoint_override: Option<String>, pub args: Option<sqlx::types::Json<HashMap<String, Box<RawValue>>>>, #[serde(skip_serializing_if = "Option::is_none")] pub logs: Option<String>, @@ -170,6 +171,7 @@ impl Default for QueuedJob { flow_status: None, is_flow_step: false, language: None, + script_entrypoint_override: None, same_worker: false, pre_run_error: None, email: "".to_string(), @@ -207,6 +209,7 @@ pub struct CompletedJob { pub args: Option<sqlx::types::Json<HashMap<String, Box<RawValue>>>>, #[serde(skip_serializing_if = "Option::is_none")] pub result: Option<sqlx::types::Json<Box<RawValue>>>, + pub result_columns: Option<Vec<String>>, #[serde(skip_serializing_if = "Option::is_none")] pub logs: Option<String>, pub deleted: bool, @@ -529,19 +532,9 @@ pub async fn get_payload_tag_from_prefixed_path( Ok((payload, tag, on_behalf_of)) } -#[derive(Deserialize)] -struct FlowStatusMetadata { - column_order: Vec<String>, -} - -#[derive(Deserialize)] -struct FlowStatusWithMetadataOnly { - _metadata: FlowStatusMetadata, -} - pub fn order_columns( rows: Option<Vec<Box<RawValue>>>, - column_order: Vec<String>, + column_order: &Vec<String>, ) -> Option<Box<RawValue>> { if let Some(mut rows) = rows { if let Some(first_row) = rows.get(0) { @@ -549,7 +542,7 @@ pub fn order_columns( if let Ok(first_row) = first_row { let mut new_first_row = IndexMap::new(); for col in column_order { - if let Some(val) = first_row.get(&col) { + if let Some(val) = first_row.get(col) { new_first_row.insert(col.clone(), val.clone()); } } @@ -566,39 +559,21 @@ pub fn order_columns( } pub fn format_result( - language: Option<&ScriptLang>, - flow_status: Option<&sqlx::types::Json<Box<RawValue>>>, + result_columns: Option<&Vec<String>>, result: Option<&mut sqlx::types::Json<Box<RawValue>>>, ) -> () { - match language { - Some(&ScriptLang::Postgresql) - | Some(&ScriptLang::Mysql) - | Some(&ScriptLang::Snowflake) - | Some(&ScriptLang::Bigquery) => { - if let Some(Ok(flow_status)) = - flow_status.map(|x| serde_json::from_str::<FlowStatusWithMetadataOnly>(x.get())) - { - if let Some(result) = result { - let rows = serde_json::from_str::<Vec<Box<RawValue>>>(result.get()).ok(); - if let Some(ordered_result) = - order_columns(rows, flow_status._metadata.column_order) - { - *result = sqlx::types::Json(ordered_result); - } - } + if let Some(result_columns) = result_columns { + if let Some(result) = result { + let rows = serde_json::from_str::<Vec<Box<RawValue>>>(result.get()).ok(); + if let Some(ordered_result) = order_columns(rows, result_columns) { + *result = sqlx::types::Json(ordered_result); } } - } - _ => {} } } pub fn format_completed_job_result(mut cj: 
CompletedJob) -> CompletedJob { - format_result( - cj.language.as_ref(), - cj.flow_status.as_ref(), - cj.result.as_mut(), - ); - + format_result(cj.result_columns.as_ref(), cj.result.as_mut()); cj } diff --git a/backend/windmill-common/src/worker.rs b/backend/windmill-common/src/worker.rs index 6fd5d15f4ccde..42a26f0ff06c9 100644 --- a/backend/windmill-common/src/worker.rs +++ b/backend/windmill-common/src/worker.rs @@ -131,7 +131,7 @@ fn format_pull_query(peek: String) -> String { flow_innermost_root_job AS root_job, flow_step_id, flow_step_id IS NOT NULL AS is_flow_step, same_worker, pre_run_error, visible_to_owner, tag, concurrent_limit, concurrency_time_window_s, timeout, cache_ttl, priority, raw_code, raw_lock, - raw_flow + raw_flow, script_entrypoint_override FROM v2_job WHERE id = (SELECT id FROM peek) ) SELECT id, workspace_id, parent_job, created_by, created_at, started_at, scheduled_for, @@ -141,7 +141,8 @@ fn format_pull_query(peek: String) -> String { same_worker, pre_run_error, email, visible_to_owner, mem_peak, root_job, flow_leaf_jobs as leaf_jobs, tag, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id, cache_ttl, priority, - raw_code, raw_lock, raw_flow + raw_code, raw_lock, raw_flow, + script_entrypoint_override FROM q, r, j LEFT JOIN v2_job_status f USING (id)", peek diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index c0433518e4ef8..b4c18a1b7d05d 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -396,23 +396,43 @@ pub struct WrappedError { pub trait ValidableJson { fn is_valid_json(&self) -> bool; + fn wm_labels(&self) -> Option<Vec<String>>; +} + +#[derive(serde::Deserialize)] +struct ResultLabels { + wm_labels: Vec<String>, } impl ValidableJson for WrappedError { fn is_valid_json(&self) -> bool { true } + + fn wm_labels(&self) -> Option<Vec<String>> { + None + } } impl ValidableJson for Box<RawValue> { fn is_valid_json(&self) -> bool { !self.get().is_empty() } + + fn wm_labels(&self) -> Option<Vec<String>> { + serde_json::from_str::<ResultLabels>(self.get()) + .ok() + .map(|r| r.wm_labels) + } } -impl ValidableJson for Arc<Box<RawValue>> { +impl<T: ValidableJson> ValidableJson for Arc<T> { fn is_valid_json(&self) -> bool { - !self.get().is_empty() + T::is_valid_json(&self) + } + + fn wm_labels(&self) -> Option<Vec<String>> { + T::wm_labels(&self) + } } @@ -420,12 +440,22 @@ impl ValidableJson for serde_json::Value { fn is_valid_json(&self) -> bool { true } + + fn wm_labels(&self) -> Option<Vec<String>> { + serde_json::from_value::<ResultLabels>(self.clone()) + .ok() + .map(|r| r.wm_labels) + } } impl<T: ValidableJson> ValidableJson for Json<T> { fn is_valid_json(&self) -> bool { self.0.is_valid_json() } + + fn wm_labels(&self) -> Option<Vec<String>> { + self.0.wm_labels() + } } pub async fn register_metric( @@ -500,6 +530,7 @@ pub async fn add_completed_job_error( false, false, Json(&result), + None, mem_peak, canceled_by, flow_is_done, @@ -519,6 +550,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>( success: bool, skipped: bool, result: Json<&T>, + result_columns: Option<Vec<String>>, mem_peak: i32, canceled_by: Option<CanceledBy>, flow_is_done: bool, @@ -534,6 +566,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>( )); } + let result_columns = result_columns.as_ref(); let _job_id = queued_job.id; let (opt_uuid, _duration, _skip_downstream_error_handlers) = (|| async { let mut tx = db.begin().await?; @@ -557,13 +590,14 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>( , started_at , duration_ms , result + , result_columns , canceled_by , canceled_reason , flow_status , memory_peak , status ) - VALUES ($1, $2, $3, COALESCE($12::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE($3, 
now()))))*1000), $5, $7, $8, $9,\ + VALUES ($1, $2, $3, COALESCE($12::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE($3, now()))))*1000), $5, $13, $7, $8, $9,\ $11, CASE WHEN $6::BOOL THEN 'canceled'::job_status WHEN $10::BOOL THEN 'skipped'::job_status WHEN $4::BOOL THEN 'success'::job_status @@ -581,11 +615,25 @@ pub async fn add_completed_job( /* $10 */ skipped, /* $11 */ if mem_peak > 0 { Some(mem_peak) } else { None }, /* $12 */ duration, + /* $13 */ result_columns as Option<&Vec>, ) .fetch_one(&mut *tx) .await .map_err(|e| Error::InternalErr(format!("Could not add completed job {job_id}: {e:#}")))?; + if let Some(labels) = result.wm_labels() { + sqlx::query!( + "UPDATE v2_job SET labels = ( + SELECT array_agg(DISTINCT all_labels) + FROM unnest(coalesce(labels, ARRAY[]::TEXT[]) || $2) all_labels + ) WHERE id = $1", + job_id, + labels as Vec + ) + .execute(&mut *tx) + .await + .map_err(|e| Error::InternalErr(format!("Could not update job labels: {e:#}")))?; + } if !queued_job.is_flow_step { if _duration > 500 @@ -3674,16 +3722,35 @@ pub async fn push<'c, 'd>( }; let raw_flow = raw_flow.map(Json); + let preprocessed = if args.args.contains_key("wm_trigger") + || args + .extra + .as_ref() + .map_or(false, |x| x.contains_key("wm_trigger")) + { + Some(false) + } else { + None + }; + let script_entrypoint_override: Option = match args.args.get(ENTRYPOINT_OVERRIDE) { + Some(x) => Some(x.clone()), + None => args + .extra + .as_mut() + .map(|extra| extra.remove(ENTRYPOINT_OVERRIDE)) + .flatten(), + } + .and_then(|x| serde_json::from_str(x.get()).ok()); sqlx::query!( "INSERT INTO v2_job (id, workspace_id, raw_code, raw_lock, raw_flow, tag, parent_job, created_by, permissioned_as, runnable_id, runnable_path, args, kind, trigger, script_lang, same_worker, pre_run_error, permissioned_as_email, visible_to_owner, flow_innermost_root_job, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id, - cache_ttl, priority, trigger_kind) + cache_ttl, priority, trigger_kind, preprocessed, script_entrypoint_override) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, - CASE WHEN $14::VARCHAR IS NOT NULL THEN 'schedule'::job_trigger_kind END)", + CASE WHEN $14::VARCHAR IS NOT NULL THEN 'schedule'::job_trigger_kind END, $27, $28)", job_id, workspace_id, raw_code, @@ -3714,6 +3781,8 @@ pub async fn push<'c, 'd>( flow_step_id, cache_ttl, final_priority, + preprocessed, + script_entrypoint_override, ) .execute(&mut *tx) .warn_after_seconds(1) diff --git a/backend/windmill-worker/src/bun_executor.rs b/backend/windmill-worker/src/bun_executor.rs index 77bad32a5d98e..d4ad47c5e26dc 100644 --- a/backend/windmill-worker/src/bun_executor.rs +++ b/backend/windmill-worker/src/bun_executor.rs @@ -19,9 +19,8 @@ use crate::common::build_envs_map; use crate::{ common::{ - create_args_and_out_file, get_main_override, get_reserved_variables, parse_npm_config, - read_file, read_file_content, read_result, start_child_process, write_file_binary, - OccupancyMetrics, + create_args_and_out_file, get_reserved_variables, parse_npm_config, read_file, + read_file_content, read_result, start_child_process, write_file_binary, OccupancyMetrics, }, handle_child::handle_child, AuthedClientBackgroundTask, BUNFIG_INSTALL_SCOPES, BUN_BUNDLE_CACHE_DIR, BUN_CACHE_DIR, @@ -875,7 +874,7 @@ pub async fn handle_bun_job( if codebase.is_some() { annotation.nodejs = true } - let (main_override, apply_preprocessor) = match 
get_main_override(job.args.as_ref()) { + let (main_override, apply_preprocessor) = match job.script_entrypoint_override.as_deref() { Some(main_override) => { if main_override == PREPROCESSOR_FAKE_ENTRYPOINT { (None, true) @@ -1051,9 +1050,12 @@ pub async fn handle_bun_job( return Ok(()) as error::Result<()>; } // let mut start = Instant::now(); - let args = - windmill_parser_ts::parse_deno_signature(inner_content, true, main_override.clone())? - .args; + let args = windmill_parser_ts::parse_deno_signature( + inner_content, + true, + main_override.map(ToString::to_string), + )? + .args; let pre_args = if apply_preprocessor { Some( @@ -1087,7 +1089,7 @@ pub async fn handle_bun_job( let spread = args.into_iter().map(|x| x.name).join(","); // logs.push_str(format!("infer args: {:?}\n", start.elapsed().as_micros()).as_str()); // we cannot use Bun.read and Bun.write because it results in an EBADF error on cloud - let main_name = main_override.unwrap_or("main".to_string()); + let main_name = main_override.unwrap_or("main"); let main_import = if codebase.is_some() || has_bundle_cache { "./main.js" diff --git a/backend/windmill-worker/src/common.rs b/backend/windmill-worker/src/common.rs index 7cd4ef643b101..56a44284b6544 100644 --- a/backend/windmill-worker/src/common.rs +++ b/backend/windmill-worker/src/common.rs @@ -12,7 +12,6 @@ use sqlx::types::Json; use sqlx::{Pool, Postgres}; use tokio::process::Command; use tokio::{fs::File, io::AsyncReadExt}; -use windmill_common::jobs::ENTRYPOINT_OVERRIDE; #[cfg(feature = "parquet")] use windmill_common::s3_helpers::{ @@ -440,15 +439,6 @@ pub async fn build_envs_map(context: Vec<ContextualVariable>) -> HashMap<String -fn get_main_override(args: Option<&Json<HashMap<String, Box<RawValue>>>>) -> Option<String> { - return args - .map(|x| { - x.0.get(ENTRYPOINT_OVERRIDE) - .map(|x| x.get().to_string().replace("\"", "")) - }) - .flatten(); -} - pub fn sizeof_val(v: &serde_json::Value) -> usize { std::mem::size_of::<serde_json::Value>() + match v { diff --git a/backend/windmill-worker/src/deno_executor.rs b/backend/windmill-worker/src/deno_executor.rs index 2385771a159af..ae3520ec6406c 100644 --- a/backend/windmill-worker/src/deno_executor.rs +++ b/backend/windmill-worker/src/deno_executor.rs @@ -7,8 +7,8 @@ use windmill_queue::{append_logs, CanceledBy}; use crate::{ common::{ - create_args_and_out_file, get_main_override, get_reserved_variables, parse_npm_config, - read_file, read_result, start_child_process, OccupancyMetrics, + create_args_and_out_file, get_reserved_variables, parse_npm_config, read_file, read_result, + start_child_process, OccupancyMetrics, }, handle_child::handle_child, AuthedClientBackgroundTask, DENO_CACHE_DIR, DENO_PATH, DISABLE_NSJAIL, HOME_ENV, @@ -197,7 +197,7 @@ pub async fn handle_deno_job( let logs1 = "\n\n--- DENO CODE EXECUTION ---\n".to_string(); append_logs(&job.id, &job.workspace_id, logs1, db).await; - let (main_override, apply_preprocessor) = match get_main_override(job.args.as_ref()) { + let (main_override, apply_preprocessor) = match job.script_entrypoint_override.as_deref() { Some(main_override) => { if main_override == PREPROCESSOR_FAKE_ENTRYPOINT { (None, true) @@ -212,9 +212,12 @@ pub async fn handle_deno_job( let write_wrapper_f = async { // let mut start = Instant::now(); - let args = - windmill_parser_ts::parse_deno_signature(inner_content, true, main_override.clone())? - .args; + let args = windmill_parser_ts::parse_deno_signature( + inner_content, + true, + main_override.map(ToString::to_string), + )? 
+ .args; let pre_args = if apply_preprocessor { Some( @@ -246,7 +249,7 @@ pub async fn handle_deno_job( .join("\n "); let spread = args.into_iter().map(|x| x.name).join(","); - let main_name = main_override.unwrap_or("main".to_string()); + let main_name = main_override.unwrap_or("main"); // logs.push_str(format!("infer args: {:?}\n", start.elapsed().as_micros()).as_str()); let (preprocessor_import, preprocessor) = if let Some(pre_args) = pre_args { let pre_spread = pre_args.into_iter().map(|x| x.name).join(","); diff --git a/backend/windmill-worker/src/php_executor.rs b/backend/windmill-worker/src/php_executor.rs index 7eb0d9542900c..b95319b6d5c63 100644 --- a/backend/windmill-worker/src/php_executor.rs +++ b/backend/windmill-worker/src/php_executor.rs @@ -190,11 +190,14 @@ pub async fn handle_php_job( let _ = write_file(job_dir, "main.php", inner_content)?; - let main_override = get_main_override(job.args.as_ref()); + let main_override = job.script_entrypoint_override.as_deref(); let write_wrapper_f = async { - let args = - windmill_parser_php::parse_php_signature(inner_content, main_override.clone())?.args; + let args = windmill_parser_php::parse_php_signature( + inner_content, + main_override.map(ToString::to_string), + )? + .args; let args_to_include = args .iter() diff --git a/backend/windmill-worker/src/python_executor.rs b/backend/windmill-worker/src/python_executor.rs index 237c2e03a5a3a..2e798a8cf0876 100644 --- a/backend/windmill-worker/src/python_executor.rs +++ b/backend/windmill-worker/src/python_executor.rs @@ -10,7 +10,7 @@ use anyhow::anyhow; use itertools::Itertools; use regex::Regex; use serde_json::value::RawValue; -use sqlx::{types::Json, Pool, Postgres}; +use sqlx::{Pool, Postgres}; use tokio::{ fs::{metadata, DirBuilder, File}, io::AsyncReadExt, @@ -35,8 +35,8 @@ use windmill_common::{ #[cfg(feature = "enterprise")] use windmill_common::variables::get_secret_value_as_admin; -use windmill_queue::{append_logs, CanceledBy}; use std::env::var; +use windmill_queue::{append_logs, CanceledBy}; lazy_static::lazy_static! { static ref PYTHON_PATH: String = @@ -84,8 +84,8 @@ use windmill_common::s3_helpers::OBJECT_STORE_CACHE_SETTINGS; use crate::{ common::{ - create_args_and_out_file, get_main_override, get_reserved_variables, read_file, - read_result, start_child_process, OccupancyMetrics, + create_args_and_out_file, get_reserved_variables, read_file, read_result, + start_child_process, OccupancyMetrics, }, handle_child::handle_child, AuthedClientBackgroundTask, DISABLE_NSJAIL, DISABLE_NUSER, HOME_ENV, INSTANCE_PYTHON_VERSION, @@ -902,8 +902,6 @@ pub async fn handle_python_job( tracing::debug!("Finished deps postinstall stage"); } - - if no_uv { append_logs( &job.id, @@ -937,9 +935,9 @@ pub async fn handle_python_job( pre_spread, ) = prepare_wrapper( job_dir, + job.script_entrypoint_override.as_deref(), inner_content, &script_path, - job.args.as_ref(), false, ) .await?; @@ -1057,7 +1055,7 @@ except BaseException as e: // Useful if certain wheels need to be preinstalled before execution. let global_site_packages_path = py_version.to_cache_dir() + "/global-site-packages"; let additional_python_paths_folders = { - let mut paths= additional_python_paths.clone(); + let mut paths = additional_python_paths.clone(); if std::fs::metadata(&global_site_packages_path).is_ok() { // We want global_site_packages_path to be included in additional_python_paths_folders, but // we don't want it to be included in global_site_packages_path. 
@@ -1207,9 +1205,9 @@ mount {{ async fn prepare_wrapper( job_dir: &str, + job_script_entrypoint_override: Option<&str>, inner_content: &str, script_path: &str, - args: Option<&Json<HashMap<String, Box<RawValue>>>>, skip_preprocessor: bool, ) -> error::Result<( &'static str, @@ -1223,7 +1221,7 @@ async fn prepare_wrapper( Option<String>, Option<String>, )> { - let (main_override, apply_preprocessor) = match get_main_override(args) { + let (main_override, apply_preprocessor) = match job_script_entrypoint_override { Some(main_override) => { if !skip_preprocessor && main_override == PREPROCESSOR_FAKE_ENTRYPOINT { (None, true) @@ -1272,7 +1270,10 @@ async fn prepare_wrapper( let _ = write_file(job_dir, "loader.py", RELATIVE_PYTHON_LOADER)?; } - let sig = windmill_parser_py::parse_python_signature(inner_content, main_override.clone())?; + let sig = windmill_parser_py::parse_python_signature( + inner_content, + main_override.map(ToString::to_string), + )?; let pre_sig = if apply_preprocessor { Some(windmill_parser_py::parse_python_signature( @@ -1389,7 +1390,7 @@ async fn prepare_wrapper( last, transforms, spread, - main_override, + main_override.map(ToString::to_string), pre_spread, )) } @@ -1721,7 +1722,7 @@ async fn spawn_uv_install( // Track https://github.com/astral-sh/uv/issues/6715 if let Some(cert_path) = INDEX_CERT.as_ref() { // Once merged --cert can be used instead - // + // // command_args.extend(["--cert", cert_path]); envs.push(("SSL_CERT_FILE", cert_path)); } @@ -2458,7 +2459,7 @@ pub async fn start_worker( spread, _, _, - ) = prepare_wrapper(job_dir, inner_content, script_path, _args.as_ref(), true).await?; + ) = prepare_wrapper(job_dir, None, inner_content, script_path, true).await?; { let indented_transforms = transforms diff --git a/backend/windmill-worker/src/result_processor.rs b/backend/windmill-worker/src/result_processor.rs index 3449c2ab24cca..55ffd5c9237b7 100644 --- a/backend/windmill-worker/src/result_processor.rs +++ b/backend/windmill-worker/src/result_processor.rs @@ -232,6 +232,7 @@ async fn send_job_completed( job_completed_tx: JobCompletedSender, job: Arc<QueuedJob>, result: Arc<Box<RawValue>>, + result_columns: Option<Vec<String>>, mem_peak: i32, canceled_by: Option<CanceledBy>, success: bool, @@ -242,6 +243,7 @@ async fn send_job_completed( let jc = JobCompleted { job, result, + result_columns, mem_peak, canceled_by, success, @@ -272,59 +274,22 @@ pub async fn process_result( ) -> error::Result<bool> { match result { Ok(r) => { - let job = if column_order.is_some() || new_args.is_some() { - let mut updated_job = (*job).clone(); - if let Some(column_order) = column_order { - match updated_job.flow_status { - Some(_) => { - tracing::warn!("flow_status was expected to be none"); - } - None => { - updated_job.flow_status = - Some(sqlx::types::Json(to_raw_value(&serde_json::json!({ - "_metadata": { - "column_order": column_order - } - })))); - } - } - } - if let Some(new_args) = new_args { - match updated_job.flow_status { - Some(_) => { - tracing::warn!("flow_status was expected to be none"); - } - None => { - // TODO save original args somewhere - // if let Some(args) = updated_job.args.as_mut() { - // args.0.remove(ENTRYPOINT_OVERRIDE); - // } - updated_job.flow_status = - Some(sqlx::types::Json(to_raw_value(&serde_json::json!({ - "_metadata": { - "preprocessed_args": true - } - })))); - } - } - // For the front-end, change the args by the preprocessed args (script only). 
- sqlx::query!( - "UPDATE v2_job SET args = $1 WHERE id = $2", - Json(new_args) as Json<HashMap<String, Box<RawValue>>>, - updated_job.id - ) - .execute(db) - .await?; - } - Arc::new(updated_job) - } else { - job - }; + // Update script args to preprocessed args + if let Some(preprocessed_args) = new_args { + sqlx::query!( + "UPDATE v2_job SET args = $1, preprocessed = TRUE WHERE id = $2", + Json(preprocessed_args) as Json<HashMap<String, Box<RawValue>>>, + job.id + ) + .execute(db) + .await?; + } send_job_completed( job_completed_tx, job, r, + column_order, mem_peak, canceled_by, true, @@ -370,6 +335,7 @@ pub async fn process_result( job_completed_tx, job, Arc::new(to_raw_value(&error_value)), + None, mem_peak, canceled_by, false, @@ -442,7 +408,17 @@ pub async fn handle_receive_completed_job( } pub async fn process_completed_job( - JobCompleted { job, result, mem_peak, success, cached_res_path, canceled_by, duration, .. }: JobCompleted, + JobCompleted { + job, + result, + mem_peak, + success, + cached_res_path, + canceled_by, + duration, + result_columns, + .. + }: JobCompleted, client: &AuthedClient, db: &DB, worker_dir: &str, @@ -462,12 +438,32 @@ pub async fn process_completed_job( let job_id = job.id.clone(); let workspace_id = job.workspace_id.clone(); + if job.flow_step_id.as_deref() == Some("preprocessor") { + // Do this before inserting to `v2_job_completed` for backwards compatibility + // when we set `flow_status->_metadata->preprocessed_args` to true. + sqlx::query!( + r#"UPDATE v2_job SET + args = '{"reason":"PREPROCESSOR_ARGS_ARE_DISCARDED"}'::jsonb, + preprocessed = TRUE + WHERE id = $1 AND preprocessed = FALSE"#, + job.id + ) + .execute(db) + .await + .map_err(|e| { + Error::InternalErr(format!( + "error while deleting args of preprocessing step: {e:#}" + )) + })?; + } + add_completed_job( db, &job, true, false, Json(&result), + result_columns, mem_peak.to_owned(), canceled_by, false, diff --git a/backend/windmill-worker/src/worker.rs b/backend/windmill-worker/src/worker.rs index a8167c3077465..c11a9f0adba28 100644 --- a/backend/windmill-worker/src/worker.rs +++ b/backend/windmill-worker/src/worker.rs @@ -1488,6 +1488,7 @@ pub async fn run_worker( job: Arc::new(job.job), success: true, result: Arc::new(empty_result()), + result_columns: None, mem_peak: 0, cached_res_path: None, token: "".to_string(), @@ -1884,6 +1885,7 @@ pub enum SendResult { pub struct JobCompleted { pub job: Arc<QueuedJob>, pub result: Arc<Box<RawValue>>, + pub result_columns: Option<Vec<String>>, pub mem_peak: i32, pub success: bool, pub cached_res_path: Option<String>, @@ -2070,6 +2072,7 @@ async fn handle_queued_job( .send(JobCompleted { job, result, + result_columns: None, mem_peak: 0, canceled_by: None, success: true, diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs index f4b5f40f23703..2dc82222e4e36 100644 --- a/backend/windmill-worker/src/worker_flow.rs +++ b/backend/windmill-worker/src/worker_flow.rs @@ -408,7 +408,9 @@ pub async fn update_flow_status_after_job_completion_internal( if matches!(module_step, Step::PreprocessorStep) { sqlx::query!( - "UPDATE v2_job SET args = (SELECT result FROM v2_job_completed WHERE id = $1) + "UPDATE v2_job SET + args = (SELECT result FROM v2_job_completed WHERE id = $1), + preprocessed = TRUE WHERE id = $2", job_id_for_status, flow ) .execute(db) .await .map_err(|e| { Error::InternalErr(format!( "error while updating args in preprocessing step: {e:#}" )) })?; - - sqlx::query!( - r#"UPDATE v2_job SET args = '{"reason":"PREPROCESSOR_ARGS_ARE_DISCARDED"}'::jsonb WHERE id = $1"#, - 
job_id_for_status - ) - .execute(db) - .await - .map_err(|e| { - Error::InternalErr(format!( - "error while deleting args of preprocessing step: {e:#}" - )) - })?; } let mut tx = db.begin().await?; @@ -1106,6 +1096,7 @@ pub async fn update_flow_status_after_job_completion_internal( success, stop_early && skip_if_stop_early, Json(&nresult), + None, 0, None, true, @@ -1123,6 +1114,7 @@ pub async fn update_flow_status_after_job_completion_internal( |e| json!({"error": format!("Impossible to serialize error: {e:#}")}), ), ), + None, 0, None, true, @@ -1432,7 +1424,7 @@ pub async fn update_flow_status_in_progress( Ok(step) } -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] pub enum Step { Step(usize), PreprocessorStep,
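
Note on the two storage changes in this fixup, with a sketch of the ordering helper.

result_columns: completed jobs of the SQL-flavored languages (Postgresql, Mysql, Snowflake, Bigquery) previously smuggled their SELECT column order through flow_status->'_metadata'->'column_order'; it now lives in the dedicated v2_job_completed.result_columns TEXT[] column and is threaded through JobCompleted and add_completed_job, so format_result no longer needs the job's language or flow status. The sketch below is illustrative and not part of the patch: it assumes the serde_json crate plus indexmap with its serde feature enabled, and order_first_row is a hypothetical stand-in for the patched order_columns. JSON objects lose column order, so only the first row is rebuilt to follow the stored columns; consumers recover the ordering from that row.

    use indexmap::IndexMap;
    use serde_json::value::RawValue;

    // Rebuild the first row so its keys follow `column_order`; IndexMap
    // serializes in insertion order, unlike serde_json's default map.
    fn order_first_row(rows: &[Box<RawValue>], column_order: &[String]) -> Option<String> {
        let first: IndexMap<String, Box<RawValue>> =
            serde_json::from_str(rows.first()?.get()).ok()?;
        let mut ordered = IndexMap::new();
        for col in column_order {
            if let Some(val) = first.get(col) {
                // Cloning a Box<RawValue> keeps the raw JSON text verbatim.
                ordered.insert(col.clone(), val.clone());
            }
        }
        serde_json::to_string(&ordered).ok()
    }

    fn main() {
        let rows: Vec<Box<RawValue>> = vec![
            serde_json::from_str(r#"{"b":2,"a":1}"#).unwrap(),
            serde_json::from_str(r#"{"b":4,"a":3}"#).unwrap(),
        ];
        let cols = ["a".to_string(), "b".to_string()];
        // Prints {"a":1,"b":2}: the first row now carries the SELECT order.
        println!("{}", order_first_row(&rows, &cols).unwrap());
    }

preprocessed: the tri-state v2_job.preprocessed BOOLEAN replaces flow_status._metadata.preprocessed_args: NULL for jobs with no preprocessor involved, FALSE when a wm_trigger argument is present but not yet rewritten, and TRUE once the preprocessor has replaced (or a completed preprocessor step has discarded) the raw trigger args.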