From 662f15e441d3950a06d37844e008332d1356fb4e Mon Sep 17 00:00:00 2001 From: mao3267 Date: Fri, 9 Aug 2024 20:07:38 +0900 Subject: [PATCH 01/27] feat: add docker commands in ImageSpec Signed-off-by: mao3267 --- flytekit/image_spec/default_builder.py | 8 ++++++++ flytekit/image_spec/image_spec.py | 17 +++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/flytekit/image_spec/default_builder.py b/flytekit/image_spec/default_builder.py index 4e3275c5d0..ce202c2221 100644 --- a/flytekit/image_spec/default_builder.py +++ b/flytekit/image_spec/default_builder.py @@ -80,6 +80,7 @@ $COPY_COMMAND_RUNTIME RUN $RUN_COMMANDS +$DOCKER_COMMANDS WORKDIR /root SHELL ["/bin/bash", "-c"] @@ -232,6 +233,11 @@ def create_docker_context(image_spec: ImageSpec, tmp_dir: Path): else: run_commands = "" + if image_spec.docker_commands: + docker_commands = "\n".join(image_spec.docker_commands) + else: + docker_commands = "" + docker_content = DOCKER_FILE_TEMPLATE.substitute( PYTHON_VERSION=python_version, UV_PYTHON_INSTALL_COMMAND=uv_python_install_command, @@ -244,6 +250,7 @@ def create_docker_context(image_spec: ImageSpec, tmp_dir: Path): COPY_COMMAND_RUNTIME=copy_command_runtime, ENTRYPOINT=entrypoint, RUN_COMMANDS=run_commands, + DOCKER_COMMANDS=docker_commands, ) dockerfile_path = tmp_dir / "Dockerfile" @@ -272,6 +279,7 @@ class DefaultImageBuilder(ImageSpecBuilder): "pip_index", # "registry_config", "commands", + "docker_commands", } def build_image(self, image_spec: ImageSpec) -> str: diff --git a/flytekit/image_spec/image_spec.py b/flytekit/image_spec/image_spec.py index 7cde9fa70a..10e06fd8c6 100644 --- a/flytekit/image_spec/image_spec.py +++ b/flytekit/image_spec/image_spec.py @@ -49,6 +49,7 @@ class ImageSpec: commands: Command to run during the building process tag_format: Custom string format for image tag. The ImageSpec hash passed in as `spec_hash`. For example, to add a "dev" suffix to the image tag, set `tag_format="{spec_hash}-dev"` + docker_commands: List of docker commands to run during the building process """ name: str = "flytekit" @@ -72,6 +73,7 @@ class ImageSpec: entrypoint: Optional[List[str]] = None commands: Optional[List[str]] = None tag_format: Optional[str] = None + docker_commands: Optional[List[str]] = None def __post_init__(self): self.name = self.name.lower() @@ -210,6 +212,21 @@ def with_apt_packages(self, apt_packages: Union[str, List[str]]) -> "ImageSpec": return new_image_spec + def with_docker_commands(self, docker_commands: Union[str, List[str]]) -> "ImageSpec": + """ + Builder that returns a new image spec with additional list of docker commands that will be executed during the building process. + """ + new_image_spec = copy.deepcopy(self) + if new_image_spec.docker_commands is None: + new_image_spec.docker_commands = [] + + if isinstance(docker_commands, List): + new_image_spec.docker_commands.extend(docker_commands) + else: + new_image_spec.docker_commands.append(docker_commands) + + return new_image_spec + def force_push(self) -> "ImageSpec": """ Builder that returns a new image spec with force push enabled. 
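[Editor's note on the patch above] A minimal usage sketch of the docker_commands option introduced in PATCH 01. The docker_commands constructor argument and the with_docker_commands builder are taken from the diff; the registry name, the package, and the specific Dockerfile lines below are illustrative assumptions. Per the diff, the default builder joins these strings with newlines and substitutes them at $DOCKER_COMMANDS, immediately after the generated RUN $RUN_COMMANDS step and before WORKDIR /root; the envd builder rejects them (see PATCH 22 later in this series).

from flytekit import ImageSpec

# Dockerfile instructions passed as plain strings; the default builder appends
# them after its generated RUN commands and before `WORKDIR /root`.
image = ImageSpec(
    name="flytekit",
    registry="ghcr.io/example",  # assumption: any registry you can push to
    packages=["pandas"],         # assumption: illustrative dependency
    docker_commands=[
        "RUN apt-get update && apt-get install -y --no-install-recommends git",
        "ENV EXTRA_BUILD_FLAG=1",
    ],
)

# Builder-style equivalent added in the same patch; accepts a str or a list of
# str and returns a new ImageSpec with the commands appended.
image = image.with_docker_commands("RUN echo 'built with docker_commands'")

[End of editor's note]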
From 4f670dc6c516754b3c5bce03d50afe899f08a694 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Fri, 2 Aug 2024 17:48:17 +0800 Subject: [PATCH 02/27] use private-key (#2645) Signed-off-by: mao3267 --- flytekit/types/structured/snowflake.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/types/structured/snowflake.py b/flytekit/types/structured/snowflake.py index c603b55669..19ac538af2 100644 --- a/flytekit/types/structured/snowflake.py +++ b/flytekit/types/structured/snowflake.py @@ -24,7 +24,7 @@ def get_private_key() -> bytes: from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization - pk_string = flytekit.current_context().secrets.get("private_key", "snowflake", encode_mode="r") + pk_string = flytekit.current_context().secrets.get("private-key", "snowflake", encode_mode="r") # Cryptography needs the string to be stripped and converted to bytes pk_string = pk_string.strip().encode() From b4e7ecec16c09a0f49152613ddf952e1c4cae25a Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 2 Aug 2024 05:47:28 -0700 Subject: [PATCH 03/27] Don't call remote when --help in remote-X (#2642) * don't call remote Signed-off-by: Yee Hing Tong * nit Signed-off-by: Yee Hing Tong --------- Signed-off-by: Yee Hing Tong Signed-off-by: mao3267 --- flytekit/clis/sdk_in_container/run.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flytekit/clis/sdk_in_container/run.py b/flytekit/clis/sdk_in_container/run.py index 122a739265..9919d857d3 100644 --- a/flytekit/clis/sdk_in_container/run.py +++ b/flytekit/clis/sdk_in_container/run.py @@ -4,6 +4,7 @@ import json import os import pathlib +import sys import tempfile import typing from dataclasses import dataclass, field, fields @@ -741,6 +742,8 @@ def _get_entities(self, r: FlyteRemote, project: str, domain: str, limit: int) - return [] def list_commands(self, ctx): + if "--help" in sys.argv: + return [] if self._entities or ctx.obj is None: return self._entities From a22792f82a497fdf1b9f63e25b8fb6fe554d47c0 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Sat, 3 Aug 2024 10:36:37 -0700 Subject: [PATCH 04/27] Bump grpc receive message size (#2640) Signed-off-by: Yee Hing Tong Signed-off-by: Kevin Su Co-authored-by: Kevin Su Signed-off-by: mao3267 --- .github/workflows/monodocs_build.yml | 7 +++---- flytekit/clients/raw.py | 3 ++- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/monodocs_build.yml b/.github/workflows/monodocs_build.yml index 7585c464fe..9085f7b236 100644 --- a/.github/workflows/monodocs_build.yml +++ b/.github/workflows/monodocs_build.yml @@ -18,8 +18,8 @@ jobs: steps: - name: Fetch flytekit code uses: actions/checkout@v4 - with: - path: "${{ github.workspace }}/flytekit" + - name: 'Clear action cache' + uses: ./.github/actions/clear-action-cache - name: Fetch flyte code uses: actions/checkout@v4 with: @@ -41,7 +41,6 @@ jobs: export SETUPTOOLS_SCM_PRETEND_VERSION="2.0.0" pip install -e ./flyteidl - shell: bash -el {0} - working-directory: ${{ github.workspace }}/flytekit run: | conda activate monodocs-env pip install -e . 
@@ -54,7 +53,7 @@ jobs: working-directory: ${{ github.workspace }}/flyte shell: bash -el {0} env: - FLYTEKIT_LOCAL_PATH: ${{ github.workspace }}/flytekit + FLYTEKIT_LOCAL_PATH: ${{ github.workspace }} run: | conda activate monodocs-env make -C docs clean html SPHINXOPTS="-W -vvv" diff --git a/flytekit/clients/raw.py b/flytekit/clients/raw.py index b9e35a8290..df643d554d 100644 --- a/flytekit/clients/raw.py +++ b/flytekit/clients/raw.py @@ -48,7 +48,8 @@ def __init__(self, cfg: PlatformConfig, **kwargs): # Set the value here to match the limit in Admin, otherwise the client will cut off and the user gets a # StreamRemoved exception. # https://github.com/flyteorg/flyte/blob/e8588f3a04995a420559327e78c3f95fbf64dc01/flyteadmin/pkg/common/constants.go#L14 - options = (("grpc.max_metadata_size", 32000),) + # 32KB for error messages, 20MB for actual messages. + options = (("grpc.max_metadata_size", 32 * 1024), ("grpc.max_receive_message_length", 20 * 1024 * 1024)) self._cfg = cfg self._channel = wrap_exceptions_channel( cfg, From fbba82355425b9309e14ccac0914fdbb7aa3c54a Mon Sep 17 00:00:00 2001 From: arbaobao Date: Tue, 6 Aug 2024 05:30:11 +0800 Subject: [PATCH 05/27] Raise an exception when filters' value isn't a list. (#2576) * Add an exeception when filters' value isn't a list * Make the exception more specific Signed-off-by: Nelson Chen * add an unit test for value_in Signed-off-by: Nelson Chen * lint Signed-off-by: Kevin Su --------- Signed-off-by: Nelson Chen Signed-off-by: Kevin Su Co-authored-by: Kevin Su Signed-off-by: mao3267 --- flytekit/models/filters.py | 2 ++ tests/flytekit/unit/models/test_filters.py | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/flytekit/models/filters.py b/flytekit/models/filters.py index 2b0cb04d88..5d7bb55104 100644 --- a/flytekit/models/filters.py +++ b/flytekit/models/filters.py @@ -118,6 +118,8 @@ def __init__(self, key, values): :param Text key: The name of the field to compare against :param list[Text] values: A list of textual values to compare. """ + if not isinstance(values, list): + raise TypeError(f"values must be a list. but got {type(values)}") super(SetFilter, self).__init__(key, ";".join(values)) @classmethod diff --git a/tests/flytekit/unit/models/test_filters.py b/tests/flytekit/unit/models/test_filters.py index 7f4f9c9b86..d995eeb805 100644 --- a/tests/flytekit/unit/models/test_filters.py +++ b/tests/flytekit/unit/models/test_filters.py @@ -1,5 +1,5 @@ from flytekit.models import filters - +import pytest def test_eq_filter(): assert filters.Equal("key", "value").to_flyte_idl() == "eq(key,value)" @@ -28,6 +28,10 @@ def test_lte_filter(): def test_value_in_filter(): assert filters.ValueIn("key", ["1", "2", "3"]).to_flyte_idl() == "value_in(key,1;2;3)" +def test_invalid_value_in_filter(): + with pytest.raises(TypeError, match=r"values must be a list. 
but got .*"): + filters.ValueIn("key", "1") + def test_contains_filter(): assert filters.Contains("key", ["1", "2", "3"]).to_flyte_idl() == "contains(key,1;2;3)" From b9466746d3ce86bbbdbf8088c36a9822d4fa6b43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=B6=AD=E6=84=88?= <115421902+wayner0628@users.noreply.github.com> Date: Tue, 6 Aug 2024 06:27:48 +0800 Subject: [PATCH 06/27] Update error message for TypeTransformerFailedError (#2648) Signed-off-by: wayner0628 Signed-off-by: Kevin Su Co-authored-by: Kevin Su Signed-off-by: mao3267 --- flytekit/core/base_task.py | 4 ++-- flytekit/core/promise.py | 2 +- flytekit/core/type_engine.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 88c2b39c02..58c9392dec 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -292,7 +292,7 @@ def local_execute( except TypeTransformerFailedError as exc: msg = f"Failed to convert inputs of task '{self.name}':\n {exc}" logger.error(msg) - raise TypeError(msg) from exc + raise TypeError(msg) from None input_literal_map = _literal_models.LiteralMap(literals=literals) # if metadata.cache is set, check memoized version @@ -724,7 +724,7 @@ def dispatch_execute( except Exception as exc: msg = f"Failed to convert inputs of task '{self.name}':\n {exc}" logger.error(msg) - raise type(exc)(msg) from exc + raise type(exc)(msg) from None # TODO: Logger should auto inject the current context information to indicate if the task is running within # a workflow or a subworkflow etc diff --git a/flytekit/core/promise.py b/flytekit/core/promise.py index c4f71eb2d6..b976cd56ae 100644 --- a/flytekit/core/promise.py +++ b/flytekit/core/promise.py @@ -94,7 +94,7 @@ def my_wf(in1: int, in2: int) -> int: v = resolve_attr_path_in_promise(v) result[k] = TypeEngine.to_literal(ctx, v, t, var.type) except TypeTransformerFailedError as exc: - raise TypeTransformerFailedError(f"Failed argument '{k}': {exc}") from exc + raise TypeTransformerFailedError(f"Failed argument '{k}': {exc}") from None return result diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index c8bc881791..d66bc8a956 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -1155,7 +1155,7 @@ def literal_map_to_kwargs( try: kwargs[k] = TypeEngine.to_python_value(ctx, lm.literals[k], python_interface_inputs[k]) except TypeTransformerFailedError as exc: - raise TypeTransformerFailedError(f"Error converting input '{k}' at position {i}:\n {exc}") from exc + raise TypeTransformerFailedError(f"Error converting input '{k}' at position {i}:\n {exc}") from None return kwargs @classmethod From ffc030e279db4a200dcfca2dd6b5d32526da0850 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Tue, 6 Aug 2024 09:25:07 +0800 Subject: [PATCH 07/27] [Error Message] Dataclasses Mismatched Type (#2650) * Show different of types in dataclass when transforming error Signed-off-by: Future-Outlier * add tests for dataclass Signed-off-by: Future-Outlier * fix tests Signed-off-by: Future-Outlier --------- Signed-off-by: Future-Outlier Signed-off-by: mao3267 --- flytekit/core/base_task.py | 6 ++++- tests/flytekit/unit/core/test_type_hints.py | 30 ++++++++++++++++++--- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 58c9392dec..17967f8252 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -636,7 +636,11 @@ def _output_to_literal_map(self, native_outputs: 
Dict[int, Any], ctx: FlyteConte except Exception as e: # only show the name of output key if it's user-defined (by default Flyte names these as "o") key = k if k != f"o{i}" else i - msg = f"Failed to convert outputs of task '{self.name}' at position {key}:\n {e}" + msg = ( + f"Failed to convert outputs of task '{self.name}' at position {key}.\n" + f"Failed to convert type {type(native_outputs_as_map[expected_output_names[i]])} to type {py_type}.\n" + f"Error Message: {e}." + ) logger.error(msg) raise TypeError(msg) from e # Now check if there is any output metadata associated with this output variable and attach it to the diff --git a/tests/flytekit/unit/core/test_type_hints.py b/tests/flytekit/unit/core/test_type_hints.py index 11a35f2578..0a3501665c 100644 --- a/tests/flytekit/unit/core/test_type_hints.py +++ b/tests/flytekit/unit/core/test_type_hints.py @@ -1568,6 +1568,17 @@ def t2() -> Bar: def test_error_messages(): + @dataclass + class DC1: + a: int + b: str + + @dataclass + class DC2: + a: int + b: str + c: int + @task def foo(a: int, b: str) -> typing.Tuple[int, str]: return 10, "hello" @@ -1580,6 +1591,10 @@ def foo2(a: int, b: str) -> typing.Tuple[int, str]: def foo3(a: typing.Dict) -> typing.Dict: return a + @task + def foo4(input: DC1=DC1(1, 'a')) -> DC2: + return input # type: ignore + # pytest-xdist uses `__channelexec__` as the top-level module running_xdist = os.environ.get("PYTEST_XDIST_WORKER") is not None prefix = "__channelexec__." if running_xdist else "" @@ -1596,9 +1611,9 @@ def foo3(a: typing.Dict) -> typing.Dict: with pytest.raises( TypeError, match=( - f"Failed to convert outputs of task '{prefix}tests.flytekit.unit.core.test_type_hints.foo2' " - "at position 0:\n" - " Expected value of type but got 'hello' of type " + f"Failed to convert outputs of task '{prefix}tests.flytekit.unit.core.test_type_hints.foo2' at position 0.\n" + f"Failed to convert type to type .\n" + "Error Message: Expected value of type but got 'hello' of type ." ), ): foo2(a=10, b="hello") @@ -1610,6 +1625,15 @@ def foo3(a: typing.Dict) -> typing.Dict: ): foo3(a=[{"hello": 2}]) + with pytest.raises( + TypeError, + match=( + f"Failed to convert outputs of task '{prefix}tests.flytekit.unit.core.test_type_hints.foo4' at position 0.\n" + f"Failed to convert type .DC1'> to type .DC2'>.\n" + "Error Message: 'DC1' object has no attribute 'c'." + ), + ): + foo4() def test_failure_node(): @task From ac8563d2e2fb21ef12e014797ff60ef5b16d958d Mon Sep 17 00:00:00 2001 From: pryce-turner <31577879+pryce-turner@users.noreply.github.com> Date: Tue, 6 Aug 2024 04:54:16 -0700 Subject: [PATCH 08/27] Added warning for command list and shell true (#2653) Signed-off-by: mao3267 --- flytekit/extras/tasks/shell.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/flytekit/extras/tasks/shell.py b/flytekit/extras/tasks/shell.py index ef9cd0c0e1..ec728feeee 100644 --- a/flytekit/extras/tasks/shell.py +++ b/flytekit/extras/tasks/shell.py @@ -76,17 +76,26 @@ def subproc_execute(command: typing.Union[List[str], str], **kwargs) -> ProcessR kwargs = {**defaults, **kwargs} - try: - # Execute the command and capture stdout and stderr - result = subprocess.run(command, **kwargs) - print(result.check_returncode()) - - if "|" in command and kwargs.get("shell"): + if kwargs.get("shell"): + if "|" in command: logger.warning( """Found a pipe in the command and shell=True. 
This can lead to silent failures if subsequent commands succeed despite previous failures.""" ) + if type(command) == list: + logger.warning( + """Found `command` formatted as a list instead of a string with shell=True. + With this configuration, the first member of the list will be + executed and the remaining arguments will be passed as arguments + to the shell instead of to the binary being called. This may not + be intended behavior and may lead to confusing failures.""" + ) + + try: + # Execute the command and capture stdout and stderr + result = subprocess.run(command, **kwargs) + result.check_returncode() # Access the stdout and stderr output return ProcessResult(result.returncode, result.stdout, result.stderr) From b59346ab143bef8ca0b47744e4b1974a04d98a91 Mon Sep 17 00:00:00 2001 From: redartera <120470035+redartera@users.noreply.github.com> Date: Tue, 6 Aug 2024 10:08:19 -0400 Subject: [PATCH 09/27] In `FlyteRemote.upload_file`, pass the file object directly rather than the entire bytes buffer (#2641) * pass the local file directly for streaming in FlyteRemote.upload_file Signed-off-by: Reda Oulbacha * ruff format Signed-off-by: Reda Oulbacha * add an integration test Signed-off-by: Reda Oulbacha * remove unnecessary len Signed-off-by: redartera <120470035+redartera@users.noreply.github.com> * redo registration in the integration test Signed-off-by: redartera <120470035+redartera@users.noreply.github.com> * fix misspel Signed-off-by: redartera <120470035+redartera@users.noreply.github.com> * run the integration test serially Signed-off-by: redartera <120470035+redartera@users.noreply.github.com> * disable agent Signed-off-by: Kevin Su * use os.stat instead of os.seek to determine content_length Signed-off-by: redartera <120470035+redartera@users.noreply.github.com> * rewrite tests only uploda a file, use a separate marker Signed-off-by: redartera <120470035+redartera@users.noreply.github.com> * parametrize integration test makefile cmd Signed-off-by: redartera <120470035+redartera@users.noreply.github.com> * add workflow_dispatch for debugging Signed-off-by: redartera <120470035+redartera@users.noreply.github.com> * replace trigger with push Signed-off-by: redartera <120470035+redartera@users.noreply.github.com> * remove trailing whitespaces Signed-off-by: redartera <120470035+redartera@users.noreply.github.com> * remove agent disabling Signed-off-by: redartera <120470035+redartera@users.noreply.github.com> * remove trailing debug CI trigger Signed-off-by: redartera <120470035+redartera@users.noreply.github.com> * clean up botocore imports Signed-off-by: redartera <120470035+redartera@users.noreply.github.com> --------- Signed-off-by: Reda Oulbacha Signed-off-by: redartera <120470035+redartera@users.noreply.github.com> Signed-off-by: Kevin Su Co-authored-by: Kevin Su Signed-off-by: mao3267 --- .github/workflows/pythonbuild.yml | 3 +- Makefile | 10 +- flytekit/remote/remote.py | 8 +- pyproject.toml | 1 + .../integration/remote/test_remote.py | 95 +++++++++++++++++++ 5 files changed, 111 insertions(+), 6 deletions(-) diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index c973aee3e2..b8757cc41e 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -244,6 +244,7 @@ jobs: matrix: os: [ubuntu-latest] python-version: ${{fromJson(needs.detect-python-versions.outputs.python-versions)}} + makefile-cmd: [integration_test_codecov, integration_test_lftransfers_codecov] steps: # As described in 
https://github.com/pypa/setuptools_scm/issues/414, SCM needs git history # and tags to work. @@ -297,7 +298,7 @@ jobs: FLYTEKIT_CI: 1 PYTEST_OPTS: -n2 run: | - make integration_test_codecov + make ${{ matrix.makefile-cmd }} - name: Codecov uses: codecov/codecov-action@v3.1.0 with: diff --git a/Makefile b/Makefile index 42758101fd..0ff0246f72 100644 --- a/Makefile +++ b/Makefile @@ -95,7 +95,15 @@ integration_test_codecov: .PHONY: integration_test integration_test: - $(PYTEST_AND_OPTS) tests/flytekit/integration ${CODECOV_OPTS} + $(PYTEST_AND_OPTS) tests/flytekit/integration ${CODECOV_OPTS} -m "not lftransfers" + +.PHONY: integration_test_lftransfers_codecov +integration_test_lftransfers_codecov: + $(MAKE) CODECOV_OPTS="--cov=./ --cov-report=xml --cov-append" integration_test_lftransfers + +.PHONY: integration_test_lftransfers +integration_test_lftransfers: + $(PYTEST) tests/flytekit/integration ${CODECOV_OPTS} -m "lftransfers" doc-requirements.txt: export CUSTOM_COMPILE_COMMAND := make doc-requirements.txt doc-requirements.txt: doc-requirements.in install-piptools diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 1406e6a560..a1e359b4b8 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -901,14 +901,14 @@ def upload_file( extra_headers = self.get_extra_headers_for_protocol(upload_location.native_url) extra_headers.update(upload_location.headers) encoded_md5 = b64encode(md5_bytes) - with open(str(to_upload), "+rb") as local_file: - content = local_file.read() - content_length = len(content) + local_file_path = str(to_upload) + content_length = os.stat(local_file_path).st_size + with open(local_file_path, "+rb") as local_file: headers = {"Content-Length": str(content_length), "Content-MD5": encoded_md5} headers.update(extra_headers) rsp = requests.put( upload_location.signed_url, - data=content, + data=local_file, # NOTE: We pass the file object directly to stream our upload. 
headers=headers, verify=False if self._config.platform.insecure_skip_verify is True diff --git a/pyproject.toml b/pyproject.toml index 8fa40f8d26..2c3b7a658c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,6 +97,7 @@ markers = [ "sandbox_test: fake integration tests", # unit tests that are really integration tests that run on a sandbox environment "serial: tests to avoid using with pytest-xdist", "hypothesis: tests that use they hypothesis library", + "lftransfers: integration tests which involve large file transfers" ] [tool.coverage.report] diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 7fbc8b90a6..7e0661f808 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -1,12 +1,18 @@ +import botocore.session +from contextlib import ExitStack, contextmanager import datetime +import hashlib import json import os import pathlib import subprocess +import tempfile import time import typing import joblib +from urllib.parse import urlparse +import uuid import pytest from flytekit import LaunchPlan, kwtypes @@ -483,3 +489,92 @@ def test_execute_workflow_with_maptask(register): wait=True, ) assert execution.outputs["o0"] == [4, 5, 6] + +@pytest.mark.lftransfers +class TestLargeFileTransfers: + """A class to capture tests and helper functions for large file transfers.""" + + @staticmethod + def _get_minio_s3_client(remote): + minio_s3_config = remote.file_access.data_config.s3 + sess = botocore.session.get_session() + return sess.create_client( + "s3", + endpoint_url=minio_s3_config.endpoint, + aws_access_key_id=minio_s3_config.access_key_id, + aws_secret_access_key=minio_s3_config.secret_access_key, + ) + + @staticmethod + def _get_s3_file_md5_bytes(s3_client, bucket, key): + md5_hash = hashlib.md5() + response = s3_client.get_object(Bucket=bucket, Key=key) + body = response['Body'] + # Read the object in chunks and update the hash (this keeps memory usage low) + for chunk in iter(lambda: body.read(4096), b''): + md5_hash.update(chunk) + return md5_hash.digest() + + @staticmethod + def _delete_s3_file(s3_client, bucket, key): + # Delete the object + response = s3_client.delete_object(Bucket=bucket, Key=key) + # Ensure the object was deleted - for 'delete_object' 204 is the expected successful response code + assert response["ResponseMetadata"]["HTTPStatusCode"] == 204 + + @staticmethod + @contextmanager + def _ephemeral_minio_project_domain_filename_root(s3_client, project, domain): + """An ephemeral minio S3 path which is wiped upon the context manager's exit""" + # Generate a random path in our Minio s3 bucket, under /PROJECT/DOMAIN/ + buckets = s3_client.list_buckets()["Buckets"] + assert len(buckets) == 1 # We expect just the default sandbox bucket + bucket = buckets[0]["Name"] + root = str(uuid.uuid4()) + key = f"{PROJECT}/{DOMAIN}/{root}/" + yield ((bucket, key), root) + # Teardown everything under bucket/key + response = s3_client.list_objects_v2(Bucket=bucket, Prefix=key) + if "Contents" in response: + for obj in response["Contents"]: + TestLargeFileTransfers._delete_s3_file(s3_client, bucket, obj["Key"]) + + + @staticmethod + @pytest.mark.parametrize("gigabytes", [2, 3]) + def test_flyteremote_uploads_large_file(gigabytes): + """This test checks whether FlyteRemote can upload large files.""" + remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN) + minio_s3_client = TestLargeFileTransfers._get_minio_s3_client(remote) + with 
ExitStack() as stack: + # Step 1 - Create a large local file + tempdir = stack.enter_context(tempfile.TemporaryDirectory()) + file_path = pathlib.Path(tempdir) / "large_file" + + with open(file_path, "wb") as f: + # Write in chunks of 500mb to keep memory usage low during tests + for _ in range(gigabytes * 2): + f.write(os.urandom(int(1e9 // 2))) + + # Step 2 - Create an ephemeral S3 storage location. This will be wiped + # on context exit to not overload the sandbox's storage + _, ephemeral_filename_root = stack.enter_context( + TestLargeFileTransfers._ephemeral_minio_project_domain_filename_root( + minio_s3_client, + PROJECT, + DOMAIN + ) + ) + + # Step 3 - Upload our large file and check whether the uploaded file's md5 checksum matches our local file's + md5_bytes, upload_location = remote.upload_file( + to_upload=file_path, + project=PROJECT, + domain=DOMAIN, + filename_root=ephemeral_filename_root + ) + + url = urlparse(upload_location) + bucket, key = url.netloc, url.path.lstrip("/") + s3_md5_bytes = TestLargeFileTransfers._get_s3_file_md5_bytes(minio_s3_client, bucket, key) + assert s3_md5_bytes == md5_bytes From f11f3975fc1ba2cc2c61fbbf17649b8c9140be47 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Tue, 6 Aug 2024 07:08:37 -0700 Subject: [PATCH 10/27] Modify test_array_node.py to support running in python 3.8 (#2652) Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario Signed-off-by: mao3267 --- tests/flytekit/unit/core/test_array_node.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/flytekit/unit/core/test_array_node.py b/tests/flytekit/unit/core/test_array_node.py index f7704d4afd..9a788daf6a 100644 --- a/tests/flytekit/unit/core/test_array_node.py +++ b/tests/flytekit/unit/core/test_array_node.py @@ -37,7 +37,7 @@ def parent_wf(a: int, b: int) -> int: @workflow -def grandparent_wf() -> list[int]: +def grandparent_wf() -> typing.List[int]: return array_node(lp, concurrency=10, min_success_ratio=0.9)(a=[1, 3, 5], b=[2, 4, 6]) @@ -86,7 +86,7 @@ def ex_wf(val: int) -> int: ex_lp = LaunchPlan.get_default_launch_plan(current_context(), ex_wf) @workflow - def grandparent_ex_wf() -> list[typing.Optional[int]]: + def grandparent_ex_wf() -> typing.List[typing.Optional[int]]: return array_node(ex_lp, min_successes=min_successes, min_success_ratio=min_success_ratio)(val=[1, 2, 3, 4]) if should_raise_error: From 901fe4fd4cf81f46ae06344bca93f89a578ec4df Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Tue, 6 Aug 2024 07:09:08 -0700 Subject: [PATCH 11/27] Handle common cases of mutable default arguments explicitly (#2651) Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario Signed-off-by: mao3267 --- flytekit/core/promise.py | 11 ++++++++--- tests/flytekit/unit/core/test_serialization.py | 16 +++++++++++++--- .../test_structured_dataset.py | 18 ++++++++++++++---- 3 files changed, 35 insertions(+), 10 deletions(-) diff --git a/flytekit/core/promise.py b/flytekit/core/promise.py index b976cd56ae..40f51f5bf8 100644 --- a/flytekit/core/promise.py +++ b/flytekit/core/promise.py @@ -5,7 +5,7 @@ import typing from copy import deepcopy from enum import Enum -from typing import Any, Coroutine, Dict, Hashable, List, Optional, Set, Tuple, Union, cast, get_args +from typing import Any, Coroutine, Dict, List, Optional, Set, Tuple, Union, cast, get_args from google.protobuf import struct_pb2 as _struct from typing_extensions import Protocol 
@@ -1116,8 +1116,13 @@ def create_and_link_node( or UnionTransformer.is_optional_type(interface.inputs_with_defaults[k][0]) ): default_val = interface.inputs_with_defaults[k][1] - if not isinstance(default_val, Hashable): - raise _user_exceptions.FlyteAssertion("Cannot use non-hashable object as default argument") + # Common cases of mutable default arguments, as described in https://www.pullrequest.com/blog/python-pitfalls-the-perils-of-using-lists-and-dicts-as-default-arguments/ + # or https://florimond.dev/en/posts/2018/08/python-mutable-defaults-are-the-source-of-all-evil, are not supported. + # As of 2024-08-05, Python native sets are not supported in Flytekit. However, they are included here for completeness. + if isinstance(default_val, list) or isinstance(default_val, dict) or isinstance(default_val, set): + raise _user_exceptions.FlyteAssertion( + f"Argument {k} for function {entity.name} is a mutable default argument, which is a python anti-pattern and not supported in flytekit tasks" + ) kwargs[k] = default_val else: error_msg = f"Input {k} of type {interface.inputs[k]} was not specified for function {entity.name}" diff --git a/tests/flytekit/unit/core/test_serialization.py b/tests/flytekit/unit/core/test_serialization.py index 44dc404a4f..f995997155 100644 --- a/tests/flytekit/unit/core/test_serialization.py +++ b/tests/flytekit/unit/core/test_serialization.py @@ -1,3 +1,4 @@ +import re import os import typing from collections import OrderedDict @@ -775,7 +776,10 @@ def wf_no_input() -> typing.List[int]: def wf_with_input() -> typing.List[int]: return t1(a=input_val) - with pytest.raises(FlyteAssertion, match="Cannot use non-hashable object as default argument"): + with pytest.raises( + FlyteAssertion, + match=r"Argument a for function .*test_serialization\.t1 is a mutable default argument, which is a python anti-pattern and not supported in flytekit tasks" + ): get_serializable(OrderedDict(), serialization_settings, wf_no_input) wf_with_input_spec = get_serializable(OrderedDict(), serialization_settings, wf_with_input) @@ -810,7 +814,10 @@ def wf_no_input() -> typing.Dict[str, int]: def wf_with_input() -> typing.Dict[str, int]: return t1(a=input_val) - with pytest.raises(FlyteAssertion, match="Cannot use non-hashable object as default argument"): + with pytest.raises( + FlyteAssertion, + match=r"Argument a for function .*test_serialization\.t1 is a mutable default argument, which is a python anti-pattern and not supported in flytekit tasks" + ): get_serializable(OrderedDict(), serialization_settings, wf_no_input) wf_with_input_spec = get_serializable(OrderedDict(), serialization_settings, wf_with_input) @@ -910,7 +917,10 @@ def wf_no_input() -> typing.Optional[typing.List[int]]: def wf_with_input() -> typing.Optional[typing.List[int]]: return t1(a=input_val) - with pytest.raises(FlyteAssertion, match="Cannot use non-hashable object as default argument"): + with pytest.raises( + FlyteAssertion, + match=r"Argument a for function .*test_serialization\.t1 is a mutable default argument, which is a python anti-pattern and not supported in flytekit tasks" + ): get_serializable(OrderedDict(), serialization_settings, wf_no_input) wf_with_input_spec = get_serializable(OrderedDict(), serialization_settings, wf_with_input) diff --git a/tests/flytekit/unit/types/structured_dataset/test_structured_dataset.py b/tests/flytekit/unit/types/structured_dataset/test_structured_dataset.py index 9e29416523..f107384b96 100644 --- 
a/tests/flytekit/unit/types/structured_dataset/test_structured_dataset.py +++ b/tests/flytekit/unit/types/structured_dataset/test_structured_dataset.py @@ -543,10 +543,11 @@ def test_reregister_encoder(): def test_default_args_task(): + default_val = pd.DataFrame({"name": ["Aegon"], "age": [27]}) input_val = generate_pandas() @task - def t1(a: pd.DataFrame = pd.DataFrame()) -> pd.DataFrame: + def t1(a: pd.DataFrame = default_val) -> pd.DataFrame: return a @workflow @@ -557,11 +558,16 @@ def wf_no_input() -> pd.DataFrame: def wf_with_input() -> pd.DataFrame: return t1(a=input_val) - with pytest.raises(FlyteAssertion, match="Cannot use non-hashable object as default argument"): - get_serializable(OrderedDict(), serialization_settings, wf_no_input) - + wf_no_input_spec = get_serializable(OrderedDict(), serialization_settings, wf_no_input) wf_with_input_spec = get_serializable(OrderedDict(), serialization_settings, wf_with_input) + assert wf_no_input_spec.template.nodes[0].inputs[ + 0 + ].binding.value.structured_dataset.metadata == StructuredDatasetMetadata( + structured_dataset_type=StructuredDatasetType( + format="parquet", + ), + ) assert wf_with_input_spec.template.nodes[0].inputs[ 0 ].binding.value.structured_dataset.metadata == StructuredDatasetMetadata( @@ -570,8 +576,12 @@ def wf_with_input() -> pd.DataFrame: ), ) + assert wf_no_input_spec.template.interface.outputs["o0"].type == LiteralType( + structured_dataset_type=StructuredDatasetType() + ) assert wf_with_input_spec.template.interface.outputs["o0"].type == LiteralType( structured_dataset_type=StructuredDatasetType() ) + pd.testing.assert_frame_equal(wf_no_input(), default_val) pd.testing.assert_frame_equal(wf_with_input(), input_val) From 9e0dbf5127a77f1fab0500d7057d82a0913b3824 Mon Sep 17 00:00:00 2001 From: demmerichs Date: Tue, 6 Aug 2024 20:49:51 +0200 Subject: [PATCH 12/27] Allow a hash method to be present for numpy arrays (#2649) Signed-off-by: Yee Hing Tong Signed-off-by: mao3267 --- flytekit/types/numpy/ndarray.py | 49 ++++++++++++++----- .../flytekit/unit/types/numpy/test_ndarray.py | 46 +++++++++++++++-- 2 files changed, 79 insertions(+), 16 deletions(-) diff --git a/flytekit/types/numpy/ndarray.py b/flytekit/types/numpy/ndarray.py index 3455ea8267..1ca25bde11 100644 --- a/flytekit/types/numpy/ndarray.py +++ b/flytekit/types/numpy/ndarray.py @@ -7,20 +7,37 @@ from typing_extensions import Annotated, get_args, get_origin from flytekit.core.context_manager import FlyteContext -from flytekit.core.type_engine import TypeEngine, TypeTransformer, TypeTransformerFailedError +from flytekit.core.hash import HashMethod +from flytekit.core.type_engine import ( + TypeEngine, + TypeTransformer, + TypeTransformerFailedError, +) from flytekit.models.core import types as _core_types from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar from flytekit.models.types import LiteralType def extract_metadata(t: Type[np.ndarray]) -> Tuple[Type[np.ndarray], Dict[str, bool]]: - metadata = {} + metadata: dict = {} + metadata_set = False + if get_origin(t) is Annotated: - base_type, metadata = get_args(t) - if isinstance(metadata, OrderedDict): - return base_type, metadata - else: - raise TypeTransformerFailedError(f"{t}'s metadata needs to be of type kwtypes.") + base_type, *annotate_args = get_args(t) + + for aa in annotate_args: + if isinstance(aa, OrderedDict): + if metadata_set: + raise TypeTransformerFailedError(f"Metadata {metadata} is already specified, cannot use {aa}.") + metadata = aa + metadata_set = True + elif 
isinstance(aa, HashMethod): + continue + else: + raise TypeTransformerFailedError(f"The metadata for {t} must be of type kwtypes or HashMethod.") + return base_type, metadata + + # Return the type itself if no metadata was found. return t, metadata @@ -37,18 +54,24 @@ def __init__(self): def get_literal_type(self, t: Type[np.ndarray]) -> LiteralType: return LiteralType( blob=_core_types.BlobType( - format=self.NUMPY_ARRAY_FORMAT, dimensionality=_core_types.BlobType.BlobDimensionality.SINGLE + format=self.NUMPY_ARRAY_FORMAT, + dimensionality=_core_types.BlobType.BlobDimensionality.SINGLE, ) ) def to_literal( - self, ctx: FlyteContext, python_val: np.ndarray, python_type: Type[np.ndarray], expected: LiteralType + self, + ctx: FlyteContext, + python_val: np.ndarray, + python_type: Type[np.ndarray], + expected: LiteralType, ) -> Literal: python_type, metadata = extract_metadata(python_type) meta = BlobMetadata( type=_core_types.BlobType( - format=self.NUMPY_ARRAY_FORMAT, dimensionality=_core_types.BlobType.BlobDimensionality.SINGLE + format=self.NUMPY_ARRAY_FORMAT, + dimensionality=_core_types.BlobType.BlobDimensionality.SINGLE, ) ) @@ -56,7 +79,11 @@ def to_literal( pathlib.Path(local_path).parent.mkdir(parents=True, exist_ok=True) # save numpy array to file - np.save(file=local_path, arr=python_val, allow_pickle=metadata.get("allow_pickle", False)) + np.save( + file=local_path, + arr=python_val, + allow_pickle=metadata.get("allow_pickle", False), + ) remote_path = ctx.file_access.put_raw_data(local_path) return Literal(scalar=Scalar(blob=Blob(metadata=meta, uri=remote_path))) diff --git a/tests/flytekit/unit/types/numpy/test_ndarray.py b/tests/flytekit/unit/types/numpy/test_ndarray.py index d53979f2a9..22571d992a 100644 --- a/tests/flytekit/unit/types/numpy/test_ndarray.py +++ b/tests/flytekit/unit/types/numpy/test_ndarray.py @@ -1,7 +1,9 @@ +import pytest import numpy as np from typing_extensions import Annotated -from flytekit import kwtypes, task, workflow +from flytekit import HashMethod, kwtypes, task, workflow +from flytekit.core.type_engine import TypeTransformerFailedError @task @@ -63,6 +65,35 @@ def t4(array: Annotated[np.ndarray, kwtypes(allow_pickle=True)]) -> int: return array.size +def dummy_hash_array(arr: np.ndarray) -> str: + return "dummy" + + +@task +def t5_annotate_kwtypes_and_hash( + array: Annotated[ + np.ndarray, kwtypes(allow_pickle=True), HashMethod(dummy_hash_array) + ], +): + pass + + +@task +def t6_annotate_kwtypes_twice( + array: Annotated[ + np.ndarray, kwtypes(allow_pickle=True), kwtypes(allow_pickle=False) + ], +): + pass + + +@task +def t7_annotate_with_sth_strange( + array: Annotated[np.ndarray, (1, 2, 3)], +): + pass + + @workflow def wf(): array_1d = generate_numpy_1d() @@ -72,10 +103,15 @@ def wf(): t2(array=array_2d) t3(array=array_1d) t4(array=array_dtype_object) - try: - generate_numpy_fails() - except Exception as e: - assert isinstance(e, TypeError) + t5_annotate_kwtypes_and_hash(array=array_1d) + + if array_1d.is_ready: + with pytest.raises(TypeTransformerFailedError, match=r"Metadata OrderedDict.*'allow_pickle'.*True.* is already specified, cannot use OrderedDict.*'allow_pickle'.*False.*\."): + t6_annotate_kwtypes_twice(array=array_1d) + with pytest.raises(TypeTransformerFailedError, match=r"The metadata for typing.Annotated.*numpy\.ndarray.*1, 2, 3.* must be of type kwtypes or HashMethod\."): + t7_annotate_with_sth_strange(array=array_1d) + with pytest.raises(TypeError, match=r"The metadata for 
typing.Annotated.*numpy\.ndarray.*'allow_pickle'.*True.* must be of type kwtypes or HashMethod\."): + generate_numpy_fails() @workflow From 140b58fc3d4a032b9bc045a3da5f35e2815b94e5 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 7 Aug 2024 15:45:32 -0700 Subject: [PATCH 13/27] return exceptions when gathering (#2657) * return exceptions when gathering Signed-off-by: Yee Hing Tong * pr comment Signed-off-by: Yee Hing Tong --------- Signed-off-by: Yee Hing Tong Signed-off-by: mao3267 --- flytekit/remote/remote.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index a1e359b4b8..76d16457b9 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -658,7 +658,7 @@ def raw_register( raise RegistrationSkipped(f"Remote task/Workflow {cp_entity.name} is not registrable.") else: logger.debug(f"Skipping registration of remote entity: {cp_entity.name}") - raise RegistrationSkipped(f"Remote task/Workflow {cp_entity.name} is not registrable.") + raise RegistrationSkipped(f"Remote entity {cp_entity.name} is not registrable.") if isinstance( cp_entity, @@ -768,13 +768,23 @@ async def _serialize_and_register( functools.partial(self.raw_register, cp_entity, serialization_settings, version, og_entity=entity), ) ) - ident = [] - ident.extend(await asyncio.gather(*tasks)) + + identifiers_or_exceptions = [] + identifiers_or_exceptions.extend(await asyncio.gather(*tasks, return_exceptions=True)) + # Check to make sure any exceptions are just registration skipped exceptions + for ie in identifiers_or_exceptions: + if isinstance(ie, RegistrationSkipped): + logger.info(f"Skipping registration... {ie}") + continue + if isinstance(ie, Exception): + raise ie # serial register cp_other_entities = OrderedDict(filter(lambda x: not isinstance(x[1], task_models.TaskSpec), m.items())) for entity, cp_entity in cp_other_entities.items(): - ident.append(self.raw_register(cp_entity, serialization_settings, version, og_entity=entity)) - return ident[-1] + identifiers_or_exceptions.append( + self.raw_register(cp_entity, serialization_settings, version, og_entity=entity) + ) + return identifiers_or_exceptions[-1] def register_task( self, From 78a6219a8bae276990525eb67cf62fb63de00bf4 Mon Sep 17 00:00:00 2001 From: Peeter Piegaze <1153481+ppiegaze@users.noreply.github.com> Date: Thu, 8 Aug 2024 13:12:45 +0200 Subject: [PATCH 14/27] Correct FlyteFile docstring (#2658) Signed-off-by: mao3267 --- flytekit/types/file/file.py | 34 +++++++++++----------------------- 1 file changed, 11 insertions(+), 23 deletions(-) diff --git a/flytekit/types/file/file.py b/flytekit/types/file/file.py index 087cad6b5e..ca1dccb927 100644 --- a/flytekit/types/file/file.py +++ b/flytekit/types/file/file.py @@ -309,38 +309,26 @@ def open( cache_type: typing.Optional[str] = None, cache_options: typing.Optional[typing.Dict[str, typing.Any]] = None, ): - """ - Returns a streaming File handle + """Returns a streaming File handle .. code-block:: python @task def copy_file(ff: FlyteFile) -> FlyteFile: - new_file = FlyteFile.new_remote_file(ff.name) - with ff.open("rb", cache_type="readahead", cache={}) as r: + new_file = FlyteFile.new_remote_file() + with ff.open("rb", cache_type="readahead") as r: with new_file.open("wb") as w: w.write(r.read()) return new_file - Alternatively, - - .. 
code-block:: python - - @task - def copy_file(ff: FlyteFile) -> FlyteFile: - new_file = FlyteFile.new_remote_file(ff.name) - with fsspec.open(f"readahead::{ff.remote_path}", "rb", readahead={}) as r: - with new_file.open("wb") as w: - w.write(r.read()) - return new_file - - - :param mode: str Open mode like 'rb', 'rt', 'wb', ... - :param cache_type: optional str Specify if caching is to be used. Cache protocol can be ones supported by - fsspec https://filesystem-spec.readthedocs.io/en/latest/api.html#readbuffering, - especially useful for large file reads - :param cache_options: optional Dict[str, Any] Refer to fsspec caching options. This is strongly coupled to the - cache_protocol + :param mode: Open mode. For example: 'r', 'w', 'rb', 'rt', 'wb', etc. + :type mode: str + :param cache_type: Specifies the cache type. Possible values are "blockcache", "bytes", "mmap", "readahead", "first", or "background". + This is especially useful for large file reads. See https://filesystem-spec.readthedocs.io/en/latest/api.html#readbuffering. + :type cache_type: str, optional + :param cache_options: A Dict corresponding to the parameters for the chosen cache_type. + Refer to fsspec caching options above. + :type cache_options: Dict[str, Any], optional """ ctx = FlyteContextManager.current_context() final_path = self.path From 8d229d9ba61190e9da401d05f7cd881babfd7206 Mon Sep 17 00:00:00 2001 From: "Thomas J. Fan" Date: Thu, 8 Aug 2024 14:18:35 -0400 Subject: [PATCH 15/27] Remove pip cache after install (#2662) Signed-off-by: Thomas J. Fan Signed-off-by: mao3267 --- Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Dockerfile b/Dockerfile index cd72eed846..2f7429c4ec 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,6 +27,7 @@ RUN apt-get update && apt-get install build-essential -y \ && apt-get clean autoclean \ && apt-get autoremove --yes \ && rm -rf /var/lib/{apt,dpkg,cache,log}/ \ + && rm -rf /root/.cache/pip \ && useradd -u 1000 flytekit \ && chown flytekit: /root \ && chown flytekit: /home \ From 59b59327491e84c2b64a25e41e919754b49444ba Mon Sep 17 00:00:00 2001 From: "Thomas J. Fan" Date: Thu, 8 Aug 2024 14:25:17 -0400 Subject: [PATCH 16/27] Adds validation to image_spec for list of strings (#2655) Signed-off-by: Thomas J. 
Fan Signed-off-by: mao3267 --- flytekit/image_spec/image_spec.py | 17 +++++++++++++++++ .../unit/core/image_spec/test_image_spec.py | 16 ++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/flytekit/image_spec/image_spec.py b/flytekit/image_spec/image_spec.py index 10e06fd8c6..f0dd321a65 100644 --- a/flytekit/image_spec/image_spec.py +++ b/flytekit/image_spec/image_spec.py @@ -81,6 +81,23 @@ def __post_init__(self): if self.registry: self.registry = self.registry.lower() + parameters_str_list = [ + "packages", + "conda_channels", + "conda_packages", + "apt_packages", + "pip_extra_index_url", + "entrypoint", + "commands", + ] + for parameter in parameters_str_list: + attr = getattr(self, parameter) + parameter_is_None = attr is None + parameter_is_list_string = isinstance(attr, list) and all(isinstance(v, str) for v in attr) + if not (parameter_is_None or parameter_is_list_string): + error_msg = f"{parameter} must be a list of strings or None" + raise ValueError(error_msg) + def image_name(self) -> str: """Full image name with tag.""" image_name = self._image_name() diff --git a/tests/flytekit/unit/core/image_spec/test_image_spec.py b/tests/flytekit/unit/core/image_spec/test_image_spec.py index c1e52953bb..fa63f08993 100644 --- a/tests/flytekit/unit/core/image_spec/test_image_spec.py +++ b/tests/flytekit/unit/core/image_spec/test_image_spec.py @@ -138,3 +138,19 @@ def test_no_build_during_execution(): ImageBuildEngine.build(spec) ImageBuildEngine._build_image.assert_not_called() + + +@pytest.mark.parametrize( + "parameter_name", [ + "packages", "conda_channels", "conda_packages", + "apt_packages", "pip_extra_index_url", "entrypoint", "commands" + ] +) +@pytest.mark.parametrize("value", ["requirements.txt", [1, 2, 3]]) +def test_image_spec_validation_string_list(parameter_name, value): + msg = f"{parameter_name} must be a list of strings or None" + + input_params = {parameter_name: value} + + with pytest.raises(ValueError, match=msg): + ImageSpec(**input_params) From be3b145ece01e11ab8f66c6a4386d57d46000842 Mon Sep 17 00:00:00 2001 From: Chen Zhu Date: Thu, 8 Aug 2024 11:31:49 -0700 Subject: [PATCH 17/27] Make elastic timeout configurable for HorovovJob. (#2631) Signed-off-by: Chen Zhu Signed-off-by: Chen Zhu Signed-off-by: mao3267 --- plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py | 4 ++++ plugins/flytekit-kf-mpi/tests/test_mpi_task.py | 3 +++ 2 files changed, 7 insertions(+) diff --git a/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py b/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py index 7c8416d007..a6a6ef3647 100644 --- a/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py +++ b/plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py @@ -233,6 +233,7 @@ class HorovodJob(object): verbose: Optional flag indicating whether to enable verbose logging (default: False). log_level: Optional string specifying the log level (default: "INFO"). discovery_script_path: Path to the discovery script used for host discovery (default: "/etc/mpi/discover_hosts.sh"). + elastic_timeout: horovod elastic timeout in second (default: 1200). num_launcher_replicas: [DEPRECATED] The number of launcher server replicas to use. This argument is deprecated. Please use launcher.replicas instead. num_workers: [DEPRECATED] The number of worker replicas to spawn in the cluster for this job. Please use worker.replicas instead. 
""" @@ -244,6 +245,7 @@ class HorovodJob(object): verbose: Optional[bool] = False log_level: Optional[str] = "INFO" discovery_script_path: Optional[str] = "/etc/mpi/discover_hosts.sh" + elastic_timeout: Optional[int] = 1200 # Support v0 config for backwards compatibility num_launcher_replicas: Optional[int] = None num_workers: Optional[int] = None @@ -287,6 +289,8 @@ def _get_horovod_prefix(self) -> List[str]: f"{self.task_config.slots}", "--host-discovery-script", self.task_config.discovery_script_path, + "--elastic-timeout", + f"{self.task_config.elastic_timeout}", ] if self.task_config.verbose: base_cmd.append("--verbose") diff --git a/plugins/flytekit-kf-mpi/tests/test_mpi_task.py b/plugins/flytekit-kf-mpi/tests/test_mpi_task.py index deec3ff385..36758bfb6f 100644 --- a/plugins/flytekit-kf-mpi/tests/test_mpi_task.py +++ b/plugins/flytekit-kf-mpi/tests/test_mpi_task.py @@ -167,6 +167,7 @@ def test_horovod_task(serialization_settings): slots=2, verbose=False, log_level="INFO", + elastic_timeout=200, run_policy=RunPolicy( clean_pod_policy=CleanPodPolicy.NONE, backoff_limit=5, @@ -182,6 +183,8 @@ def my_horovod_task(): ... assert "--verbose" not in cmd assert "--log-level" in cmd assert "INFO" in cmd + assert "--elastic-timeout" in cmd + assert "200" in cmd # CleanPodPolicy.NONE is the default, so it should not be in the output dictionary expected_dict = { "launcherReplicas": { From 9ea6b23bf6961ff3288ee9cbe7077f8ca2e490ed Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Thu, 8 Aug 2024 11:45:16 -0700 Subject: [PATCH 18/27] Fix overriding of loader_args task resolver in papermill plugin (#2660) * Add repro test case Signed-off-by: Eduardo Apolinario * Restore loader_args in papermill plugin Signed-off-by: Eduardo Apolinario * Add unit tests Signed-off-by: Eduardo Apolinario --------- Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario Signed-off-by: mao3267 --- .../flytekitplugins/papermill/task.py | 10 +- plugins/flytekit-papermill/tests/test_task.py | 178 +++++++++++++++++- 2 files changed, 180 insertions(+), 8 deletions(-) diff --git a/plugins/flytekit-papermill/flytekitplugins/papermill/task.py b/plugins/flytekit-papermill/flytekitplugins/papermill/task.py index 23b2295913..93cd13f05b 100644 --- a/plugins/flytekit-papermill/flytekitplugins/papermill/task.py +++ b/plugins/flytekit-papermill/flytekitplugins/papermill/task.py @@ -202,15 +202,21 @@ def get_container(self, settings: SerializationSettings) -> task_models.Containe # Always extract the module from the notebook task, no matter what _config_task_instance is. _, m, t, _ = extract_task_module(self) loader_args = ["task-module", m, "task-name", t] + previous_loader_args = self._config_task_instance.task_resolver.loader_args self._config_task_instance.task_resolver.loader_args = lambda ss, task: loader_args - return self._config_task_instance.get_container(settings) + container = self._config_task_instance.get_container(settings) + self._config_task_instance.task_resolver.loader_args = previous_loader_args + return container def get_k8s_pod(self, settings: SerializationSettings) -> task_models.K8sPod: # Always extract the module from the notebook task, no matter what _config_task_instance is. 
_, m, t, _ = extract_task_module(self) loader_args = ["task-module", m, "task-name", t] + previous_loader_args = self._config_task_instance.task_resolver.loader_args self._config_task_instance.task_resolver.loader_args = lambda ss, task: loader_args - return self._config_task_instance.get_k8s_pod(settings) + k8s_pod = self._config_task_instance.get_k8s_pod(settings) + self._config_task_instance.task_resolver.loader_args = previous_loader_args + return k8s_pod def get_config(self, settings: SerializationSettings) -> typing.Dict[str, str]: return {**super().get_config(settings), **self._config_task_instance.get_config(settings)} diff --git a/plugins/flytekit-papermill/tests/test_task.py b/plugins/flytekit-papermill/tests/test_task.py index 9c7b778afb..efca238dbd 100644 --- a/plugins/flytekit-papermill/tests/test_task.py +++ b/plugins/flytekit-papermill/tests/test_task.py @@ -4,8 +4,10 @@ import tempfile import typing from unittest import mock +import pytest import pandas as pd +from flytekit.core.pod_template import PodTemplate from click.testing import CliRunner from flytekitplugins.awsbatch import AWSBatchConfig from flytekitplugins.papermill import NotebookTask @@ -147,16 +149,27 @@ def generate_por_spec_for_task(): return pod_spec -nb = NotebookTask( +nb_pod = NotebookTask( name="test", task_config=Pod(pod_spec=generate_por_spec_for_task(), primary_container_name="primary"), notebook_path=_get_nb_path("nb-simple", abs=False), inputs=kwtypes(h=str, n=int, w=str), outputs=kwtypes(h=str, w=PythonNotebook, x=X), ) +nb_pod_template = NotebookTask( + name="test", + pod_template=PodTemplate(pod_spec=generate_por_spec_for_task(), primary_container_name="primary"), + notebook_path=_get_nb_path("nb-simple", abs=False), + inputs=kwtypes(h=str, n=int, w=str), + outputs=kwtypes(h=str, w=PythonNotebook, x=X), +) -def test_notebook_pod_task(): +@pytest.mark.parametrize("nb_task", [ + nb_pod, + nb_pod_template, +]) +def test_notebook_pod_task(nb_task): serialization_settings = flytekit.configuration.SerializationSettings( project="project", domain="domain", @@ -165,13 +178,93 @@ def test_notebook_pod_task(): image_config=ImageConfig(Image(name="name", fqn="image", tag="name")), ) - assert nb.get_container(serialization_settings) is None - assert nb.get_config(serialization_settings)["primary_container_name"] == "primary" + assert nb_task.get_container(serialization_settings) is None + assert nb_task.get_config(serialization_settings)["primary_container_name"] == "primary" assert ( - nb.get_command(serialization_settings) - == nb.get_k8s_pod(serialization_settings).pod_spec["containers"][0]["args"] + nb_task.get_command(serialization_settings) + == nb_task.get_k8s_pod(serialization_settings).pod_spec["containers"][0]["args"] + ) + + +@pytest.mark.parametrize("nb_task, name", [ + (nb_pod, "nb_pod"), + (nb_pod_template, "nb_pod_template"), +]) +def test_notebook_pod_override(nb_task, name): + serialization_settings = flytekit.configuration.SerializationSettings( + project="project", + domain="domain", + version="version", + env=None, + image_config=ImageConfig(Image(name="name", fqn="image", tag="name")), ) + @task + def t1(): + ... 
+ + assert t1.get_container(serialization_settings).args == [ + "pyflyte-execute", + "--inputs", + "{{.input}}", + "--output-prefix", + "{{.outputPrefix}}", + "--raw-output-data-prefix", + "{{.rawOutputDataPrefix}}", + "--checkpoint-path", + "{{.checkpointOutputPrefix}}", + "--prev-checkpoint", + "{{.prevCheckpointPrefix}}", + "--resolver", + "flytekit.core.python_auto_container.default_task_resolver", + "--", + "task-module", + "tests.test_task", + "task-name", + "t1", + ] + assert nb_task.get_k8s_pod(serialization_settings).pod_spec["containers"][0]["args"] == [ + "pyflyte-execute", + "--inputs", + "{{.input}}", + "--output-prefix", + "{{.outputPrefix}}", + "--raw-output-data-prefix", + "{{.rawOutputDataPrefix}}", + "--checkpoint-path", + "{{.checkpointOutputPrefix}}", + "--prev-checkpoint", + "{{.prevCheckpointPrefix}}", + "--resolver", + "flytekit.core.python_auto_container.default_task_resolver", + "--", + "task-module", + "tests.test_task", + "task-name", + f"{name}", + ] + assert t1.get_container(serialization_settings).args == [ + "pyflyte-execute", + "--inputs", + "{{.input}}", + "--output-prefix", + "{{.outputPrefix}}", + "--raw-output-data-prefix", + "{{.rawOutputDataPrefix}}", + "--checkpoint-path", + "{{.checkpointOutputPrefix}}", + "--prev-checkpoint", + "{{.prevCheckpointPrefix}}", + "--resolver", + "flytekit.core.python_auto_container.default_task_resolver", + "--", + "task-module", + "tests.test_task", + "task-name", + # Confirm that task name is correctly pointing to t1 + "t1", + ] + nb_batch = NotebookTask( name="simple-nb", @@ -210,6 +303,79 @@ def test_notebook_batch_task(): ] +def test_overriding_task_resolver_loader_args(): + serialization_settings = flytekit.configuration.SerializationSettings( + project="project", + domain="domain", + version="version", + env=None, + image_config=ImageConfig(Image(name="name", fqn="image", tag="name")), + ) + + @task + def t1(): + ... + + assert t1.get_container(serialization_settings).args == [ + "pyflyte-execute", + "--inputs", + "{{.input}}", + "--output-prefix", + "{{.outputPrefix}}", + "--raw-output-data-prefix", + "{{.rawOutputDataPrefix}}", + "--checkpoint-path", + "{{.checkpointOutputPrefix}}", + "--prev-checkpoint", + "{{.prevCheckpointPrefix}}", + "--resolver", + "flytekit.core.python_auto_container.default_task_resolver", + "--", + "task-module", + "tests.test_task", + "task-name", + "t1", + ] + assert nb_batch.get_container(serialization_settings).args == [ + "pyflyte-execute", + "--inputs", + "{{.input}}", + "--output-prefix", + "{{.outputPrefix}}/0", + "--raw-output-data-prefix", + "{{.rawOutputDataPrefix}}", + "--resolver", + "flytekit.core.python_auto_container.default_task_resolver", + "--", + "task-module", + "tests.test_task", + "task-name", + "nb_batch", + ] + assert t1.get_container(serialization_settings).args == [ + "pyflyte-execute", + "--inputs", + "{{.input}}", + "--output-prefix", + "{{.outputPrefix}}", + "--raw-output-data-prefix", + "{{.rawOutputDataPrefix}}", + "--checkpoint-path", + "{{.checkpointOutputPrefix}}", + "--prev-checkpoint", + "{{.prevCheckpointPrefix}}", + "--resolver", + "flytekit.core.python_auto_container.default_task_resolver", + "--", + "task-module", + "tests.test_task", + "task-name", + # Confirm that task name is correctly pointing to t1 + "t1", + ] + + + def test_flyte_types(): @task def create_file() -> FlyteFile: From e83c240f145b4da3f2fd635014b8945ec24703a4 Mon Sep 17 00:00:00 2001 From: "Thomas J. 
Fan" 
Date: Thu, 8 Aug 2024 16:53:24 -0400
Subject: [PATCH 19/27] Catch all exceptions when rendering python dependencies (#2664)

Signed-off-by: Thomas J. Fan 
Signed-off-by: mao3267 
---
 flytekit/deck/renderer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flytekit/deck/renderer.py b/flytekit/deck/renderer.py
index 51157dc876..26d589e072 100644
--- a/flytekit/deck/renderer.py
+++ b/flytekit/deck/renderer.py
@@ -113,7 +113,7 @@ def to_html(self) -> str:
                 .replace("\\n", "\n")
                 .rstrip()
             )
-        except subprocess.CalledProcessError as e:
+        except Exception as e:
             logger.error(f"Error occurred while fetching installed packages: {e}")
             return "Error occurred while fetching installed packages."

From b3dbc0858262b98256bd891e762a55525aa67019 Mon Sep 17 00:00:00 2001
From: Kevin Su 
Date: Fri, 9 Aug 2024 05:47:45 +0800
Subject: [PATCH 20/27] Don't check the return statement for reference_launch_plan (#2665)

Signed-off-by: Kevin Su 
Signed-off-by: mao3267 
---
 flytekit/core/launch_plan.py                | 2 +-
 tests/flytekit/unit/core/test_references.py | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flytekit/core/launch_plan.py b/flytekit/core/launch_plan.py
index 9018184837..c4327dadc8 100644
--- a/flytekit/core/launch_plan.py
+++ b/flytekit/core/launch_plan.py
@@ -509,7 +509,7 @@ def reference_launch_plan(
     """

     def wrapper(fn) -> ReferenceLaunchPlan:
-        interface = transform_function_to_interface(fn)
+        interface = transform_function_to_interface(fn, is_reference_entity=True)
         return ReferenceLaunchPlan(project, domain, name, version, interface.inputs, interface.outputs)

     return wrapper
diff --git a/tests/flytekit/unit/core/test_references.py b/tests/flytekit/unit/core/test_references.py
index b945027570..d9494a2425 100644
--- a/tests/flytekit/unit/core/test_references.py
+++ b/tests/flytekit/unit/core/test_references.py
@@ -408,7 +408,7 @@ def ref_wf1(p1: str, p2: str) -> None:
 def test_ref_lp_from_decorator():
     @reference_launch_plan(project="project", domain="domain", name="name", version="version")
     def ref_lp1(p1: str, p2: str) -> int:
-        return 0
+        ...

     assert ref_lp1.id.name == "name"
     assert ref_lp1.id.project == "project"
@@ -422,7 +422,7 @@ def test_ref_lp_from_decorator_with_named_outputs():
     nt = typing.NamedTuple("RefLPOutput", [("o1", int), ("o2", str)])
     @reference_launch_plan(project="project", domain="domain", name="name", version="version")
     def ref_lp1(p1: str, p2: str) -> nt:
-        return nt(o1=1, o2="2")
+        ...

     assert ref_lp1.python_interface.outputs == {"o1": int, "o2": str}

@@ -470,7 +470,7 @@ def test_ref_dynamic_lp():
     def my_subwf(a: int) -> typing.List[int]:
         @reference_launch_plan(project="project", domain="domain", name="name", version="version")
         def ref_lp1(p1: str, p2: str) -> int:
-            return 1
+            ...
s = [] for i in range(a): From 72319a27676cb905c42a426d6e98cf4a33ed85db Mon Sep 17 00:00:00 2001 From: mao3267 Date: Fri, 9 Aug 2024 20:18:08 +0900 Subject: [PATCH 21/27] Bump flyteidl to 1.13.1 (#2666) Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario Signed-off-by: mao3267 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 2c3b7a658c..8e8fcef90f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ dependencies = [ "diskcache>=5.2.1", "docker>=4.0.0", "docstring-parser>=0.9.0", - "flyteidl>=1.13.1b0", + "flyteidl>=1.13.1", "fsspec>=2023.3.0", "gcsfs>=2023.3.0", "googleapis-common-protos>=1.57", From 8446503e92d95f9b87220e9c4eedd108773fa6f4 Mon Sep 17 00:00:00 2001 From: mao3267 Date: Tue, 13 Aug 2024 10:12:57 +0800 Subject: [PATCH 22/27] fix: raise exception for docker commands in envd builder Signed-off-by: mao3267 --- plugins/flytekit-envd/flytekitplugins/envd/image_builder.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugins/flytekit-envd/flytekitplugins/envd/image_builder.py b/plugins/flytekit-envd/flytekitplugins/envd/image_builder.py index af5c32ec6d..cf747b9742 100644 --- a/plugins/flytekit-envd/flytekitplugins/envd/image_builder.py +++ b/plugins/flytekit-envd/flytekitplugins/envd/image_builder.py @@ -28,6 +28,9 @@ class EnvdImageSpecBuilder(ImageSpecBuilder): def build_image(self, image_spec: ImageSpec): cfg_path = create_envd_config(image_spec) + if image_spec.docker_commands: + raise ValueError("Docker commands are not supported in envd") + if image_spec.registry_config: bootstrap_command = f"envd bootstrap --registry-config {image_spec.registry_config}" execute_command(bootstrap_command) From 0a85cccce47f8df761a5ca9635a0fc6c3d20ab8d Mon Sep 17 00:00:00 2001 From: mao3267 Date: Tue, 13 Aug 2024 10:21:21 +0800 Subject: [PATCH 23/27] test: add unit tests for docker commands Signed-off-by: mao3267 --- .../flytekit/unit/core/image_spec/test_default_builder.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/flytekit/unit/core/image_spec/test_default_builder.py b/tests/flytekit/unit/core/image_spec/test_default_builder.py index 6887f472b3..ee42c234db 100644 --- a/tests/flytekit/unit/core/image_spec/test_default_builder.py +++ b/tests/flytekit/unit/core/image_spec/test_default_builder.py @@ -30,7 +30,8 @@ def test_create_docker_context(tmp_path): source_root=os.fspath(source_root), commands=["mkdir my_dir"], entrypoint=["/bin/bash"], - pip_extra_index_url=["https://extra-url.com"] + pip_extra_index_url=["https://extra-url.com"], + docker_commands=["RUN git clone https://github.com/flyteorg/flytekit.git", "COPY . /root"], ) create_docker_context(image_spec, docker_context_path) @@ -48,6 +49,8 @@ def test_create_docker_context(tmp_path): assert "RUN mkdir my_dir" in dockerfile_content assert "ENTRYPOINT [\"/bin/bash\"]" in dockerfile_content assert "mkdir -p $HOME" in dockerfile_content + assert "RUN git clone https://github.com/flyteorg/flytekit.git" in dockerfile_content + assert "COPY . 
/root" in dockerfile_content requirements_path = docker_context_path / "requirements_uv.txt" assert requirements_path.exists() @@ -170,12 +173,13 @@ def test_build(tmp_path): name="FLYTEKIT", python_version="3.12", env={"MY_ENV": "MY_VALUE"}, - apt_packages=["curl"], + apt_packages=["curl", "git"], conda_packages=["scipy==1.13.0", "numpy"], packages=["pandas==2.2.1"], requirements=os.fspath(other_requirements_path), source_root=os.fspath(source_root), commands=["mkdir my_dir"], + docker_commands=["RUN git clone https://github.com/flyteorg/flytekit.git", "COPY . /root"], ) builder = DefaultImageBuilder() From f18c283fc5747bf0e447742157300f952932edd2 Mon Sep 17 00:00:00 2001 From: mao3267 Date: Fri, 16 Aug 2024 16:25:35 +0800 Subject: [PATCH 24/27] fix: modify error message Signed-off-by: mao3267 --- plugins/flytekit-envd/flytekitplugins/envd/image_builder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-envd/flytekitplugins/envd/image_builder.py b/plugins/flytekit-envd/flytekitplugins/envd/image_builder.py index cf747b9742..4cdec105a4 100644 --- a/plugins/flytekit-envd/flytekitplugins/envd/image_builder.py +++ b/plugins/flytekit-envd/flytekitplugins/envd/image_builder.py @@ -29,7 +29,7 @@ def build_image(self, image_spec: ImageSpec): cfg_path = create_envd_config(image_spec) if image_spec.docker_commands: - raise ValueError("Docker commands are not supported in envd") + raise ValueError("Docker commands are not supported in envd builder.") if image_spec.registry_config: bootstrap_command = f"envd bootstrap --registry-config {image_spec.registry_config}" From f073069d3ad894a2b4f30834750d12912f047329 Mon Sep 17 00:00:00 2001 From: mao3267 Date: Fri, 16 Aug 2024 16:27:58 +0800 Subject: [PATCH 25/27] test: raise exception for docker commands in envd builder Signed-off-by: mao3267 --- plugins/flytekit-envd/tests/test_image_spec.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/plugins/flytekit-envd/tests/test_image_spec.py b/plugins/flytekit-envd/tests/test_image_spec.py index cbd1eb761d..fbdc321799 100644 --- a/plugins/flytekit-envd/tests/test_image_spec.py +++ b/plugins/flytekit-envd/tests/test_image_spec.py @@ -128,3 +128,12 @@ def build(): ) assert contents == expected_contents + +def test_image_spec_with_envd_builder_exception(): + image_spec = ImageSpec( + name="envd_image", + builder="envd", + docker_commands=["RUN ls"], + ) + with pytest.raises(ValueError): + ImageBuildEngine.build(image_spec) From f2c3679ae175ba3287bc85ac0412d4f1bced05a5 Mon Sep 17 00:00:00 2001 From: mao3267 Date: Fri, 16 Aug 2024 16:30:37 +0800 Subject: [PATCH 26/27] test: ImageSpec with docker commands Signed-off-by: mao3267 --- tests/flytekit/unit/core/image_spec/test_image_spec.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/flytekit/unit/core/image_spec/test_image_spec.py b/tests/flytekit/unit/core/image_spec/test_image_spec.py index fa63f08993..be3233190e 100644 --- a/tests/flytekit/unit/core/image_spec/test_image_spec.py +++ b/tests/flytekit/unit/core/image_spec/test_image_spec.py @@ -26,12 +26,14 @@ def test_image_spec(mock_image_spec_builder): requirements=REQUIREMENT_FILE, registry_config=REGISTRY_CONFIG_FILE, entrypoint=["/bin/bash"], + docker_commands=["RUN ls"], ) assert image_spec._is_force_push is False image_spec = image_spec.with_commands("echo hello") image_spec = image_spec.with_packages("numpy") image_spec = image_spec.with_apt_packages("wget") + image_spec = image_spec.with_docker_commands(["RUN echo hello"]) image_spec = 
image_spec.force_push()

     assert image_spec.python_version == "3.8"
@@ -52,6 +54,7 @@ def test_image_spec(mock_image_spec_builder):
     assert image_spec.commands == ["echo hello"]
     assert image_spec._is_force_push is True
     assert image_spec.entrypoint == ["/bin/bash"]
+    assert image_spec.docker_commands == ["RUN ls", "RUN echo hello"]

     tag = calculate_hash_from_image_spec(image_spec)
     assert "=" != tag[-1]

From cca874b13542d1018a2d3bf7df20ad1d61825de7 Mon Sep 17 00:00:00 2001
From: mao3267 
Date: Sat, 17 Aug 2024 09:23:48 +0800
Subject: [PATCH 27/27] fix: add docker commands to parameters_str_list

Signed-off-by: mao3267 
---
 flytekit/image_spec/image_spec.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flytekit/image_spec/image_spec.py b/flytekit/image_spec/image_spec.py
index f0dd321a65..2ca7fe1984 100644
--- a/flytekit/image_spec/image_spec.py
+++ b/flytekit/image_spec/image_spec.py
@@ -89,6 +89,7 @@ def __post_init__(self):
             "pip_extra_index_url",
             "entrypoint",
             "commands",
+            "docker_commands",
         ]
         for parameter in parameters_str_list:
             attr = getattr(self, parameter)
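
End-to-end usage sketch for the docker_commands support added in this series. It is illustrative only: the image name and registry below are placeholders, and it assumes the default image builder rather than envd, which rejects docker_commands with a ValueError (see PATCH 22/27 and 25/27).

    from flytekit.image_spec import ImageSpec

    # Placeholder name/registry -- substitute a registry you can actually push to.
    image_spec = ImageSpec(
        name="docker-commands-demo",
        registry="localhost:30000",
        apt_packages=["git"],
        # Raw Dockerfile instructions rendered into the generated Dockerfile by the
        # default builder, as asserted in the default_builder unit tests above.
        docker_commands=["RUN git clone https://github.com/flyteorg/flytekit.git"],
    )

    # with_docker_commands() follows the same builder pattern as with_commands() and
    # with_packages(): it returns a new spec with the extra instructions appended, so
    # docker_commands becomes ["RUN git clone ...", "COPY . /root"].
    image_spec = image_spec.with_docker_commands(["COPY . /root"])

After PATCH 27/27, docker_commands is validated in __post_init__ the same way as the other list-of-string fields such as commands and entrypoint.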