Skip to content

Commit

Permalink
[FLINK-37267][table] Test with ordinality for map with rows as key/value
Browse files Browse the repository at this point in the history
  • Loading branch information
gustavodemorais committed Feb 8, 2025
1 parent 57137d2 commit da82a34
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ public boolean matches(RelOptRuleCall call) {
LogicalProject logicalProject = (LogicalProject) right;
RelNode relNode = getRel(logicalProject.getInput());
return relNode instanceof Uncollect;
} else return right instanceof Uncollect;
} else {
return right instanceof Uncollect;
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,50 @@ class UnnestITCase extends BatchTestBase {
)
}

@Test
def testUnnestForMapOfRowsWithOrdinality(): Unit = {
val data = List(
row(
1, {
val map = new java.util.HashMap[Row, Row]()
map.put(Row.of("a", "a"), Row.of(10: Integer))
map.put(Row.of("b", "b"), Row.of(11: Integer))
map
}),
row(
2, {
val map = new java.util.HashMap[Row, Row]()
map.put(Row.of("c", "c"), Row.of(20: Integer))
map
}),
row(
3, {
val map = new java.util.HashMap[Row, Row]()
map.put(Row.of("d", "d"), Row.of(30: Integer))
map.put(Row.of("e", "e"), Row.of(31: Integer))
map
})
)

registerCollection(
"T",
data,
new RowTypeInfo(
Types.INT,
Types.MAP(Types.ROW(Types.STRING, Types.STRING), Types.ROW(Types.INT()))),
"a, b")

checkResult(
"SELECT a, k, v, o FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY as f (k, v, o)",
Seq(
row(1, row("a", "a"), row(10), 1),
row(1, row("b", "b"), row(11), 2),
row(2, row("c", "c"), row(20), 1),
row(3, row("d", "d"), row(30), 1),
row(3, row("e", "e"), row(31), 2))
)
}

@Test
def testUnnestWithOrdinalityForChainOfArraysAndMaps(): Unit = {
val data = List(
Expand Down Expand Up @@ -546,5 +590,4 @@ class UnnestITCase extends BatchTestBase {
Seq(row(1, 12, "45.6", 1), row(2, 13, "41.6", 1))
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,62 @@ class UnnestITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mo
assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted)
}

def testUnnestForMapOfRowsWitOrdinality(): Unit = {
val data = List(
Row.of(
Int.box(1), {
val map = new java.util.HashMap[Row, Row]()
map.put(Row.of("a", "a"), Row.of(10: Integer))
map.put(Row.of("b", "b"), Row.of(11: Integer))
map
}),
Row.of(
Int.box(2), {
val map = new java.util.HashMap[Row, Row]()
map.put(Row.of("c", "c"), Row.of(20: Integer))
map
}),
Row.of(
Int.box(3), {
val map = new java.util.HashMap[Row, Row]()
map.put(Row.of("d", "d"), Row.of(30: Integer))
map.put(Row.of("e", "e"), Row.of(31: Integer))
map
})
)

implicit val typeInfo = Types.ROW(
Array("a", "b"),
Array[TypeInformation[_]](
Types.INT,
Types.MAP(Types.ROW(Types.STRING, Types.STRING), Types.ROW(Types.INT())))
)
val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
tEnv.createTemporaryView("T", t)

val sqlQuery = "SELECT a, k, v, o FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY as f (k, v, o)"
val result = tEnv.sqlQuery(sqlQuery)
TestSinkUtil.addValuesSink(
tEnv,
"MySink",
List("a", "k", "v", "o"),
List(
DataTypes.INT,
DataTypes.ROW(DataTypes.STRING(), DataTypes.STRING()),
DataTypes.ROW(DataTypes.INT()),
DataTypes.INT.notNull()),
ChangelogMode.all()
)
result.executeInsert("MySink").await()

val expected =
List("1,a,a,10", "1,b,b,11", "2,c,c,20", "3,d,d,30", "3,e,e,31")
val resultWithoutOrd = assertAndRemoveOrdinality(
TestValuesTableFactory.getResultsAsStrings("MySink").sorted.toList,
2)
assertThat(resultWithoutOrd).isEqualTo(expected.sorted)
}

@TestTemplate
def testUnnestWithOrdinalityForChainOfArraysAndMaps(): Unit = {
val data = List(
Expand Down Expand Up @@ -721,21 +777,21 @@ class UnnestITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mo
}

@TestTemplate
def testUnnestMultiSetFromCollectResultWithOrdinality(): Unit = {
def testUnnestMultiSetOfRowsFromCollectResultWithOrdinality(): Unit = {
val data = List(
(1, 1, (12, "45.6")),
(2, 2, (12, "45.612")),
(3, 2, (13, "41.6")),
(4, 3, (14, "45.2136")),
(5, 3, (18, "42.6")))
val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 'c)
(1, (12, "45.6")),
(2, (12, "45.612")),
(2, (13, "41.6")),
(3, (14, "45.2136")),
(3, (18, "42.6")))
val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
tEnv.createTemporaryView("T", t)

val sqlQuery =
"""
|WITH T1 AS (SELECT b, COLLECT(c) as `set` FROM T GROUP BY b)
|SELECT b, id, point, o FROM T1
|CROSS JOIN UNNEST(T1.`set`) WITH ORDINALITY AS A(id, point, o) WHERE b < 3
|WITH T1 AS (SELECT a, COLLECT(b) as `set` FROM T GROUP BY a)
|SELECT a, id, point, o FROM T1
|CROSS JOIN UNNEST(T1.`set`) WITH ORDINALITY AS A(id, point, o) WHERE a < 3
""".stripMargin
val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
val sink = new TestingRetractSink
Expand Down Expand Up @@ -775,21 +831,6 @@ class UnnestITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mo
assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
}

/* Utility for maps to assert that ordinality is within range and remove it from output.
* Necessary since maps are not ordered */
def assertAndRemoveOrdinality(results: List[String], maxOrdinality: Int): List[String] = {
results.foreach {
result =>
val columns = result.split(",")
val ordinality = columns.last.toInt
assert(
ordinality >= 1 && ordinality <= maxOrdinality,
s"Ordinality $ordinality out of range")
}

results.map(_.split(",").dropRight(1).mkString(","))
}

@TestTemplate
def testUnnestWithOrdinalityAliasColumnNames(): Unit = {
val sqlQuery =
Expand All @@ -806,4 +847,19 @@ class UnnestITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mo
assertThat(fieldNames).isEqualTo(expectedFieldNames)
}

/* Utility for maps to assert that ordinality is within range and remove it from output.
* Necessary since maps are not ordered */
def assertAndRemoveOrdinality(results: List[String], maxOrdinality: Int): List[String] = {
results.foreach {
result =>
val columns = result.split(",")
val ordinality = columns.last.toInt
assert(
ordinality >= 1 && ordinality <= maxOrdinality,
s"Ordinality $ordinality out of range")
}

results.map(_.split(",").dropRight(1).mkString(","))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public CollectionUnnestWithOrdinalityFunction(
this.elementGetter = elementGetter;

if (elementType instanceof RowType) {
/* When unnesting a collection, according to Calcite's implementation,
row(a,b) unnests to a row(a, b, ordinality) and not to (row(a,b), ordinality).
That means, if we are unnesting a row, we need field getters
to be able to extract all field values */
RowType rowType = (RowType) elementType;
this.fieldGetters = createFieldGetters(rowType);
this.outputDataType = createRowTypeOutputDataType(rowType);
Expand Down

0 comments on commit da82a34

Please sign in to comment.