perf: Update RewriteJoin logic to choose optimal build side #1424
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@            Coverage Diff             @@
##              main    #1424      +/-   ##
============================================
+ Coverage     56.12%   57.78%    +1.65%
- Complexity      976      986       +10
============================================
  Files           119      122        +3
  Lines         11743    12132      +389
  Branches       2251     2282       +31
============================================
+ Hits           6591     7010      +419
+ Misses         4012     3954       -58
- Partials       1140     1168       +28

☔ View full report in Codecov by Sentry.
val leftRowCount = join.left.stats.rowCount
val rightRowCount = join.right.stats.rowCount
if (leftSize == rightSize && rightRowCount.isDefined && leftRowCount.isDefined) {
  if (rightRowCount.get <= leftRowCount.get) {
Do we have any sort of cardinality stats (possibly with some catalog implementations)? I'm envisioning a scenario where the right table has fewer rows but much higher cardinality so the resulting hash table is bigger.
Maybe a follow-up issue makes sense to add more heuristics (if we can even get them) to this choice.
It is really awesome to find the correct join order, but are we sure we can rely on stats for any query?
With AQE enabled, we have rowCount and sizeInBytes available for completed query stages. We could extend the logic here to also look at LogicalRelation.sizeInBytes if the AQE stats are not available (this could be the case for the first join in a query).
@mbutrovich also suggested that we take into account the size of the data that will go into the build-side hash table, i.e. consider which columns are used in the join.
I think we can experiment more with this. I will file an issue to track this.
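For illustration, a minimal sketch of that fallback, assuming a hypothetical estimatedSizeInBytes helper and a caller-supplied default size (conceptually spark.sql.defaultSizeInBytes); the real rule would need to decide how to detect a missing or default plan-level estimate:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation

// Hypothetical helper: use the plan-level sizeInBytes when it looks like a
// real estimate, otherwise fall back to the size reported by the underlying
// LogicalRelation (if the plan scans one).
def estimatedSizeInBytes(plan: LogicalPlan, defaultSizeInBytes: BigInt): BigInt = {
  val planSize = plan.stats.sizeInBytes
  if (planSize != defaultSizeInBytes) {
    planSize
  } else {
    plan
      .collectFirst { case rel: LogicalRelation => BigInt(rel.relation.sizeInBytes) }
      .getOrElse(planSize)
  }
}
```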
I filed #1430
> Do we have any sort of cardinality stats (possibly with some catalog implementations)?
With CBO enabled, we might have cardinality information. Column stats require a catalog to keep the metadata and stats, so with just plain Parquet files we may not have column stats. With Iceberg, there has been some work to incorporate column stats into the Iceberg table, but I'm not sure where the effort to use them in Spark stands.
Ref: https://github.com/apache/spark/blob/f37be893d01884461ac515c8b197fb30d9ba68ff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala#L101
> I'm envisioning a scenario where the right table has fewer rows but much higher cardinality so the resulting hash table is bigger.
The hash table might have more keys, but total size is still a very good metric, simply because a larger hash table might not fit into memory. Also, depending on the implementation, the more data in the hash table, the less cache-friendly it may be, and we could end up with slower performance.
For reference, Spark uses https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java as the fallback hash table implementation (afaik).
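To make the column-stats idea concrete, here is a rough sketch that assumes CBO column stats are populated; the attributeStats lookup, the join-key extraction, and counting only key bytes are simplifications for illustration, not the logic in this PR:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Sketch: estimate the bytes of join-key data that would land in the
// build-side hash table using CBO column stats (avgLen), falling back to
// the plan's sizeInBytes when column stats are missing.
def estimatedBuildKeyBytes(plan: LogicalPlan, joinKeys: Seq[Attribute]): BigInt = {
  val stats = plan.stats
  val avgKeyLen: Option[Long] = {
    val lens = joinKeys.flatMap(k => stats.attributeStats.get(k).flatMap(_.avgLen))
    if (lens.size == joinKeys.size) Some(lens.sum) else None
  }
  (stats.rowCount, avgKeyLen) match {
    case (Some(rows), Some(len)) => rows * BigInt(len)
    case _ => stats.sizeInBytes
  }
}
```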
Moving to draft while I test with TPC-DS
I ran TPC-DS and saw no difference in performance, so I am marking this as ready for review
if (!leftBuildable && !rightBuildable) {
  return None
}
if (!leftBuildable) {
  return Some(BuildRight)
}
if (!rightBuildable) {
  return Some(BuildLeft)
}
Suggested change (replace the chain of if statements above with a single pattern match):

(leftBuildable, rightBuildable) match {
  case (false, false) => return None
  case (false, true) => return Some(BuildRight)
  case (true, false) => return Some(BuildLeft)
  case _ => {
    // all other stuff
  }
}
?
val rightSize = join.right.stats.sizeInBytes
val leftRowCount = join.left.stats.rowCount
val rightRowCount = join.right.stats.rowCount
if (leftSize == rightSize && rightRowCount.isDefined && leftRowCount.isDefined) {
Why does checking the sizes matter? 🤔 Shouldn't it be enough to use row counts?
rowCount is an Option, so it may not always be available. Row count is only used here as a tie-breaker when the left and right sizes are the same.
We can also revisit this logic as part of #1430
Maybe I'm missing something? The leftSize == rightSize condition looks very unlikely to hold, so with this logic we would hardly ever consider rowCounts and would almost always fall back to the size comparison.
We could perhaps use something like https://docs.pingcap.com/tidb/stable/join-reorder#example-the-greedy-algorithm-of-join-reorder and fall back to sizes if rowCounts are not available.
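A minimal sketch of that alternative ordering (row counts first, sizes as the fallback); the chooseBuildSide name is made up for illustration, and this does not attempt the greedy reorder from the linked article, only the build-side choice:

```scala
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans.logical.Join

// Sketch: pick the build side by row count when both estimates exist,
// otherwise fall back to comparing sizeInBytes.
def chooseBuildSide(join: Join): BuildSide = {
  (join.left.stats.rowCount, join.right.stats.rowCount) match {
    case (Some(l), Some(r)) =>
      if (r <= l) BuildRight else BuildLeft
    case _ =>
      if (join.right.stats.sizeInBytes <= join.left.stats.sizeInBytes) BuildRight
      else BuildLeft
  }
}
```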
I suppose it depends on the goal for choosing the build-side. Do we want to limit the amount of data that needs to be loaded into the hash map or limit the number of rows?
The article you linked is for reordering nested joins, so row count would likely be more critical for that scenario.
Oh, this is for the build side, not the entire join reorder; that is what I was missing. For the hash join build side, the rule of thumb is to put the smaller table on the build side so it fits in memory, but I don't have a strong opinion on what "smaller" means here. Perhaps if we are talking about memory, then sizeInBytes makes more sense.
lgtm thanks @andygrove
Do we know why Spark's decision is so bad to start with? Spark has the same logic here: https://github.com/apache/spark/blob/fb17856a22be6968b2ed55ccbd7cf72111920bea/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala#L506
Spark is building a SortMergeJoin and we are replacing it with a ShuffledHashJoin. Our new logic in this PR seems to match the Spark logic you linked to.
Makes sense
@andygrove Thanks for opening this PR! I have one question though. I also tried to apply the same build side selection logic, but found that with multiple executors, the CometExchange executed right after a CometHashJoin with BuildLeft gets slower, as described here: #1382 (comment). That is why I did not open a PR. Is it confirmed that queries are faster with multiple executors as well?
I will test with multiple executors today, just to be sure, but I suspect the issue you were seeing is related to spilling in shuffle. I commented on the issue. I will share the results here later today for multi-executor testing.
// TODO this was added as a workaround for TPC-DS q14 hanging and needs
// further investigation
Not a blocker, but just wondering: is this because we were choosing the wrong side?
This is needed both before and after this PR. I think it may be related to excessive spilling and shuffle but it still needs to be investigated.
// If smj has no logical link, or its logical link is not a join,
// then we always choose left as build side.
BuildLeft
We were previously preferring right, as in:
if (canBuildShuffledHashJoinRight(joinType)) {
Some(BuildRight)
} else if (canBuildShuffledHashJoinLeft(joinType)) {
Some(BuildLeft)
} else {
None
}
Is this a behavior change?
Yes, this is a behavior change.
Well, query results won't change, just performance characteristics.
Thanks for the reviews @comphead @parthchandra @kazuyukitanimura @mbutrovich @hayman42. I ran benchmarks with 1 executor w/ 8 cores vs 2 executors w/ 4 cores and saw no difference in performance, so I will go ahead and merge this PR. I do see issues if shuffle has to spill, and I have ideas on how we can greatly improve this.
Which issue does this PR close?
Related to #1382
Rationale for this change
The main goal of this PR is to try to choose the smaller side of the join as the build side.
With this PR, many queries are now faster when compared to the 0.6.0 release.
Total time for TPC-H is 285 seconds, down from 330 seconds.
Query 9 Before
Query 9 After
What changes are included in this PR?
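Putting the fragments from the review together, the new selection logic roughly follows this shape (a sketch reconstructed from the snippets above, not the exact merged code; the getBuildSide signature and the buildable flags are assumptions):

```scala
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans.logical.Join

// Sketch of the build-side choice discussed in this PR: rule out sides that
// cannot be built for this join type, then prefer the smaller side by
// sizeInBytes, using rowCount only as a tie-breaker when sizes are equal.
def getBuildSide(
    join: Join,
    leftBuildable: Boolean,
    rightBuildable: Boolean): Option[BuildSide] = {
  if (!leftBuildable && !rightBuildable) return None
  if (!leftBuildable) return Some(BuildRight)
  if (!rightBuildable) return Some(BuildLeft)

  val leftSize = join.left.stats.sizeInBytes
  val rightSize = join.right.stats.sizeInBytes
  val leftRowCount = join.left.stats.rowCount
  val rightRowCount = join.right.stats.rowCount

  if (leftSize == rightSize && leftRowCount.isDefined && rightRowCount.isDefined) {
    if (rightRowCount.get <= leftRowCount.get) Some(BuildRight) else Some(BuildLeft)
  } else if (rightSize <= leftSize) {
    Some(BuildRight)
  } else {
    Some(BuildLeft)
  }
}
```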
How are these changes tested?