Skip to content

Commit

Permalink
Limit requested memory (ydb-platform#6698)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jul 16, 2024
1 parent 89c16ee commit 8343cfd
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 48 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1518,6 +1518,7 @@ message TColumnShardConfig {
repeated TRepairInfo Repairs = 15;

optional uint32 MaxInFlightIntervalsOnRequest = 16;
optional uint32 MaxInFlightMemoryOnRequest = 17;
}

message TSchemeShardConfig {
Expand Down
54 changes: 47 additions & 7 deletions ydb/core/tx/columnshard/counters/scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,38 @@
#include "common/histogram.h"
#include <ydb/core/tx/columnshard/resources/memory.h>
#include <ydb/core/tx/columnshard/resource_subscriber/counters.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>

namespace NKikimr::NColumnShard {

class TScanAggregations: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
std::shared_ptr<NOlap::TMemoryAggregation> ReadBlobs;
std::shared_ptr<NOlap::TMemoryAggregation> GranulesProcessing;
std::shared_ptr<NOlap::TMemoryAggregation> GranulesReady;
std::shared_ptr<NOlap::TMemoryAggregation> ResultsReady;
std::shared_ptr<NOlap::TMemoryAggregation> RequestedResourcesMemory;
std::shared_ptr<TValueAggregationClient> ScanDuration;
std::shared_ptr<TValueAggregationClient> BlobsWaitingDuration;
public:
TScanAggregations(const TString& moduleId)
: TBase(moduleId)
, GranulesProcessing(std::make_shared<NOlap::TMemoryAggregation>(moduleId, "InFlight/Granules/Processing"))
, ResultsReady(std::make_shared<NOlap::TMemoryAggregation>(moduleId, "InFlight/Results/Ready"))
, RequestedResourcesMemory(std::make_shared<NOlap::TMemoryAggregation>(moduleId, "InFlight/Resources/Requested"))
, ScanDuration(TBase::GetValueAutoAggregationsClient("ScanDuration"))
, BlobsWaitingDuration(TBase::GetValueAutoAggregationsClient("BlobsWaitingDuration"))
{

}

std::shared_ptr<NOlap::TMemoryAggregation> GetRequestedResourcesMemory() const {
return RequestedResourcesMemory;
}

void OnBlobWaitingDuration(const TDuration d, const TDuration fullScanDuration) const {
BlobsWaitingDuration->Add(d.MicroSeconds());
ScanDuration->SetValue(fullScanDuration.MicroSeconds());
}

const std::shared_ptr<NOlap::TMemoryAggregation>& GetGranulesProcessing() const {
return GranulesProcessing;
}
const std::shared_ptr<NOlap::TMemoryAggregation>& GetResultsReady() const {
return ResultsReady;
}
Expand Down Expand Up @@ -282,16 +282,55 @@ class TCounterGuard: TNonCopyable {

};

class TReaderResourcesGuard {
private:
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> Allocated;
std::shared_ptr<TAtomicCounter> Requested;
const std::shared_ptr<NOlap::TMemoryAggregation> SignalCounter;
const ui64 Volume;

public:
TReaderResourcesGuard(const ui64 volume, const std::shared_ptr<TAtomicCounter>& requested, const std::shared_ptr<NOlap::TMemoryAggregation>& signalWatcher)
: Requested(requested)
, SignalCounter(signalWatcher)
, Volume(volume)
{
AFL_VERIFY(Requested);
Requested->Add(Volume);
SignalCounter->AddBytes(volume);
}

void InitResources(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& g) {
AFL_VERIFY(!Allocated);
AFL_VERIFY(g->GetMemory() == Volume)("volume", Volume)("allocated", g->GetMemory());
Allocated = g;
}

~TReaderResourcesGuard() {
SignalCounter->RemoveBytes(Volume);
AFL_VERIFY(Requested->Sub(Volume) >= 0);
}
};

class TConcreteScanCounters: public TScanCounters {
private:
using TBase = TScanCounters;
std::shared_ptr<TAtomicCounter> RequestedResourcesBytes;
std::shared_ptr<TAtomicCounter> MergeTasksCount;
std::shared_ptr<TAtomicCounter> AssembleTasksCount;
std::shared_ptr<TAtomicCounter> ReadTasksCount;
std::shared_ptr<TAtomicCounter> ResourcesAllocationTasksCount;
public:
TScanAggregations Aggregations;

ui64 GetRequestedMemoryBytes() const {
return RequestedResourcesBytes->Val();
}

std::shared_ptr<TReaderResourcesGuard> BuildRequestedResourcesGuard(const ui64 volume) const {
return std::make_shared<TReaderResourcesGuard>(volume, RequestedResourcesBytes, Aggregations.GetRequestedResourcesMemory());
}

TCounterGuard GetMergeTasksGuard() const {
return TCounterGuard(MergeTasksCount);
}
Expand Down Expand Up @@ -319,6 +358,7 @@ class TConcreteScanCounters: public TScanCounters {

TConcreteScanCounters(const TScanCounters& counters)
: TBase(counters)
, RequestedResourcesBytes(std::make_shared<TAtomicCounter>())
, MergeTasksCount(std::make_shared<TAtomicCounter>())
, AssembleTasksCount(std::make_shared<TAtomicCounter>())
, ReadTasksCount(std::make_shared<TAtomicCounter>())
Expand Down
24 changes: 12 additions & 12 deletions ydb/core/tx/columnshard/engines/reader/common/result.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include <ydb/core/tx/columnshard/common/snapshot.h>
#include <ydb/core/tx/columnshard/counters/scan.h>
#include <ydb/core/tx/columnshard/engines/predicate/filter.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
#include <ydb/core/tx/program/program.h>
Expand All @@ -10,7 +11,7 @@ namespace NKikimr::NOlap::NReader {
// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
class TPartialReadResult {
private:
YDB_READONLY_DEF(std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>, ResourcesGuards);
YDB_READONLY_DEF(std::vector<std::shared_ptr<NColumnShard::TReaderResourcesGuard>>, ResourcesGuards);
NArrow::TShardedRecordBatch ResultBatch;

// This 1-row batch contains the last key that was read while producing the ResultBatch.
Expand All @@ -32,7 +33,7 @@ class TPartialReadResult {
return ResultBatch.GetRecordBatch();
}

const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& GetResourcesGuardOnly() const {
const std::shared_ptr<NColumnShard::TReaderResourcesGuard>& GetResourcesGuardOnly() const {
AFL_VERIFY(ResourcesGuards.size() == 1);
AFL_VERIFY(!!ResourcesGuards.front());
return ResourcesGuards.front();
Expand All @@ -56,14 +57,12 @@ class TPartialReadResult {
return LastReadKey;
}

explicit TPartialReadResult(
const std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>& resourcesGuards,
explicit TPartialReadResult(const std::vector<std::shared_ptr<NColumnShard::TReaderResourcesGuard>>& resourcesGuards,
const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey, const std::optional<ui32> notFinishedIntervalIdx)
: ResourcesGuards(resourcesGuards)
, ResultBatch(batch)
, LastReadKey(lastKey)
, NotFinishedIntervalIdx(notFinishedIntervalIdx)
{
, NotFinishedIntervalIdx(notFinishedIntervalIdx) {
for (auto&& i : ResourcesGuards) {
AFL_VERIFY(i);
}
Expand All @@ -72,16 +71,17 @@ class TPartialReadResult {
Y_ABORT_UNLESS(LastReadKey->num_rows() == 1);
}

explicit TPartialReadResult(
const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuards,
explicit TPartialReadResult(const std::shared_ptr<NColumnShard::TReaderResourcesGuard>& resourcesGuards,
const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey, const std::optional<ui32> notFinishedIntervalIdx)
: TPartialReadResult(std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>({resourcesGuards}), batch, lastKey, notFinishedIntervalIdx) {
: TPartialReadResult(
std::vector<std::shared_ptr<NColumnShard::TReaderResourcesGuard>>({ resourcesGuards }), batch, lastKey, notFinishedIntervalIdx) {
AFL_VERIFY(resourcesGuards);
}

explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey, const std::optional<ui32> notFinishedIntervalIdx)
: TPartialReadResult(std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>(), batch, lastKey, notFinishedIntervalIdx) {
explicit TPartialReadResult(
const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey, const std::optional<ui32> notFinishedIntervalIdx)
: TPartialReadResult(std::vector<std::shared_ptr<NColumnShard::TReaderResourcesGuard>>(), batch, lastKey, notFinishedIntervalIdx) {
}
};

}
} // namespace NKikimr::NOlap::NReader
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,6 @@ void TFetchingInterval::ConstructResult() {
}
}

void TFetchingInterval::OnInitResourcesGuard(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& guard) {
IntervalStateGuard.SetStatus(NColumnShard::TScanCounters::EIntervalStatus::WaitSources);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "allocated")("interval_idx", IntervalIdx);
AFL_VERIFY(guard);
AFL_VERIFY(!ResourcesGuard);
ResourcesGuard = guard;
for (auto&& i : Sources) {
i.second->OnInitResourcesGuard(i.second);
}
AFL_VERIFY(ReadyGuards.Inc() <= 1);
ConstructResult();
}

void TFetchingInterval::OnSourceFetchStageReady(const ui32 /*sourceIdx*/) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "fetched")("interval_idx", IntervalIdx);
AFL_VERIFY(ReadySourcesCount.Inc() <= WaitSourcesCount);
Expand All @@ -45,6 +32,7 @@ TFetchingInterval::TFetchingInterval(const NArrow::NMerger::TSortableBatchPositi
, Context(context)
, TaskGuard(Context->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard())
, Sources(sources)
, ResourcesGuard(Context->GetCommonContext()->GetCounters().BuildRequestedResourcesGuard(GetMemoryAllocation()))
, IntervalIdx(intervalIdx)
, IntervalStateGuard(Context->GetCommonContext()->GetCounters().CreateIntervalStateGuard())
{
Expand All @@ -62,7 +50,13 @@ void TFetchingInterval::DoOnAllocationSuccess(const std::shared_ptr<NResourceBro
AFL_VERIFY(guard);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("interval_idx", IntervalIdx)("event", "resources_allocated")
("resources", guard->DebugString())("start", MergingContext->GetIncludeStart())("finish", MergingContext->GetIncludeFinish())("sources", Sources.size());
OnInitResourcesGuard(guard);
IntervalStateGuard.SetStatus(NColumnShard::TScanCounters::EIntervalStatus::WaitSources);
ResourcesGuard->InitResources(guard);
for (auto&& i : Sources) {
i.second->OnInitResourcesGuard(i.second);
}
AFL_VERIFY(ReadyGuards.Inc() <= 1);
ConstructResult();
}

void TFetchingInterval::SetMerger(std::unique_ptr<NArrow::NMerger::TMergePartialStream>&& merger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ class TFetchingInterval: public TNonCopyable, public NResourceBroker::NSubscribe

void ConstructResult();

std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
std::shared_ptr<NColumnShard::TReaderResourcesGuard> ResourcesGuard;
const ui32 IntervalIdx;
TAtomicCounter ReadySourcesCount = 0;
TAtomicCounter ReadyGuards = 0;
ui32 WaitSourcesCount = 0;
NColumnShard::TConcreteScanCounters::TScanIntervalStateGuard IntervalStateGuard;
void OnInitResourcesGuard(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& guard);
protected:
virtual void DoOnAllocationSuccess(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& guard) override;

Expand All @@ -47,7 +46,7 @@ class TFetchingInterval: public TNonCopyable, public NResourceBroker::NSubscribe
return Sources;
}

const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& GetResourcesGuard() const {
const std::shared_ptr<NColumnShard::TReaderResourcesGuard>& GetResourcesGuard() const {
return ResourcesGuard;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,15 @@ TConclusionStatus TScanHead::Start() {
TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context)
: Context(context)
{
if (!HasAppData() || !AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) {
MaxInFlight = 256;
} else {
MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest();

if (HasAppData()) {
if (AppDataVerified().ColumnShardConfig.HasMaxInFlightMemoryOnRequest()) {
MaxInFlightMemory = AppDataVerified().ColumnShardConfig.GetMaxInFlightMemoryOnRequest();
}

if (AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) {
MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest();
}
}

if (Context->GetReadMetadata()->Limit) {
Expand Down Expand Up @@ -239,19 +244,31 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
if (AbortFlag) {
return false;
}
while (BorderPoints.size() && (FetchingIntervals.size() < InFlightLimit || BorderPoints.begin()->second.GetStartSources().empty())) {
while (BorderPoints.size()) {
if (BorderPoints.begin()->second.GetStartSources().size()) {
if (FetchingIntervals.size() >= InFlightLimit) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_next_interval")("reason", "too many intervals in flight")(
"count", FetchingIntervals.size())("limit", InFlightLimit);
return false;
}
if (Context->GetCommonContext()->GetCounters().GetRequestedMemoryBytes() >= MaxInFlightMemory) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_next_interval")("reason", "a lot of memory in usage")(
"volume", Context->GetCommonContext()->GetCounters().GetRequestedMemoryBytes())("limit", MaxInFlightMemory);
return false;
}
}
auto firstBorderPointInfo = std::move(BorderPoints.begin()->second);
CurrentState.OnStartPoint(firstBorderPointInfo);

if (CurrentState.GetIsSpecialPoint()) {
const ui32 intervalIdx = SegmentIdxCounter++;
auto interval = std::make_shared<TFetchingInterval>(
BorderPoints.begin()->first, BorderPoints.begin()->first, intervalIdx, CurrentState.GetCurrentSources(),
Context, true, true, false);
auto interval = std::make_shared<TFetchingInterval>(BorderPoints.begin()->first, BorderPoints.begin()->first, intervalIdx,
CurrentState.GetCurrentSources(), Context, true, true, false);
FetchingIntervals.emplace(intervalIdx, interval);
IntervalStats.emplace_back(CurrentState.GetCurrentSources().size(), true);
NResourceBroker::NSubscribe::ITask::StartResourceSubscription(Context->GetCommonContext()->GetResourceSubscribeActorId(), interval);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)("interval", interval->DebugJson());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)(
"interval", interval->DebugJson());
}

CurrentState.OnFinishPoint(firstBorderPointInfo);
Expand All @@ -262,11 +279,13 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
Y_ABORT_UNLESS(BorderPoints.size());
CurrentState.OnNextPointInfo(BorderPoints.begin()->second);
const ui32 intervalIdx = SegmentIdxCounter++;
auto interval = std::make_shared<TFetchingInterval>(*CurrentStart, BorderPoints.begin()->first, intervalIdx, CurrentState.GetCurrentSources(), Context,
CurrentState.GetIncludeFinish(), CurrentState.GetIncludeStart(), CurrentState.GetIsExclusiveInterval());
auto interval =
std::make_shared<TFetchingInterval>(*CurrentStart, BorderPoints.begin()->first, intervalIdx, CurrentState.GetCurrentSources(),
Context, CurrentState.GetIncludeFinish(), CurrentState.GetIncludeStart(), CurrentState.GetIsExclusiveInterval());
FetchingIntervals.emplace(intervalIdx, interval);
IntervalStats.emplace_back(CurrentState.GetCurrentSources().size(), false);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)("interval", interval->DebugJson());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_interval")("interval_idx", intervalIdx)(
"interval", interval->DebugJson());
NResourceBroker::NSubscribe::ITask::StartResourceSubscription(Context->GetCommonContext()->GetResourceSubscribeActorId(), interval);
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class TScanHead {
std::vector<TIntervalStat> IntervalStats;
ui64 InFlightLimit = 1;
ui64 MaxInFlight = 256;
ui64 MaxInFlightMemory = ((ui64)256) << 20;
ui64 ZeroCount = 0;
bool AbortFlag = false;
void DrainSources();
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/resource_subscriber/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class TResourcesGuard: public NColumnShard::TMonitoringObjectsCounter<TResources
const TTaskContext Context;
const ui64 Priority;
public:
ui64 GetMemory() const {
return Memory;
}

TString DebugString() const {
return TStringBuilder() << "(mem=" << Memory << ";cpu=" << Cpu << ";)";
}
Expand Down

0 comments on commit 8343cfd

Please sign in to comment.