-
Notifications
You must be signed in to change notification settings - Fork 185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf: Update RewriteJoin logic to choose optimal build side #1424
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ | |
package org.apache.comet.rules | ||
|
||
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, JoinSelectionHelper} | ||
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftSemi} | ||
import org.apache.spark.sql.catalyst.plans.logical.Join | ||
import org.apache.spark.sql.execution.{SortExec, SparkPlan} | ||
import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} | ||
|
||
|
@@ -31,14 +31,29 @@ import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoin | |
*/ | ||
object RewriteJoin extends JoinSelectionHelper { | ||
|
||
private def getBuildSide(joinType: JoinType): Option[BuildSide] = { | ||
if (canBuildShuffledHashJoinRight(joinType)) { | ||
Some(BuildRight) | ||
} else if (canBuildShuffledHashJoinLeft(joinType)) { | ||
Some(BuildLeft) | ||
} else { | ||
None | ||
private def getSmjBuildSide(join: SortMergeJoinExec): Option[BuildSide] = { | ||
val leftBuildable = canBuildShuffledHashJoinLeft(join.joinType) | ||
val rightBuildable = canBuildShuffledHashJoinRight(join.joinType) | ||
if (!leftBuildable && !rightBuildable) { | ||
return None | ||
} | ||
if (!leftBuildable) { | ||
return Some(BuildRight) | ||
} | ||
if (!rightBuildable) { | ||
return Some(BuildLeft) | ||
} | ||
val side = join.logicalLink | ||
.flatMap { | ||
case join: Join => Some(getOptimalBuildSide(join)) | ||
case _ => None | ||
} | ||
.getOrElse { | ||
// If smj has no logical link, or its logical link is not a join, | ||
// then we always choose left as build side. | ||
BuildLeft | ||
Comment on lines
+53
to
+55
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We previously preferring right as
Is this a behavior change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is a behavior change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, query results won't change, just performance characteristics. |
||
} | ||
Some(side) | ||
} | ||
|
||
private def removeSort(plan: SparkPlan) = plan match { | ||
|
@@ -48,11 +63,7 @@ object RewriteJoin extends JoinSelectionHelper { | |
|
||
def rewrite(plan: SparkPlan): SparkPlan = plan match { | ||
case smj: SortMergeJoinExec => | ||
getBuildSide(smj.joinType) match { | ||
case Some(BuildRight) if smj.joinType == LeftSemi => | ||
// TODO this was added as a workaround for TPC-DS q14 hanging and needs | ||
// further investigation | ||
plan | ||
getSmjBuildSide(smj) match { | ||
case Some(buildSide) => | ||
ShuffledHashJoinExec( | ||
smj.leftKeys, | ||
|
@@ -67,4 +78,21 @@ object RewriteJoin extends JoinSelectionHelper { | |
} | ||
case _ => plan | ||
} | ||
|
||
def getOptimalBuildSide(join: Join): BuildSide = { | ||
val leftSize = join.left.stats.sizeInBytes | ||
val rightSize = join.right.stats.sizeInBytes | ||
val leftRowCount = join.left.stats.rowCount | ||
val rightRowCount = join.right.stats.rowCount | ||
if (leftSize == rightSize && rightRowCount.isDefined && leftRowCount.isDefined) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why checking the sizes matters? 🤔 shouldn't be it enough to use rowcounts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rowCount is an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can also revisit this logic as part of #1430 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe I missing something? We can perhaps use something like https://docs.pingcap.com/tidb/stable/join-reorder#example-the-greedy-algorithm-of-join-reorder and if rowCounts are not available fallback to sizes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose it depends on the goal for choosing the build-side. Do we want to limit the amount of data that needs to be loaded into the hash map or limit the number of rows? The article you linked is for reordering nested joins, so row count would likely be more critical for that scenario. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh this is for build side, not the entire join reorder, that what I was missing. For hash join build side it is a rule of thumb of having smaller table for build side to fit in memory, but what means smaller here I dont have a strong opinion. Perhaps if we talk about memory then sizeInBytes makes more sense. |
||
if (rightRowCount.get <= leftRowCount.get) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have any sort of cardinality stats (possibly with some catalog implementations)? I'm envisioning a scenario where the right table has fewer rows but much higher cardinality so the resulting hash table is bigger. Maybe a follow-up issue makes sense to add more heuristics (if we can even get them) to this choice. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is really awesome to find correct join order, are we sure we can rely on stats for any query? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With AQE enabled, we have rowCount and sizeInBytes available for completed query stages. We could extend the logic here to also look at @mbutrovich also suggested that we take into account the size of the data that will go into the build-side hash table i.e. consider which columns are uses in the join. I think we can experiment more with this. I will file an issue to track this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I filed #1430 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
With cbo enabled, we might have cardinality information. Column stats require a catalog to keep the metadata and stats so with just plain Parquet files, we may not have the column stats information. With iceberg, there is some work that has been done to incorporate column stats into the iceberg table but I'm not sure where the effort to use them in Spark is.
The hash table might have more keys but total size is still a very good metric simply because a larger size hash table might not fit into memory. Also, depending on the implementation, the larger the size of the data in the hash table, the less cache friendly it might be and we may end up with slower performance. |
||
return BuildRight | ||
} | ||
return BuildLeft | ||
} | ||
if (rightSize <= leftSize) { | ||
return BuildRight | ||
} | ||
BuildLeft | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?