From 7a84e2b0f63bd2154e6e0301f940e0941c9616dc Mon Sep 17 00:00:00 2001 From: Abel Lucas Date: Tue, 4 Feb 2025 06:00:13 +0100 Subject: [PATCH] fixup! backend: use v2 tables through views where possible (v2 phase 3) --- .../20250201124743_v2_job_queue_sync.up.sql | 2 +- ...0250201124744_v2_job_completed_sync.up.sql | 31 ++++++++++++------- .../20250201124748_v2_migrate_from_v1.up.sql | 8 ++--- ...1145630_v2_queue_compatibility_view.up.sql | 1 + ...v2_completed_job_compatibility_view.up.sql | 1 + backend/windmill-api/openapi.yaml | 4 +++ backend/windmill-api/src/jobs.rs | 7 ++++- backend/windmill-common/src/jobs.rs | 5 +++ backend/windmill-common/src/worker.rs | 5 ++- backend/windmill-queue/src/jobs.rs | 17 +++++----- .../runs/PreprocessedArgsDisplay.svelte | 5 ++- .../(root)/(logged)/run/[...run]/+page.svelte | 2 +- 12 files changed, 54 insertions(+), 34 deletions(-) diff --git a/backend/migrations/20250201124743_v2_job_queue_sync.up.sql b/backend/migrations/20250201124743_v2_job_queue_sync.up.sql index a0aa0c3197e75..4f93efb9ea7f7 100644 --- a/backend/migrations/20250201124743_v2_job_queue_sync.up.sql +++ b/backend/migrations/20250201124743_v2_job_queue_sync.up.sql @@ -84,7 +84,7 @@ CREATE OR REPLACE FUNCTION v2_job_queue_after_insert() RETURNS TRIGGER AS $$ BEG NEW.__language, NEW.__args->>'_ENTRYPOINT_OVERRIDE', NULL, NEW.__flow_step_id, NEW.__root_job, NEW.__schedule_path, CASE WHEN NEW.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, - CASE WHEN NEW.__args->>'wm_trigger' IS NOT NULL THEN FALSE END, + CASE WHEN NEW.__args->'wm_trigger'->>'kind' = 'webhook' THEN FALSE END, NEW.tag, NEW.__same_worker, NEW.__visible_to_owner, NEW.__concurrent_limit, NEW.__concurrency_time_window_s, NEW.__cache_ttl, NEW.__timeout, NEW.priority, NEW.__args, NEW.__pre_run_error, diff --git a/backend/migrations/20250201124744_v2_job_completed_sync.up.sql b/backend/migrations/20250201124744_v2_job_completed_sync.up.sql index a9773022340bc..0b30faaa1a4fe 100644 --- a/backend/migrations/20250201124744_v2_job_completed_sync.up.sql +++ b/backend/migrations/20250201124744_v2_job_completed_sync.up.sql @@ -10,6 +10,7 @@ BEGIN -- New columns synchronization: -- 1. `result_columns` <-> `flow_status._metadata.column_order` -- 2. `v2_job.labels` <-> `result.wm_labels` + -- 3. `v2_job.preprocessed` <-> flow_status._metadata.preprocessed_args` IF NEW.__created_by IS NULL THEN -- v2 -> v1 -- When inserting to `v2_job_completed` from `v2` code, set `v1` columns: @@ -37,17 +38,6 @@ BEGIN NEW.__visible_to_owner := job.visible_to_owner; NEW.__tag := job.tag; NEW.__priority := job.priority; - IF job.preprocessed = TRUE AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN - NEW.flow_status := jsonb_set( - coalesce(NEW.flow_status, '{}'::JSONB), - '{_metadata}', - jsonb_set( - coalesce(NEW.flow_status->'_metadata', '{}'::JSONB), - '{preprocessed_args}', - 'true'::JSONB - ) - ); - END IF; -- 1. `result_columns` -> `flow_status._metadata.column_order` IF NEW.result_columns IS NOT NULL AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN NEW.flow_status := jsonb_set( @@ -83,6 +73,18 @@ BEGIN to_jsonb(final_labels) ); END IF; + -- 3. 
`v2_job.preprocessed` -> flow_status._metadata.preprocessed_args` + IF job.preprocessed = TRUE AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN + NEW.flow_status := jsonb_set( + coalesce(NEW.flow_status, '{}'::JSONB), + '{_metadata}', + jsonb_set( + coalesce(NEW.flow_status->'_metadata', '{}'::JSONB), + '{preprocessed_args}', + 'true'::JSONB + ) + ); + END IF; ELSE -- v1 -> v2 NEW.completed_at := now(); @@ -114,6 +116,13 @@ BEGIN ) WHERE id = NEW.id; END IF; + -- 3. `v2_job.preprocessed` <- flow_status._metadata.preprocessed_args` + IF NEW.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN + UPDATE v2_job SET + args = NEW.__args, + preprocessed = TRUE + WHERE id = NEW.id AND preprocessed = FALSE; + END IF; END IF; RETURN NEW; END $$ LANGUAGE plpgsql; diff --git a/backend/migrations/20250201124748_v2_migrate_from_v1.up.sql b/backend/migrations/20250201124748_v2_migrate_from_v1.up.sql index 68242e4915050..52ee55403a69b 100644 --- a/backend/migrations/20250201124748_v2_migrate_from_v1.up.sql +++ b/backend/migrations/20250201124748_v2_migrate_from_v1.up.sql @@ -33,7 +33,7 @@ INSERT INTO v2_job ( __language, __args->>'_ENTRYPOINT_OVERRIDE', __flow_step_id, __root_job, __schedule_path, CASE WHEN __schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, - CASE WHEN __args->>'wm_trigger' IS NOT NULL THEN FALSE END, + CASE WHEN __args->'wm_trigger'->>'kind' = 'webhook' THEN FALSE END, tag, __same_worker, __visible_to_owner, __concurrent_limit, __concurrency_time_window_s, __cache_ttl, __timeout, priority, __args, __pre_run_error, @@ -58,7 +58,7 @@ INSERT INTO v2_job ( __schedule_path, CASE WHEN __schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, CASE WHEN flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE - WHEN __args->>'wm_trigger' IS NOT NULL THEN FALSE + WHEN __args->'wm_trigger'->>'kind' = 'webhook' THEN FALSE END, __tag, __visible_to_owner, __priority, __args, @@ -93,7 +93,7 @@ UPDATE v2_job SET flow_innermost_root_job = v2_job_queue.__root_job, trigger = v2_job_queue.__schedule_path, trigger_kind = CASE WHEN v2_job_queue.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, - preprocessed = CASE WHEN v2_job_queue.__args->>'wm_trigger' IS NOT NULL THEN FALSE END, + preprocessed = CASE WHEN v2_job_queue.__args->'wm_trigger'->>'kind' = 'webhook' THEN FALSE END, tag = v2_job_queue.tag, same_worker = v2_job_queue.__same_worker, visible_to_owner = v2_job_queue.__visible_to_owner, @@ -126,7 +126,7 @@ UPDATE v2_job SET trigger_kind = CASE WHEN v2_job_completed.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, preprocessed = CASE WHEN v2_job_completed.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE - WHEN v2_job_completed.__args->>'wm_trigger' IS NOT NULL THEN FALSE + WHEN v2_job_completed.__args->'wm_trigger'->>'kind' = 'webhook' THEN FALSE END, tag = v2_job_completed.__tag, visible_to_owner = v2_job_completed.__visible_to_owner, diff --git a/backend/migrations/20250201145630_v2_queue_compatibility_view.up.sql b/backend/migrations/20250201145630_v2_queue_compatibility_view.up.sql index 4fb27be8d1924..177705e3c64fb 100644 --- a/backend/migrations/20250201145630_v2_queue_compatibility_view.up.sql +++ b/backend/migrations/20250201145630_v2_queue_compatibility_view.up.sql @@ -43,6 +43,7 @@ SELECT j.cache_ttl, j.priority, NULL::TEXT AS logs, + j.preprocessed, j.script_entrypoint_override FROM v2_job_queue q JOIN v2_job j USING (id) diff --git 
a/backend/migrations/20250201145631_v2_completed_job_compatibility_view.up.sql b/backend/migrations/20250201145631_v2_completed_job_compatibility_view.up.sql index d4c013397f8dd..bb57e0337a091 100644 --- a/backend/migrations/20250201145631_v2_completed_job_compatibility_view.up.sql +++ b/backend/migrations/20250201145631_v2_completed_job_compatibility_view.up.sql @@ -34,6 +34,7 @@ SELECT j.tag, j.priority, NULL::TEXT AS logs, + j.preprocessed, result_columns FROM v2_job_completed c JOIN v2_job j USING (id) diff --git a/backend/windmill-api/openapi.yaml b/backend/windmill-api/openapi.yaml index 60f0bcf30eaec..4e949ce8ef1b5 100644 --- a/backend/windmill-api/openapi.yaml +++ b/backend/windmill-api/openapi.yaml @@ -11908,6 +11908,8 @@ components: type: number suspend: type: number + preprocessed: + type: boolean required: - id - running @@ -12015,6 +12017,8 @@ components: type: number aggregate_wait_time_ms: type: number + preprocessed: + type: boolean required: - id - created_by diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 4c89054304c2e..d4b6d0b856ad9 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -720,7 +720,7 @@ macro_rules! get_job_query { CASE WHEN args is null or pg_column_size(args) < 90000 THEN args ELSE '{{\"reason\": \"WINDMILL_TOO_BIG\"}}'::jsonb END as args, \ {logs} as logs, {code} as raw_code, canceled, canceled_by, canceled_reason, job_kind, \ schedule_path, permissioned_as, flow_status, {flow} as raw_flow, is_flow_step, language, \ - {lock} as raw_lock, email, visible_to_owner, mem_peak, tag, priority, {additional_fields} \ + {lock} as raw_lock, email, visible_to_owner, mem_peak, tag, priority, preprocessed, {additional_fields} \ FROM {table} LEFT JOIN job_logs ON id = job_id \ WHERE id = $1 AND {table}.workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3)) LIMIT 1", table = $table, @@ -2646,6 +2646,7 @@ pub struct UnifiedJob { pub labels: Option, pub self_wait_time_ms: Option, pub aggregate_wait_time_ms: Option, + pub preprocessed: Option, } const CJ_FIELDS: &[&str] = &[ @@ -2683,6 +2684,7 @@ const CJ_FIELDS: &[&str] = &[ "result->'wm_labels' as labels", "self_wait_time_ms", "aggregate_wait_time_ms", + "preprocessed", ]; const QJ_FIELDS: &[&str] = &[ "'QueuedJob' as typ", @@ -2719,6 +2721,7 @@ const QJ_FIELDS: &[&str] = &[ "null as labels", "self_wait_time_ms", "aggregate_wait_time_ms", + "preprocessed", ]; impl UnifiedJob { @@ -2768,6 +2771,7 @@ impl<'a> From for Job { tag: uj.tag, priority: uj.priority, labels: uj.labels, + preprocessed: uj.preprocessed, }, )), "QueuedJob" => Job::QueuedJob(JobExtended::new( @@ -2812,6 +2816,7 @@ impl<'a> From for Job { flow_step_id: None, cache_ttl: None, priority: uj.priority, + preprocessed: uj.preprocessed, }, )), t => panic!("job type {} not valid", t), diff --git a/backend/windmill-common/src/jobs.rs b/backend/windmill-common/src/jobs.rs index be06cdd22199c..05594d3a58812 100644 --- a/backend/windmill-common/src/jobs.rs +++ b/backend/windmill-common/src/jobs.rs @@ -117,6 +117,8 @@ pub struct QueuedJob { pub cache_ttl: Option, #[serde(skip_serializing_if = "Option::is_none")] pub priority: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub preprocessed: Option, } impl QueuedJob { @@ -187,6 +189,7 @@ impl Default for QueuedJob { flow_step_id: None, cache_ttl: None, priority: None, + preprocessed: None, } } } @@ -237,6 +240,8 @@ pub struct CompletedJob { pub priority: Option, #[serde(skip_serializing_if = "Option::is_none")] pub labels: 
Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub preprocessed: Option, } impl CompletedJob { diff --git a/backend/windmill-common/src/worker.rs b/backend/windmill-common/src/worker.rs index 42a26f0ff06c9..4bbd777cbfe65 100644 --- a/backend/windmill-common/src/worker.rs +++ b/backend/windmill-common/src/worker.rs @@ -131,7 +131,7 @@ fn format_pull_query(peek: String) -> String { flow_innermost_root_job AS root_job, flow_step_id, flow_step_id IS NOT NULL AS is_flow_step, same_worker, pre_run_error, visible_to_owner, tag, concurrent_limit, concurrency_time_window_s, timeout, cache_ttl, priority, raw_code, raw_lock, - raw_flow, script_entrypoint_override + raw_flow, preprocessed, script_entrypoint_override FROM v2_job WHERE id = (SELECT id FROM peek) ) SELECT id, workspace_id, parent_job, created_by, created_at, started_at, scheduled_for, @@ -140,8 +140,7 @@ fn format_pull_query(peek: String) -> String { flow_status, is_flow_step, language, suspend, suspend_until, same_worker, pre_run_error, email, visible_to_owner, mem_peak, root_job, flow_leaf_jobs as leaf_jobs, tag, concurrent_limit, concurrency_time_window_s, - timeout, flow_step_id, cache_ttl, priority, - raw_code, raw_lock, raw_flow, + timeout, flow_step_id, cache_ttl, priority, raw_code, raw_lock, raw_flow, preprocessed, script_entrypoint_override FROM q, r, j LEFT JOIN v2_job_status f USING (id)", diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index b4c18a1b7d05d..4a18da0aef396 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -2957,6 +2957,7 @@ pub async fn push<'c, 'd>( } } + let mut preprocessed = None; let ( script_hash, script_path, @@ -2986,6 +2987,7 @@ pub async fn push<'c, 'd>( } => { let extra = args.extra.get_or_insert_with(HashMap::new); if apply_preprocessor { + preprocessed = Some(false); extra.insert( ENTRYPOINT_OVERRIDE.to_string(), to_raw_value(&PREPROCESSOR_FAKE_ENTRYPOINT), @@ -3404,6 +3406,7 @@ pub async fn push<'c, 'd>( status.preprocessor_module = None; extra.remove("wm_trigger"); } else { + preprocessed = Some(false); extra.entry("wm_trigger".to_string()).or_insert_with(|| { to_raw_value(&serde_json::json!({ "kind": "webhook", @@ -3722,16 +3725,10 @@ pub async fn push<'c, 'd>( }; let raw_flow = raw_flow.map(Json); - let preprocessed = if args.args.contains_key("wm_trigger") - || args - .extra - .as_ref() - .map_or(false, |x| x.contains_key("wm_trigger")) - { - Some(false) - } else { - None - }; + let preprocessed = preprocessed.or_else(|| match flow_step_id.as_deref() { + Some("preprocessor") => Some(false), + _ => None, + }); let script_entrypoint_override: Option = match args.args.get(ENTRYPOINT_OVERRIDE) { Some(x) => Some(x.clone()), None => args diff --git a/frontend/src/lib/components/runs/PreprocessedArgsDisplay.svelte b/frontend/src/lib/components/runs/PreprocessedArgsDisplay.svelte index 27ab382b518ca..03ce501164861 100644 --- a/frontend/src/lib/components/runs/PreprocessedArgsDisplay.svelte +++ b/frontend/src/lib/components/runs/PreprocessedArgsDisplay.svelte @@ -9,15 +9,14 @@ // import { copyToClipboard } from '$lib/utils' // import { deepEqual } from 'fast-equals' - export let flowStatus: any + export let preprocessed: boolean | undefined // $: args = // '_metadata' in flowStatus && 'original_args' in flowStatus['_metadata'] // ? 
flowStatus['_metadata']['original_args'] // : undefined - $: hasPreprocessedArgs = - '_metadata' in flowStatus && !!flowStatus['_metadata']['preprocessed_args'] + $: hasPreprocessedArgs = preprocessed === true // $: argsStr = args !== undefined ? JSON.stringify(args, null, 4) : undefined diff --git a/frontend/src/routes/(root)/(logged)/run/[...run]/+page.svelte b/frontend/src/routes/(root)/(logged)/run/[...run]/+page.svelte index b8dcfdbbef9b2..7d9d8da47c63f 100644 --- a/frontend/src/routes/(root)/(logged)/run/[...run]/+page.svelte +++ b/frontend/src/routes/(root)/(logged)/run/[...run]/+page.svelte @@ -726,7 +726,7 @@ {/if} {#if job && job.flow_status && job.job_kind === 'script'} - <PreprocessedArgsDisplay flowStatus={job.flow_status} /> + <PreprocessedArgsDisplay preprocessed={job.preprocessed} /> {/if} {#if persistentScriptDefinition}
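
Reviewer note on two patterns this patch relies on. The snippets below are standalone illustrations with literal values and placeholder aliases (`t(args)`, `t(flow_status)`); they are not part of the migrations themselves.

The trigger-kind check is narrowed from `__args->>'wm_trigger' IS NOT NULL` to `__args->'wm_trigger'->>'kind' = 'webhook'`, so only webhook-triggered jobs are marked `preprocessed = FALSE`:

SELECT
    args->>'wm_trigger' IS NOT NULL          AS old_check,
    args->'wm_trigger'->>'kind' = 'webhook'  AS new_check
FROM (VALUES
    ('{"wm_trigger": {"kind": "webhook"}}'::JSONB),
    ('{"wm_trigger": {"kind": "http"}}'::JSONB),
    ('{}'::JSONB)
) AS t(args);
-- old_check: true, true, false    new_check: true, false, NULL

In the v2 -> v1 branch, the completed-job sync trigger mirrors `v2_job.preprocessed` into `flow_status._metadata.preprocessed_args` with two nested `jsonb_set` calls; the same shape reduced to literals:

SELECT jsonb_set(
    coalesce(flow_status, '{}'::JSONB),
    '{_metadata}',
    jsonb_set(
        coalesce(flow_status->'_metadata', '{}'::JSONB),
        '{preprocessed_args}',
        'true'::JSONB
    )
) AS flow_status
FROM (VALUES ('{"step": 0}'::JSONB)) AS t(flow_status);
-- => {"step": 0, "_metadata": {"preprocessed_args": true}}

In the v1 -> v2 direction the trigger checks `flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB` and updates `v2_job` (copying `__args` back and setting `preprocessed = TRUE`) only where `preprocessed` is still FALSE.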