Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[test] Add integration test for accessing sd attr in dc #2969

Merged
97 changes: 97 additions & 0 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from flytekit.types.schema import FlyteSchema
from flytekit.clients.friendly import SynchronousFlyteClient as _SynchronousFlyteClient
from flytekit.configuration import PlatformConfig
from botocore.client import BaseClient

MODULE_PATH = pathlib.Path(__file__).parent / "workflows/basic"
CONFIG = os.environ.get("FLYTECTL_CONFIG", str(pathlib.Path.home() / ".flyte" / "config-sandbox.yaml"))
Expand Down Expand Up @@ -799,3 +800,99 @@ def test_get_control_plane_version():
client = _SynchronousFlyteClient(PlatformConfig.for_endpoint("localhost:30080", True))
version = client.get_control_plane_version()
assert version == "unknown" or version.startswith("v")


class SimpleFileTransfer:
    """Utilities for file transfer to a minio s3 bucket.

    Mainly supports single file uploading and automatic teardown.
    """

    def __init__(self) -> None:
        self._remote = FlyteRemote(
            config=Config.auto(config_file=CONFIG),
            default_project=PROJECT,
            default_domain=DOMAIN
        )
        self._s3_client = self._get_minio_s3_client(self._remote)

    def _get_minio_s3_client(self, remote: "FlyteRemote") -> "BaseClient":
        """Create a botocore s3 client pointed at the sandbox minio endpoint.

        Credentials and endpoint are taken from the remote's s3 data config.
        """
        minio_s3_config = remote.file_access.data_config.s3
        sess = botocore.session.get_session()

        return sess.create_client(
            "s3",
            endpoint_url=minio_s3_config.endpoint,
            aws_access_key_id=minio_s3_config.access_key_id,
            aws_secret_access_key=minio_s3_config.secret_access_key,
        )

    def upload_file(self, file_type: str) -> str:
        """Upload a single file to the minio s3 bucket.

        Args:
            file_type: File type. Supports "txt", "json", and "parquet".

        Returns:
            remote_file_path: Remote file path.
        """
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_file_path = self._dump_tmp_file(file_type, tmp_dir)

            # Upload to minio s3 bucket while the temporary file still exists
            _, remote_file_path = self._remote.upload_file(
                to_upload=tmp_file_path,
                project=PROJECT,
                domain=DOMAIN,
            )

            return remote_file_path

    def _dump_tmp_file(self, file_type: str, tmp_dir: str) -> str:
        """Generate and dump a temporary file locally.

        Args:
            file_type: File type. Supports "txt", "json", and "parquet".
            tmp_dir: Directory in which generated files are written.

        Returns:
            Path (as a string) of the file to upload.

        Raises:
            ValueError: If ``file_type`` is not one of the supported types.
        """
        if file_type == "txt":
            tmp_file_path = pathlib.Path(tmp_dir) / "test.txt"
            with open(tmp_file_path, "w") as f:
                f.write("Hello World!")
        elif file_type == "json":
            d = {"name": "jiawei", "height": 171}
            tmp_file_path = pathlib.Path(tmp_dir) / "test.json"
            with open(tmp_file_path, "w") as f:
                json.dump(d, f)
        elif file_type == "parquet":
            # Because `upload_file` accepts a single file only, we specify 00000 to make it a single file
            tmp_file_path = pathlib.Path(__file__).parent / "workflows/basic/data/df.parquet/00000"
        else:
            # Previously an unknown file_type fell through and raised UnboundLocalError
            # at the return; fail fast with a clear message instead.
            raise ValueError(f"Unsupported file type: {file_type}")

        return str(tmp_file_path)

    def delete_file(self, bucket: str, key: str) -> None:
        """Delete the remote file from minio s3 bucket to free the space.

        Args:
            bucket: s3 bucket name.
            key: Key name of the object.
        """
        # S3 DeleteObject responds with HTTP 204 (No Content) on success.
        res = self._s3_client.delete_object(Bucket=bucket, Key=key)
        assert res["ResponseMetadata"]["HTTPStatusCode"] == 204


def test_attr_access_sd():
    """Test accessing StructuredDataset attribute from a dataclass.

    Uploads a parquet file to the sandbox minio bucket, runs the
    ``attr_access_sd.py`` workflow against its uri, waits for the remote
    execution to finish, and finally deletes the uploaded object.
    """
    # Set environment variables for interacting with minio
    os.environ["AWS_ENDPOINT_URL"] = "http://localhost:30002"
    os.environ["AWS_ACCESS_KEY_ID"] = "minio"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "miniostorage"

    # Upload a file to minio s3 bucket
    file_transfer = SimpleFileTransfer()
    remote_file_path = file_transfer.upload_file(file_type="parquet")

    # Run the workflow remotely and wait for it to complete.
    execution_id = run("attr_access_sd.py", "wf", "--uri", remote_file_path)
    remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
    execution = remote.fetch_execution(name=execution_id)
    execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
    # Surface the execution error (if any) before the phase assertion for easier debugging.
    print("Execution Error:", execution.error)
    assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}"

    # Delete the remote file to free the space
    url = urlparse(remote_file_path)
    bucket, key = url.netloc, url.path.lstrip("/")
    file_transfer.delete_file(bucket=bucket, key=key)
Comment on lines +850 to +853
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider handling file deletion errors

Consider adding error handling around the file deletion operation. The delete_file() call could fail and should be handled gracefully, possibly with a warning if cleanup fails.

Code suggestion
Check the AI-generated fix before applying
Suggested change
# Delete the remote file to free the space
url = urlparse(remote_file_path)
bucket, key = url.netloc, url.path.lstrip("/")
file_transfer.delete_file(bucket=bucket, key=key)
# Delete the remote file to free the space
url = urlparse(remote_file_path)
bucket, key = url.netloc, url.path.lstrip("/")
try:
file_transfer.delete_file(bucket=bucket, key=key)
except Exception as e:
warnings.warn(f"Failed to delete remote file: {str(e)}")

Code Review Run #4d8bc5


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
Access StructuredDataset attribute from a dataclass.
"""
from dataclasses import dataclass

import pandas as pd
from flytekit import task, workflow
from flytekit.types.structured import StructuredDataset


URI = "tests/flytekit/integration/remote/workflows/basic/data/df.parquet"


@dataclass
class DC:
    """Dataclass wrapping a StructuredDataset, used to test attribute access (``dc.sd``)."""
    sd: StructuredDataset


@task
def build_dc(uri: str) -> DC:
    """Construct a DC whose ``sd`` points at the parquet file located at *uri*."""
    return DC(sd=StructuredDataset(uri=uri, file_format="parquet"))


@task
def t_sd_attr(sd: StructuredDataset) -> StructuredDataset:
    """Materialize the StructuredDataset as a pandas DataFrame, print it, and pass it through."""
    loaded = sd.open(pd.DataFrame).all()
    print("sd:", loaded)
    return sd


@workflow
def wf(uri: str) -> None:
    """Build a dataclass around a StructuredDataset and access its ``sd`` attribute downstream."""
    built = build_dc(uri=uri)
    t_sd_attr(sd=built.sd)


if __name__ == "__main__":
    # Allow running the workflow locally against the checked-in parquet file.
    wf(uri=URI)
Loading