Skip to content

Commit

Permalink
add network utilization aggregate (ydb-platform#13550)
Browse files Browse the repository at this point in the history
  • Loading branch information
adameat authored Jan 20, 2025
1 parent 35b059a commit 04f4156
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 66 deletions.
5 changes: 1 addition & 4 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,11 +666,8 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
} else if (data.SessionConnected) {
record.SetSessionState(NKikimrWhiteboard::TNodeStateInfo::CONNECTED);
}
record.SetSameScope(data.SameScope);
data.ActorSystem->Send(whiteboardId, update.release());
if (data.ReportClockSkew) {
data.ActorSystem->Send(whiteboardId, new NNodeWhiteboard::TEvWhiteboard::TEvClockSkewUpdate(
data.PeerNodeId, data.ClockSkewUs));
}
};
}

Expand Down
20 changes: 0 additions & 20 deletions ydb/core/node_whiteboard/node_whiteboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,28 +431,8 @@ struct TEvWhiteboard{

struct TEvSystemStateResponse : public TEventPB<TEvSystemStateResponse, NKikimrWhiteboard::TEvSystemStateResponse, EvSystemStateResponse> {};

// Whiteboard event whose payload (Record) is an NKikimrWhiteboard::TNodeClockSkew message:
// a clock-skew value together with the id of the peer node it refers to.
struct TEvClockSkewUpdate : TEventPB<TEvClockSkewUpdate, NKikimrWhiteboard::TNodeClockSkew, EvClockSkewUpdate> {
// Default construction; this constructor sets no fields on Record.
TEvClockSkewUpdate() = default;

// Stores the peer node id and the skew value in Record.
// clockSkewUs is signed; the "Us" suffix suggests microseconds (unit not shown here — confirm).
TEvClockSkewUpdate(const ui32 peerNodeId, i64 clockSkewUs) {
Record.SetPeerNodeId(peerNodeId);
Record.SetClockSkewUs(clockSkewUs);
}
};

// Whiteboard event whose payload (Record) is an NKikimrWhiteboard::TNodeStateInfo message
// describing the state of the connection to one peer, identified by its name.
struct TEvNodeStateUpdate : TEventPB<TEvNodeStateUpdate, NKikimrWhiteboard::TNodeStateInfo, EvNodeStateUpdate> {
// Default construction; callers fill Record themselves.
TEvNodeStateUpdate() = default;

// Sets the peer name and the connected flag; all other Record fields stay unset.
TEvNodeStateUpdate(const TString& peerName, bool connected) {
Record.SetPeerName(peerName);
Record.SetConnected(connected);
}

// Same as the two-argument constructor, and additionally sets the connect status flag.
TEvNodeStateUpdate(const TString& peerName, bool connected, NKikimrWhiteboard::EFlag connectStatus) {
Record.SetPeerName(peerName);
Record.SetConnected(connected);
Record.SetConnectStatus(connectStatus);
}
};

struct TEvNodeStateDelete : TEventPB<TEvNodeStateDelete, NKikimrWhiteboard::TNodeStateInfo, EvNodeStateDelete> {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/node_whiteboard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ message TNodeStateInfo {
optional uint64 BytesWritten = 14; // bytes written to the socket
optional uint64 WriteThroughput = 15; // bytes written per second
optional ESessionState SessionState = 16;
optional bool SameScope = 17; // true if the peer is in the same scope
}

message TEvNodeStateRequest {
Expand Down Expand Up @@ -342,6 +343,7 @@ message TSystemStateInfo {
optional NKikimrMemory.TMemoryStats MemoryStats = 38;
optional double CoresUsed = 39;
optional uint32 CoresTotal = 40;
optional float NetworkUtilization = 41;
}

message TEvSystemStateRequest {
Expand Down
62 changes: 33 additions & 29 deletions ydb/core/tablet/node_whiteboard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@ class TNodeWhiteboardService : public TActorBootstrapped<TNodeWhiteboardService>
// Ids of the events private to this actor, numbered consecutively from the
// beginning of the TEvents::ES_PRIVATE event space.
enum EEv {
EvUpdateRuntimeStats = EventSpaceBegin(TEvents::ES_PRIVATE),
EvCleanupDeadTablets,
EvUpdateClockSkew,
// End marker: one past the last id above. Keep it last when adding new ids.
EvEnd
};

static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expected EvEnd < EventSpaceEnd");

struct TEvUpdateRuntimeStats : TEventLocal<TEvUpdateRuntimeStats, EvUpdateRuntimeStats> {};
struct TEvCleanupDeadTablets : TEventLocal<TEvCleanupDeadTablets, EvCleanupDeadTablets> {};
struct TEvUpdateClockSkew : TEventLocal<TEvUpdateClockSkew, EvUpdateClockSkew> {};
};
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
Expand Down Expand Up @@ -64,7 +62,6 @@ class TNodeWhiteboardService : public TActorBootstrapped<TNodeWhiteboardService>
MaxClockSkewPeerIdCounter = group->GetCounter("MaxClockSkewPeerId");

ctx.Schedule(TDuration::Seconds(60), new TEvPrivate::TEvCleanupDeadTablets());
ctx.Schedule(TDuration::Seconds(15), new TEvPrivate::TEvUpdateClockSkew());
Become(&TNodeWhiteboardService::StateFunc);
}

Expand All @@ -76,6 +73,7 @@ class TNodeWhiteboardService : public TActorBootstrapped<TNodeWhiteboardService>
std::unordered_map<ui32, NKikimrWhiteboard::TBSGroupStateInfo> BSGroupStateInfo;
i64 MaxClockSkewWithPeerUs;
ui32 MaxClockSkewPeerId;
float MaxNetworkUtilization = 0.0;
NKikimrWhiteboard::TSystemStateInfo SystemStateInfo;
THolder<NTracing::ITraceCollection> TabletIntrospectionData;

Expand Down Expand Up @@ -509,7 +507,6 @@ class TNodeWhiteboardService : public TActorBootstrapped<TNodeWhiteboardService>
STRICT_STFUNC(StateFunc,
HFunc(TEvWhiteboard::TEvTabletStateUpdate, Handle);
HFunc(TEvWhiteboard::TEvTabletStateRequest, Handle);
HFunc(TEvWhiteboard::TEvClockSkewUpdate, Handle);
HFunc(TEvWhiteboard::TEvNodeStateUpdate, Handle);
HFunc(TEvWhiteboard::TEvNodeStateDelete, Handle);
HFunc(TEvWhiteboard::TEvNodeStateRequest, Handle);
Expand Down Expand Up @@ -538,7 +535,6 @@ class TNodeWhiteboardService : public TActorBootstrapped<TNodeWhiteboardService>
HFunc(TEvWhiteboard::TEvSignalBodyRequest, Handle);
HFunc(TEvPrivate::TEvUpdateRuntimeStats, Handle);
HFunc(TEvPrivate::TEvCleanupDeadTablets, Handle);
HFunc(TEvPrivate::TEvUpdateClockSkew, Handle);
)

void Handle(TEvWhiteboard::TEvTabletStateUpdate::TPtr &ev, const TActorContext &ctx) {
Expand All @@ -552,14 +548,6 @@ class TNodeWhiteboardService : public TActorBootstrapped<TNodeWhiteboardService>
}
}

// Keeps track of the peer with the largest clock skew (by magnitude) reported so far.
// The stored skew keeps its sign; only the comparison uses absolute values.
void Handle(TEvWhiteboard::TEvClockSkewUpdate::TPtr &ev, const TActorContext &) {
    const auto& record = ev->Get()->Record;
    const i64 reportedSkew = record.GetClockSkewUs();
    if (abs(reportedSkew) <= abs(MaxClockSkewWithPeerUs)) {
        return; // not larger in magnitude than the current maximum
    }
    MaxClockSkewWithPeerUs = reportedSkew;
    MaxClockSkewPeerId = record.GetPeerNodeId();
}

void Handle(TEvWhiteboard::TEvNodeStateUpdate::TPtr &ev, const TActorContext &ctx) {
auto& nodeStateInfo = NodeStateInfo[ev->Get()->Record.GetPeerName()];
ui64 previousChangeTime = nodeStateInfo.GetChangeTime();
Expand All @@ -571,6 +559,15 @@ class TNodeWhiteboardService : public TActorBootstrapped<TNodeWhiteboardService>
} else {
nodeStateInfo.ClearWriteThroughput();
}
if (ev->Get()->Record.GetSameScope()) {
i64 skew = ev->Get()->Record.GetClockSkewUs();
if (abs(skew) > abs(MaxClockSkewWithPeerUs)) {
MaxClockSkewWithPeerUs = skew;
MaxClockSkewPeerId = ev->Get()->Record.GetPeerNodeId();
}
}
// TODO: need better way to calculate network utilization
MaxNetworkUtilization = std::max(MaxNetworkUtilization, ev->Get()->Record.GetUtilization());
nodeStateInfo.MergeFrom(ev->Get()->Record);
nodeStateInfo.SetChangeTime(currentChangeTime);
}
Expand Down Expand Up @@ -1081,14 +1078,31 @@ class TNodeWhiteboardService : public TActorBootstrapped<TNodeWhiteboardService>
}

