Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

maintain access to tiers removed from TTL #13733

Merged
merged 5 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions ydb/core/kqp/ut/olap/helpers/get_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ void PrintValue(IOutputStream& out, const NYdb::TValue& v) {
out << value.GetUint32();
break;
}
case NYdb::EPrimitiveType::Int32:
{
out << value.GetInt32();
break;
}
case NYdb::EPrimitiveType::Uint64:
{
out << value.GetUint64();
Expand Down Expand Up @@ -72,6 +77,15 @@ ui64 GetUint32(const NYdb::TValue& v) {
}
}

i64 GetInt32(const NYdb::TValue& v) {
NYdb::TValueParser value(v);
if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
return *value.GetOptionalInt32();
} else {
return value.GetInt32();
}
}

ui64 GetUint64(const NYdb::TValue& v) {
NYdb::TValueParser value(v);
if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/olap/helpers/get_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ void PrintRow(IOutputStream& out, const THashMap<TString, NYdb::TValue>& fields)
void PrintRows(IOutputStream& out, const TVector<THashMap<TString, NYdb::TValue>>& rows);

ui64 GetUint32(const NYdb::TValue& v);
i64 GetInt32(const NYdb::TValue& v);
ui64 GetUint64(const NYdb::TValue& v);
TString GetUtf8(const NYdb::TValue& v);
TInstant GetTimestamp(const NYdb::TValue& v);
Expand Down
76 changes: 76 additions & 0 deletions ydb/core/kqp/ut/olap/tiering_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,82 @@ Y_UNIT_TEST_SUITE(KqpOlapTiering) {

testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
}

Y_UNIT_TEST(DeletedTier) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetSkipSpecialCheckForEvict(true);

TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
TTestHelper testHelper(runnerSettings);
TLocalHelper localHelper(testHelper.GetKikimr());
testHelper.GetRuntime().SetLogPriority(NKikimrServices::TX_TIERING, NActors::NLog::PRI_DEBUG);
NYdb::NTable::TTableClient tableClient = testHelper.GetKikimr().GetTableClient();
Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();
Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->SetSecretKey("fakeSecret");

localHelper.CreateTestOlapTable();
testHelper.CreateTier("tier1");

for (ui64 i = 0; i < 100; ++i) {
WriteTestData(testHelper.GetKikimr(), "/Root/olapStore/olapTable", 0, 3600000000 + i * 1000, 1000);
WriteTestData(testHelper.GetKikimr(), "/Root/olapStore/olapTable", 0, 3600000000 + i * 1000, 1000);
}

testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
csController->WaitCompactions(TDuration::Seconds(5));
csController->WaitActualization(TDuration::Seconds(5));

csController->DisableBackground(NYDBTest::ICSController::EBackground::TTL);
testHelper.ResetTiering("/Root/olapStore/olapTable");
testHelper.RebootTablets("/Root/olapStore/olapTable");

{
auto selectQuery = TString(R"(
SELECT
TierName, SUM(ColumnRawBytes) As RawBytes
FROM `/Root/olapStore/olapTable/.sys/primary_index_portion_stats`
WHERE Activity == 1
GROUP BY TierName
)");

auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), "/Root/tier1");
}

ui64 maxLevelValue;
{
auto selectQuery = TString(R"(SELECT MAX(level) AS level FROM `/Root/olapStore/olapTable`)");
auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
maxLevelValue = GetInt32(rows[0].at("level"));
}

{
auto result = testHelper.GetSession().ExecuteSchemeQuery(R"(DROP EXTERNAL DATA SOURCE `/Root/tier1`)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
testHelper.RebootTablets("/Root/olapStore/olapTable");

{
auto selectQuery = TString(R"(SELECT MAX(level) FROM `/Root/olapStore/olapTable`)");
auto it = tableClient.StreamExecuteScanQuery(selectQuery, NYdb::NTable::TStreamExecScanQuerySettings()).GetValueSync();
auto streamPart = it.ReadNext().GetValueSync();
UNIT_ASSERT(!streamPart.IsSuccess());
UNIT_ASSERT_STRING_CONTAINS(streamPart.GetIssues().ToString(), "cannot read blob range");
}

testHelper.CreateTier("tier1");
testHelper.RebootTablets("/Root/olapStore/olapTable");

{
auto selectQuery = TString(R"(SELECT MAX(level) AS level FROM `/Root/olapStore/olapTable`)");
auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(GetInt32(rows[0].at("level")), maxLevelValue);
}
}
}

} // namespace NKikimr::NKqp
41 changes: 23 additions & 18 deletions ydb/core/tx/columnshard/blobs_action/tier/storage.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#include "storage.h"
#include "adapter.h"
#include "remove.h"
#include "write.h"
#include "read.h"
#include "gc.h"
#include "gc_actor.h"
#include "read.h"
#include "remove.h"
#include "storage.h"
#include "write.h"

#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/tiering/manager.h>
#include <ydb/core/wrappers/unavailable_storage.h>

