Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filestore] implement blob compression #2777

Draft
wants to merge 24 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
be85b59
add TBlobCompressionInfo to TMixedBlobMeta
Nov 5, 2024
531c9a5
compress blobs during compaction
Nov 6, 2024
fc03544
add TBlobCompressionInfo to IMixedBlocksVisitor
Nov 7, 2024
a241102
simplify ReadBlob actor
Nov 9, 2024
a088d70
use TBlobCompressionInfo::BlobCompressed() method instead of operator…
Nov 10, 2024
e144e8c
code is simpler if TReadBlobActor uses TVector<TReadBlob::TBlock> ins…
Nov 10, 2024
85cb495
use c_str instead of data in TReadBlobActor
Nov 10, 2024
d3b845c
extract ReadUncompressedResponse function from TReadBlobActor::Handle…
Nov 10, 2024
8f8b14a
use TEvGetQueryInfo::BlockOffsets instead of TReadBlobRequest::Blocks
Nov 11, 2024
0018328
minor tweak
Nov 11, 2024
0d14ad9
Use TReadBlob::TBlock instead of block offset (ui64) in TEvGetQueryInfo
Nov 12, 2024
b55cafc
pass TBlobCompressionInfo to TReadBlobRequest
Nov 12, 2024
c60c2df
further refactoring: rename TEvGetQueryInfo to TQuery and Offset, Siz…
Nov 13, 2024
220c493
decompress data in TReadBlobActor
Nov 17, 2024
5d3a58a
get rid of TQuery::Compressed flag
Nov 23, 2024
6edaf47
introduce TBlobCompressionInfo::TImpl
Nov 23, 2024
2e971d2
TBlobCompressionInfo should use shared_ptr instead of unique_ptr
Dec 12, 2024
3ffcf38
TBlobCompressionInfo should store encoded data using TByteVector
Dec 12, 2024
34f6b6b
split TUncompressedRange and TCompressedRange
SvartMetal Dec 28, 2024
78bfd3e
add EAllocatorTag::BlobCompressionInfo
SvartMetal Dec 28, 2024
ad11b22
fix blob size after compression; implement 'identity' compression alg…
SvartMetal Dec 28, 2024
87e2cb0
add service-kikimr-compression-test
SvartMetal Dec 29, 2024
5f8c7f8
fix bug and implement simple 'whole blob' compression algorithm
SvartMetal Dec 29, 2024
302119c
FlushBytes should respect TBlobCompressionInfo; fix subtle bug with m…
SvartMetal Jan 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cloud/filestore/config/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -505,4 +505,6 @@ message TStorageConfig
// Enables directory creation in shards (by default directories are created
// only in the main tablet).
optional bool DirectoryCreationInShardsEnabled = 414;

optional bool BlobCompressionEnabled = 415;
}
1 change: 1 addition & 0 deletions cloud/filestore/libs/storage/core/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ using TAliases = NProto::TStorageConfig::TFilestoreAliases;
xxx(BlobCompressionRate, ui32, 0 )\
xxx(BlobCompressionCodec, TString, "lz4" )\
xxx(BlobCompressionChunkSize, ui32, 80_KB )\
xxx(BlobCompressionEnabled, bool, false )\
\
xxx(MaxZeroCompactionRangesToDeletePerTx, ui32, 10000 )\
xxx(ChannelFreeSpaceThreshold, ui32, 25 )\
Expand Down
2 changes: 2 additions & 0 deletions cloud/filestore/libs/storage/core/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ class TStorageConfig
bool GetDirectoryCreationInShardsEnabled() const;

bool GetGuestWritebackCacheEnabled() const;

bool GetBlobCompressionEnabled() const;
};

} // namespace NCloud::NFileStore::NStorage
1 change: 1 addition & 0 deletions cloud/filestore/libs/storage/tablet/model/alloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ enum class EAllocatorTag
NodeIndexCache,
InMemoryNodeIndexCache,
LargeBlocks,
BlobCompressionInfo,

