Skip to content

Commit

Permalink
Enable view import
Browse files Browse the repository at this point in the history
- server side view import code
  • Loading branch information
jepett0 committed Jan 21, 2025
1 parent 2b4dd19 commit c5bff86
Show file tree
Hide file tree
Showing 14 changed files with 485 additions and 16 deletions.
11 changes: 11 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4433,6 +4433,17 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
item.Scheme = scheme;
}

if (rowset.HaveValue<Schema::ImportItems::CreationQuery>()) {
item.CreationQuery = rowset.GetValue<Schema::ImportItems::CreationQuery>();
}

if (rowset.HaveValue<Schema::ImportItems::PreparedCreationQuery>()) {
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(
item.PreparedCreationQuery,
rowset.GetValue<Schema::ImportItems::PreparedCreationQuery>()
));
}

if (rowset.HaveValue<Schema::ImportItems::Permissions>()) {
Ydb::Scheme::ModifyPermissionsRequest permissions;
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(permissions, rowset.GetValue<Schema::ImportItems::Permissions>()));
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4845,6 +4845,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
HFuncTraced(TEvImport::TEvForgetImportRequest, Handle);
HFuncTraced(TEvImport::TEvListImportsRequest, Handle);
HFuncTraced(TEvPrivate::TEvImportSchemeReady, Handle);
HFuncTraced(TEvPrivate::TEvImportSchemeQueryResult, Handle);
// } // NImport

// namespace NBackup {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,7 @@ class TSchemeShard
static void PersistImportState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo);
static void PersistImportItemState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx);
static void PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx);
static void PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx);
static void PersistImportItemDstPathId(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx);

struct TImport {
Expand All @@ -1284,6 +1285,7 @@ class TSchemeShard

NTabletFlatExecutor::ITransaction* CreateTxProgressImport(ui64 id, const TMaybe<ui32>& itemIdx = Nothing());
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemeReady::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvIndexBuilder::TEvCreateResponse::TPtr& ev);
Expand All @@ -1295,6 +1297,7 @@ class TSchemeShard
void Handle(TEvImport::TEvForgetImportRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvImport::TEvListImportsRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvImportSchemeReady::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev, const TActorContext& ctx);

void ResumeImports(const TVector<ui64>& ids, const TActorContext& ctx);
// } // NImport
Expand Down
22 changes: 22 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf
record.Update(
NIceDb::TUpdate<Schema::ImportItems::Scheme>(item.Scheme.SerializeAsString())
);

if (!item.CreationQuery.empty()) {
record.Update(
NIceDb::TUpdate<Schema::ImportItems::CreationQuery>(item.CreationQuery)
);
}
if (item.Permissions.Defined()) {
record.Update(
NIceDb::TUpdate<Schema::ImportItems::Permissions>(item.Permissions->SerializeAsString())
Expand All @@ -185,6 +191,18 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf
);
}

void TSchemeShard::PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
const auto& item = importInfo->Items[itemIdx];

// persist the prepared modify scheme if it is non-empty (has operation type is interpreted as non-empty)
if (item.PreparedCreationQuery.HasOperationType()) {
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
NIceDb::TUpdate<Schema::ImportItems::PreparedCreationQuery>(item.PreparedCreationQuery.SerializeAsString())
);
}
}

