
[FLINK-37267][table] Add support for UNNEST WITH ORDINALITY #26113

Open · wants to merge 11 commits into base: master

Conversation

@gustavodemorais (Contributor) commented Feb 6, 2025

What is the purpose of the change

The SQL standard specifies a WITH ORDINALITY clause that can be appended to any UNNEST function call, which returns a new row for each element and its position in the data structure being unnested.

Examples:

-- Returns a new row for each element in a constant array and its position in the array
SELECT *
FROM (VALUES ('order_1')) AS ord(id)
CROSS JOIN UNNEST(ARRAY['shirt', 'pants', 'hat'])
WITH ORDINALITY AS t(product_name, index)

id       product_name  index
=======  ============  =====
order_1  shirt             1
order_1  pants             2
order_1  hat               3

-- Returns a new row for each element and its position in the array, assuming an Orders table with an array column `product_names`
SELECT order_id, product_name, product_index
FROM Orders
CROSS JOIN UNNEST(product_names)
WITH ORDINALITY AS t(product_name, product_index)

An UNNEST WITH ORDINALITY returns each element together with its position in the data structure, 1-indexed. For arrays, the order of the elements is guaranteed. Since maps and multisets are unordered, the order of their elements is not guaranteed.
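For illustration, unnesting a map with ordinality yields the key, the value, and the position as three flat columns. A minimal sketch, assuming a hypothetical Orders table with a MAP column `product_quantities` (because maps are unordered, the positions assigned to the entries are not deterministic):

-- Returns one row per map entry: key, value, and its 1-based position
SELECT order_id, product_name, quantity, pos
FROM Orders
CROSS JOIN UNNEST(product_quantities)
WITH ORDINALITY AS t(product_name, quantity, pos)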

Brief change log

  • Created UnnestRowsWithOrdinalityFunction
  • Refactored code so we have one UnnestRowsFunctionBase for both with and without ordinality
  • Refactored to remove duplicated code: the eval and type logic now live in UnnestRowsFunctionBase, and we simply collect each element with or without the index (ordinality) as an extra column, depending on whether ordinality is requested.
  • Call UnnestRowsFunction or UnnestRowsWithOrdinalityFunction in LogicalUnnestRule.java when rewriting the tree.
  • For arrays and multisets of rows, we extract the row's fields and emit a single flattened row with the ordinality as one extra column, following Calcite's implementation.
  • Revamped outdated unnest documentation

Verifying this change

This change added tests and can be verified as follows:

  • Added several stream tests, batch tests and plan tests
  • I'm still testing/checking if we need to add other relevant tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs

@flinkbot (Collaborator) commented Feb 6, 2025

CI report:

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build


Copilot reviewed 5 out of 13 changed files in this pull request and generated no comments.

Files not reviewed (8)
  • flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml: Language not supported
  • flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml: Language not supported
  • flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml: Language not supported
  • flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala: Language not supported
  • flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala: Language not supported
  • flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala: Language not supported
  • docs/content/docs/dev/table/sql/queries/joins.md: Evaluated as low risk
  • flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/UnnestRowsFunction.java: Evaluated as low risk
Comments suppressed due to low confidence (1)

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/UncollectToTableFunctionScanRule.java:106

  • Ensure that the new functionality for 'UNNEST WITH ORDINALITY' is covered by tests.
UnnestRowsFunctionBase.getUnnestedType(logicalType, uc.withOrdinality)
@dawidwys (Contributor) left a comment

Nice work @gustavodemorais

I left some minor comments.

@gustavodemorais (Contributor, Author) commented Feb 7, 2025

Thanks for the review, @dawidwys! Addressed the comments.

I'm adding tests for arrays of rows, since I realized a small difference compared to how Calcite implements it. In short, an array of row(a, b) unnests to row(a, b, ordinality). My first assumption was that it unnested to (row(a, b), ordinality). I'm making the changes to accommodate this and will push them soon.
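For example, a minimal sketch of that flattening behavior (table and column names are hypothetical): an array of ROW(a, b) unnests into the row's fields plus the ordinality as sibling columns, not into a nested row:

-- assuming a table T with a column `rows_arr` of type ARRAY<ROW<a INT, b STRING>>
SELECT a, b, ord
FROM T
CROSS JOIN UNNEST(rows_arr) WITH ORDINALITY AS u(a, b, ord)
-- each result row has three flat columns: a, b, ord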

@gustavodemorais force-pushed the FLINK-37267 branch 3 times, most recently from f5e7456 to 8913370 on February 7, 2025 at 18:46
@gustavodemorais force-pushed the FLINK-37267 branch 2 times, most recently from 710b9d3 to da82a34 on February 8, 2025 at 13:31
@gustavodemorais (Contributor, Author)

Apart from addressing the minor comments, I've made the changes to support arrays and multisets of rows, following Calcite's implementation.

After a couple of iterations on the code and some refactoring, I've also added several tests covering multiple cases. The PR should be ready.

SpecializedContext context,
LogicalType outputType,
LogicalType keyValTypes,
Contributor

rename this back? it still describes the output type, no?

Contributor (Author)

These are the element types we receive as input. They may or may not be the same as the output type. With ordinality we have:

keyValTypes: row(key, val)
outputType: row(key, val, ordinality)

Since it's not necessarily the output type, and the outputType can actually differ, I renamed the params to elementType for CollectionUnnest and keyValTypes for MapUnnest in both classes. IMO it's clearer and closer to reality. Wdyt?


for (int i = 0; i < fieldCount; i++) {
types[i] = rowType.getTypeAt(i);
names[i] = "f" + i;
Contributor

Use r = LogicalTypeUtils#toRowType?
Then:
new RowType(Stream.concat(r.getFields().stream(), Stream.of(new RowField("ordinality", new IntType(false)))) ...)
We don't want to lose field names, and it should improve code readability.

@gustavodemorais (Contributor, Author) commented Feb 15, 2025

Oh yes, nice. That makes sense. I was stuck on only using RowType.of and didn't consider using new RowType. Since we have the fields list from .getFields, we could do either:

1.
return new RowType(
                    false,
                    Stream.concat(
                                    rowType.getFields().stream(),
                                    Stream.of(new RowType.RowField("ordinality", DataTypes.INT().notNull().getLogicalType())))
                            .collect(Collectors.toList()));

or

2.

return new RowType(
                    false,
                    new ArrayList<>() {{
                        addAll(rowType.getFields());
                        add(new RowType.RowField("ordinality", DataTypes.INT().notNull().getLogicalType()));
                    }}
            );

I find 2 a bit easier to read, but I pushed your suggestion (1) for now; no strong preference, though. Let me know if you have one. Performance-wise, both should be effectively the same, O(n), once JVM optimizations are accounted for.

Contributor (Author)

See 7f522cf


-- Returns a new row for each element and its position in the array
-- assuming an Orders table with an array column `product_names`
SELECT order_id, product_name, product_index
Contributor

Could we make an example for multisets? It's not clear to me how WITH ORDINALITY behaves with them. The tests seem to drop the index?

@gustavodemorais (Contributor, Author) commented Feb 14, 2025

Sure, adding one example for multisets and one for a map. Multisets are stored as maps internally, so we go through the keys and emit each one x times, where x is how often it occurs (its multiplicity).
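A minimal sketch of what that looks like (hypothetical table and column names; the emitted order is not guaranteed for multisets):

-- assuming a table T with a MULTISET<STRING> column `tags`;
-- a tag with multiplicity 2 is emitted as two rows, each with its own position
SELECT id, tag, pos
FROM T
CROSS JOIN UNNEST(tags) WITH ORDINALITY AS u(tag, pos)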

@gustavodemorais (Contributor, Author) commented Feb 14, 2025

Let me know if the example helps: e179c09

@gustavodemorais (Contributor, Author)

Hey @twalthr, thanks for the review! I've addressed all the points. One important note: I'll definitely be looking into using semantic tests for future changes.

Currently, we already have 3 different files for all the unnest testing, and I was hesitant at first to create a new file, spread them even more, and change old tests in this PR. Anyway, I've put some time into refactoring the unnest stream tests within the existing file, so it's easier to read and eventually easier to migrate :)
