diff --git a/cloud/filestore/config/storage.proto b/cloud/filestore/config/storage.proto index 109d98ea65d..8d6e2a61f99 100644 --- a/cloud/filestore/config/storage.proto +++ b/cloud/filestore/config/storage.proto @@ -505,4 +505,6 @@ message TStorageConfig // Enables directory creation in shards (by default directories are created // only in the main tablet). optional bool DirectoryCreationInShardsEnabled = 414; + + optional bool BlobCompressionEnabled = 415; } diff --git a/cloud/filestore/libs/storage/core/config.cpp b/cloud/filestore/libs/storage/core/config.cpp index bd908877114..8e9c5016b71 100644 --- a/cloud/filestore/libs/storage/core/config.cpp +++ b/cloud/filestore/libs/storage/core/config.cpp @@ -206,6 +206,7 @@ using TAliases = NProto::TStorageConfig::TFilestoreAliases; xxx(BlobCompressionRate, ui32, 0 )\ xxx(BlobCompressionCodec, TString, "lz4" )\ xxx(BlobCompressionChunkSize, ui32, 80_KB )\ + xxx(BlobCompressionEnabled, bool, false )\ \ xxx(MaxZeroCompactionRangesToDeletePerTx, ui32, 10000 )\ xxx(ChannelFreeSpaceThreshold, ui32, 25 )\ diff --git a/cloud/filestore/libs/storage/core/config.h b/cloud/filestore/libs/storage/core/config.h index bf62371b029..6e0428e15ad 100644 --- a/cloud/filestore/libs/storage/core/config.h +++ b/cloud/filestore/libs/storage/core/config.h @@ -301,6 +301,8 @@ class TStorageConfig bool GetDirectoryCreationInShardsEnabled() const; bool GetGuestWritebackCacheEnabled() const; + + bool GetBlobCompressionEnabled() const; }; } // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/model/alloc.h b/cloud/filestore/libs/storage/tablet/model/alloc.h index 6cda70ef34f..059f130ff8e 100644 --- a/cloud/filestore/libs/storage/tablet/model/alloc.h +++ b/cloud/filestore/libs/storage/tablet/model/alloc.h @@ -19,6 +19,7 @@ enum class EAllocatorTag NodeIndexCache, InMemoryNodeIndexCache, LargeBlocks, + BlobCompressionInfo, Max }; diff --git a/cloud/filestore/libs/storage/tablet/model/blob.h 
b/cloud/filestore/libs/storage/tablet/model/blob.h index 5f1a98eda74..e599f1566ce 100644 --- a/cloud/filestore/libs/storage/tablet/model/blob.h +++ b/cloud/filestore/libs/storage/tablet/model/blob.h @@ -4,6 +4,8 @@ #include "block.h" +#include + #include #include @@ -26,12 +28,17 @@ struct TMixedBlobMeta { TPartialBlobId BlobId; TVector Blocks; + TBlobCompressionInfo BlobCompressionInfo; TMixedBlobMeta() = default; - TMixedBlobMeta(const TPartialBlobId& blobId, TVector blocks) + TMixedBlobMeta( + const TPartialBlobId& blobId, + TVector blocks, + TBlobCompressionInfo blobCompressionInfo) : BlobId(blobId) , Blocks(std::move(blocks)) + , BlobCompressionInfo(std::move(blobCompressionInfo)) {} }; @@ -46,8 +53,12 @@ struct TMixedBlob: TMixedBlobMeta TMixedBlob( const TPartialBlobId& blobId, TVector blocks, + TBlobCompressionInfo blobCompressionInfo, TString blobContent) - : TMixedBlobMeta(blobId, std::move(blocks)) + : TMixedBlobMeta( + blobId, + std::move(blocks), + std::move(blobCompressionInfo)) , BlobContent(std::move(blobContent)) {} }; @@ -96,6 +107,7 @@ struct TCompactionBlob { TPartialBlobId BlobId; TVector Blocks; + TBlobCompressionInfo BlobCompressionInfo; TCompactionBlob() = default; diff --git a/cloud/filestore/libs/storage/tablet/model/blob_builder.cpp b/cloud/filestore/libs/storage/tablet/model/blob_builder.cpp index dcb7ccf1de5..d4b60da0151 100644 --- a/cloud/filestore/libs/storage/tablet/model/blob_builder.cpp +++ b/cloud/filestore/libs/storage/tablet/model/blob_builder.cpp @@ -45,6 +45,7 @@ void TMixedBlobBuilder::CompleteBlob(TRange& range) Blobs.emplace_back( TPartialBlobId(), // need to generate BlobId later std::move(range.Blocks), + TBlobCompressionInfo(), std::move(range.BlobContent)); ++BlobsCount; diff --git a/cloud/filestore/libs/storage/tablet/model/blob_compression.cpp b/cloud/filestore/libs/storage/tablet/model/blob_compression.cpp new file mode 100644 index 00000000000..fab80afd622 --- /dev/null +++ 
b/cloud/filestore/libs/storage/tablet/model/blob_compression.cpp @@ -0,0 +1,162 @@ +#include "blob_compression.h" + +#include "binary_reader.h" +#include "binary_writer.h" + +#include + +#include + +namespace NCloud::NFileStore::NStorage { + +//////////////////////////////////////////////////////////////////////////////// + +struct TBlobCompressionInfo::TImpl +{ + TByteVector Bytes; + ui32 DecompressedBlobSize = 0; + ui32 CompressedBlobSize = 0; + + TImpl( + ui32 decompressedBlobSize, + ui32 compressedBlobSize, + IAllocator* alloc) + : Bytes(alloc) + , DecompressedBlobSize(decompressedBlobSize) + , CompressedBlobSize(compressedBlobSize) + { + TBinaryWriter writer(alloc); + writer.Write(DecompressedBlobSize); + writer.Write(CompressedBlobSize); + Bytes = writer.Finish(); + } + + explicit TImpl(TByteVector bytes) + : Bytes(std::move(bytes)) + { + TBinaryReader reader(Bytes); + DecompressedBlobSize = reader.Read(); + CompressedBlobSize = reader.Read(); + } + + TCompressedRange CompressedRange(TUncompressedRange range) const + { + Y_UNUSED(range); + return TCompressedRange(0, CompressedBlobSize); + } + + const TByteVector& GetEncoded() const + { + return Bytes; + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +TBlobCompressionInfo::TBlobCompressionInfo( + ui32 decompressedBlobSize, + ui32 compressedBlobSize, + IAllocator* alloc) + : Impl(new TImpl(decompressedBlobSize, compressedBlobSize, alloc)) +{} + +TBlobCompressionInfo::TBlobCompressionInfo(TByteVector bytes) + : Impl(new TImpl(std::move(bytes))) +{} + +//////////////////////////////////////////////////////////////////////////////// + +bool TBlobCompressionInfo::BlobCompressed() const +{ + return !!Impl; +} + +ui32 TBlobCompressionInfo::DecompressedBlobSize() const +{ + Y_ABORT_UNLESS(Impl); + return Impl->DecompressedBlobSize; +} + +ui32 TBlobCompressionInfo::CompressedBlobSize() const +{ + Y_ABORT_UNLESS(Impl); + return Impl->CompressedBlobSize; +} + +TCompressedRange 
// Attempts to compress |*content| with |codec|.
//
// On success |*content| is replaced with the compressed bytes and the
// returned info reports both sizes (BlobCompressed() == true).
//
// If compression does not make the data strictly smaller (this includes empty
// content), |*content| is left untouched and a default-constructed info is
// returned (BlobCompressed() == false), so callers keep writing the blob
// uncompressed. Previously the content was replaced unconditionally, which
// could store a blob larger than the original and tripped the debug-only size
// assertion on incompressible data.
//
// |chunkSize| is only validated here; it is not used by the compression
// itself. All pointer arguments must be non-null.
TBlobCompressionInfo TryCompressBlob(
    ui32 chunkSize,
    const NBlockCodecs::ICodec* codec,
    TString* content,
    IAllocator* alloc)
{
    Y_ABORT_UNLESS(chunkSize);
    Y_ABORT_UNLESS(codec);
    Y_ABORT_UNLESS(content);
    Y_ABORT_UNLESS(alloc);

    const size_t decompressedSize = content->size();

    TString compressed;
    codec->Encode(*content, compressed);

    const size_t compressedSize = compressed.size();
    if (compressedSize >= decompressedSize) {
        // Not worth it: keep the original bytes and report "not compressed".
        return TBlobCompressionInfo();
    }

    *content = std::move(compressed);

    return TBlobCompressionInfo(
        static_cast<ui32>(decompressedSize),
        static_cast<ui32>(compressedSize),
        alloc);
}
out->SetBlock(block.BlockOffset, view); + } +} + +} // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/model/blob_compression.h b/cloud/filestore/libs/storage/tablet/model/blob_compression.h new file mode 100644 index 00000000000..e2479bde0d6 --- /dev/null +++ b/cloud/filestore/libs/storage/tablet/model/blob_compression.h @@ -0,0 +1,139 @@ +#pragma once + +#include "public.h" + +#include "alloc.h" + +#include +#include + +#include + +#include +#include + +#include + +namespace NBlockCodecs { + +//////////////////////////////////////////////////////////////////////////////// + +struct ICodec; + +} // namespace NBlockCodecs + +namespace NCloud::NFileStore::NStorage { + +//////////////////////////////////////////////////////////////////////////////// + +struct TUncompressedRange +{ + ui32 Offset = 0; + ui32 Length = 0; + + TUncompressedRange() = default; + + TUncompressedRange(ui32 offset, ui32 length) + : Offset(offset) + , Length(length) + {} + + void Extend(ui32 length) + { + Length += length; + } +}; + +struct TCompressedRange +{ + ui32 Offset = 0; + ui32 Length = 0; + + TCompressedRange() = default; + + TCompressedRange(ui32 offset, ui32 length) + : Offset(offset) + , Length(length) + {} + + ui32 End() const + { + return Offset + Length; + } + + void Merge(TCompressedRange other) + { + Offset = Min(Offset, other.Offset); + auto end = Max(End(), other.End()); + Length = end - Offset; + } + + bool Overlaps(TCompressedRange other) const + { + auto offset = Max(Offset, other.Offset); + auto end = Min(End(), other.End()); + return end <= offset; + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TBlobCompressionInfo +{ +private: + struct TImpl; + std::shared_ptr Impl; + +public: + TBlobCompressionInfo() = default; + + TBlobCompressionInfo( + ui32 decompressedBlobSize, + ui32 compressedBlobSize, + IAllocator* alloc); + + explicit TBlobCompressionInfo(TByteVector bytes); + + bool 
BlobCompressed() const; + + ui32 DecompressedBlobSize() const; + ui32 CompressedBlobSize() const; + + TCompressedRange CompressedRange(TUncompressedRange range) const; + + const TByteVector& GetEncoded() const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +TBlobCompressionInfo TryCompressBlob( + ui32 chunkSize, + const NBlockCodecs::ICodec* codec, + TString* content, + IAllocator* alloc); + +//////////////////////////////////////////////////////////////////////////////// + +struct TUncompressedBlock +{ + ui32 BlobOffset; + ui32 BlockOffset; + + TUncompressedBlock(ui32 blobOffset, ui32 blockOffset) + : BlobOffset(blobOffset) + , BlockOffset(blockOffset) + {} +}; + +struct IBlockBuffer; + +void Decompress( + const NBlockCodecs::ICodec* codec, + const TBlobCompressionInfo& blobCompressionInfo, + ui32 blockSize, + const TRope& compressedData, + ui32 compressedDataOffset, + const TVector& blocks, + IBlockBuffer* out); + +} // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/model/block.h b/cloud/filestore/libs/storage/tablet/model/block.h index a162d24536e..1d29cc3cce9 100644 --- a/cloud/filestore/libs/storage/tablet/model/block.h +++ b/cloud/filestore/libs/storage/tablet/model/block.h @@ -2,6 +2,8 @@ #include "public.h" +#include + #include #include @@ -67,6 +69,7 @@ struct TBlockDataRef: TBlock { TPartialBlobId BlobId; ui32 BlobOffset = 0; + TBlobCompressionInfo BlobCompressionInfo; }; //////////////////////////////////////////////////////////////////////////////// @@ -259,7 +262,8 @@ struct IMixedBlockVisitor virtual void Accept( const TBlock& block, const TPartialBlobId& blobId, - ui32 blobOffset) = 0; + ui32 blobOffset, + const TBlobCompressionInfo& blobCompressionInfo) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/filestore/libs/storage/tablet/model/mixed_blocks.cpp 
b/cloud/filestore/libs/storage/tablet/model/mixed_blocks.cpp index 8f4e97b7752..2dc6f3a0217 100644 --- a/cloud/filestore/libs/storage/tablet/model/mixed_blocks.cpp +++ b/cloud/filestore/libs/storage/tablet/model/mixed_blocks.cpp @@ -22,16 +22,19 @@ struct TBlobMeta: TIntrusiveListItem { const TPartialBlobId BlobId; const TBlockList BlockList; + const TBlobCompressionInfo BlobCompressionInfo; const TMixedBlobStats Stats; const size_t Level; TBlobMeta( const TPartialBlobId& blobId, TBlockList blockList, + TBlobCompressionInfo blobCompressionInfo, const TMixedBlobStats& stats, size_t level) : BlobId(blobId) , BlockList(std::move(blockList)) + , BlobCompressionInfo(std::move(blobCompressionInfo)) , Stats(stats) , Level(level) {} @@ -156,6 +159,7 @@ bool TMixedBlocks::AddBlocks( ui32 rangeId, const TPartialBlobId& blobId, TBlockList blockList, + TBlobCompressionInfo blobCompressionInfo, const TMixedBlobStats& stats) { auto* range = Impl->Ranges.FindPtr(rangeId); @@ -165,6 +169,7 @@ bool TMixedBlocks::AddBlocks( auto [it, inserted] = range->Blobs.emplace( blobId, std::move(blockList), + std::move(blobCompressionInfo), stats, 0); @@ -236,7 +241,11 @@ void TMixedBlocks::FindBlocks( range->DeletionMarkers.Apply(block); if (commitId < block.MaxCommitId) { - visitor.Accept(block, blob.BlobId, iter->BlobOffset); + visitor.Accept( + block, + blob.BlobId, + iter->BlobOffset, + blob.BlobCompressionInfo); } } } @@ -283,7 +292,10 @@ TVector TMixedBlocks::ApplyDeletionMarkers(ui32 rangeId) const auto blocks = blob.BlockList.DecodeBlocks(); if (range->DeletionMarkers.Apply(MakeArrayRef(blocks)) > 0) { - result.emplace_back(blob.BlobId, std::move(blocks)); + result.emplace_back( + blob.BlobId, + std::move(blocks), + blob.BlobCompressionInfo); } } @@ -304,7 +316,10 @@ auto TMixedBlocks::ApplyDeletionMarkersAndGetMetas(ui32 rangeId) const range->DeletionMarkers.Apply(MakeArrayRef(blocks)) > 0; result.emplace_back( - TMixedBlobMeta{blob.BlobId, std::move(blocks)}, + TMixedBlobMeta( + 
blob.BlobId, + std::move(blocks), + blob.BlobCompressionInfo), affected); } @@ -323,7 +338,10 @@ TVector TMixedBlocks::GetBlobsForCompaction(ui32 rangeId) const auto blocks = blob.BlockList.DecodeBlocks(); range->DeletionMarkers.Apply(MakeArrayRef(blocks)); - result.emplace_back(blob.BlobId, std::move(blocks)); + result.emplace_back( + blob.BlobId, + std::move(blocks), + blob.BlobCompressionInfo); } return result; @@ -334,15 +352,13 @@ TMixedBlobMeta TMixedBlocks::FindBlob(ui32 rangeId, TPartialBlobId blobId) const const auto* range = Impl->Ranges.FindPtr(rangeId); Y_ABORT_UNLESS(range); - TVector result; - auto it = range->Blobs.find(blobId); Y_ABORT_UNLESS(it != range->Blobs.end()); auto blocks = it->BlockList.DecodeBlocks(); range->DeletionMarkers.Apply(MakeArrayRef(blocks)); - return {it->BlobId, std::move(blocks)}; + return {it->BlobId, std::move(blocks), it->BlobCompressionInfo}; } ui32 TMixedBlocks::CalculateGarbageBlockCount(ui32 rangeId) const diff --git a/cloud/filestore/libs/storage/tablet/model/mixed_blocks.h b/cloud/filestore/libs/storage/tablet/model/mixed_blocks.h index d70c86c9cac..e4ff73b0ebd 100644 --- a/cloud/filestore/libs/storage/tablet/model/mixed_blocks.h +++ b/cloud/filestore/libs/storage/tablet/model/mixed_blocks.h @@ -33,6 +33,7 @@ class TMixedBlocks ui32 rangeId, const TPartialBlobId& blobId, TBlockList blockList, + TBlobCompressionInfo blobCompressionInfo, const TMixedBlobStats& stats = {}); bool RemoveBlocks( diff --git a/cloud/filestore/libs/storage/tablet/model/mixed_blocks_ut.cpp b/cloud/filestore/libs/storage/tablet/model/mixed_blocks_ut.cpp index 03b59d88101..f24da8bed3d 100644 --- a/cloud/filestore/libs/storage/tablet/model/mixed_blocks_ut.cpp +++ b/cloud/filestore/libs/storage/tablet/model/mixed_blocks_ut.cpp @@ -20,9 +20,10 @@ class TMixedBlockVisitor final void Accept( const TBlock& block, const TPartialBlobId& blobId, - ui32 blobOffset) override + ui32 blobOffset, + const TBlobCompressionInfo& blobCompressionInfo) override { - 
Blocks.push_back({ block, blobId, blobOffset }); + Blocks.push_back({ block, blobId, blobOffset, blobCompressionInfo }); } TVector Finish() @@ -54,7 +55,7 @@ Y_UNIT_TEST_SUITE(TMixedBlocksTest) TMixedBlocks mixedBlocks(TDefaultAllocator::Instance()); mixedBlocks.RefRange(rangeId); - mixedBlocks.AddBlocks(rangeId, TPartialBlobId(), std::move(list)); + mixedBlocks.AddBlocks(rangeId, TPartialBlobId(), std::move(list), {}); { TMixedBlockVisitor visitor; @@ -95,7 +96,7 @@ Y_UNIT_TEST_SUITE(TMixedBlocksTest) TMixedBlocks mixedBlocks(TDefaultAllocator::Instance()); mixedBlocks.RefRange(rangeId); - mixedBlocks.AddBlocks(rangeId, TPartialBlobId(), std::move(list)); + mixedBlocks.AddBlocks(rangeId, TPartialBlobId(), std::move(list), {}); mixedBlocks.AddDeletionMarker( rangeId, @@ -141,7 +142,7 @@ Y_UNIT_TEST_SUITE(TMixedBlocksTest) TMixedBlocks mixedBlocks(TDefaultAllocator::Instance()); mixedBlocks.RefRange(rangeId); - mixedBlocks.AddBlocks(rangeId, TPartialBlobId(), std::move(list)); + mixedBlocks.AddBlocks(rangeId, TPartialBlobId(), std::move(list), {}); UNIT_ASSERT(mixedBlocks.IsLoaded(rangeId)); mixedBlocks.RefRange(rangeId); diff --git a/cloud/filestore/libs/storage/tablet/model/profile_log_events_ut.cpp b/cloud/filestore/libs/storage/tablet/model/profile_log_events_ut.cpp index 308f81cef59..2b8c63f76f7 100644 --- a/cloud/filestore/libs/storage/tablet/model/profile_log_events_ut.cpp +++ b/cloud/filestore/libs/storage/tablet/model/profile_log_events_ut.cpp @@ -235,15 +235,22 @@ Y_UNIT_TEST_SUITE(TProfileLogEvent) const auto oldBlobFirst = TMixedBlob( MakePartialBlobId(1, 1), {TBlock(1, 3, 0, 0), TBlock(1, 5, 0, 0), TBlock(3, 6, 0, 0)}, + TBlobCompressionInfo(), "content_1"); const auto oldBlobSecond = TMixedBlob( MakePartialBlobId(1, 3), {TBlock(3, 10, 0, 0), TBlock(7, 5, 0, 0), TBlock(7, 6, 0, 0)}, + TBlobCompressionInfo(), "content_2"); - const auto emptyBlob = TMixedBlob(MakePartialBlobId(1, 3), {}, ""); + const auto emptyBlob = TMixedBlob( + MakePartialBlobId(1, 3), 
+ {}, + TBlobCompressionInfo(), + ""); const auto newBlob = TMixedBlob( MakePartialBlobId(2, 1), {TBlock(1, 10, 0, 0)}, + TBlobCompressionInfo(), "content_3"); NProto::TProfileLogRequestInfo profileLogRequest; @@ -287,14 +294,20 @@ Y_UNIT_TEST_SUITE(TProfileLogEvent) const ui32 blockSize = 512; const auto oldBlobFirst = TMixedBlobMeta( MakePartialBlobId(1, 1), - {TBlock(1, 3, 0, 0), TBlock(1, 4, 0, 0), TBlock(1, 6, 0, 0)}); - const auto emptyBlob = TMixedBlobMeta(MakePartialBlobId(1, 3), {}); + {TBlock(1, 3, 0, 0), TBlock(1, 4, 0, 0), TBlock(1, 6, 0, 0)}, + TBlobCompressionInfo()); + const auto emptyBlob = TMixedBlobMeta( + MakePartialBlobId(1, 3), + {}, + TBlobCompressionInfo()); const auto oldBlobSecond = TMixedBlobMeta( MakePartialBlobId(1, 3), - {TBlock(2, 4, 0, 0), TBlock(2, 5, 0, 0), TBlock(3, 0, 0, 0)}); + {TBlock(2, 4, 0, 0), TBlock(2, 5, 0, 0), TBlock(3, 0, 0, 0)}, + TBlobCompressionInfo()); const auto newBlob = TMixedBlobMeta( MakePartialBlobId(2, 1), - {TBlock(1, 10, 0, 0)}); + {TBlock(1, 10, 0, 0)}, + TBlobCompressionInfo()); NProto::TProfileLogRequestInfo profileLogRequest; AddBlobsInfo( diff --git a/cloud/filestore/libs/storage/tablet/model/ya.make b/cloud/filestore/libs/storage/tablet/model/ya.make index 1f5dda5c16c..61aa0aaba72 100644 --- a/cloud/filestore/libs/storage/tablet/model/ya.make +++ b/cloud/filestore/libs/storage/tablet/model/ya.make @@ -11,6 +11,7 @@ SRCS( binary_writer.cpp blob.cpp blob_builder.cpp + blob_compression.cpp block.cpp block_list.cpp block_list_decode.cpp diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_addblob.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_addblob.cpp index 7fee541fb47..ef7160c0384 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_addblob.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_addblob.cpp @@ -221,7 +221,11 @@ class TAddBlobsExecutor group.size()); }); - bool written = Tablet.WriteMixedBlocks(db, blob.BlobId, blob.Blocks); + bool written = 
Tablet.WriteMixedBlocks( + db, + blob.BlobId, + blob.Blocks, + blob.BlobCompressionInfo); if (written) { ui32 rangeId = Tablet.GetMixedRangeIndex(blob.Blocks); AccessCompactionRangeInfo(rangeId).Stats.BlobsCount += 1; @@ -262,7 +266,12 @@ class TAddBlobsExecutor const auto rangeId = Tablet.GetMixedRangeIndex(blob.Blocks); auto& stats = AccessCompactionRangeInfo(rangeId).Stats; - if (Tablet.WriteMixedBlocks(db, blob.BlobId, blob.Blocks)) { + if (Tablet.WriteMixedBlocks( + db, + blob.BlobId, + blob.Blocks, + blob.BlobCompressionInfo) + ) { stats.BlobsCount += 1; // conservative estimate stats.GarbageBlocksCount += blob.Blocks.size(); @@ -301,7 +310,12 @@ class TAddBlobsExecutor for (auto& blob: args.MixedBlobs) { const auto rangeId = Tablet.GetMixedRangeIndex(blob.Blocks); auto& stats = AccessCompactionRangeInfo(rangeId).Stats; - if (Tablet.WriteMixedBlocks(db, blob.BlobId, blob.Blocks)) { + if (Tablet.WriteMixedBlocks( + db, + blob.BlobId, + blob.Blocks, + blob.BlobCompressionInfo) + ) { stats.BlobsCount += 1; // conservative estimate stats.GarbageBlocksCount += blob.Blocks.size(); @@ -341,7 +355,12 @@ class TAddBlobsExecutor THashSet writtenRangeIds; for (auto& blob: args.MixedBlobs) { const auto rangeId = Tablet.GetMixedRangeIndex(blob.Blocks); - if (Tablet.WriteMixedBlocks(db, blob.BlobId, blob.Blocks)) { + if (Tablet.WriteMixedBlocks( + db, + blob.BlobId, + blob.Blocks, + blob.BlobCompressionInfo) + ) { writtenRangeIds.insert(rangeId); } diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_compaction.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_compaction.cpp index e8f452b3e84..66668efa0be 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_compaction.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_compaction.cpp @@ -32,10 +32,14 @@ class TCompactionActor final const ui64 CommitId; const ui32 RangeId; const ui32 BlockSize; + const bool BlobCompressionEnabled; + const ui32 BlobCompressionChunkSize; + const 
NBlockCodecs::ICodec* BlobCodec; + IAllocator* BlobCompressionInfoAllocator; const IProfileLogPtr ProfileLog; TVector SrcBlobs; - const TVector DstBlobs; + TVector DstBlobs; ui32 OperationSize = 0; THashMap Buffers; @@ -53,6 +57,10 @@ class TCompactionActor final ui64 commitId, ui32 rangeId, ui32 blockSize, + bool blobCompressionEnabled, + ui32 blobCompressionChunkSize, + const NBlockCodecs::ICodec* blobCodec, + IAllocator* blobCompressionInfoAllocator, IProfileLogPtr profileLog, TVector srcBlobs, TVector dstBlobs, @@ -97,6 +105,10 @@ TCompactionActor::TCompactionActor( ui64 commitId, ui32 rangeId, ui32 blockSize, + bool blobCompressionEnabled, + ui32 blobCompressionChunkSize, + const NBlockCodecs::ICodec* blobCodec, + IAllocator* blobCompressionInfoAllocator, IProfileLogPtr profileLog, TVector srcBlobs, TVector dstBlobs, @@ -108,6 +120,10 @@ TCompactionActor::TCompactionActor( , CommitId(commitId) , RangeId(rangeId) , BlockSize(blockSize) + , BlobCompressionEnabled(blobCompressionEnabled) + , BlobCompressionChunkSize(blobCompressionChunkSize) + , BlobCodec(blobCodec) + , BlobCompressionInfoAllocator(blobCompressionInfoAllocator) , ProfileLog(std::move(profileLog)) , SrcBlobs(std::move(srcBlobs)) , DstBlobs(std::move(dstBlobs)) @@ -162,7 +178,10 @@ void TCompactionActor::ReadBlob(const TActorContext& ctx) blocks.size() * BlockSize, BlockSize )); - request->Blobs.emplace_back(blob.BlobId, std::move(blocks)); + request->Blobs.emplace_back( + blob.BlobId, + std::move(blocks), + std::move(blob.BlobCompressionInfo)); request->Blobs.back().Async = true; Buffers[blob.BlobId] = request->Buffer; @@ -196,7 +215,7 @@ void TCompactionActor::WriteBlob(const TActorContext& ctx) RequestInfo->CallContext ); - for (const auto& blob: DstBlobs) { + for (auto& blob: DstBlobs) { TString blobContent(Reserve(BlockSize * blob.Blocks.size())); for (const auto& block: blob.Blocks) { @@ -204,6 +223,18 @@ void TCompactionActor::WriteBlob(const TActorContext& ctx) 
blobContent.append(buffer->GetBlock(block.BlobOffset)); } + if (BlobCompressionEnabled) { + blob.BlobCompressionInfo = TryCompressBlob( + BlobCompressionChunkSize, + BlobCodec, + &blobContent, + BlobCompressionInfoAllocator); + if (blob.BlobCompressionInfo.BlobCompressed()) { + blob.BlobId.SetBlobSize( + blob.BlobCompressionInfo.CompressedBlobSize()); + } + } + request->Blobs.emplace_back(blob.BlobId, std::move(blobContent)); request->Blobs.back().Async = true; } @@ -239,7 +270,10 @@ void TCompactionActor::AddBlob(const TActorContext& ctx) blocks.emplace_back(block); } - request->MixedBlobs.emplace_back(blob.BlobId, std::move(blocks)); + request->MixedBlobs.emplace_back( + blob.BlobId, + std::move(blocks), + std::move(blob.BlobCompressionInfo)); } NCloud::Send(ctx, Tablet, std::move(request)); @@ -654,7 +688,12 @@ void TIndexTabletActor::CompleteTx_Compaction( } blocks.emplace_back( - TBlockDataRef { block, blob.BlobId, blockOffset++ }); + TBlockDataRef { + block, + blob.BlobId, + blockOffset++, + blob.BlobCompressionInfo + }); } } } @@ -707,6 +746,10 @@ void TIndexTabletActor::CompleteTx_Compaction( args.CommitId, args.RangeId, GetBlockSize(), + Config->GetBlobCompressionEnabled(), + Config->GetBlobCompressionChunkSize(), + BlobCodec, + GetAllocator(EAllocatorTag::BlobCompressionInfo), ProfileLog, std::move(args.CompactionBlobs), std::move(dstBlobs), diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_deletecheckpoint.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_deletecheckpoint.cpp index f25319832e8..f5dee5dcf56 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_deletecheckpoint.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_deletecheckpoint.cpp @@ -227,6 +227,7 @@ void TIndexTabletActor::ExecuteTx_DeleteCheckpoint( TMixedBlobMeta blob { args.MixedBlobs[i].BlobId, args.MixedBlobs[i].BlockList.DecodeBlocks(), + args.MixedBlobs[i].BlobCompressionInfo, }; TMixedBlobStats stats { diff --git 
a/cloud/filestore/libs/storage/tablet/tablet_actor_flush.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_flush.cpp index 7ac50a0b02a..5428b9c8a3e 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_flush.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_flush.cpp @@ -143,7 +143,10 @@ void TFlushActor::AddBlob(const TActorContext& ctx) request->Mode = EAddBlobMode::Flush; for (auto& blob: Blobs) { - request->MixedBlobs.emplace_back(blob.BlobId, std::move(blob.Blocks)); + request->MixedBlobs.emplace_back( + blob.BlobId, + std::move(blob.Blocks), + TBlobCompressionInfo()); } NCloud::Send(ctx, Tablet, std::move(request)); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_flush_bytes.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_flush_bytes.cpp index 248a9202777..5ae31796c7d 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_flush_bytes.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_flush_bytes.cpp @@ -58,7 +58,8 @@ class TReadBlockVisitor final void Accept( const TBlock& block, const TPartialBlobId& blobId, - ui32 blobOffset) override + ui32 blobOffset, + const TBlobCompressionInfo& blobCompressionInfo) override { TABLET_VERIFY(!ApplyingByteLayer); @@ -68,6 +69,7 @@ class TReadBlockVisitor final static_cast(ref) = block; ref.BlobId = blobId; ref.BlobOffset = blobOffset; + ref.BlobCompressionInfo = blobCompressionInfo; Block.Block = std::move(ref); } } @@ -261,7 +263,10 @@ void TFlushBytesActor::ReadBlobs(const TActorContext& ctx) blocks.size() * BlockSize, BlockSize )); - request->Blobs.emplace_back(blobToRead.BlobId, std::move(blocks)); + request->Blobs.emplace_back( + blobToRead.BlobId, + std::move(blocks), + blobToRead.BlobCompressionInfo); request->Blobs.back().Async = true; Buffers[blobToRead.BlobId] = request->Buffer; @@ -385,7 +390,10 @@ void TFlushBytesActor::AddBlob(const TActorContext& ctx) commitId = block.BytesMinCommitId; } - request->MixedBlobs.emplace_back(blob.BlobId, 
std::move(blocks)); + request->MixedBlobs.emplace_back( + blob.BlobId, + std::move(blocks), + TBlobCompressionInfo() /* uncompressed */); } for (auto& srcBlob: request->SrcBlobs) { @@ -677,8 +685,6 @@ void TIndexTabletActor::CompleteTx_FlushBytes( } }; - - THashMap blockMap; struct TSrcBlobInfo @@ -754,7 +760,11 @@ void TIndexTabletActor::CompleteTx_FlushBytes( if (!srcBlobInfo.SrcBlob.BlobId) { const auto rangeId = GetMixedRangeIndex(bytes.NodeId, blockIndex); srcBlobInfo.SrcBlob = FindBlob(rangeId, ref->BlobId); + srcBlobInfo.SrcBlob.BlobCompressionInfo = ref->BlobCompressionInfo; + srcBlobInfo.SrcBlobToRead.BlobId = ref->BlobId; + srcBlobInfo.SrcBlobToRead.BlobCompressionInfo = + ref->BlobCompressionInfo; } srcBlobInfo.SrcBlobToRead.Blocks.push_back( static_cast(*ref) diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_readblob.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_readblob.cpp index 39b33dd7015..4f485218431 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_readblob.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_readblob.cpp @@ -19,35 +19,103 @@ namespace { //////////////////////////////////////////////////////////////////////////////// -struct TReadBlockRange +struct TQuery { - ui32 BlockOffset = 0; - ui32 Count = 0; + TUncompressedRange UncompressedRange; + TCompressedRange CompressedRange; + TVector Blocks; }; +void MergeOverlappingCompressedQueries(TVector& queries) +{ + Sort(queries, [] (const auto& l, const auto& r) { + return l.CompressedRange.Offset < r.CompressedRange.Offset + || (l.CompressedRange.Offset == r.CompressedRange.Offset + && l.CompressedRange.Length < r.CompressedRange.Length); + }); + + auto pivotQueryIter = queries.begin(); + for (size_t i = 1; i < queries.size(); i++) { + auto& curQuery = queries[i]; + auto& pivotQuery = *pivotQueryIter; + + if (pivotQuery.CompressedRange.Overlaps(curQuery.CompressedRange)) { + pivotQuery.CompressedRange.Merge(curQuery.CompressedRange); + + for (auto& 
block : curQuery.Blocks) { + pivotQuery.Blocks.push_back(std::move(block)); + } + curQuery.Blocks.clear(); + } else { + ++pivotQueryIter; + } + } + + EraseIf(queries, [] (const auto& query) { return query.Blocks.empty(); }); +} + //////////////////////////////////////////////////////////////////////////////// struct TReadBlobRequest { TActorId Proxy; TLogoBlobID BlobId; - TVector Ranges; + TBlobCompressionInfo BlobCompressionInfo; + TVector Queries; std::unique_ptr Request; TReadBlobRequest( const TActorId& proxy, const TLogoBlobID& blobId, - TVector ranges, + TBlobCompressionInfo blobCompressionInfo, + TVector queries, std::unique_ptr request) : Proxy(proxy) , BlobId(blobId) - , Ranges(std::move(ranges)) + , BlobCompressionInfo(std::move(blobCompressionInfo)) + , Queries(std::move(queries)) , Request(std::move(request)) {} }; //////////////////////////////////////////////////////////////////////////////// +std::unique_ptr CreateGetRequest( + const TLogoBlobID& blobId, + const TVector& queries, + TInstant deadline, + bool async, + bool blobCompressed +) { + using TEvGetQuery = TEvBlobStorage::TEvGet::TQuery; + TArrayHolder qs(new TEvGetQuery[queries.size()]); + + size_t queryIndex = 0; + for (const auto& query : queries) { + if (blobCompressed) { + qs[queryIndex].Set( + blobId, + query.CompressedRange.Offset, + query.CompressedRange.Length); + } else { + qs[queryIndex].Set( + blobId, + query.UncompressedRange.Offset, + query.UncompressedRange.Length); + } + ++queryIndex; + } + + return std::make_unique( + qs, + queries.size(), + deadline, + async ? 
NKikimrBlobStorage::AsyncRead : NKikimrBlobStorage::FastRead + ); +} + +//////////////////////////////////////////////////////////////////////////////// + TString DumpBlobIds(const TVector& requests) { TStringStream out; @@ -69,9 +137,10 @@ class TReadBlobActor final const TString LogTag; const TActorId Tablet; const TRequestInfoPtr RequestInfo; - const IBlockBufferPtr Buffer; + const IBlockBufferPtr BlockBuffer; const ui32 BlockSize; const TString FileSystemId; + const NBlockCodecs::ICodec* BlobCodec; const IProfileLogPtr ProfileLog; TVector Requests; @@ -87,6 +156,7 @@ class TReadBlobActor final IBlockBufferPtr buffer, ui32 blockSize, TString fileSystemId, + const NBlockCodecs::ICodec* blobCodec, IProfileLogPtr profileLog, TVector requests, NProto::TProfileLogRequestInfo profileLogRequest); @@ -100,6 +170,9 @@ class TReadBlobActor final void HandleGetResult( const TEvBlobStorage::TEvGetResult::TPtr& ev, const TActorContext& ctx); + void ReadQueryResponseUncompressed( + const TRope& responseBuffer, + const TQuery& query); void HandlePoisonPill( const TEvents::TEvPoisonPill::TPtr& ev, @@ -124,15 +197,17 @@ TReadBlobActor::TReadBlobActor( IBlockBufferPtr buffer, ui32 blockSize, TString fileSystemId, + const NBlockCodecs::ICodec* blobCodec, IProfileLogPtr profileLog, TVector requests, NProto::TProfileLogRequestInfo profileLogRequest) : LogTag(std::move(logTag)) , Tablet(tablet) , RequestInfo(std::move(requestInfo)) - , Buffer(std::move(buffer)) + , BlockBuffer(std::move(buffer)) , BlockSize(blockSize) , FileSystemId(std::move(fileSystemId)) + , BlobCodec(blobCodec) , ProfileLog(std::move(profileLog)) , Requests(std::move(requests)) , ProfileLogRequest(std::move(profileLogRequest)) @@ -179,8 +254,10 @@ void TReadBlobActor::HandleGetResult( const auto& request = Requests[requestIndex]; - const auto* rangeIt = request.Ranges.begin(); - TABLET_VERIFY(rangeIt != request.Ranges.end()); + if (msg->ResponseSz != request.Queries.size()) { + ReplyError(ctx, *msg, "invalid 
number of responses"); + return; + } for (size_t i = 0; i < msg->ResponseSz; ++i) { auto& response = msg->Responses[i]; @@ -190,56 +267,70 @@ void TReadBlobActor::HandleGetResult( return; } - if (response.Id != request.BlobId || - response.Buffer.empty() || - response.Buffer.size() % BlockSize != 0) - { - ReplyError(ctx, *msg, "invalid response received"); + if (response.Id != request.BlobId) { + ReplyError(ctx, *msg, "invalid blob id"); return; } TotalSize += response.Buffer.size(); - ui32 blocksCount = response.Buffer.size() / BlockSize; - ui32 rangeOffset = 0; - - char buffer[BlockSize]; - auto iter = response.Buffer.begin(); - - for (size_t j = 0; j < blocksCount; ++j) { - size_t inRange = j - rangeOffset; - if (inRange >= rangeIt->Count) { - ++rangeIt; - rangeOffset = j; - inRange = 0; - } - TStringBuf view; + const auto& query = request.Queries[i]; - if (iter.ContiguousSize() >= BlockSize) { - view = TStringBuf(iter.ContiguousData(), BlockSize); - iter += BlockSize; - } else { - iter.ExtractPlainDataAndAdvance(buffer, BlockSize); - view = TStringBuf(buffer, BlockSize); + if (request.BlobCompressionInfo.BlobCompressed()) { + if (response.Buffer.size() != query.CompressedRange.Length) { + ReplyError( + ctx, + *msg, + "invalid compressed response buffer size"); + return; } - Buffer->SetBlock( - rangeIt->BlockOffset + inRange, - view); - } + Decompress( + BlobCodec, + request.BlobCompressionInfo, + BlockSize, + response.Buffer, + query.CompressedRange.Offset, + query.Blocks, + BlockBuffer.get()); + } else { + if (response.Buffer.size() / BlockSize != query.Blocks.size()) { + ReplyError(ctx, *msg, "invalid response buffer size"); + return; + } - TABLET_VERIFY(blocksCount - rangeOffset == rangeIt->Count); - ++rangeIt; + ReadQueryResponseUncompressed(response.Buffer, query); + } } - TABLET_VERIFY(rangeIt == request.Ranges.end()); - TABLET_VERIFY(RequestsCompleted < Requests.size()); if (++RequestsCompleted == Requests.size()) { ReplyAndDie(ctx); } } +void 
TReadBlobActor::ReadQueryResponseUncompressed( + const TRope& responseBuffer, + const TQuery& query) +{ + char buffer[BlockSize]; + auto iter = responseBuffer.begin(); + + for (const auto& block : query.Blocks) { + TStringBuf view; + + if (iter.ContiguousSize() >= BlockSize) { + view = TStringBuf(iter.ContiguousData(), BlockSize); + iter += BlockSize; + } else { + iter.ExtractPlainDataAndAdvance(buffer, BlockSize); + view = TStringBuf(buffer, BlockSize); + } + + BlockBuffer->SetBlock(block.BlockOffset, view); + } +} + void TReadBlobActor::HandlePoisonPill( const TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx) @@ -256,8 +347,8 @@ void TReadBlobActor::ReplyError( LOG_ERROR(ctx, TFileStoreComponents::TABLET, "%s TEvBlobStorage::TEvGet failed: %s\n%s", LogTag.c_str(), - message.data(), - response.Print(false).data()); + message.c_str(), + response.Print(false).c_str()); auto error = MakeError(E_REJECTED, "TEvBlobStorage::TEvGet failed: " + message); ReplyAndDie(ctx, error); @@ -340,69 +431,80 @@ void TIndexTabletActor::HandleReadBlob( msg->CallContext, "ReadBlob"); - ui32 blockSize = GetBlockSize(); + const auto blockSize = GetBlockSize(); TVector requests(Reserve(msg->Blobs.size())); for (auto& blob: msg->Blobs) { - auto blobId = MakeBlobId(TabletID(), blob.BlobId); - - auto proxy = Info()->BSProxyIDForChannel( - blob.BlobId.Channel(), - blob.BlobId.Generation()); - - ui32 blocksCount = blob.Blocks.size(); - - using TEvGetQuery = TEvBlobStorage::TEvGet::TQuery; - - TArrayHolder queries(new TEvGetQuery[blocksCount]); - size_t queriesCount = 0; + struct TBlockRange + { + ui32 BlockOffset = 0; + ui32 BlocksCount = 0; + }; + + auto addRangeToProfileLog = [&](const TBlockRange& range) { + if (range.BlocksCount) { + AddRange( + blob.BlobId.CommitId(), + static_cast(range.BlockOffset) * blockSize, + static_cast(range.BlocksCount) * blockSize, + profileLogRequest); + } + }; - TVector ranges; - ranges.reserve(blocksCount); + TVector queries; + TBlockRange 
curBlockRange; + const auto blocksCount = blob.Blocks.size(); for (size_t i = 0; i < blocksCount; ++i) { const auto& curBlock = blob.Blocks[i]; + if (i && curBlock.BlobOffset == blob.Blocks[i - 1].BlobOffset + 1) { const auto& prevBlock = blob.Blocks[i - 1]; - // extend range - queries[queriesCount - 1].Size += blockSize; + queries.back().UncompressedRange.Extend(blockSize); + queries.back().Blocks.push_back( + TUncompressedBlock( + curBlock.BlobOffset, + curBlock.BlockOffset)); + if (curBlock.BlockOffset == prevBlock.BlockOffset + 1) { - ++ranges.back().Count; + // extend range + ++curBlockRange.BlocksCount; } else { - ranges.push_back({ - curBlock.BlockOffset, - 1 - }); + addRangeToProfileLog(curBlockRange); + // new range + curBlockRange = { curBlock.BlockOffset, 1 }; } } else { - queries[queriesCount++].Set( - blobId, - blob.Blocks[i].BlobOffset * blockSize, - blockSize); - - ranges.push_back({ - blob.Blocks[i].BlockOffset, - 1 + queries.push_back(TQuery { + .UncompressedRange = TUncompressedRange( + curBlock.BlobOffset * blockSize, + blockSize), + .Blocks = {{ curBlock.BlobOffset, curBlock.BlockOffset }}, }); + + addRangeToProfileLog(curBlockRange); + // new range + curBlockRange = { curBlock.BlockOffset, 1 }; } } - for (const auto& range : ranges) { - AddRange( - blob.BlobId.CommitId(), - static_cast(range.BlockOffset) * blockSize, - static_cast(range.Count) * blockSize, - profileLogRequest); + if (blob.BlobCompressionInfo.BlobCompressed()) { + for (auto& query : queries) { + query.CompressedRange = blob.BlobCompressionInfo.CompressedRange( + query.UncompressedRange); + } + + MergeOverlappingCompressedQueries(queries); } - auto request = std::make_unique( + auto blobId = MakeBlobId(TabletID(), blob.BlobId); + auto request = CreateGetRequest( + blobId, queries, - queriesCount, blob.Deadline, - blob.Async - ? 
NKikimrBlobStorage::AsyncRead - : NKikimrBlobStorage::FastRead); + blob.Async, + blob.BlobCompressionInfo.BlobCompressed()); if (!msg->CallContext->LWOrbit.Fork(request->Orbit)) { FILESTORE_TRACK( @@ -411,10 +513,15 @@ void TIndexTabletActor::HandleReadBlob( "TEvBlobStorage::TEvGet"); } + auto proxy = Info()->BSProxyIDForChannel( + blob.BlobId.Channel(), + blob.BlobId.Generation()); + requests.emplace_back( proxy, blobId, - std::move(ranges), + std::move(blob.BlobCompressionInfo), + std::move(queries), std::move(request)); } @@ -430,6 +537,7 @@ void TIndexTabletActor::HandleReadBlob( std::move(msg->Buffer), blockSize, GetFileSystemId(), + BlobCodec, ProfileLog, std::move(requests), std::move(profileLogRequest)); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_readdata.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_readdata.cpp index e08814501a1..83d2531d1e7 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_readdata.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_readdata.cpp @@ -212,7 +212,7 @@ class TReadDataVisitor final TABLET_VERIFY(blockOffset < Args.ActualRange().BlockCount()); auto& prev = Args.Blocks[blockOffset]; - if (Update(prev, block, {}, 0)) { + if (Update(prev, block, {}, 0, {})) { Args.Buffer->SetBlock(blockOffset, blockData); } } @@ -220,7 +220,8 @@ class TReadDataVisitor final void Accept( const TBlock& block, const TPartialBlobId& blobId, - ui32 blobOffset) override + ui32 blobOffset, + const TBlobCompressionInfo& blobCompressionInfo) override { TABLET_VERIFY(!ApplyingByteLayer); TABLET_VERIFY(blobId); @@ -229,7 +230,7 @@ class TReadDataVisitor final TABLET_VERIFY(blockOffset < Args.ActualRange().BlockCount()); auto& prev = Args.Blocks[blockOffset]; - if (Update(prev, block, blobId, blobOffset)) { + if (Update(prev, block, blobId, blobOffset, blobCompressionInfo)) { Args.Buffer->ClearBlock(blockOffset); } } @@ -286,12 +287,14 @@ class TReadDataVisitor final TBlockDataRef& prev, const TBlock& block, 
const TPartialBlobId& blobId, - ui32 blobOffset) + ui32 blobOffset, + const TBlobCompressionInfo blobCompressionInfo) { if (prev.MinCommitId < block.MinCommitId) { memcpy(&prev, &block, sizeof(TBlock)); prev.BlobId = blobId; prev.BlobOffset = blobOffset; + prev.BlobCompressionInfo = blobCompressionInfo; return true; } return false; @@ -395,13 +398,19 @@ void TReadDataActor::Bootstrap(const TActorContext& ctx) void TReadDataActor::ReadBlob(const TActorContext& ctx) { - using TBlocksByBlob = THashMap< + using TBlocksByBlobId = THashMap< TPartialBlobId, TVector, TPartialBlobIdHash >; + using TBlobCompressionInfoByBlobId = THashMap< + TPartialBlobId, + TBlobCompressionInfo, + TPartialBlobIdHash + >; - TBlocksByBlob blocksByBlob; + TBlocksByBlobId blocksByBlobId; + TBlobCompressionInfoByBlobId blobCompressionInfoByBlobId; ui32 blockOffset = 0; for (const auto& block: Blocks) { @@ -411,7 +420,13 @@ void TReadDataActor::ReadBlob(const TActorContext& ctx) continue; } - blocksByBlob[block.BlobId].emplace_back(block.BlobOffset, blockOffset - 1); + blocksByBlobId[block.BlobId].emplace_back( + block.BlobOffset, + blockOffset - 1); + if (block.BlobCompressionInfo.BlobCompressed()) { + blobCompressionInfoByBlobId[block.BlobId] = + block.BlobCompressionInfo; + } } auto request = std::make_unique( @@ -422,9 +437,20 @@ void TReadDataActor::ReadBlob(const TActorContext& ctx) return l.BlobOffset < r.BlobOffset; }; - for (auto& [blobId, blocks]: blocksByBlob) { + for (auto& [blobId, blocks]: blocksByBlobId) { Sort(blocks, comparer); - request->Blobs.emplace_back(blobId, std::move(blocks)); + + TBlobCompressionInfo blobCompressionInfo; + if (auto iter = blobCompressionInfoByBlobId.find(blobId); + iter != blobCompressionInfoByBlobId.end()) + { + blobCompressionInfo = std::move(iter->second); + } + + request->Blobs.emplace_back( + blobId, + std::move(blocks), + std::move(blobCompressionInfo)); } NCloud::Send(ctx, Tablet, std::move(request)); diff --git 
a/cloud/filestore/libs/storage/tablet/tablet_actor_writebatch.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_writebatch.cpp index 54a01c5a708..86bb228d02e 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_writebatch.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_writebatch.cpp @@ -156,7 +156,10 @@ void TWriteBatchActor::AddBlob(const TActorContext& ctx) request->WriteRanges = std::move(WriteRanges); for (auto& blob: Blobs) { - request->MixedBlobs.emplace_back(blob.BlobId, std::move(blob.Blocks)); + request->MixedBlobs.emplace_back( + blob.BlobId, + std::move(blob.Blocks), + TBlobCompressionInfo()); } NCloud::Send(ctx, Tablet, std::move(request)); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_writeblob.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_writeblob.cpp index 6684ac7f035..a0b444a398c 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_writeblob.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_writeblob.cpp @@ -297,6 +297,8 @@ void TIndexTabletActor::HandleWriteBlob( return; } + // TODO: this experiment should be superseeded by blob compression + // feature const auto compRate = Config->GetBlobCompressionRate(); if (BlobCodec && compRate && blob.BlobId.GetHash() % compRate == 0) { size_t compressedSize = 0; diff --git a/cloud/filestore/libs/storage/tablet/tablet_database.cpp b/cloud/filestore/libs/storage/tablet/tablet_database.cpp index 39de1bd9d3b..3bb9c5a4aa3 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_database.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_database.cpp @@ -1213,6 +1213,7 @@ void TIndexTabletDatabase::WriteMixedBlocks( ui32 rangeId, const TPartialBlobId& blobId, const TBlockList& blockList, + const TBlobCompressionInfo& blobCompressionInfo, ui32 garbageBlocks, ui32 checkpointBlocks) { @@ -1226,14 +1227,25 @@ void TIndexTabletDatabase::WriteMixedBlocks( blockList.GetEncodedDeletionMarkers().begin(), blockList.GetEncodedDeletionMarkers().end()}; - 
Table() - .Key(rangeId, blobId.CommitId(), blobId.UniqueId()) - .Update( - NIceDb::TUpdate(encodedBlocks), - NIceDb::TUpdate(encodedDeletionMarkers), - NIceDb::TUpdate(garbageBlocks), - NIceDb::TUpdate(checkpointBlocks) + auto value = Table() + .Key(rangeId, blobId.CommitId(), blobId.UniqueId()); + + value.Update( + NIceDb::TUpdate(encodedBlocks), + NIceDb::TUpdate(encodedDeletionMarkers), + NIceDb::TUpdate(garbageBlocks), + NIceDb::TUpdate(checkpointBlocks) + ); + + if (blobCompressionInfo.BlobCompressed()) { + TStringBuf encodedBlobCompressionInfo{ + blobCompressionInfo.GetEncoded().begin(), + blobCompressionInfo.GetEncoded().end()}; + value.Update( + NIceDb::TUpdate( + encodedBlobCompressionInfo) ); + } } void TIndexTabletDatabase::DeleteMixedBlocks( @@ -1278,9 +1290,16 @@ bool TIndexTabletDatabase::ReadMixedBlocks( TBlockList blockList { std::move(blocks), std::move(deletionMarkers) }; + TBlobCompressionInfo blobCompressionInfo; + if (auto value = it.GetValue()) { + blobCompressionInfo = + TBlobCompressionInfo(FromStringBuf(value, alloc)); + } + blob = TMixedBlob { blobId, std::move(blockList), + std::move(blobCompressionInfo), it.GetValue(), it.GetValue() }; @@ -1319,9 +1338,16 @@ bool TIndexTabletDatabase::ReadMixedBlocks( TBlockList blockList { std::move(blocks), std::move(deletionMarkers) }; + TBlobCompressionInfo blobCompressionInfo; + if (auto value = it.GetValue()) { + blobCompressionInfo = + TBlobCompressionInfo(FromStringBuf(value, alloc)); + } + blobs.emplace_back(TMixedBlob { blobId, std::move(blockList), + std::move(blobCompressionInfo), it.GetValue(), it.GetValue() }); diff --git a/cloud/filestore/libs/storage/tablet/tablet_database.h b/cloud/filestore/libs/storage/tablet/tablet_database.h index dfc967ce440..3b52b12a86c 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_database.h +++ b/cloud/filestore/libs/storage/tablet/tablet_database.h @@ -6,6 +6,7 @@ #include "tablet_state_iface.h" #include +#include #include #include #include @@ -373,6 
+374,7 @@ FILESTORE_FILESYSTEM_STATS(FILESTORE_DECLARE_STATS) ui32 rangeId, const TPartialBlobId& blobId, const TBlockList& blockList, + const TBlobCompressionInfo& blobCompressionInfo, ui32 garbageBlocks, ui32 checkpointBlocks); @@ -382,6 +384,7 @@ FILESTORE_FILESYSTEM_STATS(FILESTORE_DECLARE_STATS) { TPartialBlobId BlobId; TBlockList BlockList; + TBlobCompressionInfo BlobCompressionInfo; ui32 GarbageBlocks; ui32 CheckpointBlocks; }; diff --git a/cloud/filestore/libs/storage/tablet/tablet_private.h b/cloud/filestore/libs/storage/tablet/tablet_private.h index 247a2ee4c2a..8d07da8c2c9 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_private.h +++ b/cloud/filestore/libs/storage/tablet/tablet_private.h @@ -105,13 +105,18 @@ struct TReadBlob TPartialBlobId BlobId; TVector Blocks; + TBlobCompressionInfo BlobCompressionInfo; TInstant Deadline = TInstant::Max(); bool Async = false; - TReadBlob(const TPartialBlobId& blobId, TVector blocks) + TReadBlob( + const TPartialBlobId& blobId, + TVector blocks, + TBlobCompressionInfo blobCompressionInfo) : BlobId(blobId) , Blocks(std::move(blocks)) + , BlobCompressionInfo(std::move(blobCompressionInfo)) {} }; diff --git a/cloud/filestore/libs/storage/tablet/tablet_schema.h b/cloud/filestore/libs/storage/tablet/tablet_schema.h index 23f1267abab..675d0978a1d 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_schema.h +++ b/cloud/filestore/libs/storage/tablet/tablet_schema.h @@ -292,6 +292,7 @@ struct TIndexTabletSchema struct DeletionMarkers : Column<5, NKikimr::NScheme::NTypeIds::String> { using Type = TStringBuf; }; struct GarbageBlocksCount : Column<6, NKikimr::NScheme::NTypeIds::Uint32> {}; struct CheckpointBlocksCount : Column<7, NKikimr::NScheme::NTypeIds::Uint32> {}; + struct BlobCompressionInfo : Column<8, NKikimr::NScheme::NTypeIds::String> { using Type = TStringBuf; }; using TKey = TableKey; @@ -302,7 +303,8 @@ struct TIndexTabletSchema Blocks, DeletionMarkers, GarbageBlocksCount, - CheckpointBlocksCount + 
CheckpointBlocksCount, + BlobCompressionInfo >; using StoragePolicy = TStoragePolicy; diff --git a/cloud/filestore/libs/storage/tablet/tablet_state.h b/cloud/filestore/libs/storage/tablet/tablet_state.h index 337fcaae16d..a9c48fdd4b2 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_state.h +++ b/cloud/filestore/libs/storage/tablet/tablet_state.h @@ -883,7 +883,8 @@ FILESTORE_DUPCACHE_REQUESTS(FILESTORE_DECLARE_DUPCACHE) bool WriteMixedBlocks( TIndexTabletDatabase& db, const TPartialBlobId& blobId, - /*const*/ TVector& blocks); + /*const*/ TVector& blocks, + const TBlobCompressionInfo& blobCompressionInfo); void DeleteMixedBlocks( TIndexTabletDatabase& db, @@ -929,7 +930,8 @@ FILESTORE_DUPCACHE_REQUESTS(FILESTORE_DECLARE_DUPCACHE) TIndexTabletDatabase& db, ui32 rangeId, const TPartialBlobId& blobId, - /*const*/ TVector& blocks); + /*const*/ TVector& blocks, + const TBlobCompressionInfo& blobCompressionInfo); void DeleteMixedBlocks( TIndexTabletDatabase& db, diff --git a/cloud/filestore/libs/storage/tablet/tablet_state_data.cpp b/cloud/filestore/libs/storage/tablet/tablet_state_data.cpp index bb4bfae7f8d..574e2d3c96b 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_state_data.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_state_data.cpp @@ -596,6 +596,7 @@ bool TIndexTabletState::LoadMixedBlocks( rangeId, blob.BlobId, std::move(blob.BlockList), + std::move(blob.BlobCompressionInfo), TMixedBlobStats { blob.GarbageBlocks, blob.CheckpointBlocks @@ -653,7 +654,13 @@ void TIndexTabletState::WriteMixedBlocks( block, blocksCount, GetAllocator(EAllocatorTag::BlockList)); - db.WriteMixedBlocks(rangeId, blobId, blockList, 0, 0); + db.WriteMixedBlocks( + rangeId, + blobId, + blockList, + TBlobCompressionInfo(), // uncompressed + 0, + 0); IncrementMixedBlobsCount(db); IncrementMixedBlocksCount(db, blocksCount); @@ -662,7 +669,8 @@ void TIndexTabletState::WriteMixedBlocks( bool added = Impl->MixedBlocks.AddBlocks( rangeId, blobId, - std::move(blockList)); + 
std::move(blockList), + TBlobCompressionInfo() /* uncompressed */); TABLET_VERIFY(added); } @@ -674,11 +682,12 @@ void TIndexTabletState::WriteMixedBlocks( bool TIndexTabletState::WriteMixedBlocks( TIndexTabletDatabase& db, const TPartialBlobId& blobId, - /*const*/ TVector& blocks) + /*const*/ TVector& blocks, + const TBlobCompressionInfo& blobCompressionInfo) { ui32 rangeId = GetMixedRangeIndex(blocks); - if (WriteMixedBlocks(db, rangeId, blobId, blocks)) { + if (WriteMixedBlocks(db, rangeId, blobId, blocks, blobCompressionInfo)) { AddNewBlob(db, blobId); return true; @@ -691,7 +700,8 @@ bool TIndexTabletState::WriteMixedBlocks( TIndexTabletDatabase& db, ui32 rangeId, const TPartialBlobId& blobId, - /*const*/ TVector& blocks) + /*const*/ TVector& blocks, + const TBlobCompressionInfo& blobCompressionInfo) { const bool isMixedRangeLoaded = Impl->MixedBlocks.IsLoaded(rangeId); if (isMixedRangeLoaded) { @@ -726,6 +736,7 @@ bool TIndexTabletState::WriteMixedBlocks( rangeId, blobId, blockList, + blobCompressionInfo, rebaseResult.GarbageBlocks, rebaseResult.CheckpointBlocks); @@ -737,6 +748,7 @@ bool TIndexTabletState::WriteMixedBlocks( rangeId, blobId, std::move(blockList), + blobCompressionInfo, TMixedBlobStats { rebaseResult.GarbageBlocks, rebaseResult.CheckpointBlocks @@ -866,7 +878,12 @@ bool TIndexTabletState::UpdateBlockLists( { const auto rangeId = GetMixedRangeIndex(blob.Blocks); DeleteMixedBlocks(db, rangeId, blob.BlobId, blob.Blocks); - return WriteMixedBlocks(db, rangeId, blob.BlobId, blob.Blocks); + return WriteMixedBlocks( + db, + rangeId, + blob.BlobId, + blob.Blocks, + blob.BlobCompressionInfo); } ui32 TIndexTabletState::CleanupBlockDeletions( @@ -898,7 +915,8 @@ ui32 TIndexTabletState::CleanupBlockDeletions( db, rangeId, blob.BlobMeta.BlobId, - blob.BlobMeta.Blocks); + blob.BlobMeta.Blocks, + TBlobCompressionInfo() /* remains unchanged */); if (!written) { ++removedBlobs; } @@ -1026,6 +1044,7 @@ void TIndexTabletState::RewriteMixedBlocks( rangeId, 
blob.BlobId, blockList, + TBlobCompressionInfo(), // remains unchanged rebaseResult.GarbageBlocks, rebaseResult.CheckpointBlocks); @@ -1037,6 +1056,7 @@ void TIndexTabletState::RewriteMixedBlocks( rangeId, blob.BlobId, std::move(blockList), + blob.BlobCompressionInfo, TMixedBlobStats { rebaseResult.GarbageBlocks, rebaseResult.CheckpointBlocks diff --git a/cloud/filestore/tests/loadtest/service-kikimr-compression-test/nfs-storage.txt b/cloud/filestore/tests/loadtest/service-kikimr-compression-test/nfs-storage.txt new file mode 100644 index 00000000000..afb8a0db68a --- /dev/null +++ b/cloud/filestore/tests/loadtest/service-kikimr-compression-test/nfs-storage.txt @@ -0,0 +1,2 @@ +TwoStageReadEnabled: false +BlobCompressionEnabled: true diff --git a/cloud/filestore/tests/loadtest/service-kikimr-compression-test/read-write-validation.txt b/cloud/filestore/tests/loadtest/service-kikimr-compression-test/read-write-validation.txt new file mode 100644 index 00000000000..a534bc7de81 --- /dev/null +++ b/cloud/filestore/tests/loadtest/service-kikimr-compression-test/read-write-validation.txt @@ -0,0 +1,58 @@ +Tests { + LoadTest { + Name: "smoke" + CreateFileStoreRequest: { + FileSystemId: "smoke" + FolderId: "folder" + CloudId: "cloud" + BlocksCount: 10241024 + BlockSize: 4096 + } + DataLoadSpec { + Actions { + Action: ACTION_READ + Rate: 33 + } + Actions { + Action: ACTION_WRITE + Rate: 33 + } + ReadBytes: 4096 + WriteBytes: 4096 + InitialFileSize: 104857600 + ValidationEnabled: true + AppendPercentage: 10 + } + IODepth: 1 + TestDuration: 60 + } +} +Tests { + LoadTest { + Name: "smoke64" + CreateFileStoreRequest: { + FileSystemId: "smoke64" + FolderId: "folder" + CloudId: "cloud" + BlocksCount: 10241024 + BlockSize: 65536 + } + DataLoadSpec { + Actions { + Action: ACTION_READ + Rate: 33 + } + Actions { + Action: ACTION_WRITE + Rate: 33 + } + ReadBytes: 65536 + WriteBytes: 65536 + InitialFileSize: 104857600 + ValidationEnabled: true + AppendPercentage: 10 + } + IODepth: 1 + 
TestDuration: 60 + } +} diff --git a/cloud/filestore/tests/loadtest/service-kikimr-compression-test/test.py b/cloud/filestore/tests/loadtest/service-kikimr-compression-test/test.py new file mode 100644 index 00000000000..8d969b27ea2 --- /dev/null +++ b/cloud/filestore/tests/loadtest/service-kikimr-compression-test/test.py @@ -0,0 +1,35 @@ +import os +import pytest + +from cloud.filestore.tests.python.lib.loadtest import run_load_test + +import yatest.common as common + + +class Case(object): + + def __init__(self, name, config_path): + self.name = name + self.config_path = config_path + + +# TODO: make blob compression support two-stage read and move this test to +# service-kikimr-newfeatures-test +TESTS = [ + Case( + "read-write-validation", + "cloud/filestore/tests/loadtest/service-kikimr-compression-test/read-write-validation.txt", + ), +] + + +@pytest.mark.parametrize("test_case", TESTS, ids=[x.name for x in TESTS]) +def test_load(test_case): + test_case.config_path = common.source_path(test_case.config_path) + run_load_test( + test_case.name, + test_case.config_path, + os.getenv("NFS_SERVER_PORT"), + ) + + return None diff --git a/cloud/filestore/tests/loadtest/service-kikimr-compression-test/ya.make b/cloud/filestore/tests/loadtest/service-kikimr-compression-test/ya.make new file mode 100644 index 00000000000..540e4cc2a46 --- /dev/null +++ b/cloud/filestore/tests/loadtest/service-kikimr-compression-test/ya.make @@ -0,0 +1,28 @@ +PY3TEST() + +INCLUDE(${ARCADIA_ROOT}/cloud/filestore/tests/recipes/medium.inc) + +TEST_SRCS( + test.py +) + +DEPENDS( + cloud/filestore/tools/testing/loadtest/bin +) + +DATA( + arcadia/cloud/filestore/tests/loadtest/service-kikimr-compression-test +) + +PEERDIR( + cloud/filestore/tests/python/lib +) + +SET( + NFS_STORAGE_CONFIG_PATCH + cloud/filestore/tests/loadtest/service-kikimr-compression-test/nfs-storage.txt +) + +INCLUDE(${ARCADIA_ROOT}/cloud/filestore/tests/recipes/service-kikimr.inc) + +END() diff --git 
a/cloud/filestore/tests/loadtest/ya.make b/cloud/filestore/tests/loadtest/ya.make index fd344d597aa..7775cedaadc 100644 --- a/cloud/filestore/tests/loadtest/ya.make +++ b/cloud/filestore/tests/loadtest/ya.make @@ -1,4 +1,5 @@ RECURSE_FOR_TESTS( + service-kikimr-compression-test service-kikimr-auth-test service-kikimr-emergency-test service-kikimr-nemesis-test diff --git a/cloud/storage/core/libs/tablet/model/partial_blob_id.h b/cloud/storage/core/libs/tablet/model/partial_blob_id.h index c7645456e0b..2b611bff1a9 100644 --- a/cloud/storage/core/libs/tablet/model/partial_blob_id.h +++ b/cloud/storage/core/libs/tablet/model/partial_blob_id.h @@ -102,6 +102,11 @@ class TPartialBlobId return UniqueId_.BlobSize; } + void SetBlobSize(ui32 size) + { + UniqueId_.BlobSize = size; + } + ui32 PartId() const { return UniqueId_.PartId;