namespace NKikimr::NOlap::NBlobOperations::NTier {

Expand Down Expand Up @@ -54,23 +56,26 @@ void TOperator::DoStartGCAction(const std::shared_ptr<IBlobsGCAction>& action) c
}

void TOperator::InitNewExternalOperator(const NColumnShard::NTiers::TManager* tierManager) {
if (!tierManager || !tierManager->IsReady()) {
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
CurrentS3Settings.reset();
ExternalStorageOperator = nullptr;
return;
}
NWrappers::NExternalStorage::IExternalStorageOperator::TPtr extStorageOperator;
std::optional<NKikimrSchemeOp::TS3Settings> settings;

NKikimrSchemeOp::TS3Settings settings = tierManager->GetS3Settings();
{
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
if (CurrentS3Settings && CurrentS3Settings->SerializeAsString() == settings.SerializeAsString()) {
return;
if (tierManager && tierManager->IsReady()) {
settings = tierManager->GetS3Settings();
{
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
if (CurrentS3Settings && CurrentS3Settings->SerializeAsString() == settings->SerializeAsString()) {
return;
}
}
auto extStorageConfig = NWrappers::NExternalStorage::IExternalStorageConfig::Construct(*settings);
AFL_VERIFY(extStorageConfig);
extStorageOperator = extStorageConfig->ConstructStorageOperator(false);
} else {
extStorageOperator = std::make_shared<NWrappers::NExternalStorage::TUnavailableExternalStorageOperator>(
NWrappers::NExternalStorage::TUnavailableExternalStorageOperator(
"tier_unavailable", TStringBuilder() << "Tier is not configured: " << GetStorageId()));
}
auto extStorageConfig = NWrappers::NExternalStorage::IExternalStorageConfig::Construct(settings);
AFL_VERIFY(extStorageConfig);
auto extStorageOperator = extStorageConfig->ConstructStorageOperator(false);

extStorageOperator->InitReplyAdapter(std::make_shared<NOlap::NBlobOperations::NTier::TRepliesAdapter>(GetStorageId()));
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
CurrentS3Settings = settings;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void TColumnShard::BecomeBroken(const TActorContext& ctx) {
}

void TColumnShard::TrySwitchToWork(const TActorContext& ctx) {
if (!Tiers->AreConfigsComplete()) {
if (Tiers->GetAwaitedConfigsCount()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "skip_switch_to_work")("reason", "tiering_metadata_not_ready");
return;
}
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ std::shared_ptr<ITxReader> TTxInit::BuildReader() {
result->AddChildren(std::make_shared<NLoading::TBackgroundSessionsInitializer>("bg_sessions", Self));
result->AddChildren(std::make_shared<NLoading::TSharingSessionsInitializer>("sharing_sessions", Self));
result->AddChildren(std::make_shared<NLoading::TInFlightReadsInitializer>("in_flight_reads", Self));
result->AddChildren(std::make_shared<NLoading::TTiersManagerInitializer>("tiers_manager", Self));
return result;
}

Expand Down Expand Up @@ -106,11 +107,6 @@ void TTxInit::Complete(const TActorContext& ctx) {
Self->Counters.GetCSCounters().Initialization.OnTxInitFinished(TMonotonic::Now() - StartInstant);
AFL_VERIFY(!Self->IsTxInitFinished);
Self->IsTxInitFinished = true;

for (const auto& [pathId, tiering] : Self->TablesManager.GetTtl()) {
Self->Tiers->EnablePathId(pathId, tiering.GetUsedTiers());
}

Self->TrySwitchToWork(ctx);
}

Expand Down
7 changes: 1 addition & 6 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1594,12 +1594,7 @@ void TColumnShard::Handle(NOlap::NBlobOperations::NEvents::TEvDeleteSharedBlobs:

void TColumnShard::ActivateTiering(const ui64 pathId, const THashSet<NTiers::TExternalStorageId>& usedTiers) {
AFL_VERIFY(Tiers);
if (!usedTiers.empty()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "activate_tiering")("path_id", pathId)("tier_count", usedTiers.size());
Tiers->EnablePathId(pathId, usedTiers);
} else {
Tiers->DisablePathId(pathId);
}
Tiers->ActivateTiers(usedTiers);
OnTieringModified(pathId);
}

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class TSharingSessionsInitializer;
class TInFlightReadsInitializer;
class TSpecialValuesInitializer;
class TTablesManagerInitializer;
class TTiersManagerInitializer;
} // namespace NLoading

extern bool gAllowLogBatchingDefaultValue;
Expand Down Expand Up @@ -233,6 +234,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
friend class NLoading::TInFlightReadsInitializer;
friend class NLoading::TSpecialValuesInitializer;
friend class NLoading::TTablesManagerInitializer;
friend class NLoading::TTiersManagerInitializer;
friend class TWriteTasksQueue;
friend class TWriteTask;

Expand Down Expand Up @@ -330,7 +332,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
putStatus.OnYellowChannels(Executor());
}

void ActivateTiering(const ui64 pathId, const THashSet<NTiers::TExternalStorageId>& usedTiers);
void ActivateTiering(const ui64 pathId, const THashSet<NTiers::TExternalStorageId>& tiers);
void OnTieringModified(const std::optional<ui64> pathId = {});

std::shared_ptr<TAtomicCounter> TabletActivityImpl = std::make_shared<TAtomicCounter>(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,6 @@ std::optional<TTieringActualizer::TFullActualizationInfo> TTieringActualizer::Bu
targetTierName = tieringInfo.GetNextTierNameVerified();
}
if (d) {
if (targetTierName != NTiering::NCommon::DeleteTierName) {
if (const auto op = StoragesManager->GetOperatorOptional(targetTierName); !op || !op->IsReady()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_eviction")("reason", "storage_not_ready")("tier", targetTierName)(
"portion", portion.GetPortionId());
return std::nullopt;
}
}
// if (currentTierName == "deploy_logs_s3" && targetTierName == IStoragesManager::DefaultStorageId) {
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("tiering_info", tieringInfo.DebugString())("max", max->ToString())("now", now.ToString())("d", *d)("tiering", Tiering->GetDebugString())("pathId", PathId);
// AFL_VERIFY(false)("tiering_info", tieringInfo.DebugString())("max", max->ToString())("now", now.ToString())("d", *d)("tiering", Tiering->GetDebugString())("pathId", PathId);
Expand Down
29 changes: 29 additions & 0 deletions ydb/core/tx/columnshard/loading/stages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/transactions/locks_db.h>
#include <ydb/core/tx/tiering/manager.h>

namespace NKikimr::NColumnShard::NLoading {

Expand Down Expand Up @@ -228,4 +229,32 @@ bool TTablesManagerInitializer::DoPrecharge(NTabletFlatExecutor::TTransactionCon
(int)Schema::Precharge<Schema::TtlSettingsPresetVersionInfo>(db, txc.DB.GetScheme());
}

bool TTiersManagerInitializer::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
NIceDb::TNiceDb db(txc.DB);
auto rowset = db.Table<Schema::TableVersionInfo>().Select();
if (!rowset.IsReady()) {
return false;
}

while (!rowset.EndOfSet()) {
NKikimrTxColumnShard::TTableVersionInfo versionInfo;
AFL_VERIFY(versionInfo.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>()));
if (versionInfo.GetTtlSettings().HasEnabled()) {
NOlap::TTiering tiering;
tiering.DeserializeFromProto(versionInfo.GetTtlSettings().GetEnabled()).Validate();
Self->Tiers->ActivateTiers(tiering.GetUsedTiers());
}

if (!rowset.Next()) {
return false;
}
}
return true;
}

bool TTiersManagerInitializer::DoPrecharge(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
NIceDb::TNiceDb db(txc.DB);
return Schema::Precharge<Schema::TableVersionInfo>(db, txc.DB.GetScheme());
}

} // namespace NKikimr::NColumnShard::NLoading
10 changes: 10 additions & 0 deletions ydb/core/tx/columnshard/loading/stages.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,14 @@ class TTablesManagerInitializer: public ITxShardInitReader {
using TBase::TBase;
};

