From 93089eecc92947f748b20c17c2fa0b3fa7d2b18a Mon Sep 17 00:00:00 2001 From: Ivan <5627721+abyss7@users.noreply.github.com> Date: Thu, 29 Aug 2024 18:09:32 +0300 Subject: [PATCH] Apply the last stats received from terminated CAs (#8356) --- .../kqp/executer_actor/kqp_data_executer.cpp | 18 ++---- .../kqp/executer_actor/kqp_executer_impl.h | 57 +++++++++++++------ ydb/core/kqp/executer_actor/kqp_planner.cpp | 6 +- ydb/core/kqp/executer_actor/kqp_planner.h | 2 +- .../kqp/executer_actor/kqp_scan_executer.cpp | 2 +- 5 files changed, 51 insertions(+), 34 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 6f365c44e2df..32353e34cd1f 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -533,7 +533,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) { CancelProposal(0); } - HandleComputeStats(ev); + HandleComputeState(ev); } void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { @@ -1015,7 +1015,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseBecome(&TThis::WaitShutdownState); LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and " << Planner->GetPendingComputeActors().size() << " compute actors"); + // TODO(ilezhankin): the CA awaiting timeout should be configurable. TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison)); } } else { @@ -2681,17 +2682,10 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) { - YQL_ENSURE(Planner); - - TActorId actor = ev->Sender; - ui64 taskId = ev->Get()->Record.GetTaskId(); - - Planner->CompletedCA(taskId, actor); + HandleComputeStats(ev); - if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { - PassAway(); - } + if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + PassAway(); } } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 2915775ea3fc..297e247e147c 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -380,7 +380,7 @@ class TKqpExecuterBase : public TActorBootstrapped { this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId); } - void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { + bool HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { TActorId computeActor = ev->Sender; auto& state = ev->Get()->Record; ui64 taskId = state.GetTaskId(); @@ -409,7 +409,39 @@ class TKqpExecuterBase : public TActorBootstrapped { } YQL_ENSURE(Planner); - bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state); + bool ack = Planner->AcknowledgeCA(taskId, computeActor, &state); + + switch (state.GetState()) { + case NYql::NDqProto::COMPUTE_STATE_FAILURE: + case NYql::NDqProto::COMPUTE_STATE_FINISHED: + // Don't finalize stats twice. + if (Planner->CompletedCA(taskId, computeActor)) { + ExtraData[computeActor].Swap(state.MutableExtraData()); + + if (Stats) { + Stats->AddComputeActorStats( + computeActor.NodeId(), + std::move(*state.MutableStats()), + TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs()) + ); + } + + LastTaskId = taskId; + LastComputeActorId = computeActor.ToString(); + } + default: + ; // ignore all other states. + } + + return ack; + } + + void HandleComputeState(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { + TActorId computeActor = ev->Sender; + auto& state = ev->Get()->Record; + ui64 taskId = state.GetTaskId(); + + bool populateChannels = HandleComputeStats(ev); switch (state.GetState()) { case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: { @@ -427,22 +459,8 @@ class TKqpExecuterBase : public TActorBootstrapped { break; } - case NYql::NDqProto::COMPUTE_STATE_FAILURE: - case NYql::NDqProto::COMPUTE_STATE_FINISHED: { - ExtraData[computeActor].Swap(state.MutableExtraData()); - if (Stats) { - Stats->AddComputeActorStats( - computeActor.NodeId(), - std::move(*state.MutableStats()), - TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs()) - ); - } - - LastTaskId = taskId; - LastComputeActorId = computeActor.ToString(); - YQL_ENSURE(Planner); - Planner->CompletedCA(taskId, computeActor); - } + default: + ; // ignore all other states. } if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) { @@ -1854,6 +1872,9 @@ class TKqpExecuterBase : public TActorBootstrapped { void PassAway() override { YQL_ENSURE(AlreadyReplied && ResponseEv); + + // Actualize stats with the last stats from terminated CAs, but keep the status. + FillResponseStats(ResponseEv->Record.GetResponse().GetStatus()); this->Send(Target, ResponseEv.release()); for (auto channelPair: ResultChannelProxies) { diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 44243f088244..997964a60f57 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -592,11 +592,11 @@ bool TKqpPlanner::AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql:: return false; } -void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) { +bool TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) { auto& task = TasksGraph.GetTask(taskId); if (task.Meta.Completed) { YQL_ENSURE(!PendingComputeActors.contains(computeActor)); - return; + return false; } task.Meta.Completed = true; @@ -606,6 +606,8 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) { PendingComputeActors.erase(it); LOG_I("Compute actor has finished execution: " << computeActor.ToString()); + + return true; } void TKqpPlanner::TaskNotStarted(ui64 taskId) { diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index df3c3fdd39dd..29facb855f07 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -75,7 +75,7 @@ class TKqpPlanner { std::unique_ptr PlanExecution(); std::unique_ptr AssignTasksToNodes(); bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state); - void CompletedCA(ui64 taskId, TActorId computeActor); + bool CompletedCA(ui64 taskId, TActorId computeActor); void TaskNotStarted(ui64 taskId); TProgressStat::TEntry CalculateConsumptionUpdate(); void ShiftConsumption(); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index c04d5e7573f4..f7357744e14a 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -110,7 +110,7 @@ class TKqpScanExecuter : public TKqpExecuterBaseGetTypeRewrite()) { - hFunc(TEvDqCompute::TEvState, HandleComputeStats); + hFunc(TEvDqCompute::TEvState, HandleComputeState); hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);