Skip to content

Commit

Permalink
default compression via CSConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-gogov committed Dec 19, 2024
1 parent 3317b75 commit f1e3c60
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 85 deletions.
19 changes: 18 additions & 1 deletion ydb/core/formats/arrow/serializer/native.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
#pragma once

#include "abstract.h"
#include "parsing.h"
#include "utils.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/conclusion/result.h>
#include <ydb/library/yverify_stream/yverify_stream.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/options.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
Expand Down Expand Up @@ -53,7 +58,19 @@ class TNativeSerializer: public ISerializer {
static arrow::ipc::IpcOptions BuildDefaultOptions() {
arrow::ipc::IpcWriteOptions options;
options.use_threads = false;
options.codec = *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME);
std::optional<arrow::Compression::type> codec = CompressionFromProto(AppData()->ColumnShardConfig.GetDefaultCompression());
Y_VERIFY_S(codec.has_value(), "ColumnShardConfig: Unknown compression");
if (AppData()->ColumnShardConfig.HasDefaultCompressionLevel()) {
TString errorMessage = TStringBuilder() << "ColumnShardConfig: Compression `" << CompressionToString(codec.value())
<< "` is not support compression level";
Y_VERIFY_S(SupportsCompressionLevel(codec.value()), errorMessage.c_str());
}
if (codec.value() == arrow::Compression::type::ZSTD) {
i32 codecLevel = AppData()->ColumnShardConfig.GetDefaultCompressionLevel();
options.codec = NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec.value(), codecLevel));
} else {
options.codec = NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec.value()));
}
return options;
}

Expand Down
9 changes: 5 additions & 4 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,13 +415,14 @@ bool FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T&
return false;
}
familyDescription->SetColumnCodec(codec);
} else {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for column family'" << family.Name << "'";
return false;
}

if (family.CompressionLevel.Defined()) {
if (!family.Compression.Defined()) {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for column family '" << family.Name << "', but compression level is set";
return false;
}
familyDescription->SetColumnCodecLevel(family.CompressionLevel.GetRef());
}
}
Expand Down
30 changes: 21 additions & 9 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ namespace NKqp {

TString TTestHelper::TCompression::BuildQuery() const {
TStringBuilder str;
str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType) << "\"";
if (CompressionType.has_value()) {
str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType.value()) << "\"";
}
if (CompressionLevel.has_value()) {
str << ", COMPRESSION_LEVEL=" << CompressionLevel.value();
}
Expand All @@ -173,9 +175,16 @@ namespace NKqp {
<< "` and in right value `" << rhs.GetSerializerClassName() << "`";
return false;
}
if (CompressionType != rhs.GetCompressionType()) {
errorMessage = TStringBuilder() << "different compression type: in left value `" << NArrow::CompressionToString(CompressionType)
<< "` and in right value `" << NArrow::CompressionToString(rhs.GetCompressionType()) << "`";
if (CompressionType.has_value() && rhs.GetCompressionType().has_value() && CompressionType.value() != rhs.GetCompressionType().value()) {
errorMessage = TStringBuilder() << "different compression type: in left value `"
<< NArrow::CompressionToString(CompressionType.value()) << "` and in right value `"
<< NArrow::CompressionToString(rhs.GetCompressionType().value()) << "`";
return false;
} else if (CompressionType.has_value() && !rhs.GetCompressionType().has_value()) {
errorMessage = TStringBuilder() << "compression type is set in left value, but not set in right value";
return false;
} else if (!CompressionType.has_value() && rhs.GetCompressionType().has_value()) {
errorMessage = TStringBuilder() << "compression type is not set in left value, but set in right value";
return false;
}
if (CompressionLevel.has_value() && rhs.GetCompressionLevel().has_value() &&
Expand All @@ -199,12 +208,15 @@ namespace NKqp {
}

bool TTestHelper::TColumnFamily::DeserializeFromProto(const NKikimrSchemeOp::TFamilyDescription& family) {
if (!family.HasId() || !family.HasName() || !family.HasColumnCodec()) {
if (!family.HasId() || !family.HasName()) {
return false;
}
Id = family.GetId();
FamilyName = family.GetName();
Compression = TTestHelper::TCompression().SetCompressionType(family.GetColumnCodec());
Compression = TTestHelper::TCompression();
if (family.HasColumnCodec()) {
Compression.SetCompressionType(family.GetColumnCodec());
}
if (family.HasColumnCodecLevel()) {
Compression.SetCompressionLevel(family.GetColumnCodecLevel());
}
Expand Down Expand Up @@ -296,9 +308,9 @@ namespace NKqp {
TString TTestHelper::TColumnTableBase::BuildAlterCompressionQuery(const TString& columnName, const TCompression& compression) const {
auto str = TStringBuilder() << "ALTER OBJECT `" << Name << "` (TYPE " << GetObjectType() << ") SET";
str << " (ACTION=ALTER_COLUMN, NAME=" << columnName << ", `SERIALIZER.CLASS_NAME`=`" << compression.GetSerializerClassName() << "`,";
auto codec = NArrow::CompressionFromProto(compression.GetCompressionType());
Y_VERIFY(codec.has_value());
str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`";
if (compression.GetCompressionType().has_value()) {
str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(compression.GetCompressionType().value()) << "`";
}
if (compression.GetCompressionLevel().has_value()) {
str << "`COMPRESSION.LEVEL`=" << compression.GetCompressionLevel().value();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/common/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TTestHelper {
public:
class TCompression {
YDB_ACCESSOR(TString, SerializerClassName, "ARROW_SERIALIZER");
YDB_ACCESSOR_DEF(NKikimrSchemeOp::EColumnCodec, CompressionType);
YDB_ACCESSOR_DEF(std::optional<NKikimrSchemeOp::EColumnCodec>, CompressionType);
YDB_ACCESSOR_DEF(std::optional<i32>, CompressionLevel);

public:
Expand Down
121 changes: 101 additions & 20 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9468,10 +9468,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {

auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName);
auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema();
TTestHelper::TCompression plainCompression =
TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain);
TTestHelper::TColumnFamily defaultFamily =
TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression);
TTestHelper::TColumnFamily defaultFamily = TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default");

UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), 1);
TTestHelper::TColumnFamily defaultFromScheme;
Expand All @@ -9480,18 +9477,6 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
TString errorMessage;
UNIT_ASSERT_C(defaultFromScheme.IsEqual(defaultFamily, errorMessage), errorMessage);
}

auto columns = schema.GetColumns();
for (ui32 i = 0; i < schema.ColumnsSize(); i++) {
auto column = columns[i];
UNIT_ASSERT(column.HasSerializer());
UNIT_ASSERT_EQUAL_C(
column.GetColumnFamilyId(), 0, TStringBuilder() << "family for column " << column.GetName() << " is not default");
TTestHelper::TCompression compression;
UNIT_ASSERT(compression.DeserializeFromProto(schema.GetColumns(i).GetSerializer()));
TString errorMessage;
UNIT_ASSERT_C(compression.IsEqual(defaultFamily.GetCompression(), errorMessage), errorMessage);
}
}

// Field `Data` is not used in ColumnFamily for ColumnTable
Expand Down Expand Up @@ -10263,11 +10248,10 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false));
TString tableName = "/Root/TableWithFamily";

