Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VIEW: import #13361

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <ydb/services/fq/private_grpc.h>
#include <ydb/services/cms/grpc_service.h>
#include <ydb/services/datastreams/grpc_service.h>
#include <ydb/services/view/grpc_service.h>
#include <ydb/services/ymq/grpc_service.h>
#include <ydb/services/kesus/grpc_service.h>
#include <ydb/core/grpc_services/grpc_mon.h>
Expand Down Expand Up @@ -719,6 +720,7 @@ namespace Tests {
GRpcServer->AddService(new NGRpcService::TGRpcYdbLogStoreService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::TGRpcAuthService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::TGRpcReplicationService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::TGRpcViewService(system, counters, grpcRequestProxies[0], true));
GRpcServer->Start();
}

Expand Down
12 changes: 12 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4329,6 +4329,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {

item.SourcePathId.OwnerId = rowset.GetValueOrDefault<Schema::ExportItems::SourceOwnerPathId>(selfId);
item.SourcePathId.LocalPathId = rowset.GetValue<Schema::ExportItems::SourcePathId>();
item.SourcePathType = rowset.GetValue<Schema::ExportItems::SourcePathType>();

item.State = static_cast<TExportInfo::EState>(rowset.GetValue<Schema::ExportItems::State>());
item.WaitTxId = rowset.GetValueOrDefault<Schema::ExportItems::BackupTxId>(InvalidTxId);
Expand Down Expand Up @@ -4432,6 +4433,17 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
item.Scheme = scheme;
}

if (rowset.HaveValue<Schema::ImportItems::CreationQuery>()) {
item.CreationQuery = rowset.GetValue<Schema::ImportItems::CreationQuery>();
}

if (rowset.HaveValue<Schema::ImportItems::PreparedCreationQuery>()) {
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(
item.PreparedCreationQuery,
rowset.GetValue<Schema::ImportItems::PreparedCreationQuery>()
));
}

if (rowset.HaveValue<Schema::ImportItems::Permissions>()) {
Ydb::Scheme::ModifyPermissionsRequest permissions;
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(permissions, rowset.GetValue<Schema::ImportItems::Permissions>()));
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::T
NIceDb::TUpdate<Schema::ExportItems::SourcePathName>(item.SourcePathName),
NIceDb::TUpdate<Schema::ExportItems::SourceOwnerPathId>(item.SourcePathId.OwnerId),
NIceDb::TUpdate<Schema::ExportItems::SourcePathId>(item.SourcePathId.LocalPathId),
NIceDb::TUpdate<Schema::ExportItems::State>(static_cast<ui8>(item.State))
NIceDb::TUpdate<Schema::ExportItems::State>(static_cast<ui8>(item.State)),
NIceDb::TUpdate<Schema::ExportItems::SourcePathType>(item.SourcePathType)
);
}
}
Expand Down Expand Up @@ -231,6 +232,10 @@ void TSchemeShard::Handle(TEvExport::TEvListExportsRequest::TPtr& ev, const TAct
Execute(CreateTxListExports(ev), ctx);
}

void TSchemeShard::Handle(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev, const TActorContext& ctx) {
Execute(CreateTxProgressExport(ev), ctx);
}

void TSchemeShard::ResumeExports(const TVector<ui64>& exportIds, const TActorContext& ctx) {
for (const ui64 id : exportIds) {
Execute(CreateTxProgressExport(id), ctx);
Expand Down
163 changes: 146 additions & 17 deletions ydb/core/tx/schemeshard/schemeshard_export__create.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#include "schemeshard_xxport__tx_base.h"
#include "schemeshard_xxport__helpers.h"
#include "schemeshard_export.h"
#include "schemeshard_export_flow_proposals.h"
#include "schemeshard_export_helpers.h"
#include "schemeshard_export.h"
#include "schemeshard_export_scheme_uploader.h"
#include "schemeshard_audit_log.h"
#include "schemeshard_impl.h"

Expand Down Expand Up @@ -203,7 +204,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
.IsResolved()
.NotDeleted()
.NotUnderDeleting()
.IsTable()
.IsSupportedInExports()
.FailOnRestrictedCreateInTempZone();

if (!checks) {
Expand All @@ -212,7 +213,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
}
}

exportInfo->Items.emplace_back(item.source_path(), path.Base()->PathId);
exportInfo->Items.emplace_back(item.source_path(), path.Base()->PathId, path->PathType);
exportInfo->PendingItems.push_back(itemIdx);
}

Expand All @@ -230,6 +231,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
ui64 Id;
TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr;
TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr;
TEvPrivate::TEvExportSchemeUploadResult::TPtr SchemeUploadResult = nullptr;
TTxId CompletedTxId = InvalidTxId;

