perf: Update RewriteJoin logic to choose optimal build side #1424

Merged: 4 commits merged on Feb 21, 2025


andygrove
Member

@andygrove andygrove commented Feb 19, 2025

Which issue does this PR close?

Related to #1382

Rationale for this change

The main goal of this PR is to choose the smaller side of the join as the build side.

With this PR, many queries are now faster when compared to the 0.6.0 release.

[Image: tpch_queries_speedup_rel]

Total time for TPC-H is 285 seconds, down from 330 seconds.

Query 9 Before

[Image: 2025-02-19_18-28]

Query 9 After

[Image: 2025-02-19_18-28_1]

What changes are included in this PR?

  • Update RewriteJoin to match the latest version from Apache Gluten

How are these changes tested?

@codecov-commenter

codecov-commenter commented Feb 20, 2025

Codecov Report

Attention: Patch coverage is 0% with 26 lines in your changes missing coverage. Please review.

Project coverage is 57.78%. Comparing base (f09f8af) to head (5ab8932).
Report is 42 commits behind head on main.

Files with missing lines | Patch % | Lines
...ain/scala/org/apache/comet/rules/RewriteJoin.scala | 0.00% | 26 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1424      +/-   ##
============================================
+ Coverage     56.12%   57.78%   +1.65%     
- Complexity      976      986      +10     
============================================
  Files           119      122       +3     
  Lines         11743    12132     +389     
  Branches       2251     2282      +31     
============================================
+ Hits           6591     7010     +419     
+ Misses         4012     3954      -58     
- Partials       1140     1168      +28     


@andygrove andygrove changed the title from "[wip] Update RewriteJoin logic" to "perf: Update RewriteJoin logic to favor BuildLeft" Feb 20, 2025
@andygrove andygrove marked this pull request as ready for review February 20, 2025 15:06
val leftRowCount = join.left.stats.rowCount
val rightRowCount = join.right.stats.rowCount
if (leftSize == rightSize && rightRowCount.isDefined && leftRowCount.isDefined) {
  if (rightRowCount.get <= leftRowCount.get) {
Contributor

@mbutrovich mbutrovich Feb 20, 2025

Do we have any sort of cardinality stats (possibly with some catalog implementations)? I'm envisioning a scenario where the right table has fewer rows but much higher cardinality so the resulting hash table is bigger.

Maybe a follow-up issue makes sense to add more heuristics (if we can even get them) to this choice.

Contributor

It is really great to find the correct join order. Are we sure we can rely on stats for any query?

Member Author

With AQE enabled, we have rowCount and sizeInBytes available for completed query stages. We could extend the logic here to also look at LogicalRelation.sizeInBytes if the AQE stats are not available (this could be the case for the first join in a query).

@mbutrovich also suggested that we take into account the size of the data that will go into the build-side hash table, i.e. consider which columns are used in the join.

I think we can experiment more with this. I will file an issue to track this.
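
The fallback described above (prefer runtime stats from a completed AQE stage, otherwise use the size reported by the relation) might be sketched as follows. This is only an illustration: `JoinInput`, `aqeStageSizeInBytes`, `relationSizeInBytes`, and `estimatedSize` are hypothetical names, not Spark or Comet APIs.

```scala
object StatsFallback {
  // Hypothetical model of one join input; not Spark's API.
  final case class JoinInput(
      aqeStageSizeInBytes: Option[BigInt], // runtime stats from a completed query stage, if any
      relationSizeInBytes: Option[BigInt]) // size reported by the underlying relation, if any

  // Prefer runtime stats; fall back to the relation-level estimate.
  def estimatedSize(input: JoinInput): Option[BigInt] =
    input.aqeStageSizeInBytes.orElse(input.relationSizeInBytes)
}
```

If neither source is available the result is `None`, and the caller would need some default, such as a fixed build side.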

Member Author

I filed #1430

Contributor

@parthchandra parthchandra Feb 21, 2025

> Do we have any sort of cardinality stats (possibly with some catalog implementations)?

With CBO enabled, we might have cardinality information. Column stats require a catalog to keep the metadata and stats, so with just plain Parquet files we may not have column stats. With Iceberg, some work has been done to incorporate column stats into the Iceberg table, but I'm not sure where the effort to use them in Spark stands.
Ref: https://github.com/apache/spark/blob/f37be893d01884461ac515c8b197fb30d9ba68ff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala#L101

> I'm envisioning a scenario where the right table has fewer rows but much higher cardinality so the resulting hash table is bigger.

The hash table might have more keys but total size is still a very good metric simply because a larger size hash table might not fit into memory. Also, depending on the implementation, the larger the size of the data in the hash table, the less cache friendly it might be and we may end up with slower performance.
For reference, Spark uses https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java as the fallback hash table implementation (afaik).

@andygrove andygrove marked this pull request as draft February 20, 2025 15:44
@andygrove
Member Author

Moving to draft while I test with TPC-DS

@andygrove andygrove changed the title from "perf: Update RewriteJoin logic to favor BuildLeft" to "perf: Update RewriteJoin logic to choose optimal build side" Feb 20, 2025
@andygrove andygrove marked this pull request as ready for review February 20, 2025 18:50
@andygrove
Member Author

I ran TPC-DS and see no difference in performance, so marking this as ready for review

Comment on lines +38 to +46
if (!leftBuildable && !rightBuildable) {
  return None
}
if (!leftBuildable) {
  return Some(BuildRight)
}
if (!rightBuildable) {
  return Some(BuildLeft)
}
Contributor

@comphead comphead Feb 20, 2025

Suggested change
if (!leftBuildable && !rightBuildable) {
  return None
}
if (!leftBuildable) {
  return Some(BuildRight)
}
if (!rightBuildable) {
  return Some(BuildLeft)
}
(leftBuildable, rightBuildable) match {
  case (false, false) => return None
  case (false, true) => return Some(BuildRight)
  case (true, false) => return Some(BuildLeft)
  case _ => {
    // all other stuff
  }
}

?
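
A self-contained variant of the pattern-match form suggested above, written without `return`, could look like this. It is a sketch only: `BuildSide`, `BuildLeft`, and `BuildRight` are local stand-ins for Spark's types, and `chooseWhenBothBuildable` is a hypothetical parameter standing in for the "all other stuff" branch.

```scala
object BuildabilityMatch {
  // Local stand-ins for Spark's BuildSide values; not the Comet source.
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // `chooseWhenBothBuildable` is an assumed by-name argument that is only
  // evaluated when both sides can be used as the build side.
  def select(leftBuildable: Boolean, rightBuildable: Boolean)(
      chooseWhenBothBuildable: => BuildSide): Option[BuildSide] =
    (leftBuildable, rightBuildable) match {
      case (false, false) => None
      case (false, true) => Some(BuildRight)
      case (true, false) => Some(BuildLeft)
      case (true, true) => Some(chooseWhenBothBuildable)
    }
}
```

Because the match is the last expression of the method, each branch yields the result directly and no early `return` is needed.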

val rightSize = join.right.stats.sizeInBytes
val leftRowCount = join.left.stats.rowCount
val rightRowCount = join.right.stats.rowCount
if (leftSize == rightSize && rightRowCount.isDefined && leftRowCount.isDefined) {
Contributor

Why does checking the sizes matter? 🤔 Shouldn't it be enough to use row counts?

Member Author

rowCount is an Option so may not always be available. Row count is only used here as a tie-breaker if the left and right size are the same
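
To make the tie-breaker concrete, here is a minimal sketch of size-first selection in which row counts are consulted only when both sizes are equal and both counts are defined. `Stats` is a hypothetical simplification of Spark's plan statistics, and the branch outcomes (which side wins on a tie) are assumptions inferred from the snippet under review, not a copy of the merged code.

```scala
object BuildSideBySize {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Simplified stand-in for plan statistics (hypothetical type).
  final case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt])

  def smallerSide(left: Stats, right: Stats): BuildSide =
    (left.rowCount, right.rowCount) match {
      // Tie on size and both row counts known: compare row counts.
      case (Some(l), Some(r)) if left.sizeInBytes == right.sizeInBytes =>
        if (r <= l) BuildRight else BuildLeft
      // Otherwise compare sizes; ties go to the right side.
      case _ =>
        if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft
    }
}
```

When either row count is missing, the second case applies, so the choice depends on `sizeInBytes` alone.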

Member Author

We can also revisit this logic as part of #1430

Contributor

@comphead comphead Feb 20, 2025

Maybe I am missing something? The leftSize == rightSize condition looks very unlikely, so this logic would rarely consider row counts and would almost always fall back to the size comparison.

We can perhaps use something like https://docs.pingcap.com/tidb/stable/join-reorder#example-the-greedy-algorithm-of-join-reorder

and fall back to sizes if row counts are not available

Member Author

I suppose it depends on the goal for choosing the build-side. Do we want to limit the amount of data that needs to be loaded into the hash map or limit the number of rows?

The article you linked is for reordering nested joins, so row count would likely be more critical for that scenario.

Contributor

@comphead comphead Feb 20, 2025

Oh, this is for the build side, not the entire join reorder; that is what I was missing. For a hash join, the rule of thumb is to put the smaller table on the build side so that it fits in memory, but I don't have a strong opinion on what "smaller" means here. If we are talking about memory, then sizeInBytes probably makes more sense.
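
The two notions of "smaller" discussed in this thread can disagree. The sketch below uses made-up numbers and a hypothetical `SideEstimate` type (row count times an assumed average row width) purely to illustrate that a side with fewer rows can still be the larger one in bytes.

```scala
object BytesVersusRows {
  // Hypothetical per-side estimate; avgRowBytes is an assumed average row width.
  final case class SideEstimate(rowCount: BigInt, avgRowBytes: Int) {
    def sizeInBytes: BigInt = rowCount * avgRowBytes
  }

  // Illustrative values only: many narrow rows on the left, fewer wide rows on the right.
  val left: SideEstimate = SideEstimate(rowCount = BigInt(1000000), avgRowBytes = 16)
  val right: SideEstimate = SideEstimate(rowCount = BigInt(200000), avgRowBytes = 200)

  val rightHasFewerRows: Boolean = right.rowCount < left.rowCount
  val rightHasFewerBytes: Boolean = right.sizeInBytes < left.sizeInBytes
}
```

With these values a row-count rule would build on the right, while a byte-size rule would build on the left.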

Contributor

@comphead comphead left a comment

lgtm thanks @andygrove

@parthchandra
Contributor

@andygrove
Member Author

> Do we know why Spark's decision is so bad to start with? Spark has the same logic here: https://github.com/apache/spark/blob/fb17856a22be6968b2ed55ccbd7cf72111920bea/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala#L506

Spark is building a SortMergeJoin and we are replacing with ShuffledHashJoin. Our new logic in this PR seems to match the Spark logic you linked to.

@parthchandra
Contributor

> Spark is building a SortMergeJoin and we are replacing with ShuffledHashJoin.

Makes sense

@hayman42

@andygrove Thanks for opening this PR! I have one question though.

I also tried to apply the same build side selection logic but found that with multi executors, the CometExchange that is executed right after CometHashJoin with BuildLeft gets slower as described here #1382 (comment) and that is why I did not open PR. Is it confirmed that queries are faster with multi executors as well?

@andygrove
Member Author

> @andygrove Thanks for opening this PR! I have one question though.
>
> I also tried to apply the same build side selection logic but found that with multi executors, the CometExchange that is executed right after CometHashJoin with BuildLeft gets slower as described here #1382 (comment) and that is why I did not open PR. Is it confirmed that queries are faster with multi executors as well?

I will test with multiple executors today, just to be sure, but I suspect the issue you were seeing is related to spilling in shuffle. I commented on the issue.

I will share the results here later today for multi-executor testing.

Comment on lines 69 to 70
// TODO this was added as a workaround for TPC-DS q14 hanging and needs
// further investigation
Contributor

Not a blocker, but just wondering if this is because we were choosing the wrong side?

Member Author

This is needed both before and after this PR. I think it may be related to excessive spilling and shuffle but it still needs to be investigated.

Comment on lines +53 to +55
// If smj has no logical link, or its logical link is not a join,
// then we always choose left as build side.
BuildLeft
Contributor

We previously preferred right:

    if (canBuildShuffledHashJoinRight(joinType)) {
      Some(BuildRight)
    } else if (canBuildShuffledHashJoinLeft(joinType)) {
      Some(BuildLeft)
    } else {
      None
    }

Is this a behavior change?

Member Author

Yes, this is a behavior change.

Member Author

Well, query results won't change, just performance characteristics.
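
The default discussed in this thread (use the left side when there is no logical link, or when the link is not a join) might be sketched as below. `LogicalNode`, `JoinNode`, and `OtherNode` are hypothetical simplified types rather than Spark's plan classes, and the size comparison inside `smallerSide` is an assumed placeholder for whatever rule is applied to a real join.

```scala
object LogicalLinkFallback {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Hypothetical, simplified logical plan nodes; not Spark's classes.
  sealed trait LogicalNode
  final case class JoinNode(leftSizeInBytes: BigInt, rightSizeInBytes: BigInt) extends LogicalNode
  case object OtherNode extends LogicalNode

  // Assumed placeholder rule for an actual join: pick the smaller side by bytes.
  private def smallerSide(join: JoinNode): BuildSide =
    if (join.rightSizeInBytes <= join.leftSizeInBytes) BuildRight else BuildLeft

  // No link, or a link that is not a join, falls through to BuildLeft.
  def buildSide(logicalLink: Option[LogicalNode]): BuildSide =
    logicalLink
      .collect { case join: JoinNode => smallerSide(join) }
      .getOrElse(BuildLeft)
}
```

Since either side yields the same join output, a default like this affects only performance characteristics, which matches the point made in the reply above.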

@andygrove andygrove marked this pull request as draft February 21, 2025 18:49
@andygrove andygrove marked this pull request as ready for review February 21, 2025 20:21
@andygrove
Member Author

Thanks for the reviews @comphead @parthchandra @kazuyukitanimura @mbutrovich @hayman42

I ran benchmarks with 1 executor w/ 8 cores vs 2 executors w/ 4 cores and saw no difference in performance, so I will go ahead and merge this PR.

I do see issues if shuffle has to spill, and I have ideas on how we can greatly improve this.

@andygrove andygrove merged commit ef33052 into apache:main Feb 21, 2025
75 checks passed
@andygrove andygrove deleted the rewrite-join branch February 21, 2025 22:21