repartition-based fallback for hash aggregate v3 #11712
base: branch-24.12
Conversation
Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
…tor leak Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
Signed-off-by: Firestarman <[email protected]>
…gether small buckets Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
I have not finished yet. Could you post an explanation of the changes? I see places in the code that appear to have duplicate functionality, not to mention that the old sort-based agg code completely duplicates a lot of the newer hash repartition-based code.
I really just want to understand what the workflow is supposed to be.
Resolved review comments (now outdated) on:
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala (three comments)
sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortBasedGpuAggregateExec.scala
@@ -335,7 +513,10 @@ class AggHelper(
     // We need to merge the aggregated batches into 1 before calling post process,
     // if the aggregate code had to split on a retry
     if (aggregatedSeq.size > 1) {
-      val concatted = concatenateBatches(metrics, aggregatedSeq)
+      val concatted =
+        withResource(aggregatedSeq) { _ =>
I am confused by this. Was this a bug? This change feels wrong to me.
concatenateBatches has the contract that it will either close everything in toConcat, or, if there is a single item in the sequence, return it without closing anything. By putting it within a withResource, it looks like we are going to double close the data in aggregatedSeq.
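The ownership hazard being described can be sketched like this. This is an illustrative toy, not the real spark-rapids code: FakeBatch, concatenateBatches, and withResource here are simplified stand-ins that only mimic the contract under discussion (consume-on-concat vs. pass-through for a single item).

```scala
// Toy model of the contract under discussion (names are illustrative,
// not the real spark-rapids classes).
class FakeBatch {
  var closed = false
  def close(): Unit = {
    // a strict close that rejects double closes
    require(!closed, "double close!")
    closed = true
  }
}

// Mimics concatenateBatches' contract: a single input is returned
// unclosed (ownership passes through); multiple inputs are consumed
// (closed) and a new concatenated batch is returned.
def concatenateBatches(toConcat: Seq[FakeBatch]): FakeBatch = {
  if (toConcat.size == 1) {
    toConcat.head
  } else {
    toConcat.foreach(_.close())
    new FakeBatch
  }
}

// Mimics withResource: runs the body, then closes every resource.
def withResource[T](rs: Seq[FakeBatch])(body: Seq[FakeBatch] => T): T = {
  try body(rs) finally rs.foreach(_.close())
}
```

Under this model, `withResource(batches) { _ => concatenateBatches(batches) }` closes each consumed batch a second time, which a strict close would reject.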
SpillableColumnarBatch has the nasty habit (?) of hiding double closes from us (https://github.com/NVIDIA/spark-rapids/blob/branch-24.12/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala#L137). I'd like to remove this behavior with my spillable changes.
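The "hiding double closes" behavior referred to above can be sketched as a close() that silently no-ops on repeat calls. This is a hypothetical stand-in, not the linked class; it only illustrates why a caller's double-close bug goes unnoticed.

```scala
// Hypothetical sketch: a lenient close() that masks caller bugs by
// silently ignoring every call after the first.
class LenientSpillable {
  private var released = false
  var closeCalls = 0 // visible for illustration only

  def close(): Unit = {
    closeCalls += 1
    if (!released) {
      released = true
      // ... release the underlying device buffer here ...
    }
    // a second close() falls through silently instead of failing fast,
    // so the double close in the calling code is never surfaced
  }
}
```

With a strict close (one that throws on the second call), the double close discussed in the previous comment would have failed loudly instead of passing tests.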
I think I was misled by
spark-rapids/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
Line 955 in 5be4bd5
val concatBatch = withResource(batches) { _ =>
I will review this today.
realIter = Some(ConcatIterator.apply(firstPassIter,
  (aggOutputSizeRatio * configuredTargetBatchSize).toLong
))
firstPassAggToggle.set(false)
I don't think this does what you think it does. The line that reads this flag runs once, when the iterator is created; it is not inside the iterator, so it cannot decide per batch whether we should do the agg. I added some print statements and verified that it does indeed agg every batch, even when the first batch set this to false. Which is a good thing, because if you disabled the initial aggregation on something where the output types do not match the input types, you would get a crash or data corruption.
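The construction-time-versus-per-batch pitfall described above can be shown with a small, self-contained example (the toggle and batches here are illustrative, not the PR's actual variables): a flag consulted when the iterator is built is baked in for good, so flipping it from inside the stream has no effect on later elements.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative toggle and batch stream, mirroring the shape of the bug.
val toggle = new AtomicBoolean(true)
val batches = Iterator(1, 2, 3)

// The branch is taken exactly once, at construction time.
val built: Iterator[Int] =
  if (toggle.get()) {
    // "do the agg" path; each element also flips the toggle off,
    // hoping (incorrectly) to skip the agg for later batches
    batches.map { b => toggle.set(false); b * 10 }
  } else {
    batches // "skip the agg" path
  }

// Every element still goes through the map, even though the first
// element already set the toggle to false: the decision was made
// before any batch was processed.
val out = built.toList
```

To actually skip per batch, the check would have to live inside `next()` (or inside the mapping function), where it is evaluated for each element.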
Known issue, will revert this part.
I remember getting a crash or data corruption when I tried to fix the iterator bug here. Do you think it would be beneficial if we still convert the output types, but skip any row-wise aggregation when heuristics show that the first-pass agg does not aggregate away many rows?
// Handle the case of skipping second and third pass of aggregation
// This only works when spark.rapids.sql.agg.skipAggPassReductionRatio < 1
if (!firstBatchChecked && firstPassIter.hasNext
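The heuristic this snippet gates on can be sketched as follows. The function name and parameters are illustrative (only the config key `spark.rapids.sql.agg.skipAggPassReductionRatio` appears in the source): after first-pass aggregating a batch, compare output rows to input rows, and if the agg barely reduced the data, skip the later repartition/merge passes and stream batches out directly.

```scala
// Illustrative sketch of the reduction-ratio check; not the real
// implementation. A ratio near 1.0 means the first-pass agg removed
// almost no rows, so the extra passes are unlikely to pay off.
def shouldSkipLaterPasses(
    inputRows: Long,
    aggedRows: Long,
    skipAggPassReductionRatio: Double): Boolean = {
  require(inputRows > 0, "need at least one input row to form a ratio")
  aggedRows.toDouble / inputRows >= skipAggPassReductionRatio
}
```

With the configured ratio at its default of 1.0 the predicate can never fire for a reducing agg, which matches the code comment that the skip only works when the config is set below 1.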
If we are doing an aggregate every time, would it be better to check each batch and skip repartitioning if the batch stayed large?
I don't quite understand this. Can you elaborate?
Hi @revans2 and @abellina, this PR, as I stated in Slack and marked in the PR itself, is still in DRAFT, so I didn't expect a thorough review before I change it from Draft to Ready. (Or do we already have some special usage of Draft PRs in our dev process?) It still had some major problems, such as mixing in other features (e.g. the so-called voluntary release check) and a non-working implementation of skipping the first-pass aggregation. The refinement was not ready last Friday, but it is now; please go ahead and review. The reason I showed you this PR is to demonstrate that "we do a complete pass over all of the batches and cache them into GPU memory before we make a decision on when and how to release a batch" is no longer true. I assume that having access to the state-of-the-art agg implementation used by our customer may help you better understand the problem we're facing now.
Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
batches: mutable.ArrayBuffer[SpillableColumnarBatch],
metrics: GpuHashAggregateMetrics,
concatAndMergeHelper: AggHelper): SpillableColumnarBatch = {
  // TODO: concatenateAndMerge (and calling code) could output a sequence
FYI: this TODO is not newly introduced.
@@ -2113,6 +2187,7 @@ class DynamicGpuPartialSortAggregateIterator(
   }
   val newIter = if (doSinglePassAgg) {
     metrics.singlePassTasks += 1
+    // To discuss in PR: is singlePassSortedAgg still necessary?
should we remove this?
Next steps for this PR will be:
This PR replaces #11116, since it has diverged too much from #11116.