Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ProcessingEngineRunFacet to OL DAG Start event #43213

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions providers/src/airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
nominal_time_run,
ownership_job,
parent_run,
processing_engine_run,
source_code_location_job,
)
from openlineage.client.uuid import generate_static_uuid
Expand All @@ -42,6 +41,7 @@
OpenLineageRedactor,
get_airflow_debug_facet,
get_airflow_state_run_facet,
get_processing_engine_facet,
)
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -197,18 +197,10 @@ def start_task(
:param task: metadata container with information extracted from operator
:param run_facets: custom run facets
"""
from airflow.version import version as AIRFLOW_VERSION

processing_engine_version_facet = processing_engine_run.ProcessingEngineRunFacet(
version=AIRFLOW_VERSION,
name="Airflow",
openlineageAdapterVersion=OPENLINEAGE_PROVIDER_VERSION,
)

run_facets = run_facets or {}
if task:
run_facets = {**task.run_facets, **run_facets}
run_facets["processing_engine"] = processing_engine_version_facet # type: ignore
run_facets = {**run_facets, **get_processing_engine_facet()} # type: ignore
event = RunEvent(
eventType=RunState.START,
eventTime=event_time,
Expand Down Expand Up @@ -364,7 +356,7 @@ def dag_started(
job_name=dag_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
run_facets={**run_facets, **get_airflow_debug_facet()},
run_facets={**run_facets, **get_airflow_debug_facet(), **get_processing_engine_facet()},
),
inputs=[],
outputs=[],
Expand Down
16 changes: 14 additions & 2 deletions providers/src/airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
# TODO: move this maybe to Airflow's logic?
from airflow.models import DAG, BaseOperator, DagRun, MappedOperator
from airflow.providers.common.compat.assets import Asset
from airflow.providers.openlineage import conf
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.plugins.facets import (
AirflowDagRunFacet,
AirflowDebugRunFacet,
Expand All @@ -65,7 +65,7 @@

if TYPE_CHECKING:
from openlineage.client.event_v2 import Dataset as OpenLineageDataset
from openlineage.client.facet_v2 import RunFacet
from openlineage.client.facet_v2 import RunFacet, processing_engine_run

from airflow.models import TaskInstance
from airflow.utils.state import DagRunState, TaskInstanceState
Expand Down Expand Up @@ -406,6 +406,18 @@ def _get_all_packages_installed() -> dict[str, str]:
return {dist.metadata["Name"]: dist.version for dist in metadata.distributions()}


def get_processing_engine_facet() -> dict[str, processing_engine_run.ProcessingEngineRunFacet]:
from openlineage.client.facet_v2 import processing_engine_run

return {
"processing_engine": processing_engine_run.ProcessingEngineRunFacet(
version=AIRFLOW_VERSION,
name="Airflow",
openlineageAdapterVersion=OPENLINEAGE_PROVIDER_VERSION,
)
}


def get_airflow_debug_facet() -> dict[str, AirflowDebugRunFacet]:
if not conf.debug_mode():
return {}
Expand Down
3 changes: 3 additions & 0 deletions providers/tests/openlineage/plugins/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,9 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
nominalStartTime=event_time.isoformat(),
nominalEndTime=event_time.isoformat(),
),
"processing_engine": processing_engine_run.ProcessingEngineRunFacet(
version=ANY, name="Airflow", openlineageAdapterVersion=ANY
),
"airflowDagRun": AirflowDagRunFacet(
dag=expected_dag_info,
dagRun={
Expand Down
Loading