Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change default compression via CSConfig #12542

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
6 changes: 3 additions & 3 deletions ydb/core/base/appdata_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,15 @@ inline TAppData* AppData(NActors::TActorSystem* actorSystem) {
}

inline bool HasAppData() {
return !!NActors::TlsActivationContext;
return !!NActors::TlsActivationContext && NActors::TlsActivationContext->ExecutorThread.ActorSystem &&
NActors::TlsActivationContext->ExecutorThread.ActorSystem->AppData<TAppData>();
vlad-gogov marked this conversation as resolved.
Show resolved Hide resolved
}

inline TAppData& AppDataVerified() {
Y_ABORT_UNLESS(HasAppData());
auto& actorSystem = NActors::TlsActivationContext->ExecutorThread.ActorSystem;
Y_ABORT_UNLESS(actorSystem);
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
TAppData* const x = actorSystem->AppData<TAppData>();
Y_ABORT_UNLESS(x && x->Magic == TAppData::MagicTag);
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
Y_ABORT_UNLESS(x->Magic == TAppData::MagicTag);
return *x;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/serializer/native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ NKikimr::TConclusionStatus TNativeSerializer::DoDeserializeFromRequest(NYql::TFe
level = levelLocal;
}
}
auto codecPtrStatus = BuildCodec(codec.value_or(Options.codec->compression_type()), level);
auto codecPtrStatus = BuildCodec(codec.value_or(GetDefaultCompressionType()), level);
vlad-gogov marked this conversation as resolved.
Show resolved Hide resolved
if (!codecPtrStatus) {
return codecPtrStatus.GetError();
}
Expand Down
31 changes: 29 additions & 2 deletions ydb/core/formats/arrow/serializer/native.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#pragma once

#include "abstract.h"
#include "parsing.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>

#include <ydb/library/accessor/accessor.h>
Expand All @@ -20,6 +23,10 @@ class TNativeSerializer: public ISerializer {
private:
arrow::ipc::IpcWriteOptions Options;

static inline std::shared_ptr<arrow::util::Codec> DefaultCodec() {
return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(arrow::Compression::type::ZSTD, 1));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ZSTD/1 это что?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Сжатие по умолчанию, если его не задали в конфигурации CS

}

TConclusion<std::shared_ptr<arrow::util::Codec>> BuildCodec(const arrow::Compression::type& cType, const std::optional<ui32> level) const;
static const inline TFactory::TRegistrator<TNativeSerializer> Registrator = TFactory::TRegistrator<TNativeSerializer>(GetClassNameStatic());
protected:
Expand Down Expand Up @@ -50,10 +57,30 @@ class TNativeSerializer: public ISerializer {

virtual TConclusionStatus DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) override;

static arrow::Compression::type GetDefaultCompressionType() {
if (!HasAppData() && !AppData()->ColumnShardConfig.HasDefaultCompression()) {
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

поправь условие.
LZ4 по умолчанию

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вернул LZ4 по умолчанию и поправил условие.

return arrow::Compression::ZSTD;
}
return CompressionFromProto(AppData()->ColumnShardConfig.GetDefaultCompression()).value();
}

static std::shared_ptr<arrow::util::Codec> GetDefaultCodec() {
if (!HasAppData() ||
(!AppData()->ColumnShardConfig.HasDefaultCompression() && !AppData()->ColumnShardConfig.HasDefaultCompressionLevel())) {
return DefaultCodec();
}
arrow::Compression::type codec = GetDefaultCompressionType();
if (AppData()->ColumnShardConfig.HasDefaultCompressionLevel()) {
return NArrow::TStatusValidator::GetValid(
arrow::util::Codec::Create(codec, AppData()->ColumnShardConfig.GetDefaultCompressionLevel()));
}
return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec));
}

static arrow::ipc::IpcOptions BuildDefaultOptions() {
arrow::ipc::IpcWriteOptions options;
options.use_threads = false;
options.codec = *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME);
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
options.codec = GetDefaultCodec();
vlad-gogov marked this conversation as resolved.
Show resolved Hide resolved
return options;
}

Expand Down Expand Up @@ -83,7 +110,7 @@ class TNativeSerializer: public ISerializer {
}

