Skip to content

Commit

Permalink
implemented functional
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-gogov committed Dec 19, 2024
1 parent 3317b75 commit 1e64839
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 19 deletions.
9 changes: 5 additions & 4 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,13 +415,14 @@ bool FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T&
return false;
}
familyDescription->SetColumnCodec(codec);
} else {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for column family'" << family.Name << "'";
return false;
}

if (family.CompressionLevel.Defined()) {
if (!family.Compression.Defined()) {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for column family '" << family.Name << "', but compression level is set";
return false;
}
familyDescription->SetColumnCodecLevel(family.CompressionLevel.GetRef());
}
}
Expand Down
145 changes: 141 additions & 4 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9468,10 +9468,10 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {

auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName);
auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema();
TTestHelper::TCompression plainCompression =
TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain);
TTestHelper::TCompression defaultCompression =
TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(1);
TTestHelper::TColumnFamily defaultFamily =
TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression);
TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(defaultCompression);

UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), 1);
TTestHelper::TColumnFamily defaultFromScheme;
Expand Down Expand Up @@ -10264,7 +10264,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
TString tableName = "/Root/TableWithFamily";

TTestHelper::TCompression plainCompression =
TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain);
TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(1);

TVector<TTestHelper::TColumnFamily> families = {
TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(plainCompression),
Expand Down Expand Up @@ -10715,6 +10715,143 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.RebootTablets("/Root/ColumnTableTest");
}

Y_UNIT_TEST(CreateTableWithDefaultFamilyWithoutSettings) {
TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false));
TString tableName = "/Root/ColumnTableTest";
auto session = testHelper.GetSession();
auto createQuery = TStringBuilder() << R"(CREATE TABLE `)" << tableName << R"(` (
Key Uint64 NOT NULL,
Value1 String,
Value2 Uint32,
PRIMARY KEY (Key),
FAMILY default ())
WITH (STORE = COLUMN);)";
auto result = session.ExecuteSchemeQuery(createQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto& runner = testHelper.GetKikimr();
auto runtime = runner.GetTestServer().GetRuntime();
TActorId sender = runtime->AllocateEdgeActor();

auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName);
auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema();
TTestHelper::TCompression defaultCompression =
TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(1);
TTestHelper::TColumnFamily defaultFamily =
TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(defaultCompression);

UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), 1);
TTestHelper::TColumnFamily defaultFromScheme;
UNIT_ASSERT(defaultFromScheme.DeserializeFromProto(schema.GetColumnFamilies(0)));
{
TString errorMessage;
UNIT_ASSERT_C(defaultFromScheme.IsEqual(defaultFamily, errorMessage), errorMessage);
}

auto columns = schema.GetColumns();
for (ui32 i = 0; i < schema.ColumnsSize(); i++) {
auto column = columns[i];
UNIT_ASSERT(column.HasSerializer());
UNIT_ASSERT_EQUAL_C(
column.GetColumnFamilyId(), 0, TStringBuilder() << "family for column " << column.GetName() << " is not default");
TTestHelper::TCompression compression;
UNIT_ASSERT(compression.DeserializeFromProto(schema.GetColumns(i).GetSerializer()));
TString errorMessage;
UNIT_ASSERT_C(compression.IsEqual(defaultFamily.GetCompression(), errorMessage), errorMessage);
}
}

Y_UNIT_TEST(CreateTableWithFamilyWithOnlyCompressionLevel) {
TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false));
TString tableName = "/Root/ColumnTableTest";
auto session = testHelper.GetSession();
auto createQuery = TStringBuilder() << R"(CREATE TABLE `)" << tableName << R"(` (
Key Uint64 NOT NULL,
Value1 String,
Value2 Uint32,
PRIMARY KEY (Key),
FAMILY family1 (
COMPRESSION_LEVEL = 2
))
WITH (STORE = COLUMN);)";
auto result = session.ExecuteSchemeQuery(createQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
}

void CreateTableWithDefaultColumnFamily(const TTestHelper::TCompression& compression, const EStatus& statusCreateTable = EStatus::SUCCESS) {
NKikimrConfig::TAppConfig appConfig;
auto* CSConfig = appConfig.MutableColumnShardConfig();
CSConfig->SetDefaultCompression(compression.GetCompressionType());
if (compression.GetCompressionLevel().has_value()) {
CSConfig->SetDefaultCompressionLevel(compression.GetCompressionLevel().value());
}
TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig));

TString tableName = "/Root/TableWithoutColumnFamily";
{
TVector<TTestHelper::TColumnSchema> schema = {
TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false),
TTestHelper::TColumnSchema().SetName("Value1").SetType(NScheme::NTypeIds::String).SetNullable(true),
TTestHelper::TColumnSchema().SetName("Value2").SetType(NScheme::NTypeIds::Uint32).SetNullable(true)
};

TTestHelper::TColumnTable testTable;
testTable.SetName(tableName).SetPrimaryKey({ "Key" }).SetSchema(schema);
testHelper.CreateTable(testTable, statusCreateTable);
}

