Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Jan 7, 2025
1 parent 46f2ea9 commit 5738349
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,28 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
auto& index = Self->MutableIndexAs<NOlap::TColumnEngineForLogs>();
const auto minReadSnapshot = Self->GetMinReadSnapshot();

std::optional<EOperationBehaviour> packBehaviour;
for (auto&& writeResult : Pack.GetWriteResults()) {
const auto& writeMeta = writeResult.GetWriteMeta();
AFL_VERIFY(!writeMeta.HasLongTxId());
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
if (!packBehaviour) {
packBehaviour = op->GetBehaviour();
if (!PackBehaviour) {
PackBehaviour = op->GetBehaviour();
} else {
AFL_VERIFY(packBehaviour == op->GetBehaviour());
AFL_VERIFY(PackBehaviour == op->GetBehaviour());
}
}
AFL_VERIFY(!!packBehaviour);
AFL_VERIFY(!!PackBehaviour);
auto& granule = index.MutableGranuleVerified(Pack.GetPathId());
for (auto&& portion : Pack.MutablePortions()) {
if (packBehaviour == EOperationBehaviour::NoTxWrite) {
if (PackBehaviour == EOperationBehaviour::NoTxWrite) {
static TAtomicCounter Counter = 0;
portion.GetPortionInfoConstructor()->MutablePortionConstructor().SetInsertWriteId((TInsertWriteId)Counter.Inc());
} else {
portion.GetPortionInfoConstructor()->MutablePortionConstructor().SetInsertWriteId(Self->InsertTable->BuildNextWriteId(txc));
}
InsertWriteIds.emplace_back(portion.GetPortionInfoConstructor()->GetPortionConstructor().GetInsertWriteIdVerified());
portion.Finalize(Self, txc);
if (packBehaviour == EOperationBehaviour::NoTxWrite) {
if (PackBehaviour == EOperationBehaviour::NoTxWrite) {
granule.CommitImmediateOnExecute(txc, *CommitSnapshot, portion.GetPortionInfo());
} else {
granule.InsertPortionOnExecute(txc, portion.GetPortionInfo());
Expand Down Expand Up @@ -116,16 +115,18 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) {
for (auto&& portion : Pack.GetPortions()) {
granule.InsertPortionOnComplete(portion.GetPortionInfo(), index);
}
if (PackBehaviour == EOperationBehaviour::NoTxWrite) {
for (auto&& i : InsertWriteIds) {
granule.CommitPortionOnComplete(i, index);
}
}
for (auto&& writeResult : Pack.GetWriteResults()) {
const auto& writeMeta = writeResult.GetWriteMeta();
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
AFL_VERIFY(CommitSnapshot);
Self->OperationsManager->AddTemporaryTxLink(op->GetLockId());
Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), *CommitSnapshot);
for (auto&& i : InsertWriteIds) {
granule.CommitPortionOnComplete(i, index);
}
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
}
Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class TTxBlobsWritingFinished: public TExtendedTransactionBase {

std::vector<TInsertWriteId> InsertWriteIds;
std::vector<TReplyInfo> Results;
std::optional<EOperationBehaviour> PackBehaviour;

public:
TTxBlobsWritingFinished(TColumnShard* self, const NKikimrProto::EReplyStatus writeStatus,
Expand Down

0 comments on commit 5738349

Please sign in to comment.