Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add flag to skip legacy duplicate telemetry #4

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ type Raft struct {
// preVoteDisabled control if the pre-vote feature is activated,
// prevote feature is disabled if set to true.
preVoteDisabled bool

// noLegacyTelemetry allows skipping the legacy metrics to avoid duplicates.
// Legacy metrics are those that carry `_peer_name` as a metric-name suffix
// instead of as a label, e.g. raft_replication_heartbeat_peer0.
noLegacyTelemetry bool
}

// BootstrapCluster initializes a server's storage with the given cluster
Expand All @@ -232,7 +237,8 @@ type Raft struct {
// listing just itself as a Voter, then invoke AddVoter() on it to add other
// servers to the cluster.
func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport, configuration Configuration) error {
snaps SnapshotStore, trans Transport, configuration Configuration,
) error {
// Validate the Raft server config.
if err := ValidateConfig(conf); err != nil {
return err
Expand Down Expand Up @@ -305,7 +311,8 @@ func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
// the sole voter, and then join up other new clean-state peer servers using
// the usual APIs in order to bring the cluster back into a known state.
func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport, configuration Configuration) error {
snaps SnapshotStore, trans Transport, configuration Configuration,
) error {
// Validate the Raft server config.
if err := ValidateConfig(conf); err != nil {
return err
Expand Down Expand Up @@ -436,7 +443,8 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
// without starting a Raft instance or connecting to the cluster. This function
// has identical behavior to Raft.GetConfiguration.
func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport) (Configuration, error) {
snaps SnapshotStore, trans Transport,
) (Configuration, error) {
conf.skipStartup = true
r, err := NewRaft(conf, fsm, logs, stable, snaps, trans)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ type Config struct {

// skipStartup allows NewRaft() to bypass all background work goroutines
skipStartup bool

// noLegacyTelemetry allows skipping the legacy metrics to avoid duplicates.
// Legacy metrics are those that carry `_peer_name` as a metric-name suffix
// instead of as a label, e.g. raft_replication_heartbeat_peer0.
noLegacyTelemetry bool
}

func (conf *Config) getOrCreateLogger() hclog.Logger {
Expand Down
30 changes: 20 additions & 10 deletions replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ START:
s.failures++
return
}
appendStats(string(peer.ID), start, float32(len(req.Entries)))
appendStats(string(peer.ID), start, float32(len(req.Entries)), r.noLegacyTelemetry)

// Check for a newer term, stop running
if resp.Term > req.Term {
Expand Down Expand Up @@ -347,8 +347,11 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
}
labels := []metrics.Label{{Name: "peer_id", Value: string(peer.ID)}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "installSnapshot"}, start, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(peer.ID)}, start)

if !r.noLegacyTelemetry {
	// Duplicated information. Kept for backward compatibility: the peer ID
	// is appended to the metric key instead of being attached as a label.
	// Emitted only while noLegacyTelemetry is unset, so setting the flag
	// actually suppresses the duplicate.
	metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(peer.ID)}, start)
}

// Check for a newer term, stop running
if resp.Term > req.Term {
Expand Down Expand Up @@ -423,8 +426,12 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
failures = 0
labels := []metrics.Label{{Name: "peer_id", Value: string(peer.ID)}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "heartbeat"}, start, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(peer.ID)}, start)

if !r.noLegacyTelemetry {
	// Duplicated information. Kept for backward compatibility: the peer ID
	// is appended to the metric key instead of being attached as a label.
	// Emitted only while noLegacyTelemetry is unset, so setting the flag
	// actually suppresses the duplicate.
	metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(peer.ID)}, start)
}

s.notifyAll(resp.Success)
}
}
Expand Down Expand Up @@ -533,7 +540,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh,
s.peerLock.RUnlock()

req, resp := ready.Request(), ready.Response()
appendStats(string(peer.ID), ready.Start(), float32(len(req.Entries)))
appendStats(string(peer.ID), ready.Start(), float32(len(req.Entries)), r.noLegacyTelemetry)

// Check for a newer term, stop running
if resp.Term > req.Term {
Expand Down Expand Up @@ -621,13 +628,16 @@ func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64
}

// appendStats is used to emit stats about an AppendEntries invocation.
//
// peer is attached as the "peer_id" label on the labeled metrics, start is
// the instant the elapsed-time samples are measured from, and logs is the
// amount added to the appendEntries "logs" counter. In the four-argument
// form, skipLegacy set to true suppresses the peer-suffixed duplicates.
func appendStats(peer string, start time.Time, logs float32) {
func appendStats(peer string, start time.Time, logs float32, skipLegacy bool) {
// Labeled metrics: the peer is carried as a label, not in the metric key.
labels := []metrics.Label{{Name: "peer_id", Value: peer}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "appendEntries", "rpc"}, start, labels)
metrics.IncrCounterWithLabels([]string{"raft", "replication", "appendEntries", "logs"}, logs, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "appendEntries", "rpc", peer}, start)
metrics.IncrCounter([]string{"raft", "replication", "appendEntries", "logs", peer}, logs)

// Legacy metrics append the peer to the metric key; emit them only when the
// caller has not asked to skip them.
if !skipLegacy {
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "appendEntries", "rpc", peer}, start)
metrics.IncrCounter([]string{"raft", "replication", "appendEntries", "logs", peer}, logs)
}
}

// handleStaleTerm is used when a follower indicates that we have a stale term.
Expand Down