Skip to content

Commit

Permalink
[SPARK-45652][SQL][3.4] SPJ: Handle empty input partitions after dyna…
Browse files Browse the repository at this point in the history
…mic filtering

This is a cherry-pick of apache#43531 to branch-3.4, with a few modifications.

### What changes were proposed in this pull request?

Handle the case when input partitions become empty after V2 dynamic filtering, when SPJ is enabled.

### Why are the changes needed?

Current in the situation when all input partitions are filtered out via dynamic filtering, SPJ doesn't work but instead will panic:
```
java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions$lzycompute(BatchScanExec.scala:108)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions(BatchScanExec.scala:65)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD$lzycompute(BatchScanExec.scala:136)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD(BatchScanExec.scala:135)
	at org.apache.spark.sql.boson.BosonBatchScanExec.inputRDD$lzycompute(BosonBatchScanExec.scala:28)
```

This is because the `groupPartitions` method will return `None` in this scenario. We should handle the case.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a test case for this.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#43539 from sunchao/SPARK-45652-branch-3.4.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
sunchao authored and dongjoon-hyun committed Oct 26, 2023
1 parent ecdb69f commit 75aed56
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ case class BatchScanExec(
"partition values that are not present in the original partitioning.")
}

groupPartitions(newPartitions).get.map(_._2)
groupPartitions(newPartitions).getOrElse(Seq.empty).map(_._2)

case _ =>
// no validation is needed as the data source did not report any specific partitioning
Expand Down Expand Up @@ -148,7 +148,8 @@ case class BatchScanExec(
s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
"is enabled")

val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true).get
val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true)
.getOrElse(Seq.empty)

// This means the input partitions are not grouped by partition values. We'll need to
// check `groupByPartitionValues` and decide whether to group and replicate splits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,4 +1093,46 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
}
}
}

test("SPARK-45652: SPJ should handle empty partition after dynamic filtering") {
withSQLConf(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10") {
val items_partitions = Array(identity("id"))
createTable(items, items_schema, items_partitions)
sql(s"INSERT INTO testcat.ns.$items VALUES " +
s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")

val purchases_partitions = Array(identity("item_id"))
createTable(purchases, purchases_schema, purchases_partitions)
sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
s"(3, 19.5, cast('2020-02-01' as timestamp))")

Seq(true, false).foreach { pushDownValues =>
Seq(true, false).foreach { partiallyClustered => {
withSQLConf(
SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
partiallyClustered.toString,
SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
// The dynamic filtering effectively filtered out all the partitions
val df = sql(s"SELECT p.price from testcat.ns.$items i, testcat.ns.$purchases p " +
"WHERE i.id = p.item_id AND i.price > 50.0")
checkAnswer(df, Seq.empty)
}
}
}
}
}
}
}

0 comments on commit 75aed56

Please sign in to comment.