Skip to content

Commit

Permalink
NBS-5637: Asynchronous disks allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
ya-ksgamora committed Jan 17, 2025
1 parent 6d14024 commit 2460bac
Show file tree
Hide file tree
Showing 13 changed files with 344 additions and 70 deletions.
3 changes: 3 additions & 0 deletions cloud/blockstore/config/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1074,4 +1074,7 @@ message TStorageServiceConfig

// Enabling direct copying of data between disk agents.
optional bool UseDirectCopyRange = 394;

// Maximum number of pending allocation requests per one disk.
optional uint32 MaxNonReplicatedDiskAllocationRequests = 395;
}
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ TDuration MSeconds(ui32 value)
xxx(NonReplicatedDontSuspendDevices, bool, false )\
xxx(AddClientRetryTimeoutIncrement, TDuration, MSeconds(100) )\
xxx(MaxNonReplicatedDiskDeallocationRequests, ui32, 16 )\
xxx(MaxNonReplicatedDiskAllocationRequests, ui32, 16 )\
xxx(BalancerActionDelayInterval, TDuration, Seconds(3) )\
\
xxx(UseMirrorResync, bool, false )\
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ class TStorageConfig
TDuration GetDiskRegistryBackupPeriod() const;
TString GetDiskRegistryBackupDirPath() const;

ui32 GetMaxNonReplicatedDiskAllocationRequests() const;
ui32 GetMaxNonReplicatedDiskDeallocationRequests() const;

TDuration GetDiskRegistryMetricsCachePeriod() const;
Expand Down
15 changes: 15 additions & 0 deletions cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class TDiskRegistryActor final
// Pending requests
TDeque<TPendingRequest> PendingRequests;

THashMap<TDiskId, TVector<TRequestInfoPtr>> PendingDiskAllocationRequests;
THashMap<TDiskId, TVector<TRequestInfoPtr>> PendingDiskDeallocationRequests;

bool BrokenDisksDestructionInProgress = false;
Expand Down Expand Up @@ -227,6 +228,20 @@ class TDiskRegistryActor final
TDiskRegistryDatabase& db,
TDiskRegistryStateSnapshot& args);

void AddPendingAllocation(
const NActors::TActorContext& ctx,
const TString& diskId,
TRequestInfoPtr requestInfoPtr);

void ReplyToPendingAllocations(
const NActors::TActorContext& ctx,
const TString& diskId);

void ReplyToPendingAllocations(
const NActors::TActorContext& ctx,
TVector<TRequestInfoPtr>& requestInfos,
NProto::TError error);

