Skip to content

Commit

Permalink
implemented without test
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-gogov committed Dec 20, 2024
1 parent 6ea7dea commit 74572a2
Show file tree
Hide file tree
Showing 20 changed files with 370 additions and 87 deletions.
54 changes: 54 additions & 0 deletions ydb/core/config/validation/column_shard_config_validator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include "validators.h"

#include <ydb/core/formats/arrow/serializer/parsing.h>
#include <ydb/core/formats/arrow/serializer/utils.h>
#include <ydb/core/protos/config.pb.h>

#include <util/generic/string.h>
#include <util/string/builder.h>

#include <vector>

namespace NKikimr::NConfig {
namespace {

EValidationResult ValidateDefaultCompression(const NKikimrConfig::TColumnShardConfig& columnShardConfig, std::vector<TString>& msg) {
if (!columnShardConfig.HasDefaultCompression() && !columnShardConfig.HasDefaultCompression()) {
return EValidationResult::Ok;
}
std::optional<arrow::Compression::type> codec = NArrow::CompressionFromProto(columnShardConfig.GetDefaultCompression());
if (!codec.has_value()) {
msg.push_back("ColumnShardConfig: Unknown compression");
return EValidationResult::Error;
}
if (columnShardConfig.HasDefaultCompressionLevel()) {
if (!NArrow::SupportsCompressionLevel(codec.value())) {
TString messageErr = TStringBuilder() << "ColumnShardConfig: compression `" << NArrow::CompressionToString(codec.value())
<< "` is not support compression level";
msg.push_back(messageErr);
return EValidationResult::Error;
} else if (!NArrow::SupportsCompressionLevel(codec.value(), columnShardConfig.GetDefaultCompressionLevel())) {
TString messageErr = TStringBuilder()
<< "ColumnShardConfig: compression `" << NArrow::CompressionToString(codec.value())
<< "` is not support compression level = " << std::to_string(columnShardConfig.GetDefaultCompressionLevel());
msg.push_back(messageErr);
return EValidationResult::Error;
}
}
return EValidationResult::Ok;
}

} // namespace

EValidationResult ValidateColumnShardConfig(const NKikimrConfig::TColumnShardConfig& columnShardConfig, std::vector<TString>& msg) {
EValidationResult validatePasswordComplexityResult = ValidateDefaultCompression(columnShardConfig, msg);
if (validatePasswordComplexityResult == EValidationResult::Error) {
return EValidationResult::Error;
}
if (msg.size() > 0) {
return EValidationResult::Warn;
}
return EValidationResult::Ok;
}

} // namespace NKikimr::NConfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#include <ydb/core/config/validation/validators.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>

#include <library/cpp/testing/unittest/registar.h>

#include <vector>

using namespace NKikimr::NConfig;

Y_UNIT_TEST_SUITE(ColumnShardConfigValidation) {
Y_UNIT_TEST(AcceptDefaultCompression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
UNIT_ASSERT_C(error.empty(), "Should not be errors");
}

Y_UNIT_TEST(CorrectPlainCompression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain);
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
UNIT_ASSERT_C(error.empty(), "Should not be errors");
}

Y_UNIT_TEST(NotCorrectPlainCompression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain);
CSConfig.SetDefaultCompressionLevel(1);
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Error);
UNIT_ASSERT_VALUES_EQUAL(error.size(), 1);
UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression `uncompressed` is not support compression level");
}

Y_UNIT_TEST(CorrectLZ4Compression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
UNIT_ASSERT_C(error.empty(), "Should not be errors");
}

Y_UNIT_TEST(NotCorrectLZ4Compression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4);
CSConfig.SetDefaultCompressionLevel(1);
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Error);
UNIT_ASSERT_VALUES_EQUAL(error.size(), 1);
UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression `lz4` is not support compression level");
}

Y_UNIT_TEST(CorrectZSTDCompression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD);
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
UNIT_ASSERT_C(error.empty(), "Should not be errors");
CSConfig.SetDefaultCompressionLevel(0);
result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
UNIT_ASSERT_C(error.empty(), "Should not be errors");
CSConfig.SetDefaultCompressionLevel(-100);
result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Ok);
UNIT_ASSERT_C(error.empty(), "Should not be errors");
}

Y_UNIT_TEST(NotCorrectZSTDCompression) {
NKikimrConfig::TColumnShardConfig CSConfig;
std::vector<TString> error;
CSConfig.SetDefaultCompression(NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD);
CSConfig.SetDefaultCompressionLevel(100);
EValidationResult result = ValidateColumnShardConfig(CSConfig, error);
UNIT_ASSERT_EQUAL(result, EValidationResult::Error);
UNIT_ASSERT_VALUES_EQUAL(error.size(), 1);
UNIT_ASSERT_STRINGS_EQUAL(error.front(), "ColumnShardConfig: compression `zstd` is not support compression level = 100");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
UNITTEST_FOR(ydb/core/config/validation)

SRC(
column_shard_config_validator_ut.cpp
)

YQL_LAST_ABI_VERSION()

END()
6 changes: 6 additions & 0 deletions ydb/core/config/validation/validators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ EValidationResult ValidateConfig(const NKikimrConfig::TAppConfig& config, std::v
return EValidationResult::Error;
}
}
if (config.HasColumnShardConfig()) {
NKikimr::NConfig::EValidationResult result = NKikimr::NConfig::ValidateColumnShardConfig(config.GetColumnShardConfig(), msg);
if (result == NKikimr::NConfig::EValidationResult::Error) {
return EValidationResult::Error;
}
}
if (msg.size() > 0) {
return EValidationResult::Warn;
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/config/validation/validators.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ EValidationResult ValidateAuthConfig(
const NKikimrProto::TAuthConfig& authConfig,
std::vector<TString>& msg);

EValidationResult ValidateColumnShardConfig(const NKikimrConfig::TColumnShardConfig& columnShardConfig, std::vector<TString>& msg);

EValidationResult ValidateConfig(
const NKikimrConfig::TAppConfig& config,
std::vector<TString>& msg);
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/config/validation/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ SRCS(
validators.h
validators.cpp
auth_config_validator.cpp
column_shard_config_validator.cpp
)

PEERDIR(
ydb/core/protos
ydb/core/formats/arrow/serializer
)

END()

RECURSE_FOR_TESTS(
ut
auth_config_validator_ut
column_shard_config_validator_ut
)
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/serializer/native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ NKikimr::TConclusionStatus TNativeSerializer::DoDeserializeFromRequest(NYql::TFe
level = levelLocal;
}
}
auto codecPtrStatus = BuildCodec(codec.value_or(Options.codec->compression_type()), level);
auto codecPtrStatus = BuildCodec(codec.value_or(GetDefaultCompressionType()), level);
if (!codecPtrStatus) {
return codecPtrStatus.GetError();
}
Expand Down
18 changes: 17 additions & 1 deletion ydb/core/formats/arrow/serializer/native.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#pragma once

#include "abstract.h"
#include "parsing.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>

#include <ydb/library/accessor/accessor.h>
Expand Down Expand Up @@ -50,10 +53,23 @@ class TNativeSerializer: public ISerializer {

virtual TConclusionStatus DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) override;

static arrow::Compression::type GetDefaultCompressionType() {
return CompressionFromProto(AppData()->ColumnShardConfig.GetDefaultCompression()).value();
}

static std::shared_ptr<arrow::util::Codec> GetDefaultCodec() {
arrow::Compression::type codec = GetDefaultCompressionType();
if (codec == arrow::Compression::type::ZSTD) {
i32 codecLevel = AppData()->ColumnShardConfig.GetDefaultCompressionLevel();
return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec, codecLevel));
}
return NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(codec));
}

static arrow::ipc::IpcOptions BuildDefaultOptions() {
arrow::ipc::IpcWriteOptions options;
options.use_threads = false;
options.codec = *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME);
options.codec = GetDefaultCodec();
return options;
}

Expand Down
16 changes: 16 additions & 0 deletions ydb/core/formats/arrow/serializer/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,26 @@ bool SupportsCompressionLevel(const arrow::Compression::type compression) {
return arrow::util::Codec::SupportsCompressionLevel(compression);
}

bool SupportsCompressionLevel(const arrow::Compression::type compression, const i32 compressionLevel) {
if (!SupportsCompressionLevel(compression)) {
return false;
}
int minLevel = MinimumCompressionLevel(compression).value();
int maxLevel = MaximumCompressionLevel(compression).value();
if (compressionLevel < minLevel || compressionLevel > maxLevel) {
return false;
}
return true;
}

bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression) {
return SupportsCompressionLevel(CompressionFromProto(compression).value());
}

bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression, const i32 compressionLevel) {
return SupportsCompressionLevel(CompressionFromProto(compression).value(), compressionLevel);
}

std::optional<int> MinimumCompressionLevel(const arrow::Compression::type compression) {
if (!SupportsCompressionLevel(compression)) {
return {};
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/formats/arrow/serializer/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

namespace NKikimr::NArrow {
bool SupportsCompressionLevel(const arrow::Compression::type compression);
bool SupportsCompressionLevel(const arrow::Compression::type compression, const i32 compressionLevel);
bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression);
bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression, const i32 compressionLevel);

std::optional<int> MinimumCompressionLevel(const arrow::Compression::type compression);
std::optional<int> MaximumCompressionLevel(const arrow::Compression::type compression);
Expand Down
13 changes: 10 additions & 3 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,12 +416,19 @@ bool FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T&
}
familyDescription->SetColumnCodec(codec);
} else {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for column family'" << family.Name << "'";
return false;
if (family.Name != "default") {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for non `default` column family '" << family.Name << "'";
return false;
}
}

