Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change default compression via CSConfig #12542

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
7 changes: 4 additions & 3 deletions ydb/core/base/appdata_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,16 @@ inline TAppData* AppData(NActors::TActorSystem* actorSystem) {
}

inline bool HasAppData() {
return !!NActors::TlsActivationContext;
return !!NActors::TlsActivationContext
&& NActors::TlsActivationContext->ExecutorThread.ActorSystem
&& NActors::TlsActivationContext->ExecutorThread.ActorSystem->AppData<TAppData>();
}

inline TAppData& AppDataVerified() {
Y_ABORT_UNLESS(HasAppData());
auto& actorSystem = NActors::TlsActivationContext->ExecutorThread.ActorSystem;
Y_ABORT_UNLESS(actorSystem);
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
TAppData* const x = actorSystem->AppData<TAppData>();
Y_ABORT_UNLESS(x && x->Magic == TAppData::MagicTag);
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
Y_ABORT_UNLESS(x->Magic == TAppData::MagicTag);
return *x;
}

Expand Down
26 changes: 24 additions & 2 deletions ydb/core/formats/arrow/serializer/native.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#pragma once

#include "abstract.h"
#include "parsing.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>

#include <ydb/library/accessor/accessor.h>
Expand All @@ -22,6 +25,11 @@ class TNativeSerializer: public ISerializer {

TConclusion<std::shared_ptr<arrow::util::Codec>> BuildCodec(const arrow::Compression::type& cType, const std::optional<ui32> level) const;
static const inline TFactory::TRegistrator<TNativeSerializer> Registrator = TFactory::TRegistrator<TNativeSerializer>(GetClassNameStatic());

// Creates the LZ4_FRAME codec that BuildDefaultOptions falls back to when no
// application data is available or no default compression is configured.
// The creation result is unwrapped with ValueOrDie(), so a failed codec
// construction is fatal rather than yielding a null codec.
static std::shared_ptr<arrow::util::Codec> GetDefaultCodec() {
    return arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME).ValueOrDie();
}

protected:
virtual bool IsCompatibleForExchangeWithSameClass(const ISerializer& /*item*/) const override {
return true;
Expand Down Expand Up @@ -53,7 +61,21 @@ class TNativeSerializer: public ISerializer {
static arrow::ipc::IpcOptions BuildDefaultOptions() {
arrow::ipc::IpcWriteOptions options;
options.use_threads = false;
options.codec = *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME);
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
if (HasAppData()) {
if (AppData()->ColumnShardConfig.HasDefaultCompression()) {
arrow::Compression::type codec = CompressionFromProto(AppData()->ColumnShardConfig.GetDefaultCompression()).value();
if (AppData()->ColumnShardConfig.HasDefaultCompressionLevel()) {
options.codec = NArrow::TStatusValidator::GetValid(
arrow::util::Codec::Create(codec, AppData()->ColumnShardConfig.GetDefaultCompressionLevel()));
} else {
options.codec = NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec));
}
} else {
options.codec = GetDefaultCodec();
}
} else {
options.codec = GetDefaultCodec();
}
return options;
}

Expand Down Expand Up @@ -83,7 +105,7 @@ class TNativeSerializer: public ISerializer {
}

static arrow::ipc::IpcOptions GetDefaultOptions() {
static arrow::ipc::IpcWriteOptions options = BuildDefaultOptions();
arrow::ipc::IpcWriteOptions options = BuildDefaultOptions();
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
return options;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/serializer/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ std::optional<int> MaximumCompressionLevel(const arrow::Compression::type compre
}
return NArrow::TStatusValidator::GetValid(arrow::util::Codec::MaximumCompressionLevel(compression));
}
}
} // namespace NKikimr::NArrow
13 changes: 10 additions & 3 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,12 +416,19 @@ bool FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T&
}
familyDescription->SetColumnCodec(codec);
} else {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for column family'" << family.Name << "'";
return false;
if (family.Name != "default") {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for non `default` column family '" << family.Name << "'";
return false;
}
}

