From 8343cfde490d814d17ea67511bc6be43bcf9fcd3 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Tue, 16 Jul 2024 07:57:41 +0300 Subject: [PATCH] Limit requested memory (#6698) --- ydb/core/protos/config.proto | 1 + ydb/core/tx/columnshard/counters/scan.h | 54 ++++++++++++++++--- .../engines/reader/common/result.h | 24 ++++----- .../reader/plain_reader/iterator/interval.cpp | 22 +++----- .../reader/plain_reader/iterator/interval.h | 5 +- .../reader/plain_reader/iterator/scanner.cpp | 43 ++++++++++----- .../reader/plain_reader/iterator/scanner.h | 1 + .../tx/columnshard/resource_subscriber/task.h | 4 ++ 8 files changed, 106 insertions(+), 48 deletions(-) diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 3e6bccd4719e..6697efab5001 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1518,6 +1518,7 @@ message TColumnShardConfig { repeated TRepairInfo Repairs = 15; optional uint32 MaxInFlightIntervalsOnRequest = 16; + optional uint32 MaxInFlightMemoryOnRequest = 17; } message TSchemeShardConfig { diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h index d9bbd6b898ce..efaec82c563e 100644 --- a/ydb/core/tx/columnshard/counters/scan.h +++ b/ydb/core/tx/columnshard/counters/scan.h @@ -3,6 +3,7 @@ #include "common/histogram.h" #include #include +#include #include namespace NKikimr::NColumnShard { @@ -10,31 +11,30 @@ namespace NKikimr::NColumnShard { class TScanAggregations: public TCommonCountersOwner { private: using TBase = TCommonCountersOwner; - std::shared_ptr ReadBlobs; - std::shared_ptr GranulesProcessing; - std::shared_ptr GranulesReady; std::shared_ptr ResultsReady; + std::shared_ptr RequestedResourcesMemory; std::shared_ptr ScanDuration; std::shared_ptr BlobsWaitingDuration; public: TScanAggregations(const TString& moduleId) : TBase(moduleId) - , GranulesProcessing(std::make_shared(moduleId, "InFlight/Granules/Processing")) , ResultsReady(std::make_shared(moduleId, "InFlight/Results/Ready")) + , RequestedResourcesMemory(std::make_shared(moduleId, "InFlight/Resources/Requested")) , ScanDuration(TBase::GetValueAutoAggregationsClient("ScanDuration")) , BlobsWaitingDuration(TBase::GetValueAutoAggregationsClient("BlobsWaitingDuration")) { } + std::shared_ptr GetRequestedResourcesMemory() const { + return RequestedResourcesMemory; + } + void OnBlobWaitingDuration(const TDuration d, const TDuration fullScanDuration) const { BlobsWaitingDuration->Add(d.MicroSeconds()); ScanDuration->SetValue(fullScanDuration.MicroSeconds()); } - const std::shared_ptr& GetGranulesProcessing() const { - return GranulesProcessing; - } const std::shared_ptr& GetResultsReady() const { return ResultsReady; } @@ -282,9 +282,40 @@ class TCounterGuard: TNonCopyable { }; +class TReaderResourcesGuard { +private: + std::shared_ptr Allocated; + std::shared_ptr Requested; + const std::shared_ptr SignalCounter; + const ui64 Volume; + +public: + TReaderResourcesGuard(const ui64 volume, const std::shared_ptr& requested, const std::shared_ptr& signalWatcher) + : Requested(requested) + , SignalCounter(signalWatcher) + , Volume(volume) + { + AFL_VERIFY(Requested); + Requested->Add(Volume); + SignalCounter->AddBytes(volume); + } + + void InitResources(const std::shared_ptr& g) { + AFL_VERIFY(!Allocated); + AFL_VERIFY(g->GetMemory() == Volume)("volume", Volume)("allocated", g->GetMemory()); + Allocated = g; + } + + ~TReaderResourcesGuard() { + SignalCounter->RemoveBytes(Volume); + AFL_VERIFY(Requested->Sub(Volume) >= 0); + } +}; + class TConcreteScanCounters: public TScanCounters { private: using TBase = TScanCounters; + std::shared_ptr RequestedResourcesBytes; std::shared_ptr MergeTasksCount; std::shared_ptr AssembleTasksCount; std::shared_ptr ReadTasksCount; @@ -292,6 +323,14 @@ class TConcreteScanCounters: public TScanCounters { public: TScanAggregations Aggregations; + ui64 GetRequestedMemoryBytes() const { + return RequestedResourcesBytes->Val(); + } + + std::shared_ptr BuildRequestedResourcesGuard(const ui64 volume) const { + return std::make_shared(volume, RequestedResourcesBytes, Aggregations.GetRequestedResourcesMemory()); + } + TCounterGuard GetMergeTasksGuard() const { return TCounterGuard(MergeTasksCount); } @@ -319,6 +358,7 @@ class TConcreteScanCounters: public TScanCounters { TConcreteScanCounters(const TScanCounters& counters) : TBase(counters) + , RequestedResourcesBytes(std::make_shared()) , MergeTasksCount(std::make_shared()) , AssembleTasksCount(std::make_shared()) , ReadTasksCount(std::make_shared()) diff --git a/ydb/core/tx/columnshard/engines/reader/common/result.h b/ydb/core/tx/columnshard/engines/reader/common/result.h index 2c3f698bf7d7..5780c0f2fc24 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/result.h +++ b/ydb/core/tx/columnshard/engines/reader/common/result.h @@ -1,5 +1,6 @@ #pragma once #include +#include #include #include #include @@ -10,7 +11,7 @@ namespace NKikimr::NOlap::NReader { // Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation class TPartialReadResult { private: - YDB_READONLY_DEF(std::vector>, ResourcesGuards); + YDB_READONLY_DEF(std::vector>, ResourcesGuards); NArrow::TShardedRecordBatch ResultBatch; // This 1-row batch contains the last key that was read while producing the ResultBatch. @@ -32,7 +33,7 @@ class TPartialReadResult { return ResultBatch.GetRecordBatch(); } - const std::shared_ptr& GetResourcesGuardOnly() const { + const std::shared_ptr& GetResourcesGuardOnly() const { AFL_VERIFY(ResourcesGuards.size() == 1); AFL_VERIFY(!!ResourcesGuards.front()); return ResourcesGuards.front(); @@ -56,14 +57,12 @@ class TPartialReadResult { return LastReadKey; } - explicit TPartialReadResult( - const std::vector>& resourcesGuards, + explicit TPartialReadResult(const std::vector>& resourcesGuards, const NArrow::TShardedRecordBatch& batch, std::shared_ptr lastKey, const std::optional notFinishedIntervalIdx) : ResourcesGuards(resourcesGuards) , ResultBatch(batch) , LastReadKey(lastKey) - , NotFinishedIntervalIdx(notFinishedIntervalIdx) - { + , NotFinishedIntervalIdx(notFinishedIntervalIdx) { for (auto&& i : ResourcesGuards) { AFL_VERIFY(i); } @@ -72,16 +71,17 @@ class TPartialReadResult { Y_ABORT_UNLESS(LastReadKey->num_rows() == 1); } - explicit TPartialReadResult( - const std::shared_ptr& resourcesGuards, + explicit TPartialReadResult(const std::shared_ptr& resourcesGuards, const NArrow::TShardedRecordBatch& batch, std::shared_ptr lastKey, const std::optional notFinishedIntervalIdx) - : TPartialReadResult(std::vector>({resourcesGuards}), batch, lastKey, notFinishedIntervalIdx) { + : TPartialReadResult( + std::vector>({ resourcesGuards }), batch, lastKey, notFinishedIntervalIdx) { AFL_VERIFY(resourcesGuards); } - explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, std::shared_ptr lastKey, const std::optional notFinishedIntervalIdx) - : TPartialReadResult(std::vector>(), batch, lastKey, notFinishedIntervalIdx) { + explicit TPartialReadResult( + const NArrow::TShardedRecordBatch& batch, std::shared_ptr lastKey, const std::optional notFinishedIntervalIdx) + : TPartialReadResult(std::vector>(), batch, lastKey, notFinishedIntervalIdx) { } }; -} +} // namespace NKikimr::NOlap::NReader diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp index 8e228937b653..311a3c45f61d 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp @@ -18,19 +18,6 @@ void TFetchingInterval::ConstructResult() { } } -void TFetchingInterval::OnInitResourcesGuard(const std::shared_ptr& guard) { - IntervalStateGuard.SetStatus(NColumnShard::TScanCounters::EIntervalStatus::WaitSources); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "allocated")("interval_idx", IntervalIdx); - AFL_VERIFY(guard); - AFL_VERIFY(!ResourcesGuard); - ResourcesGuard = guard; - for (auto&& i : Sources) { - i.second->OnInitResourcesGuard(i.second); - } - AFL_VERIFY(ReadyGuards.Inc() <= 1); - ConstructResult(); -} - void TFetchingInterval::OnSourceFetchStageReady(const ui32 /*sourceIdx*/) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "fetched")("interval_idx", IntervalIdx); AFL_VERIFY(ReadySourcesCount.Inc() <= WaitSourcesCount); @@ -45,6 +32,7 @@ TFetchingInterval::TFetchingInterval(const NArrow::NMerger::TSortableBatchPositi , Context(context) , TaskGuard(Context->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) , Sources(sources) + , ResourcesGuard(Context->GetCommonContext()->GetCounters().BuildRequestedResourcesGuard(GetMemoryAllocation())) , IntervalIdx(intervalIdx) , IntervalStateGuard(Context->GetCommonContext()->GetCounters().CreateIntervalStateGuard()) { @@ -62,7 +50,13 @@ void TFetchingInterval::DoOnAllocationSuccess(const std::shared_ptrDebugString())("start", MergingContext->GetIncludeStart())("finish", MergingContext->GetIncludeFinish())("sources", Sources.size()); - OnInitResourcesGuard(guard); + IntervalStateGuard.SetStatus(NColumnShard::TScanCounters::EIntervalStatus::WaitSources); + ResourcesGuard->InitResources(guard); + for (auto&& i : Sources) { + i.second->OnInitResourcesGuard(i.second); + } + AFL_VERIFY(ReadyGuards.Inc() <= 1); + ConstructResult(); } void TFetchingInterval::SetMerger(std::unique_ptr&& merger) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.h index 80613ef5b2d2..6956303a48c9 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.h @@ -20,13 +20,12 @@ class TFetchingInterval: public TNonCopyable, public NResourceBroker::NSubscribe void ConstructResult(); - std::shared_ptr ResourcesGuard; + std::shared_ptr ResourcesGuard; const ui32 IntervalIdx; TAtomicCounter ReadySourcesCount = 0; TAtomicCounter ReadyGuards = 0; ui32 WaitSourcesCount = 0; NColumnShard::TConcreteScanCounters::TScanIntervalStateGuard IntervalStateGuard; - void OnInitResourcesGuard(const std::shared_ptr& guard); protected: virtual void DoOnAllocationSuccess(const std::shared_ptr& guard) override; @@ -47,7 +46,7 @@ class TFetchingInterval: public TNonCopyable, public NResourceBroker::NSubscribe return Sources; } - const std::shared_ptr& GetResourcesGuard() const { + const std::shared_ptr& GetResourcesGuard() const { return ResourcesGuard; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp index 9eafaf6f7cf1..bc5d8962ca97 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp @@ -97,10 +97,15 @@ TConclusionStatus TScanHead::Start() { TScanHead::TScanHead(std::deque>&& sources, const std::shared_ptr& context) : Context(context) { - if (!HasAppData() || !AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) { - MaxInFlight = 256; - } else { - MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest(); + + if (HasAppData()) { + if (AppDataVerified().ColumnShardConfig.HasMaxInFlightMemoryOnRequest()) { + MaxInFlightMemory = AppDataVerified().ColumnShardConfig.GetMaxInFlightMemoryOnRequest(); + } + + if (AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) { + MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest(); + } } if (Context->GetReadMetadata()->Limit) { @@ -239,19 +244,31 @@ TConclusion TScanHead::BuildNextInterval() { if (AbortFlag) { return false; } - while (BorderPoints.size() && (FetchingIntervals.size() < InFlightLimit || BorderPoints.begin()->second.GetStartSources().empty())) { + while (BorderPoints.size()) { + if (BorderPoints.begin()->second.GetStartSources().size()) { + if (FetchingIntervals.size() >= InFlightLimit) { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_next_interval")("reason", "too many intervals in flight")( + "count", FetchingIntervals.size())("limit", InFlightLimit); + return false; + } + if (Context->GetCommonContext()->GetCounters().GetRequestedMemoryBytes() >= MaxInFlightMemory) { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_next_interval")("reason", "a lot of memory in usage")( + "volume", Context->GetCommonContext()->GetCounters().GetRequestedMemoryBytes())("limit", MaxInFlightMemory); + return false; + } + } auto firstBorderPointInfo = std::move(BorderPoints.begin()->second); CurrentState.OnStartPoint(firstBorderPointInfo); if (CurrentState.GetIsSpecialPoint()) { const ui32 intervalIdx = SegmentIdxCounter++; - auto interval = std::make_shared( - BorderPoints.begin()->first, BorderPoints.begin()->first, intervalIdx, CurrentState.GetCurrentSources(), - Context, true, true, false); + auto interval = std::make_shared(BorderPoints.begin()->first, BorderPoints.begin()->first, intervalIdx, + CurrentState.GetCurrentSources(), Context, true, true, false); FetchingIntervals.emplace(intervalIdx, interval); IntervalStats.emplace_back(CurrentState.GetCurrentSources().size(), true); NResourceBroker::NSubscribe::ITask::StartResourceSubscription(Context->GetCommonContext()->GetResourceSubscribeActorId(), interval); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)("interval", interval->DebugJson()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)( + "interval", interval->DebugJson()); } CurrentState.OnFinishPoint(firstBorderPointInfo); @@ -262,11 +279,13 @@ TConclusion TScanHead::BuildNextInterval() { Y_ABORT_UNLESS(BorderPoints.size()); CurrentState.OnNextPointInfo(BorderPoints.begin()->second); const ui32 intervalIdx = SegmentIdxCounter++; - auto interval = std::make_shared(*CurrentStart, BorderPoints.begin()->first, intervalIdx, CurrentState.GetCurrentSources(), Context, - CurrentState.GetIncludeFinish(), CurrentState.GetIncludeStart(), CurrentState.GetIsExclusiveInterval()); + auto interval = + std::make_shared(*CurrentStart, BorderPoints.begin()->first, intervalIdx, CurrentState.GetCurrentSources(), + Context, CurrentState.GetIncludeFinish(), CurrentState.GetIncludeStart(), CurrentState.GetIsExclusiveInterval()); FetchingIntervals.emplace(intervalIdx, interval); IntervalStats.emplace_back(CurrentState.GetCurrentSources().size(), false); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)("interval", interval->DebugJson()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)( + "interval", interval->DebugJson()); NResourceBroker::NSubscribe::ITask::StartResourceSubscription(Context->GetCommonContext()->GetResourceSubscribeActorId(), interval); return true; } else { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h index 7092dac19acd..382c9f88b22a 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h @@ -79,6 +79,7 @@ class TScanHead { std::vector IntervalStats; ui64 InFlightLimit = 1; ui64 MaxInFlight = 256; + ui64 MaxInFlightMemory = ((ui64)256) << 20; ui64 ZeroCount = 0; bool AbortFlag = false; void DrainSources(); diff --git a/ydb/core/tx/columnshard/resource_subscriber/task.h b/ydb/core/tx/columnshard/resource_subscriber/task.h index df4b742f1ad9..46a1ebebd32c 100644 --- a/ydb/core/tx/columnshard/resource_subscriber/task.h +++ b/ydb/core/tx/columnshard/resource_subscriber/task.h @@ -28,6 +28,10 @@ class TResourcesGuard: public NColumnShard::TMonitoringObjectsCounter