// Periodic handler that refreshes SystemStateInfo and re-arms itself every 15 seconds.
// It performs three independent steps:
//   1. merges fresh load-average samples into SystemStateInfo;
//   2. publishes the maximum clock skew and its peer id, then resets the skew;
//   3. publishes the maximum network utilization, then resets it.
// The superseded path that built a heap-allocated TEvSystemStateUpdate and merged its
// Record a second time is not kept: it duplicated step 1.
void Handle(TEvPrivate::TEvUpdateRuntimeStats::TPtr &, const TActorContext &ctx) {
    {
        // A stack-local message is sufficient here: it is only merged, never sent.
        NKikimrWhiteboard::TSystemStateInfo systemStatsUpdate;
        TVector<double> loadAverage = GetLoadAverage();
        for (double d : loadAverage) {
            systemStatsUpdate.AddLoadAverage(d);
        }
        // ChangeTime is updated only when CheckedMerge returns true.
        if (CheckedMerge(SystemStateInfo, systemStatsUpdate)) {
            SystemStateInfo.SetChangeTime(ctx.Now().MilliSeconds());
        }
    }

    {
        // The counter receives the magnitude; SystemStateInfo keeps the signed value.
        MaxClockSkewWithPeerUsCounter->Set(abs(MaxClockSkewWithPeerUs));
        MaxClockSkewPeerIdCounter->Set(MaxClockSkewPeerId);

        SystemStateInfo.SetMaxClockSkewWithPeerUs(MaxClockSkewWithPeerUs);
        SystemStateInfo.SetMaxClockSkewPeerId(MaxClockSkewPeerId);
        // Only the skew value is reset; MaxClockSkewPeerId keeps its last value.
        MaxClockSkewWithPeerUs = 0;
    }

    {
        SystemStateInfo.SetNetworkUtilization(MaxNetworkUtilization);
        MaxNetworkUtilization = 0;
    }

    UpdateSystemState(ctx);
    ctx.Schedule(TDuration::Seconds(15), new TEvPrivate::TEvUpdateRuntimeStats());
}
Expand Down Expand Up @@ -1122,16 +1136,6 @@ class TNodeWhiteboardService : public TActorBootstrapped<TNodeWhiteboardService>
}
ctx.Schedule(TDuration::Seconds(60), new TEvPrivate::TEvCleanupDeadTablets());
}

