From 8d394bffe1da5da02e1a712aa61607445d598c93 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Mon, 24 Feb 2025 13:55:16 -0500 Subject: [PATCH] mma: [dnm] add cluster state message handling TODO(kvoli)... Informs: #103320 Release note: None --- pkg/kv/kvserver/allocator/mma/BUILD.bazel | 7 + .../kvserver/allocator/mma/allocator_state.go | 5 +- .../kvserver/allocator/mma/cluster_state.go | 853 ++++++++++++++---- .../allocator/mma/cluster_state_test.go | 450 +++++++++ .../allocator/mma/constraint_matcher_test.go | 46 +- .../kvserver/allocator/mma/constraint_test.go | 2 +- pkg/kv/kvserver/allocator/mma/load.go | 10 + pkg/kv/kvserver/allocator/mma/load_test.go | 2 +- pkg/kv/kvserver/allocator/mma/messages.go | 4 - .../mma/testdata/cluster_state/simple | 97 ++ .../allocator/mma/testdata/pending_changes | 338 +++++++ 11 files changed, 1619 insertions(+), 195 deletions(-) create mode 100644 pkg/kv/kvserver/allocator/mma/cluster_state_test.go create mode 100644 pkg/kv/kvserver/allocator/mma/testdata/cluster_state/simple create mode 100644 pkg/kv/kvserver/allocator/mma/testdata/pending_changes diff --git a/pkg/kv/kvserver/allocator/mma/BUILD.bazel b/pkg/kv/kvserver/allocator/mma/BUILD.bazel index f68fa90ca161..6b09ed83a534 100644 --- a/pkg/kv/kvserver/allocator/mma/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/mma/BUILD.bazel @@ -16,13 +16,16 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/roachpb", + "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", ], ) go_test( name = "mma_test", srcs = [ + "cluster_state_test.go", "constraint_matcher_test.go", "constraint_test.go", "load_test.go", @@ -32,6 +35,10 @@ go_test( embed = [":mma"], deps = [ "//pkg/roachpb", + "//pkg/spanconfig/spanconfigtestutils", + "//pkg/testutils/datapathutils", + "//pkg/util/log", + "//pkg/util/timeutil", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", diff --git a/pkg/kv/kvserver/allocator/mma/allocator_state.go b/pkg/kv/kvserver/allocator/mma/allocator_state.go index 8dbd94f589a9..99bd768e4184 100644 --- a/pkg/kv/kvserver/allocator/mma/allocator_state.go +++ b/pkg/kv/kvserver/allocator/mma/allocator_state.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) type allocatorState struct { @@ -27,9 +28,9 @@ type allocatorState struct { changeRangeLimiter *storeChangeRateLimiter } -func newAllocatorState() *allocatorState { +func newAllocatorState(ts timeutil.TimeSource) *allocatorState { interner := newStringInterner() - cs := newClusterState(interner) + cs := newClusterState(ts, interner) return &allocatorState{ cs: cs, rangesNeedingAttention: map[roachpb.RangeID]struct{}{}, diff --git a/pkg/kv/kvserver/allocator/mma/cluster_state.go b/pkg/kv/kvserver/allocator/mma/cluster_state.go index f04f56a2d01b..95864a024d56 100644 --- a/pkg/kv/kvserver/allocator/mma/cluster_state.go +++ b/pkg/kv/kvserver/allocator/mma/cluster_state.go @@ -6,10 +6,15 @@ package mma import ( + "context" + "fmt" "slices" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/redact" ) // These values can sometimes be used in replicaType, replicaIDAndType, @@ -34,38 +39,77 @@ type replicaIDAndType struct { replicaType } +// SafeFormat implements the redact.SafeFormatter 
interface. +func (rt replicaIDAndType) SafeFormat(w redact.SafePrinter, _ rune) { + w.Print("replica-id=") + switch rt.ReplicaID { + case unknownReplicaID: + w.Print("unknown") + case noReplicaID: + w.Print("none") + default: + w.Print(rt.ReplicaID) + } + w.Printf(" type=%v leaseholder=%v", rt.replicaType.replicaType, rt.isLeaseholder) +} + +func (rt replicaIDAndType) String() string { + return redact.StringWithoutMarkers(rt) +} + +// prev is the state before the proposed change and next is the state after +// the proposed change. rit is the current observed state. +func (rit replicaIDAndType) subsumesChange(prev replicaIDAndType, next replicaIDAndType) bool { + if rit.ReplicaID == noReplicaID && next.ReplicaID == noReplicaID { + // Removal has happened. + return true + } + notSubsumed := (rit.ReplicaID == noReplicaID && next.ReplicaID != noReplicaID) || + (rit.ReplicaID != noReplicaID && next.ReplicaID == noReplicaID) + if notSubsumed { + return false + } + // Both rit and next have replicaIDs != noReplicaID. We don't actually care + // about the replicaID's since we don't control them. If the replicaTypes + // are as expected, and if we were either not trying to change the + // leaseholder, or that leaseholder change has happened, then the change has + // been subsumed. + switch rit.replicaType.replicaType { + case roachpb.VOTER_INCOMING: + // Already seeing the load, so consider the change done. + rit.replicaType.replicaType = roachpb.VOTER_FULL + } + // rit.replicaId equal to LEARNER, VOTER_DEMOTING* are left as-is. If next + // is trying to remove a replica, this store is still seeing some of the + // load. + if rit.replicaType == next.replicaType && (prev.isLeaseholder == next.isLeaseholder || + rit.isLeaseholder == next.isLeaseholder) { + return true + } + return false +} + type replicaState struct { replicaIDAndType // voterIsLagging can be set for a VOTER_FULL replica that has fallen behind // (and possibly even needs a snapshot to catch up). It is a hint to the // allocator not to transfer the lease to this replica. voterIsLagging bool - // replicationPaused is set to true if replication to this replica is - // paused. This can be a desirable replica to shed for an overloaded store. - replicationPaused bool + // TODO(kvoli,sumeerbhola): Add in rac2.SendQueue information to prevent + // lease transfers to replicas which are not able to take the lease due to a + // send queue. } -// Unique ID, in the context of this data-structure and when receiving updates -// about enactment having happened or having been rejected (by the component -// responsible for change enactment). +// changeID is a unique ID, in the context of this data-structure and when +// receiving updates about enactment having happened or having been rejected +// (by the component responsible for change enactment). type changeID uint64 -// pendingReplicaChange is a proposed change to a single replica. Some -// external entity (the leaseholder of the range) may choose to enact this -// change. It may not be enacted if it will cause some invariant (like the -// number of replicas, or having a leaseholder) to be violated. If not -// enacted, the allocator will either be told about the lack of enactment, or -// will eventually expire from the allocator's state after -// pendingChangeExpiryDuration. Such expiration without enactment should be -// rare. 
pendingReplicaChanges can be paired, when a range is being moved from -// one store to another -- that pairing is not captured here, and captured in -// the changes suggested by the allocator to the external entity. -type pendingReplicaChange struct { - changeID - +type replicaChange struct { // The load this change adds to a store. The values will be negative if the // load is being removed. - loadDelta loadVector + loadDelta loadVector + secondaryLoadDelta secondaryLoadVector storeID roachpb.StoreID rangeID roachpb.RangeID @@ -88,41 +132,205 @@ type pendingReplicaChange struct { // NON_VOTER. prev replicaState next replicaIDAndType +} + +func (rc replicaChange) isRemoval() bool { + return rc.prev.ReplicaID >= 0 && rc.next.ReplicaID == noReplicaID +} + +func (rc replicaChange) isAddition() bool { + return rc.prev.ReplicaID == noReplicaID && rc.next.ReplicaID == unknownReplicaID +} + +func (rc replicaChange) isUpdate() bool { + return rc.prev.ReplicaID >= 0 && rc.next.ReplicaID >= 0 +} + +func makeLeaseTransferChanges( + rState *rangeState, rLoad rangeLoad, addStoreID, removeStoreID roachpb.StoreID, +) [2]replicaChange { + addIdx, removeIdx := -1, -1 + for i, replica := range rState.replicas { + if replica.StoreID == addStoreID { + addIdx = i + } + if replica.StoreID == removeStoreID { + removeIdx = i + } + } + if removeIdx == -1 { + panic(fmt.Sprintf( + "existing leaseholder replica doesn't exist on store %v", removeIdx)) + } + if addIdx == -1 { + panic(fmt.Sprintf( + "new leaseholder replica doesn't exist on store %v", addStoreID)) + } + remove := rState.replicas[removeIdx] + add := rState.replicas[addIdx] + + removeLease := replicaChange{ + storeID: removeStoreID, + rangeID: rState.rangeID, + prev: remove.replicaState, + next: remove.replicaIDAndType, + } + addLease := replicaChange{ + storeID: addStoreID, + rangeID: rState.rangeID, + prev: add.replicaState, + next: add.replicaIDAndType, + } + addLease.next.isLeaseholder = true + removeLease.next.isLeaseholder = false + + // Only account for the leaseholder CPU, all other primary load dimensions + // are ignored. Byte size and write bytes are not impacted by having a range + // lease. + nonRaftCPU := rLoad.load[cpu] - rLoad.raftCPU + removeLease.loadDelta[cpu] -= nonRaftCPU + addLease.loadDelta[cpu] += nonRaftCPU + // Also account for the lease count. + removeLease.secondaryLoadDelta[leaseCount] = -1 + addLease.secondaryLoadDelta[leaseCount] = 1 + return [2]replicaChange{removeLease, addLease} +} + +// makeAddReplicaChange creates a replica change which adds the replica type +// to the store addStoreID. The load impact of adding the replica does not +// account for whether the replica is becoming the leaseholder or not. +// +// TODO(kvoli,sumeerbhola): Add promotion/demotion changes. +func makeAddReplicaChange( + rangeID roachpb.RangeID, rLoad rangeLoad, addStoreID roachpb.StoreID, rType roachpb.ReplicaType, +) replicaChange { + addReplica := replicaChange{ + storeID: addStoreID, + rangeID: rangeID, + prev: replicaState{ + replicaIDAndType: replicaIDAndType{ + ReplicaID: noReplicaID, + }, + }, + next: replicaIDAndType{ + ReplicaID: unknownReplicaID, + replicaType: replicaType{ + replicaType: rType, + }, + }, + } + + addReplica.loadDelta.add(rLoad.load) + // Set the load delta for CPU to be just the raft CPU. The non-raft CPU we + // assume is associated with the lease. + addReplica.loadDelta[cpu] = rLoad.raftCPU + return addReplica +} + +// makeRemoveReplicaChange creates a replica change which removes the replica +// given. 
The load impact of removing the replica does not account for whether
+// the replica was the previous leaseholder or not.
+func makeRemoveReplicaChange(
+	rangeID roachpb.RangeID, rLoad rangeLoad, remove storeIDAndReplicaState,
+) replicaChange {
+	removeReplica := replicaChange{
+		storeID: remove.StoreID,
+		rangeID: rangeID,
+		prev:    remove.replicaState,
+		next: replicaIDAndType{
+			ReplicaID: noReplicaID,
+		},
+	}
+	removeReplica.loadDelta.subtract(rLoad.load)
+	// Set the load delta for CPU to be just the raft CPU. The non-raft CPU we
+	// assume is associated with the lease.
+	removeReplica.loadDelta[cpu] = -rLoad.raftCPU
+	return removeReplica
+}
+
+// makeRebalanceReplicaChanges creates two replica changes, adding a replica and
+// removing another. If the replica being rebalanced is the current
+// leaseholder, the impact of the rebalance also includes the lease load.
+//
+// TODO(kvoli,sumeerbhola): If the leaseholder is being rebalanced, we need to
+// ensure the incoming replica is eligible to take the lease.
+func makeRebalanceReplicaChanges(
+	rState *rangeState, rLoad rangeLoad, addStoreID, removeStoreID roachpb.StoreID,
+) [2]replicaChange {
+	var remove storeIDAndReplicaState
+	for _, replica := range rState.replicas {
+		if replica.StoreID == removeStoreID {
+			remove = replica
+		}
+	}
+
+	addReplicaChange := makeAddReplicaChange(rState.rangeID, rLoad, addStoreID, remove.replicaType.replicaType)
+	removeReplicaChange := makeRemoveReplicaChange(rState.rangeID, rLoad, remove)
+	if remove.isLeaseholder {
+		// The existing leaseholder is being removed. The incoming replica will
+		// take the lease load, in addition to the replica load.
+		addReplicaChange.next.isLeaseholder = true
+		addReplicaChange.loadDelta = loadVector{}
+		removeReplicaChange.loadDelta = loadVector{}
+		addReplicaChange.loadDelta.add(rLoad.load)
+		removeReplicaChange.loadDelta.subtract(rLoad.load)
+		addReplicaChange.secondaryLoadDelta[leaseCount] = 1
+		removeReplicaChange.secondaryLoadDelta[leaseCount] = -1
+	}
+
+	return [2]replicaChange{addReplicaChange, removeReplicaChange}
+}
+
+// pendingReplicaChange is a proposed change to a single replica. Some
+// external entity (the leaseholder of the range) may choose to enact this
+// change. It may not be enacted if it will cause some invariant (like the
+// number of replicas, or having a leaseholder) to be violated. If not
+// enacted, the allocator will either be told about the lack of enactment, or
+// will eventually expire from the allocator's state after
+// pendingChangeExpiryDuration. Such expiration without enactment should be
+// rare. pendingReplicaChanges can be paired, when a range is being moved from
+// one store to another -- that pairing is not captured here, and captured in
+// the changes suggested by the allocator to the external entity.
+type pendingReplicaChange struct {
+	changeID
+	replicaChange

 	// The wall time at which this pending change was initiated. Used for
 	// expiry.
 	startTime time.Time

 	// When the change is known to be enacted based on the authoritative
-	// information received from the leaseholder, this value is set, so that we
-	// can garbage collect this change.
+	// information received from the leaseholder, this value is set, so that even
+	// if the store with a replica affected by this pending change does not tell
+	// us about the enactment, we can garbage collect this change.
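+	// A zero value means the change has not yet been observed as enacted.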
enactedAtTime time.Time } -type pendingChangesOldestFirst []*pendingReplicaChange - -func (p *pendingChangesOldestFirst) removeChangeAtIndex(index int) { - n := len(*p) - copy((*p)[index:n-1], (*p)[index+1:n]) - *p = (*p)[:n-1] -} - -type storeInitState int8 +type storeMembership int8 +// TODO(kvoli): todo const ( - // partialInit is the state iff only the StoreID and adjusted.replicas have - // been initialized in this store. This can happen if the leaseholder of a - // range sends information about a store that has a replica before the - // allocator is explicitly told about this new store. - partialInit storeInitState = iota - fullyInit - // When the store is known to be removed but it is still referenced as a - // replica by some leaseholders. This is different from a dead store, from - // which we will rebalance away. For a removed store we'll just wait until - // there are no ranges referencing it. - removed + storeMembershipMember storeMembership = iota + storeMembershipRemoving + storeMembershipRemoved ) +func (s storeMembership) String() string { + return redact.StringWithoutMarkers(s) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (s storeMembership) SafeFormat(w redact.SafePrinter, _ rune) { + switch s { + case storeMembershipMember: + w.Print("full") + case storeMembershipRemoving: + w.Print("removing") + case storeMembershipRemoved: + w.Print("removed") + } +} + // storeChangeRateLimiter and helpers. // // Usage: @@ -288,14 +496,11 @@ func (crl *storeChangeRateLimiter) updateForRebalancePass(h *storeEnactedHistory // storeState maintains the complete state about a store as known to the // allocator. type storeState struct { - storeInitState + storeMembership storeLoad adjusted struct { - load loadVector - // loadReplicas is computed from the authoritative information provided by - // various leaseholders in storeLeaseholderMsgs, and adjusted for - // loadPendingChanges. - loadReplicas map[roachpb.RangeID]replicaType + load loadVector + secondaryLoad secondaryLoadVector // Pending changes for computing loadReplicas and load. // Added to at the same time as clusterState.pendingChanges. // @@ -319,16 +524,14 @@ type storeState struct { // can live on -- but since that will set enactedAtTime, we are guaranteed // to eventually remove it. loadPendingChanges map[changeID]*pendingReplicaChange - - enactedHistory storeEnactedHistory - - secondaryLoad secondaryLoadVector - // replicas is computed from the authoritative information provided by // various leaseholders in storeLeaseholderMsgs and adjusted for pending // changes in clusterState.pendingChanges/rangeState.pendingChanges. + // + // XXX: Changes when something is never going to happen (gc duration 5 + // mins), reject pending change (enacting module). Or a change is + // tentatively made. replicas map[roachpb.RangeID]replicaState - // topKRanges along some load dimensions. If the store is closer to // hitting the resource limit on some resource ranges that are higher in // that resource dimension should be over-represented in this map. It @@ -342,6 +545,9 @@ type storeState struct { // TODO(sumeer): figure out at least one reasonable way to do this, even // if we postpone it to a later code iteration. topKRanges map[roachpb.RangeID]rangeLoad + // TODO(kvoli,sumeerbhola): Update enactedHistory when integrating the + // storeChangeRateLimiter. 
+ enactedHistory storeEnactedHistory } // This is a locally incremented seqnum which is incremented whenever the // adjusted or reported load information for this store or the containing @@ -365,8 +571,13 @@ type storeState struct { // effect of that change is seen in the load information computed by that // store. // -// TODO(sumeer): set this based on an understanding of ReplicaStats etc. -const lagForChangeReflectedInLoad = 5 * time.Second +// NOTE: The gossip interval is 10s (see gossip.go StoresInterval). On a +// signficant load change (% delta), the store will gossip more frequently (see +// kvserver/store_gossip.go). +// +// TODO(sumeer): set this based on an understanding of ReplicaStats, Gossip +// etc. +const lagForChangeReflectedInLoad = 10 * time.Second // NB: this is a heuristic. // @@ -392,6 +603,34 @@ func (ss *storeState) computePendingChangesReflectedInLatestLoad( return changes } +func newStoreState(storeID roachpb.StoreID, nodeID roachpb.NodeID) *storeState { + ss := &storeState{ + storeLoad: storeLoad{ + StoreID: storeID, + NodeID: nodeID, + }, + } + ss.adjusted.loadPendingChanges = map[changeID]*pendingReplicaChange{} + ss.adjusted.replicas = map[roachpb.RangeID]replicaState{} + ss.adjusted.topKRanges = map[roachpb.RangeID]rangeLoad{} + return ss +} + +// applyChangeLoadDelta adds the change load delta to the store's adjusted load. +func (ss *storeState) applyChangeLoadDelta(change replicaChange) { + ss.adjusted.load.add(change.loadDelta) + ss.adjusted.secondaryLoad.add(change.secondaryLoadDelta) + ss.loadSeqNum++ +} + +// undoChangeLoadDelta subtracts the change load delta from the store's +// adjusted load. +func (ss *storeState) undoChangeLoadDelta(change replicaChange) { + ss.adjusted.load.subtract(change.loadDelta) + ss.adjusted.secondaryLoad.subtract(change.secondaryLoadDelta) + ss.loadSeqNum++ +} + // failureDetectionSummary is provided by an external entity and never // computed inside the allocator. type failureDetectionSummary uint8 @@ -409,6 +648,24 @@ const ( fdDead ) +func (fds failureDetectionSummary) String() string { + return redact.StringWithoutMarkers(fds) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (fds failureDetectionSummary) SafeFormat(w redact.SafePrinter, _ rune) { + switch fds { + case fdOK: + w.Print("ok") + case fdSuspect: + w.Print("suspect") + case fdDrain: + w.Print("drain") + case fdDead: + w.Print("dead") + } +} + type nodeState struct { stores []roachpb.StoreID nodeLoad @@ -420,6 +677,26 @@ type nodeState struct { fdSummary failureDetectionSummary } +func newNodeState(nodeID roachpb.NodeID) *nodeState { + return &nodeState{ + stores: []roachpb.StoreID{}, + nodeLoad: nodeLoad{ + nodeID: nodeID, + }, + } +} + +// applyChangeLoadDelta adds the change load delta to the node's adjusted load. +func (ns *nodeState) applyChangeLoadDelta(change replicaChange) { + ns.adjustedCPU += change.loadDelta[cpu] +} + +// undoChangeLoadDelta subtracts the change load delta from the node's adjusted +// load. +func (ns *nodeState) undoChangeLoadDelta(change replicaChange) { + ns.adjustedCPU -= change.loadDelta[cpu] +} + type storeIDAndReplicaState struct { roachpb.StoreID // Only valid ReplicaTypes are used here. @@ -428,6 +705,7 @@ type storeIDAndReplicaState struct { // rangeState is periodically updated based on reporting by the leaseholder. type rangeState struct { + rangeID roachpb.RangeID // replicas is the adjusted replicas. It is always consistent with // the storeState.adjusted.replicas in the corresponding stores. 
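+	// Consistency is re-established whenever a storeLeaseholderMsg covering the
+	// range is processed, and when pending changes are applied or undone.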
replicas []storeIDAndReplicaState @@ -451,28 +729,75 @@ type rangeState struct { diversityIncreaseLastFailedAttempt time.Time } +func newRangeState() *rangeState { + return &rangeState{ + replicas: []storeIDAndReplicaState{}, + pendingChanges: []*pendingReplicaChange{}, + } +} + +func (rs *rangeState) setReplica(repl storeIDAndReplicaState) { + for i := range rs.replicas { + if rs.replicas[i].StoreID == repl.StoreID { + rs.replicas[i].replicaState = repl.replicaState + return + } + } + rs.replicas = append(rs.replicas, repl) +} + +func (rs *rangeState) removeReplica(storeID roachpb.StoreID) { + var i, n int + n = len(rs.replicas) + for ; i < n; i++ { + if rs.replicas[i].StoreID == storeID { + rs.replicas[i], rs.replicas[n-1] = rs.replicas[n-1], rs.replicas[i] + rs.replicas = rs.replicas[:n-1] + break + } + } +} + +func (rs *rangeState) removePendingChangeTracking(cid changeID) { + i := 0 + n := len(rs.pendingChanges) + for ; i < n; i++ { + if rs.pendingChanges[i].changeID == cid { + rs.pendingChanges[i], rs.pendingChanges[n-1] = rs.pendingChanges[n-1], rs.pendingChanges[i] + rs.pendingChanges = rs.pendingChanges[:n-1] + break + } + } + // Wipe the analyzed constraints, as the range has changed. + rs.constraints = nil +} + // clusterState is the state of the cluster known to the allocator, including // adjustments based on pending changes. It does not include additional // indexing needed for constraint matching, or for tracking ranges that may // need attention etc. (those happen at a higher layer). type clusterState struct { + ts timeutil.TimeSource nodes map[roachpb.NodeID]*nodeState stores map[roachpb.StoreID]*storeState ranges map[roachpb.RangeID]*rangeState + // Added to when a change is proposed. Will also add to corresponding - // rangeState.pendingChanges and to the effected storeStates. + // rangeState.pendingChanges and to the affected storeStates. // // Removed from based on rangeMsg, explicit rejection by enacting module, or // time-based GC. There is no explicit acceptance by enacting module since // the single source of truth of a rangeState is the leaseholder. pendingChanges map[changeID]*pendingReplicaChange + changeSeqGen changeID *constraintMatcher *localityTierInterner } -func newClusterState(interner *stringInterner) *clusterState { +func newClusterState(ts timeutil.TimeSource, interner *stringInterner) *clusterState { return &clusterState{ + ts: ts, nodes: map[roachpb.NodeID]*nodeState{}, stores: map[roachpb.StoreID]*storeState{}, ranges: map[roachpb.RangeID]*rangeState{}, @@ -486,26 +811,114 @@ func newClusterState(interner *stringInterner) *clusterState { // clusterState mutators //====================================================================== +// XXX: We GC pending changes on processNodeLoadMsg. +// - This implies that pendingChanges are only +// XXX: We update pending changes as enacted vs not enacted on +// processStoreLeaseholderMsg. func (cs *clusterState) processNodeLoadMsg(msg *nodeLoadMsg) { - // TODO(sumeer): + now := cs.ts.Now() + cs.gcPendingChanges(now) + + ns := cs.nodes[msg.nodeID] + // Handle the node load, updating the reported load and set the adjusted load + // to be equal to the reported load initially. Any remaining pending changes + // will be re-applied to the reported load. + ns.reportedCPU = msg.reportedCPU + ns.adjustedCPU = msg.reportedCPU + ns.nodeLoad = msg.nodeLoad + for _, storeMsg := range msg.stores { + ss := cs.stores[storeMsg.StoreID] + // The store's load seqeunce number is incremented on each load change. 
The + // store's load is updated below. + ss.loadSeqNum++ + ss.storeLoad.reportedLoad = storeMsg.load + ss.storeLoad.capacity = storeMsg.capacity + ss.storeLoad.reportedSecondaryLoad = storeMsg.secondaryLoad + // Reset the adjusted load to be the reported load. We will re-apply any + // remaining pending change deltas to the updated adjusted load. + ss.adjusted.load = storeMsg.load + ss.adjusted.secondaryLoad = storeMsg.secondaryLoad + // Find any pending changes for range's which involve this store. These + // pending changes can now be removed from the loadPendingChanges. We don't + // need to undo the corresponding delta adjustment as the reported load + // (should) already contain(s) the effect(s). + for _, change := range ss.computePendingChangesReflectedInLatestLoad(now) { + delete(ss.adjusted.loadPendingChanges, change.changeID) + } + for _, change := range ss.adjusted.loadPendingChanges { + // The pending change hasn't been reported as done, re-apply the load + // delta to the adjusted load and include it in the new adjusted load + // replicas. + cs.applyChangeLoadDelta(change.replicaChange) + } + } } -func (cs *clusterState) processStoreLeaseholderMsg(msg *storeLeaseholderMsg) {} +func (cs *clusterState) processStoreLeaseholderMsg(msg *storeLeaseholderMsg) { + now := cs.ts.Now() + cs.gcPendingChanges(now) -func (cs *clusterState) addNodeID(nodeID roachpb.NodeID) { - // TODO(sumeer): -} - -func (cs *clusterState) addStore(store roachpb.StoreDescriptor) { - // TODO(sumeer): -} + ctx := context.Background() + log.Infof(ctx, "store leaseholder msg %v", msg) -func (cs *clusterState) changeStore(store roachpb.StoreDescriptor) { - // TODO(sumeer): -} - -func (cs *clusterState) removeNodeAndStores(nodeID roachpb.NodeID) { - // TODO(sumeer): + for _, rangeMsg := range msg.ranges { + rs, ok := cs.ranges[rangeMsg.RangeID] + if !ok { + // This is the first time we've seen this range. + rs = newRangeState() + cs.ranges[rangeMsg.RangeID] = rs + } + // Unilaterally remove the existing range state replicas. Then add back + // the replicas reported by the range message. + // + // TODO(kvoli,sumeerbhola): Use a set difference with rangeMsg.replicas + // and rs.replicas instead. + for _, replica := range rs.replicas { + delete(cs.stores[replica.StoreID].adjusted.replicas, rangeMsg.RangeID) + } + // Set the range state replicas to be equal to the range leaseholder + // replicas. The pending changes which are not enacted in the range message + // are handled and added back below. + rs.replicas = rangeMsg.replicas + log.Infof(ctx, "store leaseholder rangeMsg.replicas=%v", rangeMsg.replicas) + for _, replica := range rangeMsg.replicas { + cs.stores[replica.StoreID].adjusted.replicas[rangeMsg.RangeID] = replica.replicaState + } + // Find any pending changes which are now enacted, according to the + // leaseholder's view. + var remainingChanges, enactedChanges []*pendingReplicaChange + for _, change := range rs.pendingChanges { + ss := cs.stores[change.storeID] + adjustedReplicas, ok := ss.adjusted.replicas[rangeMsg.RangeID] + if !ok { + adjustedReplicas.ReplicaID = noReplicaID + } + if adjustedReplicas.subsumesChange(change.prev.replicaIDAndType, change.next) { + // The change has been enacted according to the leaseholder. + enactedChanges = append(enactedChanges, change) + } else { + remainingChanges = append(remainingChanges, change) + } + } + for _, change := range enactedChanges { + // Mark the change as enacted. Enacting a change does not remove the + // corresponding load adjustments. 
The store load message will do that,
+			// or GC, indicating that the change has been reflected in the store
+			// load.
+			cs.markPendingChangeEnacted(change.changeID, now)
+		}
+		// Re-apply the remaining changes.
+		for _, change := range remainingChanges {
+			cs.applyReplicaChange(change.replicaChange)
+		}
+		normSpanConfig, err := makeNormalizedSpanConfig(&rangeMsg.conf, cs.constraintMatcher.interner)
+		if err != nil {
+			// TODO(kvoli): Decide if we want to bubble this error up, or log a
+			// warning here instead?
+			panic(err)
+		}
+		rs.conf = normSpanConfig
+	}
+}

 // If the pending change does not happen within this GC duration, we
@@ -513,25 +926,211 @@ const pendingChangeGCDuration = 5 * time.Minute

 // Called periodically by allocator.
-func (cs *clusterState) gcPendingChanges() {
-	// TODO(sumeer):
+func (cs *clusterState) gcPendingChanges(now time.Time) {
+	gcBeforeTime := now.Add(-pendingChangeGCDuration)
+	var removeChangeIds []changeID
+	for _, pendingChange := range cs.pendingChanges {
+		if !pendingChange.startTime.After(gcBeforeTime) {
+			removeChangeIds = append(removeChangeIds, pendingChange.changeID)
+		}
+	}
+	for _, rmChange := range removeChangeIds {
+		cs.undoPendingChange(rmChange)
+	}
 }

 // Called by enacting module.
-func (cs *clusterState) pendingChangesRejected(
-	rangeID roachpb.RangeID, changes []pendingReplicaChange,
-) {
-	// TODO(sumeer):
+func (cs *clusterState) pendingChangesRejected(changes []changeID) {
+	// Wipe rejected changes, including load adjustments, tracking and replica
+	// changes.
+	for _, changeID := range changes {
+		cs.undoPendingChange(changeID)
+	}
 }

-func (cs *clusterState) addPendingChanges(rangeID roachpb.RangeID, changes []pendingReplicaChange) {
-	// TODO(sumeer):
+// makePendingChanges takes a set of changes for a range and applies the
+// changes as pending. The application updates the adjusted load, tracked
+// pending changes and changeID to reflect the pending application.
+func (cs *clusterState) makePendingChanges(
+	rangeID roachpb.RangeID, changes []replicaChange,
+) []*pendingReplicaChange {
+	now := cs.ts.Now()
+	var pendingChanges []*pendingReplicaChange
+	for _, change := range changes {
+		pendingChange := cs.makePendingChange(now, change)
+		pendingChanges = append(pendingChanges, pendingChange)
+	}
+	return pendingChanges
+}
+
+// makePendingChange takes a single change for a range and applies the change
+// as pending. The application updates the adjusted load, tracked pending
+// changes and changeID to reflect the pending application.
+func (cs *clusterState) makePendingChange(
+	now time.Time, change replicaChange,
+) *pendingReplicaChange {
+	// Apply the load and replica change to state.
+	cs.applyReplicaChange(change)
+	// Grab the next change ID, then add the pending change to the pending change
+	// trackers on the range, store and cluster state.
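+	// Change IDs increase monotonically and are never reused.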
+ cs.changeSeqGen++ + cid := cs.changeSeqGen + pendingChange := &pendingReplicaChange{ + changeID: cid, + replicaChange: change, + startTime: now, + enactedAtTime: time.Time{}, + } + storeState := cs.stores[change.storeID] + rangeState := cs.ranges[change.rangeID] + cs.pendingChanges[cid] = pendingChange + storeState.adjusted.loadPendingChanges[cid] = pendingChange + rangeState.pendingChanges = append(rangeState.pendingChanges, pendingChange) + return pendingChange +} + +func (cs *clusterState) applyReplicaChange(change replicaChange) { + storeState := cs.stores[change.storeID] + rangeState := cs.ranges[change.rangeID] + if change.isRemoval() { + delete(storeState.adjusted.replicas, change.rangeID) + rangeState.removeReplica(change.storeID) + } else if change.isAddition() { + // The change is to add a new replica. + pendingRepl := storeIDAndReplicaState{ + StoreID: change.storeID, + replicaState: replicaState{ + replicaIDAndType: change.next, + }, + } + storeState.adjusted.replicas[change.rangeID] = pendingRepl.replicaState + rangeState.setReplica(pendingRepl) + } else if change.isUpdate() { + // The change is to update an existing replica. + replState := storeState.adjusted.replicas[change.rangeID] + replState.replicaIDAndType = change.next + storeState.adjusted.replicas[change.rangeID] = replState + rangeState.setReplica(storeIDAndReplicaState{ + StoreID: change.storeID, + replicaState: replState, + }) + } else { + panic(fmt.Sprintf("unknown replica change %+v", change)) + } + cs.applyChangeLoadDelta(change) +} + +func (cs *clusterState) markPendingChangeEnacted(cid changeID, enactedAt time.Time) { + change := cs.pendingChanges[cid] + change.enactedAtTime = enactedAt +} + +// undoPendingChange reverses the change with ID cid. +func (cs *clusterState) undoPendingChange(cid changeID) { + change, ok := cs.pendingChanges[cid] + if !ok { + // TODO(kvoli): This is a panic but we could just log a warning or let this + // happen, revisit. + panic(fmt.Sprintf("change %v not found", cid)) + } + // Undo the change effects, including changes to the range replicas and load. + cs.undoReplicaChange(change.replicaChange) + cs.removePendingChangeTracking(cid) + delete(cs.stores[change.storeID].adjusted.loadPendingChanges, change.changeID) +} + +func (cs *clusterState) undoReplicaChange(change replicaChange) { + rangeState := cs.ranges[change.rangeID] + storeState := cs.stores[change.storeID] + if change.isRemoval() { + prevRepl := storeIDAndReplicaState{ + StoreID: change.storeID, + replicaState: change.prev, + } + rangeState.setReplica(prevRepl) + storeState.adjusted.replicas[change.rangeID] = prevRepl.replicaState + } else if change.isAddition() { + delete(storeState.adjusted.replicas, change.rangeID) + rangeState.removeReplica(change.storeID) + } else if change.isUpdate() { + replState := change.prev + rangeState.setReplica(storeIDAndReplicaState{ + StoreID: change.storeID, + replicaState: replState, + }) + storeState.adjusted.replicas[change.rangeID] = replState + } else { + panic(fmt.Sprintf("unknown replica change %+v", change)) + } + cs.undoChangeLoadDelta(change) +} + +// TODO(kvoli,sumeerbhola): The load of the store and node can become negative +// when applying or undoing load adjustments. For load adjustments to be +// reversible quickly, we aren't able to zero out the value when negative. We +// should handle the negative values when using them. + +// applyChangeLoadDelta adds the change load delta to the adjusted load of the +// store and node affected. 
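+// Note that only the CPU dimension is reflected at the node level, since CPU
+// is the only load dimension tracked per node.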
+func (cs *clusterState) applyChangeLoadDelta(change replicaChange) { + ss := cs.stores[change.storeID] + ns := cs.nodes[ss.NodeID] + ss.applyChangeLoadDelta(change) + ns.applyChangeLoadDelta(change) +} + +// undoChangeLoadDelta subtracts the change load delta from the adjusted load +// of the store and node affected. +func (cs *clusterState) undoChangeLoadDelta(change replicaChange) { + ss := cs.stores[change.storeID] + ns := cs.nodes[ss.NodeID] + ss.undoChangeLoadDelta(change) + ns.undoChangeLoadDelta(change) +} + +// removePendingChangeTracking removes the given change from the cluster and +// range pending change tracking. +func (cs *clusterState) removePendingChangeTracking(cid changeID) { + change, ok := cs.pendingChanges[cid] + if !ok { + panic(fmt.Sprintf("change %v not found", cid)) + } + cs.ranges[change.rangeID].removePendingChangeTracking(change.changeID) + delete(cs.pendingChanges, change.changeID) +} + +func (cs *clusterState) setStore(desc roachpb.StoreDescriptor) { + ns, ok := cs.nodes[desc.Node.NodeID] + if !ok { + // This is the first time seeing the associated node. + ns = newNodeState(desc.Node.NodeID) + cs.nodes[desc.Node.NodeID] = ns + } + ns.stores = append(ns.stores, desc.StoreID) + ss, ok := cs.stores[desc.StoreID] + if !ok { + // This is the first time seeing this store. + ss = newStoreState(desc.StoreID, desc.Node.NodeID) + ss.localityTiers = cs.localityTierInterner.intern(desc.Locality()) + cs.constraintMatcher.setStore(desc) + cs.stores[desc.StoreID] = ss + } + ss.StoreDescriptor = desc +} + +func (cs *clusterState) setStoreMembership(storeID roachpb.StoreID, state storeMembership) { + if ss, ok := cs.stores[storeID]; ok { + ss.storeMembership = state + } else { + panic(fmt.Sprintf("store %d not found in cluster state", storeID)) + } } func (cs *clusterState) updateFailureDetectionSummary( nodeID roachpb.NodeID, fd failureDetectionSummary, ) { - // TODO(sumeer): + ns := cs.nodes[nodeID] + ns.fdSummary = fd } //====================================================================== @@ -543,13 +1142,17 @@ func (cs *clusterState) updateFailureDetectionSummary( // For meansMemo. var _ loadInfoProvider = &clusterState{} -func (cs *clusterState) getStoreReportedLoad(roachpb.StoreID) *storeLoad { - // TODO(sumeer): +func (cs *clusterState) getStoreReportedLoad(storeID roachpb.StoreID) *storeLoad { + if storeState, ok := cs.stores[storeID]; ok { + return &storeState.storeLoad + } return nil } -func (cs *clusterState) getNodeReportedLoad(roachpb.NodeID) *nodeLoad { - // TODO(sumeer): +func (cs *clusterState) getNodeReportedLoad(nodeID roachpb.NodeID) *nodeLoad { + if nodeState, ok := cs.nodes[nodeID]; ok { + return &nodeState.nodeLoad + } return nil } @@ -568,6 +1171,7 @@ func (cs *clusterState) computeLoadSummary( ns := cs.nodes[ss.NodeID] sls := loadLow for i := range msl.load { + // TODO(kvoli,sumeerbhola): Handle negative adjusted store/node loads. ls := loadSummaryForDimension(ss.adjusted.load[i], ss.capacity[i], msl.load[i], msl.util[i]) if ls < sls { sls = ls @@ -581,80 +1185,3 @@ func (cs *clusterState) computeLoadSummary( loadSeqNum: ss.loadSeqNum, } } - -// Avoid unused lint errors. 
- -var _ = (&pendingChangesOldestFirst{}).removeChangeAtIndex -var _ = (&clusterState{}).processNodeLoadMsg -var _ = (&clusterState{}).processStoreLeaseholderMsg -var _ = (&clusterState{}).addNodeID -var _ = (&clusterState{}).addStore -var _ = (&clusterState{}).changeStore -var _ = (&clusterState{}).removeNodeAndStores -var _ = (&clusterState{}).gcPendingChanges -var _ = (&clusterState{}).pendingChangesRejected -var _ = (&clusterState{}).addPendingChanges -var _ = (&clusterState{}).updateFailureDetectionSummary -var _ = (&clusterState{}).getStoreReportedLoad -var _ = (&clusterState{}).getNodeReportedLoad -var _ = (&clusterState{}).canAddLoad -var _ = (&clusterState{}).computeLoadSummary -var _ = unknownReplicaID -var _ = noReplicaID -var _ = fdSuspect -var _ = fdDrain -var _ = fdDead -var _ = partialInit -var _ = fullyInit -var _ = removed -var _ = pendingChangeGCDuration -var _ = replicaType{}.replicaType -var _ = replicaType{}.isLeaseholder -var _ = replicaIDAndType{}.ReplicaID -var _ = replicaIDAndType{}.replicaType -var _ = replicaState{}.replicaIDAndType -var _ = replicaState{}.voterIsLagging -var _ = replicaState{}.replicationPaused -var _ = pendingReplicaChange{}.changeID -var _ = pendingReplicaChange{}.loadDelta -var _ = pendingReplicaChange{}.storeID -var _ = pendingReplicaChange{}.rangeID -var _ = pendingReplicaChange{}.prev -var _ = pendingReplicaChange{}.next -var _ = pendingReplicaChange{}.startTime -var _ = pendingReplicaChange{}.enactedAtTime -var _ = storeState{}.storeInitState -var _ = storeState{}.storeLoad -var _ = storeState{}.adjusted.loadReplicas -var _ = storeState{}.adjusted.loadPendingChanges -var _ = storeState{}.adjusted.secondaryLoad -var _ = storeState{}.adjusted.replicas -var _ = storeState{}.maxFractionPending -var _ = storeState{}.localityTiers -var _ = nodeState{}.stores -var _ = nodeState{}.nodeLoad -var _ = nodeState{}.adjustedCPU -var _ = nodeState{}.loadSummary -var _ = nodeState{}.fdSummary -var _ = storeIDAndReplicaState{}.StoreID -var _ = storeIDAndReplicaState{}.replicaState -var _ = rangeState{}.replicas -var _ = rangeState{}.conf -var _ = rangeState{}.load -var _ = rangeState{}.pendingChanges -var _ = rangeState{}.constraints -var _ = rangeState{}.diversityIncreaseLastFailedAttempt -var _ = clusterState{}.nodes -var _ = clusterState{}.stores -var _ = clusterState{}.ranges -var _ = clusterState{}.pendingChanges -var _ = clusterState{}.constraintMatcher -var _ = clusterState{}.localityTierInterner -var _ = (&storeChangeRateLimiter{}).initForRebalancePass -var _ = (&storeChangeRateLimiter{}).updateForRebalancePass -var _ = newStoreChangeRateLimiter -var _ = (&storeEnactedHistory{}).addEnactedChange -var _ = (&storeEnactedHistory{}).allowLoadBasedChanges -var _ = (&storeEnactedHistory{}).gcHistory -var _ = enactedReplicaChange{} -var _ = (&storeState{}).computePendingChangesReflectedInLatestLoad diff --git a/pkg/kv/kvserver/allocator/mma/cluster_state_test.go b/pkg/kv/kvserver/allocator/mma/cluster_state_test.go new file mode 100644 index 000000000000..3a2fa3f94e59 --- /dev/null +++ b/pkg/kv/kvserver/allocator/mma/cluster_state_test.go @@ -0,0 +1,450 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. 
+ +package mma + +import ( + "fmt" + "sort" + "strconv" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +var testingBaseTime = time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + +func parseBool(t *testing.T, in string) bool { + b, err := strconv.ParseBool(strings.TrimSpace(in)) + require.NoError(t, err) + return b +} + +func parseInt(t *testing.T, in string) int { + i, err := strconv.Atoi(strings.TrimSpace(in)) + require.NoError(t, err) + return i +} + +func parseInts(t *testing.T, in string) []int { + parts := strings.Split(in, ",") + ints := make([]int, 0, len(parts)) + for _, i := range strings.Split(in, ",") { + ints = append(ints, parseInt(t, i)) + } + return ints +} + +func stripBrackets(t *testing.T, in string) string { + rTrim := strings.TrimSuffix(in, "]") + lrTrim := strings.TrimPrefix(rTrim, "[") + return lrTrim +} + +func parseLoadVector(t *testing.T, in string) loadVector { + var vec loadVector + parts := strings.Split(stripBrackets(t, in), ",") + require.Len(t, parts, int(numLoadDimensions)) + for dim := range vec { + vec[dim] = loadValue(parseInt(t, parts[dim])) + } + return vec +} + +func parseSecondaryLoadVector(t *testing.T, in string) secondaryLoadVector { + var vec secondaryLoadVector + parts := strings.Split(stripBrackets(t, in), ",") + require.Len(t, parts, int(numSecondaryLoadDimensions)) + for dim := range vec { + vec[dim] = loadValue(parseInt(t, parts[dim])) + } + return vec +} + +func parseStoreLoadMsg(t *testing.T, in string) storeLoadMsg { + var msg storeLoadMsg + for _, v := range strings.Fields(in) { + parts := strings.Split(v, "=") + require.Len(t, parts, 2) + switch parts[0] { + case "store-id": + msg.StoreID = roachpb.StoreID(parseInt(t, parts[1])) + case "load": + msg.load = parseLoadVector(t, parts[1]) + case "capacity": + msg.capacity = parseLoadVector(t, parts[1]) + for i := range msg.capacity { + if msg.capacity[i] < 0 { + msg.capacity[i] = parentCapacity + } + } + case "secondary-load": + msg.secondaryLoad = parseSecondaryLoadVector(t, parts[1]) + default: + t.Fatalf("Unknown argument: %s", parts[0]) + } + } + return msg +} + +func parseNodeLoadMsg(t *testing.T, in string) nodeLoadMsg { + var msg nodeLoadMsg + lines := strings.Split(in, "\n") + for _, part := range strings.Fields(lines[0]) { + parts := strings.Split(part, "=") + require.Len(t, parts, 2) + switch parts[0] { + case "node-id": + msg.nodeID = roachpb.NodeID(parseInt(t, parts[1])) + case "cpu-load": + msg.reportedCPU = loadValue(parseInt(t, parts[1])) + case "cpu-capacity": + msg.capacityCPU = loadValue(parseInt(t, parts[1])) + case "load-time": + msg.loadTime = testingBaseTime.Add(time.Second * time.Duration(parseInt(t, parts[1]))) + default: + t.Fatalf("Unknown argument: %s", parts[0]) + } + } + + msg.stores = make([]storeLoadMsg, 0, len(lines)-1) + for _, line := range lines[1:] { + msg.stores = append(msg.stores, parseStoreLoadMsg(t, line)) + } + + return msg +} + +func parseStoreLeaseholderMsg(t *testing.T, in string) storeLeaseholderMsg { + var msg storeLeaseholderMsg + + lines := strings.Split(in, "\n") + require.True(t, strings.HasPrefix(lines[0], "store-id=")) + msg.StoreID = roachpb.StoreID(parseInt(t, strings.TrimPrefix(lines[0], "store-id="))) + + var rMsg rangeMsg + for _, 
line := range lines[1:] { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "range-id") { + if rMsg.RangeID != 0 { + msg.ranges = append(msg.ranges, rMsg) + } + rMsg = rangeMsg{RangeID: 0} + for _, field := range strings.Fields(line) { + parts := strings.SplitN(field, "=", 2) + switch parts[0] { + case "range-id": + rMsg.RangeID = roachpb.RangeID(parseInt(t, parts[1])) + case "load": + rMsg.rangeLoad.load = parseLoadVector(t, parts[1]) + case "raft-cpu": + rMsg.rangeLoad.raftCPU = loadValue(parseInt(t, parts[1])) + } + } + } else if strings.HasPrefix(line, "config=") { + rMsg.conf = spanconfigtestutils.ParseZoneConfig(t, strings.TrimPrefix(line, "config=")).AsSpanConfig() + } else { + var repl storeIDAndReplicaState + fields := strings.Fields(line) + require.Greater(t, len(fields), 2) + for _, field := range fields { + parts := strings.Split(field, "=") + require.GreaterOrEqual(t, len(parts), 2) + switch parts[0] { + case "store-id": + repl.StoreID = roachpb.StoreID(parseInt(t, parts[1])) + case "replica-id": + repl.ReplicaID = roachpb.ReplicaID(parseInt(t, parts[1])) + case "leaseholder": + repl.isLeaseholder = parseBool(t, parts[1]) + case "type": + replType, err := parseReplicaType(parts[1]) + require.NoError(t, err) + repl.replicaType.replicaType = replType + default: + panic(fmt.Sprintf("unknown argument: %s", parts[0])) + } + } + rMsg.replicas = append(rMsg.replicas, repl) + } + } + if rMsg.RangeID != 0 { + msg.ranges = append(msg.ranges, rMsg) + } + + return msg +} + +func parseChangeAddRemove( + t *testing.T, in string, +) (add, remove roachpb.StoreID, replType roachpb.ReplicaType, rangeLoad rangeLoad) { + // Note that remove or add will 0 if not found in this string. + for _, v := range strings.Fields(in) { + parts := strings.Split(v, "=") + require.Len(t, parts, 2) + switch parts[0] { + case "add-store": + add = roachpb.StoreID(parseInt(t, parts[1])) + case "remove-store": + remove = roachpb.StoreID(parseInt(t, parts[1])) + case "type": + var err error + replType, err = parseReplicaType(parts[1]) + require.NoError(t, err) + case "load": + rangeLoad.load = parseLoadVector(t, parts[1]) + case "cpu-raft": + rangeLoad.raftCPU = loadValue(parseInt(t, parts[1])) + } + } + return add, remove, replType, rangeLoad +} + +func printPendingChanges(changes []*pendingReplicaChange) string { + sortedChanges := make([]*pendingReplicaChange, 0, len(changes)) + sort.Slice(changes, func(i, j int) bool { + return changes[i].startTime.Before(changes[j].startTime) + }) + var buf strings.Builder + fmt.Fprintf(&buf, "pending(%d)", len(sortedChanges)) + for _, change := range sortedChanges { + fmt.Fprintf(&buf, "\nchange-id=%d start=%ds store-id=%v range-id=%v delta=%v", + change.changeID, change.startTime.Unix(), change.storeID, + change.rangeID, change.loadDelta, + ) + if !(change.enactedAtTime == time.Time{}) { + fmt.Fprintf(&buf, " enacted=%ds", change.enactedAtTime.Unix()) + } + fmt.Fprintf(&buf, "\n prev=(%v)\n next=(%v)", change.prev, change.next) + } + return buf.String() +} + +func testingGetStoreList(cs *clusterState) (member, removed storeIDPostingList) { + for storeID, ss := range cs.stores { + switch ss.storeMembership { + case storeMembershipMember, storeMembershipRemoving: + member = append(member, storeID) + case storeMembershipRemoved: + removed = append(removed, storeID) + } + } + return member, removed +} + +func testingGetPendingChanges(cs *clusterState) []*pendingReplicaChange { + pendingChangeSet := map[changeID]struct{}{} + var pendingChangeList []*pendingReplicaChange 
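+	// maybeAddChange de-duplicates changes that are tracked both in
+	// cs.pendingChanges and in a store's adjusted.loadPendingChanges.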
+ maybeAddChange := func(change *pendingReplicaChange) { + if _, ok := pendingChangeSet[change.changeID]; ok { + return + } + pendingChangeList = append(pendingChangeList, change) + pendingChangeSet[change.changeID] = struct{}{} + } + + for _, change := range cs.pendingChanges { + maybeAddChange(change) + } + + // Use the sorted store list to ensure deterministic iteration order. + memberStores, _ := testingGetStoreList(cs) + for _, storeID := range memberStores { + for _, change := range cs.stores[storeID].adjusted.loadPendingChanges { + maybeAddChange(change) + } + } + return pendingChangeList +} + +// TODO(kvoli): Clean this up, it is leftover from the earlier testing. +func TestClusterState(t *testing.T) { + datadriven.Walk(t, + datapathutils.TestDataPath(t, "cluster_state"), + func(t *testing.T, path string) { + ts := timeutil.NewManualTime(time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)) + cs := newClusterState(ts, newStringInterner()) + + printNodeListMeta := func() string { + nodeList := []int{} + for nodeID := range cs.nodes { + nodeList = append(nodeList, int(nodeID)) + } + sort.Ints(nodeList) + var buf strings.Builder + for _, nodeID := range nodeList { + ns := cs.nodes[roachpb.NodeID(nodeID)] + fmt.Fprintf(&buf, "node-id=%s failure-summary=%s locality-tiers=%s\n", + ns.nodeID, ns.fdSummary, cs.stores[ns.stores[0]].StoreDescriptor.Locality()) + for _, storeID := range ns.stores { + ss := cs.stores[storeID] + fmt.Fprintf(&buf, " store-id=%v membership=%v attrs=%s locality-code=%s\n", + ss.StoreID, ss.storeMembership, ss.Attrs, ss.localityTiers.str) + } + } + return buf.String() + } + + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "ranges": + var rangeIDs []int + for rangeID := range cs.ranges { + rangeIDs = append(rangeIDs, int(rangeID)) + } + // Sort the range IDs before printing, iterating over a map would + // lead to non-determinism. 
+ sort.Ints(rangeIDs) + var buf strings.Builder + for _, rangeID := range rangeIDs { + rs := cs.ranges[roachpb.RangeID(rangeID)] + fmt.Fprintf(&buf, "range-id=%v\n", rangeID) + for _, repl := range rs.replicas { + fmt.Fprintf(&buf, " store-id=%v %v\n", + repl.StoreID, repl.replicaIDAndType, + ) + } + } + return buf.String() + + case "get-load-info": + var buf strings.Builder + storeList, _ := testingGetStoreList(cs) + for _, storeID := range storeList { + ss := cs.stores[storeID] + if ss.storeMembership == storeMembershipRemoved { + continue + } + ns := cs.nodes[ss.NodeID] + fmt.Fprintf(&buf, + "store-id=%v reported=%v adjusted=%v node-reported-cpu=%v node-adjusted-cpu=%v seq=%d\n", + ss.StoreID, ss.reportedLoad, ss.adjusted.load, ns.reportedCPU, ns.adjustedCPU, ss.loadSeqNum, + ) + } + return buf.String() + + case "get-pending-changes": + return printPendingChanges(testingGetPendingChanges(cs)) + + case "set-store": + for _, next := range strings.Split(d.Input, "\n") { + desc := parseStoreDescriptor(t, next) + cs.setStore(desc) + } + return printNodeListMeta() + + case "set-store-membership": + var storeID int + d.ScanArgs(t, "store-id", &storeID) + var storeMembershipString string + d.ScanArgs(t, "membership", &storeMembershipString) + var storeMembership storeMembership + switch storeMembershipString { + case "member": + storeMembership = storeMembershipMember + case "removing": + storeMembership = storeMembershipRemoving + case "removed": + storeMembership = storeMembershipRemoved + } + cs.setStoreMembership(roachpb.StoreID(storeID), storeMembership) + + var buf strings.Builder + nonRemovedStores, removedStores := testingGetStoreList(cs) + buf.WriteString("member store-ids: ") + printPostingList(&buf, nonRemovedStores) + buf.WriteString("\nremoved store-ids: ") + printPostingList(&buf, removedStores) + return buf.String() + + case "update-failure-detection": + var nodeID int + var failureDetectionString string + d.ScanArgs(t, "node-id", &nodeID) + d.ScanArgs(t, "summary", &failureDetectionString) + var fd failureDetectionSummary + for i := fdOK; i < fdDead+1; i++ { + if i.String() == failureDetectionString { + fd = i + break + } + } + cs.updateFailureDetectionSummary(roachpb.NodeID(nodeID), fd) + return printNodeListMeta() + + case "node-load-msg": + msg := parseNodeLoadMsg(t, d.Input) + cs.processNodeLoadMsg(&msg) + return "" + + case "store-leaseholder-msg": + msg := parseStoreLeaseholderMsg(t, d.Input) + cs.processStoreLeaseholderMsg(&msg) + return "" + + case "make-pending-changes": + var rid int + var changes []replicaChange + d.ScanArgs(t, "range-id", &rid) + rangeID := roachpb.RangeID(rid) + rState := cs.ranges[rangeID] + lines := strings.Split(d.Input, "\n") + for _, line := range lines { + parts := strings.Split(strings.TrimSpace(line), ":") + switch parts[0] { + case "transfer-lease": + add, remove, _, load := parseChangeAddRemove(t, parts[1]) + rState.rangeID = rangeID + transferChanges := makeLeaseTransferChanges(rState, load, add, remove) + changes = append(changes, transferChanges[:]...) 
+ case "add-replica": + add, _, replType, load := parseChangeAddRemove(t, parts[1]) + addChanges := makeAddReplicaChange(rangeID, load, add, replType) + changes = append(changes, addChanges) + case "remove-replica": + _, remove, _, load := parseChangeAddRemove(t, parts[1]) + var removeRepl storeIDAndReplicaState + for _, replica := range rState.replicas { + if replica.StoreID == remove { + removeRepl = replica + } + } + changes = append(changes, makeRemoveReplicaChange(rangeID, load, removeRepl)) + case "rebalance-replica": + add, remove, _, load := parseChangeAddRemove(t, parts[1]) + rebalanceChanges := makeRebalanceReplicaChanges(rState, load, add, remove) + changes = append(changes, rebalanceChanges[:]...) + } + } + cs.makePendingChanges(rangeID, changes) + return printPendingChanges(testingGetPendingChanges(cs)) + + case "gc-pending-changes": + cs.gcPendingChanges(cs.ts.Now()) + return printPendingChanges(testingGetPendingChanges(cs)) + + case "reject-pending-changes": + return "OK" + + case "tick": + var seconds int + d.ScanArgs(t, "seconds", &seconds) + ts.Advance(time.Second * time.Duration(seconds)) + return fmt.Sprintf("clock=%ds", ts.Now().UnixNano()/int64(time.Second)) + + default: + panic(fmt.Sprintf("unknown command: %s", d.Cmd)) + } + }, + ) + }) +} diff --git a/pkg/kv/kvserver/allocator/mma/constraint_matcher_test.go b/pkg/kv/kvserver/allocator/mma/constraint_matcher_test.go index 111463c131f9..96766f2d68fa 100644 --- a/pkg/kv/kvserver/allocator/mma/constraint_matcher_test.go +++ b/pkg/kv/kvserver/allocator/mma/constraint_matcher_test.go @@ -16,30 +16,28 @@ import ( "github.com/stretchr/testify/require" ) -func parseStoreDescriptor(t *testing.T, d *datadriven.TestData) roachpb.StoreDescriptor { +func parseStoreDescriptor(t *testing.T, in string) roachpb.StoreDescriptor { var desc roachpb.StoreDescriptor - var storeID int - d.ScanArgs(t, "store-id", &storeID) - desc.StoreID = roachpb.StoreID(storeID) - var nodeID int - if d.HasArg("node-id") { - d.ScanArgs(t, "node-id", &nodeID) - desc.Node.NodeID = roachpb.NodeID(nodeID) - } - var attrs string - d.ScanArgs(t, "attrs", &attrs) - for _, v := range strings.Split(attrs, ",") { - v = strings.TrimSpace(v) - desc.Attrs.Attrs = append(desc.Attrs.Attrs, v) - } - var lts string - d.ScanArgs(t, "locality-tiers", <s) - for _, v := range strings.Split(lts, ",") { - v = strings.TrimSpace(v) - kv := strings.Split(v, "=") - require.Equal(t, 2, len(kv)) - desc.Node.Locality.Tiers = append( - desc.Node.Locality.Tiers, roachpb.Tier{Key: kv[0], Value: kv[1]}) + for _, v := range strings.Split(in, " ") { + parts := strings.SplitN(v, "=", 2) + switch parts[0] { + case "store-id": + desc.StoreID = roachpb.StoreID(parseInt(t, parts[1])) + case "node-id": + desc.Node.NodeID = roachpb.NodeID(parseInt(t, parts[1])) + case "attrs": + desc.Attrs.Attrs = append( + desc.Attrs.Attrs, + strings.Split(parts[1], ",")..., + ) + case "locality-tiers": + for _, lt := range strings.Split(parts[1], ",") { + kv := strings.Split(lt, "=") + require.Equal(t, 2, len(kv)) + desc.Node.Locality.Tiers = append( + desc.Node.Locality.Tiers, roachpb.Tier{Key: kv[0], Value: kv[1]}) + } + } } return desc } @@ -84,7 +82,7 @@ func TestConstraintMatcher(t *testing.T) { switch d.Cmd { case "store": - desc := parseStoreDescriptor(t, d) + desc := parseStoreDescriptor(t, d.Input) cm.setStore(desc) var b strings.Builder printMatcher(&b) diff --git a/pkg/kv/kvserver/allocator/mma/constraint_test.go b/pkg/kv/kvserver/allocator/mma/constraint_test.go index 9345e737e4b8..05eba3fd0862 
100644 --- a/pkg/kv/kvserver/allocator/mma/constraint_test.go +++ b/pkg/kv/kvserver/allocator/mma/constraint_test.go @@ -352,7 +352,7 @@ func TestRangeAnalyzedConstraints(t *testing.T) { func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "store": - desc := parseStoreDescriptor(t, d) + desc := parseStoreDescriptor(t, d.Input) cm.setStore(desc) stores[desc.StoreID] = desc return "" diff --git a/pkg/kv/kvserver/allocator/mma/load.go b/pkg/kv/kvserver/allocator/mma/load.go index 80428eb5685e..af98b03f7af2 100644 --- a/pkg/kv/kvserver/allocator/mma/load.go +++ b/pkg/kv/kvserver/allocator/mma/load.go @@ -10,6 +10,7 @@ import ( "sync" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/redact" ) // Misc helper classes for working with range, store and node load. @@ -37,6 +38,15 @@ type loadValue int64 // dimension. type loadVector [numLoadDimensions]loadValue +func (lv loadVector) String() string { + return redact.StringWithoutMarkers(lv) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (lv loadVector) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("[%d,%d,%d]", lv[cpu], lv[writeBandwidth], lv[byteSize]) +} + func (lv *loadVector) add(other loadVector) { for i := range other { (*lv)[i] += other[i] diff --git a/pkg/kv/kvserver/allocator/mma/load_test.go b/pkg/kv/kvserver/allocator/mma/load_test.go index 55bf0d940ca0..d3df80225e87 100644 --- a/pkg/kv/kvserver/allocator/mma/load_test.go +++ b/pkg/kv/kvserver/allocator/mma/load_test.go @@ -59,7 +59,7 @@ func TestMeansMemo(t *testing.T) { func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "store": - desc := parseStoreDescriptor(t, d) + desc := parseStoreDescriptor(t, d.Input) cm.setStore(desc) storeMap[desc.StoreID] = desc return "" diff --git a/pkg/kv/kvserver/allocator/mma/messages.go b/pkg/kv/kvserver/allocator/mma/messages.go index 38dea64776f2..b20d8f88aa05 100644 --- a/pkg/kv/kvserver/allocator/mma/messages.go +++ b/pkg/kv/kvserver/allocator/mma/messages.go @@ -60,8 +60,6 @@ type storeLeaseholderMsg struct { // iteration. 
type rangeMsg struct { roachpb.RangeID - start roachpb.Key - end roachpb.Key replicas []storeIDAndReplicaState conf roachpb.SpanConfig rangeLoad rangeLoad @@ -81,8 +79,6 @@ var _ = storeLoadMsg{}.secondaryLoad var _ = storeLeaseholderMsg{}.StoreID var _ = storeLeaseholderMsg{}.ranges var _ = rangeMsg{}.RangeID -var _ = rangeMsg{}.start -var _ = rangeMsg{}.end var _ = rangeMsg{}.replicas var _ = rangeMsg{}.conf var _ = rangeMsg{}.rangeLoad diff --git a/pkg/kv/kvserver/allocator/mma/testdata/cluster_state/simple b/pkg/kv/kvserver/allocator/mma/testdata/cluster_state/simple new file mode 100644 index 000000000000..018fc45f1b8d --- /dev/null +++ b/pkg/kv/kvserver/allocator/mma/testdata/cluster_state/simple @@ -0,0 +1,97 @@ +set-store + store-id=1 node-id=1 attrs=purple locality-tiers=region=us-west-1,zone=us-west-1a + store-id=2 node-id=2 attrs=yellow locality-tiers=region=us-east-1,zone=us-east-1a + store-id=3 node-id=3 attrs=yellow locality-tiers=region=eu-west-1,zone=eu-west-1a +---- +node-id=1 failure-summary=ok locality-tiers=region=us-west-1,zone=us-west-1a,node=1 + store-id=1 membership=full attrs=purple locality-code=1:2:3: +node-id=2 failure-summary=ok locality-tiers=region=us-east-1,zone=us-east-1a,node=2 + store-id=2 membership=full attrs=yellow locality-code=4:5:6: +node-id=3 failure-summary=ok locality-tiers=region=eu-west-1,zone=eu-west-1a,node=3 + store-id=3 membership=full attrs=yellow locality-code=7:8:9: + +set-store-membership store-id=1 membership=member +---- +member store-ids: 1, 2, 3 +removed store-ids: + +set-store-membership store-id=2 membership=member +---- +member store-ids: 1, 2, 3 +removed store-ids: + +node-load-msg +node-id=1 cpu-load=80 cpu-capacity=100 load-time=1 + store-id=1 load=[0,80,80] capacity=[-1,100,100] secondary-load=1 +---- + +get-load-info +---- +store-id=2 reported=[0,0,0] adjusted=[0,0,0] node-reported-cpu=0 node-adjusted-cpu=0 seq=0 +store-id=3 reported=[0,0,0] adjusted=[0,0,0] node-reported-cpu=0 node-adjusted-cpu=0 seq=0 +store-id=1 reported=[0,80,80] adjusted=[0,80,80] node-reported-cpu=80 node-adjusted-cpu=80 seq=1 + +store-leaseholder-msg +store-id=1 + range-id=1 load=[0,40,80] raft-cpu=20 + config=num_replicas=3 constraints={'+region=us-west-1:1'} voter_constraints={'+region=us-west-1:1'} + store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true + store-id=2 replica-id=2 type=VOTER_FULL leaseholder=false + store-id=3 replica-id=3 type=VOTER_FULL leaseholder=false + range-id=2 load=[0,40,40] raft-cpu=20 config=(num_replicas=3 constraints={'+region=us-west-1:1'} voter_constraints={'+region=us-west-1:1'}) + store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true + store-id=2 replica-id=2 type=VOTER_FULL leaseholder=false + store-id=3 replica-id=3 type=VOTER_FULL leaseholder=false +---- + +ranges +---- +range-id=1 + store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true + store-id=2 replica-id=2 type=VOTER_FULL leaseholder=false + store-id=3 replica-id=3 type=VOTER_FULL leaseholder=false +range-id=2 + store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true + store-id=2 replica-id=2 type=VOTER_FULL leaseholder=false + store-id=3 replica-id=3 type=VOTER_FULL leaseholder=false + +store-leaseholder-msg +store-id=1 + range-id=1 load=[0,40,80] raft-cpu=20 + config=num_replicas=3 constraints={'+region=us-west-1:1'} voter_constraints={'+region=us-west-1:1'} + store-id=1 replica-id=1 type=VOTER_FULL leaseholder=false + store-id=2 replica-id=2 type=VOTER_FULL leaseholder=false + store-id=3 replica-id=3 type=VOTER_FULL leaseholder=true + range-id=2 
load=[0,40,40] raft-cpu=20 config=(num_replicas=3 constraints={'+region=us-west-1:1'} voter_constraints={'+region=us-west-1:1'}) + store-id=1 replica-id=1 type=VOTER_FULL leaseholder=false + store-id=2 replica-id=2 type=VOTER_FULL leaseholder=false + store-id=3 replica-id=3 type=VOTER_FULL leaseholder=true +---- + +ranges +---- +range-id=1 + store-id=1 replica-id=1 type=VOTER_FULL leaseholder=false + store-id=2 replica-id=2 type=VOTER_FULL leaseholder=false + store-id=3 replica-id=3 type=VOTER_FULL leaseholder=true +range-id=2 + store-id=1 replica-id=1 type=VOTER_FULL leaseholder=false + store-id=2 replica-id=2 type=VOTER_FULL leaseholder=false + store-id=3 replica-id=3 type=VOTER_FULL leaseholder=true + +set-store + store-id=4 node-id=4 attrs=blue locality-tiers=region=eu-west-1,zone=eu-west-1a +---- +node-id=1 failure-summary=ok locality-tiers=region=us-west-1,zone=us-west-1a,node=1 + store-id=1 membership=full attrs=purple locality-code=1:2:3: +node-id=2 failure-summary=ok locality-tiers=region=us-east-1,zone=us-east-1a,node=2 + store-id=2 membership=full attrs=yellow locality-code=4:5:6: +node-id=3 failure-summary=ok locality-tiers=region=eu-west-1,zone=eu-west-1a,node=3 + store-id=3 membership=full attrs=yellow locality-code=7:8:9: +node-id=4 failure-summary=ok locality-tiers=region=eu-west-1,zone=eu-west-1a,node=4 + store-id=4 membership=full attrs=blue locality-code=7:8:12: + +make-pending-changes range-id=1 + add-replica: add-store=4 load=[80,80,80] cpu-raft=40 +---- +pending(0) diff --git a/pkg/kv/kvserver/allocator/mma/testdata/pending_changes b/pkg/kv/kvserver/allocator/mma/testdata/pending_changes new file mode 100644 index 000000000000..92ed4a9a1ed6 --- /dev/null +++ b/pkg/kv/kvserver/allocator/mma/testdata/pending_changes @@ -0,0 +1,338 @@ +set-store +store-id=1 node-id=1 attrs=purple locality-tiers=region=us-west-1,zone=us-west-1a +store-id=2 node-id=2 attrs=yellow locality-tiers=region=us-east-1,zone=us-east-1a +---- +node-id=1 failure-summary=ok locality-tiers=region=us-west-1,zone=us-west-1a,node=1 + store-id=1 init=full attrs=purple locality-code=1:2:3: +node-id=2 failure-summary=ok locality-tiers=region=us-east-1,zone=us-east-1a,node=2 + store-id=2 init=full attrs=yellow locality-code=4:5:6: + +msg last-seq=-1 cur-seq=-1 +node-id=1 cpu-load=80 cpu-capacity=100 + store: store-id=1 load=(80,80,80) capacity=(-1,100,100) secondary-load=1 + range: range-id=1 lease=true type=VOTER_FULL + replica: store-id=1 replica-id=1 type=VOTER_FULL +---- +OK + +msg last-seq=-1 cur-seq=-1 +node-id=2 cpu-load=0 cpu-capacity=100 + store: store-id=2 load=(0,0,0) capacity=(-1,100,100) secondary-load=0 +---- +OK + +make-pending-changes range-id=1 +add-replica: add-store=2 load=(80,80,80) cpu-raft=40 +---- +pending(1) +change-id=1 start=0s store-id=2 range-id=1 delta=[40 80 80] + prev=(replica-id=none lease=false type=VOTER_FULL) + next=(replica-id=unknown lease=false type=VOTER_FULL) + +get-load-info +---- +store-id=1 reported=[80 80 80] adjusted=[80 80 80] node-reported-cpu=80 node-adjusted-cpu=80 seq=1 +store-id=2 reported=[0 0 0] adjusted=[40 80 80] node-reported-cpu=0 node-adjusted-cpu=40 seq=2 + +tick seconds=10 +---- +clock=10s + +msg last-seq=-1 cur-seq=-1 +node-id=1 cpu-load=80 cpu-capacity=100 + store: store-id=1 load=(80,80,80) capacity=(-1,100,100) secondary-load=1 + range: range-id=1 lease=true type=VOTER_FULL + replica: store-id=1 replica-id=1 type=VOTER_FULL + replica: store-id=2 replica-id=2 type=VOTER_FULL +---- +OK + +# Check the pending changes on each store, we expect there 
to be an enacted +# change on store 2. The change should also have been removed from the cluster +# and range tracking. +get-pending-changes +---- +pending(1) +change-id=1 start=0s store-id=2 range-id=1 delta=[40 80 80] enacted=10s + prev=(replica-id=none lease=false type=VOTER_FULL) + next=(replica-id=unknown lease=false type=VOTER_FULL) + +# The load deltas should still be applied to the adjusted store and node state +# for s2. Enacting a change should not undo its adjustments. +get-load-info +---- +store-id=1 reported=[80 80 80] adjusted=[80 80 80] node-reported-cpu=80 node-adjusted-cpu=80 seq=2 +store-id=2 reported=[0 0 0] adjusted=[40 80 80] node-reported-cpu=0 node-adjusted-cpu=40 seq=2 + +# Now send the load update from s2 which reports the change as complete. +msg last-seq=-1 cur-seq=-1 +node-id=2 cpu-load=40 cpu-capacity=100 + store: store-id=2 load=(40,80,80) capacity=(-1,100,100) secondary-load=0 + range: range-id=1 lease=false type=VOTER_FULL +---- +OK + +get-load-info +---- +store-id=1 reported=[80 80 80] adjusted=[80 80 80] node-reported-cpu=80 node-adjusted-cpu=80 seq=2 +store-id=2 reported=[40 80 80] adjusted=[40 80 80] node-reported-cpu=40 node-adjusted-cpu=40 seq=3 + +# Expect no changes tracked in either the store or cluster tracker. +get-pending-changes +---- +pending(0) + +# Next enqueue a lease transfer with the same range load as the replica add. +make-pending-changes range-id=1 +transfer-lease: remove-store=1 add-store=2 load=(80,80,80) cpu-raft=40 +---- +pending(2) +change-id=2 start=10s store-id=1 range-id=1 delta=[-40 0 0] + prev=(replica-id=1 lease=true type=VOTER_FULL) + next=(replica-id=1 lease=false type=VOTER_FULL) +change-id=3 start=10s store-id=2 range-id=1 delta=[40 0 0] + prev=(replica-id=2 lease=false type=VOTER_FULL) + next=(replica-id=2 lease=true type=VOTER_FULL) + +get-load-info +---- +store-id=1 reported=[80 80 80] adjusted=[40 80 80] node-reported-cpu=80 node-adjusted-cpu=40 seq=3 +store-id=2 reported=[40 80 80] adjusted=[80 80 80] node-reported-cpu=40 node-adjusted-cpu=80 seq=4 + +# Bump the clock with the GC duration. The pending changes should be GC'd and +# the reported load equal to the adjusted load. +tick seconds=300 +---- +clock=310s + +gc-pending-changes +---- +pending(0) + +get-load-info +---- +store-id=1 reported=[80 80 80] adjusted=[80 80 80] node-reported-cpu=80 node-adjusted-cpu=80 seq=4 +store-id=2 reported=[40 80 80] adjusted=[40 80 80] node-reported-cpu=40 node-adjusted-cpu=40 seq=5 + +# Perform a replica removal without GC'ing. This time, instead of first +# sending a lease store message to mark the change as enacted; send a load +# message which should drop the load adjustments but leave the change as +# pending in the cluster and range state. 
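+# Since s2 is not the leaseholder of r1, only the raft CPU (40) is expected in
+# the CPU dimension of the delta, alongside the full write bandwidth and byte
+# size, i.e. delta=[-40 -80 -80].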
+make-pending-changes range-id=1 +remove-replica: remove-store=2 load=(80,80,80) cpu-raft=40 +---- +pending(1) +change-id=4 start=310s store-id=2 range-id=1 delta=[-40 -80 -80] + prev=(replica-id=2 lease=false type=VOTER_FULL) + next=(replica-id=none lease=false type=VOTER_FULL) + +get-load-info +---- +store-id=1 reported=[80 80 80] adjusted=[80 80 80] node-reported-cpu=80 node-adjusted-cpu=80 seq=4 +store-id=2 reported=[40 80 80] adjusted=[0 0 0] node-reported-cpu=40 node-adjusted-cpu=0 seq=6 + +msg last-seq=-1 cur-seq=-1 +node-id=2 cpu-load=0 cpu-capacity=100 + store: store-id=2 load=(0,0,0) capacity=(-1,100,100) secondary-load=0 +---- +OK + +get-pending-changes +---- +pending(1) +change-id=4 start=310s store-id=2 range-id=1 delta=[-40 -80 -80] + prev=(replica-id=2 lease=false type=VOTER_FULL) + next=(replica-id=none lease=false type=VOTER_FULL) + +get-load-info +---- +store-id=1 reported=[80 80 80] adjusted=[80 80 80] node-reported-cpu=80 node-adjusted-cpu=80 seq=4 +store-id=2 reported=[0 0 0] adjusted=[0 0 0] node-reported-cpu=0 node-adjusted-cpu=0 seq=7 + +# Send a lease message to mark the change as enacted and clean it up from the +# cluster tracking. +msg last-seq=-1 cur-seq=-1 +node-id=1 cpu-load=80 cpu-capacity=100 + store: store-id=1 load=(80,80,80) capacity=(-1,100,100) secondary-load=1 + range: range-id=1 lease=true type=VOTER_FULL + replica: store-id=1 replica-id=1 type=VOTER_FULL +---- +OK + +get-pending-changes +---- +pending(0) + +# Add a new store (s3) and send a leaseholder message from the store indicating +# it is the new leaseholder for r1. +set-store +store-id=3 node-id=3 attrs=green locality-tiers=region=us-west-1,zone=us-west-1a +---- +node-id=1 failure-summary=ok locality-tiers=region=us-west-1,zone=us-west-1a,node=1 + store-id=1 init=full attrs=purple locality-code=1:2:3: +node-id=2 failure-summary=ok locality-tiers=region=us-east-1,zone=us-east-1a,node=2 + store-id=2 init=full attrs=yellow locality-code=4:5:6: +node-id=3 failure-summary=ok locality-tiers=region=us-west-1,zone=us-west-1a,node=3 + store-id=3 init=full attrs=green locality-code=1:2:7: + +msg last-seq=-1 cur-seq=-1 +node-id=3 cpu-load=80 cpu-capacity=100 + store: store-id=3 load=(80,80,80) capacity=(-1,100,100) secondary-load=1 + range: range-id=1 lease=true type=VOTER_FULL + replica: store-id=1 replica-id=1 type=VOTER_FULL + replica: store-id=3 replica-id=3 type=VOTER_FULL +---- +OK + +ranges +---- +range-id=1 + store-id=1 replica-id=1 lease=false type=VOTER_FULL + store-id=3 replica-id=3 lease=true type=VOTER_FULL + +# Make a pending change to rebalance the replica from s1 to s3. 
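+# The replica is added on s2 (add-store=2) and removed from s1, while s3
+# retains the lease, so two pending changes are expected: an add on s2 and a
+# removal on s1.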
+make-pending-changes range-id=1 +rebalance-replica: add-store=2 remove-store=1 load=(80,80,80) cpu-raft=40 +---- +pending(2) +change-id=5 start=310s store-id=2 range-id=1 delta=[40 80 80] + prev=(replica-id=none lease=false type=VOTER_FULL) + next=(replica-id=unknown lease=false type=VOTER_FULL) +change-id=6 start=310s store-id=1 range-id=1 delta=[-40 -80 -80] + prev=(replica-id=1 lease=false type=VOTER_FULL) + next=(replica-id=none lease=false type=VOTER_FULL) + +ranges +---- +range-id=1 + store-id=2 replica-id=unknown lease=false type=VOTER_FULL + store-id=3 replica-id=3 lease=true type=VOTER_FULL + +get-load-info +---- +store-id=1 reported=[80 80 80] adjusted=[40 0 0] node-reported-cpu=80 node-adjusted-cpu=40 seq=6 +store-id=2 reported=[0 0 0] adjusted=[40 80 80] node-reported-cpu=0 node-adjusted-cpu=40 seq=8 +store-id=3 reported=[80 80 80] adjusted=[80 80 80] node-reported-cpu=80 node-adjusted-cpu=80 seq=1 + +# Mark the change as enacted by sending a lease range update from s3. +msg last-seq=-1 cur-seq=-1 +node-id=3 cpu-load=80 cpu-capacity=100 + store: store-id=3 load=(80,80,80) capacity=(-1,100,100) secondary-load=1 + range: range-id=1 lease=true type=VOTER_FULL + replica: store-id=2 replica-id=2 type=VOTER_FULL + replica: store-id=3 replica-id=3 type=VOTER_FULL +---- +OK + +# The change should still be present on each store for load adjustments. +get-pending-changes +---- +pending(2) +change-id=6 start=310s store-id=1 range-id=1 delta=[-40 -80 -80] enacted=310s + prev=(replica-id=1 lease=false type=VOTER_FULL) + next=(replica-id=none lease=false type=VOTER_FULL) +change-id=5 start=310s store-id=2 range-id=1 delta=[40 80 80] enacted=310s + prev=(replica-id=none lease=false type=VOTER_FULL) + next=(replica-id=unknown lease=false type=VOTER_FULL) + +get-load-info +---- +store-id=1 reported=[80 80 80] adjusted=[40 0 0] node-reported-cpu=80 node-adjusted-cpu=40 seq=6 +store-id=2 reported=[0 0 0] adjusted=[40 80 80] node-reported-cpu=0 node-adjusted-cpu=40 seq=8 +store-id=3 reported=[80 80 80] adjusted=[80 80 80] node-reported-cpu=80 node-adjusted-cpu=80 seq=2 + +# Bump the clock forward to GC the enacted changes. The involved stores (s1,s2) haven't +# had any load updates to remove these yet - so the adjustments are still +# applied, shown above. +tick seconds=15 +---- +clock=325s + +gc-pending-changes +---- +pending(2) +change-id=6 start=310s store-id=1 range-id=1 delta=[-40 -80 -80] enacted=310s + prev=(replica-id=1 lease=false type=VOTER_FULL) + next=(replica-id=none lease=false type=VOTER_FULL) +change-id=5 start=310s store-id=2 range-id=1 delta=[40 80 80] enacted=310s + prev=(replica-id=none lease=false type=VOTER_FULL) + next=(replica-id=unknown lease=false type=VOTER_FULL) + +get-load-info +---- +store-id=1 reported=[80 80 80] adjusted=[40 0 0] node-reported-cpu=80 node-adjusted-cpu=40 seq=6 +store-id=2 reported=[0 0 0] adjusted=[40 80 80] node-reported-cpu=0 node-adjusted-cpu=40 seq=8 +store-id=3 reported=[80 80 80] adjusted=[80 80 80] node-reported-cpu=80 node-adjusted-cpu=80 seq=2 + +ranges +---- +range-id=1 + store-id=2 replica-id=2 lease=false type=VOTER_FULL + store-id=3 replica-id=3 lease=true type=VOTER_FULL + +# Add a change which we then mark as rejected by the enacting module, the +# change should be wiped. 
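+# Rejecting the two new changes should undo their load deltas on s1 and s2 and
+# drop them from the pending set, while the earlier enacted changes (5 and 6)
+# remain tracked for load adjustments.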
+make-pending-changes range-id=1 +rebalance-replica: add-store=1 remove-store=2 load=(80,80,80) cpu-raft=40 +---- +pending(4) +change-id=6 start=310s store-id=1 range-id=1 delta=[-40 -80 -80] enacted=310s + prev=(replica-id=1 lease=false type=VOTER_FULL) + next=(replica-id=none lease=false type=VOTER_FULL) +change-id=5 start=310s store-id=2 range-id=1 delta=[40 80 80] enacted=310s + prev=(replica-id=none lease=false type=VOTER_FULL) + next=(replica-id=unknown lease=false type=VOTER_FULL) +change-id=7 start=325s store-id=1 range-id=1 delta=[40 80 80] + prev=(replica-id=none lease=false type=VOTER_FULL) + next=(replica-id=unknown lease=false type=VOTER_FULL) +change-id=8 start=325s store-id=2 range-id=1 delta=[-40 -80 -80] + prev=(replica-id=2 lease=false type=VOTER_FULL) + next=(replica-id=none lease=false type=VOTER_FULL) + +get-load-info +---- +store-id=1 reported=[80 80 80] adjusted=[80 80 80] node-reported-cpu=80 node-adjusted-cpu=80 seq=7 +store-id=2 reported=[0 0 0] adjusted=[0 0 0] node-reported-cpu=0 node-adjusted-cpu=0 seq=9 +store-id=3 reported=[80 80 80] adjusted=[80 80 80] node-reported-cpu=80 node-adjusted-cpu=80 seq=2 + +reject-pending-changes +change-ids=(7,8) +---- +pending(2) +change-id=6 start=310s store-id=1 range-id=1 delta=[-40 -80 -80] enacted=310s + prev=(replica-id=1 lease=false type=VOTER_FULL) + next=(replica-id=none lease=false type=VOTER_FULL) +change-id=5 start=310s store-id=2 range-id=1 delta=[40 80 80] enacted=310s + prev=(replica-id=none lease=false type=VOTER_FULL) + next=(replica-id=unknown lease=false type=VOTER_FULL) + +get-load-info +---- +store-id=1 reported=[80 80 80] adjusted=[40 0 0] node-reported-cpu=80 node-adjusted-cpu=40 seq=8 +store-id=2 reported=[0 0 0] adjusted=[40 80 80] node-reported-cpu=0 node-adjusted-cpu=40 seq=10 +store-id=3 reported=[80 80 80] adjusted=[80 80 80] node-reported-cpu=80 node-adjusted-cpu=80 seq=2 + +# Rebalance the leaseholder replica from store 3 to store 1. The load +# adjustment should include the full load of the range. +make-pending-changes range-id=1 +rebalance-replica: add-store=1 remove-store=3 load=(80,80,80) cpu-raft=40 +---- +pending(4) +change-id=6 start=310s store-id=1 range-id=1 delta=[-40 -80 -80] enacted=310s + prev=(replica-id=1 lease=false type=VOTER_FULL) + next=(replica-id=none lease=false type=VOTER_FULL) +change-id=5 start=310s store-id=2 range-id=1 delta=[40 80 80] enacted=310s + prev=(replica-id=none lease=false type=VOTER_FULL) + next=(replica-id=unknown lease=false type=VOTER_FULL) +change-id=9 start=325s store-id=1 range-id=1 delta=[80 80 80] + prev=(replica-id=none lease=false type=VOTER_FULL) + next=(replica-id=unknown lease=true type=VOTER_FULL) +change-id=10 start=325s store-id=3 range-id=1 delta=[-80 -80 -80] + prev=(replica-id=3 lease=true type=VOTER_FULL) + next=(replica-id=none lease=false type=VOTER_FULL) + +get-load-info +---- +store-id=1 reported=[80 80 80] adjusted=[120 80 80] node-reported-cpu=80 node-adjusted-cpu=120 seq=9 +store-id=2 reported=[0 0 0] adjusted=[40 80 80] node-reported-cpu=0 node-adjusted-cpu=40 seq=10 +store-id=3 reported=[80 80 80] adjusted=[0 0 0] node-reported-cpu=80 node-adjusted-cpu=0 seq=3
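
The pending-change deltas exercised above follow a simple rule: a store picks up
(or sheds) the range's full write bandwidth and byte size, but only the raft CPU
unless it also holds, or is taking, the lease, in which case the full range CPU
applies (compare change-id=1's delta=[40 80 80] with change-id=9's
delta=[80 80 80]). Below is a minimal standalone Go sketch of that arithmetic,
included only for illustration; the names loadVec, rangeLoadHint and
replicaDelta are hypothetical and do not appear in this patch.

package main

import "fmt"

// loadVec mirrors the three load dimensions printed in the testdata output:
// cpu, write bandwidth and byte size.
type loadVec [3]int64

// rangeLoadHint carries the per-range load given in the test directives,
// e.g. load=(80,80,80) cpu-raft=40.
type rangeLoadHint struct {
	load    loadVec // total range load
	raftCPU int64   // raft-only portion of the CPU
}

// replicaDelta returns the load adjustment applied to a store when a replica
// is added; a removal uses the negation of the same vector.
func replicaDelta(rl rangeLoadHint, isLeaseholder bool) loadVec {
	d := rl.load
	if !isLeaseholder {
		// Non-leaseholder replicas are only charged the raft CPU.
		d[0] = rl.raftCPU
	}
	return d
}

func main() {
	rl := rangeLoadHint{load: loadVec{80, 80, 80}, raftCPU: 40}
	// Non-leaseholder add, as in change-id=1: expect [40 80 80].
	fmt.Println(replicaDelta(rl, false))
	// Leaseholder-side add, as in change-id=9: expect [80 80 80].
	fmt.Println(replicaDelta(rl, true))
}

Lease transfers move only the non-raft portion of the CPU between the two
stores, which is why change-id=2 and change-id=3 show deltas of [-40 0 0] and
[40 0 0] with no change to write bandwidth or byte size.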