diff --git a/backend/.sqlx/query-202a4fee4e70ff74d150a187a8daec3609328b1e3fe6e4fac20fb6fe91ff1732.json b/backend/.sqlx/query-202a4fee4e70ff74d150a187a8daec3609328b1e3fe6e4fac20fb6fe91ff1732.json new file mode 100644 index 0000000000000..bab66359b6fc0 --- /dev/null +++ b/backend/.sqlx/query-202a4fee4e70ff74d150a187a8daec3609328b1e3fe6e4fac20fb6fe91ff1732.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT raw_flow->'failure_module' != 'null'::jsonb\n FROM queue_view\n WHERE id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "?column?", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + null + ] + }, + "hash": "202a4fee4e70ff74d150a187a8daec3609328b1e3fe6e4fac20fb6fe91ff1732" +} diff --git a/backend/.sqlx/query-337f31c2172194cd594042c561998a03f751b246c40daf056fced0fd91f6dd73.json b/backend/.sqlx/query-337f31c2172194cd594042c561998a03f751b246c40daf056fced0fd91f6dd73.json new file mode 100644 index 0000000000000..a23bb3827fc8a --- /dev/null +++ b/backend/.sqlx/query-337f31c2172194cd594042c561998a03f751b246c40daf056fced0fd91f6dd73.json @@ -0,0 +1,84 @@ +{ + "db_name": "PostgreSQL", + "query": "WITH uuid_table as (\n select unnest($11::uuid[]) as uuid\n )\n INSERT INTO queue \n (id, script_hash, script_path, job_kind, language, args, tag, created_by, permissioned_as, email, scheduled_for, workspace_id, concurrent_limit, concurrency_time_window_s, timeout, flow_status)\n (SELECT uuid, $1, $2, $3, $4, ('{ \"uuid\": \"' || uuid || '\" }')::jsonb, $5, $6, $7, $8, $9, $10, $12, $13, $14, $15 FROM uuid_table) \n RETURNING id", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Int8", + "Varchar", + { + "Custom": { + "name": "job_kind", + "kind": { + "Enum": [ + "script", + "preview", + "flow", + "dependencies", + "flowpreview", + "script_hub", + "identity", + "flowdependencies", + "http", + "graphql", + "postgresql", + "noop", + "appdependencies", + "deploymentcallback", + "singlescriptflow" + ] + } + } + }, + { + "Custom": { + "name": "script_lang", + "kind": { + "Enum": [ + "python3", + "deno", + "go", + "bash", + "postgresql", + "nativets", + "bun", + "mysql", + "bigquery", + "snowflake", + "graphql", + "powershell", + "mssql", + "php", + "bunnative", + "rust", + "ansible" + ] + } + } + }, + "Varchar", + "Varchar", + "Varchar", + "Varchar", + "Timestamptz", + "Varchar", + "UuidArray", + "Int4", + "Int4", + "Int4", + "Jsonb" + ] + }, + "nullable": [ + false + ] + }, + "hash": "337f31c2172194cd594042c561998a03f751b246c40daf056fced0fd91f6dd73" +} diff --git a/backend/.sqlx/query-4996b5af348bc335a5ec14496683cee096e37bdde7bb05c11ee0c4792f70dc60.json b/backend/.sqlx/query-4996b5af348bc335a5ec14496683cee096e37bdde7bb05c11ee0c4792f70dc60.json new file mode 100644 index 0000000000000..f21ea12278ccc --- /dev/null +++ b/backend/.sqlx/query-4996b5af348bc335a5ec14496683cee096e37bdde7bb05c11ee0c4792f70dc60.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT raw_flow AS \"raw_flow!: Json>\"\n FROM queue_view WHERE id = $1 AND workspace_id = $2 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "raw_flow!: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true + ] + }, + "hash": "4996b5af348bc335a5ec14496683cee096e37bdde7bb05c11ee0c4792f70dc60" +} diff --git a/backend/.sqlx/query-4b923c94f6adcc7a76e8073de5e46b116dba3211487c8408ce2777aafdf94a44.json 
b/backend/.sqlx/query-4b923c94f6adcc7a76e8073de5e46b116dba3211487c8408ce2777aafdf94a44.json new file mode 100644 index 0000000000000..9d14bc62cc394 --- /dev/null +++ b/backend/.sqlx/query-4b923c94f6adcc7a76e8073de5e46b116dba3211487c8408ce2777aafdf94a44.json @@ -0,0 +1,19 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO job (id, workspace_id, raw_code, raw_lock, raw_flow, tag)\n VALUES ($1, $2, $3, $4, $5, $6)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Varchar", + "Text", + "Text", + "Jsonb", + "Varchar" + ] + }, + "nullable": [] + }, + "hash": "4b923c94f6adcc7a76e8073de5e46b116dba3211487c8408ce2777aafdf94a44" +} diff --git a/backend/.sqlx/query-4bbf4dbf5b18d8dfd62c0a029ec4deb95a348dc804ee23a02698adb9aa32b375.json b/backend/.sqlx/query-4bbf4dbf5b18d8dfd62c0a029ec4deb95a348dc804ee23a02698adb9aa32b375.json new file mode 100644 index 0000000000000..8e540c3a90e2b --- /dev/null +++ b/backend/.sqlx/query-4bbf4dbf5b18d8dfd62c0a029ec4deb95a348dc804ee23a02698adb9aa32b375.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n flow_status AS \"flow_status!: Json>\",\n raw_flow->'modules'->(flow_status->'step')::int AS \"module: Json>\"\n FROM queue_view WHERE id = $1 AND workspace_id = $2 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "flow_status!: Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 1, + "name": "module: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + null + ] + }, + "hash": "4bbf4dbf5b18d8dfd62c0a029ec4deb95a348dc804ee23a02698adb9aa32b375" +} diff --git a/backend/.sqlx/query-63e54fe57ec439b68eead00a02209f81076c5317d590e1441b557555b4d7ad96.json b/backend/.sqlx/query-63e54fe57ec439b68eead00a02209f81076c5317d590e1441b557555b4d7ad96.json new file mode 100644 index 0000000000000..a30a87991e753 --- /dev/null +++ b/backend/.sqlx/query-63e54fe57ec439b68eead00a02209f81076c5317d590e1441b557555b4d7ad96.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "WITH uuid_table as (\n select gen_random_uuid() as uuid from generate_series(1, $5)\n )\n INSERT INTO job\n (id, workspace_id, raw_code, raw_lock, raw_flow)\n (SELECT uuid, $1, $2, $3, $4 FROM uuid_table)\n RETURNING id", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Varchar", + "Text", + "Text", + "Jsonb", + "Int4" + ] + }, + "nullable": [ + false + ] + }, + "hash": "63e54fe57ec439b68eead00a02209f81076c5317d590e1441b557555b4d7ad96" +} diff --git a/backend/.sqlx/query-6673aeb02e1c39616c6f38fd2bad841271bc08401573f4d1be8e8896cd77507b.json b/backend/.sqlx/query-6673aeb02e1c39616c6f38fd2bad841271bc08401573f4d1be8e8896cd77507b.json new file mode 100644 index 0000000000000..c231cf880ffa1 --- /dev/null +++ b/backend/.sqlx/query-6673aeb02e1c39616c6f38fd2bad841271bc08401573f4d1be8e8896cd77507b.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT raw_flow->'failure_module' != 'null'::jsonb\n FROM completed_job_view\n WHERE id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "?column?", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + null + ] + }, + "hash": "6673aeb02e1c39616c6f38fd2bad841271bc08401573f4d1be8e8896cd77507b" +} diff --git a/backend/.sqlx/query-a0a97ad8c36356407f13e6b429d923f59d18743efc77e44abc57632252488160.json b/backend/.sqlx/query-a0a97ad8c36356407f13e6b429d923f59d18743efc77e44abc57632252488160.json 
new file mode 100644 index 0000000000000..9b6db13f973d0 --- /dev/null +++ b/backend/.sqlx/query-a0a97ad8c36356407f13e6b429d923f59d18743efc77e44abc57632252488160.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT raw_flow AS \"raw_flow!: Json>\"\n FROM completed_job_view WHERE id = $1 AND workspace_id = $2 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "raw_flow!: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true + ] + }, + "hash": "a0a97ad8c36356407f13e6b429d923f59d18743efc77e44abc57632252488160" +} diff --git a/backend/.sqlx/query-a1a339c9e744b5661d2b4d1354e593b97b4e007fa9a499b49c3d1ebf9b645a3b.json b/backend/.sqlx/query-a1a339c9e744b5661d2b4d1354e593b97b4e007fa9a499b49c3d1ebf9b645a3b.json new file mode 100644 index 0000000000000..8d4654b6410b4 --- /dev/null +++ b/backend/.sqlx/query-a1a339c9e744b5661d2b4d1354e593b97b4e007fa9a499b49c3d1ebf9b645a3b.json @@ -0,0 +1,35 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json>\"\n FROM queue_view WHERE id = $1 AND workspace_id = $2 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "raw_code", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "raw_lock", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "raw_flow: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + true, + true + ] + }, + "hash": "a1a339c9e744b5661d2b4d1354e593b97b4e007fa9a499b49c3d1ebf9b645a3b" +} diff --git a/backend/.sqlx/query-c05f21e7027a985e5a657b689a0f5bedefcaa15b7887402c0d5d33cb7b5fb362.json b/backend/.sqlx/query-c05f21e7027a985e5a657b689a0f5bedefcaa15b7887402c0d5d33cb7b5fb362.json new file mode 100644 index 0000000000000..d108bb7203a9f --- /dev/null +++ b/backend/.sqlx/query-c05f21e7027a985e5a657b689a0f5bedefcaa15b7887402c0d5d33cb7b5fb362.json @@ -0,0 +1,35 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json>\"\n FROM queue_view WHERE id = $1 AND workspace_id = $2 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "raw_code", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "raw_lock", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "raw_flow: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + true, + true + ] + }, + "hash": "c05f21e7027a985e5a657b689a0f5bedefcaa15b7887402c0d5d33cb7b5fb362" +} diff --git a/backend/.sqlx/query-e0e895527d6807699c918ca87e0b016751c9b5a1dab632dc4abdb1dd6aa1fcbc.json b/backend/.sqlx/query-e0e895527d6807699c918ca87e0b016751c9b5a1dab632dc4abdb1dd6aa1fcbc.json new file mode 100644 index 0000000000000..841a7e840244b --- /dev/null +++ b/backend/.sqlx/query-e0e895527d6807699c918ca87e0b016751c9b5a1dab632dc4abdb1dd6aa1fcbc.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT raw_flow AS \"raw_flow!: Json>\"\n FROM completed_job_view WHERE id = $1 AND workspace_id = $2 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "raw_flow!: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true + ] + }, + "hash": "e0e895527d6807699c918ca87e0b016751c9b5a1dab632dc4abdb1dd6aa1fcbc" +} diff --git a/backend/.sqlx/query-f5c94d89eb8c86916d7c64ffa8ea2de1feecbfde74e9a1374b3668ec72d7bac1.json 
b/backend/.sqlx/query-f5c94d89eb8c86916d7c64ffa8ea2de1feecbfde74e9a1374b3668ec72d7bac1.json new file mode 100644 index 0000000000000..53302e6616cec --- /dev/null +++ b/backend/.sqlx/query-f5c94d89eb8c86916d7c64ffa8ea2de1feecbfde74e9a1374b3668ec72d7bac1.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT raw_flow->'modules'->($1)::text->'value'->>'type' = 'flow' FROM queue_view WHERE id = $2 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "?column?", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Text", + "Uuid" + ] + }, + "nullable": [ + null + ] + }, + "hash": "f5c94d89eb8c86916d7c64ffa8ea2de1feecbfde74e9a1374b3668ec72d7bac1" +} diff --git a/backend/migrations/20241106112544_create_job_table_and_views.down.sql b/backend/migrations/20241106112544_create_job_table_and_views.down.sql new file mode 100644 index 0000000000000..1ab1094b41a11 --- /dev/null +++ b/backend/migrations/20241106112544_create_job_table_and_views.down.sql @@ -0,0 +1,3 @@ +DROP VIEW queue_view CASCADE; +DROP VIEW completed_job_view CASCADE; +DROP TABLE job CASCADE; diff --git a/backend/migrations/20241106112544_create_job_table_and_views.up.sql b/backend/migrations/20241106112544_create_job_table_and_views.up.sql new file mode 100644 index 0000000000000..de68d32960a6e --- /dev/null +++ b/backend/migrations/20241106112544_create_job_table_and_views.up.sql @@ -0,0 +1,97 @@ +CREATE TABLE job ( + id UUID PRIMARY KEY, + raw_code TEXT, + raw_lock TEXT, + raw_flow jsonb NULL, + tag VARCHAR(50), + workspace_id VARCHAR(50) +); + +CREATE OR REPLACE VIEW queue_view AS +SELECT + queue.id, + queue.workspace_id, + queue.parent_job, + queue.created_by, + queue.created_at, + queue.started_at, + queue.scheduled_for, + queue.running, + queue.script_hash, + queue.script_path, + queue.args, + concat(coalesce(queue.logs, ''), coalesce(job_logs.logs, '')) as logs, + coalesce(queue.raw_code, job.raw_code) as raw_code, + queue.canceled, + queue.canceled_by, + queue.canceled_reason, + queue.last_ping, + queue.job_kind, + queue.env_id, + queue.schedule_path, + queue.permissioned_as, + queue.flow_status, + coalesce(queue.raw_flow, job.raw_flow) as raw_flow, + queue.is_flow_step, + queue.language, + queue.suspend, + queue.suspend_until, + queue.same_worker, + coalesce(queue.raw_lock, job.raw_lock) as raw_lock, + queue.pre_run_error, + queue.email, + queue.visible_to_owner, + queue.mem_peak, + queue.root_job, + queue.leaf_jobs, + queue.tag, + queue.concurrent_limit, + queue.concurrency_time_window_s, + queue.timeout, + queue.flow_step_id, + queue.cache_ttl, + queue.priority, + job_logs.log_offset +FROM queue +LEFT JOIN job ON queue.id = job.id AND queue.workspace_id = job.workspace_id +LEFT JOIN job_logs ON queue.id = job_logs.job_id; + +CREATE OR REPLACE VIEW completed_job_view AS +SELECT + completed_job.id, + completed_job.workspace_id, + completed_job.parent_job, + completed_job.created_by, + completed_job.created_at, + completed_job.duration_ms, + completed_job.success, + completed_job.script_hash, + completed_job.script_path, + completed_job.args, + completed_job.result, + concat(coalesce(completed_job.logs, ''), coalesce(job_logs.logs, '')) as logs, + completed_job.deleted, + coalesce(completed_job.raw_code, job.raw_code) as raw_code, + completed_job.canceled, + completed_job.canceled_by, + completed_job.canceled_reason, + completed_job.job_kind, + completed_job.env_id, + completed_job.schedule_path, + completed_job.permissioned_as, + completed_job.flow_status, + 
coalesce(completed_job.raw_flow, job.raw_flow) as raw_flow, + completed_job.is_flow_step, + completed_job.language, + completed_job.started_at, + completed_job.is_skipped, + coalesce(completed_job.raw_lock, job.raw_lock) as raw_lock, + completed_job.email, + completed_job.visible_to_owner, + completed_job.mem_peak, + completed_job.tag, + completed_job.priority, + job_logs.log_offset +FROM completed_job +LEFT JOIN job ON completed_job.id = job.id AND completed_job.workspace_id = job.workspace_id +LEFT JOIN job_logs ON completed_job.id = job_logs.job_id; diff --git a/backend/migrations/20241118060250_update_raw_flow_in_views.down.sql b/backend/migrations/20241118060250_update_raw_flow_in_views.down.sql new file mode 100644 index 0000000000000..f9cbb137e2627 --- /dev/null +++ b/backend/migrations/20241118060250_update_raw_flow_in_views.down.sql @@ -0,0 +1,30 @@ +-- Add down migration script here +DO $$ +DECLARE + t TEXT; +BEGIN + FOR t IN VALUES ('queue'), ('completed_job') LOOP + EXECUTE format( + 'CREATE OR REPLACE VIEW '||t||'_view AS + SELECT %s, job_logs.log_offset FROM '||t||' + LEFT JOIN job ON '||t||'.id = job.id AND '||t||'.workspace_id = job.workspace_id + LEFT JOIN job_logs ON '||t||'.id = job_logs.job_id', ( + SELECT string_agg( + CASE + WHEN column_name = 'logs' THEN + -- Concatenate logs from base and job_logs. + 'concat(coalesce('||t||'.logs, ''''), coalesce(job_logs.logs, '''')) as logs' + WHEN column_name = 'raw_code' OR column_name = 'raw_lock' OR column_name = 'raw_flow' THEN + -- Coalesce column from base and job. + format('coalesce('||t||'.%s, job.%s) as %s', column_name, column_name, column_name) + ELSE + format('%s.%s', t, column_name) + END, + ', ' + ) + FROM information_schema.columns + WHERE table_name = t + ) + ); + END LOOP; +END $$; diff --git a/backend/migrations/20241118060250_update_raw_flow_in_views.up.sql b/backend/migrations/20241118060250_update_raw_flow_in_views.up.sql new file mode 100644 index 0000000000000..11bd29ff2e455 --- /dev/null +++ b/backend/migrations/20241118060250_update_raw_flow_in_views.up.sql @@ -0,0 +1,33 @@ +-- Add up migration script here +DO $$ +DECLARE + t TEXT; +BEGIN + FOR t IN VALUES ('queue'), ('completed_job') LOOP + EXECUTE format( + 'CREATE OR REPLACE VIEW '||t||'_view AS + SELECT %s, job_logs.log_offset FROM '||t||' + LEFT JOIN job ON '||t||'.id = job.id AND '||t||'.workspace_id = job.workspace_id + LEFT JOIN job_logs ON '||t||'.id = job_logs.job_id', ( + SELECT string_agg( + CASE + WHEN column_name = 'logs' THEN + -- Concatenate logs from base and job_logs. + 'concat(coalesce('||t||'.logs, ''''), coalesce(job_logs.logs, '''')) as logs' + WHEN column_name = 'raw_code' OR column_name = 'raw_lock' THEN + -- Coalesce column from base and job. + format('coalesce('||t||'.%s, job.%s) as %s', column_name, column_name, column_name) + WHEN column_name = 'raw_flow' THEN + -- When job_kind is 'flow', get raw_flow from flow table, otherwise get raw_flow from base or job. 
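+                        -- (Deployed 'flow' jobs can therefore leave raw_flow unset on queue/completed_job; the view resolves it from the flow table at read time, while other kinds keep falling back to the base table or the new job table.)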
+ 'CASE WHEN '||t||'.job_kind = ''flow'' THEN (SELECT value FROM flow WHERE flow.path = '||t||'.script_path AND flow.workspace_id = '||t||'.workspace_id) ELSE coalesce('||t||'.raw_flow, job.raw_flow) END as raw_flow' + ELSE + format('%s.%s', t, column_name) + END, + ', ' + ) + FROM information_schema.columns + WHERE table_name = t + ) + ); + END LOOP; +END $$; diff --git a/backend/migrations/20241118072014_create_inline_script.down.sql b/backend/migrations/20241118072014_create_inline_script.down.sql new file mode 100644 index 0000000000000..4c17a932966fe --- /dev/null +++ b/backend/migrations/20241118072014_create_inline_script.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +DROP TABLE IF EXISTS inline_script; \ No newline at end of file diff --git a/backend/migrations/20241118072014_create_inline_script.up.sql b/backend/migrations/20241118072014_create_inline_script.up.sql new file mode 100644 index 0000000000000..d16d96a2cd34c --- /dev/null +++ b/backend/migrations/20241118072014_create_inline_script.up.sql @@ -0,0 +1,13 @@ +-- Add up migration script here +ALTER TYPE JOB_KIND ADD VALUE IF NOT EXISTS 'inlinescript'; + +CREATE TABLE inline_script ( + workspace_id VARCHAR(50) NOT NULL REFERENCES workspace(id), + flow VARCHAR(255) NOT NULL, + lock TEXT, + path TEXT, + hash BIGINT NOT NULL, + content TEXT NOT NULL, + PRIMARY KEY (flow, hash), + FOREIGN KEY (flow, workspace_id) REFERENCES flow (path, workspace_id) ON DELETE CASCADE +); diff --git a/backend/windmill-api/src/flows.rs b/backend/windmill-api/src/flows.rs index f02f91dc41262..8d24bd0a646ad 100644 --- a/backend/windmill-api/src/flows.rs +++ b/backend/windmill-api/src/flows.rs @@ -39,7 +39,7 @@ use windmill_common::HUB_BASE_URL; use windmill_common::{ db::UserDB, error::{self, to_anyhow, Error, JsonResult, Result}, - flows::{Flow, FlowWithStarred, ListFlowQuery, ListableFlow, NewFlow}, + flows::{resolve, Flow, FlowWithStarred, ListFlowQuery, ListableFlow, NewFlow}, jobs::JobPayload, schedule::Schedule, scripts::Schema, @@ -98,7 +98,7 @@ async fn list_search_flows( let n = 3; let mut tx = user_db.begin(&authed).await?; - let rows = sqlx::query_as::<_, SearchFlow>( + let mut rows = sqlx::query_as::<_, SearchFlow>( "SELECT flow.path, flow_version.value FROM flow LEFT JOIN flow_version ON flow_version.id = flow.versions[array_upper(flow.versions, 1)] @@ -110,6 +110,9 @@ async fn list_search_flows( .await? .into_iter() .collect::>(); + for row in &mut rows { + resolve(&mut *tx, &w_id, &mut row.value.0).await; + } tx.commit().await?; Ok(Json(rows)) } @@ -578,11 +581,12 @@ async fn get_flow_version( WHERE flow.path = $1 AND flow.workspace_id = $2 AND flow_version.id = $3", ) .bind(path) - .bind(w_id) + .bind(&w_id) .bind(version) .fetch_optional(&mut *tx) .await?; + let flow = resolve_flow_value_if_some(&mut *tx, &w_id, flow, |f| &mut f.value).await; tx.commit().await?; let flow = not_found_if_none(flow, "Flow version", version.to_string())?; @@ -942,7 +946,7 @@ async fn get_flow_by_path( WHERE flow.path = $1 AND flow.workspace_id = $2" ) .bind(path) - .bind(w_id) + .bind(&w_id) .bind(&authed.username) .fetch_optional(&mut *tx) .await? @@ -954,10 +958,11 @@ async fn get_flow_by_path( WHERE flow.path = $1 AND flow.workspace_id = $2" ) .bind(path) - .bind(w_id) + .bind(&w_id) .fetch_optional(&mut *tx) .await? 
}; + let flow_o = resolve_flow_value_if_some(&mut *tx, &w_id, flow_o, |f| &mut f.flow.value).await; tx.commit().await?; let flow = not_found_if_none(flow_o, "Flow", path)?; @@ -1004,9 +1009,10 @@ async fn get_flow_by_path_w_draft( WHERE flow.path = $1 AND flow.workspace_id = $2", ) .bind(path) - .bind(w_id) + .bind(&w_id) .fetch_optional(&mut *tx) .await?; + let flow_o = resolve_flow_value_if_some(&mut *tx, &w_id, flow_o, |f| &mut f.value).await; tx.commit().await?; let flow = not_found_if_none(flow_o, "Flow", path)?; @@ -1485,3 +1491,14 @@ mod tests { assert_eq!(Some(81 * SECOND), retry.max_interval()); } } + +async fn resolve_flow_value_if_some( + conn: &mut sqlx::PgConnection, + workspace_id: &str, + maybe: Option, + f: impl FnOnce(&mut T) -> &mut Box +) -> Option { + let Some(mut value) = maybe else { return None; }; + resolve(conn, workspace_id, f(&mut value)).await; + Some(value) +} diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 3c7cea1584f4a..7fe23e4a165de 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -66,7 +66,7 @@ use windmill_common::{ db::UserDB, error::{self, to_anyhow, Error}, flow_status::{Approval, FlowStatus, FlowStatusModule}, - flows::{add_virtual_items_if_necessary, FlowValue}, + flows::FlowValue, jobs::{script_path_to_payload, CompletedJob, JobKind, JobPayload, QueuedJob, RawCode}, oauth2::HmacSha256, scripts::{ScriptHash, ScriptLang}, @@ -683,23 +683,18 @@ async fn get_job( } lazy_static::lazy_static! { - static ref GET_COMPLETED_JOB_QUERY_NO_LOGS: String = generate_get_job_query(true, "completed_job"); - static ref GET_COMPLETED_JOB_QUERY: String = generate_get_job_query(false, "completed_job"); - static ref GET_QUEUED_JOB_QUERY_NO_LOGS: String = generate_get_job_query(true, "queue"); - static ref GET_QUEUED_JOB_QUERY: String = generate_get_job_query(false, "queue"); + static ref GET_COMPLETED_JOB_QUERY_NO_LOGS: String = generate_get_job_query(true, "completed_job_view"); + static ref GET_COMPLETED_JOB_QUERY: String = generate_get_job_query(false, "completed_job_view"); + static ref GET_QUEUED_JOB_QUERY_NO_LOGS: String = generate_get_job_query(true, "queue_view"); + static ref GET_QUEUED_JOB_QUERY: String = generate_get_job_query(false, "queue_view"); } fn generate_get_job_query(no_logs: bool, table: &str) -> String { let log_expr = if no_logs { "null".to_string() } else { - format!("right(concat(coalesce({table}.logs, ''), job_logs.logs), 20000)") + format!("right({table}.logs, 20000)") }; - let join = if no_logs { - "".to_string() - } else { - format!("LEFT JOIN job_logs ON {table}.id = job_logs.job_id") - }; - let additional_fields = if table == "completed_job" { + let additional_fields = if table == "completed_job_view" { "duration_ms, success, result, @@ -733,7 +728,6 @@ fn generate_get_job_query(no_logs: bool, table: &str) -> String { schedule_path, permissioned_as, flow_status, raw_flow, is_flow_step, language, raw_lock, email, visible_to_owner, mem_peak, tag, priority, {additional_fields} FROM {table} - {join} WHERE id = $1 AND {table}.workspace_id = $2"); } pub async fn get_queued_job_ex( @@ -2306,6 +2300,14 @@ pub struct JobExtended { #[serde(flatten)] inner: T, + // Imported from `JobDefinition`: + #[serde(skip_serializing_if = "Option::is_none")] + pub raw_code: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub raw_lock: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub raw_flow: Option>>, + #[sqlx(skip)] #[serde(skip_serializing_if = 
"Option::is_none")] pub self_wait_time_ms: Option, @@ -2316,7 +2318,7 @@ pub struct JobExtended { impl JobExtended { pub fn new(self_wait_time_ms: Option, aggregate_wait_time_ms: Option, inner: T) -> Self { - Self { inner, self_wait_time_ms, aggregate_wait_time_ms } + Self { inner, raw_code: None, raw_lock: None, raw_flow: None, self_wait_time_ms, aggregate_wait_time_ms } } } @@ -2636,12 +2638,10 @@ impl<'a> From for Job { deleted: uj.deleted, canceled: uj.canceled, canceled_by: uj.canceled_by, - raw_code: None, canceled_reason: None, job_kind: uj.job_kind, schedule_path: uj.schedule_path, permissioned_as: uj.permissioned_as, - raw_flow: None, is_flow_step: uj.is_flow_step, language: uj.language, is_skipped: uj.is_skipped, @@ -2666,8 +2666,6 @@ impl<'a> From for Job { scheduled_for: uj.scheduled_for.unwrap(), logs: None, flow_status: None, - raw_code: None, - raw_lock: None, canceled: uj.canceled, canceled_by: uj.canceled_by, canceled_reason: None, @@ -2675,7 +2673,6 @@ impl<'a> From for Job { job_kind: uj.job_kind, schedule_path: uj.schedule_path, permissioned_as: uj.permissioned_as, - raw_flow: None, is_flow_step: uj.is_flow_step, language: uj.language, same_worker: false, @@ -3144,15 +3141,15 @@ pub async fn run_workflow_as_code( } let job = not_found_if_none(job, "Queued Job", &job_id.to_string())?; - let JobExtended { inner: job, .. } = job; + let JobExtended { inner: job, raw_code, raw_lock, .. } = job; let (job_payload, tag, _delete_after_use, timeout) = match job.job_kind { JobKind::Preview => ( JobPayload::Code(RawCode { hash: None, - content: job.raw_code.unwrap_or_default(), + content: raw_code.unwrap_or_default(), path: job.script_path, language: job.language.unwrap_or_else(|| ScriptLang::Deno), - lock: job.raw_lock, + lock: raw_lock, custom_concurrency_key: windmill_queue::custom_concurrency_key(&db, job.id) .await .map_err(to_anyhow)?, @@ -4434,7 +4431,7 @@ async fn add_batch_jobs( } } "flow" => { - let (mut value, job_kind, path) = if let Some(value) = batch_info.flow_value { + let (value, job_kind, path) = if let Some(value) = batch_info.flow_value { (value, JobKind::FlowPreview, None) } else if let Some(path) = batch_info.path { let value_json = sqlx::query!( @@ -4459,7 +4456,6 @@ async fn add_batch_jobs( "Path is required if no value is not provided" ))? 
}; - add_virtual_items_if_necessary(&mut value.modules); let flow_status = FlowStatus::new(&value); ( None, // script_hash @@ -4514,11 +4510,24 @@ async fn add_batch_jobs( let uuids = sqlx::query_scalar!( r#"WITH uuid_table as ( - select gen_random_uuid() as uuid from generate_series(1, $11) + select gen_random_uuid() as uuid from generate_series(1, $5) + ) + INSERT INTO job + (id, workspace_id, raw_code, raw_lock, raw_flow) + (SELECT uuid, $1, $2, $3, $4 FROM uuid_table) + RETURNING id"#, + w_id, raw_code, raw_lock, raw_flow.map(sqlx::types::Json) as Option>, n + ) + .fetch_all(&db) + .await?; + + let uuids = sqlx::query_scalar!( + r#"WITH uuid_table as ( + select unnest($11::uuid[]) as uuid ) INSERT INTO queue - (id, script_hash, script_path, job_kind, language, args, tag, created_by, permissioned_as, email, scheduled_for, workspace_id, concurrent_limit, concurrency_time_window_s, timeout, raw_code, raw_lock, raw_flow, flow_status) - (SELECT uuid, $1, $2, $3, $4, ('{ "uuid": "' || uuid || '" }')::jsonb, $5, $6, $7, $8, $9, $10, $12, $13, $14, $15, $16, $17, $18 FROM uuid_table) + (id, script_hash, script_path, job_kind, language, args, tag, created_by, permissioned_as, email, scheduled_for, workspace_id, concurrent_limit, concurrency_time_window_s, timeout, flow_status) + (SELECT uuid, $1, $2, $3, $4, ('{ "uuid": "' || uuid || '" }')::jsonb, $5, $6, $7, $8, $9, $10, $12, $13, $14, $15 FROM uuid_table) RETURNING id"#, hash.map(|h| h.0), path, @@ -4530,13 +4539,10 @@ async fn add_batch_jobs( authed.email, Utc::now(), w_id, - n, + &uuids, concurrent_limit, concurrent_time_window_s, timeout, - raw_code, - raw_lock, - raw_flow.map(sqlx::types::Json) as Option>, flow_status.map(sqlx::types::Json) as Option> ) .fetch_all(&db) diff --git a/backend/windmill-common/src/flows.rs b/backend/windmill-common/src/flows.rs index 5d7e1b4621528..39f8fcb3f5e7a 100644 --- a/backend/windmill-common/src/flows.rs +++ b/backend/windmill-common/src/flows.rs @@ -14,12 +14,15 @@ use std::{ use rand::Rng; use serde::{Deserialize, Serialize, Serializer}; +use sqlx::types::Json; +use sqlx::types::JsonRawValue; use crate::{ error::Error, more_serde::{default_empty_string, default_id, default_null, default_true, is_default}, scripts::{Schema, ScriptHash, ScriptLang}, }; +use crate::worker::to_raw_value; #[derive(Serialize, Deserialize, sqlx::FromRow)] pub struct Flow { @@ -27,7 +30,7 @@ pub struct Flow { pub path: String, pub summary: String, pub description: String, - pub value: sqlx::types::Json>, + pub value: Json>, pub edited_by: String, pub edited_at: chrono::DateTime, pub archived: bool, @@ -480,6 +483,24 @@ pub enum FlowModuleValue { #[serde(skip_serializing_if = "Option::is_none")] is_trigger: Option, }, + InlineScript { + #[serde(default)] + #[serde(alias = "input_transform", serialize_with = "ordered_map")] + input_transforms: HashMap, + flow_path: String, + hash: ScriptHash, // reference to `inline_script` table on (flow.path, hash) + #[serde(skip_serializing_if = "is_none_or_empty")] + tag: Option, + language: ScriptLang, + #[serde(skip_serializing_if = "Option::is_none")] + custom_concurrency_key: Option, + #[serde(skip_serializing_if = "Option::is_none")] + concurrent_limit: Option, + #[serde(skip_serializing_if = "Option::is_none")] + concurrency_time_window_s: Option, + #[serde(skip_serializing_if = "Option::is_none")] + is_trigger: Option, + }, Identity, } @@ -494,6 +515,7 @@ struct UntaggedFlowModuleValue { #[serde(alias = "input_transform")] input_transforms: Option>, path: Option, + flow_path: 
Option, hash: Option, tag_override: Option, iterator: Option, @@ -583,6 +605,23 @@ impl<'de> Deserialize<'de> for FlowModuleValue { concurrency_time_window_s: untagged.concurrency_time_window_s, is_trigger: untagged.is_trigger, }), + "inlinescript" => Ok(FlowModuleValue::InlineScript { + input_transforms: untagged.input_transforms.unwrap_or_default(), + flow_path: untagged + .flow_path + .ok_or_else(|| serde::de::Error::missing_field("flow_path"))?, + hash: untagged + .hash + .ok_or_else(|| serde::de::Error::missing_field("hash"))?, + tag: untagged.tag, + language: untagged + .language + .ok_or_else(|| serde::de::Error::missing_field("language"))?, + custom_concurrency_key: untagged.custom_concurrency_key, + concurrent_limit: untagged.concurrent_limit, + concurrency_time_window_s: untagged.concurrency_time_window_s, + is_trigger: untagged.is_trigger, + }), "identity" => Ok(FlowModuleValue::Identity), other => Err(serde::de::Error::unknown_variant( other, @@ -657,14 +696,14 @@ pub async fn has_failure_module<'c>(flow: sqlx::types::Uuid, db: &sqlx::Pool'failure_module' != 'null'::jsonb - FROM completed_job + FROM completed_job_view WHERE id = $1", flow ) } else { sqlx::query_scalar!( "SELECT raw_flow->'failure_module' != 'null'::jsonb - FROM queue + FROM queue_view WHERE id = $1", flow ) @@ -678,3 +717,61 @@ pub async fn has_failure_module<'c>(flow: sqlx::types::Uuid, db: &sqlx::Pool +) { + let Ok(mut val) = serde_json::from_str::(value.get()) else { return }; + for module in &mut val.modules { + resolve_module(conn, workspace_id, &mut module.value).await; + } + *value = to_raw_value(&val); +} + +/// Resolve loadable module values recursively. +pub async fn resolve_module( + conn: &mut sqlx::PgConnection, + workspace_id: &str, + value: &mut Box +) { + use FlowModuleValue::*; + + let Ok(mut val) = serde_json::from_str::(value.get()) else { return }; + match &mut val { + InlineScript { .. } => { + let InlineScript { + input_transforms, flow_path, hash, tag, language, + custom_concurrency_key, concurrent_limit, concurrency_time_window_s, is_trigger + } = std::mem::replace(&mut val, Identity) else { unreachable!() }; + let Ok((content, lock, path)) = sqlx::query!( + "SELECT content, lock, path FROM inline_script WHERE hash = $1 AND flow = $2 AND \ + workspace_id = $3", + hash.0, flow_path, workspace_id + ) + .fetch_one(conn) + .await + .map(|record| (record.content, record.lock, record.path)) else { return }; + val = RawScript { + input_transforms, content, lock, path, tag, language, custom_concurrency_key, + concurrent_limit, concurrency_time_window_s, is_trigger + }; + }, + ForloopFlow { modules, .. } | WhileloopFlow { modules, .. } => { + for module in modules { + Box::pin(resolve_module(conn, workspace_id, &mut module.value)).await; + } + }, + BranchOne { branches, .. } | BranchAll { branches, .. 
} => { + for branch in branches { + for module in &mut branch.modules { + Box::pin(resolve_module(conn, workspace_id, &mut module.value)).await; + } + } + } + _ => {} + } + *value = to_raw_value(&val); +} diff --git a/backend/windmill-common/src/jobs.rs b/backend/windmill-common/src/jobs.rs index 37bc220b9454b..1cbafddaf3146 100644 --- a/backend/windmill-common/src/jobs.rs +++ b/backend/windmill-common/src/jobs.rs @@ -22,7 +22,7 @@ use crate::{ worker::{to_raw_value, TMP_DIR}, }; -#[derive(sqlx::Type, Serialize, Deserialize, Debug, PartialEq, Clone)] +#[derive(sqlx::Type, Serialize, Deserialize, Debug, PartialEq, Copy, Clone)] #[sqlx(type_name = "JOB_KIND", rename_all = "lowercase")] #[serde(rename_all(serialize = "lowercase"))] pub enum JobKind { @@ -39,6 +39,7 @@ pub enum JobKind { AppDependencies, Noop, DeploymentCallback, + InlineScript, } #[derive(sqlx::FromRow, Debug, Serialize, Clone)] @@ -60,10 +61,6 @@ pub struct QueuedJob { pub args: Option>>>, #[serde(skip_serializing_if = "Option::is_none")] pub logs: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub raw_code: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub raw_lock: Option, pub canceled: bool, #[serde(skip_serializing_if = "Option::is_none")] pub canceled_by: Option, @@ -77,8 +74,6 @@ pub struct QueuedJob { pub permissioned_as: String, #[serde(skip_serializing_if = "Option::is_none")] pub flow_status: Option>>, - #[serde(skip_serializing_if = "Option::is_none")] - pub raw_flow: Option>>, pub is_flow_step: bool, #[serde(skip_serializing_if = "Option::is_none")] pub language: Option, @@ -133,14 +128,6 @@ impl QueuedJob { ) } - pub fn parse_raw_flow(&self) -> Option { - self.raw_flow.as_ref().and_then(|v| { - let str = (**v).get(); - // tracing::error!("raw_flow: {}", str); - return serde_json::from_str::(str).ok(); - }) - } - pub fn parse_flow_status(&self) -> Option { self.flow_status .as_ref() @@ -163,8 +150,6 @@ impl Default for QueuedJob { script_path: None, args: None, logs: None, - raw_code: None, - raw_lock: None, canceled: false, canceled_by: None, canceled_reason: None, @@ -173,7 +158,6 @@ impl Default for QueuedJob { schedule_path: None, permissioned_as: "".to_string(), flow_status: None, - raw_flow: None, is_flow_step: false, language: None, same_worker: false, @@ -216,8 +200,6 @@ pub struct CompletedJob { #[serde(skip_serializing_if = "Option::is_none")] pub logs: Option, pub deleted: bool, - #[serde(skip_serializing_if = "Option::is_none")] - pub raw_code: Option, pub canceled: bool, #[serde(skip_serializing_if = "Option::is_none")] pub canceled_by: Option, @@ -229,8 +211,6 @@ pub struct CompletedJob { pub permissioned_as: String, #[serde(skip_serializing_if = "Option::is_none")] pub flow_status: Option>>, - #[serde(skip_serializing_if = "Option::is_none")] - pub raw_flow: Option>>, pub is_flow_step: bool, #[serde(skip_serializing_if = "Option::is_none")] pub language: Option, @@ -254,12 +234,6 @@ impl CompletedJob { .flatten() } - pub fn parse_raw_flow(&self) -> Option { - self.raw_flow - .as_ref() - .and_then(|v| serde_json::from_str::((**v).get()).ok()) - } - pub fn parse_flow_status(&self) -> Option { self.flow_status .as_ref() @@ -290,6 +264,16 @@ pub enum JobPayload { priority: Option, apply_preprocessor: bool, }, + InlineScript { + flow_path: String, + hash: ScriptHash, + language: ScriptLang, + custom_concurrency_key: Option, + concurrent_limit: Option, + concurrency_time_window_s: Option, + cache_ttl: Option, + dedicated_worker: Option, + }, Code(RawCode), Dependencies { 
path: String, diff --git a/backend/windmill-common/src/worker.rs b/backend/windmill-common/src/worker.rs index 1010485e843d3..54f11e7633ffd 100644 --- a/backend/windmill-common/src/worker.rs +++ b/backend/windmill-common/src/worker.rs @@ -90,6 +90,8 @@ lazy_static::lazy_static! { .unwrap_or(false); pub static ref MIN_VERSION: Arc> = Arc::new(RwLock::new(Version::new(0, 0, 0))); + pub static ref MIN_VERSION_IS_AT_LEAST_1_425: Arc> = Arc::new(RwLock::new(false)); + pub static ref MIN_VERSION_IS_AT_LEAST_1_426: Arc> = Arc::new(RwLock::new(false)); } pub async fn make_suspended_pull_query(wc: &WorkerConfig) { @@ -112,10 +114,10 @@ pub async fn make_suspended_pull_query(wc: &WorkerConfig) { LIMIT 1 ) RETURNING id, workspace_id, parent_job, created_by, created_at, started_at, scheduled_for, - running, script_hash, script_path, args, null as logs, raw_code, canceled, canceled_by, + running, script_hash, script_path, args, null as logs, canceled, canceled_by, canceled_reason, last_ping, job_kind, schedule_path, permissioned_as, - flow_status, raw_flow, is_flow_step, language, suspend, suspend_until, - same_worker, raw_lock, pre_run_error, email, visible_to_owner, mem_peak, + flow_status, is_flow_step, language, suspend, suspend_until, + same_worker, pre_run_error, email, visible_to_owner, mem_peak, root_job, leaf_jobs, tag, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id, cache_ttl, priority", wc.worker_tags.iter().map(|x| format!("'{x}'")).join(", ")); let mut l = WORKER_SUSPENDED_PULL_QUERY.write().await; @@ -143,10 +145,10 @@ pub async fn make_pull_query(wc: &WorkerConfig) { LIMIT 1 ) RETURNING id, workspace_id, parent_job, created_by, created_at, started_at, scheduled_for, - running, script_hash, script_path, args, null as logs, raw_code, canceled, canceled_by, + running, script_hash, script_path, args, null as logs, canceled, canceled_by, canceled_reason, last_ping, job_kind, schedule_path, permissioned_as, - flow_status, raw_flow, is_flow_step, language, suspend, suspend_until, - same_worker, raw_lock, pre_run_error, email, visible_to_owner, mem_peak, + flow_status, is_flow_step, language, suspend, suspend_until, + same_worker, pre_run_error, email, visible_to_owner, mem_peak, root_job, leaf_jobs, tag, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id, cache_ttl, priority", tags.tags.iter().map(|x| format!("'{x}'")).join(", ")); @@ -573,6 +575,14 @@ pub async fn update_min_version<'c, E: sqlx::Executor<'c, Database = sqlx::Postg tracing::info!("Minimal worker version: {min_version}"); } + if min_version >= Version::new(1, 425, 0) { + *MIN_VERSION_IS_AT_LEAST_1_425.write().await = true; + } + + if min_version >= Version::new(1, 426, 0) { + *MIN_VERSION_IS_AT_LEAST_1_426.write().await = true; + } + *MIN_VERSION.write().await = min_version.clone(); min_version >= cur_version } diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index 20122fd878472..dfa28407e3047 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -48,7 +48,7 @@ use windmill_common::{ Iterator, JobResult, RestartedFrom, RetryStatus, MAX_RETRY_ATTEMPTS, MAX_RETRY_INTERVAL, }, flows::{ - add_virtual_items_if_necessary, FlowModule, FlowModuleValue, FlowValue, InputTransform, + FlowModule, FlowModuleValue, FlowValue, InputTransform, }, jobs::{ get_payload_tag_from_prefixed_path, CompletedJob, JobKind, JobPayload, QueuedJob, RawCode, @@ -59,8 +59,8 @@ use windmill_common::{ users::{SUPERADMIN_NOTIFICATION_EMAIL, 
SUPERADMIN_SECRET_EMAIL}, utils::{not_found_if_none, report_critical_error, StripPath}, worker::{ - to_raw_value, DEFAULT_TAGS_PER_WORKSPACE, DEFAULT_TAGS_WORKSPACES, NO_LOGS, WORKER_CONFIG, - WORKER_PULL_QUERIES, WORKER_SUSPENDED_PULL_QUERY, + to_raw_value, DEFAULT_TAGS_PER_WORKSPACE, DEFAULT_TAGS_WORKSPACES, MIN_VERSION_IS_AT_LEAST_1_425, + MIN_VERSION_IS_AT_LEAST_1_426, NO_LOGS, WORKER_CONFIG, WORKER_PULL_QUERIES, WORKER_SUSPENDED_PULL_QUERY, }, DB, METRICS_ENABLED, }; @@ -581,6 +581,20 @@ pub async fn add_completed_job< serde_json::to_string(&result).unwrap_or_else(|_| "".to_string()) ); + let (raw_code, raw_lock, raw_flow) = if !*MIN_VERSION_IS_AT_LEAST_1_425.read().await { + sqlx::query!( + "SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json>\" + FROM queue_view WHERE id = $1 AND workspace_id = $2 LIMIT 1", + &job_id, &queued_job.workspace_id + ) + .fetch_one(db) + .await + .map(|record| (record.raw_code, record.raw_lock, record.raw_flow)) + .unwrap_or_default() + } else { + (None, None, None) + }; + let mem_peak = mem_peak.max(queued_job.mem_peak.unwrap_or(0)); add_time!(bench, "add_completed_job query START"); let _duration: i64 = sqlx::query_scalar!( @@ -630,8 +644,8 @@ pub async fn add_completed_job< queued_job.script_path, &queued_job.args as &Option>>>, result as Json<&T>, - queued_job.raw_code, - queued_job.raw_lock, + raw_code, + raw_lock, canceled_by.is_some(), canceled_by.clone().map(|cb| cb.username).flatten(), canceled_by.clone().map(|cb| cb.reason).flatten(), @@ -639,7 +653,7 @@ pub async fn add_completed_job< queued_job.schedule_path, queued_job.permissioned_as, &queued_job.flow_status as &Option>>, - &queued_job.raw_flow as &Option>>, + &raw_flow as &Option>>, queued_job.is_flow_step, skipped, queued_job.language.clone() as Option, @@ -2068,10 +2082,10 @@ async fn pull_single_job_and_mark_as_running_no_concurrency_limit< , suspend_until = null WHERE id = $1 RETURNING id, workspace_id, parent_job, created_by, created_at, started_at, scheduled_for, - running, script_hash, script_path, args, right(logs, 900000) as logs, raw_code, canceled, canceled_by, + running, script_hash, script_path, args, right(logs, 900000) as logs, canceled, canceled_by, canceled_reason, last_ping, job_kind, schedule_path, permissioned_as, - flow_status, raw_flow, is_flow_step, language, suspend, suspend_until, - same_worker, raw_lock, pre_run_error, email, visible_to_owner, mem_peak, + flow_status, is_flow_step, language, suspend, suspend_until, + same_worker, pre_run_error, email, visible_to_owner, mem_peak, root_job, leaf_jobs, tag, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id, cache_ttl, priority", ) @@ -3285,6 +3299,31 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>( priority, ) } + // TODO(uael): apply_preprocessor ? 
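+ // `InlineScript` payloads reference a script embedded in a flow: `script_path` is set to the flow's path and `script_hash` to the inline script's hash; the worker later loads the actual `content`/`lock` from the `inline_script` table keyed on (flow, hash, workspace_id).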
+ JobPayload::InlineScript { + flow_path, + hash, + language, + custom_concurrency_key, + concurrent_limit, + concurrency_time_window_s, + cache_ttl, + dedicated_worker + } => ( + Some(hash.0), + Some(flow_path), + None, + JobKind::InlineScript, + None, + None, + Some(language), + custom_concurrency_key, + concurrent_limit, + concurrency_time_window_s, + cache_ttl, + dedicated_worker, + None, + ), JobPayload::ScriptHub { path } => { if path == "hub/7771/slack" || path == "hub/7836/slack" { permissioned_as = SUPERADMIN_NOTIFICATION_EMAIL.to_string(); @@ -3428,8 +3467,6 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>( None, ), JobPayload::RawFlow { mut value, path, restarted_from } => { - add_virtual_items_if_necessary(&mut value.modules); - let flow_status: FlowStatus = match restarted_from { Some(restarted_from_val) => { let (_, _, step_n, truncated_modules, _, user_states, cleanup_module) = @@ -3475,7 +3512,7 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>( None, path, None, - JobKind::FlowPreview, + JobKind::FlowPreview, // `FlowPreview`: means the `raw_flow` is required. Some(value.clone()), Some(flow_status), None, @@ -3544,7 +3581,7 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>( None, Some(path), None, - JobKind::Flow, + JobKind::FlowPreview, // `FlowPreview`: means the `raw_flow` is required. Some(flow_value.clone()), Some(FlowStatus::new(&flow_value)), // this is a new flow being pushed, flow_status is set to flow_value None, @@ -3569,17 +3606,13 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>( tx )? .ok_or_else(|| Error::InternalErr(format!("not found flow at path {:?}", path)))?; - let mut value = + let value = serde_json::from_str::<FlowValue>(value_json.value.get()).map_err(|err| { Error::InternalErr(format!( "could not convert json to flow for {path}: {err:?}" )) })?; let priority = value.priority; - add_virtual_items_if_necessary(&mut value.modules); - if same_worker { - value.same_worker = true; - } let cache_ttl = value.cache_ttl.map(|x| x as i32).clone(); let custom_concurrency_key = value.concurrency_key.clone(); let concurrency_time_window_s = value.concurrency_time_window_s.clone(); @@ -3587,7 +3620,6 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>( let extra = args.extra.get_or_insert_with(HashMap::new); if !apply_preprocessor { - value.preprocessor_module = None; extra.remove("wm_trigger"); } else { extra.entry("wm_trigger".to_string()).or_insert_with(|| { @@ -3596,13 +3628,25 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>( })) }); } + let (kind, value) = if !apply_preprocessor && value.preprocessor_module.is_some() { + // `FlowPreview`: means the `raw_flow` is required. + (JobKind::FlowPreview, FlowValue { preprocessor_module: None, ..value }) + } else { + // `Flow`: the `raw_flow` is retrieved from the database when handled.
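+ // (Once MIN_VERSION_IS_AT_LEAST_1_426 holds, the queue/job rows below are left without raw_flow and workers read the definition back through queue_view, which resolves 'flow' jobs from the flow table.)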
+ (JobKind::Flow, value) + }; let status = Some(FlowStatus::new(&value)); + let value = if matches!(kind, JobKind::Flow) && *MIN_VERSION_IS_AT_LEAST_1_426.read().await { + None + } else { + Some(value) + }; ( None, Some(path), None, - JobKind::Flow, - Some(value), + kind, + value, status, // this is a new flow being pushed, flow_status is set to flow_value None, custom_concurrency_key, @@ -3654,18 +3698,28 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>( user_states, preprocessor_module: None, }; + let cache_ttl = raw_flow.cache_ttl.map(|x| x as i32); + let concurrency_key = raw_flow.concurrency_key.clone(); + let concurrency_time_window_s = raw_flow.concurrency_time_window_s; + let concurrent_limit = raw_flow.concurrent_limit; + let value = if *MIN_VERSION_IS_AT_LEAST_1_426.read().await { + // `Flow`: the `raw_flow` is retrieved from the database when handled. + None + } else { + Some(raw_flow) + }; ( None, flow_path, None, JobKind::Flow, - Some(raw_flow.clone()), + value, Some(restarted_flow_status), None, - raw_flow.concurrency_key, - raw_flow.concurrent_limit, - raw_flow.concurrency_time_window_s, - raw_flow.cache_ttl.map(|x| x as i32), + concurrency_key, + concurrent_limit, + concurrency_time_window_s, + cache_ttl, None, priority, ) @@ -3885,6 +3939,27 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>( None }; + let raw_flow = raw_flow.map(Json); + + sqlx::query!( + "INSERT INTO job (id, workspace_id, raw_code, raw_lock, raw_flow, tag) + VALUES ($1, $2, $3, $4, $5, $6)", + job_id, + workspace_id, + raw_code, + raw_lock, + raw_flow.as_ref() as Option<&Json>, + tag, + ) + .execute(&mut tx) + .await?; + + let (raw_code, raw_lock, raw_flow) = if !*MIN_VERSION_IS_AT_LEAST_1_425.read().await { + (raw_code, raw_lock, raw_flow) + } else { + (None, None, None) + }; + tracing::debug!("Pushing job {job_id} with tag {tag}, schedule_path {schedule_path:?}, script_path: {script_path:?}, email {email}, workspace_id {workspace_id}"); let uuid = sqlx::query_scalar!( "INSERT INTO queue @@ -3909,7 +3984,7 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>( Json(args) as Json, job_kind.clone() as JobKind, schedule_path, - raw_flow.map(Json) as Option>, + raw_flow.as_ref() as Option<&Json>, flow_status.map(Json) as Option>, is_flow_step, language as Option, @@ -4010,6 +4085,7 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>( JobKind::FlowDependencies => "jobs.run.flow_dependencies", JobKind::AppDependencies => "jobs.run.app_dependencies", JobKind::DeploymentCallback => "jobs.run.deployment_callback", + JobKind::InlineScript => "jobs.run.inline_script", }; let audit_author = if format!("u/{user}") != permissioned_as && user != permissioned_as { @@ -4091,12 +4167,23 @@ async fn restarted_flows_resolution( )) })?; - let raw_flow = completed_job - .parse_raw_flow() + let flow_value = if let Some(flow_value) = flow_value_if_any { + flow_value + } else { + sqlx::query_scalar!( + "SELECT raw_flow AS \"raw_flow!: Json>\" + FROM completed_job_view WHERE id = $1 AND workspace_id = $2 LIMIT 1", + &completed_flow_id, workspace_id + ) + .fetch_one(db) + .await + .ok() + .and_then(|raw_flow| serde_json::from_str::(raw_flow.get()).ok()) .ok_or(Error::InternalErr(format!( "Unable to parse raw definition for job {} in workspace {}", completed_flow_id, workspace_id, - )))?; + )))? 
+ }; let flow_status = completed_job .parse_flow_status() .ok_or(Error::InternalErr(format!( @@ -4108,19 +4195,10 @@ async fn restarted_flows_resolution( let mut dependent_module = false; let mut truncated_modules: Vec = vec![]; for module in flow_status.modules { - if flow_value_if_any - .clone() - .map(|fv| { - fv.modules - .iter() - .find(|flow_value_module| flow_value_module.id == module.id()) - .is_none() - }) - .unwrap_or(false) - { + let Some(module_definition) = flow_value.modules.iter().find(|flow_value_module| flow_value_module.id == module.id()) else { // skip module as it doesn't appear in the flow_value anymore continue; - } + }; if module.id() == restart_step_id { // if the module ID is the one we want to restart the flow at, or if it's past it in the flow, // set the module as WaitingForPriorSteps as it needs to be re-run @@ -4130,14 +4208,6 @@ async fn restarted_flows_resolution( } else { // expect a module to be either a branchall (resp. loop), and resume the flow from this branch (resp. iteration) let branch_or_iteration_n = branch_or_iteration_n.unwrap(); - let module_definition = raw_flow - .modules - .iter() - .find(|flow_value_module| flow_value_module.id == restart_step_id) - .ok_or(Error::InternalErr(format!( - "Module {} not found in flow definition", - module.id() - )))?; match module_definition.get_value() { Ok(FlowModuleValue::BranchAll { branches, parallel, .. }) => { @@ -4250,7 +4320,7 @@ async fn restarted_flows_resolution( return Ok(( completed_job.script_path, - raw_flow, + flow_value, step_n, truncated_modules, completed_job.priority, diff --git a/backend/windmill-worker/src/dedicated_worker.rs b/backend/windmill-worker/src/dedicated_worker.rs index 1a332e76efd02..a14dfe95f89a3 100644 --- a/backend/windmill-worker/src/dedicated_worker.rs +++ b/backend/windmill-worker/src/dedicated_worker.rs @@ -392,6 +392,45 @@ async fn spawn_dedicated_workers_for_flow( workers.push(dedi_w); } } + FlowModuleValue::InlineScript { hash, language, .. } => { + let spawn = sqlx::query!( + "SELECT content, lock, path FROM inline_script \ + WHERE flow = $1 AND hash = $2 AND workspace_id = $3", + path, hash.0, w_id + ) + .fetch_one(db) + .await + .map(|record| SpawnWorker::RawScript { + path: record.path.unwrap_or_else(|| "".to_string()), + content: record.content, + lock: record.lock, + lang: language.clone(), + }); + match spawn { + Ok(spawn) => { + if let Some(dedi_w) = spawn_dedicated_worker( + spawn, + w_id, + killpill_tx.clone(), + killpill_rx, + db, + worker_dir, + base_internal_url, + worker_name, + job_completed_tx, + Some(module.id.clone()), + ) + .await + { + workers.push(dedi_w); + } + }, + Err(err) => tracing::error!( + "failed to get script for module: {:?}, err: {:?}", + module, err + ) + } + }, FlowModuleValue::Flow { .. 
} => (), FlowModuleValue::Identity => (), } diff --git a/backend/windmill-worker/src/worker.rs b/backend/windmill-worker/src/worker.rs index e6e92e1682844..b9416dea44b0f 100644 --- a/backend/windmill-worker/src/worker.rs +++ b/backend/windmill-worker/src/worker.rs @@ -1829,20 +1829,33 @@ async fn handle_queued_job( None }; + let (raw_code, raw_lock, raw_flow) = + sqlx::query!( + "SELECT raw_code, raw_lock, raw_flow AS \"raw_flow: Json>\" + FROM queue_view WHERE id = $1 AND workspace_id = $2 LIMIT 1", + &job.id, job.workspace_id + ) + .fetch_one(db) + .await + .map(|record| (record.raw_code, record.raw_lock, record.raw_flow)) + .unwrap_or_default(); + let cached_res_path = if job.cache_ttl.is_some() { let version_hash = if let Some(h) = job.script_hash { - format!("script_{}", h.to_string()) - } else if let Some(rc) = job.raw_code.as_ref() { + if matches!(job.job_kind, JobKind::InlineScript) { + format!("inlinescript_{}", h.to_string()) + } else { + format!("script_{}", h.to_string()) + } + } else if let Some(rc) = raw_code.as_ref() { use std::hash::Hasher; let mut s = DefaultHasher::new(); rc.hash(&mut s); format!("inline_{}", hex::encode(s.finish().to_be_bytes())) - } else if let Some(rc) = job.raw_flow.as_ref() { + } else if let Some(sqlx::types::Json(rc)) = raw_flow.as_ref() { use std::hash::Hasher; let mut s = DefaultHasher::new(); - serde_json::to_string(&rc.0) - .unwrap_or_default() - .hash(&mut s); + rc.get().hash(&mut s); format!("flow_{}", hex::encode(s.finish().to_be_bytes())) } else { "none".to_string() @@ -1919,7 +1932,9 @@ async fn handle_queued_job( } }; if job.is_flow() { - let flow = job.parse_raw_flow(); + let flow = raw_flow + .as_ref() + .and_then(|raw_flow| serde_json::from_str(raw_flow.get()).ok()); handle_flow( job, flow, @@ -1976,6 +1991,7 @@ async fn handle_queued_job( JobKind::Dependencies => { handle_dependency_job( &job, + raw_code, &mut mem_peak, &mut canceled_by, job_dir, @@ -1992,6 +2008,7 @@ async fn handle_queued_job( JobKind::FlowDependencies => { handle_flow_dependency_job( &job, + raw_flow, &mut mem_peak, &mut canceled_by, job_dir, @@ -2031,6 +2048,8 @@ async fn handle_queued_job( let metric_timer = Instant::now(); let r = handle_code_execution_job( job.as_ref(), + raw_code, + raw_lock, db, client, job_dir, @@ -2193,6 +2212,8 @@ pub async fn get_script_content_by_hash( #[tracing::instrument(level = "trace", skip_all)] async fn handle_code_execution_job( job: &QueuedJob, + raw_code: Option, + raw_lock: Option, db: &sqlx::Pool, client: &AuthedClientBackgroundTask, job_dir: &str, @@ -2220,11 +2241,9 @@ async fn handle_code_execution_job( }; ContentReqLangEnvs { - content: job - .raw_code - .clone() + content: raw_code .unwrap_or_else(|| "no raw code".to_owned()), - lockfile: job.raw_lock.clone(), + lockfile: raw_lock, language: job.language.to_owned(), envs: None, codebase, @@ -2241,6 +2260,26 @@ async fn handle_code_execution_job( ) .await? } + JobKind::InlineScript => { + // TODO(uael): why path unused ? 
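+ // Look up the embedded script's source and lockfile by (flow path, hash) in the job's workspace.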
+ let (content, lock, _path) = sqlx::query!( + "SELECT content, lock, path FROM inline_script \ + WHERE flow = $1 AND hash = $2 AND workspace_id = $3", + job.script_path.as_ref().map(String::as_str).unwrap_or(""), + job.script_hash.unwrap_or(ScriptHash(0)).0, + &job.workspace_id + ) + .fetch_one(db) + .await + .map(|record| (record.content, record.lock, record.path))?; + ContentReqLangEnvs { + content, + lockfile: lock, + language: job.language.to_owned(), + envs: None, + codebase: None, + } + }, JobKind::DeploymentCallback => { get_script_content_by_path(job.script_path.clone(), &job.workspace_id, db).await? } diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs index 0a9484ed8819a..e9b2235aeb56a 100644 --- a/backend/windmill-worker/src/worker_flow.rs +++ b/backend/windmill-worker/src/worker_flow.rs @@ -202,6 +202,7 @@ pub async fn update_flow_status_after_job_completion_internal< should_continue_flow, flow_job, flow_value, + raw_flow, stop_early, skip_if_stop_early, nresult, @@ -214,7 +215,7 @@ pub async fn update_flow_status_after_job_completion_internal< "SELECT flow_status AS \"flow_status!: Json>\", raw_flow->'modules'->(flow_status->'step')::int AS \"module: Json>\" - FROM queue WHERE id = $1 AND workspace_id = $2 LIMIT 1", + FROM queue_view WHERE id = $1 AND workspace_id = $2 LIMIT 1", flow, w_id ) .fetch_one(db) @@ -291,7 +292,7 @@ pub async fn update_flow_status_after_job_completion_internal< let is_flow = if let Some(step) = step { sqlx::query_scalar!( - "SELECT raw_flow->'modules'->($1)::text->'value'->>'type' = 'flow' FROM queue WHERE id = $2 LIMIT 1", + "SELECT raw_flow->'modules'->($1)::text->'value'->>'type' = 'flow' FROM queue_view WHERE id = $2 LIMIT 1", step as i32, flow ) .fetch_one(db) @@ -925,7 +926,17 @@ pub async fn update_flow_status_after_job_completion_internal< .unwrap_or_else(|| "none".to_string()); tracing::info!(id = %flow_job.id, root_id = %job_root, "update flow status"); - let flow_value = flow_job.parse_raw_flow(); + let raw_flow = sqlx::query_scalar!( + "SELECT raw_flow AS \"raw_flow!: Json>\" + FROM queue_view WHERE id = $1 AND workspace_id = $2 LIMIT 1", + &flow_job.id, w_id + ) + .fetch_one(db) + .await + .ok(); + let flow_value = raw_flow + .as_ref() + .and_then(|raw_flow| serde_json::from_str::(raw_flow.get()).ok()); let should_continue_flow = match success { _ if stop_early => false, @@ -967,6 +978,7 @@ pub async fn update_flow_status_after_job_completion_internal< should_continue_flow, flow_job, flow_value, + raw_flow, stop_early, skip_if_stop_early, nresult, @@ -1034,7 +1046,7 @@ pub async fn update_flow_status_after_job_completion_internal< let args_hash = hash_args(db, client, w_id, job_id_for_status, &flow_job.args).await; let flow_path = flow_job.script_path(); - let version_hash = if let Some(sqlx::types::Json(s)) = flow_job.raw_flow.as_ref() { + let version_hash = if let Some(sqlx::types::Json(s)) = raw_flow.as_ref() { use std::hash::{Hash, Hasher}; let mut h = DefaultHasher::new(); s.get().hash(&mut h); @@ -1493,12 +1505,17 @@ pub async fn handle_flow( rsmq: Option, job_completed_tx: Sender, ) -> anyhow::Result<()> { - let flow = flow_value + let mut flow = flow_value .with_context(|| "Unable to parse flow definition")?; let status = flow_job .parse_flow_status() .with_context(|| "Unable to parse flow status")?; + add_virtual_items_if_necessary(&mut flow.modules); + if flow_job.same_worker { + flow.same_worker = true; + } + if !flow_job.is_flow_step && status.retry.fail_count == 0 && 
flow_job.schedule_path.is_some() @@ -2256,6 +2273,7 @@ async fn push_next_flow_job Ok( FlowModuleValue::Script { input_transforms, .. } | FlowModuleValue::RawScript { input_transforms, .. } + | FlowModuleValue::InlineScript { input_transforms, .. } | FlowModuleValue::Flow { input_transforms, .. }, ) => { let ctx = get_transform_context(&flow_job, &previous_id, &status).await?; @@ -2997,10 +3015,10 @@ async fn compute_next_flow_transform( if is_skipped { return trivial_next_job(JobPayload::Identity); } - match &module.get_value()? { + match module.get_value()? { FlowModuleValue::Identity => trivial_next_job(JobPayload::Identity), FlowModuleValue::Flow { path, .. } => { - let payload = flow_to_payload(path, &delete_after_use); + let payload = flow_to_payload(&path, &delete_after_use); Ok(NextFlowTransform::Continue( ContinuePayload::SingleJob(payload), NextStatus::NextStep, @@ -3008,7 +3026,7 @@ async fn compute_next_flow_transform( } FlowModuleValue::Script { path: script_path, hash: script_hash, tag_override, .. } => { let payload = - script_to_payload(script_hash, script_path, db, flow_job, module, tag_override) + script_to_payload(&script_hash, &script_path, db, flow_job, module, &tag_override) .await?; Ok(NextFlowTransform::Continue( ContinuePayload::SingleJob(payload), @@ -3039,14 +3057,14 @@ async fn compute_next_flow_transform( }); let payload = raw_script_to_payload( path, - content, - language, - lock, - custom_concurrency_key, - concurrent_limit, - concurrency_time_window_s, + &content, + &language, + &lock, + &custom_concurrency_key, + &concurrent_limit, + &concurrency_time_window_s, module, - tag, + &tag, &delete_after_use, ); Ok(NextFlowTransform::Continue( @@ -3054,9 +3072,39 @@ async fn compute_next_flow_transform( NextStatus::NextStep, )) } + FlowModuleValue::InlineScript { + flow_path, + hash, + tag, + language, + custom_concurrency_key, + concurrent_limit, + concurrency_time_window_s, + .. + } => { + let payload = JobPayloadWithTag { + payload: JobPayload::InlineScript { + flow_path, + hash, + language, + custom_concurrency_key: custom_concurrency_key.clone(), + concurrent_limit, + concurrency_time_window_s, + cache_ttl: module.cache_ttl.map(|x| x as i32), + dedicated_worker: None, + }, + tag: tag.clone(), + delete_after_use, + timeout: module.timeout, + }; + Ok(NextFlowTransform::Continue( + ContinuePayload::SingleJob(payload), + NextStatus::NextStep, + )) + }, FlowModuleValue::WhileloopFlow { modules, .. } => { // if it's a simple single step flow, we will collapse it as an optimization and need to pass flow_input as an arg - let is_simple = is_simple_modules(modules, flow); + let is_simple = is_simple_modules(&modules, flow); let (flow_jobs, flow_jobs_success) = match status_module { FlowStatusModule::InProgress { flow_jobs: Some(flow_jobs), @@ -3080,7 +3128,7 @@ async fn compute_next_flow_transform( }, while_loop: true, }, - modules, + &modules, flow_job, is_simple, db, @@ -3092,7 +3140,7 @@ async fn compute_next_flow_transform( /* forloop modules are expected set `iter: { value: Value, index: usize }` as job arguments */ FlowModuleValue::ForloopFlow { modules, iterator, parallel, .. 
} => { // if it's a simple single step flow, we will collapse it as an optimization and need to pass flow_input as an arg - let is_simple = !parallel && is_simple_modules(modules, flow); + let is_simple = !parallel && is_simple_modules(&modules, flow); // if is_simple { // match value { @@ -3109,14 +3157,14 @@ async fn compute_next_flow_transform( flow_job, previous_id, status, - iterator, + &iterator, arc_last_job_result, resumes, resume, approvers, arc_flow_job_args, client, - parallel, + ¶llel, ) .await?; @@ -3127,7 +3175,7 @@ async fn compute_next_flow_transform( flow, status, ns, - modules, + &modules, flow_job, is_simple, db, @@ -3153,7 +3201,7 @@ async fn compute_next_flow_transform( let continue_payload = { let flow_value = FlowValue { - modules: (*modules).clone(), + modules, failure_module: flow.failure_module.clone(), same_worker: flow.same_worker, concurrent_limit: None, @@ -3228,7 +3276,7 @@ async fn compute_next_flow_transform( )))?, }; - let mut modules = if let BranchChosen::Branch { branch } = branch { + let modules = if let BranchChosen::Branch { branch } = branch { branches .get(branch) .map(|b| b.modules.clone()) @@ -3240,7 +3288,6 @@ async fn compute_next_flow_transform( } else { default.clone() }; - add_virtual_items_if_necessary(&mut modules); let mut fm = flow.failure_module.clone(); if let Some(mut failure_module) = flow.failure_module.clone() { @@ -3284,7 +3331,7 @@ async fn compute_next_flow_transform( | FlowStatusModule::WaitingForExecutor { .. } => { if branches.is_empty() { return Ok(NextFlowTransform::EmptyInnerFlows); - } else if *parallel { + } else if parallel { return Ok(NextFlowTransform::Continue( ContinuePayload::BranchAllJobs( branches @@ -3299,8 +3346,7 @@ async fn compute_next_flow_transform( .id_append(&format!("{}-{i}", status.step,)); fm = Some(failure_module); } - let mut modules = b.modules.clone(); - add_virtual_items_if_necessary(&mut modules); + let modules = b.modules.clone(); JobPayloadWithTag { payload: JobPayload::RawFlow { value: FlowValue { @@ -3349,7 +3395,7 @@ async fn compute_next_flow_transform( flow_jobs: Some(flow_jobs), flow_jobs_success, .. - } if !*parallel => ( + } if !parallel => ( BranchAllStatus { branch: branch + 1, len: len.clone() }, flow_jobs.clone(), flow_jobs_success.clone(), @@ -3360,7 +3406,7 @@ async fn compute_next_flow_transform( )))?, }; - let mut modules = branches + let modules = branches .get(branch_status.branch) .map(|b| b.modules.clone()) .ok_or_else(|| { @@ -3369,7 +3415,6 @@ async fn compute_next_flow_transform( )) })?; - add_virtual_items_if_necessary(&mut modules); let mut fm = flow.failure_module.clone(); if let Some(mut failure_module) = flow.failure_module.clone() { failure_module.id_append(&format!("{}-{}", status.step, branch_status.branch)); @@ -3428,8 +3473,7 @@ async fn next_loop_iteration( failure_module.id_append(&format!("{}-{}", status.step, ns.index)); fm = Some(failure_module); } - let mut modules = (*modules).clone(); - add_virtual_items_if_necessary(&mut modules); + let modules = (*modules).clone(); let inner_path = Some(format!("{}/loop-{}", flow_job.script_path(), ns.index)); if is_simple { let value = &modules[0].get_value()?; @@ -3442,6 +3486,7 @@ async fn next_loop_iteration( match value { FlowModuleValue::Script { input_transforms, .. } | FlowModuleValue::RawScript { input_transforms, .. } + | FlowModuleValue::InlineScript { input_transforms, .. } | FlowModuleValue::Flow { input_transforms, .. 
} => { Some(input_transforms.clone()) } diff --git a/backend/windmill-worker/src/worker_lockfiles.rs b/backend/windmill-worker/src/worker_lockfiles.rs index 8684459d371c1..95e25d57fa684 100644 --- a/backend/windmill-worker/src/worker_lockfiles.rs +++ b/backend/windmill-worker/src/worker_lockfiles.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::hash::{DefaultHasher, Hasher}; use std::path::{Component, Path, PathBuf}; use async_recursion::async_recursion; @@ -204,6 +205,7 @@ pub fn extract_relative_imports( #[tracing::instrument(level = "trace", skip_all)] pub async fn handle_dependency_job( job: &QueuedJob, + raw_code: Option, mem_peak: &mut i32, canceled_by: &mut Option, job_dir: &str, @@ -215,8 +217,8 @@ pub async fn handle_dependency_job, occupancy_metrics: &mut OccupancyMetrics, ) -> error::Result> { - let raw_code = match job.raw_code { - Some(ref code) => code.to_owned(), + let raw_code = match raw_code { + Some(code) => code, None => sqlx::query_scalar!( "SELECT content FROM script WHERE hash = $1 AND workspace_id = $2", &job.script_hash.unwrap_or(ScriptHash(0)).0, @@ -527,6 +529,7 @@ async fn trigger_dependents_to_recompute_dependencies< pub async fn handle_flow_dependency_job( job: &QueuedJob, + raw_flow: Option>>, mem_peak: &mut i32, canceled_by: &mut Option, job_dir: &str, @@ -570,7 +573,7 @@ pub async fn handle_flow_dependency_job( ) .await; // - match new_lock { + let lock = match new_lock { Ok(new_lock) => { let dep_path = path.clone().unwrap_or_else(|| job_path.to_string()); tx = clear_dependency_map_for_item( @@ -961,20 +964,7 @@ async fn lock_modules<'c>( language = ScriptLang::Bun; }; } - e.value = windmill_common::worker::to_raw_value(&FlowModuleValue::RawScript { - lock: Some(new_lock), - path, - input_transforms, - content, - language, - tag, - custom_concurrency_key, - concurrent_limit, - concurrency_time_window_s, - is_trigger, - }); - new_flow_modules.push(e); - continue; + Some(new_lock) } Err(error) => { // TODO: Record flow raw script error lock logs @@ -984,26 +974,59 @@ async fn lock_modules<'c>( error = ?error, "Failed to generate flow lock for raw script" ); - e.value = windmill_common::worker::to_raw_value(&FlowModuleValue::RawScript { - lock: None, - path, - input_transforms, - content, - language, - tag, - custom_concurrency_key, - concurrent_limit, - concurrency_time_window_s, - is_trigger, - }); - new_flow_modules.push(e); - continue; + None } - } + }; + // TODO(uael): check min version before using `InlineScript`. 
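+ // Rather than re-embedding the locked script back into the flow's raw JSON as
+ // a RawScript module, persist it once via `create_inline_script` below and
+ // keep only a reference in the module value, roughly:
+ //     FlowModuleValue::InlineScript { flow_path, hash, language, .. }
+ // The hash is derived from (lock, path, content) and the insert uses
+ // ON CONFLICT DO NOTHING, so re-locking a flow whose scripts did not change
+ // should simply reuse the existing rows.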
+ let hash; + (tx, hash) = create_inline_script(tx, job_path, &job.workspace_id, lock, path, content) + .await?; + e.value = to_raw_value(&FlowModuleValue::InlineScript { + input_transforms, + flow_path: job_path.to_string(), + hash, + tag, + language, + custom_concurrency_key, + concurrent_limit, + concurrency_time_window_s, + is_trigger, + }); + new_flow_modules.push(e); + continue; } Ok((new_flow_modules, tx, modified_ids)) } +async fn create_inline_script<'c>( + mut tx: sqlx::Transaction<'c, sqlx::Postgres>, + flow_path: &str, + workspace_id: &str, + lock: Option, + path: Option, + content: String, +) -> Result<(sqlx::Transaction<'c, sqlx::Postgres>, ScriptHash)> { + let hash = ScriptHash({ + use std::hash::Hash; + + let mut hasher = DefaultHasher::new(); + lock.hash(&mut hasher); + path.hash(&mut hasher); + content.hash(&mut hasher); + hasher.finish() as i64 + }); + + sqlx::query!( + "INSERT INTO inline_script (hash, flow, workspace_id, lock, path, content) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT DO NOTHING", + hash.0, flow_path, workspace_id, lock, path, content + ) + .execute(&mut *tx) + .await?; + Ok((tx, hash)) +} + fn skip_creating_new_lock(language: &ScriptLang, content: &str) -> bool { if language == &ScriptLang::Bun || language == &ScriptLang::Bunnative { let anns = windmill_common::worker::TypeScriptAnnotations::parse(&content);
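 // For Bun-family scripts, `TypeScriptAnnotations::parse` reads the script's
 // own annotations; whether a fresh lock needs to be generated is then decided
 // from those annotations.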