diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 5ddc3dea0573..b95992825535 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -11,6 +11,8 @@ #include #include +#include + namespace NKikimr::NGRpcService { @@ -25,6 +27,23 @@ struct TProducerState { TMaybe LastSeqNo; i64 AckedFreeSpaceBytes = 0; TActorId ActorId; + ui64 ChannelId = 0; + + void SendAck(const NActors::TActorIdentity& actor) const { + auto resp = MakeHolder(*LastSeqNo, ChannelId); + resp->Record.SetFreeSpace(AckedFreeSpaceBytes); + + actor.Send(ActorId, resp.Release()); + } + + bool ResumeIfStopped(const NActors::TActorIdentity& actor, i64 freeSpaceBytes) { + if (LastSeqNo && AckedFreeSpaceBytes <= 0) { + AckedFreeSpaceBytes = freeSpaceBytes; + SendAck(actor); + return true; + } + return false; + } }; bool FillTxSettings(const Ydb::Query::TransactionSettings& from, Ydb::Table::TransactionSettings& to, @@ -163,7 +182,9 @@ class TExecuteQueryRPC : public TActorBootstrapped { TExecuteQueryRPC(TEvExecuteQueryRequest* request, ui64 inflightLimitBytes) : Request_(request) - , FlowControl_(inflightLimitBytes) {} + , FlowControl_(inflightLimitBytes) + , Span_(TWilsonGrpc::RequestActor, request->GetWilsonTraceId(), + "RequestProxy.RpcOperationRequestActor", NWilson::EFlags::AUTO_END) {} void Bootstrap(const TActorContext &ctx) { this->Become(&TExecuteQueryRPC::StateWork); @@ -255,7 +276,7 @@ class TExecuteQueryRPC : public TActorBootstrapped { settings, req->pool_id()); - if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) { + if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId())) { NYql::TIssues issues; issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error")); ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, std::move(issues)); @@ -286,28 +307,16 @@ class TExecuteQueryRPC : public TActorBootstrapped { } const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes(); - - for (auto& pair : StreamChannels_) { - const auto& channelId = pair.first; - auto& channel = pair.second; - - if (freeSpaceBytes > 0 && channel.LastSeqNo && channel.AckedFreeSpaceBytes <= 0) { - LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, " - << ", channel: " << channelId - << ", seqNo: " << channel.LastSeqNo - << ", freeSpace: " << freeSpaceBytes); - - auto resp = MakeHolder(); - resp->Record.SetSeqNo(*channel.LastSeqNo); - resp->Record.SetFreeSpace(freeSpaceBytes); - resp->Record.SetChannelId(channelId); - - ctx.Send(channel.ActorId, resp.Release()); - - channel.AckedFreeSpaceBytes = freeSpaceBytes; + if (freeSpaceBytes > 0) { + for (auto& [channelId, channel] : StreamChannels_) { + if (channel.ResumeIfStopped(SelfId(), freeSpaceBytes)) { + LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, " + << ", channel: " << channelId + << ", seqNo: " << channel.LastSeqNo + << ", freeSpace: " << freeSpaceBytes); + } } } - } void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) { @@ -328,6 +337,7 @@ class TExecuteQueryRPC : public TActorBootstrapped { channel.ActorId = ev->Sender; channel.LastSeqNo = ev->Get()->Record.GetSeqNo(); channel.AckedFreeSpaceBytes = freeSpaceBytes; + channel.ChannelId = ev->Get()->Record.GetChannelId(); LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Send stream data ack" << ", seqNo: " << ev->Get()->Record.GetSeqNo() @@ -335,12 +345,7 @@ class TExecuteQueryRPC : public TActorBootstrapped { << ", to: " << ev->Sender << ", queue: " << FlowControl_.QueueSize()); - auto resp = MakeHolder(); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); - resp->Record.SetFreeSpace(freeSpaceBytes); - resp->Record.SetChannelId(ev->Get()->Record.GetChannelId()); - - ctx.Send(channel.ActorId, resp.Release()); + channel.SendAck(SelfId()); } void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) { @@ -415,6 +420,7 @@ class TExecuteQueryRPC : public TActorBootstrapped { void ReplySerializedAndFinishStream(Ydb::StatusIds::StatusCode status, TString&& buf) { const auto finishStreamFlag = NYdbGrpc::IRequestContextBase::EStreamCtrl::FINISH; Request_->SendSerializedResult(std::move(buf), status, finishStreamFlag); + NWilson::EndSpanWithStatus(Span_, status); this->PassAway(); } @@ -453,6 +459,7 @@ class TExecuteQueryRPC : public TActorBootstrapped { } else { Request_->FinishStream(status); } + NWilson::EndSpanWithStatus(Span_, status); this->PassAway(); } @@ -479,6 +486,8 @@ class TExecuteQueryRPC : public TActorBootstrapped { NKikimrKqp::EQueryAction QueryAction; TRpcFlowControlState FlowControl_; TMap StreamChannels_; + + NWilson::TSpan Span_; }; } // namespace diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index 4fe754f06b25..361b81102364 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -268,12 +268,10 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped(); - resp->Record.SetSeqNo(*LastSeqNo_); + // scan query has single result set, so it's ok to put zero as channelId here. + auto resp = MakeHolder(*LastSeqNo_, 0); resp->Record.SetFreeSpace(freeSpaceBytes); - ctx.Send(ExecuterActorId_, resp.Release()); - AckedFreeSpaceBytes_ = freeSpaceBytes; } } @@ -326,6 +324,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrappedGet()->Record; NYql::TIssues issues = ev->Get()->GetIssues(); @@ -361,8 +360,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrappedSender << ", queue: " << FlowControl_.QueueSize()); - auto resp = MakeHolder(); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto resp = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); resp->Record.SetFreeSpace(freeSpaceBytes); ctx.Send(ev->Sender, resp.Release()); diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index 11716bda176e..a8be0e790572 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -279,8 +279,7 @@ class TStreamExecuteYqlScriptRPC << ", to: " << ev->Sender << ", queue: " << FlowControl_.QueueSize()); - auto resp = MakeHolder(); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto resp = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); resp->Record.SetFreeSpace(freeSpaceBytes); ctx.Send(ev->Sender, resp.Release()); @@ -320,8 +319,7 @@ class TStreamExecuteYqlScriptRPC << ", freeSpace: " << freeSpaceBytes << ", to: " << GatewayRequestHandlerActorId_); - auto resp = MakeHolder(); - resp->Record.SetSeqNo(*LastSeqNo_); + auto resp = MakeHolder(*LastSeqNo_, 0); resp->Record.SetFreeSpace(freeSpaceBytes); ctx.Send(GatewayRequestHandlerActorId_, resp.Release()); diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 9c0ef4fae41e..39db34150e61 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -70,7 +70,19 @@ struct TEvKqpExecuter { TKqpExecuterEvents::EvStreamData> {}; struct TEvStreamDataAck : public TEventPB {}; + TKqpExecuterEvents::EvStreamDataAck> + { + friend class TEventPBBase; + explicit TEvStreamDataAck(ui64 seqno, ui64 channelId) + { + Record.SetSeqNo(seqno); + Record.SetChannelId(channelId); + } + + private: + // using a little hack to hide default empty constructor + TEvStreamDataAck() = default; + }; struct TEvStreamProfile : public TEventPB {}; diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 6cef7ba661d5..ccafd30948c8 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -202,9 +202,8 @@ class TKqpScanQueryRequestHandler : public TRequestHandlerBase< ResultSet.set_truncated(true); } - auto resp = MakeHolder(); + auto resp = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); resp->Record.SetEnough(truncated); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(ResultSetBytesLimit); ctx.Send(ev->Sender, resp.Release()); } @@ -497,8 +496,7 @@ class TKqpGenericQueryRequestHandler: public TRequestHandlerBase< } } - auto response = MakeHolder(); - response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto response = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); response->Record.SetFreeSpace(SizeLimit && SizeLimit < std::numeric_limits::max() ? SizeLimit : std::numeric_limits::max()); Send(ev->Sender, response.Release()); } diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index 8d6d19ec55ff..289c712e5ccb 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -1032,7 +1032,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped LastSeqNo; + i64 AckedFreeSpaceBytes = 0; + TActorId ActorId; + ui64 ChannelId = 0; + + void SendAck(const NActors::TActorIdentity& actor) const { + auto resp = MakeHolder(*LastSeqNo, ChannelId); + resp->Record.SetFreeSpace(AckedFreeSpaceBytes); + actor.Send(ActorId, resp.Release()); + } + + bool ResumeIfStopped(const NActors::TActorIdentity& actor, i64 freeSpaceBytes) { + if (LastSeqNo && AckedFreeSpaceBytes <= 0) { + AckedFreeSpaceBytes = freeSpaceBytes; + SendAck(actor); + return true; + } + return false; + } +}; + class TRunScriptActor : public NActors::TActorBootstrapped { enum class ERunState { Created, @@ -62,11 +84,6 @@ class TRunScriptActor : public NActors::TActorBootstrapped { Ydb::ResultSet PendingResult; }; - struct TPendingAck { - TActorId ReplyActorId; - THolder AckEvent; - }; - public: TRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration, TDuration leaseDuration, TDuration resultsTtl, NKikimrConfig::TQueryServiceConfig&& queryServiceConfig, TIntrusivePtr counters) : ExecutionId(executionId) @@ -264,35 +281,30 @@ class TRunScriptActor : public NActors::TActorBootstrapped { PassAway(); } - void SendStreamDataResponse() { - if (PendingAcks.empty()) { - return; + bool ShouldSaveResult(size_t resultSetId) const { + if (SaveResultInflight) { + return false; } - if (PendingResultSetsSize > RUN_SCRIPT_ACTOR_BUFFER_SIZE) { - // Try to save any pending result - SaveResult(); + const TResultSetInfo& resultInfo = ResultSetInfos[resultSetId]; + if (!resultInfo.PendingResult.rows_size()) { + return false; } - - if (PendingResultSetsSize <= RUN_SCRIPT_ACTOR_BUFFER_SIZE) { - while (!PendingAcks.empty()) { - auto response = std::move(PendingAcks.front()); - PendingAcks.pop(); - - LOG_D("Send stream data ack" - << ", seqNo: " << response.AckEvent->Record.GetSeqNo() - << ", to: " << response.ReplyActorId); - - Send(response.ReplyActorId, response.AckEvent.Release()); - } + if (resultInfo.Truncated || !IsExecuting()) { + return true; } + return resultInfo.PendingResult.rows_size() >= MIN_SAVE_RESULT_BATCH_ROWS || resultInfo.ByteCount - resultInfo.AccumulatedSize >= MIN_SAVE_RESULT_BATCH_SIZE; } - void SaveResult(size_t resultSetId) { - if (SaveResultInflight) { - return; + size_t GetBytesToSave(size_t resultSetId) const { + const TResultSetInfo& resultInfo = ResultSetInfos[resultSetId]; + if (!resultInfo.PendingResult.rows_size()) { + return 0; } + return resultInfo.ByteCount - resultInfo.AccumulatedSize; + } + void SaveResult(size_t resultSetId) { if (!ExpireAt && ResultsTtl > TDuration::Zero()) { ExpireAt = TInstant::Now() + ResultsTtl; } @@ -300,7 +312,9 @@ class TRunScriptActor : public NActors::TActorBootstrapped { auto& resultSetInfo = ResultSetInfos[resultSetId]; Register(CreateSaveScriptExecutionResultActor(SelfId(), Database, ExecutionId, resultSetId, ExpireAt, resultSetInfo.FirstRowId, resultSetInfo.AccumulatedSize, std::move(resultSetInfo.PendingResult))); SaveResultInflight++; - PendingResultSetsSize -= resultSetInfo.ByteCount - resultSetInfo.AccumulatedSize; + const ui64 bytes = resultSetInfo.ByteCount - resultSetInfo.AccumulatedSize; + PendingResultSetsSize -= bytes; + SaveResultInflightBytes = bytes; resultSetInfo.FirstRowId = resultSetInfo.RowCount; resultSetInfo.AccumulatedSize = resultSetInfo.ByteCount; resultSetInfo.PendingResult = Ydb::ResultSet(); @@ -308,7 +322,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped { void SaveResult() { for (size_t resultSetId = 0; resultSetId < ResultSetInfos.size(); ++resultSetId) { - if (ResultSetInfos[resultSetId].PendingResult.rows_size()) { + if (ShouldSaveResult(resultSetId)) { SaveResult(resultSetId); break; } @@ -319,13 +333,6 @@ class TRunScriptActor : public NActors::TActorBootstrapped { if (RunState != ERunState::Running) { return; } - auto resp = MakeHolder(); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); - resp->Record.SetFreeSpace(QueryServiceConfig.GetScriptResultSizeLimit() - ? QueryServiceConfig.GetScriptResultSizeLimit() > std::numeric_limits::max() - ? std::numeric_limits::max() - : static_cast(QueryServiceConfig.GetScriptResultSizeLimit()) - : std::numeric_limits::max()); LOG_D("Compute stream data" << ", seqNo: " << ev->Get()->Record.GetSeqNo() @@ -341,6 +348,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped { ResultSetInfos.resize(resultSetIndex + 1); } + bool savedResult = false; auto& resultSetInfo = ResultSetInfos[resultSetIndex]; if (IsExecuting() && !resultSetInfo.Truncated) { auto& rowCount = resultSetInfo.RowCount; @@ -392,13 +400,25 @@ class TRunScriptActor : public NActors::TActorBootstrapped { } } - if (ShouldSaveResult(resultSetInfo)) { + if (ShouldSaveResult(resultSetIndex)) { + savedResult = true; SaveResult(resultSetIndex); } } - PendingAcks.push({.ReplyActorId = ev->Sender, .AckEvent = std::move(resp)}); - SendStreamDataResponse(); + const i64 freeSpaceBytes = GetFreeSpaceBytes(); + const ui32 channelId = ev->Get()->Record.GetChannelId(); + const ui64 seqNo = ev->Get()->Record.GetSeqNo(); + auto& channel = StreamChannels[channelId]; + channel.ActorId = ev->Sender; + channel.LastSeqNo = seqNo; + channel.AckedFreeSpaceBytes = freeSpaceBytes; + channel.ChannelId = channelId; + channel.SendAck(SelfId()); + + if (!savedResult && SaveResultInflight == 0) { + CheckInflight(); + } } void SaveResultMeta() { @@ -518,20 +538,28 @@ class TRunScriptActor : public NActors::TActorBootstrapped { void Handle(TEvSaveScriptResultFinished::TPtr& ev) { SaveResultInflight--; + SaveResultInflightBytes = 0; if (Status == Ydb::StatusIds::SUCCESS || Status == Ydb::StatusIds::STATUS_CODE_UNSPECIFIED) { if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) { Status = ev->Get()->Status; Issues.AddIssues(ev->Get()->Issues); } else { - for (size_t resultSetId = 0; resultSetId < ResultSetInfos.size(); ++resultSetId) { - if (ShouldSaveResult(ResultSetInfos[resultSetId])) { - SaveResult(resultSetId); - break; - } + SaveResult(); + } + } + + const i64 freeSpaceBytes = GetFreeSpaceBytes(); + if (freeSpaceBytes > 0 && IsExecuting()) { + for (auto& [channelId, channel] : StreamChannels) { + if (channel.ResumeIfStopped(SelfId(), freeSpaceBytes)) { + LOG_D("Resume execution, " + << ", channel: " << channelId + << ", seqNo: " << channel.LastSeqNo + << ", freeSpace: " << freeSpaceBytes); } } } - SendStreamDataResponse(); + CheckInflight(); } @@ -553,21 +581,39 @@ class TRunScriptActor : public NActors::TActorBootstrapped { } void CheckInflight() { - if (Status == Ydb::StatusIds::STATUS_CODE_UNSPECIFIED || (Status == Ydb::StatusIds::SUCCESS && RunState == ERunState::Finishing && (SaveResultMetaInflight || SaveResultInflight))) { - // waiting for script completion - return; - } - - if (PendingResultSetsSize) { - // Complete results saving - SaveResult(); + if (SaveResultMetaInflight || SaveResultInflight) { return; } - if (!LeaseUpdateQueryRunning) { - RunScriptExecutionFinisher(); + const int freeSpaceBytes = GetFreeSpaceBytes(); + if (freeSpaceBytes < 0 && IsExecuting()) { + // try to free the space + size_t maxBytesToSave = 0; + size_t maxResultSet = 0; + for (size_t resultSetId = 0; resultSetId < ResultSetInfos.size(); ++resultSetId) { + if (size_t bytesToSave = GetBytesToSave(resultSetId); bytesToSave > maxBytesToSave) { + maxBytesToSave = bytesToSave; + maxResultSet = resultSetId; + } + } + SaveResult(maxResultSet); } else { - FinishAfterLeaseUpdate = true; + if (Status == Ydb::StatusIds::STATUS_CODE_UNSPECIFIED || IsExecuting()) { + // waiting for script completion + return; + } + + if (PendingResultSetsSize) { + // Complete results saving + SaveResult(); + return; + } + + if (!LeaseUpdateQueryRunning) { + RunScriptExecutionFinisher(); + } else { + FinishAfterLeaseUpdate = true; + } } } @@ -597,11 +643,8 @@ class TRunScriptActor : public NActors::TActorBootstrapped { && RunState != ERunState::Cancelling; } - static bool ShouldSaveResult(TResultSetInfo& resultInfo) { - if (!resultInfo.PendingResult.rows_size()) { - return false; - } - return resultInfo.Truncated || resultInfo.PendingResult.rows_size() >= MIN_SAVE_RESULT_BATCH_ROWS || resultInfo.ByteCount - resultInfo.AccumulatedSize >= MIN_SAVE_RESULT_BATCH_SIZE; + i64 GetFreeSpaceBytes() const { + return static_cast(RUN_SCRIPT_ACTOR_BUFFER_SIZE) - static_cast(PendingResultSetsSize) - static_cast(SaveResultInflightBytes); } private: @@ -628,10 +671,11 @@ class TRunScriptActor : public NActors::TActorBootstrapped { // Result std::vector ResultSetInfos; - std::queue PendingAcks; + TMap StreamChannels; TMaybe ExpireAt; NJson::TJsonValue ResultSetMetas; ui32 SaveResultInflight = 0; + ui64 SaveResultInflightBytes = 0; ui32 SaveResultMetaInflight = 0; bool PendingResultMeta = false; ui64 PendingResultSetsSize = 0; diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 002053da8e4c..07f5465c1a4f 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1404,9 +1404,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Y_ASSERT(record.GetResultSet().rows().size() == 1); result = 1; - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -1472,9 +1471,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -1535,9 +1533,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Y_ASSERT(record.GetResultSet().rows().size() == 0); hasResult = true; - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -1606,9 +1603,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -2210,7 +2206,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto result = CollectStreamResult(it); auto ast = result.QueryStats->Getquery_ast(); - + pushdown = ast.find("KqpOlapFilter") != std::string::npos; } else { // Error means that predicate not pushed down diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index 01aa8580bb25..ba6418420dba 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -2414,9 +2414,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto& record = ev->Get()->Record; Y_ASSERT(record.GetResultSet().rows().size() == 0); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(record.GetSeqNo()); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return true; } diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp index 5e968ec37d08..2bd685009417 100644 --- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp @@ -326,9 +326,8 @@ Y_UNIT_TEST_SUITE(KqpSplit) { collectedKeys->push_back(row.items(0).uint64_value()); } - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(record.GetSeqNo()); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return true; } @@ -401,7 +400,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) { } else if (testActorType == ETestActorType::StreamLookup) { InterceptStreamLookupActorPipeCache(MakePipePerNodeCacheID(false)); } - + if (providedServer) { Server = providedServer; } else { @@ -890,7 +889,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) { ); shim->ReadsReceived.WaitI(); - + UNIT_ASSERT_EQUAL(shards.size(), 1); auto undelivery = MakeHolder(shards[0], true); @@ -937,7 +936,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) { ); shim->ReadsReceived.WaitI(); - + UNIT_ASSERT_EQUAL(shards.size(), 1); auto undelivery = MakeHolder(shards[0], true); diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp index 7ff63e246426..a15db47d2603 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp @@ -86,8 +86,7 @@ class TQueryRunnerActor : public TActorBootstrapped { SendNotification(); } - auto response = std::make_unique(); - response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto response = std::make_unique(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); response->Record.SetFreeSpace(std::numeric_limits::max()); auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex(); diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp index 5bccb65a32ae..0311a4c93d4f 100644 --- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp +++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp @@ -273,8 +273,7 @@ class TPgwireKqpProxy : public TActorBootstrapped { TBase::Send(EventRequest_->Sender, response.release(), 0, EventRequest_->Cookie); BLOG_D(this->SelfId() << " Send stream data ack to " << ev->Sender); - auto resp = MakeHolder(); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto resp = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); resp->Record.SetFreeSpace(std::numeric_limits::max()); TBase::Send(ev->Sender, resp.Release()); } diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp index 40961b9025b7..3c28ee748124 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp @@ -116,9 +116,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -213,9 +212,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -316,9 +314,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); @@ -434,9 +431,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); @@ -564,9 +560,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); @@ -692,9 +687,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { } } - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -802,9 +796,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(record.GetSeqNo()); resp->Record.SetFreeSpace(100); ctx.Send(ev->Sender, resp.Release()); break; diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index 5073c2700d49..d668a65862fa 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -1309,8 +1309,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { if (msg->Record.GetResultSet().rows().size()) { observedResults.emplace_back(FormatResult(msg->Record.GetResultSet())); } - auto resp = MakeHolder(); - resp->Record.SetSeqNo(msg->Record.GetSeqNo()); + auto resp = MakeHolder(msg->Record.GetSeqNo(), msg->Record.GetChannelId()); resp->Record.SetFreeSpace(1); ctx.Send(ev->Sender, resp.Release()); break; diff --git a/ydb/core/viewer/viewer_query.h b/ydb/core/viewer/viewer_query.h index a5af8046d49c..a99b71d668a4 100644 --- a/ydb/core/viewer/viewer_query.h +++ b/ydb/core/viewer/viewer_query.h @@ -567,8 +567,7 @@ class TJsonQuery : public TViewerPipeClient { ResultSets[data.GetQueryResultIndex()].emplace_back() = std::move(*data.MutableResultSet()); } - THolder ack = MakeHolder(); - ack->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + THolder ack = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); if (TotalRows >= LimitRows) { ack->Record.SetEnough(true); } diff --git a/ydb/core/viewer/viewer_query_old.h b/ydb/core/viewer/viewer_query_old.h index 273ba1a4d0ee..bc25baf431e5 100644 --- a/ydb/core/viewer/viewer_query_old.h +++ b/ydb/core/viewer/viewer_query_old.h @@ -440,8 +440,7 @@ class TJsonQueryOld : public TViewerPipeClient { ResultSets.emplace_back(); ResultSets.back() = std::move(data.GetResultSet()); - THolder ack = MakeHolder(); - ack->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + THolder ack = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); Send(ev->Sender, ack.Release()); } diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index 5e962c59157e..4dc3d94c6800 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -42,8 +42,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped(); - response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto response = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); response->Record.SetFreeSpace(ResultSizeLimit_); auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex();