Skip to content

Commit

Permalink
remove result channel proxies (#11382)
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit committed Feb 1, 2025
1 parent 61dc134 commit 5c9ab0b
Show file tree
Hide file tree
Showing 17 changed files with 117 additions and 64 deletions.
4 changes: 1 addition & 3 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@ struct TProducerState {
ui64 ChannelId = 0;

void SendAck(const NActors::TActorIdentity& actor) const {
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*LastSeqNo);
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo, ChannelId);
resp->Record.SetFreeSpace(AckedFreeSpaceBytes);
resp->Record.SetChannelId(ChannelId);

actor.Send(ActorId, resp.Release());
}
Expand Down
10 changes: 4 additions & 6 deletions ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,10 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
<< ", freeSpace: " << freeSpaceBytes
<< ", to: " << ExecuterActorId_);

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*LastSeqNo_);
// scan query has single result set, so it's ok to put zero as channelId here.
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo_, 0);
resp->Record.SetFreeSpace(freeSpaceBytes);

ctx.Send(ExecuterActorId_, resp.Release());

AckedFreeSpaceBytes_ = freeSpaceBytes;
}
}
Expand Down Expand Up @@ -326,6 +324,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
}

void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) {

auto& record = ev->Get()->Record;
NYql::TIssues issues = ev->Get()->GetIssues();

Expand Down Expand Up @@ -361,8 +360,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
<< ", to: " << ev->Sender
<< ", queue: " << FlowControl_.QueueSize());

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetFreeSpace(freeSpaceBytes);

ctx.Send(ev->Sender, resp.Release());
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,7 @@ class TStreamExecuteYqlScriptRPC
<< ", to: " << ev->Sender
<< ", queue: " << FlowControl_.QueueSize());

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetFreeSpace(freeSpaceBytes);

ctx.Send(ev->Sender, resp.Release());
Expand Down Expand Up @@ -320,8 +319,7 @@ class TStreamExecuteYqlScriptRPC
<< ", freeSpace: " << freeSpaceBytes
<< ", to: " << GatewayRequestHandlerActorId_);

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*LastSeqNo_);
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo_, 0);
resp->Record.SetFreeSpace(freeSpaceBytes);

ctx.Send(GatewayRequestHandlerActorId_, resp.Release());
Expand Down
14 changes: 13 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,19 @@ struct TEvKqpExecuter {
TKqpExecuterEvents::EvStreamData> {};

struct TEvStreamDataAck : public TEventPB<TEvStreamDataAck, NKikimrKqp::TEvExecuterStreamDataAck,
TKqpExecuterEvents::EvStreamDataAck> {};
TKqpExecuterEvents::EvStreamDataAck>
{
friend class TEventPBBase;
explicit TEvStreamDataAck(ui64 seqno, ui64 channelId)
{
Record.SetSeqNo(seqno);
Record.SetChannelId(channelId);
}

private:
// using a little hack to hide default empty constructor
TEvStreamDataAck() = default;
};

struct TEvStreamProfile : public TEventPB<TEvStreamProfile, NKikimrKqp::TEvExecuterStreamProfile,
TKqpExecuterEvents::EvStreamProfile> {};
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/executer_actor/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ SRCS(
kqp_partition_helper.cpp
kqp_planner.cpp
kqp_planner_strategy.cpp
<<<<<<< HEAD
kqp_shards_resolver.cpp
kqp_result_channel.cpp
=======
>>>>>>> 667a30d8c2d... remove result channel proxies (#11382)
kqp_table_resolver.cpp
kqp_tasks_graph.cpp
kqp_tasks_validate.cpp
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,8 @@ class TKqpScanQueryRequestHandler : public TRequestHandlerBase<
ResultSet.set_truncated(true);
}

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetEnough(truncated);
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
resp->Record.SetFreeSpace(ResultSetBytesLimit);
ctx.Send(ev->Sender, resp.Release());
}
Expand Down Expand Up @@ -497,8 +496,7 @@ class TKqpGenericQueryRequestHandler: public TRequestHandlerBase<
}
}

auto response = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
auto response = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
response->Record.SetFreeSpace(SizeLimit && SizeLimit < std::numeric_limits<i64>::max() ? SizeLimit : std::numeric_limits<i64>::max());
Send(ev->Sender, response.Release());
}
Expand Down
5 changes: 1 addition & 4 deletions ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,8 @@ struct TProducerState {
ui64 ChannelId = 0;

void SendAck(const NActors::TActorIdentity& actor) const {
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*LastSeqNo);
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo, ChannelId);
resp->Record.SetFreeSpace(AckedFreeSpaceBytes);
resp->Record.SetChannelId(ChannelId);

actor.Send(ActorId, resp.Release());
}

Expand Down
14 changes: 5 additions & 9 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1404,9 +1404,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_ASSERT(record.GetResultSet().rows().size() == 1);
result = 1;

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
Expand Down Expand Up @@ -1472,9 +1471,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
Expand Down Expand Up @@ -1535,9 +1533,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_ASSERT(record.GetResultSet().rows().size() == 0);
hasResult = true;

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
Expand Down Expand Up @@ -1606,9 +1603,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
Expand Down Expand Up @@ -2210,7 +2206,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {

auto result = CollectStreamResult(it);
auto ast = result.QueryStats->Getquery_ast();

pushdown = ast.find("KqpOlapFilter") != std::string::npos;
} else {
// Error means that predicate not pushed down
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2414,9 +2414,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record;
Y_ASSERT(record.GetResultSet().rows().size() == 0);

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(record.GetSeqNo());
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return true;
}
Expand Down
9 changes: 4 additions & 5 deletions ydb/core/kqp/ut/scan/kqp_split_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,8 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
collectedKeys->push_back(row.items(0).uint64_value());
}

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
resp->Record.SetSeqNo(record.GetSeqNo());
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return true;
}
Expand Down Expand Up @@ -401,7 +400,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
} else if (testActorType == ETestActorType::StreamLookup) {
InterceptStreamLookupActorPipeCache(MakePipePerNodeCacheID(false));
}

if (providedServer) {
Server = providedServer;
} else {
Expand Down Expand Up @@ -890,7 +889,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
);

shim->ReadsReceived.WaitI();

UNIT_ASSERT_EQUAL(shards.size(), 1);
auto undelivery = MakeHolder<TEvPipeCache::TEvDeliveryProblem>(shards[0], true);

Expand Down Expand Up @@ -937,7 +936,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
);

shim->ReadsReceived.WaitI();

UNIT_ASSERT_EQUAL(shards.size(), 1);
auto undelivery = MakeHolder<TEvPipeCache::TEvDeliveryProblem>(shards[0], true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ class TQueryRunnerActor : public TActorBootstrapped<TQueryRunnerActor> {
SendNotification<TEvQueryRunner::TEvExecutionStarted>();
}

auto response = std::make_unique<TEvKqpExecuter::TEvStreamDataAck>();
response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
auto response = std::make_unique<TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
response->Record.SetFreeSpace(std::numeric_limits<i64>::max());

auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex();
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,7 @@ class TPgwireKqpProxy : public TActorBootstrapped<Base> {
TBase::Send(EventRequest_->Sender, response.release(), 0, EventRequest_->Cookie);

BLOG_D(this->SelfId() << " Send stream data ack to " << ev->Sender);
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetFreeSpace(std::numeric_limits<i64>::max());
TBase::Send(ev->Sender, resp.Release());
}
Expand Down
Loading

0 comments on commit 5c9ab0b

Please sign in to comment.