void TSchemeShard::PersistImportItemDstPathId(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
const auto& item = importInfo->Items.at(itemIdx);
Expand Down Expand Up @@ -219,6 +237,10 @@ void TSchemeShard::Handle(TEvPrivate::TEvImportSchemeReady::TPtr& ev, const TAct
Execute(CreateTxProgressImport(ev), ctx);
}

void TSchemeShard::Handle(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev, const TActorContext& ctx) {
Execute(CreateTxProgressImport(ev), ctx);
}

void TSchemeShard::ResumeImports(const TVector<ui64>& ids, const TActorContext& ctx) {
for (const ui64 id : ids) {
Execute(CreateTxProgressImport(id), ctx);
Expand Down
190 changes: 183 additions & 7 deletions ydb/core/tx/schemeshard/schemeshard_import__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
#include "schemeshard_import_flow_proposals.h"
#include "schemeshard_import_helpers.h"
#include "schemeshard_import_scheme_getter.h"
#include "schemeshard_import_scheme_query_executor.h"
#include "schemeshard_xxport__helpers.h"
#include "schemeshard_xxport__tx_base.h"

#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/public/api/protos/ydb_import.pb.h>
#include <ydb/public/api/protos/ydb_issue_message.pb.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/lib/ydb_cli/dump/files/files.h>
#include <ydb/public/lib/ydb_cli/dump/util/view_utils.h>

#include <util/generic/algorithm.h>
#include <util/generic/maybe.h>
Expand Down Expand Up @@ -237,6 +241,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
ui64 Id;
TMaybe<ui32> ItemIdx;
TEvPrivate::TEvImportSchemeReady::TPtr SchemeResult = nullptr;
TEvPrivate::TEvImportSchemeQueryResult::TPtr SchemeQueryResult = nullptr;
TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr;
TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr;
TEvIndexBuilder::TEvCreateResponse::TPtr CreateIndexResult = nullptr;
Expand All @@ -255,6 +260,12 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
{
}

explicit TTxProgress(TSelf* self, TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev)
: TXxport::TTxBase(self)
, SchemeQueryResult(ev)
{
}

explicit TTxProgress(TSelf* self, TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev)
: TXxport::TTxBase(self)
, AllocateResult(ev)
Expand Down Expand Up @@ -288,6 +299,8 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase

if (SchemeResult) {
OnSchemeResult(txc, ctx);
} else if (SchemeQueryResult) {
OnSchemeQueryPreparation(txc);
} else if (AllocateResult) {
OnAllocateResult(txc, ctx);
} else if (ModifyResult) {
Expand Down Expand Up @@ -338,6 +351,65 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
Send(Self->SelfId(), std::move(propose));
}

void ExecutePreparedQuery(TTransactionContext& txc, TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
auto& item = importInfo->Items[itemIdx];

item.SubState = ESubState::Proposed;

LOG_I("TImport::TTxProgress: ExecutePreparedQuery"
<< ": info# " << importInfo->ToString()
<< ", item# " << item.ToString(itemIdx)
<< ", txId# " << txId
);

Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);

auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), Self->TabletID());
auto& record = propose->Record;

auto& modifyScheme = *record.AddTransaction();
modifyScheme = item.PreparedCreationQuery;
modifyScheme.SetInternal(true);

if (importInfo->UserSID) {
record.SetOwner(*importInfo->UserSID);
}
FillOwner(record, item.Permissions);

if (TString error; !FillACL(modifyScheme, item.Permissions, error)) {
NIceDb::TNiceDb db(txc.DB);
return CancelAndPersist(db, importInfo, itemIdx, error, "cannot parse permissions");
}

Send(Self->SelfId(), std::move(propose));
}

void RetryViewsCreation(TImportInfo::TPtr importInfo, NIceDb::TNiceDb& db, const TActorContext& ctx) {
const auto database = TStringBuilder() << '/' << JoinSeq('/', Self->RootPathElements);
TVector<ui32> retriedItems;
for (ui32 itemIdx : xrange(importInfo->Items.size())) {
auto& item = importInfo->Items[itemIdx];
if (!item.CreationQuery.empty() && item.ViewCreationRetries == 0) {
ctx.Register(CreateSchemeQueryExecutor(
Self->SelfId(), importInfo->Id, itemIdx, item.CreationQuery, database
));

item.State = EState::CreateTable;
item.ViewCreationRetries++;
Self->PersistImportItemState(db, importInfo, itemIdx);

retriedItems.emplace_back(itemIdx);
}
}
if (!retriedItems.empty()) {
LOG_D("TImport::TTxProgress: retry view creation"
<< ": id# " << importInfo->Id
<< ", retried items# " << JoinSeq(", ", retriedItems)
);
}
}

