diff --git a/.github/workflows/flink.yaml b/.github/workflows/flink.yaml
index eb6655f8..47e4edd4 100644
--- a/.github/workflows/flink.yaml
+++ b/.github/workflows/flink.yaml
@@ -98,7 +98,7 @@ jobs:
       - name: Test with pytest
         run: |
-          pytest -vvv -s --cov=pangeo_forge_runner tests/integration/test_flink.py
+          pytest -vvv -s --cov=pangeo_forge_runner tests/integration/test_flink_integration.py

           kubectl get pod -A
           kubectl describe pod
diff --git a/pangeo_forge_runner/commands/bake.py b/pangeo_forge_runner/commands/bake.py
index 4f2fae6a..82f4103e 100644
--- a/pangeo_forge_runner/commands/bake.py
+++ b/pangeo_forge_runner/commands/bake.py
@@ -222,63 +222,88 @@ def start(self):
             extra_options["requirements_file"] = str(requirements_path)

         for name, recipe in recipes.items():
-            pipeline_options = bakery.get_pipeline_options(
-                job_name=self.job_name,
-                # FIXME: Bring this in from meta.yaml?
-                container_image=self.container_image,
-                extra_options=extra_options,
-            )
-
-            # Set argv explicitly to empty so Apache Beam doesn't try to parse the commandline
-            # for pipeline options - we have traitlets doing that for us.
-            pipeline = Pipeline(options=pipeline_options, argv=[])
-            # Chain our recipe to the pipeline. This mutates the `pipeline` object!
-            # We expect `recipe` to either be a beam PTransform, or an object with a 'to_beam'
-            # method that returns a transform.
-            if isinstance(recipe, PTransform):
-                # This means we are in pangeo-forge-recipes >=0.9
-                pipeline | recipe
-            elif hasattr(recipe, "to_beam"):
-                # We are in pangeo-forge-recipes <=0.9
-                # The import has to be here, as this import is not valid in pangeo-forge-recipes>=0.9
-                # NOTE: `StorageConfig` only requires a target; input and metadata caches are optional,
-                # so those are handled conditionally if provided.
-                from pangeo_forge_recipes.storage import StorageConfig
-
-                recipe.storage_config = StorageConfig(
-                    target_storage.get_forge_target(job_name=self.job_name),
+            with feedstock.generate_setup_py() as setup_path:
+                extra_options["setup_file"] = setup_path
+                pipeline_options = bakery.get_pipeline_options(
+                    job_name=self.job_name,
+                    # FIXME: Bring this in from meta.yaml?
+                    container_image=self.container_image,
+                    extra_options=extra_options,
                 )
-                for attrname, optional_storage in zip(
-                    ("cache", "metadata"),
-                    (input_cache_storage, metadata_cache_storage),
-                ):
-                    # `.root_path` is an empty string by default, so if the user has not setup this
-                    # optional storage type in config, this block is skipped.
-                    if optional_storage.root_path:
-                        setattr(
-                            recipe.storage_config,
-                            attrname,
-                            optional_storage.get_forge_target(
-                                job_name=self.job_name
-                            ),
+
+                # Set argv explicitly to empty so Apache Beam doesn't try to parse the commandline
+                # for pipeline options - we have traitlets doing that for us.
+                pipeline = Pipeline(options=pipeline_options, argv=[])
+                # Chain our recipe to the pipeline. This mutates the `pipeline` object!
+                # We expect `recipe` to either be a beam PTransform, or an object with a 'to_beam'
+                # method that returns a transform.
+                if isinstance(recipe, PTransform):
+                    # This means we are in pangeo-forge-recipes >=0.9
+                    pipeline | recipe
+                elif hasattr(recipe, "to_beam"):
+                    # We are in pangeo-forge-recipes <=0.9
+                    # The import has to be here, as this import is not valid in pangeo-forge-recipes>=0.9
+                    # NOTE: `StorageConfig` only requires a target; input and metadata caches are optional,
+                    # so those are handled conditionally if provided.
+                    from pangeo_forge_recipes.storage import StorageConfig
+
+                    recipe.storage_config = StorageConfig(
+                        target_storage.get_forge_target(job_name=self.job_name),
+                    )
+                    for attrname, optional_storage in zip(
+                        ("cache", "metadata"),
+                        (input_cache_storage, metadata_cache_storage),
+                    ):
+                        # `.root_path` is an empty string by default, so if the user has not setup this
+                        # optional storage type in config, this block is skipped.
+                        if optional_storage.root_path:
+                            setattr(
+                                recipe.storage_config,
+                                attrname,
+                                optional_storage.get_forge_target(
+                                    job_name=self.job_name
+                                ),
+                            )
+                    # with configured storage now attached, compile recipe to beam
+                    pipeline | recipe.to_beam()
+
+                # Some bakeries are blocking - if Beam is configured to use them, calling
+                # pipeline.run() blocks. Some are not. We handle that here, and provide
+                # appropriate feedback to the user too.
+                extra = {"recipe": name, "job_name": self.job_name}
+                if bakery.blocking:
+                    self.log.info(
+                        f"Running job for recipe {name}\n",
+                        extra=extra | {"status": "running"},
+                    )
+                    pipeline.run()
+                else:
+                    result = pipeline.run()
+                    job_id = result.job_id()
+                    self.log.info(
+                        f"Submitted job {job_id} for recipe {name}",
+                        extra=extra | {"job_id": job_id, "status": "submitted"},
+                    )
+
+                # Set argv explicitly to empty so Apache Beam doesn't try to parse the commandline
+                # for pipeline options - we have traitlets doing that for us.
+                pipeline = Pipeline(options=pipeline_options, argv=[])
+                # Chain our recipe to the pipeline. This mutates the `pipeline` object!
+                pipeline | recipe.to_beam()
+
+                # Some bakeries are blocking - if Beam is configured to use them, calling
+                # pipeline.run() blocks. Some are not. We handle that here, and provide
+                # appropriate feedback to the user too.
+                extra = {"recipe": name, "job_name": self.job_name}
+                if bakery.blocking:
+                    self.log.info(
+                        f"Running job for recipe {name}\n",
+                        extra=extra | {"status": "running"},
+                    )
+                    pipeline.run()
+                else:
+                    result = pipeline.run()
+                    job_id = result.job_id()
+                    self.log.info(
+                        f"Submitted job {job_id} for recipe {name}",
+                        extra=extra | {"job_id": job_id, "status": "submitted"},
+                    )
-                # with configured storage now attached, compile recipe to beam
-                pipeline | recipe.to_beam()
-
-            # Some bakeries are blocking - if Beam is configured to use them, calling
-            # pipeline.run() blocks. Some are not. We handle that here, and provide
-            # appropriate feedback to the user too.
-            extra = {"recipe": name, "job_name": self.job_name}
-            if bakery.blocking:
-                self.log.info(
-                    f"Running job for recipe {name}\n",
-                    extra=extra | {"status": "running"},
-                )
-                pipeline.run()
-            else:
-                result = pipeline.run()
-                job_id = result.job_id()
-                self.log.info(
-                    f"Submitted job {job_id} for recipe {name}",
-                    extra=extra | {"job_id": job_id, "status": "submitted"},
-                )
diff --git a/pangeo_forge_runner/feedstock.py b/pangeo_forge_runner/feedstock.py
index 491a8aaf..4e87d166 100644
--- a/pangeo_forge_runner/feedstock.py
+++ b/pangeo_forge_runner/feedstock.py
@@ -1,6 +1,9 @@
 import ast
+import os
+from contextlib import contextmanager
 from copy import deepcopy
 from pathlib import Path
+from textwrap import dedent
 from typing import Optional

 from ruamel.yaml import YAML
@@ -86,6 +89,44 @@ def parse_recipes(self):

         return recipes

+    @contextmanager
+    def generate_setup_py(self):
+        """
+        Auto-generate a setup.py file for use with apache beam.
+
+        Beam sends all the user code we need to workers by creating an
+        sdist off a python package. However, our feedstocks only have a
+        few python files (at best) - mostly just one (recipe.py). We do not
+        want to impose creating a setup.py file manually for all our users,
+        so instead we autogenerate one here.
+        """
+        file = dedent(
+            """
+            import setuptools
+
+            setuptools.setup(
+                name='recipe',
+                version='0.1',
+                # FIXME: Support all the files we need to here!
+                py_modules=["recipe"]
+            )
+            """
+        )
+
+        setup_path = self.feedstock_dir / "setup.py"
+        with open(setup_path, "w") as f:
+            f.write(file)
+
+        readme_path = self.feedstock_dir / "readme.md"
+
+        with open(readme_path, "w") as f:
+            f.write("")
+
+        try:
+            yield str(setup_path)
+        finally:
+            os.remove(setup_path)
+
     def get_expanded_meta(self):
         """
         Return full meta.yaml file, expanding recipes if needed.
diff --git a/tests/integration/test_flink.py b/tests/integration/test_flink_integration.py
similarity index 100%
rename from tests/integration/test_flink.py
rename to tests/integration/test_flink_integration.py
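For context, a rough usage sketch (not part of the patch above): `generate_setup_py()` writes a temporary `setup.py` next to the feedstock's `recipe.py`, and `bake.py` passes its path through `extra_options["setup_file"]` so that Beam's standard `setup_file` option can build an sdist from it and stage the feedstock code onto workers. The feedstock path below is a made-up placeholder, and the `Feedstock` constructor signature is assumed from its use elsewhere in the repo.

    from pathlib import Path

    from apache_beam import Pipeline
    from apache_beam.options.pipeline_options import PipelineOptions

    from pangeo_forge_runner.feedstock import Feedstock

    # Hypothetical local checkout containing meta.yaml and recipe.py
    feedstock = Feedstock(Path("./my-feedstock"))

    with feedstock.generate_setup_py() as setup_path:
        # `setup_file` is the Beam option that bake.py fills in via
        # extra_options["setup_file"]; Beam builds an sdist from this
        # file and ships it to the workers.
        options = PipelineOptions(flags=[], setup_file=setup_path)
        pipeline = Pipeline(options=options)
        # ... chain the recipe transform onto `pipeline`, then run it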