Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add FlyteValueException handling and clean up exit_handler calls #3036

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
14 changes: 13 additions & 1 deletion flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@
FlyteNonRecoverableSystemException,
FlyteUploadDataException,
)
from flytekit.exceptions.user import FlyteUserRuntimeException
from flytekit.exceptions.user import (
FlyteUserRuntimeException,
FlyteValueException,
)
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import interface as _interface_models
Expand Down Expand Up @@ -770,6 +773,15 @@ def dispatch_execute(
):
return native_outputs

if isinstance(native_outputs, VoidPromise):
return _literal_models.LiteralMap(literals={})

if native_outputs is not None and len(list(self._outputs_interface.keys())) == 0:
raise FlyteValueException(
native_outputs,
f"Interface has {len(self.python_interface.outputs)} outputs.",
)

try:
with timeit("dispatch execute"):
literals_map, native_outputs_as_map = run_sync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ def jupyter_patches():
with mock.patch("multiprocessing.Process") as mock_process, mock.patch(
"flytekitplugins.flyteinteractive.jupyter_lib.decorator.write_example_notebook"
) as mock_write_example_notebook, mock.patch(
"flytekitplugins.flyteinteractive.jupyter_lib.decorator.exit_handler"
"flytekitplugins.flyteinteractive.jupyter_lib.decorator.exit_handler",
return_value=None,
) as mock_exit_handler:
yield (mock_process, mock_write_example_notebook, mock_exit_handler)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ def vscode_patches():
with mock.patch("multiprocessing.Process") as mock_process, mock.patch(
"flytekit.interactive.vscode_lib.decorator.prepare_interactive_python"
) as mock_prepare_interactive_python, mock.patch(
"flytekit.interactive.vscode_lib.decorator.exit_handler"
"flytekit.interactive.vscode_lib.decorator.exit_handler",
return_value=None,
) as mock_exit_handler, mock.patch(
"flytekit.interactive.vscode_lib.decorator.download_vscode"
) as mock_download_vscode, mock.patch("signal.signal") as mock_signal, mock.patch(
Expand Down
2 changes: 1 addition & 1 deletion plugins/flytekit-kf-pytorch/tests/test_elastic_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def test_execution_params(start_method: str, target_exec_id: str, monkeypatch_ex
monkeypatch.setenv("FLYTE_INTERNAL_EXECUTION_ID", target_exec_id)

@task(task_config=Elastic(nnodes=1, nproc_per_node=1, start_method=start_method))
def test_task(n: int):
def test_task(n: int) -> int:
ctx = flytekit.current_context()

assert ctx.execution_id.name == target_exec_id
Expand Down
2 changes: 1 addition & 1 deletion plugins/flytekit-pandera/tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class Schema(pandera.DataFrameModel):
col2: float

@task
def fn_input(df: typing.Annotated[DataFrame[Schema], config]):
def fn_input(df: typing.Annotated[DataFrame[Schema], config]) -> typing.Annotated[DataFrame[Schema], config]:
return df

@task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def t1(dc: DC = DC(ff=FlyteFile(os.path.realpath(__file__)),
uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet",
file_format="parquet"),
fd=FlyteDirectory("tests/flytekit/integration/remote/workflows/basic/data/")
)):
)) -> DC:

with open(dc.ff, "r") as f:
print("File Content: ", f.read())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def t1(dc: DC = DC(ff=FlyteFile(os.path.realpath(__file__)),
uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet",
file_format="parquet"),
fd=FlyteDirectory("tests/flytekit/integration/remote/workflows/basic/data/")
)):
)) -> DC:

with open(dc.ff, "r") as f:
print("File Content: ", f.read())
Expand Down
33 changes: 33 additions & 0 deletions tests/flytekit/unit/core/test_task_return.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import pytest
from flytekit import task
from flytekit.exceptions.user import FlyteValueException
from typing import Optional


def test_task_return():
@task
def foo(a: int) -> int:
return a + 1

assert foo(1) == 2


def test_task_optional_return():
@task
def foo(return_none: bool) -> Optional[int]:
return None if return_none else 1

assert foo(True) is None
assert foo(False) == 1


def test_task_no_return():
@task
def foo(a: int):
return a + 1

with pytest.raises(
FlyteValueException,
match="Interface has 0 outputs.",
):
foo(1)
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ def vscode_patches():
with mock.patch("multiprocessing.Process") as mock_process, mock.patch(
"flytekit.interactive.vscode_lib.decorator.prepare_interactive_python"
) as mock_prepare_interactive_python, mock.patch(
"flytekit.interactive.vscode_lib.decorator.exit_handler"
"flytekit.interactive.vscode_lib.decorator.exit_handler",
return_value=None,
) as mock_exit_handler, mock.patch(
"flytekit.interactive.vscode_lib.decorator.download_vscode"
) as mock_download_vscode, mock.patch("signal.signal") as mock_signal, mock.patch(
Expand Down
Loading