[Spark] Minor test fix for partition-like data skipping (#4088)
#### Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
Currently, we look at the expressions of a query's optimized plan to get the
query's predicates (which we use to test data skipping). This isn't correct:
the expressions include the projected columns in addition to the actual
filters, so the validation might be wrong (it might be too selective).
Instead, use the split conjunctive predicates taken from the Filter nodes of
the query plan.
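The distinction above can be sketched in plain Scala. This is a hedged, self-contained illustration, not real Catalyst code: the `Expr`, `Plan`, `Filter`, and `Project` classes below are toy stand-ins for Catalyst's, and `splitConjunctivePredicates` mimics the helper of the same name from Catalyst's `PredicateHelper`. It shows why collecting only `Filter` conditions yields just the filter conjuncts, while projected columns would otherwise leak into the predicate list.

```scala
// Toy stand-ins for Catalyst expressions and logical plan nodes (assumptions,
// not the real Spark classes).
sealed trait Expr
case class And(left: Expr, right: Expr) extends Expr
case class Pred(name: String) extends Expr

sealed trait Plan { def children: Seq[Plan] }
case class Scan() extends Plan { val children = Nil }
case class Filter(condition: Expr, child: Plan) extends Plan {
  val children = Seq(child)
}
case class Project(projectList: Seq[Expr], child: Plan) extends Plan {
  val children = Seq(child)
}

// Split a conjunction into its conjuncts, mirroring the behavior of
// Catalyst's PredicateHelper.splitConjunctivePredicates.
def splitConjunctivePredicates(e: Expr): Seq[Expr] = e match {
  case And(l, r) => splitConjunctivePredicates(l) ++ splitConjunctivePredicates(r)
  case other     => Seq(other)
}

// Collect only Filter conditions from the plan tree (the approach this
// change switches to), skipping Project expressions entirely.
def collectFilterConditions(p: Plan): Seq[Expr] = p match {
  case Filter(c, child) => c +: collectFilterConditions(child)
  case other            => other.children.flatMap(collectFilterConditions)
}

// A plan shaped like: Project(colA) -> Filter(a AND b) -> Scan
val plan = Project(Seq(Pred("colA")), Filter(And(Pred("a"), Pred("b")), Scan()))

val predicates = collectFilterConditions(plan).flatMap(splitConjunctivePredicates)
println(predicates.map { case Pred(n) => n; case _ => "?" }.mkString(","))
// prints "a,b" -- the projected column colA is not included
```

Had we gathered every expression from every node instead, `colA` from the `Project` would appear alongside the filter conjuncts, which is exactly the over-selectivity the fix removes.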

## How was this patch tested?
Test-only change.

## Does this PR introduce _any_ user-facing changes?
No
chirag-s-db authored Jan 24, 2025
1 parent d326793 commit ba8246e
Showing 1 changed file with 4 additions and 2 deletions.
@@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.functions.{array, col, concat, lit, struct}
import org.apache.spark.sql.test.SharedSparkSession

@@ -72,8 +73,9 @@ trait PartitionLikeDataSkippingSuiteBase
val res = sql(query).collect()
assert(res.sameElements(baseResult))

-    val predicates =
-      sql(query).queryExecution.optimizedPlan.expressions.flatMap(splitConjunctivePredicates)
+    val predicates = sql(query).queryExecution.optimizedPlan.collect {
+      case Filter(condition, _) => condition
+    }.flatMap(splitConjunctivePredicates)
val scanResult = DeltaLog.forTable(spark, TableIdentifier(tableName))
.update().filesForScan(predicates)
assert(scanResult.files.length == expectedNumFiles)
