Skip to content

Commit

Permalink
fix scanner limits processing (#13556)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 20, 2025
1 parent 2894ac6 commit e649793
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 25 deletions.
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/common/kqp_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
exchangerSettings->SetStartDelayMs(10);
exchangerSettings->SetMaxDelayMs(10);
AppConfig.MutableColumnShardConfig()->SetDisabledOnSchemeShard(false);
AppConfig.MutableColumnShardConfig()->SetMaxInFlightIntervalsOnRequest(1);
FeatureFlags.SetEnableSparsedColumns(true);
FeatureFlags.SetEnableWritePortionsOnInsert(true);
FeatureFlags.SetEnableParameterizedDecimal(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace NKikimr::NOlap::NReader::NSimple {

void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::shared_ptr<arrow::Table>&& tableExt, const ui32 startIndex,
const ui32 recordsCount, TPlainReadData& reader) {

source->MutableResultRecordsCount() += tableExt ? tableExt->num_rows() : 0;
if (!tableExt || !tableExt->num_rows()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("empty_source", source->DebugJson().GetStringRobust());
Expand Down Expand Up @@ -56,10 +55,11 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
AFL_VERIFY(FetchingSourcesByIdx.erase(frontSource->GetSourceIdx()));
FetchingSources.pop_front();
frontSource->ClearResult();
if (Context->GetCommonContext()->GetReadMetadata()->HasLimit() && SortedSources.size() && frontSource->GetResultRecordsCount()) {
if (Context->GetCommonContext()->GetReadMetadata()->HasLimit()) {
AFL_VERIFY(FetchingInFlightSources.erase(frontSource));
AFL_VERIFY(FinishedSources.emplace(frontSource).second);
while (FinishedSources.size() && (*FinishedSources.begin())->GetFinish() < SortedSources.front()->GetStart()) {
while (FinishedSources.size() &&
(SortedSources.empty() || (*FinishedSources.begin())->GetFinish() < SortedSources.front()->GetStart())) {
auto finishedSource = *FinishedSources.begin();
if (!finishedSource->GetResultRecordsCount() && InFlightLimit < MaxInFlight) {
InFlightLimit = 2 * InFlightLimit;
Expand Down Expand Up @@ -90,10 +90,8 @@ TConclusionStatus TScanHead::Start() {

TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context)
: Context(context) {
if (HasAppData()) {
if (AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) {
MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest();
}
if (HasAppData() && AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) {
MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest();
}
if (Context->GetReadMetadata()->HasLimit()) {
InFlightLimit = 1;
Expand Down Expand Up @@ -121,45 +119,51 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
if (!Context->IsActive()) {
return false;
}
if (InFlightLimit <= IntervalsInFlightCount) {
return false;
}
if (SortedSources.size() == 0) {
return false;
}
bool changed = false;
ui32 inFlightCountLocal = 0;
if (SortedSources.size()) {
if (!Context->GetCommonContext()->GetReadMetadata()->HasLimit()) {
while (SortedSources.size() && FetchingSources.size() < InFlightLimit) {
SortedSources.front()->StartProcessing(SortedSources.front());
FetchingSources.emplace_back(SortedSources.front());
AFL_VERIFY(FetchingSourcesByIdx.emplace(SortedSources.front()->GetSourceIdx(), SortedSources.front()).second);
SortedSources.pop_front();
changed = true;
}
} else {
if (InFlightLimit <= IntervalsInFlightCount) {
return false;
}
ui32 inFlightCountLocal = 0;
for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) {
if ((*it)->GetFinish() < SortedSources.front()->GetStart()) {
if (SortedSources.empty() || (*it)->GetFinish() < SortedSources.front()->GetStart()) {
++inFlightCountLocal;
} else {
break;
}
}
}
AFL_VERIFY(IntervalsInFlightCount == inFlightCountLocal)("count_global", IntervalsInFlightCount)("count_local", inFlightCountLocal);
while (SortedSources.size() && inFlightCountLocal < InFlightLimit) {
SortedSources.front()->StartProcessing(SortedSources.front());
FetchingSources.emplace_back(SortedSources.front());
FetchingSourcesByIdx.emplace(SortedSources.front()->GetSourceIdx(), SortedSources.front());
AFL_VERIFY(FetchingInFlightSources.emplace(SortedSources.front()).second);
SortedSources.pop_front();
if (SortedSources.size()) {
AFL_VERIFY(IntervalsInFlightCount == inFlightCountLocal)("count_global", IntervalsInFlightCount)("count_local", inFlightCountLocal);
while (SortedSources.size() && inFlightCountLocal < InFlightLimit) {
SortedSources.front()->StartProcessing(SortedSources.front());
FetchingSources.emplace_back(SortedSources.front());
AFL_VERIFY(FetchingSourcesByIdx.emplace(SortedSources.front()->GetSourceIdx(), SortedSources.front()).second);
AFL_VERIFY(FetchingInFlightSources.emplace(SortedSources.front()).second);
SortedSources.pop_front();
ui32 inFlightCountLocalNew = 0;
for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) {
if ((*it)->GetFinish() < SortedSources.front()->GetStart()) {
if (SortedSources.empty() || (*it)->GetFinish() < SortedSources.front()->GetStart()) {
++inFlightCountLocalNew;
} else {
break;
}
}
AFL_VERIFY(inFlightCountLocal <= inFlightCountLocalNew);
inFlightCountLocal = inFlightCountLocalNew;
changed = true;
}
changed = true;
IntervalsInFlightCount = inFlightCountLocal;
}
IntervalsInFlightCount = inFlightCountLocal;
return changed;
}

Expand Down

0 comments on commit e649793

Please sign in to comment.