diff --git a/dev-requirements.txt b/dev-requirements.txt
index 9acff98cb6..26af8ad1bb 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -10,7 +10,9 @@ adlfs==2024.4.1
     # via flytekit
 aiobotocore==2.13.0
     # via s3fs
-aiohttp==3.9.5
+aiohappyeyeballs==2.4.4
+    # via aiohttp
+aiohttp==3.10.11
     # via
     #   adlfs
     #   aiobotocore
@@ -113,10 +115,8 @@ filelock==3.14.0
     # via
     #   snowflake-connector-python
     #   virtualenv
-flyteidl @ git+https://github.com/flyteorg/flyte.git@master#subdirectory=flyteidl
-    # via
-    #   -r dev-requirements.in
-    #   flytekit
+flyteidl==1.14.1
+    # via flytekit
 frozenlist==1.4.1
     # via
     #   aiohttp
@@ -244,7 +244,9 @@ keyring==25.2.1
 keyrings-alt==5.0.1
     # via -r dev-requirements.in
 kubernetes==29.0.0
-    # via -r dev-requirements.in
+    # via
+    #   -r dev-requirements.in
+    #   flytekit
 markdown-it-py==3.0.0
     # via
     #   flytekit
@@ -260,7 +262,7 @@ marshmallow-enum==1.5.1
     #   flytekit
 marshmallow-jsonschema==0.13.0
     # via flytekit
-mashumaro==3.13
+mashumaro==3.15
     # via flytekit
 matplotlib-inline==0.1.7
     # via
@@ -345,6 +347,8 @@ prometheus-client==0.20.0
     # via -r dev-requirements.in
 prompt-toolkit==3.0.45
     # via ipython
+propcache==0.2.1
+    # via yarl
 proto-plus==1.23.0
     # via
     #   google-api-core
@@ -557,7 +561,7 @@ websocket-client==1.8.0
     #   kubernetes
 wrapt==1.16.0
     # via aiobotocore
-yarl==1.9.4
+yarl==1.18.3
     # via aiohttp
 zipp==3.19.1
     # via importlib-metadata
diff --git a/flytekit/clis/sdk_in_container/run.py b/flytekit/clis/sdk_in_container/run.py
index 7d661c3ff8..4c1da5ccf3 100644
--- a/flytekit/clis/sdk_in_container/run.py
+++ b/flytekit/clis/sdk_in_container/run.py
@@ -475,8 +475,15 @@ def to_click_option(
         If no custom logic exists, fall back to json.dumps.
         """
         with FlyteContextManager.with_context(flyte_ctx.new_builder()):
-            encoder = JSONEncoder(python_type)
-            default_val = encoder.encode(default_val)
+            if hasattr(default_val, "model_dump_json"):
+                # pydantic v2
+                default_val = default_val.model_dump_json()
+            elif hasattr(default_val, "json"):
+                # pydantic v1
+                default_val = default_val.json()
+            else:
+                encoder = JSONEncoder(python_type)
+                default_val = encoder.encode(default_val)
 
     if literal_var.type.metadata:
         description_extra = f": {json.dumps(literal_var.type.metadata)}"
@@ -1057,7 +1064,7 @@ def _create_command(
             h = h + click.style(f" (LP Name: {loaded_entity.name})", fg="yellow")
         else:
             if loaded_entity.__doc__:
-                h = h + click.style(f"{loaded_entity.__doc__}", dim=True)
+                h = h + click.style(f" {loaded_entity.__doc__}", dim=True)
         cmd = YamlFileReadingCommand(
             name=entity_name,
             params=params,
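For reference, a minimal sketch of the pydantic-v2 branch added above (the model mirrors `pydantic_wf.py` later in this patch; the printed output is illustrative):

```python
from pydantic import BaseModel

class MyBaseModel(BaseModel):
    my_floats: list[float] = [1.0, 2.0, 5.0, 10.0]

default_val = MyBaseModel()
# On pydantic v2 the object has `model_dump_json`, so the CLI stores the
# default as its JSON string instead of going through JSONEncoder.
print(default_val.model_dump_json())  # {"my_floats":[1.0,2.0,5.0,10.0]}
```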
diff --git a/flytekit/core/local_cache.py b/flytekit/core/local_cache.py
index 7cd87e2a49..d6c7f93f99 100644
--- a/flytekit/core/local_cache.py
+++ b/flytekit/core/local_cache.py
@@ -1,9 +1,11 @@
 from typing import Optional, Tuple
 
 from diskcache import Cache
+from flyteidl.core.literals_pb2 import LiteralMap
 
 from flytekit import lazy_module
-from flytekit.models.literals import Literal, LiteralCollection, LiteralMap
+from flytekit.models.literals import Literal, LiteralCollection
+from flytekit.models.literals import LiteralMap as ModelLiteralMap
 
 joblib = lazy_module("joblib")
 
@@ -23,13 +25,16 @@ def _recursive_hash_placement(literal: Literal) -> Literal:
         literal_map = {}
         for key, literal_value in literal.map.literals.items():
             literal_map[key] = _recursive_hash_placement(literal_value)
-        return Literal(map=LiteralMap(literal_map))
+        return Literal(map=ModelLiteralMap(literal_map))
     else:
         return literal
 
 
 def _calculate_cache_key(
-    task_name: str, cache_version: str, input_literal_map: LiteralMap, cache_ignore_input_vars: Tuple[str, ...] = ()
+    task_name: str,
+    cache_version: str,
+    input_literal_map: ModelLiteralMap,
+    cache_ignore_input_vars: Tuple[str, ...] = (),
 ) -> str:
     # Traverse the literals and replace the literal with a new literal that only contains the hash
     literal_map_overridden = {}
@@ -40,7 +45,7 @@ def _calculate_cache_key(
 
     # Generate a stable representation of the underlying protobuf by passing `deterministic=True` to the
     # protobuf library.
-    hashed_inputs = LiteralMap(literal_map_overridden).to_flyte_idl().SerializeToString(deterministic=True)
+    hashed_inputs = ModelLiteralMap(literal_map_overridden).to_flyte_idl().SerializeToString(deterministic=True)
 
     # Use joblib to hash the string representation of the literal into a fixed length string
     return f"{task_name}-{cache_version}-{joblib.hash(hashed_inputs)}"
@@ -66,24 +71,47 @@ def clear():
 
     @staticmethod
     def get(
-        task_name: str, cache_version: str, input_literal_map: LiteralMap, cache_ignore_input_vars: Tuple[str, ...]
-    ) -> Optional[LiteralMap]:
+        task_name: str, cache_version: str, input_literal_map: ModelLiteralMap, cache_ignore_input_vars: Tuple[str, ...]
+    ) -> Optional[ModelLiteralMap]:
         if not LocalTaskCache._initialized:
             LocalTaskCache.initialize()
-        return LocalTaskCache._cache.get(
+        serialized_obj = LocalTaskCache._cache.get(
             _calculate_cache_key(task_name, cache_version, input_literal_map, cache_ignore_input_vars)
         )
 
+        if serialized_obj is None:
+            return None
+
+        # If the serialized object is a model object, first convert it back to a proto object (which will force it to
+        # use the installed flyteidl proto messages) and then convert it to a model object. This will guarantee
+        # that the object is in the correct format.
+        if isinstance(serialized_obj, ModelLiteralMap):
+            return ModelLiteralMap.from_flyte_idl(ModelLiteralMap.to_flyte_idl(serialized_obj))
+        elif isinstance(serialized_obj, bytes):
+            # If it is a bytes object, then it is a serialized proto object.
+            # We need to convert it to a model object first.
+            pb_literal_map = LiteralMap()
+            pb_literal_map.ParseFromString(serialized_obj)
+            return ModelLiteralMap.from_flyte_idl(pb_literal_map)
+        else:
+            raise ValueError(f"Unexpected object type {type(serialized_obj)}")
+
    @staticmethod
     def set(
         task_name: str,
         cache_version: str,
-        input_literal_map: LiteralMap,
+        input_literal_map: ModelLiteralMap,
         cache_ignore_input_vars: Tuple[str, ...],
-        value: LiteralMap,
+        value: ModelLiteralMap,
     ) -> None:
         if not LocalTaskCache._initialized:
             LocalTaskCache.initialize()
         LocalTaskCache._cache.set(
-            _calculate_cache_key(task_name, cache_version, input_literal_map, cache_ignore_input_vars), value
+            _calculate_cache_key(
+                task_name,
+                cache_version,
+                input_literal_map,
+                cache_ignore_input_vars,
+            ),
+            value.to_flyte_idl().SerializeToString(),
         )
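A sketch of the round-trip the cache now performs — `set()` stores serialized proto bytes and `get()` rebuilds a model object from them (assumes `flyteidl` is installed; the empty map is illustrative):

```python
from flyteidl.core.literals_pb2 import LiteralMap as PbLiteralMap
from flytekit.models.literals import LiteralMap as ModelLiteralMap

# What set() now writes into the disk cache
raw = ModelLiteralMap(literals={}).to_flyte_idl().SerializeToString()

# What get() does when it finds a bytes entry
pb = PbLiteralMap()
pb.ParseFromString(raw)
restored = ModelLiteralMap.from_flyte_idl(pb)
assert isinstance(restored, ModelLiteralMap)
```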
diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py
index ee97b9ddfb..8b76db9e32 100644
--- a/flytekit/extend/backend/agent_service.py
+++ b/flytekit/extend/backend/agent_service.py
@@ -117,14 +117,14 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon
         task_execution_metadata = TaskExecutionMetadata.from_flyte_idl(request.task_execution_metadata)
 
         logger.info(f"{agent.name} start creating the job")
-        resource_mata = await mirror_async_methods(
+        resource_meta = await mirror_async_methods(
             agent.create,
             task_template=template,
             inputs=inputs,
             output_prefix=request.output_prefix,
             task_execution_metadata=task_execution_metadata,
         )
-        return CreateTaskResponse(resource_meta=resource_mata.encode())
+        return CreateTaskResponse(resource_meta=resource_meta.encode())
 
     @record_agent_metrics
     async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse:
diff --git a/flytekit/extend/backend/base_agent.py b/flytekit/extend/backend/base_agent.py
index 16235a68ec..01bfe0fb7f 100644
--- a/flytekit/extend/backend/base_agent.py
+++ b/flytekit/extend/backend/base_agent.py
@@ -335,10 +335,10 @@ def execute(self: PythonTask, **kwargs) -> LiteralMap:
         task_template = get_serializable(OrderedDict(), ss, self).template
         self._agent = AgentRegistry.get_agent(task_template.type, task_template.task_type_version)
 
-        resource_mata = asyncio.run(
+        resource_meta = asyncio.run(
             self._create(task_template=task_template, output_prefix=output_prefix, inputs=kwargs)
         )
-        resource = asyncio.run(self._get(resource_meta=resource_mata))
+        resource = asyncio.run(self._get(resource_meta=resource_meta))
 
         if resource.phase != TaskExecution.SUCCEEDED:
             raise FlyteUserException(f"Failed to run the task {self.name} with error: {resource.message}")
diff --git a/flytekit/image_spec/default_builder.py b/flytekit/image_spec/default_builder.py
index 9e37e9c8c5..96b9acbe9e 100644
--- a/flytekit/image_spec/default_builder.py
+++ b/flytekit/image_spec/default_builder.py
@@ -35,6 +35,31 @@
 """
 )
 
+POETRY_LOCK_TEMPLATE = Template(
+    """\
+RUN --mount=type=cache,sharing=locked,mode=0777,target=/root/.cache/uv,id=uv \
+    --mount=from=uv,source=/uv,target=/usr/bin/uv \
+    uv pip install poetry
+
+ENV POETRY_CACHE_DIR=/tmp/poetry_cache \
+    POETRY_VIRTUALENVS_IN_PROJECT=true
+
+# poetry install does not work running in /, so we move to /root to create the venv
+WORKDIR /root
+
+RUN --mount=type=cache,sharing=locked,mode=0777,target=/tmp/poetry_cache,id=poetry \
+    --mount=type=bind,target=poetry.lock,src=poetry.lock \
+    --mount=type=bind,target=pyproject.toml,src=pyproject.toml \
+    poetry install $PIP_INSTALL_ARGS
+
+WORKDIR /
+
+# Update PATH and UV_PYTHON to point to venv
+ENV PATH="/root/.venv/bin:$$PATH" \
+    UV_PYTHON=/root/.venv/bin/python
+"""
+)
+
 UV_PYTHON_INSTALL_COMMAND_TEMPLATE = Template(
     """\
 RUN --mount=type=cache,sharing=locked,mode=0777,target=/root/.cache/uv,id=uv \
@@ -44,6 +69,7 @@
 """
 )
 
+
 APT_INSTALL_COMMAND_TEMPLATE = Template("""\
 RUN --mount=type=cache,sharing=locked,mode=0777,target=/var/cache/apt,id=apt \
     apt-get update && apt-get install -y --no-install-recommends \
@@ -128,29 +154,33 @@ def _is_flytekit(package: str) -> bool:
     return name == "flytekit"
 
 
-def prepare_uv_lock_command(image_spec: ImageSpec, pip_install_args: List[str], tmp_dir: Path) -> str:
-    # uv sync is experimental, so our uv.lock support is also experimental
-    # the parameters we pass into install args could be different
-    warnings.warn("uv.lock support is experimental", UserWarning)
-
+def _copy_lock_files_into_context(image_spec: ImageSpec, lock_file: str, tmp_dir: Path):
     if image_spec.packages is not None:
-        msg = "Support for uv.lock files and packages is mutually exclusive"
+        msg = f"Support for {lock_file} files and packages is mutually exclusive"
         raise ValueError(msg)
 
-    uv_lock_path = tmp_dir / "uv.lock"
-    shutil.copy2(image_spec.requirements, uv_lock_path)
+    lock_path = tmp_dir / lock_file
+    shutil.copy2(image_spec.requirements, lock_path)
 
-    # uv.lock requires pyproject.toml to be included
+    # lock requires pyproject.toml to be included
     pyproject_toml_path = tmp_dir / "pyproject.toml"
     dir_name = os.path.dirname(image_spec.requirements)
 
     pyproject_toml_src = os.path.join(dir_name, "pyproject.toml")
     if not os.path.exists(pyproject_toml_src):
-        msg = "To use uv.lock, a pyproject.toml must be in the same directory as the lock file"
+        msg = f"To use {lock_file}, a pyproject.toml file must be in the same directory as the lock file"
        raise ValueError(msg)
 
     shutil.copy2(pyproject_toml_src, pyproject_toml_path)
 
+
+def prepare_uv_lock_command(image_spec: ImageSpec, pip_install_args: List[str], tmp_dir: Path) -> str:
+    # uv sync is experimental, so our uv.lock support is also experimental
+    # the parameters we pass into install args could be different
+    warnings.warn("uv.lock support is experimental", UserWarning)
+
+    _copy_lock_files_into_context(image_spec, "uv.lock", tmp_dir)
+
     # --locked: Assert that the `uv.lock` will remain unchanged
     # --no-dev: Omit the development dependency group
     # --no-install-project: Do not install the current project
@@ -160,6 +190,15 @@ def prepare_uv_lock_command(image_spec: ImageSpec, pip_install_args: List[str],
     return UV_LOCK_INSTALL_TEMPLATE.substitute(PIP_INSTALL_ARGS=pip_install_args)
 
 
+def prepare_poetry_lock_command(image_spec: ImageSpec, pip_install_args: List[str], tmp_dir: Path) -> str:
+    _copy_lock_files_into_context(image_spec, "poetry.lock", tmp_dir)
+
+    # --no-root: Do not install the current project
+    pip_install_args.extend(["--no-root"])
+    pip_install_args = " ".join(pip_install_args)
+    return POETRY_LOCK_TEMPLATE.substitute(PIP_INSTALL_ARGS=pip_install_args)
+
+
 def prepare_python_install(image_spec: ImageSpec, tmp_dir: Path) -> str:
     pip_install_args = []
     if image_spec.pip_index:
@@ -174,6 +213,8 @@ def prepare_python_install(image_spec: ImageSpec, tmp_dir: Path) -> str:
         requirement_basename = os.path.basename(image_spec.requirements)
         if requirement_basename == "uv.lock":
             return prepare_uv_lock_command(image_spec, pip_install_args, tmp_dir)
+        elif requirement_basename == "poetry.lock":
+            return prepare_poetry_lock_command(image_spec, pip_install_args, tmp_dir)
 
     # Assume this is a requirements.txt file
     with open(image_spec.requirements) as f:
diff --git a/flytekit/models/literals.py b/flytekit/models/literals.py
index d65ebfafae..a4b5a1d359 100644
--- a/flytekit/models/literals.py
+++ b/flytekit/models/literals.py
@@ -979,7 +979,15 @@ def offloaded_metadata(self) -> Optional[LiteralOffloadedMetadata]:
         """
         This value holds metadata about the offloaded literal.
         """
-        return self._offloaded_metadata
+        # The following check might seem nonsensical, since `_offloaded_metadata` is set in the constructor.
+        # It exists to support backwards compatibility with the local cache implementation: the local cache
+        # pickles values and unpickles them. When unpickling, the constructor is not called, so there are
+        # cases where `_offloaded_metadata` is not set (for example, if you cache a value using
+        # flytekit<=1.13.6 and load that value later using flytekit>1.13.6).
+        # In other words, this is a workaround to support backwards compatibility with the local cache.
+        if hasattr(self, "_offloaded_metadata"):
+            return self._offloaded_metadata
+        return None
 
     def to_flyte_idl(self):
         """
diff --git a/flytekit/types/directory/types.py b/flytekit/types/directory/types.py
index a7c0aacc83..3bcbf77c97 100644
--- a/flytekit/types/directory/types.py
+++ b/flytekit/types/directory/types.py
@@ -367,10 +367,11 @@ def listdir(cls, directory: FlyteDirectory) -> typing.List[typing.Union[FlyteDir
         file_access = FlyteContextManager.current_context().file_access
         if not file_access.is_remote(final_path):
             for p in os.listdir(final_path):
-                if os.path.isfile(os.path.join(final_path, p)):
-                    paths.append(FlyteFile(p))
+                joined_path = os.path.join(final_path, p)
+                if os.path.isfile(joined_path):
+                    paths.append(FlyteFile(joined_path))
                 else:
-                    paths.append(FlyteDirectory(p))
+                    paths.append(FlyteDirectory(joined_path))
             return paths
 
         def create_downloader(_remote_path: str, _local_path: str, is_multipart: bool):
diff --git a/flytekit/types/file/file.py b/flytekit/types/file/file.py
index 16ec949264..7f681af3ca 100644
--- a/flytekit/types/file/file.py
+++ b/flytekit/types/file/file.py
@@ -297,14 +297,53 @@ def __init__(
         self._downloader = downloader
         self._downloaded = False
         self._remote_path = remote_path
-        self._remote_source: typing.Optional[str] = None
+        self._remote_source: typing.Optional[typing.Union[str, os.PathLike]] = None
+
+        # Set up the local path and downloader for delayed downloading.
+        # We introduce another attribute, self._local_path, to avoid overriding the user-defined self.path.
+        self._local_path = self.path
+
+        ctx = FlyteContextManager.current_context()
+        if ctx.file_access.is_remote(self.path):
+            self._remote_source = self.path
+            self._local_path = ctx.file_access.get_random_local_path(self._remote_source)
+            self._downloader = lambda: FlyteFilePathTransformer.downloader(
+                ctx=ctx,
+                remote_path=self._remote_source,  # type: ignore
+                local_path=self._local_path,
+            )
 
     def __fspath__(self):
-        # This is where a delayed downloading of the file will happen
+        """
+        Define the file path protocol for opening FlyteFile with the context manager;
+        the following shows two common use cases:
+
+        1. Directly open a FlyteFile with a local path:
+
+            ff = FlyteFile(path=local_path)
+            with open(ff, "r") as f:
+                # Read your local file here
+                # ...
+
+            There is no need to download the file because it is already on the local
+            file system, so only a no-op download is performed.
+
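A usage sketch for the new poetry.lock path (names and registry are placeholders; a `pyproject.toml` must sit next to the lock file, and `packages` cannot be combined with a lock file):

```python
from flytekit.image_spec import ImageSpec

image = ImageSpec(
    name="poetry-demo",           # hypothetical image name
    python_version="3.12",
    registry="ghcr.io/your-org",  # placeholder registry
    requirements="poetry.lock",   # the basename routes to prepare_poetry_lock_command
)
```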
+        2. Directly open a FlyteFile with a remote path:
+
+            ff = FlyteFile(path=remote_path)
+            with open(ff, "r") as f:
+                # Read your remote file here
+                # ...
+
+        We now support directly opening a FlyteFile that points at remote data storage.
+        In this case, the remote file is downloaded lazily, on first access.
+        For details, please refer to this issue: https://github.com/flyteorg/flyte/issues/6090.
+        """
         if not self._downloaded:
+            # Download data from remote to local, or run the no-op downloader for a local input path
             self._downloader()
             self._downloaded = True
-        return self.path
+        return self._local_path
 
     def __eq__(self, other):
         if isinstance(other, FlyteFile):
@@ -693,16 +732,26 @@ async def async_to_python_value(
 
         # For the remote case, return an FlyteFile object that can download
         local_path = ctx.file_access.get_random_local_path(uri)
-
-        def _downloader():
-            return ctx.file_access.get_data(uri, local_path, is_multipart=False)
-
         expected_format = FlyteFilePathTransformer.get_format(expected_python_type)
-        ff = FlyteFile.__class_getitem__(expected_format)(local_path, _downloader)
+        ff = FlyteFile.__class_getitem__(expected_format)(
+            path=local_path, downloader=lambda: self.downloader(ctx=ctx, remote_path=uri, local_path=local_path)
+        )
         ff._remote_source = uri
 
         return ff
 
+    @staticmethod
+    def downloader(
+        ctx: FlyteContext, remote_path: typing.Union[str, os.PathLike], local_path: typing.Union[str, os.PathLike]
+    ) -> None:
+        """
+        Download data from remote_path to local_path.
+
+        The downloader is a static method because its behavior is logically related
+        to this class but does not need to interact with class or instance data.
+        """
+        ctx.file_access.get_data(remote_path, local_path, is_multipart=False)
+
     def guess_python_type(self, literal_type: LiteralType) -> typing.Type[FlyteFile[typing.Any]]:
         if (
             literal_type.blob is not None
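A sketch of what the delayed download enables (the URI is a placeholder, and a configured Flyte context with access to the remote store is assumed):

```python
from flytekit.types.file import FlyteFile

ff = FlyteFile(path="s3://my-bucket/data/test.json")  # placeholder remote URI
with open(ff, "r") as f:  # __fspath__ triggers the download on first access
    print(f.read())
```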
diff --git a/plugins/flytekit-airflow/setup.py b/plugins/flytekit-airflow/setup.py
index 98d1b38bf1..ccfe1f81fc 100644
--- a/plugins/flytekit-airflow/setup.py
+++ b/plugins/flytekit-airflow/setup.py
@@ -6,6 +6,7 @@
 
 plugin_requires = [
     "apache-airflow",
+    "apache-airflow-providers-google<12.0.0",
     "flytekit>1.10.7",
     "flyteidl>1.10.7",
 ]
diff --git a/plugins/flytekit-inference/README.md b/plugins/flytekit-inference/README.md
index 1bc5c8475e..646200c111 100644
--- a/plugins/flytekit-inference/README.md
+++ b/plugins/flytekit-inference/README.md
@@ -126,3 +126,67 @@ def model_serving(questions: list[str], gguf: FlyteFile) -> list[str]:
 
     return responses
 ```
+
+## vLLM
+
+The vLLM plugin allows you to serve an LLM hosted on HuggingFace.
+
+```python
+import flytekit as fl
+from flytekitplugins.inference import VLLM, HFSecret
+from openai import OpenAI
+
+model_name = "google/gemma-2b-it"
+hf_token_key = "vllm_hf_token"
+
+vllm_args = {
+    "model": model_name,
+    "dtype": "half",
+    "max-model-len": 2000,
+}
+
+hf_secrets = HFSecret(
+    secrets_prefix="_FSEC_",
+    hf_token_key=hf_token_key
+)
+
+vllm_instance = VLLM(
+    hf_secret=hf_secrets,
+    arg_dict=vllm_args
+)
+
+image = fl.ImageSpec(
+    name="vllm_serve",
+    registry="...",
+    packages=["flytekitplugins-inference"],
+)
+
+
+@fl.task(
+    pod_template=vllm_instance.pod_template,
+    container_image=image,
+    secret_requests=[
+        fl.Secret(
+            key=hf_token_key, mount_requirement=fl.Secret.MountType.ENV_VAR  # must be mounted as an env var
+        )
+    ],
+)
+def model_serving() -> str:
+    client = OpenAI(
+        base_url=f"{vllm_instance.base_url}/v1", api_key="vllm"  # api key required but ignored
+    )
+
+    completion = client.chat.completions.create(
+        model=model_name,
+        messages=[
+            {
+                "role": "user",
+                "content": "Compose a haiku about the power of AI.",
+            }
+        ],
+        temperature=0.5,
+        top_p=1,
+        max_tokens=1024,
+    )
+    return completion.choices[0].message.content
+```
diff --git a/plugins/flytekit-inference/flytekitplugins/inference/__init__.py b/plugins/flytekit-inference/flytekitplugins/inference/__init__.py
index cfd14b09a8..8b43dd16a8 100644
--- a/plugins/flytekit-inference/flytekitplugins/inference/__init__.py
+++ b/plugins/flytekit-inference/flytekitplugins/inference/__init__.py
@@ -14,3 +14,4 @@
 
 from .nim.serve import NIM, NIMSecrets
 from .ollama.serve import Model, Ollama
+from .vllm.serve import VLLM, HFSecret
diff --git a/plugins/flytekit-inference/flytekitplugins/inference/vllm/__init__.py b/plugins/flytekit-inference/flytekitplugins/inference/vllm/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/plugins/flytekit-inference/flytekitplugins/inference/vllm/serve.py b/plugins/flytekit-inference/flytekitplugins/inference/vllm/serve.py
new file mode 100644
index 0000000000..f353aabda4
--- /dev/null
+++ b/plugins/flytekit-inference/flytekitplugins/inference/vllm/serve.py
@@ -0,0 +1,85 @@
+from dataclasses import dataclass
+from typing import Optional
+
+from ..sidecar_template import ModelInferenceTemplate
+
+
+@dataclass
+class HFSecret:
+    """
+    :param secrets_prefix: The secrets prefix that Flyte appends to all mounted secrets.
+    :param hf_token_group: The group name for the HuggingFace token.
+    :param hf_token_key: The key name for the HuggingFace token.
+    """
+
+    secrets_prefix: str  # _UNION_ or _FSEC_
+    hf_token_key: str
+    hf_token_group: Optional[str] = None
+
+
+class VLLM(ModelInferenceTemplate):
+    def __init__(
+        self,
+        hf_secret: HFSecret,
+        arg_dict: Optional[dict] = None,
+        image: str = "vllm/vllm-openai",
+        health_endpoint: str = "/health",
+        port: int = 8000,
+        cpu: int = 2,
+        gpu: int = 1,
+        mem: str = "10Gi",
+    ):
+        """
+        Initialize the VLLM class for managing a Kubernetes pod template.
+
+        :param hf_secret: Instance of HFSecret for managing HuggingFace secrets.
+        :param arg_dict: A dictionary of arguments for the vLLM model server (https://docs.vllm.ai/en/stable/models/engine_args.html).
+        :param image: The Docker image to be used for the model server container. Default is "vllm/vllm-openai".
+        :param health_endpoint: The health endpoint for the model server container. Default is "/health".
+        :param port: The port number for the model server container. Default is 8000.
+        :param cpu: The number of CPU cores requested for the model server container. Default is 2.
+        :param gpu: The number of GPUs requested for the model server container. Default is 1.
+        :param mem: The amount of memory requested for the model server container. Default is "10Gi".
+        """
+        if hf_secret.hf_token_key is None:
+            raise ValueError("HuggingFace token key must be provided.")
+        if hf_secret.secrets_prefix is None:
+            raise ValueError("Secrets prefix must be provided.")
+
+        self._hf_secret = hf_secret
+        self._arg_dict = arg_dict
+
+        super().__init__(
+            image=image,
+            health_endpoint=health_endpoint,
+            port=port,
+            cpu=cpu,
+            gpu=gpu,
+            mem=mem,
+        )
+
+        self.setup_vllm_pod_template()
+
+    def setup_vllm_pod_template(self):
+        from kubernetes.client.models import V1EnvVar
+
+        model_server_container = self.pod_template.pod_spec.init_containers[0]
+
+        if self._hf_secret.hf_token_group:
+            hf_key = f"$({self._hf_secret.secrets_prefix}{self._hf_secret.hf_token_group}_{self._hf_secret.hf_token_key})".upper()
+        else:
+            hf_key = f"$({self._hf_secret.secrets_prefix}{self._hf_secret.hf_token_key})".upper()
+
+        model_server_container.env = [
+            V1EnvVar(name="HUGGING_FACE_HUB_TOKEN", value=hf_key),
+        ]
+        model_server_container.args = self.build_vllm_args()
+
+    def build_vllm_args(self) -> list:
+        args = []
+        if self._arg_dict:
+            for key, value in self._arg_dict.items():
+                args.append(f"--{key}")
+                if value is not None:
+                    args.append(str(value))
+        return args
diff --git a/plugins/flytekit-inference/setup.py b/plugins/flytekit-inference/setup.py
index c0f42a2e41..ef46849726 100644
--- a/plugins/flytekit-inference/setup.py
+++ b/plugins/flytekit-inference/setup.py
@@ -19,6 +19,7 @@
         f"flytekitplugins.{PLUGIN_NAME}",
         f"flytekitplugins.{PLUGIN_NAME}.nim",
         f"flytekitplugins.{PLUGIN_NAME}.ollama",
+        f"flytekitplugins.{PLUGIN_NAME}.vllm",
     ],
     install_requires=plugin_requires,
     license="apache2",
diff --git a/plugins/flytekit-inference/tests/test_vllm.py b/plugins/flytekit-inference/tests/test_vllm.py
new file mode 100644
index 0000000000..e1a7901de5
--- /dev/null
+++ b/plugins/flytekit-inference/tests/test_vllm.py
@@ -0,0 +1,60 @@
+from flytekitplugins.inference import VLLM, HFSecret
+
+
+def test_vllm_init_valid_params():
+    vllm_args = {
+        "model": "google/gemma-2b-it",
+        "dtype": "half",
+        "max-model-len": 2000,
+    }
+
+    hf_secrets = HFSecret(
+        secrets_prefix="_UNION_",
+        hf_token_key="vllm_hf_token"
+    )
+
+    vllm_instance = VLLM(
+        hf_secret=hf_secrets,
+        arg_dict=vllm_args,
+        image='vllm/vllm-openai:my-tag',
+        cpu='10',
+        gpu='2',
+        mem='50Gi',
+        port=8080,
+    )
+
+    assert len(vllm_instance.pod_template.pod_spec.init_containers) == 1
+    assert (
+        vllm_instance.pod_template.pod_spec.init_containers[0].image
+        == 'vllm/vllm-openai:my-tag'
+    )
+    assert (
+        vllm_instance.pod_template.pod_spec.init_containers[0].resources.requests[
+            "memory"
+        ]
+        == "50Gi"
+    )
+    assert (
+        vllm_instance.pod_template.pod_spec.init_containers[0].ports[0].container_port
+        == 8080
+    )
+    assert vllm_instance.pod_template.pod_spec.init_containers[0].args == ['--model', 'google/gemma-2b-it', '--dtype', 'half', '--max-model-len', '2000']
+    assert vllm_instance.pod_template.pod_spec.init_containers[0].env[0].name == 'HUGGING_FACE_HUB_TOKEN'
+    assert vllm_instance.pod_template.pod_spec.init_containers[0].env[0].value == '$(_UNION_VLLM_HF_TOKEN)'
+
+
+def test_vllm_default_params():
+    vllm_instance = VLLM(hf_secret=HFSecret(secrets_prefix="_FSEC_", hf_token_key="test_token"))
+
+    assert vllm_instance.base_url == "http://localhost:8000"
+    assert vllm_instance._image == 'vllm/vllm-openai'
+    assert vllm_instance._port == 8000
+    assert vllm_instance._cpu == 2
+    assert vllm_instance._gpu == 1
+    assert vllm_instance._health_endpoint == "/health"
+    assert vllm_instance._mem == "10Gi"
+    assert vllm_instance._arg_dict is None
+    assert vllm_instance._hf_secret.secrets_prefix == '_FSEC_'
+    assert vllm_instance._hf_secret.hf_token_key == 'test_token'
+    assert vllm_instance._hf_secret.hf_token_group is None
diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py
index 5d953350a0..8736c7b2ef 100644
--- a/tests/flytekit/integration/remote/test_remote.py
+++ b/tests/flytekit/integration/remote/test_remote.py
@@ -14,7 +14,7 @@ from urllib.parse import urlparse
 import uuid
 
 import pytest
-from mock import mock, patch
+from unittest import mock
 
 from flytekit import LaunchPlan, kwtypes, WorkflowExecutionPhase
 from flytekit.configuration import Config, ImageConfig, SerializationSettings
@@ -29,6 +29,9 @@
 from flytekit.clients.friendly import SynchronousFlyteClient as _SynchronousFlyteClient
 from flytekit.configuration import PlatformConfig
 
+from tests.flytekit.integration.remote.utils import SimpleFileTransfer
+
+
 MODULE_PATH = pathlib.Path(__file__).parent / "workflows/basic"
 CONFIG = os.environ.get("FLYTECTL_CONFIG", str(pathlib.Path.home() / ".flyte" / "config-sandbox.yaml"))
 # Run `make build-dev` to build and push the image to the local registry.
@@ -111,6 +114,14 @@ def test_remote_eager_run():
     # child_workflow.parent_wf asynchronously register a parent wf1 with child lp from another wf2.
     run("eager_example.py", "simple_eager_workflow", "--x", "3")
 
+def test_pydantic_default_input_with_map_task():
+    execution_id = run("pydantic_wf.py", "wf")
+    remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
+    execution = remote.fetch_execution(name=execution_id)
+    execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
+    print("Execution Error:", execution.error)
+    assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}"
+
 
 def test_generic_idl_flytetypes():
     os.environ["FLYTE_USE_OLD_DC_FORMAT"] = "true"
@@ -804,3 +815,39 @@ def test_get_control_plane_version():
     client = _SynchronousFlyteClient(PlatformConfig.for_endpoint("localhost:30080", True))
     version = client.get_control_plane_version()
     assert version == "unknown" or version.startswith("v")
+
+
+def test_open_ff():
+    """Test opening FlyteFile from a remote path."""
+    # Upload a file to minio s3 bucket
+    file_transfer = SimpleFileTransfer()
+    remote_file_path = file_transfer.upload_file(file_type="json")
+
+    execution_id = run("flytefile.py", "wf", "--remote_file_path", remote_file_path)
+    remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
+    execution = remote.fetch_execution(name=execution_id)
+    execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
+    assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}"
+
+    # Delete the remote file to free the space
+    url = urlparse(remote_file_path)
+    bucket, key = url.netloc, url.path.lstrip("/")
+    file_transfer.delete_file(bucket=bucket, key=key)
+
+
+def test_attr_access_sd():
+    """Test accessing StructuredDataset attribute from a dataclass."""
+    # Upload a file to minio s3 bucket
+    file_transfer = SimpleFileTransfer()
+    remote_file_path = file_transfer.upload_file(file_type="parquet")
+
+    execution_id = run("attr_access_sd.py", "wf", "--uri", remote_file_path)
+    remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
+    execution = remote.fetch_execution(name=execution_id)
+    execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
+    assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}"
+
+    # Delete the remote file to free the space
+    url = urlparse(remote_file_path)
+    bucket, key = url.netloc, url.path.lstrip("/")
+    file_transfer.delete_file(bucket=bucket, key=key)
diff --git a/tests/flytekit/integration/remote/utils.py b/tests/flytekit/integration/remote/utils.py
new file mode 100644
index 0000000000..c16a0d0f4d
--- /dev/null
+++ b/tests/flytekit/integration/remote/utils.py
@@ -0,0 +1,101 @@
+"""
+Common utilities for flyte remote runs in integration tests.
+"""
+import os
+import json
+import tempfile
+import pathlib
+
+import botocore.session
+from botocore.client import BaseClient
+from flytekit.configuration import Config
+from flytekit.remote.remote import FlyteRemote
+
+
+# Define constants
+CONFIG = os.environ.get("FLYTECTL_CONFIG", str(pathlib.Path.home() / ".flyte" / "config-sandbox.yaml"))
+PROJECT = "flytesnacks"
+DOMAIN = "development"
+
+
+class SimpleFileTransfer:
+    """Utilities for file transfer to a minio s3 bucket.
+
+    Mainly supports single-file uploading and automatic teardown.
+    """
+
+    def __init__(self) -> None:
+        self._remote = FlyteRemote(
+            config=Config.auto(config_file=CONFIG),
+            default_project=PROJECT,
+            default_domain=DOMAIN
+        )
+        self._s3_client = self._get_minio_s3_client(self._remote)
+
+    def _get_minio_s3_client(self, remote: FlyteRemote) -> BaseClient:
+        """Create a botocore client."""
+        minio_s3_config = remote.file_access.data_config.s3
+        sess = botocore.session.get_session()
+
+        return sess.create_client(
+            "s3",
+            endpoint_url=minio_s3_config.endpoint,
+            aws_access_key_id=minio_s3_config.access_key_id,
+            aws_secret_access_key=minio_s3_config.secret_access_key,
+        )
+
+    def upload_file(self, file_type: str) -> str:
+        """Upload a single file to the minio s3 bucket.
+
+        Args:
+            file_type: File type. Supports "txt", "json", and "parquet".
+
+        Returns:
+            remote_file_path: Remote file path.
+        """
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            local_file_path = self._dump_tmp_file(file_type, tmp_dir)
+
+            # Upload to minio s3 bucket
+            _, remote_file_path = self._remote.upload_file(
+                to_upload=local_file_path,
+                project=PROJECT,
+                domain=DOMAIN,
+            )
+
+        return remote_file_path
+
+    def _dump_tmp_file(self, file_type: str, tmp_dir: str) -> str:
+        """Generate and dump a temporary file locally.
+
+        Args:
+            file_type: File type.
+            tmp_dir: Temporary directory.
+
+        Returns:
+            tmp_file_path: Temporary local file path.
+        """
+        if file_type == "txt":
+            tmp_file_path = pathlib.Path(tmp_dir) / "test.txt"
+            with open(tmp_file_path, "w") as f:
+                f.write("Hello World!")
+        elif file_type == "json":
+            d = {"name": "john", "height": 190}
+            tmp_file_path = pathlib.Path(tmp_dir) / "test.json"
+            with open(tmp_file_path, "w") as f:
+                json.dump(d, f)
+        elif file_type == "parquet":
+            # Because `upload_file` accepts a single file only, we specify 00000 to make it a single file
+            tmp_file_path = pathlib.Path(__file__).parent / "workflows/basic/data/df.parquet/00000"
+
+        return tmp_file_path
+
+    def delete_file(self, bucket: str, key: str) -> None:
+        """Delete the remote file from the minio s3 bucket to free the space.
+
+        Args:
+            bucket: s3 bucket name.
+            key: Key name of the object.
+        """
+        res = self._s3_client.delete_object(Bucket=bucket, Key=key)
+        assert res["ResponseMetadata"]["HTTPStatusCode"] == 204
diff --git a/tests/flytekit/integration/remote/workflows/basic/attr_access_sd.py b/tests/flytekit/integration/remote/workflows/basic/attr_access_sd.py
new file mode 100644
index 0000000000..9d01926081
--- /dev/null
+++ b/tests/flytekit/integration/remote/workflows/basic/attr_access_sd.py
@@ -0,0 +1,46 @@
+"""
+Test accessing StructuredDataset attribute from a dataclass.
+"""
+from dataclasses import dataclass
+
+import pandas as pd
+from flytekit import task, workflow
+from flytekit.types.structured import StructuredDataset
+
+
+@dataclass
+class DC:
+    sd: StructuredDataset
+
+
+@task
+def create_dc(uri: str) -> DC:
+    """Create a dataclass with a StructuredDataset attribute.
+
+    Args:
+        uri: File URI.
+
+    Returns:
+        dc: A dataclass with a StructuredDataset attribute.
+    """
+    dc = DC(sd=StructuredDataset(uri=uri, file_format="parquet"))
+
+    return dc
+
+
+@task
+def read_sd(sd: StructuredDataset) -> StructuredDataset:
+    """Read input StructuredDataset."""
+    print("sd:", sd.open(pd.DataFrame).all())
+
+    return sd
+
+
+@workflow
+def wf(uri: str) -> None:
+    dc = create_dc(uri=uri)
+    read_sd(sd=dc.sd)
+
+
+if __name__ == "__main__":
+    wf(uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet")
diff --git a/tests/flytekit/integration/remote/workflows/basic/flytefile.py b/tests/flytekit/integration/remote/workflows/basic/flytefile.py
new file mode 100644
index 0000000000..f25b77d907
--- /dev/null
+++ b/tests/flytekit/integration/remote/workflows/basic/flytefile.py
@@ -0,0 +1,52 @@
+from flytekit import task, workflow
+from flytekit.types.file import FlyteFile
+
+
+@task
+def create_ff(file_path: str) -> FlyteFile:
+    """Create a FlyteFile."""
+    return FlyteFile(path=file_path)
+
+
+@task
+def read_ff(ff: FlyteFile) -> None:
+    """Read input FlyteFile.
+
+    This can be used in the case in which a FlyteFile is created
+    in another task pod and read in this task pod.
+    """
+    with open(ff, "r") as f:
+        content = f.read()
+    print(f"FILE CONTENT | {content}")
+
+
+@task
+def create_and_read_ff(file_path: str) -> FlyteFile:
+    """Create a FlyteFile and read it.
+
+    Both FlyteFile creation and reading are done in this task pod.
+
+    Args:
+        file_path: File path.
+
+    Returns:
+        ff: FlyteFile object.
+    """
+    ff = FlyteFile(path=file_path)
+    with open(ff, "r") as f:
+        content = f.read()
+    print(f"FILE CONTENT | {content}")
+
+    return ff
+
+
+@workflow
+def wf(remote_file_path: str) -> None:
+    ff_1 = create_ff(file_path=remote_file_path)
+    read_ff(ff=ff_1)
+    ff_2 = create_and_read_ff(file_path=remote_file_path)
+    read_ff(ff=ff_2)
+
+
+if __name__ == "__main__":
+    wf()
diff --git a/tests/flytekit/integration/remote/workflows/basic/pydantic_wf.py b/tests/flytekit/integration/remote/workflows/basic/pydantic_wf.py
new file mode 100644
index 0000000000..d5e9c32170
--- /dev/null
+++ b/tests/flytekit/integration/remote/workflows/basic/pydantic_wf.py
@@ -0,0 +1,20 @@
+from pydantic import BaseModel
+
+from flytekit import map_task
+from typing import List
+from flytekit import task, workflow
+
+
+class MyBaseModel(BaseModel):
+    my_floats: List[float] = [1.0, 2.0, 5.0, 10.0]
+
+@task
+def print_float(my_float: float):
+    print(f"my_float: {my_float}")
+
+@workflow
+def wf(bm: MyBaseModel = MyBaseModel()):
+    map_task(print_float)(my_float=bm.my_floats)
+
+if __name__ == "__main__":
+    wf()
diff --git a/tests/flytekit/unit/core/image_spec/test_default_builder.py b/tests/flytekit/unit/core/image_spec/test_default_builder.py
index b7d8f3af00..0abd3d9467 100644
--- a/tests/flytekit/unit/core/image_spec/test_default_builder.py
+++ b/tests/flytekit/unit/core/image_spec/test_default_builder.py
@@ -251,44 +251,70 @@ def test_create_docker_context_uv_lock(tmp_path):
     ) in dockerfile_content
 
 
+@pytest.mark.parametrize("lock_file", ["uv.lock", "poetry.lock"])
 @pytest.mark.filterwarnings("ignore::UserWarning")
-def test_uv_lock_errors_no_pyproject_toml(monkeypatch, tmp_path):
+def test_lock_errors_no_pyproject_toml(monkeypatch, tmp_path, lock_file):
     run_mock = Mock()
     monkeypatch.setattr("flytekit.image_spec.default_builder.run", run_mock)
 
-    uv_lock_file = tmp_path / "uv.lock"
-    uv_lock_file.write_text("this is a lock file")
+    lock_file_path = tmp_path / lock_file
+    lock_file_path.write_text("this is a lock file")
 
     image_spec = ImageSpec(
         name="FLYTEKIT",
         python_version="3.12",
-        requirements=os.fspath(uv_lock_file),
+        requirements=os.fspath(lock_file_path),
     )
 
     builder = DefaultImageBuilder()
 
-    with pytest.raises(ValueError, match="To use uv.lock"):
+    with pytest.raises(ValueError, match="a pyproject.toml file must be in the same"):
         builder.build_image(image_spec)
 
 
+@pytest.mark.parametrize("lock_file", ["uv.lock", "poetry.lock"])
 @pytest.mark.filterwarnings("ignore::UserWarning")
-@pytest.mark.parametrize("invalid_param", ["packages"])
-def test_uv_lock_error_no_packages(monkeypatch, tmp_path, invalid_param):
+def test_uv_lock_error_no_packages(monkeypatch, tmp_path, lock_file):
     run_mock = Mock()
     monkeypatch.setattr("flytekit.image_spec.default_builder.run", run_mock)
 
-    uv_lock_file = tmp_path / "uv.lock"
-    uv_lock_file.write_text("this is a lock file")
+    lock_file_path = tmp_path / lock_file
+    lock_file_path.write_text("this is a lock file")
 
     image_spec = ImageSpec(
         name="FLYTEKIT",
         python_version="3.12",
-        requirements=os.fspath(uv_lock_file),
+        requirements=os.fspath(lock_file_path),
         packages=["ruff"],
     )
 
     builder = DefaultImageBuilder()
 
-    with pytest.raises(ValueError, match="Support for uv.lock files and packages is mutually exclusive"):
+    with pytest.raises(ValueError, match=f"Support for {lock_file} files and packages is mutually exclusive"):
         builder.build_image(image_spec)
 
     run_mock.assert_not_called()
+
+
+def test_create_poetry_lock(tmp_path):
+    docker_context_path = tmp_path / "builder_root"
+    docker_context_path.mkdir()
+
+    poetry_lock = tmp_path / "poetry.lock"
+    poetry_lock.write_text("this is a lock file")
+
+    pyproject_file = tmp_path / "pyproject.toml"
+    pyproject_file.write_text("this is a pyproject.toml file")
+
+    image_spec = ImageSpec(
+        name="FLYTEKIT",
+        python_version="3.12",
+        requirements=os.fspath(poetry_lock),
+    )
+
+    create_docker_context(image_spec, docker_context_path)
+
+    dockerfile_path = docker_context_path / "Dockerfile"
+    assert dockerfile_path.exists()
+    dockerfile_content = dockerfile_path.read_text()
+
+    assert "poetry install --no-root" in dockerfile_content
diff --git a/tests/flytekit/unit/core/test_local_cache.py b/tests/flytekit/unit/core/test_local_cache.py
index cf3e90e338..0990541a84 100644
--- a/tests/flytekit/unit/core/test_local_cache.py
+++ b/tests/flytekit/unit/core/test_local_cache.py
@@ -1,4 +1,6 @@
 import datetime
+import pathlib
+import pickle
 import re
 import sys
 import typing
@@ -627,3 +629,23 @@ def test_set_cache_ignore_input_vars_without_set_cache():
     @task(cache_ignore_input_vars=["a"])
     def add(a: int, b: int) -> int:
         return a + b
+
+
+@pytest.mark.serial
+def test_cache_old_version_of_literal_map():
+    cache_key = "t.produce_dc-1-ea65cfadb0079394a8be1f4aa1e96e2b"
+
+    # Load a literal map from a previous version of the cache from a local file
+    with open(pathlib.Path(__file__).parent / "testdata/pickled_value.bin", "rb") as f:
+        literal_map = pickle.loads(f.read())
+        LocalTaskCache._cache.set(cache_key, literal_map)
+
+    assert _calculate_cache_key("t.produce_dc", "1", LiteralMap(literals={})) == cache_key
+
+    # Hit the cache directly and confirm that the loaded object does not have the `_offloaded_metadata` attribute
+    literal_map = LocalTaskCache._cache.get(cache_key)
+    assert hasattr(literal_map.literals['o0'], "_offloaded_metadata") is False
+
+    # Now load the same object from the cache and confirm that the `_offloaded_metadata` attribute is now present
+    loaded_literal_map = LocalTaskCache.get("t.produce_dc", "1", LiteralMap(literals={}), ())
+    assert hasattr(loaded_literal_map.literals['o0'], "_offloaded_metadata") is True
diff --git a/tests/flytekit/unit/core/testdata/pickled_value.bin b/tests/flytekit/unit/core/testdata/pickled_value.bin
new file mode 100644
index 0000000000..71a45e1909
Binary files /dev/null and b/tests/flytekit/unit/core/testdata/pickled_value.bin differ
diff --git a/tests/flytekit/unit/types/directory/test_listdir.py b/tests/flytekit/unit/types/directory/test_listdir.py
new file mode 100644
index 0000000000..0987456907
--- /dev/null
+++ b/tests/flytekit/unit/types/directory/test_listdir.py
@@ -0,0 +1,31 @@
+import tempfile
+from pathlib import Path
+
+from flytekit import FlyteDirectory, FlyteFile, map_task, task, workflow
+
+def test_listdir():
+    @task
+    def setup() -> FlyteDirectory:
+        tmpdir = Path(tempfile.mkdtemp())
+        (tmpdir / "file.txt").write_text("Hello, World!")
+        return FlyteDirectory(tmpdir)
+
+
+    @task
+    def read_file(file: FlyteFile) -> str:
+        with open(file, "r") as f:
+            return f.read()
+
+
+    @task
+    def list_dir(dir: FlyteDirectory) -> list[FlyteFile]:
+        return FlyteDirectory.listdir(dir)
+
+
+    @workflow
+    def wf() -> list[str]:
+        tmpdir = setup()
+        files = list_dir(dir=tmpdir)
+        return map_task(read_file)(file=files)
+
+    assert wf() == ["Hello, World!"]