Skip to content

Commit

Permalink
fix: scheduler and triggerer probe performance for airflow <2.6.0 (#853)
Browse files Browse the repository at this point in the history
fix: scheduler and triggerer probe performance in <2.6.0

Signed-off-by: Mathew Wicks <[email protected]>
  • Loading branch information
thesuperzapper authored May 1, 2024
1 parent c4fa882 commit 0a75a67
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 22 deletions.
16 changes: 2 additions & 14 deletions charts/airflow/templates/scheduler/scheduler-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,21 +157,9 @@ spec:
from airflow.utils.db import create_session
from airflow.utils.net import get_hostname
# heartbeat check imports
try:
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
except ImportError:
# `SchedulerJob` is wrapped by `SchedulerJobRunner` since airflow 2.6.0
from airflow.jobs.scheduler_job import SchedulerJob as SchedulerJobRunner
{{- if .Values.scheduler.livenessProbe.taskCreationCheck.enabled }}
{{ "" }}
# task creation check imports
try:
from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
except ImportError:
# `LocalTaskJob` is wrapped by `LocalTaskJobRunner` since airflow 2.6.0
from airflow.jobs.local_task_job import LocalTaskJob as LocalTaskJobRunner
from airflow.utils import timezone
{{- end }}
Expand All @@ -183,7 +171,7 @@ spec:
hostname = get_hostname()
scheduler_job = session \
.query(Job) \
.filter_by(job_type=SchedulerJobRunner.job_type) \
.filter_by(job_type="SchedulerJob") \
.filter_by(hostname=hostname) \
.order_by(Job.latest_heartbeat.desc()) \
.limit(1) \
Expand Down Expand Up @@ -212,7 +200,7 @@ spec:
task_job_threshold = {{ $task_job_threshold }}
task_job = session \
.query(Job) \
.filter_by(job_type=LocalTaskJobRunner.job_type) \
.filter_by(job_type="LocalTaskJob") \
.order_by(Job.id.desc()) \
.limit(1) \
.first()
Expand Down
9 changes: 1 addition & 8 deletions charts/airflow/templates/triggerer/triggerer-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,19 +145,12 @@ spec:
from airflow.utils.db import create_session
from airflow.utils.net import get_hostname
# heartbeat check imports
try:
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
except ImportError:
# `TriggererJob` is wrapped by `TriggererJobRunner` since airflow 2.6.0
from airflow.jobs.triggerer_job import TriggererJob as TriggererJobRunner
with create_session() as session:
# ensure the TriggererJob with most recent heartbeat for this `hostname` is alive
hostname = get_hostname()
triggerer_job = session \
.query(Job) \
.filter_by(job_type=TriggererJobRunner.job_type) \
.filter_by(job_type="TriggererJob") \
.filter_by(hostname=hostname) \
.order_by(Job.latest_heartbeat.desc()) \
.limit(1) \
Expand Down

0 comments on commit 0a75a67

Please sign in to comment.