diff --git a/backend/.sqlx/query-016bf078cdea0aae4a05ae7e004fad573d5c7cbdca975edc34f36890c824c44b.json b/backend/.sqlx/query-016bf078cdea0aae4a05ae7e004fad573d5c7cbdca975edc34f36890c824c44b.json new file mode 100644 index 0000000000000..28a9e33022432 --- /dev/null +++ b/backend/.sqlx/query-016bf078cdea0aae4a05ae7e004fad573d5c7cbdca975edc34f36890c824c44b.json @@ -0,0 +1,71 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n email AS \"email!\",\n created_by AS \"created_by!\",\n parent_job, permissioned_as AS \"permissioned_as!\",\n script_path, schedule_path, flow_step_id, root_job,\n scheduled_for AS \"scheduled_for!: chrono::DateTime\"\n FROM queue WHERE id = $1 AND workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "email!", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "created_by!", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "parent_job", + "type_info": "Uuid" + }, + { + "ordinal": 3, + "name": "permissioned_as!", + "type_info": "Varchar" + }, + { + "ordinal": 4, + "name": "script_path", + "type_info": "Varchar" + }, + { + "ordinal": 5, + "name": "schedule_path", + "type_info": "Varchar" + }, + { + "ordinal": 6, + "name": "flow_step_id", + "type_info": "Varchar" + }, + { + "ordinal": 7, + "name": "root_job", + "type_info": "Uuid" + }, + { + "ordinal": 8, + "name": "scheduled_for!: chrono::DateTime", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + true, + true, + true, + true, + true, + true, + true, + true + ] + }, + "hash": "016bf078cdea0aae4a05ae7e004fad573d5c7cbdca975edc34f36890c824c44b" +} diff --git a/backend/.sqlx/query-07a7f1da7ee77324a73eb5b3743e4a801e0c446c55fc9fd8bc75e36d58073bee.json b/backend/.sqlx/query-07a7f1da7ee77324a73eb5b3743e4a801e0c446c55fc9fd8bc75e36d58073bee.json new file mode 100644 index 0000000000000..15f9729ee19ee --- /dev/null +++ b/backend/.sqlx/query-07a7f1da7ee77324a73eb5b3743e4a801e0c446c55fc9fd8bc75e36d58073bee.json @@ -0,0 +1,48 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n substr(concat(coalesce(completed_job.logs, ''), job_logs.logs), greatest($1 - job_logs.log_offset, 0)) AS logs,\n mem_peak,\n CASE WHEN is_flow_step is true then NULL else flow_status END AS \"flow_status: sqlx::types::Json>\",\n job_logs.log_offset + char_length(job_logs.logs) + 1 AS log_offset,\n created_by AS \"created_by!\"\n FROM completed_job\n LEFT JOIN job_logs ON job_logs.job_id = completed_job.id \n WHERE completed_job.workspace_id = $2 AND completed_job.id = $3", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "logs", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "mem_peak", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "flow_status: sqlx::types::Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "log_offset", + "type_info": "Int4" + }, + { + "ordinal": 4, + "name": "created_by!", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Int4", + "Text", + "Uuid" + ] + }, + "nullable": [ + null, + true, + null, + null, + true + ] + }, + "hash": "07a7f1da7ee77324a73eb5b3743e4a801e0c446c55fc9fd8bc75e36d58073bee" +} diff --git a/backend/.sqlx/query-0ef638eb62cb8b285cb20855679486b78eae82901a0128b9c9c837c9e9e91212.json b/backend/.sqlx/query-0ef638eb62cb8b285cb20855679486b78eae82901a0128b9c9c837c9e9e91212.json new file mode 100644 index 0000000000000..85523ee5c3736 --- /dev/null +++ b/backend/.sqlx/query-0ef638eb62cb8b285cb20855679486b78eae82901a0128b9c9c837c9e9e91212.json @@ 
-0,0 +1,38 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n success AS \"success!\",\n result AS \"result: Json>\",\n started_at AS \"started_at!\"FROM completed_job WHERE workspace_id = $1 AND schedule_path = $2 AND script_path = $3 AND id != $4\n ORDER BY created_at DESC\n LIMIT $5", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "success!", + "type_info": "Bool" + }, + { + "ordinal": 1, + "name": "result: Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 2, + "name": "started_at!", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Text", + "Uuid", + "Int8" + ] + }, + "nullable": [ + true, + true, + true + ] + }, + "hash": "0ef638eb62cb8b285cb20855679486b78eae82901a0128b9c9c837c9e9e91212" +} diff --git a/backend/.sqlx/query-11d59fb24aeb40f82e6fd11b697f26e14a0ae955fabeecc4a936b95937bf04d1.json b/backend/.sqlx/query-11d59fb24aeb40f82e6fd11b697f26e14a0ae955fabeecc4a936b95937bf04d1.json new file mode 100644 index 0000000000000..3375abd07a1d9 --- /dev/null +++ b/backend/.sqlx/query-11d59fb24aeb40f82e6fd11b697f26e14a0ae955fabeecc4a936b95937bf04d1.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET flow_status = jsonb_set(\n jsonb_set(flow_status, ARRAY['modules', $4::INTEGER::TEXT, 'job'], to_jsonb($1::UUID::TEXT)),\n ARRAY['modules', $4::INTEGER::TEXT, 'type'],\n to_jsonb('InProgress'::text)\n )\n WHERE id = $2 AND workspace_id = $3", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Text", + "Int4" + ] + }, + "nullable": [] + }, + "hash": "11d59fb24aeb40f82e6fd11b697f26e14a0ae955fabeecc4a936b95937bf04d1" +} diff --git a/backend/.sqlx/query-15697f3b63f88b9cfa33ab0aa64b441961aad80bf9fd0125bcf55a729e556d1e.json b/backend/.sqlx/query-15697f3b63f88b9cfa33ab0aa64b441961aad80bf9fd0125bcf55a729e556d1e.json new file mode 100644 index 0000000000000..4199d677a746a --- /dev/null +++ b/backend/.sqlx/query-15697f3b63f88b9cfa33ab0aa64b441961aad80bf9fd0125bcf55a729e556d1e.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE\n FROM parallel_monitor_lock\n WHERE last_ping IS NOT NULL AND last_ping < NOW() - ($1 || ' seconds')::interval \n RETURNING parent_flow_id, job_id, last_ping, (SELECT workspace_id FROM queue q\n WHERE q.id = parent_flow_id AND q.running = true AND q.canceled = false) AS workspace_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "parent_flow_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "job_id", + "type_info": "Uuid" + }, + { + "ordinal": 2, + "name": "last_ping", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "workspace_id", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + false, + true, + null + ] + }, + "hash": "15697f3b63f88b9cfa33ab0aa64b441961aad80bf9fd0125bcf55a729e556d1e" +} diff --git a/backend/.sqlx/query-1d1098cc9367502faa1483627bf534472a6cae70d7964ba019aa2121c3929234.json b/backend/.sqlx/query-1d1098cc9367502faa1483627bf534472a6cae70d7964ba019aa2121c3929234.json new file mode 100644 index 0000000000000..abbd8b2a6d272 --- /dev/null +++ b/backend/.sqlx/query-1d1098cc9367502faa1483627bf534472a6cae70d7964ba019aa2121c3929234.json @@ -0,0 +1,68 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n result AS \"result: sqlx::types::Json>\",\n language AS \"language: ScriptLang\",\n flow_status AS \"flow_status: sqlx::types::Json>\",\n success AS \"success!\"\n FROM completed_job\n WHERE id = $1 AND workspace_id = $2", + 
"describe": { + "columns": [ + { + "ordinal": 0, + "name": "result: sqlx::types::Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 1, + "name": "language: ScriptLang", + "type_info": { + "Custom": { + "name": "script_lang", + "kind": { + "Enum": [ + "python3", + "deno", + "go", + "bash", + "postgresql", + "nativets", + "bun", + "mysql", + "bigquery", + "snowflake", + "graphql", + "powershell", + "mssql", + "php", + "bunnative", + "rust", + "ansible", + "csharp", + "oracledb" + ] + } + } + } + }, + { + "ordinal": 2, + "name": "flow_status: sqlx::types::Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "success!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + true, + true, + true + ] + }, + "hash": "1d1098cc9367502faa1483627bf534472a6cae70d7964ba019aa2121c3929234" +} diff --git a/backend/.sqlx/query-20d9a1b3a6631f97836e7b8d96cdec706ba1cc2d5d432e397633a1f79e67589a.json b/backend/.sqlx/query-20d9a1b3a6631f97836e7b8d96cdec706ba1cc2d5d432e397633a1f79e67589a.json new file mode 100644 index 0000000000000..7a1fe7377e2d6 --- /dev/null +++ b/backend/.sqlx/query-20d9a1b3a6631f97836e7b8d96cdec706ba1cc2d5d432e397633a1f79e67589a.json @@ -0,0 +1,69 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n result AS \"result: sqlx::types::Json>\",\n flow_status AS \"flow_status: sqlx::types::Json>\",\n language AS \"language: ScriptLang\",\n created_by AS \"created_by!\"\n FROM completed_job\n WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "result: sqlx::types::Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 1, + "name": "flow_status: sqlx::types::Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 2, + "name": "language: ScriptLang", + "type_info": { + "Custom": { + "name": "script_lang", + "kind": { + "Enum": [ + "python3", + "deno", + "go", + "bash", + "postgresql", + "nativets", + "bun", + "mysql", + "bigquery", + "snowflake", + "graphql", + "powershell", + "mssql", + "php", + "bunnative", + "rust", + "ansible", + "csharp", + "oracledb" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "created_by!", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text", + "TextArray" + ] + }, + "nullable": [ + true, + true, + true, + true + ] + }, + "hash": "20d9a1b3a6631f97836e7b8d96cdec706ba1cc2d5d432e397633a1f79e67589a" +} diff --git a/backend/.sqlx/query-2a0b59e2770b27a1f2a8baddc67dba29216a1aad733171c25cc4aae5b3c84d54.json b/backend/.sqlx/query-2a0b59e2770b27a1f2a8baddc67dba29216a1aad733171c25cc4aae5b3c84d54.json new file mode 100644 index 0000000000000..8ff5e29db1eef --- /dev/null +++ b/backend/.sqlx/query-2a0b59e2770b27a1f2a8baddc67dba29216a1aad733171c25cc4aae5b3c84d54.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue SET\n flow_status = JSONB_SET(flow_status, ARRAY['modules', flow_status->>'step'::text], $1),\n suspend = $2,\n suspend_until = now() + $3\n WHERE id = $4", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Jsonb", + "Int4", + "Interval", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "2a0b59e2770b27a1f2a8baddc67dba29216a1aad733171c25cc4aae5b3c84d54" +} diff --git a/backend/.sqlx/query-32fdc66931dcf34f6ef5cdf3fd335d9f990eaa3dbb396290477159012e86af14.json b/backend/.sqlx/query-32fdc66931dcf34f6ef5cdf3fd335d9f990eaa3dbb396290477159012e86af14.json new file mode 100644 index 0000000000000..4767ba9a23bde --- /dev/null +++ 
b/backend/.sqlx/query-32fdc66931dcf34f6ef5cdf3fd335d9f990eaa3dbb396290477159012e86af14.json @@ -0,0 +1,58 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n id AS \"id!\", workspace_id AS \"workspace_id!\", parent_job, is_flow_step,\n flow_status AS \"flow_status: Box\", last_ping, same_worker\n FROM queue\n WHERE running = true AND suspend = 0 AND suspend_until IS null AND scheduled_for <= now()\n AND (job_kind = 'flow' OR job_kind = 'flowpreview' OR job_kind = 'flownode')\n AND last_ping IS NOT NULL AND last_ping < NOW() - ($1 || ' seconds')::interval\n AND canceled = false\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "workspace_id!", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "parent_job", + "type_info": "Uuid" + }, + { + "ordinal": 3, + "name": "is_flow_step", + "type_info": "Bool" + }, + { + "ordinal": 4, + "name": "flow_status: Box", + "type_info": "Jsonb" + }, + { + "ordinal": 5, + "name": "last_ping", + "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "same_worker", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + true, + true, + true, + true, + true, + true, + true + ] + }, + "hash": "32fdc66931dcf34f6ef5cdf3fd335d9f990eaa3dbb396290477159012e86af14" +} diff --git a/backend/.sqlx/query-3c423bcb10668bdd131f7b6a9b0fcc8f9b1909f255cdfce0fb83496aac0fc021.json b/backend/.sqlx/query-3c423bcb10668bdd131f7b6a9b0fcc8f9b1909f255cdfce0fb83496aac0fc021.json new file mode 100644 index 0000000000000..090efe49644e7 --- /dev/null +++ b/backend/.sqlx/query-3c423bcb10668bdd131f7b6a9b0fcc8f9b1909f255cdfce0fb83496aac0fc021.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id AS \"id!\", flow_status AS \"flow_status!: Json\"\n FROM completed_job\n WHERE parent_job = $1 AND workspace_id = $2 AND flow_status IS NOT NULL", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "flow_status!: Json", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + true + ] + }, + "hash": "3c423bcb10668bdd131f7b6a9b0fcc8f9b1909f255cdfce0fb83496aac0fc021" +} diff --git a/backend/.sqlx/query-4504f3a5d3cffd56d51bd263e6759404a3a5889dd7c61cb077e17b877b027eff.json b/backend/.sqlx/query-4504f3a5d3cffd56d51bd263e6759404a3a5889dd7c61cb077e17b877b027eff.json new file mode 100644 index 0000000000000..76e4896aaaec0 --- /dev/null +++ b/backend/.sqlx/query-4504f3a5d3cffd56d51bd263e6759404a3a5889dd7c61cb077e17b877b027eff.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE queue\n SET canceled = true\n , canceled_by = 'timeout'\n , canceled_reason = $1\n WHERE id = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "4504f3a5d3cffd56d51bd263e6759404a3a5889dd7c61cb077e17b877b027eff" +} diff --git a/backend/.sqlx/query-4671f1727d0563490534c426375738478f3d93f6bb42aaf021794392328c8875.json b/backend/.sqlx/query-4671f1727d0563490534c426375738478f3d93f6bb42aaf021794392328c8875.json new file mode 100644 index 0000000000000..e497d7f59e5e0 --- /dev/null +++ b/backend/.sqlx/query-4671f1727d0563490534c426375738478f3d93f6bb42aaf021794392328c8875.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT created_by AS \"created_by!\", args as \"args: sqlx::types::Json>\"\n FROM completed_job \n WHERE id = $1 AND 
workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "created_by!", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "args: sqlx::types::Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text", + "TextArray" + ] + }, + "nullable": [ + true, + true + ] + }, + "hash": "4671f1727d0563490534c426375738478f3d93f6bb42aaf021794392328c8875" +} diff --git a/backend/.sqlx/query-4914b74a7956ab41e1a1c812460cc84bbd5e4045acc529f61de892a4e3aceef9.json b/backend/.sqlx/query-4914b74a7956ab41e1a1c812460cc84bbd5e4045acc529f61de892a4e3aceef9.json new file mode 100644 index 0000000000000..26af54b1facf1 --- /dev/null +++ b/backend/.sqlx/query-4914b74a7956ab41e1a1c812460cc84bbd5e4045acc529f61de892a4e3aceef9.json @@ -0,0 +1,35 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT username, is_admin, operator FROM usr WHERE\n email = $1 AND workspace_id = $2 AND disabled = false", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "username", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "is_admin", + "type_info": "Bool" + }, + { + "ordinal": 2, + "name": "operator", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "4914b74a7956ab41e1a1c812460cc84bbd5e4045acc529f61de892a4e3aceef9" +} diff --git a/backend/.sqlx/query-52bd8efeaec0d0c2aa77d777a0b6559a1aa4ca9ebd4f9b535014cbcb113f9b92.json b/backend/.sqlx/query-52bd8efeaec0d0c2aa77d777a0b6559a1aa4ca9ebd4f9b535014cbcb113f9b92.json new file mode 100644 index 0000000000000..b4576d8682133 --- /dev/null +++ b/backend/.sqlx/query-52bd8efeaec0d0c2aa77d777a0b6559a1aa4ca9ebd4f9b535014cbcb113f9b92.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT created_by AS \"created_by!\", args as \"args: sqlx::types::Json>\"\n FROM queue\n WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "created_by!", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "args: sqlx::types::Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text", + "TextArray" + ] + }, + "nullable": [ + true, + true + ] + }, + "hash": "52bd8efeaec0d0c2aa77d777a0b6559a1aa4ca9ebd4f9b535014cbcb113f9b92" +} diff --git a/backend/.sqlx/query-639dbfa0c98d8b91006823f4c645a1105d6c1cc58990937c9bcf693a8812920c.json b/backend/.sqlx/query-639dbfa0c98d8b91006823f4c645a1105d6c1cc58990937c9bcf693a8812920c.json new file mode 100644 index 0000000000000..739be2be53248 --- /dev/null +++ b/backend/.sqlx/query-639dbfa0c98d8b91006823f4c645a1105d6c1cc58990937c9bcf693a8812920c.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT result #> $3 AS \"result: Json>\"\n FROM completed_job WHERE id = $1 AND workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "result: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text", + "TextArray" + ] + }, + "nullable": [ + null + ] + }, + "hash": "639dbfa0c98d8b91006823f4c645a1105d6c1cc58990937c9bcf693a8812920c" +} diff --git a/backend/.sqlx/query-641087f3166faee8baad063fd569b61aa4d21a15a9bc06e0c2fd15b47eb7beb0.json b/backend/.sqlx/query-641087f3166faee8baad063fd569b61aa4d21a15a9bc06e0c2fd15b47eb7beb0.json new file mode 100644 index 0000000000000..b070f8eb7f689 --- /dev/null +++ 
b/backend/.sqlx/query-641087f3166faee8baad063fd569b61aa4d21a15a9bc06e0c2fd15b47eb7beb0.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n id As \"id!\",\n flow_status->'restarted_from'->'flow_job_id' AS \"restarted_from: Json\"\n FROM queue\n WHERE COALESCE((SELECT root_job FROM queue WHERE id = $1), $1) = id AND workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "restarted_from: Json", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + null + ] + }, + "hash": "641087f3166faee8baad063fd569b61aa4d21a15a9bc06e0c2fd15b47eb7beb0" +} diff --git a/backend/.sqlx/query-66bf488f2eeaf5b4c4cb8c579d7a15eb87516c319c1fcbb3e46032bb9fdf718e.json b/backend/.sqlx/query-66bf488f2eeaf5b4c4cb8c579d7a15eb87516c319c1fcbb3e46032bb9fdf718e.json new file mode 100644 index 0000000000000..faf1201af104e --- /dev/null +++ b/backend/.sqlx/query-66bf488f2eeaf5b4c4cb8c579d7a15eb87516c319c1fcbb3e46032bb9fdf718e.json @@ -0,0 +1,38 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n success AS \"success!\",\n result AS \"result: Json>\",\n started_at AS \"started_at!\"\n FROM completed_job\n WHERE workspace_id = $1 AND schedule_path = $2 AND script_path = $3 AND id != $4\n ORDER BY created_at DESC\n LIMIT $5", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "success!", + "type_info": "Bool" + }, + { + "ordinal": 1, + "name": "result: Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 2, + "name": "started_at!", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Text", + "Uuid", + "Int8" + ] + }, + "nullable": [ + true, + true, + true + ] + }, + "hash": "66bf488f2eeaf5b4c4cb8c579d7a15eb87516c319c1fcbb3e46032bb9fdf718e" +} diff --git a/backend/.sqlx/query-672363560895871e4ab19e0dd0dc36afdbc58470664b6cefa8cec25515a42f13.json b/backend/.sqlx/query-672363560895871e4ab19e0dd0dc36afdbc58470664b6cefa8cec25515a42f13.json new file mode 100644 index 0000000000000..53ed43a6e03bf --- /dev/null +++ b/backend/.sqlx/query-672363560895871e4ab19e0dd0dc36afdbc58470664b6cefa8cec25515a42f13.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT result #> $3 AS \"result: Json>\"\n FROM completed_job WHERE id = $1 AND workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "result: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text", + "TextArray" + ] + }, + "nullable": [ + null + ] + }, + "hash": "672363560895871e4ab19e0dd0dc36afdbc58470664b6cefa8cec25515a42f13" +} diff --git a/backend/.sqlx/query-6cb2c77bb90679a36189007b1f70406fe28923f51fc465ae0f45d7f317077bf5.json b/backend/.sqlx/query-6cb2c77bb90679a36189007b1f70406fe28923f51fc465ae0f45d7f317077bf5.json new file mode 100644 index 0000000000000..e41413a7bfd87 --- /dev/null +++ b/backend/.sqlx/query-6cb2c77bb90679a36189007b1f70406fe28923f51fc465ae0f45d7f317077bf5.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET flow_status = JSONB_SET(flow_status, ARRAY['cleanup_module', 'flow_jobs_to_clean'], COALESCE(flow_status->'cleanup_module'->'flow_jobs_to_clean', '[]'::jsonb) || $1)\n WHERE id = $2", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Jsonb", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "6cb2c77bb90679a36189007b1f70406fe28923f51fc465ae0f45d7f317077bf5" +} diff --git 
a/backend/.sqlx/query-6dffdd5066f109292ff14e0b81073c80b0126183c23f2996092d6e9124e37b9c.json b/backend/.sqlx/query-6dffdd5066f109292ff14e0b81073c80b0126183c23f2996092d6e9124e37b9c.json new file mode 100644 index 0000000000000..acea33426d857 --- /dev/null +++ b/backend/.sqlx/query-6dffdd5066f109292ff14e0b81073c80b0126183c23f2996092d6e9124e37b9c.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT result AS \"result: SqlxJson>\", success AS \"success!\"\n FROM completed_job WHERE id = $1 AND workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "result: SqlxJson>", + "type_info": "Jsonb" + }, + { + "ordinal": 1, + "name": "success!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + true + ] + }, + "hash": "6dffdd5066f109292ff14e0b81073c80b0126183c23f2996092d6e9124e37b9c" +} diff --git a/backend/.sqlx/query-858db2a501abcfffbcce19d60cc241060c93354de19f0fa80b8f45290e8b992d.json b/backend/.sqlx/query-858db2a501abcfffbcce19d60cc241060c93354de19f0fa80b8f45290e8b992d.json new file mode 100644 index 0000000000000..f3d1c484fe6a2 --- /dev/null +++ b/backend/.sqlx/query-858db2a501abcfffbcce19d60cc241060c93354de19f0fa80b8f45290e8b992d.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET flow_status = JSONB_SET(flow_status, ARRAY['modules', $1::TEXT], $2)\n WHERE id = $3", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Jsonb", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "858db2a501abcfffbcce19d60cc241060c93354de19f0fa80b8f45290e8b992d" +} diff --git a/backend/.sqlx/query-866c1b86d63466df84877e81655ce999284f3d2d854a00fefcaf7c044dcf71ca.json b/backend/.sqlx/query-866c1b86d63466df84877e81655ce999284f3d2d854a00fefcaf7c044dcf71ca.json new file mode 100644 index 0000000000000..c4905590b645f --- /dev/null +++ b/backend/.sqlx/query-866c1b86d63466df84877e81655ce999284f3d2d854a00fefcaf7c044dcf71ca.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET flow_status = jsonb_set(\n jsonb_set(flow_status, ARRAY['failure_module', 'job'], to_jsonb($1::UUID::TEXT)),\n ARRAY['failure_module', 'type'],\n to_jsonb('InProgress'::text)\n )\n WHERE id = $2 AND workspace_id = $3", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Text" + ] + }, + "nullable": [] + }, + "hash": "866c1b86d63466df84877e81655ce999284f3d2d854a00fefcaf7c044dcf71ca" +} diff --git a/backend/.sqlx/query-8dd93be44f66c0744ddaff12a9664d9ad745a4a2bb4c0fa36d3caf77fa60e035.json b/backend/.sqlx/query-8dd93be44f66c0744ddaff12a9664d9ad745a4a2bb4c0fa36d3caf77fa60e035.json new file mode 100644 index 0000000000000..dead4b0a556fb --- /dev/null +++ b/backend/.sqlx/query-8dd93be44f66c0744ddaff12a9664d9ad745a4a2bb4c0fa36d3caf77fa60e035.json @@ -0,0 +1,54 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n running AS \"running!\",\n substr(concat(coalesce(queue.logs, ''), job_logs.logs), greatest($1 - job_logs.log_offset, 0)) AS logs,\n mem_peak,\n CASE WHEN is_flow_step is true then NULL else flow_status END AS \"flow_status: sqlx::types::Json>\",\n job_logs.log_offset + char_length(job_logs.logs) + 1 AS log_offset,\n created_by AS \"created_by!\"\n FROM queue\n LEFT JOIN job_logs ON job_logs.job_id = queue.id \n WHERE queue.workspace_id = $2 AND queue.id = $3", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "running!", + "type_info": "Bool" + }, + { + "ordinal": 1, + "name": "logs", + "type_info": "Text" + }, + { + 
"ordinal": 2, + "name": "mem_peak", + "type_info": "Int4" + }, + { + "ordinal": 3, + "name": "flow_status: sqlx::types::Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 4, + "name": "log_offset", + "type_info": "Int4" + }, + { + "ordinal": 5, + "name": "created_by!", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Int4", + "Text", + "Uuid" + ] + }, + "nullable": [ + true, + null, + true, + null, + null, + true + ] + }, + "hash": "8dd93be44f66c0744ddaff12a9664d9ad745a4a2bb4c0fa36d3caf77fa60e035" +} diff --git a/backend/.sqlx/query-9250b087485e51af83aef1f412c85edc114add7a6b6c1e3845ffda344effa03c.json b/backend/.sqlx/query-9250b087485e51af83aef1f412c85edc114add7a6b6c1e3845ffda344effa03c.json new file mode 100644 index 0000000000000..b46a8f1a22db2 --- /dev/null +++ b/backend/.sqlx/query-9250b087485e51af83aef1f412c85edc114add7a6b6c1e3845ffda344effa03c.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET flow_status = flow_status - 'approval_conditions'\n WHERE id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "9250b087485e51af83aef1f412c85edc114add7a6b6c1e3845ffda344effa03c" +} diff --git a/backend/.sqlx/query-92b80a77d292ec734b097b815261ce0cd51d7f699ff296314c9801e115c52228.json b/backend/.sqlx/query-92b80a77d292ec734b097b815261ce0cd51d7f699ff296314c9801e115c52228.json new file mode 100644 index 0000000000000..1975b59d5ac26 --- /dev/null +++ b/backend/.sqlx/query-92b80a77d292ec734b097b815261ce0cd51d7f699ff296314c9801e115c52228.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET flow_status = JSONB_SET(JSONB_SET(flow_status, ARRAY['retry'], $1), ARRAY['modules', $3::TEXT, 'failed_retries'], $4)\n WHERE id = $2", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Jsonb", + "Uuid", + "Text", + "Jsonb" + ] + }, + "nullable": [] + }, + "hash": "92b80a77d292ec734b097b815261ce0cd51d7f699ff296314c9801e115c52228" +} diff --git a/backend/.sqlx/query-9bdad9fbe8990588d8d769d4a38e2397ee789f6732199a5259f5f4ee2c5a166d.json b/backend/.sqlx/query-9bdad9fbe8990588d8d769d4a38e2397ee789f6732199a5259f5f4ee2c5a166d.json new file mode 100644 index 0000000000000..b49d76d6cf091 --- /dev/null +++ b/backend/.sqlx/query-9bdad9fbe8990588d8d769d4a38e2397ee789f6732199a5259f5f4ee2c5a166d.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id AS \"id!\", result AS \"result: Json>\"\n FROM completed_job WHERE id = ANY($1) AND workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "result: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "UuidArray", + "Text" + ] + }, + "nullable": [ + true, + true + ] + }, + "hash": "9bdad9fbe8990588d8d769d4a38e2397ee789f6732199a5259f5f4ee2c5a166d" +} diff --git a/backend/.sqlx/query-9c34c717b218c09e3784a5413f7972e5e805dae837a075da4e503494624b518a.json b/backend/.sqlx/query-9c34c717b218c09e3784a5413f7972e5e805dae837a075da4e503494624b518a.json new file mode 100644 index 0000000000000..d22420d6e4d41 --- /dev/null +++ b/backend/.sqlx/query-9c34c717b218c09e3784a5413f7972e5e805dae837a075da4e503494624b518a.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET flow_status = jsonb_set(\n jsonb_set(flow_status, ARRAY['preprocessor_module', 'job'], to_jsonb($1::UUID::TEXT)),\n ARRAY['preprocessor_module', 'type'],\n to_jsonb('InProgress'::text)\n )\n WHERE id = $2 AND 
workspace_id = $3", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Text" + ] + }, + "nullable": [] + }, + "hash": "9c34c717b218c09e3784a5413f7972e5e805dae837a075da4e503494624b518a" +} diff --git a/backend/.sqlx/query-9d616812c5a6ae514f047ce2d035a07ff11d13472c25533824dec93c41ea609c.json b/backend/.sqlx/query-9d616812c5a6ae514f047ce2d035a07ff11d13472c25533824dec93c41ea609c.json new file mode 100644 index 0000000000000..7471bb0bf3a2e --- /dev/null +++ b/backend/.sqlx/query-9d616812c5a6ae514f047ce2d035a07ff11d13472c25533824dec93c41ea609c.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT args AS \"args: Json>>\"\n FROM completed_job WHERE id = $1 AND workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "args: Json>>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true + ] + }, + "hash": "9d616812c5a6ae514f047ce2d035a07ff11d13472c25533824dec93c41ea609c" +} diff --git a/backend/.sqlx/query-a91798f58fa5948cd1739df4fa2e07cbb3eb08c5d2d22b057796e1156ae2a122.json b/backend/.sqlx/query-a91798f58fa5948cd1739df4fa2e07cbb3eb08c5d2d22b057796e1156ae2a122.json new file mode 100644 index 0000000000000..3dfa6d79e2cee --- /dev/null +++ b/backend/.sqlx/query-a91798f58fa5948cd1739df4fa2e07cbb3eb08c5d2d22b057796e1156ae2a122.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT leaf_jobs->$1::text AS \"leaf_jobs: Json>\", parent_job\n FROM queue\n WHERE COALESCE((SELECT root_job FROM queue WHERE id = $2), $2) = id AND workspace_id = $3", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "leaf_jobs: Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 1, + "name": "parent_job", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Text", + "Uuid", + "Text" + ] + }, + "nullable": [ + null, + true + ] + }, + "hash": "a91798f58fa5948cd1739df4fa2e07cbb3eb08c5d2d22b057796e1156ae2a122" +} diff --git a/backend/.sqlx/query-b0890c1bac6931d848afd88539a0b766a018957c1a325940df8914b28df60aca.json b/backend/.sqlx/query-b0890c1bac6931d848afd88539a0b766a018957c1a325940df8914b28df60aca.json new file mode 100644 index 0000000000000..15b7b27002e71 --- /dev/null +++ b/backend/.sqlx/query-b0890c1bac6931d848afd88539a0b766a018957c1a325940df8914b28df60aca.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT tag AS \"tag!\", count(*) AS \"count!\" FROM queue WHERE\n scheduled_for <= now() - ('3 seconds')::interval AND running = false\n GROUP BY tag", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tag!", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "count!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + true, + null + ] + }, + "hash": "b0890c1bac6931d848afd88539a0b766a018957c1a325940df8914b28df60aca" +} diff --git a/backend/.sqlx/query-b3e41eaff54c5da5e38cff785c17b2d9e014be9d0794e72dc8566485e61492cd.json b/backend/.sqlx/query-b3e41eaff54c5da5e38cff785c17b2d9e014be9d0794e72dc8566485e61492cd.json new file mode 100644 index 0000000000000..ce125516d25ef --- /dev/null +++ b/backend/.sqlx/query-b3e41eaff54c5da5e38cff785c17b2d9e014be9d0794e72dc8566485e61492cd.json @@ -0,0 +1,41 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n script_path, args AS \"args: sqlx::types::Json>>\",\n tag AS \"tag!\", priority\n FROM completed_job\n WHERE id = $1 and workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "script_path", + "type_info": "Varchar" + }, + { + 
"ordinal": 1, + "name": "args: sqlx::types::Json>>", + "type_info": "Jsonb" + }, + { + "ordinal": 2, + "name": "tag!", + "type_info": "Varchar" + }, + { + "ordinal": 3, + "name": "priority", + "type_info": "Int2" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + true, + true, + true + ] + }, + "hash": "b3e41eaff54c5da5e38cff785c17b2d9e014be9d0794e72dc8566485e61492cd" +} diff --git a/backend/.sqlx/query-c7d595d2a12228c49359440ca3a9622f1de5f5ee4bbe5d2b23f6fdb6379cebf3.json b/backend/.sqlx/query-c7d595d2a12228c49359440ca3a9622f1de5f5ee4bbe5d2b23f6fdb6379cebf3.json new file mode 100644 index 0000000000000..a3d8c2502f908 --- /dev/null +++ b/backend/.sqlx/query-c7d595d2a12228c49359440ca3a9622f1de5f5ee4bbe5d2b23f6fdb6379cebf3.json @@ -0,0 +1,47 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE token SET last_used_at = now() WHERE\n token = $1\n AND (expiration > NOW() OR expiration IS NULL)\n AND (workspace_id IS NULL OR workspace_id = $2)\n RETURNING owner, email, super_admin, scopes, label", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "owner", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "email", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "super_admin", + "type_info": "Bool" + }, + { + "ordinal": 3, + "name": "scopes", + "type_info": "TextArray" + }, + { + "ordinal": 4, + "name": "label", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + true, + true, + false, + true, + true + ] + }, + "hash": "c7d595d2a12228c49359440ca3a9622f1de5f5ee4bbe5d2b23f6fdb6379cebf3" +} diff --git a/backend/.sqlx/query-cf80f068b6a8906939f7ea0f1a8311fdabf78d6d5bd12e71070b1dae24df2352.json b/backend/.sqlx/query-cf80f068b6a8906939f7ea0f1a8311fdabf78d6d5bd12e71070b1dae24df2352.json new file mode 100644 index 0000000000000..7373aec488189 --- /dev/null +++ b/backend/.sqlx/query-cf80f068b6a8906939f7ea0f1a8311fdabf78d6d5bd12e71070b1dae24df2352.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET flow_status = JSONB_SET(flow_status, ARRAY['modules', $1::TEXT, 'approvers'], $2)\n WHERE id = $3", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Jsonb", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "cf80f068b6a8906939f7ea0f1a8311fdabf78d6d5bd12e71070b1dae24df2352" +} diff --git a/backend/.sqlx/query-d74d3511d394c7ab2931c413e5ae87df1799a0ea64822449350abab02ab570be.json b/backend/.sqlx/query-d74d3511d394c7ab2931c413e5ae87df1799a0ea64822449350abab02ab570be.json new file mode 100644 index 0000000000000..e4f5aed798a9c --- /dev/null +++ b/backend/.sqlx/query-d74d3511d394c7ab2931c413e5ae87df1799a0ea64822449350abab02ab570be.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET flow_status = JSONB_SET(flow_status, ARRAY['approval_conditions'], $1)\n WHERE id = $2", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Jsonb", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "d74d3511d394c7ab2931c413e5ae87df1799a0ea64822449350abab02ab570be" +} diff --git a/backend/.sqlx/query-d7ce28c7cbd4974c72969858659a2a5c7448c919ae522e91332fa9a6212f5ddf.json b/backend/.sqlx/query-d7ce28c7cbd4974c72969858659a2a5c7448c919ae522e91332fa9a6212f5ddf.json new file mode 100644 index 0000000000000..d1b5b0030c44b --- /dev/null +++ b/backend/.sqlx/query-d7ce28c7cbd4974c72969858659a2a5c7448c919ae522e91332fa9a6212f5ddf.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET flow_status = 
JSONB_SET(flow_status, ARRAY['retry'], $1)\n WHERE id = $2", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Jsonb", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "d7ce28c7cbd4974c72969858659a2a5c7448c919ae522e91332fa9a6212f5ddf" +} diff --git a/backend/.sqlx/query-d91a447f3abcd39559d614ab7d423d0287bd34e463967fbaf0a3d590b59c9865.json b/backend/.sqlx/query-d91a447f3abcd39559d614ab7d423d0287bd34e463967fbaf0a3d590b59c9865.json new file mode 100644 index 0000000000000..bce712700f556 --- /dev/null +++ b/backend/.sqlx/query-d91a447f3abcd39559d614ab7d423d0287bd34e463967fbaf0a3d590b59c9865.json @@ -0,0 +1,75 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n result AS \"result: sqlx::types::Json>\", success AS \"success!\",\n language AS \"language: ScriptLang\",\n flow_status AS \"flow_status: sqlx::types::Json>\",\n created_by AS \"created_by!\"\n FROM completed_job\n WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "result: sqlx::types::Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 1, + "name": "success!", + "type_info": "Bool" + }, + { + "ordinal": 2, + "name": "language: ScriptLang", + "type_info": { + "Custom": { + "name": "script_lang", + "kind": { + "Enum": [ + "python3", + "deno", + "go", + "bash", + "postgresql", + "nativets", + "bun", + "mysql", + "bigquery", + "snowflake", + "graphql", + "powershell", + "mssql", + "php", + "bunnative", + "rust", + "ansible", + "csharp", + "oracledb" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "flow_status: sqlx::types::Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 4, + "name": "created_by!", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text", + "TextArray" + ] + }, + "nullable": [ + true, + true, + true, + true, + true + ] + }, + "hash": "d91a447f3abcd39559d614ab7d423d0287bd34e463967fbaf0a3d590b59c9865" +} diff --git a/backend/.sqlx/query-df533f1988e409b70a3e0966825d01993cd52e8e85943440081b8dbd3b9ae5a4.json b/backend/.sqlx/query-df533f1988e409b70a3e0966825d01993cd52e8e85943440081b8dbd3b9ae5a4.json new file mode 100644 index 0000000000000..a4f3b9678286e --- /dev/null +++ b/backend/.sqlx/query-df533f1988e409b70a3e0966825d01993cd52e8e85943440081b8dbd3b9ae5a4.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET flow_status = flow_status - 'retry'\n WHERE id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "df533f1988e409b70a3e0966825d01993cd52e8e85943440081b8dbd3b9ae5a4" +} diff --git a/backend/.sqlx/query-ef8868893643a1a71531c1113d5cb38c5c204b3bc34c921b2f653c738af556a9.json b/backend/.sqlx/query-ef8868893643a1a71531c1113d5cb38c5c204b3bc34c921b2f653c738af556a9.json new file mode 100644 index 0000000000000..055944bd706a8 --- /dev/null +++ b/backend/.sqlx/query-ef8868893643a1a71531c1113d5cb38c5c204b3bc34c921b2f653c738af556a9.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id AS \"id!\", flow_status AS \"flow_status!: Json\"\n FROM completed_job WHERE id = $1 AND workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "flow_status!: Json", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + true + ] + }, + "hash": "ef8868893643a1a71531c1113d5cb38c5c204b3bc34c921b2f653c738af556a9" +} diff --git 
a/backend/.sqlx/query-f7b1445ec1f0d86efb6f8e0939430e7294bcac06bb7930dcf4d46427571662cb.json b/backend/.sqlx/query-f7b1445ec1f0d86efb6f8e0939430e7294bcac06bb7930dcf4d46427571662cb.json new file mode 100644 index 0000000000000..5948eedef5160 --- /dev/null +++ b/backend/.sqlx/query-f7b1445ec1f0d86efb6f8e0939430e7294bcac06bb7930dcf4d46427571662cb.json @@ -0,0 +1,35 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue SET mem_peak = $1, last_ping = now()\n WHERE id = $2\n RETURNING canceled AS \"canceled!\", canceled_by, canceled_reason", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "canceled!", + "type_info": "Bool" + }, + { + "ordinal": 1, + "name": "canceled_by", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "canceled_reason", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Int4", + "Uuid" + ] + }, + "nullable": [ + true, + true, + true + ] + }, + "hash": "f7b1445ec1f0d86efb6f8e0939430e7294bcac06bb7930dcf4d46427571662cb" +} diff --git a/backend/.sqlx/query-ff14230469026418966ec79b77f549b2fb27c90556484f3914666d5ad7f8f107.json b/backend/.sqlx/query-ff14230469026418966ec79b77f549b2fb27c90556484f3914666d5ad7f8f107.json new file mode 100644 index 0000000000000..1744aa7167e69 --- /dev/null +++ b/backend/.sqlx/query-ff14230469026418966ec79b77f549b2fb27c90556484f3914666d5ad7f8f107.json @@ -0,0 +1,70 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n result #> $3 AS \"result: sqlx::types::Json>\",\n flow_status AS \"flow_status: sqlx::types::Json>\",\n language AS \"language: ScriptLang\",\n created_by AS \"created_by!\"\n FROM completed_job\n WHERE id = $1 AND workspace_id = $2 AND ($4::text[] IS NULL OR tag = ANY($4))", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "result: sqlx::types::Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 1, + "name": "flow_status: sqlx::types::Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 2, + "name": "language: ScriptLang", + "type_info": { + "Custom": { + "name": "script_lang", + "kind": { + "Enum": [ + "python3", + "deno", + "go", + "bash", + "postgresql", + "nativets", + "bun", + "mysql", + "bigquery", + "snowflake", + "graphql", + "powershell", + "mssql", + "php", + "bunnative", + "rust", + "ansible", + "csharp", + "oracledb" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "created_by!", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text", + "TextArray", + "TextArray" + ] + }, + "nullable": [ + null, + true, + true, + true + ] + }, + "hash": "ff14230469026418966ec79b77f549b2fb27c90556484f3914666d5ad7f8f107" +} diff --git a/backend/src/monitor.rs b/backend/src/monitor.rs index 4a279259038b2..30d7dedda92a3 100644 --- a/backend/src/monitor.rs +++ b/backend/src/monitor.rs @@ -19,6 +19,7 @@ use tokio::{ join, sync::{mpsc, RwLock}, }; +use uuid::Uuid; #[cfg(feature = "embedding")] use windmill_api::embeddings::update_embeddings_db; @@ -36,7 +37,7 @@ use windmill_common::{ auth::JWT_SECRET, ee::CriticalErrorChannel, error, - flow_status::FlowStatusModule, + flow_status::{FlowStatus, FlowStatusModule}, global_settings::{ BASE_URL_SETTING, BUNFIG_INSTALL_SCOPES_SETTING, CRITICAL_ALERT_MUTE_UI_SETTING, CRITICAL_ERROR_CHANNELS_SETTING, DEFAULT_TAGS_PER_WORKSPACE_SETTING, @@ -1625,20 +1626,28 @@ async fn handle_zombie_jobs(db: &Pool, base_internal_url: &str, worker } async fn handle_zombie_flows(db: &DB) -> error::Result<()> { - let flows = sqlx::query_as::<_, QueuedJob>( + let flows = sqlx::query!( r#" - SELECT * + SELECT + id AS "id!", workspace_id AS 
"workspace_id!", parent_job, is_flow_step, + flow_status AS "flow_status: Box", last_ping, same_worker FROM queue - WHERE running = true AND suspend = 0 AND suspend_until IS null AND scheduled_for <= now() AND (job_kind = 'flow' OR job_kind = 'flowpreview' OR job_kind = 'flownode') - AND last_ping IS NOT NULL AND last_ping < NOW() - ($1 || ' seconds')::interval AND canceled = false + WHERE running = true AND suspend = 0 AND suspend_until IS null AND scheduled_for <= now() + AND (job_kind = 'flow' OR job_kind = 'flowpreview' OR job_kind = 'flownode') + AND last_ping IS NOT NULL AND last_ping < NOW() - ($1 || ' seconds')::interval + AND canceled = false "#, - ).bind(FLOW_ZOMBIE_TRANSITION_TIMEOUT.as_str()) + FLOW_ZOMBIE_TRANSITION_TIMEOUT.as_str() + ) .fetch_all(db) .await?; for flow in flows { - let status = flow.parse_flow_status(); - if !flow.same_worker + let status = flow + .flow_status + .as_deref() + .and_then(|x| serde_json::from_str::(x).ok()); + if !flow.same_worker.unwrap_or(false) && status.is_some_and(|s| { s.modules .get(0) @@ -1664,44 +1673,39 @@ async fn handle_zombie_flows(db: &DB) -> error::Result<()> { let now = now_from_db(db).await?; let reason = format!( "{} was hanging in between 2 steps. Last ping: {last_ping:?} (now: {now})", - if flow.is_flow_step && flow.parent_job.is_some() { + if flow.is_flow_step.unwrap_or(false) && flow.parent_job.is_some() { format!("Flow was cancelled because subflow {id}") } else { format!("Flow {id} was cancelled because it") } ); report_critical_error(reason.clone(), db.clone(), Some(&flow.workspace_id), None).await; - cancel_zombie_flow_job(db, flow, reason).await?; + cancel_zombie_flow_job(db, flow.id, &flow.workspace_id, reason).await?; } } let flows2 = sqlx::query!( - " - DELETE - FROM parallel_monitor_lock - WHERE last_ping IS NOT NULL AND last_ping < NOW() - ($1 || ' seconds')::interval - RETURNING parent_flow_id, job_id, last_ping - ", + r#" + DELETE + FROM parallel_monitor_lock + WHERE last_ping IS NOT NULL AND last_ping < NOW() - ($1 || ' seconds')::interval + RETURNING parent_flow_id, job_id, last_ping, (SELECT workspace_id FROM queue q + WHERE q.id = parent_flow_id AND q.running = true AND q.canceled = false) AS workspace_id + "#, FLOW_ZOMBIE_TRANSITION_TIMEOUT.as_str() ) .fetch_all(db) .await?; for flow in flows2 { - let in_queue = sqlx::query_as::<_, QueuedJob>( - "SELECT * FROM queue WHERE id = $1 AND running = true AND canceled = false", - ) - .bind(flow.parent_flow_id) - .fetch_optional(db) - .await?; - if let Some(job) = in_queue { + if let Some(parent_flow_workspace_id) = flow.workspace_id { tracing::error!( "parallel Zombie flow detected: {} in workspace {}. Last ping was: {:?}.", - job.id, - job.workspace_id, + flow.parent_flow_id, + parent_flow_workspace_id, flow.last_ping ); - cancel_zombie_flow_job(db, job, + cancel_zombie_flow_job(db, flow.parent_flow_id, &parent_flow_workspace_id, format!("Flow {} cancelled as one of the parallel branch {} was unable to make the last transition ", flow.parent_flow_id, flow.job_id)) .await?; } else { @@ -1713,27 +1717,28 @@ async fn handle_zombie_flows(db: &DB) -> error::Result<()> { async fn cancel_zombie_flow_job( db: &Pool, - flow: QueuedJob, + id: Uuid, + workspace_id: &str, message: String, ) -> Result<(), error::Error> { - let tx = db.begin().await.unwrap(); + let mut tx = db.begin().await?; tracing::error!( "zombie flow detected: {} in workspace {}. 
Cancelling it.", - flow.id, - flow.workspace_id + id, + workspace_id ); - let (ntx, _) = cancel_job( + (tx, _) = cancel_job( "monitor", Some(message), - flow.id, - flow.workspace_id.as_str(), + id, + workspace_id, tx, db, true, false, ) .await?; - ntx.commit().await?; + tx.commit().await?; Ok(()) } diff --git a/backend/tests/worker.rs b/backend/tests/worker.rs index 001b15ac45399..3f1c3f16c8809 100644 --- a/backend/tests/worker.rs +++ b/backend/tests/worker.rs @@ -18,8 +18,6 @@ use tokio::time::{timeout, Duration}; use windmill_api_client::types::{CreateFlowBody, RawScript}; -use sqlx::query; - #[cfg(feature = "enterprise")] use windmill_api_client::types::{EditSchedule, NewSchedule, ScriptArgs}; @@ -3191,7 +3189,7 @@ async fn test_script_schedule_handlers(db: Pool) { let uuid = uuid.unwrap().unwrap(); let completed_job = - query!("SELECT script_path FROM completed_job WHERE id = $1", uuid) + sqlx::query!("SELECT script_path FROM completed_job WHERE id = $1", uuid) .fetch_one(&db2) .await .unwrap(); @@ -3259,7 +3257,7 @@ async fn test_script_schedule_handlers(db: Pool) { let uuid = uuid.unwrap().unwrap(); let completed_job = - query!("SELECT script_path FROM completed_job WHERE id = $1", uuid) + sqlx::query!("SELECT script_path FROM completed_job WHERE id = $1", uuid) .fetch_one(&db2) .await .unwrap(); @@ -3343,7 +3341,7 @@ async fn test_flow_schedule_handlers(db: Pool) { let uuid = uuid.unwrap().unwrap(); let completed_job = - query!("SELECT script_path FROM completed_job WHERE id = $1", uuid) + sqlx::query!("SELECT script_path FROM completed_job WHERE id = $1", uuid) .fetch_one(&db2) .await .unwrap(); @@ -3412,7 +3410,7 @@ async fn test_flow_schedule_handlers(db: Pool) { let uuid = uuid.unwrap().unwrap(); let completed_job = - query!("SELECT script_path FROM completed_job WHERE id = $1", uuid) + sqlx::query!("SELECT script_path FROM completed_job WHERE id = $1", uuid) .fetch_one(&db2) .await .unwrap(); @@ -3487,7 +3485,7 @@ async fn run_deployed_relative_imports( async move { completed.next().await; // deployed script - let script = query!( + let script = sqlx::query!( "SELECT hash FROM script WHERE path = $1", "f/system/test_import".to_string() ) diff --git a/backend/windmill-api/src/auth.rs b/backend/windmill-api/src/auth.rs index 64024f1ab12aa..71c62704778be 100644 --- a/backend/windmill-api/src/auth.rs +++ b/backend/windmill-api/src/auth.rs @@ -151,12 +151,16 @@ impl AuthCache { } } _ => { - let user_o = sqlx::query_as::<_, (Option, Option, bool, Option>, Option)>( - "UPDATE token SET last_used_at = now() WHERE token = $1 AND (expiration > NOW() \ - OR expiration IS NULL) AND (workspace_id IS NULL OR workspace_id = $2) RETURNING owner, email, super_admin, scopes, label", + let user_o = sqlx::query!( + "UPDATE token SET last_used_at = now() WHERE + token = $1 + AND (expiration > NOW() OR expiration IS NULL) + AND (workspace_id IS NULL OR workspace_id = $2) + RETURNING owner, email, super_admin, scopes, label", + token, + w_id.as_ref(), ) - .bind(token) - .bind(w_id.as_ref()) + .map(|x| (x.owner, x.email, x.super_admin, x.scopes, x.label)) .fetch_optional(&self.db) .await .ok() @@ -251,12 +255,13 @@ impl AuthCache { (_, Some(email), super_admin, scopes, label) => { let username_override = username_override_from_label(label); if w_id.is_some() { - let row_o = sqlx::query_as::<_, (String, bool, bool)>( - "SELECT username, is_admin, operator FROM usr where email = $1 AND \ - workspace_id = $2 AND disabled = false", + let row_o = sqlx::query!( + "SELECT username, is_admin, operator FROM 
usr WHERE + email = $1 AND workspace_id = $2 AND disabled = false", + &email, + w_id.as_ref().unwrap() ) - .bind(&email) - .bind(&w_id.as_ref().unwrap()) + .map(|x| (x.username, x.is_admin, x.operator)) .fetch_optional(&self.db) .await .unwrap_or(Some(("error".to_string(), false, false))); diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 5dbd739f5e501..2eb119fd083c3 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -85,8 +85,8 @@ use windmill_common::{METRICS_DEBUG_ENABLED, METRICS_ENABLED}; use windmill_common::{get_latest_deployed_hash_for_path, BASE_URL}; use windmill_queue::{ - cancel_job, get_queued_job, get_result_and_success_by_id_from_flow, job_is_complete, push, - PushArgs, PushArgsOwned, PushIsolationLevel, + cancel_job, get_result_and_success_by_id_from_flow, job_is_complete, push, PushArgs, + PushArgsOwned, PushIsolationLevel, }; #[cfg(feature = "prometheus")] @@ -334,16 +334,21 @@ async fn get_root_job( Ok(Json(res)) } -async fn compute_root_job_for_flow(db: &DB, w_id: &str, job_id: Uuid) -> error::Result { - let mut job = get_queued_job(&job_id, w_id, db).await?; - while let Some(j) = job { - if let Some(uuid) = j.parent_job { - job = get_queued_job(&uuid, w_id, db).await?; - } else { - return Ok(j.id.to_string()); +async fn compute_root_job_for_flow(db: &DB, w_id: &str, mut job_id: Uuid) -> error::Result { + // TODO: use `root_job` ? + loop { + job_id = match sqlx::query_scalar!( + "SELECT parent_job FROM queue WHERE id = $1 AND workspace_id = $2", + job_id, + w_id + ) + .fetch_one(db) + .await + { + Ok(Some(job_id)) => job_id, + _ => return Ok(job_id.to_string()), } } - Ok(job_id.to_string()) } async fn get_db_clock(Extension(db): Extension) -> windmill_common::error::JsonResult { @@ -1085,29 +1090,23 @@ async fn get_job_logs( } } -#[derive(FromRow)] -pub struct RawArgs { - pub args: Option>>, - pub created_by: String, -} - async fn get_args( OptAuthed(opt_authed): OptAuthed, Extension(db): Extension, Path((w_id, id)): Path<(String, Uuid)>, -) -> error::JsonResult> { +) -> JsonResult> { let tags = opt_authed .as_ref() .map(|authed| get_scope_tags(authed)) .flatten(); - let record = sqlx::query_as::<_, RawArgs>( - "SELECT created_by, args + let record = sqlx::query!( + "SELECT created_by AS \"created_by!\", args as \"args: sqlx::types::Json>\" FROM completed_job - WHERE completed_job.id = $1 AND completed_job.workspace_id = $2 AND ($3::text[] IS NULL OR completed_job.tag = ANY($3))", + WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))", + id, + &w_id, + tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>, ) - .bind(&id) - .bind(&w_id) - .bind(tags.as_ref().map(|v| v.as_slice())) .fetch_optional(&db) .await?; @@ -1122,14 +1121,14 @@ async fn get_args( Ok(Json(record.args.map(|x| x.0).unwrap_or_default())) } else { - let record = sqlx::query_as::<_, RawArgs>( - "SELECT created_by, args + let record = sqlx::query!( + "SELECT created_by AS \"created_by!\", args as \"args: sqlx::types::Json>\" FROM queue - WHERE queue.id = $1 AND queue.workspace_id = $2 AND ($3::text[] IS NULL OR queue.tag = ANY($3))", + WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))", + id, + &w_id, + tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>, ) - .bind(&id) - .bind(&w_id) - .bind(tags.as_ref().map(|v| v.as_slice())) .fetch_optional(&db) .await?; let record = not_found_if_none(record, "Job Args", id.to_string())?; @@ -3134,11 +3133,15 @@ pub async fn restart_flow( 
check_license_key_valid().await?; let mut tx = user_db.clone().begin(&authed).await?; - let completed_job = sqlx::query_as::<_, CompletedJob>( - "SELECT *, result->'wm_labels' as labels from completed_job WHERE id = $1 and workspace_id = $2", + let completed_job = sqlx::query!( + "SELECT + script_path, args AS \"args: sqlx::types::Json>>\", + tag AS \"tag!\", priority + FROM completed_job + WHERE id = $1 and workspace_id = $2", + job_id, + &w_id, ) - .bind(job_id) - .bind(&w_id) .fetch_optional(&mut *tx) .await? .with_context(|| "Unable to find completed job with the given job UUID")?; @@ -3164,11 +3167,7 @@ pub async fn restart_flow( &db, tx, &w_id, - JobPayload::RestartedFlow { - completed_job_id: job_id, - step_id: step_id, - branch_or_iteration_n: branch_or_iteration_n, - }, + JobPayload::RestartedFlow { completed_job_id: job_id, step_id, branch_or_iteration_n }, push_args, &authed.username, &authed.email, @@ -3569,11 +3568,17 @@ pub async fn run_wait_result( } if result.is_none() { - let row = sqlx::query_as::<_, RawResultWithSuccess>( - "SELECT '' as created_by, result, language, flow_status, success FROM completed_job WHERE id = $1 AND workspace_id = $2", + let row = sqlx::query!( + "SELECT + result AS \"result: sqlx::types::Json>\", + language AS \"language: ScriptLang\", + flow_status AS \"flow_status: sqlx::types::Json>\", + success AS \"success!\" + FROM completed_job + WHERE id = $1 AND workspace_id = $2", + uuid, + &w_id ) - .bind(uuid) - .bind(&w_id) .fetch_optional(db) .await?; if let Some(mut raw_result) = row { @@ -5131,29 +5136,33 @@ async fn get_job_update( Path((w_id, job_id)): Path<(String, Uuid)>, Query(JobUpdateQuery { running, log_offset, get_progress }): Query, ) -> error::JsonResult { - let record = sqlx::query_as::<_, JobUpdateRow>( - "SELECT running, substr(concat(coalesce(queue.logs, ''), job_logs.logs), greatest($1 - job_logs.log_offset, 0)) as logs, mem_peak, - CASE WHEN is_flow_step is true then NULL else flow_status END as flow_status, - job_logs.log_offset + char_length(job_logs.logs) + 1 as log_offset, created_by + let record = sqlx::query!( + "SELECT + running AS \"running!\", + substr(concat(coalesce(queue.logs, ''), job_logs.logs), greatest($1 - job_logs.log_offset, 0)) AS logs, + mem_peak, + CASE WHEN is_flow_step is true then NULL else flow_status END AS \"flow_status: sqlx::types::Json>\", + job_logs.log_offset + char_length(job_logs.logs) + 1 AS log_offset, + created_by AS \"created_by!\" FROM queue LEFT JOIN job_logs ON job_logs.job_id = queue.id WHERE queue.workspace_id = $2 AND queue.id = $3", + log_offset, + &w_id, + job_id ) - .bind(log_offset) - .bind(&w_id) - .bind(&job_id) .fetch_optional(&db) .await?; let progress: Option = if get_progress == Some(true) { sqlx::query_scalar!( - "SELECT scalar_int FROM job_stats WHERE workspace_id = $1 AND job_id = $2 AND metric_id = $3", - &w_id, - job_id, - "progress_perc" - ) - .fetch_optional(&db) - .await?.and_then(|inner| inner) + "SELECT scalar_int FROM job_stats WHERE workspace_id = $1 AND job_id = $2 AND metric_id = $3", + &w_id, + job_id, + "progress_perc" + ) + .fetch_optional(&db) + .await?.and_then(|inner| inner) } else { None }; @@ -5181,17 +5190,20 @@ async fn get_job_update( .map(|x: sqlx::types::Json>| x.0), })) } else { - let record = sqlx::query_as::<_, JobUpdateRow>( - "SELECT false as running, substr(concat(coalesce(completed_job.logs, ''), job_logs.logs), greatest($1 - job_logs.log_offset, 0)) as logs, mem_peak, - CASE WHEN is_flow_step is true then NULL else flow_status END as 
flow_status, - job_logs.log_offset + char_length(job_logs.logs) + 1 as log_offset, created_by - FROM completed_job - LEFT JOIN job_logs ON job_logs.job_id = completed_job.id - WHERE completed_job.workspace_id = $2 AND id = $3", + let record = sqlx::query!( + "SELECT + substr(concat(coalesce(completed_job.logs, ''), job_logs.logs), greatest($1 - job_logs.log_offset, 0)) AS logs, + mem_peak, + CASE WHEN is_flow_step is true then NULL else flow_status END AS \"flow_status: sqlx::types::Json>\", + job_logs.log_offset + char_length(job_logs.logs) + 1 AS log_offset, + created_by AS \"created_by!\" + FROM completed_job + LEFT JOIN job_logs ON job_logs.job_id = completed_job.id + WHERE completed_job.workspace_id = $2 AND completed_job.id = $3", + log_offset, + &w_id, + job_id ) - .bind(log_offset) - .bind(&w_id) - .bind(&job_id) .fetch_optional(&db) .await?; if let Some(record) = record { @@ -5482,15 +5494,6 @@ pub struct RawResult { pub created_by: Option, } -#[derive(FromRow)] -pub struct RawResultWithSuccess { - pub result: Option>>, - pub flow_status: Option>>, - pub language: Option, - pub success: bool, - pub created_by: String, -} - async fn get_completed_job_result( OptAuthed(opt_authed): OptAuthed, Extension(db): Extension, @@ -5502,27 +5505,38 @@ async fn get_completed_job_result( .map(|authed| get_scope_tags(authed)) .flatten(); let result_o = if let Some(json_path) = json_path { - sqlx::query_as::<_, RawResult>( - "SELECT result #> $3 as result, flow_status, language, created_by FROM completed_job WHERE id = $1 AND workspace_id = $2 AND ($4::text[] IS NULL OR tag = ANY($4))", - ) - .bind(id) - .bind(&w_id) - .bind( - json_path - .split(".") - .map(|x| x.to_string()) - .collect::>(), + sqlx::query_as!( + RawResult, + "SELECT + result #> $3 AS \"result: sqlx::types::Json>\", + flow_status AS \"flow_status: sqlx::types::Json>\", + language AS \"language: ScriptLang\", + created_by AS \"created_by!\" + FROM completed_job + WHERE id = $1 AND workspace_id = $2 AND ($4::text[] IS NULL OR tag = ANY($4))", + id, + &w_id, + json_path.split(".").collect::>() as Vec<&str>, + tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>, ) - .bind(tags.as_ref().map(|v| v.as_slice())) .fetch_optional(&db) .await? } else { - sqlx::query_as::<_, RawResult>("SELECT result, flow_status, language, created_by FROM completed_job WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))") - .bind(id) - .bind(&w_id) - .bind(tags.as_ref().map(|v| v.as_slice())) - .fetch_optional(&db) - .await? + sqlx::query_as!( + RawResult, + "SELECT + result AS \"result: sqlx::types::Json>\", + flow_status AS \"flow_status: sqlx::types::Json>\", + language AS \"language: ScriptLang\", + created_by AS \"created_by!\" + FROM completed_job + WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))", + id, + &w_id, + tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>, + ) + .fetch_optional(&db) + .await? 
@@ -5482,15 +5494,6 @@ pub struct RawResult {
     pub created_by: Option<String>,
 }

-#[derive(FromRow)]
-pub struct RawResultWithSuccess {
-    pub result: Option<Json<Box<RawValue>>>,
-    pub flow_status: Option<Json<Box<RawValue>>>,
-    pub language: Option<ScriptLang>,
-    pub success: bool,
-    pub created_by: String,
-}
-
 async fn get_completed_job_result(
     OptAuthed(opt_authed): OptAuthed,
     Extension(db): Extension<DB>,
@@ -5502,27 +5505,38 @@ async fn get_completed_job_result(
         .map(|authed| get_scope_tags(authed))
         .flatten();
     let result_o = if let Some(json_path) = json_path {
-        sqlx::query_as::<_, RawResult>(
-            "SELECT result #> $3 as result, flow_status, language, created_by FROM completed_job WHERE id = $1 AND workspace_id = $2 AND ($4::text[] IS NULL OR tag = ANY($4))",
-        )
-        .bind(id)
-        .bind(&w_id)
-        .bind(
-            json_path
-                .split(".")
-                .map(|x| x.to_string())
-                .collect::<Vec<_>>(),
+        sqlx::query_as!(
+            RawResult,
+            "SELECT
+                result #> $3 AS \"result: sqlx::types::Json<Box<RawValue>>\",
+                flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
+                language AS \"language: ScriptLang\",
+                created_by AS \"created_by!\"
+            FROM completed_job
+            WHERE id = $1 AND workspace_id = $2 AND ($4::text[] IS NULL OR tag = ANY($4))",
+            id,
+            &w_id,
+            json_path.split(".").collect::<Vec<&str>>() as Vec<&str>,
+            tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>,
         )
-        .bind(tags.as_ref().map(|v| v.as_slice()))
         .fetch_optional(&db)
         .await?
     } else {
-        sqlx::query_as::<_, RawResult>("SELECT result, flow_status, language, created_by FROM completed_job WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))")
-            .bind(id)
-            .bind(&w_id)
-            .bind(tags.as_ref().map(|v| v.as_slice()))
-            .fetch_optional(&db)
-            .await?
+        sqlx::query_as!(
+            RawResult,
+            "SELECT
+                result AS \"result: sqlx::types::Json<Box<RawValue>>\",
+                flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
+                language AS \"language: ScriptLang\",
+                created_by AS \"created_by!\"
+            FROM completed_job
+            WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
+            id,
+            &w_id,
+            tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>,
+        )
+        .fetch_optional(&db)
+        .await?
     };

     let mut raw_result = not_found_if_none(result_o, "Completed Job", id.to_string())?;
@@ -5637,12 +5651,18 @@ async fn get_completed_job_result_maybe(
         .as_ref()
         .map(|authed| get_scope_tags(authed))
         .flatten();
-    let result_o = sqlx::query_as::<_, RawResultWithSuccess>(
-        "SELECT result, success, language, flow_status, created_by FROM completed_job WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
+    let result_o = sqlx::query!(
+        "SELECT
+            result AS \"result: sqlx::types::Json<Box<RawValue>>\", success AS \"success!\",
+            language AS \"language: ScriptLang\",
+            flow_status AS \"flow_status: sqlx::types::Json<Box<RawValue>>\",
+            created_by AS \"created_by!\"
+        FROM completed_job
+        WHERE id = $1 AND workspace_id = $2 AND ($3::text[] IS NULL OR tag = ANY($3))",
+        id,
+        &w_id,
+        tags.as_ref().map(|v| v.as_slice()) as Option<&[&str]>,
     )
-    .bind(id)
-    .bind(&w_id)
-    .bind(tags.as_ref().map(|v| v.as_slice()))
     .fetch_optional(&db)
     .await?;
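With the macros in place, one-off `#[derive(FromRow)]` carriers like `RawResultWithSuccess` become dead weight: `query!` generates an anonymous record per call site, while `query_as!` (used above for the surviving `RawResult`) checks a named struct's fields against the selected columns at compile time. A sketch of both forms, assuming build-time access to a `completed_job` table shaped like the one in this PR:

use serde_json::value::RawValue;
use sqlx::{types::Json, Pool, Postgres};
use uuid::Uuid;

// Named carrier: query_as! populates it and verifies field/column agreement.
struct RawResultSketch {
    result: Option<Json<Box<RawValue>>>,
    created_by: String,
}

async fn demo(db: &Pool<Postgres>, id: Uuid) -> sqlx::Result<()> {
    // Anonymous record: no helper struct, fields named after the columns.
    let _r = sqlx::query!("SELECT success AS \"success!\" FROM completed_job WHERE id = $1", id)
        .fetch_optional(db)
        .await?;

    // Named struct: same compile-time checking, decoded into RawResultSketch.
    let _s = sqlx::query_as!(
        RawResultSketch,
        "SELECT result AS \"result: Json<Box<RawValue>>\", created_by AS \"created_by!\"
        FROM completed_job WHERE id = $1",
        id
    )
    .fetch_optional(db)
    .await?;
    Ok(())
}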
diff --git a/backend/windmill-api/src/resources.rs b/backend/windmill-api/src/resources.rs
index 88f8d79c29e46..8848ed866feb1 100644
--- a/backend/windmill-api/src/resources.rs
+++ b/backend/windmill-api/src/resources.rs
@@ -31,7 +31,6 @@ use windmill_audit::ActionKind;
 use windmill_common::{
     db::UserDB,
     error::{Error, JsonResult, Result},
-    jobs::QueuedJob,
     utils::{not_found_if_none, paginate, require_admin, Pagination, StripPath},
     variables,
 };
@@ -537,16 +536,23 @@ pub async fn transform_json_value<'c>(
         }
         Value::String(y) if y.starts_with("$") && job_id.is_some() => {
             let mut tx = authed_transaction_or_default(authed, user_db.clone(), db).await?;
-            let job = sqlx::query_as::<_, QueuedJob>(
-                "SELECT * FROM queue WHERE id = $1 AND workspace_id = $2",
+            let job_id = job_id.unwrap();
+            let job = sqlx::query!(
+                "SELECT
+                    email AS \"email!\",
+                    created_by AS \"created_by!\",
+                    parent_job, permissioned_as AS \"permissioned_as!\",
+                    script_path, schedule_path, flow_step_id, root_job,
+                    scheduled_for AS \"scheduled_for!: chrono::DateTime<Utc>\"
+                FROM queue WHERE id = $1 AND workspace_id = $2",
+                job_id,
+                workspace
             )
-            .bind(job_id.unwrap())
-            .bind(workspace)
             .fetch_optional(&mut *tx)
             .await?;
             tx.commit().await?;

-            let job = not_found_if_none(job, "Job", job_id.unwrap().to_string())?;
+            let job = not_found_if_none(job, "Job", job_id.to_string())?;

             let flow_path = if let Some(uuid) = job.parent_job {
                 let mut tx: Transaction<'_, Postgres> =
@@ -563,11 +569,11 @@ pub async fn transform_json_value<'c>(

             let variables = variables::get_reserved_variables(
                 db,
-                &job.workspace_id,
+                workspace,
                 token,
                 &job.email,
                 &job.created_by,
-                &job.id.to_string(),
+                &job_id.to_string(),
                 &job.permissioned_as,
                 job.script_path.clone(),
                 job.parent_job.map(|x| x.to_string()),
diff --git a/backend/windmill-api/src/websocket_triggers.rs b/backend/windmill-api/src/websocket_triggers.rs
index d6260691c7e3d..f5a2884214406 100644
--- a/backend/windmill-api/src/websocket_triggers.rs
+++ b/backend/windmill-api/src/websocket_triggers.rs
@@ -558,17 +558,12 @@ async fn wait_runnable_result(
         .into());
     }

-    #[derive(sqlx::FromRow)]
-    struct RawResult {
-        result: Option<SqlxJson<Box<RawValue>>>,
-        success: bool,
-    }
-
-    let result = sqlx::query_as::<_, RawResult>(
-        "SELECT result, success FROM completed_job WHERE id = $1 AND workspace_id = $2",
+    let result = sqlx::query!(
+        "SELECT result AS \"result: SqlxJson<Box<RawValue>>\", success AS \"success!\"
+        FROM completed_job WHERE id = $1 AND workspace_id = $2",
+        Uuid::parse_str(&job_id)?,
+        workspace_id
     )
-    .bind(Uuid::parse_str(&job_id).unwrap())
-    .bind(workspace_id)
     .fetch_optional(db)
     .await;
diff --git a/backend/windmill-common/src/queue.rs b/backend/windmill-common/src/queue.rs
index 6cbb5611c55ea..3b427e9bb44fd 100644
--- a/backend/windmill-common/src/queue.rs
+++ b/backend/windmill-common/src/queue.rs
@@ -3,14 +3,14 @@ use std::collections::HashMap;
 use sqlx::{Pool, Postgres};

 pub async fn get_queue_counts(db: &Pool<Postgres>) -> HashMap<String, u32> {
-    sqlx::query_as::<_, (String, i64)>(
-        "SELECT tag, count(*) as count FROM queue WHERE
+    sqlx::query!(
+        "SELECT tag AS \"tag!\", count(*) AS \"count!\" FROM queue WHERE
         scheduled_for <= now() - ('3 seconds')::interval AND running = false
         GROUP BY tag",
     )
     .fetch_all(db)
     .await
     .ok()
-    .map(|v| v.into_iter().map(|(k, v)| (k, v as u32)).collect())
+    .map(|v| v.into_iter().map(|x| (x.tag, x.count as u32)).collect())
     .unwrap_or_else(|| HashMap::new())
 }
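`count(*)` can never be NULL for a grouped row, but sqlx cannot prove that for an expression column, so without the `AS "count!"` override the record field would come back as `Option<i64>`. A sketch isolating that detail (hypothetical `counts` helper, same build-time checking assumptions as above):

use sqlx::{Pool, Postgres};
use std::collections::HashMap;

// Aggregates and other expressions decode as Option<_> unless overridden;
// `!` asserts NOT NULL in both positions here, giving String and i64.
async fn counts(db: &Pool<Postgres>) -> sqlx::Result<HashMap<String, u32>> {
    let rows = sqlx::query!(
        "SELECT tag AS \"tag!\", count(*) AS \"count!\" FROM queue GROUP BY tag"
    )
    .fetch_all(db)
    .await?;
    // Named fields (x.tag, x.count) replace the positional (k, v) tuple
    // destructuring that query_as::<_, (String, i64)> required.
    Ok(rows.into_iter().map(|x| (x.tag, x.count as u32)).collect())
}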
diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs
index 7205d1571aefe..818e7b534d327 100644
--- a/backend/windmill-queue/src/jobs.rs
+++ b/backend/windmill-queue/src/jobs.rs
@@ -40,7 +40,7 @@ use windmill_common::{
         add_virtual_items_if_necessary, FlowModule, FlowModuleValue, FlowValue, InputTransform,
     },
     jobs::{
-        get_payload_tag_from_prefixed_path, CompletedJob, JobKind, JobPayload, QueuedJob, RawCode,
+        get_payload_tag_from_prefixed_path, JobKind, JobPayload, QueuedJob, RawCode,
         ENTRYPOINT_OVERRIDE, PREPROCESSOR_FAKE_ENTRYPOINT,
     },
     schedule::Schedule,
@@ -1381,15 +1381,23 @@ async fn apply_schedule_handlers<'a, 'c, T: Serialize + Send + Sync>(
         let times = schedule.on_failure_times.unwrap_or(1).max(1);
         let exact = schedule.on_failure_exact.unwrap_or(false);
         if times > 1 || exact {
-            let past_jobs = sqlx::query_as::<_, CompletedJobSubset>(
-                "SELECT success, result, started_at FROM completed_job WHERE workspace_id = $1 AND schedule_path = $2 AND script_path = $3 AND id != $4 ORDER BY created_at DESC LIMIT $5",
+            let past_jobs = sqlx::query!(
+                "SELECT
+                    success AS \"success!\",
+                    result AS \"result: Json<Box<RawValue>>\",
+                    started_at AS \"started_at!\"
+                FROM completed_job
+                WHERE workspace_id = $1 AND schedule_path = $2 AND script_path = $3 AND id != $4
+                ORDER BY created_at DESC
+                LIMIT $5",
+                &schedule.workspace_id,
+                &schedule.path,
+                script_path,
+                job_id,
+                if exact { times } else { times - 1 } as i64
             )
-            .bind(&schedule.workspace_id)
-            .bind(&schedule.path)
-            .bind(script_path)
-            .bind(job_id)
-            .bind(if exact { times } else { times - 1 } as i64,)
-            .fetch_all(db).await?;
+            .fetch_all(db)
+            .await?;

             let match_times = if exact {
                 past_jobs.len() == times as usize
@@ -1444,15 +1452,22 @@ async fn apply_schedule_handlers<'a, 'c, T: Serialize + Send + Sync>(
     if let Some(ref on_recovery_path) = schedule.on_recovery.clone() {
         let tx = db.begin().await?;
         let times = schedule.on_recovery_times.unwrap_or(1).max(1);
-        let past_jobs = sqlx::query_as::<_, CompletedJobSubset>(
-            "SELECT success, result, started_at FROM completed_job WHERE workspace_id = $1 AND schedule_path = $2 AND script_path = $3 AND id != $4 ORDER BY created_at DESC LIMIT $5",
+        let past_jobs = sqlx::query!(
+            "SELECT
+                success AS \"success!\",
+                result AS \"result: Json<Box<RawValue>>\",
+                started_at AS \"started_at!\"\
+            FROM completed_job WHERE workspace_id = $1 AND schedule_path = $2 AND script_path = $3 AND id != $4
+            ORDER BY created_at DESC
+            LIMIT $5",
+            &schedule.workspace_id,
+            &schedule.path,
+            script_path,
+            job_id,
+            times as i64
        )
-        .bind(&schedule.workspace_id)
-        .bind(&schedule.path)
-        .bind(script_path)
-        .bind(job_id)
-        .bind(times as i64)
-        .fetch_all(db).await?;
+        .fetch_all(db)
+        .await?;

        if past_jobs.len() < times as usize {
            return Ok(());
@@ -1464,7 +1479,7 @@ async fn apply_schedule_handlers<'a, 'c, T: Serialize + Send + Sync>(
             return Ok(());
         }

-        let failed_job = past_jobs[past_jobs.len() - 1].clone();
+        let failed_job = &past_jobs[past_jobs.len() - 1];

         if !failed_job.success {
             handle_recovered_schedule(
@@ -1476,7 +1491,8 @@ async fn apply_schedule_handlers<'a, 'c, T: Serialize + Send + Sync>(
                 schedule.is_flow,
                 w_id,
                 &on_recovery_path,
-                failed_job,
+                failed_job.result.as_ref().map(AsRef::as_ref),
+                failed_job.started_at,
                 result,
                 times,
                 started_at,
@@ -1622,7 +1638,8 @@ async fn handle_recovered_schedule<'a, 'c, T: Serialize + Send + Sync>(
     is_flow: bool,
     w_id: &str,
     on_recovery_path: &str,
-    error_job: CompletedJobSubset,
+    result: Option<&Box<RawValue>>,
+    started_at: DateTime<Utc>,
     successful_job_result: Json<&'a T>,
     successful_times: i32,
     successful_job_started_at: DateTime<Utc>,
@@ -1632,10 +1649,7 @@ async fn handle_recovered_schedule<'a, 'c, T: Serialize + Send + Sync>(
         get_payload_tag_from_prefixed_path(on_recovery_path, db, w_id).await?;

     let mut extra = HashMap::new();
-    extra.insert(
-        "error_started_at".to_string(),
-        to_raw_value(&error_job.started_at),
-    );
+    extra.insert("error_started_at".to_string(), to_raw_value(&started_at));
     extra.insert("schedule_path".to_string(), to_raw_value(&schedule_path));
     extra.insert("path".to_string(), to_raw_value(&script_path));
     extra.insert("is_flow".to_string(), to_raw_value(&is_flow));
@@ -1661,9 +1675,8 @@ async fn handle_recovered_schedule<'a, 'c, T: Serialize + Send + Sync>(
         }
     }

-    let args = error_job
-        .result
-        .and_then(|x| serde_json::from_str::<HashMap<String, Box<RawValue>>>(x.0.get()).ok())
+    let args = result
+        .and_then(|x| serde_json::from_str::<HashMap<String, Box<RawValue>>>(x.get()).ok())
         .unwrap_or_else(HashMap::new);

     let (email, permissioned_as) = if let Some(on_behalf_of) = on_behalf_of.as_ref() {
@@ -2193,17 +2206,6 @@ fn fullpath_with_workspace(
     )
 }

-#[derive(FromRow)]
-pub struct ResultR {
-    result: Option<Json<Box<RawValue>>>,
-}
-
-#[derive(FromRow)]
-pub struct ResultWithId {
-    result: Option<Json<Box<RawValue>>>,
-    id: Uuid,
-}
-
 pub async fn get_result_by_id(
     db: Pool<Postgres>,
     w_id: String,
@@ -2222,25 +2224,29 @@ pub async fn get_result_by_id(
     {
         Ok(res) => Ok(res),
         Err(_) => {
-            let running_flow_job = sqlx::query_as::<_, QueuedJob>(
-                "SELECT * FROM queue WHERE COALESCE((SELECT root_job FROM queue WHERE id = $1), $1) = id AND workspace_id = $2"
-            ).bind(flow_id)
-            .bind(&w_id)
-            .fetch_optional(&db).await?;
-            match running_flow_job {
-                Some(job) => {
-                    let restarted_from = windmill_common::utils::not_found_if_none(
-                        job.parse_flow_status()
-                            .map(|status| status.restarted_from)
-                            .flatten(),
+            let root = sqlx::query!(
+                "SELECT
+                    id As \"id!\",
+                    flow_status->'restarted_from'->'flow_job_id' AS \"restarted_from: Json<Uuid>\"
+                FROM queue
+                WHERE COALESCE((SELECT root_job FROM queue WHERE id = $1), $1) = id AND workspace_id = $2",
+                flow_id,
+                &w_id
+            )
+            .fetch_optional(&db)
+            .await?;
+            match root {
+                Some(root) => {
+                    let restarted_from_id = not_found_if_none(
+                        root.restarted_from,
                         "Id not found in the result's mapping of the root job and root job had no restarted from information",
-                        format!("parent: {}, root: {}, id: {}", flow_id, job.id, node_id),
+                        format!("parent: {}, root: {}, id: {}", flow_id, root.id, node_id),
                     )?;
                     get_result_by_id_from_original_flow(
                         &db,
                         w_id.as_str(),
-                        &restarted_from.flow_job_id,
+                        &restarted_from_id,
                         node_id.as_str(),
                         json_path.clone(),
                     )
@@ -2261,12 +2267,6 @@ pub async fn get_result_by_id(
         }
     }
 }

-#[derive(FromRow)]
-struct FlowJobResult {
-    leaf_jobs: Option<Json<Box<RawValue>>>,
-    parent_job: Option<Uuid>,
-}
-
 pub async fn get_result_and_success_by_id_from_flow(
     db: &Pool<Postgres>,
     w_id: &str,
@@ -2364,11 +2364,14 @@ pub async fn get_result_by_id_from_running_flow_inner(
     flow_id: &Uuid,
     node_id: &str,
 ) -> error::Result<JobResult> {
-    let flow_job_result = sqlx::query_as::<_, FlowJobResult>(
-        "SELECT leaf_jobs->$1::text as leaf_jobs, parent_job FROM queue WHERE COALESCE((SELECT root_job FROM queue WHERE id = $2), $2) = id AND workspace_id = $3")
-        .bind(node_id)
-        .bind(flow_id)
-        .bind(w_id)
+    let flow_job_result = sqlx::query!(
+        "SELECT leaf_jobs->$1::text AS \"leaf_jobs: Json<Box<RawValue>>\", parent_job
+        FROM queue
+        WHERE COALESCE((SELECT root_job FROM queue WHERE id = $2), $2) = id AND workspace_id = $3",
+        node_id,
+        flow_id,
+        w_id,
+    )
     .fetch_optional(db)
     .await?;
@@ -2402,18 +2405,13 @@ pub async fn get_result_by_id_from_running_flow_inner(
     Ok(result_id)
 }

-#[async_recursion]
 async fn get_completed_flow_node_result_rec(
     db: &Pool<Postgres>,
     w_id: &str,
-    subflows: Vec<CompletedJob>,
+    subflows: impl std::iter::Iterator<Item = (Uuid, FlowStatus)>,
     node_id: &str,
 ) -> error::Result<Option<JobResult>> {
-    for subflow in subflows {
-        let flow_status = subflow.parse_flow_status().ok_or_else(|| {
-            error::Error::InternalErr(format!("Could not parse flow status of {}", subflow.id))
-        })?;
-
+    for (id, flow_status) in subflows {
         if let Some(node_status) = flow_status
             .modules
             .iter()
@@ -2424,15 +2422,27 @@ async fn get_completed_flow_node_result_rec(
                 (Some(_), Some(jobs)) => Ok(Some(JobResult::ListJob(jobs))),
                 _ => Err(error::Error::NotFound(format!(
                     "Flow result by id not found going top-down in subflows (currently: {}), (id: {})",
-                    subflow.id,
+                    id,
                     node_id,
                 ))),
             };
         } else {
-            let subflows = sqlx::query_as::<_, CompletedJob>(
-                "SELECT *, null as labels FROM completed_job WHERE parent_job = $1 AND workspace_id = $2 AND flow_status IS NOT NULL",
-            ).bind(subflow.id).bind(w_id).fetch_all(db).await?;
-            match get_completed_flow_node_result_rec(db, w_id, subflows, node_id).await? {
+            let subflows = sqlx::query!(
+                "SELECT id AS \"id!\", flow_status AS \"flow_status!: Json<FlowStatus>\"
+                FROM completed_job
+                WHERE parent_job = $1 AND workspace_id = $2 AND flow_status IS NOT NULL",
+                id,
+                w_id
+            )
+            .map(|record| (record.id, record.flow_status.0))
+            .fetch_all(db)
+            .await?
+            .into_iter();
+            match Box::pin(get_completed_flow_node_result_rec(
+                db, w_id, subflows, node_id,
+            ))
+            .await?
+            {
                 Some(res) => return Ok(Some(res)),
                 None => continue,
             };
@@ -2448,23 +2458,25 @@ async fn get_result_by_id_from_original_flow_inner(
     completed_flow_id: &Uuid,
     node_id: &str,
 ) -> error::Result<JobResult> {
-    let flow_job = sqlx::query_as::<_, CompletedJob>(
-        "SELECT *, null as labels FROM completed_job WHERE id = $1 AND workspace_id = $2",
+    let flow_job = sqlx::query!(
+        "SELECT id AS \"id!\", flow_status AS \"flow_status!: Json<FlowStatus>\"
+        FROM completed_job WHERE id = $1 AND workspace_id = $2",
+        completed_flow_id,
+        w_id
     )
-    .bind(completed_flow_id)
-    .bind(w_id)
+    .map(|record| (record.id, record.flow_status.0))
     .fetch_optional(db)
     .await?;

-    let flow_job = windmill_common::utils::not_found_if_none(
+    let flow_job = not_found_if_none(
         flow_job,
         "Root completed job",
         format!("root: {}, id: {}", completed_flow_id, node_id),
     )?;

-    match get_completed_flow_node_result_rec(db, w_id, vec![flow_job], node_id).await? {
+    match get_completed_flow_node_result_rec(db, w_id, [flow_job].into_iter(), node_id).await? {
         Some(res) => Ok(res),
-        None => Err(error::Error::NotFound(format!(
+        None => Err(Error::NotFound(format!(
             "Flow result by id not found going top-down from {}, (id: {})",
             completed_flow_id, node_id
         ))),
     }
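Dropping `#[async_recursion]` works because the recursive call is now wrapped in `Box::pin(...)`: an `async fn` that awaits itself directly would need an infinitely-sized future (error E0733), but boxing puts the recursive future behind a pointer, which is essentially what the removed proc-macro expanded to. The pattern in isolation (toy example, not from the PR):

// A recursive async fn must box the recursive call so its future type does
// not contain itself; Box::pin replaces the #[async_recursion] macro.
async fn depth(n: u32) -> u32 {
    if n == 0 {
        0
    } else {
        // Without Box::pin here, rustc rejects the function with E0733.
        1 + Box::pin(depth(n - 1)).await
    }
}

Passing `subflows` as an `Iterator<Item = (Uuid, FlowStatus)>` instead of `Vec<CompletedJob>` is what lets the query's `.map(...)` feed the recursion without materializing full job rows.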
@@ -2500,26 +2512,26 @@ async fn extract_result_from_job_result(
             let Some(job_id) = job_ids.get(idx).cloned() else {
                 return Ok(to_raw_value(&serde_json::Value::Null));
             };
-            Ok(sqlx::query_as::<_, ResultR>(
-                "SELECT result #> $3 as result FROM completed_job WHERE id = $1 AND workspace_id = $2",
-            )
-            .bind(job_id)
-            .bind(w_id)
-            .bind(
-                parts.map(|x| x.to_string()).collect::<Vec<_>>()
+            Ok(sqlx::query_scalar!(
+                "SELECT result #> $3 AS \"result: Json<Box<RawValue>>\"
+                FROM completed_job WHERE id = $1 AND workspace_id = $2",
+                job_id,
+                w_id,
+                parts.collect::<Vec<&str>>() as Vec<&str>
             )
             .fetch_optional(db)
             .await?
-            .map(|r| r.result.map(|x| x.0))
             .flatten()
+            .map(|x| x.0)
             .unwrap_or_else(|| to_raw_value(&serde_json::Value::Null)))
         }
         None => {
-            let rows = sqlx::query_as::<_, ResultWithId>(
-                "SELECT id, result FROM completed_job WHERE id = ANY($1) AND workspace_id = $2",
+            let rows = sqlx::query!(
+                "SELECT id AS \"id!\", result AS \"result: Json<Box<RawValue>>\"
+                FROM completed_job WHERE id = ANY($1) AND workspace_id = $2",
+                job_ids.as_slice(),
+                w_id
             )
-            .bind(job_ids.as_slice())
-            .bind(w_id)
             .fetch_all(db)
             .await?
             .into_iter()
@@ -2536,15 +2548,15 @@ async fn extract_result_from_job_result(
                 Ok(to_raw_value(&result))
             }
         },
-        JobResult::SingleJob(x) => Ok(sqlx::query_as::<_, ResultR>(
-            "SELECT result #> $3 as result FROM completed_job WHERE id = $1 AND workspace_id = $2",
-        )
-        .bind(x)
-        .bind(w_id)
-        .bind(
+        JobResult::SingleJob(x) => Ok(sqlx::query!(
+            "SELECT result #> $3 AS \"result: Json<Box<RawValue>>\"
+            FROM completed_job WHERE id = $1 AND workspace_id = $2",
+            x,
+            w_id,
             json_path
-                .map(|x| x.split(".").map(|x| x.to_string()).collect::<Vec<String>>())
-                .unwrap_or_default(),
+                .as_ref()
+                .map(|x| x.split(".").collect::<Vec<&str>>())
+                .unwrap_or_default() as Vec<&str>,
         )
         .fetch_optional(db)
        .await?
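`result #> $3` walks a `jsonb` value along a `text[]` path, which is why a dotted `json_path` like `a.b.0` can be split client-side and bound as a single array parameter instead of being spliced into the SQL string. A sketch of the same shape (hypothetical `result_at_path` helper; the `as Vec<&str>` argument override mirrors the ones used above):

use serde_json::value::RawValue;
use sqlx::{types::Json, Pool, Postgres};
use uuid::Uuid;

async fn result_at_path(
    db: &Pool<Postgres>,
    id: Uuid,
    json_path: &str,
) -> sqlx::Result<Option<Json<Box<RawValue>>>> {
    let row = sqlx::query_scalar!(
        "SELECT result #> $2 AS \"result: Json<Box<RawValue>>\"
        FROM completed_job WHERE id = $1",
        id,
        json_path.split('.').collect::<Vec<&str>>() as Vec<&str> // bound as text[]
    )
    .fetch_optional(db)
    .await?;
    // Outer Option: no row matched; inner Option: `#>` found nothing at the path.
    Ok(row.flatten())
}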
diff --git a/backend/windmill-worker/src/handle_child.rs b/backend/windmill-worker/src/handle_child.rs
index 5dcdbfa28763e..8088f078cb874 100644
--- a/backend/windmill-worker/src/handle_child.rs
+++ b/backend/windmill-worker/src/handle_child.rs
@@ -206,7 +206,7 @@ pub async fn handle_child(
     let set_reason = async {
         if matches!(kill_reason, KillReason::Timeout { .. }) {
-            if let Err(err) = sqlx::query(
+            if let Err(err) = sqlx::query!(
                 r#"
                UPDATE queue
                   SET canceled = true
@@ -214,9 +214,9 @@ pub async fn handle_child(
                     , canceled_reason = $1
                 WHERE id = $2
             "#,
+                format!("duration > {}", timeout_duration.as_secs()),
+                job_id
             )
-            .bind(format!("duration > {}", timeout_duration.as_secs()))
-            .bind(job_id)
            .execute(&db)
            .await
            {
@@ -644,9 +644,14 @@ where
         }
     }
     if job_id != Uuid::nil() {
-        let (canceled, canceled_by, canceled_reason, already_completed) = sqlx::query_as::<_, (bool, Option<String>, Option<String>, bool)>("UPDATE queue SET mem_peak = $1, last_ping = now() WHERE id = $2 RETURNING canceled, canceled_by, canceled_reason, false")
-            .bind(*mem_peak)
-            .bind(job_id)
+        let (canceled, canceled_by, canceled_reason, already_completed) = sqlx::query!(
+            "UPDATE queue SET mem_peak = $1, last_ping = now()
+            WHERE id = $2
+            RETURNING canceled AS \"canceled!\", canceled_by, canceled_reason",
+            *mem_peak,
+            job_id
+        )
+        .map(|x| (x.canceled, x.canceled_by, x.canceled_reason, false))
        .fetch_optional(&db)
        .await
        .unwrap_or_else(|e| {
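The old query selected a literal `false` just so the row would decode as a 4-tuple; the rewrite keeps the tuple shape by appending the constant in Rust through `.map(...)`, which sqlx applies to each record before it is fetched. A trimmed sketch of the pattern (hypothetical `ping` helper returning a 3-tuple):

use sqlx::{Pool, Postgres};
use uuid::Uuid;

// RETURNING turns the UPDATE into a row source; .map reshapes the query!
// record into a tuple and appends a Rust-side constant, so no dummy column
// has to travel through SQL.
async fn ping(
    db: &Pool<Postgres>,
    job_id: Uuid,
    mem_peak: i32,
) -> sqlx::Result<Option<(bool, Option<String>, bool)>> {
    sqlx::query!(
        "UPDATE queue SET mem_peak = $1, last_ping = now()
        WHERE id = $2
        RETURNING canceled AS \"canceled!\", canceled_by",
        mem_peak,
        job_id
    )
    .map(|x| (x.canceled, x.canceled_by, false)) // constant appended in Rust
    .fetch_optional(db)
    .await
}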
diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs
index 3d375c776cdf7..b81717cee1936 100644
--- a/backend/windmill-worker/src/worker_flow.rs
+++ b/backend/windmill-worker/src/worker_flow.rs
@@ -925,13 +925,12 @@ pub async fn update_flow_status_after_job_completion_internal(
             if old_status.retry.fail_count > 0
                 && matches!(&new_status, Some(FlowStatusModule::Success { .. }))
             {
-                sqlx::query(
+                sqlx::query!(
                     "UPDATE queue
                     SET flow_status = flow_status - 'retry'
-                    WHERE id = $1
-                    RETURNING flow_status",
+                    WHERE id = $1",
+                    flow
                 )
-                .bind(flow)
                 .execute(&mut *tx)
                 .await
                 .context("remove flow status retry")?;
@@ -1355,41 +1354,51 @@ pub async fn update_flow_status_in_progress(
     let step = get_step_of_flow_status(db, flow).await?;
     match step {
         Step::Step(step) => {
-            sqlx::query(&format!(
+            sqlx::query!(
                 "UPDATE queue
-                SET flow_status = jsonb_set(jsonb_set(flow_status, '{{modules, {step}, job}}', $1), '{{modules, {step}, type}}', $2)
-                WHERE id = $3 AND workspace_id = $4",
-            ))
-            .bind(json!(job_in_progress.to_string()))
-            .bind(json!("InProgress"))
-            .bind(flow)
-            .bind(w_id)
+                SET flow_status = jsonb_set(
+                    jsonb_set(flow_status, ARRAY['modules', $4::INTEGER::TEXT, 'job'], to_jsonb($1::UUID::TEXT)),
+                    ARRAY['modules', $4::INTEGER::TEXT, 'type'],
+                    to_jsonb('InProgress'::text)
+                )
+                WHERE id = $2 AND workspace_id = $3",
+                job_in_progress,
+                flow,
+                w_id,
+                step as i32
+            )
             .execute(db)
             .await?;
         }
         Step::PreprocessorStep => {
-            sqlx::query(&format!(
+            sqlx::query!(
                 "UPDATE queue
-                SET flow_status = jsonb_set(jsonb_set(flow_status, '{{preprocessor_module, job}}', $1), '{{preprocessor_module, type}}', $2)
-                WHERE id = $3 AND workspace_id = $4",
-            ))
-            .bind(json!(job_in_progress.to_string()))
-            .bind(json!("InProgress"))
-            .bind(flow)
-            .bind(w_id)
+                SET flow_status = jsonb_set(
+                    jsonb_set(flow_status, ARRAY['preprocessor_module', 'job'], to_jsonb($1::UUID::TEXT)),
+                    ARRAY['preprocessor_module', 'type'],
+                    to_jsonb('InProgress'::text)
+                )
+                WHERE id = $2 AND workspace_id = $3",
+                job_in_progress,
+                flow,
+                w_id
+            )
             .execute(db)
             .await?;
         }
         Step::FailureStep => {
-            sqlx::query(&format!(
+            sqlx::query!(
                 "UPDATE queue
-                SET flow_status = jsonb_set(jsonb_set(flow_status, '{{failure_module, job}}', $1), '{{failure_module, type}}', $2)
-                WHERE id = $3 AND workspace_id = $4",
-            ))
-            .bind(json!(job_in_progress.to_string()))
-            .bind(json!("InProgress"))
-            .bind(flow)
-            .bind(w_id)
+                SET flow_status = jsonb_set(
+                    jsonb_set(flow_status, ARRAY['failure_module', 'job'], to_jsonb($1::UUID::TEXT)),
+                    ARRAY['failure_module', 'type'],
+                    to_jsonb('InProgress'::text)
+                )
+                WHERE id = $2 AND workspace_id = $3",
+                job_in_progress,
+                flow,
+                w_id
+            )
             .execute(db)
             .await?;
         }
@@ -1572,11 +1581,6 @@ pub struct ResumeRow {
     pub resume_id: i32,
 }

-#[derive(FromRow)]
-pub struct RawArgs {
-    pub args: Option<Json<HashMap<String, Box<RawValue>>>>,
-}
-
 lazy_static::lazy_static! {
     static ref CRASH_FORCEFULLY_AT_STEP: Option<usize> = std::env::var("CRASH_FORCEFULLY_AT_STEP")
         .ok()
@@ -1872,13 +1876,13 @@ async fn push_next_flow_job(
                     user_groups_required: user_groups_required,
                     self_approval_disabled: self_approval_disabled,
                 };
-                sqlx::query(
+                sqlx::query!(
                     "UPDATE queue
                     SET flow_status = JSONB_SET(flow_status, ARRAY['approval_conditions'], $1)
                     WHERE id = $2",
+                    json!(approval_conditions),
+                    flow_job.id
                 )
-                .bind(json!(approval_conditions))
-                .bind(flow_job.id)
                 .execute(&mut *tx)
                 .await?;
             }
@@ -1904,32 +1908,33 @@ async fn push_next_flow_job(
                     resume_messages.push(to_raw_value(&js));
                 }

-                sqlx::query(
+                sqlx::query!(
                     "UPDATE queue
                     SET flow_status = JSONB_SET(flow_status, ARRAY['modules', $1::TEXT, 'approvers'], $2)
                     WHERE id = $3",
+                    (status.step - 1).to_string(),
+                    json!(resumes
+                        .into_iter()
+                        .map(|r| Approval {
+                            resume_id: r.resume_id as u16,
+                            approver: r
+                                .approver.clone()
+                                .unwrap_or_else(|| "unknown".to_string())
+                        })
+                        .collect::<Vec<_>>()
+                    ),
+                    flow_job.id
                 )
-                .bind(status.step - 1)
-                .bind(json!(resumes
-                    .into_iter()
-                    .map(|r| Approval {
-                        resume_id: r.resume_id as u16,
-                        approver: r
-                            .approver.clone()
-                            .unwrap_or_else(|| "unknown".to_string())
-                    })
-                    .collect::<Vec<_>>()))
-                .bind(flow_job.id)
                 .execute(&mut *tx)
                 .await?;

                 // Remove the approval conditions from the flow status
-                sqlx::query(
+                sqlx::query!(
                     "UPDATE queue
                     SET flow_status = flow_status - 'approval_conditions'
                     WHERE id = $1",
+                    flow_job.id
                 )
-                .bind(flow_job.id)
                 .execute(&mut *tx)
                 .await?;
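All three `update_flow_status_in_progress` arms trade a `format!`-interpolated `jsonb_set` path for a bound parameter: splicing `{step}` into the SQL text produced a distinct statement per step index (invisible to the macro and to prepared-statement caching), while `ARRAY['modules', $4::INTEGER::TEXT, 'job']` keeps a single parameterized statement and lets `query!` infer the parameter's type from the cast. The same shape in isolation (hypothetical `mark_in_progress` helper, trimmed to the essentials):

use sqlx::{Pool, Postgres};
use uuid::Uuid;

// One prepared statement for every step index. The double cast
// ($2::INTEGER::TEXT) pins the parameter type for the macro and yields the
// text form the jsonb_set path array needs; to_jsonb($3::UUID::TEXT) turns
// the job id into a jsonb string value.
async fn mark_in_progress(db: &Pool<Postgres>, flow: Uuid, step: i32, job: Uuid) -> sqlx::Result<()> {
    sqlx::query!(
        "UPDATE queue
        SET flow_status = jsonb_set(
            jsonb_set(flow_status, ARRAY['modules', $2::INTEGER::TEXT, 'job'], to_jsonb($3::UUID::TEXT)),
            ARRAY['modules', $2::INTEGER::TEXT, 'type'],
            to_jsonb('InProgress'::text)
        )
        WHERE id = $1",
        flow,
        step,
        job
    )
    .execute(db)
    .await?;
    Ok(())
}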
@@ -1942,17 +1947,17 @@ async fn push_next_flow_job(
                     FlowStatusModule::WaitingForPriorSteps { .. }
                 ) && is_disapproved.is_none()
             {
-                sqlx::query(
+                sqlx::query!(
                     "UPDATE queue
                     SET flow_status = JSONB_SET(flow_status, ARRAY['modules', flow_status->>'step'::text], $1), suspend = $2, suspend_until = now() + $3
                     WHERE id = $4",
+                    json!(FlowStatusModule::WaitingForEvents { id: status_module.id(), count: required_events, job: last }),
+                    (required_events - resume_messages.len() as u16) as i32,
+                    Duration::from_secs(suspend.timeout.map(|t| t.into()).unwrap_or_else(|| 30 * 60)) as Duration,
+                    flow_job.id,
                 )
-                .bind(json!(FlowStatusModule::WaitingForEvents { id: status_module.id(), count: required_events, job: last }))
-                .bind((required_events - resume_messages.len() as u16) as i32)
-                .bind(Duration::from_secs(suspend.timeout.map(|t| t.into()).unwrap_or_else(|| 30 * 60)))
-                .bind(flow_job.id)
                 .execute(&mut *tx)
                 .await?;
@@ -2128,15 +2133,15 @@ async fn push_next_flow_job(
                     scheduled_for_o = Some(from_now(retry_in));
                     status.retry.failed_jobs.push(job.clone());
-                    sqlx::query(
+                    sqlx::query!(
                         "UPDATE queue
                         SET flow_status = JSONB_SET(JSONB_SET(flow_status, ARRAY['retry'], $1), ARRAY['modules', $3::TEXT, 'failed_retries'], $4)
                         WHERE id = $2",
+                        json!(RetryStatus { fail_count, ..status.retry.clone() }),
+                        flow_job.id,
+                        status.step.to_string(),
+                        json!(status.retry.failed_jobs)
                     )
-                    .bind(json!(RetryStatus { fail_count, ..status.retry.clone() }))
-                    .bind(flow_job.id)
-                    .bind(status.step)
-                    .bind(json!(status.retry.failed_jobs))
                     .execute(db)
                     .warn_after_seconds(2)
                     .await
@@ -2165,13 +2170,13 @@ async fn push_next_flow_job(
             status_module = status.failure_module.module_status.clone();

             if module.retry.as_ref().is_some_and(|x| x.has_attempts()) {
-                sqlx::query(
+                sqlx::query!(
                     "UPDATE queue
                     SET flow_status = JSONB_SET(flow_status, ARRAY['retry'], $1)
                     WHERE id = $2",
+                    json!(RetryStatus { fail_count: 0, failed_jobs: vec![] }),
+                    flow_job.id
                 )
-                .bind(json!(RetryStatus { fail_count: 0, failed_jobs: vec![] }))
-                .bind(flow_job.id)
                 .execute(db)
                 .await
                 .context("update flow retry")?;
@@ -2224,17 +2229,16 @@ async fn push_next_flow_job(
             );
             Ok(Marc::new(hm))
         } else if let Some(id) = get_args_from_id {
-            let row = sqlx::query_as::<_, RawArgs>(
-                "SELECT args FROM completed_job WHERE id = $1 AND workspace_id = $2",
+            let args = sqlx::query_scalar!(
+                "SELECT args AS \"args: Json<HashMap<String, Box<RawValue>>>\"
+                FROM completed_job WHERE id = $1 AND workspace_id = $2",
+                id,
+                &flow_job.workspace_id
             )
-            .bind(id)
-            .bind(&flow_job.workspace_id)
            .fetch_optional(db)
            .await?;
-            if let Some(raw_args) = row {
-                Ok(Marc::new(
-                    raw_args.args.map(|x| x.0).unwrap_or_else(HashMap::new),
-                ))
+            if let Some(args) = args {
+                Ok(Marc::new(args.map(|x| x.0).unwrap_or_else(HashMap::new)))
             } else {
                 Ok(Marc::new(HashMap::new()))
             }
@@ -2313,23 +2317,23 @@ async fn push_next_flow_job(
     let (job_payloads, next_status) = match next_flow_transform {
         NextFlowTransform::Continue(job_payload, next_state) => (job_payload, next_state),
         NextFlowTransform::EmptyInnerFlows => {
-            sqlx::query(
+            sqlx::query!(
                 "UPDATE queue
                 SET flow_status = JSONB_SET(flow_status, ARRAY['modules', $1::TEXT], $2)
                 WHERE id = $3",
+                status.step.to_string(),
+                json!(FlowStatusModule::Success {
+                    id: status_module.id(),
+                    job: Uuid::nil(),
+                    flow_jobs: Some(vec![]),
+                    flow_jobs_success: Some(vec![]),
+                    branch_chosen: None,
+                    approvers: vec![],
+                    failed_retries: vec![],
+                    skipped: false,
+                }),
+                flow_job.id
            )
-            .bind(status.step)
-            .bind(json!(FlowStatusModule::Success {
-                id: status_module.id(),
-                job: Uuid::nil(),
-                flow_jobs: Some(vec![]),
-                flow_jobs_success: Some(vec![]),
-                branch_chosen: None,
-                approvers: vec![],
-                failed_retries: vec![],
-                skipped: false,
-            }))
-            .bind(flow_job.id)
             .execute(db)
             .await?;
             // flow is reprocessed by the worker in a state where the module has completed successfully.
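When only one column is wanted, `query_scalar!` (used for `get_args_from_id` above) skips the record type entirely. Note the nesting it produces: `fetch_optional` wraps the row in one `Option`, and the nullable `args` column adds a second. A sketch (hypothetical `args_of` helper):

use serde_json::value::RawValue;
use sqlx::{types::Json, Pool, Postgres};
use std::collections::HashMap;
use uuid::Uuid;

// query_scalar! yields the lone column directly; flatten collapses
// Option<Option<Json<...>>> before unwrapping into an owned map.
async fn args_of(db: &Pool<Postgres>, id: Uuid) -> sqlx::Result<HashMap<String, Box<RawValue>>> {
    let args = sqlx::query_scalar!(
        "SELECT args AS \"args: Json<HashMap<String, Box<RawValue>>>\"
        FROM completed_job WHERE id = $1",
        id
    )
    .fetch_optional(db)
    .await?;
    Ok(args.flatten().map(|x| x.0).unwrap_or_default())
}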
@@ -2609,13 +2613,13 @@ async fn push_next_flow_job(
                     error::Error::InternalErr(format!("Unable to serialize uuid: {e:#}"))
                 })?;

-                sqlx::query(
+                sqlx::query!(
                     "UPDATE queue
                     SET flow_status = JSONB_SET(flow_status, ARRAY['cleanup_module', 'flow_jobs_to_clean'], COALESCE(flow_status->'cleanup_module'->'flow_jobs_to_clean', '[]'::jsonb) || $1)
                     WHERE id = $2",
+                    uuid_singleton_json,
+                    root_job.unwrap_or(flow_job.id)
                 )
-                .bind(uuid_singleton_json)
-                .bind(root_job.unwrap_or(flow_job.id))
                 .execute(&mut *inner_tx)
                 .await?;
            }
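`COALESCE(..., '[]'::jsonb) || $1` appends to a `jsonb` array, creating it on first use; the bound `uuid_singleton_json` is a serialized one-element array. A minimal stand-in for that value (the real construction, which handles serialization errors, sits just above this hunk):

use serde_json::{json, Value};
use uuid::Uuid;

// One-element jsonb array, e.g. ["67e55044-10b1-426f-9247-bb680e5fe0c8"],
// ready to be `||`-appended onto flow_jobs_to_clean.
fn uuid_singleton(id: Uuid) -> Value {
    json!([id.to_string()])
}

One operational note: every `query!`, `query_as!`, and `query_scalar!` introduced in this diff is validated against offline metadata keyed by a hash of the exact SQL text, so any later edit to one of these query strings requires regenerating the `.sqlx` files (typically with `cargo sqlx prepare`).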