Skip to content

Commit

Permalink
restart concurrency limits
Browse files (browse the repository at this point in the history)
  • Loading branch information
rubenfiszel committed Jan 30, 2025
1 parent 8ba0f3a commit b5703c8
Showing 1 changed file with 34 additions and 4 deletions.
38 changes: 34 additions & 4 deletions backend/src/monitor.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -1506,9 +1506,20 @@ pub async fn reload_base_url_setting(db: &DB) -> error::Result<()> {
async fn handle_zombie_jobs(db: &Pool<Postgres>, base_internal_url: &str, worker_name: &str) {
if *RESTART_ZOMBIE_JOBS {
let restarted = sqlx::query!(
"UPDATE queue SET running = false, started_at = null
WHERE last_ping < now() - ($1 || ' seconds')::interval
AND running = true AND job_kind NOT IN ('flow', 'flowpreview', 'flownode', 'singlescriptflow') AND same_worker = false RETURNING id, workspace_id, last_ping",
"WITH zombie_jobs AS (
UPDATE queue SET running = false, started_at = null
WHERE last_ping < now() - ($1 || ' seconds')::interval
AND running = true AND job_kind NOT IN ('flow', 'flowpreview', 'flownode', 'singlescriptflow') AND same_worker = false
RETURNING id, workspace_id, last_ping
),
update_concurrency AS (
UPDATE concurrency_counter cc
SET job_uuids = job_uuids - zj.id::text
FROM zombie_jobs zj
INNER JOIN concurrency_key ck ON ck.job_id = zj.id
WHERE cc.concurrency_id = ck.key
)
SELECT id, workspace_id, last_ping FROM zombie_jobs",
*ZOMBIE_JOB_TIMEOUT,
)
.fetch_all(db)
Expand Down Expand Up @@ -1652,12 +1663,31 @@ async fn handle_zombie_flows(db: &DB) -> error::Result<()> {
tracing::error!(error_message);
report_critical_error(error_message, db.clone(), Some(&flow.workspace_id), None).await;
// if the flow hasn't started and is a zombie, we can simply restart it
let mut tx = db.begin().await?;

let concurrency_key =
sqlx::query_scalar!("SELECT key FROM concurrency_key WHERE job_id = $1", flow.id)
.fetch_optional(&mut *tx)
.await?;

if let Some(key) = concurrency_key {
sqlx::query!(
"UPDATE concurrency_counter SET job_uuids = job_uuids - $2 WHERE concurrency_id = $1",
key,
flow.id.hyphenated().to_string()
)
.execute(&mut *tx)
.await?;
}

sqlx::query!(
"UPDATE queue SET running = false, started_at = null WHERE id = $1 AND canceled = false",
flow.id
)
.execute(db)
.execute(&mut *tx)
.await?;

tx.commit().await?;
} else {
let id = flow.id.clone();
let last_ping = flow.last_ping.clone();
Expand Down

0 comments on commit b5703c8

Please sign in to comment.