From af411bb10f1133d6e7f4c6324a89dde2f745d675 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Fri, 10 Jan 2025 21:15:01 +0300 Subject: [PATCH] strict writing data validation (#13190) --- ydb/core/kqp/common/kqp_tx_manager.cpp | 9 +++--- .../kqp/executer_actor/kqp_data_executer.cpp | 1 + .../tx/columnshard/columnshard__write.cpp | 28 +++++++++++++------ .../transactions/operators/ev_write/primary.h | 3 +- 4 files changed, 27 insertions(+), 14 deletions(-) diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp index 6c0803d33a7b..c6f94262b634 100644 --- a/ydb/core/kqp/common/kqp_tx_manager.cpp +++ b/ydb/core/kqp/common/kqp_tx_manager.cpp @@ -278,17 +278,17 @@ class TKqpTransactionManager : public IKqpTransactionManager { for (auto& [shardId, shardInfo] : ShardsInfo) { if ((shardInfo.Flags & EAction::WRITE)) { ReceivingShards.insert(shardId); + if (shardInfo.IsOlap) { + receivingColumnShardsSet.insert(shardId); + } if (IsVolatile()) { SendingShards.insert(shardId); } - if (shardInfo.IsOlap) { - sendingColumnShardsSet.insert(shardId); - } } if (!shardInfo.Locks.empty()) { SendingShards.insert(shardId); if (shardInfo.IsOlap) { - receivingColumnShardsSet.insert(shardId); + sendingColumnShardsSet.insert(shardId); } } @@ -325,6 +325,7 @@ class TKqpTransactionManager : public IKqpTransactionManager { auto arbiterIterator = std::begin(shards); std::advance(arbiterIterator, index); ArbiterColumnShard = *arbiterIterator; + ReceivingShards.insert(*ArbiterColumnShard); } ShardsToWaitPrepare = ShardsIds; diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index bcb66aa49750..cbd154485695 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2561,6 +2561,7 @@ class TKqpDataExecuter : public TKqpExecuterBase(locks.GetSendingShards().begin(), locks.GetSendingShards().end()); ReceivingShards = std::set(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end()); - const bool singleShardTx = SendingShards.empty() && ReceivingShards.empty(); - if (!singleShardTx) { + if (SendingShards.empty() != ReceivingShards.empty()) { + return TConclusionStatus::Fail("incorrect synchronization data (send/receiving lists)"); + } + if (ReceivingShards.size() && SendingShards.size()) { if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { - return TConclusionStatus::Fail("shard is absent in sending and receiving lists"); + return TConclusionStatus::Fail("current tablet_id is absent in sending and receiving lists"); } - if (locks.HasArbiterColumnShard()) { - ArbiterColumnShard = locks.GetArbiterColumnShard(); - } else { - AFL_VERIFY(!ReceivingShards.empty()); - ArbiterColumnShard = *ReceivingShards.begin(); + if (!locks.HasArbiterColumnShard()) { + return TConclusionStatus::Fail("no arbiter info in request"); + } + ArbiterColumnShard = locks.GetArbiterColumnShard(); + + if (IsPrimary() && !ReceivingShards.contains(ArbiterColumnShard)) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "incorrect arbiter")("arbiter_id", ArbiterColumnShard)( + "receiving", JoinSeq(", ", ReceivingShards))("sending", JoinSeq(", ", SendingShards)); + return TConclusionStatus::Fail("arbiter is absent in receiving lists"); + } + if (!IsPrimary() && (!ReceivingShards.contains(ArbiterColumnShard) || !SendingShards.contains(ArbiterColumnShard))) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "incorrect arbiter")("arbiter_id", ArbiterColumnShard)( + "receiving", JoinSeq(", ", ReceivingShards))("sending", JoinSeq(", ", SendingShards)); + return TConclusionStatus::Fail("arbiter is absent in sending or receiving lists"); } - AFL_VERIFY(ArbiterColumnShard); } Generation = lock.GetGeneration(); diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h index 8fc19f3c8587..6ec0e70811b1 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h @@ -159,7 +159,8 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac }; virtual bool IsTxBroken() const override { - return TxBroken.value_or(false); + AFL_VERIFY(TxBroken); + return *TxBroken; } void InitializeRequests(TColumnShard& owner) {