Skip to content

Commit

Permalink
allow missing tiers in schema
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 committed Jan 23, 2025
1 parent ce2c5e5 commit 63faf17
Show file tree
Hide file tree
Showing 20 changed files with 358 additions and 184 deletions.
14 changes: 14 additions & 0 deletions ydb/core/kqp/ut/olap/helpers/get_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ void PrintValue(IOutputStream& out, const NYdb::TValue& v) {
out << value.GetUint32();
break;
}
case NYdb::EPrimitiveType::Int32:
{
out << value.GetInt32();
break;
}
case NYdb::EPrimitiveType::Uint64:
{
out << value.GetUint64();
Expand Down Expand Up @@ -72,6 +77,15 @@ ui64 GetUint32(const NYdb::TValue& v) {
}
}

i64 GetInt32(const NYdb::TValue& v) {
NYdb::TValueParser value(v);
if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
return *value.GetOptionalInt32();
} else {
return value.GetInt32();
}
}

ui64 GetUint64(const NYdb::TValue& v) {
NYdb::TValueParser value(v);
if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/olap/helpers/get_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ void PrintRow(IOutputStream& out, const THashMap<TString, NYdb::TValue>& fields)
void PrintRows(IOutputStream& out, const TVector<THashMap<TString, NYdb::TValue>>& rows);

ui64 GetUint32(const NYdb::TValue& v);
i64 GetInt32(const NYdb::TValue& v);
ui64 GetUint64(const NYdb::TValue& v);
TString GetUtf8(const NYdb::TValue& v);
TInstant GetTimestamp(const NYdb::TValue& v);
Expand Down
76 changes: 76 additions & 0 deletions ydb/core/kqp/ut/olap/tiering_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,82 @@ Y_UNIT_TEST_SUITE(KqpOlapTiering) {

testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
}

Y_UNIT_TEST(DeletedTier) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetSkipSpecialCheckForEvict(true);

TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
TTestHelper testHelper(runnerSettings);
TLocalHelper localHelper(testHelper.GetKikimr());
testHelper.GetRuntime().SetLogPriority(NKikimrServices::TX_TIERING, NActors::NLog::PRI_DEBUG);
NYdb::NTable::TTableClient tableClient = testHelper.GetKikimr().GetTableClient();
Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();
Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->SetSecretKey("fakeSecret");

localHelper.CreateTestOlapTable();
testHelper.CreateTier("tier1");

for (ui64 i = 0; i < 100; ++i) {
WriteTestData(testHelper.GetKikimr(), "/Root/olapStore/olapTable", 0, 3600000000 + i * 1000, 1000);
WriteTestData(testHelper.GetKikimr(), "/Root/olapStore/olapTable", 0, 3600000000 + i * 1000, 1000);
}

testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
csController->WaitCompactions(TDuration::Seconds(5));
csController->WaitActualization(TDuration::Seconds(5));

csController->DisableBackground(NYDBTest::ICSController::EBackground::TTL);
testHelper.ResetTiering("/Root/olapStore/olapTable");
testHelper.RebootTablets("/Root/olapStore/olapTable");

{
auto selectQuery = TString(R"(
SELECT
TierName, SUM(ColumnRawBytes) As RawBytes
FROM `/Root/olapStore/olapTable/.sys/primary_index_portion_stats`
WHERE Activity == 1
GROUP BY TierName
)");

auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), "/Root/tier1");
}

ui64 maxLevelValue;
{
auto selectQuery = TString(R"(SELECT MAX(level) AS level FROM `/Root/olapStore/olapTable`)");
auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
maxLevelValue = GetInt32(rows[0].at("level"));
}

{
auto result = testHelper.GetSession().ExecuteSchemeQuery(R"(DROP EXTERNAL DATA SOURCE `/Root/tier1`)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
testHelper.RebootTablets("/Root/olapStore/olapTable");

{
auto selectQuery = TString(R"(SELECT MAX(level) FROM `/Root/olapStore/olapTable`)");
auto it = tableClient.StreamExecuteScanQuery(selectQuery, NYdb::NTable::TStreamExecScanQuerySettings()).GetValueSync();
auto streamPart = it.ReadNext().GetValueSync();
UNIT_ASSERT(!streamPart.IsSuccess());
UNIT_ASSERT_STRING_CONTAINS(streamPart.GetIssues().ToString(), "cannot read blob range");
}

testHelper.CreateTier("tier1");
testHelper.RebootTablets("/Root/olapStore/olapTable");

{
auto selectQuery = TString(R"(SELECT MAX(level) AS level FROM `/Root/olapStore/olapTable`)");
auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(GetInt32(rows[0].at("level")), maxLevelValue);
}
}
}

} // namespace NKikimr::NKqp
41 changes: 24 additions & 17 deletions ydb/core/tx/columnshard/blobs_action/tier/storage.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#include "storage.h"
#include "adapter.h"
#include "remove.h"
#include "write.h"
#include "read.h"
#include "gc.h"
#include "gc_actor.h"
#include "read.h"
#include "remove.h"
#include "storage.h"
#include "write.h"

#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/tiering/manager.h>
#include <ydb/core/wrappers/unavailable_storage.h>

namespace NKikimr::NOlap::NBlobOperations::NTier {

Expand Down Expand Up @@ -54,23 +56,28 @@ void TOperator::DoStartGCAction(const std::shared_ptr<IBlobsGCAction>& action) c
}

void TOperator::InitNewExternalOperator(const NColumnShard::NTiers::TManager* tierManager) {
if (!tierManager || !tierManager->IsReady()) {
NWrappers::NExternalStorage::IExternalStorageOperator::TPtr extStorageOperator;
std::optional<NKikimrSchemeOp::TS3Settings> settings;

if (tierManager && tierManager->IsReady()) {
settings = tierManager->GetS3Settings();
{
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
if (CurrentS3Settings && CurrentS3Settings->SerializeAsString() == settings->SerializeAsString()) {
return;
}
}
auto extStorageConfig = NWrappers::NExternalStorage::IExternalStorageConfig::Construct(*settings);
AFL_VERIFY(extStorageConfig);
extStorageOperator = extStorageConfig->ConstructStorageOperator(false);
} else {
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
CurrentS3Settings.reset();
ExternalStorageOperator = nullptr;
return;
extStorageOperator = std::make_shared<NWrappers::NExternalStorage::TUnavailableExternalStorageOperator>(
NWrappers::NExternalStorage::TUnavailableExternalStorageOperator(
"tier_unavailable", TStringBuilder() << "Tier is not configured: " << GetStorageId()));
}

NKikimrSchemeOp::TS3Settings settings = tierManager->GetS3Settings();
{
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
if (CurrentS3Settings && CurrentS3Settings->SerializeAsString() == settings.SerializeAsString()) {
return;
}
}
auto extStorageConfig = NWrappers::NExternalStorage::IExternalStorageConfig::Construct(settings);
AFL_VERIFY(extStorageConfig);
auto extStorageOperator = extStorageConfig->ConstructStorageOperator(false);
extStorageOperator->InitReplyAdapter(std::make_shared<NOlap::NBlobOperations::NTier::TRepliesAdapter>(GetStorageId()));
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
CurrentS3Settings = settings;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void TColumnShard::BecomeBroken(const TActorContext& ctx) {
}

void TColumnShard::TrySwitchToWork(const TActorContext& ctx) {
if (!Tiers->AreConfigsComplete()) {
if (Tiers->GetAwaitedConfigsCount()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "skip_switch_to_work")("reason", "tiering_metadata_not_ready");
return;
}
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ std::shared_ptr<ITxReader> TTxInit::BuildReader() {
result->AddChildren(std::make_shared<NLoading::TBackgroundSessionsInitializer>("bg_sessions", Self));
result->AddChildren(std::make_shared<NLoading::TSharingSessionsInitializer>("sharing_sessions", Self));
result->AddChildren(std::make_shared<NLoading::TInFlightReadsInitializer>("in_flight_reads", Self));
result->AddChildren(std::make_shared<NLoading::TTiersManagerInitializer>("tiers_manager", Self));
return result;
}

Expand Down Expand Up @@ -106,11 +107,6 @@ void TTxInit::Complete(const TActorContext& ctx) {
Self->Counters.GetCSCounters().Initialization.OnTxInitFinished(TMonotonic::Now() - StartInstant);
AFL_VERIFY(!Self->IsTxInitFinished);
Self->IsTxInitFinished = true;

for (const auto& [pathId, tiering] : Self->TablesManager.GetTtl()) {
Self->Tiers->EnablePathId(pathId, tiering.GetUsedTiers());
}

Self->TrySwitchToWork(ctx);
}

Expand Down
7 changes: 1 addition & 6 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1594,12 +1594,7 @@ void TColumnShard::Handle(NOlap::NBlobOperations::NEvents::TEvDeleteSharedBlobs:

void TColumnShard::ActivateTiering(const ui64 pathId, const THashSet<NTiers::TExternalStorageId>& usedTiers) {
AFL_VERIFY(Tiers);
if (!usedTiers.empty()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "activate_tiering")("path_id", pathId)("tier_count", usedTiers.size());
Tiers->EnablePathId(pathId, usedTiers);
} else {
Tiers->DisablePathId(pathId);
}
Tiers->ActivateTiers(usedTiers);
OnTieringModified(pathId);
}

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class TSharingSessionsInitializer;
class TInFlightReadsInitializer;
class TSpecialValuesInitializer;
class TTablesManagerInitializer;
class TTiersManagerInitializer;
} // namespace NLoading

extern bool gAllowLogBatchingDefaultValue;
Expand Down Expand Up @@ -233,6 +234,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
friend class NLoading::TInFlightReadsInitializer;
friend class NLoading::TSpecialValuesInitializer;
friend class NLoading::TTablesManagerInitializer;
friend class NLoading::TTiersManagerInitializer;
friend class TWriteTasksQueue;
friend class TWriteTask;

Expand Down Expand Up @@ -330,7 +332,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
putStatus.OnYellowChannels(Executor());
}

void ActivateTiering(const ui64 pathId, const THashSet<NTiers::TExternalStorageId>& usedTiers);
void ActivateTiering(const ui64 pathId, const THashSet<NTiers::TExternalStorageId>& tiers);
void OnTieringModified(const std::optional<ui64> pathId = {});

std::shared_ptr<TAtomicCounter> TabletActivityImpl = std::make_shared<TAtomicCounter>(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,6 @@ std::optional<TTieringActualizer::TFullActualizationInfo> TTieringActualizer::Bu
targetTierName = tieringInfo.GetNextTierNameVerified();
}
if (d) {
if (targetTierName != NTiering::NCommon::DeleteTierName) {
if (const auto op = StoragesManager->GetOperatorOptional(targetTierName); !op || !op->IsReady()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_eviction")("reason", "storage_not_ready")("tier", targetTierName)(
"portion", portion.GetPortionId());
return std::nullopt;
}
}
// if (currentTierName == "deploy_logs_s3" && targetTierName == IStoragesManager::DefaultStorageId) {
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("tiering_info", tieringInfo.DebugString())("max", max->ToString())("now", now.ToString())("d", *d)("tiering", Tiering->GetDebugString())("pathId", PathId);
// AFL_VERIFY(false)("tiering_info", tieringInfo.DebugString())("max", max->ToString())("now", now.ToString())("d", *d)("tiering", Tiering->GetDebugString())("pathId", PathId);
Expand Down
29 changes: 29 additions & 0 deletions ydb/core/tx/columnshard/loading/stages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/transactions/locks_db.h>
#include <ydb/core/tx/tiering/manager.h>

namespace NKikimr::NColumnShard::NLoading {

Expand Down Expand Up @@ -228,4 +229,32 @@ bool TTablesManagerInitializer::DoPrecharge(NTabletFlatExecutor::TTransactionCon
(int)Schema::Precharge<Schema::TtlSettingsPresetVersionInfo>(db, txc.DB.GetScheme());
}

bool TTiersManagerInitializer::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
NIceDb::TNiceDb db(txc.DB);
auto rowset = db.Table<Schema::TableVersionInfo>().Select();
if (!rowset.IsReady()) {
return false;
}

while (!rowset.EndOfSet()) {
NKikimrTxColumnShard::TTableVersionInfo versionInfo;
AFL_VERIFY(versionInfo.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>()));
if (versionInfo.GetTtlSettings().HasEnabled()) {
NOlap::TTiering tiering;
tiering.DeserializeFromProto(versionInfo.GetTtlSettings().GetEnabled()).Validate();
Self->Tiers->ActivateTiers(tiering.GetUsedTiers());
}

if (!rowset.Next()) {
return false;
}
}
return true;
}

bool TTiersManagerInitializer::DoPrecharge(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
NIceDb::TNiceDb db(txc.DB);
return Schema::Precharge<Schema::TableVersionInfo>(db, txc.DB.GetScheme());
}

} // namespace NKikimr::NColumnShard::NLoading
10 changes: 10 additions & 0 deletions ydb/core/tx/columnshard/loading/stages.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,14 @@ class TTablesManagerInitializer: public ITxShardInitReader {
using TBase::TBase;
};

class TTiersManagerInitializer: public ITxShardInitReader {
private:
using TBase = ITxShardInitReader;
virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) override;
virtual bool DoPrecharge(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) override;

public:
using TBase::TBase;
};

} // namespace NKikimr::NColumnShard::NLoading
Loading

0 comments on commit 63faf17

Please sign in to comment.