From 5c9ab0b958709af9edc7924ca3293e827995448e Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Tue, 12 Nov 2024 13:48:46 +0300 Subject: [PATCH] remove result channel proxies (#11382) --- .../grpc_services/query/rpc_execute_query.cpp | 4 +- .../rpc_stream_execute_scan_query.cpp | 10 +-- .../rpc_stream_execute_yql_script.cpp | 6 +- ydb/core/kqp/executer_actor/kqp_executer.h | 14 ++- ydb/core/kqp/executer_actor/ya.make | 3 + ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 6 +- .../run_script_actor/kqp_run_script_actor.cpp | 5 +- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 14 ++- ydb/core/kqp/ut/scan/kqp_scan_ut.cpp | 3 +- ydb/core/kqp/ut/scan/kqp_split_ut.cpp | 9 +- .../common/kqp_workload_service_ut_common.cpp | 3 +- ydb/core/local_pgwire/pgwire_kqp_proxy.cpp | 3 +- .../tx/datashard/datashard_ut_kqp_scan.cpp | 89 ++++++++++++++++--- .../tx/datashard/datashard_ut_volatile.cpp | 3 +- ydb/core/viewer/viewer_query.h | 3 +- ydb/core/viewer/viewer_query_old.h | 3 +- ydb/tests/tools/kqprun/src/actors.cpp | 3 +- 17 files changed, 117 insertions(+), 64 deletions(-) diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 0c8c59e8cedf..b95992825535 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -30,10 +30,8 @@ struct TProducerState { ui64 ChannelId = 0; void SendAck(const NActors::TActorIdentity& actor) const { - auto resp = MakeHolder(); - resp->Record.SetSeqNo(*LastSeqNo); + auto resp = MakeHolder(*LastSeqNo, ChannelId); resp->Record.SetFreeSpace(AckedFreeSpaceBytes); - resp->Record.SetChannelId(ChannelId); actor.Send(ActorId, resp.Release()); } diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index 4fe754f06b25..361b81102364 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -268,12 +268,10 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped(); - resp->Record.SetSeqNo(*LastSeqNo_); + // scan query has single result set, so it's ok to put zero as channelId here. + auto resp = MakeHolder(*LastSeqNo_, 0); resp->Record.SetFreeSpace(freeSpaceBytes); - ctx.Send(ExecuterActorId_, resp.Release()); - AckedFreeSpaceBytes_ = freeSpaceBytes; } } @@ -326,6 +324,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrappedGet()->Record; NYql::TIssues issues = ev->Get()->GetIssues(); @@ -361,8 +360,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrappedSender << ", queue: " << FlowControl_.QueueSize()); - auto resp = MakeHolder(); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto resp = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); resp->Record.SetFreeSpace(freeSpaceBytes); ctx.Send(ev->Sender, resp.Release()); diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index 11716bda176e..a8be0e790572 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -279,8 +279,7 @@ class TStreamExecuteYqlScriptRPC << ", to: " << ev->Sender << ", queue: " << FlowControl_.QueueSize()); - auto resp = MakeHolder(); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto resp = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); resp->Record.SetFreeSpace(freeSpaceBytes); ctx.Send(ev->Sender, resp.Release()); @@ -320,8 +319,7 @@ class TStreamExecuteYqlScriptRPC << ", freeSpace: " << freeSpaceBytes << ", to: " << GatewayRequestHandlerActorId_); - auto resp = MakeHolder(); - resp->Record.SetSeqNo(*LastSeqNo_); + auto resp = MakeHolder(*LastSeqNo_, 0); resp->Record.SetFreeSpace(freeSpaceBytes); ctx.Send(GatewayRequestHandlerActorId_, resp.Release()); diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 9c0ef4fae41e..39db34150e61 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -70,7 +70,19 @@ struct TEvKqpExecuter { TKqpExecuterEvents::EvStreamData> {}; struct TEvStreamDataAck : public TEventPB {}; + TKqpExecuterEvents::EvStreamDataAck> + { + friend class TEventPBBase; + explicit TEvStreamDataAck(ui64 seqno, ui64 channelId) + { + Record.SetSeqNo(seqno); + Record.SetChannelId(channelId); + } + + private: + // using a little hack to hide default empty constructor + TEvStreamDataAck() = default; + }; struct TEvStreamProfile : public TEventPB {}; diff --git a/ydb/core/kqp/executer_actor/ya.make b/ydb/core/kqp/executer_actor/ya.make index 9f8be9bb9feb..66979ba9da27 100644 --- a/ydb/core/kqp/executer_actor/ya.make +++ b/ydb/core/kqp/executer_actor/ya.make @@ -11,8 +11,11 @@ SRCS( kqp_partition_helper.cpp kqp_planner.cpp kqp_planner_strategy.cpp +<<<<<<< HEAD kqp_shards_resolver.cpp kqp_result_channel.cpp +======= +>>>>>>> 667a30d8c2d... remove result channel proxies (#11382) kqp_table_resolver.cpp kqp_tasks_graph.cpp kqp_tasks_validate.cpp diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 6cef7ba661d5..ccafd30948c8 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -202,9 +202,8 @@ class TKqpScanQueryRequestHandler : public TRequestHandlerBase< ResultSet.set_truncated(true); } - auto resp = MakeHolder(); + auto resp = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); resp->Record.SetEnough(truncated); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(ResultSetBytesLimit); ctx.Send(ev->Sender, resp.Release()); } @@ -497,8 +496,7 @@ class TKqpGenericQueryRequestHandler: public TRequestHandlerBase< } } - auto response = MakeHolder(); - response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto response = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); response->Record.SetFreeSpace(SizeLimit && SizeLimit < std::numeric_limits::max() ? SizeLimit : std::numeric_limits::max()); Send(ev->Sender, response.Release()); } diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp index 74aad1bd5ea1..ff6d93914e74 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -43,11 +43,8 @@ struct TProducerState { ui64 ChannelId = 0; void SendAck(const NActors::TActorIdentity& actor) const { - auto resp = MakeHolder(); - resp->Record.SetSeqNo(*LastSeqNo); + auto resp = MakeHolder(*LastSeqNo, ChannelId); resp->Record.SetFreeSpace(AckedFreeSpaceBytes); - resp->Record.SetChannelId(ChannelId); - actor.Send(ActorId, resp.Release()); } diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 002053da8e4c..07f5465c1a4f 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1404,9 +1404,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Y_ASSERT(record.GetResultSet().rows().size() == 1); result = 1; - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -1472,9 +1471,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -1535,9 +1533,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Y_ASSERT(record.GetResultSet().rows().size() == 0); hasResult = true; - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -1606,9 +1603,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -2210,7 +2206,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto result = CollectStreamResult(it); auto ast = result.QueryStats->Getquery_ast(); - + pushdown = ast.find("KqpOlapFilter") != std::string::npos; } else { // Error means that predicate not pushed down diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index 01aa8580bb25..ba6418420dba 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -2414,9 +2414,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto& record = ev->Get()->Record; Y_ASSERT(record.GetResultSet().rows().size() == 0); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(record.GetSeqNo()); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return true; } diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp index 5e968ec37d08..2bd685009417 100644 --- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp @@ -326,9 +326,8 @@ Y_UNIT_TEST_SUITE(KqpSplit) { collectedKeys->push_back(row.items(0).uint64_value()); } - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(record.GetSeqNo()); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return true; } @@ -401,7 +400,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) { } else if (testActorType == ETestActorType::StreamLookup) { InterceptStreamLookupActorPipeCache(MakePipePerNodeCacheID(false)); } - + if (providedServer) { Server = providedServer; } else { @@ -890,7 +889,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) { ); shim->ReadsReceived.WaitI(); - + UNIT_ASSERT_EQUAL(shards.size(), 1); auto undelivery = MakeHolder(shards[0], true); @@ -937,7 +936,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) { ); shim->ReadsReceived.WaitI(); - + UNIT_ASSERT_EQUAL(shards.size(), 1); auto undelivery = MakeHolder(shards[0], true); diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp index 7ff63e246426..a15db47d2603 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp @@ -86,8 +86,7 @@ class TQueryRunnerActor : public TActorBootstrapped { SendNotification(); } - auto response = std::make_unique(); - response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto response = std::make_unique(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); response->Record.SetFreeSpace(std::numeric_limits::max()); auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex(); diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp index 5bccb65a32ae..0311a4c93d4f 100644 --- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp +++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp @@ -273,8 +273,7 @@ class TPgwireKqpProxy : public TActorBootstrapped { TBase::Send(EventRequest_->Sender, response.release(), 0, EventRequest_->Cookie); BLOG_D(this->SelfId() << " Send stream data ack to " << ev->Sender); - auto resp = MakeHolder(); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto resp = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); resp->Record.SetFreeSpace(std::numeric_limits::max()); TBase::Send(ev->Sender, resp.Release()); } diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp index 40961b9025b7..4bb9a5d5bb6d 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp @@ -116,9 +116,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -213,9 +212,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -316,9 +314,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); @@ -434,9 +431,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); @@ -564,9 +560,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); @@ -692,9 +687,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { } } - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -802,9 +796,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder(); + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(record.GetSeqNo()); resp->Record.SetFreeSpace(100); ctx.Send(ev->Sender, resp.Release()); break; @@ -898,6 +891,74 @@ Y_UNIT_TEST_SUITE(KqpScan) { UNIT_ASSERT_VALUES_EQUAL(*result, 596400); } +<<<<<<< HEAD +======= + Y_UNIT_TEST(ScanPg) { + NKikimrConfig::TAppConfig appCfg; + + auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager(); + rm->SetChannelBufferSize(100); + rm->SetMinChannelBufferSize(100); + + NKikimrConfig::TFeatureFlags featureFlags; + // featureFlags.SetEnablePgSyntax(true); + featureFlags.SetEnableTablePgTypes(true); + + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetNodeCount(2) + .SetAppConfig(appCfg) + .SetUseRealThreads(false) + .SetFeatureFlags(featureFlags); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + InitRoot(server, sender); + + auto opts = TShardedTableOptions() + .Columns({ + {"key", "pgint4", true, false}, + {"value", "pgint4", false, false} + }); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + ExecSQL(server, sender, FillTableQuery(true)); + + ui64 result = 0; + + auto captureEvents = [&](TAutoPtr &ev) -> auto { + + switch (ev->GetTypeRewrite()) { + case NKqp::TKqpExecuterEvents::EvStreamData: { + auto& record = ev->Get()->Record; + + Y_ASSERT(record.GetResultSet().rows().size() == 1); + Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); + result = record.GetResultSet().rows().at(0).items().at(0).int64_value(); + + auto resp = MakeHolder(record.GetSeqNo(), record.GetChannelId()); + resp->Record.SetEnough(false); + resp->Record.SetFreeSpace(100); + runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); + return TTestActorRuntime::EEventAction::DROP; + } + default: + break; + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + runtime.SetObserverFunc(captureEvents); + + auto streamSender = runtime.AllocateEdgeActor(); + SendRequest(runtime, streamSender, MakeStreamRequest(streamSender, "SELECT sum(FromPg(value)) FROM `/Root/table-1`;", false)); + auto ev = runtime.GrabEdgeEventRethrow(streamSender); + + UNIT_ASSERT_VALUES_EQUAL(result, 596400); + } + +>>>>>>> 667a30d8c2d... remove result channel proxies (#11382) } } // namespace NKqp diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index 5073c2700d49..d668a65862fa 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -1309,8 +1309,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { if (msg->Record.GetResultSet().rows().size()) { observedResults.emplace_back(FormatResult(msg->Record.GetResultSet())); } - auto resp = MakeHolder(); - resp->Record.SetSeqNo(msg->Record.GetSeqNo()); + auto resp = MakeHolder(msg->Record.GetSeqNo(), msg->Record.GetChannelId()); resp->Record.SetFreeSpace(1); ctx.Send(ev->Sender, resp.Release()); break; diff --git a/ydb/core/viewer/viewer_query.h b/ydb/core/viewer/viewer_query.h index a5af8046d49c..a99b71d668a4 100644 --- a/ydb/core/viewer/viewer_query.h +++ b/ydb/core/viewer/viewer_query.h @@ -567,8 +567,7 @@ class TJsonQuery : public TViewerPipeClient { ResultSets[data.GetQueryResultIndex()].emplace_back() = std::move(*data.MutableResultSet()); } - THolder ack = MakeHolder(); - ack->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + THolder ack = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); if (TotalRows >= LimitRows) { ack->Record.SetEnough(true); } diff --git a/ydb/core/viewer/viewer_query_old.h b/ydb/core/viewer/viewer_query_old.h index 273ba1a4d0ee..bc25baf431e5 100644 --- a/ydb/core/viewer/viewer_query_old.h +++ b/ydb/core/viewer/viewer_query_old.h @@ -440,8 +440,7 @@ class TJsonQueryOld : public TViewerPipeClient { ResultSets.emplace_back(); ResultSets.back() = std::move(data.GetResultSet()); - THolder ack = MakeHolder(); - ack->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + THolder ack = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); Send(ev->Sender, ack.Release()); } diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index 5e962c59157e..4dc3d94c6800 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -42,8 +42,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped(); - response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto response = MakeHolder(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); response->Record.SetFreeSpace(ResultSizeLimit_); auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex();