From 95580a71f2263eede02efe83689ba8b4f1ddcd6a Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Wed, 15 Jan 2025 17:27:52 +0700 Subject: [PATCH 1/3] issue-2725: mv acquire release logic from DR --- .../acquire_devices_actor.cpp} | 259 +++++------- .../storage/core/acquire_release_devices.h | 107 +++++ .../libs/storage/core/proto_helpers.cpp | 11 + .../libs/storage/core/proto_helpers.h | 1 + .../storage/core/release_devices_actor.cpp | 271 +++++++++++++ cloud/blockstore/libs/storage/core/ya.make | 2 + .../disk_registry/disk_registry_actor.cpp | 45 ++- .../disk_registry/disk_registry_actor.h | 18 +- .../disk_registry_actor_acquire_release.cpp | 245 ++++++++++++ .../disk_registry_actor_release.cpp | 373 ------------------ .../disk_registry/disk_registry_private.h | 67 +--- .../disk_registry/disk_registry_state.cpp | 6 + .../disk_registry/disk_registry_state.h | 1 + .../disk_registry_ut_session.cpp | 8 +- .../storage/disk_registry/testlib/test_env.h | 12 - .../libs/storage/disk_registry/ya.make | 3 +- 16 files changed, 788 insertions(+), 641 deletions(-) rename cloud/blockstore/libs/storage/{disk_registry/disk_registry_actor_acquire.cpp => core/acquire_devices_actor.cpp} (58%) create mode 100644 cloud/blockstore/libs/storage/core/acquire_release_devices.h create mode 100644 cloud/blockstore/libs/storage/core/release_devices_actor.cpp create mode 100644 cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp delete mode 100644 cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire.cpp b/cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp similarity index 58% rename from cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire.cpp rename to cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp index f7d78f8ef35..88eb853c88a 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire.cpp +++ b/cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp @@ -1,53 +1,52 @@ -#include "disk_registry_actor.h" +#include "acquire_release_devices.h" -#include +#include +#include +#include -#include +#include +#include +#include #include -#include - -namespace NCloud::NBlockStore::NStorage { +namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices { using namespace NActors; -using namespace NKikimr::NTabletFlatExecutor; - namespace { //////////////////////////////////////////////////////////////////////////////// -class TAcquireDiskActor final - : public TActorBootstrapped +class TAcquireDevicesActor final: public TActorBootstrapped { private: const TActorId Owner; - TRequestInfoPtr RequestInfo; TVector Devices; - const ui32 LogicalBlockSize = 0; - const TString DiskId; - const TString ClientId; + TString DiskId; + TString ClientId; const NProto::EVolumeAccessMode AccessMode; const ui64 MountSeqNumber; const ui32 VolumeGeneration; const TDuration RequestTimeout; + const bool MuteIOErrors; + NLog::EComponent Component; int PendingRequests = 0; TVector SentAcquireRequests; public: - TAcquireDiskActor( + TAcquireDevicesActor( const TActorId& owner, - TRequestInfoPtr requestInfo, TVector devices, - ui32 logicalBlockSize, TString diskId, TString clientId, NProto::EVolumeAccessMode accessMode, ui64 mountSeqNumber, ui32 volumeGeneration, - TDuration requestTimeout); + TDuration requestTimeout, + bool muteIOErrors, + NLog::EComponent component); void Bootstrap(const TActorContext& ctx); @@ -55,8 +54,6 @@ class TAcquireDiskActor final void PrepareRequest(NProto::TAcquireDevicesRequest& request) const; void PrepareRequest(NProto::TReleaseDevicesRequest& request) const; - void FinishAcquireDisk(const TActorContext& ctx, NProto::TError error); - void ReplyAndDie(const TActorContext& ctx, NProto::TError error); void OnAcquireResponse( @@ -104,45 +101,47 @@ class TAcquireDiskActor final //////////////////////////////////////////////////////////////////////////////// -TAcquireDiskActor::TAcquireDiskActor( +TAcquireDevicesActor::TAcquireDevicesActor( const TActorId& owner, - TRequestInfoPtr requestInfo, TVector devices, - ui32 logicalBlockSize, TString diskId, TString clientId, NProto::EVolumeAccessMode accessMode, ui64 mountSeqNumber, ui32 volumeGeneration, - TDuration requestTimeout) + TDuration requestTimeout, + bool muteIOErrors, + NLog::EComponent component) : Owner(owner) - , RequestInfo(std::move(requestInfo)) , Devices(std::move(devices)) - , LogicalBlockSize(logicalBlockSize) , DiskId(std::move(diskId)) , ClientId(std::move(clientId)) , AccessMode(accessMode) , MountSeqNumber(mountSeqNumber) , VolumeGeneration(volumeGeneration) , RequestTimeout(requestTimeout) + , MuteIOErrors(muteIOErrors) + , Component(component) { SortBy(Devices, [] (auto& d) { return d.GetNodeId(); }); } -void TAcquireDiskActor::Bootstrap(const TActorContext& ctx) +void TAcquireDevicesActor::Bootstrap(const TActorContext& ctx) { Become(&TThis::StateAcquire); if (Devices.empty()) { - FinishAcquireDisk(ctx, {}); + ReplyAndDie(ctx, {}); return; } ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup()); - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, + LOG_DEBUG( + ctx, + Component, "[%s] Sending acquire devices requests for disk %s, targets %s", ClientId.c_str(), DiskId.c_str(), @@ -156,29 +155,13 @@ void TAcquireDiskActor::Bootstrap(const TActorContext& ctx) TInstant now = ctx.Now(); for (auto& x: sentRequests) { SentAcquireRequests.push_back(TAgentAcquireDevicesCachedRequest{ - std::move(x.AgentId), - std::move(x.Record), - now}); + .AgentId = std::move(x.AgentId), + .Request = std::move(x.Record), + .RequestTime = now}); } } -void TAcquireDiskActor::FinishAcquireDisk( - const TActorContext& ctx, - NProto::TError error) -{ - using TType = TEvDiskRegistryPrivate::TEvFinishAcquireDiskRequest; - NCloud::Send( - ctx, - Owner, - std::make_unique( - DiskId, - ClientId, - std::move(SentAcquireRequests))); - - ReplyAndDie(ctx, std::move(error)); -} - -void TAcquireDiskActor::PrepareRequest( +void TAcquireDevicesActor::PrepareRequest( NProto::TAcquireDevicesRequest& request) const { request.MutableHeaders()->SetClientId(ClientId); @@ -188,14 +171,14 @@ void TAcquireDiskActor::PrepareRequest( request.SetVolumeGeneration(VolumeGeneration); } -void TAcquireDiskActor::PrepareRequest( +void TAcquireDevicesActor::PrepareRequest( NProto::TReleaseDevicesRequest& request) const { request.MutableHeaders()->SetClientId(ClientId); } template -auto TAcquireDiskActor::CreateRequests() const +auto TAcquireDevicesActor::CreateRequests() const -> TVector> { auto it = Devices.begin(); @@ -216,7 +199,7 @@ auto TAcquireDiskActor::CreateRequests() const } template -void TAcquireDiskActor::SendRequests( +void TAcquireDevicesActor::SendRequests( const TActorContext& ctx, const TVector>& requests) { @@ -227,7 +210,9 @@ void TAcquireDiskActor::SendRequests( TCallContextPtr {}, r.Record); - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, + LOG_DEBUG( + ctx, + Component, "[%s] Send an acquire request to node #%d. Devices: %s", ClientId.c_str(), r.NodeId, @@ -248,40 +233,34 @@ void TAcquireDiskActor::SendRequests( } } -void TAcquireDiskActor::ReplyAndDie( +void TAcquireDevicesActor::ReplyAndDie( const TActorContext& ctx, NProto::TError error) { - auto response = std::make_unique( - std::move(error)); - - if (HasError(response->GetError())) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, - "[%s] AcquireDisk %s targets %s error: %s", + if (HasError(error)) { + LOG_ERROR( + ctx, + Component, + "[%s] AcquireDevices %s targets %s error: %s", ClientId.c_str(), DiskId.c_str(), LogTargets().c_str(), - FormatError(response->GetError()).c_str()); - } else { - response->Record.MutableDevices()->Reserve(Devices.size()); - - for (auto& device: Devices) { - ToLogicalBlocks(device, LogicalBlockSize); - *response->Record.AddDevices() = std::move(device); - } + FormatError(error).c_str()); } - NCloud::Reply(ctx, *RequestInfo, std::move(response)); + auto response = std::make_unique( + std::move(DiskId), + std::move(ClientId), + std::move(SentAcquireRequests), + std::move(Devices), + std::move(error)); - NCloud::Send( - ctx, - Owner, - std::make_unique()); + NCloud::Send(ctx, Owner, std::move(response)); Die(ctx); } //////////////////////////////////////////////////////////////////////////////// -void TAcquireDiskActor::HandlePoisonPill( +void TAcquireDevicesActor::HandlePoisonPill( const TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx) { @@ -290,15 +269,17 @@ void TAcquireDiskActor::HandlePoisonPill( ReplyAndDie(ctx, MakeTabletIsDeadError(E_REJECTED, __LOCATION__)); } -void TAcquireDiskActor::OnAcquireResponse( +void TAcquireDevicesActor::OnAcquireResponse( const TActorContext& ctx, ui32 nodeId, NProto::TError error) { Y_ABORT_UNLESS(PendingRequests > 0); - if (HasError(error)) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, + if (HasError(error) && !MuteIOErrors) { + LOG_ERROR( + ctx, + Component, "[%s] AcquireDevices on the node #%d %s error: %s", ClientId.c_str(), nodeId, @@ -306,7 +287,9 @@ void TAcquireDiskActor::OnAcquireResponse( FormatError(error).c_str()); if (GetErrorKind(error) != EErrorKind::ErrorRetriable) { - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, + LOG_DEBUG( + ctx, + Component, "[%s] Canceling acquire operation for disk %s, targets %s", ClientId.c_str(), DiskId.c_str(), @@ -318,17 +301,17 @@ void TAcquireDiskActor::OnAcquireResponse( } SentAcquireRequests.clear(); - FinishAcquireDisk(ctx, std::move(error)); + ReplyAndDie(ctx, std::move(error)); return; } if (--PendingRequests == 0) { - FinishAcquireDisk(ctx, {}); + ReplyAndDie(ctx, {}); } } -void TAcquireDiskActor::HandleAcquireDevicesResponse( +void TAcquireDevicesActor::HandleAcquireDevicesResponse( const TEvDiskAgent::TEvAcquireDevicesResponse::TPtr& ev, const TActorContext& ctx) { @@ -338,7 +321,7 @@ void TAcquireDiskActor::HandleAcquireDevicesResponse( ev->Get()->GetError()); } -void TAcquireDiskActor::HandleAcquireDevicesUndelivery( +void TAcquireDevicesActor::HandleAcquireDevicesUndelivery( const TEvDiskAgent::TEvAcquireDevicesRequest::TPtr& ev, const TActorContext& ctx) { @@ -348,7 +331,7 @@ void TAcquireDiskActor::HandleAcquireDevicesUndelivery( MakeError(E_REJECTED, "not delivered")); } -void TAcquireDiskActor::HandleWakeup( +void TAcquireDevicesActor::HandleWakeup( const TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { @@ -360,14 +343,14 @@ void TAcquireDiskActor::HandleWakeup( //////////////////////////////////////////////////////////////////////////////// -TString TAcquireDiskActor::LogTargets() const +TString TAcquireDevicesActor::LogTargets() const { return LogDevices(Devices); } //////////////////////////////////////////////////////////////////////////////// -STFUNC(TAcquireDiskActor::StateAcquire) +STFUNC(TAcquireDevicesActor::StateAcquire) { switch (ev->GetTypeRewrite()) { HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); @@ -380,7 +363,7 @@ STFUNC(TAcquireDiskActor::StateAcquire) HFunc(TEvents::TEvWakeup, HandleWakeup); default: - HandleUnexpectedEvent(ev, TBlockStoreComponents::DISK_REGISTRY_WORKER); + HandleUnexpectedEvent(ev, Component); break; } } @@ -389,91 +372,31 @@ STFUNC(TAcquireDiskActor::StateAcquire) //////////////////////////////////////////////////////////////////////////////// -void TDiskRegistryActor::HandleAcquireDisk( - const TEvDiskRegistry::TEvAcquireDiskRequest::TPtr& ev, - const TActorContext& ctx) +TActorId AcquireDevices( + const NActors::TActorContext& ctx, + const TActorId& owner, + TVector devices, + TString diskId, + TString clientId, + NProto::EVolumeAccessMode accessMode, + ui64 mountSeqNumber, + ui32 volumeGeneration, + TDuration requestTimeout, + bool muteIOErrors, + NLog::EComponent component) { - BLOCKSTORE_DISK_REGISTRY_COUNTER(AcquireDisk); - - const auto* msg = ev->Get(); - - auto clientId = msg->Record.GetHeaders().GetClientId(); - auto diskId = msg->Record.GetDiskId(); - - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY, - "[%lu] Received AcquireDisk request: " - "DiskId=%s, ClientId=%s, AccessMode=%u, MountSeqNumber=%lu" - ", VolumeGeneration=%u", - TabletID(), - diskId.c_str(), - clientId.c_str(), - static_cast(msg->Record.GetAccessMode()), - msg->Record.GetMountSeqNumber(), - msg->Record.GetVolumeGeneration()); - - TDiskInfo diskInfo; - auto error = State->StartAcquireDisk(diskId, diskInfo); - - if (HasError(error)) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, - "[%s] AcquireDisk %s error: %s", - clientId.c_str(), - diskId.c_str(), - FormatError(error).c_str()); - - NCloud::Reply( - ctx, - *ev, - std::make_unique( - std::move(error))); - return; - } - - State->FilterDevicesAtUnavailableAgents(diskInfo); - - TVector devices = std::move(diskInfo.Devices); - for (auto& migration: diskInfo.Migrations) { - devices.push_back(std::move(*migration.MutableTargetDevice())); - } - for (auto& replica: diskInfo.Replicas) { - devices.insert(devices.end(), - std::make_move_iterator(replica.begin()), - std::make_move_iterator(replica.end())); - } - - auto actor = NCloud::Register( + return NCloud::Register( ctx, - ctx.SelfID, - CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext), + owner, std::move(devices), - diskInfo.LogicalBlockSize, - std::move(diskId), + diskId, std::move(clientId), - msg->Record.GetAccessMode(), - msg->Record.GetMountSeqNumber(), - msg->Record.GetVolumeGeneration(), - Config->GetAgentRequestTimeout()); - Actors.insert(actor); -} - -//////////////////////////////////////////////////////////////////////////////// - -void TDiskRegistryActor::HandleFinishAcquireDisk( - const TEvDiskRegistryPrivate::TEvFinishAcquireDiskRequest::TPtr& ev, - const TActorContext& ctx) -{ - auto* msg = ev->Get(); - - State->FinishAcquireDisk(msg->DiskId); - - OnDiskAcquired(std::move(msg->SentRequests)); - - auto response = std::make_unique< - TEvDiskRegistryPrivate::TEvFinishAcquireDiskResponse>(); - NCloud::Reply(ctx, *ev, std::move(response)); + accessMode, + mountSeqNumber, + volumeGeneration, + requestTimeout, + muteIOErrors, + component); } -} // namespace NCloud::NBlockStore::NStorage +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices \ No newline at end of file diff --git a/cloud/blockstore/libs/storage/core/acquire_release_devices.h b/cloud/blockstore/libs/storage/core/acquire_release_devices.h new file mode 100644 index 00000000000..93ebe418234 --- /dev/null +++ b/cloud/blockstore/libs/storage/core/acquire_release_devices.h @@ -0,0 +1,107 @@ + +#pragma once + +#include + +#include +#include + +#include + +namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices { + +struct TAgentAcquireDevicesCachedRequest +{ + TString AgentId; + NProto::TAcquireDevicesRequest Request; + TInstant RequestTime; +}; + +struct TAgentReleaseDevicesCachedRequest +{ + TString AgentId; + NProto::TReleaseDevicesRequest Request; +}; + +struct TDevicesAcquireFinished +{ + TString DiskId; + TString ClientId; + TVector SentRequests; + TVector Devices; + NProto::TError Error; + + TDevicesAcquireFinished( + TString diskId, + TString clientId, + TVector sentRequests, + TVector devices, + NProto::TError error) + : DiskId(std::move(diskId)) + , ClientId(std::move(clientId)) + , SentRequests(std::move(sentRequests)) + , Devices(std::move(devices)) + , Error(std::move(error)) + {} +}; + +struct TDevicesReleaseFinished +{ + TString DiskId; + TString ClientId; + TVector SentRequests; + NProto::TError Error; + + TDevicesReleaseFinished( + TString diskId, + TString clientId, + TVector sentRequests, + NProto::TError error) + : DiskId(std::move(diskId)) + , ClientId(std::move(clientId)) + , SentRequests(std::move(sentRequests)) + , Error(std::move(error)) + {} +}; + +enum EEvents +{ + EvBegin, + + EvDevicesAcquireFinished, + EvDevicesReleaseFinished, + + EvEnd +}; + +using TEvDevicesAcquireFinished = + TRequestEvent; + +using TEvDevicesReleaseFinished = + TRequestEvent; + +TActorId AcquireDevices( + const NActors::TActorContext& ctx, + const TActorId& owner, + TVector devices, + TString diskId, + TString clientId, + NProto::EVolumeAccessMode accessMode, + ui64 mountSeqNumber, + ui32 volumeGeneration, + TDuration requestTimeout, + bool muteIOErrors, + NActors::NLog::EComponent component); + +TActorId ReleaseDevices( + const NActors::TActorContext& ctx, + const TActorId& owner, + TString diskId, + TString clientId, + ui32 volumeGeneration, + TDuration requestTimeout, + TVector devices, + bool muteIOErrors, + NActors::NLog::EComponent component); + +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices \ No newline at end of file diff --git a/cloud/blockstore/libs/storage/core/proto_helpers.cpp b/cloud/blockstore/libs/storage/core/proto_helpers.cpp index 4f1c472d4f7..51a23fb1e7d 100644 --- a/cloud/blockstore/libs/storage/core/proto_helpers.cpp +++ b/cloud/blockstore/libs/storage/core/proto_helpers.cpp @@ -444,4 +444,15 @@ ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request) return request.Record.GetVolumeRequestId(); } +TString LogDevices(const TVector& devices) +{ + TStringBuilder sb; + sb << "( "; + for (const auto& d: devices) { + sb << d.GetDeviceUUID() << "@" << d.GetAgentId() << " "; + } + sb << ")"; + return sb; +} + } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/core/proto_helpers.h b/cloud/blockstore/libs/storage/core/proto_helpers.h index 0fbecf7cbc8..f8ecfb90327 100644 --- a/cloud/blockstore/libs/storage/core/proto_helpers.h +++ b/cloud/blockstore/libs/storage/core/proto_helpers.h @@ -223,4 +223,5 @@ TBlockRange64 BuildRequestBlockRange( ui64 GetVolumeRequestId(const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request); ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request); +TString LogDevices(const TVector& devices); } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/core/release_devices_actor.cpp b/cloud/blockstore/libs/storage/core/release_devices_actor.cpp new file mode 100644 index 00000000000..931724ae91e --- /dev/null +++ b/cloud/blockstore/libs/storage/core/release_devices_actor.cpp @@ -0,0 +1,271 @@ +#include "acquire_release_devices.h" + +#include +#include + +#include +#include +#include + +#include + +namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices { +using namespace NActors; + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TReleaseDevicesActor final + : public TActorBootstrapped +{ +private: + const TActorId Owner; + const TString DiskId; + const TString ClientId; + const ui32 VolumeGeneration; + const TDuration RequestTimeout; + TVector Devices; + bool MuteIOErrors; + NLog::EComponent Component; + + int PendingRequests = 0; + + TVector SentReleaseRequests; + +public: + TReleaseDevicesActor( + const TActorId& owner, + TString diskId, + TString clientId, + ui32 volumeGeneration, + TDuration requestTimeout, + TVector devices, + bool muteIoErrors, + NLog::EComponent component); + + void Bootstrap(const TActorContext& ctx); + +private: + void PrepareRequest(NProto::TReleaseDevicesRequest& request); + void ReplyAndDie(const TActorContext& ctx, NProto::TError error); + + void OnReleaseResponse( + const TActorContext& ctx, + ui64 cookie, + NProto::TError error); + +private: + STFUNC(StateWork); + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx); + + void HandleReleaseDevicesResponse( + const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev, + const TActorContext& ctx); + + void HandleReleaseDevicesUndelivery( + const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, + const TActorContext& ctx); + + void HandleTimeout( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx); + + TString LogTargets() const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +TReleaseDevicesActor::TReleaseDevicesActor( + const TActorId& owner, + TString diskId, + TString clientId, + ui32 volumeGeneration, + TDuration requestTimeout, + TVector devices, + bool muteIOErrors, + NLog::EComponent component) + : Owner(owner) + , DiskId(std::move(diskId)) + , ClientId(std::move(clientId)) + , VolumeGeneration(volumeGeneration) + , RequestTimeout(requestTimeout) + , Devices(std::move(devices)) + , MuteIOErrors(muteIOErrors) + , Component(component) +{} + +void TReleaseDevicesActor::PrepareRequest(NProto::TReleaseDevicesRequest& request) +{ + request.MutableHeaders()->SetClientId(ClientId); + request.SetDiskId(DiskId); + request.SetVolumeGeneration(VolumeGeneration); +} + +void TReleaseDevicesActor::Bootstrap(const TActorContext& ctx) +{ + Become(&TThis::StateWork); + + SortBy(Devices, [] (auto& d) { + return d.GetNodeId(); + }); + + auto it = Devices.begin(); + while (it != Devices.end()) { + auto request = + std::make_unique(); + NProto::TReleaseDevicesRequest requestCopy; + PrepareRequest(request->Record); + PrepareRequest(requestCopy); + + const ui32 nodeId = it->GetNodeId(); + const TString& agentId = it->GetAgentId(); + + for (; it != Devices.end() && it->GetNodeId() == nodeId; ++it) { + *request->Record.AddDeviceUUIDs() = it->GetDeviceUUID(); + *requestCopy.AddDeviceUUIDs() = it->GetDeviceUUID(); + } + + ++PendingRequests; + SentReleaseRequests.emplace_back(agentId, std::move(requestCopy)); + NCloud::Send( + ctx, + MakeDiskAgentServiceId(nodeId), + std::move(request), + nodeId); + } + + ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup()); +} + +void TReleaseDevicesActor::ReplyAndDie(const TActorContext& ctx, NProto::TError error) +{ + NCloud::Send( + ctx, + Owner, + std::make_unique( + DiskId, + ClientId, + std::move(SentReleaseRequests), + std::move(error))); + + Die(ctx); +} + +void TReleaseDevicesActor::OnReleaseResponse( + const TActorContext& ctx, + ui64 cookie, + NProto::TError error) +{ + Y_ABORT_UNLESS(PendingRequests > 0); + + if (HasError(error)) { + LOG_LOG( + ctx, + MuteIOErrors ? NLog::PRI_WARN : NLog::PRI_ERROR, + Component, + "ReleaseDevices %s error: %s, %llu", + LogTargets().c_str(), + FormatError(error).c_str(), + cookie); + } + + if (--PendingRequests == 0) { + ReplyAndDie(ctx, {}); + } +} + +void TReleaseDevicesActor::HandleReleaseDevicesResponse( + const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev, + const TActorContext& ctx) +{ + OnReleaseResponse(ctx, ev->Cookie, ev->Get()->GetError()); +} + +void TReleaseDevicesActor::HandleReleaseDevicesUndelivery( + const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, + const TActorContext& ctx) +{ + OnReleaseResponse(ctx, ev->Cookie, MakeError(E_REJECTED, "not delivered")); +} + +void TReleaseDevicesActor::HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + ReplyAndDie(ctx, MakeTabletIsDeadError(E_REJECTED, __LOCATION__)); +} + +void TReleaseDevicesActor::HandleTimeout( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + const auto err = TStringBuilder() + << "TReleaseDevicesActor timeout." + << " DiskId: " << DiskId + << " ClientId: " << ClientId + << " Targets: " << LogTargets() + << " VolumeGeneration: " << VolumeGeneration + << " PendingRequests: " << PendingRequests; + + LOG_WARN(ctx, Component, err); + + ReplyAndDie(ctx, MakeError(E_TIMEOUT, err)); +} + +STFUNC(TReleaseDevicesActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvents::TEvWakeup, HandleTimeout); + + HFunc(TEvDiskAgent::TEvReleaseDevicesResponse, + HandleReleaseDevicesResponse); + HFunc(TEvDiskAgent::TEvReleaseDevicesRequest, + HandleReleaseDevicesUndelivery); + + default: + HandleUnexpectedEvent(ev, Component); + break; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +TString TReleaseDevicesActor::LogTargets() const +{ + return LogDevices(Devices); +} + +} // namespace + +TActorId ReleaseDevices( + const TActorContext& ctx, + const TActorId& owner, + TString diskId, + TString clientId, + ui32 volumeGeneration, + TDuration requestTimeout, + TVector devices, + bool muteIOErrors, + NActors::NLog::EComponent component) +{ + return NCloud::Register( + ctx, + owner, + std::move(diskId), + std::move(clientId), + volumeGeneration, + requestTimeout, + std::move(devices), + muteIOErrors, + component); +} +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices \ No newline at end of file diff --git a/cloud/blockstore/libs/storage/core/ya.make b/cloud/blockstore/libs/storage/core/ya.make index 6467cd97190..1ca815aaee8 100644 --- a/cloud/blockstore/libs/storage/core/ya.make +++ b/cloud/blockstore/libs/storage/core/ya.make @@ -3,6 +3,7 @@ LIBRARY() GENERATE_ENUM_SERIALIZATION(mount_token.h) SRCS( + acquire_devices_actor.cpp block_handler.cpp compaction_map.cpp compaction_options.cpp @@ -19,6 +20,7 @@ SRCS( pending_request.cpp probes.cpp proto_helpers.cpp + release_devices_actor.cpp request_buffer.cpp request_info.cpp storage_request_counters.cpp diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp index 5f251d85cb0..efd2a4ca6d0 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp @@ -188,6 +188,24 @@ void TDiskRegistryActor::BeforeDie(const NActors::TActorContext& ctx) MakeTabletIsDeadError(E_REJECTED, __LOCATION__)); } PendingDiskDeallocationRequests.clear(); + + for (auto& [actorId, requestInfo]: PendingAcquireDiskRequests) { + NCloud::Reply( + ctx, + *requestInfo, + std::make_unique( + MakeTabletIsDeadError(E_REJECTED, __LOCATION__))); + } + PendingAcquireDiskRequests.clear(); + + for (auto& [actorId, requestInfo]: PendingReleaseDiskRequests) { + NCloud::Reply( + ctx, + *requestInfo, + std::make_unique( + MakeTabletIsDeadError(E_REJECTED, __LOCATION__))); + } + PendingReleaseDiskRequests.clear(); } void TDiskRegistryActor::OnDetach(const TActorContext& ctx) @@ -708,6 +726,14 @@ STFUNC(TDiskRegistryActor::StateWork) TEvDiskRegistryPrivate::TEvDiskRegistryAgentListExpiredParamsCleanup, TDiskRegistryActor::HandleDiskRegistryAgentListExpiredParamsCleanup); + HFunc( + NAcquireReleaseDevices::TEvDevicesAcquireFinished, + HandleDevicesAcquireFinished); + + HFunc( + NAcquireReleaseDevices::TEvDevicesReleaseFinished, + HandleDevicesReleaseFinished); + default: if (!HandleRequests(ev) && !HandleDefaultEvents(ev, SelfId())) { HandleUnexpectedEvent(ev, TBlockStoreComponents::DISK_REGISTRY); @@ -879,21 +905,9 @@ bool ToLogicalBlocks(NProto::TDeviceConfig& device, ui32 logicalBlockSize) //////////////////////////////////////////////////////////////////////////////// -TString LogDevices(const TVector& devices) -{ - TStringBuilder sb; - sb << "( "; - for (const auto& d: devices) { - sb << d.GetDeviceUUID() << "@" << d.GetAgentId() << " "; - } - sb << ")"; - return sb; -} - -//////////////////////////////////////////////////////////////////////////////// - void TDiskRegistryActor::OnDiskAcquired( - TVector sentAcquireRequests) + TVector + sentAcquireRequests) { for (auto& sentRequest: sentAcquireRequests) { TCachedAcquireRequests& cachedRequests = @@ -906,7 +920,8 @@ void TDiskRegistryActor::OnDiskAcquired( } void TDiskRegistryActor::OnDiskReleased( - const TVector& sentReleaseRequests) + const TVector& + sentReleaseRequests) { auto& acquireCacheByAgentId = State->GetAcquireCacheByAgentId(); for (const auto& [agentId, releaseRequest]: sentReleaseRequests) { diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h index 32a19889b3a..b3739a29a06 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -85,6 +86,9 @@ class TDiskRegistryActor final THashMap> PendingDiskDeallocationRequests; + THashMap PendingAcquireDiskRequests; + THashMap PendingReleaseDiskRequests; + bool BrokenDisksDestructionInProgress = false; bool DisksNotificationInProgress = false; bool UsersNotificationInProgress = false; @@ -243,10 +247,19 @@ class TDiskRegistryActor final void ProcessAutomaticallyReplacedDevices(const NActors::TActorContext& ctx); + void HandleDevicesAcquireFinished( + const NAcquireReleaseDevices::TEvDevicesAcquireFinished::TPtr& ev, + const NActors::TActorContext& ctx); void OnDiskAcquired( - TVector sentAcquireRequests); + TVector + sentAcquireRequests); + void HandleDevicesReleaseFinished( + const NAcquireReleaseDevices::TEvDevicesReleaseFinished::TPtr& ev, + const NActors::TActorContext& ctx); void OnDiskReleased( - const TVector& sentReleaseRequests); + const TVector< + NAcquireReleaseDevices::TAgentReleaseDevicesCachedRequest>& + sentReleaseRequests); void OnDiskDeallocated(const TDiskId& diskId); void SendCachedAcquireRequestsToAgent( const NActors::TActorContext& ctx, @@ -503,6 +516,5 @@ class TDiskRegistryActor final // BLOCKSTORE_DISK_REGISTRY_COUNTER bool ToLogicalBlocks(NProto::TDeviceConfig& device, ui32 logicalBlockSize); -TString LogDevices(const TVector& devices); } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp new file mode 100644 index 00000000000..715774a235f --- /dev/null +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp @@ -0,0 +1,245 @@ +#include "disk_registry_actor.h" + +#include +#include +#include + +#include +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace NActors; + +using namespace NKikimr::NTabletFlatExecutor; + +//////////////////////////////////////////////////////////////////////////////// + +void TDiskRegistryActor::HandleAcquireDisk( + const TEvDiskRegistry::TEvAcquireDiskRequest::TPtr& ev, + const TActorContext& ctx) +{ + BLOCKSTORE_DISK_REGISTRY_COUNTER(AcquireDisk); + + const auto* msg = ev->Get(); + + auto clientId = msg->Record.GetHeaders().GetClientId(); + auto diskId = msg->Record.GetDiskId(); + + LOG_DEBUG( + ctx, + TBlockStoreComponents::DISK_REGISTRY, + "[%lu] Received AcquireDisk request: " + "DiskId=%s, ClientId=%s, AccessMode=%u, MountSeqNumber=%lu" + ", VolumeGeneration=%u", + TabletID(), + diskId.c_str(), + clientId.c_str(), + static_cast(msg->Record.GetAccessMode()), + msg->Record.GetMountSeqNumber(), + msg->Record.GetVolumeGeneration()); + + TDiskInfo diskInfo; + auto error = State->StartAcquireDisk(diskId, diskInfo); + + if (HasError(error)) { + LOG_ERROR( + ctx, + TBlockStoreComponents::DISK_REGISTRY_WORKER, + "[%s] AcquireDisk %s error: %s", + clientId.c_str(), + diskId.c_str(), + FormatError(error).c_str()); + + NCloud::Reply( + ctx, + *ev, + std::make_unique( + std::move(error))); + return; + } + + State->FilterDevicesAtUnavailableAgents(diskInfo); + + TVector devices = std::move(diskInfo.Devices); + for (auto& migration: diskInfo.Migrations) { + devices.push_back(std::move(*migration.MutableTargetDevice())); + } + for (auto& replica: diskInfo.Replicas) { + devices.insert( + devices.end(), + std::make_move_iterator(replica.begin()), + std::make_move_iterator(replica.end())); + } + + auto actor = NAcquireReleaseDevices::AcquireDevices( + ctx, + ctx.SelfID, + std::move(devices), + std::move(diskId), + std::move(clientId), + msg->Record.GetAccessMode(), + msg->Record.GetMountSeqNumber(), + msg->Record.GetVolumeGeneration(), + Config->GetAgentRequestTimeout(), + /*muteIOErrors=*/false, + TBlockStoreComponents::DISK_REGISTRY); + Actors.insert(actor); + PendingAcquireDiskRequests[actor] = + CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext); +} + +void TDiskRegistryActor::HandleDevicesAcquireFinished( + const NAcquireReleaseDevices::TEvDevicesAcquireFinished::TPtr& ev, + const NActors::TActorContext& ctx) +{ + auto* msg = ev->Get(); + + State->FinishAcquireDisk(msg->DiskId); + + OnDiskAcquired(std::move(msg->SentRequests)); + + auto reqInfo = PendingAcquireDiskRequests.at(ev->Sender); + + auto response = std::make_unique( + std::move(msg->Error)); + + const auto* disk = State->GetDisk(msg->DiskId); + + if (HasError(response->GetError())) { + LOG_ERROR( + ctx, + TBlockStoreComponents::DISK_REGISTRY_WORKER, + "[%s] AcquireDisk %s targets %s error: %s", + msg->ClientId.c_str(), + msg->DiskId.c_str(), + LogDevices(msg->Devices).c_str(), + FormatError(response->GetError()).c_str()); + } else { + response->Record.MutableDevices()->Reserve(msg->Devices.size()); + + for (auto& device: msg->Devices) { + if (disk) { + ToLogicalBlocks(device, disk->LogicalBlockSize); + } + *response->Record.AddDevices() = std::move(device); + } + } + + NCloud::Reply(ctx, *reqInfo, std::move(response)); + Actors.erase(ev->Sender); + PendingAcquireDiskRequests.erase(ev->Sender); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TDiskRegistryActor::HandleReleaseDisk( + const TEvDiskRegistry::TEvReleaseDiskRequest::TPtr& ev, + const TActorContext& ctx) +{ + BLOCKSTORE_DISK_REGISTRY_COUNTER(ReleaseDisk); + + auto replyWithError = [&](auto error) + { + auto response = + std::make_unique( + std::move(error)); + NCloud::Reply(ctx, *ev, std::move(response)); + }; + + auto* msg = ev->Get(); + TString& diskId = *msg->Record.MutableDiskId(); + TString& clientId = *msg->Record.MutableHeaders()->MutableClientId(); + ui32 volumeGeneration = msg->Record.GetVolumeGeneration(); + + LOG_DEBUG( + ctx, + TBlockStoreComponents::DISK_REGISTRY, + "[%lu] Received ReleaseDisk request: DiskId=%s, ClientId=%s" + ", VolumeGeneration=%u", + TabletID(), + diskId.c_str(), + clientId.c_str(), + volumeGeneration); + + if (!clientId) { + replyWithError(MakeError(E_ARGUMENT, "empty client id")); + return; + } + + if (!diskId) { + replyWithError(MakeError(E_ARGUMENT, "empty disk id")); + return; + } + + TDiskInfo diskInfo; + const auto error = State->GetDiskInfo(diskId, diskInfo); + if (HasError(error)) { + LOG_ERROR( + ctx, + TBlockStoreComponents::DISK_REGISTRY, + "ReleaseDisk %s. GetDiskInfo error: %s", + diskId.c_str(), + FormatError(error).c_str()); + + replyWithError(error); + return; + } + + if (!State->FilterDevicesAtUnavailableAgents(diskInfo)) { + LOG_WARN( + ctx, + TBlockStoreComponents::DISK_REGISTRY, + "ReleaseDisk %s. Nothing to release", + diskId.c_str()); + + replyWithError(MakeError(S_ALREADY, {})); + return; + } + + TVector devices = std::move(diskInfo.Devices); + for (auto& migration: diskInfo.Migrations) { + devices.push_back(std::move(*migration.MutableTargetDevice())); + } + for (auto& replica: diskInfo.Replicas) { + for (auto& device: replica) { + devices.push_back(std::move(device)); + } + } + + auto actor = NAcquireReleaseDevices::ReleaseDevices( + ctx, + ctx.SelfID, + std::move(diskId), + std::move(clientId), + volumeGeneration, + Config->GetAgentRequestTimeout(), + std::move(devices), + /*muteIOErrors=*/false, + TBlockStoreComponents::DISK_REGISTRY); + + Actors.insert(actor); + PendingReleaseDiskRequests[actor] = + CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext); +} + +void TDiskRegistryActor::HandleDevicesReleaseFinished( + const NAcquireReleaseDevices::TEvDevicesReleaseFinished::TPtr& ev, + const NActors::TActorContext& ctx) +{ + const auto* msg = ev->Get(); + + OnDiskReleased(msg->SentRequests); + + State->FinishAcquireDisk(msg->DiskId); + auto reqInfo = PendingReleaseDiskRequests.at(ev->Sender); + + auto response = + std::make_unique(msg->Error); + NCloud::Reply(ctx, *reqInfo, std::move(response)); + + Actors.erase(ev->Sender); + PendingReleaseDiskRequests.erase(ev->Sender); +} + +} // namespace NCloud::NBlockStore::NStorage \ No newline at end of file diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp deleted file mode 100644 index 2cff5a6d1a6..00000000000 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp +++ /dev/null @@ -1,373 +0,0 @@ -#include "disk_registry_actor.h" - -#include - -#include - -namespace NCloud::NBlockStore::NStorage { - -using namespace NActors; - -using namespace NKikimr::NTabletFlatExecutor; - -namespace { - -//////////////////////////////////////////////////////////////////////////////// - -class TReleaseDiskActor final - : public TActorBootstrapped -{ -private: - const TActorId Owner; - const TRequestInfoPtr RequestInfo; - const TString DiskId; - const TString ClientId; - const ui32 VolumeGeneration; - const TDuration RequestTimeout; - - TVector Devices; - int PendingRequests = 0; - - TVector SentReleaseRequests; - -public: - TReleaseDiskActor( - const TActorId& owner, - TRequestInfoPtr requestInfo, - TString diskId, - TString clientId, - ui32 volumeGeneration, - TDuration requestTimeout, - TVector devices); - - void Bootstrap(const TActorContext& ctx); - -private: - void PrepareRequest(NProto::TReleaseDevicesRequest& request); - void RemoveDiskSession(const TActorContext& ctx); - void ReplyAndDie(const TActorContext& ctx, NProto::TError error); - - void OnReleaseResponse( - const TActorContext& ctx, - ui64 cookie, - NProto::TError error); - -private: - STFUNC(StateWork); - - void HandlePoisonPill( - const TEvents::TEvPoisonPill::TPtr& ev, - const TActorContext& ctx); - - void HandleReleaseDevicesResponse( - const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev, - const TActorContext& ctx); - - void HandleReleaseDevicesUndelivery( - const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, - const TActorContext& ctx); - - void HandleRemoveDiskSessionResponse( - const TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse::TPtr& ev, - const TActorContext& ctx); - - void HandleTimeout( - const TEvents::TEvWakeup::TPtr& ev, - const TActorContext& ctx); - - TString LogTargets() const; -}; - -//////////////////////////////////////////////////////////////////////////////// - -TReleaseDiskActor::TReleaseDiskActor( - const TActorId& owner, - TRequestInfoPtr requestInfo, - TString diskId, - TString clientId, - ui32 volumeGeneration, - TDuration requestTimeout, - TVector devices) - : Owner(owner) - , RequestInfo(std::move(requestInfo)) - , DiskId(std::move(diskId)) - , ClientId(std::move(clientId)) - , VolumeGeneration(volumeGeneration) - , RequestTimeout(requestTimeout) - , Devices(std::move(devices)) -{} - -void TReleaseDiskActor::PrepareRequest(NProto::TReleaseDevicesRequest& request) -{ - request.MutableHeaders()->SetClientId(ClientId); - request.SetDiskId(DiskId); - request.SetVolumeGeneration(VolumeGeneration); -} - -void TReleaseDiskActor::Bootstrap(const TActorContext& ctx) -{ - Become(&TThis::StateWork); - - SortBy(Devices, [] (auto& d) { - return d.GetNodeId(); - }); - - auto it = Devices.begin(); - while (it != Devices.end()) { - auto request = - std::make_unique(); - NProto::TReleaseDevicesRequest requestCopy; - PrepareRequest(request->Record); - PrepareRequest(requestCopy); - - const ui32 nodeId = it->GetNodeId(); - const TString& agentId = it->GetAgentId(); - - for (; it != Devices.end() && it->GetNodeId() == nodeId; ++it) { - *request->Record.AddDeviceUUIDs() = it->GetDeviceUUID(); - *requestCopy.AddDeviceUUIDs() = it->GetDeviceUUID(); - } - - ++PendingRequests; - SentReleaseRequests.emplace_back(agentId, std::move(requestCopy)); - NCloud::Send( - ctx, - MakeDiskAgentServiceId(nodeId), - std::move(request), - nodeId); - } - - ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup()); -} - -void TReleaseDiskActor::RemoveDiskSession(const TActorContext& ctx) -{ - auto request = - std::make_unique( - DiskId, - ClientId, - std::move(SentReleaseRequests)); - - NCloud::Send(ctx, Owner, std::move(request)); -} - -void TReleaseDiskActor::ReplyAndDie(const TActorContext& ctx, NProto::TError error) -{ - auto response = std::make_unique( - std::move(error)); - NCloud::Reply(ctx, *RequestInfo, std::move(response)); - - NCloud::Send( - ctx, - Owner, - std::make_unique()); - - Die(ctx); -} - -void TReleaseDiskActor::OnReleaseResponse( - const TActorContext& ctx, - ui64 cookie, - NProto::TError error) -{ - Y_ABORT_UNLESS(PendingRequests > 0); - - if (HasError(error)) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, - "ReleaseDevices %s error: %s, %llu", - LogTargets().c_str(), - FormatError(error).c_str(), - cookie); - } - - if (--PendingRequests == 0) { - RemoveDiskSession(ctx); - } -} - -void TReleaseDiskActor::HandleReleaseDevicesResponse( - const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev, - const TActorContext& ctx) -{ - OnReleaseResponse(ctx, ev->Cookie, ev->Get()->GetError()); -} - -void TReleaseDiskActor::HandleReleaseDevicesUndelivery( - const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, - const TActorContext& ctx) -{ - OnReleaseResponse(ctx, ev->Cookie, MakeError(E_REJECTED, "not delivered")); -} - -void TReleaseDiskActor::HandleRemoveDiskSessionResponse( - const TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse::TPtr& ev, - const TActorContext& ctx) -{ - const auto* msg = ev->Get(); - - ReplyAndDie(ctx, msg->GetError()); -} - -void TReleaseDiskActor::HandlePoisonPill( - const TEvents::TEvPoisonPill::TPtr& ev, - const TActorContext& ctx) -{ - Y_UNUSED(ev); - - ReplyAndDie(ctx, MakeTabletIsDeadError(E_REJECTED, __LOCATION__)); -} - -void TReleaseDiskActor::HandleTimeout( - const TEvents::TEvWakeup::TPtr& ev, - const TActorContext& ctx) -{ - Y_UNUSED(ev); - - const auto err = TStringBuilder() - << "TReleaseDiskActor timeout." - << " DiskId: " << DiskId - << " ClientId: " << ClientId - << " Targets: " << LogTargets() - << " VolumeGeneration: " << VolumeGeneration - << " PendingRequests: " << PendingRequests; - - LOG_WARN(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, err); - - ReplyAndDie(ctx, MakeError(E_TIMEOUT, err)); -} - -STFUNC(TReleaseDiskActor::StateWork) -{ - switch (ev->GetTypeRewrite()) { - HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); - HFunc(TEvents::TEvWakeup, HandleTimeout); - - HFunc(TEvDiskAgent::TEvReleaseDevicesResponse, - HandleReleaseDevicesResponse); - HFunc(TEvDiskAgent::TEvReleaseDevicesRequest, - HandleReleaseDevicesUndelivery); - - HFunc(TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse, - HandleRemoveDiskSessionResponse); - - default: - HandleUnexpectedEvent(ev, TBlockStoreComponents::DISK_REGISTRY_WORKER); - break; - } -} - -//////////////////////////////////////////////////////////////////////////////// - -TString TReleaseDiskActor::LogTargets() const -{ - return LogDevices(Devices); -} - -} // namespace - -//////////////////////////////////////////////////////////////////////////////// - -void TDiskRegistryActor::HandleReleaseDisk( - const TEvDiskRegistry::TEvReleaseDiskRequest::TPtr& ev, - const TActorContext& ctx) -{ - BLOCKSTORE_DISK_REGISTRY_COUNTER(ReleaseDisk); - - auto replyWithError = [&] (auto error) { - auto response = std::make_unique( - std::move(error)); - NCloud::Reply(ctx, *ev, std::move(response)); - }; - - auto* msg = ev->Get(); - TString& diskId = *msg->Record.MutableDiskId(); - TString& clientId = *msg->Record.MutableHeaders()->MutableClientId(); - ui32 volumeGeneration = msg->Record.GetVolumeGeneration(); - - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY, - "[%lu] Received ReleaseDisk request: DiskId=%s, ClientId=%s" - ", VolumeGeneration=%u", - TabletID(), - diskId.c_str(), - clientId.c_str(), - volumeGeneration); - - if (!clientId) { - replyWithError(MakeError(E_ARGUMENT, "empty client id")); - return; - } - - if (!diskId) { - replyWithError(MakeError(E_ARGUMENT, "empty disk id")); - return; - } - - TDiskInfo diskInfo; - const auto error = State->GetDiskInfo(diskId, diskInfo); - if (HasError(error)) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY, - "ReleaseDisk %s. GetDiskInfo error: %s", - diskId.c_str(), - FormatError(error).c_str()); - - replyWithError(error); - return; - } - - if (!State->FilterDevicesAtUnavailableAgents(diskInfo)) { - LOG_WARN(ctx, TBlockStoreComponents::DISK_REGISTRY, - "ReleaseDisk %s. Nothing to release", - diskId.c_str()); - - replyWithError(MakeError(S_ALREADY, {})); - return; - } - - TVector devices = std::move(diskInfo.Devices); - for (auto& migration: diskInfo.Migrations) { - devices.push_back(std::move(*migration.MutableTargetDevice())); - } - for (auto& replica: diskInfo.Replicas) { - for (auto& device: replica) { - devices.push_back(std::move(device)); - } - } - - auto requestInfo = CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext); - - auto actor = NCloud::Register( - ctx, - ctx.SelfID, - std::move(requestInfo), - std::move(diskId), - std::move(clientId), - volumeGeneration, - Config->GetAgentRequestTimeout(), - std::move(devices)); - - Actors.insert(actor); -} - -void TDiskRegistryActor::HandleRemoveDiskSession( - const TEvDiskRegistryPrivate::TEvRemoveDiskSessionRequest::TPtr& ev, - const TActorContext& ctx) -{ - const auto* msg = ev->Get(); - - OnDiskReleased(msg->SentRequests); - - auto requestInfo = CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext); - - State->FinishAcquireDisk(msg->DiskId); - auto response = - std::make_unique(); - NCloud::Reply(ctx, *ev, std::move(response)); -} - -} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h index 1c51f0d77bb..be27a49619b 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -74,19 +75,6 @@ struct TUserNotificationKey //////////////////////////////////////////////////////////////////////////////// -struct TAgentAcquireDevicesCachedRequest -{ - TString AgentId; - NProto::TAcquireDevicesRequest Request; - TInstant RequestTime; -}; - -struct TAgentReleaseDevicesCachedRequest -{ - TString AgentId; - NProto::TReleaseDevicesRequest Request; -}; - struct TCachedAcquireKey { TString DiskId; @@ -104,8 +92,9 @@ struct TCachedAcquireKey } }; -using TCachedAcquireRequests = - TMap; +using TCachedAcquireRequests = TMap< + TCachedAcquireKey, + NAcquireReleaseDevices::TAgentAcquireDevicesCachedRequest>; //////////////////////////////////////////////////////////////////////////////// @@ -179,8 +168,6 @@ using TVolumeConfig = NKikimrBlockStore::TVolumeConfig; xxx(CleanupDisks, __VA_ARGS__) \ xxx(SecureErase, __VA_ARGS__) \ xxx(CleanupDevices, __VA_ARGS__) \ - xxx(FinishAcquireDisk, __VA_ARGS__) \ - xxx(RemoveDiskSession, __VA_ARGS__) \ xxx(DestroyBrokenDisks, __VA_ARGS__) \ xxx(ListBrokenDisks, __VA_ARGS__) \ xxx(NotifyDisks, __VA_ARGS__) \ @@ -204,52 +191,6 @@ using TVolumeConfig = NKikimrBlockStore::TVolumeConfig; struct TEvDiskRegistryPrivate { - // - // FinishAcquireDisk - // - - struct TFinishAcquireDiskRequest - { - TString DiskId; - TString ClientId; - TVector SentRequests; - - TFinishAcquireDiskRequest( - TString diskId, - TString clientId, - TVector sentRequests) - : DiskId(std::move(diskId)) - , ClientId(std::move(clientId)) - , SentRequests(std::move(sentRequests)) - {} - }; - - struct TFinishAcquireDiskResponse - {}; - - // - // RemoveDiskSession - // - - struct TRemoveDiskSessionRequest - { - TString DiskId; - TString ClientId; - TVector SentRequests; - - TRemoveDiskSessionRequest( - TString diskId, - TString clientId, - TVector sentRequests) - : DiskId(std::move(diskId)) - , ClientId(std::move(clientId)) - , SentRequests(std::move(sentRequests)) - {} - }; - - struct TRemoveDiskSessionResponse - {}; - // // CleanupDisks // diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp index 51714d32e53..d581a013bda 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp @@ -3363,6 +3363,12 @@ NProto::EDiskState TDiskRegistryState::GetDiskState(const TDiskId& diskId) const return disk->State; } +const TDiskRegistryState::TDiskState* TDiskRegistryState::GetDisk( + const TDiskId& diskId) const +{ + return Disks.FindPtr(diskId); +} + NProto::TError TDiskRegistryState::GetShadowDiskId( const TDiskId& sourceDiskId, const TCheckpointId& checkpointId, diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h index 3b78b8786fb..99c65de006b 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h @@ -447,6 +447,7 @@ class TDiskRegistryState NProto::TError GetDiskInfo(const TDiskId& diskId, TDiskInfo& diskInfo) const; NProto::EDiskState GetDiskState(const TDiskId& diskId) const; + const TDiskState* GetDisk(const TDiskId& diskId) const; NProto::TError GetShadowDiskId( const TDiskId& sourceDiskId, const TCheckpointId& checkpointId, diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp index a8446be4a7c..bc592ea2e9a 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp @@ -507,7 +507,8 @@ Y_UNIT_TEST_SUITE(TDiskRegistryTest) runtime->SetObserverFunc( [&] (TAutoPtr& event) { switch (event->GetTypeRewrite()) { - case TEvDiskRegistryPrivate::EvFinishAcquireDiskResponse: { + case NAcquireReleaseDevices::TEvDevicesAcquireFinished:: + EventType: { finished = true; break; } @@ -676,10 +677,7 @@ Y_UNIT_TEST_SUITE(TDiskRegistryTest) } { - auto response = diskRegistry.RemoveDiskSession( - "disk-1", - "session-1", - TVector()); + auto response = diskRegistry.ReleaseDisk("disk-1", "session-1"); UNIT_ASSERT(!HasError(response->GetError())); } } diff --git a/cloud/blockstore/libs/storage/disk_registry/testlib/test_env.h b/cloud/blockstore/libs/storage/disk_registry/testlib/test_env.h index 1539d257c0c..2c5d82fe828 100644 --- a/cloud/blockstore/libs/storage/disk_registry/testlib/test_env.h +++ b/cloud/blockstore/libs/storage/disk_registry/testlib/test_env.h @@ -780,18 +780,6 @@ class TDiskRegistryClient std::move(devices)); } - auto CreateRemoveDiskSessionRequest( - TString diskId, - TString clientId, - TVector sentRequests) - { - return std::make_unique< - TEvDiskRegistryPrivate::TEvRemoveDiskSessionRequest>( - std::move(diskId), - std::move(clientId), - std::move(sentRequests)); - } - auto CreateUpdateAgentStatsRequest(NProto::TAgentStats stats) { auto request = std::make_unique(); diff --git a/cloud/blockstore/libs/storage/disk_registry/ya.make b/cloud/blockstore/libs/storage/disk_registry/ya.make index fc8d924270c..49000e240d4 100644 --- a/cloud/blockstore/libs/storage/disk_registry/ya.make +++ b/cloud/blockstore/libs/storage/disk_registry/ya.make @@ -1,7 +1,7 @@ LIBRARY() SRCS( - disk_registry_actor_acquire.cpp + disk_registry_actor_acquire_release.cpp disk_registry_actor_allocate.cpp disk_registry_actor_backup_state.cpp disk_registry_actor_change_disk_device.cpp @@ -34,7 +34,6 @@ SRCS( disk_registry_actor_query_available_storage.cpp disk_registry_actor_register.cpp disk_registry_actor_regular.cpp - disk_registry_actor_release.cpp disk_registry_actor_replace.cpp disk_registry_actor_restore_state.cpp disk_registry_actor_resume_device.cpp From d06b1f4353e886501147937ef69f7dce9c19b64a Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Wed, 15 Jan 2025 17:38:07 +0700 Subject: [PATCH 2/3] issue-2725: add new line at eof --- cloud/blockstore/libs/storage/core/acquire_release_devices.h | 2 +- cloud/blockstore/libs/storage/core/release_devices_actor.cpp | 2 +- .../disk_registry/disk_registry_actor_acquire_release.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cloud/blockstore/libs/storage/core/acquire_release_devices.h b/cloud/blockstore/libs/storage/core/acquire_release_devices.h index 93ebe418234..d9dcdb40216 100644 --- a/cloud/blockstore/libs/storage/core/acquire_release_devices.h +++ b/cloud/blockstore/libs/storage/core/acquire_release_devices.h @@ -104,4 +104,4 @@ TActorId ReleaseDevices( bool muteIOErrors, NActors::NLog::EComponent component); -} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices \ No newline at end of file +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices diff --git a/cloud/blockstore/libs/storage/core/release_devices_actor.cpp b/cloud/blockstore/libs/storage/core/release_devices_actor.cpp index 931724ae91e..1f361cd3cbc 100644 --- a/cloud/blockstore/libs/storage/core/release_devices_actor.cpp +++ b/cloud/blockstore/libs/storage/core/release_devices_actor.cpp @@ -268,4 +268,4 @@ TActorId ReleaseDevices( muteIOErrors, component); } -} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices \ No newline at end of file +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp index 715774a235f..355c21b8f08 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp @@ -242,4 +242,4 @@ void TDiskRegistryActor::HandleDevicesReleaseFinished( PendingReleaseDiskRequests.erase(ev->Sender); } -} // namespace NCloud::NBlockStore::NStorage \ No newline at end of file +} // namespace NCloud::NBlockStore::NStorage From 2709160ff5a0a2dccf7b97673832e7133732026f Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Wed, 15 Jan 2025 18:07:31 +0700 Subject: [PATCH 3/3] issue-2725: add direct acquire release device sending from volume --- cloud/blockstore/config/storage.proto | 3 + cloud/blockstore/libs/storage/core/config.cpp | 1 + cloud/blockstore/libs/storage/core/config.h | 1 + .../libs/storage/testlib/disk_agent_mock.h | 22 ++++ .../libs/storage/volume/volume_actor.cpp | 6 + .../libs/storage/volume/volume_actor.h | 27 ++++ .../volume/volume_actor_acquire_release.cpp | 102 +++++++++++++++ .../storage/volume/volume_actor_addclient.cpp | 21 +++- .../volume/volume_actor_removeclient.cpp | 11 ++ .../libs/storage/volume/volume_state.h | 27 ++++ .../libs/storage/volume/volume_state_ut.cpp | 38 ++++++ .../libs/storage/volume/volume_ut.cpp | 116 ++++++++++++++++++ cloud/blockstore/libs/storage/volume/ya.make | 1 + 13 files changed, 373 insertions(+), 3 deletions(-) create mode 100644 cloud/blockstore/libs/storage/volume/volume_actor_acquire_release.cpp diff --git a/cloud/blockstore/config/storage.proto b/cloud/blockstore/config/storage.proto index c7fd81f92d1..d0069aef14a 100644 --- a/cloud/blockstore/config/storage.proto +++ b/cloud/blockstore/config/storage.proto @@ -1083,4 +1083,7 @@ message TStorageServiceConfig // percentage, then the rejection of such agents does not occur - we assume // a connectivity failure in the cluster. optional double DiskRegistryInitialAgentRejectionThreshold = 396; + + // Enabling direct sending AcquireDevices to disk agent. + optional bool UseDirectAcquireReleaseDevicesSending = 397; } diff --git a/cloud/blockstore/libs/storage/core/config.cpp b/cloud/blockstore/libs/storage/core/config.cpp index 827ad3a8a01..87f027da946 100644 --- a/cloud/blockstore/libs/storage/core/config.cpp +++ b/cloud/blockstore/libs/storage/core/config.cpp @@ -521,6 +521,7 @@ TDuration MSeconds(ui32 value) xxx(EncryptionAtRestForDiskRegistryBasedDisksEnabled, bool, false )\ xxx(DisableFullPlacementGroupCountCalculation, bool, false )\ xxx(DiskRegistryInitialAgentRejectionThreshold, double, 50 )\ + xxx(UseDirectAcquireReleaseDevicesSending, bool, false )\ // BLOCKSTORE_STORAGE_CONFIG_RW #define BLOCKSTORE_STORAGE_CONFIG(xxx) \ diff --git a/cloud/blockstore/libs/storage/core/config.h b/cloud/blockstore/libs/storage/core/config.h index 96ed8b1968b..2d10fc8a356 100644 --- a/cloud/blockstore/libs/storage/core/config.h +++ b/cloud/blockstore/libs/storage/core/config.h @@ -622,6 +622,7 @@ class TStorageConfig [[nodiscard]] bool GetDisableFullPlacementGroupCountCalculation() const; [[nodiscard]] double GetDiskRegistryInitialAgentRejectionThreshold() const; + [[nodiscard]] bool GetUseDirectAcquireReleaseDevicesSending() const; }; ui64 GetAllocationUnit( diff --git a/cloud/blockstore/libs/storage/testlib/disk_agent_mock.h b/cloud/blockstore/libs/storage/testlib/disk_agent_mock.h index 35c4fd724cf..8b9ee4d1edd 100644 --- a/cloud/blockstore/libs/storage/testlib/disk_agent_mock.h +++ b/cloud/blockstore/libs/storage/testlib/disk_agent_mock.h @@ -100,6 +100,8 @@ class TDiskAgentMock final HFunc(TEvDiskAgent::TEvZeroDeviceBlocksRequest, HandleZeroDeviceBlocks); HFunc(TEvDiskAgent::TEvChecksumDeviceBlocksRequest, HandleChecksumDeviceBlocks); HFunc(TEvDiskAgent::TEvDirectCopyBlocksRequest, HandleDirectCopyBlocks); + HFunc(TEvDiskAgent::TEvAcquireDevicesRequest, HandleAcquireDevicesRequest); + HFunc(TEvDiskAgent::TEvReleaseDevicesRequest, HandleReleaseDevicesRequest); default: Y_ABORT("Unexpected event %x", ev->GetTypeRewrite()); @@ -315,6 +317,26 @@ class TDiskAgentMock final { State->CreateDirectCopyActorFunc(ev, ctx, SelfId()); } + + void HandleAcquireDevicesRequest( + TEvDiskAgent::TEvAcquireDevicesRequest::TPtr& ev, + const NActors::TActorContext& ctx) + { + auto response = + std::make_unique(); + + Reply(ctx, *ev, std::move(response)); + } + + void HandleReleaseDevicesRequest( + TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, + const NActors::TActorContext& ctx) + { + auto response = + std::make_unique(); + + Reply(ctx, *ev, std::move(response)); + } }; } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/volume_actor.cpp b/cloud/blockstore/libs/storage/volume/volume_actor.cpp index ac4c911e4fd..d8a6ce85c2c 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor.cpp @@ -1032,6 +1032,9 @@ STFUNC(TVolumeActor::StateWork) HFunc( TEvDiskRegistry::TEvAcquireDiskResponse, HandleAcquireDiskResponse); + HFunc( + NAcquireReleaseDevices::TEvDevicesAcquireFinished, + HandleDevicesAcquireFinished); HFunc( TEvVolumePrivate::TEvAcquireDiskIfNeeded, HandleAcquireDiskIfNeeded); @@ -1040,6 +1043,9 @@ STFUNC(TVolumeActor::StateWork) HFunc( TEvDiskRegistry::TEvReleaseDiskResponse, HandleReleaseDiskResponse); + HFunc( + NAcquireReleaseDevices::TEvDevicesReleaseFinished, + HandleDevicesReleasedFinished); HFunc( TEvDiskRegistry::TEvAllocateDiskResponse, HandleAllocateDiskResponse); diff --git a/cloud/blockstore/libs/storage/volume/volume_actor.h b/cloud/blockstore/libs/storage/volume/volume_actor.h index c44e9873e88..7d6cf64f9c8 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor.h +++ b/cloud/blockstore/libs/storage/volume/volume_actor.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -716,12 +717,26 @@ class TVolumeActor final const TEvDiskRegistry::TEvAcquireDiskResponse::TPtr& ev, const NActors::TActorContext& ctx); + void HandleDevicesAcquireFinishedImpl( + const NProto::TError& error, + const NActors::TActorContext& ctx); + void AcquireDisk( const NActors::TActorContext& ctx, TString clientId, NProto::EVolumeAccessMode accessMode, ui64 mountSeqNumber); + void SendAcquireDevicesToAgents( + TString clientId, + NProto::EVolumeAccessMode accessMode, + ui64 mountSeqNumber, + const NActors::TActorContext& ctx); + + void HandleDevicesAcquireFinished( + const NAcquireReleaseDevices::TEvDevicesAcquireFinished::TPtr& ev, + const NActors::TActorContext& ctx); + void AcquireDiskIfNeeded(const NActors::TActorContext& ctx); void ScheduleAcquireDiskIfNeeded(const NActors::TActorContext& ctx); @@ -742,8 +757,20 @@ class TVolumeActor final const TEvDiskRegistry::TEvReleaseDiskResponse::TPtr& ev, const NActors::TActorContext& ctx); + void HandleDevicesReleasedFinishedImpl( + const NProto::TError& error, + const NActors::TActorContext& ctx); + void ReleaseDisk(const NActors::TActorContext& ctx, const TString& clientId); + void SendReleaseDevicesToAgents( + const TString& clientId, + const NActors::TActorContext& ctx); + + void HandleDevicesReleasedFinished( + const NAcquireReleaseDevices::TEvDevicesReleaseFinished::TPtr& ev, + const NActors::TActorContext& ctx); + void HandleAllocateDiskResponse( const TEvDiskRegistry::TEvAllocateDiskResponse::TPtr& ev, const NActors::TActorContext& ctx); diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_acquire_release.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_acquire_release.cpp new file mode 100644 index 00000000000..c549bfc1988 --- /dev/null +++ b/cloud/blockstore/libs/storage/volume/volume_actor_acquire_release.cpp @@ -0,0 +1,102 @@ +#include "volume_actor.h" + +#include +#include +#include + +#include +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace NActors; + +using namespace NKikimr::NTabletFlatExecutor; + +//////////////////////////////////////////////////////////////////////////////// + +void TVolumeActor::SendAcquireDevicesToAgents( + TString clientId, + NProto::EVolumeAccessMode accessMode, + ui64 mountSeqNumber, + const TActorContext& ctx) +{ + auto devices = State->GetAllDevicesForAcquireRelease(); + auto actor = NAcquireReleaseDevices::AcquireDevices( + ctx, + ctx.SelfID, + std::move(devices), + State->GetDiskId(), + std::move(clientId), + accessMode, + mountSeqNumber, + Executor()->Generation(), + Config->GetAgentRequestTimeout(), + State->GetMeta().GetMuteIOErrors(), + TBlockStoreComponents::VOLUME); + Actors.insert(actor); +} + +void TVolumeActor::HandleDevicesAcquireFinished( + const NAcquireReleaseDevices::TEvDevicesAcquireFinished::TPtr& ev, + const TActorContext& ctx) +{ + HandleDevicesAcquireFinishedImpl(ev->Get()->Error, ctx); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TVolumeActor::SendReleaseDevicesToAgents( + const TString& clientId, + const TActorContext& ctx) +{ + auto returnError = [&](auto error) + { + auto event = + std::make_unique( + NAcquireReleaseDevices::TDevicesReleaseFinished{ + {}, + {}, + {}, + std::move(error), + }); + NCloud::Send(ctx, SelfId(), std::move(event)); + }; + + TString diskId = State->GetDiskId(); + ui32 volumeGeneration = Executor()->Generation(); + + if (!clientId) { + returnError(MakeError(E_ARGUMENT, "empty client id")); + return; + } + + if (!diskId) { + returnError(MakeError(E_ARGUMENT, "empty disk id")); + return; + } + + auto devices = State->GetAllDevicesForAcquireRelease(); + + auto actor = NAcquireReleaseDevices::ReleaseDevices( + ctx, + ctx.SelfID, + std::move(diskId), + clientId, + volumeGeneration, + Config->GetAgentRequestTimeout(), + std::move(devices), + State->GetMeta().GetMuteIOErrors(), + TBlockStoreComponents::VOLUME); + + Actors.insert(actor); +} + +void TVolumeActor::HandleDevicesReleasedFinished( + const NAcquireReleaseDevices::TEvDevicesReleaseFinished::TPtr& ev, + const NActors::TActorContext& ctx) +{ + HandleDevicesReleasedFinishedImpl(ev->Get()->Error, ctx); +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_addclient.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_addclient.cpp index dff49881dea..40de30517f8 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_addclient.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_addclient.cpp @@ -40,6 +40,15 @@ void TVolumeActor::AcquireDisk( "Acquiring disk " << State->GetDiskId() ); + if (Config->GetUseDirectAcquireReleaseDevicesSending()) { + SendAcquireDevicesToAgents( + std::move(clientId), + accessMode, + mountSeqNumber, + ctx); + return; + } + auto request = std::make_unique(); request->Record.SetDiskId(State->GetDiskId()); @@ -177,7 +186,13 @@ void TVolumeActor::HandleAcquireDiskResponse( // NOTE: record.GetDevices() contains only the devices located at available // agents auto& record = msg->Record; + HandleDevicesAcquireFinishedImpl(record.GetError(), ctx); +} +void TVolumeActor::HandleDevicesAcquireFinishedImpl( + const NProto::TError& error, + const NActors::TActorContext& ctx) +{ ScheduleAcquireDiskIfNeeded(ctx); if (AcquireReleaseDiskRequests.empty()) { @@ -193,7 +208,7 @@ void TVolumeActor::HandleAcquireDiskResponse( auto& request = AcquireReleaseDiskRequests.front(); auto& cr = request.ClientRequest; - if (HasError(record.GetError())) { + if (HasError(error)) { LOG_DEBUG_S( ctx, TBlockStoreComponents::VOLUME, @@ -201,8 +216,8 @@ void TVolumeActor::HandleAcquireDiskResponse( ); if (cr) { - auto response = std::make_unique( - record.GetError()); + auto response = + std::make_unique(error); response->Record.MutableVolume()->SetDiskId(cr->DiskId); response->Record.SetClientId(cr->GetClientId()); response->Record.SetTabletId(TabletID()); diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_removeclient.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_removeclient.cpp index 27c26445aea..f4794067f97 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_removeclient.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_removeclient.cpp @@ -23,6 +23,10 @@ LWTRACE_USING(BLOCKSTORE_STORAGE_PROVIDER); void TVolumeActor::ReleaseDisk(const TActorContext& ctx, const TString& clientId) { + if (Config->GetUseDirectAcquireReleaseDevicesSending()) { + SendReleaseDevicesToAgents(clientId, ctx); + return; + } auto request = std::make_unique(); request->Record.SetDiskId(State->GetDiskId()); @@ -42,6 +46,13 @@ void TVolumeActor::HandleReleaseDiskResponse( auto* msg = ev->Get(); auto& record = msg->Record; + HandleDevicesReleasedFinishedImpl(record.GetError(), ctx); +} + +void TVolumeActor::HandleDevicesReleasedFinishedImpl( + const NProto::TError& error, + const NActors::TActorContext& ctx) +{ if (AcquireReleaseDiskRequests.empty()) { LOG_DEBUG_S( ctx, diff --git a/cloud/blockstore/libs/storage/volume/volume_state.h b/cloud/blockstore/libs/storage/volume/volume_state.h index e2c4c0a3201..04fbd47fdfc 100644 --- a/cloud/blockstore/libs/storage/volume/volume_state.h +++ b/cloud/blockstore/libs/storage/volume/volume_state.h @@ -721,6 +721,33 @@ class TVolumeState return Meta.GetResyncNeeded(); } + TVector GetAllDevicesForAcquireRelease() + { + + size_t allDevicesCount = Meta.GetDevices().size(); + for (const auto& replica: Meta.GetReplicas()) { + allDevicesCount += replica.GetDevices().size(); + } + allDevicesCount += GetMeta().GetMigrations().size(); + + TVector resultDevices; + resultDevices.reserve(allDevicesCount); + + for (const auto& device: Meta.GetDevices()) { + resultDevices.emplace_back(device); + } + for (const auto& replica: Meta.GetReplicas()) { + for (const auto& device: replica.GetDevices()) { + resultDevices.emplace_back(device); + } + } + for (const auto& migration: Meta.GetMigrations()) { + resultDevices.emplace_back(migration.GetTargetDevice()); + } + + return resultDevices; + } + private: bool CanPreemptClient( const TString& oldClientId, diff --git a/cloud/blockstore/libs/storage/volume/volume_state_ut.cpp b/cloud/blockstore/libs/storage/volume/volume_state_ut.cpp index 9734b85c262..9d7f20b7ac0 100644 --- a/cloud/blockstore/libs/storage/volume/volume_state_ut.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_state_ut.cpp @@ -11,6 +11,7 @@ #include #include +#include namespace NCloud::NBlockStore::NStorage { @@ -1933,6 +1934,43 @@ Y_UNIT_TEST_SUITE(TVolumeStateTest) UNIT_ASSERT(!state.GetTrackUsedBlocks()); } } + + Y_UNIT_TEST(AcquireDisk) + { + auto volumeState = CreateVolumeState(); + auto meta = volumeState.GetMeta(); + const TInstant oldDate = TInstant::ParseIso8601("2023-08-30"); + meta.MutableVolumeConfig()->SetCreationTs(oldDate.MicroSeconds()); + meta.AddDevices()->SetDeviceUUID("d1"); + meta.AddDevices()->SetDeviceUUID("d2"); + auto& r1 = *meta.AddReplicas(); + r1.AddDevices()->SetDeviceUUID("d3"); + r1.AddDevices()->SetDeviceUUID("d4"); + auto& r2 = *meta.AddReplicas(); + r2.AddDevices()->SetDeviceUUID("d5"); + r2.AddDevices()->SetDeviceUUID("d6"); + + auto deviceMigration = NProto::TDeviceMigration(); + deviceMigration.SetSourceDeviceId("d1"); + *deviceMigration.MutableTargetDevice()->MutableDeviceUUID() = "d7"; + + meta.MutableMigrations()->Add(std::move(deviceMigration)); + volumeState.ResetMeta(meta); + + const THashSet + deviceUUIDSExpected{"d1", "d2", "d3", "d4", "d5", "d6", "d7"}; + + auto devices = volumeState.GetAllDevicesForAcquireRelease(); + auto devicesUUIDS = + devices | std::views::transform([](const auto& el) + { return el.GetDeviceUUID(); }); + + THashSet devicesUUIDSActual( + devicesUUIDS.begin(), + devicesUUIDS.end()); + + UNIT_ASSERT_EQUAL(deviceUUIDSExpected, devicesUUIDSActual); + } } } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/volume_ut.cpp b/cloud/blockstore/libs/storage/volume/volume_ut.cpp index 45a725ac498..333ecd6b8bc 100644 --- a/cloud/blockstore/libs/storage/volume/volume_ut.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_ut.cpp @@ -2044,6 +2044,122 @@ Y_UNIT_TEST_SUITE(TVolumeTest) UNIT_ASSERT_VALUES_EQUAL(readerAcquireRequests, 5); } + Y_UNIT_TEST(ShouldSendAquireRequestsDirectlyToDiskAgent) + { + NProto::TStorageServiceConfig config; + config.SetAcquireNonReplicatedDevices(true); + config.SetUseDirectAcquireReleaseDevicesSending(true); + config.SetClientRemountPeriod(2000); + auto state = MakeIntrusive(); + auto runtime = PrepareTestActorRuntime(config, state); + + TVolumeClient volume(*runtime); + volume.UpdateVolumeConfig( + 0, + 0, + 0, + 0, + false, + 1, + NCloud::NProto::STORAGE_MEDIA_SSD_NONREPLICATED, + 1024); + + TVolumeClient writerClient(*runtime); + TVolumeClient readerClient1(*runtime); + TVolumeClient readerClient2(*runtime); + + volume.WaitReady(); + + ui32 acquireRequestsToDiskRegistry = 0; + ui32 readerAcquireRequests = 0; + ui32 writerAcquireRequests = 0; + + runtime->SetObserverFunc( + [&](TAutoPtr& event) + { + if (event->GetTypeRewrite() == + TEvDiskRegistry::EvAcquireDiskRequest) + { + ++acquireRequestsToDiskRegistry; + } + + if (event->GetTypeRewrite() == + TEvDiskAgent::EvAcquireDevicesRequest) + { + auto* msg = + event->Get(); + if (msg->Record.GetAccessMode() == + NProto::VOLUME_ACCESS_READ_ONLY) + { + ++readerAcquireRequests; + } else { + ++writerAcquireRequests; + } + } + + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + runtime->AdvanceCurrentTime(TDuration::Seconds(2)); + runtime->DispatchEvents({}, TDuration::MilliSeconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(acquireRequestsToDiskRegistry, 0); + UNIT_ASSERT_VALUES_EQUAL(writerAcquireRequests, 0); + UNIT_ASSERT_VALUES_EQUAL(readerAcquireRequests, 0); + + auto writer = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_WRITE, + NProto::VOLUME_MOUNT_LOCAL, + 0); + writerClient.AddClient(writer); + + UNIT_ASSERT_VALUES_EQUAL(acquireRequestsToDiskRegistry, 0); + UNIT_ASSERT_VALUES_EQUAL(writerAcquireRequests, 1); + UNIT_ASSERT_VALUES_EQUAL(readerAcquireRequests, 0); + + auto reader1 = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_ONLY, + NProto::VOLUME_MOUNT_REMOTE, + 0); + readerClient1.AddClient(reader1); + + auto reader2 = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_ONLY, + NProto::VOLUME_MOUNT_REMOTE, + 0); + readerClient2.AddClient(reader2); + + UNIT_ASSERT_VALUES_EQUAL(acquireRequestsToDiskRegistry, 0); + UNIT_ASSERT_VALUES_EQUAL(writerAcquireRequests, 1); + UNIT_ASSERT_VALUES_EQUAL(readerAcquireRequests, 2); + + runtime->AdvanceCurrentTime(TDuration::Seconds(2)); + runtime->DispatchEvents({}, TDuration::MilliSeconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(acquireRequestsToDiskRegistry, 0); + UNIT_ASSERT_VALUES_EQUAL(writerAcquireRequests, 2); + UNIT_ASSERT_VALUES_EQUAL(readerAcquireRequests, 4); + + readerClient1.RemoveClient(reader1.GetClientId()); + + runtime->AdvanceCurrentTime(TDuration::Seconds(2)); + runtime->DispatchEvents({}, TDuration::MilliSeconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(acquireRequestsToDiskRegistry, 0); + UNIT_ASSERT_VALUES_EQUAL(writerAcquireRequests, 3); + UNIT_ASSERT_VALUES_EQUAL(readerAcquireRequests, 5); + + writerClient.RemoveClient(writer.GetClientId()); + readerClient2.RemoveClient(reader2.GetClientId()); + + runtime->AdvanceCurrentTime(TDuration::Seconds(2)); + runtime->DispatchEvents({}, TDuration::MilliSeconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(acquireRequestsToDiskRegistry, 0); + UNIT_ASSERT_VALUES_EQUAL(writerAcquireRequests, 3); + UNIT_ASSERT_VALUES_EQUAL(readerAcquireRequests, 5); + } + Y_UNIT_TEST(ShouldPassAllParamsInAcquireDiskRequests) { NProto::TStorageServiceConfig config; diff --git a/cloud/blockstore/libs/storage/volume/ya.make b/cloud/blockstore/libs/storage/volume/ya.make index 77f72624e71..542771970d5 100644 --- a/cloud/blockstore/libs/storage/volume/ya.make +++ b/cloud/blockstore/libs/storage/volume/ya.make @@ -7,6 +7,7 @@ SRCS( volume.cpp volume_actor_addclient.cpp + volume_actor_acquire_release.cpp volume_actor_allocatedisk.cpp volume_actor_change_storage_config.cpp volume_actor_checkpoint.cpp