Skip to content

Commit

Permalink
Merge pull request ydb-platform#11014 from uzhastik/24_3_merge_14
Browse files Browse the repository at this point in the history
24 3 merge 14
  • Loading branch information
maximyurchuk authored Oct 29, 2024
2 parents c9088c0 + 1368707 commit d6142ce
Show file tree
Hide file tree
Showing 47 changed files with 3,399 additions and 448 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
!*/
# Unignore all files inside canondata dir
!**/canondata/**
# Allow docker files
!Dockerfile

/canonization_show_res.log

Expand Down Expand Up @@ -43,6 +45,7 @@ __pycache__/
.idea/
.vscode/
.clangd
.antlr/

# KDevelop IDE
*.kdev4
Expand Down Expand Up @@ -82,3 +85,6 @@ list_result.log
bin/config.json

.vs/

# handy for local junk, which is not intended to appear in the repo
junk/
18 changes: 15 additions & 3 deletions ydb/core/base/pool_stats_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,24 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor {
void OnWakeup(const TActorContext &ctx) override {
MiniKQLPoolStats.Update();

TVector<std::tuple<TString, double, ui32, ui32>> pools;
auto systemUpdate = std::make_unique<NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate>();
ui32 coresTotal = 0;
double coresUsed = 0;
for (const auto& pool : PoolCounters) {
pools.emplace_back(pool.Name, pool.Usage, pool.Threads, pool.LimitThreads);
auto& pb = *systemUpdate->Record.AddPoolStats();
pb.SetName(pool.Name);
pb.SetUsage(pool.Usage);
pb.SetThreads(static_cast<ui32>(pool.Threads));
pb.SetLimit(static_cast<ui32>(pool.LimitThreads));
if (pool.Name != "IO") {
coresTotal += static_cast<ui32>(pool.DefaultThreads);
}
coresUsed += pool.Usage * pool.LimitThreads;
}
systemUpdate->Record.SetCoresTotal(coresTotal);
systemUpdate->Record.SetCoresUsed(coresUsed);

ctx.Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ctx.SelfID.NodeId()), new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools));
ctx.Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ctx.SelfID.NodeId()), systemUpdate.release());
}

private:
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,8 @@ void TSharedCacheInitializer::InitializeServices(

config->TotalAsyncQueueInFlyLimit = cfg.GetAsyncQueueInFlyLimit();
config->TotalScanQueueInFlyLimit = cfg.GetScanQueueInFlyLimit();
config->ReplacementPolicy = cfg.GetReplacementPolicy();
config->LimitBytes = cfg.GetMemoryLimit();

if (cfg.HasActivePagesReservationPercent()) {
config->ActivePagesReservationPercent = cfg.GetActivePagesReservationPercent();
Expand All @@ -1184,10 +1186,6 @@ void TSharedCacheInitializer::InitializeServices(
TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
TIntrusivePtr<::NMonitoring::TDynamicCounters> sausageGroup = tabletGroup->GetSubgroup("type", "S_CACHE");

config->CacheConfig = new TCacheCacheConfig(cfg.GetMemoryLimit(),
sausageGroup->GetCounter("fresh"),
sausageGroup->GetCounter("staging"),
sausageGroup->GetCounter("warm"));
config->Counters = new TSharedPageCacheCounters(sausageGroup);

setup->LocalServices.push_back(std::pair<TActorId, TActorSetupCmd>(MakeSharedPageCacheId(0),
Expand Down
47 changes: 47 additions & 0 deletions ydb/core/grpc_services/base/flow_control.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once
#include <util/generic/queue.h>
#include <util/generic/yexception.h>
#include <util/system/types.h>

namespace NKikimr::NGRpcService {

class TRpcFlowControlState {
public:
TRpcFlowControlState(ui64 inflightLimitBytes)
: InflightLimitBytes_(inflightLimitBytes) {}

void PushResponse(ui64 responseSizeBytes) {
ResponseSizeQueue_.push(responseSizeBytes);
TotalResponsesSize_ += responseSizeBytes;
}

void PopResponse() {
Y_ENSURE(!ResponseSizeQueue_.empty());
TotalResponsesSize_ -= ResponseSizeQueue_.front();
ResponseSizeQueue_.pop();
}

size_t QueueSize() const {
return ResponseSizeQueue_.size();
}

i64 FreeSpaceBytes() const { // Negative value temporarily stops data evaluation in DQ graph
return static_cast<i64>(InflightLimitBytes_) - static_cast<i64>(TotalResponsesSize_);
}

ui64 InflightBytes() const {
return TotalResponsesSize_;
}

ui64 InflightLimitBytes() const {
return InflightLimitBytes_;
}

private:
const ui64 InflightLimitBytes_;

TQueue<ui64> ResponseSizeQueue_;
ui64 TotalResponsesSize_ = 0;
};

} // namespace NKikimr::NGRpcService
51 changes: 5 additions & 46 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include "service_query.h"

#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/grpc_services/audit_dml_operations.h>
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/base/flow_control.h>
#include <ydb/core/grpc_services/cancelation/cancelation_event.h>
#include <ydb/core/grpc_services/rpc_kqp_base.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
Expand All @@ -23,51 +23,10 @@ using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQu

struct TProducerState {
TMaybe<ui64> LastSeqNo;
ui64 AckedFreeSpaceBytes = 0;
i64 AckedFreeSpaceBytes = 0;
TActorId ActorId;
};

class TRpcFlowControlState {
public:
TRpcFlowControlState(ui64 inflightLimitBytes)
: InflightLimitBytes_(inflightLimitBytes) {}

void PushResponse(ui64 responseSizeBytes) {
ResponseSizeQueue_.push(responseSizeBytes);
TotalResponsesSize_ += responseSizeBytes;
}

void PopResponse() {
Y_ENSURE(!ResponseSizeQueue_.empty());
TotalResponsesSize_ -= ResponseSizeQueue_.front();
ResponseSizeQueue_.pop();
}

size_t QueueSize() const {
return ResponseSizeQueue_.size();
}

ui64 FreeSpaceBytes() const {
return TotalResponsesSize_ < InflightLimitBytes_
? InflightLimitBytes_ - TotalResponsesSize_
: 0;
}

ui64 InflightBytes() const {
return TotalResponsesSize_;
}

ui64 InflightLimitBytes() const {
return InflightLimitBytes_;
}

private:
const ui64 InflightLimitBytes_;

TQueue<ui64> ResponseSizeQueue_;
ui64 TotalResponsesSize_ = 0;
};

bool FillTxSettings(const Ydb::Query::TransactionSettings& from, Ydb::Table::TransactionSettings& to,
NYql::TIssues& issues)
{
Expand Down Expand Up @@ -326,13 +285,13 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
FlowControl_.PopResponse();
}

ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();

for (auto& pair : StreamChannels_) {
const auto& channelId = pair.first;
auto& channel = pair.second;

if (freeSpaceBytes > 0 && channel.LastSeqNo && channel.AckedFreeSpaceBytes == 0) {
if (freeSpaceBytes > 0 && channel.LastSeqNo && channel.AckedFreeSpaceBytes <= 0) {
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, "
<< ", channel: " << channelId
<< ", seqNo: " << channel.LastSeqNo
Expand Down Expand Up @@ -361,7 +320,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);

FlowControl_.PushResponse(out.size());
auto freeSpaceBytes = FlowControl_.FreeSpaceBytes();
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();

Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS);

Expand Down
62 changes: 27 additions & 35 deletions ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "service_table.h"
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/base/flow_control.h>

#include "rpc_common/rpc_common.h"
#include "rpc_kqp_base.h"
Expand Down Expand Up @@ -155,7 +156,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ

TStreamExecuteScanQueryRPC(TEvStreamExecuteScanQueryRequest* request, ui64 rpcBufferSize)
: Request_(request)
, RpcBufferSize_(rpcBufferSize) {}
, FlowControl_(rpcBufferSize) {}

void Bootstrap(const TActorContext &ctx) {
this->Become(&TStreamExecuteScanQueryRPC::StateWork);
Expand Down Expand Up @@ -250,32 +251,30 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
void Handle(TRpcServices::TEvGrpcNextReply::TPtr& ev, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " NextReply"
<< ", left: " << ev->Get()->LeftInQueue
<< ", queue: " << GRpcResponsesSizeQueue_.size()
<< ", used memory: " << GRpcResponsesSize_
<< ", buffer size: " << RpcBufferSize_);
<< ", queue: " << FlowControl_.QueueSize()
<< ", inflight bytes: " << FlowControl_.InflightBytes()
<< ", limit bytes: " << FlowControl_.InflightLimitBytes());

while (GRpcResponsesSizeQueue_.size() > ev->Get()->LeftInQueue) {
GRpcResponsesSize_ -= GRpcResponsesSizeQueue_.front();
GRpcResponsesSizeQueue_.pop();
while (FlowControl_.QueueSize() > ev->Get()->LeftInQueue) {
FlowControl_.PopResponse();
}
Y_DEBUG_ABORT_UNLESS(GRpcResponsesSizeQueue_.empty() == (GRpcResponsesSize_ == 0));
LastDataStreamTimestamp_ = TAppData::TimeProvider->Now();

if (WaitOnSeqNo_ && RpcBufferSize_ > GRpcResponsesSize_) {
ui64 freeSpace = RpcBufferSize_ - GRpcResponsesSize_;
LastDataStreamTimestamp_ = TAppData::TimeProvider->Now();

const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
if (freeSpaceBytes > 0 && LastSeqNo_ && AckedFreeSpaceBytes_ <= 0) {
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack"
<< ", seqNo: " << *WaitOnSeqNo_
<< ", freeSpace: " << freeSpace
<< ", seqNo: " << *LastSeqNo_
<< ", freeSpace: " << freeSpaceBytes
<< ", to: " << ExecuterActorId_);

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*WaitOnSeqNo_);
resp->Record.SetFreeSpace(freeSpace);
resp->Record.SetSeqNo(*LastSeqNo_);
resp->Record.SetFreeSpace(freeSpaceBytes);

ctx.Send(ExecuterActorId_, resp.Release());

WaitOnSeqNo_.Clear();
AckedFreeSpaceBytes_ = freeSpaceBytes;
}
}

Expand Down Expand Up @@ -349,28 +348,22 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);

GRpcResponsesSizeQueue_.push(out.size());
GRpcResponsesSize_ += out.size();
FlowControl_.PushResponse(out.size());
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
LastSeqNo_ = ev->Get()->Record.GetSeqNo();
AckedFreeSpaceBytes_ = freeSpaceBytes;

Request_->SendSerializedResult(std::move(out), StatusIds::SUCCESS);

ui64 freeSpace = GRpcResponsesSize_ < RpcBufferSize_
? RpcBufferSize_ - GRpcResponsesSize_
: 0;

if (freeSpace == 0) {
WaitOnSeqNo_ = ev->Get()->Record.GetSeqNo();
}

LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack"
<< ", seqNo: " << ev->Get()->Record.GetSeqNo()
<< ", freeSpace: " << freeSpace
<< ", freeSpace: " << freeSpaceBytes
<< ", to: " << ev->Sender
<< ", queue: " << GRpcResponsesSizeQueue_.size());
<< ", queue: " << FlowControl_.QueueSize());

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
resp->Record.SetFreeSpace(freeSpace);
resp->Record.SetFreeSpace(freeSpaceBytes);

ctx.Send(ev->Sender, resp.Release());
}
Expand Down Expand Up @@ -411,9 +404,9 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
TInstant now = TAppData::TimeProvider->Now();
TDuration timeout;
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, "Got timeout event, InactiveClientTimeout: " << InactiveClientTimeout_
<< " GRpcResponsesSizeQueue: " << GRpcResponsesSizeQueue_.size());
<< " GRpcResponsesSizeQueue: " << FlowControl_.QueueSize());

if (InactiveClientTimeout_ && GRpcResponsesSizeQueue_.size() > 0) {
if (InactiveClientTimeout_ && FlowControl_.QueueSize() > 0) {
TDuration processTime = now - LastDataStreamTimestamp_;
if (processTime >= InactiveClientTimeout_) {
auto message = TStringBuilder() << this->SelfId() << " Client cannot process data in " << processTime
Expand Down Expand Up @@ -477,13 +470,12 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ

private:
std::shared_ptr<TEvStreamExecuteScanQueryRequest> Request_;
const ui64 RpcBufferSize_;
TRpcFlowControlState FlowControl_;
TMaybe<ui64> LastSeqNo_;
i64 AckedFreeSpaceBytes_ = 0;

TDuration InactiveClientTimeout_;
TQueue<ui64> GRpcResponsesSizeQueue_;
ui64 GRpcResponsesSize_ = 0;
TInstant LastDataStreamTimestamp_;
TMaybe<ui64> WaitOnSeqNo_;

TSchedulerCookieHolder TimeoutTimerCookieHolder_;

Expand Down
Loading

0 comments on commit d6142ce

Please sign in to comment.