Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return StructuredDataset which is a field in a dataclass #3071

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

arbaobao
Copy link
Contributor

@arbaobao arbaobao commented Jan 21, 2025

Tracking issue

Related to #6117

Why are the changes needed?

If we wrap the StructuredDataset in a dataclass, it will fail during the to_flyte_idl conversion.

What changes were proposed in this pull request?

Before returning Literals, we check the type of python_val._literal_sd. If it is a Python native StructuredDataset, we transform it into a Literals.StructuredDataset.

How was this patch tested?

As described in #6117, an error occurs when the extract task is executed.

@dataclass
class Data:
    f: StructuredDataset


@task
def create_data() -> Data:
    return Data(f=StructuredDataset(dataframe=pd.DataFrame({"a": [5]})))


@task
def extract(d: Data) -> StructuredDataset:
    return d.f


@workflow
def example_wf() -> None:
    d = create_data()
    f = extract(d=d)

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Summary by Bito

This PR fixes StructuredDataset handling within dataclasses during to_flyte_idl conversion by properly transforming python_val._literal_sd instances into Literals.StructuredDataset. It also introduces a new Kubernetes StatefulSet Data Service plugin, enhances image specification with custom Python executable support, and implements configurable chunk sizes for S3/GCS operations. The changes include improved resource management and enhanced Ray plugin configuration capabilities.

Unit tests added: True

Estimated effort to review (1-5, lower is better): 5

Signed-off-by: Nelson Chen <[email protected]>
Signed-off-by: Nelson Chen <[email protected]>
Signed-off-by: Nelson Chen <[email protected]>
@flyte-bot
Copy link
Contributor

flyte-bot commented Jan 21, 2025

Code Review Agent Run #63793c

Actionable Suggestions - 2
  • flytekit/types/structured/structured_dataset.py - 2
Review Details
  • Files reviewed - 2 · Commit Range: 51f6f73..a3df842
    • flytekit/core/type_engine.py
    • flytekit/types/structured/structured_dataset.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

@flyte-bot
Copy link
Contributor

flyte-bot commented Jan 21, 2025

Changelist by Bito

This pull request implements the following key changes.

Key Change Files Impacted
Bug Fix - StructuredDataset Dataclass Field Handling

structured_dataset.py - Added support for handling StructuredDataset fields within dataclasses during type transformation

test_remote.py - Added integration test for accessing StructuredDataset from dataclass

attr_access_dc_sd.py - Added test workflow demonstrating StructuredDataset usage within dataclass

Comment on lines +738 to +742
if isinstance(python_val._literal_sd, StructuredDataset):
sdt = StructuredDatasetType(format=python_val._literal_sd.file_format)
metad = literals.StructuredDatasetMetadata(structured_dataset_type=sdt)
sd_literal = literals.StructuredDataset(uri=python_val._literal_sd.uri, metadata=metad)
return Literal(scalar=Scalar(structured_dataset=sd_literal))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Private member access needs encapsulation

Accessing private member '_literal_sd'. Consider using a public interface or property to access this data.

Code suggestion
Check the AI-generated fix before applying
 -            if literal_type.structured_dataset_type is not None and self._literal_sd is not None:
 -                return self._literal_sd
 -            if literal_type.structured_dataset_type is not None and self._literal_sd is None:
 +            if literal_type.structured_dataset_type is not None and self.literal_sd is not None:
 +                return self.literal_sd
 +            if literal_type.structured_dataset_type is not None and self.literal_sd is None:

Code Review Run #63793c


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +738 to +742
if isinstance(python_val._literal_sd, StructuredDataset):
sdt = StructuredDatasetType(format=python_val._literal_sd.file_format)
metad = literals.StructuredDatasetMetadata(structured_dataset_type=sdt)
sd_literal = literals.StructuredDataset(uri=python_val._literal_sd.uri, metadata=metad)
return Literal(scalar=Scalar(structured_dataset=sd_literal))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider extracting literal creation logic

The code block for handling StructuredDataset passed through dataclass could be simplified by extracting the literal creation logic into a helper method. This would improve code readability and maintainability.

Code suggestion
Check the AI-generated fix before applying
Suggested change
if isinstance(python_val._literal_sd, StructuredDataset):
sdt = StructuredDatasetType(format=python_val._literal_sd.file_format)
metad = literals.StructuredDatasetMetadata(structured_dataset_type=sdt)
sd_literal = literals.StructuredDataset(uri=python_val._literal_sd.uri, metadata=metad)
return Literal(scalar=Scalar(structured_dataset=sd_literal))
if isinstance(python_val._literal_sd, StructuredDataset):
return self._create_structured_dataset_literal(python_val._literal_sd.uri, python_val._literal_sd.file_format)
def _create_structured_dataset_literal(self, uri: str, file_format: str) -> Literal:
sdt = StructuredDatasetType(format=file_format)
metad = literals.StructuredDatasetMetadata(structured_dataset_type=sdt)
sd_literal = literals.StructuredDataset(uri=uri, metadata=metad)
return Literal(scalar=Scalar(structured_dataset=sd_literal))

Code Review Run #63793c


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Copy link
Member

@Future-Outlier Future-Outlier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks correct, can you provide

  1. screenshot
  2. add an example to integration test to test it properlly?
    test_remote.py

Copy link

codecov bot commented Feb 4, 2025

Codecov Report

Attention: Patch coverage is 0% with 5 lines in your changes missing coverage. Please review.

Project coverage is 79.64%. Comparing base (5607b0d) to head (5329247).
Report is 2 commits behind head on master.

Files with missing lines Patch % Lines
flytekit/types/structured/structured_dataset.py 0.00% 4 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3071      +/-   ##
==========================================
- Coverage   80.13%   79.64%   -0.50%     
==========================================
  Files         272      202      -70     
  Lines       24614    21479    -3135     
  Branches     2768     2769       +1     
==========================================
- Hits        19725    17106    -2619     
+ Misses       4082     3604     -478     
+ Partials      807      769      -38     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@arbaobao
Copy link
Contributor Author

arbaobao commented Feb 4, 2025

When we return a StructuredDataset attribute from a dataclass instance, an error occurs :
AttributeError: 'StructuredDataset' object has no attribute 'to_flyte_idl'
Screenshot 2025-02-04 at 12 08 10 PM

This issue has been addressed in this PR.
Screenshot 2025-02-04 at 12 13 25 PM

Signed-off-by: Nelson Chen <[email protected]>
Signed-off-by: Nelson Chen <[email protected]>
@flyte-bot
Copy link
Contributor

flyte-bot commented Feb 4, 2025

Code Review Agent Run #d93af6

Actionable Suggestions - 1
  • tests/flytekit/integration/remote/workflows/basic/attr_access_dc_sd.py - 1
    • Consider adding error handling for dataset access · Line 31-31
Additional Suggestions - 10
  • flytekit/clis/sdk_in_container/serve.py - 2
    • Consider adding parameter validation checks · Line 64-64
    • Consider adding port number validation · Line 83-83
  • flytekit/models/security.py - 1
    • Consider adding env_var validation check · Line 45-45
  • flytekit/core/data_persistence.py - 1
  • plugins/flytekit-omegaconf/flytekitplugins/omegaconf/dictconfig_transformer.py - 1
    • Consider consolidating NoneType handling checks · Line 146-147
  • tests/flytekit/clis/sdk_in_container/test_serve.py - 1
    • Consider consistent CLI argument naming convention · Line 19-20
  • tests/flytekit/unit/core/test_dataclass.py - 1
  • tests/flytekit/unit/core/image_spec/test_default_builder.py - 1
    • Consider expanding python_exec validation tests · Line 347-360
  • plugins/flytekit-k8sdataservice/utils/resources.py - 1
    • Consider optimizing zero value checks · Line 13-20
  • flytekit/core/type_engine.py - 1
    • Consider more descriptive environment variable name · Line 63-63
