repartition-based fallback for hash aggregate v3 #11712

Draft · wants to merge 52 commits into branch-24.12

Conversation

binmahone (Collaborator) commented Nov 8, 2024

This PR replaces #11116, since it has diverged too much from #11116.

binmahone marked this pull request as draft November 8, 2024 07:21
binmahone changed the title from "240821 repartition agg v3" to "repartition-based fallback for hash aggregate v3" Nov 8, 2024
abellina requested review from abellina and revans2 and removed request for revans2 November 8, 2024 14:33
revans2 (Collaborator) left a comment
I have not finished reviewing yet. Could you post an explanation of the changes? I see places in the code that appear to have duplicate functionality, not to mention that the old sort-based agg code completely duplicates a lot of the newer hash repartition-based code.

I really just want to understand what the workflow is supposed to be.

```diff
@@ -335,7 +513,10 @@ class AggHelper(
     // We need to merge the aggregated batches into 1 before calling post process,
     // if the aggregate code had to split on a retry
     if (aggregatedSeq.size > 1) {
-      val concatted = concatenateBatches(metrics, aggregatedSeq)
+      val concatted =
+        withResource(aggregatedSeq) { _ =>
```
Collaborator:
I am confused by this. Was this a bug? This change feels wrong to me.

concatenateBatches has the contract that it will either close everything in toConcat, or, if there is a single item in the sequence, just return it without closing anything. By putting it within a withResource it looks like we are going to double close the data in aggregatedSeq.
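To make that ownership contract concrete, here is a minimal standalone sketch of it, using a plain AutoCloseable stand-in rather than the plugin's actual SpillableColumnarBatch type (the names `concatSketch` and `combine` are illustrative, not the plugin's API):

```scala
// Minimal sketch of the contract described above. Either every input is
// closed, or a lone input is returned unclosed with ownership transferred
// to the caller.
def concatSketch[T <: AutoCloseable](toConcat: Seq[T])(combine: Seq[T] => T): T = {
  if (toConcat.size == 1) {
    toConcat.head // single element: returned as-is, nothing is closed
  } else {
    try {
      combine(toConcat) // the combined result is owned by the caller
    } finally {
      toConcat.foreach(_.close()) // all inputs are closed on this path
    }
  }
}
```

Under that contract, wrapping the call site in `withResource(aggregatedSeq) { _ => ... }` closes each element a second time on the multi-element path.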

Collaborator:

SpillableColumnarBatch has the nasty habit (?) of hiding double closes from us (https://github.com/NVIDIA/spark-rapids/blob/branch-24.12/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala#L137). I'd like to remove this behavior with my spillable changes.
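For readers unfamiliar with the pattern, this is roughly the shape of a close() that hides double closes (a generic illustration, not a copy of SpillableColumnarBatch):

```scala
// Illustrative only: a close() guarded by a flag. A second (buggy) close
// silently becomes a no-op instead of failing loudly, so ownership bugs
// like the double close above go unnoticed.
class GuardedResource extends AutoCloseable {
  private var closed = false
  override def close(): Unit = {
    if (!closed) {
      closed = true
      // release the underlying device buffers here
    }
    // fall through silently on a repeated close
  }
}
```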

binmahone (Author):
I think I was misled by

`val concatBatch = withResource(batches) { _ =>`

on the main branch, and thought concatenateBatches would not close input batches. Will revert this part.

abellina (Collaborator) commented Nov 8, 2024

I will review this today.

```scala
realIter = Some(ConcatIterator.apply(firstPassIter,
  (aggOutputSizeRatio * configuredTargetBatchSize).toLong
))
firstPassAggToggle.set(false)
```
Collaborator:
I don't think this does what you think it does. The line that reads this flag runs when the iterator is created; it is not inside an iterator deciding per batch whether we should or should not do the agg. I added some print statements and verified that it does indeed agg every batch, even if the first batch set this to false. That is a good thing, because if you disabled the initial aggregation on something where the output types do not match the input types, you would get a crash or data corruption.
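The distinction can be shown with a tiny standalone example (the names here are made up; this is not the plugin's code): a flag read while the iterator chain is being built is frozen at construction time, whereas a flag read inside the per-element function is consulted for every batch:

```scala
import java.util.concurrent.atomic.AtomicBoolean

val doAgg = new AtomicBoolean(true)

// Flag read at construction time: flipping doAgg later has no effect,
// because the branch was taken before any element flowed through.
def frozenAtBuild(input: Iterator[Int]): Iterator[Int] =
  if (doAgg.get()) input.map(_ * 2) else input

// Flag read per element: flipping doAgg mid-stream changes how the
// remaining elements are handled.
def checkedPerElement(input: Iterator[Int]): Iterator[Int] =
  input.map(x => if (doAgg.get()) x * 2 else x)
```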

binmahone (Author):

Known issue, will revert this part.

binmahone (Author):
I remember getting a crash or data corruption when I tried to fix the iterator bug here. Do you think it would be beneficial to still convert the output types, but skip any row-wise aggregate when heuristics show that the first-pass agg does not reduce rows much?


```scala
// Handle the case of skipping second and third pass of aggregation
// This only works when spark.rapids.sql.agg.skipAggPassReductionRatio < 1
if (!firstBatchChecked && firstPassIter.hasNext
```
Collaborator:
If we are doing an aggregate every time, would it be better to check each batch and skip repartitioning if the batch stayed large?
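If it helps, here is one way that per-batch check could look (a hedged sketch; the names and the exact rule are hypothetical, not what the PR implements): compare each batch's post-agg row count to its input row count, and only route batches that actually shrank into the repartition passes.

```scala
// Hypothetical per-batch heuristic: a batch that "stayed large" after the
// first-pass agg (little row reduction) skips the repartition passes.
def stayedLarge(inputRows: Long, outputRows: Long, skipRatio: Double): Boolean =
  inputRows > 0 && outputRows.toDouble / inputRows > skipRatio

// Example: with skipRatio = 0.9, a batch whose agg kept more than 90% of
// its rows would bypass repartitioning and be emitted directly.
```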

binmahone (Author):

I don't quite understand this. Can you elaborate?

binmahone (Author) commented Nov 10, 2024

Hi @revans2 and @abellina, this PR, as I stated in Slack and marked on the PR itself, is still in DRAFT, so I didn't expect a thorough review before I change it from Draft to Ready. (Or do we already have some special usage of Draft PRs in our dev process?) It still had some major problems, such as mixing in other features (e.g. the so-called voluntary release check) and a non-working implementation of skipping the first-pass agg. The refinement was not ready last Friday, but it is now. Please go ahead and review.

The reason I showed you this PR is to prove that "we do a complete pass over all of the batches and cache them into GPU memory before we make a decision on when and how to release a batch" is no longer true. I assume having access to the state-of-the-art implementation of agg at our customer may help you better understand the problem we're facing now.

```scala
    batches: mutable.ArrayBuffer[SpillableColumnarBatch],
    metrics: GpuHashAggregateMetrics,
    concatAndMergeHelper: AggHelper): SpillableColumnarBatch = {
  // TODO: concatenateAndMerge (and calling code) could output a sequence
```
binmahone (Author):

FYI: this TODO is not newly introduced.

```diff
@@ -2113,6 +2187,7 @@ class DynamicGpuPartialSortAggregateIterator(
     }
     val newIter = if (doSinglePassAgg) {
       metrics.singlePassTasks += 1
+      // To discuss in PR: is singlePassSortedAgg still necessary?
```
binmahone (Author):

Should we remove this?

binmahone (Author):

Next steps for this PR will be:

  1. Addressing review comments. When reviewing, please be aware that we found the repartition-based agg is more prone to container OOM (and then being killed by K8S) than the sort-based one. I'm troubleshooting on my side, but any insight from reviewers would be a great help.
  2. We have selected four representative queries at our customer, and before/after results are being collected. Part of the collected results has already shown improvements.
  3. I'll run a regression test on NDS to ensure no or only minor regression.
