Skip to content

Commit

Permalink
YQ-3893 Use one session read_actor <-> RD / to stable (#13649)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Jan 23, 2025
1 parent 6f80afb commit 9065dc3
Show file tree
Hide file tree
Showing 16 changed files with 934 additions and 685 deletions.
6 changes: 3 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt
UpdateInterconnectSessions(ev->InterconnectSession);

TStringStream str;
LOG_ROW_DISPATCHER_INFO("TEvCoordinatorRequest from " << ev->Sender.ToString() << ", " << source.GetTopicPath() << ", partIds: " << JoinSeq(", ", ev->Get()->Record.GetPartitionId()));
LOG_ROW_DISPATCHER_INFO("TEvCoordinatorRequest from " << ev->Sender.ToString() << ", " << source.GetTopicPath() << ", partIds: " << JoinSeq(", ", ev->Get()->Record.GetPartitionIds()));
Metrics.IncomingRequests->Inc();

TCoordinatorRequest request = {.Cookie = ev->Cookie, .Record = ev->Get()->Record};
Expand All @@ -430,7 +430,7 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC

bool hasPendingPartitions = false;
TMap<NActors::TActorId, TSet<ui64>> tmpResult;
for (auto& partitionId : request.Record.GetPartitionId()) {
for (auto& partitionId : request.Record.GetPartitionIds()) {
TTopicKey topicKey{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath()};
TPartitionKey key {topicKey, partitionId};
auto locationIt = PartitionLocations.find(key);
Expand All @@ -457,7 +457,7 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC
auto* partitionsProto = response->Record.AddPartitions();
ActorIdToProto(actorId, partitionsProto->MutableActorId());
for (auto partitionId : partitions) {
partitionsProto->AddPartitionId(partitionId);
partitionsProto->AddPartitionIds(partitionId);
}
}

Expand Down
27 changes: 13 additions & 14 deletions ydb/core/fq/libs/row_dispatcher/events/data_plane.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
#include <ydb/library/actors/core/actorid.h>
#include <ydb/library/actors/core/event_local.h>
#include <ydb/core/fq/libs/events/event_subspace.h>

#include <ydb/core/fq/libs/row_dispatcher/protos/events.pb.h>
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h>

#include <yql/essentials/public/issue/yql_issue.h>
#include <yql/essentials/public/purecalc/common/fwd.h>

#include <util/generic/set.h>
#include <util/generic/map.h>

namespace NFq {

NActors::TActorId RowDispatcherServiceActorId();
Expand Down Expand Up @@ -75,7 +77,7 @@ struct TEvRowDispatcher {
const std::vector<ui64>& partitionIds) {
*Record.MutableSource() = sourceParams;
for (const auto& id : partitionIds) {
Record.AddPartitionId(id);
Record.AddPartitionIds(id);
}
}
};
Expand All @@ -93,16 +95,20 @@ struct TEvRowDispatcher {
TEvStartSession() = default;
TEvStartSession(
const NYql::NPq::NProto::TDqPqTopicSource& sourceParams,
ui64 partitionId,
const std::set<ui32>& partitionIds,
const TString token,
TMaybe<ui64> readOffset,
const std::map<ui32, ui64>& readOffsets,
ui64 startingMessageTimestampMs,
const TString& queryId) {
*Record.MutableSource() = sourceParams;
Record.SetPartitionId(partitionId);
for (auto partitionId : partitionIds) {
Record.AddPartitionIds(partitionId);
}
Record.SetToken(token);
if (readOffset) {
Record.SetOffset(*readOffset);
for (const auto& [partitionId, offset] : readOffsets) {
auto* partitionOffset = Record.AddOffsets();
partitionOffset->SetPartitionId(partitionId);
partitionOffset->SetOffset(offset);
}
Record.SetStartingMessageTimestampMs(startingMessageTimestampMs);
Record.SetQueryId(queryId);
Expand Down Expand Up @@ -143,7 +149,6 @@ struct TEvRowDispatcher {
struct TEvStatistics : public NActors::TEventPB<TEvStatistics,
NFq::NRowDispatcherProto::TEvStatistics, EEv::EvStatistics> {
TEvStatistics() = default;
NActors::TActorId ReadActorId;
};

struct TEvSessionError : public NActors::TEventPB<TEvSessionError,
Expand All @@ -162,16 +167,10 @@ struct TEvRowDispatcher {

struct TEvHeartbeat : public NActors::TEventPB<TEvHeartbeat, NFq::NRowDispatcherProto::TEvHeartbeat, EEv::EvHeartbeat> {
TEvHeartbeat() = default;
TEvHeartbeat(ui32 partitionId) {
Record.SetPartitionId(partitionId);
}
};

struct TEvNoSession : public NActors::TEventPB<TEvNoSession, NFq::NRowDispatcherProto::TEvNoSession, EEv::EvNoSession> {
TEvNoSession() = default;
TEvNoSession(ui32 partitionId) {
Record.SetPartitionId(partitionId);
}
};

struct TEvGetInternalStateRequest : public NActors::TEventPB<TEvGetInternalStateRequest,
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,23 @@ struct TTopicSessionClientStatistic {
i64 UnreadRows = 0; // Current value
i64 UnreadBytes = 0; // Current value
ui64 Offset = 0; // Current value
ui64 ReadBytes = 0; // Increment / filtered
ui64 FilteredReadBytes = 0; // Increment / filtered
ui64 ReadBytes = 0; // Increment
bool IsWaiting = false; // Current value
i64 ReadLagMessages = 0; // Current value
ui64 InitialOffset = 0;
void Add(const TTopicSessionClientStatistic& stat) {
UnreadRows = stat.UnreadRows;
UnreadBytes = stat.UnreadBytes;
Offset = stat.Offset;
FilteredReadBytes += stat.FilteredReadBytes;
ReadBytes += stat.ReadBytes;
IsWaiting = stat.IsWaiting;
ReadLagMessages = stat.ReadLagMessages;
InitialOffset = stat.InitialOffset;
}
void Clear() {
FilteredReadBytes = 0;
ReadBytes = 0;
}
};
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/leader_election.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ struct TLeaderElectionMetrics {
explicit TLeaderElectionMetrics(const ::NMonitoring::TDynamicCounterPtr& counters)
: Counters(counters) {
Errors = Counters->GetCounter("LeaderElectionErrors", true);
LeaderChangedCount = Counters->GetCounter("LeaderElectionChangedCount");
LeaderChanged = Counters->GetCounter("LeaderChanged", true);
}

::NMonitoring::TDynamicCounterPtr Counters;
::NMonitoring::TDynamicCounters::TCounterPtr Errors;
::NMonitoring::TDynamicCounters::TCounterPtr LeaderChangedCount;
::NMonitoring::TDynamicCounters::TCounterPtr LeaderChanged;
};

class TLeaderElection: public TActorBootstrapped<TLeaderElection> {
Expand Down Expand Up @@ -458,7 +458,7 @@ void TLeaderElection::Handle(TEvPrivate::TEvDescribeSemaphoreResult::TPtr& ev) {
if (!LeaderActorId || (*LeaderActorId != id)) {
LOG_ROW_DISPATCHER_INFO("Send TEvCoordinatorChanged to " << ParentId);
TActivationContext::ActorSystem()->Send(ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(id, generation));
Metrics.LeaderChangedCount->Inc();
Metrics.LeaderChanged->Inc();
}
LeaderActorId = id;
}
Expand Down
20 changes: 10 additions & 10 deletions ydb/core/fq/libs/row_dispatcher/probes.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
NAMES("sender", "coordinatorGeneration", "coordinatorActor")) \
PROBE(StartSession, \
GROUPS(), \
TYPES(TString, ui32, TString, ui64), \
NAMES("sender", "partitionId", "queryId", "size")) \
TYPES(TString, TString, ui64), \
NAMES("sender", "queryId", "size")) \
PROBE(GetNextBatch, \
GROUPS(), \
TYPES(TString, ui32, TString, ui64), \
Expand All @@ -53,16 +53,16 @@
NAMES("sender", "partitionId", "queryId", "size")) \
PROBE(StopSession, \
GROUPS(), \
TYPES(TString, ui32, TString, ui64), \
NAMES("sender", "partitionId", "queryId", "size")) \
TYPES(TString, TString, ui64), \
NAMES("sender", "queryId", "size")) \
PROBE(TryConnect, \
GROUPS(), \
TYPES(TString, ui32), \
NAMES("sender", "nodeId")) \
PROBE(PrivateHeartbeat, \
GROUPS(), \
TYPES(TString, ui32, TString, ui64), \
NAMES("sender", "partitionId", "queryId", "generation")) \
TYPES(TString, TString, ui64), \
NAMES("sender", "queryId", "generation")) \
PROBE(NewDataArrived, \
GROUPS(), \
TYPES(TString, TString, TString, ui64, ui64), \
Expand All @@ -73,12 +73,12 @@
NAMES("sender", "readActor", "queryId", "generation", "size")) \
PROBE(SessionError, \
GROUPS(), \
TYPES(TString, TString, ui32, TString, ui64, ui64), \
NAMES("sender", "readActor", "partitionId","queryId", "generation", "size")) \
TYPES(TString, TString, TString, ui64, ui64), \
NAMES("sender", "readActor", "queryId", "generation", "size")) \
PROBE(Statistics, \
GROUPS(), \
TYPES(TString, TString, ui32, TString, ui64, ui64), \
NAMES("sender", "readActor", "partitionId","queryId", "generation", "size")) \
TYPES(TString, TString, ui64, ui64), \
NAMES("readActor", "queryId", "generation", "size")) \
PROBE(UpdateMetrics, \
GROUPS(), \
TYPES(), \
Expand Down
30 changes: 23 additions & 7 deletions ydb/core/fq/libs/row_dispatcher/protos/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,34 @@ import "ydb/public/api/protos/ydb_issue_message.proto";

message TEvGetAddressRequest {
NYql.NPq.NProto.TDqPqTopicSource Source = 1;
repeated uint32 PartitionId = 2;
repeated uint32 PartitionId = 2 [deprecated=true];
repeated uint32 PartitionIds = 3;
}

message TEvPartitionAddress {
repeated uint32 PartitionId = 1;
repeated uint32 PartitionId = 1 [deprecated=true];
NActorsProto.TActorId ActorId = 2;
repeated uint32 PartitionIds = 3;
}

message TEvGetAddressResponse {
repeated TEvPartitionAddress Partitions = 1;
}

message TPartitionOffset {
uint32 PartitionId = 1;
uint64 Offset = 2;
}

message TEvStartSession {
NYql.NPq.NProto.TDqPqTopicSource Source = 1;
uint32 PartitionId = 2;
uint32 PartitionId = 2 [deprecated=true];
string Token = 3;
optional uint64 Offset = 4;
optional uint64 Offset = 4 [deprecated=true];
uint64 StartingMessageTimestampMs = 5;
string QueryId = 6;
repeated uint32 PartitionIds = 7;
repeated TPartitionOffset Offsets = 8;
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}

Expand All @@ -50,7 +59,7 @@ message TEvNewDataArrived {

message TEvStopSession {
NYql.NPq.NProto.TDqPqTopicSource Source = 1;
uint32 PartitionId = 2;
uint32 PartitionId = 2 [deprecated=true];
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}

Expand All @@ -68,16 +77,23 @@ message TEvMessageBatch {
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}

message TEvStatistics {
message TPartitionStatistics {
uint32 PartitionId = 1;
uint64 NextMessageOffset = 2;
}

message TEvStatistics {
uint32 PartitionId = 1; // deprecated
uint64 NextMessageOffset = 2; // deprecated
uint64 ReadBytes = 3;
repeated TPartitionStatistics Partition = 4;
uint64 CpuMicrosec = 5;
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}

message TEvSessionError {
reserved 1;
uint32 PartitionId = 2;
uint32 PartitionId = 2 [deprecated=true];
NYql.NDqProto.StatusIds.StatusCode StatusCode = 3;
repeated Ydb.Issue.IssueMessage Issues = 4;
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
Expand Down
Loading

0 comments on commit 9065dc3

Please sign in to comment.