Skip to content

Commit

Permalink
Update code
Browse files Browse the repository at this point in the history
  • Loading branch information
beliefer committed Jul 28, 2023
1 parent 45b8017 commit 8f50a07
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,22 @@ case class WindowInPandasExec(
)

protected override def doExecute(): RDD[InternalRow] = {
val spillSize = longMetric("spillSize")

val evaluatorFactory =
new WindowInPandasEvaluatorFactory(
windowExpression,
partitionSpec,
orderSpec,
child.output,
spillSize,
longMetric("spillSize"),
pythonMetrics)

// Start processing.
if (conf.usePartitionEvaluator) {
child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
} else {
child.execute().mapPartitions { iter =>
child.execute().mapPartitionsWithIndex { (index, rowIterator) =>
val evaluator = evaluatorFactory.createEvaluator()
evaluator.eval(0, iter)
evaluator.eval(index, rowIterator)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,23 +95,21 @@ case class WindowExec(
)

protected override def doExecute(): RDD[InternalRow] = {
val spillSize = longMetric("spillSize")

val evaluatorFactory =
new WindowEvaluatorFactory(
windowExpression,
partitionSpec,
orderSpec,
child.output,
spillSize)
longMetric("spillSize"))

// Start processing.
if (conf.usePartitionEvaluator) {
child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
} else {
child.execute().mapPartitions { iter =>
child.execute().mapPartitionsWithIndex { (index, rowIterator) =>
val evaluator = evaluatorFactory.createEvaluator()
evaluator.eval(0, iter)
evaluator.eval(index, rowIterator)
}
}
}
Expand Down

0 comments on commit 8f50a07

Please sign in to comment.