Skip to content

Commit

Permalink
remove result channel proxies (#11382)
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit committed Feb 1, 2025
1 parent 61dc134 commit eb0f09e
Show file tree
Hide file tree
Showing 16 changed files with 46 additions and 64 deletions.
4 changes: 1 addition & 3 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@ struct TProducerState {
ui64 ChannelId = 0;

void SendAck(const NActors::TActorIdentity& actor) const {
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*LastSeqNo);
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo, ChannelId);
resp->Record.SetFreeSpace(AckedFreeSpaceBytes);
resp->Record.SetChannelId(ChannelId);

actor.Send(ActorId, resp.Release());
}
Expand Down
10 changes: 4 additions & 6 deletions ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,10 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
<< ", freeSpace: " << freeSpaceBytes
<< ", to: " << ExecuterActorId_);

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*LastSeqNo_);
// scan query has single result set, so it's ok to put zero as channelId here.
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo_, 0);
resp->Record.SetFreeSpace(freeSpaceBytes);

ctx.Send(ExecuterActorId_, resp.Release());

AckedFreeSpaceBytes_ = freeSpaceBytes;
}
}
Expand Down Expand Up @@ -326,6 +324,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
}

void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) {

auto& record = ev->Get()->Record;
NYql::TIssues issues = ev->Get()->GetIssues();

Expand Down Expand Up @@ -361,8 +360,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
<< ", to: " << ev->Sender
<< ", queue: " << FlowControl_.QueueSize());

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetFreeSpace(freeSpaceBytes);

ctx.Send(ev->Sender, resp.Release());
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,7 @@ class TStreamExecuteYqlScriptRPC
<< ", to: " << ev->Sender
<< ", queue: " << FlowControl_.QueueSize());

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetFreeSpace(freeSpaceBytes);

ctx.Send(ev->Sender, resp.Release());
Expand Down Expand Up @@ -320,8 +319,7 @@ class TStreamExecuteYqlScriptRPC
<< ", freeSpace: " << freeSpaceBytes
<< ", to: " << GatewayRequestHandlerActorId_);

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*LastSeqNo_);
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo_, 0);
resp->Record.SetFreeSpace(freeSpaceBytes);

ctx.Send(GatewayRequestHandlerActorId_, resp.Release());
Expand Down
14 changes: 13 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,19 @@ struct TEvKqpExecuter {
TKqpExecuterEvents::EvStreamData> {};

struct TEvStreamDataAck : public TEventPB<TEvStreamDataAck, NKikimrKqp::TEvExecuterStreamDataAck,
TKqpExecuterEvents::EvStreamDataAck> {};
TKqpExecuterEvents::EvStreamDataAck>
{
friend class TEventPBBase;
explicit TEvStreamDataAck(ui64 seqno, ui64 channelId)
{
Record.SetSeqNo(seqno);
Record.SetChannelId(channelId);
}

private:
// using a little hack to hide default empty constructor
TEvStreamDataAck() = default;
};

struct TEvStreamProfile : public TEventPB<TEvStreamProfile, NKikimrKqp::TEvExecuterStreamProfile,
TKqpExecuterEvents::EvStreamProfile> {};
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,8 @@ class TKqpScanQueryRequestHandler : public TRequestHandlerBase<
ResultSet.set_truncated(true);
}

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetEnough(truncated);
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
resp->Record.SetFreeSpace(ResultSetBytesLimit);
ctx.Send(ev->Sender, resp.Release());
}
Expand Down Expand Up @@ -497,8 +496,7 @@ class TKqpGenericQueryRequestHandler: public TRequestHandlerBase<
}
}

auto response = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
auto response = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
response->Record.SetFreeSpace(SizeLimit && SizeLimit < std::numeric_limits<i64>::max() ? SizeLimit : std::numeric_limits<i64>::max());
Send(ev->Sender, response.Release());
}
Expand Down
5 changes: 1 addition & 4 deletions ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,8 @@ struct TProducerState {
ui64 ChannelId = 0;

void SendAck(const NActors::TActorIdentity& actor) const {
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*LastSeqNo);
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo, ChannelId);
resp->Record.SetFreeSpace(AckedFreeSpaceBytes);
resp->Record.SetChannelId(ChannelId);

actor.Send(ActorId, resp.Release());
}

Expand Down
14 changes: 5 additions & 9 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1404,9 +1404,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_ASSERT(record.GetResultSet().rows().size() == 1);
result = 1;

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
Expand Down Expand Up @@ -1472,9 +1471,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
Expand Down Expand Up @@ -1535,9 +1533,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_ASSERT(record.GetResultSet().rows().size() == 0);
hasResult = true;

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
Expand Down Expand Up @@ -1606,9 +1603,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
Expand Down Expand Up @@ -2210,7 +2206,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {

auto result = CollectStreamResult(it);
auto ast = result.QueryStats->Getquery_ast();

pushdown = ast.find("KqpOlapFilter") != std::string::npos;
} else {
// Error means that predicate not pushed down
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2414,9 +2414,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record;
Y_ASSERT(record.GetResultSet().rows().size() == 0);

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(record.GetSeqNo());
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return true;
}
Expand Down
9 changes: 4 additions & 5 deletions ydb/core/kqp/ut/scan/kqp_split_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,8 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
collectedKeys->push_back(row.items(0).uint64_value());
}

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(record.GetSeqNo());
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return true;
}
Expand Down Expand Up @@ -401,7 +400,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
} else if (testActorType == ETestActorType::StreamLookup) {
InterceptStreamLookupActorPipeCache(MakePipePerNodeCacheID(false));
}

if (providedServer) {
Server = providedServer;
} else {
Expand Down Expand Up @@ -890,7 +889,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
);

shim->ReadsReceived.WaitI();

UNIT_ASSERT_EQUAL(shards.size(), 1);
auto undelivery = MakeHolder<TEvPipeCache::TEvDeliveryProblem>(shards[0], true);

Expand Down Expand Up @@ -937,7 +936,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
);

shim->ReadsReceived.WaitI();

UNIT_ASSERT_EQUAL(shards.size(), 1);
auto undelivery = MakeHolder<TEvPipeCache::TEvDeliveryProblem>(shards[0], true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ class TQueryRunnerActor : public TActorBootstrapped<TQueryRunnerActor> {
SendNotification<TEvQueryRunner::TEvExecutionStarted>();
}

auto response = std::make_unique<TEvKqpExecuter::TEvStreamDataAck>();
response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
auto response = std::make_unique<TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
response->Record.SetFreeSpace(std::numeric_limits<i64>::max());

auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex();
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,7 @@ class TPgwireKqpProxy : public TActorBootstrapped<Base> {
TBase::Send(EventRequest_->Sender, response.release(), 0, EventRequest_->Cookie);

BLOG_D(this->SelfId() << " Send stream data ack to " << ev->Sender);
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetFreeSpace(std::numeric_limits<i64>::max());
TBase::Send(ev->Sender, resp.Release());
}
Expand Down
21 changes: 7 additions & 14 deletions ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
Expand Down Expand Up @@ -213,9 +212,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
Expand Down Expand Up @@ -316,9 +314,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);

runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release()));
Expand Down Expand Up @@ -434,9 +431,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);

runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release()));
Expand Down Expand Up @@ -564,9 +560,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);

runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release()));
Expand Down Expand Up @@ -692,9 +687,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
}

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
Expand Down Expand Up @@ -802,9 +796,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(record.GetSeqNo());
resp->Record.SetFreeSpace(100);
ctx.Send(ev->Sender, resp.Release());
break;
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/tx/datashard/datashard_ut_volatile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1309,8 +1309,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
if (msg->Record.GetResultSet().rows().size()) {
observedResults.emplace_back(FormatResult(msg->Record.GetResultSet()));
}
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(msg->Record.GetSeqNo());
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(msg->Record.GetSeqNo(), msg->Record.GetChannelId());
resp->Record.SetFreeSpace(1);
ctx.Send(ev->Sender, resp.Release());
break;
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/viewer/viewer_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,7 @@ class TJsonQuery : public TViewerPipeClient {
ResultSets[data.GetQueryResultIndex()].emplace_back() = std::move(*data.MutableResultSet());
}

THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
ack->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
if (TotalRows >= LimitRows) {
ack->Record.SetEnough(true);
}
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/viewer/viewer_query_old.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,7 @@ class TJsonQueryOld : public TViewerPipeClient {
ResultSets.emplace_back();
ResultSets.back() = std::move(data.GetResultSet());

THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
ack->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
Send(ev->Sender, ack.Release());
}

Expand Down
3 changes: 1 addition & 2 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
)

void Handle(NKikimr::NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
auto response = MakeHolder<NKikimr::NKqp::TEvKqpExecuter::TEvStreamDataAck>();
response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
auto response = MakeHolder<NKikimr::NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
response->Record.SetFreeSpace(ResultSizeLimit_);

auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex();
Expand Down

0 comments on commit eb0f09e

Please sign in to comment.