Skip to content

Commit

Permalink
[SPARK-48718][SQL] Handle and fix the case when deserializer in cogro…
Browse files Browse the repository at this point in the history
…up is resolved during application of the DeduplicateRelations rule

### What changes were proposed in this pull request?
A followup for https://github.com/apache/spark/pull/41554/files.
Handle the case when the deserializer in cogroup has already been resolved by the time the DeduplicateRelations rule is applied. Previously the rule unconditionally cast each deserializer to `UnresolvedDeserializer`, so a resolved deserializer caused a cast error.
See the added test case as an example.

### Why are the changes needed?
Fix a bug introduced in a previous commit.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Add a new test case.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#47091 from anchovYu/fix-cogroup-dedup-rel.

Lead-authored-by: Xinyi Yu <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
anchovYu and cloud-fan committed Jun 26, 2024
1 parent 9d4abaf commit ebacb91
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,18 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
val newRightGroup = rewriteAttrs(c.rightGroup, rightAttrMap)
val newLeftOrder = rewriteAttrs(c.leftOrder, leftAttrMap)
val newRightOrder = rewriteAttrs(c.rightOrder, rightAttrMap)
val newKeyDes = c.keyDeserializer.asInstanceOf[UnresolvedDeserializer]
.copy(inputAttributes = newLeftGroup)
val newLeftDes = c.leftDeserializer.asInstanceOf[UnresolvedDeserializer]
.copy(inputAttributes = newLeftAttr)
val newRightDes = c.rightDeserializer.asInstanceOf[UnresolvedDeserializer]
.copy(inputAttributes = newRightAttr)
val newKeyDes = c.keyDeserializer match {
case u: UnresolvedDeserializer => u.copy(inputAttributes = newLeftGroup)
case e: Expression => e.withNewChildren(rewriteAttrs(e.children, leftAttrMap))
}
val newLeftDes = c.leftDeserializer match {
case u: UnresolvedDeserializer => u.copy(inputAttributes = newLeftAttr)
case e: Expression => e.withNewChildren(rewriteAttrs(e.children, leftAttrMap))
}
val newRightDes = c.rightDeserializer match {
case u: UnresolvedDeserializer => u.copy(inputAttributes = newRightAttr)
case e: Expression => e.withNewChildren(rewriteAttrs(e.children, rightAttrMap))
}
c.copy(keyDeserializer = newKeyDes, leftDeserializer = newLeftDes,
rightDeserializer = newRightDes, leftGroup = newLeftGroup,
rightGroup = newRightGroup, leftAttr = newLeftAttr, rightAttr = newRightAttr,
Expand Down
20 changes: 20 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.sql.{Date, Timestamp}

import scala.collection.immutable.HashSet
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
import scala.util.Random

Expand Down Expand Up @@ -952,6 +953,25 @@ class DatasetSuite extends QueryTest
assert(result2.length == 3)
}

// Regression test for SPARK-48718. Per the fixed rule above, DeduplicateRelations
// previously cast the cogroup deserializers to UnresolvedDeserializer
// unconditionally; when a deserializer was already resolved, that cast failed.
// The plan built here exercises exactly that path: the join below re-introduces
// `rhs`, forcing relation deduplication over an already-analyzed cogroup.
test("SPARK-48718: cogroup deserializer expr is resolved before dedup relation") {
// Two dataframes sharing the GROUPING_KEY column name; `rhs` appears twice in
// the final plan (directly and inside the cogroup), triggering deduplication.
val lhs = spark.createDataFrame(
List(Row(123L)).asJava,
StructType(Seq(StructField("GROUPING_KEY", LongType)))
)
val rhs = spark.createDataFrame(
List(Row(0L, 123L)).asJava,
StructType(Seq(StructField("ID", LongType), StructField("GROUPING_KEY", LongType)))
)

// Group both sides by GROUPING_KEY and cogroup them. The cogroup function
// ignores its input iterators and emits a single 0L per key, so the cogrouped
// result is one row with column "value" = 0.
val lhsKV = lhs.groupByKey((r: Row) => r.getAs[Long]("GROUPING_KEY"))
val rhsKV = rhs.groupByKey((r: Row) => r.getAs[Long]("GROUPING_KEY"))
val cogrouped = lhsKV.cogroup(rhsKV)(
(a: Long, b: Iterator[Row], c: Iterator[Row]) => Iterator(0L)
)
// Joining `rhs` against the cogrouped result makes `rhs` appear on both sides,
// so DeduplicateRelations must rewrite the (already resolved) cogroup node.
val joined = rhs.join(cogrouped, col("ID") === col("value"), "left")
checkAnswer(joined, Row(0L, 123L, 0L) :: Nil)
}

test("SPARK-34806: observation on datasets") {
val namedObservation = Observation("named")
val unnamedObservation = Observation()
Expand Down

0 comments on commit ebacb91

Please sign in to comment.