Skip to content

Commit

Permalink
fixup! backend: use v2 tables through views where possible (v2 phase 3)
Browse files Browse the repository at this point in the history
  • Loading branch information
uael committed Feb 4, 2025
1 parent 5150e8e commit 7a84e2b
Show file tree
Hide file tree
Showing 12 changed files with 54 additions and 34 deletions.
2 changes: 1 addition & 1 deletion backend/migrations/20250201124743_v2_job_queue_sync.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ CREATE OR REPLACE FUNCTION v2_job_queue_after_insert() RETURNS TRIGGER AS $$ BEG
NEW.__language, NEW.__args->>'_ENTRYPOINT_OVERRIDE',
NULL, NEW.__flow_step_id, NEW.__root_job,
NEW.__schedule_path, CASE WHEN NEW.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END,
CASE WHEN NEW.__args->>'wm_trigger' IS NOT NULL THEN FALSE END,
CASE WHEN NEW.__args->'wm_trigger'->>'kind' = 'webhook' THEN FALSE END,
NEW.tag, NEW.__same_worker, NEW.__visible_to_owner, NEW.__concurrent_limit, NEW.__concurrency_time_window_s,
NEW.__cache_ttl, NEW.__timeout, NEW.priority,
NEW.__args, NEW.__pre_run_error,
Expand Down
31 changes: 20 additions & 11 deletions backend/migrations/20250201124744_v2_job_completed_sync.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ BEGIN
-- New columns synchronization:
-- 1. `result_columns` <-> `flow_status._metadata.column_order`
-- 2. `v2_job.labels` <-> `result.wm_labels`
-- 3. `v2_job.preprocessed` <-> flow_status._metadata.preprocessed_args`
IF NEW.__created_by IS NULL THEN
-- v2 -> v1
-- When inserting to `v2_job_completed` from `v2` code, set `v1` columns:
Expand Down Expand Up @@ -37,17 +38,6 @@ BEGIN
NEW.__visible_to_owner := job.visible_to_owner;
NEW.__tag := job.tag;
NEW.__priority := job.priority;
IF job.preprocessed = TRUE AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN
NEW.flow_status := jsonb_set(
coalesce(NEW.flow_status, '{}'::JSONB),
'{_metadata}',
jsonb_set(
coalesce(NEW.flow_status->'_metadata', '{}'::JSONB),
'{preprocessed_args}',
'true'::JSONB
)
);
END IF;
-- 1. `result_columns` -> `flow_status._metadata.column_order`
IF NEW.result_columns IS NOT NULL AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN
NEW.flow_status := jsonb_set(
Expand Down Expand Up @@ -83,6 +73,18 @@ BEGIN
to_jsonb(final_labels)
);
END IF;
-- 3. `v2_job.preprocessed` -> flow_status._metadata.preprocessed_args`
IF job.preprocessed = TRUE AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN
NEW.flow_status := jsonb_set(
coalesce(NEW.flow_status, '{}'::JSONB),
'{_metadata}',
jsonb_set(
coalesce(NEW.flow_status->'_metadata', '{}'::JSONB),
'{preprocessed_args}',
'true'::JSONB
)
);
END IF;
ELSE
-- v1 -> v2
NEW.completed_at := now();
Expand Down Expand Up @@ -114,6 +116,13 @@ BEGIN
)
WHERE id = NEW.id;
END IF;
-- 3. `v2_job.preprocessed` <- flow_status._metadata.preprocessed_args`
IF NEW.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN
UPDATE v2_job SET
args = NEW.__args,
preprocessed = TRUE
WHERE id = NEW.id AND preprocessed = FALSE;
END IF;
END IF;
RETURN NEW;
END $$ LANGUAGE plpgsql;
Expand Down
8 changes: 4 additions & 4 deletions backend/migrations/20250201124748_v2_migrate_from_v1.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ INSERT INTO v2_job (
__language, __args->>'_ENTRYPOINT_OVERRIDE',
__flow_step_id, __root_job,
__schedule_path, CASE WHEN __schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END,
CASE WHEN __args->>'wm_trigger' IS NOT NULL THEN FALSE END,
CASE WHEN __args->'wm_trigger'->>'kind' = 'webhook' THEN FALSE END,
tag, __same_worker, __visible_to_owner, __concurrent_limit, __concurrency_time_window_s,
__cache_ttl, __timeout, priority,
__args, __pre_run_error,
Expand All @@ -58,7 +58,7 @@ INSERT INTO v2_job (
__schedule_path, CASE WHEN __schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END,
CASE
WHEN flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE
WHEN __args->>'wm_trigger' IS NOT NULL THEN FALSE
WHEN __args->'wm_trigger'->>'kind' = 'webhook' THEN FALSE
END,
__tag, __visible_to_owner, __priority,
__args,
Expand Down Expand Up @@ -93,7 +93,7 @@ UPDATE v2_job SET
flow_innermost_root_job = v2_job_queue.__root_job,
trigger = v2_job_queue.__schedule_path,
trigger_kind = CASE WHEN v2_job_queue.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END,
preprocessed = CASE WHEN v2_job_queue.__args->>'wm_trigger' IS NOT NULL THEN FALSE END,
preprocessed = CASE WHEN v2_job_queue.__args->'wm_trigger'->>'kind' = 'webhook' THEN FALSE END,
tag = v2_job_queue.tag,
same_worker = v2_job_queue.__same_worker,
visible_to_owner = v2_job_queue.__visible_to_owner,
Expand Down Expand Up @@ -126,7 +126,7 @@ UPDATE v2_job SET
trigger_kind = CASE WHEN v2_job_completed.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END,
preprocessed = CASE
WHEN v2_job_completed.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE
WHEN v2_job_completed.__args->>'wm_trigger' IS NOT NULL THEN FALSE
WHEN v2_job_completed.__args->'wm_trigger'->>'kind' = 'webhook' THEN FALSE
END,
tag = v2_job_completed.__tag,
visible_to_owner = v2_job_completed.__visible_to_owner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ SELECT
j.cache_ttl,
j.priority,
NULL::TEXT AS logs,
j.preprocessed,
j.script_entrypoint_override
FROM v2_job_queue q
JOIN v2_job j USING (id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ SELECT
j.tag,
j.priority,
NULL::TEXT AS logs,
j.preprocessed,
result_columns
FROM v2_job_completed c
JOIN v2_job j USING (id)
Expand Down
4 changes: 4 additions & 0 deletions backend/windmill-api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11908,6 +11908,8 @@ components:
type: number
suspend:
type: number
preprocessed:
type: boolean
required:
- id
- running
Expand Down Expand Up @@ -12015,6 +12017,8 @@ components:
type: number
aggregate_wait_time_ms:
type: number
preprocessed:
type: boolean
required:
- id
- created_by
Expand Down
7 changes: 6 additions & 1 deletion backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ macro_rules! get_job_query {
CASE WHEN args is null or pg_column_size(args) < 90000 THEN args ELSE '{{\"reason\": \"WINDMILL_TOO_BIG\"}}'::jsonb END as args, \
{logs} as logs, {code} as raw_code, canceled, canceled_by, canceled_reason, job_kind, \
schedule_path, permissioned_as, flow_status, {flow} as raw_flow, is_flow_step, language, \
{lock} as raw_lock, email, visible_to_owner, mem_peak, tag, priority, {additional_fields} \
{lock} as raw_lock, email, visible_to_owner, mem_peak, tag, priority, preprocessed, {additional_fields} \
FROM {table} LEFT JOIN job_logs ON id = job_id \
WHERE id = $1 AND {table}.workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3)) LIMIT 1",
table = $table,
Expand Down Expand Up @@ -2646,6 +2646,7 @@ pub struct UnifiedJob {
pub labels: Option<serde_json::Value>,
pub self_wait_time_ms: Option<i64>,
pub aggregate_wait_time_ms: Option<i64>,
pub preprocessed: Option<bool>,
}

const CJ_FIELDS: &[&str] = &[
Expand Down Expand Up @@ -2683,6 +2684,7 @@ const CJ_FIELDS: &[&str] = &[
"result->'wm_labels' as labels",
"self_wait_time_ms",
"aggregate_wait_time_ms",
"preprocessed",
];
const QJ_FIELDS: &[&str] = &[
"'QueuedJob' as typ",
Expand Down Expand Up @@ -2719,6 +2721,7 @@ const QJ_FIELDS: &[&str] = &[
"null as labels",
"self_wait_time_ms",
"aggregate_wait_time_ms",
"preprocessed",
];

impl UnifiedJob {
Expand Down Expand Up @@ -2768,6 +2771,7 @@ impl<'a> From<UnifiedJob> for Job {
tag: uj.tag,
priority: uj.priority,
labels: uj.labels,
preprocessed: uj.preprocessed,
},
)),
"QueuedJob" => Job::QueuedJob(JobExtended::new(
Expand Down Expand Up @@ -2812,6 +2816,7 @@ impl<'a> From<UnifiedJob> for Job {
flow_step_id: None,
cache_ttl: None,
priority: uj.priority,
preprocessed: uj.preprocessed,
},
)),
t => panic!("job type {} not valid", t),
Expand Down
5 changes: 5 additions & 0 deletions backend/windmill-common/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ pub struct QueuedJob {
pub cache_ttl: Option<i32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub priority: Option<i16>,
#[serde(skip_serializing_if = "Option::is_none")]
pub preprocessed: Option<bool>,
}

impl QueuedJob {
Expand Down Expand Up @@ -187,6 +189,7 @@ impl Default for QueuedJob {
flow_step_id: None,
cache_ttl: None,
priority: None,
preprocessed: None,
}
}
}
Expand Down Expand Up @@ -237,6 +240,8 @@ pub struct CompletedJob {
pub priority: Option<i16>,
#[serde(skip_serializing_if = "Option::is_none")]
pub labels: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub preprocessed: Option<bool>,
}

impl CompletedJob {
Expand Down
5 changes: 2 additions & 3 deletions backend/windmill-common/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ fn format_pull_query(peek: String) -> String {
flow_innermost_root_job AS root_job, flow_step_id, flow_step_id IS NOT NULL AS is_flow_step,
same_worker, pre_run_error, visible_to_owner, tag, concurrent_limit,
concurrency_time_window_s, timeout, cache_ttl, priority, raw_code, raw_lock,
raw_flow, script_entrypoint_override
raw_flow, preprocessed, script_entrypoint_override
FROM v2_job
WHERE id = (SELECT id FROM peek)
) SELECT id, workspace_id, parent_job, created_by, created_at, started_at, scheduled_for,
Expand All @@ -140,8 +140,7 @@ fn format_pull_query(peek: String) -> String {
flow_status, is_flow_step, language, suspend, suspend_until,
same_worker, pre_run_error, email, visible_to_owner, mem_peak,
root_job, flow_leaf_jobs as leaf_jobs, tag, concurrent_limit, concurrency_time_window_s,
timeout, flow_step_id, cache_ttl, priority,
raw_code, raw_lock, raw_flow,
timeout, flow_step_id, cache_ttl, priority, raw_code, raw_lock, raw_flow, preprocessed,
script_entrypoint_override
FROM q, r, j
LEFT JOIN v2_job_status f USING (id)",
Expand Down
17 changes: 7 additions & 10 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2957,6 +2957,7 @@ pub async fn push<'c, 'd>(
}
}

let mut preprocessed = None;
let (
script_hash,
script_path,
Expand Down Expand Up @@ -2986,6 +2987,7 @@ pub async fn push<'c, 'd>(
} => {
let extra = args.extra.get_or_insert_with(HashMap::new);
if apply_preprocessor {
preprocessed = Some(false);
extra.insert(
ENTRYPOINT_OVERRIDE.to_string(),
to_raw_value(&PREPROCESSOR_FAKE_ENTRYPOINT),
Expand Down Expand Up @@ -3404,6 +3406,7 @@ pub async fn push<'c, 'd>(
status.preprocessor_module = None;
extra.remove("wm_trigger");
} else {
preprocessed = Some(false);
extra.entry("wm_trigger".to_string()).or_insert_with(|| {
to_raw_value(&serde_json::json!({
"kind": "webhook",
Expand Down Expand Up @@ -3722,16 +3725,10 @@ pub async fn push<'c, 'd>(
};

let raw_flow = raw_flow.map(Json);
let preprocessed = if args.args.contains_key("wm_trigger")
|| args
.extra
.as_ref()
.map_or(false, |x| x.contains_key("wm_trigger"))
{
Some(false)
} else {
None
};
let preprocessed = preprocessed.or_else(|| match flow_step_id.as_deref() {
Some("preprocessor") => Some(false),
_ => None,
});
let script_entrypoint_override: Option<String> = match args.args.get(ENTRYPOINT_OVERRIDE) {
Some(x) => Some(x.clone()),
None => args
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@
// import { copyToClipboard } from '$lib/utils'
// import { deepEqual } from 'fast-equals'
export let flowStatus: any
export let preprocessed: boolean | undefined
// $: args =
// '_metadata' in flowStatus && 'original_args' in flowStatus['_metadata']
// ? flowStatus['_metadata']['original_args']
// : undefined
$: hasPreprocessedArgs =
'_metadata' in flowStatus && !!flowStatus['_metadata']['preprocessed_args']
$: hasPreprocessedArgs = preprocessed === true
// $: argsStr = args !== undefined ? JSON.stringify(args, null, 4) : undefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@
</div>
{/if}
{#if job && job.flow_status && job.job_kind === 'script'}
<PreprocessedArgsDisplay flowStatus={job.flow_status} />
<PreprocessedArgsDisplay preprocessed={job.preprocessed} />
{/if}
{#if persistentScriptDefinition}
<button on:click={() => persistentScriptDrawer.open?.(persistentScriptDefinition)}
Expand Down

0 comments on commit 7a84e2b

Please sign in to comment.