Skip to content

Commit

Permalink
Rebase (#3038)
Browse files Browse the repository at this point in the history
* Store protos in local cache (#3022)

* Store proto obj instead of model Literal in local cache

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove unused file

Signed-off-by: Eduardo Apolinario <[email protected]>

---------

Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>

* Bump aiohttp from 3.9.5 to 3.10.11 (#3018)

Bumps [aiohttp](https://github.com/aio-libs/aiohttp) from 3.9.5 to 3.10.11.
- [Release notes](https://github.com/aio-libs/aiohttp/releases)
- [Changelog](https://github.com/aio-libs/aiohttp/blob/master/CHANGES.rst)
- [Commits](aio-libs/aiohttp@v3.9.5...v3.10.11)

---
updated-dependencies:
- dependency-name: aiohttp
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Fix bug in FlyteDirectory.listdir on local files (#2926)

* Fix issue in FlyteDirectory.listdir

Fixes flyteorg/flyte#6005

Signed-off-by: Pim de Haan <[email protected]>

* Added test

Signed-off-by: Pim de Haan <[email protected]>

* Run make lint

Signed-off-by: Eduardo Apolinario <[email protected]>

---------

Signed-off-by: Pim de Haan <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>

* Fix unit tests in airflow plugin (#3024)

Signed-off-by: Kevin Su <[email protected]>

* fix: Fix resource meta typos for async agent (#3023)

Signed-off-by: JiaWei Jiang <[email protected]>

* fix: format commands output (#3026)

* Fix pydantic basemodel default input (#3013)

* Fix pydantic default input

Signed-off-by: Future-Outlier <[email protected]>

* add pydantic integration test

Signed-off-by: Future-Outlier <[email protected]>

* Use duck typing by Thomas's advice

Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: Thomas J. Fan <[email protected]>

* lint

Signed-off-by: Future-Outlier <[email protected]>

---------

Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: Thomas J. Fan <[email protected]>

* [BUG] Open FlyteFile from remote path (#2991)

* fix: Open FlyteFile from remote path

Signed-off-by: JiaWei Jiang <[email protected]>

* Add integration test

Signed-off-by: JiaWei Jiang <[email protected]>

* refactor: Use ctx as param instead of recreation

Signed-off-by: JiaWei Jiang <[email protected]>

* refactor: Clean test logic

1. Remove redundant prints
2. Use `mock.patch.dict` to setup `os.environ` for the current test fn
    * Avoid contaminating other tests running in the same process

Signed-off-by: JiaWei Jiang <[email protected]>

* refactor: Setup local path and downloader in constructor

Signed-off-by: JiaWei Jiang <[email protected]>

* refactor: Move SimpleFileTransfer to an utility file

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove redundant env var setup

Please refer to #3001

Signed-off-by: JiaWei Jiang <[email protected]>

* test: Add another ff use case

Create ff in one task pod and read it in another task pod.

Signed-off-by: JiaWei Jiang <[email protected]>

---------

Signed-off-by: JiaWei Jiang <[email protected]>

* vllm inference plugin (#2967)

* vllm inference plugin

Signed-off-by: Daniel Sola <[email protected]>

* fixed default value

Signed-off-by: Daniel Sola <[email protected]>

---------

Signed-off-by: Daniel Sola <[email protected]>

* Add poetry to image spec (#3025)

* Add poetry to image spec

Signed-off-by: Thomas J. Fan <[email protected]>

* Add stricter check

Signed-off-by: Thomas J. Fan <[email protected]>

---------

Signed-off-by: Thomas J. Fan <[email protected]>

* [test] Add integration test for accessing sd sttr in dc (#2969)

* test: Add integration test for attr access of sd

Signed-off-by: JiaWei Jiang <[email protected]>

* Correct file path

Signed-off-by: JiaWei Jiang <[email protected]>

* test: Support interaction with minio s3 bucket

1. Upload a local parquet file to minio s3 bucket
2. Access StructuredDataset attr from a dataclass
3. Open StructuredDataset from a remote path

Signed-off-by: JiaWei Jiang <[email protected]>

* Delete an unmerged integration test

Signed-off-by: JiaWei Jiang <[email protected]>

* Try imagespec with commit sha of corresponding fix

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove redundant test

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove default_factory and create sd dc from input uri

Signed-off-by: JiaWei Jiang <[email protected]>

* refactor: Clean test logic

1. Remove redundant prints
2. Use `mock.patch.dict` to setup `os.environ` for the current test fn
    * Avoid contaminating other tests running in the same process

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove redundant minio env var setup and add test comments

Signed-off-by: JiaWei Jiang <[email protected]>

* Support uploading tmp pqt file

Signed-off-by: JiaWei Jiang <[email protected]>

* Udpate deprecated module

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove redundant and unused imports

Signed-off-by: JiaWei Jiang <[email protected]>

---------

Signed-off-by: JiaWei Jiang <[email protected]>

---------

Signed-off-by: Eduardo Apolinario <[email protected]>
Signed-off-by: dependabot[bot] <[email protected]>
Signed-off-by: Pim de Haan <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Daniel Sola <[email protected]>
Signed-off-by: Thomas J. Fan <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Pim de Haan <[email protected]>
Co-authored-by: Kevin Su <[email protected]>
Co-authored-by: 江家瑋 <[email protected]>
Co-authored-by: V <[email protected]>
Co-authored-by: Han-Ru Chen (Future-Outlier) <[email protected]>
Co-authored-by: Thomas J. Fan <[email protected]>
Co-authored-by: Daniel Sola <[email protected]>
  • Loading branch information
11 people authored Jan 6, 2025
1 parent 44c9395 commit 8d5bd85
Show file tree
Hide file tree
Showing 25 changed files with 753 additions and 59 deletions.
20 changes: 12 additions & 8 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ adlfs==2024.4.1
# via flytekit
aiobotocore==2.13.0
# via s3fs
aiohttp==3.9.5
aiohappyeyeballs==2.4.4
# via aiohttp
aiohttp==3.10.11
# via
# adlfs
# aiobotocore
Expand Down Expand Up @@ -113,10 +115,8 @@ filelock==3.14.0
# via
# snowflake-connector-python
# virtualenv
flyteidl @ git+https://github.com/flyteorg/flyte.git@master#subdirectory=flyteidl
# via
# -r dev-requirements.in
# flytekit
flyteidl==1.14.1
# via flytekit
frozenlist==1.4.1
# via
# aiohttp
Expand Down Expand Up @@ -244,7 +244,9 @@ keyring==25.2.1
keyrings-alt==5.0.1
# via -r dev-requirements.in
kubernetes==29.0.0
# via -r dev-requirements.in
# via
# -r dev-requirements.in
# flytekit
markdown-it-py==3.0.0
# via
# flytekit
Expand All @@ -260,7 +262,7 @@ marshmallow-enum==1.5.1
# flytekit
marshmallow-jsonschema==0.13.0
# via flytekit
mashumaro==3.13
mashumaro==3.15
# via flytekit
matplotlib-inline==0.1.7
# via
Expand Down Expand Up @@ -345,6 +347,8 @@ prometheus-client==0.20.0
# via -r dev-requirements.in
prompt-toolkit==3.0.45
# via ipython
propcache==0.2.1
# via yarl
proto-plus==1.23.0
# via
# google-api-core
Expand Down Expand Up @@ -557,7 +561,7 @@ websocket-client==1.8.0
# kubernetes
wrapt==1.16.0
# via aiobotocore
yarl==1.9.4
yarl==1.18.3
# via aiohttp
zipp==3.19.1
# via importlib-metadata
Expand Down
13 changes: 10 additions & 3 deletions flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,15 @@ def to_click_option(
If no custom logic exists, fall back to json.dumps.
"""
with FlyteContextManager.with_context(flyte_ctx.new_builder()):
encoder = JSONEncoder(python_type)
default_val = encoder.encode(default_val)
if hasattr(default_val, "model_dump_json"):
# pydantic v2
default_val = default_val.model_dump_json()
elif hasattr(default_val, "json"):
# pydantic v1
default_val = default_val.json()
else:
encoder = JSONEncoder(python_type)
default_val = encoder.encode(default_val)
if literal_var.type.metadata:
description_extra = f": {json.dumps(literal_var.type.metadata)}"

Expand Down Expand Up @@ -1057,7 +1064,7 @@ def _create_command(
h = h + click.style(f" (LP Name: {loaded_entity.name})", fg="yellow")
else:
if loaded_entity.__doc__:
h = h + click.style(f"{loaded_entity.__doc__}", dim=True)
h = h + click.style(f" {loaded_entity.__doc__}", dim=True)
cmd = YamlFileReadingCommand(
name=entity_name,
params=params,
Expand Down
48 changes: 38 additions & 10 deletions flytekit/core/local_cache.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from typing import Optional, Tuple

from diskcache import Cache
from flyteidl.core.literals_pb2 import LiteralMap

from flytekit import lazy_module
from flytekit.models.literals import Literal, LiteralCollection, LiteralMap
from flytekit.models.literals import Literal, LiteralCollection
from flytekit.models.literals import LiteralMap as ModelLiteralMap

joblib = lazy_module("joblib")

Expand All @@ -23,13 +25,16 @@ def _recursive_hash_placement(literal: Literal) -> Literal:
literal_map = {}
for key, literal_value in literal.map.literals.items():
literal_map[key] = _recursive_hash_placement(literal_value)
return Literal(map=LiteralMap(literal_map))
return Literal(map=ModelLiteralMap(literal_map))
else:
return literal


def _calculate_cache_key(
task_name: str, cache_version: str, input_literal_map: LiteralMap, cache_ignore_input_vars: Tuple[str, ...] = ()
task_name: str,
cache_version: str,
input_literal_map: ModelLiteralMap,
cache_ignore_input_vars: Tuple[str, ...] = (),
) -> str:
# Traverse the literals and replace the literal with a new literal that only contains the hash
literal_map_overridden = {}
Expand All @@ -40,7 +45,7 @@ def _calculate_cache_key(

# Generate a stable representation of the underlying protobuf by passing `deterministic=True` to the
# protobuf library.
hashed_inputs = LiteralMap(literal_map_overridden).to_flyte_idl().SerializeToString(deterministic=True)
hashed_inputs = ModelLiteralMap(literal_map_overridden).to_flyte_idl().SerializeToString(deterministic=True)
# Use joblib to hash the string representation of the literal into a fixed length string
return f"{task_name}-{cache_version}-{joblib.hash(hashed_inputs)}"

Expand All @@ -66,24 +71,47 @@ def clear():

@staticmethod
def get(
task_name: str, cache_version: str, input_literal_map: LiteralMap, cache_ignore_input_vars: Tuple[str, ...]
) -> Optional[LiteralMap]:
task_name: str, cache_version: str, input_literal_map: ModelLiteralMap, cache_ignore_input_vars: Tuple[str, ...]
) -> Optional[ModelLiteralMap]:
if not LocalTaskCache._initialized:
LocalTaskCache.initialize()
return LocalTaskCache._cache.get(
serialized_obj = LocalTaskCache._cache.get(
_calculate_cache_key(task_name, cache_version, input_literal_map, cache_ignore_input_vars)
)

if serialized_obj is None:
return None

# If the serialized object is a model file, first convert it back to a proto object (which will force it to
# use the installed flyteidl proto messages) and then convert it to a model object. This will guarantee
# that the object is in the correct format.
if isinstance(serialized_obj, ModelLiteralMap):
return ModelLiteralMap.from_flyte_idl(ModelLiteralMap.to_flyte_idl(serialized_obj))
elif isinstance(serialized_obj, bytes):
# If it is a bytes object, then it is a serialized proto object.
# We need to convert it to a model object first.o
pb_literal_map = LiteralMap()
pb_literal_map.ParseFromString(serialized_obj)
return ModelLiteralMap.from_flyte_idl(pb_literal_map)
else:
raise ValueError(f"Unexpected object type {type(serialized_obj)}")

@staticmethod
def set(
task_name: str,
cache_version: str,
input_literal_map: LiteralMap,
input_literal_map: ModelLiteralMap,
cache_ignore_input_vars: Tuple[str, ...],
value: LiteralMap,
value: ModelLiteralMap,
) -> None:
if not LocalTaskCache._initialized:
LocalTaskCache.initialize()
LocalTaskCache._cache.set(
_calculate_cache_key(task_name, cache_version, input_literal_map, cache_ignore_input_vars), value
_calculate_cache_key(
task_name,
cache_version,
input_literal_map,
cache_ignore_input_vars,
),
value.to_flyte_idl().SerializeToString(),
)
4 changes: 2 additions & 2 deletions flytekit/extend/backend/agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon
task_execution_metadata = TaskExecutionMetadata.from_flyte_idl(request.task_execution_metadata)

logger.info(f"{agent.name} start creating the job")
resource_mata = await mirror_async_methods(
resource_meta = await mirror_async_methods(
agent.create,
task_template=template,
inputs=inputs,
output_prefix=request.output_prefix,
task_execution_metadata=task_execution_metadata,
)
return CreateTaskResponse(resource_meta=resource_mata.encode())
return CreateTaskResponse(resource_meta=resource_meta.encode())

@record_agent_metrics
async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse:
Expand Down
4 changes: 2 additions & 2 deletions flytekit/extend/backend/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,10 @@ def execute(self: PythonTask, **kwargs) -> LiteralMap:
task_template = get_serializable(OrderedDict(), ss, self).template
self._agent = AgentRegistry.get_agent(task_template.type, task_template.task_type_version)

resource_mata = asyncio.run(
resource_meta = asyncio.run(
self._create(task_template=task_template, output_prefix=output_prefix, inputs=kwargs)
)
resource = asyncio.run(self._get(resource_meta=resource_mata))
resource = asyncio.run(self._get(resource_meta=resource_meta))

if resource.phase != TaskExecution.SUCCEEDED:
raise FlyteUserException(f"Failed to run the task {self.name} with error: {resource.message}")
Expand Down
61 changes: 51 additions & 10 deletions flytekit/image_spec/default_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,31 @@
"""
)

POETRY_LOCK_TEMPLATE = Template(
"""\
RUN --mount=type=cache,sharing=locked,mode=0777,target=/root/.cache/uv,id=uv \
--mount=from=uv,source=/uv,target=/usr/bin/uv \
uv pip install poetry
ENV POETRY_CACHE_DIR=/tmp/poetry_cache \
POETRY_VIRTUALENVS_IN_PROJECT=true
# poetry install does not work running in /, so we move to /root to create the venv
WORKDIR /root
RUN --mount=type=cache,sharing=locked,mode=0777,target=/tmp/poetry_cache,id=poetry \
--mount=type=bind,target=poetry.lock,src=poetry.lock \
--mount=type=bind,target=pyproject.toml,src=pyproject.toml \
poetry install $PIP_INSTALL_ARGS
WORKDIR /
# Update PATH and UV_PYTHON to point to venv
ENV PATH="/root/.venv/bin:$$PATH" \
UV_PYTHON=/root/.venv/bin/python
"""
)

UV_PYTHON_INSTALL_COMMAND_TEMPLATE = Template(
"""\
RUN --mount=type=cache,sharing=locked,mode=0777,target=/root/.cache/uv,id=uv \
Expand All @@ -44,6 +69,7 @@
"""
)


APT_INSTALL_COMMAND_TEMPLATE = Template("""\
RUN --mount=type=cache,sharing=locked,mode=0777,target=/var/cache/apt,id=apt \
apt-get update && apt-get install -y --no-install-recommends \
Expand Down Expand Up @@ -128,29 +154,33 @@ def _is_flytekit(package: str) -> bool:
return name == "flytekit"


def prepare_uv_lock_command(image_spec: ImageSpec, pip_install_args: List[str], tmp_dir: Path) -> str:
# uv sync is experimental, so our uv.lock support is also experimental
# the parameters we pass into install args could be different
warnings.warn("uv.lock support is experimental", UserWarning)

def _copy_lock_files_into_context(image_spec: ImageSpec, lock_file: str, tmp_dir: Path):
if image_spec.packages is not None:
msg = "Support for uv.lock files and packages is mutually exclusive"
msg = f"Support for {lock_file} files and packages is mutually exclusive"
raise ValueError(msg)

uv_lock_path = tmp_dir / "uv.lock"
shutil.copy2(image_spec.requirements, uv_lock_path)
lock_path = tmp_dir / lock_file
shutil.copy2(image_spec.requirements, lock_path)

# uv.lock requires pyproject.toml to be included
# lock requires pyproject.toml to be included
pyproject_toml_path = tmp_dir / "pyproject.toml"
dir_name = os.path.dirname(image_spec.requirements)

pyproject_toml_src = os.path.join(dir_name, "pyproject.toml")
if not os.path.exists(pyproject_toml_src):
msg = "To use uv.lock, a pyproject.toml must be in the same directory as the lock file"
msg = f"To use {lock_file}, a pyproject.toml file must be in the same directory as the lock file"
raise ValueError(msg)

shutil.copy2(pyproject_toml_src, pyproject_toml_path)


def prepare_uv_lock_command(image_spec: ImageSpec, pip_install_args: List[str], tmp_dir: Path) -> str:
# uv sync is experimental, so our uv.lock support is also experimental
# the parameters we pass into install args could be different
warnings.warn("uv.lock support is experimental", UserWarning)

_copy_lock_files_into_context(image_spec, "uv.lock", tmp_dir)

# --locked: Assert that the `uv.lock` will remain unchanged
# --no-dev: Omit the development dependency group
# --no-install-project: Do not install the current project
Expand All @@ -160,6 +190,15 @@ def prepare_uv_lock_command(image_spec: ImageSpec, pip_install_args: List[str],
return UV_LOCK_INSTALL_TEMPLATE.substitute(PIP_INSTALL_ARGS=pip_install_args)


def prepare_poetry_lock_command(image_spec: ImageSpec, pip_install_args: List[str], tmp_dir: Path) -> str:
_copy_lock_files_into_context(image_spec, "poetry.lock", tmp_dir)

# --no-root: Do not install the current project
pip_install_args.extend(["--no-root"])
pip_install_args = " ".join(pip_install_args)
return POETRY_LOCK_TEMPLATE.substitute(PIP_INSTALL_ARGS=pip_install_args)


def prepare_python_install(image_spec: ImageSpec, tmp_dir: Path) -> str:
pip_install_args = []
if image_spec.pip_index:
Expand All @@ -174,6 +213,8 @@ def prepare_python_install(image_spec: ImageSpec, tmp_dir: Path) -> str:
requirement_basename = os.path.basename(image_spec.requirements)
if requirement_basename == "uv.lock":
return prepare_uv_lock_command(image_spec, pip_install_args, tmp_dir)
elif requirement_basename == "poetry.lock":
return prepare_poetry_lock_command(image_spec, pip_install_args, tmp_dir)

# Assume this is a requirements.txt file
with open(image_spec.requirements) as f:
Expand Down
10 changes: 9 additions & 1 deletion flytekit/models/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,15 @@ def offloaded_metadata(self) -> Optional[LiteralOffloadedMetadata]:
"""
This value holds metadata about the offloaded literal.
"""
return self._offloaded_metadata
# The following check might seem non-sensical, since `_offloaded_metadata` is set in the constructor.
# This is here to support backwards compatibility caused by the local cache implementation. Let me explain.
# The local cache pickles values and unpickles them. When unpickling, the constructor is not called, so there
# are cases where the `_offloaded_metadata` is not set (for example if you cache a value using flytekit<=1.13.6
# and you load that value later using flytekit>1.13.6).
# In other words, this is a workaround to support backwards compatibility with the local cache.
if hasattr(self, "_offloaded_metadata"):
return self._offloaded_metadata
return None

def to_flyte_idl(self):
"""
Expand Down
7 changes: 4 additions & 3 deletions flytekit/types/directory/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,11 @@ def listdir(cls, directory: FlyteDirectory) -> typing.List[typing.Union[FlyteDir
file_access = FlyteContextManager.current_context().file_access
if not file_access.is_remote(final_path):
for p in os.listdir(final_path):
if os.path.isfile(os.path.join(final_path, p)):
paths.append(FlyteFile(p))
joined_path = os.path.join(final_path, p)
if os.path.isfile(joined_path):
paths.append(FlyteFile(joined_path))
else:
paths.append(FlyteDirectory(p))
paths.append(FlyteDirectory(joined_path))
return paths

def create_downloader(_remote_path: str, _local_path: str, is_multipart: bool):
Expand Down
Loading

0 comments on commit 8d5bd85

Please sign in to comment.