Skip to content

Commit

Permalink
correct commit processing for immediate writing (ydb-platform#13188)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 21, 2025
1 parent 5fa1a94 commit d715e5b
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,28 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
auto& index = Self->MutableIndexAs<NOlap::TColumnEngineForLogs>();
const auto minReadSnapshot = Self->GetMinReadSnapshot();

std::optional<EOperationBehaviour> packBehaviour;
for (auto&& writeResult : Pack.GetWriteResults()) {
const auto& writeMeta = writeResult.GetWriteMeta();
AFL_VERIFY(!writeMeta.HasLongTxId());
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
if (!packBehaviour) {
packBehaviour = op->GetBehaviour();
if (!PackBehaviour) {
PackBehaviour = op->GetBehaviour();
} else {
AFL_VERIFY(packBehaviour == op->GetBehaviour());
AFL_VERIFY(PackBehaviour == op->GetBehaviour());
}
}
AFL_VERIFY(!!packBehaviour);
std::vector<TInsertWriteId> insertWriteIds;
AFL_VERIFY(!!PackBehaviour);
auto& granule = index.MutableGranuleVerified(Pack.GetPathId());
for (auto&& portion : Pack.MutablePortions()) {
if (packBehaviour == EOperationBehaviour::NoTxWrite) {
if (PackBehaviour == EOperationBehaviour::NoTxWrite) {
static TAtomicCounter Counter = 0;
portion.GetPortionInfoConstructor()->MutablePortionConstructor().SetInsertWriteId((TInsertWriteId)Counter.Inc());
} else {
portion.GetPortionInfoConstructor()->MutablePortionConstructor().SetInsertWriteId(Self->InsertTable->BuildNextWriteId(txc));
}
insertWriteIds.emplace_back(portion.GetPortionInfoConstructor()->GetPortionConstructor().GetInsertWriteIdVerified());
InsertWriteIds.emplace_back(portion.GetPortionInfoConstructor()->GetPortionConstructor().GetInsertWriteIdVerified());
portion.Finalize(Self, txc);
if (packBehaviour == EOperationBehaviour::NoTxWrite) {
if (PackBehaviour == EOperationBehaviour::NoTxWrite) {
granule.CommitImmediateOnExecute(txc, *CommitSnapshot, portion.GetPortionInfo());
} else {
granule.InsertPortionOnExecute(txc, portion.GetPortionInfo());
Expand All @@ -61,15 +59,11 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
AFL_VERIFY(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot));
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
if (operationIds.size() == 1) {
operation->OnWriteFinish(txc, insertWriteIds, true);
} else {
operation->OnWriteFinish(txc, {}, true);
}
operation->OnWriteFinish(txc, {}, true);
} else {
operation->OnWriteFinish(txc, insertWriteIds, false);
operation->OnWriteFinish(txc, InsertWriteIds, false);
}
Self->OperationsManager->LinkInsertWriteIdToOperationWriteId(insertWriteIds, operation->GetWriteId());
Self->OperationsManager->LinkInsertWriteIdToOperationWriteId(InsertWriteIds, operation->GetWriteId());
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
Expand Down Expand Up @@ -121,6 +115,11 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) {
for (auto&& portion : Pack.GetPortions()) {
granule.InsertPortionOnComplete(portion.GetPortionInfo(), index);
}
if (PackBehaviour == EOperationBehaviour::NoTxWrite) {
for (auto&& i : InsertWriteIds) {
granule.CommitPortionOnComplete(i, index);
}
}
for (auto&& writeResult : Pack.GetWriteResults()) {
const auto& writeMeta = writeResult.GetWriteMeta();
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ class TTxBlobsWritingFinished: public TExtendedTransactionBase {
}
};

std::vector<TInsertWriteId> InsertWriteIds;
std::vector<TReplyInfo> Results;
std::optional<EOperationBehaviour> PackBehaviour;

public:
TTxBlobsWritingFinished(TColumnShard* self, const NKikimrProto::EReplyStatus writeStatus,
Expand Down

0 comments on commit d715e5b

Please sign in to comment.