Skip to content

Commit

Permalink
add ability to read from certain replica of mirrored disk
Browse files Browse the repository at this point in the history
  • Loading branch information
WilyTiger committed Jan 23, 2025
1 parent a839db1 commit eb6020e
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 16 deletions.
69 changes: 69 additions & 0 deletions cloud/blockstore/apps/client/lib/command_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,75 @@ Y_UNIT_TEST_SUITE(TCommandTest)
UNIT_ASSERT(!ExecuteRequest("readblocks", argv, client));
}

Y_UNIT_TEST(ShouldAddReplicaIndexToReadRequestHeadersIfOptionIsSet)
{
auto client = std::make_shared<TTestService>();

const ui64 volumeBlocksCount = 4096;
TString sessionId = CreateGuidAsString();
TString mountToken = CreateGuidAsString();

client->MountVolumeHandler =
[&](std::shared_ptr<NProto::TMountVolumeRequest> request)
{
UNIT_ASSERT(request->GetDiskId() == DefaultDiskId);
UNIT_ASSERT(
request->GetVolumeMountMode() == NProto::VOLUME_MOUNT_REMOTE);
UNIT_ASSERT(request->GetToken() == mountToken);

NProto::TMountVolumeResponse response;
response.SetSessionId(sessionId);

auto& volume = *response.MutableVolume();
volume.SetDiskId(DefaultDiskId);
volume.SetBlockSize(DefaultBlockSize);
volume.SetBlocksCount(volumeBlocksCount);

return MakeFuture(response);
};
client->UnmountVolumeHandler =
[&](std::shared_ptr<NProto::TUnmountVolumeRequest> request)
{
UNIT_ASSERT(request->GetDiskId() == DefaultDiskId);
UNIT_ASSERT(request->GetSessionId() == sessionId);
return MakeFuture<NProto::TUnmountVolumeResponse>();
};

const ui32 replicaIndex = 3;
bool handlerCalled = false;
client->ReadBlocksLocalHandler =
[&](std::shared_ptr<NProto::TReadBlocksLocalRequest> request)
{
handlerCalled = true;

UNIT_ASSERT(request->GetDiskId() == DefaultDiskId);
UNIT_ASSERT(
request->GetHeaders().GetReplicaIndex() == replicaIndex);

auto guard = request->Sglist.Acquire();
UNIT_ASSERT(guard);
const auto& sglist = guard.Get();

for (ui64 i = 0; i < sglist.size(); ++i) {
auto* dstPtr = const_cast<char*>(sglist[i].Data());
memset(dstPtr, 0, sglist[i].Size());
}

return MakeFuture(NProto::TReadBlocksLocalResponse());
};

TVector<TString> argv;
argv.reserve(5);
argv.emplace_back(GetProgramName());
argv.emplace_back(TStringBuilder() << "--token=" << mountToken);
argv.emplace_back(TStringBuilder() << "--disk-id=" << DefaultDiskId);
argv.emplace_back("--read-all");
argv.emplace_back("--replica-index=3");

UNIT_ASSERT(ExecuteRequest("readblocks", argv, client));
UNIT_ASSERT(handlerCalled);
}

Y_UNIT_TEST(ShouldSplitLargeWriteVolumeRequestIntoSeveralRequests)
{
auto client = std::make_shared<TTestService>();
Expand Down
8 changes: 8 additions & 0 deletions cloud/blockstore/apps/client/lib/read_blocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ class TReadBlocksCommand final
ui32 IODepth = 0;
bool ReadAll = false;
bool Repair = false;
ui32 ReplicaIndex = 0;

TLockFreeStack<TReadResponse> ReadyBatches;
TAutoEvent Ready;
Expand Down Expand Up @@ -272,6 +273,10 @@ class TReadBlocksCommand final
.RequiredArgument("NUM")
.StoreResult(&IODepth)
.DefaultValue(1);

Opts.AddLongOption("replica-index", "from which replica(numerate from 1) read data, only for ssd-io disks")
.RequiredArgument("NUM")
.StoreResult(&ReplicaIndex);
}

protected:
Expand Down Expand Up @@ -454,6 +459,9 @@ class TReadBlocksCommand final
request->BlockSize = Volume.GetBlockSize();
request->Sglist = holder.GetGuardedSgList();
PrepareHeaders(*request->MutableHeaders());
if (ReplicaIndex) {
request->MutableHeaders()->SetReplicaIndex(ReplicaIndex);
}

