Skip to content

Commit

Permalink
checkpoint storage: consistently pass query parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
yumkam committed Jan 23, 2025
1 parent 2117806 commit 6dc86b6
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,18 @@ struct TCheckpointContext : public TThrRefBase {
TGenerationContextPtr GenerationContext;
TCheckpointGraphDescriptionContextPtr CheckpointGraphDescriptionContext;
IEntityIdGenerator::TPtr EntityIdGenerator;
TExecDataQuerySettings Settings;

TCheckpointContext(const TCheckpointId& id,
ECheckpointStatus status,
ECheckpointStatus expected,
ui64 stateSizeBytes)
ui64 stateSizeBytes,
TExecDataQuerySettings settings)
: CheckpointId(id)
, Status(status)
, ExpectedStatus(expected)
, StateSizeBytes(stateSizeBytes)
, Settings(settings)
{
}
};
Expand Down Expand Up @@ -220,7 +223,7 @@ TFuture<TStatus> CreateCheckpoint(const TCheckpointContextPtr& context) {
}

auto ttxControl = TTxControl::Tx(*generationContext->Transaction).CommitTx();
return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build()).Apply(
return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build(), context->Settings).Apply(
[] (const TFuture<TDataQueryResult>& future) {
TStatus status = future.GetValue();
return status;
Expand Down Expand Up @@ -270,7 +273,7 @@ TFuture<TStatus> UpdateCheckpoint(const TCheckpointContextPtr& context) {
.Build();

auto ttxControl = TTxControl::Tx(*generationContext->Transaction).CommitTx();
return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build()).Apply(
return generationContext->Session.ExecuteDataQuery(query, ttxControl, params.Build(), context->Settings).Apply(
[] (const TFuture<TDataQueryResult>& future) {
TStatus status = future.GetValue();
return status;
Expand All @@ -297,7 +300,7 @@ TFuture<TDataQueryResult> SelectGraphDescId(const TCheckpointContextPtr& context
.String(graphDescContext->GraphDescId)
.Build();

return generationContext->Session.ExecuteDataQuery(query, TTxControl::Tx(*generationContext->Transaction), params.Build());
return generationContext->Session.ExecuteDataQuery(query, TTxControl::Tx(*generationContext->Transaction), params.Build(), context->Settings);
}

bool GraphDescIdExists(const TFuture<TDataQueryResult>& result) {
Expand Down Expand Up @@ -494,7 +497,8 @@ TFuture<TDataQueryResult> SelectCheckpoint(const TCheckpointContextPtr& context)
return generationContext->Session.ExecuteDataQuery(
query,
TTxControl::Tx(*generationContext->Transaction),
params.Build());
params.Build(),
context->Settings);
}

TFuture<TStatus> CheckCheckpoint(
Expand Down Expand Up @@ -807,7 +811,7 @@ TFuture<ICheckpointStorage::TCreateCheckpointResult> TCheckpointStorage::CreateC
ECheckpointStatus status)
{
Y_ABORT_UNLESS(graphDescId);
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, status, ECheckpointStatus::Pending, 0ul);
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, status, ECheckpointStatus::Pending, 0ul, DefaultExecDataQuerySettings());
checkpointContext->CheckpointGraphDescriptionContext = MakeIntrusive<TCheckpointGraphDescriptionContext>(graphDescId);
return CreateCheckpointImpl(coordinator, checkpointContext);
}
Expand All @@ -818,7 +822,7 @@ TFuture<ICheckpointStorage::TCreateCheckpointResult> TCheckpointStorage::CreateC
const NProto::TCheckpointGraphDescription& graphDesc,
ECheckpointStatus status)
{
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, status, ECheckpointStatus::Pending, 0ul);
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, status, ECheckpointStatus::Pending, 0ul, DefaultExecDataQuerySettings());
checkpointContext->CheckpointGraphDescriptionContext = MakeIntrusive<TCheckpointGraphDescriptionContext>(graphDesc);
checkpointContext->EntityIdGenerator = EntityIdGenerator;
return CreateCheckpointImpl(coordinator, checkpointContext);
Expand Down Expand Up @@ -859,7 +863,7 @@ TFuture<TIssues> TCheckpointStorage::UpdateCheckpointStatus(
ECheckpointStatus prevStatus,
ui64 stateSizeBytes)
{
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, newStatus, prevStatus, stateSizeBytes);
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, newStatus, prevStatus, stateSizeBytes, DefaultExecDataQuerySettings());
auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, coordinator, checkpointContext] (TSession session) {
auto generationContext = MakeIntrusive<TGenerationContext>(
Expand All @@ -885,7 +889,7 @@ TFuture<TIssues> TCheckpointStorage::AbortCheckpoint(
const TCoordinatorId& coordinator,
const TCheckpointId& checkpointId)
{
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, ECheckpointStatus::Aborted, ECheckpointStatus::Pending, 0ul);
auto checkpointContext = MakeIntrusive<TCheckpointContext>(checkpointId, ECheckpointStatus::Aborted, ECheckpointStatus::Pending, 0ul, DefaultExecDataQuerySettings());
auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, coordinator, checkpointContext] (TSession session) {
auto generationContext = MakeIntrusive<TGenerationContext>(
Expand Down Expand Up @@ -944,7 +948,7 @@ TFuture<ICheckpointStorage::TGetCheckpointsResult> TCheckpointStorage::GetCheckp

TFuture<TIssues> TCheckpointStorage::DeleteGraph(const TString& graphId) {
auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId] (TSession session) {
[prefix = YdbConnection->TablePathPrefix, graphId, settings = DefaultExecDataQuerySettings()] (TSession session) {
// TODO: use prepared queries
auto query = Sprintf(R"(
--!syntax_v1
Expand All @@ -971,7 +975,8 @@ TFuture<TIssues> TCheckpointStorage::DeleteGraph(const TString& graphId) {
auto future = session.ExecuteDataQuery(
query,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
params.Build());
params.Build(),
settings);

return future.Apply(
[] (const TFuture<TDataQueryResult>& future) {
Expand Down Expand Up @@ -1046,7 +1051,7 @@ TFuture<TIssues> TCheckpointStorage::DeleteMarkedCheckpoints(
const TCheckpointId& checkpointUpperBound)
{
auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound] (TSession session) {
[prefix = YdbConnection->TablePathPrefix, graphId, checkpointUpperBound, settings = DefaultExecDataQuerySettings()] (TSession session) {
// TODO: use prepared queries
using namespace fmt::literals;
const TString query = fmt::format(R"sql(
Expand Down Expand Up @@ -1105,7 +1110,7 @@ TFuture<TIssues> TCheckpointStorage::DeleteMarkedCheckpoints(

auto future = session.ExecuteDataQuery(
query,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params.Build());
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params.Build(), settings);

return future.Apply(
[] (const TFuture<TDataQueryResult>& future) {
Expand Down

0 comments on commit 6dc86b6

Please sign in to comment.