perf: Update RewriteJoin logic to choose optimal build side #1424

Merged (4 commits) Feb 21, 2025. Changes shown from 2 commits.
54 changes: 41 additions & 13 deletions spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala
```diff
@@ -20,7 +20,7 @@
 package org.apache.comet.rules

 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, JoinSelectionHelper}
-import org.apache.spark.sql.catalyst.plans.{JoinType, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.execution.{SortExec, SparkPlan}
 import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec}
@@ -31,14 +31,29 @@
  */
 object RewriteJoin extends JoinSelectionHelper {
```

```diff
-  private def getBuildSide(joinType: JoinType): Option[BuildSide] = {
-    if (canBuildShuffledHashJoinRight(joinType)) {
-      Some(BuildRight)
-    } else if (canBuildShuffledHashJoinLeft(joinType)) {
-      Some(BuildLeft)
-    } else {
-      None
+  private def getSmjBuildSide(join: SortMergeJoinExec): Option[BuildSide] = {
+    val leftBuildable = canBuildShuffledHashJoinLeft(join.joinType)
+    val rightBuildable = canBuildShuffledHashJoinRight(join.joinType)
+    if (!leftBuildable && !rightBuildable) {
+      return None
+    }
+    if (!leftBuildable) {
+      return Some(BuildRight)
+    }
+    if (!rightBuildable) {
+      return Some(BuildLeft)
+    }
```
@comphead (Contributor) commented on lines +38 to +46, Feb 20, 2025:

Suggested change (replace the early-return chain with a single pattern match):

```scala
    (leftBuildable, rightBuildable) match {
      case (false, false) => return None
      case (false, true) => return Some(BuildRight)
      case (true, false) => return Some(BuildLeft)
      case _ => {
        // all other stuff
      }
    }
```

?
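The early-return chain and the suggested pattern match are behaviorally equivalent. A minimal, self-contained sketch (hypothetical: plain booleans stand in for the `canBuildShuffledHashJoin{Left,Right}` checks and strings stand in for `BuildSide`, so it runs without Spark):

```scala
// Hypothetical standalone sketch of the build-side selection, outside Spark.
// "left"/"right" stand in for BuildLeft/BuildRight; "stats-based" marks the
// both-sides-buildable case that falls through to the stats heuristic.
object BuildSideSketch {
  // The merged code's early-return style.
  def ifChain(leftBuildable: Boolean, rightBuildable: Boolean): Option[String] = {
    if (!leftBuildable && !rightBuildable) return None
    if (!leftBuildable) return Some("right")
    if (!rightBuildable) return Some("left")
    Some("stats-based") // both sides buildable: defer to statistics
  }

  // The reviewer's suggested pattern-match style.
  def patternMatch(leftBuildable: Boolean, rightBuildable: Boolean): Option[String] =
    (leftBuildable, rightBuildable) match {
      case (false, false) => None
      case (false, true)  => Some("right")
      case (true, false)  => Some("left")
      case (true, true)   => Some("stats-based")
    }
}
```

Either form works; the pattern match makes the four cases exhaustive and visible at a glance, while the early returns keep the common both-buildable path unindented.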

```scala
    val side = join.logicalLink
      .flatMap {
        case join: Join => Some(getOptimalBuildSide(join))
        case _ => None
      }
      .getOrElse {
        // If smj has no logical link, or its logical link is not a join,
        // then we always choose left as build side.
        BuildLeft
```
Contributor commented on lines +53 to +55:

We were previously preferring right, as:

```scala
    if (canBuildShuffledHashJoinRight(joinType)) {
      Some(BuildRight)
    } else if (canBuildShuffledHashJoinLeft(joinType)) {
      Some(BuildLeft)
    } else {
      None
    }
```

Is this a behavior change?

Member Author:

Yes, this is a behavior change.

Member Author:

Well, query results won't change, just the performance characteristics.

```scala
      }
    Some(side)
  }
```

```scala
  private def removeSort(plan: SparkPlan) = plan match {
```

@@ -48,11 +63,7 @@ object RewriteJoin extends JoinSelectionHelper {

```diff
   def rewrite(plan: SparkPlan): SparkPlan = plan match {
     case smj: SortMergeJoinExec =>
-      getBuildSide(smj.joinType) match {
-        case Some(BuildRight) if smj.joinType == LeftSemi =>
-          // TODO this was added as a workaround for TPC-DS q14 hanging and needs
-          // further investigation
-          plan
+      getSmjBuildSide(smj) match {
         case Some(buildSide) =>
           ShuffledHashJoinExec(
             smj.leftKeys,
@@ -67,4 +78,21 @@
       }
     case _ => plan
   }
```
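The rewrite itself just swaps the sort-merge join node for a shuffled hash join and strips the sorts that the hash join no longer needs. A toy model of that shape (hypothetical case classes standing in for `SparkPlan`, `SortExec`, `SortMergeJoinExec`, and `ShuffledHashJoinExec`; not Comet's actual types):

```scala
// Toy plan nodes, invented for illustration; no Spark dependency.
sealed trait Plan
case class Sort(child: Plan) extends Plan
case class Smj(left: Plan, right: Plan) extends Plan
case class Shj(buildSide: String, left: Plan, right: Plan) extends Plan
case class Scan(name: String) extends Plan

object RewriteSketch {
  // Mirrors removeSort: sorted inputs are unnecessary once the join is hash-based.
  private def removeSort(plan: Plan): Plan = plan match {
    case Sort(child) => child
    case other       => other
  }

  // Mirrors rewrite: replace an SMJ with an SHJ, dropping child sorts.
  // A fixed "left" build side stands in for the build-side selection logic.
  def rewrite(plan: Plan): Plan = plan match {
    case Smj(l, r) => Shj("left", removeSort(l), removeSort(r))
    case other     => other
  }
}
```

Dropping the child sorts matters: they were inserted only to satisfy the sort-merge join's ordering requirement, so keeping them under a hash join would waste work.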

```scala
  def getOptimalBuildSide(join: Join): BuildSide = {
    val leftSize = join.left.stats.sizeInBytes
    val rightSize = join.right.stats.sizeInBytes
    val leftRowCount = join.left.stats.rowCount
    val rightRowCount = join.right.stats.rowCount
    if (leftSize == rightSize && rightRowCount.isDefined && leftRowCount.isDefined) {
```
Contributor:

Why does checking the sizes matter? 🤔 Shouldn't it be enough to use row counts?

Member Author:

rowCount is an Option, so it may not always be available. Row count is only used here as a tie-breaker when the left and right sizes are the same.

Member Author:

We can also revisit this logic as part of #1430.

@comphead (Contributor), Feb 20, 2025:

Maybe I'm missing something? The leftSize == rightSize condition looks very unlikely to hold, so with this logic row counts would hardly ever be considered and we would almost always fall back to the size comparison.

We could perhaps use something like https://docs.pingcap.com/tidb/stable/join-reorder#example-the-greedy-algorithm-of-join-reorder

and, if row counts are not available, fall back to sizes.

Member Author:

I suppose it depends on the goal for choosing the build side. Do we want to limit the amount of data that needs to be loaded into the hash map, or limit the number of rows?

The article you linked is about reordering nested joins, so row count would likely be more critical in that scenario.

@comphead (Contributor), Feb 20, 2025:

Oh, this is for the build side, not the entire join reorder; that's what I was missing. For a hash join build side the rule of thumb is to put the smaller table on the build side so it fits in memory, but I don't have a strong opinion on what "smaller" means here. If we are talking about memory, then sizeInBytes makes more sense.

```scala
      if (rightRowCount.get <= leftRowCount.get) {
```
@mbutrovich (Contributor), Feb 20, 2025:

Do we have any sort of cardinality stats (possibly with some catalog implementations)? I'm envisioning a scenario where the right table has fewer rows but much higher cardinality, so the resulting hash table is bigger.

Maybe a follow-up issue makes sense to add more heuristics (if we can even get them) to this choice.

Contributor:

It would be really awesome to find the correct join order, but are we sure we can rely on stats for any query?

Member Author:

With AQE enabled, we have rowCount and sizeInBytes available for completed query stages. We could extend the logic here to also look at LogicalRelation.sizeInBytes if the AQE stats are not available (this could be the case for the first join in a query).

@mbutrovich also suggested that we take into account the size of the data that will go into the build-side hash table, i.e. consider which columns are used in the join.

I think we can experiment more with this. I will file an issue to track it.

Member Author:

I filed #1430.

@parthchandra (Contributor), Feb 21, 2025:

> Do we have any sort of cardinality stats (possibly with some catalog implementations)?

With CBO enabled, we might have cardinality information. Column stats require a catalog to keep the metadata and stats, so with plain Parquet files we may not have column stats. With Iceberg, some work has been done to incorporate column stats into the Iceberg table, but I'm not sure where the effort to use them in Spark stands.
Ref: https://github.com/apache/spark/blob/f37be893d01884461ac515c8b197fb30d9ba68ff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala#L101

> I'm envisioning a scenario where the right table has fewer rows but much higher cardinality so the resulting hash table is bigger.

The hash table might have more keys, but total size is still a very good metric, simply because a larger hash table might not fit into memory. Also, depending on the implementation, the more data in the hash table, the less cache-friendly it may be, and we may end up with slower performance.
For reference, Spark uses https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java as the fallback hash table implementation (afaik).

```scala
        return BuildRight
      }
      return BuildLeft
    }
    if (rightSize <= leftSize) {
      return BuildRight
    }
    BuildLeft
  }
}
```
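The heuristic above (byte size first, row count only as a tie-breaker, right side preferred on ties) can be exercised outside Spark. A hypothetical standalone version with plain values in place of the logical plan's `Statistics` (the object and parameter names are invented for illustration):

```scala
// Hypothetical standalone mirror of getOptimalBuildSide. Sizes are bytes
// (Statistics.sizeInBytes is a BigInt); row counts are Option, mirroring
// Statistics.rowCount; "left"/"right" stand in for BuildLeft/BuildRight.
object OptimalBuildSideSketch {
  def choose(
      leftSize: BigInt,
      rightSize: BigInt,
      leftRowCount: Option[BigInt],
      rightRowCount: Option[BigInt]): String = {
    if (leftSize == rightSize && rightRowCount.isDefined && leftRowCount.isDefined) {
      // Byte sizes tie and both row counts are known: use rows as tie-breaker,
      // still preferring the right side on an exact tie.
      if (rightRowCount.get <= leftRowCount.get) "right" else "left"
    } else if (rightSize <= leftSize) {
      "right" // smaller (or equal) byte size becomes the hash-table build side
    } else {
      "left"
    }
  }
}
```

As the thread notes, the exact-size tie is rare in practice, so byte size dominates this choice; #1430 tracks revisiting the heuristic.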