Skip to content

Commit

Permalink
Prepare & execute topics in BufferWriteActor (ydb-platform#12464)
Browse files Browse the repository at this point in the history
  • Loading branch information
dahbka-lis authored Dec 16, 2024
1 parent 04c46d2 commit 8efacf9
Show file tree
Hide file tree
Showing 9 changed files with 810 additions and 318 deletions.
49 changes: 47 additions & 2 deletions ydb/core/kqp/common/kqp_tx_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,32 @@ class TKqpTransactionManager : public IKqpTransactionManager {
}
}

void AddTopic(ui64 topicId, const TString& path) override {
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING);
ShardsIds.insert(topicId);
auto& shardInfo = ShardsInfo[topicId];

const auto [stringsIter, _] = TablePathes.insert(path);
const TStringBuf pathBuf = *stringsIter;
shardInfo.Pathes.insert(pathBuf);
}

void AddTopicsToShards() override {
if (!HasTopics()) {
return;
}

for (auto& topicId : GetTopicOperations().GetSendingTabletIds()) {
AddTopic(topicId, *GetTopicOperations().GetTabletName(topicId));
AddAction(topicId, EAction::READ);
}

for (auto& topicId : GetTopicOperations().GetReceivingTabletIds()) {
AddTopic(topicId, *GetTopicOperations().GetTabletName(topicId));
AddAction(topicId, EAction::WRITE);
}
}

