Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[test] Add integration test for accessing sd attr in dc #2969

Merged
24 changes: 23 additions & 1 deletion tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from urllib.parse import urlparse
import uuid
import pytest
from mock import mock, patch
import mock
JiangJiaWei1103 marked this conversation as resolved.
Show resolved Hide resolved

from flytekit import LaunchPlan, kwtypes, WorkflowExecutionPhase
from flytekit.configuration import Config, ImageConfig, SerializationSettings
Expand All @@ -28,6 +28,10 @@
from flytekit.types.schema import FlyteSchema
from flytekit.clients.friendly import SynchronousFlyteClient as _SynchronousFlyteClient
from flytekit.configuration import PlatformConfig
from botocore.client import BaseClient

from tests.flytekit.integration.remote.utils import SimpleFileTransfer


MODULE_PATH = pathlib.Path(__file__).parent / "workflows/basic"
CONFIG = os.environ.get("FLYTECTL_CONFIG", str(pathlib.Path.home() / ".flyte" / "config-sandbox.yaml"))
Expand Down Expand Up @@ -799,3 +803,21 @@ def test_get_control_plane_version():
client = _SynchronousFlyteClient(PlatformConfig.for_endpoint("localhost:30080", True))
version = client.get_control_plane_version()
assert version == "unknown" or version.startswith("v")


def test_attr_access_sd():
"""Test accessing StructuredDataset attribute from a dataclass."""
# Upload a file to minio s3 bucket
file_transfer = SimpleFileTransfer()
remote_file_path = file_transfer.upload_file(file_type="parquet")
Comment on lines +841 to +842
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling for upload

Consider adding error handling around the file upload operation since network operations can fail. The upload_file() call could throw exceptions that should be caught and handled appropriately. A similar issue was also found in tests/flytekit/integration/remote/workflows/basic/attr_access_sd.py (line 34).

Code suggestion
Check the AI-generated fix before applying
Suggested change
file_transfer = SimpleFileTransfer()
remote_file_path = file_transfer.upload_file(file_type="parquet")
file_transfer = SimpleFileTransfer()
try:
remote_file_path = file_transfer.upload_file(file_type="parquet")
except Exception as e:
# Clean up any partial uploads if needed
raise RuntimeError(f"Failed to upload file: {str(e)}")

Code Review Run #4d8bc5


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


execution_id = run("attr_access_sd.py", "wf", "--uri", remote_file_path)
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
execution = remote.fetch_execution(name=execution_id)
execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}"

# Delete the remote file to free the space
url = urlparse(remote_file_path)
bucket, key = url.netloc, url.path.lstrip("/")
file_transfer.delete_file(bucket=bucket, key=key)
Comment on lines +850 to +853
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider handling file deletion errors

Consider adding error handling around the file deletion operation. The delete_file() call could fail and should be handled gracefully, possibly with a warning if cleanup fails.

Code suggestion
Check the AI-generated fix before applying
Suggested change
# Delete the remote file to free the space
url = urlparse(remote_file_path)
bucket, key = url.netloc, url.path.lstrip("/")
file_transfer.delete_file(bucket=bucket, key=key)
# Delete the remote file to free the space
url = urlparse(remote_file_path)
bucket, key = url.netloc, url.path.lstrip("/")
try:
file_transfer.delete_file(bucket=bucket, key=key)
except Exception as e:
warnings.warn(f"Failed to delete remote file: {str(e)}")

Code Review Run #4d8bc5


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

101 changes: 101 additions & 0 deletions tests/flytekit/integration/remote/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""
Common utilities for flyte remote runs in integration tests.
"""
import os
import json
import tempfile
import pathlib

import botocore.session
from botocore.client import BaseClient
from flytekit.configuration import Config
from flytekit.remote.remote import FlyteRemote


# Define constants
CONFIG = os.environ.get("FLYTECTL_CONFIG", str(pathlib.Path.home() / ".flyte" / "config-sandbox.yaml"))
PROJECT = "flytesnacks"
DOMAIN = "development"


class SimpleFileTransfer:
"""Utilities for file transfer to minio s3 bucket.

