Skip to content

Commit

Permalink
add handle class
Browse files Browse the repository at this point in the history
  • Loading branch information
VPolka committed Dec 23, 2024
1 parent 9b834dc commit 0ef8627
Show file tree
Hide file tree
Showing 27 changed files with 182 additions and 120 deletions.
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/vdisk/common/blobstorage_status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace NKikimr {
SetRacingGroupInfo(record, Result->Record, GroupInfo);
LOG_DEBUG(ctx, BS_VDISK_OTHER, VDISKP(VCtx->VDiskLogPrefix, "TEvVStatusResult Request# {%s} Response# {%s}",
SingleLineProto(record).data(), SingleLineProto(Result->Record).data()));
SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie, Ev->GetChannel(), VCtx);
SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie, Ev->GetChannel(), VCtx, {});
Die(ctx);
return;
}
Expand Down Expand Up @@ -73,7 +73,7 @@ namespace NKikimr {
ctx.Send(NotifyId, new TEvents::TEvActorDied());
LOG_DEBUG(ctx, BS_VDISK_GET,
VDISKP(VCtx->VDiskLogPrefix, "TEvVStatusResult"));
SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie, Ev->GetChannel(), VCtx);
SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie, Ev->GetChannel(), VCtx, {});
Die(ctx);
}
}
Expand Down
68 changes: 41 additions & 27 deletions ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h
Original file line number Diff line number Diff line change
Expand Up @@ -672,43 +672,51 @@ public:
public:
GROUP_CONSTRUCTOR(THandleClassGroup)
{
COUNTER_INIT(VGetDiscover, true);
COUNTER_INIT(VGetFast, true);
COUNTER_INIT(VGetAsync, true);
COUNTER_INIT(VGetLow, true);
COUNTER_INIT(VPutTabletLog, true);
COUNTER_INIT(VPutUserData, true);
COUNTER_INIT(VPutAsyncBlob, true);
COUNTER_INIT(Undefined, true);
COUNTER_INIT(GetDiscover, true);
COUNTER_INIT(GetFast, true);
COUNTER_INIT(GetAsync, true);
COUNTER_INIT(GetLow, true);
COUNTER_INIT(PutTabletLog, true);
COUNTER_INIT(PutUserData, true);
COUNTER_INIT(PutAsyncBlob, true);
}

COUNTER_DEF(VGetDiscover);
COUNTER_DEF(VGetFast);
COUNTER_DEF(VGetAsync);
COUNTER_DEF(VGetLow);
COUNTER_DEF(VPutTabletLog);
COUNTER_DEF(VPutUserData);
COUNTER_DEF(VPutAsyncBlob);
COUNTER_DEF(Undefined);
COUNTER_DEF(GetDiscover);
COUNTER_DEF(GetFast);
COUNTER_DEF(GetAsync);
COUNTER_DEF(GetLow);
COUNTER_DEF(PutTabletLog);
COUNTER_DEF(PutUserData);
COUNTER_DEF(PutAsyncBlob);