explicit TTxProgress(TSelf* self, ui64 id)
Expand All @@ -250,6 +252,12 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
{
}

explicit TTxProgress(TSelf* self, TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev)
: TXxport::TTxBase(self)
, SchemeUploadResult(ev)
{
}

explicit TTxProgress(TSelf* self, TTxId completedTxId)
: TXxport::TTxBase(self)
, CompletedTxId(completedTxId)
Expand All @@ -267,6 +275,8 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
OnAllocateResult(txc, ctx);
} else if (ModifyResult) {
OnModifyResult(txc, ctx);
} else if (SchemeUploadResult) {
OnSchemeUploadResult(txc);
} else if (CompletedTxId) {
OnNotifyResult(txc, ctx);
} else {
Expand All @@ -290,16 +300,22 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
Send(Self->SelfId(), MkDirPropose(Self, txId, exportInfo));
}

void CopyTables(TExportInfo::TPtr exportInfo, TTxId txId) {
bool CopyTables(TExportInfo::TPtr exportInfo, TTxId txId) {
LOG_I("TExport::TTxProgress: CopyTables propose"
<< ": info# " << exportInfo->ToString()
<< ", txId# " << txId);

Y_ABORT_UNLESS(exportInfo->WaitTxId == InvalidTxId);
Send(Self->SelfId(), CopyTablesPropose(Self, txId, exportInfo));
if (AnyOf(exportInfo->Items, [](const TExportInfo::TItem& item) {
return item.SourcePathType == NKikimrSchemeOp::EPathTypeTable;
})) {
Send(Self->SelfId(), CopyTablesPropose(Self, txId, exportInfo));
return true;
}
return false;
}

void TransferData(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId) {
void TransferData(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId, const TActorContext& ctx) {
Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
auto& item = exportInfo->Items.at(itemIdx);

Expand All @@ -311,7 +327,24 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
<< ", txId# " << txId);

Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);
Send(Self->SelfId(), BackupPropose(Self, txId, exportInfo, itemIdx));
if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) {
Send(Self->SelfId(), BackupPropose(Self, txId, exportInfo, itemIdx));
} else if (item.SourcePathType == NKikimrSchemeOp::EPathTypeView) {
Ydb::Export::ExportToS3Settings exportSettings;
Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings));
const auto databaseRoot = TStringBuilder() << '/' << JoinSeq('/', Self->RootPathElements);
jepett0 marked this conversation as resolved.
Show resolved Hide resolved

NBackup::TMetadata metadata;
// to do: enable view checksum validation
constexpr bool EnableChecksums = false;
metadata.SetVersion(EnableChecksums ? 1 : 0);

item.SchemeUploader = ctx.Register(CreateSchemeUploader(
Self->SelfId(), exportInfo->Id, itemIdx, item.SourcePathId,
exportSettings, databaseRoot, metadata.Serialize()
));
Self->RunningExportSchemeUploaders.emplace(item.SchemeUploader);
}
}

bool CancelTransferring(TExportInfo::TPtr exportInfo, ui32 itemIdx) {
Expand Down Expand Up @@ -461,6 +494,14 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
return path->LastTxId;
}

void KillChildActors(TExportInfo::TItem& item) {
if (auto& schemeUploader = item.SchemeUploader; schemeUploader != TActorId()) {
Send(schemeUploader, new TEvents::TEvPoisonPill());
Self->RunningExportSchemeUploaders.erase(schemeUploader);
schemeUploader = TActorId();
}
}

void Cancel(TExportInfo::TPtr exportInfo, ui32 itemIdx, TStringBuf marker) {
Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
const auto& item = exportInfo->Items.at(itemIdx);
Expand All @@ -472,6 +513,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
exportInfo->State = EState::Cancelled;

for (ui32 i : xrange(exportInfo->Items.size())) {
KillChildActors(exportInfo->Items[i]);
if (i == itemIdx) {
continue;
}
Expand Down Expand Up @@ -620,7 +662,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
}
}

void OnAllocateResult(TTransactionContext&, const TActorContext&) {
void OnAllocateResult(TTransactionContext& txc, const TActorContext& ctx) {
Y_ABORT_UNLESS(AllocateResult);

const auto txId = TTxId(AllocateResult->Get()->TxIds.front());
Expand Down Expand Up @@ -651,13 +693,27 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
break;

case EState::CopyTables:
CopyTables(exportInfo, txId);
if (!CopyTables(exportInfo, txId)) {
// none of the items is a table
NIceDb::TNiceDb db(txc.DB);

for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
exportInfo->Items[itemIdx].State = EState::Transferring;
Self->PersistExportItemState(db, exportInfo, itemIdx);

AllocateTxId(exportInfo, itemIdx);
}

exportInfo->State = EState::Transferring;
Self->PersistExportState(db, exportInfo);
return;
}
break;

case EState::Transferring:
if (exportInfo->PendingItems) {
itemIdx = popPendingItemIdx(exportInfo->PendingItems);
TransferData(exportInfo, itemIdx, txId);
TransferData(exportInfo, itemIdx, txId, ctx);
} else {
return;
}
Expand Down Expand Up @@ -870,6 +926,72 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
SubscribeTx(txId);
}