Mainly support single file uploading and automatic teardown.
"""

def __init__(self) -> None:
    """Set up a FlyteRemote handle and a botocore s3 client for minio."""
    remote = FlyteRemote(
        config=Config.auto(config_file=CONFIG),
        default_project=PROJECT,
        default_domain=DOMAIN,
    )
    self._remote = remote
    self._s3_client = self._get_minio_s3_client(remote)

def _get_minio_s3_client(self, remote: FlyteRemote) -> BaseClient:
    """Create a botocore s3 client pointed at the minio endpoint.

    Args:
        remote: FlyteRemote handle whose data config carries the s3 settings.

    Returns:
        A botocore client for the "s3" service.
    """
    s3_cfg = remote.file_access.data_config.s3
    session = botocore.session.get_session()
    client = session.create_client(
        "s3",
        endpoint_url=s3_cfg.endpoint,
        aws_access_key_id=s3_cfg.access_key_id,
        aws_secret_access_key=s3_cfg.secret_access_key,
    )
    return client

def upload_file(self, file_type: str) -> str:
    """Upload a single file to minio s3 bucket.

    Args:
        file_type: File type. Support "txt", "json", and "parquet".

    Returns:
        remote_file_path: Remote file path.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_file_path = self._dump_tmp_file(file_type, tmp_dir)

        # Upload to minio s3 bucket
        _, remote_file_path = self._remote.upload_file(
            to_upload=local_file_path,
            project=PROJECT,
            domain=DOMAIN,
        )

    return remote_file_path

def _dump_tmp_file(self, file_type: str, tmp_dir: str) -> str:
"""Generate and dump a temporary file locally.

Args:
file_type: File type.
tmp_dir: Temporary directory.

Returns:
tmp_file_path: Temporary local file path.
"""
if file_type == "txt":
tmp_file_path = pathlib.Path(tmp_dir) / "test.txt"
with open(tmp_file_path, "w") as f:
f.write("Hello World!")
elif file_type == "json":
d = {"name": "john", "height": 190}
tmp_file_path = pathlib.Path(tmp_dir) / "test.json"
with open(tmp_file_path, "w") as f:
json.dump(d, f)
elif file_type == "parquet":
# Because `upload_file` accepts a single file only, we specify 00000 to make it a single file
tmp_file_path = pathlib.Path(__file__).parent / "workflows/basic/data/df.parquet/00000"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider making test data path configurable

The hardcoded path workflows/basic/data/df.parquet/00000 may cause issues if the file structure changes. Consider making this configurable or using a more robust path resolution approach.

Code suggestion
Check the AI-generated fix before applying
Suggested change
tmp_file_path = pathlib.Path(__file__).parent / "workflows/basic/data/df.parquet/00000"
TEST_DATA_DIR = pathlib.Path(__file__).parent / "test_data"
tmp_file_path = TEST_DATA_DIR / "df.parquet/00000"

Code Review Run #4d8bc5


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


return tmp_file_path

def delete_file(self, bucket: str, key: str) -> None:
"""Delete the remote file from minio s3 bucket to free the space.

Args:
bucket: s3 bucket name.
key: Key name of the object.
"""
res = self._s3_client.delete_object(Bucket=bucket, Key=key)
assert res["ResponseMetadata"]["HTTPStatusCode"] == 204
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""
Test accessing StructuredDataset attribute from a dataclass.
"""
from dataclasses import dataclass

import pandas as pd
from flytekit import task, workflow
from flytekit.types.structured import StructuredDataset


@dataclass
class DC:
    """Dataclass wrapping a single StructuredDataset attribute.

    Used to exercise promise attribute access (``dc.sd``) in a workflow.
    """

    # Structured dataset handle whose attribute access is under test
    sd: StructuredDataset


@task
def create_dc(uri: str) -> DC:
    """Wrap the file at ``uri`` in a StructuredDataset inside a DC instance.

    Args:
        uri: URI of the parquet file to reference.

    Returns:
        A DC whose ``sd`` field points at the given URI.
    """
    return DC(sd=StructuredDataset(uri=uri, file_format="parquet"))


@task
def read_sd(sd: StructuredDataset) -> StructuredDataset:
    """Materialize the input StructuredDataset as a pandas frame, echo it,
    and hand the dataset back unchanged."""
    frame = sd.open(pd.DataFrame).all()
    print("sd:", frame)
    return sd


@workflow
def wf(uri: str) -> None:
    """Build a DC around ``uri`` and pass its ``sd`` attribute to a task.

    The promise attribute access ``dc.sd`` between tasks is the behavior
    this workflow exists to exercise.
    """
    dc = create_dc(uri=uri)
    read_sd(sd=dc.sd)


# Allow running the workflow locally against the bundled parquet fixture.
if __name__ == "__main__":
    wf(uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet")