diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 96d1486d267c..0ba5642c31c7 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -186,4 +186,5 @@ message TFeatureFlags { optional bool DisableLocalDBEraseCache = 161 [default = false]; optional bool EnableExportChecksums = 162 [default = false]; optional bool EnableTopicTransfer = 163 [default = false]; + optional bool EnableViewExport = 164 [default = false]; } diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 2ec3ee1acd75..938215b4e957 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4329,6 +4329,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase { item.SourcePathId.OwnerId = rowset.GetValueOrDefault(selfId); item.SourcePathId.LocalPathId = rowset.GetValue(); + item.SourcePathType = rowset.GetValue(); item.State = static_cast(rowset.GetValue()); item.WaitTxId = rowset.GetValueOrDefault(InvalidTxId); diff --git a/ydb/core/tx/schemeshard/schemeshard_export.cpp b/ydb/core/tx/schemeshard/schemeshard_export.cpp index ea6f881ee39a..cba13774cc96 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export.cpp @@ -170,7 +170,8 @@ void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::T NIceDb::TUpdate(item.SourcePathName), NIceDb::TUpdate(item.SourcePathId.OwnerId), NIceDb::TUpdate(item.SourcePathId.LocalPathId), - NIceDb::TUpdate(static_cast(item.State)) + NIceDb::TUpdate(static_cast(item.State)), + NIceDb::TUpdate(item.SourcePathType) ); } } @@ -231,6 +232,10 @@ void TSchemeShard::Handle(TEvExport::TEvListExportsRequest::TPtr& ev, const TAct Execute(CreateTxListExports(ev), ctx); } +void TSchemeShard::Handle(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev, const TActorContext& ctx) { + Execute(CreateTxProgressExport(ev), ctx); +} + void TSchemeShard::ResumeExports(const TVector& exportIds, const TActorContext& ctx) { for (const ui64 id : exportIds) { Execute(CreateTxProgressExport(id), ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp b/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp index ccc6146e269d..ec7635c7cd2a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp @@ -1,8 +1,9 @@ -#include "schemeshard_xxport__tx_base.h" -#include "schemeshard_export_flow_proposals.h" -#include "schemeshard_export.h" #include "schemeshard_audit_log.h" +#include "schemeshard_export.h" +#include "schemeshard_export_flow_proposals.h" +#include "schemeshard_export_helpers.h" #include "schemeshard_impl.h" +#include "schemeshard_xxport__tx_base.h" #include #include @@ -58,6 +59,10 @@ struct TSchemeShard::TExport::TTxCancel: public TSchemeShard::TXxport::TTxBase { return true; } + LOG_D("TExport::TTxCancel, cancelling manually" + << ", info: " << exportInfo->ToString() + ); + exportInfo->Issue = "Cancelled manually"; if (exportInfo->State < TExportInfo::EState::Transferring) { diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 7f63a60a7970..41e2f6698a57 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -1,8 +1,9 @@ #include "schemeshard_xxport__tx_base.h" #include "schemeshard_xxport__helpers.h" +#include "schemeshard_export.h" #include "schemeshard_export_flow_proposals.h" #include "schemeshard_export_helpers.h" -#include "schemeshard_export.h" +#include "schemeshard_export_scheme_uploader.h" #include "schemeshard_audit_log.h" #include "schemeshard_impl.h" @@ -15,6 +16,16 @@ #include #include +namespace { + +ui32 PopFront(TDeque& pendingItems) { + const ui32 itemIdx = pendingItems.front(); + pendingItems.pop_front(); + return itemIdx; +} + +} + namespace NKikimr { namespace NSchemeShard { @@ -203,7 +214,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { .IsResolved() .NotDeleted() .NotUnderDeleting() - .IsTable() + .IsSupportedInExports() .FailOnRestrictedCreateInTempZone(); if (!checks) { @@ -212,7 +223,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { } } - exportInfo->Items.emplace_back(item.source_path(), path.Base()->PathId); + exportInfo->Items.emplace_back(item.source_path(), path.Base()->PathId, path->PathType); exportInfo->PendingItems.push_back(itemIdx); } @@ -230,6 +241,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase ui64 Id; TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr; TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr; + TEvPrivate::TEvExportSchemeUploadResult::TPtr SchemeUploadResult = nullptr; TTxId CompletedTxId = InvalidTxId; explicit TTxProgress(TSelf* self, ui64 id) @@ -250,6 +262,12 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase { } + explicit TTxProgress(TSelf* self, TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev) + : TXxport::TTxBase(self) + , SchemeUploadResult(ev) + { + } + explicit TTxProgress(TSelf* self, TTxId completedTxId) : TXxport::TTxBase(self) , CompletedTxId(completedTxId) @@ -264,9 +282,11 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase LOG_D("TExport::TTxProgress: DoExecute"); if (AllocateResult) { - OnAllocateResult(txc, ctx); + OnAllocateResult(); } else if (ModifyResult) { OnModifyResult(txc, ctx); + } else if (SchemeUploadResult) { + OnSchemeUploadResult(txc); } else if (CompletedTxId) { OnNotifyResult(txc, ctx); } else { @@ -301,19 +321,50 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase void TransferData(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId) { Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); - auto& item = exportInfo->Items.at(itemIdx); + auto& item = exportInfo->Items[itemIdx]; item.SubState = ESubState::Proposed; LOG_I("TExport::TTxProgress: Backup propose" << ": info# " << exportInfo->ToString() << ", item# " << item.ToString(itemIdx) - << ", txId# " << txId); + << ", txId# " << txId + ); Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId); Send(Self->SelfId(), BackupPropose(Self, txId, exportInfo, itemIdx)); } + void UploadScheme(TExportInfo::TPtr exportInfo, ui32 itemIdx, const TActorContext& ctx) { + Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); + auto& item = exportInfo->Items[itemIdx]; + + item.SubState = ESubState::Proposed; + + LOG_I("TExport::TTxProgress: UploadScheme" + << ": info# " << exportInfo->ToString() + << ", item# " << item.ToString(itemIdx) + ); + + Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId); + if (item.SourcePathType == NKikimrSchemeOp::EPathTypeView) { + Ydb::Export::ExportToS3Settings exportSettings; + Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings)); + const auto databaseRoot = CanonizePath(Self->RootPathElements); + + NBackup::TMetadata metadata; + // to do: enable view checksum validation + constexpr bool EnableChecksums = false; + metadata.SetVersion(EnableChecksums ? 1 : 0); + + item.SchemeUploader = ctx.Register(CreateSchemeUploader( + Self->SelfId(), exportInfo->Id, itemIdx, item.SourcePathId, + exportSettings, databaseRoot, metadata.Serialize() + )); + Self->RunningExportSchemeUploaders.emplace(item.SchemeUploader); + } + } + bool CancelTransferring(TExportInfo::TPtr exportInfo, ui32 itemIdx) { Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); const auto& item = exportInfo->Items.at(itemIdx); @@ -421,21 +472,26 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase return InvalidTxId; } - const auto& item = exportInfo->Items.at(0); - if (!Self->PathsById.contains(item.SourcePathId)) { - return InvalidTxId; - } + for (size_t i : xrange(exportInfo->Items.size())) { + const auto& item = exportInfo->Items[i]; - auto path = Self->PathsById.at(item.SourcePathId); - if (path->PathState != NKikimrSchemeOp::EPathStateCopying) { - return InvalidTxId; - } + if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable) { + // only tables can be targets of the copy tables operation + continue; + } - if (!ItemPathId(Self, exportInfo, 0)) { - return InvalidTxId; - } + auto path = Self->PathsById.Value(item.SourcePathId, nullptr); + if (!path || path->PathState != NKikimrSchemeOp::EPathStateCopying) { + return InvalidTxId; + } - return path->LastTxId; + if (!ItemPathId(Self, exportInfo, i)) { + return InvalidTxId; + } + + return path->LastTxId; + } + return InvalidTxId; } TTxId GetActiveBackupTxId(TExportInfo::TPtr exportInfo, ui32 itemIdx) { @@ -461,6 +517,13 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase return path->LastTxId; } + void KillChildActors(TExportInfo::TItem& item) { + if (auto schemeUploader = std::exchange(item.SchemeUploader, {})) { + Send(schemeUploader, new TEvents::TEvPoisonPill()); + Self->RunningExportSchemeUploaders.erase(schemeUploader); + } + } + void Cancel(TExportInfo::TPtr exportInfo, ui32 itemIdx, TStringBuf marker) { Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); const auto& item = exportInfo->Items.at(itemIdx); @@ -472,6 +535,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase exportInfo->State = EState::Cancelled; for (ui32 i : xrange(exportInfo->Items.size())) { + KillChildActors(exportInfo->Items[i]); if (i == itemIdx) { continue; } @@ -527,7 +591,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase return output; } - void Resume(TTransactionContext& txc, const TActorContext&) { + void Resume(TTransactionContext& txc, const TActorContext& ctx) { Y_ABORT_UNLESS(Self->Exports.contains(Id)); TExportInfo::TPtr exportInfo = Self->Exports.at(Id); @@ -546,17 +610,27 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase } break; - case EState::Transferring: + case EState::Transferring: { + TDeque pendingTables; for (ui32 itemIdx : xrange(exportInfo->Items.size())) { const auto& item = exportInfo->Items.at(itemIdx); if (item.WaitTxId == InvalidTxId) { - AllocateTxId(exportInfo, itemIdx); + if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) { + pendingTables.emplace_back(itemIdx); + } else { + UploadScheme(exportInfo, itemIdx, ctx); + } } else { SubscribeTx(exportInfo, itemIdx); } } + exportInfo->PendingItems = std::move(pendingTables); + for (ui32 itemIdx : exportInfo->PendingItems) { + AllocateTxId(exportInfo, itemIdx); + } break; + } case EState::Cancellation: exportInfo->State = EState::Cancelled; @@ -595,7 +669,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase for (ui32 itemIdx : xrange(exportInfo->Items.size())) { const auto& item = exportInfo->Items.at(itemIdx); - if (item.State != EState::Dropping) { + if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable || item.State != EState::Dropping) { continue; } @@ -620,7 +694,16 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase } } - void OnAllocateResult(TTransactionContext&, const TActorContext&) { + void EndExport(TExportInfo::TPtr exportInfo, EState finalState, NIceDb::TNiceDb& db) { + exportInfo->State = finalState; + exportInfo->EndTime = TAppData::TimeProvider->Now(); + + Self->PersistExportState(db, exportInfo); + SendNotificationsIfFinished(exportInfo); + AuditLogExportEnd(*exportInfo.Get(), Self); + } + + void OnAllocateResult() { Y_ABORT_UNLESS(AllocateResult); const auto txId = TTxId(AllocateResult->Get()->TxIds.front()); @@ -639,12 +722,6 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase TExportInfo::TPtr exportInfo = Self->Exports.at(id); ui32 itemIdx = Max(); - auto popPendingItemIdx = [](TDeque& pendingItems) { - const ui32 itemIdx = pendingItems.front(); - pendingItems.pop_front(); - return itemIdx; - }; - switch (exportInfo->State) { case EState::CreateExportDir: MkDir(exportInfo, txId); @@ -655,17 +732,25 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase break; case EState::Transferring: - if (exportInfo->PendingItems) { - itemIdx = popPendingItemIdx(exportInfo->PendingItems); + if (exportInfo->PendingItems.empty()) { + return; + } + itemIdx = PopFront(exportInfo->PendingItems); + if (const auto type = exportInfo->Items.at(itemIdx).SourcePathType; type == NKikimrSchemeOp::EPathTypeTable) { TransferData(exportInfo, itemIdx, txId); } else { + LOG_W("TExport::TTxProgress: OnAllocateResult allocated a needless txId for an item transferring" + << ": id# " << id + << ", itemIdx# " << itemIdx + << ", type# " << type + ); return; } break; case EState::Dropping: if (exportInfo->PendingDropItems) { - itemIdx = popPendingItemIdx(exportInfo->PendingDropItems); + itemIdx = PopFront(exportInfo->PendingDropItems); DropTable(exportInfo, itemIdx, txId); } else { DropDir(exportInfo, txId); @@ -870,7 +955,78 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase SubscribeTx(txId); } - void OnNotifyResult(TTransactionContext& txc, const TActorContext&) { + void OnSchemeUploadResult(TTransactionContext& txc) { + Y_ABORT_UNLESS(SchemeUploadResult); + const auto& result = *SchemeUploadResult.Get()->Get(); + + LOG_D("TExport::TTxProgress: OnSchemeUploadResult" + << ": id# " << result.ExportId + << ", itemIdx# " << result.ItemIdx + << ", success# " << result.Success + << ", error# " << result.Error + ); + + const auto exportId = result.ExportId; + auto exportInfo = Self->Exports.Value(exportId, nullptr); + if (!exportInfo) { + LOG_E("TExport::TTxProgress: OnSchemeUploadResult received unknown export id" + << ": id# " << exportId + ); + return; + } + + ui32 itemIdx = result.ItemIdx; + if (itemIdx >= exportInfo->Items.size()) { + LOG_E("TExport::TTxProgress: OnSchemeUploadResult item index out of range" + << ": id# " << exportId + << ", item index# " << itemIdx + << ", number of items# " << exportInfo->Items.size() + ); + return; + } + + NIceDb::TNiceDb db(txc.DB); + + auto& item = exportInfo->Items[itemIdx]; + Self->RunningExportSchemeUploaders.erase(item.SchemeUploader); + item.SchemeUploader = TActorId(); + + if (!result.Success) { + item.State = EState::Cancelled; + item.Issue = result.Error; + Self->PersistExportItemState(db, exportInfo, itemIdx); + + if (!exportInfo->IsInProgress()) { + return; + } + + Cancel(exportInfo, itemIdx, "unsuccessful scheme upload"); + + Self->PersistExportState(db, exportInfo); + return SendNotificationsIfFinished(exportInfo); + } + + if (exportInfo->State == EState::Transferring) { + item.State = EState::Done; + Self->PersistExportItemState(db, exportInfo, itemIdx); + + if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { + EndExport(exportInfo, EState::Done, db); + } + } else if (exportInfo->State == EState::Cancellation) { + item.State = EState::Cancelled; + Self->PersistExportItemState(db, exportInfo, itemIdx); + + if (AllOf(exportInfo->Items, [](const TExportInfo::TItem& item) { + // on cancellation we wait only for transferring items + return item.State != EState::Transferring; + })) { + EndExport(exportInfo, EState::Cancelled, db); + } + } + } + + void OnNotifyResult(TTransactionContext& txc, const TActorContext& ctx) { Y_ABORT_UNLESS(CompletedTxId); LOG_D("TExport::TTxProgress: OnNotifyResult" << ": txId# " << CompletedTxId); @@ -887,20 +1043,20 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase ui32 itemIdx; std::tie(id, itemIdx) = Self->TxIdToExport.at(txId); - OnNotifyResult(txId, id, itemIdx, txc); + OnNotifyResult(txId, id, itemIdx, txc, ctx); Self->TxIdToExport.erase(txId); } if (Self->TxIdToDependentExport.contains(txId)) { for (const auto id : Self->TxIdToDependentExport.at(txId)) { - OnNotifyResult(txId, id, Max(), txc); + OnNotifyResult(txId, id, Max(), txc, ctx); } Self->TxIdToDependentExport.erase(txId); } } - void OnNotifyResult(TTxId txId, ui64 id, ui32 itemIdx, TTransactionContext& txc) { + void OnNotifyResult(TTxId txId, ui64 id, ui32 itemIdx, TTransactionContext& txc, const TActorContext& ctx) { LOG_D("TExport::TTxProgress: OnNotifyResult" << ": txId# " << txId << ", id# " << id @@ -917,12 +1073,28 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase switch (exportInfo->State) { case EState::CreateExportDir: - exportInfo->State = EState::CopyTables; exportInfo->WaitTxId = InvalidTxId; - AllocateTxId(exportInfo); + + if (AnyOf(exportInfo->Items, [](const TExportInfo::TItem& item) { + return item.SourcePathType == NKikimrSchemeOp::EPathTypeTable; + })) { + exportInfo->State = EState::CopyTables; + AllocateTxId(exportInfo); + } else { + // None of the items is a table. + for (ui32 i : xrange(exportInfo->Items.size())) { + exportInfo->Items[i].State = EState::Transferring; + Self->PersistExportItemState(db, exportInfo, i); + + UploadScheme(exportInfo, i, ctx); + } + + exportInfo->State = EState::Transferring; + exportInfo->PendingItems.clear(); + } break; - case EState::CopyTables: + case EState::CopyTables: { if (exportInfo->DependencyTxIds.contains(txId)) { exportInfo->DependencyTxIds.erase(txId); if (exportInfo->DependencyTxIds.empty()) { @@ -933,13 +1105,24 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase exportInfo->State = EState::Transferring; exportInfo->WaitTxId = InvalidTxId; + TDeque tables; for (ui32 itemIdx : xrange(exportInfo->Items.size())) { - exportInfo->Items.at(itemIdx).State = EState::Transferring; + auto& item = exportInfo->Items[itemIdx]; + item.State = EState::Transferring; Self->PersistExportItemState(db, exportInfo, itemIdx); + if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) { + tables.emplace_back(itemIdx); + } else { + UploadScheme(exportInfo, itemIdx, ctx); + } + } + exportInfo->PendingItems = std::move(tables); + for (ui32 itemIdx : exportInfo->PendingItems) { AllocateTxId(exportInfo, itemIdx); } break; + } case EState::Transferring: { Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); @@ -948,15 +1131,18 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase item.State = EState::Done; item.WaitTxId = InvalidTxId; - if (const auto issue = GetIssues(ItemPathId(Self, exportInfo, itemIdx), txId)) { - item.Issue = *issue; - Cancel(exportInfo, itemIdx, "issues during backing up"); - } else { - if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { - exportInfo->State = EState::Done; - exportInfo->EndTime = TAppData::TimeProvider->Now(); + bool itemHasIssues = false; + if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) { + if (const auto issue = GetIssues(ItemPathId(Self, exportInfo, itemIdx), txId)) { + item.Issue = *issue; + Cancel(exportInfo, itemIdx, "issues during backing up"); + itemHasIssues = true; } } + if (!itemHasIssues && AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { + exportInfo->State = EState::Done; + exportInfo->EndTime = TAppData::TimeProvider->Now(); + } Self->PersistExportItemState(db, exportInfo, itemIdx); break; @@ -1016,6 +1202,10 @@ ITransaction* TSchemeShard::CreateTxProgressExport(TEvSchemeShard::TEvModifySche return new TExport::TTxProgress(this, ev); } +ITransaction* TSchemeShard::CreateTxProgressExport(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev) { + return new TExport::TTxProgress(this, ev); +} + ITransaction* TSchemeShard::CreateTxProgressExport(TTxId completedTxId) { return new TExport::TTxProgress(this, completedTxId); } diff --git a/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp b/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp index a829cba4fcd8..2a4fd3c5f186 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp @@ -74,6 +74,9 @@ struct TSchemeShard::TExport::TTxForget: public TSchemeShard::TXxport::TTxBase { Self->Exports.erase(exportInfo->Id); Self->PersistRemoveExport(db, exportInfo); } else { + LOG_D("TExport::TTxForget, dropping export tables" + << ", info: " << exportInfo->ToString() + ); exportInfo->WaitTxId = InvalidTxId; exportInfo->State = TExportInfo::EState::Dropping; Self->PersistExportState(db, exportInfo); diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index c403bfc128ba..74eb0c0c4cde 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -59,6 +59,9 @@ THolder CopyTablesPropose( for (ui32 itemIdx : xrange(exportInfo->Items.size())) { const auto& item = exportInfo->Items.at(itemIdx); + if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable) { + continue; + } auto& desc = *copyTables.Add(); desc.SetSrcPath(item.SourcePathName); diff --git a/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.cpp b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.cpp new file mode 100644 index 000000000000..46d4c9dc8f14 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.cpp @@ -0,0 +1,342 @@ +#include "schemeshard.h" +#include "schemeshard_export_scheme_uploader.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NSchemeShard { + +class TSchemeUploader: public TActorBootstrapped { + + using TS3ExternalStorageConfig = NWrappers::NExternalStorage::TS3ExternalStorageConfig; + using TEvExternalStorage = NWrappers::TEvExternalStorage; + using TPutObjectResult = Aws::Utils::Outcome; + + void GetDescription() { + Send(SchemeShard, new TEvSchemeShard::TEvDescribeScheme(SourcePathId)); + Become(&TThis::StateDescribe); + } + + static TString BuildViewScheme(const TString& path, const NKikimrSchemeOp::TViewDescription& viewDescription, const TString& backupRoot, TString& error) { + NYql::TIssues issues; + auto scheme = NYdb::NDump::BuildCreateViewQuery(viewDescription.GetName(), path, viewDescription.GetQueryText(), backupRoot, issues); + if (!scheme) { + error = issues.ToString(); + } + return scheme; + } + + bool BuildSchemeToUpload(const NKikimrScheme::TEvDescribeSchemeResult& describeResult, TString& error) { + const auto pathType = describeResult.GetPathDescription().GetSelf().GetPathType(); + switch (pathType) { + case NKikimrSchemeOp::EPathTypeView: { + Scheme = BuildViewScheme(describeResult.GetPath(), describeResult.GetPathDescription().GetViewDescription(), DatabaseRoot, error); + return !Scheme.empty(); + } + default: + error = TStringBuilder() << "unsupported path type: " << pathType; + return false; + } + } + + void HandleSchemeDescription(TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev) { + const auto& describeResult = ev->Get()->GetRecord(); + + LOG_D("HandleSchemeDescription" + << ", self: " << SelfId() + << ", status: " << describeResult.GetStatus() + ); + + if (describeResult.GetStatus() != TEvSchemeShard::EStatus::StatusSuccess) { + return Finish(false, describeResult.GetReason()); + } + + TString error; + if (!BuildSchemeToUpload(describeResult, error)) { + return Finish(false, error); + } + + if (auto permissions = NDataShard::GenYdbPermissions(describeResult.GetPathDescription())) { + google::protobuf::TextFormat::PrintToString(permissions.GetRef(), &Permissions); + } else { + return Finish(false, "cannot infer permissions"); + } + + UploadScheme(); + } + + void UploadScheme() { + Y_ABORT_UNLESS(!SchemeUploaded); + + if (!Scheme) { + return Finish(false, "cannot infer scheme"); + } + if (Attempt == 0) { + StorageOperator = RegisterWithSameMailbox( + NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()) + ); + } + + auto request = Aws::S3::Model::PutObjectRequest() + .WithKey(Sprintf("%s/create_view.sql", DestinationPrefix->c_str())); + + Send(StorageOperator, new TEvExternalStorage::TEvPutObjectRequest(request, TString(Scheme))); + Become(&TThis::StateUploadScheme); + } + + void HandleSchemePutResponse(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + LOG_D("HandleSchemePutResponse" + << ", self: " << SelfId() + << ", result: " << result + ); + + if (!CheckResult(result, TStringBuf("PutObject (scheme)"))) { + return; + } + SchemeUploaded = true; + UploadPermissions(); + } + + void UploadPermissions() { + Y_ABORT_UNLESS(!PermissionsUploaded); + + if (!Permissions) { + return Finish(false, "cannot infer permissions"); + } + auto request = Aws::S3::Model::PutObjectRequest() + .WithKey(Sprintf("%s/permissions.pb", DestinationPrefix->c_str())); + + Send(StorageOperator, new TEvExternalStorage::TEvPutObjectRequest(request, TString(Permissions))); + Become(&TThis::StateUploadPermissions); + } + + void HandlePermissionsPutResponse(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + LOG_D("HandlePermissionsPutResponse" + << ", self: " << SelfId() + << ", result: " << result + ); + + if (!CheckResult(result, TStringBuf("PutObject (permissions)"))) { + return; + } + PermissionsUploaded = true; + UploadMetadata(); + } + + void UploadMetadata() { + Y_ABORT_UNLESS(!MetadataUploaded); + + if (!Metadata) { + return Finish(false, "empty metadata"); + } + auto request = Aws::S3::Model::PutObjectRequest() + .WithKey(Sprintf("%s/metadata.json", DestinationPrefix->c_str())); + + Send(StorageOperator, new TEvExternalStorage::TEvPutObjectRequest(request, TString(Metadata))); + Become(&TThis::StateUploadMetadata); + } + + void HandleMetadataPutResponse(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + LOG_D("HandleMetadataPutResponse" + << ", self: " << SelfId() + << ", result: " << result + ); + + if (!CheckResult(result, TStringBuf("PutObject (metadata)"))) { + return; + } + MetadataUploaded = true; + Finish(); + } + + bool CheckResult(const TPutObjectResult& result, const TStringBuf marker) { + if (result.IsSuccess()) { + return true; + } + + LOG_E("Error at '" << marker << "'" + << ", self: " << SelfId() + << ", error: " << result + ); + + RetryOrFinish(result.GetError()); + return false; + } + + void RetryOrFinish(const Aws::S3::S3Error& error) { + if (Attempt < Retries && ShouldRetry(error)) { + Retry(); + } else { + Finish(false, TStringBuilder() << "S3 error: " << error.GetMessage()); + } + } + + static bool ShouldRetry(const Aws::S3::S3Error& error) { + if (error.ShouldRetry()) { + return true; + } + return error.GetExceptionName() == "TooManyRequests"; + } + + void Retry() { + Delay = Min(Delay * ++Attempt, MaxDelay); + const TDuration random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds()); + Schedule(Delay + random, new TEvents::TEvWakeup()); + } + + void Finish(bool success = true, const TString& error = TString()) { + LOG_I("Finish" + << ", self: " << SelfId() + << ", success: " << success + << ", error: " << error + ); + + Send(SchemeShard, new TEvPrivate::TEvExportSchemeUploadResult(ExportId, ItemIdx, success, error)); + PassAway(); + } + + void PassAway() override { + Send(StorageOperator, new TEvents::TEvPoisonPill()); + IActor::PassAway(); + } + +public: + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::EXPORT_S3_UPLOADER_ACTOR; + } + + TSchemeUploader( + TActorId schemeShard, + ui64 exportId, + ui32 itemIdx, + TPathId sourcePathId, + const Ydb::Export::ExportToS3Settings& settings, + const TString& databaseRoot, + const TString& metadata + ) + : SchemeShard(schemeShard) + , ExportId(exportId) + , ItemIdx(itemIdx) + , SourcePathId(sourcePathId) + , ExternalStorageConfig(new TS3ExternalStorageConfig(settings)) + , Retries(settings.number_of_retries()) + , DatabaseRoot(databaseRoot) + , Metadata(metadata) + { + if (itemIdx < ui32(settings.items_size())) { + DestinationPrefix = settings.items(itemIdx).destination_prefix(); + } + } + + void Bootstrap() { + if (!DestinationPrefix) { + return Finish(false, TStringBuilder() << "cannot determine destination prefix, item index: " << ItemIdx << " out of range"); + } + if (!Scheme || !Permissions) { + return GetDescription(); + } + if (!SchemeUploaded) { + return UploadScheme(); + } + if (!PermissionsUploaded) { + return UploadPermissions(); + } + if (!MetadataUploaded) { + return UploadMetadata(); + } + Finish(); + } + + STATEFN(StateBase) { + switch (ev->GetTypeRewrite()) { + sFunc(TEvents::TEvWakeup, Bootstrap); + sFunc(TEvents::TEvPoisonPill, PassAway); + } + } + + STATEFN(StateDescribe) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvSchemeShard::TEvDescribeSchemeResult, HandleSchemeDescription); + default: + return StateBase(ev); + } + } + + STATEFN(StateUploadScheme) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleSchemePutResponse); + default: + return StateBase(ev); + } + } + + STATEFN(StateUploadPermissions) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvPutObjectResponse, HandlePermissionsPutResponse); + default: + return StateBase(ev); + } + } + + STATEFN(StateUploadMetadata) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleMetadataPutResponse); + default: + return StateBase(ev); + } + } + +private: + + TActorId SchemeShard; + + ui64 ExportId; + ui32 ItemIdx; + TPathId SourcePathId; + + NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig; + TMaybe DestinationPrefix; + + ui32 Attempt = 0; + const ui32 Retries; + + TString DatabaseRoot; + + TActorId StorageOperator; + + TDuration Delay = TDuration::Minutes(1); + static constexpr TDuration MaxDelay = TDuration::Minutes(10); + + TString Scheme; + bool SchemeUploaded = false; + + TString Permissions; + bool PermissionsUploaded = false; + + TString Metadata; + bool MetadataUploaded = false; + +}; // TSchemeUploader + +IActor* CreateSchemeUploader(TActorId schemeShard, ui64 exportId, ui32 itemIdx, TPathId sourcePathId, + const Ydb::Export::ExportToS3Settings& settings, const TString& databaseRoot, const TString& metadata +) { + return new TSchemeUploader(schemeShard, exportId, itemIdx, sourcePathId, settings, databaseRoot, metadata); +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.h b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.h new file mode 100644 index 000000000000..d64d53a318c0 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.h @@ -0,0 +1,16 @@ +#pragma once + +#include +#include + +namespace Ydb::Export { + class ExportToS3Settings; +} + +namespace NKikimr::NSchemeShard { + +NActors::IActor* CreateSchemeUploader(NActors::TActorId schemeShard, ui64 exportId, ui32 itemIdx, TPathId sourcePathId, + const Ydb::Export::ExportToS3Settings& settings, const TString& databaseRoot, const TString& metadata +); + +} diff --git a/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader_fallback.cpp b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader_fallback.cpp new file mode 100644 index 000000000000..030551ed79eb --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader_fallback.cpp @@ -0,0 +1,40 @@ +#include "schemeshard_export_scheme_uploader.h" + +#include +#include + +using namespace NActors; + +namespace NKikimr::NSchemeShard { + +class TSchemeUploaderFallback: public TActorBootstrapped { +public: + explicit TSchemeUploaderFallback(TActorId schemeShard, ui64 exportId, ui32 itemIdx) + : SchemeShard(schemeShard) + , ExportId(exportId) + , ItemIdx(itemIdx) + { + } + + void Bootstrap() { + Send(SchemeShard, new TEvPrivate::TEvExportSchemeUploadResult(ExportId, ItemIdx, false, + "Exports to S3 are disabled" + )); + PassAway(); + } + +private: + TActorId SchemeShard; + ui64 ExportId; + ui32 ItemIdx; +}; + + +IActor* CreateSchemeUploader(TActorId schemeShard, ui64 exportId, ui32 itemIdx, TPathId sourcePathId, + const Ydb::Export::ExportToS3Settings& settings, const TString& databaseRoot, const TString& metadata +) { + Y_UNUSED(sourcePathId, settings, databaseRoot, metadata); + return new TSchemeUploaderFallback(schemeShard, exportId, itemIdx); +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 5f3ec61c1559..63fee5351800 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -4539,6 +4539,9 @@ void TSchemeShard::Die(const TActorContext &ctx) { if (TabletMigrator) { ctx.Send(TabletMigrator, new TEvents::TEvPoisonPill()); } + for (TActorId schemeUploader : RunningExportSchemeUploaders) { + ctx.Send(schemeUploader, new TEvents::TEvPoisonPill()); + } IndexBuildPipes.Shutdown(ctx); CdcStreamScanPipes.Shutdown(ctx); @@ -4827,6 +4830,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvExport::TEvListExportsRequest, Handle); // } // NExport HFuncTraced(NBackground::TEvListRequest, Handle); + HFuncTraced(TEvPrivate::TEvExportSchemeUploadResult, Handle); // namespace NImport { HFuncTraced(TEvImport::TEvCreateImportRequest, Handle); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 52a1d89ce367..5385b50682e9 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1205,6 +1205,8 @@ class TSchemeShard THashMap ExportsByUid; THashMap> TxIdToExport; THashMap> TxIdToDependentExport; + // This set is needed to kill all the running scheme uploaders on SchemeShard death. + THashSet RunningExportSchemeUploaders; void FromXxportInfo(NKikimrExport::TExport& exprt, const TExportInfo::TPtr exportInfo); @@ -1236,6 +1238,7 @@ class TSchemeShard NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TTxId completedTxId); + NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev); void Handle(TEvExport::TEvCreateExportRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvExport::TEvGetExportRequest::TPtr& ev, const TActorContext& ctx); @@ -1243,6 +1246,7 @@ class TSchemeShard void Handle(TEvExport::TEvForgetExportRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvExport::TEvListExportsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TAutoPtr>& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev, const TActorContext& ctx); void ResumeExports(const TVector& exportIds, const TActorContext& ctx); // } // NExport diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 086e0b692bb5..a86ea66bb5c9 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -2091,6 +2091,7 @@ TString TExportInfo::TItem::ToString(ui32 idx) const { << " Idx: " << idx << " SourcePathName: '" << SourcePathName << "'" << " SourcePathId: " << SourcePathId + << " SourcePathType: " << SourcePathType << " State: " << State << " SubState: " << SubState << " WaitTxId: " << WaitTxId diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 2bfc69f91dbb..be85a45af7be 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2683,17 +2683,20 @@ struct TExportInfo: public TSimpleRefCount { TString SourcePathName; TPathId SourcePathId; + NKikimrSchemeOp::EPathType SourcePathType; EState State = EState::Waiting; ESubState SubState = ESubState::AllocateTxId; TTxId WaitTxId = InvalidTxId; + TActorId SchemeUploader; TString Issue; TItem() = default; - explicit TItem(const TString& sourcePathName, const TPathId sourcePathId) + explicit TItem(const TString& sourcePathName, const TPathId sourcePathId, NKikimrSchemeOp::EPathType sourcePathType) : SourcePathName(sourcePathName) , SourcePathId(sourcePathId) + , SourcePathType(sourcePathType) { } diff --git a/ydb/core/tx/schemeshard/schemeshard_path.cpp b/ydb/core/tx/schemeshard/schemeshard_path.cpp index cfed8c7e2215..d01f285c0f8f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path.cpp @@ -922,6 +922,26 @@ const TPath::TChecker& TPath::TChecker::IsBackupCollection(EStatus status) const << " (" << BasicPathInfo(Path.Base()) << ")"); } +const TPath::TChecker& TPath::TChecker::IsSupportedInExports(EStatus status) const { + if (Failed) { + return *this; + } + + // Warning: scheme objects using YQL backups should only be allowed to be exported + // when we can be certain that the database will never be downgraded to a version + // which does not support the YQL export process. Otherwise, they will be considered as tables, + // and we might cause the process to be aborted. + if (Path.Base()->IsTable() + || (Path.Base()->IsView() && AppData()->FeatureFlags.GetEnableViewExport()) + ) { + return *this; + } + + return Fail(status, TStringBuilder() << "path type is not supported in exports" + << " (" << BasicPathInfo(Path.Base()) << ")" + ); +} + const TPath::TChecker& TPath::TChecker::PathShardsLimit(ui64 delta, EStatus status) const { if (Failed) { return *this; diff --git a/ydb/core/tx/schemeshard/schemeshard_path.h b/ydb/core/tx/schemeshard/schemeshard_path.h index 81e3ee13d09b..4e017c51b05a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.h +++ b/ydb/core/tx/schemeshard/schemeshard_path.h @@ -104,6 +104,7 @@ class TPath { const TChecker& FailOnRestrictedCreateInTempZone(bool allowCreateInTemporaryDir = false, EStatus status = EStatus::StatusPreconditionFailed) const; const TChecker& IsResourcePool(EStatus status = EStatus::StatusNameConflict) const; const TChecker& IsBackupCollection(EStatus status = EStatus::StatusNameConflict) const; + const TChecker& IsSupportedInExports(EStatus status = EStatus::StatusNameConflict) const; }; public: diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h index 2ba95e78b0c8..ef5909153a89 100644 --- a/ydb/core/tx/schemeshard/schemeshard_private.h +++ b/ydb/core/tx/schemeshard/schemeshard_private.h @@ -15,6 +15,7 @@ namespace TEvPrivate { EvRunConditionalErase, EvIndexBuildBilling, EvImportSchemeReady, + EvExportSchemeUploadResult, EvServerlessStorageBilling, EvCleanDroppedPaths, EvCleanDroppedSubDomains, @@ -95,6 +96,20 @@ namespace TEvPrivate { {} }; + struct TEvExportSchemeUploadResult: public TEventLocal { + const ui64 ExportId; + const ui32 ItemIdx; + const bool Success; + const TString Error; + + TEvExportSchemeUploadResult(ui64 id, ui32 itemIdx, bool success, const TString& error) + : ExportId(id) + , ItemIdx(itemIdx) + , Success(success) + , Error(error) + {} + }; + struct TEvServerlessStorageBilling: public TEventLocal { TEvServerlessStorageBilling() {} diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 5fb719e771e0..c08e2a6472e8 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1209,6 +1209,7 @@ struct Schema : NIceDb::Schema { struct BackupTxId : Column<6, NScheme::NTypeIds::Uint64> { using Type = TTxId; }; struct Issue : Column<7, NScheme::NTypeIds::Utf8> {}; struct SourceOwnerPathId : Column<8, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; }; + struct SourcePathType : Column<9, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::EPathType; static constexpr Type Default = NKikimrSchemeOp::EPathTypeTable; }; using TKey = TableKey; using TColumns = TableColumns< @@ -1219,7 +1220,8 @@ struct Schema : NIceDb::Schema { State, BackupTxId, Issue, - SourceOwnerPathId + SourceOwnerPathId, + SourcePathType >; }; diff --git a/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp b/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp index 97e34a6d3fa4..bd880a149a4d 100644 --- a/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp +++ b/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp @@ -8,9 +8,10 @@ #include +using namespace NKikimrSchemeOp; +using namespace NKikimr::NWrappers::NTestHelpers; using namespace NSchemeShardUT_Private; using namespace NSchemeShardUT_Private::NExportReboots; -using namespace NKikimr::NWrappers::NTestHelpers; namespace { @@ -27,9 +28,9 @@ Y_TEST_HOOK_AFTER_RUN(ShutdownAwsAPI) { } Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { - using TUnderlying = std::function&, const TString&, TTestWithReboots&)>; + using TUnderlying = std::function&, const TString&, TTestWithReboots&)>; - void Decorate(const TVector& tables, const TString& request, TUnderlying func) { + void Decorate(const TVector& schemeObjects, const TString& request, TUnderlying func) { TPortManager portManager; const ui16 port = portManager.GetPort(); @@ -37,19 +38,19 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { TS3Mock s3Mock({}, TS3Mock::TSettings(port)); UNIT_ASSERT(s3Mock.Start()); - func(tables, Sprintf(request.c_str(), port), t); + func(schemeObjects, Sprintf(request.c_str(), port), t); } - void RunS3(const TVector& tables, const TString& request) { - Decorate(tables, request, &Run); + void RunS3(const TVector& schemeObjects, const TString& request) { + Decorate(schemeObjects, request, &Run); } - void CancelS3(const TVector& tables, const TString& request) { - Decorate(tables, request, &Cancel); + void CancelS3(const TVector& schemeObjects, const TString& request) { + Decorate(schemeObjects, request, &Cancel); } - void ForgetS3(const TVector& tables, const TString& request) { - Decorate(tables, request, &Forget); + void ForgetS3(const TVector& schemeObjects, const TString& request) { + Decorate(schemeObjects, request, &Forget); } Y_UNIT_TEST(ShouldSucceedOnSingleShardTable) { @@ -127,6 +128,60 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { )"); } + Y_UNIT_TEST(ShouldSucceedOnSingleView) { + RunS3({ + { + R"( + Name: "View" + QueryText: "some query" + )", + EPathTypeView + } + }, R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/View" + destination_prefix: "" + } + } + )"); + } + + Y_UNIT_TEST(ShouldSucceedOnViewsAndTables) { + RunS3({ + { + R"( + Name: "View" + QueryText: "some query" + )", + EPathTypeView + }, { + R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", + EPathTypeTable + } + }, R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/View" + destination_prefix: "view" + } + items { + source_path: "/MyRoot/Table" + destination_prefix: "table" + } + } + )"); + } + Y_UNIT_TEST(CancelShouldSucceedOnSingleShardTable) { CancelS3({ R"( @@ -202,6 +257,60 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { )"); } + Y_UNIT_TEST(CancelShouldSucceedOnSingleView) { + CancelS3({ + { + R"( + Name: "View" + QueryText: "some query" + )", + EPathTypeView + } + }, R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/View" + destination_prefix: "" + } + } + )"); + } + + Y_UNIT_TEST(CancelShouldSucceedOnViewsAndTables) { + CancelS3({ + { + R"( + Name: "View" + QueryText: "some query" + )", + EPathTypeView + }, { + R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", + EPathTypeTable + } + }, R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/View" + destination_prefix: "view" + } + items { + source_path: "/MyRoot/Table" + destination_prefix: "table" + } + } + )"); + } + Y_UNIT_TEST(ForgetShouldSucceedOnSingleShardTable) { ForgetS3({ R"( @@ -276,4 +385,58 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { } )"); } + + Y_UNIT_TEST(ForgetShouldSucceedOnSingleView) { + ForgetS3({ + { + R"( + Name: "View" + QueryText: "some query" + )", + EPathTypeView + } + }, R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/View" + destination_prefix: "" + } + } + )"); + } + + Y_UNIT_TEST(ForgetShouldSucceedOnViewsAndTables) { + ForgetS3({ + { + R"( + Name: "View" + QueryText: "some query" + )", + EPathTypeView + }, { + R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", + EPathTypeTable + } + }, R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/View" + destination_prefix: "view" + } + items { + source_path: "/MyRoot/Table" + destination_prefix: "table" + } + } + )"); + } } diff --git a/ydb/core/tx/schemeshard/ut_export_reboots_s3/ya.make b/ydb/core/tx/schemeshard/ut_export_reboots_s3/ya.make index 121b4dc461cd..bc24f9093a09 100644 --- a/ydb/core/tx/schemeshard/ut_export_reboots_s3/ya.make +++ b/ydb/core/tx/schemeshard/ut_export_reboots_s3/ya.make @@ -2,7 +2,7 @@ UNITTEST_FOR(ydb/core/tx/schemeshard) FORK_SUBTESTS() -SPLIT_FACTOR(12) +SPLIT_FACTOR(18) IF (SANITIZER_TYPE OR WITH_VALGRIND) SIZE(LARGE) diff --git a/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp b/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp index 568e9e5749ed..56444c79bdd1 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.cpp @@ -6,20 +6,37 @@ #include #include +using namespace NKikimrSchemeOp; + namespace NSchemeShardUT_Private { namespace NExportReboots { -void Run(const TVector& tables, const TString& request, TTestWithReboots& t) { +void CreateSchemeObjects(TTestWithReboots& t, TTestActorRuntime& runtime, const TVector& schemeObjects) { + TSet toWait; + for (const auto& [scheme, type] : schemeObjects) { + switch (type) { + case EPathTypeTable: + TestCreateTable(runtime, ++t.TxId, "/MyRoot", scheme); + break; + case EPathTypeView: + TestCreateView(runtime, ++t.TxId, "/MyRoot", scheme); + break; + default: + UNIT_FAIL("export is not implemented for the scheme object type: " << type); + return; + } + toWait.insert(t.TxId); + } + t.TestEnv->TestWaitNotification(runtime, toWait); +} + +void Run(const TVector& schemeObjects, const TString& request, TTestWithReboots& t) { t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + runtime.GetAppData().FeatureFlags.SetEnableViewExport(true); + runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); { TInactiveZone inactive(activeZone); - - TSet toWait; - for (const auto& table : tables) { - TestCreateTable(runtime, ++t.TxId, "/MyRoot", table); - toWait.insert(t.TxId); - } - t.TestEnv->TestWaitNotification(runtime, toWait); + CreateSchemeObjects(t, runtime, schemeObjects); } TestExport(runtime, ++t.TxId, "/MyRoot", request); @@ -47,17 +64,13 @@ void Run(const TVector& tables, const TString& request, TTestWithReboot }); } -void Cancel(const TVector& tables, const TString& request, TTestWithReboots& t) { +void Cancel(const TVector& schemeObjects, const TString& request, TTestWithReboots& t) { t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + runtime.GetAppData().FeatureFlags.SetEnableViewExport(true); + runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); { TInactiveZone inactive(activeZone); - - TSet toWait; - for (const auto& table : tables) { - TestCreateTable(runtime, ++t.TxId, "/MyRoot", table); - toWait.insert(t.TxId); - } - t.TestEnv->TestWaitNotification(runtime, toWait); + CreateSchemeObjects(t, runtime, schemeObjects); } TestExport(runtime, ++t.TxId, "/MyRoot", request); @@ -90,17 +103,13 @@ void Cancel(const TVector& tables, const TString& request, TTestWithReb }); } -void Forget(const TVector& tables, const TString& request, TTestWithReboots& t) { +void Forget(const TVector& schemeObjects, const TString& request, TTestWithReboots& t) { t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + runtime.GetAppData().FeatureFlags.SetEnableViewExport(true); + runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); { TInactiveZone inactive(activeZone); - - TSet toWait; - for (const auto& table : tables) { - TestCreateTable(runtime, ++t.TxId, "/MyRoot", table); - toWait.insert(t.TxId); - } - t.TestEnv->TestWaitNotification(runtime, toWait); + CreateSchemeObjects(t, runtime, schemeObjects); TestExport(runtime, ++t.TxId, "/MyRoot", request); t.TestEnv->TestWaitNotification(runtime, t.TxId); diff --git a/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.h b/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.h index a7f869c58512..6682b0553547 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.h +++ b/ydb/core/tx/schemeshard/ut_helpers/export_reboots_common.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include @@ -9,9 +11,24 @@ class TTestWithReboots; namespace NExportReboots { -void Run(const TVector& tables, const TString& request, TTestWithReboots& t); -void Cancel(const TVector& tables, const TString& request, TTestWithReboots& t); -void Forget(const TVector& tables, const TString& request, TTestWithReboots& t); +struct TTypedScheme { + TString Scheme; + NKikimrSchemeOp::EPathType Type; + + explicit TTypedScheme(const TString& scheme, NKikimrSchemeOp::EPathType type = NKikimrSchemeOp::EPathTypeTable) + : Scheme(scheme) + , Type(type) + {} + + TTypedScheme(const char* scheme, NKikimrSchemeOp::EPathType type = NKikimrSchemeOp::EPathTypeTable) + : Scheme(scheme) + , Type(type) + {} +}; + +void Run(const TVector& schemeObjects, const TString& request, TTestWithReboots& t); +void Cancel(const TVector& schemeObjects, const TString& request, TTestWithReboots& t); +void Forget(const TVector& schemeObjects, const TString& request, TTestWithReboots& t); } // NExportReboots } // NSchemeShardUT_Private diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index e6692fa6fb41..ac12c4ccb7a8 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -1079,7 +1079,8 @@ namespace NSchemeShardUT_Private { TAutoPtr handle; auto ev = runtime.GrabEdgeEvent(handle); - UNIT_ASSERT_EQUAL(ev->Record.GetResponse().GetEntry().GetStatus(), expectedStatus); + const auto& entry = ev->Record.GetResponse().GetEntry(); + UNIT_ASSERT_VALUES_EQUAL_C(entry.GetStatus(), expectedStatus, entry.GetIssues()); } void TestExport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID, const TString& peerName, @@ -1093,18 +1094,11 @@ namespace NSchemeShardUT_Private { TAutoPtr handle; auto ev = runtime.GrabEdgeEvent(handle); - const auto result = ev->Record.GetResponse().GetEntry().GetStatus(); + const auto& entry = ev->Record.GetResponse().GetEntry(); + const auto status = entry.GetStatus(); - bool found = false; - for (const auto status : expectedStatuses) { - if (result == status) { - found = true; - break; - } - } - - if (!found) { - UNIT_ASSERT_C(found, "Unexpected status: " << Ydb::StatusIds::StatusCode_Name(result)); + if (!IsIn(expectedStatuses, status)) { + UNIT_FAIL("Unexpected status: " << Ydb::StatusIds::StatusCode_Name(status) << ", issues: " << entry.GetIssues()); } return ev->Record; @@ -1198,7 +1192,8 @@ namespace NSchemeShardUT_Private { TAutoPtr handle; auto ev = runtime.GrabEdgeEvent(handle); - UNIT_ASSERT_EQUAL(ev->Record.GetResponse().GetEntry().GetStatus(), expectedStatus); + const auto& entry = ev->Record.GetResponse().GetEntry(); + UNIT_ASSERT_VALUES_EQUAL_C(entry.GetStatus(), expectedStatus, entry.GetIssues()); } void TestImport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID, const TString& peerName, @@ -1212,18 +1207,11 @@ namespace NSchemeShardUT_Private { TAutoPtr handle; auto ev = runtime.GrabEdgeEvent(handle); - const auto result = ev->Record.GetResponse().GetEntry().GetStatus(); - - bool found = false; - for (const auto status : expectedStatuses) { - if (result == status) { - found = true; - break; - } - } + const auto& entry = ev->Record.GetResponse().GetEntry(); + const auto status = entry.GetStatus(); - if (!found) { - UNIT_ASSERT_C(found, "Unexpected status: " << Ydb::StatusIds::StatusCode_Name(result) << " issues: " << ev->Record.GetResponse().GetEntry().GetIssues()); + if (!IsIn(expectedStatuses, status)) { + UNIT_FAIL("Unexpected status: " << Ydb::StatusIds::StatusCode_Name(status) << ", issues: " << entry.GetIssues()); } return ev->Record; diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 5dd8b1c9c700..182a7f2f1a0f 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -308,6 +308,7 @@ PEERDIR( ydb/library/login ydb/library/login/protos ydb/library/protobuf_printer + ydb/public/lib/ydb_cli/dump/util yql/essentials/minikql yql/essentials/providers/common/proto ydb/services/bg_tasks @@ -319,10 +320,12 @@ YQL_LAST_ABI_VERSION() IF (OS_WINDOWS) SRCS( + schemeshard_export_scheme_uploader_fallback.cpp schemeshard_import_scheme_getter_fallback.cpp ) ELSE() SRCS( + schemeshard_export_scheme_uploader.cpp schemeshard_import_scheme_getter.cpp ) ENDIF() diff --git a/ydb/library/backup/backup.cpp b/ydb/library/backup/backup.cpp index 86489ff290b8..9ece91f28e31 100644 --- a/ydb/library/backup/backup.cpp +++ b/ydb/library/backup/backup.cpp @@ -41,8 +41,6 @@ #include -#include - namespace NYdb::NBackup { @@ -552,68 +550,6 @@ NView::TViewDescription DescribeView(NView::TViewClient& client, const TString& return status.GetViewDescription(); } -struct TViewQuerySplit { - TString ContextRecreation; - TString Select; -}; - -TViewQuerySplit SplitViewQuery(TStringInput query) { - // to do: make the implementation more versatile - TViewQuerySplit split; - - TString line; - while (query.ReadLine(line)) { - (line.StartsWith("--") || line.StartsWith("PRAGMA ") - ? split.ContextRecreation - : split.Select - ) += line; - } - - return split; -} - -void ValidateViewQuery(const TString& query, const TString& dbPath, NYql::TIssues& accumulatedIssues) { - NYql::TIssues subIssues; - if (!NDump::ValidateViewQuery(query, subIssues)) { - NYql::TIssue restorabilityIssue( - TStringBuilder() << "Restorability of the view: " << dbPath.Quote() - << " storing the following query:\n" - << query - << "\ncannot be guaranteed. For more information, please consult the 'ydb tools dump' documentation." - ); - restorabilityIssue.Severity = NYql::TSeverityIds::S_WARNING; - for (const auto& subIssue : subIssues) { - restorabilityIssue.AddSubIssue(MakeIntrusive(subIssue)); - } - accumulatedIssues.AddIssue(std::move(restorabilityIssue)); - } -} - -TString BuildCreateViewQuery(TStringBuf name, const NView::TViewDescription& description, TStringBuf backupRoot) { - auto queryText = TString{description.GetQueryText()}; - auto [contextRecreation, select] = SplitViewQuery(queryText); - - const TString query = std::format( - "-- backup root: \"{}\"\n" - "{}\n" - "CREATE VIEW IF NOT EXISTS `{}` WITH (security_invoker = TRUE) AS\n" - " {};\n", - backupRoot.data(), - contextRecreation.data(), - name.data(), - select.data() - ); - - TString formattedQuery; - TString errors; - Y_ENSURE(NSQLFormat::SqlFormatSimple( - query, - formattedQuery, - errors - ), errors); - return formattedQuery; -} - } /*! @@ -635,19 +571,22 @@ void BackupView(TDriver driver, const TString& dbBackupRoot, const TString& dbPa LOG_I("Backup view " << dbPath.Quote() << " to " << fsBackupDir.GetPath().Quote()); NView::TViewClient client(driver); - auto viewDescription = DescribeView(client, dbPath); - - ValidateViewQuery(TString{viewDescription.GetQueryText()}, dbPath, issues); + const auto viewDescription = DescribeView(client, dbPath); const auto fsPath = fsBackupDir.Child(NDump::NFiles::CreateView().FileName); LOG_D("Write view creation query to " << fsPath.GetPath().Quote()); - TFileOutput output(fsPath); - output << BuildCreateViewQuery( + const auto creationQuery = NDump::BuildCreateViewQuery( TFsPath(dbPathRelativeToBackupRoot).GetName(), - viewDescription, - dbBackupRoot + dbPath, + TString(viewDescription.GetQueryText()), + dbBackupRoot, + issues ); + Y_ENSURE(creationQuery, issues.ToString()); + + TFileOutput output(fsPath); + output << creationQuery; BackupPermissions(driver, dbBackupRoot, dbPathRelativeToBackupRoot, fsBackupDir); } diff --git a/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp b/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp index ac5d51f62e76..4c61632a87b2 100644 --- a/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp +++ b/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp @@ -11,6 +11,8 @@ #include #include +#include + using namespace NSQLv1Generated; namespace { @@ -103,15 +105,6 @@ void VisitAllFields(const NProtoBuf::Message& msg, TTokenCollector& callback) { } } -bool Format(const TString& query, TString& formattedQuery, NYql::TIssues& issues) { - google::protobuf::Arena arena; - NSQLTranslation::TTranslationSettings settings; - settings.Arena = &arena; - - auto formatter = NSQLFormat::MakeSqlFormatter(settings); - return formatter->Format(query, formattedQuery, issues); -} - struct TTableRefValidator { // returns true if the message is not a table ref and we need to dive deeper to find it @@ -184,10 +177,6 @@ bool SqlToProtoAst(const TString& query, TRule_sql_query& queryProto, NYql::TIss return true; } -} - -namespace NYdb::NDump { - bool ValidateViewQuery(const TString& query, NYql::TIssues& issues) { TRule_sql_query queryProto; if (!SqlToProtoAst(query, queryProto, issues)) { @@ -196,6 +185,10 @@ bool ValidateViewQuery(const TString& query, NYql::TIssues& issues) { return ValidateTableRefs(queryProto, issues); } +} + +namespace NYdb::NDump { + TString RewriteAbsolutePath(TStringBuf path, TStringBuf backupRoot, TStringBuf restoreRoot) { if (backupRoot == restoreRoot) { return TString(path); @@ -231,4 +224,71 @@ bool RewriteTableRefs(TString& query, TStringBuf backupRoot, TStringBuf restoreR return true; } +TViewQuerySplit SplitViewQuery(TStringInput query) { + // to do: make the implementation more versatile + TViewQuerySplit split; + + TString line; + while (query.ReadLine(line)) { + (line.StartsWith("--") || line.StartsWith("PRAGMA ") + ? split.ContextRecreation + : split.Select + ) += line; + } + + return split; +} + +void ValidateViewQuery(const TString& query, const TString& dbPath, NYql::TIssues& issues) { + NYql::TIssues subIssues; + if (!::ValidateViewQuery(query, subIssues)) { + NYql::TIssue restorabilityIssue( + TStringBuilder() << "Restorability of the view: " << dbPath.Quote() + << " storing the following query:\n" + << query + << "\ncannot be guaranteed. For more information, please refer to the 'ydb tools dump' documentation." + ); + restorabilityIssue.Severity = NYql::TSeverityIds::S_WARNING; + for (const auto& subIssue : subIssues) { + restorabilityIssue.AddSubIssue(MakeIntrusive(subIssue)); + } + issues.AddIssue(std::move(restorabilityIssue)); + } +} + +bool Format(const TString& query, TString& formattedQuery, NYql::TIssues& issues) { + google::protobuf::Arena arena; + NSQLTranslation::TTranslationSettings settings; + settings.Arena = &arena; + + auto formatter = NSQLFormat::MakeSqlFormatter(settings); + return formatter->Format(query, formattedQuery, issues); +} + +TString BuildCreateViewQuery( + const TString& name, const TString& dbPath, const TString& viewQuery, const TString& backupRoot, + NYql::TIssues& issues +) { + auto [contextRecreation, select] = SplitViewQuery(viewQuery); + + const TString creationQuery = std::format( + "-- backup root: \"{}\"\n" + "{}\n" + "CREATE VIEW IF NOT EXISTS `{}` WITH (security_invoker = TRUE) AS\n" + " {};\n", + backupRoot.data(), + contextRecreation.data(), + name.data(), + select.data() + ); + + ValidateViewQuery(creationQuery, dbPath, issues); + + TString formattedQuery; + if (!Format(creationQuery, formattedQuery, issues)) { + return ""; + } + return formattedQuery; +} + } diff --git a/ydb/public/lib/ydb_cli/dump/util/view_utils.h b/ydb/public/lib/ydb_cli/dump/util/view_utils.h index 6e65ca7c78ab..2b87a5022e5b 100644 --- a/ydb/public/lib/ydb_cli/dump/util/view_utils.h +++ b/ydb/public/lib/ydb_cli/dump/util/view_utils.h @@ -2,10 +2,25 @@ namespace NYdb::NDump { -bool ValidateViewQuery(const TString& query, NYql::TIssues& issues); - TString RewriteAbsolutePath(TStringBuf path, TStringBuf backupRoot, TStringBuf restoreRoot); bool RewriteTableRefs(TString& scheme, TStringBuf backupRoot, TStringBuf restoreRoot, NYql::TIssues& issues); +struct TViewQuerySplit { + TString ContextRecreation; + TString Select; +}; + +TViewQuerySplit SplitViewQuery(TStringInput query); + +// returns void, because the validation is non-blocking +void ValidateViewQuery(const TString& query, const TString& dbPath, NYql::TIssues& issues); + +bool Format(const TString& query, TString& formattedQuery, NYql::TIssues& issues); + +TString BuildCreateViewQuery( + const TString& name, const TString& dbPath, const TString& viewQuery, const TString& backupRoot, + NYql::TIssues& issues +); + } diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 9ec4b87cd28b..caf96b227559 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -3205,6 +3205,11 @@ "ColumnId": 8, "ColumnName": "SourceOwnerPathId", "ColumnType": "Uint64" + }, + { + "ColumnId": 9, + "ColumnName": "SourcePathType", + "ColumnType": "Uint32" } ], "ColumnsDropped": [], @@ -3218,7 +3223,8 @@ 5, 6, 7, - 8 + 8, + 9 ], "RoomID": 0, "Codec": 0,