void TransferData(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
auto& item = importInfo->Items.at(itemIdx);
Expand Down Expand Up @@ -631,7 +703,14 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
case EState::Transferring:
case EState::BuildIndexes:
if (item.WaitTxId == InvalidTxId) {
AllocateTxId(importInfo, itemIdx);
if (item.CreationQuery.empty() || item.PreparedCreationQuery.HasOperationType()) {
AllocateTxId(importInfo, itemIdx);
} else {
const auto database = TStringBuilder() << '/' << JoinSeq('/', Self->RootPathElements);
ctx.Register(CreateSchemeQueryExecutor(
Self->SelfId(), importInfo->Id, itemIdx, item.CreationQuery, database
));
}
} else {
SubscribeTx(importInfo, itemIdx);
}
Expand Down Expand Up @@ -719,19 +798,93 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
if (!msg.Success) {
return CancelAndPersist(db, importInfo, msg.ItemIdx, msg.Error, "cannot get scheme");
}
TString error;
if (!CreateTablePropose(Self, TTxId(), importInfo, msg.ItemIdx, error)) {
return CancelAndPersist(db, importInfo, msg.ItemIdx, error, "invalid scheme");
if (item.CreationQuery.empty()) {
TString error;
if (!CreateTablePropose(Self, TTxId(), importInfo, msg.ItemIdx, error)) {
return CancelAndPersist(db, importInfo, msg.ItemIdx, error, "invalid scheme");
}
} else {
// send the creation script to KQP to prepare
const auto database = TStringBuilder() << '/' << JoinSeq('/', Self->RootPathElements);
const TString source = TStringBuilder()
<< importInfo->Settings.items(msg.ItemIdx).source_prefix() << NYdb::NDump::NFiles::CreateView().FileName;

NYql::TIssues issues;
if (!NYdb::NDump::RewriteCreateViewQuery(item.CreationQuery, database, true, item.DstPathName, source, issues)) {
return CancelAndPersist(db, importInfo, msg.ItemIdx, issues.ToString(), "invalid view creation query");
}
ctx.Register(CreateSchemeQueryExecutor(Self->SelfId(), msg.ImportId, msg.ItemIdx, item.CreationQuery, database));
}

Self->PersistImportItemScheme(db, importInfo, msg.ItemIdx);

item.State = EState::CreateTable;
Self->PersistImportItemState(db, importInfo, msg.ItemIdx);
AllocateTxId(importInfo, msg.ItemIdx);
if (item.CreationQuery.empty()) {
AllocateTxId(importInfo, msg.ItemIdx);
}
}

void OnSchemeQueryPreparation(TTransactionContext& txc) {
Y_ABORT_UNLESS(SchemeQueryResult);
const auto& message = *SchemeQueryResult.Get()->Get();
const TString error = std::holds_alternative<TString>(message.Result) ? std::get<TString>(message.Result) : "";

LOG_D("TImport::TTxProgress: OnSchemeQueryPreparation"
<< ": id# " << message.ImportId
<< ", itemIdx# " << message.ItemIdx
<< ", status# " << message.Status
<< ", error# " << error
);

auto importInfo = Self->Imports.Value(message.ImportId, nullptr);
if (!importInfo) {
LOG_E("TImport::TTxProgress: OnSchemeQueryPreparation received unknown import id"
<< ": id# " << message.ImportId
);
return;
}
if (message.ItemIdx >= importInfo->Items.size()) {
LOG_E("TImport::TTxProgress: OnSchemeQueryPreparation item index out of range"
<< ": id# " << message.ImportId
<< ", item index# " << message.ItemIdx
<< ", number of items# " << importInfo->Items.size()
);
return;
}

NIceDb::TNiceDb db(txc.DB);
auto& item = importInfo->Items[message.ItemIdx];

if (message.Status == Ydb::StatusIds::SCHEME_ERROR && item.ViewCreationRetries == 0) {
// Scheme error happens when the view depends on a table (or a view) that is not yet imported.
// Instead of tracking view dependencies, we simply retry the creation of the view later.
item.State = EState::Waiting;

auto isWaiting = [](const TImportInfo::TItem& item) {
return item.State == EState::Waiting;
};
if (AllOf(importInfo->Items, isWaiting)) {
// All items are waiting? Cancel the import, or we will end up waiting indefinetely.
return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed");
}

Self->PersistImportItemState(db, importInfo, message.ItemIdx);
return;
}

if (message.Status != Ydb::StatusIds::SUCCESS || !error.empty()) {
return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed");
}

if (item.State == EState::CreateTable) {
item.PreparedCreationQuery = std::get<NKikimrSchemeOp::TModifyScheme>(message.Result);
PersistImportItemPreparedCreationQuery(db, importInfo, message.ItemIdx);
AllocateTxId(importInfo, message.ItemIdx);
}
}

void OnAllocateResult(TTransactionContext&, const TActorContext&) {
void OnAllocateResult(TTransactionContext& txc, const TActorContext&) {
Y_ABORT_UNLESS(AllocateResult);

const auto txId = TTxId(AllocateResult->Get()->TxIds.front());
Expand Down Expand Up @@ -762,6 +915,16 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase

switch (item.State) {
case EState::CreateTable:
if (item.PreparedCreationQuery.HasOperationType()) {
ExecutePreparedQuery(txc, importInfo, i, txId);
itemIdx = i;
break;
}
if (!item.CreationQuery.empty()) {
// We only need a txId for modify scheme transactions.
// If an object lacks a PreparedCreationQuery, it doesn't require a txId at this stage.
break;
}
if (!Self->TableProfilesLoaded) {
Self->WaitForTableProfiles(id, i);
} else {
Expand Down Expand Up @@ -945,7 +1108,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
SubscribeTx(importInfo, itemIdx);
}

void OnNotifyResult(TTransactionContext& txc, const TActorContext&) {
void OnNotifyResult(TTransactionContext& txc, const TActorContext& ctx) {
Y_ABORT_UNLESS(CompletedTxId);
LOG_D("TImport::TTxProgress: OnNotifyResult"
<< ": txId# " << CompletedTxId);
Expand Down Expand Up @@ -983,6 +1146,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase

switch (item.State) {
case EState::CreateTable:
if (!item.CreationQuery.empty()) {
item.State = EState::Done;
break;
}
item.State = EState::Transferring;
AllocateTxId(importInfo, itemIdx);
break;
Expand Down Expand Up @@ -1021,6 +1188,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
if (AllOf(importInfo->Items, &TImportInfo::TItem::IsDone)) {
importInfo->State = EState::Done;
importInfo->EndTime = TAppData::TimeProvider->Now();
} else if (AllOf(importInfo->Items, [](const TImportInfo::TItem& item) {
return TImportInfo::TItem::IsDone(item) || item.State == EState::Waiting;
}
)) {
RetryViewsCreation(importInfo, db, ctx);
}

Self->PersistImportItemState(db, importInfo, itemIdx);
Expand All @@ -1047,6 +1219,10 @@ ITransaction* TSchemeShard::CreateTxProgressImport(TEvPrivate::TEvImportSchemeRe
return new TImport::TTxProgress(this, ev);
}

ITransaction* TSchemeShard::CreateTxProgressImport(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev) {
return new TImport::TTxProgress(this, ev);
}

ITransaction* TSchemeShard::CreateTxProgressImport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev) {
return new TImport::TTxProgress(this, ev);
}
Expand Down
Loading

0 comments on commit c5bff86

Please sign in to comment.