From 211780637524a468e77f721126cf2556a07d2f26 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Thu, 23 Jan 2025 14:08:13 +0300 Subject: [PATCH 1/2] checkpoint storage: use more query parameters --- .../ydb_checkpoint_storage.cpp | 143 +++++++++++++----- .../checkpoint_storage/ydb_state_storage.cpp | 4 +- 2 files changed, 105 insertions(+), 42 deletions(-) diff --git a/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp b/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp index 8357d6f8655f..1b4364f1347f 100644 --- a/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp +++ b/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp @@ -236,21 +236,41 @@ TFuture UpdateCheckpoint(const TCheckpointContextPtr& context) { auto query = Sprintf(R"( --!syntax_v1 PRAGMA TablePathPrefix("%s"); - $ts = cast(%lu as Timestamp); + DECLARE $graph_id AS String; + DECLARE $coordinator_generation AS Uint64; + DECLARE $seq_no AS Uint64; + DECLARE $status AS Uint8; + DECLARE $state_size AS Uint64; + DECLARE $ts AS Timestamp; UPSERT INTO %s (graph_id, coordinator_generation, seq_no, status, state_size, modified_by) VALUES - ("%s", %lu, %lu, %u, %lu, $ts); + ($graph_id, $coordinator_generation, $seq_no, $status, $state_size, $ts); )", generationContext->TablePathPrefix.c_str(), - TInstant::Now().MicroSeconds(), - CheckpointsMetadataTable, - generationContext->PrimaryKey.c_str(), - context->CheckpointId.CoordinatorGeneration, - context->CheckpointId.SeqNo, - (ui32)context->Status, - context->StateSizeBytes); + CheckpointsMetadataTable); + + NYdb::TParamsBuilder params; + params + .AddParam("$graph_id") + .String(generationContext->PrimaryKey) + .Build() + .AddParam("$coordinator_generation") + .Uint64(context->CheckpointId.CoordinatorGeneration) + .Build() + .AddParam("$seq_no") + .Uint64(context->CheckpointId.SeqNo) + .Build() + .AddParam("$status") + .Uint8((ui8)context->Status) + .Build() + .AddParam("$state_size") + .Uint64(context->StateSizeBytes) + .Build() + .AddParam("$ts") + .Timestamp(TInstant::Now()) + .Build(); auto ttxControl = TTxControl::Tx(*generationContext->Transaction).CommitTx(); - return generationContext->Session.ExecuteDataQuery(query, ttxControl).Apply( + return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build()).Apply( [] (const TFuture& future) { TStatus status = future.GetValue(); return status; @@ -264,15 +284,20 @@ TFuture SelectGraphDescId(const TCheckpointContextPtr& context auto query = Sprintf(R"( --!syntax_v1 PRAGMA TablePathPrefix("%s"); + DECLARE $graph_desc_id AS String; SELECT ref_count FROM %s - WHERE id = "%s"; + WHERE id = $graph_desc_id; )", generationContext->TablePathPrefix.c_str(), - CheckpointsGraphsDescriptionTable, - graphDescContext->GraphDescId.c_str()); + CheckpointsGraphsDescriptionTable); + NYdb::TParamsBuilder params; + params + .AddParam("$graph_desc_id") + .String(graphDescContext->GraphDescId) + .Build(); - return generationContext->Session.ExecuteDataQuery(query, TTxControl::Tx(*generationContext->Transaction)); + return generationContext->Session.ExecuteDataQuery(query, TTxControl::Tx(*generationContext->Transaction), params.Build()); } bool GraphDescIdExists(const TFuture& result) { @@ -292,6 +317,7 @@ TFuture GenerateGraphDescId(const TCheckpointContextPtr& context) { if (!result.GetValue().IsSuccess()) { return MakeFuture(result.GetValue()); } + // TODO racing! if (!GraphDescIdExists(result)) { return MakeFuture(TStatus(EStatus::SUCCESS, NYdb::NIssue::TIssues())); } else { @@ -443,19 +469,32 @@ TFuture SelectCheckpoint(const TCheckpointContextPtr& context) auto query = Sprintf(R"( --!syntax_v1 PRAGMA TablePathPrefix("%s"); + DECLARE $graph_id AS String; + DECLARE $coordinator_generation AS Uint64; + DECLARE $seq_no AS Uint64; SELECT status FROM %s - WHERE graph_id = "%s" AND coordinator_generation = %lu AND seq_no = %lu; + WHERE graph_id = $graph_id AND coordinator_generation = $coordinator_generation AND seq_no = $seq_no; )", generationContext->TablePathPrefix.c_str(), - CheckpointsMetadataTable, - generationContext->PrimaryKey.c_str(), - context->CheckpointId.CoordinatorGeneration, - context->CheckpointId.SeqNo); + CheckpointsMetadataTable); + + NYdb::TParamsBuilder params; + params + .AddParam("$graph_id") + .String(generationContext->PrimaryKey) + .Build() + .AddParam("$coordinator_generation") + .Uint64(context->CheckpointId.CoordinatorGeneration) + .Build() + .AddParam("$seq_no") + .Uint64(context->CheckpointId.SeqNo) + .Build(); return generationContext->Session.ExecuteDataQuery( query, - TTxControl::Tx(*generationContext->Transaction)); + TTxControl::Tx(*generationContext->Transaction), + params.Build()); } TFuture CheckCheckpoint( @@ -910,23 +949,29 @@ TFuture TCheckpointStorage::DeleteGraph(const TString& graphId) { auto query = Sprintf(R"( --!syntax_v1 PRAGMA TablePathPrefix("%s"); + DECLARE $graph_id AS String; DELETE FROM %s - WHERE graph_id = "%s"; + WHERE graph_id = $graph_id; DELETE FROM %s - WHERE graph_id = "%s"; + WHERE graph_id = $graph_id; )", prefix.c_str(), CoordinatorsSyncTable, - graphId.c_str(), - CheckpointsMetadataTable, - graphId.c_str()); + CheckpointsMetadataTable); + + NYdb::TParamsBuilder params; + params + .AddParam("$graph_id") + .String(graphId) + .Build(); auto future = session.ExecuteDataQuery( query, - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()); + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + params.Build()); return future.Apply( [] (const TFuture& future) { @@ -943,30 +988,48 @@ TFuture TCheckpointStorage::MarkCheckpointsGC( const TCheckpointId& checkpointUpperBound) { auto future = YdbConnection->TableClient.RetryOperation( - [prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound] (TSession session) { + [prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound, thisPtr = TIntrusivePtr(this)] (TSession session) { // TODO: use prepared queries auto query = Sprintf(R"( --!syntax_v1 PRAGMA TablePathPrefix("%s"); - $ts = cast(%lu as Timestamp); + DECLARE $ts AS Timestamp; + DECLARE $status AS Uint8; + DECLARE $graph_id AS String; + DECLARE $coordinator_generation AS Uint64; + DECLARE $seq_no AS Uint64; UPDATE %s - SET status = %u, modified_by = $ts - WHERE graph_id = "%s" AND - (coordinator_generation < %lu OR - (coordinator_generation = %lu AND seq_no < %lu)); + SET status = $status, modified_by = $ts + WHERE graph_id = $graph_id AND + (coordinator_generation < $coordinator_generation OR + (coordinator_generation = $coordinator_generation AND seq_no < $seq_no)); )", prefix.c_str(), - TInstant::Now().MicroSeconds(), - CheckpointsMetadataTable, - (ui32)ECheckpointStatus::GC, - graphId.c_str(), - checkpointUpperBound.CoordinatorGeneration, - checkpointUpperBound.CoordinatorGeneration, - checkpointUpperBound.SeqNo); + CheckpointsMetadataTable); + + NYdb::TParamsBuilder params; + params + .AddParam("$graph_id") + .String(graphId) + .Build() + .AddParam("$coordinator_generation") + .Uint64(checkpointUpperBound.CoordinatorGeneration) + .Build() + .AddParam("$seq_no") + .Uint64(checkpointUpperBound.SeqNo) + .Build() + .AddParam("$status") + .Uint8((ui8)ECheckpointStatus::GC) + .Build() + .AddParam("$ts") + .Timestamp(TInstant::Now()) + .Build(); auto future = session.ExecuteDataQuery( query, - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()); + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + params.Build(), + thisPtr->DefaultExecDataQuerySettings()); return future.Apply( [] (const TFuture& future) { diff --git a/ydb/core/fq/libs/checkpoint_storage/ydb_state_storage.cpp b/ydb/core/fq/libs/checkpoint_storage/ydb_state_storage.cpp index a1b86fdb8d8e..03be0aa4ac8b 100644 --- a/ydb/core/fq/libs/checkpoint_storage/ydb_state_storage.cpp +++ b/ydb/core/fq/libs/checkpoint_storage/ydb_state_storage.cpp @@ -726,8 +726,8 @@ TFuture TStateStorage::DeleteGraph(const TString& graphId) { DELETE FROM %s - WHERE graph_id = "%s"; - )", prefix.c_str(), StatesTable, graphId.c_str()); + WHERE graph_id = $graph_id; + )", prefix.c_str(), StatesTable); auto future = session.ExecuteDataQuery( query, From 6dc86b647abe217f33a42ac8cb6dea335e943ac0 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Thu, 23 Jan 2025 15:30:34 +0300 Subject: [PATCH 2/2] checkpoint storage: consistently pass query parameters --- .../ydb_checkpoint_storage.cpp | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp b/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp index 1b4364f1347f..7d5c6bfd0ff8 100644 --- a/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp +++ b/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp @@ -59,15 +59,18 @@ struct TCheckpointContext : public TThrRefBase { TGenerationContextPtr GenerationContext; TCheckpointGraphDescriptionContextPtr CheckpointGraphDescriptionContext; IEntityIdGenerator::TPtr EntityIdGenerator; + TExecDataQuerySettings Settings; TCheckpointContext(const TCheckpointId& id, ECheckpointStatus status, ECheckpointStatus expected, - ui64 stateSizeBytes) + ui64 stateSizeBytes, + TExecDataQuerySettings settings) : CheckpointId(id) , Status(status) , ExpectedStatus(expected) , StateSizeBytes(stateSizeBytes) + , Settings(settings) { } }; @@ -220,7 +223,7 @@ TFuture CreateCheckpoint(const TCheckpointContextPtr& context) { } auto ttxControl = TTxControl::Tx(*generationContext->Transaction).CommitTx(); - return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build()).Apply( + return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build(), context->Settings).Apply( [] (const TFuture& future) { TStatus status = future.GetValue(); return status; @@ -270,7 +273,7 @@ TFuture UpdateCheckpoint(const TCheckpointContextPtr& context) { .Build(); auto ttxControl = TTxControl::Tx(*generationContext->Transaction).CommitTx(); - return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build()).Apply( + return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build(), context->Settings).Apply( [] (const TFuture& future) { TStatus status = future.GetValue(); return status; @@ -297,7 +300,7 @@ TFuture SelectGraphDescId(const TCheckpointContextPtr& context .String(graphDescContext->GraphDescId) .Build(); - return generationContext->Session.ExecuteDataQuery(query, TTxControl::Tx(*generationContext->Transaction), params.Build()); + return generationContext->Session.ExecuteDataQuery(query, TTxControl::Tx(*generationContext->Transaction), params.Build(), context->Settings); } bool GraphDescIdExists(const TFuture& result) { @@ -494,7 +497,8 @@ TFuture SelectCheckpoint(const TCheckpointContextPtr& context) return generationContext->Session.ExecuteDataQuery( query, TTxControl::Tx(*generationContext->Transaction), - params.Build()); + params.Build(), + context->Settings); } TFuture CheckCheckpoint( @@ -807,7 +811,7 @@ TFuture TCheckpointStorage::CreateC ECheckpointStatus status) { Y_ABORT_UNLESS(graphDescId); - auto checkpointContext = MakeIntrusive(checkpointId, status, ECheckpointStatus::Pending, 0ul); + auto checkpointContext = MakeIntrusive(checkpointId, status, ECheckpointStatus::Pending, 0ul, DefaultExecDataQuerySettings()); checkpointContext->CheckpointGraphDescriptionContext = MakeIntrusive(graphDescId); return CreateCheckpointImpl(coordinator, checkpointContext); } @@ -818,7 +822,7 @@ TFuture TCheckpointStorage::CreateC const NProto::TCheckpointGraphDescription& graphDesc, ECheckpointStatus status) { - auto checkpointContext = MakeIntrusive(checkpointId, status, ECheckpointStatus::Pending, 0ul); + auto checkpointContext = MakeIntrusive(checkpointId, status, ECheckpointStatus::Pending, 0ul, DefaultExecDataQuerySettings()); checkpointContext->CheckpointGraphDescriptionContext = MakeIntrusive(graphDesc); checkpointContext->EntityIdGenerator = EntityIdGenerator; return CreateCheckpointImpl(coordinator, checkpointContext); @@ -859,7 +863,7 @@ TFuture TCheckpointStorage::UpdateCheckpointStatus( ECheckpointStatus prevStatus, ui64 stateSizeBytes) { - auto checkpointContext = MakeIntrusive(checkpointId, newStatus, prevStatus, stateSizeBytes); + auto checkpointContext = MakeIntrusive(checkpointId, newStatus, prevStatus, stateSizeBytes, DefaultExecDataQuerySettings()); auto future = YdbConnection->TableClient.RetryOperation( [prefix = YdbConnection->TablePathPrefix, coordinator, checkpointContext] (TSession session) { auto generationContext = MakeIntrusive( @@ -885,7 +889,7 @@ TFuture TCheckpointStorage::AbortCheckpoint( const TCoordinatorId& coordinator, const TCheckpointId& checkpointId) { - auto checkpointContext = MakeIntrusive(checkpointId, ECheckpointStatus::Aborted, ECheckpointStatus::Pending, 0ul); + auto checkpointContext = MakeIntrusive(checkpointId, ECheckpointStatus::Aborted, ECheckpointStatus::Pending, 0ul, DefaultExecDataQuerySettings()); auto future = YdbConnection->TableClient.RetryOperation( [prefix = YdbConnection->TablePathPrefix, coordinator, checkpointContext] (TSession session) { auto generationContext = MakeIntrusive( @@ -944,7 +948,7 @@ TFuture TCheckpointStorage::GetCheckp TFuture TCheckpointStorage::DeleteGraph(const TString& graphId) { auto future = YdbConnection->TableClient.RetryOperation( - [prefix = YdbConnection->TablePathPrefix, graphId] (TSession session) { + [prefix = YdbConnection->TablePathPrefix, graphId, settings = DefaultExecDataQuerySettings()] (TSession session) { // TODO: use prepared queries auto query = Sprintf(R"( --!syntax_v1 @@ -971,7 +975,8 @@ TFuture TCheckpointStorage::DeleteGraph(const TString& graphId) { auto future = session.ExecuteDataQuery( query, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), - params.Build()); + params.Build(), + settings); return future.Apply( [] (const TFuture& future) { @@ -1046,7 +1051,7 @@ TFuture TCheckpointStorage::DeleteMarkedCheckpoints( const TCheckpointId& checkpointUpperBound) { auto future = YdbConnection->TableClient.RetryOperation( - [prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound] (TSession session) { + [prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound, settings = DefaultExecDataQuerySettings()] (TSession session) { // TODO: use prepared queries using namespace fmt::literals; const TString query = fmt::format(R"sql( @@ -1105,7 +1110,7 @@ TFuture TCheckpointStorage::DeleteMarkedCheckpoints( auto future = session.ExecuteDataQuery( query, - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params.Build()); + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params.Build(), settings); return future.Apply( [] (const TFuture& future) {