void OnSchemeUploadResult(TTransactionContext& txc) {
Y_ABORT_UNLESS(SchemeUploadResult);
const auto& result = *SchemeUploadResult.Get()->Get();

LOG_D("TExport::TTxProgress: OnSchemeUploadResult"
<< ": id# " << result.ExportId
<< ", itemIdx# " << result.ItemIdx
<< ", success# " << result.Success
<< ", error# " << result.Error
);

const auto exportId = result.ExportId;
auto exportInfo = Self->Exports.Value(exportId, nullptr);
if (!exportInfo) {
LOG_E("TExport::TTxProgress: OnSchemeUploadResult received unknown export id"
<< ": id# " << exportId
);
return;
}

ui32 itemIdx = result.ItemIdx;
if (itemIdx >= exportInfo->Items.size()) {
LOG_E("TExport::TTxProgress: OnSchemeUploadResult item index out of range"
<< ": id# " << exportId
<< ", item index# " << itemIdx
<< ", number of items# " << exportInfo->Items.size()
);
return;
}

NIceDb::TNiceDb db(txc.DB);

auto& item = exportInfo->Items[itemIdx];
Self->RunningExportSchemeUploaders.erase(item.SchemeUploader);
item.SchemeUploader = TActorId();

if (!result.Success) {
item.State = EState::Cancelled;
item.Issue = result.Error;
Self->PersistExportItemState(db, exportInfo, itemIdx);

if (!exportInfo->IsInProgress()) {
return;
}

Cancel(exportInfo, itemIdx, "unsuccessful scheme upload");

Self->PersistExportState(db, exportInfo);
return SendNotificationsIfFinished(exportInfo);
}

if (exportInfo->State == EState::Transferring) {
item.State = EState::Done;
Self->PersistExportItemState(db, exportInfo, itemIdx);

if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
exportInfo->State = EState::Done;
exportInfo->EndTime = TAppData::TimeProvider->Now();

Self->PersistExportState(db, exportInfo);
SendNotificationsIfFinished(exportInfo);
AuditLogExportEnd(*exportInfo.Get(), Self);
}
}
}

void OnNotifyResult(TTransactionContext& txc, const TActorContext&) {
Y_ABORT_UNLESS(CompletedTxId);
LOG_D("TExport::TTxProgress: OnNotifyResult"
Expand Down Expand Up @@ -948,15 +1070,18 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
item.State = EState::Done;
item.WaitTxId = InvalidTxId;

if (const auto issue = GetIssues(ItemPathId(Self, exportInfo, itemIdx), txId)) {
item.Issue = *issue;
Cancel(exportInfo, itemIdx, "issues during backing up");
} else {
if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
exportInfo->State = EState::Done;
exportInfo->EndTime = TAppData::TimeProvider->Now();
bool itemHasIssues = false;
if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) {
if (const auto issue = GetIssues(ItemPathId(Self, exportInfo, itemIdx), txId)) {
item.Issue = *issue;
Cancel(exportInfo, itemIdx, "issues during backing up");
itemHasIssues = true;
}
}
if (!itemHasIssues && AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
exportInfo->State = EState::Done;
exportInfo->EndTime = TAppData::TimeProvider->Now();
}

Self->PersistExportItemState(db, exportInfo, itemIdx);
break;
Expand Down Expand Up @@ -1016,6 +1141,10 @@ ITransaction* TSchemeShard::CreateTxProgressExport(TEvSchemeShard::TEvModifySche
return new TExport::TTxProgress(this, ev);
}

ITransaction* TSchemeShard::CreateTxProgressExport(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev) {
return new TExport::TTxProgress(this, ev);
}

ITransaction* TSchemeShard::CreateTxProgressExport(TTxId completedTxId) {
return new TExport::TTxProgress(this, completedTxId);
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CopyTablesPropose(

for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
const auto& item = exportInfo->Items.at(itemIdx);
if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable) {
continue;
}

auto& desc = *copyTables.Add();
desc.SetSrcPath(item.SourcePathName);
Expand Down
Loading
Loading