// Timer handler: publishes the tracked maximum clock skew and its peer id to the
// counters and to SystemStateInfo, resets the tracked skew, and re-schedules itself.
void Handle(TEvPrivate::TEvUpdateClockSkew::TPtr &, const TActorContext &ctx) {
// The counter receives the magnitude; SystemStateInfo below keeps the signed value.
MaxClockSkewWithPeerUsCounter->Set(abs(MaxClockSkewWithPeerUs));
MaxClockSkewPeerIdCounter->Set(MaxClockSkewPeerId);

SystemStateInfo.SetMaxClockSkewWithPeerUs(MaxClockSkewWithPeerUs);
SystemStateInfo.SetMaxClockSkewPeerId(MaxClockSkewPeerId);
// Only the skew value is reset; MaxClockSkewPeerId keeps its last value.
MaxClockSkewWithPeerUs = 0;
// Fire again in 15 seconds.
ctx.Schedule(TDuration::Seconds(15), new TEvPrivate::TEvUpdateClockSkew());
}
};

template<typename TMessage>
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/viewer/protos/viewer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ message TClusterInfo {
repeated NKikimrSysView.TStorageStatsEntry StorageStats = 51;
uint64 Hosts = 60;
uint64 Tenants = 61;
double NetworkUtilization = 62;
}

