Skip to content

Commit

Permalink
[SPARK-45652][SQL] SPJ: Handle empty input partitions after dynamic f…
Browse files Browse the repository at this point in the history
…iltering

Handle the case when input partitions become empty after V2 dynamic filtering, when SPJ is enabled.

Current in the situation when all input partitions are filtered out via dynamic filtering, SPJ doesn't work but instead will panic:
```
java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions$lzycompute(BatchScanExec.scala:108)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions(BatchScanExec.scala:65)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD$lzycompute(BatchScanExec.scala:136)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD(BatchScanExec.scala:135)
	at org.apache.spark.sql.boson.BosonBatchScanExec.inputRDD$lzycompute(BosonBatchScanExec.scala:28)
```

This is because the `groupPartitions` method will return `None` in this scenario. We should handle the case.

No

Added a test case for this.

No

Closes apache#43531 from sunchao/SPARK-45652.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
  • Loading branch information
sunchao committed Oct 26, 2023
1 parent ecdb69f commit 7fe82f2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ case class BatchScanExec(
"partition values that are not present in the original partitioning.")
}

groupPartitions(newPartitions).get.map(_._2)
groupPartitions(newPartitions).getOrElse(Seq.empty).map(_._2)

case _ =>
// no validation is needed as the data source did not report any specific partitioning
Expand Down Expand Up @@ -148,7 +148,8 @@ case class BatchScanExec(
s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
"is enabled")

val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true).get
val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true)
.getOrElse(Seq.empty)

// This means the input partitions are not grouped by partition values. We'll need to
// check `groupByPartitionValues` and decide whether to group and replicate splits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,4 +1093,46 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
}
}
}

test("SPARK-45652: SPJ should handle empty partition after dynamic filtering") {
withSQLConf(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10") {
val items_partitions = Array(identity("id"))
createTable(items, items_schema, items_partitions)
sql(s"INSERT INTO testcat.ns.$items VALUES " +
s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")

val purchases_partitions = Array(identity("item_id"))
createTable(purchases, purchases_schema, purchases_partitions)
sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
s"(3, 19.5, cast('2020-02-01' as timestamp))")

Seq(true, false).foreach { pushDownValues =>
Seq(true, false).foreach { partiallyClustered => {
withSQLConf(
SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
partiallyClustered.toString,
SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
// The dynamic filtering effectively filtered out all the partitions
val df = sql(s"SELECT p.price from testcat.ns.$items i, testcat.ns.$purchases p " +
"WHERE i.id = p.item_id AND i.price > 50.0")
checkAnswer(df, Seq.empty)
}
}
}
}
}
}
}

0 comments on commit 7fe82f2

Please sign in to comment.