Skip to content

Commit

Permalink
poc
Browse files Browse the repository at this point in the history
  • Loading branch information
uael committed Feb 3, 2025
1 parent 88cbba0 commit c9bc516
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 147 deletions.
1 change: 1 addition & 0 deletions backend/migrations/20250201124345_v2_job_status.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ CREATE TABLE IF NOT EXISTS v2_job_status (
-- Flow status fields:
flow_status JSONB,
flow_leaf_jobs JSONB,
flow_preprocessed_args JSONB,
-- Workflow as code fields:
workflow_as_code_status JSONB
);
Expand Down
2 changes: 1 addition & 1 deletion backend/migrations/20250201124423_v2_job_completed.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ALTER TABLE v2_job_completed
ADD COLUMN IF NOT EXISTS status job_status,
ADD COLUMN IF NOT EXISTS completed_at TIMESTAMP WITH TIME ZONE,
ADD COLUMN IF NOT EXISTS worker VARCHAR(255),
ADD COLUMN IF NOT EXISTS preprocessed_args BOOL NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS preprocessed_args JSONB,
ADD COLUMN IF NOT EXISTS workflow_as_code_status JSONB,
ADD COLUMN IF NOT EXISTS result_columns TEXT[],
ADD COLUMN IF NOT EXISTS extras JSONB;
Expand Down
6 changes: 0 additions & 6 deletions backend/migrations/20250201124743_v2_job_queue_sync.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,6 @@ CREATE OR REPLACE FUNCTION v2_job_queue_before_update() RETURNS TRIGGER AS $$ BE
IF NEW.canceled_by IS NOT NULL THEN
NEW.__canceled := TRUE;
END IF;
-- `v2_job`: Only `args` are updated
IF NEW.__args::text IS DISTINCT FROM OLD.__args::text THEN
UPDATE v2_job
SET args = NEW.__args
WHERE id = NEW.id;
END IF;
-- `v2_job_runtime`:
IF NEW.__last_ping IS DISTINCT FROM OLD.__last_ping OR NEW.__mem_peak IS DISTINCT FROM OLD.__mem_peak THEN
INSERT INTO v2_job_runtime (id, ping, memory_peak)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ BEGIN
NEW.__tag := job.tag;
NEW.__priority := job.priority;
-- 1. `preprocessed_args` -> `flow_status._metadata.preprocessed_args`
IF NEW.preprocessed_args = true AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN
IF NEW.preprocessed_args IS NOT NULL AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object') THEN
NEW.flow_status := jsonb_set(
coalesce(NEW.flow_status, '{}'::JSONB),
'{_metadata}',
Expand Down Expand Up @@ -96,8 +96,7 @@ BEGIN
END;
-- 1. `preprocessed_args` <- `flow_status._metadata.preprocessed_args`
NEW.preprocessed_args := CASE
WHEN NEW.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE
ELSE FALSE
WHEN NEW.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN NEW.__args
END;
-- 2. `result_columns` <- `flow_status._metadata.column_order`
IF jsonb_typeof(NEW.flow_status->'_metadata'->'column_order') = 'array' AND (
Expand Down
3 changes: 1 addition & 2 deletions backend/migrations/20250201124748_v2_migrate_from_v1.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ SET completed_at = started_at + (interval '1 millisecond' * duration_ms),
ELSE 'failure'::job_status
END,
preprocessed_args = CASE
WHEN flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE
ELSE FALSE
WHEN flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN __args
END,
result_columns = CASE
WHEN jsonb_typeof(flow_status->'_metadata'->'column_order') = 'array' AND (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ SELECT
q.running,
j.runnable_id AS script_hash,
j.runnable_path AS script_path,
j.args,
COALESCE(s.flow_preprocessed_args, j.args) AS args,
j.raw_code,
q.canceled_by IS NOT NULL AS canceled,
q.canceled_by,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ SELECT
c.status = 'success' AS success,
j.runnable_id AS script_hash,
j.runnable_path AS script_path,
j.args,
COALESCE(c.preprocessed_args, j.args)
AS args,
c.result,
FALSE AS deleted,
j.raw_code,
Expand All @@ -33,7 +34,8 @@ SELECT
c.memory_peak AS mem_peak,
j.tag,
j.priority,
NULL::TEXT AS logs
NULL::TEXT AS logs,
result_columns
FROM v2_job_completed c
JOIN v2_job j USING (id)
;
52 changes: 23 additions & 29 deletions backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ macro_rules! get_job_query {
("v2_as_completed_job", $($opts:tt)*) => {
get_job_query!(
@impl "v2_as_completed_job", ($($opts)*),
"duration_ms, success, result, deleted, is_skipped, result->'wm_labels' as labels, \
"duration_ms, success, result, result_columns, deleted, is_skipped, result->'wm_labels' as labels, \
CASE WHEN result is null or pg_column_size(result) < 90000 THEN result ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as result",
)
};
Expand Down Expand Up @@ -2748,6 +2748,7 @@ impl<'a> From<UnifiedJob> for Job {
script_path: uj.script_path,
args: None,
result: None,
result_columns: None,
logs: None,
flow_status: None,
deleted: uj.deleted,
Expand Down Expand Up @@ -3552,10 +3553,9 @@ pub async fn run_wait_result_internal(
let row = sqlx::query!(
"SELECT
result AS \"result: sqlx::types::Json<Box<RawValue>>\",
language AS \"language: ScriptLang\",
flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
success AS \"success!\"
FROM v2_as_completed_job
result_columns,
status = 'success' AS \"success!\"
FROM v2_job_completed
WHERE id = $1 AND workspace_id = $2",
uuid,
&w_id
Expand All @@ -3564,8 +3564,7 @@ pub async fn run_wait_result_internal(
.await?;
if let Some(mut raw_result) = row {
format_result(
raw_result.language.as_ref(),
raw_result.flow_status.as_ref(),
raw_result.result_columns.as_ref(),
raw_result.result.as_mut(),
);
result = raw_result.result.map(|x| x.0);
Expand Down Expand Up @@ -5521,8 +5520,7 @@ async fn get_completed_job<'a>(
#[derive(FromRow)]
pub struct RawResult {
pub result: Option<sqlx::types::Json<Box<RawValue>>>,
pub flow_status: Option<sqlx::types::Json<Box<RawValue>>>,
pub language: Option<ScriptLang>,
pub result_columns: Option<Vec<String>>,
pub created_by: Option<String>,
}

Expand All @@ -5541,11 +5539,11 @@ async fn get_completed_job_result(
RawResult,
"SELECT
result #> $3 AS \"result: sqlx::types::Json<Box<RawValue>>\",
flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
language AS \"language: ScriptLang\",
result_columns,
created_by AS \"created_by!\"
FROM v2_as_completed_job
WHERE id = $1 AND workspace_id = $2 AND ($4::text[] IS NULL OR tag = ANY($4))",
FROM v2_job_completed c
JOIN v2_job USING (id)
WHERE c.id = $1 AND c.workspace_id = $2 AND ($4::text[] IS NULL OR tag = ANY($4))",
id,
&w_id,
json_path.split(".").collect::<Vec<_>>() as Vec<&str>,
Expand All @@ -5558,11 +5556,11 @@ async fn get_completed_job_result(
RawResult,
"SELECT
result AS \"result: sqlx::types::Json<Box<RawValue>>\",
flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
language AS \"language: ScriptLang\",
result_columns,
created_by AS \"created_by!\"
FROM v2_as_completed_job
WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
FROM v2_job_completed c
JOIN v2_job USING (id)
WHERE c.id = $1 AND c.workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
id,
&w_id,
tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>,
Expand Down Expand Up @@ -5612,8 +5610,7 @@ async fn get_completed_job_result(
}

format_result(
raw_result.language.as_ref(),
raw_result.flow_status.as_ref(),
raw_result.result_columns.as_ref(),
raw_result.result.as_mut(),
);

Expand Down Expand Up @@ -5685,12 +5682,13 @@ async fn get_completed_job_result_maybe(
.flatten();
let result_o = sqlx::query!(
"SELECT
result AS \"result: sqlx::types::Json<Box<RawValue>>\", success AS \"success!\",
language AS \"language: ScriptLang\",
flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
result AS \"result: sqlx::types::Json<Box<RawValue>>\",
result_columns,
status = 'success' AS \"success!\",
created_by AS \"created_by!\"
FROM v2_as_completed_job
WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
FROM v2_job_completed c
JOIN v2_job j USING (id)
WHERE c.id = $1 AND c.workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
id,
&w_id,
tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>,
Expand All @@ -5699,11 +5697,7 @@ async fn get_completed_job_result_maybe(
.await?;

if let Some(mut res) = result_o {
format_result(
res.language.as_ref(),
res.flow_status.as_ref(),
res.result.as_mut(),
);
format_result(res.result_columns.as_ref(), res.result.as_mut());
if opt_authed.is_none() && res.created_by != "anonymous" {
return Err(Error::BadRequest(
"As a non logged in user, you can only see jobs ran by anonymous users".to_string(),
Expand Down
47 changes: 10 additions & 37 deletions backend/windmill-common/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ pub struct CompletedJob {
pub args: Option<sqlx::types::Json<HashMap<String, Box<RawValue>>>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<sqlx::types::Json<Box<RawValue>>>,
pub result_columns: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub logs: Option<String>,
pub deleted: bool,
Expand Down Expand Up @@ -529,27 +530,17 @@ pub async fn get_payload_tag_from_prefixed_path(
Ok((payload, tag, on_behalf_of))
}

#[derive(Deserialize)]
struct FlowStatusMetadata {
column_order: Vec<String>,
}

#[derive(Deserialize)]
struct FlowStatusWithMetadataOnly {
_metadata: FlowStatusMetadata,
}

pub fn order_columns(
rows: Option<Vec<Box<RawValue>>>,
column_order: Vec<String>,
column_order: &Vec<String>,
) -> Option<Box<RawValue>> {
if let Some(mut rows) = rows {
if let Some(first_row) = rows.get(0) {
let first_row = serde_json::from_str::<HashMap<String, Box<RawValue>>>(first_row.get());
if let Ok(first_row) = first_row {
let mut new_first_row = IndexMap::new();
for col in column_order {
if let Some(val) = first_row.get(&col) {
if let Some(val) = first_row.get(col) {
new_first_row.insert(col.clone(), val.clone());
}
}
Expand All @@ -566,39 +557,21 @@ pub fn order_columns(
}

pub fn format_result(
language: Option<&ScriptLang>,
flow_status: Option<&sqlx::types::Json<Box<RawValue>>>,
result_columns: Option<&Vec<String>>,
result: Option<&mut sqlx::types::Json<Box<RawValue>>>,
) -> () {
match language {
Some(&ScriptLang::Postgresql)
| Some(&ScriptLang::Mysql)
| Some(&ScriptLang::Snowflake)
| Some(&ScriptLang::Bigquery) => {
if let Some(Ok(flow_status)) =
flow_status.map(|x| serde_json::from_str::<FlowStatusWithMetadataOnly>(x.get()))
{
if let Some(result) = result {
let rows = serde_json::from_str::<Vec<Box<RawValue>>>(result.get()).ok();
if let Some(ordered_result) =
order_columns(rows, flow_status._metadata.column_order)
{
*result = sqlx::types::Json(ordered_result);
}
}
if let Some(result_columns) = result_columns {
if let Some(result) = result {
let rows = serde_json::from_str::<Vec<Box<RawValue>>>(result.get()).ok();
if let Some(ordered_result) = order_columns(rows, result_columns) {
*result = sqlx::types::Json(ordered_result);
}
}
_ => {}
}
}

pub fn format_completed_job_result(mut cj: CompletedJob) -> CompletedJob {
format_result(
cj.language.as_ref(),
cj.flow_status.as_ref(),
cj.result.as_mut(),
);

format_result(cj.result_columns.as_ref(), cj.result.as_mut());
cj
}

Expand Down
Loading

0 comments on commit c9bc516

Please sign in to comment.