From 3e11b910843ac611ab38d9ab824bfbf3812e05ae Mon Sep 17 00:00:00 2001 From: Tory Wheelwright Date: Tue, 2 Apr 2024 19:45:38 -0400 Subject: [PATCH] fix --- lib/redispub/publisher.go | 8 +++++--- lib/redispub/publisher_test.go | 10 ++++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 198946f3..777b9d12 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -73,6 +73,7 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts var outChans []chan error var sendFailedMetrics []prometheus.Counter var sendSucceededMetrics []prometheus.Counter + var temporaryFailuresMetrics []prometheus.Counter defer func () { for _, c := range(inChans) { @@ -90,6 +91,7 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts outChans = append(outChans, outChan) sendSucceededMetrics = append(sendSucceededMetrics, metricSentMessages.WithLabelValues("sent", clientIdxStr)) sendFailedMetrics = append(sendFailedMetrics, metricSentMessages.WithLabelValues("failed", clientIdxStr)) + temporaryFailuresMetrics = append(temporaryFailuresMetrics, metricTemporaryFailures.WithLabelValues(clientIdxStr)) go func() { defer close(outChan) @@ -100,7 +102,7 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts for p := range inChan { log.Log.Debugw("Attempting to publish to", "clientIdx", clientIdx) - outChan <- publishSingleMessageWithRetries(p, 30, clientIdx, time.Second, publishFn) + outChan <- publishSingleMessageWithRetries(p, 30, clientIdx, time.Second, temporaryFailuresMetrics[clientIdx], publishFn) } }() } @@ -140,7 +142,7 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts } } -func publishSingleMessageWithRetries(p *Publication, maxRetries int, clientIdx int, sleepTime time.Duration, publishFn func(p *Publication) error) error { +func publishSingleMessageWithRetries(p *Publication, maxRetries int, clientIdx int, sleepTime time.Duration, temporaryFailuresMetric prometheus.Counter, publishFn func(p *Publication) error) error { if p == nil { return errors.New("Nil Redis publication") } @@ -155,7 +157,7 @@ func publishSingleMessageWithRetries(p *Publication, maxRetries int, clientIdx i "retryNumber", retries) // failure, retry - metricTemporaryFailures.WithLabelValues(strconv.FormatInt(int64(clientIdx), 10)).Inc() + temporaryFailuresMetric.Inc() retries++ time.Sleep(sleepTime) } else { diff --git a/lib/redispub/publisher_test.go b/lib/redispub/publisher_test.go index 3448623e..d1778417 100644 --- a/lib/redispub/publisher_test.go +++ b/lib/redispub/publisher_test.go @@ -11,6 +11,8 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" ) +var temporaryFailuresMetric = metricTemporaryFailures.WithLabelValues("0") + // We don't test PublishStream here -- it requires a real Redis server because // miniredis doesn't support PUBLISH and its lua support is spotty. It gets // tested in integration tests. @@ -33,7 +35,7 @@ func TestPublishSingleMessageWithRetriesImmediateSuccess(t *testing.T) { return nil } - err := publishSingleMessageWithRetries(publication, 30, 0, time.Second, publishFn) + err := publishSingleMessageWithRetries(publication, 30, 0, time.Second, temporaryFailuresMetric, publishFn) if err != nil { t.Errorf("Got unexpected error: %s", err) @@ -67,7 +69,7 @@ func TestPublishSingleMessageWithRetriesTransientFailure(t *testing.T) { return nil } - err := publishSingleMessageWithRetries(publication, 30, 0, 0, publishFn) + err := publishSingleMessageWithRetries(publication, 30, 0, 0, temporaryFailuresMetric, publishFn) if err != nil { t.Errorf("Got unexpected error: %s", err) @@ -85,7 +87,7 @@ func TestPublishSingleMessageWithRetriesPermanentFailure(t *testing.T) { return errors.New("Some error") } - err := publishSingleMessageWithRetries(publication, 30, 0, 0, publishFn) + err := publishSingleMessageWithRetries(publication, 30, 0, 0, temporaryFailuresMetric, publishFn) if err == nil { t.Errorf("Expected an error, but didn't get one") @@ -173,7 +175,7 @@ func TestPeriodicallyUpdateTimestamp(t *testing.T) { } func TestNilPublicationMessage(t *testing.T) { - err := publishSingleMessageWithRetries(nil, 5, 0, 1*time.Second, func(p *Publication) error { + err := publishSingleMessageWithRetries(nil, 5, 0, 1*time.Second, temporaryFailuresMetric, func(p *Publication) error { t.Error("Should not have been called") return nil })