Skip to content

Commit

Permalink
export: correct cancellation state handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jepett0 committed Jan 23, 2025
1 parent 501b8da commit 02c117c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 7 deletions.
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct TSchemeShard::TExport::TTxCancel: public TSchemeShard::TXxport::TTxBase {
}

LOG_D("TExport::TTxCancel, cancelling manually"
<< ", info# " << exportInfo->ToString()
<< ", info: " << exportInfo->ToString()
);

exportInfo->Issue = "Cancelled manually";
Expand Down
26 changes: 20 additions & 6 deletions ydb/core/tx/schemeshard/schemeshard_export__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
const auto& item = exportInfo->Items.at(itemIdx);

if (item.State != EState::Dropping) {
if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable || item.State != EState::Dropping) {
continue;
}

Expand Down Expand Up @@ -690,6 +690,15 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
return itemIdx;
}

void EndExport(TExportInfo::TPtr exportInfo, EState finalState, NIceDb::TNiceDb& db) {
exportInfo->State = finalState;
exportInfo->EndTime = TAppData::TimeProvider->Now();

Self->PersistExportState(db, exportInfo);
SendNotificationsIfFinished(exportInfo);
AuditLogExportEnd(*exportInfo.Get(), Self);
}

void OnAllocateResult() {
Y_ABORT_UNLESS(AllocateResult);

Expand Down Expand Up @@ -998,12 +1007,17 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
Self->PersistExportItemState(db, exportInfo, itemIdx);

if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
exportInfo->State = EState::Done;
exportInfo->EndTime = TAppData::TimeProvider->Now();
EndExport(exportInfo, EState::Done, db);
}
} else if (exportInfo->State == EState::Cancellation) {
item.State = EState::Cancelled;
Self->PersistExportItemState(db, exportInfo, itemIdx);

Self->PersistExportState(db, exportInfo);
SendNotificationsIfFinished(exportInfo);
AuditLogExportEnd(*exportInfo.Get(), Self);
if (AllOf(exportInfo->Items, [](const TExportInfo::TItem& item) {
// on cancellation we wait only for transferring items
return item.State != EState::Transferring;
})) {
EndExport(exportInfo, EState::Cancelled, db);
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_export__forget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,14 @@ struct TSchemeShard::TExport::TTxForget: public TSchemeShard::TXxport::TTxBase {
Self->Exports.erase(exportInfo->Id);
Self->PersistRemoveExport(db, exportInfo);
} else {
LOG_D("TExport::TTxForget, dropping export tables"
<< ", info: " << exportInfo->ToString()
);
exportInfo->WaitTxId = InvalidTxId;
exportInfo->State = TExportInfo::EState::Dropping;
Self->PersistExportState(db, exportInfo);

TVector<ui32> itemsToDrop;
for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
auto& item = exportInfo->Items.at(itemIdx);

Expand All @@ -87,10 +91,14 @@ struct TSchemeShard::TExport::TTxForget: public TSchemeShard::TXxport::TTxBase {
const TPath itemPath = TPath::Resolve(ExportItemPathName(Self, exportInfo, itemIdx), Self);
if (itemPath.IsResolved() && !itemPath.IsDeleted()) {
item.State = TExportInfo::EState::Dropping;
itemsToDrop.emplace_back(itemIdx);
}

Self->PersistExportItemState(db, exportInfo, itemIdx);
}
LOG_T("TExport::TTxForget, items to drop"
<< ", items: " << JoinSeq(", ", itemsToDrop)
);

Progress = true;
}
Expand Down

0 comments on commit 02c117c

Please sign in to comment.