static arrow::ipc::IpcOptions GetDefaultOptions() {
static arrow::ipc::IpcWriteOptions options = BuildDefaultOptions();
arrow::ipc::IpcWriteOptions options = BuildDefaultOptions();
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
return options;
}

Expand Down
6 changes: 5 additions & 1 deletion ydb/core/formats/arrow/serializer/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ bool SupportsCompressionLevel(const arrow::Compression::type compression, const
return true;
}

bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression, const i32 compressionLevel) {
vlad-gogov marked this conversation as resolved.
Show resolved Hide resolved
return SupportsCompressionLevel(CompressionFromProto(compression).value(), compressionLevel);
}

std::optional<int> MinimumCompressionLevel(const arrow::Compression::type compression) {
if (!SupportsCompressionLevel(compression)) {
return {};
Expand All @@ -32,4 +36,4 @@ std::optional<int> MaximumCompressionLevel(const arrow::Compression::type compre
}
return NArrow::TStatusValidator::GetValid(arrow::util::Codec::MaximumCompressionLevel(compression));
}
}
} // namespace NKikimr::NArrow
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/serializer/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ bool SupportsCompressionLevel(const arrow::Compression::type compression, const

std::optional<int> MinimumCompressionLevel(const arrow::Compression::type compression);
std::optional<int> MaximumCompressionLevel(const arrow::Compression::type compression);
}
} // namespace NKikimr::NArrow
vlad-gogov marked this conversation as resolved.
Show resolved Hide resolved
13 changes: 10 additions & 3 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,12 +416,19 @@ bool FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T&
}
familyDescription->SetColumnCodec(codec);
} else {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for column family'" << family.Name << "'";
return false;
if (family.Name != "default") {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for non `default` column family '" << family.Name << "'";
return false;
}
}

