Skip to content

Commit

Permalink
partition metrics by redis cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
torywheelwright committed Apr 1, 2024
1 parent 78bdb42 commit 31d3b6e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 20 deletions.
35 changes: 19 additions & 16 deletions lib/redispub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package redispub
import (
"context"
"fmt"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -45,14 +46,14 @@ var metricSentMessages = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "redispub",
Name: "processed_messages",
Help: "Messages processed by Redis publisher, partitioned by whether or not we successfully sent them",
}, []string{"status"})
}, []string{"status", "clientIdx"})

var metricTemporaryFailures = promauto.NewCounter(prometheus.CounterOpts{
var metricTemporaryFailures = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "otr",
Subsystem: "redispub",
Name: "temporary_send_failures",
Help: "Number of failures encountered when trying to send a message. We automatically retry, and only register a permanent failure (in otr_redispub_processed_messages) after 30 failures.",
})
}, []string{"clientIdx"})

// PublishStream reads Publications from the given channel and publishes them
// to Redis.
Expand All @@ -78,7 +79,7 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts
}()

for i, client := range clients {
i := i
clientIdx := i
client := client

go func() {
Expand All @@ -93,15 +94,12 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts
}

for p := range inChan {
log.Log.Debugw("Attempting to publish to", "idx", i)
outChan <- publishSingleMessageWithRetries(p, 30, time.Second, publishFn)
log.Log.Debugw("Attempting to publish to", "clientIdx", clientIdx)
outChan <- publishSingleMessageWithRetries(p, 30, clientIdx, time.Second, publishFn)
}
}()
}

metricSendFailed := metricSentMessages.WithLabelValues("failed")
metricSendSuccess := metricSentMessages.WithLabelValues("sent")

for {
select {
case <-stop:
Expand All @@ -113,16 +111,21 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts
inChan <- p
}

for _, outChan := range outChans {
for clientIdx, outChan := range outChans {
err := <-outChan
clientIdxStr := strconv.FormatInt(int64(clientIdx), 10)

if err != nil {
metricSendFailed.Inc()
log.Log.Errorw("Permanent error while trying to publish message; giving up",
metricSentMessages.WithLabelValues("failed", clientIdxStr).Inc()
log.Log.Errorw(
"Permanent error while trying to publish message; giving up",
"clientIdx", clientIdx,
"error", err,
"message", p)
"message", p,

)
} else {
metricSendSuccess.Inc()
metricSentMessages.WithLabelValues("sent", clientIdxStr).Inc()
}
}

Expand All @@ -133,7 +136,7 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts
}
}

func publishSingleMessageWithRetries(p *Publication, maxRetries int, sleepTime time.Duration, publishFn func(p *Publication) error) error {
func publishSingleMessageWithRetries(p *Publication, maxRetries int, clientIdx int, sleepTime time.Duration, publishFn func(p *Publication) error) error {
if p == nil {
return errors.New("Nil Redis publication")
}
Expand All @@ -148,7 +151,7 @@ func publishSingleMessageWithRetries(p *Publication, maxRetries int, sleepTime t
"retryNumber", retries)

// failure, retry
metricTemporaryFailures.Inc()
metricTemporaryFailures.WithLabelValues("clientIdx", strconv.FormatInt(int64(clientIdx), 10)).Inc()
retries++
time.Sleep(sleepTime)
} else {
Expand Down
8 changes: 4 additions & 4 deletions lib/redispub/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestPublishSingleMessageWithRetriesImmediateSuccess(t *testing.T) {
return nil
}

err := publishSingleMessageWithRetries(publication, 30, time.Second, publishFn)
err := publishSingleMessageWithRetries(publication, 30, 0, time.Second, publishFn)

if err != nil {
t.Errorf("Got unexpected error: %s", err)
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestPublishSingleMessageWithRetriesTransientFailure(t *testing.T) {
return nil
}

err := publishSingleMessageWithRetries(publication, 30, 0, publishFn)
err := publishSingleMessageWithRetries(publication, 30, 0, 0, publishFn)

if err != nil {
t.Errorf("Got unexpected error: %s", err)
Expand All @@ -85,7 +85,7 @@ func TestPublishSingleMessageWithRetriesPermanentFailure(t *testing.T) {
return errors.New("Some error")
}

err := publishSingleMessageWithRetries(publication, 30, 0, publishFn)
err := publishSingleMessageWithRetries(publication, 30, 0, 0, publishFn)

if err == nil {
t.Errorf("Expected an error, but didn't get one")
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestPeriodicallyUpdateTimestamp(t *testing.T) {
}

func TestNilPublicationMessage(t *testing.T) {
err := publishSingleMessageWithRetries(nil, 5, 1*time.Second, func(p *Publication) error {
err := publishSingleMessageWithRetries(nil, 5, 0, 1*time.Second, func(p *Publication) error {
t.Error("Should not have been called")
return nil
})
Expand Down

0 comments on commit 31d3b6e

Please sign in to comment.