diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index ae2bc9ab9fa5..83d75f41941a 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -297,15 +297,16 @@ inline TAppData* AppData(NActors::TActorSystem* actorSystem) { } inline bool HasAppData() { - return !!NActors::TlsActivationContext; + return !!NActors::TlsActivationContext + && NActors::TlsActivationContext->ExecutorThread.ActorSystem + && NActors::TlsActivationContext->ExecutorThread.ActorSystem->AppData(); } inline TAppData& AppDataVerified() { Y_ABORT_UNLESS(HasAppData()); auto& actorSystem = NActors::TlsActivationContext->ExecutorThread.ActorSystem; - Y_ABORT_UNLESS(actorSystem); TAppData* const x = actorSystem->AppData(); - Y_ABORT_UNLESS(x && x->Magic == TAppData::MagicTag); + Y_ABORT_UNLESS(x->Magic == TAppData::MagicTag); return *x; } diff --git a/ydb/core/formats/arrow/serializer/native.h b/ydb/core/formats/arrow/serializer/native.h index d09241a7799f..de33865692b9 100644 --- a/ydb/core/formats/arrow/serializer/native.h +++ b/ydb/core/formats/arrow/serializer/native.h @@ -1,7 +1,10 @@ #pragma once #include "abstract.h" +#include "parsing.h" +#include +#include #include #include @@ -22,6 +25,11 @@ class TNativeSerializer: public ISerializer { TConclusion> BuildCodec(const arrow::Compression::type& cType, const std::optional level) const; static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); + + static std::shared_ptr GetDefaultCodec() { + return *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME); + } + protected: virtual bool IsCompatibleForExchangeWithSameClass(const ISerializer& /*item*/) const override { return true; @@ -53,7 +61,21 @@ class TNativeSerializer: public ISerializer { static arrow::ipc::IpcOptions BuildDefaultOptions() { arrow::ipc::IpcWriteOptions options; options.use_threads = false; - options.codec = *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME); + if (HasAppData()) { + if (AppData()->ColumnShardConfig.HasDefaultCompression()) { + arrow::Compression::type codec = CompressionFromProto(AppData()->ColumnShardConfig.GetDefaultCompression()).value(); + if (AppData()->ColumnShardConfig.HasDefaultCompressionLevel()) { + options.codec = NArrow::TStatusValidator::GetValid( + arrow::util::Codec::Create(codec, AppData()->ColumnShardConfig.GetDefaultCompressionLevel())); + } else { + options.codec = NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec)); + } + } else { + options.codec = GetDefaultCodec(); + } + } else { + options.codec = GetDefaultCodec(); + } return options; } @@ -83,7 +105,7 @@ class TNativeSerializer: public ISerializer { } static arrow::ipc::IpcOptions GetDefaultOptions() { - static arrow::ipc::IpcWriteOptions options = BuildDefaultOptions(); + arrow::ipc::IpcWriteOptions options = BuildDefaultOptions(); return options; } diff --git a/ydb/core/formats/arrow/serializer/utils.cpp b/ydb/core/formats/arrow/serializer/utils.cpp index 432086605caf..cda91a203b10 100644 --- a/ydb/core/formats/arrow/serializer/utils.cpp +++ b/ydb/core/formats/arrow/serializer/utils.cpp @@ -32,4 +32,4 @@ std::optional MaximumCompressionLevel(const arrow::Compression::type compre } return NArrow::TStatusValidator::GetValid(arrow::util::Codec::MaximumCompressionLevel(compression)); } -} +} // namespace NKikimr::NArrow diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 81ceb584d6ba..1aa312d1c2e5 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -416,12 +416,19 @@ bool FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T& } familyDescription->SetColumnCodec(codec); } else { - code = Ydb::StatusIds::BAD_REQUEST; - error = TStringBuilder() << "Compression is not set for column family'" << family.Name << "'"; - return false; + if (family.Name != "default") { + code = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Compression is not set for non `default` column family '" << family.Name << "'"; + return false; + } } if (family.CompressionLevel.Defined()) { + if (!family.Compression.Defined()) { + code = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Compression is not set for column family '" << family.Name << "', but compression level is set"; + return false; + } familyDescription->SetColumnCodecLevel(family.CompressionLevel.GetRef()); } } diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp index 016440eff634..d456eba2a292 100644 --- a/ydb/core/kqp/ut/common/columnshard.cpp +++ b/ydb/core/kqp/ut/common/columnshard.cpp @@ -154,7 +154,9 @@ namespace NKqp { TString TTestHelper::TCompression::BuildQuery() const { TStringBuilder str; - str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType) << "\""; + if (CompressionType.has_value()) { + str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType.value()) << "\""; + } if (CompressionLevel.has_value()) { str << ", COMPRESSION_LEVEL=" << CompressionLevel.value(); } @@ -167,9 +169,16 @@ namespace NKqp { << "` and in right value `" << rhs.GetSerializerClassName() << "`"; return false; } - if (CompressionType != rhs.GetCompressionType()) { - errorMessage = TStringBuilder() << "different compression type: in left value `" << NArrow::CompressionToString(CompressionType) - << "` and in right value `" << NArrow::CompressionToString(rhs.GetCompressionType()) << "`"; + if (CompressionType.has_value() && rhs.HasCompressionType() && CompressionType.value() != rhs.GetCompressionTypeUnsafe()) { + errorMessage = TStringBuilder() << "different compression type: in left value `" + << NArrow::CompressionToString(CompressionType.value()) << "` and in right value `" + << NArrow::CompressionToString(rhs.GetCompressionTypeUnsafe()) << "`"; + return false; + } else if (CompressionType.has_value() && !rhs.HasCompressionType()) { + errorMessage = TStringBuilder() << "compression type is set in left value, but not set in right value"; + return false; + } else if (!CompressionType.has_value() && rhs.HasCompressionType()) { + errorMessage = TStringBuilder() << "compression type is not set in left value, but set in right value"; return false; } if (CompressionLevel.has_value() && rhs.GetCompressionLevel().has_value() && @@ -193,12 +202,15 @@ namespace NKqp { } bool TTestHelper::TColumnFamily::DeserializeFromProto(const NKikimrSchemeOp::TFamilyDescription& family) { - if (!family.HasId() || !family.HasName() || !family.HasColumnCodec()) { + if (!family.HasId() || !family.HasName()) { return false; } Id = family.GetId(); FamilyName = family.GetName(); - Compression = TTestHelper::TCompression().SetCompressionType(family.GetColumnCodec()); + Compression = TTestHelper::TCompression(); + if (family.HasColumnCodec()) { + Compression.SetCompressionType(family.GetColumnCodec()); + } if (family.HasColumnCodecLevel()) { Compression.SetCompressionLevel(family.GetColumnCodecLevel()); } @@ -290,9 +302,11 @@ namespace NKqp { TString TTestHelper::TColumnTableBase::BuildAlterCompressionQuery(const TString& columnName, const TCompression& compression) const { auto str = TStringBuilder() << "ALTER OBJECT `" << Name << "` (TYPE " << GetObjectType() << ") SET"; str << " (ACTION=ALTER_COLUMN, NAME=" << columnName << ", `SERIALIZER.CLASS_NAME`=`" << compression.GetSerializerClassName() << "`,"; - auto codec = NArrow::CompressionFromProto(compression.GetCompressionType()); - Y_VERIFY(codec.has_value()); - str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`"; + if (compression.HasCompressionType()) { + auto codec = NArrow::CompressionFromProto(compression.GetCompressionTypeUnsafe()); + Y_VERIFY(codec.has_value()); + str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`"; + } if (compression.GetCompressionLevel().has_value()) { str << "`COMPRESSION.LEVEL`=" << compression.GetCompressionLevel().value(); } diff --git a/ydb/core/kqp/ut/common/columnshard.h b/ydb/core/kqp/ut/common/columnshard.h index ba85f6686162..8d5bcf0ee518 100644 --- a/ydb/core/kqp/ut/common/columnshard.h +++ b/ydb/core/kqp/ut/common/columnshard.h @@ -20,7 +20,7 @@ class TTestHelper { public: class TCompression { YDB_ACCESSOR(TString, SerializerClassName, "ARROW_SERIALIZER"); - YDB_ACCESSOR_DEF(NKikimrSchemeOp::EColumnCodec, CompressionType); + YDB_OPT(NKikimrSchemeOp::EColumnCodec, CompressionType); YDB_ACCESSOR_DEF(std::optional, CompressionLevel); public: diff --git a/ydb/core/kqp/ut/olap/compression_ut.cpp b/ydb/core/kqp/ut/olap/compression_ut.cpp index b325324a1f3d..454a1ff00170 100644 --- a/ydb/core/kqp/ut/olap/compression_ut.cpp +++ b/ydb/core/kqp/ut/olap/compression_ut.cpp @@ -1,7 +1,47 @@ #include +#include +#include +#include + +#include namespace NKikimr::NKqp { +std::pair GetVolumes( + const TKikimrRunner& runner, const TString& tablePath, const std::vector columnNames) { + TString selectQuery = "SELECT * FROM `" + tablePath + "/.sys/primary_index_stats` WHERE Activity == 1"; + if (columnNames.size()) { + selectQuery += " AND EntityName IN ('" + JoinSeq("','", columnNames) + "')"; + } + auto tableClient = runner.GetTableClient(); + std::optional rawBytesPred; + std::optional bytesPred; + while (true) { + auto rows = ExecuteScanQuery(tableClient, selectQuery, false); + ui64 rawBytes = 0; + ui64 bytes = 0; + for (auto&& r : rows) { + for (auto&& c : r) { + if (c.first == "RawBytes") { + rawBytes += GetUint64(c.second); + } + if (c.first == "BlobRangeSize") { + bytes += GetUint64(c.second); + } + } + } + if (rawBytesPred && *rawBytesPred == rawBytes && bytesPred && *bytesPred == bytes) { + break; + } else { + rawBytesPred = rawBytes; + bytesPred = bytes; + Cerr << "Wait changes: " << bytes << "/" << rawBytes << Endl; + Sleep(TDuration::Seconds(5)); + } + } + return { rawBytesPred.value(), bytesPred.value() }; +} + Y_UNIT_TEST_SUITE(KqpOlapCompression) { Y_UNIT_TEST(DisabledAlterCompression) { TKikimrSettings settings = TKikimrSettings().SetWithSampleTables(false).SetEnableOlapCompression(false); @@ -63,5 +103,46 @@ Y_UNIT_TEST_SUITE(KqpOlapCompression) { testHelper.CreateTable(testTable); testHelper.SetCompression(testTable, "pk_int", compression, NYdb::EStatus::SCHEME_ERROR); } + + std::pair GetVolumesColumnWithCompression(const std::optional& CSConfig = {}) { + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); + NKikimrConfig::TAppConfig appConfig; + if (CSConfig.has_value()) { + *appConfig.MutableColumnShardConfig() = CSConfig.value(); + } + auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig); + TTestHelper testHelper(settings); + Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize(); + TTestHelper::TCompression plainCompression = + TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + + TVector schema = { + TTestHelper::TColumnSchema().SetName("pk_int").SetType(NScheme::NTypeIds::Uint64).SetNullable(false) + }; + + TString tableName = "/Root/ColumnTableTest"; + TTestHelper::TColumnTable testTable; + testTable.SetName(tableName).SetPrimaryKey({ "pk_int" }).SetSharding({ "pk_int" }).SetSchema(schema); + testHelper.CreateTable(testTable); + + TVector dataBuilders; + dataBuilders.push_back( + NArrow::NConstruction::TSimpleArrayConstructor>::BuildNotNullable( + "pk_int", false)); + auto batch = NArrow::NConstruction::TRecordBatchConstructor(dataBuilders).BuildBatch(100000); + testHelper.BulkUpsert(testTable, batch); + csController->WaitCompactions(TDuration::Seconds(10)); + return GetVolumes(testHelper.GetKikimr(), tableName, { "pk_int" }); + } + + Y_UNIT_TEST(DefaultCompressionViaCSConfig) { + auto [rawBytesPK1, bytesPK1] = GetVolumesColumnWithCompression(); // Default compression LZ4 + NKikimrConfig::TColumnShardConfig csConfig = NKikimrConfig::TColumnShardConfig(); + csConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD); + csConfig.SetDefaultCompressionLevel(1); + auto [rawBytesPK2, bytesPK2] = GetVolumesColumnWithCompression(csConfig); + AFL_VERIFY(rawBytesPK2 == rawBytesPK1)("pk1", rawBytesPK1)("pk2", rawBytesPK2); + AFL_VERIFY(bytesPK2 < bytesPK1 / 3)("pk1", bytesPK1)("pk2", bytesPK2); + } } } diff --git a/ydb/core/kqp/ut/olap/sys_view_ut.cpp b/ydb/core/kqp/ut/olap/sys_view_ut.cpp index 9d12b54efab6..0d9108b5d241 100644 --- a/ydb/core/kqp/ut/olap/sys_view_ut.cpp +++ b/ydb/core/kqp/ut/olap/sys_view_ut.cpp @@ -191,8 +191,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { ui64 rawBytesPK1; ui64 bytesPK1; auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); - auto settings = TKikimrSettings() - .SetWithSampleTables(false); + NKikimrConfig::TAppConfig appConfig; + auto* CSConfig = appConfig.MutableColumnShardConfig(); + CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig); TKikimrRunner kikimr(settings); Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTypedLocalHelper helper("", kikimr, "olapTable", "olapStore"); @@ -259,8 +261,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { ui64 rawBytes1; ui64 bytes1; auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); - auto settings = TKikimrSettings() - .SetWithSampleTables(false); + NKikimrConfig::TAppConfig appConfig; + auto* CSConfig = appConfig.MutableColumnShardConfig(); + CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig); TKikimrRunner kikimr(settings); Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTypedLocalHelper helper("Utf8", kikimr); @@ -298,7 +302,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { ui64 bytes1; auto csController = NYDBTest::TControllers::RegisterCSControllerGuard(); csController->SetSmallSizeDetector(Max()); - auto settings = TKikimrSettings().SetWithSampleTables(false); + NKikimrConfig::TAppConfig appConfig; + auto* CSConfig = appConfig.MutableColumnShardConfig(); + CSConfig->SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); + auto settings = TKikimrSettings().SetWithSampleTables(false).SetAppConfig(appConfig); TKikimrRunner kikimr(settings); Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTypedLocalHelper helper("Utf8", kikimr); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index f9cd4148955a..7243ed178b9e 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -9519,10 +9519,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); - TTestHelper::TCompression plainCompression = - TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); - TTestHelper::TColumnFamily defaultFamily = - TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression); + TTestHelper::TColumnFamily defaultFamily = TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default"); UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), 1); TTestHelper::TColumnFamily defaultFromScheme; @@ -9531,18 +9528,6 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TString errorMessage; UNIT_ASSERT_C(defaultFromScheme.IsEqual(defaultFamily, errorMessage), errorMessage); } - - auto columns = schema.GetColumns(); - for (ui32 i = 0; i < schema.ColumnsSize(); i++) { - auto column = columns[i]; - UNIT_ASSERT(column.HasSerializer()); - UNIT_ASSERT_EQUAL_C( - column.GetColumnFamilyId(), 0, TStringBuilder() << "family for column " << column.GetName() << " is not default"); - TTestHelper::TCompression compression; - UNIT_ASSERT(compression.DeserializeFromProto(schema.GetColumns(i).GetSerializer())); - TString errorMessage; - UNIT_ASSERT_C(compression.IsEqual(defaultFamily.GetCompression(), errorMessage), errorMessage); - } } // Field `Data` is not used in ColumnFamily for ColumnTable @@ -10314,11 +10299,10 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); TString tableName = "/Root/TableWithFamily"; - TTestHelper::TCompression plainCompression = - TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); + TTestHelper::TCompression lz4Compression = TTestHelper::TCompression().SetCompressionType(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4); TVector families = { - TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(plainCompression), + TTestHelper::TColumnFamily().SetId(1).SetFamilyName("family1").SetCompression(lz4Compression), }; { @@ -10341,7 +10325,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.CreateTable(testTable); } - families.push_back(TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default").SetCompression(plainCompression)); + families.push_back(TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default")); auto& runner = testHelper.GetKikimr(); auto runtime = runner.GetTestServer().GetRuntime(); TActorId sender = runtime->AllocateEdgeActor(); @@ -10686,6 +10670,70 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.CreateTable(testTable, EStatus::GENERIC_ERROR); } + Y_UNIT_TEST(CreateTableWithDefaultFamilyWithoutSettings) { + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + TString tableName = "/Root/ColumnTableTest"; + auto session = testHelper.GetSession(); + auto createQuery = TStringBuilder() << R"(CREATE TABLE `)" << tableName << R"(` ( + Key Uint64 NOT NULL, + Value1 String, + Value2 Uint32, + PRIMARY KEY (Key), + FAMILY default ()) + WITH (STORE = COLUMN);)"; + auto result = session.ExecuteSchemeQuery(createQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& runner = testHelper.GetKikimr(); + auto runtime = runner.GetTestServer().GetRuntime(); + TActorId sender = runtime->AllocateEdgeActor(); + + auto describeResult = DescribeTable(&runner.GetTestServer(), sender, tableName); + auto schema = describeResult.GetPathDescription().GetColumnTableDescription().GetSchema(); + TTestHelper::TColumnFamily defaultFamily = TTestHelper::TColumnFamily().SetId(0).SetFamilyName("default"); + + UNIT_ASSERT_EQUAL(schema.ColumnFamiliesSize(), 1); + TTestHelper::TColumnFamily defaultFromScheme; + UNIT_ASSERT(defaultFromScheme.DeserializeFromProto(schema.GetColumnFamilies(0))); + { + TString errorMessage; + UNIT_ASSERT_C(defaultFromScheme.IsEqual(defaultFamily, errorMessage), errorMessage); + } + } + + Y_UNIT_TEST(CreateTableWithFamilyWithOnlyCompressionLevel) { + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + TString tableName = "/Root/ColumnTableTest"; + auto session = testHelper.GetSession(); + auto createQuery = TStringBuilder() << R"(CREATE TABLE `)" << tableName << R"(` ( + Key Uint64 NOT NULL, + Value1 String, + Value2 Uint32, + PRIMARY KEY (Key), + FAMILY family1 ( + COMPRESSION_LEVEL = 2 + )) + WITH (STORE = COLUMN);)"; + auto result = session.ExecuteSchemeQuery(createQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + + Y_UNIT_TEST(CreateTableNonDefaultFamilyWithoutCompression) { + TTestHelper testHelper(TKikimrSettings().SetWithSampleTables(false)); + TString tableName = "/Root/ColumnTableTest"; + auto session = testHelper.GetSession(); + auto createQuery = TStringBuilder() << R"(CREATE TABLE `)" << tableName << R"(` ( + Key Uint64 NOT NULL, + Value1 String, + Value2 Uint32, + PRIMARY KEY (Key), + FAMILY family1 ( + )) + WITH (STORE = COLUMN);)"; + auto result = session.ExecuteSchemeQuery(createQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + Y_UNIT_TEST(DropColumnAndResetTtl) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -10765,7 +10813,6 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { testHelper.RebootTablets("/Root/ColumnTableTest"); } - } Y_UNIT_TEST_SUITE(KqpOlapTypes) { diff --git a/ydb/core/tx/schemeshard/olap/column_families/schema.cpp b/ydb/core/tx/schemeshard/olap/column_families/schema.cpp index 7667e984d532..957788be869a 100644 --- a/ydb/core/tx/schemeshard/olap/column_families/schema.cpp +++ b/ydb/core/tx/schemeshard/olap/column_families/schema.cpp @@ -119,23 +119,24 @@ bool TOlapColumnFamiliesDescription::Validate(const NKikimrSchemeOp::TColumnTabl } lastColumnFamilyId = familyProto.GetId(); - if (!familyProto.HasColumnCodec()) { - errors.AddError("missing column codec for column family '" + columnFamilyName + "'"); - return false; - } - - auto serializerProto = ConvertFamilyDescriptionToProtoSerializer(familyProto); - if (serializerProto.IsFail()) { - errors.AddError(serializerProto.GetErrorMessage()); - return false; - } - NArrow::NSerialization::TSerializerContainer serializer; - if (!serializer.DeserializeFromProto(serializerProto.GetResult())) { - errors.AddError(TStringBuilder() << "can't deserialize column family `" << columnFamilyName << "` from proto "); - return false; - } - if (!family->GetSerializerContainer().IsEqualTo(serializer)) { - errors.AddError(TStringBuilder() << "compression from column family '" << columnFamilyName << "` is not matching schema preset"); + if (familyProto.HasColumnCodec() && family->GetSerializerContainer().HasObject()) { + auto serializerProto = ConvertFamilyDescriptionToProtoSerializer(familyProto); + if (serializerProto.IsFail()) { + errors.AddError(serializerProto.GetErrorMessage()); + return false; + } + NArrow::NSerialization::TSerializerContainer serializer; + if (!serializer.DeserializeFromProto(serializerProto.GetResult())) { + errors.AddError(TStringBuilder() << "can't deserialize column family `" << columnFamilyName << "` from proto "); + return false; + } + if (!family->GetSerializerContainer().IsEqualTo(serializer)) { + errors.AddError(TStringBuilder() << "compression from column family '" << columnFamilyName << "` is not matching schema preset"); + return false; + } + } else if ((!familyProto.HasColumnCodec() && family->GetSerializerContainer().HasObject()) || + (familyProto.HasColumnCodec() && !family->GetSerializerContainer().HasObject())) { + errors.AddError(TStringBuilder() << "compression is not matching schema preset in column family `" << columnFamilyName << "`"); return false; } } diff --git a/ydb/core/tx/schemeshard/olap/column_families/update.cpp b/ydb/core/tx/schemeshard/olap/column_families/update.cpp index d365688ca46e..2234c3e0f36c 100644 --- a/ydb/core/tx/schemeshard/olap/column_families/update.cpp +++ b/ydb/core/tx/schemeshard/olap/column_families/update.cpp @@ -44,6 +44,9 @@ NKikimr::TConclusion ConvertFamilyDes NKikimr::TConclusion ConvertSerializerContainerToFamilyDescription( const NArrow::NSerialization::TSerializerContainer& serializer) { + if (!serializer.HasObject()) { + return NKikimr::TConclusionStatus::Fail("convert TSerializerContainer to TFamilyDescription: container doesn't have object"); + } NKikimrSchemeOp::TFamilyDescription result; if (serializer->GetClassName().empty()) { return NKikimr::TConclusionStatus::Fail("convert TSerializerContainer to TFamilyDescription: field `ClassName` is empty"); @@ -85,55 +88,66 @@ bool TOlapColumnFamlilyAdd::ParseFromRequest(const NKikimrSchemeOp::TFamilyDescr } Name = columnFamily.GetName(); - auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); - if (serializer.IsFail()) { - errors.AddError(serializer.GetErrorMessage()); - return false; - } - auto resultBuild = NArrow::NSerialization::TSerializerContainer::BuildFromProto(serializer.GetResult()); - if (resultBuild.IsFail()) { - errors.AddError(resultBuild.GetErrorMessage()); - return false; + if (columnFamily.HasColumnCodec()) { + auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); + if (serializer.IsFail()) { + errors.AddError(serializer.GetErrorMessage()); + return false; + } + auto resultBuild = NArrow::NSerialization::TSerializerContainer::BuildFromProto(serializer.GetResult()); + if (resultBuild.IsFail()) { + errors.AddError(resultBuild.GetErrorMessage()); + return false; + } + SerializerContainer = resultBuild.GetResult(); } - SerializerContainer = resultBuild.GetResult(); return true; } void TOlapColumnFamlilyAdd::ParseFromLocalDB(const NKikimrSchemeOp::TFamilyDescription& columnFamily) { Name = columnFamily.GetName(); - auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); - Y_VERIFY_S(serializer.IsSuccess(), serializer.GetErrorMessage()); - Y_VERIFY(SerializerContainer.DeserializeFromProto(serializer.GetResult())); + if (columnFamily.HasColumnCodec()) { + auto serializer = ConvertFamilyDescriptionToProtoSerializer(columnFamily); + Y_VERIFY_S(serializer.IsSuccess(), serializer.GetErrorMessage()); + SerializerContainer = NArrow::NSerialization::TSerializerContainer(); + Y_VERIFY(SerializerContainer.DeserializeFromProto(serializer.GetResult())); + } } void TOlapColumnFamlilyAdd::Serialize(NKikimrSchemeOp::TFamilyDescription& columnFamily) const { - auto result = ConvertSerializerContainerToFamilyDescription(SerializerContainer); - Y_VERIFY_S(result.IsSuccess(), result.GetErrorMessage()); columnFamily.SetName(Name); - columnFamily.SetColumnCodec(result->GetColumnCodec()); - if (result->HasColumnCodecLevel()) { - columnFamily.SetColumnCodecLevel(result->GetColumnCodecLevel()); + if (SerializerContainer.HasObject()) { + auto result = ConvertSerializerContainerToFamilyDescription(SerializerContainer); + Y_VERIFY_S(result.IsSuccess(), result.GetErrorMessage()); + columnFamily.SetColumnCodec(result->GetColumnCodec()); + if (result->HasColumnCodecLevel()) { + columnFamily.SetColumnCodecLevel(result->GetColumnCodecLevel()); + } } } bool TOlapColumnFamlilyAdd::ApplyDiff(const TOlapColumnFamlilyDiff& diffColumnFamily, IErrorCollector& errors) { Y_ABORT_UNLESS(GetName() == diffColumnFamily.GetName()); - auto newColumnFamily = ConvertSerializerContainerToFamilyDescription(SerializerContainer); - if (newColumnFamily.IsFail()) { - errors.AddError(newColumnFamily.GetErrorMessage()); - return false; + NKikimrSchemeOp::TFamilyDescription newColumnFamily; + if (SerializerContainer.HasObject()) { + auto resultConvert = ConvertSerializerContainerToFamilyDescription(SerializerContainer); + if (resultConvert.IsFail()) { + errors.AddError(resultConvert.GetErrorMessage()); + return false; + } + newColumnFamily = resultConvert.GetResult(); } - newColumnFamily->SetName(GetName()); + newColumnFamily.SetName(GetName()); auto codec = diffColumnFamily.GetCodec(); if (codec.has_value()) { - newColumnFamily->SetColumnCodec(codec.value()); - newColumnFamily->ClearColumnCodecLevel(); + newColumnFamily.SetColumnCodec(codec.value()); + newColumnFamily.ClearColumnCodecLevel(); } auto codecLevel = diffColumnFamily.GetCodecLevel(); if (codecLevel.has_value()) { - newColumnFamily->SetColumnCodecLevel(codecLevel.value()); + newColumnFamily.SetColumnCodecLevel(codecLevel.value()); } - auto serializer = ConvertFamilyDescriptionToProtoSerializer(newColumnFamily.GetResult()); + auto serializer = ConvertFamilyDescriptionToProtoSerializer(newColumnFamily); if (serializer.IsFail()) { errors.AddError(serializer.GetErrorMessage()); return false; diff --git a/ydb/core/tx/schemeshard/olap/columns/schema.cpp b/ydb/core/tx/schemeshard/olap/columns/schema.cpp index 950ba879b133..f533a3476bf9 100644 --- a/ydb/core/tx/schemeshard/olap/columns/schema.cpp +++ b/ydb/core/tx/schemeshard/olap/columns/schema.cpp @@ -56,7 +56,7 @@ bool TOlapColumnsDescription::ApplyUpdate( if (newColumn.GetKeyOrder()) { Y_ABORT_UNLESS(orderedKeyColumnIds.emplace(*newColumn.GetKeyOrder(), newColumn.GetId()).second); } - if (!newColumn.GetSerializer().has_value() && !columnFamilies.GetColumnFamilies().empty() && + if (!newColumn.GetSerializer().HasObject() && !columnFamilies.GetColumnFamilies().empty() && !newColumn.ApplySerializerFromColumnFamily(columnFamilies, errors)) { return false; } diff --git a/ydb/core/tx/schemeshard/olap/columns/update.cpp b/ydb/core/tx/schemeshard/olap/columns/update.cpp index 30ffe4e3faf3..dff1c82bb8c2 100644 --- a/ydb/core/tx/schemeshard/olap/columns/update.cpp +++ b/ydb/core/tx/schemeshard/olap/columns/update.cpp @@ -180,7 +180,7 @@ void TOlapColumnBase::Serialize(NKikimrSchemeOp::TOlapColumnDescription& columnS columnSchema.SetColumnFamilyId(ColumnFamilyId.value()); } if (Serializer) { - Serializer->SerializeToProto(*columnSchema.MutableSerializer()); + Serializer.SerializeToProto(*columnSchema.MutableSerializer()); } if (AccessorConstructor) { *columnSchema.MutableDataAccessorConstructor() = AccessorConstructor.SerializeToProto(); diff --git a/ydb/core/tx/schemeshard/olap/columns/update.h b/ydb/core/tx/schemeshard/olap/columns/update.h index 4f87cf014d37..5670ac5f23e9 100644 --- a/ydb/core/tx/schemeshard/olap/columns/update.h +++ b/ydb/core/tx/schemeshard/olap/columns/update.h @@ -35,7 +35,7 @@ class TOlapColumnBase { YDB_READONLY_DEF(NScheme::TTypeInfo, Type); YDB_READONLY_DEF(TString, StorageId); YDB_FLAG_ACCESSOR(NotNull, false); - YDB_ACCESSOR_DEF(std::optional, Serializer); + YDB_ACCESSOR_DEF(NArrow::NSerialization::TSerializerContainer, Serializer); YDB_READONLY_DEF(std::optional, DictionaryEncoding); YDB_READONLY_DEF(NOlap::TColumnDefaultScalarValue, DefaultValue); YDB_READONLY_DEF(NArrow::NAccessor::TConstructorContainer, AccessorConstructor); diff --git a/ydb/core/tx/schemeshard/olap/operations/create_table.cpp b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp index a407a991689c..00e434421e09 100644 --- a/ydb/core/tx/schemeshard/olap/operations/create_table.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp @@ -571,7 +571,6 @@ class TCreateColumnTable: public TSubOperation { auto defaultFamily = mutableSchema->AddColumnFamilies(); defaultFamily->SetName("default"); defaultFamily->SetId(0); - defaultFamily->SetColumnCodec(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain); for (ui32 i = 0; i < schema.ColumnsSize(); i++) { if (!schema.GetColumns(i).HasColumnFamilyName() || !schema.GetColumns(i).HasColumnFamilyId()) {