Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add flag to skip legacy duplicate telemetry #4

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# UNRELEASED

IMPROVEMENETS

* Added a flag to skip legacy duplicate telemetry. [GH-630](https://github.com/hashicorp/raft/pull/630)

# 1.7.0 (June 5th, 2024)

CHANGES
Expand Down
15 changes: 12 additions & 3 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ type Raft struct {
// preVoteDisabled control if the pre-vote feature is activated,
// prevote feature is disabled if set to true.
preVoteDisabled bool

// noLegacyTelemetry allows to skip the legacy metrics to avoid duplicates.
// legacy metrics are those that have `_peer_name` as metric suffix instead as labels.
// e.g: raft_replication_heartbeat_peer0
noLegacyTelemetry bool
}

// BootstrapCluster initializes a server's storage with the given cluster
Expand All @@ -232,7 +237,8 @@ type Raft struct {
// listing just itself as a Voter, then invoke AddVoter() on it to add other
// servers to the cluster.
func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport, configuration Configuration) error {
snaps SnapshotStore, trans Transport, configuration Configuration,
) error {
// Validate the Raft server config.
if err := ValidateConfig(conf); err != nil {
return err
Expand Down Expand Up @@ -305,7 +311,8 @@ func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
// the sole voter, and then join up other new clean-state peer servers using
// the usual APIs in order to bring the cluster back into a known state.
func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport, configuration Configuration) error {
snaps SnapshotStore, trans Transport, configuration Configuration,
) error {
// Validate the Raft server config.
if err := ValidateConfig(conf); err != nil {
return err
Expand Down Expand Up @@ -436,7 +443,8 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
// without starting a Raft instance or connecting to the cluster. This function
// has identical behavior to Raft.GetConfiguration.
func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport) (Configuration, error) {
snaps SnapshotStore, trans Transport,
) (Configuration, error) {
conf.skipStartup = true
r, err := NewRaft(conf, fsm, logs, stable, snaps, trans)
if err != nil {
Expand Down Expand Up @@ -566,6 +574,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
followerNotifyCh: make(chan struct{}, 1),
mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second),
preVoteDisabled: conf.PreVoteDisabled || !transportSupportPreVote,
noLegacyTelemetry: conf.NoLegacyTelemetry,
}
if !transportSupportPreVote && !conf.PreVoteDisabled {
r.logger.Warn("pre-vote is disabled because it is not supported by the Transport")
Expand Down
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ type Config struct {
// PreVoteDisabled deactivate the pre-vote feature when set to true
PreVoteDisabled bool

// NoLegacyTelemetry allows to skip the legacy metrics to avoid duplicates.
// legacy metrics are those that have `_peer_name` as metric suffix instead as labels.
// e.g: raft_replication_heartbeat_peer0
NoLegacyTelemetry bool

// skipStartup allows NewRaft() to bypass all background work goroutines
skipStartup bool
}
Expand Down
30 changes: 20 additions & 10 deletions replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ START:
s.failures++
return
}
appendStats(string(peer.ID), start, float32(len(req.Entries)))
appendStats(string(peer.ID), start, float32(len(req.Entries)), r.noLegacyTelemetry)

// Check for a newer term, stop running
if resp.Term > req.Term {
Expand Down Expand Up @@ -347,8 +347,11 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
}
labels := []metrics.Label{{Name: "peer_id", Value: string(peer.ID)}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "installSnapshot"}, start, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(peer.ID)}, start)

if !r.noLegacyTelemetry {
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(peer.ID)}, start)
}

// Check for a newer term, stop running
if resp.Term > req.Term {
Expand Down Expand Up @@ -423,8 +426,12 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
failures = 0
labels := []metrics.Label{{Name: "peer_id", Value: string(peer.ID)}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "heartbeat"}, start, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(peer.ID)}, start)

if !r.noLegacyTelemetry {
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(peer.ID)}, start)
}

s.notifyAll(resp.Success)
}
}
Expand Down Expand Up @@ -533,7 +540,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh,
s.peerLock.RUnlock()

req, resp := ready.Request(), ready.Response()
appendStats(string(peer.ID), ready.Start(), float32(len(req.Entries)))
appendStats(string(peer.ID), ready.Start(), float32(len(req.Entries)), r.noLegacyTelemetry)

// Check for a newer term, stop running
if resp.Term > req.Term {
Expand Down Expand Up @@ -621,13 +628,16 @@ func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64
}

// appendStats is used to emit stats about an AppendEntries invocation.
func appendStats(peer string, start time.Time, logs float32) {
func appendStats(peer string, start time.Time, logs float32, skipLegacy bool) {
labels := []metrics.Label{{Name: "peer_id", Value: peer}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "appendEntries", "rpc"}, start, labels)
metrics.IncrCounterWithLabels([]string{"raft", "replication", "appendEntries", "logs"}, logs, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "appendEntries", "rpc", peer}, start)
metrics.IncrCounter([]string{"raft", "replication", "appendEntries", "logs", peer}, logs)

if !skipLegacy {
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "appendEntries", "rpc", peer}, start)
metrics.IncrCounter([]string{"raft", "replication", "appendEntries", "logs", peer}, logs)
}
}

// handleStaleTerm is used when a follower indicates that we have a stale term.
Expand Down