void AddPendingDeallocation(
const NActors::TActorContext& ctx,
const TString& diskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ void TDiskRegistryActor::ExecuteAddDisk(
&result);

args.Devices = std::move(result.Devices);
args.DirtyDevices = std::move(result.DirtyDevices);
args.DeviceMigrations = std::move(result.Migrations);
args.Replicas = std::move(result.Replicas);
args.DeviceReplacementUUIDs = std::move(result.DeviceReplacementIds);
Expand All @@ -157,6 +158,9 @@ void TDiskRegistryActor::CompleteAddDisk(
TStringBuilder devices;
OutputDevices(args.Devices, devices);

TStringBuilder dirtyDevices;
OutputDevices(args.DirtyDevices, dirtyDevices);

TStringBuilder replicas;
replicas << "[";
if (!args.Replicas.empty()) {
Expand All @@ -180,11 +184,12 @@ void TDiskRegistryActor::CompleteAddDisk(
migrations << "]";

LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY,
"[%lu] AddDisk success. DiskId=%s Devices=%s Replicas=%s"
" Migrations=%s",
"[%lu] AddDisk success. DiskId=%s Devices=%s DirtyDevices=%s"
" Replicas=%s Migrations=%s",
TabletID(),
args.DiskId.Quote().c_str(),
devices.c_str(),
dirtyDevices.c_str(),
replicas.c_str(),
migrations.c_str()
);
Expand Down Expand Up @@ -240,12 +245,70 @@ void TDiskRegistryActor::CompleteAddDisk(
response->Record.SetMuteIOErrors(args.MuteIOErrors);
}

NCloud::Reply(ctx, *args.RequestInfo, std::move(response));
if (HasError(args.Error) || args.DirtyDevices.empty()) {
NCloud::Reply(ctx, *args.RequestInfo, std::move(response));
} else {
AddPendingAllocation(ctx, args.DiskId, args.RequestInfo);
SecureErase(ctx);
}

DestroyBrokenDisks(ctx);
NotifyUsers(ctx);
}

// Queues an allocation request for `diskId` until it is answered through
// ReplyToPendingAllocations.
//
// If the queue for this disk already holds strictly more entries than
// Config->GetMaxNonReplicatedDiskAllocationRequests(), every queued request is
// answered with E_REJECTED and the queue is emptied before the new request is
// appended. Because the comparison is strict, the queue can reach limit + 1
// entries before it is flushed.
//
// ctx         - actor context used for logging and for sending replies.
// diskId      - disk whose allocation the request is waiting for.
// requestInfo - request to park; ownership is moved into the queue.
void TDiskRegistryActor::AddPendingAllocation(
    const NActors::TActorContext& ctx,
    const TString& diskId,
    TRequestInfoPtr requestInfo)
{
    // operator[] creates an empty queue the first time a disk is seen.
    auto& pending = PendingDiskAllocationRequests[diskId];

    const auto limit = Config->GetMaxNonReplicatedDiskAllocationRequests();
    const bool overflow = pending.size() > limit;

    if (overflow) {
        LOG_WARN(ctx, TBlockStoreComponents::DISK_REGISTRY,
            "Too many pending allocation requests (%lu) for disk %s. "
            "Reject all requests.",
            pending.size(),
            diskId.Quote().c_str());

        // Replies to and clears the queue in place; the new request below
        // becomes its only element.
        ReplyToPendingAllocations(ctx, pending, MakeError(E_REJECTED));
    }

    pending.push_back(std::move(requestInfo));
}

// Answers every request in `requestInfos` with a TEvAllocateDiskResponse built
// from `error`, then empties the vector.
//
// ctx          - actor context used to send the replies.
// requestInfos - requests to answer; left empty on return.
// error        - error copied into each response.
void TDiskRegistryActor::ReplyToPendingAllocations(
    const NActors::TActorContext& ctx,
    TVector<TRequestInfoPtr>& requestInfos,
    NProto::TError error)
{
    for (auto& pendingRequest: requestInfos) {
        // Each recipient gets its own response object carrying a copy of the
        // same error.
        auto response =
            std::make_unique<TEvDiskRegistry::TEvAllocateDiskResponse>(error);

        NCloud::Reply(ctx, *pendingRequest, std::move(response));
    }

    requestInfos.clear();
}

// Completes all allocation requests parked for `diskId`: each one is answered
// with S_OK and the disk's entry is removed from
// PendingDiskAllocationRequests. Does nothing if the disk has no entry.
//
// ctx    - actor context used for logging and for sending replies.
// diskId - disk whose pending allocation requests should be completed.
void TDiskRegistryActor::ReplyToPendingAllocations(
    const NActors::TActorContext& ctx,
    const TString& diskId)
{
    auto found = PendingDiskAllocationRequests.find(diskId);

    if (found != PendingDiskAllocationRequests.end()) {
        auto& waiters = found->second;

        LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY,
            "Reply to pending allocation requests. DiskId=%s PendingRequests=%d",
            diskId.Quote().c_str(),
            static_cast<int>(waiters.size()));

        // The vector overload sends the replies and clears `waiters`.
        ReplyToPendingAllocations(ctx, waiters, MakeError(S_OK));

        // Drop the now-empty entry so the map does not keep stale keys.
        PendingDiskAllocationRequests.erase(found);
    }
}

////////////////////////////////////////////////////////////////////////////////

void TDiskRegistryActor::HandleDeallocateDisk(
Expand Down Expand Up @@ -379,7 +442,7 @@ void TDiskRegistryActor::ReplyToPendingDeallocations(
}

LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY,
"Reply to pending deallocation requests. DiskId=%s PendingRquests=%d",
"Reply to pending deallocation requests. DiskId=%s PendingRequests=%d",
diskId.Quote().c_str(),
static_cast<int>(it->second.size()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ void TDiskRegistryActor::ExecuteCleanupDevices(
TTxDiskRegistry::TCleanupDevices& args)
{
TDiskRegistryDatabase db(tx.DB);
args.SyncDeallocatedDisks =
State->MarkDevicesAsClean(ctx.Now(), db, args.Devices);
std::tie(args.SyncAllocatedDisks, args.SyncDeallocatedDisks) =
std::move(State->MarkDevicesAsClean(ctx.Now(), db, args.Devices));
}

void TDiskRegistryActor::CompleteCleanupDevices(
Expand All @@ -293,6 +293,10 @@ void TDiskRegistryActor::CompleteCleanupDevices(
for (const auto& diskId: args.SyncDeallocatedDisks) {
ReplyToPendingDeallocations(ctx, diskId);
}

for (const auto& diskId: args.SyncAllocatedDisks) {
ReplyToPendingAllocations(ctx, diskId);
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
107 changes: 90 additions & 17 deletions cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ void TDiskRegistryState::ProcessDirtyDevices(TVector<TDirtyDevice> dirtyDevices)
{
for (auto&& [uuid, diskId]: dirtyDevices) {
if (!diskId.empty()) {
auto error = PendingCleanup.Insert(diskId, std::move(uuid));
auto error = PendingCleanup.Insert(diskId, std::move(uuid), /*allocation=*/false);
if (HasError(error)) {
ReportDiskRegistryInsertToPendingCleanupFailed(
TStringBuilder()
Expand Down Expand Up @@ -1299,16 +1299,23 @@ NProto::TError TDiskRegistryState::ReplaceDeviceWithoutDiskStateUpdate(
}

if (!manual && !deviceReplacementId.empty()) {
auto cleaningDiskId =
auto [allocating, deallocating] =
PendingCleanup.FindDiskId(deviceReplacementId);
if (!cleaningDiskId.empty() && cleaningDiskId != diskId) {
if (!allocating || *allocating != diskId) {
allocating = std::nullopt;
}
if (!deallocating || *deallocating != diskId) {
deallocating = std::nullopt;
}
auto owningDisk = allocating ? allocating : deallocating;
if (owningDisk) {
return MakeError(
E_ARGUMENT,
TStringBuilder()
<< "can't allocate specific device "
<< deviceReplacementId.Quote() << " for disk " << diskId
<< " since it is in pending cleanup for disk "
<< cleaningDiskId);
<< *owningDisk);
}
}

Expand Down Expand Up @@ -1414,7 +1421,7 @@ NProto::TError TDiskRegistryState::ReplaceDeviceWithoutDiskStateUpdate(
UpdatePlacementGroup(db, diskId, disk, "ReplaceDevice");
UpdateAndReallocateDisk(db, diskId, disk);

error = PendingCleanup.Insert(diskId, deviceId);
error = PendingCleanup.Insert(diskId, deviceId, /*allocation=*/false);
if (HasError(error)) {
ReportDiskRegistryInsertToPendingCleanupFailed(
TStringBuilder() << "An error occurred while replacing device: "
Expand Down Expand Up @@ -2756,6 +2763,37 @@ NProto::TError TDiskRegistryState::AllocateSimpleDisk(
params.BlockSize << " bytes");
}

// check that we can secure erase each dirty allocated device
TVector<TString> dirtyDevices;
for (const auto& device: allocatedDevices) {
const auto& uuid = device.GetDeviceUUID();
if (!IsDirtyDevice(uuid)) {
continue;
}
// if we can't secure erase one of allocated disk's dirty devices, we can't allocate the disk
if (!CanSecureErase(uuid)) {
onError();

return MakeError(E_BS_DISK_ALLOCATION_FAILED, TStringBuilder() <<
"can't secure erase device " << uuid);
}
dirtyDevices.push_back(uuid);
result->DirtyDevices.push_back(device);
}

if (dirtyDevices) {
NProto::TError cleanupError = PendingCleanup.Insert(params.DiskId, std::move(dirtyDevices), /*allocation=*/true);
// if we can't secure erase one of allocated disk's dirty devices, we can't allocate the disk
if (HasError(cleanupError)) {
onError();

ReportDiskRegistryInsertToPendingCleanupFailed(
TStringBuilder() << "An error occurred while allocating disk: "
<< FormatError(cleanupError));
return cleanupError;
}
}

for (const auto& device: allocatedDevices) {
disk.Devices.push_back(device.GetDeviceUUID());
}
Expand Down Expand Up @@ -2851,7 +2889,7 @@ NProto::TError TDiskRegistryState::DeallocateDisk(
JoinSeq(", ", devicesAllowedToBeCleaned).c_str());

auto error =
PendingCleanup.Insert(diskId, std::move(devicesAllowedToBeCleaned));
PendingCleanup.Insert(diskId, std::move(devicesAllowedToBeCleaned), /*allocation=*/false);
if (HasError(error)) {
ReportDiskRegistryInsertToPendingCleanupFailed(
TStringBuilder() << "An error occurred while deallocating disk: "
Expand Down Expand Up @@ -2998,6 +3036,11 @@ auto TDiskRegistryState::DeallocateSimpleDisk(
db.UpdateDirtyDevice(uuid, diskId);
}

auto agents = FindDiskDevicesAgents(disk);
for (const auto* agent: agents) {
DeviceList.UpdateDevices(*agent, DevicePoolConfigs);
}

DeleteAllDeviceMigrations(diskId);
DeleteDisk(db, diskId);

Expand Down Expand Up @@ -3855,16 +3898,20 @@ bool TDiskRegistryState::MarkDeviceAsDirty(
return true;
}

TDiskRegistryState::TDiskId TDiskRegistryState::MarkDeviceAsClean(
TDiskRegistryState::TOpt2Disk TDiskRegistryState::MarkDeviceAsClean(
TInstant now,
TDiskRegistryDatabase& db,
const TDeviceId& uuid)
{
auto ret = MarkDevicesAsClean(now, db, TVector<TDeviceId>{uuid});
return ret.empty() ? "" : ret[0];
auto [alloc, dealloc] = MarkDevicesAsClean(now, db, TVector<TDeviceId>{uuid});
return {
alloc.empty() ? std::nullopt : std::make_optional(std::move(alloc[0])),
dealloc.empty() ? std::nullopt
: std::make_optional(std::move(dealloc[0]))};
}

TVector<TDiskRegistryState::TDiskId> TDiskRegistryState::MarkDevicesAsClean(
std::pair<TDiskRegistryState::TAllocatedDisksList, TDiskRegistryState::TDellocatedDisksList>
TDiskRegistryState::MarkDevicesAsClean(
TInstant now,
TDiskRegistryDatabase& db,
const TVector<TDeviceId>& uuids)
Expand All @@ -3878,14 +3925,19 @@ TVector<TDiskRegistryState::TDiskId> TDiskRegistryState::MarkDevicesAsClean(
}
}

TVector<TDiskId> ret;
TAllocatedDisksList allocatedDisks;
TDellocatedDisksList dellocatedDisks;
for (const auto& uuid: TryUpdateDevices(now, db, uuids)) {
if (auto diskId = PendingCleanup.EraseDevice(uuid); !diskId.empty()) {
ret.push_back(std::move(diskId));
auto [allocatedDisk, deallocatedDisk] = PendingCleanup.EraseDevice(uuid);
if (allocatedDisk) {
allocatedDisks.push_back(std::move(*allocatedDisk));
}
if (deallocatedDisk) {
dellocatedDisks.push_back(std::move(*deallocatedDisk));
}
}

return ret;
return {std::move(allocatedDisks), std::move(dellocatedDisks)};
}

bool TDiskRegistryState::TryUpdateDevice(
Expand Down Expand Up @@ -4811,7 +4863,7 @@ void TDiskRegistryState::RemoveFinishedMigrations(

DeviceList.ReleaseDevice(m.DeviceId);
db.UpdateDirtyDevice(m.DeviceId, diskId);
auto error = PendingCleanup.Insert(diskId, m.DeviceId);
auto error = PendingCleanup.Insert(diskId, m.DeviceId, /*allocation=*/false);
if (HasError(error)) {
ReportDiskRegistryInsertToPendingCleanupFailed(
TStringBuilder()
Expand Down Expand Up @@ -5072,8 +5124,9 @@ bool TDiskRegistryState::HasDependentSsdDisks(
continue;
}

auto [allocating, deallocating] = PendingCleanup.FindDiskId(d.GetDeviceUUID());
if (d.GetPoolKind() == NProto::DEVICE_POOL_KIND_LOCAL &&
PendingCleanup.FindDiskId(d.GetDeviceUUID()))
(allocating || deallocating))
{
return true;
}
Expand Down Expand Up @@ -5598,6 +5651,24 @@ auto TDiskRegistryState::FindDeviceLocation(const TDeviceId& deviceId) const
return const_cast<TDiskRegistryState*>(this)->FindDeviceLocation(deviceId);
}

// Collects the configs of the agents that host the devices of `disk`.
//
// Every UUID in disk.Devices is mapped to an agent id through DeviceList; the
// ids are deduplicated and then resolved to agent configs through AgentList.
// Agents that cannot be resolved are left out, so the returned set never
// contains nullptr and every element is safe to dereference.
//
// disk - disk state whose Devices list is inspected.
//
// Returns the set of distinct, non-null agent config pointers.
auto TDiskRegistryState::FindDiskDevicesAgents(const TDiskState& disk) const
    -> std::set<const NProto::TAgentConfig*>
{
    std::set<TString> diskAgents;
    for (const auto& uuid: disk.Devices) {
        diskAgents.insert(DeviceList.FindAgentId(uuid));
    }

    std::set<const NProto::TAgentConfig*> agents;
    for (const auto& agentId: diskAgents) {
        // NOTE(review): assumes FindAgent yields nullptr for an unknown agent
        // id, including whatever FindAgentId produced for an unknown device
        // - confirm. Such entries are skipped instead of being stored as
        // nullptr.
        if (const NProto::TAgentConfig* agent = AgentList.FindAgent(agentId)) {
            agents.insert(agent);
        }
    }

    // Return the local by name: wrapping it in std::move would inhibit copy
    // elision.
    return agents;
}

auto TDiskRegistryState::FindDeviceLocation(const TDeviceId& deviceId)
-> std::pair<NProto::TAgentConfig*, NProto::TDeviceConfig*>
{
Expand Down Expand Up @@ -6354,7 +6425,9 @@ NProto::TDiskRegistryStateBackup TDiskRegistryState::BackupState() const
transform(GetDirtyDevices(), backup.MutableDirtyDevices(), [this] (auto& x) {
NProto::TDiskRegistryStateBackup::TDirtyDevice dd;
dd.SetId(x.GetDeviceUUID());
dd.SetDiskId(PendingCleanup.FindDiskId(x.GetDeviceUUID()));
auto [allocating, deallocating] = PendingCleanup.FindDiskId(x.GetDeviceUUID()); // TODO: need to backup
Y_UNUSED(allocating);
dd.SetDiskId(*deallocating);

return dd;
});
Expand Down
Loading

0 comments on commit 2460bac

Please sign in to comment.