Skip to content

Commit

Permalink
checkpoint storage: use more query parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
yumkam committed Jan 23, 2025
1 parent 78fee80 commit 2117806
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 42 deletions.
143 changes: 103 additions & 40 deletions ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,21 +236,41 @@ TFuture<TStatus> UpdateCheckpoint(const TCheckpointContextPtr& context) {
auto query = Sprintf(R"(
--!syntax_v1
PRAGMA TablePathPrefix("%s");
$ts = cast(%lu as Timestamp);
DECLARE $graph_id AS String;
DECLARE $coordinator_generation AS Uint64;
DECLARE $seq_no AS Uint64;
DECLARE $status AS Uint8;
DECLARE $state_size AS Uint64;
DECLARE $ts AS Timestamp;
UPSERT INTO %s (graph_id, coordinator_generation, seq_no, status, state_size, modified_by) VALUES
("%s", %lu, %lu, %u, %lu, $ts);
($graph_id, $coordinator_generation, $seq_no, $status, $state_size, $ts);
)", generationContext->TablePathPrefix.c_str(),
TInstant::Now().MicroSeconds(),
CheckpointsMetadataTable,
generationContext->PrimaryKey.c_str(),
context->CheckpointId.CoordinatorGeneration,
context->CheckpointId.SeqNo,
(ui32)context->Status,
context->StateSizeBytes);
CheckpointsMetadataTable);

NYdb::TParamsBuilder params;
params
.AddParam("$graph_id")
.String(generationContext->PrimaryKey)
.Build()
.AddParam("$coordinator_generation")
.Uint64(context->CheckpointId.CoordinatorGeneration)
.Build()
.AddParam("$seq_no")
.Uint64(context->CheckpointId.SeqNo)
.Build()
.AddParam("$status")
.Uint8((ui8)context->Status)
.Build()
.AddParam("$state_size")
.Uint64(context->StateSizeBytes)
.Build()
.AddParam("$ts")
.Timestamp(TInstant::Now())
.Build();

auto ttxControl = TTxControl::Tx(*generationContext->Transaction).CommitTx();
return generationContext->Session.ExecuteDataQuery(query, ttxControl).Apply(
return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build()).Apply(
[] (const TFuture<TDataQueryResult>& future) {
TStatus status = future.GetValue();
return status;
Expand All @@ -264,15 +284,20 @@ TFuture<TDataQueryResult> SelectGraphDescId(const TCheckpointContextPtr& context
auto query = Sprintf(R"(
--!syntax_v1
PRAGMA TablePathPrefix("%s");
DECLARE $graph_desc_id AS String;
SELECT ref_count
FROM %s
WHERE id = "%s";
WHERE id = $graph_desc_id;
)", generationContext->TablePathPrefix.c_str(),
CheckpointsGraphsDescriptionTable,
graphDescContext->GraphDescId.c_str());
CheckpointsGraphsDescriptionTable);
NYdb::TParamsBuilder params;
params
.AddParam("$graph_desc_id")
.String(graphDescContext->GraphDescId)
.Build();

return generationContext->Session.ExecuteDataQuery(query, TTxControl::Tx(*generationContext->Transaction));
return generationContext->Session.ExecuteDataQuery(query, TTxControl::Tx(*generationContext->Transaction), params.Build());
}

bool GraphDescIdExists(const TFuture<TDataQueryResult>& result) {
Expand All @@ -292,6 +317,7 @@ TFuture<TStatus> GenerateGraphDescId(const TCheckpointContextPtr& context) {
if (!result.GetValue().IsSuccess()) {
return MakeFuture<TStatus>(result.GetValue());
}
// TODO racing!
if (!GraphDescIdExists(result)) {
return MakeFuture(TStatus(EStatus::SUCCESS, NYdb::NIssue::TIssues()));
} else {
Expand Down Expand Up @@ -443,19 +469,32 @@ TFuture<TDataQueryResult> SelectCheckpoint(const TCheckpointContextPtr& context)
auto query = Sprintf(R"(
--!syntax_v1
PRAGMA TablePathPrefix("%s");
DECLARE $graph_id AS String;
DECLARE $coordinator_generation AS Uint64;
DECLARE $seq_no AS Uint64;
SELECT status
FROM %s
WHERE graph_id = "%s" AND coordinator_generation = %lu AND seq_no = %lu;
WHERE graph_id = $graph_id AND coordinator_generation = $coordinator_generation AND seq_no = $seq_no;
)", generationContext->TablePathPrefix.c_str(),
CheckpointsMetadataTable,
generationContext->PrimaryKey.c_str(),
context->CheckpointId.CoordinatorGeneration,
context->CheckpointId.SeqNo);
CheckpointsMetadataTable);

NYdb::TParamsBuilder params;
params
.AddParam("$graph_id")
.String(generationContext->PrimaryKey)
.Build()
.AddParam("$coordinator_generation")
.Uint64(context->CheckpointId.CoordinatorGeneration)
.Build()
.AddParam("$seq_no")
.Uint64(context->CheckpointId.SeqNo)
.Build();

return generationContext->Session.ExecuteDataQuery(
query,
TTxControl::Tx(*generationContext->Transaction));
TTxControl::Tx(*generationContext->Transaction),
params.Build());
}