if (statusCreateTable == EStatus::SUCCESS) {
auto& runner = testHelper.GetKikimr();
auto tableClient = runner.GetTableClient();
auto session = tableClient.CreateSession().GetValueSync().GetSession();

auto runtime = runner.GetTestServer().GetRuntime();
TActorId sender = runtime->AllocateEdgeActor();

auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName);
auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema();
TTestHelper::TColumnFamily defaultFamily =
TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(compression);

UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), 1);
TTestHelper::TColumnFamily defaultFromScheme;
UNIT_ASSERT(defaultFromScheme.DeserializeFromProto(schema.GetColumnFamilies(0)));
{
TString errorMessage;
UNIT_ASSERT_C(defaultFromScheme.IsEqual(defaultFamily, errorMessage), errorMessage);
}

auto columns = schema.GetColumns();
for (ui32 i = 0; i < schema.ColumnsSize(); i++) {
auto column = columns[i];
UNIT_ASSERT(column.HasSerializer());
UNIT_ASSERT_EQUAL_C(
column.GetColumnFamilyId(), 0, TStringBuilder() << "family for column " << column.GetName() << " is not default");
TTestHelper::TCompression compression;
UNIT_ASSERT(compression.DeserializeFromProto(schema.GetColumns(i).GetSerializer()));
TString errorMessage;
UNIT_ASSERT_C(compression.IsEqual(defaultFamily.GetCompression(), errorMessage), errorMessage);
}
}
}

Y_UNIT_TEST(DefaultCompressionViaConfig) {
CreateTableWithDefaultColumnFamily(TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain));
CreateTableWithDefaultColumnFamily(
TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain).SetCompressionLevel(1),
EStatus::SCHEME_ERROR);
CreateTableWithDefaultColumnFamily(TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4));
CreateTableWithDefaultColumnFamily(
TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain).SetCompressionLevel(5),
EStatus::SCHEME_ERROR);
CreateTableWithDefaultColumnFamily(
TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(5));
CreateTableWithDefaultColumnFamily(
TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(100),
EStatus::SCHEME_ERROR);
CreateTableWithDefaultColumnFamily(
TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD).SetCompressionLevel(-100));
}
}

Y_UNIT_TEST_SUITE(KqpOlapTypes) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1736,6 +1736,8 @@ message TColumnShardConfig {
optional string ReaderClassName = 28;
optional bool AllowNullableColumnsInPK = 29 [default = false];
optional uint32 RestoreDataOnWriteTimeoutSeconds = 30;
optional NKikimrSchemeOp.EColumnCodec DefaultCompression = 31 [default = ColumnCodecZSTD];
optional int32 DefaultCompressionLevel = 32 [default = 1];
}

message TSchemeShardConfig {
Expand Down
43 changes: 32 additions & 11 deletions ydb/core/tx/schemeshard/olap/operations/create_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -561,22 +561,43 @@ class TCreateColumnTable: public TSubOperation {

void AddDefaultFamilyIfNotExists(NKikimrSchemeOp::TColumnTableDescription& createDescription) {
auto schema = createDescription.GetSchema();
for (const auto& family : schema.GetColumnFamilies()) {
std::optional<ui32> idDefaultFamily = {};
bool defaultFamilyIsNotExists = true;

for (ui32 i = 0; i < schema.ColumnFamiliesSize(); i++) {
auto family = schema.GetColumnFamilies(i);
if (family.GetName() == "default") {
return;
defaultFamilyIsNotExists = false;
if (family.HasColumnCodec()) {
return;
}
idDefaultFamily = i;
break;
}
}

NKikimrSchemeOp::TFamilyDescription* defaultFamily = nullptr;
auto mutableSchema = createDescription.MutableSchema();
auto defaultFamily = mutableSchema->AddColumnFamilies();
defaultFamily->SetName("default");
defaultFamily->SetId(0);
defaultFamily->SetColumnCodec(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain);

for (ui32 i = 0; i < schema.ColumnsSize(); i++) {
if (!schema.GetColumns(i).HasColumnFamilyName() || !schema.GetColumns(i).HasColumnFamilyId()) {
mutableSchema->MutableColumns(i)->SetColumnFamilyName("default");
mutableSchema->MutableColumns(i)->SetColumnFamilyId(0);
if (!idDefaultFamily.has_value()) {
defaultFamily = mutableSchema->AddColumnFamilies();
defaultFamily->SetName("default");
defaultFamily->SetId(0);
} else {
defaultFamily = mutableSchema->MutableColumnFamilies(idDefaultFamily.value());
}

auto CSConfig = AppData()->ColumnShardConfig;
defaultFamily->SetColumnCodec(CSConfig.GetDefaultCompression());
if (CSConfig.HasDefaultCompressionLevel()) {
defaultFamily->SetColumnCodecLevel(CSConfig.GetDefaultCompressionLevel());
}

if (defaultFamilyIsNotExists) {
for (ui32 i = 0; i < schema.ColumnsSize(); i++) {
if (!schema.GetColumns(i).HasColumnFamilyName() || !schema.GetColumns(i).HasColumnFamilyId()) {
mutableSchema->MutableColumns(i)->SetColumnFamilyName("default");
mutableSchema->MutableColumns(i)->SetColumnFamilyId(0);
}
}
}
}
Expand Down

0 comments on commit 1e64839

Please sign in to comment.