Skip to content

Commit

Permalink
export: scheme uploader separate describe method
Browse files Browse the repository at this point in the history
+ additional code simplification and readability improvements
  • Loading branch information
jepett0 committed Jan 22, 2025
1 parent 1bc3c40 commit fd7d697
Showing 1 changed file with 21 additions and 27 deletions.
48 changes: 21 additions & 27 deletions ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ class TSchemeUploader: public TActorBootstrapped<TSchemeUploader> {
using TEvExternalStorage = NWrappers::TEvExternalStorage;
using TPutObjectResult = Aws::Utils::Outcome<Aws::S3::Model::PutObjectResult, Aws::S3::S3Error>;

void GetDescription() {
Send(SchemeShard, new TEvSchemeShard::TEvDescribeScheme(SourcePathId));
this->Become(&TThis::StateDescribe);
}

static TString BuildViewScheme(const TString& path, const NKikimrSchemeOp::TViewDescription& viewDescription, const TString& backupRoot, TString& error) {
NYql::TIssues issues;
auto scheme = NYdb::NDump::BuildCreateViewQuery(viewDescription.GetName(), path, viewDescription.GetQueryText(), backupRoot, issues);
Expand Down Expand Up @@ -74,23 +79,7 @@ class TSchemeUploader: public TActorBootstrapped<TSchemeUploader> {
return;
}

Restart();
}

void Restart() {
if (Attempt) {
this->Send(std::exchange(StorageOperator, TActorId()), new TEvents::TEvPoisonPill());
}

StorageOperator = this->RegisterWithSameMailbox(
NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator())
);

if (!SchemeUploaded) {
UploadScheme();
} else if (!PermissionsUploaded) {
UploadPermissions();
}
UploadScheme();
}

void UploadScheme() {
Expand All @@ -99,6 +88,12 @@ class TSchemeUploader: public TActorBootstrapped<TSchemeUploader> {
if (!Scheme) {
return Finish(false, "cannot infer scheme");
}
if (Attempt == 0) {
StorageOperator = this->RegisterWithSameMailbox(
NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator())
);
}

auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(Sprintf("%s/create_view.sql", DestinationPrefix.c_str()));

Expand Down Expand Up @@ -191,6 +186,14 @@ class TSchemeUploader: public TActorBootstrapped<TSchemeUploader> {
return false;
}

void RetryOrFinish(const Aws::S3::S3Error& error) {
if (Attempt < Retries && ShouldRetry(error)) {
Retry();
} else {
Finish(false, TStringBuilder() << "S3 error: " << error.GetMessage());
}
}

static bool ShouldRetry(const Aws::S3::S3Error& error) {
if (error.ShouldRetry()) {
return true;
Expand All @@ -204,14 +207,6 @@ class TSchemeUploader: public TActorBootstrapped<TSchemeUploader> {
this->Schedule(Delay + random, new TEvents::TEvWakeup());
}

void RetryOrFinish(const Aws::S3::S3Error& error) {
if (Attempt < Retries && ShouldRetry(error)) {
Retry();
} else {
Finish(false, TStringBuilder() << "S3 error: " << error.GetMessage());
}
}

void Finish(bool success = true, const TString& error = TString()) {
LOG_I("Finish"
<< ", self: " << this->SelfId()
Expand Down Expand Up @@ -263,8 +258,7 @@ class TSchemeUploader: public TActorBootstrapped<TSchemeUploader> {
return;
}
if (!Scheme || !Permissions) {
Send(SchemeShard, new TEvSchemeShard::TEvDescribeScheme(SourcePathId));
this->Become(&TThis::StateDescribe);
GetDescription();
return;
}
if (!SchemeUploaded) {
Expand Down

0 comments on commit fd7d697

Please sign in to comment.