Skip to content

Commit

Permalink
[auto-merge] branch-25.02 to branch-25.04 [skip ci] [bot] (#12127)
Browse files Browse the repository at this point in the history
auto-merge triggered by github actions on `branch-25.02` to create a PR
keeping `branch-25.04` up-to-date. If this PR is unable to be merged due
to conflicts, it will remain open until manually fix.
  • Loading branch information
nvauto authored Feb 13, 2025
2 parents b02a1b4 + 3c929c0 commit af3c69a
Showing 1 changed file with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ object GpuShuffledAsymmetricHashJoinExec {
exprs.buildSideNeedsNullFilter, metrics)
JoinInfo(joinType, buildSide, buildIter, buildSize, None, streamIter, exprs)
} else {
val buildBatch = getSingleBuildBatch(baseBuildIter, exprs, metrics)
val buildBatch = getAsSingleBuildBatch(baseBuildIter, exprs, metrics)
val buildIter = new SingleGpuColumnarBatchIterator(buildBatch)
val buildStats = JoinBuildSideStats.fromBatch(buildBatch, exprs.boundBuildKeys)
if (buildStats.streamMagnificationFactor < magnificationThreshold) {
Expand Down Expand Up @@ -1023,15 +1023,26 @@ object GpuShuffledAsymmetricHashJoinExec {
}
}

private def getSingleBuildBatch(
private def getAsSingleBuildBatch(
baseIter: Iterator[ColumnarBatch],
exprs: BoundJoinExprs,
metrics: Map[String, GpuMetric]): ColumnarBatch = {
val iter = addNullFilterIfNecessary(baseIter, exprs.boundBuildKeys,
exprs.buildSideNeedsNullFilter, metrics)
closeOnExcept(iter.next()) { batch =>
assert(!iter.hasNext)
batch
// Multiple small batches may exist when split-retry happens in the previous op.
// So need to concat them into a single one
val spBatches = mutable.Queue.empty[SpillableColumnarBatch]
closeOnExcept(spBatches) { _ =>
while(iter.hasNext) {
spBatches.enqueue(
SpillableColumnarBatch(iter.next(), SpillPriorities.ACTIVE_BATCHING_PRIORITY))
}
}
assert(spBatches.nonEmpty, "At least one batch is expected")
val cbTypes = spBatches.head.dataTypes
withRetryNoSplit(spBatches.toSeq) { _ =>
ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(
spBatches.toArray.safeMap(_.getColumnarBatch()), cbTypes)
}
}
}
Expand Down

0 comments on commit af3c69a

Please sign in to comment.