Skip to content

Commit

Permalink
Implement PDisk shred event handling, #12483 (#13538)
Browse files Browse the repository at this point in the history
  • Loading branch information
the-ancient-1 authored Jan 21, 2025
1 parent 805caa2 commit 276c163
Show file tree
Hide file tree
Showing 16 changed files with 795 additions and 73 deletions.
7 changes: 7 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,10 @@ struct TEvBlobStorage {
EvHugeAllocateSlots,
EvHugeAllocateSlotsResult,
EvHugeDropAllocatedSlots,
EvShredPDisk,
EvPreShredCompactVDisk,
EvShredVDisk,
EvMarkDirty,

EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
EvLogResult,
Expand Down Expand Up @@ -793,6 +797,9 @@ struct TEvBlobStorage {
EvGetLogoBlobIndexStatResponse,
EvReadMetadataResult,
EvWriteMetadataResult,
EvShredPDiskResult,
EvPreShredCompactVDiskResult,
EvShredVDiskResult,

// internal proxy interface
EvUnusedLocal1 = EvPut + 10 * 512, // Not used. /// 268 637 184
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/base/blobstorage_grouptype.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ struct TBlobStorageGroupType : public TErasureType {
: PartCount(partCount)
{}

TResult(const TString &error)
: Error(error)
TResult(TString error)
: Error(std::move(error))
{}

bool Good() const {
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/blobstorage/base/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ namespace NKikimr {
} else {
str << " ";
}
item.Output(str);
if constexpr (requires { item.Output(str); }) {
item.Output(str);
} else {
str << item;
}
}
str << "]";
}
Expand Down
251 changes: 200 additions & 51 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk.h

Large diffs are not rendered by default.

117 changes: 104 additions & 13 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <ydb/library/pdisk_io/buffers.h>
#include "blobstorage_pdisk_completion_impl.h"
#include "blobstorage_pdisk_crypto.h"
#include "blobstorage_pdisk_data.h"
#include "blobstorage_pdisk_factory.h"
#include "blobstorage_pdisk_impl.h"
Expand All @@ -9,7 +8,6 @@
#include "blobstorage_pdisk_state.h"
#include "blobstorage_pdisk_thread.h"
#include "blobstorage_pdisk_tools.h"
#include "blobstorage_pdisk_util_countedqueueoneone.h"
#include "blobstorage_pdisk_util_cputimer.h"
#include "blobstorage_pdisk_writer.h"

Expand Down Expand Up @@ -58,13 +56,21 @@ void CreatePDiskActor(TGenericExecutorThread& executorThread,

class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
struct TInitQueueItem {
TOwnerRound OwnerRound;
TVDiskID VDisk;
ui64 PDiskGuid;
TOwnerRound OwnerRound = 0;
TVDiskID VDisk = TVDiskID::InvalidId;
ui64 PDiskGuid = 0;
TActorId Sender;
TActorId CutLogId;
TActorId WhiteboardProxyId;
ui32 SlotId;
ui32 SlotId = 0;
bool IsShred = false;
ui64 ShredGeneration = 0;

TInitQueueItem(const TActorId sender, const ui64 shredGeneration)
: Sender(sender)
, IsShred(true)
, ShredGeneration(shredGeneration)
{}

TInitQueueItem(TOwnerRound ownerRound, TVDiskID vDisk, ui64 pDiskGuid, TActorId sender, TActorId cutLogId,
TActorId whiteboardProxyId, ui32 slotId)
Expand Down Expand Up @@ -292,9 +298,16 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
void InitError(const TString &errorReason, bool allowMetadataHandling = false) {
Become(&TThis::StateError);
for (TList<TInitQueueItem>::iterator it = InitQueue.begin(); it != InitQueue.end(); ++it) {
Send(it->Sender, new NPDisk::TEvYardInitResult(NKikimrProto::CORRUPTED, errorReason));
if (PDisk) {
PDisk->Mon.YardInit.CountResponse();
if (it->IsShred) {
Send(it->Sender, new NPDisk::TEvShredPDiskResult(NKikimrProto::CORRUPTED, it->ShredGeneration, errorReason));
if (PDisk) {
PDisk->Mon.ShredPDisk.CountResponse();
}
} else {
Send(it->Sender, new NPDisk::TEvYardInitResult(NKikimrProto::CORRUPTED, errorReason));
if (PDisk) {
PDisk->Mon.YardInit.CountResponse();
}
}
}
InitQueue.clear();
Expand Down Expand Up @@ -601,10 +614,16 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
void InitSuccess() {
Become(&TThis::StateOnline);
for (TList<TInitQueueItem>::iterator it = InitQueue.begin(); it != InitQueue.end(); ++it) {
NPDisk::TEvYardInit evInit(it->OwnerRound, it->VDisk, it->PDiskGuid, it->CutLogId, it->WhiteboardProxyId,
it->SlotId);
auto* request = PDisk->ReqCreator.CreateFromEv<TYardInit>(evInit, it->Sender);
PDisk->InputRequest(request);
if (it->IsShred) {
NPDisk::TEvShredPDisk evShredPDisk(it->ShredGeneration);
auto* request = PDisk->ReqCreator.CreateFromEv<NPDisk::TShredPDisk>(evShredPDisk, it->Sender);
PDisk->InputRequest(request);
} else {
NPDisk::TEvYardInit evInit(it->OwnerRound, it->VDisk, it->PDiskGuid, it->CutLogId, it->WhiteboardProxyId,
it->SlotId);
auto* request = PDisk->ReqCreator.CreateFromEv<TYardInit>(evInit, it->Sender);
PDisk->InputRequest(request);
}
}
InitQueue.clear();
if (ControledStartResult) {
Expand Down Expand Up @@ -645,6 +664,26 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
evSlay.VDiskId, evSlay.SlayOwnerRound, evSlay.PDiskId, evSlay.VSlotId, str.Str()));
PDisk->Mon.YardSlay.CountResponse();
}

void InitHandle(NPDisk::TEvShredPDisk::TPtr &ev) {
const NPDisk::TEvShredPDisk &evShredPDisk = *ev->Get();
InitQueue.emplace_back(ev->Sender, evShredPDisk.ShredGeneration);
}

void InitHandle(NPDisk::TEvPreShredCompactVDiskResult::TPtr &ev) {
// Just ignore the event, can't pre-shred compact in this state.
Y_UNUSED(ev);
}

void InitHandle(NPDisk::TEvShredVDiskResult::TPtr &ev) {
// Just ignore the event, can't shred in this state.
Y_UNUSED(ev);
}

void InitHandle(NPDisk::TEvMarkDirty::TPtr &ev) {
// Just ignore the event, can't mark dirty in this state.
Y_UNUSED(ev);
}


////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -802,6 +841,26 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
Y_UNUSED(ev);
}

void ErrorHandle(NPDisk::TEvShredPDisk::TPtr &ev) {
// Respond with error, can't shred in this state.
Send(ev->Sender, new NPDisk::TEvShredPDiskResult(NKikimrProto::CORRUPTED, 0, StateErrorReason));
}

void ErrorHandle(NPDisk::TEvPreShredCompactVDiskResult::TPtr &ev) {
// Just ignore the event, can't pre-shred compact in this state.
Y_UNUSED(ev);
}

void ErrorHandle(NPDisk::TEvShredVDiskResult::TPtr &ev) {
// Just ignore the event, can't shred in this state.
Y_UNUSED(ev);
}

void ErrorHandle(NPDisk::TEvMarkDirty::TPtr &ev) {
// Just ignore the event, can't mark dirty in this state.
Y_UNUSED(ev);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Online state

Expand Down Expand Up @@ -999,6 +1058,26 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
}
}

void Handle(NPDisk::TEvShredPDisk::TPtr &ev) {
auto* request = PDisk->ReqCreator.CreateFromEv<TShredPDisk>(*ev->Get(), ev->Sender);
PDisk->InputRequest(request);
}

void Handle(NPDisk::TEvPreShredCompactVDiskResult::TPtr &ev) {
auto* request = PDisk->ReqCreator.CreateFromEv<TPreShredCompactVDiskResult>(*ev->Get(), ev->Sender);
PDisk->InputRequest(request);
}

void Handle(NPDisk::TEvShredVDiskResult::TPtr &ev) {
auto* request = PDisk->ReqCreator.CreateFromEv<TShredVDiskResult>(*ev->Get(), ev->Sender);
PDisk->InputRequest(request);
}

void Handle(NPDisk::TEvMarkDirty::TPtr &ev) {
auto* request = PDisk->ReqCreator.CreateFromEv<TMarkDirty>(*ev->Get(), ev->Sender);
PDisk->InputRequest(request);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// All states

Expand Down Expand Up @@ -1353,6 +1432,10 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
hFunc(NPDisk::TEvDeviceError, Handle);
hFunc(TEvBlobStorage::TEvAskWardenRestartPDiskResult, Handle);
hFunc(NPDisk::TEvFormatReencryptionFinish, InitHandle);
hFunc(NPDisk::TEvShredPDisk, InitHandle);
hFunc(NPDisk::TEvPreShredCompactVDiskResult, InitHandle);
hFunc(NPDisk::TEvShredVDiskResult, InitHandle);
hFunc(NPDisk::TEvMarkDirty, InitHandle);

hFunc(TEvReadMetadata, Handle);
hFunc(TEvWriteMetadata, Handle);
Expand Down Expand Up @@ -1380,6 +1463,10 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
hFunc(NPDisk::TEvReadLogContinue, Handle);
hFunc(NPDisk::TEvLogSectorRestore, Handle);
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(NPDisk::TEvShredPDisk, Handle);
hFunc(NPDisk::TEvPreShredCompactVDiskResult, Handle);
hFunc(NPDisk::TEvShredVDiskResult, Handle);
hFunc(NPDisk::TEvMarkDirty, Handle);

cFunc(NActors::TEvents::TSystem::PoisonPill, HandlePoison);
hFunc(NMon::TEvHttpInfo, Handle);
Expand Down Expand Up @@ -1410,6 +1497,10 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
hFunc(NPDisk::TEvReadLogContinue, Handle);
hFunc(NPDisk::TEvLogSectorRestore, Handle);
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(NPDisk::TEvShredPDisk, ErrorHandle);
hFunc(NPDisk::TEvPreShredCompactVDiskResult, ErrorHandle);
hFunc(NPDisk::TEvShredVDiskResult, ErrorHandle);
hFunc(NPDisk::TEvMarkDirty, ErrorHandle);

cFunc(NActors::TEvents::TSystem::PoisonPill, HandlePoison);
hFunc(NMon::TEvHttpInfo, Handle);
Expand Down
Loading

0 comments on commit 276c163

Please sign in to comment.