TTestHelper::TCompression plainCompression =
TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain);
TTestHelper::TCompression lz4Compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);

TVector<TTestHelper::TColumnFamily> families = {
TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(plainCompression),
TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(lz4Compression),
};

{
Expand All @@ -10290,7 +10274,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.CreateTable(testTable);
}

families.push_back(TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression));
families.push_back(TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default"));
auto& runner = testHelper.GetKikimr();
auto runtime = runner.GetTestServer().GetRuntime();
TActorId sender = runtime->AllocateEdgeActor();
Expand Down Expand Up @@ -10715,6 +10699,103 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.RebootTablets("/Root/ColumnTableTest");
}

Y_UNIT_TEST(CreateTableWithDefaultFamilyWithoutSettings) {
TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false));
TString tableName = "/Root/ColumnTableTest";
auto session = testHelper.GetSession();
auto createQuery = TStringBuilder() << R"(CREATE TABLE `)" << tableName << R"(` (
Key Uint64 NOT NULL,
Value1 String,
Value2 Uint32,
PRIMARY KEY (Key),
FAMILY default ())
WITH (STORE = COLUMN);)";
auto result = session.ExecuteSchemeQuery(createQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto& runner = testHelper.GetKikimr();
auto runtime = runner.GetTestServer().GetRuntime();
TActorId sender = runtime->AllocateEdgeActor();

auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName);
auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema();
TTestHelper::TColumnFamily defaultFamily = TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default");

UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), 1);
TTestHelper::TColumnFamily defaultFromScheme;
UNIT_ASSERT(defaultFromScheme.DeserializeFromProto(schema.GetColumnFamilies(0)));
{
TString errorMessage;
UNIT_ASSERT_C(defaultFromScheme.IsEqual(defaultFamily, errorMessage), errorMessage);
}
}

Y_UNIT_TEST(CreateTableWithFamilyWithOnlyCompressionLevel) {
TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false));
TString tableName = "/Root/ColumnTableTest";
auto session = testHelper.GetSession();
auto createQuery = TStringBuilder() << R"(CREATE TABLE `)" << tableName << R"(` (
Key Uint64 NOT NULL,
Value1 String,
Value2 Uint32,
PRIMARY KEY (Key),
FAMILY family1 (
COMPRESSION_LEVEL = 2
))
WITH (STORE = COLUMN);)";
auto result = session.ExecuteSchemeQuery(createQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
}