if (family.CompressionLevel.Defined()) {
if (!family.Compression.Defined()) {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for column family '" << family.Name << "', but compression level is set";
return false;
}
familyDescription->SetColumnCodecLevel(family.CompressionLevel.GetRef());
}
}
Expand Down
32 changes: 23 additions & 9 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ namespace NKqp {

TString TTestHelper::TCompression::BuildQuery() const {
TStringBuilder str;
str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType) << "\"";
if (CompressionType.has_value()) {
str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType.value()) << "\"";
}
if (CompressionLevel.has_value()) {
str << ", COMPRESSION_LEVEL=" << CompressionLevel.value();
}
Expand All @@ -167,9 +169,16 @@ namespace NKqp {
<< "` and in right value `" << rhs.GetSerializerClassName() << "`";
return false;
}
if (CompressionType != rhs.GetCompressionType()) {
errorMessage = TStringBuilder() << "different compression type: in left value `" << NArrow::CompressionToString(CompressionType)
<< "` and in right value `" << NArrow::CompressionToString(rhs.GetCompressionType()) << "`";
if (CompressionType.has_value() && rhs.GetCompressionType().has_value() && CompressionType.value() != rhs.GetCompressionType().value()) {
errorMessage = TStringBuilder() << "different compression type: in left value `"
<< NArrow::CompressionToString(CompressionType.value()) << "` and in right value `"
<< NArrow::CompressionToString(rhs.GetCompressionType().value()) << "`";
return false;
} else if (CompressionType.has_value() && !rhs.GetCompressionType().has_value()) {
errorMessage = TStringBuilder() << "compression type is set in left value, but not set in right value";
return false;
} else if (!CompressionType.has_value() && rhs.GetCompressionType().has_value()) {
errorMessage = TStringBuilder() << "compression type is not set in left value, but set in right value";
return false;
}
if (CompressionLevel.has_value() && rhs.GetCompressionLevel().has_value() &&
Expand All @@ -193,12 +202,15 @@ namespace NKqp {
}

bool TTestHelper::TColumnFamily::DeserializeFromProto(const NKikimrSchemeOp::TFamilyDescription& family) {
if (!family.HasId() || !family.HasName() || !family.HasColumnCodec()) {
if (!family.HasId() || !family.HasName()) {
return false;
}
Id = family.GetId();
FamilyName = family.GetName();
Compression = TTestHelper::TCompression().SetCompressionType(family.GetColumnCodec());
Compression = TTestHelper::TCompression();
if (family.HasColumnCodec()) {
Compression.SetCompressionType(family.GetColumnCodec());
}
if (family.HasColumnCodecLevel()) {
Compression.SetCompressionLevel(family.GetColumnCodecLevel());
}
Expand Down Expand Up @@ -290,9 +302,11 @@ namespace NKqp {
TString TTestHelper::TColumnTableBase::BuildAlterCompressionQuery(const TString& columnName, const TCompression& compression) const {
auto str = TStringBuilder() << "ALTER OBJECT `" << Name << "` (TYPE " << GetObjectType() << ") SET";
str << " (ACTION=ALTER_COLUMN, NAME=" << columnName << ", `SERIALIZER.CLASS_NAME`=`" << compression.GetSerializerClassName() << "`,";
auto codec = NArrow::CompressionFromProto(compression.GetCompressionType());
Y_VERIFY(codec.has_value());
str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`";
if (compression.GetCompressionType().has_value()) {
auto codec = NArrow::CompressionFromProto(compression.GetCompressionType().value());
Y_VERIFY(codec.has_value());
str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`";
}
if (compression.GetCompressionLevel().has_value()) {
str << "`COMPRESSION.LEVEL`=" << compression.GetCompressionLevel().value();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/common/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TTestHelper {
public:
class TCompression {
YDB_ACCESSOR(TString, SerializerClassName, "ARROW_SERIALIZER");
YDB_ACCESSOR_DEF(NKikimrSchemeOp::EColumnCodec, CompressionType);
YDB_ACCESSOR_DEF(std::optional<NKikimrSchemeOp::EColumnCodec>, CompressionType);
vlad-gogov marked this conversation as resolved.
Show resolved Hide resolved
YDB_ACCESSOR_DEF(std::optional<i32>, CompressionLevel);

public:
Expand Down
80 changes: 80 additions & 0 deletions ydb/core/kqp/ut/olap/compression_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,47 @@
#include <ydb/core/kqp/ut/common/columnshard.h>
#include <ydb/core/kqp/ut/olap/helpers/get_value.h>
#include <ydb/core/kqp/ut/olap/helpers/query_executor.h>
#include <ydb/core/tx/columnshard/test_helper/controllers.h>

#include <ut/olap/helpers/typed_local.h>

namespace NKikimr::NKqp {

std::pair<ui64, ui64> GetVolumes(
const TKikimrRunner& runner, const TString& tablePath, const std::vector<TString> columnNames) {
TString selectQuery = "SELECT * FROM `" + tablePath + "/.sys/primary_index_stats` WHERE Activity == 1";
if (columnNames.size()) {
selectQuery += " AND EntityName IN ('" + JoinSeq("','", columnNames) + "')";
}
auto tableClient = runner.GetTableClient();
std::optional<ui64> rawBytesPred;
std::optional<ui64> bytesPred;
while (true) {
auto rows = ExecuteScanQuery(tableClient, selectQuery, false);
ui64 rawBytes = 0;
ui64 bytes = 0;
for (auto&& r : rows) {
for (auto&& c : r) {
if (c.first == "RawBytes") {
rawBytes += GetUint64(c.second);
}
if (c.first == "BlobRangeSize") {
bytes += GetUint64(c.second);
}
}
}
if (rawBytesPred && *rawBytesPred == rawBytes && bytesPred && *bytesPred == bytes) {
break;
} else {
rawBytesPred = rawBytes;
bytesPred = bytes;
Cerr << "Wait changes: " << bytes << "/" << rawBytes << Endl;
Sleep(TDuration::Seconds(5));
}
}
return { rawBytesPred.value(), bytesPred.value() };
}

Y_UNIT_TEST_SUITE(KqpOlapCompression) {
Y_UNIT_TEST(DisabledAlterCompression) {
TKikimrSettings settings = TKikimrSettings().SetWithSampleTables(false).SetEnableOlapCompression(false);
Expand Down Expand Up @@ -63,5 +103,45 @@ Y_UNIT_TEST_SUITE(KqpOlapCompression) {
testHelper.CreateTable(testTable);
testHelper.SetCompression(testTable, "pk_int", compression, NYdb::EStatus::SCHEME_ERROR);
}

// Starts a fresh test cluster (applying `CSConfig` as the ColumnShard config
// when it is provided), creates the column table `/Root/ColumnTableTest` with a
// single non-nullable Uint64 column `pk_int`, bulk-upserts a 100000-row batch,
// waits for compactions and returns the pair reported by GetVolumes for `pk_int`.
std::pair<ui64, ui64> GetVolumesColumnWithCompression(const std::optional<NKikimrConfig::TColumnShardConfig>& CSConfig = {}) {
    auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
    NKikimrConfig::TAppConfig appConfig;
    if (CSConfig.has_value()) {
        *appConfig.MutableColumnShardConfig() = CSConfig.value();
    }
    auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig);
    TTestHelper testHelper(settings);
    Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();

    // No per-column compression is requested here: the column relies on the
    // default compression in effect for the cluster.
    TVector<TTestHelper::TColumnSchema> schema = {
        TTestHelper::TColumnSchema().SetName("pk_int").SetType(NScheme::NTypeIds::Uint64).SetNullable(false)
    };

    TString tableName = "/Root/ColumnTableTest";
    TTestHelper::TColumnTable testTable;
    testTable.SetName(tableName).SetPrimaryKey({ "pk_int" }).SetSharding({ "pk_int" }).SetSchema(schema);
    testHelper.CreateTable(testTable);

    TVector<NArrow::NConstruction::IArrayBuilder::TPtr> dataBuilders;
    dataBuilders.push_back(
        NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::UInt64Type>>::BuildNotNullable(
            "pk_int", false));
    auto batch = NArrow::NConstruction::TRecordBatchConstructor(dataBuilders).BuildBatch(100000);
    testHelper.BulkUpsert(testTable, batch);
    csController->WaitCompactions(TDuration::Seconds(10));
    return GetVolumes(testHelper.GetKikimr(), tableName, { "pk_int" });
}

// Runs the same workload twice: once with ColumnCodecPlain set as the
// ColumnShard default compression, and once with no ColumnShard config override.
// Requires the raw byte totals to match, and the stored bytes of the
// non-overridden run to be less than a third of the plain run's stored bytes.
Y_UNIT_TEST(DefaultCompressionViaCSConfig) {
NKikimrConfig::TColumnShardConfig csConfig = NKikimrConfig::TColumnShardConfig();
csConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain);
// PK1: explicit plain codec from the config.
auto [rawBytesPK1, bytesPK1] = GetVolumesColumnWithCompression(csConfig);
auto [rawBytesPK2, bytesPK2] = GetVolumesColumnWithCompression(); // Default compression (ZSTD, 1)
// Same input data in both runs, so the uncompressed volume must be identical.
AFL_VERIFY(rawBytesPK2 == rawBytesPK1)("pk1", rawBytesPK1)("pk2", rawBytesPK2);
AFL_VERIFY(bytesPK2 < bytesPK1 / 3)("pk1", bytesPK1)("pk2", bytesPK2);
}
}
}
17 changes: 12 additions & 5 deletions ydb/core/kqp/ut/olap/sys_view_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
ui64 rawBytesPK1;
ui64 bytesPK1;
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
NKikimrConfig::TAppConfig appConfig;
auto* CSConfig = appConfig.MutableColumnShardConfig();
CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTypedLocalHelper helper("", kikimr, "olapTable", "olapStore");
Expand Down Expand Up @@ -259,8 +261,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
ui64 rawBytes1;
ui64 bytes1;
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
NKikimrConfig::TAppConfig appConfig;
auto* CSConfig = appConfig.MutableColumnShardConfig();
CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTypedLocalHelper helper("Utf8", kikimr);
Expand Down Expand Up @@ -298,7 +302,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
ui64 bytes1;
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetSmallSizeDetector(Max<ui32>());
auto settings = TKikimrSettings().SetWithSampleTables(false);
NKikimrConfig::TAppConfig appConfig;
auto* CSConfig = appConfig.MutableColumnShardConfig();
CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTypedLocalHelper helper("Utf8", kikimr);
Expand Down
Loading
Loading