diff --git a/backend/.sqlx/query-036c84bb9ce72748956bc9c18fbe276444fab025a281dc4784596b0e31c1cb9d.json b/backend/.sqlx/query-036c84bb9ce72748956bc9c18fbe276444fab025a281dc4784596b0e31c1cb9d.json new file mode 100644 index 0000000000000..dedd3fd2725ea --- /dev/null +++ b/backend/.sqlx/query-036c84bb9ce72748956bc9c18fbe276444fab025a281dc4784596b0e31c1cb9d.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "create index concurrently if not exists ix_job_workspace_id_created_at_new_9 ON v2_job (workspace_id, created_at DESC) where kind in ('dependencies', 'flowdependencies', 'appdependencies') AND parent_job IS NULL", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "036c84bb9ce72748956bc9c18fbe276444fab025a281dc4784596b0e31c1cb9d" +} diff --git a/backend/.sqlx/query-0df84fc35f2780ceb7c473b0165ebab93a4bc1bcab166aae68244ab1f3d4df9f.json b/backend/.sqlx/query-0df84fc35f2780ceb7c473b0165ebab93a4bc1bcab166aae68244ab1f3d4df9f.json new file mode 100644 index 0000000000000..d34383496aca2 --- /dev/null +++ b/backend/.sqlx/query-0df84fc35f2780ceb7c473b0165ebab93a4bc1bcab166aae68244ab1f3d4df9f.json @@ -0,0 +1,104 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO completed_job AS cj\n ( workspace_id\n , id\n , parent_job\n , created_by\n , created_at\n , started_at\n , duration_ms\n , success\n , script_hash\n , script_path\n , args\n , result\n , raw_code\n , raw_lock\n , canceled\n , canceled_by\n , canceled_reason\n , job_kind\n , schedule_path\n , permissioned_as\n , flow_status\n , raw_flow\n , is_flow_step\n , is_skipped\n , language\n , email\n , visible_to_owner\n , mem_peak\n , tag\n , priority\n )\n VALUES ($1, $2, $3, $4, $5, COALESCE($6, now()), COALESCE($30::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE($6, now()))))*1000), $7, $8, $9,$10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29)\n ON CONFLICT (id) DO UPDATE SET 
success = $7, result = $11 RETURNING duration_ms AS \"duration_ms!\"", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "duration_ms!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Varchar", + "Uuid", + "Uuid", + "Varchar", + "Timestamptz", + "Timestamptz", + "Bool", + "Int8", + "Varchar", + "Jsonb", + "Jsonb", + "Text", + "Text", + "Bool", + "Varchar", + "Text", + { + "Custom": { + "name": "job_kind", + "kind": { + "Enum": [ + "script", + "preview", + "flow", + "dependencies", + "flowpreview", + "script_hub", + "identity", + "flowdependencies", + "http", + "graphql", + "postgresql", + "noop", + "appdependencies", + "deploymentcallback", + "singlescriptflow", + "flowscript", + "flownode", + "appscript" + ] + } + } + }, + "Varchar", + "Varchar", + "Jsonb", + "Jsonb", + "Bool", + "Bool", + { + "Custom": { + "name": "script_lang", + "kind": { + "Enum": [ + "python3", + "deno", + "go", + "bash", + "postgresql", + "nativets", + "bun", + "mysql", + "bigquery", + "snowflake", + "graphql", + "powershell", + "mssql", + "php", + "bunnative", + "rust", + "ansible", + "csharp", + "oracledb" + ] + } + } + }, + "Varchar", + "Bool", + "Int4", + "Varchar", + "Int2", + "Int8" + ] + }, + "nullable": [ + true + ] + }, + "hash": "0df84fc35f2780ceb7c473b0165ebab93a4bc1bcab166aae68244ab1f3d4df9f" +} diff --git a/backend/.sqlx/query-119469ebfe8572c78ed3ee5ab5b1a6a1cb1b0f31e357b5370f9bb7eab1e20a7b.json b/backend/.sqlx/query-119469ebfe8572c78ed3ee5ab5b1a6a1cb1b0f31e357b5370f9bb7eab1e20a7b.json new file mode 100644 index 0000000000000..bb701df30d8c3 --- /dev/null +++ b/backend/.sqlx/query-119469ebfe8572c78ed3ee5ab5b1a6a1cb1b0f31e357b5370f9bb7eab1e20a7b.json @@ -0,0 +1,27 @@ +{ + "db_name": "PostgreSQL", + "query": "WITH uuid_table as (\n select gen_random_uuid() as uuid from generate_series(1, $6)\n )\n INSERT INTO job\n (id, workspace_id, raw_code, raw_lock, raw_flow, tag)\n (SELECT uuid, $1, $2, $3, $4, $5 FROM uuid_table)\n RETURNING id AS \"id!\"", + 
"describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Varchar", + "Text", + "Text", + "Jsonb", + "Varchar", + "Int4" + ] + }, + "nullable": [ + true + ] + }, + "hash": "119469ebfe8572c78ed3ee5ab5b1a6a1cb1b0f31e357b5370f9bb7eab1e20a7b" +} diff --git a/backend/.sqlx/query-14540eef4594d9282cee3df4f92a7ed2e67243e5c1522850045b2da42fa914bc.json b/backend/.sqlx/query-14540eef4594d9282cee3df4f92a7ed2e67243e5c1522850045b2da42fa914bc.json new file mode 100644 index 0000000000000..b10496550f261 --- /dev/null +++ b/backend/.sqlx/query-14540eef4594d9282cee3df4f92a7ed2e67243e5c1522850045b2da42fa914bc.json @@ -0,0 +1,91 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n queue.job_kind AS \"job_kind!: JobKind\",\n queue.script_hash AS \"script_hash: ScriptHash\",\n queue.raw_flow AS \"raw_flow: sqlx::types::Json>\",\n completed_job.parent_job AS \"parent_job: Uuid\",\n completed_job.created_at AS \"created_at!: chrono::NaiveDateTime\",\n completed_job.created_by AS \"created_by!\",\n queue.script_path,\n queue.args AS \"args: sqlx::types::Json>\"\n FROM queue\n JOIN completed_job ON completed_job.parent_job = queue.id\n WHERE completed_job.id = $1 AND completed_job.workspace_id = $2\n LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "job_kind!: JobKind", + "type_info": { + "Custom": { + "name": "job_kind", + "kind": { + "Enum": [ + "script", + "preview", + "flow", + "dependencies", + "flowpreview", + "script_hub", + "identity", + "flowdependencies", + "http", + "graphql", + "postgresql", + "noop", + "appdependencies", + "deploymentcallback", + "singlescriptflow", + "flowscript", + "flownode", + "appscript" + ] + } + } + } + }, + { + "ordinal": 1, + "name": "script_hash: ScriptHash", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "raw_flow: sqlx::types::Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "parent_job: Uuid", + "type_info": "Uuid" + }, + { + 
"ordinal": 4, + "name": "created_at!: chrono::NaiveDateTime", + "type_info": "Timestamptz" + }, + { + "ordinal": 5, + "name": "created_by!", + "type_info": "Varchar" + }, + { + "ordinal": 6, + "name": "script_path", + "type_info": "Varchar" + }, + { + "ordinal": 7, + "name": "args: sqlx::types::Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + true, + true, + true, + true, + true, + true, + true + ] + }, + "hash": "14540eef4594d9282cee3df4f92a7ed2e67243e5c1522850045b2da42fa914bc" +} diff --git a/backend/.sqlx/query-15557c0acea71cee03f42516553fb4f5709e0e1a02a0187e88fa5d9e94ffb91a.json b/backend/.sqlx/query-15557c0acea71cee03f42516553fb4f5709e0e1a02a0187e88fa5d9e94ffb91a.json new file mode 100644 index 0000000000000..dc8718bda6b7a --- /dev/null +++ b/backend/.sqlx/query-15557c0acea71cee03f42516553fb4f5709e0e1a02a0187e88fa5d9e94ffb91a.json @@ -0,0 +1,104 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO queue\n (workspace_id, id, running, parent_job, created_by, permissioned_as, scheduled_for, \n script_hash, script_path, raw_code, raw_lock, args, job_kind, schedule_path, raw_flow, flow_status, is_flow_step, language, started_at, same_worker, pre_run_error, email, visible_to_owner, root_job, tag, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id, cache_ttl, priority, last_ping)\n VALUES ($1, $2, $3, $4, $5, $6, COALESCE($7, now()), $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, CASE WHEN $3 THEN now() END, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, NULL) RETURNING id AS \"id!\"", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Varchar", + "Uuid", + "Bool", + "Uuid", + "Varchar", + "Varchar", + "Timestamptz", + "Int8", + "Varchar", + "Text", + "Text", + "Jsonb", + { + "Custom": { + "name": "job_kind", + "kind": { + "Enum": [ + "script", + "preview", + "flow", + 
"dependencies", + "flowpreview", + "script_hub", + "identity", + "flowdependencies", + "http", + "graphql", + "postgresql", + "noop", + "appdependencies", + "deploymentcallback", + "singlescriptflow", + "flowscript", + "flownode", + "appscript" + ] + } + } + }, + "Varchar", + "Jsonb", + "Jsonb", + "Bool", + { + "Custom": { + "name": "script_lang", + "kind": { + "Enum": [ + "python3", + "deno", + "go", + "bash", + "postgresql", + "nativets", + "bun", + "mysql", + "bigquery", + "snowflake", + "graphql", + "powershell", + "mssql", + "php", + "bunnative", + "rust", + "ansible", + "csharp", + "oracledb" + ] + } + } + }, + "Bool", + "Text", + "Varchar", + "Bool", + "Uuid", + "Varchar", + "Int4", + "Int4", + "Int4", + "Varchar", + "Int4", + "Int2" + ] + }, + "nullable": [ + true + ] + }, + "hash": "15557c0acea71cee03f42516553fb4f5709e0e1a02a0187e88fa5d9e94ffb91a" +} diff --git a/backend/.sqlx/query-2cef109784efc04999e4537e0d1d3fb3221e04f3a7c1abe91dd763f366d06618.json b/backend/.sqlx/query-2cef109784efc04999e4537e0d1d3fb3221e04f3a7c1abe91dd763f366d06618.json new file mode 100644 index 0000000000000..3b7793dd05316 --- /dev/null +++ b/backend/.sqlx/query-2cef109784efc04999e4537e0d1d3fb3221e04f3a7c1abe91dd763f366d06618.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT success AS \"success!\" FROM completed_job WHERE id = ANY($1)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "success!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "UuidArray" + ] + }, + "nullable": [ + true + ] + }, + "hash": "2cef109784efc04999e4537e0d1d3fb3221e04f3a7c1abe91dd763f366d06618" +} diff --git a/backend/.sqlx/query-36b26b3a6458d8a0b4f770d52c1bb09370b905d610b9ceb3cfac11365586320d.json b/backend/.sqlx/query-36b26b3a6458d8a0b4f770d52c1bb09370b905d610b9ceb3cfac11365586320d.json new file mode 100644 index 0000000000000..3b3fa11ffbc6f --- /dev/null +++ b/backend/.sqlx/query-36b26b3a6458d8a0b4f770d52c1bb09370b905d610b9ceb3cfac11365586320d.json @@ 
-0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT running AS \"running!\" FROM queue WHERE id = $1 AND workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "running!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true + ] + }, + "hash": "36b26b3a6458d8a0b4f770d52c1bb09370b905d610b9ceb3cfac11365586320d" +} diff --git a/backend/.sqlx/query-3a4f6ebad63a67b648e672312a0c0eb8956d78398d97a18d095f83f0ab0a7915.json b/backend/.sqlx/query-3a4f6ebad63a67b648e672312a0c0eb8956d78398d97a18d095f83f0ab0a7915.json new file mode 100644 index 0000000000000..fdf8dd40f3351 --- /dev/null +++ b/backend/.sqlx/query-3a4f6ebad63a67b648e672312a0c0eb8956d78398d97a18d095f83f0ab0a7915.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue SET running = false, started_at = null\n WHERE last_ping < now() - ($1 || ' seconds')::interval\n AND running = true AND job_kind NOT IN ('flow', 'flowpreview', 'flownode', 'singlescriptflow') AND same_worker = false RETURNING id AS \"id!\", workspace_id AS \"workspace_id!\", last_ping", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "workspace_id!", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "last_ping", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + true, + true, + true + ] + }, + "hash": "3a4f6ebad63a67b648e672312a0c0eb8956d78398d97a18d095f83f0ab0a7915" +} diff --git a/backend/.sqlx/query-3a5edf3dd884b5a8862bb112f6520967ed4a218782192c6c6fc1498f45d753a6.json b/backend/.sqlx/query-3a5edf3dd884b5a8862bb112f6520967ed4a218782192c6c6fc1498f45d753a6.json new file mode 100644 index 0000000000000..26d0b63b5cd4d --- /dev/null +++ b/backend/.sqlx/query-3a5edf3dd884b5a8862bb112f6520967ed4a218782192c6c6fc1498f45d753a6.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "create index 
concurrently if not exists ix_job_created_at ON v2_job (created_at DESC)", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "3a5edf3dd884b5a8862bb112f6520967ed4a218782192c6c6fc1498f45d753a6" +} diff --git a/backend/.sqlx/query-3bacf9cd9aa63f4bec5f983f4a0c3030216b5a4ed669f77962509d1c2c6cb780.json b/backend/.sqlx/query-3bacf9cd9aa63f4bec5f983f4a0c3030216b5a4ed669f77962509d1c2c6cb780.json new file mode 100644 index 0000000000000..200a5bf47f728 --- /dev/null +++ b/backend/.sqlx/query-3bacf9cd9aa63f4bec5f983f4a0c3030216b5a4ed669f77962509d1c2c6cb780.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "create index concurrently if not exists root_job_index_by_path_2 ON v2_job (workspace_id, runnable_path, created_at desc) WHERE parent_job IS NULL", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "3bacf9cd9aa63f4bec5f983f4a0c3030216b5a4ed669f77962509d1c2c6cb780" +} diff --git a/backend/.sqlx/query-3bc1919515120116705d7c250a34f2b9bf7c4bcaedb87c28f974e46c9c42200c.json b/backend/.sqlx/query-3bc1919515120116705d7c250a34f2b9bf7c4bcaedb87c28f974e46c9c42200c.json new file mode 100644 index 0000000000000..125c9593b622e --- /dev/null +++ b/backend/.sqlx/query-3bc1919515120116705d7c250a34f2b9bf7c4bcaedb87c28f974e46c9c42200c.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id AS \"id!\" FROM queue WHERE id = ANY($1) AND schedule_path IS NULL AND ($2::text[] IS NULL OR tag = ANY($2))", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "UuidArray", + "TextArray" + ] + }, + "nullable": [ + true + ] + }, + "hash": "3bc1919515120116705d7c250a34f2b9bf7c4bcaedb87c28f974e46c9c42200c" +} diff --git a/backend/.sqlx/query-402fd5bff6e8420c9b3477f05df625cae355fda673b9e85284e0fcd7d9232eb7.json 
b/backend/.sqlx/query-402fd5bff6e8420c9b3477f05df625cae355fda673b9e85284e0fcd7d9232eb7.json new file mode 100644 index 0000000000000..83d1b15744d97 --- /dev/null +++ b/backend/.sqlx/query-402fd5bff6e8420c9b3477f05df625cae355fda673b9e85284e0fcd7d9232eb7.json @@ -0,0 +1,73 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n script_path, script_hash AS \"script_hash: ScriptHash\",\n job_kind AS \"job_kind!: JobKind\",\n flow_status AS \"flow_status: Json>\",\n raw_flow AS \"raw_flow: Json>\"\n FROM completed_job WHERE id = $1 and workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "script_path", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "script_hash: ScriptHash", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "job_kind!: JobKind", + "type_info": { + "Custom": { + "name": "job_kind", + "kind": { + "Enum": [ + "script", + "preview", + "flow", + "dependencies", + "flowpreview", + "script_hub", + "identity", + "flowdependencies", + "http", + "graphql", + "postgresql", + "noop", + "appdependencies", + "deploymentcallback", + "singlescriptflow", + "flowscript", + "flownode", + "appscript" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "flow_status: Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 4, + "name": "raw_flow: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + true, + true, + true, + true + ] + }, + "hash": "402fd5bff6e8420c9b3477f05df625cae355fda673b9e85284e0fcd7d9232eb7" +} diff --git a/backend/.sqlx/query-44a317f7647e2b515f90dc9c04f7ac75c2c87c7c3036acd96ba72fb2a21700db.json b/backend/.sqlx/query-44a317f7647e2b515f90dc9c04f7ac75c2c87c7c3036acd96ba72fb2a21700db.json new file mode 100644 index 0000000000000..8885d178e210c --- /dev/null +++ b/backend/.sqlx/query-44a317f7647e2b515f90dc9c04f7ac75c2c87c7c3036acd96ba72fb2a21700db.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM completed_job WHERE 
created_at <= now() - ($1::bigint::text || ' s')::interval AND started_at + ((duration_ms/1000 + $1::bigint) || ' s')::interval <= now() RETURNING id AS \"id!\"", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + true + ] + }, + "hash": "44a317f7647e2b515f90dc9c04f7ac75c2c87c7c3036acd96ba72fb2a21700db" +} diff --git a/backend/.sqlx/query-4d3ef32120623584bf5c13d86ea6ad7b3aa41d9b581738d16fbfff4cc5b72a7a.json b/backend/.sqlx/query-4d3ef32120623584bf5c13d86ea6ad7b3aa41d9b581738d16fbfff4cc5b72a7a.json new file mode 100644 index 0000000000000..9fad8009ec451 --- /dev/null +++ b/backend/.sqlx/query-4d3ef32120623584bf5c13d86ea6ad7b3aa41d9b581738d16fbfff4cc5b72a7a.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT canceled AS \"canceled!\" FROM queue WHERE id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "canceled!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + true + ] + }, + "hash": "4d3ef32120623584bf5c13d86ea6ad7b3aa41d9b581738d16fbfff4cc5b72a7a" +} diff --git a/backend/.sqlx/query-5401c521b5e63b7d9e7bc51c19d116599f6bcedbe70f3bf346b482fe79501958.json b/backend/.sqlx/query-5401c521b5e63b7d9e7bc51c19d116599f6bcedbe70f3bf346b482fe79501958.json new file mode 100644 index 0000000000000..9a1b65ff521bb --- /dev/null +++ b/backend/.sqlx/query-5401c521b5e63b7d9e7bc51c19d116599f6bcedbe70f3bf346b482fe79501958.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "create index concurrently if not exists ix_job_workspace_id_created_at_new_8 ON v2_job (workspace_id, created_at DESC) where kind in ('deploymentcallback') AND parent_job IS NULL", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "5401c521b5e63b7d9e7bc51c19d116599f6bcedbe70f3bf346b482fe79501958" +} diff --git 
a/backend/.sqlx/query-5bce731932a35dbecc38c7b9665ef1117a15acf7d0d41b93de165e788b55d93f.json b/backend/.sqlx/query-5bce731932a35dbecc38c7b9665ef1117a15acf7d0d41b93de165e788b55d93f.json new file mode 100644 index 0000000000000..f3ff1e599dd35 --- /dev/null +++ b/backend/.sqlx/query-5bce731932a35dbecc38c7b9665ef1117a15acf7d0d41b93de165e788b55d93f.json @@ -0,0 +1,42 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT created_by AS \"created_by!\", CONCAT(coalesce(completed_job.logs, ''), coalesce(job_logs.logs, '')) as logs, job_logs.log_offset, job_logs.log_file_index\n FROM completed_job \n LEFT JOIN job_logs ON job_logs.job_id = completed_job.id \n WHERE completed_job.id = $1 AND completed_job.workspace_id = $2 AND ($3::text[] IS NULL OR completed_job.tag = ANY($3))", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "created_by!", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "logs", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "log_offset", + "type_info": "Int4" + }, + { + "ordinal": 3, + "name": "log_file_index", + "type_info": "TextArray" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text", + "TextArray" + ] + }, + "nullable": [ + true, + null, + false, + true + ] + }, + "hash": "5bce731932a35dbecc38c7b9665ef1117a15acf7d0d41b93de165e788b55d93f" +} diff --git a/backend/.sqlx/query-661f472ff3860983322162420457f5033b9c9afc344d9c3e385ba20a3ad2197a.json b/backend/.sqlx/query-661f472ff3860983322162420457f5033b9c9afc344d9c3e385ba20a3ad2197a.json index 1fa370e682ca6..75b8108281532 100644 --- a/backend/.sqlx/query-661f472ff3860983322162420457f5033b9c9afc344d9c3e385ba20a3ad2197a.json +++ b/backend/.sqlx/query-661f472ff3860983322162420457f5033b9c9afc344d9c3e385ba20a3ad2197a.json @@ -5,7 +5,7 @@ "columns": [ { "ordinal": 0, - "name": "bool", + "name": "?column?", "type_info": "Bool" } ], diff --git a/backend/.sqlx/query-6b0115e40d4361b3ca72dbd071b0a8c0319c5ae0b92f289ec1e74d2478c9e740.json 
b/backend/.sqlx/query-6b0115e40d4361b3ca72dbd071b0a8c0319c5ae0b92f289ec1e74d2478c9e740.json new file mode 100644 index 0000000000000..265ededd17031 --- /dev/null +++ b/backend/.sqlx/query-6b0115e40d4361b3ca72dbd071b0a8c0319c5ae0b92f289ec1e74d2478c9e740.json @@ -0,0 +1,42 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT created_by AS \"created_by!\", CONCAT(coalesce(queue.logs, ''), coalesce(job_logs.logs, '')) as logs, coalesce(job_logs.log_offset, 0) as log_offset, job_logs.log_file_index\n FROM queue \n LEFT JOIN job_logs ON job_logs.job_id = queue.id \n WHERE queue.id = $1 AND queue.workspace_id = $2 AND ($3::text[] IS NULL OR queue.tag = ANY($3))", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "created_by!", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "logs", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "log_offset", + "type_info": "Int4" + }, + { + "ordinal": 3, + "name": "log_file_index", + "type_info": "TextArray" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text", + "TextArray" + ] + }, + "nullable": [ + true, + null, + null, + true + ] + }, + "hash": "6b0115e40d4361b3ca72dbd071b0a8c0319c5ae0b92f289ec1e74d2478c9e740" +} diff --git a/backend/.sqlx/query-74a2a90d12ca0179c8a80f9bf574066db4e8735c0f717d91391a28bf832c0e71.json b/backend/.sqlx/query-74a2a90d12ca0179c8a80f9bf574066db4e8735c0f717d91391a28bf832c0e71.json new file mode 100644 index 0000000000000..fc80d816cf311 --- /dev/null +++ b/backend/.sqlx/query-74a2a90d12ca0179c8a80f9bf574066db4e8735c0f717d91391a28bf832c0e71.json @@ -0,0 +1,25 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue SET canceled = true, canceled_by = $1, canceled_reason = $2, scheduled_for = now(), suspend = 0 WHERE id = $3 AND workspace_id = $4 AND (canceled = false OR canceled_reason != $2) RETURNING id AS \"id!\"", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Varchar", + "Text", + "Uuid", 
+ "Text" + ] + }, + "nullable": [ + true + ] + }, + "hash": "74a2a90d12ca0179c8a80f9bf574066db4e8735c0f717d91391a28bf832c0e71" +} diff --git a/backend/.sqlx/query-7dc7bc4e22942792938d273655962a95486f6da82cdc08f79dd6cef508256474.json b/backend/.sqlx/query-7dc7bc4e22942792938d273655962a95486f6da82cdc08f79dd6cef508256474.json new file mode 100644 index 0000000000000..94ad2087885d5 --- /dev/null +++ b/backend/.sqlx/query-7dc7bc4e22942792938d273655962a95486f6da82cdc08f79dd6cef508256474.json @@ -0,0 +1,25 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO completed_job AS cj\n ( workspace_id\n , id\n , parent_job\n , created_by\n , created_at\n , started_at\n , duration_ms\n , success\n , script_hash\n , script_path\n , args\n , result\n , raw_code\n , raw_lock\n , canceled\n , canceled_by\n , canceled_reason\n , job_kind\n , schedule_path\n , permissioned_as\n , flow_status\n , raw_flow\n , is_flow_step\n , is_skipped\n , language\n , email\n , visible_to_owner\n , mem_peak\n , tag\n , priority\n )\n SELECT workspace_id\n , id\n , parent_job\n , created_by\n , created_at\n , now()\n , 0\n , false\n , script_hash\n , script_path\n , args\n , $4\n , raw_code\n , raw_lock\n , true\n , $1\n , canceled_reason\n , job_kind\n , schedule_path\n , permissioned_as\n , flow_status\n , raw_flow\n , is_flow_step\n , false\n , language\n , email\n , visible_to_owner\n , mem_peak\n , tag\n , priority FROM queue \n WHERE id = any($2) AND running = false AND parent_job IS NULL AND workspace_id = $3 AND schedule_path IS NULL FOR UPDATE SKIP LOCKED\n ON CONFLICT (id) DO NOTHING RETURNING id AS \"id!\"", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Varchar", + "UuidArray", + "Text", + "Jsonb" + ] + }, + "nullable": [ + true + ] + }, + "hash": "7dc7bc4e22942792938d273655962a95486f6da82cdc08f79dd6cef508256474" +} diff --git 
a/backend/.sqlx/query-8be277b89102a26dda506202a3ef7eb05342cfb3aa9b4f5d80c70fbc50d437ba.json b/backend/.sqlx/query-8be277b89102a26dda506202a3ef7eb05342cfb3aa9b4f5d80c70fbc50d437ba.json new file mode 100644 index 0000000000000..e5819f5c6ab3f --- /dev/null +++ b/backend/.sqlx/query-8be277b89102a26dda506202a3ef7eb05342cfb3aa9b4f5d80c70fbc50d437ba.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "create index concurrently if not exists ix_job_workspace_id_created_at_new_6 ON v2_job (workspace_id, created_at DESC) where kind in ('script', 'flow') AND parent_job IS NULL", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "8be277b89102a26dda506202a3ef7eb05342cfb3aa9b4f5d80c70fbc50d437ba" +} diff --git a/backend/.sqlx/query-931703a98d2ee5fb58d3380896baaee032e731db1e6bd49d991a54f49ab8fa46.json b/backend/.sqlx/query-931703a98d2ee5fb58d3380896baaee032e731db1e6bd49d991a54f49ab8fa46.json new file mode 100644 index 0000000000000..5f86a45011fe0 --- /dev/null +++ b/backend/.sqlx/query-931703a98d2ee5fb58d3380896baaee032e731db1e6bd49d991a54f49ab8fa46.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "create index concurrently if not exists ix_completed_job_workspace_id_started_at_new_2 ON v2_job_completed (workspace_id, started_at DESC)", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "931703a98d2ee5fb58d3380896baaee032e731db1e6bd49d991a54f49ab8fa46" +} diff --git a/backend/.sqlx/query-96a9357888af26e5ec1e314bb565af05de561a8f9899e4ddca958982fdb67803.json b/backend/.sqlx/query-96a9357888af26e5ec1e314bb565af05de561a8f9899e4ddca958982fdb67803.json new file mode 100644 index 0000000000000..5fbab5bf206e2 --- /dev/null +++ b/backend/.sqlx/query-96a9357888af26e5ec1e314bb565af05de561a8f9899e4ddca958982fdb67803.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id AS \"id!\" FROM queue WHERE schedule_path = $1 AND workspace_id = $2 AND id != 
$3 AND running = true", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Uuid" + ] + }, + "nullable": [ + true + ] + }, + "hash": "96a9357888af26e5ec1e314bb565af05de561a8f9899e4ddca958982fdb67803" +} diff --git a/backend/.sqlx/query-a33e282d02c53e5d6142dc7e6882a6d8f3d068c55cddf209eb6b6431ca26c910.json b/backend/.sqlx/query-a33e282d02c53e5d6142dc7e6882a6d8f3d068c55cddf209eb6b6431ca26c910.json new file mode 100644 index 0000000000000..20d1bfb0459eb --- /dev/null +++ b/backend/.sqlx/query-a33e282d02c53e5d6142dc7e6882a6d8f3d068c55cddf209eb6b6431ca26c910.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id AS \"id!\" FROM queue WHERE workspace_id = $1 and root_job = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Text", + "Uuid" + ] + }, + "nullable": [ + true + ] + }, + "hash": "a33e282d02c53e5d6142dc7e6882a6d8f3d068c55cddf209eb6b6431ca26c910" +} diff --git a/backend/.sqlx/query-ae8dfecd46425d5f86003eea9a578e9831fc0e700cc76ab9627afe9040a4efe0.json b/backend/.sqlx/query-ae8dfecd46425d5f86003eea9a578e9831fc0e700cc76ab9627afe9040a4efe0.json new file mode 100644 index 0000000000000..c0703bc518664 --- /dev/null +++ b/backend/.sqlx/query-ae8dfecd46425d5f86003eea9a578e9831fc0e700cc76ab9627afe9040a4efe0.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "create index concurrently if not exists ix_job_workspace_id_created_at_new_7 ON v2_job (workspace_id, created_at DESC) where kind in ('script', 'flow') AND parent_job IS NULL", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "ae8dfecd46425d5f86003eea9a578e9831fc0e700cc76ab9627afe9040a4efe0" +} diff --git a/backend/.sqlx/query-b06915e02398511033717ea13b710c86a24fe666884cfd49996dee961751ce51.json 
b/backend/.sqlx/query-b06915e02398511033717ea13b710c86a24fe666884cfd49996dee961751ce51.json new file mode 100644 index 0000000000000..194fda66efedf --- /dev/null +++ b/backend/.sqlx/query-b06915e02398511033717ea13b710c86a24fe666884cfd49996dee961751ce51.json @@ -0,0 +1,89 @@ +{ + "db_name": "PostgreSQL", + "query": "WITH uuid_table as (\n select unnest($11::uuid[]) as uuid\n )\n INSERT INTO queue \n (id, script_hash, script_path, job_kind, language, args, tag, created_by, permissioned_as, email, scheduled_for, workspace_id, concurrent_limit, concurrency_time_window_s, timeout, flow_status)\n (SELECT uuid, $1, $2, $3, $4, ('{ \"uuid\": \"' || uuid || '\" }')::jsonb, $5, $6, $7, $8, $9, $10, $12, $13, $14, $15 FROM uuid_table) \n RETURNING id AS \"id!\"", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Int8", + "Varchar", + { + "Custom": { + "name": "job_kind", + "kind": { + "Enum": [ + "script", + "preview", + "flow", + "dependencies", + "flowpreview", + "script_hub", + "identity", + "flowdependencies", + "http", + "graphql", + "postgresql", + "noop", + "appdependencies", + "deploymentcallback", + "singlescriptflow", + "flowscript", + "flownode", + "appscript" + ] + } + } + }, + { + "Custom": { + "name": "script_lang", + "kind": { + "Enum": [ + "python3", + "deno", + "go", + "bash", + "postgresql", + "nativets", + "bun", + "mysql", + "bigquery", + "snowflake", + "graphql", + "powershell", + "mssql", + "php", + "bunnative", + "rust", + "ansible", + "csharp", + "oracledb" + ] + } + } + }, + "Varchar", + "Varchar", + "Varchar", + "Varchar", + "Timestamptz", + "Varchar", + "UuidArray", + "Int4", + "Int4", + "Int4", + "Jsonb" + ] + }, + "nullable": [ + true + ] + }, + "hash": "b06915e02398511033717ea13b710c86a24fe666884cfd49996dee961751ce51" +} diff --git a/backend/.sqlx/query-bb93ba18709648b47cfbd04d91afd3b38546b1a718d0abff6b2795d7c2a29c97.json 
b/backend/.sqlx/query-bb93ba18709648b47cfbd04d91afd3b38546b1a718d0abff6b2795d7c2a29c97.json new file mode 100644 index 0000000000000..d6288765fe4cd --- /dev/null +++ b/backend/.sqlx/query-bb93ba18709648b47cfbd04d91afd3b38546b1a718d0abff6b2795d7c2a29c97.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT id AS \"id!\", flow_status, suspend AS \"suspend!\", script_path\n FROM queue\n WHERE id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "flow_status", + "type_info": "Jsonb" + }, + { + "ordinal": 2, + "name": "suspend!", + "type_info": "Int4" + }, + { + "ordinal": 3, + "name": "script_path", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + true, + true, + true, + true + ] + }, + "hash": "bb93ba18709648b47cfbd04d91afd3b38546b1a718d0abff6b2795d7c2a29c97" +} diff --git a/backend/.sqlx/query-bf91cb319e5b83c2235292a9e3ce8aa1c097c94b01aad0d9f7bce76a2a272bcc.json b/backend/.sqlx/query-bf91cb319e5b83c2235292a9e3ce8aa1c097c94b01aad0d9f7bce76a2a272bcc.json new file mode 100644 index 0000000000000..25caf0d4fe6c0 --- /dev/null +++ b/backend/.sqlx/query-bf91cb319e5b83c2235292a9e3ce8aa1c097c94b01aad0d9f7bce76a2a272bcc.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT success AS \"success!\"\n FROM completed_job WHERE id = $1 AND workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "success!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true + ] + }, + "hash": "bf91cb319e5b83c2235292a9e3ce8aa1c097c94b01aad0d9f7bce76a2a272bcc" +} diff --git a/backend/.sqlx/query-bfff3d8df18db198d6ebba8a049b00147fc8bcd42f3df37ef81b9ded80974bd0.json b/backend/.sqlx/query-bfff3d8df18db198d6ebba8a049b00147fc8bcd42f3df37ef81b9ded80974bd0.json index b378d68f5b09b..4843d959c190c 100644 --- 
a/backend/.sqlx/query-bfff3d8df18db198d6ebba8a049b00147fc8bcd42f3df37ef81b9ded80974bd0.json +++ b/backend/.sqlx/query-bfff3d8df18db198d6ebba8a049b00147fc8bcd42f3df37ef81b9ded80974bd0.json @@ -5,7 +5,7 @@ "columns": [ { "ordinal": 0, - "name": "bool", + "name": "?column?", "type_info": "Bool" } ], diff --git a/backend/.sqlx/query-c0b96d2f421afc43e256a8475825623bcb3dd4cbc37d570fc4273127bbf77c24.json b/backend/.sqlx/query-c0b96d2f421afc43e256a8475825623bcb3dd4cbc37d570fc4273127bbf77c24.json new file mode 100644 index 0000000000000..2015ab01d61cc --- /dev/null +++ b/backend/.sqlx/query-c0b96d2f421afc43e256a8475825623bcb3dd4cbc37d570fc4273127bbf77c24.json @@ -0,0 +1,67 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n job_kind AS \"job_kind!: JobKind\",\n script_hash AS \"script_hash: ScriptHash\",\n flow_status AS \"flow_status!: Json>\",\n raw_flow AS \"raw_flow: Json>\"\n FROM queue WHERE id = $1 AND workspace_id = $2 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "job_kind!: JobKind", + "type_info": { + "Custom": { + "name": "job_kind", + "kind": { + "Enum": [ + "script", + "preview", + "flow", + "dependencies", + "flowpreview", + "script_hub", + "identity", + "flowdependencies", + "http", + "graphql", + "postgresql", + "noop", + "appdependencies", + "deploymentcallback", + "singlescriptflow", + "flowscript", + "flownode", + "appscript" + ] + } + } + } + }, + { + "ordinal": 1, + "name": "script_hash: ScriptHash", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "flow_status!: Json>", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "raw_flow: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + true, + true, + true + ] + }, + "hash": "c0b96d2f421afc43e256a8475825623bcb3dd4cbc37d570fc4273127bbf77c24" +} diff --git a/backend/.sqlx/query-c3b5abbf2c9079d597a55f7c63bc83b8b4da98bda204a40f045a62172cfb4ebb.json 
b/backend/.sqlx/query-c3b5abbf2c9079d597a55f7c63bc83b8b4da98bda204a40f045a62172cfb4ebb.json index b545c4859953d..3c5109c33faca 100644 --- a/backend/.sqlx/query-c3b5abbf2c9079d597a55f7c63bc83b8b4da98bda204a40f045a62172cfb4ebb.json +++ b/backend/.sqlx/query-c3b5abbf2c9079d597a55f7c63bc83b8b4da98bda204a40f045a62172cfb4ebb.json @@ -21,7 +21,7 @@ ] }, "nullable": [ - false, + true, null ] }, diff --git a/backend/.sqlx/query-c4bdbf1c6bc7d93db4cf2633105b088b781354cb7c02628d9f8ff7f9ea0e7ed9.json b/backend/.sqlx/query-c4bdbf1c6bc7d93db4cf2633105b088b781354cb7c02628d9f8ff7f9ea0e7ed9.json new file mode 100644 index 0000000000000..4d138db49c35f --- /dev/null +++ b/backend/.sqlx/query-c4bdbf1c6bc7d93db4cf2633105b088b781354cb7c02628d9f8ff7f9ea0e7ed9.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "CREATE INDEX CONCURRENTLY labeled_jobs_on_jobs ON v2_job_completed USING GIN ((result -> 'wm_labels')) WHERE result ? 'wm_labels'", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "c4bdbf1c6bc7d93db4cf2633105b088b781354cb7c02628d9f8ff7f9ea0e7ed9" +} diff --git a/backend/.sqlx/query-c7be5fa2eaf66147c1213046e615f5e9fd168ef1e3aba8af64b15341055d6007.json b/backend/.sqlx/query-c7be5fa2eaf66147c1213046e615f5e9fd168ef1e3aba8af64b15341055d6007.json new file mode 100644 index 0000000000000..d79a7f54652d3 --- /dev/null +++ b/backend/.sqlx/query-c7be5fa2eaf66147c1213046e615f5e9fd168ef1e3aba8af64b15341055d6007.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "create index concurrently if not exists ix_job_workspace_id_created_at_new_5 ON v2_job (workspace_id, created_at DESC) where kind in ('preview', 'flowpreview') AND parent_job IS NULL", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "c7be5fa2eaf66147c1213046e615f5e9fd168ef1e3aba8af64b15341055d6007" +} diff --git a/backend/.sqlx/query-d7f1e2920aec0f4eab9238d01370465945acdfa779f16b99cdc1a6b7ef84943e.json 
b/backend/.sqlx/query-d7f1e2920aec0f4eab9238d01370465945acdfa779f16b99cdc1a6b7ef84943e.json new file mode 100644 index 0000000000000..dbc893740bbe6 --- /dev/null +++ b/backend/.sqlx/query-d7f1e2920aec0f4eab9238d01370465945acdfa779f16b99cdc1a6b7ef84943e.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT id AS \"id!\", flow_status, suspend AS \"suspend!\", script_path\n FROM queue\n WHERE id = ( SELECT parent_job FROM queue WHERE id = $1 UNION ALL SELECT parent_job FROM completed_job WHERE id = $1)\n FOR UPDATE\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "flow_status", + "type_info": "Jsonb" + }, + { + "ordinal": 2, + "name": "suspend!", + "type_info": "Int4" + }, + { + "ordinal": 3, + "name": "script_path", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + true, + true, + true, + true + ] + }, + "hash": "d7f1e2920aec0f4eab9238d01370465945acdfa779f16b99cdc1a6b7ef84943e" +} diff --git a/backend/.sqlx/query-f64ae18811e211dbf0cb98b43d3b018b0dcc0abc7e4a1f0b45885cfe18efd9b2.json b/backend/.sqlx/query-f64ae18811e211dbf0cb98b43d3b018b0dcc0abc7e4a1f0b45885cfe18efd9b2.json new file mode 100644 index 0000000000000..99e04c9bff7ba --- /dev/null +++ b/backend/.sqlx/query-f64ae18811e211dbf0cb98b43d3b018b0dcc0abc7e4a1f0b45885cfe18efd9b2.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "create index concurrently if not exists ix_job_workspace_id_created_at_new_3 ON v2_job (workspace_id, created_at DESC)", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "f64ae18811e211dbf0cb98b43d3b018b0dcc0abc7e4a1f0b45885cfe18efd9b2" +} diff --git a/backend/.sqlx/query-f8f25948ae14fcb71c666cdc5e51d888e1f22fb2300a78bbeafebf64e82658db.json b/backend/.sqlx/query-f8f25948ae14fcb71c666cdc5e51d888e1f22fb2300a78bbeafebf64e82658db.json new file mode 100644 index 
0000000000000..51675232120ca --- /dev/null +++ b/backend/.sqlx/query-f8f25948ae14fcb71c666cdc5e51d888e1f22fb2300a78bbeafebf64e82658db.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id AS \"id!\" FROM queue WHERE parent_job = $1 AND workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true + ] + }, + "hash": "f8f25948ae14fcb71c666cdc5e51d888e1f22fb2300a78bbeafebf64e82658db" +} diff --git a/backend/migrations/20250117124344_v2_job_runtime.down.sql b/backend/migrations/20250117124344_v2_job_runtime.down.sql new file mode 100644 index 0000000000000..45b34f0f7a547 --- /dev/null +++ b/backend/migrations/20250117124344_v2_job_runtime.down.sql @@ -0,0 +1,4 @@ +-- Add down migration script here +-- Lock `queue` in access exclusive to prevent deadlocks when dropping the foreign key to `queue`. +LOCK TABLE queue IN ACCESS EXCLUSIVE MODE; +DROP TABLE v2_job_runtime CASCADE; diff --git a/backend/migrations/20250117124344_v2_job_runtime.up.sql b/backend/migrations/20250117124344_v2_job_runtime.up.sql new file mode 100644 index 0000000000000..60b3a99384fd7 --- /dev/null +++ b/backend/migrations/20250117124344_v2_job_runtime.up.sql @@ -0,0 +1,14 @@ +-- Add up migration script here +CREATE TABLE IF NOT EXISTS v2_job_runtime ( + id UUID REFERENCES queue (id) ON DELETE CASCADE PRIMARY KEY NOT NULL, + -- Metrics fields: + ping TIMESTAMP WITH TIME ZONE DEFAULT now(), + memory_peak INTEGER +); + +CREATE POLICY admin_policy ON v2_job_runtime + AS PERMISSIVE + FOR ALL + TO windmill_admin; + +GRANT ALL ON v2_job_runtime TO windmill_user, windmill_admin; diff --git a/backend/migrations/20250117124345_v2_job_flow_runtime.down.sql b/backend/migrations/20250117124345_v2_job_flow_runtime.down.sql new file mode 100644 index 0000000000000..1fd382d050340 --- /dev/null +++ 
b/backend/migrations/20250117124345_v2_job_flow_runtime.down.sql @@ -0,0 +1,4 @@ +-- Add down migration script here +-- Lock `queue` in access exclusive to prevent deadlocks when dropping the foreign key to `queue`. +LOCK TABLE queue IN ACCESS EXCLUSIVE MODE; +DROP TABLE v2_job_flow_runtime CASCADE; diff --git a/backend/migrations/20250117124345_v2_job_flow_runtime.up.sql b/backend/migrations/20250117124345_v2_job_flow_runtime.up.sql new file mode 100644 index 0000000000000..eaf8a442a854a --- /dev/null +++ b/backend/migrations/20250117124345_v2_job_flow_runtime.up.sql @@ -0,0 +1,14 @@ +-- Add up migration script here +CREATE TABLE IF NOT EXISTS v2_job_flow_runtime ( + id UUID REFERENCES queue (id) ON DELETE CASCADE PRIMARY KEY NOT NULL, + -- Flow status fields: + flow_status JSONB NOT NULL, + leaf_jobs JSONB +); + +CREATE POLICY admin_policy ON v2_job_flow_runtime + AS PERMISSIVE + FOR ALL + TO windmill_admin; + +GRANT ALL ON v2_job_flow_runtime TO windmill_user, windmill_admin; diff --git a/backend/migrations/20250117124409_v2_job_queue_compatiblity_view.down.sql b/backend/migrations/20250117124409_v2_job_queue_compatiblity_view.down.sql new file mode 100644 index 0000000000000..43f51a18cea89 --- /dev/null +++ b/backend/migrations/20250117124409_v2_job_queue_compatiblity_view.down.sql @@ -0,0 +1,3 @@ +-- Add down migration script here +DROP VIEW queue; +ALTER TABLE v2_job_queue RENAME TO queue; diff --git a/backend/migrations/20250117124409_v2_job_queue_compatiblity_view.up.sql b/backend/migrations/20250117124409_v2_job_queue_compatiblity_view.up.sql new file mode 100644 index 0000000000000..6a4f1b9c74df6 --- /dev/null +++ b/backend/migrations/20250117124409_v2_job_queue_compatiblity_view.up.sql @@ -0,0 +1,48 @@ +-- Add down migration script here +ALTER TABLE queue RENAME TO v2_job_queue; +CREATE OR REPLACE VIEW queue AS ( + SELECT + id, + workspace_id, + parent_job AS parent_job, + created_by AS created_by, + created_at, + started_at, + scheduled_for, + running, 
+ script_hash AS script_hash, + script_path AS script_path, + args AS args, + logs AS logs, + raw_code AS raw_code, + canceled AS canceled, + canceled_by, + canceled_reason, + last_ping AS last_ping, + job_kind AS job_kind, + env_id AS env_id, + schedule_path AS schedule_path, + permissioned_as AS permissioned_as, + flow_status AS flow_status, + raw_flow AS raw_flow, + is_flow_step AS is_flow_step, + language AS language, + suspend, + suspend_until, + same_worker AS same_worker, + raw_lock AS raw_lock, + pre_run_error AS pre_run_error, + email AS email, + visible_to_owner AS visible_to_owner, + mem_peak AS mem_peak, + root_job AS root_job, + leaf_jobs AS leaf_jobs, + tag, + concurrent_limit AS concurrent_limit, + concurrency_time_window_s AS concurrency_time_window_s, + timeout AS timeout, + flow_step_id AS flow_step_id, + cache_ttl AS cache_ttl, + priority + FROM v2_job_queue +); diff --git a/backend/migrations/20250117124410_v2_job_queue.down.sql b/backend/migrations/20250117124410_v2_job_queue.down.sql new file mode 100644 index 0000000000000..f913ee4b5eca7 --- /dev/null +++ b/backend/migrations/20250117124410_v2_job_queue.down.sql @@ -0,0 +1,32 @@ +-- Add down migration script here +ALTER TABLE v2_job_queue DROP COLUMN worker; +ALTER TABLE v2_job_queue RENAME COLUMN __parent_job TO parent_job; +ALTER TABLE v2_job_queue RENAME COLUMN __created_by TO created_by; +ALTER TABLE v2_job_queue RENAME COLUMN __script_hash TO script_hash; +ALTER TABLE v2_job_queue RENAME COLUMN __script_path TO script_path; +ALTER TABLE v2_job_queue RENAME COLUMN __args TO args; +ALTER TABLE v2_job_queue RENAME COLUMN __logs TO logs; +ALTER TABLE v2_job_queue RENAME COLUMN __raw_code TO raw_code; +ALTER TABLE v2_job_queue RENAME COLUMN __canceled TO canceled; +ALTER TABLE v2_job_queue RENAME COLUMN __last_ping TO last_ping; +ALTER TABLE v2_job_queue RENAME COLUMN __job_kind TO job_kind; +ALTER TABLE v2_job_queue RENAME COLUMN __env_id TO env_id; +ALTER TABLE v2_job_queue RENAME COLUMN 
__schedule_path TO schedule_path; +ALTER TABLE v2_job_queue RENAME COLUMN __permissioned_as TO permissioned_as; +ALTER TABLE v2_job_queue RENAME COLUMN __flow_status TO flow_status; +ALTER TABLE v2_job_queue RENAME COLUMN __raw_flow TO raw_flow; +ALTER TABLE v2_job_queue RENAME COLUMN __is_flow_step TO is_flow_step; +ALTER TABLE v2_job_queue RENAME COLUMN __language TO language; +ALTER TABLE v2_job_queue RENAME COLUMN __same_worker TO same_worker; +ALTER TABLE v2_job_queue RENAME COLUMN __raw_lock TO raw_lock; +ALTER TABLE v2_job_queue RENAME COLUMN __pre_run_error TO pre_run_error; +ALTER TABLE v2_job_queue RENAME COLUMN __email TO email; +ALTER TABLE v2_job_queue RENAME COLUMN __visible_to_owner TO visible_to_owner; +ALTER TABLE v2_job_queue RENAME COLUMN __mem_peak TO mem_peak; +ALTER TABLE v2_job_queue RENAME COLUMN __root_job TO root_job; +ALTER TABLE v2_job_queue RENAME COLUMN __leaf_jobs TO leaf_jobs; +ALTER TABLE v2_job_queue RENAME COLUMN __concurrent_limit TO concurrent_limit; +ALTER TABLE v2_job_queue RENAME COLUMN __concurrency_time_window_s TO concurrency_time_window_s; +ALTER TABLE v2_job_queue RENAME COLUMN __timeout TO timeout; +ALTER TABLE v2_job_queue RENAME COLUMN __flow_step_id TO flow_step_id; +ALTER TABLE v2_job_queue RENAME COLUMN __cache_ttl TO cache_ttl; diff --git a/backend/migrations/20250117124410_v2_job_queue.up.sql b/backend/migrations/20250117124410_v2_job_queue.up.sql new file mode 100644 index 0000000000000..75810c577e476 --- /dev/null +++ b/backend/migrations/20250117124410_v2_job_queue.up.sql @@ -0,0 +1,32 @@ +-- Add up migration script here +ALTER TABLE v2_job_queue ADD COLUMN IF NOT EXISTS worker VARCHAR(255); +ALTER TABLE v2_job_queue RENAME COLUMN parent_job TO __parent_job; +ALTER TABLE v2_job_queue RENAME COLUMN created_by TO __created_by; +ALTER TABLE v2_job_queue RENAME COLUMN script_hash TO __script_hash; +ALTER TABLE v2_job_queue RENAME COLUMN script_path TO __script_path; +ALTER TABLE v2_job_queue RENAME COLUMN args TO 
__args; +ALTER TABLE v2_job_queue RENAME COLUMN logs TO __logs; +ALTER TABLE v2_job_queue RENAME COLUMN raw_code TO __raw_code; +ALTER TABLE v2_job_queue RENAME COLUMN canceled TO __canceled; +ALTER TABLE v2_job_queue RENAME COLUMN last_ping TO __last_ping; +ALTER TABLE v2_job_queue RENAME COLUMN job_kind TO __job_kind; +ALTER TABLE v2_job_queue RENAME COLUMN env_id TO __env_id; +ALTER TABLE v2_job_queue RENAME COLUMN schedule_path TO __schedule_path; +ALTER TABLE v2_job_queue RENAME COLUMN permissioned_as TO __permissioned_as; +ALTER TABLE v2_job_queue RENAME COLUMN flow_status TO __flow_status; +ALTER TABLE v2_job_queue RENAME COLUMN raw_flow TO __raw_flow; +ALTER TABLE v2_job_queue RENAME COLUMN is_flow_step TO __is_flow_step; +ALTER TABLE v2_job_queue RENAME COLUMN language TO __language; +ALTER TABLE v2_job_queue RENAME COLUMN same_worker TO __same_worker; +ALTER TABLE v2_job_queue RENAME COLUMN raw_lock TO __raw_lock; +ALTER TABLE v2_job_queue RENAME COLUMN pre_run_error TO __pre_run_error; +ALTER TABLE v2_job_queue RENAME COLUMN email TO __email; +ALTER TABLE v2_job_queue RENAME COLUMN visible_to_owner TO __visible_to_owner; +ALTER TABLE v2_job_queue RENAME COLUMN mem_peak TO __mem_peak; +ALTER TABLE v2_job_queue RENAME COLUMN root_job TO __root_job; +ALTER TABLE v2_job_queue RENAME COLUMN leaf_jobs TO __leaf_jobs; +ALTER TABLE v2_job_queue RENAME COLUMN concurrent_limit TO __concurrent_limit; +ALTER TABLE v2_job_queue RENAME COLUMN concurrency_time_window_s TO __concurrency_time_window_s; +ALTER TABLE v2_job_queue RENAME COLUMN timeout TO __timeout; +ALTER TABLE v2_job_queue RENAME COLUMN flow_step_id TO __flow_step_id; +ALTER TABLE v2_job_queue RENAME COLUMN cache_ttl TO __cache_ttl; diff --git a/backend/migrations/20250117124422_v2_job_completed_compatibility_view.down.sql b/backend/migrations/20250117124422_v2_job_completed_compatibility_view.down.sql new file mode 100644 index 0000000000000..5eb87e01e0a20 --- /dev/null +++ 
b/backend/migrations/20250117124422_v2_job_completed_compatibility_view.down.sql @@ -0,0 +1,3 @@ +-- Add down migration script here +DROP VIEW IF EXISTS completed_job; +ALTER TABLE IF EXISTS v2_job_completed RENAME TO completed_job; diff --git a/backend/migrations/20250117124422_v2_job_completed_compatibility_view.up.sql b/backend/migrations/20250117124422_v2_job_completed_compatibility_view.up.sql new file mode 100644 index 0000000000000..7519a1c585f5e --- /dev/null +++ b/backend/migrations/20250117124422_v2_job_completed_compatibility_view.up.sql @@ -0,0 +1,39 @@ +-- Add down migration script here +ALTER TABLE completed_job RENAME TO v2_job_completed; +CREATE OR REPLACE VIEW completed_job AS ( + SELECT + id, + workspace_id, + parent_job AS parent_job, + created_by AS created_by, + created_at AS created_at, + duration_ms, + success AS success, + script_hash AS script_hash, + script_path AS script_path, + args AS args, + result, + logs AS logs, + deleted, + raw_code AS raw_code, + canceled AS canceled, + canceled_by, + canceled_reason, + job_kind AS job_kind, + env_id AS env_id, + schedule_path AS schedule_path, + permissioned_as AS permissioned_as, + flow_status, + raw_flow AS raw_flow, + is_flow_step AS is_flow_step, + language AS language, + started_at, + is_skipped AS is_skipped, + raw_lock AS raw_lock, + email AS email, + visible_to_owner AS visible_to_owner, + mem_peak AS mem_peak, + tag AS tag, + priority AS priority + FROM v2_job_completed +); diff --git a/backend/migrations/20250117124423_v2_job_completed.down.sql b/backend/migrations/20250117124423_v2_job_completed.down.sql new file mode 100644 index 0000000000000..4028819661ee7 --- /dev/null +++ b/backend/migrations/20250117124423_v2_job_completed.down.sql @@ -0,0 +1,30 @@ +-- Add down migration script here +ALTER TABLE v2_job_completed + DROP COLUMN status CASCADE, + DROP COLUMN completed_at CASCADE, + DROP COLUMN worker CASCADE; +ALTER TABLE v2_job_completed RENAME COLUMN memory_peak TO mem_peak; 
+ALTER TABLE v2_job_completed RENAME COLUMN __parent_job TO parent_job; +ALTER TABLE v2_job_completed RENAME COLUMN __created_by TO created_by; +ALTER TABLE v2_job_completed RENAME COLUMN __created_at TO created_at; +ALTER TABLE v2_job_completed RENAME COLUMN __success TO success; +ALTER TABLE v2_job_completed RENAME COLUMN __script_hash TO script_hash; +ALTER TABLE v2_job_completed RENAME COLUMN __script_path TO script_path; +ALTER TABLE v2_job_completed RENAME COLUMN __args TO args; +ALTER TABLE v2_job_completed RENAME COLUMN __logs TO logs; +ALTER TABLE v2_job_completed RENAME COLUMN __raw_code TO raw_code; +ALTER TABLE v2_job_completed RENAME COLUMN __canceled TO canceled; +ALTER TABLE v2_job_completed RENAME COLUMN __job_kind TO job_kind; +ALTER TABLE v2_job_completed RENAME COLUMN __env_id TO env_id; +ALTER TABLE v2_job_completed RENAME COLUMN __schedule_path TO schedule_path; +ALTER TABLE v2_job_completed RENAME COLUMN __permissioned_as TO permissioned_as; +ALTER TABLE v2_job_completed RENAME COLUMN __raw_flow TO raw_flow; +ALTER TABLE v2_job_completed RENAME COLUMN __is_flow_step TO is_flow_step; +ALTER TABLE v2_job_completed RENAME COLUMN __language TO language; +ALTER TABLE v2_job_completed RENAME COLUMN __is_skipped TO is_skipped; +ALTER TABLE v2_job_completed RENAME COLUMN __raw_lock TO raw_lock; +ALTER TABLE v2_job_completed RENAME COLUMN __email TO email; +ALTER TABLE v2_job_completed RENAME COLUMN __visible_to_owner TO visible_to_owner; +ALTER TABLE v2_job_completed RENAME COLUMN __tag TO tag; +ALTER TABLE v2_job_completed RENAME COLUMN __priority TO priority; +DROP TYPE IF EXISTS job_status CASCADE; diff --git a/backend/migrations/20250117124423_v2_job_completed.up.sql b/backend/migrations/20250117124423_v2_job_completed.up.sql new file mode 100644 index 0000000000000..09146ecc9b972 --- /dev/null +++ b/backend/migrations/20250117124423_v2_job_completed.up.sql @@ -0,0 +1,30 @@ +-- Add up migration script here +CREATE TYPE job_status AS ENUM 
('success', 'failure', 'canceled', 'skipped'); +ALTER TABLE v2_job_completed + ADD COLUMN IF NOT EXISTS status job_status, + ADD COLUMN IF NOT EXISTS completed_at TIMESTAMP WITH TIME ZONE, + ADD COLUMN IF NOT EXISTS worker VARCHAR(255); +ALTER TABLE v2_job_completed RENAME COLUMN mem_peak TO memory_peak; +ALTER TABLE v2_job_completed RENAME COLUMN parent_job TO __parent_job; +ALTER TABLE v2_job_completed RENAME COLUMN created_by TO __created_by; +ALTER TABLE v2_job_completed RENAME COLUMN created_at TO __created_at; +ALTER TABLE v2_job_completed RENAME COLUMN success TO __success; +ALTER TABLE v2_job_completed RENAME COLUMN script_hash TO __script_hash; +ALTER TABLE v2_job_completed RENAME COLUMN script_path TO __script_path; +ALTER TABLE v2_job_completed RENAME COLUMN args TO __args; +ALTER TABLE v2_job_completed RENAME COLUMN logs TO __logs; +ALTER TABLE v2_job_completed RENAME COLUMN raw_code TO __raw_code; +ALTER TABLE v2_job_completed RENAME COLUMN canceled TO __canceled; +ALTER TABLE v2_job_completed RENAME COLUMN job_kind TO __job_kind; +ALTER TABLE v2_job_completed RENAME COLUMN env_id TO __env_id; +ALTER TABLE v2_job_completed RENAME COLUMN schedule_path TO __schedule_path; +ALTER TABLE v2_job_completed RENAME COLUMN permissioned_as TO __permissioned_as; +ALTER TABLE v2_job_completed RENAME COLUMN raw_flow TO __raw_flow; +ALTER TABLE v2_job_completed RENAME COLUMN is_flow_step TO __is_flow_step; +ALTER TABLE v2_job_completed RENAME COLUMN language TO __language; +ALTER TABLE v2_job_completed RENAME COLUMN is_skipped TO __is_skipped; +ALTER TABLE v2_job_completed RENAME COLUMN raw_lock TO __raw_lock; +ALTER TABLE v2_job_completed RENAME COLUMN email TO __email; +ALTER TABLE v2_job_completed RENAME COLUMN visible_to_owner TO __visible_to_owner; +ALTER TABLE v2_job_completed RENAME COLUMN tag TO __tag; +ALTER TABLE v2_job_completed RENAME COLUMN priority TO __priority; diff --git a/backend/migrations/20250117124430_v2_job_compatibility_view.down.sql 
b/backend/migrations/20250117124430_v2_job_compatibility_view.down.sql new file mode 100644 index 0000000000000..45dd1e24a3392 --- /dev/null +++ b/backend/migrations/20250117124430_v2_job_compatibility_view.down.sql @@ -0,0 +1,3 @@ +-- Add down migration script here +DROP VIEW IF EXISTS job; +ALTER TABLE IF EXISTS v2_job RENAME TO job; diff --git a/backend/migrations/20250117124430_v2_job_compatibility_view.up.sql b/backend/migrations/20250117124430_v2_job_compatibility_view.up.sql new file mode 100644 index 0000000000000..246d46e2b69d6 --- /dev/null +++ b/backend/migrations/20250117124430_v2_job_compatibility_view.up.sql @@ -0,0 +1,6 @@ +-- Add down migration script here +ALTER TABLE job RENAME TO v2_job; +CREATE OR REPLACE VIEW job AS ( + SELECT id, raw_code, raw_lock, raw_flow, tag, workspace_id + FROM v2_job +); diff --git a/backend/migrations/20250117124431_v2_job.down.sql b/backend/migrations/20250117124431_v2_job.down.sql new file mode 100644 index 0000000000000..7a49ed8997d39 --- /dev/null +++ b/backend/migrations/20250117124431_v2_job.down.sql @@ -0,0 +1,31 @@ +-- Add down migration script here +DROP POLICY see_folder_extra_perms_user ON v2_job; +DROP POLICY see_own_path ON v2_job; +DROP POLICY see_member_path ON v2_job; +DROP POLICY see_own ON v2_job; +DROP POLICY see_member ON v2_job; +ALTER TABLE v2_job + DROP COLUMN created_at CASCADE, + DROP COLUMN created_by CASCADE, + DROP COLUMN permissioned_as CASCADE, + DROP COLUMN permissioned_as_email CASCADE, + DROP COLUMN kind CASCADE, + DROP COLUMN runnable_id CASCADE, + DROP COLUMN runnable_path CASCADE, + DROP COLUMN parent_job CASCADE, + DROP COLUMN script_lang CASCADE, + DROP COLUMN flow_step CASCADE, + DROP COLUMN flow_step_id CASCADE, + DROP COLUMN flow_root_job CASCADE, + DROP COLUMN trigger CASCADE, + DROP COLUMN trigger_kind CASCADE, + DROP COLUMN same_worker CASCADE, + DROP COLUMN visible_to_owner CASCADE, + DROP COLUMN concurrent_limit CASCADE, + DROP COLUMN concurrency_time_window_s CASCADE, + 
DROP COLUMN cache_ttl CASCADE, + DROP COLUMN timeout CASCADE, + DROP COLUMN priority CASCADE, + DROP COLUMN args CASCADE, + DROP COLUMN pre_run_error CASCADE; +DROP TYPE job_trigger_kind; diff --git a/backend/migrations/20250117124431_v2_job.up.sql b/backend/migrations/20250117124431_v2_job.up.sql new file mode 100644 index 0000000000000..2dc39e4c5e25f --- /dev/null +++ b/backend/migrations/20250117124431_v2_job.up.sql @@ -0,0 +1,67 @@ +-- Add up migration script here +CREATE TYPE job_trigger_kind AS ENUM ('webhook', 'http', 'websocket', 'kafka', 'email', 'nats', 'schedule', 'app', 'ui'); + +ALTER TABLE v2_job + ADD COLUMN IF NOT EXISTS created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, + ADD COLUMN IF NOT EXISTS created_by VARCHAR(255) DEFAULT 'missing' NOT NULL, + ADD COLUMN IF NOT EXISTS permissioned_as VARCHAR(55) DEFAULT 'g/all' NOT NULL, + ADD COLUMN IF NOT EXISTS permissioned_as_email VARCHAR(255) DEFAULT 'missing@email.xyz' NOT NULL, + ADD COLUMN IF NOT EXISTS kind job_kind DEFAULT 'script'::job_kind NOT NULL, + ADD COLUMN IF NOT EXISTS runnable_id BIGINT, + ADD COLUMN IF NOT EXISTS runnable_path VARCHAR(255), + ADD COLUMN IF NOT EXISTS parent_job UUID, + ADD COLUMN IF NOT EXISTS script_lang script_lang DEFAULT 'python3'::script_lang, + ADD COLUMN IF NOT EXISTS flow_step INTEGER, + ADD COLUMN IF NOT EXISTS flow_step_id VARCHAR(255), + ADD COLUMN IF NOT EXISTS flow_root_job UUID, + ADD COLUMN IF NOT EXISTS trigger VARCHAR(255), + ADD COLUMN IF NOT EXISTS trigger_kind job_trigger_kind, + ADD COLUMN IF NOT EXISTS same_worker BOOLEAN DEFAULT FALSE NOT NULL, + ADD COLUMN IF NOT EXISTS visible_to_owner BOOLEAN DEFAULT TRUE NOT NULL, + ADD COLUMN IF NOT EXISTS concurrent_limit INTEGER, + ADD COLUMN IF NOT EXISTS concurrency_time_window_s INTEGER, + ADD COLUMN IF NOT EXISTS cache_ttl INTEGER, + ADD COLUMN IF NOT EXISTS timeout INTEGER, + ADD COLUMN IF NOT EXISTS priority SMALLINT, + ADD COLUMN IF NOT EXISTS args JSONB, + ADD COLUMN IF NOT EXISTS 
pre_run_error TEXT; + +CREATE POLICY see_folder_extra_perms_user ON v2_job + AS PERMISSIVE + FOR ALL + TO windmill_user + USING ((visible_to_owner IS TRUE) AND (SPLIT_PART((runnable_path)::TEXT, '/'::TEXT, 1) = 'f'::TEXT) AND + (SPLIT_PART((runnable_path)::TEXT, '/'::TEXT, 2) = ANY ( + REGEXP_SPLIT_TO_ARRAY(CURRENT_SETTING('session.folders_read'::TEXT), ','::TEXT)))); + +CREATE POLICY see_own_path ON v2_job + AS PERMISSIVE + FOR ALL + TO windmill_user + USING ((visible_to_owner IS TRUE) AND (SPLIT_PART((runnable_path)::TEXT, '/'::TEXT, 1) = 'u'::TEXT) AND + (SPLIT_PART((runnable_path)::TEXT, '/'::TEXT, 2) = CURRENT_SETTING('session.user'::TEXT))); + +CREATE POLICY see_member_path ON v2_job + AS PERMISSIVE + FOR ALL + TO windmill_user + USING ((visible_to_owner IS TRUE) AND (SPLIT_PART((runnable_path)::TEXT, '/'::TEXT, 1) = 'g'::TEXT) AND + (SPLIT_PART((runnable_path)::TEXT, '/'::TEXT, 2) = ANY + (REGEXP_SPLIT_TO_ARRAY(CURRENT_SETTING('session.groups'::TEXT), ','::TEXT)))); + +CREATE POLICY see_own ON v2_job + AS PERMISSIVE + FOR ALL + TO windmill_user + USING ((SPLIT_PART((permissioned_as)::TEXT, '/'::TEXT, 1) = 'f'::TEXT) AND + (SPLIT_PART((permissioned_as)::TEXT, '/'::TEXT, 2) = CURRENT_SETTING('session.user'::TEXT))); + +CREATE POLICY see_member ON v2_job + AS PERMISSIVE + FOR ALL + TO windmill_user + USING ((SPLIT_PART((permissioned_as)::TEXT, '/'::TEXT, 1) = 'g'::TEXT) AND + (SPLIT_PART((permissioned_as)::TEXT, '/'::TEXT, 2) = ANY + (REGEXP_SPLIT_TO_ARRAY(CURRENT_SETTING('session.groups'::TEXT), ','::TEXT)))); + +GRANT ALL ON v2_job TO windmill_user, windmill_admin; diff --git a/backend/migrations/20250117124743_v2_job_queue_sync.down.sql b/backend/migrations/20250117124743_v2_job_queue_sync.down.sql new file mode 100644 index 0000000000000..b64ab629bfb1d --- /dev/null +++ b/backend/migrations/20250117124743_v2_job_queue_sync.down.sql @@ -0,0 +1,4 @@ +-- Add down migration script here +DROP FUNCTION v2_job_queue_before_insert() CASCADE; +DROP FUNCTION 
v2_job_queue_after_insert() CASCADE; +DROP FUNCTION v2_job_queue_before_update() CASCADE; diff --git a/backend/migrations/20250117124743_v2_job_queue_sync.up.sql b/backend/migrations/20250117124743_v2_job_queue_sync.up.sql new file mode 100644 index 0000000000000..cda1ae913cc80 --- /dev/null +++ b/backend/migrations/20250117124743_v2_job_queue_sync.up.sql @@ -0,0 +1,178 @@ +-- Add up migration script here + +-- v2 -> v1 +-- This trigger will be removed once all server(s)/worker(s) are updated to use `v2_*` tables +CREATE OR REPLACE FUNCTION v2_job_queue_before_insert() RETURNS TRIGGER AS $$ +DECLARE job v2_job; +BEGIN + -- When inserting to `v2_job_queue` from `v2` code, set `v1` columns: + SELECT * INTO job FROM v2_job WHERE id = NEW.id; + NEW.__parent_job := job.parent_job; + NEW.__created_by := job.created_by; + NEW.__script_hash := job.runnable_id; + NEW.__script_path := job.runnable_path; + NEW.__args := job.args; + -- __logs + NEW.__raw_code := job.raw_code; + NEW.__canceled := NEW.canceled_by IS NOT NULL; + -- __last_ping + NEW.__job_kind := job.kind; + NEW.__env_id := 0xcafe; -- Magic used bellow. 
+ NEW.__schedule_path := CASE WHEN job.trigger_kind = 'schedule'::job_trigger_kind THEN job.trigger END; + NEW.__permissioned_as := job.permissioned_as; + -- __flow_status + NEW.__raw_flow := job.raw_flow; + NEW.__is_flow_step := job.flow_step_id IS NOT NULL; + NEW.__language := job.script_lang; + NEW.__same_worker := job.same_worker; + NEW.__raw_lock := job.raw_lock; + NEW.__pre_run_error := job.pre_run_error; + NEW.__email := job.permissioned_as_email; + NEW.__visible_to_owner := job.visible_to_owner; + -- __mem_peak + NEW.__root_job := job.flow_root_job; + -- __leaf_jobs + NEW.__concurrent_limit := job.concurrent_limit; + NEW.__concurrency_time_window_s := job.concurrency_time_window_s; + NEW.__timeout := job.timeout; + NEW.__flow_step_id := job.flow_step_id; + NEW.__cache_ttl := job.cache_ttl; + RETURN NEW; +END $$ LANGUAGE plpgsql; + +CREATE OR REPLACE TRIGGER v2_job_queue_before_insert_trigger + BEFORE INSERT ON v2_job_queue + FOR EACH ROW + WHEN (pg_trigger_depth() < 1 AND NEW.__created_by IS NULL) -- Prevent infinite loop v1 <-> v2 +EXECUTE FUNCTION v2_job_queue_before_insert(); + +-- v1 -> v2 +-- On every insert to `v2_job_queue`, insert to `v2_job`, `v2_job_runtime` and `v2_job_flow_runtime` as well +-- This trigger will be removed once all server(s)/worker(s) are updated to use `v2_*` tables +CREATE OR REPLACE FUNCTION v2_job_queue_after_insert() RETURNS TRIGGER AS $$ BEGIN + INSERT INTO v2_job ( + id, workspace_id, created_at, created_by, permissioned_as, permissioned_as_email, + kind, runnable_id, runnable_path, parent_job, + script_lang, + flow_step, flow_step_id, flow_root_job, + trigger, trigger_kind, + tag, same_worker, visible_to_owner, concurrent_limit, concurrency_time_window_s, cache_ttl, timeout, priority, + args, pre_run_error, + raw_code, raw_lock, raw_flow + ) VALUES ( + NEW.id, NEW.workspace_id, NEW.created_at, NEW.__created_by, NEW.__permissioned_as, NEW.__email, + NEW.__job_kind, NEW.__script_hash, NEW.__script_path, NEW.__parent_job, + 
NEW.__language, + NULL, NEW.__flow_step_id, NEW.__root_job, + NEW.__schedule_path, CASE WHEN NEW.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, + NEW.tag, NEW.__same_worker, NEW.__visible_to_owner, NEW.__concurrent_limit, NEW.__concurrency_time_window_s, + NEW.__cache_ttl, NEW.__timeout, NEW.priority, + NEW.__args, NEW.__pre_run_error, + NEW.__raw_code, NEW.__raw_lock, NEW.__raw_flow + ) ON CONFLICT (id) DO UPDATE SET + workspace_id = EXCLUDED.workspace_id, + created_at = EXCLUDED.created_at, + created_by = EXCLUDED.created_by, + permissioned_as = EXCLUDED.permissioned_as, + permissioned_as_email = EXCLUDED.permissioned_as_email, + kind = EXCLUDED.kind, + runnable_id = EXCLUDED.runnable_id, + runnable_path = EXCLUDED.runnable_path, + parent_job = EXCLUDED.parent_job, + script_lang = EXCLUDED.script_lang, + flow_step = EXCLUDED.flow_step, + flow_step_id = EXCLUDED.flow_step_id, + flow_root_job = EXCLUDED.flow_root_job, + trigger = EXCLUDED.trigger, + trigger_kind = EXCLUDED.trigger_kind, + tag = EXCLUDED.tag, + same_worker = EXCLUDED.same_worker, + visible_to_owner = EXCLUDED.visible_to_owner, + concurrent_limit = EXCLUDED.concurrent_limit, + concurrency_time_window_s = EXCLUDED.concurrency_time_window_s, + cache_ttl = EXCLUDED.cache_ttl, + timeout = EXCLUDED.timeout, + priority = EXCLUDED.priority, + args = EXCLUDED.args, + pre_run_error = EXCLUDED.pre_run_error, + raw_code = COALESCE(v2_job.raw_code, EXCLUDED.raw_code), + raw_lock = COALESCE(v2_job.raw_lock, EXCLUDED.raw_lock), + raw_flow = COALESCE(v2_job.raw_flow, EXCLUDED.raw_flow) + ; + INSERT INTO v2_job_runtime (id, ping, memory_peak) + VALUES (NEW.id, NEW.__last_ping, NEW.__mem_peak) + ON CONFLICT (id) DO UPDATE SET + ping = COALESCE(v2_job_runtime.ping, EXCLUDED.ping), + memory_peak = COALESCE(v2_job_runtime.memory_peak, EXCLUDED.memory_peak) + ; + IF NEW.__flow_status IS NOT NULL OR NEW.__leaf_jobs IS NOT NULL THEN + INSERT INTO v2_job_flow_runtime (id, flow_status, leaf_jobs) + VALUES 
(NEW.id, NEW.__flow_status, NEW.__leaf_jobs) + ON CONFLICT (id) DO UPDATE SET + flow_status = COALESCE(v2_job_flow_runtime.flow_status, EXCLUDED.flow_status), + leaf_jobs = COALESCE(v2_job_flow_runtime.leaf_jobs, EXCLUDED.leaf_jobs) + ; + END IF; + IF NEW.__logs IS NOT NULL THEN + INSERT INTO job_logs (job_id, workspace_id, logs) + VALUES (NEW.id, NEW.workspace_id, NEW.__logs) + ON CONFLICT (job_id) DO UPDATE SET + logs = CONCAT(job_logs.logs, EXCLUDED.logs) + ; + NEW.__logs := NULL; + END IF; + RETURN NEW; +END $$ LANGUAGE plpgsql; + +CREATE OR REPLACE TRIGGER v2_job_queue_after_insert_trigger + AFTER INSERT ON v2_job_queue + FOR EACH ROW + -- Prevent infinite loop v1 <-> v2 + WHEN (pg_trigger_depth() < 1 AND NEW.__created_by IS NOT NULL AND NEW.__env_id IS DISTINCT FROM 0xcafe) +EXECUTE FUNCTION v2_job_queue_after_insert(); + +-- On every update to `v2_job_queue`, update `v2_job`, `v2_job_runtime` and `v2_job_flow_runtime` as well +-- This trigger will be removed once all server(s)/worker(s) are updated to use `v2_*` tables +CREATE OR REPLACE FUNCTION v2_job_queue_before_update() RETURNS TRIGGER AS $$ BEGIN + -- `v2_job`: Only `args` are updated + IF NEW.__args::text IS DISTINCT FROM OLD.__args::text THEN + UPDATE v2_job + SET args = NEW.__args + WHERE id = NEW.id; + END IF; + -- `v2_job_runtime`: + IF NEW.__last_ping IS DISTINCT FROM OLD.__last_ping OR NEW.__mem_peak IS DISTINCT FROM OLD.__mem_peak THEN + INSERT INTO v2_job_runtime (id, ping, memory_peak) + VALUES (NEW.id, NEW.__last_ping, NEW.__mem_peak) + ON CONFLICT (id) DO UPDATE SET + ping = EXCLUDED.ping, + memory_peak = EXCLUDED.memory_peak + ; + END IF; + -- `v2_job_flow_runtime`: + IF NEW.__flow_status::text IS DISTINCT FROM OLD.__flow_status::text OR + NEW.__leaf_jobs::text IS DISTINCT FROM OLD.__leaf_jobs::text THEN + INSERT INTO v2_job_flow_runtime (id, flow_status, leaf_jobs) + VALUES (NEW.id, NEW.__flow_status, NEW.__leaf_jobs) + ON CONFLICT (id) DO UPDATE SET + flow_status = EXCLUDED.flow_status, 
+ leaf_jobs = EXCLUDED.leaf_jobs + ; + END IF; + -- `job_logs`: + IF NEW.__logs IS DISTINCT FROM OLD.__logs THEN + INSERT INTO job_logs (job_id, workspace_id, logs) + VALUES (NEW.id, NEW.workspace_id, NEW.__logs) + ON CONFLICT (job_id) DO UPDATE SET + logs = CONCAT(job_logs.logs, EXCLUDED.logs) + ; + NEW.__logs := NULL; + END IF; + RETURN NEW; +END $$ LANGUAGE plpgsql; + +CREATE OR REPLACE TRIGGER v2_job_queue_before_update_trigger + BEFORE UPDATE ON v2_job_queue + FOR EACH ROW + WHEN (pg_trigger_depth() < 1) -- Prevent infinite loop v1 <-> v2 +EXECUTE FUNCTION v2_job_queue_before_update(); diff --git a/backend/migrations/20250117124744_v2_job_completed_sync.down.sql b/backend/migrations/20250117124744_v2_job_completed_sync.down.sql new file mode 100644 index 0000000000000..5ceab6f325fac --- /dev/null +++ b/backend/migrations/20250117124744_v2_job_completed_sync.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +DROP FUNCTION v2_job_completed_before_insert() CASCADE; diff --git a/backend/migrations/20250117124744_v2_job_completed_sync.up.sql b/backend/migrations/20250117124744_v2_job_completed_sync.up.sql new file mode 100644 index 0000000000000..7ae14434652bc --- /dev/null +++ b/backend/migrations/20250117124744_v2_job_completed_sync.up.sql @@ -0,0 +1,53 @@ +-- Add up migration script here + +-- v1 -> v2 +-- On every insert to `v2_job_completed`, insert to `v2_job` as well +-- This trigger will be removed once all server(s)/worker(s) are updated to use `v2_*` tables +CREATE OR REPLACE FUNCTION v2_job_completed_before_insert() RETURNS TRIGGER AS $$ +DECLARE job v2_job; +BEGIN + IF NEW.__created_by IS NULL THEN + -- v2 -> v1 + -- When inserting to `v2_job_completed` from `v2` code, set `v1` columns: + SELECT * INTO job FROM v2_job WHERE id = NEW.id; + NEW.__parent_job := job.parent_job; + NEW.__created_by := job.created_by; + NEW.__created_at := job.created_at; + NEW.__success := NEW.status = 'success'::job_status; + NEW.__script_hash := job.runnable_id; + 
NEW.__script_path := job.runnable_path; + NEW.__args := job.args; + -- __logs + NEW.__raw_code := job.raw_code; + NEW.__canceled := NEW.status = 'canceled'::job_status; + NEW.__job_kind := job.kind; + -- __env_id + NEW.__schedule_path := CASE WHEN job.trigger_kind = 'schedule'::job_trigger_kind THEN job.trigger END; + NEW.__permissioned_as := job.permissioned_as; + NEW.__raw_flow := job.raw_flow; + NEW.__is_flow_step := job.flow_step_id IS NOT NULL; + NEW.__language := job.script_lang; + NEW.__is_skipped := NEW.status = 'skipped'::job_status; + NEW.__raw_lock := job.raw_lock; + NEW.__email := job.permissioned_as_email; + NEW.__visible_to_owner := job.visible_to_owner; + NEW.__tag := job.tag; + NEW.__priority := job.priority; + ELSE + -- v1 -> v2 + NEW.completed_at := now(); + NEW.status := CASE + WHEN NEW.__is_skipped THEN 'skipped'::job_status + WHEN NEW.__canceled THEN 'canceled'::job_status + WHEN NEW.__success THEN 'success'::job_status + ELSE 'failure'::job_status + END; + END IF; + RETURN NEW; +END $$ LANGUAGE plpgsql; + +CREATE OR REPLACE TRIGGER v2_job_completed_before_insert_trigger + BEFORE INSERT ON v2_job_completed + FOR EACH ROW + WHEN (pg_trigger_depth() < 1) -- Prevent infinite loop v1 <-> v2 +EXECUTE FUNCTION v2_job_completed_before_insert(); diff --git a/backend/migrations/20250117124748_v2_migrate_from_v1.down.sql b/backend/migrations/20250117124748_v2_migrate_from_v1.down.sql new file mode 100644 index 0000000000000..6bc376fc3d78d --- /dev/null +++ b/backend/migrations/20250117124748_v2_migrate_from_v1.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +-- Nothing to do here diff --git a/backend/migrations/20250117124748_v2_migrate_from_v1.up.sql b/backend/migrations/20250117124748_v2_migrate_from_v1.up.sql new file mode 100644 index 0000000000000..ac3cd954b6bb6 --- /dev/null +++ b/backend/migrations/20250117124748_v2_migrate_from_v1.up.sql @@ -0,0 +1,144 @@ +-- Add up migration script here + +-- Set new columns in `v2_job_completed`: 
+UPDATE v2_job_completed +SET completed_at = started_at + (interval '1 millisecond' * duration_ms), + status = CASE + WHEN __is_skipped THEN 'skipped'::job_status + WHEN __canceled THEN 'canceled'::job_status + WHEN __success THEN 'success'::job_status + ELSE 'failure'::job_status END +WHERE status IS NULL; + +-- Insert missing `v2_job` rows from `v2_job_queue`: +INSERT INTO v2_job ( + id, workspace_id, created_at, created_by, permissioned_as, permissioned_as_email, + kind, runnable_id, runnable_path, parent_job, + script_lang, + flow_step_id, flow_root_job, + trigger, trigger_kind, + tag, same_worker, visible_to_owner, concurrent_limit, concurrency_time_window_s, cache_ttl, timeout, priority, + args, pre_run_error, + raw_code, raw_lock, raw_flow +) SELECT + id, workspace_id, created_at, __created_by, __permissioned_as, __email, + __job_kind, __script_hash, __script_path, __parent_job, + __language, + __flow_step_id, __root_job, + __schedule_path, CASE WHEN __schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, + tag, __same_worker, __visible_to_owner, __concurrent_limit, __concurrency_time_window_s, + __cache_ttl, __timeout, priority, + __args, __pre_run_error, + __raw_code, __raw_lock, __raw_flow +FROM v2_job_queue +WHERE NOT EXISTS (SELECT 1 FROM v2_job WHERE v2_job.id = v2_job_queue.id); + +-- Insert missing `v2_job` rows from `v2_job_completed`: +INSERT INTO v2_job ( + id, workspace_id, created_at, created_by, permissioned_as, permissioned_as_email, + kind, runnable_id, runnable_path, parent_job, + script_lang, + trigger, trigger_kind, + tag, visible_to_owner, priority, + args, + raw_code, raw_lock, raw_flow +) SELECT + id, workspace_id, __created_at, __created_by, __permissioned_as, __email, + __job_kind, __script_hash, __script_path, __parent_job, + __language, + __schedule_path, CASE WHEN __schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, + __tag, __visible_to_owner, __priority, + __args, + __raw_code, __raw_lock, __raw_flow 
+FROM v2_job_completed +WHERE NOT EXISTS (SELECT 1 FROM v2_job WHERE v2_job.id = v2_job_completed.id); + +-- Set existing `v2_job` rows from `v2_job_queue`: +UPDATE v2_job SET + created_at = v2_job_queue.created_at, + created_by = v2_job_queue.__created_by, + permissioned_as = v2_job_queue.__permissioned_as, + permissioned_as_email = v2_job_queue.__email, + kind = v2_job_queue.__job_kind, + runnable_id = v2_job_queue.__script_hash, + runnable_path = v2_job_queue.__script_path, + parent_job = v2_job_queue.__parent_job, + script_lang = v2_job_queue.__language, + flow_step_id = v2_job_queue.__flow_step_id, + flow_root_job = v2_job_queue.__root_job, + trigger = v2_job_queue.__schedule_path, + trigger_kind = CASE WHEN v2_job_queue.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, + tag = v2_job_queue.tag, + same_worker = v2_job_queue.__same_worker, + visible_to_owner = v2_job_queue.__visible_to_owner, + concurrent_limit = v2_job_queue.__concurrent_limit, + concurrency_time_window_s = v2_job_queue.__concurrency_time_window_s, + cache_ttl = v2_job_queue.__cache_ttl, + timeout = v2_job_queue.__timeout, + priority = v2_job_queue.priority, + args = v2_job_queue.__args, + pre_run_error = v2_job_queue.__pre_run_error, + raw_code = COALESCE(v2_job.raw_code, v2_job_queue.__raw_code), + raw_lock = COALESCE(v2_job.raw_lock, v2_job_queue.__raw_lock), + raw_flow = COALESCE(v2_job.raw_flow, v2_job_queue.__raw_flow) +FROM v2_job_queue +WHERE v2_job.id = v2_job_queue.id AND v2_job.created_by = 'missing'; + +-- Set existing `v2_job` rows from `v2_job_completed`: +UPDATE v2_job SET + created_at = v2_job_completed.__created_at, + created_by = v2_job_completed.__created_by, + permissioned_as = v2_job_completed.__permissioned_as, + permissioned_as_email = v2_job_completed.__email, + kind = v2_job_completed.__job_kind, + runnable_id = v2_job_completed.__script_hash, + runnable_path = v2_job_completed.__script_path, + parent_job = v2_job_completed.__parent_job, + script_lang 
= v2_job_completed.__language, + trigger = v2_job_completed.__schedule_path, + trigger_kind = CASE WHEN v2_job_completed.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END, + tag = v2_job_completed.__tag, + visible_to_owner = v2_job_completed.__visible_to_owner, + priority = v2_job_completed.__priority, + args = v2_job_completed.__args, + raw_code = COALESCE(v2_job.raw_code, v2_job_completed.__raw_code), + raw_lock = COALESCE(v2_job.raw_lock, v2_job_completed.__raw_lock), + raw_flow = COALESCE(v2_job.raw_flow, v2_job_completed.__raw_flow) +FROM v2_job_completed +WHERE v2_job.id = v2_job_completed.id AND v2_job.created_by = 'missing'; + +-- Migrate `v2_job_queue` moved columns to `v2_job_runtime`: +INSERT INTO v2_job_runtime (id, ping, memory_peak) +SELECT id, __last_ping, __mem_peak +FROM v2_job_queue +WHERE NOT running AND (__last_ping IS NOT NULL OR __mem_peak IS NOT NULL) + -- Locked ones will sync within triggers + FOR UPDATE SKIP LOCKED +ON CONFLICT (id) DO NOTHING; + +-- Migrate `v2_job_queue` moved columns to `v2_flow_job_runtime`: +INSERT INTO v2_job_flow_runtime (id, flow_status, leaf_jobs) +SELECT id, __flow_status, __leaf_jobs +FROM v2_job_queue +WHERE NOT running AND (__flow_status IS NOT NULL OR __leaf_jobs IS NOT NULL) + -- Locked ones will sync within triggers + FOR UPDATE SKIP LOCKED +ON CONFLICT (id) DO NOTHING; + +-- Migrate old `v2_job_queue.__logs` to `job_logs` +INSERT INTO job_logs (job_id, workspace_id, logs) +SELECT id, workspace_id, __logs +FROM v2_job_queue +WHERE __logs IS NOT NULL +ON CONFLICT (job_id) DO UPDATE SET + logs = CONCAT(job_logs.logs, EXCLUDED.logs) +; + +-- Migrate old `v2_job_completed.__logs` to `job_logs` +INSERT INTO job_logs (job_id, workspace_id, logs) +SELECT id, workspace_id, __logs +FROM v2_job_completed +WHERE __logs IS NOT NULL AND __logs IS DISTINCT FROM '##DELETED##' +ON CONFLICT (job_id) DO UPDATE SET + logs = CONCAT(job_logs.logs, EXCLUDED.logs) +; diff --git 
a/backend/migrations/20250117124804_v2_sync_runtime.down.sql b/backend/migrations/20250117124804_v2_sync_runtime.down.sql new file mode 100644 index 0000000000000..9c9fe30241f7e --- /dev/null +++ b/backend/migrations/20250117124804_v2_sync_runtime.down.sql @@ -0,0 +1,3 @@ +-- Add down migration script here +DROP FUNCTION v2_job_runtime_before_insert() CASCADE; +DROP FUNCTION v2_job_runtime_before_update() CASCADE; diff --git a/backend/migrations/20250117124804_v2_sync_runtime.up.sql b/backend/migrations/20250117124804_v2_sync_runtime.up.sql new file mode 100644 index 0000000000000..26947c3615745 --- /dev/null +++ b/backend/migrations/20250117124804_v2_sync_runtime.up.sql @@ -0,0 +1,31 @@ +-- Add up migration script here + +-- On every insert/update to `v2_job_runtime`, reflect to `v2_job_queue` as well +-- This triggers will be removed once all server(s)/worker(s) are updated to use `v2_*` tables +CREATE OR REPLACE FUNCTION v2_job_runtime_before_insert() RETURNS TRIGGER AS $$ BEGIN + UPDATE v2_job_queue + SET __last_ping = NEW.ping, __mem_peak = NEW.memory_peak + WHERE id = NEW.id; + RETURN NEW; +END $$ LANGUAGE plpgsql; + +CREATE OR REPLACE TRIGGER v2_job_runtime_before_insert_trigger + BEFORE INSERT ON v2_job_runtime + FOR EACH ROW + WHEN (pg_trigger_depth() < 1) -- Prevent infinite loop v2 <-> v1 +EXECUTE FUNCTION v2_job_runtime_before_insert(); + +CREATE OR REPLACE FUNCTION v2_job_runtime_before_update() RETURNS TRIGGER AS $$ BEGIN + IF NEW.ping IS DISTINCT FROM OLD.ping OR NEW.memory_peak IS DISTINCT FROM OLD.memory_peak THEN + UPDATE v2_job_queue + SET __last_ping = NEW.ping, __mem_peak = NEW.memory_peak + WHERE id = NEW.id; + END IF; + RETURN NEW; +END $$ LANGUAGE plpgsql; + +CREATE OR REPLACE TRIGGER v2_job_runtime_before_update_trigger + BEFORE UPDATE ON v2_job_runtime + FOR EACH ROW + WHEN (pg_trigger_depth() < 1) -- Prevent infinite loop v2 <-> v1 +EXECUTE FUNCTION v2_job_runtime_before_update(); diff --git 
a/backend/migrations/20250117124805_v2_sync_flow_runtime.down.sql b/backend/migrations/20250117124805_v2_sync_flow_runtime.down.sql new file mode 100644 index 0000000000000..8fba34b2a46bd --- /dev/null +++ b/backend/migrations/20250117124805_v2_sync_flow_runtime.down.sql @@ -0,0 +1,3 @@ +-- Add down migration script here +DROP FUNCTION v2_job_flow_runtime_before_insert() CASCADE; +DROP FUNCTION v2_job_flow_runtime_before_update() CASCADE; diff --git a/backend/migrations/20250117124805_v2_sync_flow_runtime.up.sql b/backend/migrations/20250117124805_v2_sync_flow_runtime.up.sql new file mode 100644 index 0000000000000..09ed5720ecce6 --- /dev/null +++ b/backend/migrations/20250117124805_v2_sync_flow_runtime.up.sql @@ -0,0 +1,32 @@ +-- Add up migration script here + +-- On every insert/update to `v2_job_flow_runtime`, reflect to `v2_job_queue` as well +-- This triggers will be removed once all server(s)/worker(s) are updated to use `v2_*` tables +CREATE OR REPLACE FUNCTION v2_job_flow_runtime_before_insert() RETURNS TRIGGER AS $$ BEGIN + UPDATE v2_job_queue + SET __flow_status = NEW.flow_status, __leaf_jobs = NEW.leaf_jobs + WHERE id = NEW.id; + RETURN NEW; +END $$ LANGUAGE plpgsql; + +CREATE OR REPLACE TRIGGER v2_job_flow_runtime_before_insert_trigger + BEFORE INSERT ON v2_job_flow_runtime + FOR EACH ROW + WHEN (pg_trigger_depth() < 1) -- Prevent infinite loop v2 <-> v1 +EXECUTE FUNCTION v2_job_flow_runtime_before_insert(); + +CREATE OR REPLACE FUNCTION v2_job_flow_runtime_before_update() RETURNS TRIGGER AS $$ BEGIN + IF NEW.flow_status::TEXT IS DISTINCT FROM OLD.flow_status::TEXT OR + NEW.leaf_jobs::TEXT IS DISTINCT FROM OLD.leaf_jobs::TEXT THEN + UPDATE v2_job_queue + SET __flow_status = NEW.flow_status, __leaf_jobs = NEW.leaf_jobs + WHERE id = NEW.id; + END IF; + RETURN NEW; +END $$ LANGUAGE plpgsql; + +CREATE OR REPLACE TRIGGER v2_job_flow_runtime_before_update_trigger + BEFORE UPDATE ON v2_job_flow_runtime + FOR EACH ROW + WHEN (pg_trigger_depth() < 1) -- Prevent 
infinite loop v2 <-> v1 +EXECUTE FUNCTION v2_job_flow_runtime_before_update(); diff --git a/backend/migrations/20250117124919_v2_job_constraints.down.sql b/backend/migrations/20250117124919_v2_job_constraints.down.sql new file mode 100644 index 0000000000000..72796ff008b58 --- /dev/null +++ b/backend/migrations/20250117124919_v2_job_constraints.down.sql @@ -0,0 +1,4 @@ +-- Add down migration script here +ALTER TABLE v2_job ALTER COLUMN workspace_id DROP NOT NULL; +ALTER TABLE v2_job ALTER COLUMN tag DROP DEFAULT; +ALTER TABLE v2_job ALTER COLUMN tag DROP NOT NULL; diff --git a/backend/migrations/20250117124919_v2_job_constraints.up.sql b/backend/migrations/20250117124919_v2_job_constraints.up.sql new file mode 100644 index 0000000000000..b0ba090fa9c39 --- /dev/null +++ b/backend/migrations/20250117124919_v2_job_constraints.up.sql @@ -0,0 +1,4 @@ +-- Add up migration script here +ALTER TABLE v2_job ALTER COLUMN workspace_id SET NOT NULL; +ALTER TABLE v2_job ALTER COLUMN tag SET DEFAULT 'other'; +ALTER TABLE v2_job ALTER COLUMN tag SET NOT NULL; diff --git a/backend/migrations/20250117145628_v2_job_queue_constraints.down.sql b/backend/migrations/20250117145628_v2_job_queue_constraints.down.sql new file mode 100644 index 0000000000000..9f13ce1b41241 --- /dev/null +++ b/backend/migrations/20250117145628_v2_job_queue_constraints.down.sql @@ -0,0 +1,2 @@ +-- Add up migration script here +ALTER TABLE v2_job_queue ALTER COLUMN __created_by SET NOT NULL; diff --git a/backend/migrations/20250117145628_v2_job_queue_constraints.up.sql b/backend/migrations/20250117145628_v2_job_queue_constraints.up.sql new file mode 100644 index 0000000000000..5eac732ab0a70 --- /dev/null +++ b/backend/migrations/20250117145628_v2_job_queue_constraints.up.sql @@ -0,0 +1,2 @@ +-- Add up migration script here +ALTER TABLE v2_job_queue ALTER COLUMN __created_by DROP NOT NULL; diff --git a/backend/migrations/20250117145629_v2_job_completed_constraints.down.sql 
b/backend/migrations/20250117145629_v2_job_completed_constraints.down.sql new file mode 100644 index 0000000000000..928a5feb5a44a --- /dev/null +++ b/backend/migrations/20250117145629_v2_job_completed_constraints.down.sql @@ -0,0 +1,8 @@ +-- Add down migration script here +ALTER TABLE v2_job_completed ALTER COLUMN status DROP NOT NULL; +ALTER TABLE v2_job_completed ALTER COLUMN completed_at DROP DEFAULT; +ALTER TABLE v2_job_completed ALTER COLUMN completed_at DROP NOT NULL; +ALTER TABLE v2_job_completed ALTER COLUMN started_at SET NOT NULL; +ALTER TABLE v2_job_completed ALTER COLUMN __created_at SET NOT NULL; +ALTER TABLE v2_job_completed ALTER COLUMN __created_by SET NOT NULL; +ALTER TABLE v2_job_completed ALTER COLUMN __success SET NOT NULL; diff --git a/backend/migrations/20250117145629_v2_job_completed_constraints.up.sql b/backend/migrations/20250117145629_v2_job_completed_constraints.up.sql new file mode 100644 index 0000000000000..9c4772ec726ad --- /dev/null +++ b/backend/migrations/20250117145629_v2_job_completed_constraints.up.sql @@ -0,0 +1,8 @@ +-- Add up migration script here +ALTER TABLE v2_job_completed ALTER COLUMN status SET NOT NULL; +ALTER TABLE v2_job_completed ALTER COLUMN completed_at SET DEFAULT now(); +ALTER TABLE v2_job_completed ALTER COLUMN completed_at SET NOT NULL; +ALTER TABLE v2_job_completed ALTER COLUMN started_at DROP NOT NULL; +ALTER TABLE v2_job_completed ALTER COLUMN __created_at DROP NOT NULL; +ALTER TABLE v2_job_completed ALTER COLUMN __created_by DROP NOT NULL; +ALTER TABLE v2_job_completed ALTER COLUMN __success DROP NOT NULL; diff --git a/backend/src/monitor.rs b/backend/src/monitor.rs index 0aeb29df56ea5..e7c691ee5725e 100644 --- a/backend/src/monitor.rs +++ b/backend/src/monitor.rs @@ -734,11 +734,11 @@ pub async fn delete_expired_items(db: &DB) -> () { match db.begin().await { Ok(mut tx) => { let deleted_jobs = sqlx::query_scalar!( - "DELETE FROM completed_job WHERE created_at <= now() - ($1::bigint::text || ' 
s')::interval AND started_at + ((duration_ms/1000 + $1::bigint) || ' s')::interval <= now() RETURNING id", - job_retention_secs - ) - .fetch_all(&mut *tx) - .await; + "DELETE FROM completed_job WHERE created_at <= now() - ($1::bigint::text || ' s')::interval AND started_at + ((duration_ms/1000 + $1::bigint) || ' s')::interval <= now() RETURNING id AS \"id!\"", + job_retention_secs + ) + .fetch_all(&mut *tx) + .await; match deleted_jobs { Ok(deleted_jobs) => { @@ -1495,15 +1495,15 @@ pub async fn reload_base_url_setting(db: &DB) -> error::Result<()> { async fn handle_zombie_jobs(db: &Pool, base_internal_url: &str, worker_name: &str) { if *RESTART_ZOMBIE_JOBS { let restarted = sqlx::query!( - "UPDATE queue SET running = false, started_at = null - WHERE last_ping < now() - ($1 || ' seconds')::interval - AND running = true AND job_kind NOT IN ('flow', 'flowpreview', 'flownode', 'singlescriptflow') AND same_worker = false RETURNING id, workspace_id, last_ping", - *ZOMBIE_JOB_TIMEOUT, - ) - .fetch_all(db) - .await - .ok() - .unwrap_or_else(|| vec![]); + "UPDATE queue SET running = false, started_at = null + WHERE last_ping < now() - ($1 || ' seconds')::interval + AND running = true AND job_kind NOT IN ('flow', 'flowpreview', 'flownode', 'singlescriptflow') AND same_worker = false RETURNING id AS \"id!\", workspace_id AS \"workspace_id!\", last_ping", + *ZOMBIE_JOB_TIMEOUT, + ) + .fetch_all(db) + .await + .ok() + .unwrap_or_else(|| vec![]); #[cfg(feature = "prometheus")] if METRICS_ENABLED.load(std::sync::atomic::Ordering::Relaxed) { diff --git a/backend/tests/fixtures/base.sql b/backend/tests/fixtures/base.sql index f406e603d4be3..e6e4f9875a01c 100644 --- a/backend/tests/fixtures/base.sql +++ b/backend/tests/fixtures/base.sql @@ -29,7 +29,7 @@ END; $$ LANGUAGE PLPGSQL; CREATE TRIGGER "notify_insert_on_completed_job" - AFTER INSERT ON "completed_job" + AFTER INSERT ON "v2_job_completed" FOR EACH ROW EXECUTE FUNCTION "notify_insert_on_completed_job" (); @@ -43,12 +43,12 @@ 
END; $$ LANGUAGE PLPGSQL; CREATE TRIGGER "notify_queue_after_insert" - AFTER INSERT ON "queue" + AFTER INSERT ON "v2_job_queue" FOR EACH ROW EXECUTE FUNCTION "notify_queue" (); CREATE TRIGGER "notify_queue_after_flow_status_update" - AFTER UPDATE ON "queue" + AFTER UPDATE ON "v2_job_flow_runtime" FOR EACH ROW WHEN (NEW.flow_status IS DISTINCT FROM OLD.flow_status) EXECUTE FUNCTION "notify_queue" (); \ No newline at end of file diff --git a/backend/windmill-api/src/db.rs b/backend/windmill-api/src/db.rs index 6186b0ada1a42..6268f603e6cb7 100644 --- a/backend/windmill-api/src/db.rs +++ b/backend/windmill-api/src/db.rs @@ -398,60 +398,60 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> { .await?; }); - run_windmill_migration!("fix_job_completed_index_4", &db, { + run_windmill_migration!("fix_job_index_1", &db, { let migration_job_name = "fix_job_completed_index_4"; let mut i = 1; tracing::info!("step {i} of {migration_job_name} migration"); - sqlx::query("create index concurrently if not exists ix_completed_job_workspace_id_created_at_new_3 ON completed_job (workspace_id, created_at DESC)") + sqlx::query!("create index concurrently if not exists ix_job_workspace_id_created_at_new_3 ON v2_job (workspace_id, created_at DESC)") .execute(db) .await?; i += 1; tracing::info!("step {i} of {migration_job_name} migration"); - sqlx::query("create index concurrently if not exists ix_completed_job_workspace_id_created_at_new_8 ON completed_job (workspace_id, created_at DESC) where job_kind in ('deploymentcallback') AND parent_job IS NULL") + sqlx::query!("create index concurrently if not exists ix_job_workspace_id_created_at_new_8 ON v2_job (workspace_id, created_at DESC) where kind in ('deploymentcallback') AND parent_job IS NULL") .execute(db) .await?; i += 1; tracing::info!("step {i} of {migration_job_name} migration"); - sqlx::query("create index concurrently if not exists ix_completed_job_workspace_id_created_at_new_9 ON completed_job (workspace_id, 
created_at DESC) where job_kind in ('dependencies', 'flowdependencies', 'appdependencies') AND parent_job IS NULL") + sqlx::query!("create index concurrently if not exists ix_job_workspace_id_created_at_new_9 ON v2_job (workspace_id, created_at DESC) where kind in ('dependencies', 'flowdependencies', 'appdependencies') AND parent_job IS NULL") .execute(db) .await?; i += 1; tracing::info!("step {i} of {migration_job_name} migration"); - sqlx::query("create index concurrently if not exists ix_completed_job_workspace_id_created_at_new_5 ON completed_job (workspace_id, created_at DESC) where job_kind in ('preview', 'flowpreview') AND parent_job IS NULL") + sqlx::query!("create index concurrently if not exists ix_job_workspace_id_created_at_new_5 ON v2_job (workspace_id, created_at DESC) where kind in ('preview', 'flowpreview') AND parent_job IS NULL") .execute(db) .await?; i += 1; tracing::info!("step {i} of {migration_job_name} migration"); - sqlx::query("create index concurrently if not exists ix_completed_job_workspace_id_created_at_new_6 ON completed_job (workspace_id, created_at DESC) where job_kind in ('script', 'flow') AND parent_job IS NULL") + sqlx::query!("create index concurrently if not exists ix_job_workspace_id_created_at_new_6 ON v2_job (workspace_id, created_at DESC) where kind in ('script', 'flow') AND parent_job IS NULL") .execute(db) .await?; i += 1; tracing::info!("step {i} of {migration_job_name} migration"); - sqlx::query("create index concurrently if not exists ix_completed_job_workspace_id_created_at_new_7 ON completed_job (workspace_id, success, created_at DESC) where job_kind in ('script', 'flow') AND parent_job IS NULL") + sqlx::query!("create index concurrently if not exists ix_job_workspace_id_created_at_new_7 ON v2_job (workspace_id, created_at DESC) where kind in ('script', 'flow') AND parent_job IS NULL") .execute(db) .await?; i += 1; tracing::info!("step {i} of {migration_job_name} migration"); - sqlx::query("create index concurrently 
if not exists ix_completed_job_workspace_id_started_at_new_2 ON completed_job (workspace_id, started_at DESC)") + sqlx::query!("create index concurrently if not exists ix_completed_job_workspace_id_started_at_new_2 ON v2_job_completed (workspace_id, started_at DESC)") .execute(db) .await?; i += 1; tracing::info!("step {i} of {migration_job_name} migration"); - sqlx::query("create index concurrently if not exists root_job_index_by_path_2 ON completed_job (workspace_id, script_path, created_at desc) WHERE parent_job IS NULL") + sqlx::query!("create index concurrently if not exists root_job_index_by_path_2 ON v2_job (workspace_id, runnable_path, created_at desc) WHERE parent_job IS NULL") .execute(db) .await?; i += 1; tracing::info!("step {i} of {migration_job_name} migration"); - sqlx::query("create index concurrently if not exists ix_completed_job_created_at ON completed_job (created_at DESC)") + sqlx::query!("create index concurrently if not exists ix_job_created_at ON v2_job (created_at DESC)") .execute(db) .await?; @@ -485,7 +485,7 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> { .execute(db) .await?; sqlx::query!( - "CREATE INDEX CONCURRENTLY labeled_jobs_on_jobs ON completed_job USING GIN ((result -> 'wm_labels')) WHERE result ? 'wm_labels'" + "CREATE INDEX CONCURRENTLY labeled_jobs_on_jobs ON v2_job_completed USING GIN ((result -> 'wm_labels')) WHERE result ? 
'wm_labels'" ).execute(db).await?; }); diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 6e88f76ca7d3d..5dbd739f5e501 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -600,7 +600,7 @@ async fn get_flow_job_debug_info( let mut job_ids = vec![]; let jobs_with_root = sqlx::query_scalar!( - "SELECT id FROM queue WHERE workspace_id = $1 and root_job = $2", + "SELECT id AS \"id!\" FROM queue WHERE workspace_id = $1 and root_job = $2", &w_id, &id, ) @@ -1007,7 +1007,7 @@ async fn get_job_logs( .flatten(); let record = sqlx::query!( - "SELECT created_by, CONCAT(coalesce(completed_job.logs, ''), coalesce(job_logs.logs, '')) as logs, job_logs.log_offset, job_logs.log_file_index + "SELECT created_by AS \"created_by!\", CONCAT(coalesce(completed_job.logs, ''), coalesce(job_logs.logs, '')) as logs, job_logs.log_offset, job_logs.log_file_index FROM completed_job LEFT JOIN job_logs ON job_logs.job_id = completed_job.id WHERE completed_job.id = $1 AND completed_job.workspace_id = $2 AND ($3::text[] IS NULL OR completed_job.tag = ANY($3))", @@ -1044,7 +1044,7 @@ async fn get_job_logs( Ok(content_plain(Body::from(logs))) } else { let text = sqlx::query!( - "SELECT created_by, CONCAT(coalesce(queue.logs, ''), coalesce(job_logs.logs, '')) as logs, coalesce(job_logs.log_offset, 0) as log_offset, job_logs.log_file_index + "SELECT created_by AS \"created_by!\", CONCAT(coalesce(queue.logs, ''), coalesce(job_logs.logs, '')) as logs, coalesce(job_logs.log_offset, 0) as log_offset, job_logs.log_file_index FROM queue LEFT JOIN job_logs ON job_logs.job_id = queue.id WHERE queue.id = $1 AND queue.workspace_id = $2 AND ($3::text[] IS NULL OR queue.tag = ANY($3))", @@ -1549,7 +1549,7 @@ async fn cancel_jobs( , tag , priority FROM queue WHERE id = any($2) AND running = false AND parent_job IS NULL AND workspace_id = $3 AND schedule_path IS NULL FOR UPDATE SKIP LOCKED - ON CONFLICT (id) DO NOTHING RETURNING id", username, 
&jobs, w_id, serde_json::json!({"error": { "message": format!("Job canceled: cancel all by {username}"), "name": "Canceled", "reason": "cancel all", "canceler": username}})) + ON CONFLICT (id) DO NOTHING RETURNING id AS \"id!\"", username, &jobs, w_id, serde_json::json!({"error": { "message": format!("Job canceled: cancel all by {username}"), "name": "Canceled", "reason": "cancel all", "canceler": username}})) .fetch_all(&mut *tx) .await?.into_iter().map(|x| x.id).collect::>(); @@ -1622,7 +1622,7 @@ async fn cancel_selection( let mut tx = user_db.begin(&authed).await?; let tags = get_scope_tags(&authed).map(|v| v.iter().map(|s| s.to_string()).collect_vec()); let jobs_to_cancel = sqlx::query_scalar!( - "SELECT id FROM queue WHERE id = ANY($1) AND schedule_path IS NULL AND ($2::text[] IS NULL OR tag = ANY($2))", + "SELECT id AS \"id!\" FROM queue WHERE id = ANY($1) AND schedule_path IS NULL AND ($2::text[] IS NULL OR tag = ANY($2))", &jobs, tags.as_ref().map(|v| v.as_slice()) ) @@ -2080,7 +2080,7 @@ async fn get_suspended_parent_flow_info(job_id: Uuid, db: &DB) -> error::Result< let flow = sqlx::query_as!( FlowInfo, r#" - SELECT id, flow_status, suspend, script_path + SELECT id AS "id!", flow_status, suspend AS "suspend!", script_path FROM queue WHERE id = ( SELECT parent_job FROM queue WHERE id = $1 UNION ALL SELECT parent_job FROM completed_job WHERE id = $1) FOR UPDATE @@ -2100,7 +2100,7 @@ async fn get_suspended_flow_info<'c>( let flow = sqlx::query_as!( FlowInfo, r#" - SELECT id, flow_status, suspend, script_path + SELECT id AS "id!", flow_status, suspend AS "suspend!", script_path FROM queue WHERE id = $1 "#, @@ -4813,16 +4813,17 @@ async fn add_batch_jobs( let uuids = sqlx::query_scalar!( r#"WITH uuid_table as ( - select gen_random_uuid() as uuid from generate_series(1, $5) + select gen_random_uuid() as uuid from generate_series(1, $6) ) INSERT INTO job - (id, workspace_id, raw_code, raw_lock, raw_flow) - (SELECT uuid, $1, $2, $3, $4 FROM uuid_table) - 
RETURNING id"#, + (id, workspace_id, raw_code, raw_lock, raw_flow, tag) + (SELECT uuid, $1, $2, $3, $4, $5 FROM uuid_table) + RETURNING id AS "id!""#, w_id, raw_code, raw_lock, raw_flow.map(sqlx::types::Json) as Option>, + tag, n ) .fetch_all(&mut *tx) @@ -4835,7 +4836,7 @@ async fn add_batch_jobs( INSERT INTO queue (id, script_hash, script_path, job_kind, language, args, tag, created_by, permissioned_as, email, scheduled_for, workspace_id, concurrent_limit, concurrency_time_window_s, timeout, flow_status) (SELECT uuid, $1, $2, $3, $4, ('{ "uuid": "' || uuid || '" }')::jsonb, $5, $6, $7, $8, $9, $10, $12, $13, $14, $15 FROM uuid_table) - RETURNING id"#, + RETURNING id AS "id!""#, hash.map(|h| h.0), path, job_kind.clone() as JobKind, @@ -5668,7 +5669,7 @@ async fn get_completed_job_result_maybe( .into_response()) } else if get_started.is_some_and(|x| x) { let started = sqlx::query_scalar!( - "SELECT running FROM queue WHERE id = $1 AND workspace_id = $2", + "SELECT running AS \"running!\" FROM queue WHERE id = $1 AND workspace_id = $2", id, w_id ) diff --git a/backend/windmill-api/src/slack_approvals.rs b/backend/windmill-api/src/slack_approvals.rs index 1f48bd07dc4e9..fb027916ef1b0 100644 --- a/backend/windmill-api/src/slack_approvals.rs +++ b/backend/windmill-api/src/slack_approvals.rs @@ -971,11 +971,11 @@ async fn get_modal_blocks( let (job_kind, script_hash, raw_flow, parent_job_id, created_at, created_by, script_path, args) = sqlx::query!( "SELECT - queue.job_kind AS \"job_kind: JobKind\", + queue.job_kind AS \"job_kind!: JobKind\", queue.script_hash AS \"script_hash: ScriptHash\", queue.raw_flow AS \"raw_flow: sqlx::types::Json>\", completed_job.parent_job AS \"parent_job: Uuid\", - completed_job.created_at AS \"created_at: chrono::NaiveDateTime\", + completed_job.created_at AS \"created_at!: chrono::NaiveDateTime\", completed_job.created_by AS \"created_by!\", queue.script_path, queue.args AS \"args: sqlx::types::Json>\" diff --git 
a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index 2405be5b613a1..7205d1571aefe 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -173,7 +173,7 @@ pub async fn cancel_single_job<'c>( }); } else { let id: Option = sqlx::query_scalar!( - "UPDATE queue SET canceled = true, canceled_by = $1, canceled_reason = $2, scheduled_for = now(), suspend = 0 WHERE id = $3 AND workspace_id = $4 AND (canceled = false OR canceled_reason != $2) RETURNING id", + "UPDATE queue SET canceled = true, canceled_by = $1, canceled_reason = $2, scheduled_for = now(), suspend = 0 WHERE id = $3 AND workspace_id = $4 AND (canceled = false OR canceled_reason != $2) RETURNING id AS \"id!\"", username, reason, job_running.id, @@ -249,7 +249,7 @@ pub async fn cancel_job<'c>( while !jobs.is_empty() { let p_job = jobs.pop(); let new_jobs = sqlx::query_scalar!( - "SELECT id FROM queue WHERE parent_job = $1 AND workspace_id = $2", + "SELECT id AS \"id!\" FROM queue WHERE parent_job = $1 AND workspace_id = $2", p_job, w_id ) @@ -609,7 +609,7 @@ pub async fn add_completed_job( ) VALUES ($1, $2, $3, $4, $5, COALESCE($6, now()), COALESCE($30::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE($6, now()))))*1000), $7, $8, $9,\ $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29) - ON CONFLICT (id) DO UPDATE SET success = $7, result = $11 RETURNING duration_ms", + ON CONFLICT (id) DO UPDATE SET success = $7, result = $11 RETURNING duration_ms AS \"duration_ms!\"", queued_job.workspace_id, queued_job.id, queued_job.parent_job, @@ -2307,7 +2307,8 @@ pub async fn get_result_and_success_by_id_from_flow( let success = match &job_result { JobResult::SingleJob(job_id) => { sqlx::query_scalar!( - "SELECT success FROM completed_job WHERE id = $1 AND workspace_id = $2", + "SELECT success AS \"success!\" + FROM completed_job WHERE id = $1 AND workspace_id = $2", job_id, w_id ) @@ 
-3727,7 +3728,7 @@ pub async fn push<'c, 'd>( visible_to_owner, root_job, tag, concurrent_limit, concurrency_time_window_s, timeout, \ flow_step_id, cache_ttl, priority, last_ping) VALUES ($1, $2, $3, $4, $5, $6, COALESCE($7, now()), $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, CASE WHEN $3 THEN now() END, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, NULL) \ - RETURNING id", + RETURNING id AS \"id!\"", workspace_id, job_id, is_running, @@ -3912,7 +3913,7 @@ async fn restarted_flows_resolution( let row = sqlx::query!( "SELECT script_path, script_hash AS \"script_hash: ScriptHash\", - job_kind AS \"job_kind: JobKind\", + job_kind AS \"job_kind!: JobKind\", flow_status AS \"flow_status: Json>\", raw_flow AS \"raw_flow: Json>\" FROM completed_job WHERE id = $1 and workspace_id = $2", diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs index b320b9a4456df..3d375c776cdf7 100644 --- a/backend/windmill-worker/src/worker_flow.rs +++ b/backend/windmill-worker/src/worker_flow.rs @@ -220,7 +220,7 @@ pub async fn update_flow_status_after_job_completion_internal( let (job_kind, script_hash, old_status, raw_flow) = sqlx::query!( "SELECT - job_kind AS \"job_kind: JobKind\", + job_kind AS \"job_kind!: JobKind\", script_hash AS \"script_hash: ScriptHash\", flow_status AS \"flow_status!: Json>\", raw_flow AS \"raw_flow: Json>\" @@ -550,7 +550,7 @@ pub async fn update_flow_status_after_job_completion_internal( let new_status = if skip_loop_failures || sqlx::query_scalar!( - "SELECT success FROM completed_job WHERE id = ANY($1)", + "SELECT success AS \"success!\" FROM completed_job WHERE id = ANY($1)", jobs.as_slice() ) .fetch_all(&mut *tx) @@ -1683,7 +1683,7 @@ async fn push_next_flow_job( .await?; if no_flow_overlap { let overlapping = sqlx::query_scalar!( - "SELECT id FROM queue WHERE schedule_path = $1 AND workspace_id = $2 AND id != $3 AND running = true", + "SELECT id AS \"id!\" FROM queue WHERE schedule_path 
= $1 AND workspace_id = $2 AND id != $3 AND running = true", flow_job.schedule_path.as_ref().unwrap(), flow_job.workspace_id.as_str(), flow_job.id diff --git a/backend/windmill-worker/src/worker_lockfiles.rs b/backend/windmill-worker/src/worker_lockfiles.rs index 06b7e4f0d2ec0..8553e5c867605 100644 --- a/backend/windmill-worker/src/worker_lockfiles.rs +++ b/backend/windmill-worker/src/worker_lockfiles.rs @@ -632,15 +632,17 @@ pub async fn handle_flow_dependency_job( let new_flow_value = Json(serde_json::value::to_raw_value(&flow).map_err(to_anyhow)?); // Re-check cancellation to ensure we don't accidentally override a flow. - if sqlx::query_scalar!("SELECT canceled FROM queue WHERE id = $1", job.id) - .fetch_optional(db) - .await - .map(|v| Some(true) == v) - .unwrap_or_else(|err| { - tracing::error!(%job.id, %err, "error checking cancellation for job {0}: {err}", job.id); - false - }) - { + if sqlx::query_scalar!( + "SELECT canceled AS \"canceled!\" FROM queue WHERE id = $1", + job.id + ) + .fetch_optional(db) + .await + .map(|v| Some(true) == v) + .unwrap_or_else(|err| { + tracing::error!(%job.id, %err, "error checking cancellation for job {0}: {err}", job.id); + false + }) { return Ok(to_raw_value_owned(json!({ "status": "Flow lock generation was canceled", }))); @@ -1524,15 +1526,17 @@ pub async fn handle_app_dependency_job( .await?; // Re-check cancelation to ensure we don't accidentially override an app. 
- if sqlx::query_scalar!("SELECT canceled FROM queue WHERE id = $1", job.id) - .fetch_optional(db) - .await - .map(|v| Some(true) == v) - .unwrap_or_else(|err| { - tracing::error!(%job.id, %err, "error checking cancelation for job {0}: {err}", job.id); - false - }) - { + if sqlx::query_scalar!( + "SELECT canceled AS \"canceled!\" FROM queue WHERE id = $1", + job.id + ) + .fetch_optional(db) + .await + .map(|v| Some(true) == v) + .unwrap_or_else(|err| { + tracing::error!(%job.id, %err, "error checking cancelation for job {0}: {err}", job.id); + false + }) { return Ok(()); }