Skip to content

Commit

Permalink
[test] Add integration test for accessing sd attr in dc (flyteorg#2969)
Browse files Browse the repository at this point in the history
* test: Add integration test for attr access of sd

Signed-off-by: JiaWei Jiang <[email protected]>

* Correct file path

Signed-off-by: JiaWei Jiang <[email protected]>

* test: Support interaction with minio s3 bucket

1. Upload a local parquet file to minio s3 bucket
2. Access StructuredDataset attr from a dataclass
3. Open StructuredDataset from a remote path

Signed-off-by: JiaWei Jiang <[email protected]>

* Delete an unmerged integration test

Signed-off-by: JiaWei Jiang <[email protected]>

* Try imagespec with commit sha of corresponding fix

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove redundant test

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove default_factory and create sd dc from input uri

Signed-off-by: JiaWei Jiang <[email protected]>

* refactor: Clean test logic

1. Remove redundant prints
2. Use `mock.patch.dict` to setup `os.environ` for the current test fn
    * Avoid contaminating other tests running in the same process

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove redundant minio env var setup and add test comments

Signed-off-by: JiaWei Jiang <[email protected]>

* Support uploading tmp pqt file

Signed-off-by: JiaWei Jiang <[email protected]>

* Update deprecated module

Signed-off-by: JiaWei Jiang <[email protected]>

* Remove redundant and unused imports

Signed-off-by: JiaWei Jiang <[email protected]>

---------

Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: Shuying Liang <[email protected]>
  • Loading branch information
JiangJiaWei1103 authored and shuyingliang committed Jan 11, 2025
1 parent d6ab260 commit 25ab6f6
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 1 deletion.
20 changes: 19 additions & 1 deletion tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from urllib.parse import urlparse
import uuid
import pytest
import mock
from unittest import mock

from flytekit import LaunchPlan, kwtypes, WorkflowExecutionPhase
from flytekit.configuration import Config, ImageConfig, SerializationSettings
Expand Down Expand Up @@ -833,3 +833,21 @@ def test_open_ff():
url = urlparse(remote_file_path)
bucket, key = url.netloc, url.path.lstrip("/")
file_transfer.delete_file(bucket=bucket, key=key)


def test_attr_access_sd():
    """Test accessing StructuredDataset attribute from a dataclass.

    Uploads a parquet file to the minio s3 bucket, runs the ``wf`` workflow of
    ``attr_access_sd.py`` against the uploaded path, and asserts that the
    execution reaches the SUCCEEDED phase. The uploaded file is deleted
    afterwards regardless of the outcome.
    """
    # Upload a file to minio s3 bucket
    file_transfer = SimpleFileTransfer()
    remote_file_path = file_transfer.upload_file(file_type="parquet")

    try:
        execution_id = run("attr_access_sd.py", "wf", "--uri", remote_file_path)
        remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
        execution = remote.fetch_execution(name=execution_id)
        execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
        assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}"
    finally:
        # Delete the remote file to free the space. Doing this in `finally`
        # keeps a failed or timed-out execution from leaking the uploaded file.
        url = urlparse(remote_file_path)
        bucket, key = url.netloc, url.path.lstrip("/")
        file_transfer.delete_file(bucket=bucket, key=key)
3 changes: 3 additions & 0 deletions tests/flytekit/integration/remote/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ def _dump_tmp_file(self, file_type: str, tmp_dir: str) -> str:
tmp_file_path = pathlib.Path(tmp_dir) / "test.json"
with open(tmp_file_path, "w") as f:
json.dump(d, f)
elif file_type == "parquet":
# Because `upload_file` accepts a single file only, we specify 00000 to make it a single file
tmp_file_path = pathlib.Path(__file__).parent / "workflows/basic/data/df.parquet/00000"

return tmp_file_path

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""
Test accessing StructuredDataset attribute from a dataclass.
"""
from dataclasses import dataclass

import pandas as pd
from flytekit import task, workflow
from flytekit.types.structured import StructuredDataset


@dataclass
class DC:
    """Dataclass holding a single StructuredDataset attribute."""

    sd: StructuredDataset


@task
def create_dc(uri: str) -> DC:
    """Wrap the dataset located at ``uri`` in a ``DC`` instance.

    Args:
        uri: File URI.

    Returns:
        A ``DC`` whose ``sd`` attribute is a StructuredDataset built from
        ``uri`` with the parquet file format.
    """
    dataset = StructuredDataset(uri=uri, file_format="parquet")
    return DC(sd=dataset)


@task
def read_sd(sd: StructuredDataset) -> StructuredDataset:
    """Open ``sd`` as a pandas DataFrame, print it, and return ``sd`` as-is."""
    frame = sd.open(pd.DataFrame).all()
    print("sd:", frame)
    return sd


@workflow
def wf(uri: str) -> None:
    """Build a ``DC`` from ``uri`` and feed its ``sd`` attribute to ``read_sd``."""
    wrapped = create_dc(uri=uri)
    read_sd(sd=wrapped.sd)


if __name__ == "__main__":
    # Run the workflow against the sample parquet dataset.
    # NOTE(review): the path is relative, so this presumably expects to be
    # launched from the repository root — confirm.
    local_uri = "tests/flytekit/integration/remote/workflows/basic/data/df.parquet"
    wf(uri=local_uri)

0 comments on commit 25ab6f6

Please sign in to comment.