Review Details
  • Files reviewed - 56 · Commit Range: a3df842..5329247
    • .pre-commit-config.yaml
    • Dockerfile.agent
    • docs/source/plugins/k8sstatefuldataservice.rst
    • flytekit/clis/sdk_in_container/serve.py
    • flytekit/core/data_persistence.py
    • flytekit/core/python_function_task.py
    • flytekit/core/resources.py
    • flytekit/core/type_engine.py
    • flytekit/image_spec/default_builder.py
    • flytekit/image_spec/image_spec.py
    • flytekit/interaction/parse_stdin.py
    • flytekit/models/security.py
    • flytekit/models/task.py
    • flytekit/remote/remote.py
    • flytekit/remote/remote_fs.py
    • flytekit/types/structured/structured_dataset.py
    • plugins/flytekit-envd/flytekitplugins/envd/image_builder.py
    • plugins/flytekit-envd/tests/test_image_spec.py
    • plugins/flytekit-k8sdataservice/dev-requirements.txt
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/__init__.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/agent.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/k8s/kube_config.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/k8s/manager.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/sensor.py
    • plugins/flytekit-k8sdataservice/flytekitplugins/k8sdataservice/task.py
    • plugins/flytekit-k8sdataservice/setup.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/k8s/test_kube_config.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/k8s/test_manager.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_agent.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_sensor.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_task.py
    • plugins/flytekit-k8sdataservice/tests/k8sdataservice/utils/test_resources.py
    • plugins/flytekit-k8sdataservice/utils/infra.py
    • plugins/flytekit-k8sdataservice/utils/resources.py
    • plugins/flytekit-omegaconf/flytekitplugins/omegaconf/dictconfig_transformer.py
    • plugins/flytekit-omegaconf/tests/test_dictconfig_transformer.py
    • plugins/flytekit-ray/flytekitplugins/ray/task.py
    • plugins/flytekit-ray/setup.py
    • plugins/flytekit-ray/tests/test_ray.py
    • plugins/setup.py
    • pydoclint-errors-baseline.txt
    • pyproject.toml
    • tests/flytekit/clis/sdk_in_container/test_serve.py
    • tests/flytekit/integration/remote/test_remote.py
    • tests/flytekit/integration/remote/workflows/basic/attr_access_dc_sd.py
    • tests/flytekit/integration/remote/workflows/basic/get_secret.py
    • tests/flytekit/integration/remote/workflows/basic/sd_attr.py
    • tests/flytekit/unit/core/image_spec/test_default_builder.py
    • tests/flytekit/unit/core/test_data_persistence.py
    • tests/flytekit/unit/core/test_dataclass.py
    • tests/flytekit/unit/core/test_generice_idl_type_engine.py
    • tests/flytekit/unit/core/test_list.py
    • tests/flytekit/unit/core/test_resources.py
    • tests/flytekit/unit/core/test_type_engine.py
    • tests/flytekit/unit/extras/pydantic_transformer/test_pydantic_basemodel_transformer.py
    • tests/flytekit/unit/types/structured_dataset/test_structured_dataset.py
  • Files skipped - 2
    • .github/workflows/pythonbuild.yml - Reason: Filter setting
    • plugins/flytekit-k8sdataservice/README.md - Reason: Filter setting
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

@task
def read_sd(dc: DC) -> StructuredDataset:
"""Read input StructuredDataset."""
print("sd:", dc.sd.open(pd.DataFrame).all())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling for dataset access

Consider adding error handling around open() and all() calls to handle potential exceptions when accessing the structured dataset.

Code suggestion
Check the AI-generated fix before applying
Suggested change
print("sd:", dc.sd.open(pd.DataFrame).all())
try:
df = dc.sd.open(pd.DataFrame).all()
print("sd:", df)
except Exception as e:
print(f"Error accessing structured dataset: {e}")
raise

Code Review Run #d93af6


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants