Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tiering fixes #13839

Merged
merged 3 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions ydb/core/kqp/ut/olap/helpers/get_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ void PrintValue(IOutputStream& out, const NYdb::TValue& v) {
out << value.GetUint32();
break;
}
case NYdb::EPrimitiveType::Int32:
{
out << value.GetInt32();
break;
}
case NYdb::EPrimitiveType::Uint64:
{
out << value.GetUint64();
Expand Down Expand Up @@ -72,6 +77,15 @@ ui64 GetUint32(const NYdb::TValue& v) {
}
}

i64 GetInt32(const NYdb::TValue& v) {
NYdb::TValueParser value(v);
if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
return *value.GetOptionalInt32();
} else {
return value.GetInt32();
}
}

ui64 GetUint64(const NYdb::TValue& v) {
NYdb::TValueParser value(v);
if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/olap/helpers/get_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ void PrintRow(IOutputStream& out, const THashMap<TString, NYdb::TValue>& fields)
void PrintRows(IOutputStream& out, const TVector<THashMap<TString, NYdb::TValue>>& rows);

ui64 GetUint32(const NYdb::TValue& v);
i64 GetInt32(const NYdb::TValue& v);
ui64 GetUint64(const NYdb::TValue& v);
TString GetUtf8(const NYdb::TValue& v);
TInstant GetTimestamp(const NYdb::TValue& v);
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/ut/olap/helpers/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

namespace NKikimr::NKqp {

void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, bool withSomeNulls /*= false*/) {
void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, bool withSomeNulls /*= false*/,
ui64 tsStepUs /*= 1*/) {
UNIT_ASSERT(testTable != "/Root/benchTable"); // TODO: check schema instead
TLocalHelper lHelper(kikimr);
if (withSomeNulls) {
lHelper.WithSomeNulls();
}
auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount);
auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount, tsStepUs);
lHelper.SendDataViaActorSystem(testTable, batch);
}

}
4 changes: 2 additions & 2 deletions ydb/core/kqp/ut/olap/helpers/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@

namespace NKikimr::NKqp {

void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, bool withSomeNulls = false);

void WriteTestData(
TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, bool withSomeNulls = false, ui64 tsStepUs = 1);
}
137 changes: 137 additions & 0 deletions ydb/core/kqp/ut/olap/tiering_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,143 @@ Y_UNIT_TEST_SUITE(KqpOlapTiering) {

testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
}

Y_UNIT_TEST(DeletedTier) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetSkipSpecialCheckForEvict(true);

TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
TTestHelper testHelper(runnerSettings);
TLocalHelper localHelper(testHelper.GetKikimr());
testHelper.GetRuntime().SetLogPriority(NKikimrServices::TX_TIERING, NActors::NLog::PRI_DEBUG);
NYdb::NTable::TTableClient tableClient = testHelper.GetKikimr().GetTableClient();
Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();
Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->SetSecretKey("fakeSecret");

localHelper.CreateTestOlapTable();
testHelper.CreateTier("tier1");

for (ui64 i = 0; i < 100; ++i) {
WriteTestData(testHelper.GetKikimr(), "/Root/olapStore/olapTable", 0, 3600000000 + i * 1000, 1000);
WriteTestData(testHelper.GetKikimr(), "/Root/olapStore/olapTable", 0, 3600000000 + i * 1000, 1000);
}

testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
csController->WaitCompactions(TDuration::Seconds(5));
csController->WaitActualization(TDuration::Seconds(5));

csController->DisableBackground(NYDBTest::ICSController::EBackground::TTL);
testHelper.ResetTiering("/Root/olapStore/olapTable");
testHelper.RebootTablets("/Root/olapStore/olapTable");

{
auto selectQuery = TString(R"(
SELECT
TierName, SUM(ColumnRawBytes) As RawBytes
FROM `/Root/olapStore/olapTable/.sys/primary_index_portion_stats`
WHERE Activity == 1
GROUP BY TierName
)");

auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), "/Root/tier1");
}

ui64 maxLevelValue;
{
auto selectQuery = TString(R"(SELECT MAX(level) AS level FROM `/Root/olapStore/olapTable`)");
auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
maxLevelValue = GetInt32(rows[0].at("level"));
}

{
auto result = testHelper.GetSession().ExecuteSchemeQuery(R"(DROP EXTERNAL DATA SOURCE `/Root/tier1`)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
testHelper.RebootTablets("/Root/olapStore/olapTable");

{
auto selectQuery = TString(R"(SELECT MAX(level) FROM `/Root/olapStore/olapTable`)");
auto it = tableClient.StreamExecuteScanQuery(selectQuery, NYdb::NTable::TStreamExecScanQuerySettings()).GetValueSync();
auto streamPart = it.ReadNext().GetValueSync();
UNIT_ASSERT(!streamPart.IsSuccess());
UNIT_ASSERT_STRING_CONTAINS(streamPart.GetIssues().ToString(), "cannot read blob range");
}

testHelper.CreateTier("tier1");
testHelper.RebootTablets("/Root/olapStore/olapTable");

{
auto selectQuery = TString(R"(SELECT MAX(level) AS level FROM `/Root/olapStore/olapTable`)");
auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(GetInt32(rows[0].at("level")), maxLevelValue);
}
}

Y_UNIT_TEST(TtlBorders) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();

TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
TTestHelper testHelper(runnerSettings);
TLocalHelper localHelper(testHelper.GetKikimr());
testHelper.GetRuntime().SetLogPriority(NKikimrServices::TX_TIERING, NActors::NLog::PRI_DEBUG);
NYdb::NTable::TTableClient tableClient = testHelper.GetKikimr().GetTableClient();
Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();
Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->SetSecretKey("fakeSecret");

localHelper.CreateTestOlapTable("olapTable", "olapStore", 1, 1);

{
const TDuration tsInterval = TDuration::Days(3650);
const ui64 rows = 10000;
WriteTestData(testHelper.GetKikimr(), "/Root/olapStore/olapTable", 0, (TInstant::Now() - tsInterval).MicroSeconds(), rows,
false, tsInterval.MicroSeconds() / rows);
}

{
auto selectQuery = TString(R"(
SELECT MAX(timestamp) AS timestamp FROM `/Root/olapStore/olapTable`
)");

auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
UNIT_ASSERT_GT(GetTimestamp(rows[0].at("timestamp")), TInstant::Now() - TDuration::Days(100));
}

{
auto selectQuery = TString(R"(
SELECT COUNT(*) AS count FROM `/Root/olapStore/olapTable/.sys/primary_index_portion_stats`
)");

auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("count")), 1);
}

{
const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P300D") ON timestamp)";
auto result = testHelper.GetSession().ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

csController->WaitCompactions(TDuration::Seconds(5));
csController->WaitActualization(TDuration::Seconds(5));

{
auto selectQuery = TString(R"(
SELECT COUNT(*) AS count FROM `/Root/olapStore/olapTable`
)");

auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
UNIT_ASSERT_GT(GetUint64(rows[0].at("count")), 0);
}
}
}

} // namespace NKikimr::NKqp
6 changes: 3 additions & 3 deletions ydb/core/testlib/cs_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() const {
return std::make_shared<arrow::Schema>(std::move(fields));
}

std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs) const {
std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui64 tsStepUs) const {
std::shared_ptr<arrow::Schema> schema = GetArrowSchema();

arrow::TimestampBuilder b1(arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool());
Expand Down Expand Up @@ -368,7 +368,7 @@ std::shared_ptr<arrow::Schema> TCickBenchHelper::GetArrowSchema() const {
});
}

std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64 begin, size_t rowCount, const ui32 tsStepUs) const {
std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64 begin, size_t rowCount, const ui64 tsStepUs) const {
std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
UNIT_ASSERT(schema);
UNIT_ASSERT(schema->num_fields());
Expand Down Expand Up @@ -441,7 +441,7 @@ std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch() cons
return TestArrowBatch(0, 0, 10, 1);
}

std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch(ui64, ui64, size_t rowCount, const ui32 /*tsStepUs*/) const {
std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch(ui64, ui64, size_t rowCount, const ui64 /*tsStepUs*/) const {
rowCount = 10;
std::shared_ptr<arrow::Schema> schema = GetArrowSchema();

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/testlib/cs_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class THelperSchemaless : public NCommon::THelper {
void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const;
void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch, const Ydb::StatusIds_StatusCode& expectedStatus = Ydb::StatusIds::SUCCESS) const;

virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const = 0;
virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui64 tsStepUs = 1) const = 0;
};

class THelper: public THelperSchemaless {
Expand Down Expand Up @@ -68,7 +68,7 @@ class THelper: public THelperSchemaless {
}
virtual TString GetTestTableSchema() const;

virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const override;
virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui64 tsStepUs = 1) const override;
};

class TCickBenchHelper: public THelperSchemaless {
Expand Down Expand Up @@ -189,7 +189,7 @@ class TCickBenchHelper: public THelperSchemaless {
KeyColumnNames: ["EventTime", "EventDate", "CounterID", "UserID", "WatchID"]
)";

std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64 begin, size_t rowCount, const ui32 tsStepUs = 1) const override;
std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64 begin, size_t rowCount, const ui64 tsStepUs = 1) const override;
};

class TTableWithNullsHelper: public THelperSchemaless {
Expand All @@ -210,7 +210,7 @@ class TTableWithNullsHelper: public THelperSchemaless {
KeyColumnNames: "id"
)";

std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64, size_t rowCount = 10, const ui32 tsStepUs = 1) const override;
std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64, size_t rowCount = 10, const ui64 tsStepUs = 1) const override;
std::shared_ptr<arrow::RecordBatch> TestArrowBatch() const;
};

Expand Down
41 changes: 23 additions & 18 deletions ydb/core/tx/columnshard/blobs_action/tier/storage.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#include "storage.h"
#include "adapter.h"
#include "remove.h"
#include "write.h"
#include "read.h"
#include "gc.h"
#include "gc_actor.h"
#include "read.h"
#include "remove.h"
#include "storage.h"
#include "write.h"

#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/tiering/manager.h>
#include <ydb/core/wrappers/unavailable_storage.h>

namespace NKikimr::NOlap::NBlobOperations::NTier {

Expand Down Expand Up @@ -54,23 +56,26 @@ void TOperator::DoStartGCAction(const std::shared_ptr<IBlobsGCAction>& action) c
}