bool AddLock(ui64 shardId, const NKikimrDataEvents::TLock& lockProto) override {
Y_ABORT_UNLESS(State == ETransactionState::COLLECTING);
TKqpLock lock(lockProto);
Expand Down Expand Up @@ -124,6 +150,22 @@ class TKqpTransactionManager : public IKqpTransactionManager {
ShardsInfo.at(shardId).State = state;
}

void SetTopicOperations(NTopic::TTopicOperations&& topicOperations) override {
TopicOperations = std::move(topicOperations);
}

const NTopic::TTopicOperations& GetTopicOperations() const override {
return TopicOperations;
}

void BuildTopicTxs(NTopic::TTopicOperationTransactions& txs) override {
TopicOperations.BuildTopicTxs(txs);
}

bool HasTopics() const override {
return GetTopicOperations().GetSize() != 0;
}

TVector<NKikimrDataEvents::TLock> GetLocks() const override {
TVector<NKikimrDataEvents::TLock> locks;
for (const auto& [_, shardInfo] : ShardsInfo) {
Expand Down Expand Up @@ -189,7 +231,8 @@ class TKqpTransactionManager : public IKqpTransactionManager {
bool IsVolatile() const override {
return !HasOlapTable()
&& !IsReadOnly()
&& !IsSingleShard();
&& !IsSingleShard()
&& !HasTopics();

// TODO: && !HasPersistentChannels;
// Note: currently persistent channels are never used
Expand Down Expand Up @@ -342,7 +385,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
shardInfo.State = EShardState::EXECUTING;
}

AFL_ENSURE(ReceivingShards.empty() || !IsSingleShard() || HasOlapTable());
AFL_ENSURE(ReceivingShards.empty() || HasTopics() || !IsSingleShard() || HasOlapTable());
}

TCommitInfo GetCommitInfo() override {
Expand Down Expand Up @@ -440,6 +483,8 @@ class TKqpTransactionManager : public IKqpTransactionManager {

THashSet<ui64> ShardsToWaitPrepare;

NTopic::TTopicOperations TopicOperations;

ui64 MinStep = 0;
ui64 MaxStep = 0;
ui64 Coordinator = 0;
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/kqp/common/kqp_tx_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class IKqpTransactionManager {

virtual void AddShard(ui64 shardId, bool isOlap, const TString& path) = 0;
virtual void AddAction(ui64 shardId, ui8 action) = 0;
virtual void AddTopic(ui64 topicId, const TString& path) = 0;
virtual void AddTopicsToShards() = 0;
virtual bool AddLock(ui64 shardId, const NKikimrDataEvents::TLock& lock) = 0;

virtual void BreakLock(ui64 shardId) = 0;
Expand All @@ -49,6 +51,13 @@ class IKqpTransactionManager {
virtual EShardState GetState(ui64 shardId) const = 0;
virtual void SetState(ui64 shardId, EShardState state) = 0;

virtual void SetTopicOperations(NTopic::TTopicOperations&& topicOperations) = 0;
virtual const NTopic::TTopicOperations& GetTopicOperations() const = 0;

virtual void BuildTopicTxs(NTopic::TTopicOperationTransactions& txs) = 0;

virtual bool HasTopics() const = 0;

virtual bool IsTxPrepared() const = 0;
virtual bool IsTxFinished() const = 0;

Expand Down
16 changes: 12 additions & 4 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1542,7 +1542,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

private:
bool IsReadOnlyTx() const {
if (Request.TopicOperations.HasOperations()) {
if (BufferActorId && TxManager->GetTopicOperations().HasOperations()) {
YQL_ENSURE(!Request.UseImmediateEffects);
return false;
}

if (!BufferActorId && Request.TopicOperations.HasOperations()) {
YQL_ENSURE(!Request.UseImmediateEffects);
return false;
}
Expand Down Expand Up @@ -2112,7 +2117,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}

// Single-shard datashard transactions are always immediate
ImmediateTx = (datashardTxs.size() + evWriteTxs.size() + Request.TopicOperations.GetSize() + sourceScanPartitionsCount) <= 1
auto topicSize = (BufferActorId) ? TxManager->GetTopicOperations().GetSize() : Request.TopicOperations.GetSize();
ImmediateTx = (datashardTxs.size() + evWriteTxs.size() + topicSize + sourceScanPartitionsCount) <= 1
&& !UnknownAffectedShardCount
&& evWriteTxs.empty()
&& !HasOlapTable;
Expand Down Expand Up @@ -2392,6 +2398,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

YQL_ENSURE(!TxManager);
Request.TopicOperations.BuildTopicTxs(topicTxs);

const bool needRollback = Request.LocksOp == ELocksOp::Rollback;
Expand Down Expand Up @@ -2428,8 +2435,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// HTAP transactions always use generic readsets
!evWriteTxs.empty());

if (!locksMap.empty() || VolatileTx ||
Request.TopicOperations.HasReadOperations() || Request.TopicOperations.HasWriteOperations())
if (!locksMap.empty() || VolatileTx || Request.TopicOperations.HasReadOperations()
|| Request.TopicOperations.HasWriteOperations())
{
YQL_ENSURE(Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback || VolatileTx);

Expand Down Expand Up @@ -2776,6 +2783,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
void ExecuteTopicTabletTransactions(TTopicTabletTxs& topicTxs) {
YQL_ENSURE(!TxManager);
TMaybe<ui64> writeId;

if (Request.TopicOperations.HasWriteId()) {
writeId = Request.TopicOperations.GetWriteId();
}
Expand Down
168 changes: 167 additions & 1 deletion ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <ydb/core/tx/data_events/shards_splitter.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/tx.h>
#include <ydb/core/persqueue/events/global.h>
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/wilson_ids/wilson.h>
Expand Down Expand Up @@ -1356,6 +1357,7 @@ struct TEvBufferWriteResult : public TEventLocal<TEvBufferWriteResult, TKqpEvent

class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, public IKqpTableWriterCallbacks {
using TBase = TActorBootstrapped<TKqpBufferWriteActor>;
using TTopicTabletTxs = NTopic::TTopicOperationTransactions;

public:
enum class EState {
Expand Down Expand Up @@ -1384,6 +1386,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
State = EState::WRITING;
Alloc->Release();
Counters->BufferActorsCount->Inc();
TxManager->AddTopicsToShards();
}

void Bootstrap() {
Expand All @@ -1404,6 +1407,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
hFunc(TEvBufferWrite, Handle);

hFunc(TEvTxProxy::TEvProposeTransactionStatus, Handle);
hFunc(TEvPersQueue::TEvProposeTransactionResult, Handle);
hFunc(NKikimr::NEvents::TDataEvents::TEvWriteResult, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
default:
Expand Down Expand Up @@ -1590,6 +1594,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
Close();
Process();
SendToExternalShards(false);
SendToTopics();
}

void ImmediateCommit() {
Expand Down Expand Up @@ -1687,6 +1692,63 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
}
}

void SendToTopics() {
if (!TxManager->HasTopics()) {
return;
}

TTopicTabletTxs topicTxs;
TxManager->BuildTopicTxs(topicTxs);

TMaybe<ui64> writeId;
if (TxManager->GetTopicOperations().HasWriteId()) {
writeId = TxManager->GetTopicOperations().GetWriteId();
}

for (auto& [tabletId, t] : topicTxs) {
auto& transaction = t.tx;
transaction.SetOp(NKikimrPQ::TDataTransaction::Commit);

const auto prepareSettings = TxManager->GetPrepareTransactionInfo();
if (!prepareSettings.ArbiterColumnShard) {
for (const ui64 sendingShardId : prepareSettings.SendingShards) {
transaction.AddSendingShards(sendingShardId);
}
for (const ui64 receivingShardId : prepareSettings.ReceivingShards) {
transaction.AddReceivingShards(receivingShardId);
}
} else {
transaction.AddSendingShards(*prepareSettings.ArbiterColumnShard);
transaction.AddReceivingShards(*prepareSettings.ArbiterColumnShard);
}

auto ev = std::make_unique<TEvPersQueue::TEvProposeTransactionBuilder>();

if (t.hasWrite && writeId.Defined()) {
auto* w = transaction.MutableWriteId();
w->SetNodeId(SelfId().NodeId());
w->SetKeyId(*writeId);
}
transaction.SetImmediate(false);

ActorIdToProto(SelfId(), ev->Record.MutableSourceActor());
ev->Record.MutableData()->Swap(&transaction);
ev->Record.SetTxId(*TxId);

SendTime[tabletId] = TInstant::Now();
auto traceId = BufferWriteActor.GetTraceId();

CA_LOG_D("Preparing KQP transaction on topic tablet: " << tabletId << ", writeId: " << writeId);

Send(
MakePipePerNodeCacheID(false),
new TEvPipeCache::TEvForward(ev.release(), tabletId, /* subscribe */ true),
IEventHandle::FlagTrackDelivery,
0,
std::move(traceId));
}
}

void SendCommitToCoordinator() {
const auto commitInfo = TxManager->GetCommitInfo();

Expand Down Expand Up @@ -1810,6 +1872,69 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
}
}

void Handle(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) {
auto& event = ev->Get()->Record;
const ui64 tabletId = event.GetOrigin();

CA_LOG_D("Got ProposeTransactionResult" <<
", PQ tablet: " << tabletId <<
", status: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus()));

switch (event.GetStatus()) {
case NKikimrPQ::TEvProposeTransactionResult::PREPARED:
ProcessPreparedTopic(ev);
return;
case NKikimrPQ::TEvProposeTransactionResult::COMPLETE:
ProcessCompletedTopic(ev);
return;
case NKikimrPQ::TEvProposeTransactionResult::ABORTED:
CA_LOG_E("Got ABORTED ProposeTransactionResult for PQ."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << ".");
ReplyErrorAndDie(
TStringBuilder() << "Aborted proposal status for PQ. ",
NYql::NDqProto::StatusIds::ABORTED,
{});
return;
case NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST:
CA_LOG_E("Got BAD REQUEST ProposeTransactionResult for PQ."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << ".");
ReplyErrorAndDie(
TStringBuilder() << "Bad request proposal status for PQ. ",
NYql::NDqProto::StatusIds::BAD_REQUEST,
{});
return;
case NKikimrPQ::TEvProposeTransactionResult::OVERLOADED:
CA_LOG_E("Got OVERLOADED ProposeTransactionResult for PQ."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << ".");
ReplyErrorAndDie(
TStringBuilder() << "Overloaded proposal status for PQ. ",
NYql::NDqProto::StatusIds::OVERLOADED,
{});
return;
case NKikimrPQ::TEvProposeTransactionResult::CANCELLED:
CA_LOG_E("Got CANCELLED ProposeTransactionResult for PQ."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << ".");
ReplyErrorAndDie(
TStringBuilder() << "Cancelled proposal status for PQ. ",
NYql::NDqProto::StatusIds::CANCELLED,
{});
return;
default:
CA_LOG_E("Got undefined ProposeTransactionResult for PQ."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << ".");
ReplyErrorAndDie(
TStringBuilder() << "Undefined proposal status for PQ. ",
NYql::NDqProto::StatusIds::INTERNAL_ERROR,
{});
return;
}
}

void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
CA_LOG_W("TEvDeliveryProblem was received from tablet: " << ev->Get()->TabletId);
ReplyErrorAndDie(TStringBuilder() << "Failed to deviler message.", NYql::NDqProto::StatusIds::UNAVAILABLE, {});
Expand Down Expand Up @@ -1837,7 +1962,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
Rollback();
State = EState::FINISHED;
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{});
} else if (TxManager->IsSingleShard() && !TxManager->HasOlapTable() && !WriteInfos.empty()) {
} else if (TxManager->IsSingleShard() && !TxManager->HasOlapTable() && !WriteInfos.empty() && !TxManager->HasTopics()) {
TxManager->StartExecute();
ImmediateCommit();
} else {
Expand Down Expand Up @@ -2016,6 +2141,47 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
}
}

void ProcessPreparedTopic(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) {
if (State != EState::PREPARING) {
CA_LOG_D("Ignored topic prepared event.");
return;
}
OnMessageReceived(ev->Get()->Record.GetOrigin());
CA_LOG_D("Got propose prepared result TxId=" << ev->Get()->Record.GetTxId()
<< ", TabletId=" << ev->Get()->Record.GetOrigin()
<< ", Cookie=" << ev->Cookie);

const auto& record = ev->Get()->Record;
IKqpTransactionManager::TPrepareResult preparedInfo;
preparedInfo.ShardId = record.GetOrigin();
preparedInfo.MinStep = record.GetMinStep();
preparedInfo.MaxStep = record.GetMaxStep();

preparedInfo.Coordinator = 0;
if (record.DomainCoordinatorsSize()) {
auto domainCoordinators = TCoordinators(TVector<ui64>(record.GetDomainCoordinators().begin(),
record.GetDomainCoordinators().end()));
preparedInfo.Coordinator = domainCoordinators.Select(*TxId);
}

OnPrepared(std::move(preparedInfo), 0);
}

void ProcessCompletedTopic(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) {
NKikimrPQ::TEvProposeTransactionResult& event = ev->Get()->Record;

if (State != EState::COMMITTING) {
CA_LOG_D("Ignored completed event.");
return;
}
OnMessageReceived(event.GetOrigin());
CA_LOG_D("Got propose completed result" <<
", topic tablet: " << event.GetOrigin() <<
", status: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus()));

OnCommitted(event.GetOrigin(), 0);
}

void ProcessWritePreparedShard(NKikimr::NEvents::TDataEvents::TEvWriteResult::TPtr& ev) {
if (State != EState::PREPARING) {
CA_LOG_D("Ignored write prepared event.");
Expand Down
Loading

0 comments on commit 8efacf9

Please sign in to comment.