const ::NMonitoring::TDeprecatedCounter &GetCounter(NKikimrBlobStorage::EGetHandleClass handleClass) const {
switch (handleClass) {
::NMonitoring::TDeprecatedCounter &GetCounter(const std::optional<NKikimrBlobStorage::EGetHandleClass>& handleClass = std::nullopt) {
if (!handleClass) {
return Undefined();
}
switch (*handleClass) {
case NKikimrBlobStorage::AsyncRead:
return VGetAsync();
return GetAsync();
case NKikimrBlobStorage::FastRead:
return VGetFast();
return GetFast();
case NKikimrBlobStorage::Discover:
return VGetDiscover();
return GetDiscover();
case NKikimrBlobStorage::LowRead:
return VGetLow();
return GetLow();
}
}
const ::NMonitoring::TDeprecatedCounter &GetCounter(NKikimrBlobStorage::EPutHandleClass handleClass) const {
switch (handleClass) {
::NMonitoring::TDeprecatedCounter &GetCounter(const std::optional<NKikimrBlobStorage::EPutHandleClass>& handleClass = std::nullopt) {
if (!handleClass) {
return Undefined();
}
switch (*handleClass) {
case NKikimrBlobStorage::TabletLog:
return VPutTabletLog();
return PutTabletLog();
case NKikimrBlobStorage::AsyncBlob:
return VPutAsyncBlob();
return PutAsyncBlob();
case NKikimrBlobStorage::UserData:
return VPutUserData();
return PutUserData();
}
}
};
Expand All @@ -720,6 +728,7 @@ public:
public:
TIntrusivePtr<::NMonitoring::TDynamicCounters> Group;

THandleClassGroup Undefined;
THandleClassGroup ResponsesWithStatusError;
THandleClassGroup ResponsesWithStatusRace;
THandleClassGroup ResponsesWithStatusBlocked;
Expand All @@ -730,6 +739,7 @@ public:

TResponseStatusGroup(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters)
: Group(counters->GetSubgroup("subsystem", "statuses"))
, Undefined(Group, "status", "UNDEFINED")
, ResponsesWithStatusError(Group, "status", "ERROR")
, ResponsesWithStatusRace(Group, "status", "RACE")
, ResponsesWithStatusBlocked(Group, "status", "BLOCKED")
Expand All @@ -740,7 +750,7 @@ public:
{}

template <typename THandleClassType>
const ::NMonitoring::TDeprecatedCounter &GetCounter(NKikimrProto::EReplyStatus status, THandleClassType handleClass) const {
::NMonitoring::TDeprecatedCounter &GetCounter(NKikimrProto::EReplyStatus status, const std::optional<THandleClassType>& handleClass = std::nullopt) {
switch (status) {
case NKikimrProto::ERROR:
return ResponsesWithStatusError.GetCounter(handleClass);
Expand All @@ -756,9 +766,13 @@ public:
return ResponsesWithStatusNotReady.GetCounter(handleClass);
case NKikimrProto::VDISK_ERROR_STATE:
return ResponsesWithStatusVdiskErrorState.GetCounter(handleClass);
default: break;
default: return Undefined.GetCounter(handleClass);
}
}

::NMonitoring::TDeprecatedCounter &GetCounter(NKikimrProto::EReplyStatus status) {
return GetCounter(status, std::optional<NKikimrBlobStorage::EPutHandleClass>{});
}
};

///////////////////////////////////////////////////////////////////////////////////
Expand Down
55 changes: 30 additions & 25 deletions ydb/core/blobstorage/vdisk/common/vdisk_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,27 @@
namespace NKikimr {

template <class TRecord>
void ReportResponse(const TRecord& record, const TIntrusivePtr<TVDiskContext>& vCtx);
void ReportResponse(const TRecord& record, const TCommonHandleClass& handleClass, const TIntrusivePtr<TVDiskContext>& vCtx);
void LogOOSStatus(ui32 flags, const TLogoBlobID& blobId, const TString& vDiskLogPrefix, std::atomic<ui32>& curFlags);
void UpdateMonOOSStatus(ui32 flags, const std::shared_ptr<NMonGroup::TOutOfSpaceGroup>& monGroup);
void UpdateMonResponseStatus(NKikimrProto::EReplyStatus status, HandleClassType handleClass, const std::shared_ptr<NMonGroup::TResponseStatusGroup>& monGroup);
void UpdateMonResponseStatus(NKikimrProto::EReplyStatus status, const TCommonHandleClass& handleClass, const std::shared_ptr<NMonGroup::TResponseStatusGroup>& monGroup);

void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, const TIntrusivePtr<TVDiskContext>& vCtx) {
void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, const TIntrusivePtr<TVDiskContext>& vCtx, const TCommonHandleClass& handleClass) {
ui32 channel = TInterconnectChannels::IC_BLOBSTORAGE;
if (TEvVResultBase *base = dynamic_cast<TEvVResultBase *>(ev)) {
channel = base->GetChannelToSend();
}
SendVDiskResponse(ctx, recipient, ev, cookie, channel, vCtx);
SendVDiskResponse(ctx, recipient, ev, cookie, channel, vCtx, handleClass);
}

void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, ui32 channel, const TIntrusivePtr<TVDiskContext>& vCtx) {
void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, ui32 channel, const TIntrusivePtr<TVDiskContext>& vCtx, const TCommonHandleClass& handleClass) {
if (vCtx) {
switch (ev->Type()) {
#define HANDLE_EVENT(T) \
case T::EventType: { \
T *event = static_cast<T *>(ev); \
ReportResponse(event->Record, vCtx); \
break; \
#define HANDLE_EVENT(T) \
case T::EventType: { \
T *event = static_cast<T *>(ev); \
ReportResponse(event->Record, handleClass, vCtx); \
break; \
}

HANDLE_EVENT(TEvBlobStorage::TEvVPutResult)
Expand Down Expand Up @@ -84,13 +84,13 @@ struct TReportingOSStatus {
NKikimrBlobStorage::TEvVMultiPutResult>;

template <typename TRecord>
static void Report(const TRecord& record, const TIntrusivePtr<TVDiskContext>& vCtx) {
static void Report(const TRecord& record, const TCommonHandleClass&, const TIntrusivePtr<TVDiskContext>& vCtx) {
LogOOSStatus(record.GetStatusFlags(), LogoBlobIDFromLogoBlobID(record.GetBlobID()), vCtx->VDiskLogPrefix, vCtx->CurrentOOSStatusFlag);
UpdateMonOOSStatus(record.GetStatusFlags(), vCtx->OOSMonGroup);
}

template<>
void Report(const NKikimrBlobStorage::TEvVMultiPutResult& record, const TIntrusivePtr<TVDiskContext>& vCtx) {
void Report(const NKikimrBlobStorage::TEvVMultiPutResult& record, const TCommonHandleClass&, const TIntrusivePtr<TVDiskContext>& vCtx) {
if (record.ItemsSize() > 0) {
const auto& item = record.GetItems(0);
LogOOSStatus(record.GetStatusFlags(), LogoBlobIDFromLogoBlobID(item.GetBlobID()), vCtx->VDiskLogPrefix, vCtx->CurrentOOSStatusFlag);
Expand All @@ -108,37 +108,37 @@ struct TReportingResponseStatus {
NKikimrBlobStorage::TEvVCollectGarbageResult>;

template <typename TRecord>
static void Report(const TRecord& record, const TIntrusivePtr<TVDiskContext>& vCtx) {
UpdateMonResponseStatus(record.GetStatus(), record.GetHandleClass(), vCtx->ResponseStatusMonGroup);
static void Report(const TRecord& record, const TCommonHandleClass& handleClass, const TIntrusivePtr<TVDiskContext>& vCtx) {
UpdateMonResponseStatus(record.GetStatus(), handleClass, vCtx->ResponseStatusMonGroup);
}

template<>
void Report(const NKikimrBlobStorage::TEvVMultiPutResult& record, const TIntrusivePtr<TVDiskContext>& vCtx) {
void Report(const NKikimrBlobStorage::TEvVMultiPutResult& record, const TCommonHandleClass& handleClass, const TIntrusivePtr<TVDiskContext>& vCtx) {
for (const auto& item : record.GetItems()) {
UpdateMonResponseStatus(item.GetStatus(), record.GetHandleClass(), vCtx->ResponseStatusMonGroup);
UpdateMonResponseStatus(item.GetStatus(), handleClass, vCtx->ResponseStatusMonGroup);
}
}
};

#define DEFUNE_REPORT(NAME) \
template <typename TRecord> \
typename std::enable_if<IsInTypes<TRecord, TReporting##NAME::EnableFor>::value>::type Report##NAME( \
const TRecord& record, const TIntrusivePtr<TVDiskContext>& vCtx) { \
TReporting##NAME::Report(record, vCtx); \
const TRecord& record, const TCommonHandleClass& handleClass, const TIntrusivePtr<TVDiskContext>& vCtx) { \
TReporting##NAME::Report(record, handleClass, vCtx); \
} \
\
template <typename TRecord> \
typename std::enable_if<!IsInTypes<TRecord, TReporting##NAME::EnableFor>::value>::type Report##NAME( \
const TRecord& record, const TIntrusivePtr<TVDiskContext>& vCtx) {}
const TRecord& record, const TCommonHandleClass& handleClass, const TIntrusivePtr<TVDiskContext>& vCtx) {}

DEFUNE_REPORT(OSStatus)
DEFUNE_REPORT(ResponseStatus)
#undef DEFUNE_REPORT

template <class TRecord>
void ReportResponse(const TRecord& record, const TIntrusivePtr<TVDiskContext>& vCtx) {
ReportOSStatus(record, vCtx);
ReportResponseStatus(record, vCtx);
void ReportResponse(const TRecord& record, const TCommonHandleClass& handleClass, const TIntrusivePtr<TVDiskContext>& vCtx) {
ReportOSStatus(record, handleClass, vCtx);
ReportResponseStatus(record, handleClass, vCtx);
}

void LogOOSStatus(ui32 flags, const TLogoBlobID& blobId, const TString& vDiskLogPrefix, std::atomic<ui32>& curFlags) {
Expand Down Expand Up @@ -183,13 +183,18 @@ void UpdateMonOOSStatus(ui32 flags, const std::shared_ptr<NMonGroup::TOutOfSpace
}
}

void UpdateMonResponseStatus(NKikimrProto::EReplyStatus status, HandleClassType handleClass,
const std::shared_ptr<NMonGroup::TResponseStatusGroup>& monGroup) {
void UpdateMonResponseStatus(NKikimrProto::EReplyStatus status, const TCommonHandleClass& handleClass, const std::shared_ptr<NMonGroup::TResponseStatusGroup>& monGroup) {
if (!monGroup) {
return;
}

monGroup->GetCounter(status, handleClass).Inc();
if (handleClass.PutHandleClass) {
monGroup->GetCounter(status, handleClass.PutHandleClass).Inc();
} else if (handleClass.GetHandleClass) {
monGroup->GetCounter(status, handleClass.GetHandleClass).Inc();
} else {
monGroup->GetCounter(status).Inc();
}
}

} //NKikimr
39 changes: 37 additions & 2 deletions ydb/core/blobstorage/vdisk/common/vdisk_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,43 @@

namespace NKikimr {

void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, const TIntrusivePtr<TVDiskContext>& vCtx);
template <typename TEv, typename = void>
struct THasGetHandleClass : std::false_type {};
template <typename TEv>
struct THasGetHandleClass<TEv, std::void_t<decltype(std::declval<TEv>().GetHandleClass())>> : std::true_type {};

void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, ui32 channel, const TIntrusivePtr<TVDiskContext>& vCtx);
template <typename TEv, typename = void>
struct THasRecordWithGetHandleClass : std::false_type {};
template <typename TEv>
struct THasRecordWithGetHandleClass<TEv, std::void_t<decltype(std::declval<TEv>().Record.GetHandleClass())>> : std::true_type {};

struct TCommonHandleClass {
TCommonHandleClass() = default;

template <typename TEv>
TCommonHandleClass(const TEv& ev) {
if constexpr (THasRecordWithGetHandleClass<TEv>::value) {
TCommonHandleClass(ev.Record.GetHandleClass());
} else if constexpr (THasGetHandleClass<TEv>::value) {
TCommonHandleClass(ev.GetHandleClass());
}
}
template <>
TCommonHandleClass(NKikimrBlobStorage::EPutHandleClass putHandleClass) {
PutHandleClass = putHandleClass;
}
template <>
TCommonHandleClass(NKikimrBlobStorage::EGetHandleClass getHandleClass) {
GetHandleClass = getHandleClass;
}


std::optional<NKikimrBlobStorage::EPutHandleClass> PutHandleClass;
std::optional<NKikimrBlobStorage::EGetHandleClass> GetHandleClass;
};

void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, const TIntrusivePtr<TVDiskContext>& vCtx, const TCommonHandleClass& handleClass);

void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, ui32 channel, const TIntrusivePtr<TVDiskContext>& vCtx, const TCommonHandleClass& handleClass);

}//NKikimr
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
CHECK_PDISK_RESPONSE(HugeKeeperCtx->VCtx, ev, ctx);
ctx.Send(NotifyID, new TEvHullHugeWritten(HugeSlot));
ctx.Send(HugeKeeperCtx->SkeletonId, new TEvHullLogHugeBlob(WriteId, Item->LogoBlobId, Item->Ingress, DiskAddr,
Item->IgnoreBlock, Item->SenderId, Item->Cookie, std::move(Item->Result), &Item->ExtraBlockChecks), 0, 0,
Item->IgnoreBlock, Item->SenderId, Item->Cookie, Item->HandleClass, std::move(Item->Result), &Item->ExtraBlockChecks), 0, 0,
Span.GetTraceId());
LOG_DEBUG(ctx, BS_HULLHUGE,
VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ namespace NKikimr {
const bool IgnoreBlock;
const TActorId OrigClient;
const ui64 OrigCookie;
const NKikimrBlobStorage::EPutHandleClass HandleClass;
std::unique_ptr<TEvBlobStorage::TEvVPutResult> Result;
NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> ExtraBlockChecks;

Expand All @@ -82,6 +83,7 @@ namespace NKikimr {
bool ignoreBlock,
const TActorId &origClient,
ui64 origCookie,
NKikimrBlobStorage::EPutHandleClass handleClass,
std::unique_ptr<TEvBlobStorage::TEvVPutResult> result,
NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> *extraBlockChecks)
: WriteId(writeId)
Expand All @@ -91,6 +93,7 @@ namespace NKikimr {
, IgnoreBlock(ignoreBlock)
, OrigClient(origClient)
, OrigCookie(origCookie)
, HandleClass(handleClass)
, Result(std::move(result))
{
if (extraBlockChecks) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/vdisk/query/query_barrier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace NKikimr {
LOG_DEBUG(ctx, BS_VDISK_GC,
VDISKP(HullCtx->VCtx->VDiskLogPrefix,
"TEvVGetBarrierResult: %s", Result->ToString().data()));
SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie, HullCtx->VCtx);
SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie, HullCtx->VCtx, {});
ctx.Send(ParentId, new TEvents::TEvActorDied);
Die(ctx);
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/vdisk/query/query_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ namespace NKikimr {
ctx.Send(ReplSchedulerId, new TEvBlobStorage::TEvEnrichNotYet(BatcherCtx->OrigEv, std::move(Result)));
} else {
// send reply event to sender
SendVDiskResponse(ctx, BatcherCtx->OrigEv->Sender, Result.release(), BatcherCtx->OrigEv->Cookie, QueryCtx->HullCtx->VCtx);
SendVDiskResponse(ctx, BatcherCtx->OrigEv->Sender, Result.release(), BatcherCtx->OrigEv->Cookie, QueryCtx->HullCtx->VCtx, TCommonHandleClass(Record.GetHandleClass()));
}

ctx.Send(ParentId, new TEvents::TEvActorDied);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/vdisk/query/query_dumpdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ namespace NKikimr {

// send result
Result->SetResult(str.Str());
SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie, HullCtx->VCtx);
SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie, HullCtx->VCtx, {});
TThis::Die(ctx);
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/vdisk/query/query_public.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ namespace NKikimr {
LOG_DEBUG(ctx, NKikimrServices::BS_VDISK_OTHER,
VDISKP(vctx->VDiskLogPrefix,
"TEvVDbStatResult: %s", result->ToString().data()));
SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie, vctx);
SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie, vctx, {});
}

template <class TKey, class TMemRec>
Expand Down
Loading

0 comments on commit 0ef8627

Please sign in to comment.