Skip to content

Commit

Permalink
NBS-5637: Asynchronous disks allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
ya-ksgamora committed Jan 24, 2025
1 parent 4483d4d commit 9692eeb
Show file tree
Hide file tree
Showing 17 changed files with 460 additions and 80 deletions.
3 changes: 3 additions & 0 deletions cloud/blockstore/config/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1103,4 +1103,7 @@ message TStorageServiceConfig
optional uint32 ForcedCompactionRangeCountPerRun = 401;

optional bool YdbViewerServiceEnabled = 402;

// Maximum number of pending allocation requests per disk. When the limit is
// exceeded, all pending allocation requests for that disk are rejected.
optional uint32 MaxNonReplicatedDiskAllocationRequests = 403;
}
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ TDuration MSeconds(ui32 value)
xxx(NonReplicatedDontSuspendDevices, bool, false )\
xxx(AddClientRetryTimeoutIncrement, TDuration, MSeconds(100) )\
xxx(MaxNonReplicatedDiskDeallocationRequests, ui32, 16 )\
xxx(MaxNonReplicatedDiskAllocationRequests, ui32, 16 )\
xxx(BalancerActionDelayInterval, TDuration, Seconds(3) )\
\
xxx(UseMirrorResync, bool, false )\
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ class TStorageConfig
TDuration GetDiskRegistryBackupPeriod() const;
TString GetDiskRegistryBackupDirPath() const;

ui32 GetMaxNonReplicatedDiskAllocationRequests() const;
ui32 GetMaxNonReplicatedDiskDeallocationRequests() const;

TDuration GetDiskRegistryMetricsCachePeriod() const;
Expand Down
16 changes: 16 additions & 0 deletions cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class TDiskRegistryActor final
// Pending requests
TDeque<TPendingRequest> PendingRequests;

THashMap<TDiskId, TVector<TRequestInfoPtr>> PendingDiskAllocationRequests;
THashMap<TDiskId, TVector<TRequestInfoPtr>> PendingDiskDeallocationRequests;

bool BrokenDisksDestructionInProgress = false;
Expand Down Expand Up @@ -227,6 +228,21 @@ class TDiskRegistryActor final
TDiskRegistryDatabase& db,
TDiskRegistryStateSnapshot& args);

void AddPendingAllocation(
const NActors::TActorContext& ctx,
const TString& diskId,
TRequestInfoPtr requestInfoPtr);

void ReplyToPendingAllocations(
const NActors::TActorContext& ctx,
const TString& diskId,
NProto::TError error = MakeError(S_OK));

void ReplyToPendingAllocations(
const NActors::TActorContext& ctx,
TVector<TRequestInfoPtr>& requestInfos,
NProto::TError error);