void TOperator::InitNewExternalOperator(const NColumnShard::NTiers::TManager* tierManager) {
if (!tierManager || !tierManager->IsReady()) {
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
CurrentS3Settings.reset();
ExternalStorageOperator = nullptr;
return;
}
NWrappers::NExternalStorage::IExternalStorageOperator::TPtr extStorageOperator;
std::optional<NKikimrSchemeOp::TS3Settings> settings;

NKikimrSchemeOp::TS3Settings settings = tierManager->GetS3Settings();
{
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
if (CurrentS3Settings && CurrentS3Settings->SerializeAsString() == settings.SerializeAsString()) {
return;
if (tierManager && tierManager->IsReady()) {
settings = tierManager->GetS3Settings();
{
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
if (CurrentS3Settings && CurrentS3Settings->SerializeAsString() == settings->SerializeAsString()) {
return;
}
}
auto extStorageConfig = NWrappers::NExternalStorage::IExternalStorageConfig::Construct(*settings);
AFL_VERIFY(extStorageConfig);
extStorageOperator = extStorageConfig->ConstructStorageOperator(false);
} else {
extStorageOperator = std::make_shared<NWrappers::NExternalStorage::TUnavailableExternalStorageOperator>(
NWrappers::NExternalStorage::TUnavailableExternalStorageOperator(
"tier_unavailable", TStringBuilder() << "Tier is not configured: " << GetStorageId()));
}
auto extStorageConfig = NWrappers::NExternalStorage::IExternalStorageConfig::Construct(settings);
AFL_VERIFY(extStorageConfig);
auto extStorageOperator = extStorageConfig->ConstructStorageOperator(false);

extStorageOperator->InitReplyAdapter(std::make_shared<NOlap::NBlobOperations::NTier::TRepliesAdapter>(GetStorageId()));
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
CurrentS3Settings = settings;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void TColumnShard::BecomeBroken(const TActorContext& ctx) {
}

void TColumnShard::TrySwitchToWork(const TActorContext& ctx) {
if (!Tiers->AreConfigsComplete()) {
if (Tiers->GetAwaitedConfigsCount()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "skip_switch_to_work")("reason", "tiering_metadata_not_ready");
return;
}
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ std::shared_ptr<ITxReader> TTxInit::BuildReader() {
result->AddChildren(std::make_shared<NLoading::TBackgroundSessionsInitializer>("bg_sessions", Self));
result->AddChildren(std::make_shared<NLoading::TSharingSessionsInitializer>("sharing_sessions", Self));
result->AddChildren(std::make_shared<NLoading::TInFlightReadsInitializer>("in_flight_reads", Self));
result->AddChildren(std::make_shared<NLoading::TTiersManagerInitializer>("tiers_manager", Self));
return result;
}

Expand Down Expand Up @@ -106,11 +107,6 @@ void TTxInit::Complete(const TActorContext& ctx) {
Self->Counters.GetCSCounters().Initialization.OnTxInitFinished(TMonotonic::Now() - StartInstant);
AFL_VERIFY(!Self->IsTxInitFinished);
Self->IsTxInitFinished = true;

for (const auto& [pathId, tiering] : Self->TablesManager.GetTtl()) {
Self->Tiers->EnablePathId(pathId, tiering.GetUsedTiers());
}

Self->TrySwitchToWork(ctx);
}

Expand Down
7 changes: 1 addition & 6 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1594,12 +1594,7 @@ void TColumnShard::Handle(NOlap::NBlobOperations::NEvents::TEvDeleteSharedBlobs:

void TColumnShard::ActivateTiering(const ui64 pathId, const THashSet<NTiers::TExternalStorageId>& usedTiers) {
AFL_VERIFY(Tiers);
if (!usedTiers.empty()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "activate_tiering")("path_id", pathId)("tier_count", usedTiers.size());
Tiers->EnablePathId(pathId, usedTiers);
} else {
Tiers->DisablePathId(pathId);
}
Tiers->ActivateTiers(usedTiers);
OnTieringModified(pathId);
}

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class TSharingSessionsInitializer;
class TInFlightReadsInitializer;
class TSpecialValuesInitializer;
class TTablesManagerInitializer;
class TTiersManagerInitializer;
} // namespace NLoading

extern bool gAllowLogBatchingDefaultValue;
Expand Down Expand Up @@ -233,6 +234,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
friend class NLoading::TInFlightReadsInitializer;
friend class NLoading::TSpecialValuesInitializer;
friend class NLoading::TTablesManagerInitializer;
friend class NLoading::TTiersManagerInitializer;
friend class TWriteTasksQueue;
friend class TWriteTask;

Expand Down Expand Up @@ -330,7 +332,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
putStatus.OnYellowChannels(Executor());
}

void ActivateTiering(const ui64 pathId, const THashSet<NTiers::TExternalStorageId>& usedTiers);
void ActivateTiering(const ui64 pathId, const THashSet<NTiers::TExternalStorageId>& tiers);
void OnTieringModified(const std::optional<ui64> pathId = {});

std::shared_ptr<TAtomicCounter> TabletActivityImpl = std::make_shared<TAtomicCounter>(0);
Expand Down
Loading
Loading