enum ETenantType {
Expand Down Expand Up @@ -398,6 +399,7 @@ message TTenant {
repeated TStorageUsage DatabaseStorage = 45;
uint32 CoresTotal = 50;
optional NKikimrMemory.TMemoryStats MemoryStats = 51;
float NetworkUtilization = 52;
}

message TTenants {
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/viewer/viewer_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ class TJsonCluster : public TViewerPipeClient {
request->AddFieldsRequired(NKikimrWhiteboard::TSystemStateInfo::kMemoryStatsFieldNumber);
request->AddFieldsRequired(NKikimrWhiteboard::TSystemStateInfo::kCoresUsedFieldNumber);
request->AddFieldsRequired(NKikimrWhiteboard::TSystemStateInfo::kCoresTotalFieldNumber);
request->AddFieldsRequired(NKikimrWhiteboard::TSystemStateInfo::kNetworkUtilizationFieldNumber);
}

void InitTabletWhiteboardRequest(NKikimrWhiteboard::TEvTabletStateRequest* request) {
Expand Down Expand Up @@ -471,6 +472,7 @@ class TJsonCluster : public TViewerPipeClient {

std::unordered_set<TString> hostPassed;
std::unordered_map<TString, TMemoryStats> memoryStats;
int nodesWithNetworkUtilization = 0;

for (TNode& node : NodeData) {
const NKikimrWhiteboard::TSystemStateInfo& systemState = node.SystemState;
Expand Down Expand Up @@ -538,6 +540,14 @@ class TJsonCluster : public TViewerPipeClient {
ClusterInfo.SetCoresUsed(ClusterInfo.GetCoresUsed() + systemState.GetCoresUsed());
ClusterInfo.SetCoresTotal(ClusterInfo.GetCoresTotal() + systemState.GetCoresTotal());
}
if (systemState.HasNetworkUtilization()) {
ClusterInfo.SetNetworkUtilization(ClusterInfo.GetNetworkUtilization() + systemState.GetNetworkUtilization());
++nodesWithNetworkUtilization;
}
}

if (nodesWithNetworkUtilization != 0) {
ClusterInfo.SetNetworkUtilization(ClusterInfo.GetNetworkUtilization() / nodesWithNetworkUtilization);
}

for (const auto& memStats : memoryStats) {
Expand Down
27 changes: 20 additions & 7 deletions ydb/core/viewer/viewer_tenantinfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,16 +272,21 @@ class TJsonTenantInfo : public TViewerPipeClient {
}
}

// Fills the list of TSystemStateInfo fields this handler wants back from a node:
// the default whiteboard field set, cores used/total, memory stats (only when the
// MemoryStats flag is set) and network utilization. Fields are appended in that order.
void InitSystemStateRequest(NKikimrWhiteboard::TEvSystemStateRequest& request) {
    using TSystemState = NKikimrWhiteboard::TSystemStateInfo;

    request.MutableFieldsRequired()->CopyFrom(GetDefaultWhiteboardFields<TSystemState>());
    request.AddFieldsRequired(TSystemState::kCoresUsedFieldNumber);
    request.AddFieldsRequired(TSystemState::kCoresTotalFieldNumber);
    if (MemoryStats) {
        request.AddFieldsRequired(TSystemState::kMemoryStatsFieldNumber);
    }
    request.AddFieldsRequired(TSystemState::kNetworkUtilizationFieldNumber);
}

// Subscribes to the given node and, unless a system-state request for it is already
// recorded in SystemStateResponse, creates one and records it there.
// The set of required fields comes exclusively from InitSystemStateRequest; adding the
// same field numbers inline as well would put each of them into the request twice.
void SendWhiteboardSystemStateRequest(const TNodeId nodeId) {
    Subscribers.insert(nodeId);
    if (SystemStateResponse.count(nodeId) != 0) {
        return; // a request for this node is already recorded
    }
    auto request = std::make_unique<NNodeWhiteboard::TEvWhiteboard::TEvSystemStateRequest>();
    InitSystemStateRequest(request->Record);
    SystemStateResponse.emplace(nodeId, MakeWhiteboardRequest(nodeId, request.release()));
}
Expand Down Expand Up @@ -313,7 +318,7 @@ class TJsonTenantInfo : public TViewerPipeClient {
TNodeId nodeId = *itPos;
Subscribers.insert(nodeId);
THolder<TEvViewer::TEvViewerRequest> sysRequest = MakeHolder<TEvViewer::TEvViewerRequest>();
sysRequest->Record.MutableSystemRequest();
InitSystemStateRequest(*sysRequest->Record.MutableSystemRequest());
sysRequest->Record.SetTimeout(Timeout / 3);
for (auto nodeId : nodesIds) {
sysRequest->Record.MutableLocation()->AddNodeId(nodeId);
Expand Down Expand Up @@ -790,6 +795,7 @@ class TJsonTenantInfo : public TViewerPipeClient {

THashSet<TNodeId> tenantNodes;
NMemory::TMemoryStatsAggregator tenantMemoryStats;
int nodesWithNetworkUtilization = 0;

for (TNodeId nodeId : tenant.GetNodeIds()) {
auto itNodeInfo = nodeSystemStateInfo.find(nodeId);
Expand Down Expand Up @@ -840,10 +846,17 @@ class TJsonTenantInfo : public TViewerPipeClient {
if (nodeInfo.HasMemoryStats()) {
tenantMemoryStats.Add(nodeInfo.GetMemoryStats(), nodeInfo.GetHost());
}
if (nodeInfo.HasNetworkUtilization()) {
tenant.SetNetworkUtilization(tenant.GetNetworkUtilization() + nodeInfo.GetNetworkUtilization());
++nodesWithNetworkUtilization;
}
overall = Max(overall, GetViewerFlag(nodeInfo.GetSystemState()));
}
tenantNodes.emplace(nodeId);
}
if (nodesWithNetworkUtilization != 0) {
tenant.SetNetworkUtilization(tenant.GetNetworkUtilization() / nodesWithNetworkUtilization);
}
tenant.MutableMemoryStats()->CopyFrom(tenantMemoryStats.Aggregate());
if (tenant.GetType() == NKikimrViewer::Serverless) {
tenant.SetStorageAllocatedSize(tenant.GetMetrics().GetStorage());
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/interconnect/interconnect_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ namespace NActors {
// }
EFlag ConnectStatus;
i64 ClockSkewUs;
bool ReportClockSkew;
bool SameScope;
ui64 PingTimeUs;
NActors::TScopeId ScopeId;
double Utilization;
Expand Down
6 changes: 1 addition & 5 deletions ydb/library/actors/interconnect/interconnect_tcp_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1028,10 +1028,6 @@ namespace NActors {
} while (false);
}

// we need to track clock skew only for connections between nodes of the same tenant;
// in that case both nodes have the same scope
bool reportClockSkew = Proxy->Common->LocalScopeId.first != 0 && Proxy->Common->LocalScopeId == Params.PeerScopeId;

callback({
.ActorSystem = TlsActivationContext->ExecutorThread.ActorSystem,
.PeerNodeId = Proxy->PeerNodeId,
Expand All @@ -1042,7 +1038,7 @@ namespace NActors {
.SessionConnected = connected && Socket,
.ConnectStatus = flagState,
.ClockSkewUs = ReceiveContext->ClockSkew_us,
.ReportClockSkew = reportClockSkew,
.SameScope = Proxy->Common->LocalScopeId == Params.PeerScopeId,
.PingTimeUs = ReceiveContext->PingRTT_us,
.ScopeId = Params.PeerScopeId,
.Utilization = Utilized,
Expand Down

0 comments on commit 04f4156

Please sign in to comment.