Skip to content

Commit

Permalink
Added UserReplicaGroupMetrics (#6463)
Browse files Browse the repository at this point in the history
* Added UserReplicaGroupMetrics

Signed-off-by: 7h3-3mp7y-m4n <[email protected]>

* Adding the changes in ha_tracker

Signed-off-by: 7h3-3mp7y-m4n <[email protected]>

* Added testcase and minor changes

Signed-off-by: 7h3-3mp7y-m4n <[email protected]>

* Added changes to test

Signed-off-by: 7h3-3mp7y-m4n <[email protected]>

---------

Signed-off-by: 7h3-3mp7y-m4n <[email protected]>
  • Loading branch information
7h3-3mp7y-m4n authored Jan 7, 2025
1 parent c2c4827 commit 7ec57ac
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
39 changes: 38 additions & 1 deletion pkg/ha/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
"github.com/cortexproject/cortex/pkg/util/services"
)

const (
// userReplicaGroupUpdateInterval is the period of the ticker that drives
// the periodic refresh of the per-user replica group count gauge.
userReplicaGroupUpdateInterval = 30 * time.Second
)

var (
errNegativeUpdateTimeoutJitterMax = errors.New("HA tracker max update timeout jitter shouldn't be negative")
errInvalidFailoverTimeout = "HA Tracker failover timeout (%v) must be at least 1s greater than update timeout - max jitter (%v)"
Expand Down Expand Up @@ -137,6 +141,7 @@ type HATracker struct {
electedReplicaTimestamp *prometheus.GaugeVec
electedReplicaPropagationTime prometheus.Histogram
kvCASCalls *prometheus.CounterVec
userReplicaGroupCount *prometheus.GaugeVec

cleanupRuns prometheus.Counter
replicasMarkedForDeletion prometheus.Counter
Expand Down Expand Up @@ -182,6 +187,11 @@ func NewHATracker(cfg HATrackerConfig, limits HATrackerLimits, trackerStatusConf
Help: "The total number of CAS calls to the KV store for a user ID/cluster.",
}, []string{"user", "cluster"}),

userReplicaGroupCount: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ha_tracker_user_replica_group_count",
Help: "Number of HA replica groups tracked for each user.",
}, []string{"user"}),

cleanupRuns: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "ha_tracker_replicas_cleanup_started_total",
Help: "Number of elected replicas cleanup loops started.",
Expand Down Expand Up @@ -227,11 +237,26 @@ func (c *HATracker) loop(ctx context.Context) error {

// Start cleanup loop. It will stop when context is done.
wg := sync.WaitGroup{}
wg.Add(1)
wg.Add(2)
go func() {
defer wg.Done()
c.cleanupOldReplicasLoop(ctx)
}()
// Start periodic update of user replica group count.
go func() {
defer wg.Done()
ticker := time.NewTicker(userReplicaGroupUpdateInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
c.updateUserReplicaGroupCount()
case <-ctx.Done():
return
}
}
}()

// The KVStore config we gave when creating c should have contained a prefix,
// which would have given us a prefixed KVStore client. So, we can pass empty string here.
Expand Down Expand Up @@ -504,6 +529,9 @@ func (c *HATracker) CleanupHATrackerMetricsForUser(userID string) {
if err := util.DeleteMatchingLabels(c.kvCASCalls, filter); err != nil {
level.Warn(c.logger).Log("msg", "failed to remove cortex_ha_tracker_kv_store_cas_total metric for user", "user", userID, "err", err)
}
if err := util.DeleteMatchingLabels(c.userReplicaGroupCount, filter); err != nil {
level.Warn(c.logger).Log("msg", "failed to remove cortex_ha_tracker_user_replica_group_count metric for user", "user", userID, "err", err)
}
}

// Returns a snapshot of the currently elected replicas. Useful for status display
Expand All @@ -521,3 +549,12 @@ func (c *HATracker) SnapshotElectedReplicas() map[string]ReplicaDesc {
}
return electedCopy
}

// updateUserReplicaGroupCount refreshes the per-user replica group gauge from
// the tracker's in-memory state: for every user currently present in
// c.replicaGroups it sets userReplicaGroupCount{user} to the number of entries
// held for that user.
//
// The read lock on electedLock is held for the whole pass over the map.
//
// NOTE(review): users that are not present in c.replicaGroups are not touched
// here, so a previously reported value presumably lingers until
// CleanupHATrackerMetricsForUser removes the series — confirm this is intended.
func (c *HATracker) updateUserReplicaGroupCount() {
	c.electedLock.RLock()
	defer c.electedLock.RUnlock()

	for user, groups := range c.replicaGroups {
		c.userReplicaGroupCount.WithLabelValues(user).Set(float64(len(groups)))
	}
}
6 changes: 6 additions & 0 deletions pkg/ha/ha_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ func TestHATracker_MetricsCleanup(t *testing.T) {
"cortex_ha_tracker_elected_replica_changes_total",
"cortex_ha_tracker_elected_replica_timestamp_seconds",
"cortex_ha_tracker_kv_store_cas_total",
"cortex_ha_tracker_user_replica_group_count",
}

tr.electedReplicaChanges.WithLabelValues("userA", "replicaGroup1").Add(5)
Expand All @@ -640,6 +641,7 @@ func TestHATracker_MetricsCleanup(t *testing.T) {
tr.kvCASCalls.WithLabelValues("userA", "replicaGroup1").Add(5)
tr.kvCASCalls.WithLabelValues("userA", "replicaGroup2").Add(8)
tr.kvCASCalls.WithLabelValues("userB", "replicaGroup").Add(10)
tr.userReplicaGroupCount.WithLabelValues("userA").Add(5)

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ha_tracker_elected_replica_changes_total The total number of times the elected replica has changed for a user ID/cluster.
Expand All @@ -659,6 +661,10 @@ func TestHATracker_MetricsCleanup(t *testing.T) {
cortex_ha_tracker_kv_store_cas_total{cluster="replicaGroup",user="userB"} 10
cortex_ha_tracker_kv_store_cas_total{cluster="replicaGroup1",user="userA"} 5
cortex_ha_tracker_kv_store_cas_total{cluster="replicaGroup2",user="userA"} 8
# HELP cortex_ha_tracker_user_replica_group_count Number of HA replica groups tracked for each user.
# TYPE cortex_ha_tracker_user_replica_group_count gauge
cortex_ha_tracker_user_replica_group_count{user="userA"} 5
`), metrics...))

tr.CleanupHATrackerMetricsForUser("userA")
Expand Down

0 comments on commit 7ec57ac

Please sign in to comment.