Skip to content

Commit

Permalink
Merge branch 'main' into cmakebuild
Browse files Browse the repository at this point in the history
  • Loading branch information
alexv-smirnov committed Jan 7, 2025
2 parents 4c9e889 + 5299845 commit a85ac96
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 22 deletions.
36 changes: 17 additions & 19 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ class TCommitOperation {
using TPtr = std::shared_ptr<TCommitOperation>;

bool NeedSyncLocks() const {
return SendingShards.size() && ReceivingShards.size();
return SendingShards.size() || ReceivingShards.size();
}

bool IsPrimary() const {
Expand All @@ -308,29 +308,28 @@ class TCommitOperation {
}

TConclusionStatus Parse(const NEvents::TDataEvents::TEvWrite& evWrite) {
AFL_VERIFY(evWrite.Record.GetLocks().GetLocks().size() >= 1);
auto& locks = evWrite.Record.GetLocks();
auto& lock = evWrite.Record.GetLocks().GetLocks()[0];
TxId = evWrite.Record.GetTxId();
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("tx_id", TxId);
const auto& locks = evWrite.Record.GetLocks();
AFL_VERIFY(!locks.GetLocks().empty());
auto& lock = locks.GetLocks()[0];
LockId = lock.GetLockId();
SendingShards = std::set<ui64>(locks.GetSendingShards().begin(), locks.GetSendingShards().end());
ReceivingShards = std::set<ui64>(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end());
if (!ReceivingShards.size() || !SendingShards.size()) {
ReceivingShards.clear();
SendingShards.clear();
} else if (!locks.HasArbiterColumnShard()) {
ArbiterColumnShard = *ReceivingShards.begin();
const bool singleShardTx = SendingShards.empty() && ReceivingShards.empty();
if (!singleShardTx) {
if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) {
return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists");
return TConclusionStatus::Fail("shard is absent in sending and receiving lists");
}
} else {
ArbiterColumnShard = locks.GetArbiterColumnShard();
AFL_VERIFY(ArbiterColumnShard);
if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) {
return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists");
if (locks.HasArbiterColumnShard()) {
ArbiterColumnShard = locks.GetArbiterColumnShard();
} else {
AFL_VERIFY(!ReceivingShards.empty());
ArbiterColumnShard = *ReceivingShards.begin();
}
AFL_VERIFY(ArbiterColumnShard);
}

TxId = evWrite.Record.GetTxId();
LockId = lock.GetLockId();
Generation = lock.GetGeneration();
InternalGenerationCounter = lock.GetCounter();
if (!GetLockId()) {
Expand All @@ -339,15 +338,14 @@ class TCommitOperation {
if (!TxId) {
return TConclusionStatus::Fail("not initialized TxId for commit event");
}
if (evWrite.Record.GetLocks().GetOp() != NKikimrDataEvents::TKqpLocks::Commit) {
if (locks.GetOp() != NKikimrDataEvents::TKqpLocks::Commit) {
return TConclusionStatus::Fail("incorrect message type");
}
return TConclusionStatus::Success();
}

std::unique_ptr<NColumnShard::TEvWriteCommitSyncTransactionOperator> CreateTxOperator(
const NKikimrTxColumnShard::ETransactionKind kind) const {
AFL_VERIFY(ReceivingShards.size());
if (IsPrimary()) {
return std::make_unique<NColumnShard::TEvWriteCommitPrimaryTransactionOperator>(
TFullTxInfo::BuildFake(kind), LockId, ReceivingShards, SendingShards);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
for (auto&& i : protoData.GetWaitShardsResultAck()) {
WaitShardsResultAck.emplace(i);
}
AFL_VERIFY(ReceivingShards.empty() == SendingShards.empty());
if (protoData.HasTxBroken()) {
TxBroken = protoData.GetTxBroken();
}
Expand Down Expand Up @@ -160,8 +159,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
};

virtual bool IsTxBroken() const override {
AFL_VERIFY(TxBroken);
return *TxBroken;
return TxBroken.value_or(false);
}

void InitializeRequests(TColumnShard& owner) {
Expand Down

0 comments on commit a85ac96

Please sign in to comment.