Skip to content

Commit

Permalink
Configure minimum coordinator resolution with icb (#13721)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Jan 23, 2025
1 parent 4621fcc commit 48c14e3
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 39 deletions.
5 changes: 5 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,11 @@ message TImmediateControlsConfig {
MinValue: 0,
MaxValue: 86400000,
DefaultValue: 50 }];
optional uint64 MinPlanResolutionMs = 5 [(ControlOptions) = {
Description: "Minimum plan resolution override in milliseconds, which includes volatile planning (0 when not overridden)",
MinValue: 0,
MaxValue: 1000,
DefaultValue: 0 }];
}

message TSchemeShardControls {
Expand Down
11 changes: 5 additions & 6 deletions ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,11 @@ namespace NKikimr::NFlatTxCoordinator {

auto* msg = ev->Get();
for (ui64 step : msg->Record.GetPlanSteps()) {
if (!usesVolatilePlanning) {
// Note: we want to align requested steps to plan resolution
// when volatile planning is not used. Otherwise extra steps
// are cheap and reduce latency.
step = AlignPlanStep(step);
}
// Note: we want to align requested steps to plan resolution when
// volatile planning is not used. Otherwise extra steps are cheap
// and reduce latency, but may still be aligned to the minimum
// resolution.
step = AlignPlanStep(step, usesVolatilePlanning);
// Note: this is not a sibling step, but it behaves similar enough
// so we reuse the same queue here.
if (step > VolatileState.LastPlanned && PendingSiblingSteps.insert(step).second) {
Expand Down
58 changes: 26 additions & 32 deletions ydb/core/tx/coordinator/coordinator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ TTxCoordinator::TTxCoordinator(TTabletStorageInfo *info, const TActorId &tablet)
, MinLeaderLeaseDurationUs(250000, 1000, 5000000)
, VolatilePlanLeaseMs(250, 0, 10000)
, PlanAheadTimeShiftMs(50, 0, 86400000)
, MinPlanResolutionMs(0, 0, 1000)
#ifdef COORDINATOR_LOG_TO_FILE
, DebugName(Sprintf("/tmp/coordinator_db_log_%" PRIu64 ".%" PRIi32 ".%" PRIu64 ".gz", TabletID(), getpid(), tablet.LocalId()))
, DebugLogFile(DebugName)
Expand Down Expand Up @@ -141,18 +142,22 @@ void TTxCoordinator::PlanTx(TTransactionProposal &&proposal, const TActorContext
}

// Volatile transactions are not persistent and planned as soon as possible
bool volatileTx = proposal.HasVolatileFlag();

// Rapid transactions are buffered and may be planned without alignment
bool rapidTx = (proposal.MinStep <= VolatileState.LastPlanned) && !volatileTx;
ui64 volatileLeaseMs = VolatilePlanLeaseMs;
bool volatilePlan = volatileLeaseMs > 0 && proposal.HasVolatileFlag();

// The minimum step we can plan is the next step
ui64 planStep = VolatileState.LastPlanned + 1;
const ui64 minPlanStep = VolatileState.LastPlanned + 1;

// We would prefer planning to the next aligned plan step
ui64 planStep = AlignPlanStep(minPlanStep, volatilePlan);

// Prefer planning non-rapid transactions to a resolution aligned step
if (!rapidTx && !volatileTx) {
planStep = Max(planStep, Min(proposal.MaxStep - 1,
(proposal.MinStep + Config.Resolution - 1) / Config.Resolution * Config.Resolution));
if (planStep < proposal.MinStep) {
// We need a step that is further into the future
planStep = AlignPlanStep(proposal.MinStep, volatilePlan);
}
if (planStep >= proposal.MaxStep) {
// The preferred step is too late, try using an earlier time
planStep = Max(minPlanStep, proposal.MaxStep - 1);
}

if (planStep >= proposal.MaxStep) {
Expand All @@ -161,28 +166,12 @@ void TTxCoordinator::PlanTx(TTransactionProposal &&proposal, const TActorContext
TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusOutdated, proposal.TxId, 0, ctx, TabletID());
}

if ((planStep % Config.Resolution) == 0) {
// Step is already aligned
rapidTx = false;
}

MonCounters.PlanTxAccepted->Inc();
SendTransactionStatus(proposal.Proxy, TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusAccepted,
proposal.TxId, planStep, ctx, TabletID());

if (rapidTx) {
TQueueType::TSlot &rapidSlot = VolatileState.Queue.RapidSlot;
rapidSlot.push_back(std::move(proposal));

if (rapidSlot.size() < Config.RapidSlotFlushSize) {
// Wait for the next aligned step until enough rapid transactions
SchedulePlanTickAligned(planStep);
return;
}
} else {
TQueueType::TSlot &planSlot = VolatileState.Queue.LowSlot(planStep);
planSlot.push_back(std::move(proposal));
}
TQueueType::TSlot &planSlot = VolatileState.Queue.LowSlot(planStep);
planSlot.push_back(std::move(proposal));

// Wait for the specified step even when not aligned
SchedulePlanTickExact(planStep);
Expand Down Expand Up @@ -225,7 +214,8 @@ bool TTxCoordinator::AllowReducedPlanResolution() const {
}

void TTxCoordinator::SchedulePlanTick() {
const ui64 resolution = Config.Resolution;
const ui64 minResolution = MinPlanResolutionMs;
const ui64 resolution = Max(minResolution, Config.Resolution);
const ui64 timeShiftMs = PlanAheadTimeShiftMs;
const TInstant now = TAppData::TimeProvider->Now() + TDuration::MilliSeconds(timeShiftMs);
const TMonotonic monotonic = AppData()->MonotonicTimeProvider->Now();
Expand All @@ -238,7 +228,8 @@ void TTxCoordinator::SchedulePlanTick() {

if (AllowReducedPlanResolution()) {
// We want to tick with reduced resolution when all siblings are confirmed
ui64 reduced = (VolatileState.LastPlanned + 1 + Config.ReducedResolution - 1) / Config.ReducedResolution * Config.ReducedResolution;
ui64 reducedResolution = Max(minResolution, Config.ReducedResolution);
ui64 reduced = (VolatileState.LastPlanned + 1 + reducedResolution - 1) / reducedResolution * reducedResolution;
// Include transactions waiting in the queue, so we don't sleep for seconds when the next tx is in 10ms
ui64 minWaiting = (VolatileState.Queue.MinLowSlot() + resolution - 1) / resolution * resolution;
if (minWaiting && minWaiting < reduced) {
Expand Down Expand Up @@ -320,8 +311,9 @@ void TTxCoordinator::SchedulePlanTickAligned(ui64 next) {
SchedulePlanTickExact(AlignPlanStep(next));
}

ui64 TTxCoordinator::AlignPlanStep(ui64 step) {
const ui64 resolution = Config.Resolution;
ui64 TTxCoordinator::AlignPlanStep(ui64 step, bool volatilePlan) {
const ui64 minResolution = MinPlanResolutionMs;
const ui64 resolution = Max(minResolution, volatilePlan ? ui64(1) : Config.Resolution);
return ((step + resolution - 1) / resolution * resolution);
}

Expand All @@ -347,7 +339,8 @@ void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorConte
VolatileState.Queue.Unsorted.reset();
}

const ui64 resolution = Config.Resolution;
const ui64 minResolution = MinPlanResolutionMs;
const ui64 resolution = Max(minResolution, Config.Resolution);
const ui64 timeShiftMs = PlanAheadTimeShiftMs;
const TInstant now = TAppData::TimeProvider->Now() + TDuration::MilliSeconds(timeShiftMs);

Expand Down Expand Up @@ -508,6 +501,7 @@ void TTxCoordinator::IcbRegister() {
AppData()->Icb->RegisterSharedControl(MinLeaderLeaseDurationUs, "CoordinatorControls.MinLeaderLeaseDurationUs");
AppData()->Icb->RegisterSharedControl(VolatilePlanLeaseMs, "CoordinatorControls.VolatilePlanLeaseMs");
AppData()->Icb->RegisterSharedControl(PlanAheadTimeShiftMs, "CoordinatorControls.PlanAheadTimeShiftMs");
AppData()->Icb->RegisterSharedControl(MinPlanResolutionMs, "CoordinatorControls.MinPlanResolutionMs");
IcbRegistered = true;
}
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/coordinator/coordinator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat
TControlWrapper MinLeaderLeaseDurationUs;
TControlWrapper VolatilePlanLeaseMs;
TControlWrapper PlanAheadTimeShiftMs;
TControlWrapper MinPlanResolutionMs;

TVolatileState VolatileState;
TConfig Config;
Expand Down Expand Up @@ -702,7 +703,7 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat
void SchedulePlanTick();
void SchedulePlanTickExact(ui64 next);
void SchedulePlanTickAligned(ui64 next);
ui64 AlignPlanStep(ui64 step);
ui64 AlignPlanStep(ui64 step, bool volatilePlan = false);

void TryInitMonCounters(const TActorContext &ctx);
bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext &ctx) override;
Expand Down

0 comments on commit 48c14e3

Please sign in to comment.