diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 5f2cdde07cb4..747f102a6403 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -295,7 +295,7 @@ class TCommitOperation { using TPtr = std::shared_ptr; bool NeedSyncLocks() const { - return SendingShards.size() && ReceivingShards.size(); + return SendingShards.size() || ReceivingShards.size(); } bool IsPrimary() const { @@ -308,29 +308,28 @@ class TCommitOperation { } TConclusionStatus Parse(const NEvents::TDataEvents::TEvWrite& evWrite) { - AFL_VERIFY(evWrite.Record.GetLocks().GetLocks().size() >= 1); - auto& locks = evWrite.Record.GetLocks(); - auto& lock = evWrite.Record.GetLocks().GetLocks()[0]; + TxId = evWrite.Record.GetTxId(); + NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("tx_id", TxId); + const auto& locks = evWrite.Record.GetLocks(); + AFL_VERIFY(!locks.GetLocks().empty()); + auto& lock = locks.GetLocks()[0]; + LockId = lock.GetLockId(); SendingShards = std::set(locks.GetSendingShards().begin(), locks.GetSendingShards().end()); ReceivingShards = std::set(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end()); - if (!ReceivingShards.size() || !SendingShards.size()) { - ReceivingShards.clear(); - SendingShards.clear(); - } else if (!locks.HasArbiterColumnShard()) { - ArbiterColumnShard = *ReceivingShards.begin(); + const bool singleShardTx = SendingShards.empty() && ReceivingShards.empty(); + if (!singleShardTx) { if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { - return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists"); + return TConclusionStatus::Fail("shard is absent in sending and receiving lists"); } - } else { - ArbiterColumnShard = locks.GetArbiterColumnShard(); - AFL_VERIFY(ArbiterColumnShard); - if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { - return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists"); + if (locks.HasArbiterColumnShard()) { + ArbiterColumnShard = locks.GetArbiterColumnShard(); + } else { + AFL_VERIFY(!ReceivingShards.empty()); + ArbiterColumnShard = *ReceivingShards.begin(); } + AFL_VERIFY(ArbiterColumnShard); } - TxId = evWrite.Record.GetTxId(); - LockId = lock.GetLockId(); Generation = lock.GetGeneration(); InternalGenerationCounter = lock.GetCounter(); if (!GetLockId()) { @@ -339,7 +338,7 @@ class TCommitOperation { if (!TxId) { return TConclusionStatus::Fail("not initialized TxId for commit event"); } - if (evWrite.Record.GetLocks().GetOp() != NKikimrDataEvents::TKqpLocks::Commit) { + if (locks.GetOp() != NKikimrDataEvents::TKqpLocks::Commit) { return TConclusionStatus::Fail("incorrect message type"); } return TConclusionStatus::Success(); @@ -347,7 +346,6 @@ class TCommitOperation { std::unique_ptr CreateTxOperator( const NKikimrTxColumnShard::ETransactionKind kind) const { - AFL_VERIFY(ReceivingShards.size()); if (IsPrimary()) { return std::make_unique( TFullTxInfo::BuildFake(kind), LockId, ReceivingShards, SendingShards); diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h index 0a8137fd9d31..8fc19f3c8587 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h @@ -63,7 +63,6 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac for (auto&& i : protoData.GetWaitShardsResultAck()) { WaitShardsResultAck.emplace(i); } - AFL_VERIFY(ReceivingShards.empty() == SendingShards.empty()); if (protoData.HasTxBroken()) { TxBroken = protoData.GetTxBroken(); } @@ -160,8 +159,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac }; virtual bool IsTxBroken() const override { - AFL_VERIFY(TxBroken); - return *TxBroken; + return TxBroken.value_or(false); } void InitializeRequests(TColumnShard& owner) {