void AddPendingDeallocation(
const NActors::TActorContext& ctx,
const TString& diskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ void TDiskRegistryActor::ExecuteAddDisk(
&result);

args.Devices = std::move(result.Devices);
args.DirtyDevices = std::move(result.DirtyDevices);
args.DeviceMigrations = std::move(result.Migrations);
args.Replicas = std::move(result.Replicas);
args.DeviceReplacementUUIDs = std::move(result.DeviceReplacementIds);
Expand All @@ -157,6 +158,9 @@ void TDiskRegistryActor::CompleteAddDisk(
TStringBuilder devices;
OutputDevices(args.Devices, devices);

TStringBuilder dirtyDevices;
OutputDevices(args.DirtyDevices, dirtyDevices);

TStringBuilder replicas;
replicas << "[";
if (!args.Replicas.empty()) {
Expand All @@ -180,11 +184,12 @@ void TDiskRegistryActor::CompleteAddDisk(
migrations << "]";

LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY,
"[%lu] AddDisk success. DiskId=%s Devices=%s Replicas=%s"
" Migrations=%s",
"[%lu] AddDisk success. DiskId=%s Devices=%s DirtyDevices=%s"
" Replicas=%s Migrations=%s",
TabletID(),
args.DiskId.Quote().c_str(),
devices.c_str(),
dirtyDevices.c_str(),
replicas.c_str(),
migrations.c_str()
);
Expand Down Expand Up @@ -240,12 +245,71 @@ void TDiskRegistryActor::CompleteAddDisk(
response->Record.SetMuteIOErrors(args.MuteIOErrors);
}

NCloud::Reply(ctx, *args.RequestInfo, std::move(response));
if (HasError(args.Error) || args.DirtyDevices.empty()) {
NCloud::Reply(ctx, *args.RequestInfo, std::move(response));
} else {
AddPendingAllocation(ctx, args.DiskId, args.RequestInfo);
SecureErase(ctx);
}

DestroyBrokenDisks(ctx);
NotifyUsers(ctx);
}

// Stores an allocation request for |diskId| in PendingDiskAllocationRequests
// so that it can be answered later by ReplyToPendingAllocations.
//
// If the number of requests already queued for the disk exceeds
// MaxNonReplicatedDiskAllocationRequests, every queued request is answered
// with E_REJECTED (and the queue is emptied) before the new request is stored.
void TDiskRegistryActor::AddPendingAllocation(
    const NActors::TActorContext& ctx,
    const TString& diskId,
    TRequestInfoPtr requestInfo)
{
    // operator[] creates an empty queue for a disk seen for the first time.
    auto& requestInfos = PendingDiskAllocationRequests[diskId];

    // NOTE(review): the comparison is strict, so the queue can grow to
    // limit + 1 entries before it is flushed - confirm this is intended.
    if (requestInfos.size() > Config->GetMaxNonReplicatedDiskAllocationRequests()) {
        LOG_WARN(ctx, TBlockStoreComponents::DISK_REGISTRY,
            "Too many pending allocation requests (%lu) for disk %s. "
            "Reject all requests.",
            requestInfos.size(),
            diskId.Quote().c_str());

        // Carry a message with the error, like the other rejection paths do,
        // so that the client can tell why its request was rejected.
        ReplyToPendingAllocations(
            ctx,
            requestInfos,
            MakeError(E_REJECTED, "Too many pending allocation requests"));
    }

    requestInfos.emplace_back(std::move(requestInfo));
}

// Sends a TEvAllocateDiskResponse built from |error| to every request in
// |requestInfos|, then empties the vector.
void TDiskRegistryActor::ReplyToPendingAllocations(
    const NActors::TActorContext& ctx,
    TVector<TRequestInfoPtr>& requestInfos,
    NProto::TError error)
{
    using TResponse = TEvDiskRegistry::TEvAllocateDiskResponse;

    for (auto& pending: requestInfos) {
        // Each waiter gets its own response object carrying a copy of |error|.
        auto response = std::make_unique<TResponse>(error);
        NCloud::Reply(ctx, *pending, std::move(response));
    }

    requestInfos.clear();
}

// Answers every allocation request queued for |diskId| with |error| and then
// removes the disk's entry from PendingDiskAllocationRequests.
// Does nothing when the map has no entry for the disk.
void TDiskRegistryActor::ReplyToPendingAllocations(
    const NActors::TActorContext& ctx,
    const TString& diskId,
    NProto::TError error)
{
    auto pendingIt = PendingDiskAllocationRequests.find(diskId);
    const bool hasPending = pendingIt != PendingDiskAllocationRequests.end();
    if (!hasPending) {
        return;
    }

    auto& waiters = pendingIt->second;
    const int waiterCount = static_cast<int>(waiters.size());

    LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY,
        "Reply to pending allocation requests. DiskId=%s PendingRequests=%d",
        diskId.Quote().c_str(),
        waiterCount);

    // Delegate to the vector overload, which replies and clears the queue.
    ReplyToPendingAllocations(ctx, waiters, std::move(error));

    PendingDiskAllocationRequests.erase(pendingIt);
}

////////////////////////////////////////////////////////////////////////////////

void TDiskRegistryActor::HandleDeallocateDisk(
Expand Down Expand Up @@ -379,7 +443,7 @@ void TDiskRegistryActor::ReplyToPendingDeallocations(
}

LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY,
"Reply to pending deallocation requests. DiskId=%s PendingRquests=%d",
"Reply to pending deallocation requests. DiskId=%s PendingRequests=%d",
diskId.Quote().c_str(),
static_cast<int>(it->second.size()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ void TDiskRegistryActor::ExecuteCleanupDevices(
TTxDiskRegistry::TCleanupDevices& args)
{
TDiskRegistryDatabase db(tx.DB);
args.SyncDeallocatedDisks =
std::tie(args.SyncAllocatedDisks, args.SyncDeallocatedDisks) =
State->MarkDevicesAsClean(ctx.Now(), db, args.Devices);
}

Expand All @@ -293,6 +293,10 @@ void TDiskRegistryActor::CompleteCleanupDevices(
for (const auto& diskId: args.SyncDeallocatedDisks) {
ReplyToPendingDeallocations(ctx, diskId);
}

for (const auto& diskId: args.SyncAllocatedDisks) {
ReplyToPendingAllocations(ctx, diskId);
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,19 @@ void TDiskRegistryActor::CompleteUpdateAgentState(

NCloud::Reply(ctx, *args.RequestInfo, std::move(response));

TVector<TDiskId> failedAllocationDisks;
if (args.State == NProto::AGENT_STATE_UNAVAILABLE) {
ScheduleSwitchAgentDisksToReadOnly(ctx, args.AgentId);
failedAllocationDisks = State->CheckPendingAllocations(args.AgentId);
}

for (const auto& diskId: failedAllocationDisks) {
ReplyToPendingAllocations(
ctx,
diskId,
MakeError(
E_REJECTED,
"Allocation failed due to disk agent problems"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,20 @@ void TDiskRegistryActor::CompleteUpdateDeviceState(
SecureErase(ctx);
StartMigration(ctx);

TDiskId failedAllocationDisk;
if (args.State == NProto::DEVICE_STATE_ERROR) {
failedAllocationDisk = State->CheckPendingAllocation(args.DeviceId);
}

if (failedAllocationDisk) {
ReplyToPendingAllocations(
ctx,
failedAllocationDisk,
MakeError(
E_REJECTED,
"Allocation failed due to disk agent problems"));
}

auto response = std::make_unique<TEvDiskRegistry::TEvChangeDeviceStateResponse>(
std::move(args.Error));
NCloud::Reply(ctx, *args.RequestInfo, std::move(response));
Expand Down
Loading

0 comments on commit 9692eeb

Please sign in to comment.