Skip to content

Commit

Permalink
YQ-3542 Calculate cpu over ticks (+ task runner actor cpu) (#13479)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Jan 22, 2025
1 parent aa7a166 commit a30b332
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 11 deletions.
11 changes: 10 additions & 1 deletion ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
if (ev->Get()->Stats) {
CA_LOG_T("update task runner stats");
TaskRunnerStats = std::move(ev->Get()->Stats);
TaskRunnerActorElapsedTicks = TaskRunnerStats.GetActorElapsedTicks();
}
ComputeActorState = NDqProto::TEvComputeActorState();
ComputeActorState.SetState(NDqProto::COMPUTE_STATE_EXECUTING);
Expand Down Expand Up @@ -794,12 +795,19 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
}

if (UseCpuQuota()) {
CpuTimeSpent += ev->Get()->ComputeTime;
CpuTimeSpent += TakeCpuTimeDelta();
AskCpuQuota();
ProcessContinueRun();
}
}

TDuration TakeCpuTimeDelta() {
auto newTicks = ComputeActorElapsedTicks + TaskRunnerActorElapsedTicks;
auto result = newTicks - LastQuotaElapsedTicks;
LastQuotaElapsedTicks = newTicks;
return TDuration::MicroSeconds(NHPTimer::GetSeconds(result) * 1'000'000ull);
}

void SaveState(const NDqProto::TCheckpoint& checkpoint, TComputeActorState& state) const override {
CA_LOG_D("Save state");
Y_ABORT_UNLESS(ProgramState);
Expand Down Expand Up @@ -1213,6 +1221,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
// Cpu quota
TActorId QuoterServiceActorId;
TInstant CpuTimeQuotaAsked;
ui64 LastQuotaElapsedTicks = 0;
std::unique_ptr<NTaskRunnerActor::TEvContinueRun> ContinueRunEvent;
TInstant ContinueRunStartWaitTime;
bool ContinueRunInflight = false;
Expand Down
13 changes: 7 additions & 6 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>

void ReportEventElapsedTime() {
if (RuntimeSettings.CollectBasic()) {
ui64 elapsedMicros = NActors::TlsActivationContext->GetCurrentEventTicksAsSeconds() * 1'000'000ull;
CpuTime += TDuration::MicroSeconds(elapsedMicros);
ComputeActorElapsedTicks += NActors::TlsActivationContext->GetCurrentEventTicks();
}
}

Expand Down Expand Up @@ -1704,7 +1703,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
ReportEventElapsedTime();
}

dst->SetCpuTimeUs(CpuTime.MicroSeconds() + SourceCpuTime.MicroSeconds() + InputTransformCpuTime.MicroSeconds());
ui64 computeActorElapsedUs = NHPTimer::GetSeconds(ComputeActorElapsedTicks) * 1'000'000ull;
dst->SetCpuTimeUs(computeActorElapsedUs + SourceCpuTime.MicroSeconds() + InputTransformCpuTime.MicroSeconds());
dst->SetMaxMemoryUsage(MemoryLimits.MemoryQuotaManager->GetMaxMemorySize());

if (auto memProfileStats = GetMemoryProfileStats(); memProfileStats) {
Expand Down Expand Up @@ -1740,9 +1740,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
auto cpuTimeUs = taskStats->ComputeCpuTime.MicroSeconds() + taskStats->BuildCpuTime.MicroSeconds();
if (TDerived::HasAsyncTaskRunner) {
// Async TR is another actor, summarize CPU usage
cpuTimeUs += CpuTime.MicroSeconds();
cpuTimeUs = NHPTimer::GetSeconds(ComputeActorElapsedTicks + TaskRunnerActorElapsedTicks) * 1'000'000ull;
}
// CpuTimeUs does include SourceCpuTime
// cpuTimeUs does include SourceCpuTime
protoTask->SetCpuTimeUs(cpuTimeUs + SourceCpuTime.MicroSeconds() + InputTransformCpuTime.MicroSeconds());
protoTask->SetSourceCpuTimeUs(SourceCpuTime.MicroSeconds());

Expand Down Expand Up @@ -1999,7 +1999,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
bool ResumeEventScheduled = false;
NDqProto::EComputeState State;
TIntrusivePtr<NYql::NDq::TRequestContext> RequestContext;
TDuration CpuTime;
ui64 ComputeActorElapsedTicks = 0;
ui64 TaskRunnerActorElapsedTicks = 0;

struct TProcessOutputsState {
int Inflight = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class TLocalTaskRunnerActor
/*flags=*/0,
ev->Cookie);
}
ActorElapsedTicks += NActors::TlsActivationContext->GetCurrentEventTicks();
}

private:
Expand All @@ -99,7 +100,7 @@ class TLocalTaskRunnerActor
inputTransforms[inputTransformId] = TaskRunner->GetInputTransform(inputTransformId)->second.Get();
}

ev->Get()->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinks), std::move(inputTransforms));
ev->Get()->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinks), std::move(inputTransforms), ActorElapsedTicks);
Send(
ParentId,
ev->Release().Release(),
Expand Down Expand Up @@ -247,7 +248,7 @@ class TLocalTaskRunnerActor
inputTransforms[inputTransformId] = TaskRunner->GetInputTransform(inputTransformId)->second.Get();
}

st->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinks), std::move(inputTransforms));
st->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinks), std::move(inputTransforms), ActorElapsedTicks);
Send(ParentId, st.Release());
}

Expand Down Expand Up @@ -508,6 +509,7 @@ class TLocalTaskRunnerActor
TIntrusivePtr<NDq::IDqTaskRunner> TaskRunner;
THashSet<ui32> InputChannelsWithDisabledCheckpoints;
THolder<TDqMemoryQuota> MemoryQuota;
ui64 ActorElapsedTicks = 0;
};

struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory {
Expand Down
10 changes: 8 additions & 2 deletions ydb/library/yql/dq/runtime/dq_tasks_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ class TDqTaskRunnerStatsView {
}

TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats, THashMap<ui32, const IDqAsyncOutputBuffer*>&& sinks,
THashMap<ui32, const IDqAsyncInputBuffer*>&& inputTransforms)
THashMap<ui32, const IDqAsyncInputBuffer*>&& inputTransforms, ui64 actorElapsedTicks)
: StatsPtr(stats)
, IsDefined(true)
, Sinks(std::move(sinks))
, InputTransforms(std::move(inputTransforms)) {
, InputTransforms(std::move(inputTransforms))
, ActorElapsedTicks(actorElapsedTicks) {
}

const TTaskRunnerStatsBase* Get() {
Expand All @@ -120,11 +121,16 @@ class TDqTaskRunnerStatsView {
return InputTransforms.at(inputTransformId);
}

ui64 GetActorElapsedTicks() {
return ActorElapsedTicks;
}

private:
const TDqTaskRunnerStats* StatsPtr;
bool IsDefined;
THashMap<ui32, const IDqAsyncOutputBuffer*> Sinks;
THashMap<ui32, const IDqAsyncInputBuffer*> InputTransforms;
ui64 ActorElapsedTicks = 0;
};

struct TDqTaskRunnerContext {
Expand Down

0 comments on commit a30b332

Please sign in to comment.