Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[test] Add integration test for accessing sd sttr in dc #2969

Merged
20 changes: 19 additions & 1 deletion tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from urllib.parse import urlparse
import uuid
import pytest
import mock
from unittest import mock

from flytekit import LaunchPlan, kwtypes, WorkflowExecutionPhase
from flytekit.configuration import Config, ImageConfig, SerializationSettings
Expand Down Expand Up @@ -833,3 +833,21 @@ def test_open_ff():
url = urlparse(remote_file_path)
bucket, key = url.netloc, url.path.lstrip("/")
file_transfer.delete_file(bucket=bucket, key=key)


def test_attr_access_sd():
"""Test accessing StructuredDataset attribute from a dataclass."""
# Upload a file to minio s3 bucket
file_transfer = SimpleFileTransfer()
remote_file_path = file_transfer.upload_file(file_type="parquet")
Comment on lines +841 to +842
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling for upload

Consider adding error handling around the file upload operation since network operations can fail. The upload_file() call could throw exceptions that should be caught and handled appropriately. A similar issue was also found in tests/flytekit/integration/remote/workflows/basic/attr_access_sd.py (line 34).

Code suggestion
Check the AI-generated fix before applying
Suggested change
file_transfer = SimpleFileTransfer()
remote_file_path = file_transfer.upload_file(file_type="parquet")
file_transfer = SimpleFileTransfer()
try:
remote_file_path = file_transfer.upload_file(file_type="parquet")
except Exception as e:
# Clean up any partial uploads if needed
raise RuntimeError(f"Failed to upload file: {str(e)}")

Code Review Run #4d8bc5


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


execution_id = run("attr_access_sd.py", "wf", "--uri", remote_file_path)
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
execution = remote.fetch_execution(name=execution_id)
execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}"

# Delete the remote file to free the space
url = urlparse(remote_file_path)
bucket, key = url.netloc, url.path.lstrip("/")
file_transfer.delete_file(bucket=bucket, key=key)
Comment on lines +850 to +853
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider handling file deletion errors

Consider adding error handling around the file deletion operation. The delete_file() call could fail and should be handled gracefully, possibly with a warning if cleanup fails.

Code suggestion
Check the AI-generated fix before applying
Suggested change
# Delete the remote file to free the space
url = urlparse(remote_file_path)
bucket, key = url.netloc, url.path.lstrip("/")
file_transfer.delete_file(bucket=bucket, key=key)
# Delete the remote file to free the space
url = urlparse(remote_file_path)
bucket, key = url.netloc, url.path.lstrip("/")
try:
file_transfer.delete_file(bucket=bucket, key=key)
except Exception as e:
warnings.warn(f"Failed to delete remote file: {str(e)}")

Code Review Run #4d8bc5


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

3 changes: 3 additions & 0 deletions tests/flytekit/integration/remote/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ def _dump_tmp_file(self, file_type: str, tmp_dir: str) -> str:
tmp_file_path = pathlib.Path(tmp_dir) / "test.json"
with open(tmp_file_path, "w") as f:
json.dump(d, f)
elif file_type == "parquet":
# Because `upload_file` accepts a single file only, we specify 00000 to make it a single file
tmp_file_path = pathlib.Path(__file__).parent / "workflows/basic/data/df.parquet/00000"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider making test data path configurable

The hardcoded path workflows/basic/data/df.parquet/00000 may cause issues if the file structure changes. Consider making this configurable or using a more robust path resolution approach.

Code suggestion
Check the AI-generated fix before applying
Suggested change
tmp_file_path = pathlib.Path(__file__).parent / "workflows/basic/data/df.parquet/00000"
TEST_DATA_DIR = pathlib.Path(__file__).parent / "test_data"
tmp_file_path = TEST_DATA_DIR / "df.parquet/00000"

Code Review Run #4d8bc5


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


return tmp_file_path

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""
Test accessing StructuredDataset attribute from a dataclass.
"""
from dataclasses import dataclass

import pandas as pd
from flytekit import task, workflow
from flytekit.types.structured import StructuredDataset


@dataclass
class DC:
sd: StructuredDataset


@task
def create_dc(uri: str) -> DC:
"""Create a dataclass with a StructuredDataset attribute.

Args:
uri: File URI.

Returns:
dc: A dataclass with a StructuredDataset attribute.
"""
dc = DC(sd=StructuredDataset(uri=uri, file_format="parquet"))

return dc


@task
def read_sd(sd: StructuredDataset) -> StructuredDataset:
"""Read input StructuredDataset."""
print("sd:", sd.open(pd.DataFrame).all())

return sd


@workflow
def wf(uri: str) -> None:
dc = create_dc(uri=uri)
read_sd(sd=dc.sd)


if __name__ == "__main__":
wf(uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet")
Loading