diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 94db2bd0539a..65e4a7280c46 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -719,6 +720,7 @@ namespace Tests { GRpcServer->AddService(new NGRpcService::TGRpcYdbLogStoreService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcAuthService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcReplicationService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcViewService(system, counters, grpcRequestProxies[0], true)); GRpcServer->Start(); } diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 591379be6e29..da80cabf5516 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4329,6 +4329,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase { item.SourcePathId.OwnerId = rowset.GetValueOrDefault(selfId); item.SourcePathId.LocalPathId = rowset.GetValue(); + item.SourcePathType = rowset.GetValue(); item.State = static_cast(rowset.GetValue()); item.WaitTxId = rowset.GetValueOrDefault(InvalidTxId); @@ -4432,6 +4433,17 @@ struct TSchemeShard::TTxInit : public TTransactionBase { item.Scheme = scheme; } + if (rowset.HaveValue()) { + item.CreationQuery = rowset.GetValue(); + } + + if (rowset.HaveValue()) { + Y_ABORT_UNLESS(ParseFromStringNoSizeLimit( + item.PreparedCreationQuery, + rowset.GetValue() + )); + } + if (rowset.HaveValue()) { Ydb::Scheme::ModifyPermissionsRequest permissions; Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(permissions, rowset.GetValue())); diff --git a/ydb/core/tx/schemeshard/schemeshard_export.cpp b/ydb/core/tx/schemeshard/schemeshard_export.cpp index ea6f881ee39a..cba13774cc96 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export.cpp @@ -170,7 +170,8 @@ void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::T NIceDb::TUpdate(item.SourcePathName), NIceDb::TUpdate(item.SourcePathId.OwnerId), NIceDb::TUpdate(item.SourcePathId.LocalPathId), - NIceDb::TUpdate(static_cast(item.State)) + NIceDb::TUpdate(static_cast(item.State)), + NIceDb::TUpdate(item.SourcePathType) ); } } @@ -231,6 +232,10 @@ void TSchemeShard::Handle(TEvExport::TEvListExportsRequest::TPtr& ev, const TAct Execute(CreateTxListExports(ev), ctx); } +void TSchemeShard::Handle(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev, const TActorContext& ctx) { + Execute(CreateTxProgressExport(ev), ctx); +} + void TSchemeShard::ResumeExports(const TVector& exportIds, const TActorContext& ctx) { for (const ui64 id : exportIds) { Execute(CreateTxProgressExport(id), ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 7f63a60a7970..2c12a94e0907 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -1,8 +1,9 @@ #include "schemeshard_xxport__tx_base.h" #include "schemeshard_xxport__helpers.h" +#include "schemeshard_export.h" #include "schemeshard_export_flow_proposals.h" #include "schemeshard_export_helpers.h" -#include "schemeshard_export.h" +#include "schemeshard_export_scheme_uploader.h" #include "schemeshard_audit_log.h" #include "schemeshard_impl.h" @@ -203,7 +204,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { .IsResolved() .NotDeleted() .NotUnderDeleting() - .IsTable() + .IsSupportedInExports() .FailOnRestrictedCreateInTempZone(); if (!checks) { @@ -212,7 +213,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { } } - exportInfo->Items.emplace_back(item.source_path(), path.Base()->PathId); + exportInfo->Items.emplace_back(item.source_path(), path.Base()->PathId, path->PathType); exportInfo->PendingItems.push_back(itemIdx); } @@ -230,6 +231,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase ui64 Id; TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr; TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr; + TEvPrivate::TEvExportSchemeUploadResult::TPtr SchemeUploadResult = nullptr; TTxId CompletedTxId = InvalidTxId; explicit TTxProgress(TSelf* self, ui64 id) @@ -250,6 +252,12 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase { } + explicit TTxProgress(TSelf* self, TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev) + : TXxport::TTxBase(self) + , SchemeUploadResult(ev) + { + } + explicit TTxProgress(TSelf* self, TTxId completedTxId) : TXxport::TTxBase(self) , CompletedTxId(completedTxId) @@ -267,6 +275,8 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase OnAllocateResult(txc, ctx); } else if (ModifyResult) { OnModifyResult(txc, ctx); + } else if (SchemeUploadResult) { + OnSchemeUploadResult(txc); } else if (CompletedTxId) { OnNotifyResult(txc, ctx); } else { @@ -290,16 +300,22 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase Send(Self->SelfId(), MkDirPropose(Self, txId, exportInfo)); } - void CopyTables(TExportInfo::TPtr exportInfo, TTxId txId) { + bool CopyTables(TExportInfo::TPtr exportInfo, TTxId txId) { LOG_I("TExport::TTxProgress: CopyTables propose" << ": info# " << exportInfo->ToString() << ", txId# " << txId); Y_ABORT_UNLESS(exportInfo->WaitTxId == InvalidTxId); - Send(Self->SelfId(), CopyTablesPropose(Self, txId, exportInfo)); + if (AnyOf(exportInfo->Items, [](const TExportInfo::TItem& item) { + return item.SourcePathType == NKikimrSchemeOp::EPathTypeTable; + })) { + Send(Self->SelfId(), CopyTablesPropose(Self, txId, exportInfo)); + return true; + } + return false; } - void TransferData(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId) { + void TransferData(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId, const TActorContext& ctx) { Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); auto& item = exportInfo->Items.at(itemIdx); @@ -311,7 +327,24 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase << ", txId# " << txId); Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId); - Send(Self->SelfId(), BackupPropose(Self, txId, exportInfo, itemIdx)); + if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) { + Send(Self->SelfId(), BackupPropose(Self, txId, exportInfo, itemIdx)); + } else if (item.SourcePathType == NKikimrSchemeOp::EPathTypeView) { + Ydb::Export::ExportToS3Settings exportSettings; + Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings)); + const auto databaseRoot = TStringBuilder() << '/' << JoinSeq('/', Self->RootPathElements); + + NBackup::TMetadata metadata; + // to do: enable view checksum validation + constexpr bool EnableChecksums = false; + metadata.SetVersion(EnableChecksums ? 1 : 0); + + item.SchemeUploader = ctx.Register(CreateSchemeUploader( + Self->SelfId(), exportInfo->Id, itemIdx, item.SourcePathId, + exportSettings, databaseRoot, metadata.Serialize() + )); + Self->RunningExportSchemeUploaders.emplace(item.SchemeUploader); + } } bool CancelTransferring(TExportInfo::TPtr exportInfo, ui32 itemIdx) { @@ -461,6 +494,14 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase return path->LastTxId; } + void KillChildActors(TExportInfo::TItem& item) { + if (auto& schemeUploader = item.SchemeUploader; schemeUploader != TActorId()) { + Send(schemeUploader, new TEvents::TEvPoisonPill()); + Self->RunningExportSchemeUploaders.erase(schemeUploader); + schemeUploader = TActorId(); + } + } + void Cancel(TExportInfo::TPtr exportInfo, ui32 itemIdx, TStringBuf marker) { Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); const auto& item = exportInfo->Items.at(itemIdx); @@ -472,6 +513,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase exportInfo->State = EState::Cancelled; for (ui32 i : xrange(exportInfo->Items.size())) { + KillChildActors(exportInfo->Items[i]); if (i == itemIdx) { continue; } @@ -620,7 +662,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase } } - void OnAllocateResult(TTransactionContext&, const TActorContext&) { + void OnAllocateResult(TTransactionContext& txc, const TActorContext& ctx) { Y_ABORT_UNLESS(AllocateResult); const auto txId = TTxId(AllocateResult->Get()->TxIds.front()); @@ -651,13 +693,27 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase break; case EState::CopyTables: - CopyTables(exportInfo, txId); + if (!CopyTables(exportInfo, txId)) { + // none of the items is a table + NIceDb::TNiceDb db(txc.DB); + + for (ui32 itemIdx : xrange(exportInfo->Items.size())) { + exportInfo->Items[itemIdx].State = EState::Transferring; + Self->PersistExportItemState(db, exportInfo, itemIdx); + + AllocateTxId(exportInfo, itemIdx); + } + + exportInfo->State = EState::Transferring; + Self->PersistExportState(db, exportInfo); + return; + } break; case EState::Transferring: if (exportInfo->PendingItems) { itemIdx = popPendingItemIdx(exportInfo->PendingItems); - TransferData(exportInfo, itemIdx, txId); + TransferData(exportInfo, itemIdx, txId, ctx); } else { return; } @@ -870,6 +926,72 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase SubscribeTx(txId); } + void OnSchemeUploadResult(TTransactionContext& txc) { + Y_ABORT_UNLESS(SchemeUploadResult); + const auto& result = *SchemeUploadResult.Get()->Get(); + + LOG_D("TExport::TTxProgress: OnSchemeUploadResult" + << ": id# " << result.ExportId + << ", itemIdx# " << result.ItemIdx + << ", success# " << result.Success + << ", error# " << result.Error + ); + + const auto exportId = result.ExportId; + auto exportInfo = Self->Exports.Value(exportId, nullptr); + if (!exportInfo) { + LOG_E("TExport::TTxProgress: OnSchemeUploadResult received unknown export id" + << ": id# " << exportId + ); + return; + } + + ui32 itemIdx = result.ItemIdx; + if (itemIdx >= exportInfo->Items.size()) { + LOG_E("TExport::TTxProgress: OnSchemeUploadResult item index out of range" + << ": id# " << exportId + << ", item index# " << itemIdx + << ", number of items# " << exportInfo->Items.size() + ); + return; + } + + NIceDb::TNiceDb db(txc.DB); + + auto& item = exportInfo->Items[itemIdx]; + Self->RunningExportSchemeUploaders.erase(item.SchemeUploader); + item.SchemeUploader = TActorId(); + + if (!result.Success) { + item.State = EState::Cancelled; + item.Issue = result.Error; + Self->PersistExportItemState(db, exportInfo, itemIdx); + + if (!exportInfo->IsInProgress()) { + return; + } + + Cancel(exportInfo, itemIdx, "unsuccessful scheme upload"); + + Self->PersistExportState(db, exportInfo); + return SendNotificationsIfFinished(exportInfo); + } + + if (exportInfo->State == EState::Transferring) { + item.State = EState::Done; + Self->PersistExportItemState(db, exportInfo, itemIdx); + + if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { + exportInfo->State = EState::Done; + exportInfo->EndTime = TAppData::TimeProvider->Now(); + + Self->PersistExportState(db, exportInfo); + SendNotificationsIfFinished(exportInfo); + AuditLogExportEnd(*exportInfo.Get(), Self); + } + } + } + void OnNotifyResult(TTransactionContext& txc, const TActorContext&) { Y_ABORT_UNLESS(CompletedTxId); LOG_D("TExport::TTxProgress: OnNotifyResult" @@ -948,15 +1070,18 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase item.State = EState::Done; item.WaitTxId = InvalidTxId; - if (const auto issue = GetIssues(ItemPathId(Self, exportInfo, itemIdx), txId)) { - item.Issue = *issue; - Cancel(exportInfo, itemIdx, "issues during backing up"); - } else { - if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { - exportInfo->State = EState::Done; - exportInfo->EndTime = TAppData::TimeProvider->Now(); + bool itemHasIssues = false; + if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) { + if (const auto issue = GetIssues(ItemPathId(Self, exportInfo, itemIdx), txId)) { + item.Issue = *issue; + Cancel(exportInfo, itemIdx, "issues during backing up"); + itemHasIssues = true; } } + if (!itemHasIssues && AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { + exportInfo->State = EState::Done; + exportInfo->EndTime = TAppData::TimeProvider->Now(); + } Self->PersistExportItemState(db, exportInfo, itemIdx); break; @@ -1016,6 +1141,10 @@ ITransaction* TSchemeShard::CreateTxProgressExport(TEvSchemeShard::TEvModifySche return new TExport::TTxProgress(this, ev); } +ITransaction* TSchemeShard::CreateTxProgressExport(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev) { + return new TExport::TTxProgress(this, ev); +} + ITransaction* TSchemeShard::CreateTxProgressExport(TTxId completedTxId) { return new TExport::TTxProgress(this, completedTxId); } diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index c403bfc128ba..74eb0c0c4cde 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -59,6 +59,9 @@ THolder CopyTablesPropose( for (ui32 itemIdx : xrange(exportInfo->Items.size())) { const auto& item = exportInfo->Items.at(itemIdx); + if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable) { + continue; + } auto& desc = *copyTables.Add(); desc.SetSrcPath(item.SourcePathName); diff --git a/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.cpp b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.cpp new file mode 100644 index 000000000000..851e6e9c8f04 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.cpp @@ -0,0 +1,342 @@ +#include "schemeshard.h" +#include "schemeshard_export_scheme_uploader.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NSchemeShard { + +class TSchemeUploader: public TActorBootstrapped { + + using TS3ExternalStorageConfig = NWrappers::NExternalStorage::TS3ExternalStorageConfig; + using TEvExternalStorage = NWrappers::TEvExternalStorage; + using TPutObjectResult = Aws::Utils::Outcome; + + void GetDescription() { + Send(SchemeShard, new TEvSchemeShard::TEvDescribeScheme(SourcePathId)); + this->Become(&TThis::StateDescribe); + } + + static TString BuildViewScheme(const TString& path, const NKikimrSchemeOp::TViewDescription& viewDescription, const TString& backupRoot, TString& error) { + NYql::TIssues issues; + auto scheme = NYdb::NDump::BuildCreateViewQuery(viewDescription.GetName(), path, viewDescription.GetQueryText(), backupRoot, issues); + if (!scheme) { + error = issues.ToString(); + } + return scheme; + } + + bool BuildSchemeToUpload(const NKikimrScheme::TEvDescribeSchemeResult& describeResult, TString& error) { + const auto pathType = describeResult.GetPathDescription().GetSelf().GetPathType(); + switch (pathType) { + case NKikimrSchemeOp::EPathTypeView: { + Scheme = BuildViewScheme(describeResult.GetPath(), describeResult.GetPathDescription().GetViewDescription(), DatabaseRoot, error); + return !Scheme.empty(); + } + default: + error = TStringBuilder() << "unsupported path type: " << pathType; + return false; + } + } + + void HandleSchemeDescription(TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev) { + const auto& describeResult = ev->Get()->GetRecord(); + + LOG_D("HandleSchemeDescription" + << ", self: " << this->SelfId() + << ", status: " << describeResult.GetStatus() + ); + + if (describeResult.GetStatus() != TEvSchemeShard::EStatus::StatusSuccess) { + return Finish(false, describeResult.GetReason()); + } + + TString error; + if (!BuildSchemeToUpload(describeResult, error)) { + return Finish(false, error); + } + + if (auto permissions = NDataShard::GenYdbPermissions(describeResult.GetPathDescription())) { + google::protobuf::TextFormat::PrintToString(permissions.GetRef(), &Permissions); + } else { + return Finish(false, "cannot infer permissions"); + } + + UploadScheme(); + } + + void UploadScheme() { + Y_ABORT_UNLESS(!SchemeUploaded); + + if (!Scheme) { + return Finish(false, "cannot infer scheme"); + } + if (Attempt == 0) { + StorageOperator = this->RegisterWithSameMailbox( + NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()) + ); + } + + auto request = Aws::S3::Model::PutObjectRequest() + .WithKey(Sprintf("%s/create_view.sql", DestinationPrefix.c_str())); + + this->Send(StorageOperator, new TEvExternalStorage::TEvPutObjectRequest(request, TString(Scheme))); + this->Become(&TThis::StateUploadScheme); + } + + void HandleSchemePutResponse(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + LOG_D("HandleSchemePutResponse" + << ", self: " << this->SelfId() + << ", result: " << result + ); + + if (!CheckResult(result, TStringBuf("PutObject (scheme)"))) { + return; + } + SchemeUploaded = true; + UploadPermissions(); + } + + void UploadPermissions() { + Y_ABORT_UNLESS(!PermissionsUploaded); + + if (!Permissions) { + return Finish(false, "cannot infer permissions"); + } + auto request = Aws::S3::Model::PutObjectRequest() + .WithKey(Sprintf("%s/permissions.pb", DestinationPrefix.c_str())); + + this->Send(StorageOperator, new TEvExternalStorage::TEvPutObjectRequest(request, TString(Permissions))); + this->Become(&TThis::StateUploadPermissions); + } + + void HandlePermissionsPutResponse(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + LOG_D("HandlePermissionsPutResponse" + << ", self: " << this->SelfId() + << ", result: " << result + ); + + if (!CheckResult(result, TStringBuf("PutObject (permissions)"))) { + return; + } + PermissionsUploaded = true; + UploadMetadata(); + } + + void UploadMetadata() { + Y_ABORT_UNLESS(!MetadataUploaded); + + if (!Metadata) { + return Finish(false, "empty metadata"); + } + auto request = Aws::S3::Model::PutObjectRequest() + .WithKey(Sprintf("%s/metadata.json", DestinationPrefix.c_str())); + + this->Send(StorageOperator, new TEvExternalStorage::TEvPutObjectRequest(request, TString(Metadata))); + this->Become(&TThis::StateUploadMetadata); + } + + void HandleMetadataPutResponse(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + LOG_D("HandleMetadataPutResponse" + << ", self: " << this->SelfId() + << ", result: " << result + ); + + if (!CheckResult(result, TStringBuf("PutObject (metadata)"))) { + return; + } + MetadataUploaded = true; + Finish(); + } + + bool CheckResult(const TPutObjectResult& result, const TStringBuf marker) { + if (result.IsSuccess()) { + return true; + } + + LOG_E("Error at '" << marker << "'" + << ", self: " << this->SelfId() + << ", error: " << result + ); + + RetryOrFinish(result.GetError()); + return false; + } + + void RetryOrFinish(const Aws::S3::S3Error& error) { + if (Attempt < Retries && ShouldRetry(error)) { + Retry(); + } else { + Finish(false, TStringBuilder() << "S3 error: " << error.GetMessage()); + } + } + + static bool ShouldRetry(const Aws::S3::S3Error& error) { + if (error.ShouldRetry()) { + return true; + } + return error.GetExceptionName() == "TooManyRequests"; + } + + void Retry() { + Delay = Min(Delay * ++Attempt, MaxDelay); + const TDuration random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds()); + this->Schedule(Delay + random, new TEvents::TEvWakeup()); + } + + void Finish(bool success = true, const TString& error = TString()) { + LOG_I("Finish" + << ", self: " << this->SelfId() + << ", success: " << success + << ", error: " << error + ); + + Send(SchemeShard, new TEvPrivate::TEvExportSchemeUploadResult(ExportId, ItemIdx, success, error)); + PassAway(); + } + + void PassAway() override { + this->Send(StorageOperator, new TEvents::TEvPoisonPill()); + IActor::PassAway(); + } + +public: + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::EXPORT_S3_UPLOADER_ACTOR; + } + + TSchemeUploader( + TActorId schemeShard, + ui64 exportId, + ui32 itemIdx, + TPathId sourcePathId, + const Ydb::Export::ExportToS3Settings& settings, + const TString& databaseRoot, + const TString& metadata + ) + : SchemeShard(schemeShard) + , ExportId(exportId) + , ItemIdx(itemIdx) + , SourcePathId(sourcePathId) + , ExternalStorageConfig(new TS3ExternalStorageConfig(settings)) + , Retries(settings.number_of_retries()) + , DatabaseRoot(databaseRoot) + , Metadata(metadata) + { + if (itemIdx < ui32(settings.items_size())) { + DestinationPrefix = settings.items(itemIdx).destination_prefix(); + } + } + + void Bootstrap() { + if (!DestinationPrefix) { + return Finish(false, TStringBuilder() << "cannot determine destination prefix, item index: " << ItemIdx << " out of range"); + } + if (!Scheme || !Permissions) { + return GetDescription(); + } + if (!SchemeUploaded) { + return UploadScheme(); + } + if (!PermissionsUploaded) { + return UploadPermissions(); + } + if (!MetadataUploaded) { + return UploadMetadata(); + } + Finish(); + } + + STATEFN(StateBase) { + switch (ev->GetTypeRewrite()) { + sFunc(TEvents::TEvWakeup, Bootstrap); + sFunc(TEvents::TEvPoisonPill, PassAway); + } + } + + STATEFN(StateDescribe) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvSchemeShard::TEvDescribeSchemeResult, HandleSchemeDescription); + default: + return StateBase(ev); + } + } + + STATEFN(StateUploadScheme) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleSchemePutResponse); + default: + return StateBase(ev); + } + } + + STATEFN(StateUploadPermissions) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvPutObjectResponse, HandlePermissionsPutResponse); + default: + return StateBase(ev); + } + } + + STATEFN(StateUploadMetadata) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleMetadataPutResponse); + default: + return StateBase(ev); + } + } + +private: + + TActorId SchemeShard; + + ui64 ExportId; + ui32 ItemIdx; + TPathId SourcePathId; + + NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig; + TString DestinationPrefix; + + ui32 Attempt = 0; + const ui32 Retries; + + TString DatabaseRoot; + + TActorId StorageOperator; + + TDuration Delay = TDuration::Minutes(1); + static constexpr TDuration MaxDelay = TDuration::Minutes(10); + + TString Scheme; + bool SchemeUploaded = false; + + TString Permissions; + bool PermissionsUploaded = false; + + TString Metadata; + bool MetadataUploaded = false; + +}; // TSchemeUploader + +IActor* CreateSchemeUploader(TActorId schemeShard, ui64 exportId, ui32 itemIdx, TPathId sourcePathId, + const Ydb::Export::ExportToS3Settings& settings, const TString& databaseRoot, const TString& metadata +) { + return new TSchemeUploader(schemeShard, exportId, itemIdx, sourcePathId, settings, databaseRoot, metadata); +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.h b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.h new file mode 100644 index 000000000000..d64d53a318c0 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.h @@ -0,0 +1,16 @@ +#pragma once + +#include +#include + +namespace Ydb::Export { + class ExportToS3Settings; +} + +namespace NKikimr::NSchemeShard { + +NActors::IActor* CreateSchemeUploader(NActors::TActorId schemeShard, ui64 exportId, ui32 itemIdx, TPathId sourcePathId, + const Ydb::Export::ExportToS3Settings& settings, const TString& databaseRoot, const TString& metadata +); + +} diff --git a/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader_fallback.cpp b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader_fallback.cpp new file mode 100644 index 000000000000..030551ed79eb --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader_fallback.cpp @@ -0,0 +1,40 @@ +#include "schemeshard_export_scheme_uploader.h" + +#include +#include + +using namespace NActors; + +namespace NKikimr::NSchemeShard { + +class TSchemeUploaderFallback: public TActorBootstrapped { +public: + explicit TSchemeUploaderFallback(TActorId schemeShard, ui64 exportId, ui32 itemIdx) + : SchemeShard(schemeShard) + , ExportId(exportId) + , ItemIdx(itemIdx) + { + } + + void Bootstrap() { + Send(SchemeShard, new TEvPrivate::TEvExportSchemeUploadResult(ExportId, ItemIdx, false, + "Exports to S3 are disabled" + )); + PassAway(); + } + +private: + TActorId SchemeShard; + ui64 ExportId; + ui32 ItemIdx; +}; + + +IActor* CreateSchemeUploader(TActorId schemeShard, ui64 exportId, ui32 itemIdx, TPathId sourcePathId, + const Ydb::Export::ExportToS3Settings& settings, const TString& databaseRoot, const TString& metadata +) { + Y_UNUSED(sourcePathId, settings, databaseRoot, metadata); + return new TSchemeUploaderFallback(schemeShard, exportId, itemIdx); +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 19224c4ef5fc..6ab353d0fa79 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -4539,6 +4539,9 @@ void TSchemeShard::Die(const TActorContext &ctx) { if (TabletMigrator) { ctx.Send(TabletMigrator, new TEvents::TEvPoisonPill()); } + for (TActorId schemeUploader : RunningExportSchemeUploaders) { + ctx.Send(schemeUploader, new TEvents::TEvPoisonPill()); + } IndexBuildPipes.Shutdown(ctx); CdcStreamScanPipes.Shutdown(ctx); @@ -4827,6 +4830,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvExport::TEvListExportsRequest, Handle); // } // NExport HFuncTraced(NBackground::TEvListRequest, Handle); + HFuncTraced(TEvPrivate::TEvExportSchemeUploadResult, Handle); // namespace NImport { HFuncTraced(TEvImport::TEvCreateImportRequest, Handle); @@ -4835,6 +4839,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvImport::TEvForgetImportRequest, Handle); HFuncTraced(TEvImport::TEvListImportsRequest, Handle); HFuncTraced(TEvPrivate::TEvImportSchemeReady, Handle); + HFuncTraced(TEvPrivate::TEvImportSchemeQueryResult, Handle); // } // NImport // namespace NBackup { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index eccfd7d2cfce..d73d4fb784f4 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1205,6 +1205,8 @@ class TSchemeShard THashMap ExportsByUid; THashMap> TxIdToExport; THashMap> TxIdToDependentExport; + // This set is needed to kill all the running scheme uploaders on SchemeShard death. + THashSet RunningExportSchemeUploaders; void FromXxportInfo(NKikimrExport::TExport& exprt, const TExportInfo::TPtr exportInfo); @@ -1236,6 +1238,7 @@ class TSchemeShard NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TTxId completedTxId); + NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev); void Handle(TEvExport::TEvCreateExportRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvExport::TEvGetExportRequest::TPtr& ev, const TActorContext& ctx); @@ -1243,6 +1246,7 @@ class TSchemeShard void Handle(TEvExport::TEvForgetExportRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvExport::TEvListExportsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TAutoPtr>& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev, const TActorContext& ctx); void ResumeExports(const TVector& exportIds, const TActorContext& ctx); // } // NExport @@ -1259,6 +1263,7 @@ class TSchemeShard static void PersistImportState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo); static void PersistImportItemState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx); static void PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx); + static void PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx); static void PersistImportItemDstPathId(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx); struct TImport { @@ -1282,6 +1287,7 @@ class TSchemeShard NTabletFlatExecutor::ITransaction* CreateTxProgressImport(ui64 id, const TMaybe& itemIdx = Nothing()); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemeReady::TPtr& ev); + NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvIndexBuilder::TEvCreateResponse::TPtr& ev); @@ -1293,6 +1299,7 @@ class TSchemeShard void Handle(TEvImport::TEvForgetImportRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvImport::TEvListImportsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvImportSchemeReady::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev, const TActorContext& ctx); void ResumeImports(const TVector& ids, const TActorContext& ctx); // } // NImport diff --git a/ydb/core/tx/schemeshard/schemeshard_import.cpp b/ydb/core/tx/schemeshard/schemeshard_import.cpp index 0ced40d7370c..951b1d63c48c 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import.cpp @@ -171,11 +171,18 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); const auto& item = importInfo->Items.at(itemIdx); - db.Table().Key(importInfo->Id, itemIdx).Update( + auto record = db.Table().Key(importInfo->Id, itemIdx); + record.Update( NIceDb::TUpdate(item.Scheme.SerializeAsString()) ); + + if (!item.CreationQuery.empty()) { + record.Update( + NIceDb::TUpdate(item.CreationQuery) + ); + } if (item.Permissions.Defined()) { - db.Table().Key(importInfo->Id, itemIdx).Update( + record.Update( NIceDb::TUpdate(item.Permissions->SerializeAsString()) ); } @@ -184,6 +191,18 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf ); } +void TSchemeShard::PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + const auto& item = importInfo->Items[itemIdx]; + + // persist the prepared modify scheme if it is non-empty (has operation type is interpreted as non-empty) + if (item.PreparedCreationQuery.HasOperationType()) { + db.Table().Key(importInfo->Id, itemIdx).Update( + NIceDb::TUpdate(item.PreparedCreationQuery.SerializeAsString()) + ); + } +} + void TSchemeShard::PersistImportItemDstPathId(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) { Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); const auto& item = importInfo->Items.at(itemIdx); @@ -218,6 +237,10 @@ void TSchemeShard::Handle(TEvPrivate::TEvImportSchemeReady::TPtr& ev, const TAct Execute(CreateTxProgressImport(ev), ctx); } +void TSchemeShard::Handle(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev, const TActorContext& ctx) { + Execute(CreateTxProgressImport(ev), ctx); +} + void TSchemeShard::ResumeImports(const TVector& ids, const TActorContext& ctx) { for (const ui64 id : ids) { Execute(CreateTxProgressImport(id), ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index e3845ac57158..7871d4751371 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -1,15 +1,19 @@ -#include "schemeshard_xxport__tx_base.h" -#include "schemeshard_xxport__helpers.h" -#include "schemeshard_import_flow_proposals.h" -#include "schemeshard_import_scheme_getter.h" -#include "schemeshard_import_helpers.h" -#include "schemeshard_import.h" #include "schemeshard_audit_log.h" #include "schemeshard_impl.h" +#include "schemeshard_import.h" +#include "schemeshard_import_flow_proposals.h" +#include "schemeshard_import_helpers.h" +#include "schemeshard_import_scheme_getter.h" +#include "schemeshard_import_scheme_query_executor.h" +#include "schemeshard_xxport__helpers.h" +#include "schemeshard_xxport__tx_base.h" +#include #include #include #include +#include +#include #include #include @@ -237,6 +241,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase ui64 Id; TMaybe ItemIdx; TEvPrivate::TEvImportSchemeReady::TPtr SchemeResult = nullptr; + TEvPrivate::TEvImportSchemeQueryResult::TPtr SchemeQueryResult = nullptr; TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr; TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr; TEvIndexBuilder::TEvCreateResponse::TPtr CreateIndexResult = nullptr; @@ -255,6 +260,12 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase { } + explicit TTxProgress(TSelf* self, TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev) + : TXxport::TTxBase(self) + , SchemeQueryResult(ev) + { + } + explicit TTxProgress(TSelf* self, TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev) : TXxport::TTxBase(self) , AllocateResult(ev) @@ -288,6 +299,8 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase if (SchemeResult) { OnSchemeResult(txc, ctx); + } else if (SchemeQueryResult) { + OnSchemeQueryPreparation(txc); } else if (AllocateResult) { OnAllocateResult(txc, ctx); } else if (ModifyResult) { @@ -338,6 +351,65 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase Send(Self->SelfId(), std::move(propose)); } + void ExecutePreparedQuery(TTransactionContext& txc, TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + auto& item = importInfo->Items[itemIdx]; + + item.SubState = ESubState::Proposed; + + LOG_I("TImport::TTxProgress: ExecutePreparedQuery" + << ": info# " << importInfo->ToString() + << ", item# " << item.ToString(itemIdx) + << ", txId# " << txId + ); + + Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId); + + auto propose = MakeHolder(ui64(txId), Self->TabletID()); + auto& record = propose->Record; + + auto& modifyScheme = *record.AddTransaction(); + modifyScheme = item.PreparedCreationQuery; + modifyScheme.SetInternal(true); + + if (importInfo->UserSID) { + record.SetOwner(*importInfo->UserSID); + } + FillOwner(record, item.Permissions); + + if (TString error; !FillACL(modifyScheme, item.Permissions, error)) { + NIceDb::TNiceDb db(txc.DB); + return CancelAndPersist(db, importInfo, itemIdx, error, "cannot parse permissions"); + } + + Send(Self->SelfId(), std::move(propose)); + } + + void RetryViewsCreation(TImportInfo::TPtr importInfo, NIceDb::TNiceDb& db, const TActorContext& ctx) { + const auto database = TStringBuilder() << '/' << JoinSeq('/', Self->RootPathElements); + TVector retriedItems; + for (ui32 itemIdx : xrange(importInfo->Items.size())) { + auto& item = importInfo->Items[itemIdx]; + if (!item.CreationQuery.empty() && item.ViewCreationRetries == 0) { + ctx.Register(CreateSchemeQueryExecutor( + Self->SelfId(), importInfo->Id, itemIdx, item.CreationQuery, database + )); + + item.State = EState::CreateTable; + item.ViewCreationRetries++; + Self->PersistImportItemState(db, importInfo, itemIdx); + + retriedItems.emplace_back(itemIdx); + } + } + if (!retriedItems.empty()) { + LOG_D("TImport::TTxProgress: retry view creation" + << ": id# " << importInfo->Id + << ", retried items# " << JoinSeq(", ", retriedItems) + ); + } + } + void TransferData(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) { Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); auto& item = importInfo->Items.at(itemIdx); @@ -515,6 +587,23 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } } + void CancelAndPersist(NIceDb::TNiceDb& db, TImportInfo::TPtr importInfo, ui32 itemIdx, TStringBuf itemIssue, TStringBuf marker) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + auto& item = importInfo->Items[itemIdx]; + + item.Issue = itemIssue; + PersistImportItemState(db, importInfo, itemIdx); + + if (importInfo->State != EState::Waiting) { + return; + } + + Cancel(importInfo, itemIdx, marker); + PersistImportState(db, importInfo); + + SendNotificationsIfFinished(importInfo); + } + TMaybe GetIssues(const TPathId& dstPathId, TTxId restoreTxId) { Y_ABORT_UNLESS(Self->Tables.contains(dstPathId)); TTableInfo::TPtr table = Self->Tables.at(dstPathId); @@ -614,7 +703,14 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase case EState::Transferring: case EState::BuildIndexes: if (item.WaitTxId == InvalidTxId) { - AllocateTxId(importInfo, itemIdx); + if (item.CreationQuery.empty() || item.PreparedCreationQuery.HasOperationType()) { + AllocateTxId(importInfo, itemIdx); + } else { + const auto database = TStringBuilder() << '/' << JoinSeq('/', Self->RootPathElements); + ctx.Register(CreateSchemeQueryExecutor( + Self->SelfId(), importInfo->Id, itemIdx, item.CreationQuery, database + )); + } } else { SubscribeTx(importInfo, itemIdx); } @@ -671,13 +767,16 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } } - void OnSchemeResult(TTransactionContext& txc, const TActorContext&) { + void OnSchemeResult(TTransactionContext& txc, const TActorContext& ctx) { Y_ABORT_UNLESS(SchemeResult); const auto& msg = *SchemeResult->Get(); LOG_D("TImport::TTxProgress: OnSchemeResult" - << ": id# " << msg.ImportId); + << ": id# " << msg.ImportId + << ", itemIdx# " << msg.ItemIdx + << ", success# " << msg.Success + ); if (!Self->Imports.contains(msg.ImportId)) { LOG_E("TImport::TTxProgress: OnSchemeResult received unknown id" @@ -696,29 +795,96 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase auto& item = importInfo->Items.at(msg.ItemIdx); NIceDb::TNiceDb db(txc.DB); - TString error; - if (!msg.Success || !CreateTablePropose(Self, TTxId(), importInfo, msg.ItemIdx, error)) { - item.Issue = msg.Success ? error : msg.Error; - Self->PersistImportItemState(db, importInfo, msg.ItemIdx); - - if (importInfo->State != EState::Waiting) { - return; + if (!msg.Success) { + return CancelAndPersist(db, importInfo, msg.ItemIdx, msg.Error, "cannot get scheme"); + } + if (item.CreationQuery.empty()) { + TString error; + if (!CreateTablePropose(Self, TTxId(), importInfo, msg.ItemIdx, error)) { + return CancelAndPersist(db, importInfo, msg.ItemIdx, error, "invalid scheme"); } - - Cancel(importInfo, msg.ItemIdx, "cannot get/invalid scheme"); - Self->PersistImportState(db, importInfo); - - return SendNotificationsIfFinished(importInfo); + } else { + // send the creation script to KQP to prepare + const auto database = TStringBuilder() << '/' << JoinSeq('/', Self->RootPathElements); + const TString source = TStringBuilder() + << importInfo->Settings.items(msg.ItemIdx).source_prefix() << NYdb::NDump::NFiles::CreateView().FileName; + + NYql::TIssues issues; + if (!NYdb::NDump::RewriteCreateViewQuery(item.CreationQuery, database, true, item.DstPathName, source, issues)) { + return CancelAndPersist(db, importInfo, msg.ItemIdx, issues.ToString(), "invalid view creation query"); + } + ctx.Register(CreateSchemeQueryExecutor(Self->SelfId(), msg.ImportId, msg.ItemIdx, item.CreationQuery, database)); } Self->PersistImportItemScheme(db, importInfo, msg.ItemIdx); item.State = EState::CreateTable; Self->PersistImportItemState(db, importInfo, msg.ItemIdx); - AllocateTxId(importInfo, msg.ItemIdx); + if (item.CreationQuery.empty()) { + AllocateTxId(importInfo, msg.ItemIdx); + } } - void OnAllocateResult(TTransactionContext&, const TActorContext&) { + void OnSchemeQueryPreparation(TTransactionContext& txc) { + Y_ABORT_UNLESS(SchemeQueryResult); + const auto& message = *SchemeQueryResult.Get()->Get(); + const TString error = std::holds_alternative(message.Result) ? std::get(message.Result) : ""; + + LOG_D("TImport::TTxProgress: OnSchemeQueryPreparation" + << ": id# " << message.ImportId + << ", itemIdx# " << message.ItemIdx + << ", status# " << message.Status + << ", error# " << error + ); + + auto importInfo = Self->Imports.Value(message.ImportId, nullptr); + if (!importInfo) { + LOG_E("TImport::TTxProgress: OnSchemeQueryPreparation received unknown import id" + << ": id# " << message.ImportId + ); + return; + } + if (message.ItemIdx >= importInfo->Items.size()) { + LOG_E("TImport::TTxProgress: OnSchemeQueryPreparation item index out of range" + << ": id# " << message.ImportId + << ", item index# " << message.ItemIdx + << ", number of items# " << importInfo->Items.size() + ); + return; + } + + NIceDb::TNiceDb db(txc.DB); + auto& item = importInfo->Items[message.ItemIdx]; + + if (message.Status == Ydb::StatusIds::SCHEME_ERROR && item.ViewCreationRetries == 0) { + // Scheme error happens when the view depends on a table (or a view) that is not yet imported. + // Instead of tracking view dependencies, we simply retry the creation of the view later. + item.State = EState::Waiting; + + auto isWaiting = [](const TImportInfo::TItem& item) { + return item.State == EState::Waiting; + }; + if (AllOf(importInfo->Items, isWaiting)) { + // All items are waiting? Cancel the import, or we will end up waiting indefinetely. + return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed"); + } + + Self->PersistImportItemState(db, importInfo, message.ItemIdx); + return; + } + + if (message.Status != Ydb::StatusIds::SUCCESS || !error.empty()) { + return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed"); + } + + if (item.State == EState::CreateTable) { + item.PreparedCreationQuery = std::get(message.Result); + PersistImportItemPreparedCreationQuery(db, importInfo, message.ItemIdx); + AllocateTxId(importInfo, message.ItemIdx); + } + } + + void OnAllocateResult(TTransactionContext& txc, const TActorContext&) { Y_ABORT_UNLESS(AllocateResult); const auto txId = TTxId(AllocateResult->Get()->TxIds.front()); @@ -749,6 +915,16 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase switch (item.State) { case EState::CreateTable: + if (item.PreparedCreationQuery.HasOperationType()) { + ExecutePreparedQuery(txc, importInfo, i, txId); + itemIdx = i; + break; + } + if (!item.CreationQuery.empty()) { + // We only need a txId for modify scheme transactions. + // If an object lacks a PreparedCreationQuery, it doesn't require a txId at this stage. + break; + } if (!Self->TableProfilesLoaded) { Self->WaitForTableProfiles(id, i); } else { @@ -932,7 +1108,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase SubscribeTx(importInfo, itemIdx); } - void OnNotifyResult(TTransactionContext& txc, const TActorContext&) { + void OnNotifyResult(TTransactionContext& txc, const TActorContext& ctx) { Y_ABORT_UNLESS(CompletedTxId); LOG_D("TImport::TTxProgress: OnNotifyResult" << ": txId# " << CompletedTxId); @@ -970,6 +1146,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase switch (item.State) { case EState::CreateTable: + if (!item.CreationQuery.empty()) { + item.State = EState::Done; + break; + } item.State = EState::Transferring; AllocateTxId(importInfo, itemIdx); break; @@ -1008,6 +1188,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase if (AllOf(importInfo->Items, &TImportInfo::TItem::IsDone)) { importInfo->State = EState::Done; importInfo->EndTime = TAppData::TimeProvider->Now(); + } else if (AllOf(importInfo->Items, [](const TImportInfo::TItem& item) { + return TImportInfo::TItem::IsDone(item) || item.State == EState::Waiting; + } + )) { + RetryViewsCreation(importInfo, db, ctx); } Self->PersistImportItemState(db, importInfo, itemIdx); @@ -1034,6 +1219,10 @@ ITransaction* TSchemeShard::CreateTxProgressImport(TEvPrivate::TEvImportSchemeRe return new TImport::TTxProgress(this, ev); } +ITransaction* TSchemeShard::CreateTxProgressImport(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev) { + return new TImport::TTxProgress(this, ev); +} + ITransaction* TSchemeShard::CreateTxProgressImport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev) { return new TImport::TTxProgress(this, ev); } diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp index ab722b2c5e5c..84b740ad9947 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp @@ -10,6 +10,7 @@ #include #include +#include #include @@ -32,9 +33,9 @@ class TSchemeGetter: public TActorBootstrapped { return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/metadata.json"; } - static TString SchemeKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) { + static TString SchemeKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx, TStringBuf filename) { Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size()); - return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/scheme.pb"; + return TStringBuilder() << settings.items(itemIdx).source_prefix() << '/' << filename; } static TString PermissionsKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) { @@ -42,6 +43,14 @@ class TSchemeGetter: public TActorBootstrapped { return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/permissions.pb"; } + static bool IsView(TStringBuf schemeKey) { + return schemeKey.EndsWith(NYdb::NDump::NFiles::CreateView().FileName); + } + + static bool NoObjectFound(Aws::S3::S3Errors errorType) { + return errorType == S3Errors::RESOURCE_NOT_FOUND || errorType == S3Errors::NO_SUCH_KEY; + } + void HeadObject(const TString& key) { auto request = Model::HeadObjectRequest() .WithKey(key); @@ -71,6 +80,13 @@ class TSchemeGetter: public TActorBootstrapped { << ": self# " << SelfId() << ", result# " << result); + if (!IsView(SchemeKey) && NoObjectFound(result.GetError().GetErrorType())) { + // try search for a view + SchemeKey = SchemeKeyFromSettings(ImportInfo->Settings, ItemIdx, NYdb::NDump::NFiles::CreateView().FileName); + HeadObject(SchemeKey); + return; + } + if (!CheckResult(result, "HeadObject")) { return; } @@ -86,8 +102,7 @@ class TSchemeGetter: public TActorBootstrapped { << ": self# " << SelfId() << ", result# " << result); - if (result.GetError().GetErrorType() == S3Errors::RESOURCE_NOT_FOUND - || result.GetError().GetErrorType() == S3Errors::NO_SUCH_KEY) { + if (NoObjectFound(result.GetError().GetErrorType())) { Reply(); // permissions are optional return; } else if (!CheckResult(result, "HeadObject")) { @@ -176,9 +191,13 @@ class TSchemeGetter: public TActorBootstrapped { LOG_T("Trying to parse scheme" << ": self# " << SelfId() + << ", itemIdx# " << ItemIdx + << ", schemeKey# " << SchemeKey << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n")); - if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &item.Scheme)) { + if (IsView(SchemeKey)) { + item.CreationQuery = msg.Body; + } else if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &item.Scheme)) { return Reply(false, "Cannot parse scheme"); } @@ -230,7 +249,7 @@ class TSchemeGetter: public TActorBootstrapped { StartValidatingChecksum(PermissionsKey, msg.Body, nextStep); } else { nextStep(); - } + } } void HandleChecksum(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) { @@ -351,7 +370,7 @@ class TSchemeGetter: public TActorBootstrapped { , ImportInfo(importInfo) , ItemIdx(itemIdx) , MetadataKey(MetadataKeyFromSettings(importInfo->Settings, itemIdx)) - , SchemeKey(SchemeKeyFromSettings(importInfo->Settings, itemIdx)) + , SchemeKey(SchemeKeyFromSettings(importInfo->Settings, itemIdx, "scheme.pb")) , PermissionsKey(PermissionsKeyFromSettings(importInfo->Settings, itemIdx)) , Retries(importInfo->Settings.number_of_retries()) , NeedDownloadPermissions(!importInfo->Settings.no_acl()) @@ -411,7 +430,7 @@ class TSchemeGetter: public TActorBootstrapped { const ui32 ItemIdx; const TString MetadataKey; - const TString SchemeKey; + TString SchemeKey; const TString PermissionsKey; const ui32 Retries; diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp new file mode 100644 index 000000000000..e4137e397c47 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp @@ -0,0 +1,179 @@ +#include "schemeshard_import_helpers.h" +#include "schemeshard_import_scheme_query_executor.h" +#include "schemeshard_private.h" + +#include +#include +#include +#include +#include +#include + +#include + +using namespace NKikimr::NKqp; + +namespace NKikimr::NSchemeShard { + +class TSchemeQueryExecutor: public TActorBootstrapped { + + std::unique_ptr BuildCompileRequest() { + UserToken.Reset(MakeIntrusive("")); + + TKqpQuerySettings querySettings(NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY); + querySettings.IsInternalCall = true; + + GUCSettings = std::make_shared(); + + TKqpQueryId query( + TString(DefaultKikimrPublicClusterName), // cluster + Database, // database + "", // database id + SchemeQuery, // query text + querySettings, // query settings + nullptr, // query parameter types + *GUCSettings // GUC settings + ); + + // TO DO: pass cancel after from the import operation + auto deadline = TAppData::TimeProvider->Now() + TDuration::Minutes(1); + TKqpCounters kqpCounters(AppData()->Counters, &TlsActivationContext->AsActorContext()); + IsInterestedInResult = std::make_shared>(true); + UserRequestContext.Reset(MakeIntrusive()); + + return std::make_unique( + UserToken, // user token + "", // client address + Nothing(), // uid + query, // TKqpQueryId + false, // keep in cache + true, // is query action == prepare? + false, // per statement result + deadline, // deadline + kqpCounters.GetDbCounters(Database), // db counters + GUCSettings, // GUC settings + Nothing(), // application name + IsInterestedInResult, // is still interested in result? + UserRequestContext // user request context + ); + } + + void PrepareSchemeQuery() { + if (!Send(MakeKqpCompileServiceID(SelfId().NodeId()), BuildCompileRequest().release())) { + return Reply(Ydb::StatusIds::INTERNAL_ERROR, "cannot send query request"); + } + this->Become(&TThis::StateExecute); + } + + void HandleCompileResponse(const TEvKqp::TEvCompileResponse::TPtr& ev) { + const auto* result = ev->Get()->CompileResult.get(); + if (!result) { + // TO DO: figure out the proper status for this situation. + // Probably, just change the reply event to contain a plain bool status. + return Reply(Ydb::StatusIds::BAD_REQUEST, "no compile result"); + } + + LOG_D("TSchemeQueryExecutor HandleCompileResponse" + << ", self: " << this->SelfId() + << ", status: " << result->Status; + ); + + if (result->Status != Ydb::StatusIds::SUCCESS) { + return Reply(result->Status, result->Issues.ToOneLineString()); + } + if (!result->PreparedQuery) { + return Reply(Ydb::StatusIds::BAD_REQUEST, "no prepared query"); + } + const auto& transactions = result->PreparedQuery->GetPhysicalQuery().GetTransactions(); + if (transactions.empty()) { + return Reply(Ydb::StatusIds::BAD_REQUEST, "empty transactions"); + } + if (!transactions[0].HasSchemeOperation()) { + return Reply(Ydb::StatusIds::BAD_REQUEST, "no scheme operations"); + } + if (!transactions[0].GetSchemeOperation().HasCreateView()) { + return Reply(Ydb::StatusIds::BAD_REQUEST, "no create view operation"); + } + const auto& createView = transactions[0].GetSchemeOperation().GetCreateView(); + Reply(result->Status, createView); + } + + void Reply(Ydb::StatusIds::StatusCode status, std::variant result) { + auto logMessage = TStringBuilder() << "TSchemeQueryExecutor Reply" + << ", self: " << this->SelfId() + << ", success: " << status; + LOG_I(logMessage); + + std::visit([&](T& rresult) { + if constexpr (std::is_same_v) { + logMessage << ", error: " << rresult; + } else if constexpr (std::is_same_v) { + logMessage << ", prepared query: " << rresult.ShortDebugString().Quote(); + } + LOG_D(logMessage); + Send(ReplyTo, new TEvPrivate::TEvImportSchemeQueryResult(ImportId, ItemIdx, status, std::move(rresult))); + }, result); + + PassAway(); + } + +public: + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::IMPORT_SCHEME_QUERY_EXECUTOR; + } + + TSchemeQueryExecutor( + TActorId replyTo, + ui64 importId, + ui32 itemIdx, + const TString& schemeQuery, + const TString& database + ) + : ReplyTo(replyTo) + , ImportId(importId) + , ItemIdx(itemIdx) + , SchemeQuery(schemeQuery) + , Database(database) + { + } + + void Bootstrap() { + PrepareSchemeQuery(); + } + + STATEFN(StateBase) { + switch (ev->GetTypeRewrite()) { + sFunc(TEvents::TEvPoisonPill, PassAway); + } + } + + STATEFN(StateExecute) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvKqp::TEvCompileResponse, HandleCompileResponse); + default: + return StateBase(ev); + } + } + +private: + + TActorId ReplyTo; + ui64 ImportId; + ui32 ItemIdx; + TString SchemeQuery; + TString Database; + + // Pointer type event arguments need to live until we receive the compilation response. + TIntrusiveConstPtr UserToken; + TGUCSettings::TPtr GUCSettings; + std::shared_ptr> IsInterestedInResult; + TIntrusivePtr UserRequestContext; + +}; // TSchemeQueryExecutor + +IActor* CreateSchemeQueryExecutor(NActors::TActorId replyTo, ui64 importId, ui32 itemIdx, const TString& schemeQuery, const TString& database) { + return new TSchemeQueryExecutor(replyTo, importId, itemIdx, schemeQuery, database); +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.h b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.h new file mode 100644 index 000000000000..550c9c145437 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.h @@ -0,0 +1,9 @@ +#pragma once + +#include + +namespace NKikimr::NSchemeShard { + +NActors::IActor* CreateSchemeQueryExecutor(NActors::TActorId replyTo, ui64 importId, ui32 itemIdx, const TString& creationQuery, const TString& database); + +} diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 6af2eb2f11a5..e1ed21ddb2b6 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2664,17 +2664,20 @@ struct TExportInfo: public TSimpleRefCount { TString SourcePathName; TPathId SourcePathId; + NKikimrSchemeOp::EPathType SourcePathType; EState State = EState::Waiting; ESubState SubState = ESubState::AllocateTxId; TTxId WaitTxId = InvalidTxId; + TActorId SchemeUploader; TString Issue; TItem() = default; - explicit TItem(const TString& sourcePathName, const TPathId sourcePathId) + explicit TItem(const TString& sourcePathName, const TPathId sourcePathId, NKikimrSchemeOp::EPathType sourcePathType) : SourcePathName(sourcePathName) , SourcePathId(sourcePathId) + , SourcePathType(sourcePathType) { } @@ -2822,6 +2825,8 @@ struct TImportInfo: public TSimpleRefCount { TString DstPathName; TPathId DstPathId; Ydb::Table::CreateTableRequest Scheme; + TString CreationQuery; + NKikimrSchemeOp::TModifyScheme PreparedCreationQuery; TMaybeFail Permissions; NBackup::TMetadata Metadata; @@ -2830,6 +2835,7 @@ struct TImportInfo: public TSimpleRefCount { TTxId WaitTxId = InvalidTxId; int NextIndexIdx = 0; TString Issue; + int ViewCreationRetries = 0; TItem() = default; diff --git a/ydb/core/tx/schemeshard/schemeshard_path.cpp b/ydb/core/tx/schemeshard/schemeshard_path.cpp index cfed8c7e2215..0202ec3d6463 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path.cpp @@ -922,6 +922,20 @@ const TPath::TChecker& TPath::TChecker::IsBackupCollection(EStatus status) const << " (" << BasicPathInfo(Path.Base()) << ")"); } +const TPath::TChecker& TPath::TChecker::IsSupportedInExports(EStatus status) const { + if (Failed) { + return *this; + } + + if (Path.Base()->IsTable() || Path.Base()->IsView()) { + return *this; + } + + return Fail(status, TStringBuilder() << "path type is not supported in exports" + << " (" << BasicPathInfo(Path.Base()) << ")" + ); +} + const TPath::TChecker& TPath::TChecker::PathShardsLimit(ui64 delta, EStatus status) const { if (Failed) { return *this; diff --git a/ydb/core/tx/schemeshard/schemeshard_path.h b/ydb/core/tx/schemeshard/schemeshard_path.h index 81e3ee13d09b..4e017c51b05a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.h +++ b/ydb/core/tx/schemeshard/schemeshard_path.h @@ -104,6 +104,7 @@ class TPath { const TChecker& FailOnRestrictedCreateInTempZone(bool allowCreateInTemporaryDir = false, EStatus status = EStatus::StatusPreconditionFailed) const; const TChecker& IsResourcePool(EStatus status = EStatus::StatusNameConflict) const; const TChecker& IsBackupCollection(EStatus status = EStatus::StatusNameConflict) const; + const TChecker& IsSupportedInExports(EStatus status = EStatus::StatusNameConflict) const; }; public: diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h index 2ba95e78b0c8..02aec915e902 100644 --- a/ydb/core/tx/schemeshard/schemeshard_private.h +++ b/ydb/core/tx/schemeshard/schemeshard_private.h @@ -1,8 +1,13 @@ #pragma once -#include "defs.h" - #include "schemeshard_identificators.h" +#include +#include +#include +#include + +#include + namespace NKikimr { namespace NSchemeShard { @@ -15,6 +20,8 @@ namespace TEvPrivate { EvRunConditionalErase, EvIndexBuildBilling, EvImportSchemeReady, + EvImportSchemeQueryResult, + EvExportSchemeUploadResult, EvServerlessStorageBilling, EvCleanDroppedPaths, EvCleanDroppedSubDomains, @@ -95,6 +102,43 @@ namespace TEvPrivate { {} }; + struct TEvImportSchemeQueryResult: public TEventLocal { + const ui64 ImportId; + const ui32 ItemIdx; + const Ydb::StatusIds::StatusCode Status; + const std::variant Result; + + // failed query + TEvImportSchemeQueryResult(ui64 id, ui32 itemIdx, Ydb::StatusIds::StatusCode status, TString&& error) + : ImportId(id) + , ItemIdx(itemIdx) + , Status(status) + , Result(error) + {} + + // successful query + TEvImportSchemeQueryResult(ui64 id, ui32 itemIdx, Ydb::StatusIds::StatusCode status, NKikimrSchemeOp::TModifyScheme&& preparedQuery) + : ImportId(id) + , ItemIdx(itemIdx) + , Status(status) + , Result(preparedQuery) + {} + }; + + struct TEvExportSchemeUploadResult: public TEventLocal { + const ui64 ExportId; + const ui32 ItemIdx; + const bool Success; + const TString Error; + + TEvExportSchemeUploadResult(ui64 id, ui32 itemIdx, bool success, const TString& error) + : ExportId(id) + , ItemIdx(itemIdx) + , Success(success) + , Error(error) + {} + }; + struct TEvServerlessStorageBilling: public TEventLocal { TEvServerlessStorageBilling() {} diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 5fb719e771e0..438c59bd9103 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1209,6 +1209,7 @@ struct Schema : NIceDb::Schema { struct BackupTxId : Column<6, NScheme::NTypeIds::Uint64> { using Type = TTxId; }; struct Issue : Column<7, NScheme::NTypeIds::Utf8> {}; struct SourceOwnerPathId : Column<8, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; }; + struct SourcePathType : Column<9, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::EPathType; static constexpr Type Default = NKikimrSchemeOp::EPathTypeTable; }; using TKey = TableKey; using TColumns = TableColumns< @@ -1219,7 +1220,8 @@ struct Schema : NIceDb::Schema { State, BackupTxId, Issue, - SourceOwnerPathId + SourceOwnerPathId, + SourcePathType >; }; @@ -1551,6 +1553,9 @@ struct Schema : NIceDb::Schema { struct DstPathOwnerId : Column<4, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; }; struct DstPathLocalId : Column<5, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; }; struct Scheme : Column<6, NScheme::NTypeIds::String> {}; + struct CreationQuery : Column<13, NScheme::NTypeIds::Utf8> {}; + // NKikimrSchemeOp::TModifyScheme serialized as string + struct PreparedCreationQuery : Column<14, NScheme::NTypeIds::String> {}; struct Permissions : Column<11, NScheme::NTypeIds::String> {}; struct Metadata : Column<12, NScheme::NTypeIds::String> {}; @@ -1567,6 +1572,8 @@ struct Schema : NIceDb::Schema { DstPathOwnerId, DstPathLocalId, Scheme, + CreationQuery, + PreparedCreationQuery, Permissions, Metadata, State, diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 5dd8b1c9c700..34acca3ae4d3 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -234,6 +234,7 @@ SRCS( schemeshard_import__get.cpp schemeshard_import__list.cpp schemeshard_import_flow_proposals.cpp + schemeshard_import_scheme_query_executor.cpp schemeshard_info_types.cpp schemeshard_info_types.h schemeshard_path.cpp @@ -308,6 +309,8 @@ PEERDIR( ydb/library/login ydb/library/login/protos ydb/library/protobuf_printer + ydb/public/lib/ydb_cli/dump/files + ydb/public/lib/ydb_cli/dump/util yql/essentials/minikql yql/essentials/providers/common/proto ydb/services/bg_tasks @@ -319,10 +322,12 @@ YQL_LAST_ABI_VERSION() IF (OS_WINDOWS) SRCS( + schemeshard_export_scheme_uploader_fallback.cpp schemeshard_import_scheme_getter_fallback.cpp ) ELSE() SRCS( + schemeshard_export_scheme_uploader.cpp schemeshard_import_scheme_getter.cpp ) ENDIF() diff --git a/ydb/library/backup/backup.cpp b/ydb/library/backup/backup.cpp index 86489ff290b8..2f7db4e15729 100644 --- a/ydb/library/backup/backup.cpp +++ b/ydb/library/backup/backup.cpp @@ -41,8 +41,6 @@ #include -#include - namespace NYdb::NBackup { @@ -548,72 +546,10 @@ NView::TViewDescription DescribeView(NView::TViewClient& client, const TString& auto status = NConsoleClient::RetryFunction([&]() { return client.DescribeView(path).ExtractValueSync(); }); - VerifyStatus(status); + VerifyStatus(status, "describe view to build a backup"); return status.GetViewDescription(); } -struct TViewQuerySplit { - TString ContextRecreation; - TString Select; -}; - -TViewQuerySplit SplitViewQuery(TStringInput query) { - // to do: make the implementation more versatile - TViewQuerySplit split; - - TString line; - while (query.ReadLine(line)) { - (line.StartsWith("--") || line.StartsWith("PRAGMA ") - ? split.ContextRecreation - : split.Select - ) += line; - } - - return split; -} - -void ValidateViewQuery(const TString& query, const TString& dbPath, NYql::TIssues& accumulatedIssues) { - NYql::TIssues subIssues; - if (!NDump::ValidateViewQuery(query, subIssues)) { - NYql::TIssue restorabilityIssue( - TStringBuilder() << "Restorability of the view: " << dbPath.Quote() - << " storing the following query:\n" - << query - << "\ncannot be guaranteed. For more information, please consult the 'ydb tools dump' documentation." - ); - restorabilityIssue.Severity = NYql::TSeverityIds::S_WARNING; - for (const auto& subIssue : subIssues) { - restorabilityIssue.AddSubIssue(MakeIntrusive(subIssue)); - } - accumulatedIssues.AddIssue(std::move(restorabilityIssue)); - } -} - -TString BuildCreateViewQuery(TStringBuf name, const NView::TViewDescription& description, TStringBuf backupRoot) { - auto queryText = TString{description.GetQueryText()}; - auto [contextRecreation, select] = SplitViewQuery(queryText); - - const TString query = std::format( - "-- backup root: \"{}\"\n" - "{}\n" - "CREATE VIEW IF NOT EXISTS `{}` WITH (security_invoker = TRUE) AS\n" - " {};\n", - backupRoot.data(), - contextRecreation.data(), - name.data(), - select.data() - ); - - TString formattedQuery; - TString errors; - Y_ENSURE(NSQLFormat::SqlFormatSimple( - query, - formattedQuery, - errors - ), errors); - return formattedQuery; -} - } /*! @@ -635,19 +571,22 @@ void BackupView(TDriver driver, const TString& dbBackupRoot, const TString& dbPa LOG_I("Backup view " << dbPath.Quote() << " to " << fsBackupDir.GetPath().Quote()); NView::TViewClient client(driver); - auto viewDescription = DescribeView(client, dbPath); - - ValidateViewQuery(TString{viewDescription.GetQueryText()}, dbPath, issues); + const auto viewDescription = DescribeView(client, dbPath); const auto fsPath = fsBackupDir.Child(NDump::NFiles::CreateView().FileName); LOG_D("Write view creation query to " << fsPath.GetPath().Quote()); - TFileOutput output(fsPath); - output << BuildCreateViewQuery( + const auto creationQuery = NDump::BuildCreateViewQuery( TFsPath(dbPathRelativeToBackupRoot).GetName(), - viewDescription, - dbBackupRoot + dbPath, + TString(viewDescription.GetQueryText()), + dbBackupRoot, + issues ); + Y_ENSURE(creationQuery, issues.ToString()); + + TFileOutput output(fsPath); + output << creationQuery; BackupPermissions(driver, dbBackupRoot, dbPathRelativeToBackupRoot, fsBackupDir); } diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 75d3557e83c8..bff8c7ac8df5 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1074,5 +1074,6 @@ message TActivity { BS_GROUP_GETBLOCK = 653; HTTP_MON_INDEX_SERVICE = 654; HTTP_MON_AUTHORIZED_ACTOR_REQUEST = 655; + IMPORT_SCHEME_QUERY_EXECUTOR = 656; }; }; diff --git a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp index 816a9f5d446f..f820d95af020 100644 --- a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp +++ b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp @@ -20,7 +20,6 @@ #include #include -#include #include @@ -129,72 +128,11 @@ TRestoreResult CombineResults(const TVector& results) { return Result(); } -TString GetBackupRoot(TStringInput query) { - TString backupRoot; - - constexpr TStringBuf targetLinePrefix = "-- backup root: \""; - constexpr TStringBuf discardedSuffix = "\""; - TString line; - while (query.ReadLine(line)) { - if (line.StartsWith(targetLinePrefix)) { - backupRoot = line.substr( - std::size(targetLinePrefix), - std::size(line) - std::size(targetLinePrefix) - std::size(discardedSuffix) - ); - return backupRoot; - } - } - - return backupRoot; -} - bool IsDatabase(TSchemeClient& client, const TString& path) { auto result = DescribePath(client, path); return result.GetStatus() == EStatus::SUCCESS && result.GetEntry().Type == ESchemeEntryType::SubDomain; } -bool RewriteTablePathPrefix(TString& query, TStringBuf backupRoot, TStringBuf restoreRoot, - bool restoreRootIsDatabase, NIssue::TIssues& issues -) { - if (backupRoot == restoreRoot) { - return true; - } - - TString pathPrefix; - if (!re2::RE2::PartialMatch(query, R"(PRAGMA TablePathPrefix = '(\S+)';)", &pathPrefix)) { - if (!restoreRootIsDatabase) { - // Initially, the view used the implicit table path prefix, but this is no longer feasible - // since the restore root is different from the database root. - // Consequently, we must issue an explicit TablePathPrefix pragma to ensure that the reference targets - // maintain the same relative positions to the view's location as they did previously. - - size_t contextRecreationEnd = query.find("CREATE VIEW"); - if (contextRecreationEnd == TString::npos) { - issues.AddIssue(TStringBuilder() << "no create view statement in the query: " << query); - return false; - } - query.insert(contextRecreationEnd, TString( - std::format("PRAGMA TablePathPrefix = '{}';\n", restoreRoot.data()) - )); - } - return true; - } - - pathPrefix = RewriteAbsolutePath(pathPrefix, backupRoot, restoreRoot); - - constexpr TStringBuf pattern = R"(PRAGMA TablePathPrefix = '\S+';)"; - if (!re2::RE2::Replace(&query, pattern, - std::format(R"(PRAGMA TablePathPrefix = '{}';)", pathPrefix.c_str()) - )) { - issues.AddIssue(TStringBuilder() << "query: " << query.Quote() - << " does not contain the pattern: \"" << pattern << "\"" - ); - return false; - } - - return true; -} - } // anonymous namespace NPrivate { @@ -454,12 +392,13 @@ TRestoreResult TRestoreClient::RestoreFolder(const TFsPath& fsPath, const TStrin } } - if (!result.Defined() && !IsDatabase(SchemeClient, dbPath)) { + const bool dbPathExists = oldEntries.contains(dbPath); + if (!result.Defined() && !dbPathExists) { // This situation occurs when all the children of the folder are views. - return RestoreEmptyDir(fsPath, dbPath, settings, oldEntries.contains(dbPath)); + return RestoreEmptyDir(fsPath, dbPath, settings, dbPathExists); } - return RestorePermissions(fsPath, dbPath, settings, oldEntries.contains(dbPath)); + return RestorePermissions(fsPath, dbPath, settings, dbPathExists); } TRestoreResult TRestoreClient::RestoreView( @@ -483,31 +422,11 @@ TRestoreResult TRestoreClient::RestoreView( const auto createViewFile = fsPath.Child(NFiles::CreateView().FileName); TString query = TFileInput(createViewFile).ReadAll(); - const auto backupRoot = GetBackupRoot(query); - { - NIssue::TIssues issues; - if (!RewriteTablePathPrefix(query, backupRoot, dbRestoreRoot, IsDatabase(SchemeClient, dbRestoreRoot), issues)) { - // hard fail since we want to avoid silent fails with wrong table path prefixes - return Result(dbPath, TStatus(EStatus::BAD_REQUEST, std::move(issues))); - } - } - { - NYql::TIssues issues; - RewriteTableRefs(query, backupRoot, dbRestoreRoot, issues); - if (!issues.Empty()) { - // soft fail since the only kind of table references that cause issues are evaluated absolute paths - // and they will fail during the query execution anyway - LOG_W(issues.ToOneLineString()); - } - } - - constexpr TStringBuf pattern = R"(CREATE VIEW IF NOT EXISTS `\S+` )"; - if (!re2::RE2::Replace(&query, pattern, std::format(R"(CREATE VIEW IF NOT EXISTS `{}` )", dbPath.c_str()))) { - NIssue::TIssues issues; - issues.AddIssue(TStringBuilder() << "Cannot restore a view from the file: " << createViewFile.GetPath().Quote() - << ". Pattern: \"" << pattern << "\", was not found in the create view statement: " << query.Quote() - ); - return Result(dbPath, TStatus(EStatus::BAD_REQUEST, std::move(issues))); + NYql::TIssues issues; + if (!RewriteCreateViewQuery(query, dbRestoreRoot, IsDatabase(SchemeClient, dbRestoreRoot), dbPath, + createViewFile.GetPath().Quote(), issues + )) { + return Result(dbPath, EStatus::BAD_REQUEST, issues.ToString()); } if (settings.DryRun_) { diff --git a/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp b/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp index ac5d51f62e76..fccef21b8f34 100644 --- a/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp +++ b/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp @@ -11,10 +11,35 @@ #include #include +#include +#include + using namespace NSQLv1Generated; namespace { +TString RewriteAbsolutePath(TStringBuf path, TStringBuf backupRoot, TStringBuf restoreRoot) { + if (backupRoot == restoreRoot) { + return TString(path); + } + + TPathSplitUnix pathSplit(path); + TPathSplitUnix backupRootSplit(backupRoot); + + size_t matchedParts = 0; + while (matchedParts < pathSplit.size() && matchedParts < backupRootSplit.size() + && pathSplit[matchedParts] == backupRootSplit[matchedParts] + ) { + ++matchedParts; + } + + TPathSplitUnix restoreRootSplit(restoreRoot); + for (size_t unmatchedParts = matchedParts + 1; unmatchedParts <= backupRootSplit.size(); ++unmatchedParts) { + restoreRootSplit.AppendComponent(".."); + } + return restoreRootSplit.AppendMany(pathSplit.begin() + matchedParts, pathSplit.end()).Reconstruct(); +} + struct TAbsolutePathRewriter { static bool IsAbsolutePath(TStringBuf path) { @@ -27,7 +52,7 @@ struct TAbsolutePathRewriter { } return TStringBuilder() << '`' - << NYdb::NDump::RewriteAbsolutePath(TStringBuf(path.begin() + 1, path.end() - 1), BackupRoot, RestoreRoot) + << ::RewriteAbsolutePath(TStringBuf(path.begin() + 1, path.end() - 1), BackupRoot, RestoreRoot) << '`'; } @@ -103,15 +128,6 @@ void VisitAllFields(const NProtoBuf::Message& msg, TTokenCollector& callback) { } } -bool Format(const TString& query, TString& formattedQuery, NYql::TIssues& issues) { - google::protobuf::Arena arena; - NSQLTranslation::TTranslationSettings settings; - settings.Arena = &arena; - - auto formatter = NSQLFormat::MakeSqlFormatter(settings); - return formatter->Format(query, formattedQuery, issues); -} - struct TTableRefValidator { // returns true if the message is not a table ref and we need to dive deeper to find it @@ -162,6 +178,26 @@ TString RewriteTableRefs(const TRule_sql_query& query, TStringBuf backupRoot, TS return tokenCollector.Tokens; } +struct TViewQuerySplit { + TString ContextRecreation; + TString Select; +}; + +TViewQuerySplit SplitViewQuery(TStringInput query) { + // to do: make the implementation more versatile + TViewQuerySplit split; + + TString line; + while (query.ReadLine(line)) { + (line.StartsWith("--") || line.StartsWith("PRAGMA ") + ? split.ContextRecreation + : split.Select + ) += line; + } + + return split; +} + bool SqlToProtoAst(const TString& query, TRule_sql_query& queryProto, NYql::TIssues& issues) { NSQLTranslation::TTranslationSettings settings; if (!NSQLTranslation::ParseTranslationSettings(query, settings, issues)) { @@ -184,10 +220,6 @@ bool SqlToProtoAst(const TString& query, TRule_sql_query& queryProto, NYql::TIss return true; } -} - -namespace NYdb::NDump { - bool ValidateViewQuery(const TString& query, NYql::TIssues& issues) { TRule_sql_query queryProto; if (!SqlToProtoAst(query, queryProto, issues)) { @@ -196,26 +228,49 @@ bool ValidateViewQuery(const TString& query, NYql::TIssues& issues) { return ValidateTableRefs(queryProto, issues); } -TString RewriteAbsolutePath(TStringBuf path, TStringBuf backupRoot, TStringBuf restoreRoot) { - if (backupRoot == restoreRoot) { - return TString(path); +void ValidateViewQuery(const TString& query, const TString& dbPath, NYql::TIssues& issues) { + NYql::TIssues subIssues; + if (!ValidateViewQuery(query, subIssues)) { + NYql::TIssue restorabilityIssue( + TStringBuilder() << "Restorability of the view: " << dbPath.Quote() + << " storing the following query:\n" + << query + << "\ncannot be guaranteed. For more information, please refer to the 'ydb tools dump' documentation." + ); + restorabilityIssue.Severity = NYql::TSeverityIds::S_WARNING; + for (const auto& subIssue : subIssues) { + restorabilityIssue.AddSubIssue(MakeIntrusive(subIssue)); + } + issues.AddIssue(std::move(restorabilityIssue)); } +} - TPathSplitUnix pathSplit(path); - TPathSplitUnix backupRootSplit(backupRoot); - - size_t matchedParts = 0; - while (matchedParts < pathSplit.size() && matchedParts < backupRootSplit.size() - && pathSplit[matchedParts] == backupRootSplit[matchedParts] - ) { - ++matchedParts; +TString GetBackupRoot(TStringInput query) { + TString backupRoot; + + constexpr TStringBuf targetLinePrefix = "-- backup root: \""; + constexpr TStringBuf discardedSuffix = "\""; + TString line; + while (query.ReadLine(line)) { + if (line.StartsWith(targetLinePrefix)) { + backupRoot = line.substr( + std::size(targetLinePrefix), + std::size(line) - std::size(targetLinePrefix) - std::size(discardedSuffix) + ); + return backupRoot; + } } - TPathSplitUnix restoreRootSplit(restoreRoot); - for (size_t unmatchedParts = matchedParts + 1; unmatchedParts <= backupRootSplit.size(); ++unmatchedParts) { - restoreRootSplit.AppendComponent(".."); - } - return restoreRootSplit.AppendMany(pathSplit.begin() + matchedParts, pathSplit.end()).Reconstruct(); + return backupRoot; +} + +bool Format(const TString& query, TString& formattedQuery, NYql::TIssues& issues) { + google::protobuf::Arena arena; + NSQLTranslation::TTranslationSettings settings; + settings.Arena = &arena; + + auto formatter = NSQLFormat::MakeSqlFormatter(settings); + return formatter->Format(query, formattedQuery, issues); } bool RewriteTableRefs(TString& query, TStringBuf backupRoot, TStringBuf restoreRoot, NYql::TIssues& issues) { @@ -223,7 +278,7 @@ bool RewriteTableRefs(TString& query, TStringBuf backupRoot, TStringBuf restoreR if (!SqlToProtoAst(query, queryProto, issues)) { return false; } - const auto rewrittenQuery = ::RewriteTableRefs(queryProto, backupRoot, restoreRoot); + const auto rewrittenQuery = RewriteTableRefs(queryProto, backupRoot, restoreRoot); // formatting here is necessary for the view to have pretty text inside it after the creation if (!Format(rewrittenQuery, query, issues)) { return false; @@ -231,4 +286,101 @@ bool RewriteTableRefs(TString& query, TStringBuf backupRoot, TStringBuf restoreR return true; } +bool RewriteTablePathPrefix(TString& query, TStringBuf backupRoot, TStringBuf restoreRoot, + bool restoreRootIsDatabase, NYql::TIssues& issues +) { + if (backupRoot == restoreRoot) { + return true; + } + + TString pathPrefix; + if (!re2::RE2::PartialMatch(query, R"(PRAGMA TablePathPrefix = '(\S+)';)", &pathPrefix)) { + if (!restoreRootIsDatabase) { + // Initially, the view relied on the implicit table path prefix; + // however, this approach is now incorrect because the requested restore root differs from the database root. + // We need to explicitly set the TablePathPrefix pragma to ensure that the reference targets + // keep the same relative positions to the view's location as before. + + size_t contextRecreationEnd = query.find("CREATE VIEW"); + if (contextRecreationEnd == TString::npos) { + issues.AddIssue(TStringBuilder() << "no create view statement in the query: " << query); + return false; + } + query.insert(contextRecreationEnd, TString( + std::format("PRAGMA TablePathPrefix = '{}';\n", restoreRoot.data()) + )); + } + return true; + } + + pathPrefix = RewriteAbsolutePath(pathPrefix, backupRoot, restoreRoot); + + constexpr TStringBuf pattern = R"(PRAGMA TablePathPrefix = '\S+';)"; + if (!re2::RE2::Replace(&query, pattern, + std::format(R"(PRAGMA TablePathPrefix = '{}';)", pathPrefix.c_str()) + )) { + issues.AddIssue(TStringBuilder() << "query: " << query.Quote() + << " does not contain the pattern: \"" << pattern << "\"" + ); + return false; + } + + return true; +} + +} + +namespace NYdb::NDump { + +TString BuildCreateViewQuery( + const TString& name, const TString& dbPath, const TString& viewQuery, const TString& backupRoot, + NYql::TIssues& issues +) { + auto [contextRecreation, select] = SplitViewQuery(viewQuery); + + const TString creationQuery = std::format( + "-- backup root: \"{}\"\n" + "{}\n" + "CREATE VIEW IF NOT EXISTS `{}` WITH (security_invoker = TRUE) AS\n" + " {};\n", + backupRoot.data(), + contextRecreation.data(), + name.data(), + select.data() + ); + + ValidateViewQuery(creationQuery, dbPath, issues); + + TString formattedQuery; + if (!Format(creationQuery, formattedQuery, issues)) { + return ""; + } + return formattedQuery; +} + +bool RewriteCreateViewQuery(TString& query, const TString& restoreRoot, bool restoreRootIsDatabase, + const TString& dbPath, const TString& source, NYql::TIssues& issues +) { + const auto backupRoot = GetBackupRoot(query); + + if (!RewriteTablePathPrefix(query, backupRoot, restoreRoot, restoreRootIsDatabase, issues)) { + return false; + } + + if (!RewriteTableRefs(query, backupRoot, restoreRoot, issues)) { + return false; + } + + constexpr TStringBuf pattern = R"(CREATE VIEW IF NOT EXISTS `\S+` )"; + if (!re2::RE2::Replace(&query, pattern, std::format(R"(CREATE VIEW IF NOT EXISTS `{}` )", dbPath.c_str()))) { + issues.AddIssue(TStringBuilder() + << "Cannot restore a view from the source: " << source + << ". Pattern: \"" << pattern << "\", was not found in the create view statement: " << query.Quote() + ); + return false; + } + + return true; +} + } diff --git a/ydb/public/lib/ydb_cli/dump/util/view_utils.h b/ydb/public/lib/ydb_cli/dump/util/view_utils.h index 6e65ca7c78ab..15252d6b294f 100644 --- a/ydb/public/lib/ydb_cli/dump/util/view_utils.h +++ b/ydb/public/lib/ydb_cli/dump/util/view_utils.h @@ -2,10 +2,13 @@ namespace NYdb::NDump { -bool ValidateViewQuery(const TString& query, NYql::TIssues& issues); - -TString RewriteAbsolutePath(TStringBuf path, TStringBuf backupRoot, TStringBuf restoreRoot); - -bool RewriteTableRefs(TString& scheme, TStringBuf backupRoot, TStringBuf restoreRoot, NYql::TIssues& issues); +TString BuildCreateViewQuery( + const TString& name, const TString& dbPath, const TString& viewQuery, const TString& backupRoot, + NYql::TIssues& issues +); + +bool RewriteCreateViewQuery(TString& query, const TString& restoreRoot, bool restoreRootIsDatabase, + const TString& dbPath, const TString& source, NYql::TIssues& issues +); } diff --git a/ydb/services/ydb/backup_ut/ydb_backup_ut.cpp b/ydb/services/ydb/backup_ut/ydb_backup_ut.cpp index f797e11b2f57..9257fb93771b 100644 --- a/ydb/services/ydb/backup_ut/ydb_backup_ut.cpp +++ b/ydb/services/ydb/backup_ut/ydb_backup_ut.cpp @@ -2,13 +2,17 @@ #include +#include +#include #include #include +#include #include #include #include #include #include +#include #include @@ -23,6 +27,7 @@ using namespace NYdb; using namespace NYdb::NOperation; using namespace NYdb::NScheme; using namespace NYdb::NTable; +using namespace NYdb::NView; namespace NYdb::NTable { @@ -40,6 +45,14 @@ bool operator==(const TKeyRange& lhs, const TKeyRange& rhs) { } +namespace NYdb { + +struct TTenantsTestSettings : TKikimrTestSettings { + static constexpr bool PrecreatePools = false; +}; + +} + namespace { #define Y_UNIT_TEST_ALL_PROTO_ENUM_VALUES(N, ENUM_TYPE) \ @@ -92,26 +105,51 @@ TDataQueryResult ExecuteDataModificationQuery(TSession& session, return result; } -TDataQueryResult GetTableContent(TSession& session, const char* table) { +NQuery::TExecuteQueryResult ExecuteQuery(NQuery::TSession& session, const TString& script, bool isDDL = false) { + const auto result = session.ExecuteQuery( + script, + isDDL ? NQuery::TTxControl::NoTx() : NQuery::TTxControl::BeginTx().CommitTx() + ).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), "query:\n" << script << "\nissues:\n" << result.GetIssues().ToString()); + return result; +} + +TDataQueryResult GetTableContent(TSession& session, const char* table, + const char* keyColumn = "Key" +) { return ExecuteDataModificationQuery(session, Sprintf(R"( - SELECT * FROM `%s` ORDER BY Key; - )", table + SELECT * FROM `%s` ORDER BY %s; + )", table, keyColumn )); } -void CompareResults(const TDataQueryResult& first, const TDataQueryResult& second) { - const auto& firstResults = first.GetResultSets(); - const auto& secondResults = second.GetResultSets(); +NQuery::TExecuteQueryResult GetTableContent(NQuery::TSession& session, const char* table, + const char* keyColumn = "Key" +) { + return ExecuteQuery(session, Sprintf(R"( + SELECT * FROM `%s` ORDER BY %s; + )", table, keyColumn + )); +} - UNIT_ASSERT_VALUES_EQUAL(firstResults.size(), secondResults.size()); - for (size_t i = 0; i < firstResults.size(); ++i) { +void CompareResults(const std::vector& first, const std::vector& second) { + UNIT_ASSERT_VALUES_EQUAL(first.size(), second.size()); + for (size_t i = 0; i < first.size(); ++i) { UNIT_ASSERT_STRINGS_EQUAL( - FormatResultSetYson(firstResults[i]), - FormatResultSetYson(secondResults[i]) + FormatResultSetYson(first[i]), + FormatResultSetYson(second[i]) ); } } +void CompareResults(const TDataQueryResult& first, const TDataQueryResult& second) { + CompareResults(first.GetResultSets(), second.GetResultSets()); +} + +void CompareResults(const NQuery::TExecuteQueryResult& first, const NQuery::TExecuteQueryResult& second) { + CompareResults(first.GetResultSets(), second.GetResultSets()); +} + TTableDescription GetTableDescription(TSession& session, const TString& path, const TDescribeTableSettings& settings = {} ) { @@ -170,6 +208,23 @@ void CheckBuildIndexOperationsCleared(TDriver& driver) { UNIT_ASSERT_C(result.GetList().empty(), "Build index operations aren't cleared:\n" << result.ToJsonString()); } +TViewDescription DescribeView(TViewClient& viewClient, const TString& path) { + const auto describeResult = viewClient.DescribeView(path).ExtractValueSync(); + UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString()); + return describeResult.GetViewDescription(); +} + +// note: the storage pool kind must be preconfigured in the server +void CreateDatabase(TTenants& tenants, TStringBuf path, TStringBuf storagePoolKind) { + Ydb::Cms::CreateDatabaseRequest request; + request.set_path(path); + auto& storage = *request.mutable_resources()->add_storage_units(); + storage.set_unit_kind(storagePoolKind); + storage.set_count(1); + + tenants.CreateTenant(std::move(request)); +} + using TBackupFunction = std::function; using TRestoreFunction = std::function; @@ -468,28 +523,143 @@ void TestRestoreDirectory(const char* directory, TSchemeClient& client, TBackupF } } +void TestViewOutputIsPreserved( + const char* view, NQuery::TSession& session, TBackupFunction&& backup, TRestoreFunction&& restore +) { + constexpr const char* viewQuery = R"( + SELECT 1 AS Key + UNION + SELECT 2 AS Key + UNION + SELECT 3 AS Key; + )"; + ExecuteQuery(session, Sprintf(R"( + CREATE VIEW `%s` WITH security_invoker = TRUE AS %s; + )", view, viewQuery + ), true + ); + const auto originalContent = GetTableContent(session, view); + + backup(view); + + ExecuteQuery(session, Sprintf(R"( + DROP VIEW `%s`; + )", view + ), true + ); + + restore(view); + CompareResults(GetTableContent(session, view), originalContent); } -Y_UNIT_TEST_SUITE(BackupRestore) { +void TestViewQueryTextIsPreserved( + const char* view, TViewClient& viewClient, NQuery::TSession& session, TBackupFunction&& backup, TRestoreFunction&& restore +) { + constexpr const char* viewQuery = "SELECT 42"; + ExecuteQuery(session, Sprintf(R"( + CREATE VIEW `%s` WITH security_invoker = TRUE AS %s; + )", view, viewQuery + ), true + ); + const auto originalText = DescribeView(viewClient, view).GetQueryText(); + UNIT_ASSERT_STRINGS_EQUAL(originalText, viewQuery); - void Restore(NDump::TClient& client, const TFsPath& sourceFile, const TString& dbPath) { - auto result = client.Restore(sourceFile, dbPath); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - } + backup(view); + + ExecuteQuery(session, Sprintf(R"( + DROP VIEW `%s`; + )", view + ), true + ); - auto CreateBackupLambda(const TDriver& driver, const TFsPath& pathToBackup, bool schemaOnly = false) { - return [&driver, &pathToBackup, schemaOnly](const char* table) { + restore(view); + UNIT_ASSERT_STRINGS_EQUAL( + DescribeView(viewClient, view).GetQueryText(), + originalText + ); +} + +// The view might be restored to a different path from the original. +void TestViewReferenceTableIsPreserved( + const char* view, const char* table, const char* restoredView, NQuery::TSession& session, + TBackupFunction&& backup, TRestoreFunction&& restore +) { + ExecuteQuery(session, Sprintf(R"( + CREATE TABLE `%s` ( + Key Uint32, + Value Utf8, + PRIMARY KEY (Key) + ); + )", table + ), true + ); + ExecuteQuery(session, Sprintf(R"( + UPSERT INTO `%s` ( + Key, + Value + ) + VALUES + (1, "one"), + (2, "two"), + (3, "three"); + )", + table + )); + + const TString viewQuery = Sprintf(R"( + SELECT * FROM `%s` + )", table + ); + ExecuteQuery(session, Sprintf(R"( + CREATE VIEW `%s` WITH security_invoker = TRUE AS %s; + )", view, viewQuery.c_str() + ), true + ); + const auto originalContent = GetTableContent(session, view); + + backup(view); + + ExecuteQuery(session, Sprintf(R"( + DROP VIEW `%s`; + )", view + ), true + ); + ExecuteQuery(session, Sprintf(R"( + DROP TABLE `%s`; + )", table + ), true + ); + + restore(view); + CompareResults(GetTableContent(session, restoredView), originalContent); +} + +void TestViewReferenceTableIsPreserved( + const char* view, const char* table, NQuery::TSession& session, TBackupFunction&& backup, TRestoreFunction&& restore +) { + // view is restored to the original path + TestViewReferenceTableIsPreserved(view, table, view, session, std::move(backup), std::move(restore)); +} + +} + +Y_UNIT_TEST_SUITE(BackupRestore) { + + auto CreateBackupLambda(const TDriver& driver, const TFsPath& fsPath, const TString& dbPath = "/Root") { + return [&](const char* table) { Y_UNUSED(table); - // TO DO: implement NDump::TClient::Dump and call it instead of BackupFolder - NBackup::BackupFolder(driver, "/Root", ".", pathToBackup, {}, schemaOnly, false); + NDump::TClient backupClient(driver); + const auto result = backupClient.Dump(dbPath, fsPath, NDump::TDumpSettings().Database(dbPath)); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); }; } - auto CreateRestoreLambda(const TDriver& driver, const TFsPath& pathToBackup) { - return [&driver, &pathToBackup](const char* table) { + auto CreateRestoreLambda(const TDriver& driver, const TFsPath& fsPath, const TString& dbPath = "/Root") { + return [&](const char* table) { Y_UNUSED(table); NDump::TClient backupClient(driver); - Restore(backupClient, pathToBackup, "/Root"); + const auto result = backupClient.Restore(fsPath, dbPath); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); }; } @@ -508,7 +678,7 @@ Y_UNIT_TEST_SUITE(BackupRestore) { table, minPartitions, session, - CreateBackupLambda(driver, pathToBackup, true), + CreateBackupLambda(driver, pathToBackup), CreateRestoreLambda(driver, pathToBackup) ); } @@ -530,7 +700,7 @@ Y_UNIT_TEST_SUITE(BackupRestore) { index, minIndexPartitions, session, - CreateBackupLambda(driver, pathToBackup, true), + CreateBackupLambda(driver, pathToBackup), CreateRestoreLambda(driver, pathToBackup) ); } @@ -550,13 +720,86 @@ Y_UNIT_TEST_SUITE(BackupRestore) { table, partitions, session, - CreateBackupLambda(driver, pathToBackup, true), + CreateBackupLambda(driver, pathToBackup), CreateRestoreLambda(driver, pathToBackup) ); } // TO DO: test index impl table split boundaries restoration from a backup + Y_UNIT_TEST(RestoreViewQueryText) { + TKikimrWithGrpcAndRootSchema server; + server.GetRuntime()->GetAppData().FeatureFlags.SetEnableViews(true); + auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%u", server.GetPort()))); + NQuery::TQueryClient queryClient(driver); + auto session = queryClient.GetSession().ExtractValueSync().GetSession(); + TViewClient viewClient(driver); + TTempDir tempDir; + const auto& pathToBackup = tempDir.Path(); + + constexpr const char* view = "/Root/view"; + + TestViewQueryTextIsPreserved( + view, + viewClient, + session, + CreateBackupLambda(driver, pathToBackup), + CreateRestoreLambda(driver, pathToBackup) + ); + } + + Y_UNIT_TEST(RestoreViewReferenceTable) { + TKikimrWithGrpcAndRootSchema server; + server.GetRuntime()->GetAppData().FeatureFlags.SetEnableViews(true); + auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%u", server.GetPort()))); + NQuery::TQueryClient queryClient(driver); + auto session = queryClient.GetSession().ExtractValueSync().GetSession(); + TTempDir tempDir; + const auto& pathToBackup = tempDir.Path(); + + constexpr const char* view = "/Root/view"; + constexpr const char* table = "/Root/a/b/c/table"; + + TestViewReferenceTableIsPreserved( + view, + table, + session, + CreateBackupLambda(driver, pathToBackup), + CreateRestoreLambda(driver, pathToBackup) + ); + } + + Y_UNIT_TEST(RestoreViewToDifferentDatabase) { + TBasicKikimrWithGrpcAndRootSchema server; + + constexpr const char* alice = "/Root/tenants/alice"; + constexpr const char* bob = "/Root/tenants/bob"; + CreateDatabase(*server.Tenants_, alice, "ssd"); + CreateDatabase(*server.Tenants_, bob, "hdd"); + + auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%u", server.GetPort()))); + NQuery::TQueryClient queryClient(driver); + auto session = queryClient.GetSession().ExtractValueSync().GetSession(); + TTempDir tempDir; + const auto& pathToBackup = tempDir.Path(); + + // query client lives on the node 0, so it is enough to enable the views only on it + server.GetRuntime()->GetAppData(0).FeatureFlags.SetEnableViews(true); + + const TString view = JoinFsPaths(alice, "view"); + const TString table = JoinFsPaths(alice, "a", "b", "c", "table"); + const TString restoredView = JoinFsPaths(bob, "view"); + + TestViewReferenceTableIsPreserved( + view.c_str(), + table.c_str(), + restoredView.c_str(), + session, + CreateBackupLambda(driver, pathToBackup, alice), + CreateRestoreLambda(driver, pathToBackup, bob) + ); + } + void TestTableBackupRestore() { TKikimrWithGrpcAndRootSchema server; auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%u", server.GetPort()))); @@ -629,6 +872,25 @@ Y_UNIT_TEST_SUITE(BackupRestore) { ); } + void TestViewBackupRestore() { + TKikimrWithGrpcAndRootSchema server; + server.GetRuntime()->GetAppData().FeatureFlags.SetEnableViews(true); + auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%u", server.GetPort()))); + NQuery::TQueryClient queryClient(driver); + auto session = queryClient.GetSession().ExtractValueSync().GetSession(); + TTempDir tempDir; + const auto& pathToBackup = tempDir.Path(); + + constexpr const char* view = "/Root/view"; + + TestViewOutputIsPreserved( + view, + session, + CreateBackupLambda(driver, pathToBackup), + CreateRestoreLambda(driver, pathToBackup) + ); + } + Y_UNIT_TEST_ALL_PROTO_ENUM_VALUES(TestAllSchemeObjectTypes, NKikimrSchemeOp::EPathType) { using namespace NKikimrSchemeOp; @@ -651,7 +913,8 @@ Y_UNIT_TEST_SUITE(BackupRestore) { case EPathTypeExtSubDomain: break; // https://github.com/ydb-platform/ydb/issues/10432 case EPathTypeView: - break; // https://github.com/ydb-platform/ydb/issues/10433 + TestViewBackupRestore(); + break; case EPathTypeCdcStream: break; // https://github.com/ydb-platform/ydb/issues/7054 case EPathTypeReplication: @@ -717,44 +980,53 @@ Y_UNIT_TEST_SUITE(BackupRestoreS3) { using NKikimr::NWrappers::NTestHelpers::TS3Mock; class TS3TestEnv { - TKikimrWithGrpcAndRootSchema server; - TDriver driver; - TTableClient tableClient; - TSession session; - ui16 s3Port; - TS3Mock s3Mock; + TKikimrWithGrpcAndRootSchema Server; + TDriver Driver; + TTableClient TableClient; + TSession TableSession; + NQuery::TQueryClient QueryClient; + NQuery::TSession QuerySession; + ui16 S3Port; + TS3Mock S3Mock; // required for exports to function - TDataShardExportFactory dataShardExportFactory; + TDataShardExportFactory DataShardExportFactory; public: TS3TestEnv() - : driver(TDriverConfig().SetEndpoint(Sprintf("localhost:%u", server.GetPort()))) - , tableClient(driver) - , session(tableClient.CreateSession().ExtractValueSync().GetSession()) - , s3Port(server.GetPortManager().GetPort()) - , s3Mock({}, TS3Mock::TSettings(s3Port)) + : Driver(TDriverConfig().SetEndpoint(Sprintf("localhost:%u", Server.GetPort()))) + , TableClient(Driver) + , TableSession(TableClient.CreateSession().ExtractValueSync().GetSession()) + , QueryClient(Driver) + , QuerySession(QueryClient.GetSession().ExtractValueSync().GetSession()) + , S3Port(Server.GetPortManager().GetPort()) + , S3Mock({}, TS3Mock::TSettings(S3Port)) { - UNIT_ASSERT_C(s3Mock.Start(), s3Mock.GetError()); + UNIT_ASSERT_C(S3Mock.Start(), S3Mock.GetError()); - auto& runtime = *server.GetRuntime(); + auto& runtime = *Server.GetRuntime(); runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::EPriority::PRI_DEBUG); - runtime.GetAppData().DataShardExportFactory = &dataShardExportFactory; + runtime.GetAppData().DataShardExportFactory = &DataShardExportFactory; + runtime.GetAppData().FeatureFlags.SetEnableViews(true); } TKikimrWithGrpcAndRootSchema& GetServer() { - return server; + return Server; } const TDriver& GetDriver() const { - return driver; + return Driver; + } + + TSession& GetTableSession() { + return TableSession; } - TSession& GetSession() { - return session; + NQuery::TSession& GetQuerySession() { + return QuerySession; } ui16 GetS3Port() const { - return s3Port; + return S3Port; } }; @@ -776,20 +1048,49 @@ Y_UNIT_TEST_SUITE(BackupRestoreS3) { return false; } - void ExportToS3(NExport::TExportClient& exportClient, ui16 s3Port, NOperation::TOperationClient& operationClient, - const TString& source, const TString& destination + bool FilterSupportedSchemeObjects(const NYdb::NScheme::TSchemeEntry& entry) { + return IsIn({ + NYdb::NScheme::ESchemeEntryType::Table, + NYdb::NScheme::ESchemeEntryType::View, + }, entry.Type); + } + + void ExpandSource(TSchemeClient& schemeClient, const TString& source, const TString& destination, + NExport::TExportToS3Settings& exportSettings + ) { + const auto listSettings = NConsoleClient::TRecursiveListSettings().Filter(FilterSupportedSchemeObjects); + const auto sourceListing = NConsoleClient::RecursiveList(schemeClient, source, listSettings); + UNIT_ASSERT_C(sourceListing.Status.IsSuccess(), sourceListing.Status.GetIssues()); + + for (const auto& entry : sourceListing.Entries) { + exportSettings.AppendItem({ + .Src = entry.Name, + .Dst = TStringBuilder() << destination << TStringBuf(entry.Name).RNextTok(source) + }); + } + } + + void ExportToS3( + TSchemeClient& schemeClient, + NExport::TExportClient& exportClient, + ui16 s3Port, + NOperation::TOperationClient& operationClient, + const TString& source, + const TString& destination ) { // The exact values for Bucket, AccessKey and SecretKey do not matter if the S3 backend is TS3Mock. // Any non-empty strings should do. - const auto exportSettings = NExport::TExportToS3Settings() + auto exportSettings = NExport::TExportToS3Settings() .Endpoint(Sprintf("localhost:%u", s3Port)) .Scheme(ES3Scheme::HTTP) .Bucket("test_bucket") .AccessKey("test_key") - .SecretKey("test_secret") - .AppendItem(NExport::TExportToS3Settings::TItem{.Src = source, .Dst = destination}); + .SecretKey("test_secret"); + + ExpandSource(schemeClient, source, destination, exportSettings); - auto response = exportClient.ExportToS3(exportSettings).ExtractValueSync(); + const auto response = exportClient.ExportToS3(exportSettings).ExtractValueSync(); + UNIT_ASSERT_C(response.Status().IsSuccess(), response.Status().GetIssues().ToString()); UNIT_ASSERT_C(WaitForOperation(operationClient, response.Id()), Sprintf("The export from %s to %s did not complete within the allocated time.", source.c_str(), destination.c_str() @@ -797,40 +1098,54 @@ Y_UNIT_TEST_SUITE(BackupRestoreS3) { ); } + auto CreateBackupLambda(const TDriver& driver, ui16 s3Port, const TString& source = "/Root") { + return [&driver, s3Port, &source](const char* table) { + Y_UNUSED(table); + const auto clientSettings = TCommonClientSettings().Database(source); + TSchemeClient schemeClient(driver, clientSettings); + NExport::TExportClient exportClient(driver, clientSettings); + NOperation::TOperationClient operationClient(driver, clientSettings); + ExportToS3(schemeClient, exportClient, s3Port, operationClient, source, ""); + }; + } + void ImportFromS3(NImport::TImportClient& importClient, ui16 s3Port, NOperation::TOperationClient& operationClient, - const TString& source, const TString& destination + TVector&& items ) { // The exact values for Bucket, AccessKey and SecretKey do not matter if the S3 backend is TS3Mock. // Any non-empty strings should do. - const auto importSettings = NImport::TImportFromS3Settings() + auto importSettings = NImport::TImportFromS3Settings() .Endpoint(Sprintf("localhost:%u", s3Port)) .Scheme(ES3Scheme::HTTP) .Bucket("test_bucket") .AccessKey("test_key") - .SecretKey("test_secret") - .AppendItem(NImport::TImportFromS3Settings::TItem{.Src = source, .Dst = destination}); + .SecretKey("test_secret"); + + importSettings.Item_ = std::move(items); - auto response = importClient.ImportFromS3(importSettings).ExtractValueSync(); + const auto response = importClient.ImportFromS3(importSettings).ExtractValueSync(); + UNIT_ASSERT_C(response.Status().IsSuccess(), response.Status().GetIssues().ToString()); UNIT_ASSERT_C(WaitForOperation(operationClient, response.Id()), - Sprintf("The import from %s to %s did not complete within the allocated time.", - source.c_str(), destination.c_str() - ) + "The import did not complete within the allocated time." ); } - auto CreateBackupLambda(const TDriver& driver, ui16 s3Port) { - return [&driver, s3Port](const char* table) { - NExport::TExportClient exportClient(driver); - NOperation::TOperationClient operationClient(driver); - ExportToS3(exportClient, s3Port, operationClient, table, "table"); - }; - } - - auto CreateRestoreLambda(const TDriver& driver, ui16 s3Port) { - return [&driver, s3Port](const char* table) { - NImport::TImportClient importClient(driver); - NOperation::TOperationClient operationClient(driver); - ImportFromS3(importClient, s3Port, operationClient, "table", table); + auto CreateRestoreLambda(const TDriver& driver, ui16 s3Port, const TString& destination = "/Root", const TVector& destinations = {}) { + return [&driver, s3Port, &destination, &destinations](const char* table) { + Y_UNUSED(table); + const auto clientSettings = TCommonClientSettings().Database(destination); + NImport::TImportClient importClient(driver, clientSettings); + NOperation::TOperationClient operationClient(driver, clientSettings); + TVector items; + if (destinations.empty()) { + items.emplace_back(TString(TStringBuf(table).RNextTok(destination)), table); + ImportFromS3(importClient, s3Port, operationClient, std::move(items)); + } else { + for (const auto& item : destinations) { + items.emplace_back(TString(TStringBuf(item).RNextTok(destination)), item); + } + ImportFromS3(importClient, s3Port, operationClient, std::move(items)); + } }; } @@ -842,7 +1157,7 @@ Y_UNIT_TEST_SUITE(BackupRestoreS3) { TestTablePartitioningSettingsArePreserved( table, minPartitions, - testEnv.GetSession(), + testEnv.GetTableSession(), CreateBackupLambda(testEnv.GetDriver(), testEnv.GetS3Port()), CreateRestoreLambda(testEnv.GetDriver(), testEnv.GetS3Port()) ); @@ -858,7 +1173,7 @@ Y_UNIT_TEST_SUITE(BackupRestoreS3) { table, index, minIndexPartitions, - testEnv.GetSession(), + testEnv.GetTableSession(), CreateBackupLambda(testEnv.GetDriver(), testEnv.GetS3Port()), CreateRestoreLambda(testEnv.GetDriver(), testEnv.GetS3Port()) ); @@ -872,7 +1187,7 @@ Y_UNIT_TEST_SUITE(BackupRestoreS3) { TestTableSplitBoundariesArePreserved( table, partitions, - testEnv.GetSession(), + testEnv.GetTableSession(), CreateBackupLambda(testEnv.GetDriver(), testEnv.GetS3Port()), CreateRestoreLambda(testEnv.GetDriver(), testEnv.GetS3Port()) ); @@ -914,7 +1229,7 @@ Y_UNIT_TEST_SUITE(BackupRestoreS3) { table, index, indexPartitions, - testEnv.GetSession(), + testEnv.GetTableSession(), tableBuilder, CreateBackupLambda(testEnv.GetDriver(), testEnv.GetS3Port()), CreateRestoreLambda(testEnv.GetDriver(), testEnv.GetS3Port()) @@ -964,20 +1279,50 @@ Y_UNIT_TEST_SUITE(BackupRestoreS3) { table, index, indexPartitions, - testEnv.GetSession(), + testEnv.GetTableSession(), tableBuilder, CreateBackupLambda(testEnv.GetDriver(), testEnv.GetS3Port()), CreateRestoreLambda(testEnv.GetDriver(), testEnv.GetS3Port()) ); } + Y_UNIT_TEST(RestoreViewQueryText) { + TS3TestEnv testEnv; + TViewClient viewClient(testEnv.GetDriver()); + constexpr const char* view = "/Root/view"; + + TestViewQueryTextIsPreserved( + view, + viewClient, + testEnv.GetQuerySession(), + CreateBackupLambda(testEnv.GetDriver(), testEnv.GetS3Port()), + CreateRestoreLambda(testEnv.GetDriver(), testEnv.GetS3Port()) + ); + } + + Y_UNIT_TEST(RestoreViewReferenceTable) { + TS3TestEnv testEnv; + constexpr const char* view = "/Root/view"; + constexpr const char* table = "/Root/a/b/c/table"; + + TestViewReferenceTableIsPreserved( + view, + table, + testEnv.GetQuerySession(), + CreateBackupLambda(testEnv.GetDriver(), testEnv.GetS3Port()), + CreateRestoreLambda(testEnv.GetDriver(), testEnv.GetS3Port(), "/Root", {"/Root/view", "/Root/a/b/c/table"}) + ); + } + + // TO DO: test view restoration to a different database + void TestTableBackupRestore() { TS3TestEnv testEnv; constexpr const char* table = "/Root/table"; TestTableContentIsPreserved( table, - testEnv.GetSession(), + testEnv.GetTableSession(), CreateBackupLambda(testEnv.GetDriver(), testEnv.GetS3Port()), CreateRestoreLambda(testEnv.GetDriver(), testEnv.GetS3Port()) ); @@ -992,7 +1337,7 @@ Y_UNIT_TEST_SUITE(BackupRestoreS3) { table, index, indexType, - testEnv.GetSession(), + testEnv.GetTableSession(), CreateBackupLambda(testEnv.GetDriver(), testEnv.GetS3Port()), CreateRestoreLambda(testEnv.GetDriver(), testEnv.GetS3Port()) ); @@ -1004,7 +1349,19 @@ Y_UNIT_TEST_SUITE(BackupRestoreS3) { TestRestoreTableWithSerial( table, - testEnv.GetSession(), + testEnv.GetTableSession(), + CreateBackupLambda(testEnv.GetDriver(), testEnv.GetS3Port()), + CreateRestoreLambda(testEnv.GetDriver(), testEnv.GetS3Port()) + ); + } + + void TestViewBackupRestore() { + TS3TestEnv testEnv; + constexpr const char* view = "/Root/view"; + + TestViewOutputIsPreserved( + view, + testEnv.GetQuerySession(), CreateBackupLambda(testEnv.GetDriver(), testEnv.GetS3Port()), CreateRestoreLambda(testEnv.GetDriver(), testEnv.GetS3Port()) ); @@ -1031,7 +1388,8 @@ Y_UNIT_TEST_SUITE(BackupRestoreS3) { case EPathTypeExtSubDomain: break; // https://github.com/ydb-platform/ydb/issues/10432 case EPathTypeView: - break; // https://github.com/ydb-platform/ydb/issues/10433 + TestViewBackupRestore(); + break; case EPathTypeCdcStream: break; // https://github.com/ydb-platform/ydb/issues/7054 case EPathTypeReplication: diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 9ec4b87cd28b..1f2147f19d35 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -3205,6 +3205,11 @@ "ColumnId": 8, "ColumnName": "SourceOwnerPathId", "ColumnType": "Uint64" + }, + { + "ColumnId": 9, + "ColumnName": "SourcePathType", + "ColumnType": "Uint32" } ], "ColumnsDropped": [], @@ -3218,7 +3223,8 @@ 5, 6, 7, - 8 + 8, + 9 ], "RoomID": 0, "Codec": 0, @@ -6173,6 +6179,16 @@ "ColumnId": 12, "ColumnName": "Metadata", "ColumnType": "String" + }, + { + "ColumnId": 13, + "ColumnName": "CreationQuery", + "ColumnType": "Utf8" + }, + { + "ColumnId": 14, + "ColumnName": "PreparedCreationQuery", + "ColumnType": "String" } ], "ColumnsDropped": [], @@ -6190,7 +6206,9 @@ 9, 10, 11, - 12 + 12, + 13, + 14 ], "RoomID": 0, "Codec": 0,