TFuture<TStatus> CheckCheckpoint(
Expand Down Expand Up @@ -910,23 +949,29 @@ TFuture<TIssues> TCheckpointStorage::DeleteGraph(const TString& graphId) {
auto query = Sprintf(R"(
--!syntax_v1
PRAGMA TablePathPrefix("%s");
DECLARE $graph_id AS String;
DELETE
FROM %s
WHERE graph_id = "%s";
WHERE graph_id = $graph_id;
DELETE
FROM %s
WHERE graph_id = "%s";
WHERE graph_id = $graph_id;
)", prefix.c_str(),
CoordinatorsSyncTable,
graphId.c_str(),
CheckpointsMetadataTable,
graphId.c_str());
CheckpointsMetadataTable);

NYdb::TParamsBuilder params;
params
.AddParam("$graph_id")
.String(graphId)
.Build();

auto future = session.ExecuteDataQuery(
query,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx());
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
params.Build());

return future.Apply(
[] (const TFuture<TDataQueryResult>& future) {
Expand All @@ -943,30 +988,48 @@ TFuture<TIssues> TCheckpointStorage::MarkCheckpointsGC(
const TCheckpointId& checkpointUpperBound)
{
auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound] (TSession session) {
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound, thisPtr = TIntrusivePtr(this)] (TSession session) {
// TODO: use prepared queries
auto query = Sprintf(R"(
--!syntax_v1
PRAGMA TablePathPrefix("%s");
$ts = cast(%lu as Timestamp);
DECLARE $ts AS Timestamp;
DECLARE $status AS Uint8;
DECLARE $graph_id AS String;
DECLARE $coordinator_generation AS Uint64;
DECLARE $seq_no AS Uint64;
UPDATE %s
SET status = %u, modified_by = $ts
WHERE graph_id = "%s" AND
(coordinator_generation < %lu OR
(coordinator_generation = %lu AND seq_no < %lu));
SET status = $status, modified_by = $ts
WHERE graph_id = $graph_id AND
(coordinator_generation < $coordinator_generation OR
(coordinator_generation = $coordinator_generation AND seq_no < $seq_no));
)", prefix.c_str(),
TInstant::Now().MicroSeconds(),
CheckpointsMetadataTable,
(ui32)ECheckpointStatus::GC,
graphId.c_str(),
checkpointUpperBound.CoordinatorGeneration,
checkpointUpperBound.CoordinatorGeneration,
checkpointUpperBound.SeqNo);
CheckpointsMetadataTable);

NYdb::TParamsBuilder params;
params
.AddParam("$graph_id")
.String(graphId)
.Build()
.AddParam("$coordinator_generation")
.Uint64(checkpointUpperBound.CoordinatorGeneration)
.Build()
.AddParam("$seq_no")
.Uint64(checkpointUpperBound.SeqNo)
.Build()
.AddParam("$status")
.Uint8((ui8)ECheckpointStatus::GC)
.Build()
.AddParam("$ts")
.Timestamp(TInstant::Now())
.Build();

auto future = session.ExecuteDataQuery(
query,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx());
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
params.Build(),
thisPtr->DefaultExecDataQuerySettings());

return future.Apply(
[] (const TFuture<TDataQueryResult>& future) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/fq/libs/checkpoint_storage/ydb_state_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -726,8 +726,8 @@ TFuture<TIssues> TStateStorage::DeleteGraph(const TString& graphId) {
DELETE
FROM %s
WHERE graph_id = "%s";
)", prefix.c_str(), StatesTable, graphId.c_str());
WHERE graph_id = $graph_id;
)", prefix.c_str(), StatesTable);

auto future = session.ExecuteDataQuery(
query,
Expand Down

0 comments on commit 2117806

Please sign in to comment.