From 0a75a6744a90daea978dad219b939a45d89afe69 Mon Sep 17 00:00:00 2001 From: Mathew Wicks <5735406+thesuperzapper@users.noreply.github.com> Date: Tue, 30 Apr 2024 19:48:50 -0700 Subject: [PATCH] fix: scheduler and triggerer probe performance for airflow <2.6.0 (#853) fix: scheduler and triggerer probe performance in <2.6.0 Signed-off-by: Mathew Wicks <5735406+thesuperzapper@users.noreply.github.com> --- .../scheduler/scheduler-deployment.yaml | 16 ++-------------- .../triggerer/triggerer-deployment.yaml | 9 +-------- 2 files changed, 3 insertions(+), 22 deletions(-) diff --git a/charts/airflow/templates/scheduler/scheduler-deployment.yaml b/charts/airflow/templates/scheduler/scheduler-deployment.yaml index 30ed6f3f..b31d7cee 100644 --- a/charts/airflow/templates/scheduler/scheduler-deployment.yaml +++ b/charts/airflow/templates/scheduler/scheduler-deployment.yaml @@ -157,21 +157,9 @@ spec: from airflow.utils.db import create_session from airflow.utils.net import get_hostname - # heartbeat check imports - try: - from airflow.jobs.scheduler_job_runner import SchedulerJobRunner - except ImportError: - # `SchedulerJob` is wrapped by `SchedulerJobRunner` since airflow 2.6.0 - from airflow.jobs.scheduler_job import SchedulerJob as SchedulerJobRunner - {{- if .Values.scheduler.livenessProbe.taskCreationCheck.enabled }} {{ "" }} # task creation check imports - try: - from airflow.jobs.local_task_job_runner import LocalTaskJobRunner - except ImportError: - # `LocalTaskJob` is wrapped by `LocalTaskJobRunner` since airflow 2.6.0 - from airflow.jobs.local_task_job import LocalTaskJob as LocalTaskJobRunner from airflow.utils import timezone {{- end }} @@ -183,7 +171,7 @@ spec: hostname = get_hostname() scheduler_job = session \ .query(Job) \ - .filter_by(job_type=SchedulerJobRunner.job_type) \ + .filter_by(job_type="SchedulerJob") \ .filter_by(hostname=hostname) \ .order_by(Job.latest_heartbeat.desc()) \ .limit(1) \ @@ -212,7 +200,7 @@ spec: task_job_threshold = {{ $task_job_threshold }} task_job = session \ .query(Job) \ - .filter_by(job_type=LocalTaskJobRunner.job_type) \ + .filter_by(job_type="LocalTaskJob") \ .order_by(Job.id.desc()) \ .limit(1) \ .first() diff --git a/charts/airflow/templates/triggerer/triggerer-deployment.yaml b/charts/airflow/templates/triggerer/triggerer-deployment.yaml index bac60bb1..ed90f532 100644 --- a/charts/airflow/templates/triggerer/triggerer-deployment.yaml +++ b/charts/airflow/templates/triggerer/triggerer-deployment.yaml @@ -145,19 +145,12 @@ spec: from airflow.utils.db import create_session from airflow.utils.net import get_hostname - # heartbeat check imports - try: - from airflow.jobs.triggerer_job_runner import TriggererJobRunner - except ImportError: - # `TriggererJob` is wrapped by `TriggererJobRunner` since airflow 2.6.0 - from airflow.jobs.triggerer_job import TriggererJob as TriggererJobRunner - with create_session() as session: # ensure the TriggererJob with most recent heartbeat for this `hostname` is alive hostname = get_hostname() triggerer_job = session \ .query(Job) \ - .filter_by(job_type=TriggererJobRunner.job_type) \ + .filter_by(job_type="TriggererJob") \ .filter_by(hostname=hostname) \ .order_by(Job.latest_heartbeat.desc()) \ .limit(1) \