diff --git a/charts/airflow/templates/scheduler/scheduler-deployment.yaml b/charts/airflow/templates/scheduler/scheduler-deployment.yaml index 9660f83b..e1ffa67c 100644 --- a/charts/airflow/templates/scheduler/scheduler-deployment.yaml +++ b/charts/airflow/templates/scheduler/scheduler-deployment.yaml @@ -149,21 +149,9 @@ spec: from airflow.utils.db import create_session from airflow.utils.net import get_hostname - # heartbeat check imports - try: - from airflow.jobs.scheduler_job_runner import SchedulerJobRunner - except ImportError: - # `SchedulerJob` is wrapped by `SchedulerJobRunner` since airflow 2.6.0 - from airflow.jobs.scheduler_job import SchedulerJob as SchedulerJobRunner - {{- if .Values.scheduler.livenessProbe.taskCreationCheck.enabled }} {{ "" }} # task creation check imports - try: - from airflow.jobs.local_task_job_runner import LocalTaskJobRunner - except ImportError: - # `LocalTaskJob` is wrapped by `LocalTaskJobRunner` since airflow 2.6.0 - from airflow.jobs.local_task_job import LocalTaskJob as LocalTaskJobRunner from airflow.utils import timezone {{- end }} @@ -175,7 +163,7 @@ spec: hostname = get_hostname() scheduler_job = session \ .query(Job) \ - .filter_by(job_type=SchedulerJobRunner.job_type) \ + .filter_by(job_type="SchedulerJob") \ .filter_by(hostname=hostname) \ .order_by(Job.latest_heartbeat.desc()) \ .limit(1) \ @@ -204,7 +192,7 @@ spec: task_job_threshold = {{ $task_job_threshold }} task_job = session \ .query(Job) \ - .filter_by(job_type=LocalTaskJobRunner.job_type) \ + .filter_by(job_type="LocalTaskJob") \ .order_by(Job.id.desc()) \ .limit(1) \ .first() diff --git a/charts/airflow/templates/triggerer/triggerer-deployment.yaml b/charts/airflow/templates/triggerer/triggerer-deployment.yaml index 2497f68f..13434b1d 100644 --- a/charts/airflow/templates/triggerer/triggerer-deployment.yaml +++ b/charts/airflow/templates/triggerer/triggerer-deployment.yaml @@ -134,19 +134,12 @@ spec: from airflow.utils.db import create_session from airflow.utils.net import get_hostname - # heartbeat check imports - try: - from airflow.jobs.triggerer_job_runner import TriggererJobRunner - except ImportError: - # `TriggererJob` is wrapped by `TriggererJobRunner` since airflow 2.6.0 - from airflow.jobs.triggerer_job import TriggererJob as TriggererJobRunner - with create_session() as session: # ensure the TriggererJob with most recent heartbeat for this `hostname` is alive hostname = get_hostname() triggerer_job = session \ .query(Job) \ - .filter_by(job_type=TriggererJobRunner.job_type) \ + .filter_by(job_type="TriggererJob") \ .filter_by(hostname=hostname) \ .order_by(Job.latest_heartbeat.desc()) \ .limit(1) \