From ba8246e2a82efe6810aee94dfe578e0dfa082ed6 Mon Sep 17 00:00:00 2001 From: Chirag Singh <137233133+chirag-s-db@users.noreply.github.com> Date: Fri, 24 Jan 2025 10:16:34 -0800 Subject: [PATCH] [Spark] Minor test fix for partition-like data skipping (#4088) #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Currently, we look at a query's optimized plan's expressions to get the query's predicates (which we use to test data skipping). This isn't correct - it includes the projected columns in addition to the actual filters, which means that we might have incorrect validation (it might be too selective). Replace this with the split conjunctive predicates from the Filter of the query plan. ## How was this patch tested? Test-only change. ## Does this PR introduce _any_ user-facing changes? No --- .../sql/delta/stats/PartitionLikeDataSkippingSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/stats/PartitionLikeDataSkippingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/stats/PartitionLikeDataSkippingSuite.scala index 531f4fad93f..4b2bfa33688 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/stats/PartitionLikeDataSkippingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/stats/PartitionLikeDataSkippingSuite.scala @@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.SparkConf import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.functions.{array, col, concat, lit, struct} import org.apache.spark.sql.test.SharedSparkSession @@ -72,8 +73,9 @@ trait PartitionLikeDataSkippingSuiteBase val res = sql(query).collect() assert(res.sameElements(baseResult)) - val predicates = - 
sql(query).queryExecution.optimizedPlan.expressions.flatMap(splitConjunctivePredicates) + val predicates = sql(query).queryExecution.optimizedPlan.collect { + case Filter(condition, _) => condition + }.flatMap(splitConjunctivePredicates) val scanResult = DeltaLog.forTable(spark, TableIdentifier(tableName)) .update().filesForScan(predicates) assert(scanResult.files.length == expectedNumFiles)