class TTiersManagerInitializer: public ITxShardInitReader {
private:
using TBase = ITxShardInitReader;
virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) override;
virtual bool DoPrecharge(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) override;

public:
using TBase::TBase;
};

} // namespace NKikimr::NColumnShard::NLoading
8 changes: 1 addition & 7 deletions ydb/core/tx/tiering/fetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ class TEvSchemeObjectResolutionFailed: public TEventLocal<TEvSchemeObjectResolut
class TSchemeObjectWatcher: public TActorBootstrapped<TSchemeObjectWatcher> {
private:
TActorId Owner;
THashSet<TPathId> WatchedPathIds;

private:
THolder<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigateRequest(
Expand Down Expand Up @@ -103,11 +102,7 @@ class TSchemeObjectWatcher: public TActorBootstrapped<TSchemeObjectWatcher> {
}

void WatchPathId(const TPathId& pathId) {
if (WatchedPathIds.emplace(pathId).second) {
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(pathId), IEventHandle::FlagTrackDelivery);
} else {
AFL_DEBUG(NKikimrServices::TX_TIERING)("event", "skip_watch_path_id")("reason", "already_subscribed")("path", pathId.ToString());
}
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(pathId), IEventHandle::FlagTrackDelivery);
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
Expand Down Expand Up @@ -152,7 +147,6 @@ class TSchemeObjectWatcher: public TActorBootstrapped<TSchemeObjectWatcher> {
const TString name = TString(ExtractBase(record->Path));
const TString storageDir = TString(ExtractParent(record->Path));
AFL_DEBUG(NKikimrServices::TX_TIERING)("event", "object_deleted")("path", record->Path);
AFL_VERIFY(WatchedPathIds.erase(record->PathId));
Send(Owner, new NTiers::TEvNotifySchemeObjectDeleted(record->Path));
}

Expand Down
Loading
Loading