From 75aed566018046c23200e3bb7188d29ece321b26 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Thu, 26 Oct 2023 13:54:24 -0700 Subject: [PATCH] [SPARK-45652][SQL][3.4] SPJ: Handle empty input partitions after dynamic filtering This is a cherry-pick of https://github.com/apache/spark/pull/43531 to branch-3.4, with a few modifications. ### What changes were proposed in this pull request? Handle the case when input partitions become empty after V2 dynamic filtering, when SPJ is enabled. ### Why are the changes needed? Current in the situation when all input partitions are filtered out via dynamic filtering, SPJ doesn't work but instead will panic: ``` java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:529) at scala.None$.get(Option.scala:527) at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions$lzycompute(BatchScanExec.scala:108) at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions(BatchScanExec.scala:65) at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD$lzycompute(BatchScanExec.scala:136) at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD(BatchScanExec.scala:135) at org.apache.spark.sql.boson.BosonBatchScanExec.inputRDD$lzycompute(BosonBatchScanExec.scala:28) ``` This is because the `groupPartitions` method will return `None` in this scenario. We should handle the case. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a test case for this. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43539 from sunchao/SPARK-45652-branch-3.4. Authored-by: Chao Sun Signed-off-by: Dongjoon Hyun --- .../datasources/v2/BatchScanExec.scala | 5 ++- .../KeyGroupedPartitioningSuite.scala | 42 +++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index 02821d10d5081..55f4a712410d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -105,7 +105,7 @@ case class BatchScanExec( "partition values that are not present in the original partitioning.") } - groupPartitions(newPartitions).get.map(_._2) + groupPartitions(newPartitions).getOrElse(Seq.empty).map(_._2) case _ => // no validation is needed as the data source did not report any specific partitioning @@ -148,7 +148,8 @@ case class BatchScanExec( s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " + "is enabled") - val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true).get + val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true) + .getOrElse(Seq.empty) // This means the input partitions are not grouped by partition values. We'll need to // check `groupByPartitionValues` and decide whether to group and replicate splits diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 0c3a1930dc9fa..cf76f6ca32cad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -1093,4 +1093,46 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { } } } + + test("SPARK-45652: SPJ should handle empty partition after dynamic filtering") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", + SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10") { + val items_partitions = Array(identity("id")) + createTable(items, items_schema, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " + + s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " + + s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") + + val purchases_partitions = Array(identity("item_id")) + createTable(purchases, purchases_schema, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + + s"(1, 44.0, cast('2020-01-15' as timestamp)), " + + s"(1, 45.0, cast('2020-01-15' as timestamp)), " + + s"(2, 11.0, cast('2020-01-01' as timestamp)), " + + s"(3, 19.5, cast('2020-02-01' as timestamp))") + + Seq(true, false).foreach { pushDownValues => + Seq(true, false).foreach { partiallyClustered => { + withSQLConf( + SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> + partiallyClustered.toString, + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) { + // The dynamic filtering effectively filtered out all the partitions + val df = sql(s"SELECT p.price from testcat.ns.$items i, testcat.ns.$purchases p " + + "WHERE i.id = p.item_id AND i.price > 50.0") + checkAnswer(df, Seq.empty) + } + } + } + } + } + } }