diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index a39b86c938bf..cb2342bd393d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4433,6 +4433,17 @@ struct TSchemeShard::TTxInit : public TTransactionBase { item.Scheme = scheme; } + if (rowset.HaveValue()) { + item.CreationQuery = rowset.GetValue(); + } + + if (rowset.HaveValue()) { + Y_ABORT_UNLESS(ParseFromStringNoSizeLimit( + item.PreparedCreationQuery, + rowset.GetValue() + )); + } + if (rowset.HaveValue()) { Ydb::Scheme::ModifyPermissionsRequest permissions; Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(permissions, rowset.GetValue())); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 13ecad5dad96..796ef0629eb0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -4845,6 +4845,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvImport::TEvForgetImportRequest, Handle); HFuncTraced(TEvImport::TEvListImportsRequest, Handle); HFuncTraced(TEvPrivate::TEvImportSchemeReady, Handle); + HFuncTraced(TEvPrivate::TEvImportSchemeQueryResult, Handle); // } // NImport // namespace NBackup { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 7a3200aea584..5fb9a2ac9706 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1261,6 +1261,7 @@ class TSchemeShard static void PersistImportState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo); static void PersistImportItemState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx); static void PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx); + static void PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx); static void PersistImportItemDstPathId(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx); struct TImport { @@ -1284,6 +1285,7 @@ class TSchemeShard NTabletFlatExecutor::ITransaction* CreateTxProgressImport(ui64 id, const TMaybe& itemIdx = Nothing()); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemeReady::TPtr& ev); + NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvIndexBuilder::TEvCreateResponse::TPtr& ev); @@ -1295,6 +1297,7 @@ class TSchemeShard void Handle(TEvImport::TEvForgetImportRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvImport::TEvListImportsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvImportSchemeReady::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev, const TActorContext& ctx); void ResumeImports(const TVector& ids, const TActorContext& ctx); // } // NImport diff --git a/ydb/core/tx/schemeshard/schemeshard_import.cpp b/ydb/core/tx/schemeshard/schemeshard_import.cpp index 6254be341664..951b1d63c48c 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import.cpp @@ -175,6 +175,12 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf record.Update( NIceDb::TUpdate(item.Scheme.SerializeAsString()) ); + + if (!item.CreationQuery.empty()) { + record.Update( + NIceDb::TUpdate(item.CreationQuery) + ); + } if (item.Permissions.Defined()) { record.Update( NIceDb::TUpdate(item.Permissions->SerializeAsString()) @@ -185,6 +191,18 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf ); } +void TSchemeShard::PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + const auto& item = importInfo->Items[itemIdx]; + + // persist the prepared modify scheme if it is non-empty (has operation type is interpreted as non-empty) + if (item.PreparedCreationQuery.HasOperationType()) { + db.Table().Key(importInfo->Id, itemIdx).Update( + NIceDb::TUpdate(item.PreparedCreationQuery.SerializeAsString()) + ); + } +} + void TSchemeShard::PersistImportItemDstPathId(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) { Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); const auto& item = importInfo->Items.at(itemIdx); @@ -219,6 +237,10 @@ void TSchemeShard::Handle(TEvPrivate::TEvImportSchemeReady::TPtr& ev, const TAct Execute(CreateTxProgressImport(ev), ctx); } +void TSchemeShard::Handle(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev, const TActorContext& ctx) { + Execute(CreateTxProgressImport(ev), ctx); +} + void TSchemeShard::ResumeImports(const TVector& ids, const TActorContext& ctx) { for (const ui64 id : ids) { Execute(CreateTxProgressImport(id), ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index d586276080ec..7871d4751371 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -4,12 +4,16 @@ #include "schemeshard_import_flow_proposals.h" #include "schemeshard_import_helpers.h" #include "schemeshard_import_scheme_getter.h" +#include "schemeshard_import_scheme_query_executor.h" #include "schemeshard_xxport__helpers.h" #include "schemeshard_xxport__tx_base.h" +#include #include #include #include +#include +#include #include #include @@ -237,6 +241,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase ui64 Id; TMaybe ItemIdx; TEvPrivate::TEvImportSchemeReady::TPtr SchemeResult = nullptr; + TEvPrivate::TEvImportSchemeQueryResult::TPtr SchemeQueryResult = nullptr; TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr; TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr; TEvIndexBuilder::TEvCreateResponse::TPtr CreateIndexResult = nullptr; @@ -255,6 +260,12 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase { } + explicit TTxProgress(TSelf* self, TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev) + : TXxport::TTxBase(self) + , SchemeQueryResult(ev) + { + } + explicit TTxProgress(TSelf* self, TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev) : TXxport::TTxBase(self) , AllocateResult(ev) @@ -288,6 +299,8 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase if (SchemeResult) { OnSchemeResult(txc, ctx); + } else if (SchemeQueryResult) { + OnSchemeQueryPreparation(txc); } else if (AllocateResult) { OnAllocateResult(txc, ctx); } else if (ModifyResult) { @@ -338,6 +351,65 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase Send(Self->SelfId(), std::move(propose)); } + void ExecutePreparedQuery(TTransactionContext& txc, TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) { + Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); + auto& item = importInfo->Items[itemIdx]; + + item.SubState = ESubState::Proposed; + + LOG_I("TImport::TTxProgress: ExecutePreparedQuery" + << ": info# " << importInfo->ToString() + << ", item# " << item.ToString(itemIdx) + << ", txId# " << txId + ); + + Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId); + + auto propose = MakeHolder(ui64(txId), Self->TabletID()); + auto& record = propose->Record; + + auto& modifyScheme = *record.AddTransaction(); + modifyScheme = item.PreparedCreationQuery; + modifyScheme.SetInternal(true); + + if (importInfo->UserSID) { + record.SetOwner(*importInfo->UserSID); + } + FillOwner(record, item.Permissions); + + if (TString error; !FillACL(modifyScheme, item.Permissions, error)) { + NIceDb::TNiceDb db(txc.DB); + return CancelAndPersist(db, importInfo, itemIdx, error, "cannot parse permissions"); + } + + Send(Self->SelfId(), std::move(propose)); + } + + void RetryViewsCreation(TImportInfo::TPtr importInfo, NIceDb::TNiceDb& db, const TActorContext& ctx) { + const auto database = TStringBuilder() << '/' << JoinSeq('/', Self->RootPathElements); + TVector retriedItems; + for (ui32 itemIdx : xrange(importInfo->Items.size())) { + auto& item = importInfo->Items[itemIdx]; + if (!item.CreationQuery.empty() && item.ViewCreationRetries == 0) { + ctx.Register(CreateSchemeQueryExecutor( + Self->SelfId(), importInfo->Id, itemIdx, item.CreationQuery, database + )); + + item.State = EState::CreateTable; + item.ViewCreationRetries++; + Self->PersistImportItemState(db, importInfo, itemIdx); + + retriedItems.emplace_back(itemIdx); + } + } + if (!retriedItems.empty()) { + LOG_D("TImport::TTxProgress: retry view creation" + << ": id# " << importInfo->Id + << ", retried items# " << JoinSeq(", ", retriedItems) + ); + } + } + void TransferData(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) { Y_ABORT_UNLESS(itemIdx < importInfo->Items.size()); auto& item = importInfo->Items.at(itemIdx); @@ -631,7 +703,14 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase case EState::Transferring: case EState::BuildIndexes: if (item.WaitTxId == InvalidTxId) { - AllocateTxId(importInfo, itemIdx); + if (item.CreationQuery.empty() || item.PreparedCreationQuery.HasOperationType()) { + AllocateTxId(importInfo, itemIdx); + } else { + const auto database = TStringBuilder() << '/' << JoinSeq('/', Self->RootPathElements); + ctx.Register(CreateSchemeQueryExecutor( + Self->SelfId(), importInfo->Id, itemIdx, item.CreationQuery, database + )); + } } else { SubscribeTx(importInfo, itemIdx); } @@ -719,19 +798,93 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase if (!msg.Success) { return CancelAndPersist(db, importInfo, msg.ItemIdx, msg.Error, "cannot get scheme"); } - TString error; - if (!CreateTablePropose(Self, TTxId(), importInfo, msg.ItemIdx, error)) { - return CancelAndPersist(db, importInfo, msg.ItemIdx, error, "invalid scheme"); + if (item.CreationQuery.empty()) { + TString error; + if (!CreateTablePropose(Self, TTxId(), importInfo, msg.ItemIdx, error)) { + return CancelAndPersist(db, importInfo, msg.ItemIdx, error, "invalid scheme"); + } + } else { + // send the creation script to KQP to prepare + const auto database = TStringBuilder() << '/' << JoinSeq('/', Self->RootPathElements); + const TString source = TStringBuilder() + << importInfo->Settings.items(msg.ItemIdx).source_prefix() << NYdb::NDump::NFiles::CreateView().FileName; + + NYql::TIssues issues; + if (!NYdb::NDump::RewriteCreateViewQuery(item.CreationQuery, database, true, item.DstPathName, source, issues)) { + return CancelAndPersist(db, importInfo, msg.ItemIdx, issues.ToString(), "invalid view creation query"); + } + ctx.Register(CreateSchemeQueryExecutor(Self->SelfId(), msg.ImportId, msg.ItemIdx, item.CreationQuery, database)); } Self->PersistImportItemScheme(db, importInfo, msg.ItemIdx); item.State = EState::CreateTable; Self->PersistImportItemState(db, importInfo, msg.ItemIdx); - AllocateTxId(importInfo, msg.ItemIdx); + if (item.CreationQuery.empty()) { + AllocateTxId(importInfo, msg.ItemIdx); + } + } + + void OnSchemeQueryPreparation(TTransactionContext& txc) { + Y_ABORT_UNLESS(SchemeQueryResult); + const auto& message = *SchemeQueryResult.Get()->Get(); + const TString error = std::holds_alternative(message.Result) ? std::get(message.Result) : ""; + + LOG_D("TImport::TTxProgress: OnSchemeQueryPreparation" + << ": id# " << message.ImportId + << ", itemIdx# " << message.ItemIdx + << ", status# " << message.Status + << ", error# " << error + ); + + auto importInfo = Self->Imports.Value(message.ImportId, nullptr); + if (!importInfo) { + LOG_E("TImport::TTxProgress: OnSchemeQueryPreparation received unknown import id" + << ": id# " << message.ImportId + ); + return; + } + if (message.ItemIdx >= importInfo->Items.size()) { + LOG_E("TImport::TTxProgress: OnSchemeQueryPreparation item index out of range" + << ": id# " << message.ImportId + << ", item index# " << message.ItemIdx + << ", number of items# " << importInfo->Items.size() + ); + return; + } + + NIceDb::TNiceDb db(txc.DB); + auto& item = importInfo->Items[message.ItemIdx]; + + if (message.Status == Ydb::StatusIds::SCHEME_ERROR && item.ViewCreationRetries == 0) { + // Scheme error happens when the view depends on a table (or a view) that is not yet imported. + // Instead of tracking view dependencies, we simply retry the creation of the view later. + item.State = EState::Waiting; + + auto isWaiting = [](const TImportInfo::TItem& item) { + return item.State == EState::Waiting; + }; + if (AllOf(importInfo->Items, isWaiting)) { + // All items are waiting? Cancel the import, or we will end up waiting indefinetely. + return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed"); + } + + Self->PersistImportItemState(db, importInfo, message.ItemIdx); + return; + } + + if (message.Status != Ydb::StatusIds::SUCCESS || !error.empty()) { + return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed"); + } + + if (item.State == EState::CreateTable) { + item.PreparedCreationQuery = std::get(message.Result); + PersistImportItemPreparedCreationQuery(db, importInfo, message.ItemIdx); + AllocateTxId(importInfo, message.ItemIdx); + } } - void OnAllocateResult(TTransactionContext&, const TActorContext&) { + void OnAllocateResult(TTransactionContext& txc, const TActorContext&) { Y_ABORT_UNLESS(AllocateResult); const auto txId = TTxId(AllocateResult->Get()->TxIds.front()); @@ -762,6 +915,16 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase switch (item.State) { case EState::CreateTable: + if (item.PreparedCreationQuery.HasOperationType()) { + ExecutePreparedQuery(txc, importInfo, i, txId); + itemIdx = i; + break; + } + if (!item.CreationQuery.empty()) { + // We only need a txId for modify scheme transactions. + // If an object lacks a PreparedCreationQuery, it doesn't require a txId at this stage. + break; + } if (!Self->TableProfilesLoaded) { Self->WaitForTableProfiles(id, i); } else { @@ -945,7 +1108,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase SubscribeTx(importInfo, itemIdx); } - void OnNotifyResult(TTransactionContext& txc, const TActorContext&) { + void OnNotifyResult(TTransactionContext& txc, const TActorContext& ctx) { Y_ABORT_UNLESS(CompletedTxId); LOG_D("TImport::TTxProgress: OnNotifyResult" << ": txId# " << CompletedTxId); @@ -983,6 +1146,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase switch (item.State) { case EState::CreateTable: + if (!item.CreationQuery.empty()) { + item.State = EState::Done; + break; + } item.State = EState::Transferring; AllocateTxId(importInfo, itemIdx); break; @@ -1021,6 +1188,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase if (AllOf(importInfo->Items, &TImportInfo::TItem::IsDone)) { importInfo->State = EState::Done; importInfo->EndTime = TAppData::TimeProvider->Now(); + } else if (AllOf(importInfo->Items, [](const TImportInfo::TItem& item) { + return TImportInfo::TItem::IsDone(item) || item.State == EState::Waiting; + } + )) { + RetryViewsCreation(importInfo, db, ctx); } Self->PersistImportItemState(db, importInfo, itemIdx); @@ -1047,6 +1219,10 @@ ITransaction* TSchemeShard::CreateTxProgressImport(TEvPrivate::TEvImportSchemeRe return new TImport::TTxProgress(this, ev); } +ITransaction* TSchemeShard::CreateTxProgressImport(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev) { + return new TImport::TTxProgress(this, ev); +} + ITransaction* TSchemeShard::CreateTxProgressImport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev) { return new TImport::TTxProgress(this, ev); } diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp index ab722b2c5e5c..84b740ad9947 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp @@ -10,6 +10,7 @@ #include #include +#include #include @@ -32,9 +33,9 @@ class TSchemeGetter: public TActorBootstrapped { return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/metadata.json"; } - static TString SchemeKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) { + static TString SchemeKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx, TStringBuf filename) { Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size()); - return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/scheme.pb"; + return TStringBuilder() << settings.items(itemIdx).source_prefix() << '/' << filename; } static TString PermissionsKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) { @@ -42,6 +43,14 @@ class TSchemeGetter: public TActorBootstrapped { return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/permissions.pb"; } + static bool IsView(TStringBuf schemeKey) { + return schemeKey.EndsWith(NYdb::NDump::NFiles::CreateView().FileName); + } + + static bool NoObjectFound(Aws::S3::S3Errors errorType) { + return errorType == S3Errors::RESOURCE_NOT_FOUND || errorType == S3Errors::NO_SUCH_KEY; + } + void HeadObject(const TString& key) { auto request = Model::HeadObjectRequest() .WithKey(key); @@ -71,6 +80,13 @@ class TSchemeGetter: public TActorBootstrapped { << ": self# " << SelfId() << ", result# " << result); + if (!IsView(SchemeKey) && NoObjectFound(result.GetError().GetErrorType())) { + // try search for a view + SchemeKey = SchemeKeyFromSettings(ImportInfo->Settings, ItemIdx, NYdb::NDump::NFiles::CreateView().FileName); + HeadObject(SchemeKey); + return; + } + if (!CheckResult(result, "HeadObject")) { return; } @@ -86,8 +102,7 @@ class TSchemeGetter: public TActorBootstrapped { << ": self# " << SelfId() << ", result# " << result); - if (result.GetError().GetErrorType() == S3Errors::RESOURCE_NOT_FOUND - || result.GetError().GetErrorType() == S3Errors::NO_SUCH_KEY) { + if (NoObjectFound(result.GetError().GetErrorType())) { Reply(); // permissions are optional return; } else if (!CheckResult(result, "HeadObject")) { @@ -176,9 +191,13 @@ class TSchemeGetter: public TActorBootstrapped { LOG_T("Trying to parse scheme" << ": self# " << SelfId() + << ", itemIdx# " << ItemIdx + << ", schemeKey# " << SchemeKey << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n")); - if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &item.Scheme)) { + if (IsView(SchemeKey)) { + item.CreationQuery = msg.Body; + } else if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &item.Scheme)) { return Reply(false, "Cannot parse scheme"); } @@ -230,7 +249,7 @@ class TSchemeGetter: public TActorBootstrapped { StartValidatingChecksum(PermissionsKey, msg.Body, nextStep); } else { nextStep(); - } + } } void HandleChecksum(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) { @@ -351,7 +370,7 @@ class TSchemeGetter: public TActorBootstrapped { , ImportInfo(importInfo) , ItemIdx(itemIdx) , MetadataKey(MetadataKeyFromSettings(importInfo->Settings, itemIdx)) - , SchemeKey(SchemeKeyFromSettings(importInfo->Settings, itemIdx)) + , SchemeKey(SchemeKeyFromSettings(importInfo->Settings, itemIdx, "scheme.pb")) , PermissionsKey(PermissionsKeyFromSettings(importInfo->Settings, itemIdx)) , Retries(importInfo->Settings.number_of_retries()) , NeedDownloadPermissions(!importInfo->Settings.no_acl()) @@ -411,7 +430,7 @@ class TSchemeGetter: public TActorBootstrapped { const ui32 ItemIdx; const TString MetadataKey; - const TString SchemeKey; + TString SchemeKey; const TString PermissionsKey; const ui32 Retries; diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp new file mode 100644 index 000000000000..e4137e397c47 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp @@ -0,0 +1,179 @@ +#include "schemeshard_import_helpers.h" +#include "schemeshard_import_scheme_query_executor.h" +#include "schemeshard_private.h" + +#include +#include +#include +#include +#include +#include + +#include + +using namespace NKikimr::NKqp; + +namespace NKikimr::NSchemeShard { + +class TSchemeQueryExecutor: public TActorBootstrapped { + + std::unique_ptr BuildCompileRequest() { + UserToken.Reset(MakeIntrusive("")); + + TKqpQuerySettings querySettings(NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY); + querySettings.IsInternalCall = true; + + GUCSettings = std::make_shared(); + + TKqpQueryId query( + TString(DefaultKikimrPublicClusterName), // cluster + Database, // database + "", // database id + SchemeQuery, // query text + querySettings, // query settings + nullptr, // query parameter types + *GUCSettings // GUC settings + ); + + // TO DO: pass cancel after from the import operation + auto deadline = TAppData::TimeProvider->Now() + TDuration::Minutes(1); + TKqpCounters kqpCounters(AppData()->Counters, &TlsActivationContext->AsActorContext()); + IsInterestedInResult = std::make_shared>(true); + UserRequestContext.Reset(MakeIntrusive()); + + return std::make_unique( + UserToken, // user token + "", // client address + Nothing(), // uid + query, // TKqpQueryId + false, // keep in cache + true, // is query action == prepare? + false, // per statement result + deadline, // deadline + kqpCounters.GetDbCounters(Database), // db counters + GUCSettings, // GUC settings + Nothing(), // application name + IsInterestedInResult, // is still interested in result? + UserRequestContext // user request context + ); + } + + void PrepareSchemeQuery() { + if (!Send(MakeKqpCompileServiceID(SelfId().NodeId()), BuildCompileRequest().release())) { + return Reply(Ydb::StatusIds::INTERNAL_ERROR, "cannot send query request"); + } + this->Become(&TThis::StateExecute); + } + + void HandleCompileResponse(const TEvKqp::TEvCompileResponse::TPtr& ev) { + const auto* result = ev->Get()->CompileResult.get(); + if (!result) { + // TO DO: figure out the proper status for this situation. + // Probably, just change the reply event to contain a plain bool status. + return Reply(Ydb::StatusIds::BAD_REQUEST, "no compile result"); + } + + LOG_D("TSchemeQueryExecutor HandleCompileResponse" + << ", self: " << this->SelfId() + << ", status: " << result->Status; + ); + + if (result->Status != Ydb::StatusIds::SUCCESS) { + return Reply(result->Status, result->Issues.ToOneLineString()); + } + if (!result->PreparedQuery) { + return Reply(Ydb::StatusIds::BAD_REQUEST, "no prepared query"); + } + const auto& transactions = result->PreparedQuery->GetPhysicalQuery().GetTransactions(); + if (transactions.empty()) { + return Reply(Ydb::StatusIds::BAD_REQUEST, "empty transactions"); + } + if (!transactions[0].HasSchemeOperation()) { + return Reply(Ydb::StatusIds::BAD_REQUEST, "no scheme operations"); + } + if (!transactions[0].GetSchemeOperation().HasCreateView()) { + return Reply(Ydb::StatusIds::BAD_REQUEST, "no create view operation"); + } + const auto& createView = transactions[0].GetSchemeOperation().GetCreateView(); + Reply(result->Status, createView); + } + + void Reply(Ydb::StatusIds::StatusCode status, std::variant result) { + auto logMessage = TStringBuilder() << "TSchemeQueryExecutor Reply" + << ", self: " << this->SelfId() + << ", success: " << status; + LOG_I(logMessage); + + std::visit([&](T& rresult) { + if constexpr (std::is_same_v) { + logMessage << ", error: " << rresult; + } else if constexpr (std::is_same_v) { + logMessage << ", prepared query: " << rresult.ShortDebugString().Quote(); + } + LOG_D(logMessage); + Send(ReplyTo, new TEvPrivate::TEvImportSchemeQueryResult(ImportId, ItemIdx, status, std::move(rresult))); + }, result); + + PassAway(); + } + +public: + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::IMPORT_SCHEME_QUERY_EXECUTOR; + } + + TSchemeQueryExecutor( + TActorId replyTo, + ui64 importId, + ui32 itemIdx, + const TString& schemeQuery, + const TString& database + ) + : ReplyTo(replyTo) + , ImportId(importId) + , ItemIdx(itemIdx) + , SchemeQuery(schemeQuery) + , Database(database) + { + } + + void Bootstrap() { + PrepareSchemeQuery(); + } + + STATEFN(StateBase) { + switch (ev->GetTypeRewrite()) { + sFunc(TEvents::TEvPoisonPill, PassAway); + } + } + + STATEFN(StateExecute) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvKqp::TEvCompileResponse, HandleCompileResponse); + default: + return StateBase(ev); + } + } + +private: + + TActorId ReplyTo; + ui64 ImportId; + ui32 ItemIdx; + TString SchemeQuery; + TString Database; + + // Pointer type event arguments need to live until we receive the compilation response. + TIntrusiveConstPtr UserToken; + TGUCSettings::TPtr GUCSettings; + std::shared_ptr> IsInterestedInResult; + TIntrusivePtr UserRequestContext; + +}; // TSchemeQueryExecutor + +IActor* CreateSchemeQueryExecutor(NActors::TActorId replyTo, ui64 importId, ui32 itemIdx, const TString& schemeQuery, const TString& database) { + return new TSchemeQueryExecutor(replyTo, importId, itemIdx, schemeQuery, database); +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.h b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.h new file mode 100644 index 000000000000..550c9c145437 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.h @@ -0,0 +1,9 @@ +#pragma once + +#include + +namespace NKikimr::NSchemeShard { + +NActors::IActor* CreateSchemeQueryExecutor(NActors::TActorId replyTo, ui64 importId, ui32 itemIdx, const TString& creationQuery, const TString& database); + +} diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 9a026c8f3e8b..5b1364339a22 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2825,6 +2825,8 @@ struct TImportInfo: public TSimpleRefCount { TString DstPathName; TPathId DstPathId; Ydb::Table::CreateTableRequest Scheme; + TString CreationQuery; + NKikimrSchemeOp::TModifyScheme PreparedCreationQuery; TMaybeFail Permissions; NBackup::TMetadata Metadata; @@ -2833,6 +2835,7 @@ struct TImportInfo: public TSimpleRefCount { TTxId WaitTxId = InvalidTxId; int NextIndexIdx = 0; TString Issue; + int ViewCreationRetries = 0; TItem() = default; diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h index ba7b55645af1..2d23880d1cee 100644 --- a/ydb/core/tx/schemeshard/schemeshard_private.h +++ b/ydb/core/tx/schemeshard/schemeshard_private.h @@ -1,8 +1,10 @@ #pragma once #include "schemeshard_identificators.h" +#include #include #include +#include #include @@ -18,6 +20,7 @@ namespace TEvPrivate { EvRunConditionalErase, EvIndexBuildBilling, EvImportSchemeReady, + EvImportSchemeQueryResult, EvExportSchemeUploadResult, EvServerlessStorageBilling, EvCleanDroppedPaths, @@ -99,6 +102,29 @@ namespace TEvPrivate { {} }; + struct TEvImportSchemeQueryResult: public TEventLocal { + const ui64 ImportId; + const ui32 ItemIdx; + const Ydb::StatusIds::StatusCode Status; + const std::variant Result; + + // failed query + TEvImportSchemeQueryResult(ui64 id, ui32 itemIdx, Ydb::StatusIds::StatusCode status, TString&& error) + : ImportId(id) + , ItemIdx(itemIdx) + , Status(status) + , Result(error) + {} + + // successful query + TEvImportSchemeQueryResult(ui64 id, ui32 itemIdx, Ydb::StatusIds::StatusCode status, NKikimrSchemeOp::TModifyScheme&& preparedQuery) + : ImportId(id) + , ItemIdx(itemIdx) + , Status(status) + , Result(preparedQuery) + {} + }; + struct TEvExportSchemeUploadResult: public TEventLocal { ui64 ExportId; ui32 ItemIdx; diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 527d41668adf..f4537f8a8535 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1553,6 +1553,9 @@ struct Schema : NIceDb::Schema { struct DstPathOwnerId : Column<4, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; }; struct DstPathLocalId : Column<5, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; }; struct Scheme : Column<6, NScheme::NTypeIds::String> {}; + struct CreationQuery : Column<13, NScheme::NTypeIds::Utf8> {}; + // NKikimrSchemeOp::TModifyScheme serialized as string + struct PreparedCreationQuery : Column<14, NScheme::NTypeIds::String> {}; struct Permissions : Column<11, NScheme::NTypeIds::String> {}; struct Metadata : Column<12, NScheme::NTypeIds::String> {}; @@ -1569,6 +1572,8 @@ struct Schema : NIceDb::Schema { DstPathOwnerId, DstPathLocalId, Scheme, + CreationQuery, + PreparedCreationQuery, Permissions, Metadata, State, diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 1cf24d74eb11..d1723c47eb65 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -233,6 +233,7 @@ SRCS( schemeshard_import__get.cpp schemeshard_import__list.cpp schemeshard_import_flow_proposals.cpp + schemeshard_import_scheme_query_executor.cpp schemeshard_info_types.cpp schemeshard_info_types.h schemeshard_path.cpp @@ -307,6 +308,7 @@ PEERDIR( ydb/library/login ydb/library/login/protos ydb/library/protobuf_printer + ydb/public/lib/ydb_cli/dump/files ydb/public/lib/ydb_cli/dump/util yql/essentials/minikql yql/essentials/providers/common/proto diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 75d3557e83c8..bff8c7ac8df5 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1074,5 +1074,6 @@ message TActivity { BS_GROUP_GETBLOCK = 653; HTTP_MON_INDEX_SERVICE = 654; HTTP_MON_AUTHORIZED_ACTOR_REQUEST = 655; + IMPORT_SCHEME_QUERY_EXECUTOR = 656; }; }; diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 151a153186cb..82fff12f36e2 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -6179,6 +6179,16 @@ "ColumnId": 12, "ColumnName": "Metadata", "ColumnType": "String" + }, + { + "ColumnId": 13, + "ColumnName": "CreationQuery", + "ColumnType": "Utf8" + }, + { + "ColumnId": 14, + "ColumnName": "PreparedCreationQuery", + "ColumnType": "String" } ], "ColumnsDropped": [], @@ -6196,7 +6206,9 @@ 9, 10, 11, - 12 + 12, + 13, + 14 ], "RoomID": 0, "Codec": 0,