auto future = Session->ReadBlocksLocal(
MakeIntrusive<TCallContext>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,24 +427,52 @@ void TMirrorPartitionActor::ReadBlocks(
return;
}

const auto replicaIndex = record.GetHeaders().GetReplicaIndex();
TSet<TActorId> replicaActorIds;
const ui32 readReplicaCount = Min<ui32>(
Max<ui32>(1, Config->GetMirrorReadReplicaCount()),
State.GetReplicaInfos().size());
for (ui32 i = 0; i < readReplicaCount; ++i) {
TActorId replicaActorId;
const auto error = State.NextReadReplica(blockRange, &replicaActorId);
if (HasError(error)) {
Reply(
ctx,
*requestInfo,
std::make_unique<typename TMethod::TResponse>(error));

return;
if (replicaIndex) {
if (replicaIndex > State.GetReplicaInfos().size()) {
auto response =
std::make_unique<typename TMethod::TResponse>(MakeError(
E_ARGUMENT,
TStringBuilder()
<< "Request " << TMethod::Name
<< " has incorrect ReplicaIndex " << replicaIndex
<< " disk has " << State.GetReplicaInfos().size()
<< "replicas"));
NCloud::Reply(ctx, *ev, std::move(response));
}

if (!replicaActorIds.insert(replicaActorId).second) {
break;
const auto& replicaInfo = State.GetReplicaInfos()[replicaIndex - 1];
if (!replicaInfo.Config->DevicesReadyForReading(blockRange)) {
auto response =
std::make_unique<typename TMethod::TResponse>(MakeError(
E_REJECTED,
TStringBuilder() << "Cannot process " << TMethod::Name
<< " cause replica " << replicaIndex
<< " has not ready devices"));
NCloud::Reply(ctx, *ev, std::move(response));
}
replicaActorIds.insert(State.GetReplicaActors()[replicaIndex - 1]);
} else {
const ui32 readReplicaCount = Min<ui32>(
Max<ui32>(1, Config->GetMirrorReadReplicaCount()),
State.GetReplicaInfos().size());
for (ui32 i = 0; i < readReplicaCount; ++i) {
TActorId replicaActorId;
const auto error =
State.NextReadReplica(blockRange, &replicaActorId);
if (HasError(error)) {
Reply(
ctx,
*requestInfo,
std::make_unique<typename TMethod::TResponse>(error));

return;
}

if (!replicaActorIds.insert(replicaActorId).second) {
break;
}
}
}

Expand Down
36 changes: 36 additions & 0 deletions cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,42 @@ Y_UNIT_TEST_SUITE(TMirrorPartitionTest)
#undef TEST_READ
}

Y_UNIT_TEST(ShouldReadFromSpecifiedReplica)
{
TTestRuntime runtime;
TTestEnv env(runtime);

TPartitionClient client(runtime, env.ActorId);

const auto range1 = TBlockRange64::WithLength(0, 100);
env.WriteMirror(range1, 'X');
env.WriteReplica(0, range1, 'A');
env.WriteReplica(1, range1, 'B');
env.WriteReplica(2, range1, 'C');

{
auto response = client.ReadBlocks(range1, 0);
const auto& blocks = response->Record.GetBlocks();
UNIT_ASSERT_VALUES_EQUAL(100, blocks.BuffersSize());
for (ui32 i = 0; i < 100; ++i) {
UNIT_ASSERT_VALUES_EQUAL(
TString(DefaultBlockSize, 'A'),
blocks.GetBuffers(i));
}
}

{
auto response = client.ReadBlocks(range1, 2);
const auto& blocks = response->Record.GetBlocks();
UNIT_ASSERT_VALUES_EQUAL(100, blocks.BuffersSize());
for (ui32 i = 0; i < 100; ++i) {
UNIT_ASSERT_VALUES_EQUAL(
TString(DefaultBlockSize, 'B'),
blocks.GetBuffers(i));
}
}
}

struct TMigrationTestRuntime
{
TTestRuntime Runtime;
Expand Down
7 changes: 6 additions & 1 deletion cloud/blockstore/libs/storage/partition_nonrepl/ut_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,16 @@ class TPartitionClient
return std::unique_ptr<TResponse>(handle->Release<TResponse>().Release());
}

auto CreateReadBlocksRequest(const TBlockRange64& blockRange)
auto CreateReadBlocksRequest(
const TBlockRange64& blockRange,
ui32 replicaIndex = 0)
{
auto request = std::make_unique<TEvService::TEvReadBlocksRequest>();
request->Record.SetStartIndex(blockRange.Start);
request->Record.SetBlocksCount(blockRange.Size());
if (replicaIndex) {
request->Record.MutableHeaders()->SetReplicaIndex(replicaIndex);
}

return request;
}
Expand Down
4 changes: 4 additions & 0 deletions cloud/blockstore/public/api/protos/headers.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,8 @@ message THeaders

// This flag controls the optimization of data transmission over the network.
EOptimizeNetworkTransfer OptimizeNetworkTransfer = 10;

// For which replica this request will be performed.
// Only for network-ssd-io-m* disks.
uint32 ReplicaIndex = 11;
}

0 comments on commit eb6020e

Please sign in to comment.