Skip to content

Commit

Permalink
Return explicit task execution code not found (#2659)
Browse files Browse the repository at this point in the history
Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin authored Aug 12, 2024
1 parent 768ae81 commit c2b5c45
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 6 deletions.
6 changes: 4 additions & 2 deletions flytekit/core/data_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from flytekit.configuration import DataConfig
from flytekit.core.local_fsspec import FlyteLocalFileSystem
from flytekit.core.utils import timeit
from flytekit.exceptions.user import FlyteAssertion, FlyteValueException
from flytekit.exceptions.user import FlyteAssertion, FlyteDataNotFoundException
from flytekit.interfaces.random import random
from flytekit.loggers import logger

Expand Down Expand Up @@ -300,7 +300,7 @@ def get(self, from_path: str, to_path: str, recursive: bool = False, **kwargs):
except OSError as oe:
logger.debug(f"Error in getting {from_path} to {to_path} rec {recursive} {oe}")
if not file_system.exists(from_path):
raise FlyteValueException(from_path, "File not found")
raise FlyteDataNotFoundException(from_path)
file_system = self.get_filesystem(get_protocol(from_path), anonymous=True)
if file_system is not None:
logger.debug(f"Attempting anonymous get with {file_system}")
Expand Down Expand Up @@ -558,6 +558,8 @@ def get_data(self, remote_path: str, local_path: str, is_multipart: bool = False
pathlib.Path(local_path).parent.mkdir(parents=True, exist_ok=True)
with timeit(f"Download data to local from {remote_path}"):
self.get(remote_path, to_path=local_path, recursive=is_multipart, **kwargs)
except FlyteDataNotFoundException:
raise
except Exception as ex:
raise FlyteAssertion(
f"Failed to get data from {remote_path} to {local_path} (recursive={is_multipart}).\n\n"
Expand Down
5 changes: 5 additions & 0 deletions flytekit/exceptions/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ def __init__(self, received_value, error_message):
super(FlyteValueException, self).__init__(self._create_verbose_message(received_value, error_message))


class FlyteDataNotFoundException(FlyteValueException):
def __init__(self, path: str):
super(FlyteDataNotFoundException, self).__init__(path, "File not found")


class FlyteAssertion(FlyteUserException, AssertionError):
_ERROR_CODE = "USER:AssertionError"

Expand Down
8 changes: 7 additions & 1 deletion flytekit/tools/fast_registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from flytekit.core.context_manager import FlyteContextManager
from flytekit.core.utils import timeit
from flytekit.exceptions.user import FlyteDataNotFoundException
from flytekit.loggers import logger
from flytekit.tools.ignore import DockerIgnore, FlyteIgnore, GitIgnore, Ignore, IgnoreGroup, StandardIgnore
from flytekit.tools.script_mode import tar_strip_file_attributes
Expand Down Expand Up @@ -146,7 +147,12 @@ def download_distribution(additional_distribution: str, destination: str):
# NOTE the os.path.join(destination, ''). This is to ensure that the given path is in fact a directory and all
# downloaded data should be copied into this directory. We do this to account for a difference in behavior in
# fsspec, which requires a trailing slash in case of pre-existing directory.
FlyteContextManager.current_context().file_access.get_data(additional_distribution, os.path.join(destination, ""))
try:
FlyteContextManager.current_context().file_access.get_data(
additional_distribution, os.path.join(destination, "")
)
except FlyteDataNotFoundException as ex:
raise RuntimeError("task execution code was not found") from ex
tarfile_name = os.path.basename(additional_distribution)
if not tarfile_name.endswith(".tar.gz"):
raise RuntimeError("Unrecognized additional distribution format for {}".format(additional_distribution))
Expand Down
6 changes: 3 additions & 3 deletions tests/flytekit/unit/core/test_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import flytekit
from flytekit.core.checkpointer import SyncCheckpoint
from flytekit.exceptions.user import FlyteAssertion
from flytekit.exceptions.user import FlyteDataNotFoundException


def test_sync_checkpoint_write(tmpdir):
Expand Down Expand Up @@ -90,10 +90,10 @@ def test_sync_checkpoint_restore_corrupt(tmpdir):
prev.unlink()
src.rmdir()

with pytest.raises(FlyteAssertion):
with pytest.raises(FlyteDataNotFoundException):
cp.restore(user_dest)

with pytest.raises(FlyteAssertion):
with pytest.raises(FlyteDataNotFoundException):
cp.restore(user_dest)


Expand Down

0 comments on commit c2b5c45

Please sign in to comment.