if (family.CompressionLevel.Defined()) {
if (!family.Compression.Defined()) {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for column family '" << family.Name << "', but compression level is set";
return false;
}
familyDescription->SetColumnCodecLevel(family.CompressionLevel.GetRef());
}
}
Expand Down
32 changes: 23 additions & 9 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ namespace NKqp {

TString TTestHelper::TCompression::BuildQuery() const {
TStringBuilder str;
str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType) << "\"";
if (CompressionType.has_value()) {
str << "COMPRESSION=\"" << NArrow::CompressionToString(CompressionType.value()) << "\"";
}
if (CompressionLevel.has_value()) {
str << ", COMPRESSION_LEVEL=" << CompressionLevel.value();
}
Expand All @@ -167,9 +169,16 @@ namespace NKqp {
<< "` and in right value `" << rhs.GetSerializerClassName() << "`";
return false;
}
if (CompressionType != rhs.GetCompressionType()) {
errorMessage = TStringBuilder() << "different compression type: in left value `" << NArrow::CompressionToString(CompressionType)
<< "` and in right value `" << NArrow::CompressionToString(rhs.GetCompressionType()) << "`";
if (CompressionType.has_value() && rhs.GetCompressionType().has_value() && CompressionType.value() != rhs.GetCompressionType().value()) {
errorMessage = TStringBuilder() << "different compression type: in left value `"
<< NArrow::CompressionToString(CompressionType.value()) << "` and in right value `"
<< NArrow::CompressionToString(rhs.GetCompressionType().value()) << "`";
return false;
} else if (CompressionType.has_value() && !rhs.GetCompressionType().has_value()) {
errorMessage = TStringBuilder() << "compression type is set in left value, but not set in right value";
return false;
} else if (!CompressionType.has_value() && rhs.GetCompressionType().has_value()) {
errorMessage = TStringBuilder() << "compression type is not set in left value, but set in right value";
return false;
}
if (CompressionLevel.has_value() && rhs.GetCompressionLevel().has_value() &&
Expand All @@ -193,12 +202,15 @@ namespace NKqp {
}

bool TTestHelper::TColumnFamily::DeserializeFromProto(const NKikimrSchemeOp::TFamilyDescription& family) {
if (!family.HasId() || !family.HasName() || !family.HasColumnCodec()) {
if (!family.HasId() || !family.HasName()) {
return false;
}
Id = family.GetId();
FamilyName = family.GetName();
Compression = TTestHelper::TCompression().SetCompressionType(family.GetColumnCodec());
Compression = TTestHelper::TCompression();
if (family.HasColumnCodec()) {
Compression.SetCompressionType(family.GetColumnCodec());
}
if (family.HasColumnCodecLevel()) {
Compression.SetCompressionLevel(family.GetColumnCodecLevel());
}
Expand Down Expand Up @@ -290,9 +302,11 @@ namespace NKqp {
TString TTestHelper::TColumnTableBase::BuildAlterCompressionQuery(const TString& columnName, const TCompression& compression) const {
auto str = TStringBuilder() << "ALTER OBJECT `" << Name << "` (TYPE " << GetObjectType() << ") SET";
str << " (ACTION=ALTER_COLUMN, NAME=" << columnName << ", `SERIALIZER.CLASS_NAME`=`" << compression.GetSerializerClassName() << "`,";
auto codec = NArrow::CompressionFromProto(compression.GetCompressionType());
Y_VERIFY(codec.has_value());
str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`";
if (compression.GetCompressionType().has_value()) {
auto codec = NArrow::CompressionFromProto(compression.GetCompressionType().value());
Y_VERIFY(codec.has_value());
str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(codec.value()) << "`";
}
if (compression.GetCompressionLevel().has_value()) {
str << "`COMPRESSION.LEVEL`=" << compression.GetCompressionLevel().value();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/common/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TTestHelper {
public:
class TCompression {
YDB_ACCESSOR(TString, SerializerClassName, "ARROW_SERIALIZER");
YDB_ACCESSOR_DEF(NKikimrSchemeOp::EColumnCodec, CompressionType);
YDB_ACCESSOR_DEF(std::optional<NKikimrSchemeOp::EColumnCodec>, CompressionType);
YDB_ACCESSOR_DEF(std::optional<i32>, CompressionLevel);

public:
Expand Down
Loading

0 comments on commit 74572a2

Please sign in to comment.