void CreateTableWithCompressionSettings(const TTestHelper::TCompression& compression, const EStatus& statusCreateTable = EStatus::SUCCESS) {
NKikimrConfig::TAppConfig appConfig;
auto* CSConfig = appConfig.MutableColumnShardConfig();
CSConfig->SetDefaultCompression(compression.GetCompressionType().value());
if (compression.GetCompressionLevel().has_value()) {
CSConfig->SetDefaultCompressionLevel(compression.GetCompressionLevel().value());
}
TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig));

TString tableName = "/Root/TableWithoutColumnFamily";
{
TVector<TTestHelper::TColumnSchema> schema = {
TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false),
TTestHelper::TColumnSchema().SetName("Value1").SetType(NScheme::NTypeIds::String).SetNullable(true),
TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true)
};

TTestHelper::TColumnTable testTable;
testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema);
testHelper.CreateTable(testTable, statusCreateTable);
}

if (statusCreateTable == EStatus::SUCCESS) {
auto& runner = testHelper.GetKikimr();
auto tableClient = runner.GetTableClient();
auto session = tableClient.CreateSession().GetValueSync().GetSession();

auto runtime = runner.GetTestServer().GetRuntime();
TActorId sender = runtime->AllocateEdgeActor();

auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName);
auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema();
TTestHelper::TColumnFamily defaultFamily = TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default");

UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), 1);
TTestHelper::TColumnFamily defaultFromScheme;
UNIT_ASSERT(defaultFromScheme.DeserializeFromProto(schema.GetColumnFamilies(0)));
{
TString errorMessage;
UNIT_ASSERT_C(defaultFromScheme.IsEqual(defaultFamily, errorMessage), errorMessage);
}
}
}

Y_UNIT_TEST(DefaultCompressionViaConfig) {
CreateTableWithCompressionSettings(TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4));
CreateTableWithCompressionSettings(
TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(5));
}
}

Y_UNIT_TEST_SUITE(KqpOlapTypes) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1736,6 +1736,8 @@ message TColumnShardConfig {
optional string ReaderClassName = 28;
optional bool AllowNullableColumnsInPK = 29 [default = false];
optional uint32 RestoreDataOnWriteTimeoutSeconds = 30;
optional NKikimrSchemeOp.EColumnCodec DefaultCompression = 31 [default = ColumnCodecZSTD];
optional int32 DefaultCompressionLevel = 32 [default = 1];
}

message TSchemeShardConfig {
Expand Down
35 changes: 18 additions & 17 deletions ydb/core/tx/schemeshard/olap/column_families/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,24 @@ bool TOlapColumnFamiliesDescription::Validate(const NKikimrSchemeOp::TColumnTabl
}
lastColumnFamilyId = familyProto.GetId();

if (!familyProto.HasColumnCodec()) {
errors.AddError("missing column codec for column family '" + columnFamilyName + "'");
return false;
}

auto serializerProto = ConvertFamilyDescriptionToProtoSerializer(familyProto);
if (serializerProto.IsFail()) {
errors.AddError(serializerProto.GetErrorMessage());
return false;
}
NArrow::NSerialization::TSerializerContainer serializer;
if (!serializer.DeserializeFromProto(serializerProto.GetResult())) {
errors.AddError(TStringBuilder() << "can't deserialize column family `" << columnFamilyName << "` from proto ");
return false;
}
if (!family->GetSerializerContainer().IsEqualTo(serializer)) {
errors.AddError(TStringBuilder() << "compression from column family '" << columnFamilyName << "` is not matching schema preset");
if (familyProto.HasColumnCodec() && family->GetSerializerContainer().has_value()) {
auto serializerProto = ConvertFamilyDescriptionToProtoSerializer(familyProto);
if (serializerProto.IsFail()) {
errors.AddError(serializerProto.GetErrorMessage());
return false;
}
NArrow::NSerialization::TSerializerContainer serializer;
if (!serializer.DeserializeFromProto(serializerProto.GetResult())) {
errors.AddError(TStringBuilder() << "can't deserialize column family `" << columnFamilyName << "` from proto ");
return false;
}
if (!family->GetSerializerContainer()->IsEqualTo(serializer)) {
errors.AddError(TStringBuilder() << "compression from column family '" << columnFamilyName << "` is not matching schema preset");
return false;
}
} else if ((!familyProto.HasColumnCodec() && family->GetSerializerContainer().has_value()) ||
(familyProto.HasColumnCodec() && !family->GetSerializerContainer().has_value())) {
errors.AddError(TStringBuilder() << "compression is not matching schema preset in column family `" << columnFamilyName << "`");
return false;
}
}
Expand Down
Loading

0 comments on commit f1e3c60

Please sign in to comment.