Skip to content

Commit

Permalink
add process defferent handle classes
Browse files Browse the repository at this point in the history
  • Loading branch information
VPolka committed Dec 18, 2024
1 parent 084221b commit 9b834dc
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 69 deletions.
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/vdisk/common/vdisk_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ namespace NKikimr {
, ReplNodeResponseQuoter(std::move(replNodeResponseQuoter))
, CostTracker()
, OOSMonGroup(std::make_shared<NMonGroup::TOutOfSpaceGroup>(VDiskCounters, "subsystem", "oos"))
, ResponseStatusMonGroup(std::make_shared<NMonGroup::TResponseStatusGroup>(VDiskCounters, "subsystem", "status"))
, ResponseStatusMonGroup(std::make_shared<NMonGroup::TResponseStatusGroup>(VDiskCounters))
, OutOfSpaceState(Top->GetTotalVDisksNum(), Top->GetOrderNumber(ShortSelfVDisk))
, CostMonGroup(vdiskCounters, "subsystem", "cost")
, Logger(as ? ActorSystemLogger(as) : DevNullLogger())
Expand Down
108 changes: 90 additions & 18 deletions ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
#include "defs.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/protos/base.pb.h>
#include <ydb/core/protos/blobstorage_base.pb.h>
#include <ydb/core/protos/node_whiteboard.pb.h>
#include <ydb/core/protos/whiteboard_disk_states.pb.h>

namespace NKikimr {
namespace NMonGroup {

class TBase {
public:
TBase(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
Expand Down Expand Up @@ -665,28 +666,99 @@ public:
};

///////////////////////////////////////////////////////////////////////////////////
// TResponseStatusGroup
// THandleClassGroup
///////////////////////////////////////////////////////////////////////////////////
class TResponseStatusGroup : public TBase {
class THandleClassGroup : public TBase {
public:
GROUP_CONSTRUCTOR(TResponseStatusGroup)
GROUP_CONSTRUCTOR(THandleClassGroup)
{
COUNTER_INIT(ResponsesWithStatusError, true);
COUNTER_INIT(ResponsesWithStatusRace, true);
COUNTER_INIT(ResponsesWithStatusBlocked, true);
COUNTER_INIT(ResponsesWithStatusOutOfSpace, true);
COUNTER_INIT(ResponsesWithStatusDeadline, true);
COUNTER_INIT(ResponsesWithStatusNotReady, true);
COUNTER_INIT(ResponsesWithStatusVdiskErrorState, true);
COUNTER_INIT(VGetDiscover, true);
COUNTER_INIT(VGetFast, true);
COUNTER_INIT(VGetAsync, true);
COUNTER_INIT(VGetLow, true);
COUNTER_INIT(VPutTabletLog, true);
COUNTER_INIT(VPutUserData, true);
COUNTER_INIT(VPutAsyncBlob, true);
}

COUNTER_DEF(VGetDiscover);
COUNTER_DEF(VGetFast);
COUNTER_DEF(VGetAsync);
COUNTER_DEF(VGetLow);
COUNTER_DEF(VPutTabletLog);
COUNTER_DEF(VPutUserData);
COUNTER_DEF(VPutAsyncBlob);

const ::NMonitoring::TDeprecatedCounter &GetCounter(NKikimrBlobStorage::EGetHandleClass handleClass) const {
switch (handleClass) {
case NKikimrBlobStorage::AsyncRead:
return VGetAsync();
case NKikimrBlobStorage::FastRead:
return VGetFast();
case NKikimrBlobStorage::Discover:
return VGetDiscover();
case NKikimrBlobStorage::LowRead:
return VGetLow();
}
}
const ::NMonitoring::TDeprecatedCounter &GetCounter(NKikimrBlobStorage::EPutHandleClass handleClass) const {
switch (handleClass) {
case NKikimrBlobStorage::TabletLog:
return VPutTabletLog();
case NKikimrBlobStorage::AsyncBlob:
return VPutAsyncBlob();
case NKikimrBlobStorage::UserData:
return VPutUserData();
}
}
};

///////////////////////////////////////////////////////////////////////////////////
// TResponseStatusGroup
///////////////////////////////////////////////////////////////////////////////////
class TResponseStatusGroup {
public:
TIntrusivePtr<::NMonitoring::TDynamicCounters> Group;

COUNTER_DEF(ResponsesWithStatusError);
COUNTER_DEF(ResponsesWithStatusRace);
COUNTER_DEF(ResponsesWithStatusBlocked);
COUNTER_DEF(ResponsesWithStatusOutOfSpace);
COUNTER_DEF(ResponsesWithStatusDeadline);
COUNTER_DEF(ResponsesWithStatusNotReady);
COUNTER_DEF(ResponsesWithStatusVdiskErrorState);
THandleClassGroup ResponsesWithStatusError;
THandleClassGroup ResponsesWithStatusRace;
THandleClassGroup ResponsesWithStatusBlocked;
THandleClassGroup ResponsesWithStatusOutOfSpace;
THandleClassGroup ResponsesWithStatusDeadline;
THandleClassGroup ResponsesWithStatusNotReady;
THandleClassGroup ResponsesWithStatusVdiskErrorState;

TResponseStatusGroup(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters)
: Group(counters->GetSubgroup("subsystem", "statuses"))
, ResponsesWithStatusError(Group, "status", "ERROR")
, ResponsesWithStatusRace(Group, "status", "RACE")
, ResponsesWithStatusBlocked(Group, "status", "BLOCKED")
, ResponsesWithStatusOutOfSpace(Group, "status", "OUT_OF_SPACE")
, ResponsesWithStatusDeadline(Group, "status", "DEADLINE")
, ResponsesWithStatusNotReady(Group, "status", "NOT_READY")
, ResponsesWithStatusVdiskErrorState(Group, "status", "VDISK_STATUS_ERROR")
{}

template <typename THandleClassType>
const ::NMonitoring::TDeprecatedCounter &GetCounter(NKikimrProto::EReplyStatus status, THandleClassType handleClass) const {
switch (status) {
case NKikimrProto::ERROR:
return ResponsesWithStatusError.GetCounter(handleClass);
case NKikimrProto::RACE:
return ResponsesWithStatusRace.GetCounter(handleClass);
case NKikimrProto::BLOCKED:
return ResponsesWithStatusBlocked.GetCounter(handleClass);
case NKikimrProto::OUT_OF_SPACE:
return ResponsesWithStatusOutOfSpace.GetCounter(handleClass);
case NKikimrProto::DEADLINE:
return ResponsesWithStatusDeadline.GetCounter(handleClass);
case NKikimrProto::NOTREADY:
return ResponsesWithStatusNotReady.GetCounter(handleClass);
case NKikimrProto::VDISK_ERROR_STATE:
return ResponsesWithStatusVdiskErrorState.GetCounter(handleClass);
default: break;
}
}
};

///////////////////////////////////////////////////////////////////////////////////
Expand Down
135 changes: 85 additions & 50 deletions ydb/core/blobstorage/vdisk/common/vdisk_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,10 @@
namespace NKikimr {

template <class TRecord>
void ReportResponse(TRecord record, const TLogoBlobID& blobId, const TIntrusivePtr<TVDiskContext>& vCtx);

template <class TRecord>
void ReportOSStatus(TRecord record, const TLogoBlobID& blobId, const TIntrusivePtr<TVDiskContext>& vCtx);
void ReportResponse(const TRecord& record, const TIntrusivePtr<TVDiskContext>& vCtx);
void LogOOSStatus(ui32 flags, const TLogoBlobID& blobId, const TString& vDiskLogPrefix, std::atomic<ui32>& curFlags);
void UpdateMonOOSStatus(ui32 flags, const std::shared_ptr<NMonGroup::TOutOfSpaceGroup>& monGroup);

template <class TRecord>
void ReportResponseStatus(TRecord record, const TLogoBlobID& blobId, const TIntrusivePtr<TVDiskContext>& vCtx);
void UpdateMonResponseStatus(NKikimrProto::EReplyStatus status, const std::shared_ptr<NMonGroup::TResponseStatusGroup>& monGroup);
void UpdateMonResponseStatus(NKikimrProto::EReplyStatus status, HandleClassType handleClass, const std::shared_ptr<NMonGroup::TResponseStatusGroup>& monGroup);

void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, const TIntrusivePtr<TVDiskContext>& vCtx) {
ui32 channel = TInterconnectChannels::IC_BLOBSTORAGE;
Expand All @@ -28,20 +22,20 @@ void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEve

void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, ui32 channel, const TIntrusivePtr<TVDiskContext>& vCtx) {
if (vCtx) {
switch(ev->Type()) {
case TEvBlobStorage::TEvVPutResult::EventType: {
TEvBlobStorage::TEvVPutResult* event = static_cast<TEvBlobStorage::TEvVPutResult *>(ev);
ReportResponse(event->Record, LogoBlobIDFromLogoBlobID(event->Record.GetBlobID()), vCtx);
break;
}
case TEvBlobStorage::TEvVMultiPutResult::EventType: {
TEvBlobStorage::TEvVMultiPutResult *event = static_cast<TEvBlobStorage::TEvVMultiPutResult *>(ev);
if (event->Record.ItemsSize() > 0) {
const auto& item = event->Record.GetItems(0);
ReportResponse(event->Record, LogoBlobIDFromLogoBlobID(item.GetBlobID()), vCtx);
}
break;
switch (ev->Type()) {
#define HANDLE_EVENT(T) \
case T::EventType: { \
T *event = static_cast<T *>(ev); \
ReportResponse(event->Record, vCtx); \
break; \
}

HANDLE_EVENT(TEvBlobStorage::TEvVPutResult)
HANDLE_EVENT(TEvBlobStorage::TEvVMultiPutResult)
HANDLE_EVENT(TEvBlobStorage::TEvVGetResult)
HANDLE_EVENT(TEvBlobStorage::TEvVGetBlockResult)
HANDLE_EVENT(TEvBlobStorage::TEvVCollectGarbageResult)
#undef HANDLE_EVENT
}
}

Expand Down Expand Up @@ -76,43 +70,75 @@ void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEve
}
}

template <class TRecord>
void ReportResponse(TRecord record, const TLogoBlobID& blobId, const TIntrusivePtr<TVDiskContext>& vCtx) {
ReportOSStatus(record, blobId, vCtx);
ReportResponseStatus(record, blobId, vCtx);
}
template <typename T, typename Tuple>
struct IsInTypes;

template <class TRecord>
void ReportResponseStatus(TRecord record, const TLogoBlobID& blobId, const TIntrusivePtr<TVDiskContext>& vCtx) {
UpdateMonResponseStatus(record.GetStatus(), vCtx->ResponseStatusMonGroup);
}
template <typename T, typename... Types>
struct IsInTypes<T, std::tuple<Types...>> {
static constexpr bool value = (std::is_same_v<T, Types> || ...);
};

void UpdateMonResponseStatus(NKikimrProto::EReplyStatus status, const std::shared_ptr<NMonGroup::TResponseStatusGroup>& monGroup) {
if (!monGroup) {
return;
struct TReportingOSStatus {
using EnableFor = std::tuple<
NKikimrBlobStorage::TEvVPutResult,
NKikimrBlobStorage::TEvVMultiPutResult>;

template <typename TRecord>
static void Report(const TRecord& record, const TIntrusivePtr<TVDiskContext>& vCtx) {
LogOOSStatus(record.GetStatusFlags(), LogoBlobIDFromLogoBlobID(record.GetBlobID()), vCtx->VDiskLogPrefix, vCtx->CurrentOOSStatusFlag);
UpdateMonOOSStatus(record.GetStatusFlags(), vCtx->OOSMonGroup);
}

template<>
void Report(const NKikimrBlobStorage::TEvVMultiPutResult& record, const TIntrusivePtr<TVDiskContext>& vCtx) {
if (record.ItemsSize() > 0) {
const auto& item = record.GetItems(0);
LogOOSStatus(record.GetStatusFlags(), LogoBlobIDFromLogoBlobID(item.GetBlobID()), vCtx->VDiskLogPrefix, vCtx->CurrentOOSStatusFlag);
UpdateMonOOSStatus(record.GetStatusFlags(), vCtx->OOSMonGroup);
}
}
};

struct TReportingResponseStatus {
using EnableFor = std::tuple<
NKikimrBlobStorage::TEvVPutResult,
NKikimrBlobStorage::TEvVMultiPutResult,
NKikimrBlobStorage::TEvVGetResult,
NKikimrBlobStorage::TEvVGetBlockResult,
NKikimrBlobStorage::TEvVCollectGarbageResult>;

template <typename TRecord>
static void Report(const TRecord& record, const TIntrusivePtr<TVDiskContext>& vCtx) {
UpdateMonResponseStatus(record.GetStatus(), record.GetHandleClass(), vCtx->ResponseStatusMonGroup);
}

if (status == NKikimrProto::ERROR) {
monGroup->ResponsesWithStatusError().Inc();
} else if (status == NKikimrProto::RACE) {
monGroup->ResponsesWithStatusRace().Inc();
} else if (status == NKikimrProto::BLOCKED) {
monGroup->ResponsesWithStatusBlocked().Inc();
} else if (status == NKikimrProto::OUT_OF_SPACE) {
monGroup->ResponsesWithStatusOutOfSpace().Inc();
} else if (status == NKikimrProto::DEADLINE) {
monGroup->ResponsesWithStatusDeadline().Inc();
} else if (status == NKikimrProto::NOTREADY) {
monGroup->ResponsesWithStatusNotReady().Inc();
} else if (status == NKikimrProto::VDISK_ERROR_STATE) {
monGroup->ResponsesWithStatusVdiskErrorState().Inc();
template<>
void Report(const NKikimrBlobStorage::TEvVMultiPutResult& record, const TIntrusivePtr<TVDiskContext>& vCtx) {
for (const auto& item : record.GetItems()) {
UpdateMonResponseStatus(item.GetStatus(), record.GetHandleClass(), vCtx->ResponseStatusMonGroup);
}
}
};

#define DEFUNE_REPORT(NAME) \
template <typename TRecord> \
typename std::enable_if<IsInTypes<TRecord, TReporting##NAME::EnableFor>::value>::type Report##NAME( \
const TRecord& record, const TIntrusivePtr<TVDiskContext>& vCtx) { \
TReporting##NAME::Report(record, vCtx); \
} \
\
template <typename TRecord> \
typename std::enable_if<!IsInTypes<TRecord, TReporting##NAME::EnableFor>::value>::type Report##NAME( \
const TRecord& record, const TIntrusivePtr<TVDiskContext>& vCtx) {}

DEFUNE_REPORT(OSStatus)
DEFUNE_REPORT(ResponseStatus)
#undef DEFUNE_REPORT

template <class TRecord>
void ReportOSStatus(TRecord record, const TLogoBlobID& blobId, const TIntrusivePtr<TVDiskContext>& vCtx) {
LogOOSStatus(record.GetStatusFlags(), blobId, vCtx->VDiskLogPrefix, vCtx->CurrentOOSStatusFlag);
UpdateMonOOSStatus(record.GetStatusFlags(), vCtx->OOSMonGroup);
void ReportResponse(const TRecord& record, const TIntrusivePtr<TVDiskContext>& vCtx) {
ReportOSStatus(record, vCtx);
ReportResponseStatus(record, vCtx);
}

void LogOOSStatus(ui32 flags, const TLogoBlobID& blobId, const TString& vDiskLogPrefix, std::atomic<ui32>& curFlags) {
Expand Down Expand Up @@ -157,4 +183,13 @@ void UpdateMonOOSStatus(ui32 flags, const std::shared_ptr<NMonGroup::TOutOfSpace
}
}

void UpdateMonResponseStatus(NKikimrProto::EReplyStatus status, HandleClassType handleClass,
const std::shared_ptr<NMonGroup::TResponseStatusGroup>& monGroup) {
if (!monGroup) {
return;
}

monGroup->GetCounter(status, handleClass).Inc();
}

} //NKikimr

0 comments on commit 9b834dc

Please sign in to comment.