diff --git a/.github/workflows/audio.yml b/.github/workflows/audio.yml index b130e276..4ce1be90 100644 --- a/.github/workflows/audio.yml +++ b/.github/workflows/audio.yml @@ -7,12 +7,14 @@ on: branches: - master - develop + - release-* paths: - 'audio/**' pull_request: branches: - master - develop + - release-* paths: - 'audio/**' diff --git a/.github/workflows/cli.yml b/.github/workflows/cli.yml index 654fb06d..f782dffa 100644 --- a/.github/workflows/cli.yml +++ b/.github/workflows/cli.yml @@ -7,6 +7,7 @@ on: branches: - master - develop + - release-* paths: - 'core/**' # since CLI depends on core, we should run tests when core changes - 'cli/**' @@ -14,6 +15,7 @@ on: branches: - master - develop + - release-* paths: - 'core/**' # since CLI depends on core, we should run tests when core changes - 'cli/**' @@ -81,6 +83,9 @@ jobs: - uses: "actions/setup-python@v1" with: python-version: "3.7" + - name: "Install dependency: klio_core" + run: "python -m pip install -e ." + working-directory: core - name: "Install in dev mode" run: "python -m pip install -e .[dev]" - name: "Import package" diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index b8477346..9e991978 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -7,12 +7,14 @@ on: branches: - master - develop + - release-* paths: - 'core/**' pull_request: branches: - master - develop + - release-* paths: - 'core/**' diff --git a/.github/workflows/dependencies.yml b/.github/workflows/dependencies.yml index 7265b0c3..5286f95b 100644 --- a/.github/workflows/dependencies.yml +++ b/.github/workflows/dependencies.yml @@ -8,6 +8,7 @@ on: branches: - master - develop + - release-* paths-ignore: - 'docs/**' - 'devtools/**' @@ -17,6 +18,7 @@ on: branches: - master - develop + - release-* paths-ignore: - 'docs/**' - 'devtools/**' diff --git a/.github/workflows/devtools.yml b/.github/workflows/devtools.yml index d4a6df64..064cbfd4 100644 --- a/.github/workflows/devtools.yml +++ b/.github/workflows/devtools.yml @@ -7,12 +7,14 @@ on: branches: - master - develop + - release-* paths: - 'devtools/**' pull_request: branches: - master - develop + - release-* paths: - 'devtools/**' diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 12008816..3ba17896 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -7,6 +7,7 @@ on: branches: - master - develop + - release-* paths: - 'docs/**' - '**.rst' @@ -14,6 +15,7 @@ on: branches: - master - develop + - release-* paths: - 'docs/**' - '**.rst' diff --git a/.github/workflows/exec.yml b/.github/workflows/exec.yml index 13d6fa0a..774f3fbd 100644 --- a/.github/workflows/exec.yml +++ b/.github/workflows/exec.yml @@ -7,6 +7,7 @@ on: branches: - master - develop + - release-* paths: - 'core/**' # since exec depends on core, we should run tests when core changes - 'lib/**' # since exec depends on lib, we should run tests when lib changes @@ -15,6 +16,7 @@ on: branches: - master - develop + - release-* paths: - 'core/**' # since exec depends on core, we should run tests when core changes - 'lib/**' # since exec depends on lib, we should run tests when lib changes @@ -83,6 +85,12 @@ jobs: - uses: "actions/setup-python@v1" with: python-version: "3.7" + - name: "Install dependency: klio_core" + run: "python -m pip install -e ." + working-directory: core + - name: "Install dependency: klio" + run: "python -m pip install -e ." + working-directory: lib - name: "Install in dev mode" run: "python -m pip install -e .[dev]" - name: "Import package" diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 4c0cb016..66316284 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -5,6 +5,7 @@ on: branches: - master - develop + - release-* paths-ignore: - 'docs/**' - 'devtools/**' @@ -14,6 +15,7 @@ on: branches: - master - develop + - release-* paths-ignore: - 'docs/**' - 'devtools/**' diff --git a/.github/workflows/lib.yml b/.github/workflows/lib.yml index f16db3c4..463ede99 100644 --- a/.github/workflows/lib.yml +++ b/.github/workflows/lib.yml @@ -7,6 +7,7 @@ on: branches: - master - develop + - release-* paths: - 'core/**' # since lib depends on core, we should run tests when core changes - 'lib/**' @@ -14,6 +15,7 @@ on: branches: - master - develop + - release-* paths: - 'core/**' # since lib depends on core, we should run tests when core changes - 'lib/**' @@ -81,6 +83,9 @@ jobs: - uses: "actions/setup-python@v1" with: python-version: "3.7" + - name: "Install dependency: klio_core" + run: "python -m pip install -e ." + working-directory: core - name: "Install in dev mode" run: "python -m pip install -e .[dev]" - name: "Import package" diff --git a/cli/setup.cfg b/cli/setup.cfg index a43e16aa..0de9a5d4 100644 --- a/cli/setup.cfg +++ b/cli/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.0.5 +current_version = 21.2.0 tag_name = cli-{new_version} commit = True tag = True diff --git a/cli/setup.py b/cli/setup.py index b75e7139..5d30fc93 100644 --- a/cli/setup.py +++ b/cli/setup.py @@ -151,7 +151,7 @@ def get_long_description(package_dir): } META_FILE = read(META_PATH) INSTALL_REQUIRES = [ - "klio-core>=0.2.0", + "klio-core>=21.2.0", "click", "dateparser", "docker", diff --git a/cli/src/klio_cli/__init__.py b/cli/src/klio_cli/__init__.py index 5ef51888..235801f0 100644 --- a/cli/src/klio_cli/__init__.py +++ b/cli/src/klio_cli/__init__.py @@ -14,7 +14,7 @@ # __author__ = "The klio developers" -__version__ = "1.0.5" +__version__ = "21.2.0" __email__ = "opensource+klio@spotify.com" __description__ = "Main entrypoint for Klio jobs" __uri__ = "https://github.com/spotify/klio" diff --git a/cli/src/klio_cli/commands/base.py b/cli/src/klio_cli/commands/base.py index 23614c15..06463e36 100644 --- a/cli/src/klio_cli/commands/base.py +++ b/cli/src/klio_cli/commands/base.py @@ -15,12 +15,11 @@ import logging import os -import tempfile import docker -import yaml -from klio_cli.commands.job import configuration +from klio_core.config import core as config_core + from klio_cli.utils import docker_utils @@ -31,8 +30,6 @@ class BaseDockerizedPipeline(object): CONTAINER_GCP_CRED_PATH = os.path.join("/usr", GCP_CRED_FILE) CONTAINER_JOB_DIR = "/usr/src/app" DOCKER_LOGGER_NAME = "klio.base_docker_pipeline" - # path where the temp config-file is mounted into klio-exec's container - MATERIALIZED_CONFIG_PATH = "/usr/src/config/materialized_config.yaml" def __init__(self, job_dir, klio_config, docker_runtime_config): self.job_dir = job_dir @@ -44,7 +41,6 @@ def __init__(self, job_dir, klio_config, docker_runtime_config): # if this is set to true, running the command will generate a temp file # and mount it to the container self.requires_config_file = True - self.materialized_config_file = None @property def _full_image_name(self): @@ -102,12 +98,6 @@ def _get_volumes(self): }, } - if self.materialized_config_file is not None: - volumes[self.materialized_config_file.name] = { - "bind": BaseDockerizedPipeline.MATERIALIZED_CONFIG_PATH, - "mode": "rw", - } - return volumes def _get_command(self, *args, **kwargs): @@ -118,7 +108,10 @@ def _add_base_args(self, command): command.extend( [ "--config-file", - BaseDockerizedPipeline.MATERIALIZED_CONFIG_PATH, + os.path.join( + self.CONTAINER_JOB_DIR, + config_core.RUN_EFFECTIVE_CONFIG_FILE, + ), ] ) return command @@ -178,16 +171,10 @@ def _check_gcp_credentials_exist(self): def _write_effective_config(self): if self.requires_config_file: - self.materialized_config_file = tempfile.NamedTemporaryFile( - prefix="/tmp/", mode="w", delete=False - ) - yaml.dump( - self.klio_config.as_dict(), - stream=self.materialized_config_file, - Dumper=configuration.IndentListDumper, - default_flow_style=False, - sort_keys=False, + path = os.path.join( + self.job_dir, config_core.RUN_EFFECTIVE_CONFIG_FILE ) + self.klio_config.write_to_file(path) def run(self, *args, **kwargs): # bail early diff --git a/cli/src/klio_cli/commands/job/configuration.py b/cli/src/klio_cli/commands/job/configuration.py index 7ba3c33b..b04b25bc 100644 --- a/cli/src/klio_cli/commands/job/configuration.py +++ b/cli/src/klio_cli/commands/job/configuration.py @@ -21,29 +21,7 @@ from klio_core import config as kconfig from klio_core import utils as core_utils - - -class IndentListDumper(yaml.Dumper): - """Force indentation for lists for better visual understanding. - - Instead of this: - - foo: - bar: - - one - - two - - three - - Format list indentations like this: - foo: - bar: - - one - - two - - three - """ - - def increase_indent(self, flow=False, indentless=False): - return super(IndentListDumper, self).increase_indent(flow, False) +from klio_core.config import _utils as config_utils class EffectiveJobConfig(object): @@ -124,7 +102,7 @@ def show(self): yaml.dump( effective_config, stream=sys.stdout, - Dumper=IndentListDumper, + Dumper=config_utils.IndentListDumper, sort_keys=False, ) @@ -142,7 +120,7 @@ def set(self, target_to_value): yaml.dump( effective_config, stream=f, - Dumper=IndentListDumper, + Dumper=config_utils.IndentListDumper, default_flow_style=False, sort_keys=False, ) @@ -156,7 +134,7 @@ def unset(self, target): yaml.dump( self.config_data, stream=f, - Dumper=IndentListDumper, + Dumper=config_utils.IndentListDumper, default_flow_style=False, sort_keys=False, ) diff --git a/cli/src/klio_cli/commands/job/utils/templates/dockerfile.tpl b/cli/src/klio_cli/commands/job/utils/templates/dockerfile.tpl index 9383a3a7..ed297c1e 100644 --- a/cli/src/klio_cli/commands/job/utils/templates/dockerfile.tpl +++ b/cli/src/klio_cli/commands/job/utils/templates/dockerfile.tpl @@ -45,8 +45,11 @@ COPY __init__.py \ {%- endif %} /usr/src/app/ +ARG KLIO_CONFIG=klio-job.yaml {% if not klio.use_fnapi -%} +COPY $KLIO_CONFIG klio-job-run-effective.yaml + RUN pip install . -{% endif -%} -ARG KLIO_CONFIG=klio-job.yaml -COPY $KLIO_CONFIG /usr/src/config/.effective-klio-job.yaml \ No newline at end of file +{%- else -%} +COPY $KLIO_CONFIG /usr/src/config/.effective-klio-job.yaml +{% endif -%} \ No newline at end of file diff --git a/cli/src/klio_cli/commands/job/utils/templates/setup.py.tpl b/cli/src/klio_cli/commands/job/utils/templates/setup.py.tpl index 1031cc25..b0ca98eb 100644 --- a/cli/src/klio_cli/commands/job/utils/templates/setup.py.tpl +++ b/cli/src/klio_cli/commands/job/utils/templates/setup.py.tpl @@ -126,7 +126,7 @@ setuptools.setup( # str(dir where to install files, relative to Python modules), # list(str(non-Python filenames)) # ) - (".", ["klio-job.yaml"]), + (".", ["klio-job-run-effective.yaml"]), ], include_package_data=True, # required # NOTE: Explicitly include Python modules names (all relevant Python files diff --git a/cli/tests/commands/job/utils/fixtures/expected/fnapi/Dockerfile b/cli/tests/commands/job/utils/fixtures/expected/fnapi/Dockerfile index 88687f24..1d7aceae 100644 --- a/cli/tests/commands/job/utils/fixtures/expected/fnapi/Dockerfile +++ b/cli/tests/commands/job/utils/fixtures/expected/fnapi/Dockerfile @@ -33,4 +33,4 @@ COPY __init__.py \ /usr/src/app/ ARG KLIO_CONFIG=klio-job.yaml -COPY $KLIO_CONFIG /usr/src/config/.effective-klio-job.yaml \ No newline at end of file +COPY $KLIO_CONFIG /usr/src/config/.effective-klio-job.yaml diff --git a/cli/tests/commands/job/utils/fixtures/expected/no_fnapi/Dockerfile b/cli/tests/commands/job/utils/fixtures/expected/no_fnapi/Dockerfile index 17f719e0..1ab20a44 100644 --- a/cli/tests/commands/job/utils/fixtures/expected/no_fnapi/Dockerfile +++ b/cli/tests/commands/job/utils/fixtures/expected/no_fnapi/Dockerfile @@ -18,6 +18,7 @@ COPY __init__.py \ # Include any other non-Python files your job needs /usr/src/app/ -RUN pip install . ARG KLIO_CONFIG=klio-job.yaml -COPY $KLIO_CONFIG /usr/src/config/.effective-klio-job.yaml \ No newline at end of file +COPY $KLIO_CONFIG klio-job-run-effective.yaml + +RUN pip install . \ No newline at end of file diff --git a/cli/tests/commands/job/utils/fixtures/expected/no_fnapi/setup.py b/cli/tests/commands/job/utils/fixtures/expected/no_fnapi/setup.py index 4a46b938..71465280 100644 --- a/cli/tests/commands/job/utils/fixtures/expected/no_fnapi/setup.py +++ b/cli/tests/commands/job/utils/fixtures/expected/no_fnapi/setup.py @@ -126,7 +126,7 @@ def run(self): # str(dir where to install files, relative to Python modules), # list(str(non-Python filenames)) # ) - (".", ["klio-job.yaml"]), + (".", ["klio-job-run-effective.yaml"]), ], include_package_data=True, # required # NOTE: Explicitly include Python modules names (all relevant Python files diff --git a/cli/tests/commands/test_base.py b/cli/tests/commands/test_base.py index 338296a0..364ad97d 100644 --- a/cli/tests/commands/test_base.py +++ b/cli/tests/commands/test_base.py @@ -71,13 +71,6 @@ def mock_docker_client(mocker): return mock_client -@pytest.fixture -def mock_materialized_config_file(mocker): - mock = mocker.Mock() - mock.name = "test-config" - return mock - - @pytest.fixture def base_pipeline( klio_config, @@ -115,10 +108,6 @@ def expected_volumes(): "mode": "rw", }, "/test/dir/jobs/test_run_job": {"bind": "/usr/src/app", "mode": "rw"}, - "test-config": { - "bind": base.BaseDockerizedPipeline.MATERIALIZED_CONFIG_PATH, - "mode": "rw", - }, } @@ -148,17 +137,8 @@ def test_get_environment(base_pipeline, expected_envs): def test_get_volumes( - base_pipeline, - expected_volumes, - mocker, - monkeypatch, - mock_materialized_config_file, + base_pipeline, expected_volumes, mocker, monkeypatch, ): - monkeypatch.setattr( - base_pipeline, - "materialized_config_file", - mock_materialized_config_file, - ) assert expected_volumes == base_pipeline._get_volumes() @@ -175,7 +155,6 @@ def test_get_docker_runflags( mocker, monkeypatch, requires_config, - mock_materialized_config_file, ): mock_get_command = mocker.Mock(return_value=["command"]) monkeypatch.setattr(base_pipeline, "_get_command", mock_get_command) @@ -185,19 +164,9 @@ def test_get_docker_runflags( expected_command = ["command"] if requires_config: - monkeypatch.setattr( - base_pipeline, - "materialized_config_file", - mock_materialized_config_file, - ) expected_command.extend( - [ - "--config-file", - base.BaseDockerizedPipeline.MATERIALIZED_CONFIG_PATH, - ] + ["--config-file", "/usr/src/app/klio-job-run-effective.yaml"] ) - else: - expected_volumes.pop("test-config") exp_runflags = { "image": "test-image:foo-123", @@ -331,11 +300,16 @@ def test_run(base_pipeline, mocker, monkeypatch): monkeypatch.setattr( base_pipeline, "_run_docker_container", mock_run_docker_container ) + mock_write_effective_config = mocker.Mock() + monkeypatch.setattr( + base_pipeline, "_write_effective_config", mock_write_effective_config + ) base_pipeline.run() mock_check_gcp_credentials_exist.assert_called_once_with() mock_check_docker_setup.assert_called_once_with() + mock_write_effective_config.assert_called_once_with() mock_setup_docker_image.assert_called_once_with() mock_get_docker_runflags.assert_called_once_with() mock_run_docker_container.assert_called_once_with( diff --git a/core/setup.cfg b/core/setup.cfg index e447dc33..5705a41b 100644 --- a/core/setup.cfg +++ b/core/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.2.2 +current_version = 21.2.0 commit = True tag = True tag_name = core-{new_version} diff --git a/core/src/klio_core/__init__.py b/core/src/klio_core/__init__.py index f498256a..73594fef 100644 --- a/core/src/klio_core/__init__.py +++ b/core/src/klio_core/__init__.py @@ -14,7 +14,7 @@ # __author__ = "The klio developers" -__version__ = "0.2.2" +__version__ = "21.2.0" __email__ = "opensource+klio@spotify.com" __description__ = "Core klio library for common functionality" __uri__ = "https://github.com/spotify/klio" diff --git a/core/src/klio_core/config/_utils.py b/core/src/klio_core/config/_utils.py index 9b20889b..bcaeb4be 100644 --- a/core/src/klio_core/config/_utils.py +++ b/core/src/klio_core/config/_utils.py @@ -16,6 +16,7 @@ import logging import attr +import yaml from klio_core.config import _converters as converters @@ -174,3 +175,26 @@ def init_from_dict(self, config_dict={}, **extra_kwargs): attrib_cls.__init__ = init_from_dict attrib_cls.from_values = original_init return attrib_cls + + +class IndentListDumper(yaml.Dumper): + """Force indentation for lists for better visual understanding. + + Instead of this: + + foo: + bar: + - one + - two + - three + + Format list indentations like this: + foo: + bar: + - one + - two + - three + """ + + def increase_indent(self, flow=False, indentless=False): + return super(IndentListDumper, self).increase_indent(flow, False) diff --git a/core/src/klio_core/config/core.py b/core/src/klio_core/config/core.py index b3da3ee8..793e5755 100644 --- a/core/src/klio_core/config/core.py +++ b/core/src/klio_core/config/core.py @@ -14,8 +14,10 @@ # import logging +import os import attr +import yaml from klio_core.config import _io as io from klio_core.config import _preprocessing as preprocessing @@ -23,6 +25,19 @@ logger = logging.getLogger("klio") + +# path used by both klio-cli and klioexec to write and read the config when +# running a job +RUN_EFFECTIVE_CONFIG_FILE = "klio-job-run-effective.yaml" +RUN_EFFECTIVE_CONFIG_PATH = os.path.join( + "/usr/src/app", RUN_EFFECTIVE_CONFIG_FILE +) + +# location where workers expect to load the config file from when using setup.py +WORKER_RUN_EFFECTIVE_CONFIG_PATH = os.path.join( + "/usr/local", RUN_EFFECTIVE_CONFIG_FILE +) + WORKER_DISK_TYPE_URL = ( "compute.googleapis.com/projects/{project}/regions/{region}/" "diskTypes/{disk_type}" @@ -109,6 +124,31 @@ def as_dict(self): """ return super().as_dict() + def write_to_file(self, path_or_stream): + """write the config object as a yaml file + + Args: + path_or_stream: either a string of a file path or a writable stream + """ + + def _write(stream): + yaml.dump( + self.as_dict(), + stream=stream, + Dumper=utils.IndentListDumper, + default_flow_style=False, + sort_keys=False, + ) + + if isinstance(path_or_stream, str): + dirname = os.path.dirname(path_or_stream) + if not os.path.exists(dirname): + os.makedirs(dirname) + with open(path_or_stream, "w") as stream: + _write(stream) + else: + _write(path_or_stream) + @attr.attrs class KlioIOConfigContainer(object): diff --git a/docs/src/conf.py b/docs/src/conf.py index 30b2b559..c235831e 100644 --- a/docs/src/conf.py +++ b/docs/src/conf.py @@ -156,6 +156,7 @@ def find_version(*file_paths): linkcheck_ignore = [ r"https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load", r"https://matplotlib.org/api/_as_gen/matplotlib.figure.Figure.html#matplotlib.figure.Figure", + r"https://cloud.google.com/logging/docs/logs-based-metrics/#distribution_metrics", # ignore local links r"\./.+\.html", ] diff --git a/docs/src/faqs/migrate_from_fnapi.rst b/docs/src/faqs/migrate_from_fnapi.rst index 3aabde3b..86b5c667 100644 --- a/docs/src/faqs/migrate_from_fnapi.rst +++ b/docs/src/faqs/migrate_from_fnapi.rst @@ -20,7 +20,6 @@ Creating a new Klio job that does not use the FnAPI from the start via: Limitations and Warnings ------------------------ -* Currently, Klio in non-FnAPI mode does not yet support jobs with multiple configuration files. Support is planned. * ``pipeline_options.requirements_file`` configuration for `pipeline dependencies`_ **will not work** for Klio jobs. While klio will honor that configuration value for Dataflow to pick up, declaring requirements in ``setup.py`` is needed because a Klio job inherently has multiple Python files. * While Klio will still upload the worker image to `Google Container Registry`_ when running/deploying a job, Dataflow will *not* use the image. It is good practice to upload the worker image to ensure repeatable builds, but in the future, an option will be added to skip the upload. @@ -65,7 +64,7 @@ explicitly including non-Python files needed for a job (i.e. a model, a JSON sch description="My example job using setup.py", # optional install_requires=["tensorflow"], # optional data_files=[ # required - (".", ["klio-job.yaml", "my-model.h5"]), + (".", ["klio-job-run-effective.yaml", "my-model.h5"]), ], include_package_data=True, # required py_modules=["run", "transforms"], # required diff --git a/docs/src/faqs/setup_packaging.rst b/docs/src/faqs/setup_packaging.rst index 7d7655a8..3dd404ef 100644 --- a/docs/src/faqs/setup_packaging.rst +++ b/docs/src/faqs/setup_packaging.rst @@ -60,7 +60,7 @@ explicitly including non-Python files needed for a job (i.e. a model, a JSON sch description="My example job using setup.py", # optional install_requires=["tensorflow"], # optional data_files=[ # required - (".", ["klio-job.yaml", "my-model.h5"]), + (".", ["klio-job-run-effective.yaml", "my-model.h5"]), ], include_package_data=True, # required py_modules=["run", "transforms"], # required @@ -172,7 +172,7 @@ explicitly including non-Python files needed for a job (i.e. a model, a JSON sch # str(dir where to install files, relative to Python modules), # list(str(non-Python filenames)) # ) - (".", ["klio-job.yaml", "my-model.h5"]), + (".", ["klio-job-run-effective.yaml", "my-model.h5"]), ], include_package_data=True, # required py_modules=["run", "transforms"], # required @@ -223,7 +223,6 @@ A file called ``MANIFEST.in`` is needed in the **root of your job's directory** Limitations and Warnings ------------------------ -* Currently, Klio in non-FnAPI mode does not yet support jobs with multiple configuration files. Support is planned. * ``pipeline_options.requirements_file`` configuration for `pipeline dependencies`_ **will not work** for Klio jobs. While klio will honor that configuration value for Dataflow to pick up, declaring requirements in ``setup.py`` is needed because a Klio job inherently has multiple Python files. * While Klio will still upload the worker image to `Google Container Registry`_ when running/deploying a job, Dataflow will *not* use the image. It is good practice to upload the worker image to ensure repeatable builds, but in the future, an option will be added to skip the upload. diff --git a/docs/src/reference/cli/changelog.rst b/docs/src/reference/cli/changelog.rst index 8ca7c1e1..fa69d2e9 100644 --- a/docs/src/reference/cli/changelog.rst +++ b/docs/src/reference/cli/changelog.rst @@ -1,6 +1,21 @@ CLI Changelog ============= +21.2.0 (2021-03-16) +------------------- + +Fixed +***** + +* The ``--config-file`` flag can now be used in ``setup.py`` projects. + +Changed +******* + +* Moved ``IndentListDumper`` to ``klio_core`` config utils. +* Runtime config file for ``klioexec`` now written to ``klio-job-run-effective.yaml`` in the job's directory. + + 1.0.5 (2021-01-26) ------------------ diff --git a/docs/src/reference/core/changelog.rst b/docs/src/reference/core/changelog.rst index 107b087b..215a3551 100644 --- a/docs/src/reference/core/changelog.rst +++ b/docs/src/reference/core/changelog.rst @@ -1,6 +1,15 @@ Changelog ========= +21.2.0 (2021-03-16) +------------------- + +Changed +******* + +* Moved ``IndentListDumper`` to ``klio_core`` config utils. + + 0.2.2 (2021-02-11) ------------------ @@ -11,6 +20,7 @@ Fixed * Fixed config bug that skipped preprocessing (overrides, templates) of dict parsed from YAML * ``KlioWriteToAvro`` has been enabled as an output event type (previously missing). + 0.2.1 (2020-12-03) ------------------ diff --git a/docs/src/reference/executor/changelog.rst b/docs/src/reference/executor/changelog.rst index 8b19e001..5ff44824 100644 --- a/docs/src/reference/executor/changelog.rst +++ b/docs/src/reference/executor/changelog.rst @@ -1,6 +1,15 @@ Changelog ========= +21.2.0 (2021-03-16) +------------------- + +Fixed +***** + +* ``klioexec`` now writes runtime config to include in ``setup.py`` distribution. + + 0.2.2 (2021-01-14) ------------------ diff --git a/docs/src/reference/lib/changelog.rst b/docs/src/reference/lib/changelog.rst index 54b571bd..dd09ebab 100644 --- a/docs/src/reference/lib/changelog.rst +++ b/docs/src/reference/lib/changelog.rst @@ -15,6 +15,17 @@ Changed * The Beam metrics client will always be used, no matter the configured runner. * Marked Klio's support for Stackdriver log-based metrics for deprecation and eventual removal. + +21.2.0 (2021-03-16) +------------------- + +Changed +******* + +* Changed thread limiter logging to debug. +* Workers will look for ``klio-job-run-effective.yaml`` before dropping back to ``.effective-klio-job.yaml``. + + 0.2.4 (2021-01-14) ------------------ diff --git a/exec/setup.cfg b/exec/setup.cfg index 6290a57f..d8117c7c 100644 --- a/exec/setup.cfg +++ b/exec/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.2.2 +current_version = 21.2.0 commit = True tag = True tag_name = exec-{new_version} diff --git a/exec/setup.py b/exec/setup.py index 24c334be..8f1cc2d5 100644 --- a/exec/setup.py +++ b/exec/setup.py @@ -153,8 +153,8 @@ def get_long_description(package_dir): INSTALL_REQUIRES = [ "attrs", "click", - "klio-core>=0.2.0", - "klio>=0.2.0", + "klio-core>=21.2.0", + "klio>=21.2.0", "pyyaml", # 2.22 added DirectRunner support for `DoFn.setup` "apache-beam[gcp]>2.21.0", diff --git a/exec/src/klio_exec/__init__.py b/exec/src/klio_exec/__init__.py index 7faddb3c..a0190987 100644 --- a/exec/src/klio_exec/__init__.py +++ b/exec/src/klio_exec/__init__.py @@ -14,7 +14,7 @@ # __author__ = "The klio developers" -__version__ = "0.2.2" +__version__ = "21.2.0" __email__ = "opensource+klio@spotify.com" __description__ = "Component of klio project that reduces boilerplate while \ writing YAML-config-driven, Dockerized python Beam pipelines\ diff --git a/exec/src/klio_exec/cli.py b/exec/src/klio_exec/cli.py index 345eb9ca..62eff7a2 100644 --- a/exec/src/klio_exec/cli.py +++ b/exec/src/klio_exec/cli.py @@ -57,11 +57,9 @@ def _get_config(config_path): raise SystemExit(1) +# TODO: remove when after internal calls are removed def _compare_runtime_to_buildtime_config(klio_config): - buildtime_config_path = "/usr/src/config/.effective-klio-job.yaml" - buildtime_config = config.KlioConfig(_get_config(buildtime_config_path)) - - return buildtime_config.as_dict() == klio_config.as_dict() + pass @main.command("run") @@ -74,20 +72,10 @@ def run_pipeline( image_tag, direct_runner, update, klio_config, config_meta, blocking ): - # Prompt user to continue if runtime config file is not the same as - # the buildtime config file. Do this after _get_config since that - # will prompt the user if their config file doesn't even exist first. - if _compare_runtime_to_buildtime_config(klio_config) is False: - msg = ( - "The Klio config file '{}' at runtime differs from the config " - "file used when building this Docker image. If this is unexpected " - "behavior, please double check your runtime config, or rebuild " - "your Docker image with the correct config file." - ) - logging.warning(msg.format(config_meta.config_path)) - # RunConfig ensures config is pickled and sent to worker. Note this # depends on save_main_session being True + # Notice, this is currently unused due to dataflow pickling issues, leaving + # in for compatibility until a fix/alternative solution is in place klio_transforms_core.RunConfig.set(klio_config) # This can only be imported after RunConfig is set since it will end up diff --git a/exec/src/klio_exec/commands/run.py b/exec/src/klio_exec/commands/run.py index d90da885..a83cf967 100644 --- a/exec/src/klio_exec/commands/run.py +++ b/exec/src/klio_exec/commands/run.py @@ -26,6 +26,7 @@ from klio import transforms from klio.transforms import helpers from klio_core import __version__ as klio_core_version +from klio_core.config import core as config_core from klio_exec import __version__ as klio_exec_version @@ -39,6 +40,15 @@ "klio-cli": "KLIO_CLI_VERSION", } +RUN_CONFIG_PREAMBLE = """# +# THIS FILE IS AUTO-GENERATED BY KLIO +# +# To change your job's configuration, edit the job's klio-job.yaml or use the +# --config-file flag with klio-job-run to pass the path of a different +# configuration file to klio +# +""" + # Regex according to (https://cloud.google.com/resource-manager/ # docs/creating-managing-labels#requirements) # otherwise, deployments will fail @@ -260,11 +270,42 @@ def _get_run_callable(self): ) raise SystemExit(1) + def _write_run_effective_config(self): + # this method assumes setup.py is being used! + if self.runtime_conf.direct_runner: + path = config_core.WORKER_RUN_EFFECTIVE_CONFIG_PATH + else: + path = config_core.RUN_EFFECTIVE_CONFIG_PATH + logging.debug( + "Writing runtime configuration to {}" + " in the job's running docker container.".format(path) + ) + with open(path, "w") as f: + f.write(RUN_CONFIG_PREAMBLE) + self.config.write_to_file(f) + + def _verify_setup_py(self): + # verify that setup.py has a reference to the runtime config file + data_files_line = f'(".", ["{config_core.RUN_EFFECTIVE_CONFIG_FILE}"])' + setup_file = self.config.pipeline_options.setup_file + with open(setup_file, "r") as r: + data = r.read() + if config_core.RUN_EFFECTIVE_CONFIG_FILE not in data: + logging.warning( + "Reference to 'klio-job-run-effective.yaml'" + " appears to be missing in 'setup.py'. Please" + " ensure that the 'data_files' list includes " + f"the tuple: {data_files_line}." + ) + def _verify_packaging(self): pipeline_opts = self.config.pipeline_options experiments = pipeline_opts.experiments fnapi_enabled = "beam_fn_api" in experiments has_setup_file = pipeline_opts.setup_file is not None + setup_file_exists = has_setup_file and os.path.exists( + pipeline_opts.setup_file + ) has_reqs_file = pipeline_opts.requirements_file is not None if fnapi_enabled and any([has_setup_file, has_reqs_file]): logging.error( @@ -274,9 +315,6 @@ def _verify_packaging(self): raise SystemExit(1) if not pipeline_opts.streaming: - setup_file_exists = has_setup_file and os.path.exists( - pipeline_opts.setup_file - ) if fnapi_enabled: logging.warn( "Support for batch jobs using the 'beam_fn_api' " @@ -297,6 +335,12 @@ def _verify_packaging(self): ) raise SystemExit(1) + if setup_file_exists: + # when using setup.py, dump the current config to a runtime config + # file to be included in the distribution package + self._verify_setup_py() + self._write_run_effective_config() + def _setup_data_io_filters(self, in_pcol, label_prefix=None): # label prefixes are required for multiple inputs (to avoid label # name collisions in Beam) diff --git a/exec/tests/unit/commands/test_run.py b/exec/tests/unit/commands/test_run.py index 957c070b..2d1c9790 100644 --- a/exec/tests/unit/commands/test_run.py +++ b/exec/tests/unit/commands/test_run.py @@ -203,6 +203,26 @@ def test_get_image_tag(image, tag, expected_image): assert expected_image == actual_image +@pytest.mark.parametrize("direct_runner", (True, False)) +def test_write_run_effective_config(mocker, direct_runner): + if direct_runner: + expected_path = "/usr/local/klio-job-run-effective.yaml" + else: + expected_path = "/usr/src/app/klio-job-run-effective.yaml" + + mock_config = mocker.Mock() + mock_runtime_conf = mocker.Mock() + mock_runtime_conf.direct_runner = direct_runner + m_open = mocker.mock_open() + mock_open = mocker.patch("klio_exec.commands.run.open", m_open) + + kpipe = run.KlioPipeline("test-job", mock_config, mock_runtime_conf) + kpipe._write_run_effective_config() + + mock_open.assert_called_once_with(expected_path, "w") + mock_config.write_to_file.assert_called_once_with(mock_open.return_value) + + @pytest.mark.parametrize("streaming", (True, False)) @pytest.mark.parametrize( "exp,setup_file, requirements_file", @@ -225,6 +245,10 @@ def test_verify_packaging( mock_path_exists = mocker.patch.object(os.path, "exists") mock_path_exists.return_value = True + mock_write_run_effective_config = mocker.patch.object( + run.KlioPipeline, "_write_run_effective_config" + ) + kpipe = run.KlioPipeline("test-job", mock_config, mocker.Mock()) if not streaming and not any([requirements_file, setup_file, exp]): @@ -233,6 +257,11 @@ def test_verify_packaging( else: kpipe._verify_packaging() + if setup_file and not exp: + mock_write_run_effective_config.assert_called_once() + else: + mock_write_run_effective_config.assert_not_called() + def test_verify_packaging_with_both_packagaing_systems_raises(mocker): mock_config = mocker.Mock() diff --git a/exec/tests/unit/test_cli.py b/exec/tests/unit/test_cli.py index 6e4804bc..657f8e0b 100644 --- a/exec/tests/unit/test_cli.py +++ b/exec/tests/unit/test_cli.py @@ -196,42 +196,6 @@ def test_get_config_raises(tmpdir, caplog): assert 1 == len(caplog.records) -@pytest.mark.parametrize( - "addl_runtime_data,buildtime_exists,exp_retval", - ( - (False, True, True), - (True, True, False), - (False, False, True), - (True, False, False), # not possible but CYA - ), -) -def test_compare_runtime_to_buildtime_config( - mocker, monkeypatch, addl_runtime_data, buildtime_exists, exp_retval -): - monkeypatch.setattr(os.path, "exists", lambda x: buildtime_exists) - - buildtime_data = {"job_name": "foo", "job_config": {}} - runtime_data = buildtime_data.copy() - if addl_runtime_data: - runtime_data["job_config"] = runtime_data["job_config"].copy() - runtime_data["job_config"]["foo"] = "bar" - - # multiple `open` mocks: https://stackoverflow.com/a/26830397/1579977 - open_name = "klio_exec.cli.open" - buildtime_data_str = yaml.dump(buildtime_data).encode("utf-8") - - runtime_conf = kconfig.KlioConfig(runtime_data) - - mock_open_buildtime = mocker.mock_open(read_data=buildtime_data_str) - mock_open = mocker.patch(open_name, mock_open_buildtime) - - side_effect = (mock_open_buildtime.return_value,) - mock_open.side_effect = side_effect - - act_retval = cli._compare_runtime_to_buildtime_config(runtime_conf) - assert exp_retval == act_retval - - @pytest.mark.parametrize("blocking", (True, False, None)) @pytest.mark.parametrize( "image_tag,direct_runner,update", @@ -252,7 +216,6 @@ def test_run_pipeline( cli_runner, mock_klio_config, patch_run_basic_pipeline, - mock_compare_runtime_to_buildtime_config, ): mock_compare_runtime_to_buildtime_config.return_value = True runtime_conf = cli.RuntimeConfig( @@ -285,34 +248,21 @@ def test_run_pipeline( mock_klio_config.assert_calls() patch_run_basic_pipeline.assert_called_once_with() - mock_compare_runtime_to_buildtime_config.assert_called_once_with( - mock_klio_config.klio_config - ) @pytest.mark.parametrize( - "config_file_override,compare_conf_retval", - ( - (None, True), - (None, False), - ("klio-job2.yaml", True), - ("klio-job2.yaml", False), - ), + "config_file_override", (None, "klio-job2.yaml"), ) def test_run_pipeline_conf_override( config_file_override, - compare_conf_retval, cli_runner, config, mock_klio_config, patch_run_basic_pipeline, - mock_compare_runtime_to_buildtime_config, caplog, tmpdir, monkeypatch, ): - mock_compare_runtime_to_buildtime_config.return_value = compare_conf_retval - cli_inputs = [] temp_dir = tmpdir.mkdir("testing123") @@ -335,15 +285,7 @@ def test_run_pipeline_conf_override( patch_run_basic_pipeline.assert_called_once_with() - mock_compare_runtime_to_buildtime_config.assert_called_once_with( - mock_klio_config.klio_config - ) - - if compare_conf_retval is False: - assert 1 == len(caplog.records) - assert "WARNING" == caplog.records[0].levelname - else: - assert 0 == len(caplog.records) + assert 0 == len(caplog.records) @pytest.mark.parametrize("config_file_override", (None, "klio-job2.yaml")) diff --git a/integration/audio-spectrograms/Dockerfile b/integration/audio-spectrograms/Dockerfile index 0607632a..e4959595 100644 --- a/integration/audio-spectrograms/Dockerfile +++ b/integration/audio-spectrograms/Dockerfile @@ -30,5 +30,5 @@ COPY __init__.py \ /usr/src/app/ ARG KLIO_CONFIG=klio-job.yaml -COPY $KLIO_CONFIG /usr/src/config/.effective-klio-job.yaml +COPY $KLIO_CONFIG klio-job-run-effective.yaml RUN pip install . diff --git a/integration/audio-spectrograms/expected_job_output.txt b/integration/audio-spectrograms/expected_job_output.txt index 638552c2..6d8c806b 100644 --- a/integration/audio-spectrograms/expected_job_output.txt +++ b/integration/audio-spectrograms/expected_job_output.txt @@ -1,5 +1,6 @@ INFO:root:Found worker image: integration-klio-audio:audio-spectrograms INFO:matplotlib.font_manager:Generating new fontManager, this may take some time... +DEBUG:klio:Loading config file from /usr/local/klio-job-run-effective.yaml. DEBUG:klio:KlioMessage full audit log - Entity ID: - Path: fluffy-zelda-glitch-toki-kobe::klio-audio (current job) DEBUG:klio:Process 'battleclip_daq': Ping mode OFF. DEBUG:klio:Downloading battleclip_daq.ogg from gs://klio-integration/beatbox diff --git a/integration/audio-spectrograms/setup.py b/integration/audio-spectrograms/setup.py index 6ddc8b1a..786f7f19 100644 --- a/integration/audio-spectrograms/setup.py +++ b/integration/audio-spectrograms/setup.py @@ -144,6 +144,7 @@ def run(self): # str(dir where to install files, relative to Python modules), # list(str(non-Python filenames)) # ) + (".", ["klio-job-run-effective.yaml"]), ], include_package_data=True, # required # NOTE: Explicitly include Python modules names (all relevant Python files diff --git a/integration/batch-modular-default/Dockerfile b/integration/batch-modular-default/Dockerfile index 80519c14..963d9709 100644 --- a/integration/batch-modular-default/Dockerfile +++ b/integration/batch-modular-default/Dockerfile @@ -26,5 +26,5 @@ COPY __init__.py \ /usr/src/app/ ARG KLIO_CONFIG=klio-job.yaml -COPY $KLIO_CONFIG /usr/src/config/.effective-klio-job.yaml +COPY $KLIO_CONFIG klio-job-run-effective.yaml RUN pip install . diff --git a/integration/batch-modular-default/setup.py b/integration/batch-modular-default/setup.py index e9713961..76d63d51 100644 --- a/integration/batch-modular-default/setup.py +++ b/integration/batch-modular-default/setup.py @@ -144,6 +144,7 @@ def run(self): # str(dir where to install files, relative to Python modules), # list(str(non-Python filenames)) # ) + (".", ["klio-job-run-effective.yaml"]), ], include_package_data=True, # required # NOTE: Explicitly include Python modules names (all relevant Python files diff --git a/integration/batch-modular-default/tests/test_transforms.py b/integration/batch-modular-default/tests/test_transforms.py index 0beca63f..71b0a831 100644 --- a/integration/batch-modular-default/tests/test_transforms.py +++ b/integration/batch-modular-default/tests/test_transforms.py @@ -41,27 +41,44 @@ def klio_msg(): @pytest.fixture def expected_log_messages(klio_msg): return [ - ( - "KlioThreadLimiter(name=LogKlioMessage.process) Blocked – " - "waiting on semaphore for an available thread (available threads:" - ), - ( - "KlioThreadLimiter(name=LogKlioMessage.process) Released " - "semaphore (available threads:" - ), - "Received element {}".format(klio_msg.data.element), - "Received payload {}".format(klio_msg.data.payload), + { + "level": "DEBUG", + "message": ( + "KlioThreadLimiter(name=LogKlioMessage.process) Blocked – " + "waiting on semaphore for an available thread (available threads:" + ), + }, + { + "level": "DEBUG", + "message": ( + "KlioThreadLimiter(name=LogKlioMessage.process) Released " + "semaphore (available threads:" + ), + }, + { + "level": "DEBUG", + "message": "Loading config file from " + "/usr/local/klio-job-run-effective.yaml.", + }, + { + "level": "INFO", + "message": "Received element {}".format(klio_msg.data.element), + }, + { + "level": "INFO", + "message": "Received payload {}".format(klio_msg.data.payload), + }, ] def test_process(klio_msg, expected_log_messages, caplog): helloklio_fn = transforms.LogKlioMessage() output = helloklio_fn.process(klio_msg.SerializeToString()) - assert klio_msg.SerializeToString() == list(output)[0] assert len(caplog.records) == len(expected_log_messages) for index, record in enumerate(caplog.records): - assert "INFO" == record.levelname - assert expected_log_messages[index] in record.message + expected_log_message = expected_log_messages[index] + assert expected_log_message["level"] == record.levelname + assert expected_log_message["message"] in record.message diff --git a/integration/multi-event-input-batch/Dockerfile b/integration/multi-event-input-batch/Dockerfile index e029e822..8440eedf 100644 --- a/integration/multi-event-input-batch/Dockerfile +++ b/integration/multi-event-input-batch/Dockerfile @@ -26,5 +26,5 @@ COPY __init__.py \ /usr/src/app/ ARG KLIO_CONFIG=klio-job.yaml -COPY $KLIO_CONFIG /usr/src/config/.effective-klio-job.yaml +COPY $KLIO_CONFIG klio-job-run-effective.yaml RUN pip install . diff --git a/integration/multi-event-input-batch/setup.py b/integration/multi-event-input-batch/setup.py index 3f1b8b8f..9e2f1845 100644 --- a/integration/multi-event-input-batch/setup.py +++ b/integration/multi-event-input-batch/setup.py @@ -145,6 +145,7 @@ def run(self): # str(dir where to install files, relative to Python modules), # list(str(non-Python filenames)) # ) + (".", ["klio-job-run-effective.yaml"]), ], include_package_data=True, # required # NOTE: Explicitly include Python modules names (all relevant Python files diff --git a/integration/read-bq-write-bq/Dockerfile b/integration/read-bq-write-bq/Dockerfile index 138fe088..3dc6833b 100644 --- a/integration/read-bq-write-bq/Dockerfile +++ b/integration/read-bq-write-bq/Dockerfile @@ -25,5 +25,5 @@ COPY __init__.py \ /usr/src/app/ ARG KLIO_CONFIG=klio-job.yaml -COPY $KLIO_CONFIG /usr/src/config/.effective-klio-job.yaml +COPY $KLIO_CONFIG klio-job-run-effective.yaml RUN pip install . diff --git a/integration/read-bq-write-bq/setup.py b/integration/read-bq-write-bq/setup.py index cd5d51b9..0c0e60dc 100644 --- a/integration/read-bq-write-bq/setup.py +++ b/integration/read-bq-write-bq/setup.py @@ -145,6 +145,7 @@ def run(self): # str(dir where to install files, relative to Python modules), # list(str(non-Python filenames)) # ) + (".", ["klio-job-run-effective.yaml"]), ], include_package_data=True, # required # NOTE: Explicitly include Python modules names (all relevant Python files diff --git a/integration/read-bq-write-bq/tests/test_transforms.py b/integration/read-bq-write-bq/tests/test_transforms.py index a18e086f..c9692343 100644 --- a/integration/read-bq-write-bq/tests/test_transforms.py +++ b/integration/read-bq-write-bq/tests/test_transforms.py @@ -41,17 +41,34 @@ def klio_msg(): @pytest.fixture def expected_log_messages(klio_msg): return [ - ( - "KlioThreadLimiter(name=LogKlioMessage.process) Blocked – " - "waiting on semaphore for an available thread (available threads:" - ), - ( - "KlioThreadLimiter(name=LogKlioMessage.process) Released " - "semaphore (available threads:" - ), - "Hello, Klio!", - "Received element {}".format(klio_msg.data.element), - "Received payload {}".format(klio_msg.data.payload), + { + "level": "DEBUG", + "message": ( + "KlioThreadLimiter(name=LogKlioMessage.process) Blocked – " + "waiting on semaphore for an available thread (available threads:" + ), + }, + { + "level": "DEBUG", + "message": ( + "KlioThreadLimiter(name=LogKlioMessage.process) Released " + "semaphore (available threads:" + ), + }, + { + "level": "DEBUG", + "message": "Loading config file from " + "/usr/local/klio-job-run-effective.yaml.", + }, + {"level": "INFO", "message": "Hello, Klio!"}, + { + "level": "INFO", + "message": "Received element {}".format(klio_msg.data.element), + }, + { + "level": "INFO", + "message": "Received payload {}".format(klio_msg.data.payload), + }, ] @@ -73,6 +90,6 @@ def test_process(klio_msg, expected_log_messages, caplog): assert len(caplog.records) == len(expected_log_messages) for index, record in enumerate(caplog.records): - assert "INFO" == record.levelname - assert expected_log_messages[index] in record.message - + expected_log_message = expected_log_messages[index] + assert expected_log_message["level"] == record.levelname + assert expected_log_message["message"] in record.message diff --git a/integration/read-file-write-file/Dockerfile b/integration/read-file-write-file/Dockerfile index 80519c14..963d9709 100644 --- a/integration/read-file-write-file/Dockerfile +++ b/integration/read-file-write-file/Dockerfile @@ -26,5 +26,5 @@ COPY __init__.py \ /usr/src/app/ ARG KLIO_CONFIG=klio-job.yaml -COPY $KLIO_CONFIG /usr/src/config/.effective-klio-job.yaml +COPY $KLIO_CONFIG klio-job-run-effective.yaml RUN pip install . diff --git a/integration/read-file-write-file/setup.py b/integration/read-file-write-file/setup.py index 383898be..bc68fe56 100644 --- a/integration/read-file-write-file/setup.py +++ b/integration/read-file-write-file/setup.py @@ -144,6 +144,7 @@ def run(self): # str(dir where to install files, relative to Python modules), # list(str(non-Python filenames)) # ) + (".", ["klio-job-run-effective.yaml"]), ], include_package_data=True, # required # NOTE: Explicitly include Python modules names (all relevant Python files diff --git a/integration/read-file-write-file/tests/test_transforms.py b/integration/read-file-write-file/tests/test_transforms.py index 4a8a6a5a..e3b85f95 100644 --- a/integration/read-file-write-file/tests/test_transforms.py +++ b/integration/read-file-write-file/tests/test_transforms.py @@ -42,17 +42,34 @@ def klio_msg(): @pytest.fixture def expected_log_messages(klio_msg): return [ - ( - "KlioThreadLimiter(name=LogKlioMessage.process) Blocked – waiting " - "on semaphore for an available thread (available threads:" - ), - ( - "KlioThreadLimiter(name=LogKlioMessage.process) Released " - "semaphore (available threads:" - ), - "Hello, Klio!", - "Received element {}".format(klio_msg.data.element), - "Received payload {}".format(klio_msg.data.payload), + { + "level": "DEBUG", + "message": ( + "KlioThreadLimiter(name=LogKlioMessage.process) Blocked – " + "waiting on semaphore for an available thread (available threads:" + ), + }, + { + "level": "DEBUG", + "message": ( + "KlioThreadLimiter(name=LogKlioMessage.process) Released " + "semaphore (available threads:" + ), + }, + { + "level": "DEBUG", + "message": "Loading config file from " + "/usr/local/klio-job-run-effective.yaml.", + }, + {"level": "INFO", "message": "Hello, Klio!"}, + { + "level": "INFO", + "message": "Received element {}".format(klio_msg.data.element), + }, + { + "level": "INFO", + "message": "Received payload {}".format(klio_msg.data.payload), + }, ] @@ -64,6 +81,6 @@ def test_process(klio_msg, expected_log_messages, caplog): assert len(caplog.records) == len(expected_log_messages) for index, record in enumerate(caplog.records): - assert "INFO" == record.levelname - assert expected_log_messages[index] in record.message - + expected_log_message = expected_log_messages[index] + assert expected_log_message["level"] == record.levelname + assert expected_log_message["message"] in record.message diff --git a/integration/read-file/Dockerfile b/integration/read-file/Dockerfile index 80519c14..963d9709 100644 --- a/integration/read-file/Dockerfile +++ b/integration/read-file/Dockerfile @@ -26,5 +26,5 @@ COPY __init__.py \ /usr/src/app/ ARG KLIO_CONFIG=klio-job.yaml -COPY $KLIO_CONFIG /usr/src/config/.effective-klio-job.yaml +COPY $KLIO_CONFIG klio-job-run-effective.yaml RUN pip install . diff --git a/integration/read-file/setup.py b/integration/read-file/setup.py index 383898be..bc68fe56 100644 --- a/integration/read-file/setup.py +++ b/integration/read-file/setup.py @@ -144,6 +144,7 @@ def run(self): # str(dir where to install files, relative to Python modules), # list(str(non-Python filenames)) # ) + (".", ["klio-job-run-effective.yaml"]), ], include_package_data=True, # required # NOTE: Explicitly include Python modules names (all relevant Python files diff --git a/integration/read-file/tests/test_transforms.py b/integration/read-file/tests/test_transforms.py index e0539501..e3b85f95 100644 --- a/integration/read-file/tests/test_transforms.py +++ b/integration/read-file/tests/test_transforms.py @@ -42,17 +42,34 @@ def klio_msg(): @pytest.fixture def expected_log_messages(klio_msg): return [ - ( - "KlioThreadLimiter(name=LogKlioMessage.process) Blocked – " - "waiting on semaphore for an available thread (available threads:" - ), - ( - "KlioThreadLimiter(name=LogKlioMessage.process) Released " - "semaphore (available threads:" - ), - "Hello, Klio!", - "Received element {}".format(klio_msg.data.element), - "Received payload {}".format(klio_msg.data.payload), + { + "level": "DEBUG", + "message": ( + "KlioThreadLimiter(name=LogKlioMessage.process) Blocked – " + "waiting on semaphore for an available thread (available threads:" + ), + }, + { + "level": "DEBUG", + "message": ( + "KlioThreadLimiter(name=LogKlioMessage.process) Released " + "semaphore (available threads:" + ), + }, + { + "level": "DEBUG", + "message": "Loading config file from " + "/usr/local/klio-job-run-effective.yaml.", + }, + {"level": "INFO", "message": "Hello, Klio!"}, + { + "level": "INFO", + "message": "Received element {}".format(klio_msg.data.element), + }, + { + "level": "INFO", + "message": "Received payload {}".format(klio_msg.data.payload), + }, ] @@ -64,6 +81,6 @@ def test_process(klio_msg, expected_log_messages, caplog): assert len(caplog.records) == len(expected_log_messages) for index, record in enumerate(caplog.records): - assert "INFO" == record.levelname - assert expected_log_messages[index] in record.message - + expected_log_message = expected_log_messages[index] + assert expected_log_message["level"] == record.levelname + assert expected_log_message["message"] in record.message diff --git a/lib/setup.cfg b/lib/setup.cfg index e12d92e8..dbe76f09 100644 --- a/lib/setup.cfg +++ b/lib/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.2.4 +current_version = 21.2.0 commit = True tag = True tag_name = lib-{new_version} diff --git a/lib/setup.py b/lib/setup.py index d571a9a5..aacb1b41 100644 --- a/lib/setup.py +++ b/lib/setup.py @@ -154,7 +154,7 @@ def get_long_description(package_dir): # 2.22 added DirectRunner support for `DoFn.setup` "apache-beam[gcp]>2.21.0", "google-api-python-client", - "klio-core>=0.2.1", + "klio-core>=21.2.0", "protobuf", "psutil", "pyyaml", diff --git a/lib/src/klio/__init__.py b/lib/src/klio/__init__.py index aa984020..f5f880f9 100644 --- a/lib/src/klio/__init__.py +++ b/lib/src/klio/__init__.py @@ -14,7 +14,7 @@ # __author__ = "The klio developers" -__version__ = "0.2.4" +__version__ = "21.2.0" __email__ = "opensource+klio@spotify.com" __description__ = "Conventions for Python + Apache Beam " __uri__ = "https://github.com/spotify/klio" diff --git a/lib/src/klio/transforms/core.py b/lib/src/klio/transforms/core.py index c7f03b7b..ad9f482a 100644 --- a/lib/src/klio/transforms/core.py +++ b/lib/src/klio/transforms/core.py @@ -22,6 +22,7 @@ import yaml from klio_core import config +from klio_core.config import core as config_core from klio_core.proto import klio_pb2 from klio.metrics import client as metrics_client @@ -36,19 +37,26 @@ class RunConfig(object): @classmethod def _load_config_from_file(cls): - # [Klio v2] this may get expensive, to always be reading config - # from a file. Can this be replaced by something in memory - # that's also globally accessible? - klio_job_file = "/usr/src/config/.effective-klio-job.yaml" - # for backwards compatibility, and user is using setup.py and we - # have to find it somewhere... - if not os.path.exists(klio_job_file): - # use iterator so we don't waste time searching everywhere upfront - files = glob.iglob("/usr/**/klio-job.yaml", recursive=True) + klio_job_file = None + + if os.path.exists(config_core.WORKER_RUN_EFFECTIVE_CONFIG_PATH): + klio_job_file = config_core.WORKER_RUN_EFFECTIVE_CONFIG_PATH + else: + run_config_path = os.path.join( + "/usr/**", config_core.RUN_EFFECTIVE_CONFIG_FILE + ) + files = glob.iglob(run_config_path, recursive=True) for f in files: klio_job_file = f # only grab the first one break + + if not klio_job_file: + klio_job_file = "/usr/src/config/.effective-klio-job.yaml" + + logger = logging.getLogger("klio") + logger.debug(f"Loading config file from {klio_job_file}.") + with open(klio_job_file, "r") as f: all_config_data = yaml.safe_load(f) return config.KlioConfig(all_config_data) diff --git a/lib/src/klio/utils/_thread_limiter.py b/lib/src/klio/utils/_thread_limiter.py index 68e0b680..813936a2 100644 --- a/lib/src/klio/utils/_thread_limiter.py +++ b/lib/src/klio/utils/_thread_limiter.py @@ -127,7 +127,7 @@ def _log(self, message): if not self._dummy: threads_available = self._semaphore._value suffix_msg = f" (available threads: {threads_available})" - self.logger.info(f"{self} {message}{suffix_msg}") + self.logger.debug(f"{self} {message}{suffix_msg}") def acquire(self): """Acquire a semaphore (a thread). diff --git a/lib/tests/unit/transforms/test_core.py b/lib/tests/unit/transforms/test_core.py index 28e87d22..1bc9521f 100644 --- a/lib/tests/unit/transforms/test_core.py +++ b/lib/tests/unit/transforms/test_core.py @@ -105,16 +105,27 @@ def test_klio_metrics( assert actual_relay.disabled is False -@pytest.mark.parametrize("exists", (True, False)) -def test_load_config_from_file(exists, config_dict, mocker, monkeypatch): - monkeypatch.setattr(os.path, "exists", lambda x: exists) +@pytest.mark.parametrize("usr_local_exists", (True, False)) +@pytest.mark.parametrize("usr_glob_exists", (True, False)) +def test_load_config_from_file( + usr_local_exists, usr_glob_exists, config_dict, mocker, monkeypatch, +): + monkeypatch.setattr(os.path, "exists", lambda x: usr_local_exists) + effective_klio_yaml_file = "/usr/src/app/klio-job-run-effective.yaml" + expected_open_file = "/usr/src/config/.effective-klio-job.yaml" + + if usr_local_exists: + expected_open_file = "/usr/local/klio-job-run-effective.yaml" + elif usr_glob_exists: + expected_open_file = effective_klio_yaml_file - klio_yaml_file = "/usr/src/config/.effective-klio-job.yaml" - if not exists: - klio_yaml_file = "/usr/lib/python/site-packages/klio/klio-job.yaml" - mock_iglob = mocker.Mock() - mock_iglob.return_value = iter([klio_yaml_file]) - monkeypatch.setattr(core_transforms.glob, "iglob", mock_iglob) + mock_iglob = mocker.Mock() + if usr_glob_exists: + mock_iglob.return_value = [effective_klio_yaml_file] + else: + mock_iglob.return_value = [] + + monkeypatch.setattr(core_transforms.glob, "iglob", mock_iglob) open_name = "klio.transforms.core.open" config_str = yaml.dump(config_dict) @@ -123,12 +134,14 @@ def test_load_config_from_file(exists, config_dict, mocker, monkeypatch): klio_config = core_transforms.RunConfig._load_config_from_file() - m.assert_called_once_with(klio_yaml_file, "r") + m.assert_called_once_with(expected_open_file, "r") assert isinstance(klio_config, config.KlioConfig) - if not exists: + if not usr_local_exists: mock_iglob.assert_called_once_with( - "/usr/**/klio-job.yaml", recursive=True + "/usr/**/klio-job-run-effective.yaml", recursive=True ) + else: + mock_iglob.assert_not_called() def test_load_config_from_file_raises(config_dict, mocker, monkeypatch):