if (family.CompressionLevel.Defined()) {
if (!family.Compression.Defined()) {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for column family '" << family.Name << "', but compression level is set";
return false;
}
familyDescription->SetColumnCodecLevel(family.CompressionLevel.GetRef());
}
}
Expand Down
32 changes: 23 additions & 9 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ namespace NKqp {

TString TTestHelper::TCompression::BuildQuery() const {
TStringBuilder str;
str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType) << "\"";
if (CompressionType.has_value()) {
str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType.value()) << "\"";
}
if (CompressionLevel.has_value()) {
str << ", COMPRESSION_LEVEL=" << CompressionLevel.value();
}
Expand All @@ -167,9 +169,16 @@ namespace NKqp {
<< "` and in right value `" << rhs.GetSerializerClassName() << "`";
return false;
}
if (CompressionType != rhs.GetCompressionType()) {
errorMessage = TStringBuilder() << "different compression type: in left value `" << NArrow::CompressionToString(CompressionType)
<< "` and in right value `" << NArrow::CompressionToString(rhs.GetCompressionType()) << "`";
if (CompressionType.has_value() && rhs.HasCompressionType() && CompressionType.value() != rhs.GetCompressionTypeUnsafe()) {
errorMessage = TStringBuilder() << "different compression type: in left value `"
<< NArrow::CompressionToString(CompressionType.value()) << "` and in right value `"
<< NArrow::CompressionToString(rhs.GetCompressionTypeUnsafe()) << "`";
return false;
} else if (CompressionType.has_value() && !rhs.HasCompressionType()) {
errorMessage = TStringBuilder() << "compression type is set in left value, but not set in right value";
return false;
} else if (!CompressionType.has_value() && rhs.HasCompressionType()) {
errorMessage = TStringBuilder() << "compression type is not set in left value, but set in right value";
return false;
}
if (CompressionLevel.has_value() && rhs.GetCompressionLevel().has_value() &&
Expand All @@ -193,12 +202,15 @@ namespace NKqp {
}

bool TTestHelper::TColumnFamily::DeserializeFromProto(const NKikimrSchemeOp::TFamilyDescription& family) {
if (!family.HasId() || !family.HasName() || !family.HasColumnCodec()) {
if (!family.HasId() || !family.HasName()) {
return false;
}
Id = family.GetId();
FamilyName = family.GetName();
Compression = TTestHelper::TCompression().SetCompressionType(family.GetColumnCodec());
Compression = TTestHelper::TCompression();
if (family.HasColumnCodec()) {
Compression.SetCompressionType(family.GetColumnCodec());
}
if (family.HasColumnCodecLevel()) {
Compression.SetCompressionLevel(family.GetColumnCodecLevel());
}
Expand Down Expand Up @@ -290,9 +302,11 @@ namespace NKqp {
TString TTestHelper::TColumnTableBase::BuildAlterCompressionQuery(const TString& columnName, const TCompression& compression) const {
auto str = TStringBuilder() << "ALTER OBJECT `" << Name << "` (TYPE " << GetObjectType() << ") SET";
str << " (ACTION=ALTER_COLUMN, NAME=" << columnName << ", `SERIALIZER.CLASS_NAME`=`" << compression.GetSerializerClassName() << "`,";
auto codec = NArrow::CompressionFromProto(compression.GetCompressionType());
Y_VERIFY(codec.has_value());
str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`";
if (compression.HasCompressionType()) {
auto codec = NArrow::CompressionFromProto(compression.GetCompressionTypeUnsafe());
Y_VERIFY(codec.has_value());
str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`";
}
if (compression.GetCompressionLevel().has_value()) {
str << "`COMPRESSION.LEVEL`=" << compression.GetCompressionLevel().value();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/common/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TTestHelper {
public:
class TCompression {
YDB_ACCESSOR(TString, SerializerClassName, "ARROW_SERIALIZER");
YDB_ACCESSOR_DEF(NKikimrSchemeOp::EColumnCodec, CompressionType);
YDB_OPT(NKikimrSchemeOp::EColumnCodec, CompressionType);
YDB_ACCESSOR_DEF(std::optional<i32>, CompressionLevel);

public:
Expand Down
81 changes: 81 additions & 0 deletions ydb/core/kqp/ut/olap/compression_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,47 @@
#include <ydb/core/kqp/ut/common/columnshard.h>
#include <ydb/core/kqp/ut/olap/helpers/get_value.h>
#include <ydb/core/kqp/ut/olap/helpers/query_executor.h>
#include <ydb/core/tx/columnshard/test_helper/controllers.h>

#include <ut/olap/helpers/typed_local.h>

namespace NKikimr::NKqp {

std::pair<ui64, ui64> GetVolumes(
const TKikimrRunner& runner, const TString& tablePath, const std::vector<TString> columnNames) {
TString selectQuery = "SELECT * FROM `" + tablePath + "/.sys/primary_index_stats` WHERE Activity == 1";
if (columnNames.size()) {
selectQuery += " AND EntityName IN ('" + JoinSeq("','", columnNames) + "')";
}
auto tableClient = runner.GetTableClient();
std::optional<ui64> rawBytesPred;
std::optional<ui64> bytesPred;
while (true) {
auto rows = ExecuteScanQuery(tableClient, selectQuery, false);
ui64 rawBytes = 0;
ui64 bytes = 0;
for (auto&& r : rows) {
for (auto&& c : r) {
if (c.first == "RawBytes") {
rawBytes += GetUint64(c.second);
}
if (c.first == "BlobRangeSize") {
bytes += GetUint64(c.second);
}
}
}
if (rawBytesPred && *rawBytesPred == rawBytes && bytesPred && *bytesPred == bytes) {
break;
} else {
rawBytesPred = rawBytes;
bytesPred = bytes;
Cerr << "Wait changes: " << bytes << "/" << rawBytes << Endl;
Sleep(TDuration::Seconds(5));
}
}
return { rawBytesPred.value(), bytesPred.value() };
}

Y_UNIT_TEST_SUITE(KqpOlapCompression) {
Y_UNIT_TEST(DisabledAlterCompression) {
TKikimrSettings settings = TKikimrSettings().SetWithSampleTables(false).SetEnableOlapCompression(false);
Expand Down Expand Up @@ -63,5 +103,46 @@ Y_UNIT_TEST_SUITE(KqpOlapCompression) {
testHelper.CreateTable(testTable);
testHelper.SetCompression(testTable, "pk_int", compression, NYdb::EStatus::SCHEME_ERROR);
}

// Starts a fresh test cluster, writes a fixed data set into a single-column
// table and returns the volumes GetVolumes reports for that column
// (raw bytes first, stored blob bytes second).
//
// CSConfig: optional ColumnShard config copied into the application config
//           before the cluster is started; when absent the application config
//           is passed with no ColumnShard overrides.
//
// The table `/Root/ColumnTableTest` has one non-nullable Uint64 column `pk_int`,
// used as both primary key and sharding key, and is filled with 100000 rows
// through a bulk upsert.
std::pair<ui64, ui64> GetVolumesColumnWithCompression(const std::optional<NKikimrConfig::TColumnShardConfig>& CSConfig = {}) {
    auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
    NKikimrConfig::TAppConfig appConfig;
    if (CSConfig.has_value()) {
        *appConfig.MutableColumnShardConfig() = CSConfig.value();
    }
    auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig);
    TTestHelper testHelper(settings);
    Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();

    // No per-column compression is specified here, so the table is created
    // without an explicit codec on `pk_int`.
    TVector<TTestHelper::TColumnSchema> schema = {
        TTestHelper::TColumnSchema().SetName("pk_int").SetType(NScheme::NTypeIds::Uint64).SetNullable(false)
    };

    TString tableName = "/Root/ColumnTableTest";
    TTestHelper::TColumnTable testTable;
    testTable.SetName(tableName).SetPrimaryKey({ "pk_int" }).SetSharding({ "pk_int" }).SetSchema(schema);
    testHelper.CreateTable(testTable);

    TVector<NArrow::NConstruction::IArrayBuilder::TPtr> dataBuilders;
    dataBuilders.push_back(
        NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::UInt64Type>>::BuildNotNullable(
            "pk_int", false));
    auto batch = NArrow::NConstruction::TRecordBatchConstructor(dataBuilders).BuildBatch(100000);
    testHelper.BulkUpsert(testTable, batch);
    // NOTE(review): presumably blocks until compactions settle, with 10 seconds as the wait bound — confirm against TWaitCompactionController.
    csController->WaitCompactions(TDuration::Seconds(10));
    return GetVolumes(testHelper.GetKikimr(), tableName, { "pk_int" });
}

// Checks that the default compression taken from the ColumnShard config is
// actually applied: the same data written under a ZSTD default must keep the
// same raw size while its stored size drops below a third of the baseline.
Y_UNIT_TEST(DefaultCompressionViaCSConfig) {
    // Baseline run without any config override.
    auto [rawBytesPK1, bytesPK1] = GetVolumesColumnWithCompression(); // Default compression LZ4

    // Second run with ZSTD (level 1) configured as the default compression.
    NKikimrConfig::TColumnShardConfig csConfig;
    csConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD);
    csConfig.SetDefaultCompressionLevel(1);
    auto [rawBytesPK2, bytesPK2] = GetVolumesColumnWithCompression(csConfig);

    // Identical input data, so the uncompressed volume must not change.
    AFL_VERIFY(rawBytesPK2 == rawBytesPK1)("pk1", rawBytesPK1)("pk2", rawBytesPK2);
    // The stored volume under ZSTD is required to be less than one third of the baseline.
    AFL_VERIFY(bytesPK2 < bytesPK1 / 3)("pk1", bytesPK1)("pk2", bytesPK2);
}
}
}
17 changes: 12 additions & 5 deletions ydb/core/kqp/ut/olap/sys_view_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
ui64 rawBytesPK1;
ui64 bytesPK1;
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
NKikimrConfig::TAppConfig appConfig;
auto* CSConfig = appConfig.MutableColumnShardConfig();
CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTypedLocalHelper helper("", kikimr, "olapTable", "olapStore");
Expand Down Expand Up @@ -259,8 +261,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
ui64 rawBytes1;
ui64 bytes1;
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
NKikimrConfig::TAppConfig appConfig;
auto* CSConfig = appConfig.MutableColumnShardConfig();
CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTypedLocalHelper helper("Utf8", kikimr);
Expand Down Expand Up @@ -298,7 +302,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
ui64 bytes1;
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetSmallSizeDetector(Max<ui32>());
auto settings = TKikimrSettings().SetWithSampleTables(false);
NKikimrConfig::TAppConfig appConfig;
auto* CSConfig = appConfig.MutableColumnShardConfig();
CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTypedLocalHelper helper("Utf8", kikimr);
Expand Down
Loading
Loading