Skip to content

Commit

Permalink
fixup! backend: use v2 tables through views where possible (v2 phase 3)
Browse files Browse the repository at this point in the history
  • Loading branch information
uael committed Feb 3, 2025
1 parent 88cbba0 commit 5150e8e
Show file tree
Hide file tree
Showing 22 changed files with 245 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ ALTER TABLE v2_job_completed
DROP COLUMN status CASCADE,
DROP COLUMN completed_at CASCADE,
DROP COLUMN worker CASCADE,
DROP COLUMN preprocessed_args CASCADE,
DROP COLUMN workflow_as_code_status CASCADE,
DROP COLUMN result_columns CASCADE,
DROP COLUMN extras CASCADE;
Expand Down
1 change: 0 additions & 1 deletion backend/migrations/20250201124423_v2_job_completed.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ ALTER TABLE v2_job_completed
ADD COLUMN IF NOT EXISTS status job_status,
ADD COLUMN IF NOT EXISTS completed_at TIMESTAMP WITH TIME ZONE,
ADD COLUMN IF NOT EXISTS worker VARCHAR(255),
ADD COLUMN IF NOT EXISTS preprocessed_args BOOL NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS workflow_as_code_status JSONB,
ADD COLUMN IF NOT EXISTS result_columns TEXT[],
ADD COLUMN IF NOT EXISTS extras JSONB;
Expand Down
1 change: 1 addition & 0 deletions backend/migrations/20250201124431_v2_job.down.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ALTER TABLE v2_job
DROP COLUMN flow_innermost_root_job CASCADE,
DROP COLUMN trigger CASCADE,
DROP COLUMN trigger_kind CASCADE,
DROP COLUMN preprocessed CASCADE,
DROP COLUMN same_worker CASCADE,
DROP COLUMN visible_to_owner CASCADE,
DROP COLUMN concurrent_limit CASCADE,
Expand Down
1 change: 1 addition & 0 deletions backend/migrations/20250201124431_v2_job.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ALTER TABLE v2_job
ADD COLUMN IF NOT EXISTS flow_innermost_root_job UUID,
ADD COLUMN IF NOT EXISTS trigger VARCHAR(255),
ADD COLUMN IF NOT EXISTS trigger_kind job_trigger_kind,
ADD COLUMN IF NOT EXISTS preprocessed BOOLEAN,
ADD COLUMN IF NOT EXISTS same_worker BOOLEAN DEFAULT FALSE NOT NULL,
ADD COLUMN IF NOT EXISTS visible_to_owner BOOLEAN DEFAULT TRUE NOT NULL,
ADD COLUMN IF NOT EXISTS concurrent_limit INTEGER,
Expand Down
14 changes: 8 additions & 6 deletions backend/migrations/20250201124743_v2_job_queue_sync.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ BEGIN
NEW.__args = jsonb_set(
coalesce(NEW.__args, '{}'::JSONB),
'{_ENTRYPOINT_OVERRIDE}',
job.script_entrypoint_override
to_jsonb(job.script_entrypoint_override)
);
END IF;
RETURN NEW;
Expand All @@ -74,7 +74,7 @@ CREATE OR REPLACE FUNCTION v2_job_queue_after_insert() RETURNS TRIGGER AS $$ BEG
kind, runnable_id, runnable_path, parent_job,
script_lang, script_entrypoint_override,
flow_step, flow_step_id, flow_innermost_root_job,
trigger, trigger_kind,
trigger, trigger_kind, preprocessed,
tag, same_worker, visible_to_owner, concurrent_limit, concurrency_time_window_s, cache_ttl, timeout, priority,
args, pre_run_error,
raw_code, raw_lock, raw_flow
Expand All @@ -84,6 +84,7 @@ CREATE OR REPLACE FUNCTION v2_job_queue_after_insert() RETURNS TRIGGER AS $$ BEG
NEW.__language, NEW.__args->>'_ENTRYPOINT_OVERRIDE',
NULL, NEW.__flow_step_id, NEW.__root_job,
NEW.__schedule_path, CASE WHEN NEW.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END,
CASE WHEN NEW.__args->>'wm_trigger' IS NOT NULL THEN FALSE END,
NEW.tag, NEW.__same_worker, NEW.__visible_to_owner, NEW.__concurrent_limit, NEW.__concurrency_time_window_s,
NEW.__cache_ttl, NEW.__timeout, NEW.priority,
NEW.__args, NEW.__pre_run_error,
Expand All @@ -107,6 +108,7 @@ CREATE OR REPLACE FUNCTION v2_job_queue_after_insert() RETURNS TRIGGER AS $$ BEG
trigger_kind = EXCLUDED.trigger_kind,
tag = EXCLUDED.tag,
same_worker = EXCLUDED.same_worker,
preprocessed = EXCLUDED.preprocessed,
visible_to_owner = EXCLUDED.visible_to_owner,
concurrent_limit = EXCLUDED.concurrent_limit,
concurrency_time_window_s = EXCLUDED.concurrency_time_window_s,
Expand Down Expand Up @@ -150,9 +152,9 @@ CREATE OR REPLACE FUNCTION v2_job_queue_before_update() RETURNS TRIGGER AS $$ BE
NEW.__canceled := TRUE;
END IF;
-- `v2_job`: Only `args` are updated
IF NEW.__args::text IS DISTINCT FROM OLD.__args::text THEN
IF NEW.__args::TEXT IS DISTINCT FROM OLD.__args::TEXT THEN
UPDATE v2_job
SET args = NEW.__args
SET args = NEW.__args, preprocessed = CASE WHEN preprocessed = FALSE THEN TRUE END
WHERE id = NEW.id;
END IF;
-- `v2_job_runtime`:
Expand All @@ -165,8 +167,8 @@ CREATE OR REPLACE FUNCTION v2_job_queue_before_update() RETURNS TRIGGER AS $$ BE
;
END IF;
-- `v2_job_status`:
IF NEW.__flow_status::text IS DISTINCT FROM OLD.__flow_status::text OR
NEW.__leaf_jobs::text IS DISTINCT FROM OLD.__leaf_jobs::text THEN
IF NEW.__flow_status::TEXT IS DISTINCT FROM OLD.__flow_status::TEXT OR
NEW.__leaf_jobs::TEXT IS DISTINCT FROM OLD.__leaf_jobs::TEXT THEN
IF NEW.__is_flow_step = false AND NEW.__parent_job IS NOT NULL THEN
-- `workflow_as_code`:
INSERT INTO v2_job_status (id, workflow_as_code_status)
Expand Down
21 changes: 7 additions & 14 deletions backend/migrations/20250201124744_v2_job_completed_sync.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ DECLARE job v2_job;
DECLARE final_labels TEXT[];
BEGIN
-- New columns synchronization:
-- 1. `preprocessed_args` <-> `flow_status._metadata.preprocessed_args`
-- 2. `result_columns` <-> `flow_status._metadata.column_order`
-- 3. `v2_job.labels` <-> `result.wm_labels`
-- 1. `result_columns` <-> `flow_status._metadata.column_order`
-- 2. `v2_job.labels` <-> `result.wm_labels`
IF NEW.__created_by IS NULL THEN
-- v2 -> v1
-- When inserting to `v2_job_completed` from `v2` code, set `v1` columns:
Expand Down Expand Up @@ -38,8 +37,7 @@ BEGIN
NEW.__visible_to_owner := job.visible_to_owner;
NEW.__tag := job.tag;
NEW.__priority := job.priority;
-- 1. `preprocessed_args` -> `flow_status._metadata.preprocessed_args`
IF NEW.preprocessed_args = true AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN
IF job.preprocessed = TRUE AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN
NEW.flow_status := jsonb_set(
coalesce(NEW.flow_status, '{}'::JSONB),
'{_metadata}',
Expand All @@ -50,7 +48,7 @@ BEGIN
)
);
END IF;
-- 2. `result_columns` -> `flow_status._metadata.column_order`
-- 1. `result_columns` -> `flow_status._metadata.column_order`
IF NEW.result_columns IS NOT NULL AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN
NEW.flow_status := jsonb_set(
coalesce(NEW.flow_status, '{}'::JSONB),
Expand All @@ -62,7 +60,7 @@ BEGIN
)
);
END IF;
-- 3. `v2_job.labels` -> `result.wm_labels`
-- 2. `v2_job.labels` -> `result.wm_labels`
IF job.labels IS NOT NULL AND (NEW.result IS NULL OR jsonb_typeof(NEW.result) = 'object') THEN
IF jsonb_typeof(NEW.result->'wm_labels') = 'array' AND (
SELECT bool_and(jsonb_typeof(elem) = 'string')
Expand Down Expand Up @@ -94,19 +92,14 @@ BEGIN
WHEN NEW.__success THEN 'success'::job_status
ELSE 'failure'::job_status
END;
-- 1. `preprocessed_args` <- `flow_status._metadata.preprocessed_args`
NEW.preprocessed_args := CASE
WHEN NEW.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE
ELSE FALSE
END;
-- 2. `result_columns` <- `flow_status._metadata.column_order`
-- 1. `result_columns` <- `flow_status._metadata.column_order`
IF jsonb_typeof(NEW.flow_status->'_metadata'->'column_order') = 'array' AND (
SELECT bool_and(jsonb_typeof(elem) = 'string')
FROM jsonb_array_elements(NEW.flow_status->'_metadata'->'column_order') AS elem
) THEN
NEW.result_columns := translate(NEW.flow_status->'_metadata'->>'column_order', '[]', '{}')::TEXT[];
END IF;
-- 3. `v2_job.labels` <- `result.wm_labels`
-- 2. `v2_job.labels` <- `result.wm_labels`
IF jsonb_typeof(NEW.result->'wm_labels') = 'array' AND (
SELECT bool_and(jsonb_typeof(elem) = 'string')
FROM jsonb_array_elements(NEW.result->'wm_labels') AS elem
Expand Down
18 changes: 12 additions & 6 deletions backend/migrations/20250201124748_v2_migrate_from_v1.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ SET completed_at = started_at + (interval '1 millisecond' * duration_ms),
WHEN __success THEN 'success'::job_status
ELSE 'failure'::job_status
END,
preprocessed_args = CASE
WHEN flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE
ELSE FALSE
END,
result_columns = CASE
WHEN jsonb_typeof(flow_status->'_metadata'->'column_order') = 'array' AND (
SELECT bool_and(jsonb_typeof(elem) = 'string')
Expand All @@ -27,7 +23,7 @@ INSERT INTO v2_job (
kind, runnable_id, runnable_path, parent_job,
script_lang, script_entrypoint_override,
flow_step_id, flow_innermost_root_job,
trigger, trigger_kind,
trigger, trigger_kind, preprocessed,
tag, same_worker, visible_to_owner, concurrent_limit, concurrency_time_window_s, cache_ttl, timeout, priority,
args, pre_run_error,
raw_code, raw_lock, raw_flow
Expand All @@ -37,6 +33,7 @@ INSERT INTO v2_job (
__language, __args->>'_ENTRYPOINT_OVERRIDE',
__flow_step_id, __root_job,
__schedule_path, CASE WHEN __schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END,
CASE WHEN __args->>'wm_trigger' IS NOT NULL THEN FALSE END,
tag, __same_worker, __visible_to_owner, __concurrent_limit, __concurrency_time_window_s,
__cache_ttl, __timeout, priority,
__args, __pre_run_error,
Expand All @@ -49,7 +46,7 @@ INSERT INTO v2_job (
id, workspace_id, created_at, created_by, permissioned_as, permissioned_as_email,
kind, runnable_id, runnable_path, parent_job,
script_lang, script_entrypoint_override,
trigger, trigger_kind,
trigger, trigger_kind, preprocessed,
tag, visible_to_owner, priority,
args,
raw_code, raw_lock, raw_flow,
Expand All @@ -59,6 +56,10 @@ INSERT INTO v2_job (
__job_kind, __script_hash, __script_path, __parent_job,
__language, __args->>'_ENTRYPOINT_OVERRIDE',
__schedule_path, CASE WHEN __schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END,
CASE
WHEN flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE
WHEN __args->>'wm_trigger' IS NOT NULL THEN FALSE
END,
__tag, __visible_to_owner, __priority,
__args,
__raw_code, __raw_lock, __raw_flow,
Expand Down Expand Up @@ -92,6 +93,7 @@ UPDATE v2_job SET
flow_innermost_root_job = v2_job_queue.__root_job,
trigger = v2_job_queue.__schedule_path,
trigger_kind = CASE WHEN v2_job_queue.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END,
preprocessed = CASE WHEN v2_job_queue.__args->>'wm_trigger' IS NOT NULL THEN FALSE END,
tag = v2_job_queue.tag,
same_worker = v2_job_queue.__same_worker,
visible_to_owner = v2_job_queue.__visible_to_owner,
Expand Down Expand Up @@ -122,6 +124,10 @@ UPDATE v2_job SET
script_entrypoint_override = v2_job_completed.__args->>'_ENTRYPOINT_OVERRIDE',
trigger = v2_job_completed.__schedule_path,
trigger_kind = CASE WHEN v2_job_completed.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END,
preprocessed = CASE
WHEN v2_job_completed.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE
WHEN v2_job_completed.__args->>'wm_trigger' IS NOT NULL THEN FALSE
END,
tag = v2_job_completed.__tag,
visible_to_owner = v2_job_completed.__visible_to_owner,
priority = v2_job_completed.__priority,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
-- Add down migration script here
LOCK TABLE v2_job_completed IN ACCESS EXCLUSIVE MODE;
UPDATE v2_job_completed SET started_at = completed_at WHERE started_at IS NULL;
ALTER TABLE v2_job_completed ALTER COLUMN status DROP NOT NULL;
ALTER TABLE v2_job_completed ALTER COLUMN completed_at DROP DEFAULT;
ALTER TABLE v2_job_completed ALTER COLUMN completed_at DROP NOT NULL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ SELECT
j.flow_step_id,
j.cache_ttl,
j.priority,
NULL::TEXT AS logs
NULL::TEXT AS logs,
j.script_entrypoint_override
FROM v2_job_queue q
JOIN v2_job j USING (id)
LEFT JOIN v2_job_runtime r USING (id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ SELECT
c.memory_peak AS mem_peak,
j.tag,
j.priority,
NULL::TEXT AS logs
NULL::TEXT AS logs,
result_columns
FROM v2_job_completed c
JOIN v2_job j USING (id)
;
56 changes: 26 additions & 30 deletions backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,15 +680,16 @@ macro_rules! get_job_query {
("v2_as_completed_job", $($opts:tt)*) => {
get_job_query!(
@impl "v2_as_completed_job", ($($opts)*),
"duration_ms, success, result, deleted, is_skipped, result->'wm_labels' as labels, \
"duration_ms, success, result, result_columns, deleted, is_skipped, result->'wm_labels' as labels, \
CASE WHEN result is null or pg_column_size(result) < 90000 THEN result ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as result",
)
};
("v2_as_queue", $($opts:tt)*) => {
get_job_query!(
@impl "v2_as_queue", ($($opts)*),
"scheduled_for, running, last_ping, suspend, suspend_until, same_worker, pre_run_error, visible_to_owner, \
root_job, leaf_jobs, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id, cache_ttl",
root_job, leaf_jobs, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id, cache_ttl,\
script_entrypoint_override",
)
};
(@impl $table:literal, (with_logs: $with_logs:expr, $($rest:tt)*), $additional_fields:literal, $($args:tt)*) => {
Expand Down Expand Up @@ -2748,6 +2749,7 @@ impl<'a> From<UnifiedJob> for Job {
script_path: uj.script_path,
args: None,
result: None,
result_columns: None,
logs: None,
flow_status: None,
deleted: uj.deleted,
Expand Down Expand Up @@ -2794,6 +2796,7 @@ impl<'a> From<UnifiedJob> for Job {
permissioned_as: uj.permissioned_as,
is_flow_step: uj.is_flow_step,
language: uj.language,
script_entrypoint_override: None,
same_worker: false,
pre_run_error: None,
email: uj.email,
Expand Down Expand Up @@ -3552,10 +3555,9 @@ pub async fn run_wait_result_internal(
let row = sqlx::query!(
"SELECT
result AS \"result: sqlx::types::Json<Box<RawValue>>\",
language AS \"language: ScriptLang\",
flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
success AS \"success!\"
FROM v2_as_completed_job
result_columns,
status = 'success' AS \"success!\"
FROM v2_job_completed
WHERE id = $1 AND workspace_id = $2",
uuid,
&w_id
Expand All @@ -3564,8 +3566,7 @@ pub async fn run_wait_result_internal(
.await?;
if let Some(mut raw_result) = row {
format_result(
raw_result.language.as_ref(),
raw_result.flow_status.as_ref(),
raw_result.result_columns.as_ref(),
raw_result.result.as_mut(),
);
result = raw_result.result.map(|x| x.0);
Expand Down Expand Up @@ -5521,8 +5522,7 @@ async fn get_completed_job<'a>(
#[derive(FromRow)]
pub struct RawResult {
pub result: Option<sqlx::types::Json<Box<RawValue>>>,
pub flow_status: Option<sqlx::types::Json<Box<RawValue>>>,
pub language: Option<ScriptLang>,
pub result_columns: Option<Vec<String>>,
pub created_by: Option<String>,
}

Expand All @@ -5541,11 +5541,11 @@ async fn get_completed_job_result(
RawResult,
"SELECT
result #> $3 AS \"result: sqlx::types::Json<Box<RawValue>>\",
flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
language AS \"language: ScriptLang\",
result_columns,
created_by AS \"created_by!\"
FROM v2_as_completed_job
WHERE id = $1 AND workspace_id = $2 AND ($4::text[] IS NULL OR tag = ANY($4))",
FROM v2_job_completed c
JOIN v2_job USING (id)
WHERE c.id = $1 AND c.workspace_id = $2 AND ($4::text[] IS NULL OR tag = ANY($4))",
id,
&w_id,
json_path.split(".").collect::<Vec<_>>() as Vec<&str>,
Expand All @@ -5558,11 +5558,11 @@ async fn get_completed_job_result(
RawResult,
"SELECT
result AS \"result: sqlx::types::Json<Box<RawValue>>\",
flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
language AS \"language: ScriptLang\",
result_columns,
created_by AS \"created_by!\"
FROM v2_as_completed_job
WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
FROM v2_job_completed c
JOIN v2_job USING (id)
WHERE c.id = $1 AND c.workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
id,
&w_id,
tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>,
Expand Down Expand Up @@ -5612,8 +5612,7 @@ async fn get_completed_job_result(
}

format_result(
raw_result.language.as_ref(),
raw_result.flow_status.as_ref(),
raw_result.result_columns.as_ref(),
raw_result.result.as_mut(),
);

Expand Down Expand Up @@ -5685,12 +5684,13 @@ async fn get_completed_job_result_maybe(
.flatten();
let result_o = sqlx::query!(
"SELECT
result AS \"result: sqlx::types::Json<Box<RawValue>>\", success AS \"success!\",
language AS \"language: ScriptLang\",
flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
result AS \"result: sqlx::types::Json<Box<RawValue>>\",
result_columns,
status = 'success' AS \"success!\",
created_by AS \"created_by!\"
FROM v2_as_completed_job
WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
FROM v2_job_completed c
JOIN v2_job j USING (id)
WHERE c.id = $1 AND c.workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
id,
&w_id,
tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>,
Expand All @@ -5699,11 +5699,7 @@ async fn get_completed_job_result_maybe(
.await?;

if let Some(mut res) = result_o {
format_result(
res.language.as_ref(),
res.flow_status.as_ref(),
res.result.as_mut(),
);
format_result(res.result_columns.as_ref(), res.result.as_mut());
if opt_authed.is_none() && res.created_by != "anonymous" {
return Err(Error::BadRequest(
"As a non logged in user, you can only see jobs ran by anonymous users".to_string(),
Expand Down
Loading

0 comments on commit 5150e8e

Please sign in to comment.