Skip to content

Commit

Permalink
Update code
Browse files Browse the repository at this point in the history
  • Loading branch information
beliefer committed Aug 5, 2023
1 parent 4b8a067 commit ed2a3f5
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,17 @@ object CombineJoinedAggregates
case (lf: Filter, rf: Filter) =>
val mergedChildPlan = mergePlan(lf.child, rf.child)
mergedChildPlan.map {
case (newChild, outputMap, _)
case (newChild, outputMap, filters)
if isLikelySelective(lf.condition) && isLikelySelective(rf.condition) =>
val rightCondition = mapAttributes(rf.condition, outputMap)
val newCondition = Or(lf.condition, rightCondition)
val mappedRightCondition = mapAttributes(rf.condition, outputMap)
val (newLeftCondition, newRightCondition) = if (filters.length == 2) {
(And(lf.condition, filters.head), And(mappedRightCondition, filters.last))
} else {
(lf.condition, mappedRightCondition)
}
val newCondition = Or(newLeftCondition, newRightCondition)

(Filter(newCondition, newChild), outputMap, Seq(lf.condition, rightCondition))
(Filter(newCondition, newChild), outputMap, Seq(newLeftCondition, newRightCondition))
}
case (ll: LeafNode, rl: LeafNode) =>
checkIdenticalPlans(rl, ll).map { outputMap =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.scalatest.matchers.must.Matchers.the
import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.sql.catalyst.expressions.EqualTo
import org.apache.spark.sql.catalyst.expressions.aggregate.{Average, Count, Sum}
import org.apache.spark.sql.catalyst.optimizer.PushDownPredicates
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
Expand Down Expand Up @@ -2315,6 +2316,17 @@ class DataFrameAggregateSuite extends QueryTest
df.where($"date" === 20151124).agg(new Column(avgWithFilter).as("avg_temp"))).join(
df.where($"date" === 20151125).agg(count($"temp").as("count_temp")))
checkAnswer(join27, Row(84.0, 18.25, 5))

Seq(PushDownPredicates.ruleName, "").map { ruleName =>
withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> ruleName) {
val subQuery1 = df.where($"date" === 20151123).as("tab1")
val subQuery2 = df.where($"date" === 20151124).as("tab2")
val join28 =
subQuery1.where($"tab1.minute" > 30).agg(sum($"tab1.temp").as("sum_temp")).join(
subQuery2.where($"tab2.minute" < 30).agg(avg($"tab2.temp").as("avg_temp")))
checkAnswer(join28, Row(84.0, 24.600000000000005))
}
}
}
}
}
Expand Down

0 comments on commit ed2a3f5

Please sign in to comment.