Max
};
Expand Down
16 changes: 14 additions & 2 deletions cloud/filestore/libs/storage/tablet/model/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include "block.h"

#include <cloud/filestore/libs/storage/tablet/model/blob_compression.h>

#include <cloud/storage/core/libs/tablet/model/commit.h>
#include <cloud/storage/core/libs/tablet/model/partial_blob_id.h>

Expand All @@ -26,12 +28,17 @@ struct TMixedBlobMeta
{
TPartialBlobId BlobId;
TVector<TBlock> Blocks;
TBlobCompressionInfo BlobCompressionInfo;

TMixedBlobMeta() = default;

TMixedBlobMeta(const TPartialBlobId& blobId, TVector<TBlock> blocks)
TMixedBlobMeta(
const TPartialBlobId& blobId,
TVector<TBlock> blocks,
TBlobCompressionInfo blobCompressionInfo)
: BlobId(blobId)
, Blocks(std::move(blocks))
, BlobCompressionInfo(std::move(blobCompressionInfo))
{}
};

Expand All @@ -46,8 +53,12 @@ struct TMixedBlob: TMixedBlobMeta
TMixedBlob(
const TPartialBlobId& blobId,
TVector<TBlock> blocks,
TBlobCompressionInfo blobCompressionInfo,
TString blobContent)
: TMixedBlobMeta(blobId, std::move(blocks))
: TMixedBlobMeta(
blobId,
std::move(blocks),
std::move(blobCompressionInfo))
, BlobContent(std::move(blobContent))
{}
};
Expand Down Expand Up @@ -96,6 +107,7 @@ struct TCompactionBlob
{
TPartialBlobId BlobId;
TVector<TBlockDataRef> Blocks;
TBlobCompressionInfo BlobCompressionInfo;

TCompactionBlob() = default;

Expand Down
1 change: 1 addition & 0 deletions cloud/filestore/libs/storage/tablet/model/blob_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void TMixedBlobBuilder::CompleteBlob(TRange& range)
Blobs.emplace_back(
TPartialBlobId(), // need to generate BlobId later
std::move(range.Blocks),
TBlobCompressionInfo(),
std::move(range.BlobContent));

++BlobsCount;
Expand Down
162 changes: 162 additions & 0 deletions cloud/filestore/libs/storage/tablet/model/blob_compression.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#include "blob_compression.h"

#include "binary_reader.h"
#include "binary_writer.h"

#include <cloud/filestore/libs/storage/model/block_buffer.h>

#include <library/cpp/blockcodecs/codecs.h>

namespace NCloud::NFileStore::NStorage {

////////////////////////////////////////////////////////////////////////////////

// Shared state behind TBlobCompressionInfo.
//
// Holds the two blob sizes together with their serialized form. The encoded
// layout is two consecutive ui32 values: the decompressed blob size followed
// by the compressed blob size.
struct TBlobCompressionInfo::TImpl
{
    TByteVector Bytes;
    ui32 DecompressedBlobSize = 0;
    ui32 CompressedBlobSize = 0;

    // Builds the info from explicit sizes and serializes it right away so that
    // GetEncoded() can hand out the bytes without any further work.
    TImpl(
            ui32 decompressedBlobSize,
            ui32 compressedBlobSize,
            IAllocator* alloc)
        : Bytes(alloc)
        , DecompressedBlobSize(decompressedBlobSize)
        , CompressedBlobSize(compressedBlobSize)
    {
        TBinaryWriter encoder(alloc);
        encoder.Write<ui32>(DecompressedBlobSize);
        encoder.Write<ui32>(CompressedBlobSize);
        Bytes = encoder.Finish();
    }

    // Restores the info from previously serialized bytes; fields are read in
    // the same order the other constructor writes them.
    explicit TImpl(TByteVector bytes)
        : Bytes(std::move(bytes))
    {
        TBinaryReader decoder(Bytes);
        DecompressedBlobSize = decoder.Read<ui32>();
        CompressedBlobSize = decoder.Read<ui32>();
    }

    // Maps an uncompressed range onto the compressed blob. The argument is
    // ignored: the result always spans the entire compressed blob.
    TCompressedRange CompressedRange(TUncompressedRange /*range*/) const
    {
        const TCompressedRange wholeBlob(0, CompressedBlobSize);
        return wholeBlob;
    }

    // Serialized representation produced/consumed by the constructors.
    const TByteVector& GetEncoded() const
    {
        return Bytes;
    }
};

////////////////////////////////////////////////////////////////////////////////

// Creates compression info for a blob whose sizes before and after
// compression are known; `alloc` is forwarded to the implementation, which
// uses it for the encoded byte vector.
TBlobCompressionInfo::TBlobCompressionInfo(
        ui32 decompressedBlobSize,
        ui32 compressedBlobSize,
        IAllocator* alloc)
    : Impl(std::make_shared<TImpl>(
        decompressedBlobSize,
        compressedBlobSize,
        alloc))
{}

// Restores compression info from its serialized form (see GetEncoded()).
TBlobCompressionInfo::TBlobCompressionInfo(TByteVector bytes)
    : Impl(std::make_shared<TImpl>(std::move(bytes)))
{}

////////////////////////////////////////////////////////////////////////////////

// A default-constructed info has no implementation object and therefore
// describes a blob that is not compressed.
bool TBlobCompressionInfo::BlobCompressed() const
{
    return Impl != nullptr;
}

// Size of the blob before compression. Must only be called when
// BlobCompressed() is true (enforced by Y_ABORT_UNLESS).
ui32 TBlobCompressionInfo::DecompressedBlobSize() const
{
    Y_ABORT_UNLESS(Impl);

    const TImpl& impl = *Impl;
    return impl.DecompressedBlobSize;
}

// Size of the blob after compression. Must only be called when
// BlobCompressed() is true (enforced by Y_ABORT_UNLESS).
ui32 TBlobCompressionInfo::CompressedBlobSize() const
{
    Y_ABORT_UNLESS(Impl);

    const TImpl& impl = *Impl;
    return impl.CompressedBlobSize;
}

// Translates a range of the uncompressed blob into the range of compressed
// bytes that has to be read; delegates to the implementation object.
// Must only be called when BlobCompressed() is true.
TCompressedRange TBlobCompressionInfo::CompressedRange(
    TUncompressedRange range) const
{
    Y_ABORT_UNLESS(Impl);

    const TImpl& impl = *Impl;
    return impl.CompressedRange(range);
}

// Serialized form of the info. The reference points into the shared
// implementation object. Must only be called when BlobCompressed() is true.
const TByteVector& TBlobCompressionInfo::GetEncoded() const
{
    Y_ABORT_UNLESS(Impl);

    const TImpl& impl = *Impl;
    return impl.GetEncoded();
}

////////////////////////////////////////////////////////////////////////////////

// Compresses `*content` in place with `codec` and returns the info describing
// the result.
//
// The whole blob is encoded as a single unit; `chunkSize` is only validated
// to be non-zero and is otherwise unused here. All pointer arguments must be
// non-null.
//
// NOTE(review): the "compressed is not larger than original" expectation is
// checked in debug builds only; in other builds the content is replaced by
// the codec output regardless of its size - confirm callers are fine with it.
TBlobCompressionInfo TryCompressBlob(
    ui32 chunkSize,
    const NBlockCodecs::ICodec* codec,
    TString* content,
    IAllocator* alloc)
{
    Y_ABORT_UNLESS(chunkSize);
    Y_ABORT_UNLESS(codec);
    Y_ABORT_UNLESS(content);
    Y_ABORT_UNLESS(alloc);

    const size_t sizeBefore = content->size();

    TString encoded;
    codec->Encode(*content, encoded);
    const size_t sizeAfter = encoded.size();

    // Hand the encoded payload back to the caller through the in/out argument.
    *content = std::move(encoded);

    Y_DEBUG_ABORT_UNLESS(sizeBefore >= sizeAfter);

    return TBlobCompressionInfo(
        static_cast<ui32>(sizeBefore),
        static_cast<ui32>(sizeAfter),
        alloc);
}

////////////////////////////////////////////////////////////////////////////////

// Decompresses a whole compressed blob and copies the requested blocks into
// `out`.
//
// codec                - codec the blob was compressed with; must be non-null.
// blobCompressionInfo  - info of the blob; BlobCompressed() must be true.
// blockSize            - size of a single block in bytes; must be non-zero.
// compressedData       - the compressed blob; its size must be equal to
//                        blobCompressionInfo.CompressedBlobSize().
// compressedDataOffset - must be 0: only whole-blob reads are supported.
// blocks               - blocks to extract: BlobOffset is the block index
//                        inside the decompressed blob, BlockOffset is the
//                        index passed to out->SetBlock().
// out                  - destination buffer; must be non-null.
//
// Any violated precondition aborts the process (Y_ABORT_UNLESS).
void Decompress(
    const NBlockCodecs::ICodec* codec,
    const TBlobCompressionInfo& blobCompressionInfo,
    ui32 blockSize,
    const TRope& compressedData,
    ui32 compressedDataOffset,
    const TVector<TUncompressedBlock>& blocks,
    IBlockBuffer* out)
{
    Y_ABORT_UNLESS(codec);
    Y_ABORT_UNLESS(blobCompressionInfo.BlobCompressed());
    Y_ABORT_UNLESS(blockSize);
    Y_ABORT_UNLESS(
        compressedData.size() == blobCompressionInfo.CompressedBlobSize());
    Y_ABORT_UNLESS(compressedDataOffset == 0);
    Y_ABORT_UNLESS(out);

    TString data = compressedData.ConvertToString();
    Y_ABORT_UNLESS(
        codec->DecompressedLength(data) == blobCompressionInfo.DecompressedBlobSize());

    TString decompressedData;
    codec->Decode(data, decompressedData);

    for (const auto& block: blocks) {
        // Widen before multiplying: a 32-bit product could wrap around and
        // slip past the bounds check below.
        const ui64 byteOffset = static_cast<ui64>(block.BlobOffset) * blockSize;

        // The whole block has to fit into the decompressed data; checking
        // only its first byte would allow the view to run past the buffer.
        Y_ABORT_UNLESS(byteOffset + blockSize <= decompressedData.size());

        TStringBuf view(decompressedData.data() + byteOffset, blockSize);
        out->SetBlock(block.BlockOffset, view);
    }
}

} // namespace NCloud::NFileStore::NStorage
139 changes: 139 additions & 0 deletions cloud/filestore/libs/storage/tablet/model/blob_compression.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#pragma once

#include "public.h"

#include "alloc.h"

#include <cloud/storage/core/libs/common/block_buffer.h>
#include <cloud/storage/core/libs/common/byte_vector.h>

#include <contrib/ydb/library/actors/util/rope.h>

#include <util/generic/string.h>
#include <util/generic/vector.h>

#include <memory>

namespace NBlockCodecs {

////////////////////////////////////////////////////////////////////////////////

struct ICodec;

} // namespace NBlockCodecs

namespace NCloud::NFileStore::NStorage {

////////////////////////////////////////////////////////////////////////////////

struct TUncompressedRange
{
ui32 Offset = 0;
ui32 Length = 0;

TUncompressedRange() = default;

TUncompressedRange(ui32 offset, ui32 length)
: Offset(offset)
, Length(length)
{}

void Extend(ui32 length)
{
Length += length;
}
};

// Half-open byte range [Offset, Offset + Length) inside the compressed blob.
struct TCompressedRange
{
    ui32 Offset = 0;
    ui32 Length = 0;

    TCompressedRange() = default;

    TCompressedRange(ui32 offset, ui32 length)
        : Offset(offset)
        , Length(length)
    {}

    // Offset of the first byte past the range.
    ui32 End() const
    {
        return Offset + Length;
    }

    // Extends this range to the smallest range covering both this range and
    // `other` (any gap between the two is included).
    void Merge(TCompressedRange other)
    {
        // End() depends on Offset, so capture it before Offset is changed.
        const auto end = Max(End(), other.End());
        Offset = Min(Offset, other.Offset);
        Length = end - Offset;
    }

    // Returns true when the two ranges share at least one byte. Ranges that
    // merely touch (one ends where the other starts) and empty ranges do not
    // overlap.
    bool Overlaps(TCompressedRange other) const
    {
        const auto begin = Max(Offset, other.Offset);
        const auto end = Min(End(), other.End());
        // The intersection [begin, end) is non-empty iff begin < end.
        return begin < end;
    }
};

////////////////////////////////////////////////////////////////////////////////

// Describes how a blob was compressed.
//
// A default-constructed instance holds no implementation object and means
// "the blob is not compressed". All accessors except BlobCompressed() assert
// (Y_ABORT_UNLESS) that the implementation object is present, so check
// BlobCompressed() before calling them.
class TBlobCompressionInfo
{
private:
struct TImpl;
// Copies of the info share the same implementation object.
std::shared_ptr<TImpl> Impl;

public:
// Creates an info that reports BlobCompressed() == false.
TBlobCompressionInfo() = default;

// Creates an info from the blob sizes before and after compression;
// `alloc` is used for the encoded representation.
TBlobCompressionInfo(
ui32 decompressedBlobSize,
ui32 compressedBlobSize,
IAllocator* alloc);

// Restores an info from the bytes previously returned by GetEncoded().
explicit TBlobCompressionInfo(TByteVector bytes);

// True when the info carries compression data.
bool BlobCompressed() const;

// Blob sizes in bytes before and after compression.
ui32 DecompressedBlobSize() const;
ui32 CompressedBlobSize() const;

// Range of the compressed blob that has to be read in order to obtain the
// given range of uncompressed data.
TCompressedRange CompressedRange(TUncompressedRange range) const;

// Serialized form of the info; accepted by the TByteVector constructor.
const TByteVector& GetEncoded() const;
};

////////////////////////////////////////////////////////////////////////////////

// Compresses `*content` in place using `codec` and returns the compression
// info for the resulting blob (sizes before and after compression).
// `chunkSize` must be non-zero; `codec`, `content` and `alloc` must be
// non-null.
TBlobCompressionInfo TryCompressBlob(
ui32 chunkSize,
const NBlockCodecs::ICodec* codec,
TString* content,
IAllocator* alloc);

////////////////////////////////////////////////////////////////////////////////

struct TUncompressedBlock
{
ui32 BlobOffset;
ui32 BlockOffset;

TUncompressedBlock(ui32 blobOffset, ui32 blockOffset)
: BlobOffset(blobOffset)
, BlockOffset(blockOffset)
{}
};

struct IBlockBuffer;

// Decompresses the whole blob `compressedData` with `codec` and stores every
// requested block into `out` via SetBlock().
// `compressedData` must contain exactly CompressedBlobSize() bytes and
// `compressedDataOffset` must be 0; `blobCompressionInfo` must describe a
// compressed blob; `blockSize` must be non-zero.
void Decompress(
const NBlockCodecs::ICodec* codec,
const TBlobCompressionInfo& blobCompressionInfo,
ui32 blockSize,
const TRope& compressedData,
ui32 compressedDataOffset,
const TVector<TUncompressedBlock>& blocks,
IBlockBuffer* out);

} // namespace NCloud::NFileStore::NStorage
Loading
Loading