diff --git a/kedro-airflow/README.md b/kedro-airflow/README.md
index afabe1b51..c7ff99c41 100644
--- a/kedro-airflow/README.md
+++ b/kedro-airflow/README.md
@@ -55,3 +55,58 @@ You can use the additional command line argument `--jinja-file` (alias `-j`) to
 ```bash
 kedro airflow create --jinja-file=./custom/template.j2
 ```
+
+#### How can I pass arguments to the Airflow DAGs dynamically?
+
+`kedro-airflow` picks up configuration from `airflow.yml` files. Arguments can be specified globally or per pipeline:
+
+```yaml
+# Global parameters
+default:
+    start_date: [2023, 1, 1]
+    max_active_runs: 3
+    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
+    schedule_interval: "@once"
+    catchup: false
+    # Default settings applied to all tasks
+    owner: "airflow"
+    depends_on_past: false
+    email_on_failure: false
+    email_on_retry: false
+    retries: 1
+    retry_delay: 5
+
+# Arguments specific to the pipeline (overrides the parameters above)
+data_science:
+    owner: "airflow-ds"
+```
+
+Arguments can also be passed via `--params` on the command line:
+
+```bash
+kedro airflow create --params "schedule_interval='@weekly'"
+```
+
+These variables are passed to the Jinja2 template.
+
+#### What if I want to pass different arguments?
+
+To pass arguments other than those used in the default template, supply a custom template (see _"What if I want to use a different Jinja2 template?"_).
+
+The syntax for arguments is:
+```
+{{ argument_name }}
+```
+
+To make an argument optional, use:
+```
+{{ argument_name | default("default_value") }}
+```
+
+For examples, have a look at the default template (`airflow_dag_template.j2`).
+
+#### How can I use Airflow runtime parameters?
+
+It is possible to pass parameters when triggering an Airflow DAG from the user interface.
+To use this feature, create a custom template using the [Params syntax](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html).
+See _"What if I want to use a different Jinja2 template?"_ for instructions on using custom templates.
diff --git a/kedro-airflow/RELEASE.md b/kedro-airflow/RELEASE.md
index c2e0615b4..c0ee5f189 100755
--- a/kedro-airflow/RELEASE.md
+++ b/kedro-airflow/RELEASE.md
@@ -1,6 +1,11 @@
 # Upcoming release 0.5.2
 * Change reference to `kedro.pipeline.Pipeline` object throughout test suite with `kedro.modular_pipeline.pipeline` factory.
 * Migrate all project metadata to static `pyproject.toml`.
+* Added support for configuring DAG kwargs via `airflow.yml`.
+* The generated DAG file now contains the pipeline name.
+* Included help for CLI arguments (see `kedro airflow create --help`).
+* Raise an error when the pipeline does not exist.
+* Added tests for CLI arguments.

 # Release 0.5.1
 * Added additional CLI argument `--jinja-file` to provide a path to a custom Jinja2 template.
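The runtime-parameters FAQ added to the README above stops at a link to Airflow's Params documentation. As a purely illustrative sketch (not part of this diff), a generated DAG that exposes a runtime parameter could look roughly like the snippet below; the `my_param` name and the `BashOperator` task are invented for the example, and in a kedro-airflow custom template any Airflow-runtime expression such as `{{ params.my_param }}` has to be wrapped in `{% raw %} ... {% endraw %}` so the plugin's own Jinja2 rendering pass leaves it intact:

```python
# Hypothetical DAG as it might look after `kedro airflow create -j my_template.j2`;
# the parameter name and the extra task are examples only, not plugin defaults.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="example-runtime-params",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@once",
    catchup=False,
    # Defaults that users can override per run via "Trigger DAG w/ config" in the UI
    params={"my_param": "default_value"},
) as dag:
    # Airflow resolves `{{ params.my_param }}` at run time in templated fields.
    log_param = BashOperator(
        task_id="log-param",
        bash_command="echo {{ params.my_param }}",
    )
```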
diff --git a/kedro-airflow/kedro_airflow/airflow_dag_template.j2 b/kedro-airflow/kedro_airflow/airflow_dag_template.j2
index 92c6296e1..12d600f2c 100644
--- a/kedro-airflow/kedro_airflow/airflow_dag_template.j2
+++ b/kedro-airflow/kedro_airflow/airflow_dag_template.j2
@@ -10,7 +10,6 @@ from kedro.framework.project import configure_project


 class KedroOperator(BaseOperator):
-
     @apply_defaults
     def __init__(
         self,
@@ -35,46 +34,43 @@ class KedroOperator(BaseOperator):
                          env=self.env) as session:
             session.run(self.pipeline_name, node_names=[self.node_name])

+
 # Kedro settings required to run your pipeline
 env = "{{ env }}"
 pipeline_name = "{{ pipeline_name }}"
 project_path = Path.cwd()
 package_name = "{{ package_name }}"

-# Default settings applied to all tasks
-default_args = {
-    'owner': 'airflow',
-    'depends_on_past': False,
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'retry_delay': timedelta(minutes=5)
-}
-
 # Using a DAG context manager, you don't have to specify the dag property of each task
 with DAG(
-    "{{ dag_name | safe | slugify }}",
-    start_date=datetime(2019, 1, 1),
-    max_active_runs=3,
-    schedule_interval=timedelta(minutes=30),  # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
-    default_args=default_args,
-    catchup=False # enable if you don't want historical dag runs to run
-) as dag:
-
-    tasks = {}
-    {% for node in pipeline.nodes %}
-    tasks["{{ node.name | safe | slugify }}"] = KedroOperator(
-        task_id="{{ node.name | safe | slugify }}",
-        package_name=package_name,
-        pipeline_name=pipeline_name,
-        node_name="{{ node.name | safe }}",
-        project_path=project_path,
-        env=env,
+    dag_id="{{ dag_name | safe | slugify }}",
+    start_date=datetime({{ start_date | default([2023, 1, 1]) | join(",")}}),
+    max_active_runs={{ max_active_runs | default(3) }},
+    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
+    schedule_interval="{{ schedule_interval | default('@once') }}",
+    catchup={{ catchup | default(False) }},
+    # Default settings applied to all tasks
+    default_args=dict(
+        owner="{{ owner | default('airflow') }}",
+        depends_on_past={{ depends_on_past | default(False) }},
+        email_on_failure={{ email_on_failure | default(False) }},
+        email_on_retry={{ email_on_retry | default(False) }},
+        retries={{ retries | default(1) }},
+        retry_delay=timedelta(minutes={{ retry_delay | default(5) }})
     )
-    {% endfor %}
+) as dag:
+    tasks = {
+    {% for node in pipeline.nodes %}    "{{ node.name | safe | slugify }}": KedroOperator(
+            task_id="{{ node.name | safe | slugify }}",
+            package_name=package_name,
+            pipeline_name=pipeline_name,
+            node_name="{{ node.name | safe }}",
+            project_path=project_path,
+            env=env,
+        ),
+{% endfor %}    }

     {% for parent_node, child_nodes in dependencies.items() -%}
-    {% for child in child_nodes %}
-    tasks["{{ parent_node.name | safe | slugify }}"] >> tasks["{{ child.name | safe | slugify }}"]
+    {% for child in child_nodes %}    tasks["{{ parent_node.name | safe | slugify }}"] >> tasks["{{ child.name | safe | slugify }}"]
     {% endfor %}
     {%- endfor %}
diff --git a/kedro-airflow/kedro_airflow/plugin.py b/kedro-airflow/kedro_airflow/plugin.py
index c1a62b0f3..bfd78ca63 100644
--- a/kedro-airflow/kedro_airflow/plugin.py
+++ b/kedro-airflow/kedro_airflow/plugin.py
@@ -2,14 +2,23 @@
 from collections import defaultdict
 from pathlib import Path
+from typing import Any

 import click
 import jinja2
 from click import secho
+from kedro.config import MissingConfigException
+from kedro.framework.cli.project import PARAMS_ARG_HELP
+from kedro.framework.cli.utils import ENV_HELP, KedroCliError, _split_params
+from kedro.framework.context import KedroContext
 from kedro.framework.project import pipelines
-from kedro.framework.startup import ProjectMetadata
+from kedro.framework.session import KedroSession
+from kedro.framework.startup import ProjectMetadata, bootstrap_project
 from slugify import slugify

+PIPELINE_ARG_HELP = """Name of the registered pipeline to convert.
+If not set, the '__default__' pipeline is used."""
+

 @click.group(name="Kedro-Airflow")
 def commands():  # pylint: disable=missing-function-docstring
@@ -22,15 +31,35 @@ def airflow_commands():
     pass


+def load_config(
+    context: KedroContext, pipeline_name: str, patterns: list[str]
+) -> dict[str, Any]:
+    try:
+        config_airflow = context.config_loader.get(*patterns)
+        dag_config = {}
+        # Load the default config if specified
+        if "default" in config_airflow:
+            dag_config.update(config_airflow["default"])
+        # Update with pipeline-specific config if present
+        if pipeline_name in config_airflow:
+            dag_config.update(config_airflow[pipeline_name])
+    except MissingConfigException:
+        dag_config = {}
+    return dag_config
+
+
 @airflow_commands.command()
-@click.option("-p", "--pipeline", "pipeline_name", default="__default__")
-@click.option("-e", "--env", default="local")
+@click.option(
+    "-p", "--pipeline", "pipeline_name", default="__default__", help=PIPELINE_ARG_HELP
+)
+@click.option("-e", "--env", default="local", help=ENV_HELP)
 @click.option(
     "-t",
     "--target-dir",
     "target_path",
     type=click.Path(writable=True, resolve_path=True, file_okay=False),
     default="./airflow_dags/",
+    help="The directory path to store the generated Airflow dags",
 )
 @click.option(
     "-j",
@@ -39,6 +68,22 @@ def airflow_commands():
         exists=True, readable=True, resolve_path=True, file_okay=True, dir_okay=False
     ),
     default=Path(__file__).parent / "airflow_dag_template.j2",
+    help="The template file for the generated Airflow dags",
+)
+@click.option(
+    "--params",
+    type=click.UNPROCESSED,
+    default="",
+    help=PARAMS_ARG_HELP,
+    callback=_split_params,
+)
+@click.option(
+    "-c",
+    "--config-patterns",
+    multiple=True,
+    type=click.STRING,
+    default=["airflow*", "airflow/**"],
+    help="Config pattern for airflow.yml",
 )
 @click.pass_obj
 def create(
@@ -47,8 +92,19 @@ def create(
     env,
     target_path,
     jinja_file,
+    params,
+    config_patterns,
 ):  # pylint: disable=too-many-locals,too-many-arguments
     """Create an Airflow DAG for a project"""
+    project_path = Path().cwd()
+    bootstrap_project(project_path)
+    with KedroSession.create(project_path=project_path, env=env) as session:
+        context = session.load_context()
+        dag_config = load_config(context, pipeline_name, config_patterns)
+
+    # Update with params if provided
+    dag_config.update(params)
+
     jinja_file = Path(jinja_file).resolve()
     loader = jinja2.FileSystemLoader(jinja_file.parent)
     jinja_env = jinja2.Environment(autoescape=True, loader=loader, lstrip_blocks=True)
@@ -56,7 +112,7 @@ def create(
     template = jinja_env.get_template(jinja_file.name)

     package_name = metadata.package_name
-    dag_filename = f"{package_name}_dag.py"
+    dag_filename = f"{package_name}_{pipeline_name}_dag.py"

     target_path = Path(target_path)
     target_path = target_path / dag_filename
@@ -64,6 +120,8 @@ def create(
     target_path.parent.mkdir(parents=True, exist_ok=True)

     pipeline = pipelines.get(pipeline_name)
+    if pipeline is None:
+        raise KedroCliError(f"Pipeline {pipeline_name} not found.")

     dependencies = defaultdict(list)
     for node, parent_nodes in pipeline.node_dependencies.items():
@@ -77,6 +135,7 @@ def create(
         pipeline_name=pipeline_name,
         package_name=package_name,
         pipeline=pipeline,
+        **dag_config,
     ).dump(str(target_path))

     secho("")
@@ -84,7 +143,8 @@ def create(
     secho(str(target_path))
     secho("This file should be copied to your Airflow DAG folder.", fg="yellow")
     secho(
-        "The Airflow configuration can be customized by editing this file.", fg="green"
+        "The Airflow configuration can be customized by editing this file.",
+        fg="green",
     )
     secho("")
     secho(
@@ -101,4 +161,3 @@ def create(
         "And all local paths in both the data catalog and log config must be absolute paths.",
         fg="yellow",
     )
-    secho("")
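Taken together, the `load_config` helper and the `--params` handling in `create` give a simple precedence: the `default` block from `airflow.yml` is applied first, then the block keyed by the pipeline name, and values passed with `--params` win last. A minimal sketch of that merge order, with plain dicts standing in for what `context.config_loader.get(*patterns)` would return (keys borrowed from the README example above):

```python
# Plain-dict sketch of the merge order used by `create`; no Kedro session involved.
config_airflow = {
    "default": {"owner": "airflow", "retries": 1},  # applies to every pipeline
    "data_science": {"owner": "airflow-ds"},        # per-pipeline overrides
}
cli_params = {"schedule_interval": "@weekly"}       # what --params would contribute

pipeline_name = "data_science"
dag_config = {}
dag_config.update(config_airflow.get("default", {}))      # 1. global defaults
dag_config.update(config_airflow.get(pipeline_name, {}))  # 2. pipeline-specific block
dag_config.update(cli_params)                             # 3. --params overrides last

assert dag_config == {
    "owner": "airflow-ds",
    "retries": 1,
    "schedule_interval": "@weekly",
}
```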
diff --git a/kedro-airflow/test_requirements.txt b/kedro-airflow/test_requirements.txt
index cdea520c7..e37dd055e 100644
--- a/kedro-airflow/test_requirements.txt
+++ b/kedro-airflow/test_requirements.txt
@@ -11,3 +11,4 @@ pytest-mock
 pytest-xdist
 trufflehog>=2.1.0, <3.0
 wheel
+pyyaml
diff --git a/kedro-airflow/tests/conftest.py b/kedro-airflow/tests/conftest.py
index c23cc5916..2b013f493 100644
--- a/kedro-airflow/tests/conftest.py
+++ b/kedro-airflow/tests/conftest.py
@@ -4,16 +4,19 @@
 discover them automatically. More info here:
 https://docs.pytest.org/en/latest/fixture.html
 """
+import os
 from pathlib import Path
 from shutil import copyfile

 from click.testing import CliRunner
+from cookiecutter.main import cookiecutter
 from kedro import __version__ as kedro_version
+from kedro.framework.cli.starters import TEMPLATE_PATH
 from kedro.framework.startup import ProjectMetadata
 from pytest import fixture


-@fixture(name="cli_runner")
+@fixture(name="cli_runner", scope="session")
 def cli_runner():
     runner = CliRunner()
     cwd = Path.cwd()
@@ -23,10 +26,60 @@ def cli_runner():
         yield runner


-@fixture
-def metadata(cli_runner):  # pylint: disable=unused-argument
+@fixture(scope="session")
+def kedro_project(cli_runner):
+    tmp_path = Path().cwd()
+    # From `kedro-mlflow.tests.conftest.py`
+    config = {
+        "output_dir": tmp_path,
+        "kedro_version": kedro_version,
+        "project_name": "This is a fake project",
+        "repo_name": "fake-project",
+        "python_package": "fake_project",
+        "include_example": True,
+    }
+
+    cookiecutter(
+        str(TEMPLATE_PATH),
+        output_dir=config["output_dir"],
+        no_input=True,
+        extra_context=config,
+    )
+
+    pipeline_registry_py = """
+from kedro.pipeline import Pipeline, node
+
+
+def identity(arg):
+    return arg
+
+
+def register_pipelines():
+    pipeline = Pipeline(
+        [
+            node(identity, ["input"], ["intermediate"], name="node0"),
+            node(identity, ["intermediate"], ["output"], name="node1"),
+        ],
+        tags="pipeline0",
+    )
+    return {
+        "__default__": pipeline,
+        "ds": pipeline,
+    }
+    """
+
+    (
+        tmp_path / "fake-project" / "src" / "fake_project" / "pipeline_registry.py"
+    ).write_text(pipeline_registry_py)
+
+    os.chdir(tmp_path / "fake-project")
+    return tmp_path / "fake-project"
+
+
+@fixture(scope="session")
+def metadata(kedro_project):  # pylint: disable=unused-argument
     # cwd() depends on ^ the isolated filesystem, created by CliRunner()
-    project_path = Path.cwd()
+    project_path = kedro_project
     return ProjectMetadata(
         project_path / "pyproject.toml",
         "hello_world",
diff --git a/kedro-airflow/tests/test_plugin.py b/kedro-airflow/tests/test_plugin.py
index 77c051ff5..f9dc1d401 100644
--- a/kedro-airflow/tests/test_plugin.py
+++ b/kedro-airflow/tests/test_plugin.py
@@ -1,50 +1,227 @@
 from pathlib import Path
+from typing import Any

 import pytest
-from kedro.framework.project import pipelines
-from kedro.pipeline import node
-from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
+import yaml

 from kedro_airflow.plugin import commands


-def identity(arg):
-    return arg
-
-
 @pytest.mark.parametrize(
     "dag_name,pipeline_name,command",
     [
         # Test normal execution
-        ("hello_world_dag", "__default__", ["airflow", "create"]),
+        ("hello_world", "__default__", ["airflow", "create"]),
         # Test execution with alternate pipeline name
-        ("hello_world_dag", "ds", ["airflow", "create", "--pipeline", "ds"]),
-        # Test execution with different dir and filename for Jinja2 Template
-        (
-            "hello_world_dag",
-            "__default__",
-            ["airflow", "create", "-j", "airflow_dag.j2"],
-        ),
+        ("hello_world", "ds", ["airflow", "create", "--pipeline", "ds"]),
     ],
 )
-def test_create_airflow_dag(
-    dag_name, pipeline_name, command, mocker, cli_runner, metadata
-):
+def test_create_airflow_dag(dag_name, pipeline_name, command, cli_runner, metadata):
     """Check the generation and validity of a simple Airflow DAG."""
-    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}.py"
-    mock_pipeline = modular_pipeline(
-        [
-            node(identity, ["input"], ["intermediate"], name="node0"),
-            node(identity, ["intermediate"], ["output"], name="node1"),
-        ],
-        tags="pipeline0",
+    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_{pipeline_name}_dag.py"
+    result = cli_runner.invoke(commands, command, obj=metadata)
+
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+    assert dag_file.exists()
+
+    expected_airflow_dag = 'tasks["node0"] >> tasks["node1"]'
+    with dag_file.open(encoding="utf-8") as f:
+        dag_code = [line.strip() for line in f.read().splitlines()]
+    assert expected_airflow_dag in dag_code
+    dag_file.unlink()
+
+
+def _create_kedro_airflow_yml(file_name: Path, content: dict[str, Any]):
+    file_name.parent.mkdir(parents=True, exist_ok=True)
+    with file_name.open("w") as fp:
+        yaml.dump(content, fp)
+
+
+def test_airflow_config_params(cli_runner, metadata):
+    """Check if config variables are picked up"""
+    dag_name = "hello_world"
+    pipeline_name = "__default__"
+    template_name = "airflow_params.j2"
+    content = "{{ owner | default('hello')}}"
+
+    _create_kedro_airflow_jinja_template(Path.cwd(), template_name, content)
+
+    # default
+    default_content = "hello"
+    command = ["airflow", "create", "-j", template_name]
+    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_{pipeline_name}_dag.py"
+    result = cli_runner.invoke(commands, command, obj=metadata)
+
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+    assert dag_file.exists()
+    assert dag_file.read_text() == default_content
+    dag_file.unlink()
+
+    # "--params"
+    expected_content = "testme"
+    command = ["airflow", "create", "--params", "owner=testme", "-j", template_name]
+    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_{pipeline_name}_dag.py"
+    result = cli_runner.invoke(commands, command, obj=metadata)
+
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+    assert dag_file.exists()
+    assert dag_file.read_text() == expected_content
+    dag_file.unlink()
+
+    # airflow.yml
+    expected_content = "someone else"
+    file_name = Path.cwd() / "conf" / "base" / "airflow.yml"
+    _create_kedro_airflow_yml(file_name, {"default": {"owner": expected_content}})
+    command = ["airflow", "create", "-j", template_name]
+    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_{pipeline_name}_dag.py"
+    result = cli_runner.invoke(commands, command, obj=metadata)
+
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+    assert dag_file.exists()
+    assert dag_file.read_text() == expected_content
+    file_name.unlink()
+    dag_file.unlink()
+
+    # ../airflow.yml
+    expected_content = "yet someone else"
+    file_name = Path.cwd() / "conf" / "base" / "airflow" / "default.yml"
+    _create_kedro_airflow_yml(file_name, {"default": {"owner": expected_content}})
+    command = ["airflow", "create", "-j", template_name]
+    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_{pipeline_name}_dag.py"
+    result = cli_runner.invoke(commands, command, obj=metadata)
+
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+    assert dag_file.exists()
+    assert dag_file.read_text() == expected_content
+    file_name.unlink()
+
+    # scheduler.yml
+    expected_content = "yet someone else"
+    file_name = Path.cwd() / "conf" / "base" / "scheduler.yml"
+    _create_kedro_airflow_yml(file_name, {"default": {"owner": expected_content}})
+    command = ["airflow", "create", "-j", template_name]
+    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_{pipeline_name}_dag.py"
+    result = cli_runner.invoke(commands, command, obj=metadata)
+
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+    assert dag_file.exists()
+    assert dag_file.read_text() == default_content
+    dag_file.unlink()
+
+    command = ["airflow", "create", "-j", template_name, "-c", "scheduler*"]
+    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_{pipeline_name}_dag.py"
+    result = cli_runner.invoke(commands, command, obj=metadata)
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+    assert dag_file.exists()
+    assert dag_file.read_text() == expected_content
+    dag_file.unlink()
+    file_name.unlink()
+
+    # env
+    expected_content = "again someone else"
+    file_name = Path.cwd() / "conf" / "local" / "airflow.yml"
+    _create_kedro_airflow_yml(file_name, {"default": {"owner": expected_content}})
+    command = ["airflow", "create", "-j", template_name, "-e", "local"]
+    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_{pipeline_name}_dag.py"
+    result = cli_runner.invoke(commands, command, obj=metadata)
+
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+    assert dag_file.exists()
+    assert dag_file.read_text() == expected_content
+    dag_file.unlink()
+
+    # custom pipeline name
+    expected_content = "finally someone else"
+    file_name = Path.cwd() / "conf" / "base" / "airflow.yml"
+    _create_kedro_airflow_yml(
+        file_name, {"default": {"owner": "foobar"}, "ds": {"owner": expected_content}}
+    )
+    command = ["airflow", "create", "-j", template_name, "-p", "ds"]
+    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_ds_dag.py"
+    result = cli_runner.invoke(commands, command, obj=metadata)
+
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+    assert dag_file.exists()
+    assert dag_file.read_text() == expected_content
+    dag_file.unlink()
+
+
+def _create_kedro_airflow_jinja_template(path: Path, name: str, content: str):
+    (path / name).write_text(content)
+
+
+def test_custom_template_exists(cli_runner, metadata):
+    """Test execution with different dir and filename for Jinja2 Template"""
+    dag_name = "hello_world"
+    pipeline_name = "__default__"
+    template_name = "custom_template.j2"
+    command = ["airflow", "create", "-j", template_name]
+    content = "print('my custom dag')"
+    # because there are no jinja variables
+    expected_content = content
+
+    _create_kedro_airflow_jinja_template(Path.cwd(), template_name, content)
+
+    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_{pipeline_name}_dag.py"
+    result = cli_runner.invoke(commands, command, obj=metadata)
+
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+    assert dag_file.exists()
+    assert dag_file.read_text() == expected_content
+
+
+def test_custom_template_nonexistent(cli_runner, metadata):
+    """Test execution with different dir and filename for Jinja2 Template"""
+    template_name = "non_existent_custom_template.j2"
+    command = ["airflow", "create", "-j", template_name]
+    result = cli_runner.invoke(commands, command, obj=metadata)
+    assert result.exit_code == 2
+    assert (
+        f"Error: Invalid value for '-j' / '--jinja-file': File '{template_name}' does not exist."
+        in result.stdout
     )
-    mocker.patch.dict(pipelines, {pipeline_name: mock_pipeline})
+
+
+def _kedro_create_env(project_root: Path):
+    (project_root / "conf" / "remote").mkdir(parents=True)
+
+
+def test_create_airflow_dag_env_parameter_exists(cli_runner, metadata):
+    """Test the `env` parameter"""
+    dag_name = "hello_world"
+    pipeline_name = "__default__"
+    command = ["airflow", "create", "--env", "remote"]
+
+    _kedro_create_env(Path.cwd())
+
+    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_{pipeline_name}_dag.py"
     result = cli_runner.invoke(commands, command, obj=metadata)

-    assert result.exit_code == 0
+    assert result.exit_code == 0, (result.exit_code, result.stdout_bytes)
     assert dag_file.exists()

     expected_airflow_dag = 'tasks["node0"] >> tasks["node1"]'
-    with open(dag_file, "r", encoding="utf-8") as f:
+    with dag_file.open(encoding="utf-8") as f:
         dag_code = [line.strip() for line in f.read().splitlines()]
     assert expected_airflow_dag in dag_code
+
+
+def test_create_airflow_dag_env_parameter_nonexistent(cli_runner, metadata):
+    """Test the `env` parameter for non-existent value"""
+    command = ["airflow", "create", "--env", "non-local"]
+    result = cli_runner.invoke(commands, command, obj=metadata)
+    assert result.exit_code == 1, (result.exit_code, result.stdout)
+    assert (
+        "Given configuration path either does not exist or is not a valid directory"
+        in str(result.exception)
+    )
+
+
+def test_create_airflow_dag_nonexistent_pipeline(cli_runner, metadata):
+    """Test executing with a non-existing pipeline"""
+    command = ["airflow", "create", "--pipeline", "de"]
+    result = cli_runner.invoke(commands, command, obj=metadata)
+    assert result.exit_code == 1
+    assert (
+        "kedro.framework.cli.utils.KedroCliError: Pipeline de not found."
+        in result.stdout
+    )
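The tests above keep asserting that the generated file contains `tasks["node0"] >> tasks["node1"]`. That edge follows from the two-node pipeline registered in `conftest.py` together with the dependency loop in `plugin.py`; the standalone sketch below (assuming only that `kedro` is installed, and run outside the plugin) shows how the same parent-to-child edges are derived:

```python
# Mirrors how `create` walks `pipeline.node_dependencies` to build the
# `tasks[parent] >> tasks[child]` lines; uses the same pipeline as conftest.py.
from collections import defaultdict

from kedro.pipeline import Pipeline, node


def identity(arg):
    return arg


pipeline = Pipeline(
    [
        node(identity, ["input"], ["intermediate"], name="node0"),
        node(identity, ["intermediate"], ["output"], name="node1"),
    ]
)

# node_dependencies maps each node to the set of nodes it depends on
dependencies = defaultdict(list)
for child, parent_nodes in pipeline.node_dependencies.items():
    for parent in parent_nodes:
        dependencies[parent].append(child)

edges = {(p.name, c.name) for p, children in dependencies.items() for c in children}
assert edges == {("node0", "node1")}  # rendered as tasks["node0"] >> tasks["node1"]
```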