fix: Fix operator finishTiming metrics (#12316)
Summary:
Issue: The execution time of `nextOp->noMoreInput()` is recorded in `op`'s
finishTiming metrics instead of `nextOp`'s. This is noticeable in the OrderBy,
Window, and HashBuild operators, which do computationally intensive work inside
`noMoreInput()`; for example, OrderBy sorts there. As a result, the recorded
wall time is incorrectly attributed to the upstream operator.

Fix: Record the time in `nextOp`'s finishTiming metrics.
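
The misattribution can be illustrated with a minimal, self-contained sketch. `Op`, `Timing`, `withWallTimer`, and `finishPipeline` are hypothetical stand-ins rather than Velox's actual classes, and a short sleep stands in for the sort that OrderBy performs in `noMoreInput()`. The sketch compares charging the upstream operator with charging the downstream operator that does the work.

```cpp
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// Hypothetical stand-ins for illustration only; not Velox's real types.
struct Timing {
  uint64_t count{0};
  uint64_t wallNanos{0};
};

struct Op {
  Timing finishTiming;
  std::function<void()> noMoreInput;
};

// Runs 'fn' and adds its wall time to 'target.finishTiming'.
template <typename F>
void withWallTimer(Op& target, F&& fn) {
  const auto start = std::chrono::steady_clock::now();
  fn();
  const auto elapsed = std::chrono::steady_clock::now() - start;
  ++target.finishTiming.count;
  target.finishTiming.wallNanos += static_cast<uint64_t>(
      std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count());
}

struct Result {
  Timing upstream;
  Timing downstream;
};

// Finishes a two-operator pipeline (project -> orderBy). The time spent in
// orderBy.noMoreInput() is charged to orderBy when 'chargeDownstream' is
// true, and to project otherwise (the misattribution described above).
Result finishPipeline(bool chargeDownstream) {
  Op project;
  project.noMoreInput = [] {};
  Op orderBy;
  // Assumption: a 5 ms sleep models the expensive sort.
  orderBy.noMoreInput = [] {
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
  };

  Op& charged = chargeDownstream ? orderBy : project;
  withWallTimer(charged, [&orderBy] { orderBy.noMoreInput(); });
  return {project.finishTiming, orderBy.finishTiming};
}
```

With `finishPipeline(false)` the upstream operator ends up with all of the finish time and the downstream operator with none. With `finishPipeline(true)` the time lands on the operator whose `noMoreInput()` actually ran.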

Pull Request resolved: #12316

Reviewed By: kKPulla

Differential Revision: D69635556

Pulled By: pedroerp

fbshipit-source-id: a31c41976f196c007de2140b790c590f359c8bda
zhli1142015 authored and facebook-github-bot committed Feb 14, 2025
1 parent b5dda37 commit 8ed7b0b
Showing 2 changed files with 27 additions and 2 deletions.
2 changes: 1 addition & 1 deletion velox/exec/Driver.cpp
@@ -671,7 +671,7 @@ StopReason Driver::runInternal(
});
if (finished) {
withDeltaCpuWallTimer(
- op, &OperatorStats::finishTiming, [this, &nextOp]() {
+ nextOp, &OperatorStats::finishTiming, [this, &nextOp]() {
TestValue::adjust(
"facebook::velox::exec::Driver::runInternal::noMoreInput",
nextOp);
27 changes: 26 additions & 1 deletion velox/exec/tests/TaskTest.cpp
@@ -1241,7 +1241,7 @@ DEBUG_ONLY_TEST_F(TaskTest, liveStats) {
EXPECT_EQ(numBatches, operatorStats.outputVectors);
// isBlocked() should be called at least twice for each batch
EXPECT_LE(2 * numBatches, operatorStats.isBlockedTiming.count);
- EXPECT_EQ(2, operatorStats.finishTiming.count);
+ EXPECT_EQ(1, operatorStats.finishTiming.count);
// No operators with background CPU time yet.
EXPECT_EQ(0, operatorStats.backgroundTiming.count);

@@ -2415,4 +2415,29 @@ DEBUG_ONLY_TEST_F(TaskTest, taskCancellation) {
task.reset();
waitForAllTasksToBeDeleted();
}

TEST_F(TaskTest, finishTiming) {
auto data = makeRowVector({
makeFlatVector<int64_t>(1'000, [](auto row) { return row; }),
});
core::PlanNodeId projectId;
core::PlanNodeId orderById;
auto plan = PlanBuilder()
.values({data, data})
.project({"c0"})
.capturePlanNodeId(projectId)
.orderBy({"c0 DESC NULLS LAST"}, false)
.capturePlanNodeId(orderById)
.planFragment();

auto [task, _] = executeSerial(plan);
auto taskStats = exec::toPlanStats(task->taskStats());
auto& projectStats = taskStats.at(projectId);
auto& orderByStats = taskStats.at(orderById);
// The sort runs in the OrderBy operator's 'noMoreInput' function, so the
// finish wall time of the OrderBy operator should be greater than that of
// the Project operator.
ASSERT_GT(
orderByStats.finishTiming.wallNanos, projectStats.finishTiming.wallNanos);
}
} // namespace facebook::velox::exec::test
