From 641e0276602adca6435a64f8a0d2d6ebdf14bc96 Mon Sep 17 00:00:00 2001 From: Tory Wheelwright Date: Sat, 30 Mar 2024 07:13:25 -0400 Subject: [PATCH 01/10] Parallelize writes when there are multiple redis sinks --- lib/redispub/publisher.go | 60 +++++++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 15 deletions(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index a1acd392..8dc493ba 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -8,6 +8,7 @@ import ( "context" "fmt" "strings" + "sync" "time" "github.com/tulip/oplogtoredis/lib/log" @@ -68,16 +69,43 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts // time.Duration dedupeExpirationSeconds := int(opts.DedupeExpiration.Seconds()) - type PubFn func(*Publication)error + type PubFn func(*Publication) error - var publishFns []PubFn + var wg sync.WaitGroup + defer wg.Wait() - for _,client := range clients { - client := client - publishFn := func(p *Publication) error { - return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds) + var inChans []chan *Publication + var outChans []chan error + + defer func () { + for _, c := range(inChans) { + close(c) } - publishFns = append(publishFns, publishFn) + }() + + for i, client := range clients { + go func() { + wg.Add(1) + defer wg.Done() + + inChan := make(chan *Publication) + inChans = append(inChans, inChan) + outChan := make(chan error) + defer close(outChan) + outChans = append(outChans, outChan) + chanIdx := i + + client := client + + publishFn := func(p *Publication) error { + return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds) + } + + for p := range inChan { + log.Log.Debugw("Attempting to publish to", "idx", chanIdx) + outChan <- publishSingleMessageWithRetries(p, 30, time.Second, publishFn) + } + }() } metricSendFailed := metricSentMessages.WithLabelValues("failed") @@ -90,10 +118,12 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts return case p := <-in: - for i,publishFn := range publishFns { - err := publishSingleMessageWithRetries(p, 30, time.Second, publishFn) - log.Log.Debugw("Published to", "idx", i) - + for _, inChan := range inChans { + inChan <- p + } + + for _, outChan := range outChans { + err := <-outChan if err != nil { metricSendFailed.Inc() @@ -102,12 +132,12 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts "message", p) } else { metricSendSuccess.Inc() - - // We want to make sure we do this *after* we've successfully published - // the messages - timestampC <- p.OplogTimestamp } } + + // We want to make sure we do this *after* we've successfully published + // the messages + timestampC <- p.OplogTimestamp } } } From c36d5af4be7eab1de9f63a98d38f0184e69278ad Mon Sep 17 00:00:00 2001 From: Tory Wheelwright Date: Sat, 30 Mar 2024 07:20:34 -0400 Subject: [PATCH 02/10] fix lint --- lib/redispub/publisher.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 8dc493ba..18ca9ef2 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -8,7 +8,6 @@ import ( "context" "fmt" "strings" - "sync" "time" "github.com/tulip/oplogtoredis/lib/log" @@ -69,11 +68,6 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts // time.Duration dedupeExpirationSeconds := int(opts.DedupeExpiration.Seconds()) - type PubFn func(*Publication) error - - var wg sync.WaitGroup - defer wg.Wait() - var inChans []chan *Publication var outChans []chan error @@ -84,25 +78,22 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts }() for i, client := range clients { - go func() { - wg.Add(1) - defer wg.Done() + i := i + client := client + go func() { inChan := make(chan *Publication) inChans = append(inChans, inChan) outChan := make(chan error) defer close(outChan) outChans = append(outChans, outChan) - chanIdx := i - - client := client publishFn := func(p *Publication) error { return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds) } for p := range inChan { - log.Log.Debugw("Attempting to publish to", "idx", chanIdx) + log.Log.Debugw("Attempting to publish to", "idx", i) outChan <- publishSingleMessageWithRetries(p, 30, time.Second, publishFn) } }() From 0c17a5fe14f561f06ac1fa1421ae5bebdfa05a0b Mon Sep 17 00:00:00 2001 From: Tory Wheelwright Date: Mon, 1 Apr 2024 11:08:12 -0400 Subject: [PATCH 03/10] partition metrics by redis cluster --- lib/redispub/publisher.go | 35 ++++++++++++++++++---------------- lib/redispub/publisher_test.go | 8 ++++---- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 18ca9ef2..1009b6bf 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -7,6 +7,7 @@ package redispub import ( "context" "fmt" + "strconv" "strings" "time" @@ -45,14 +46,14 @@ var metricSentMessages = promauto.NewCounterVec(prometheus.CounterOpts{ Subsystem: "redispub", Name: "processed_messages", Help: "Messages processed by Redis publisher, partitioned by whether or not we successfully sent them", -}, []string{"status"}) +}, []string{"status", "clientIdx"}) -var metricTemporaryFailures = promauto.NewCounter(prometheus.CounterOpts{ +var metricTemporaryFailures = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "otr", Subsystem: "redispub", Name: "temporary_send_failures", Help: "Number of failures encountered when trying to send a message. We automatically retry, and only register a permanent failure (in otr_redispub_processed_messages) after 30 failures.", -}) +}, []string{"clientIdx"}) // PublishStream reads Publications from the given channel and publishes them // to Redis. @@ -78,7 +79,7 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts }() for i, client := range clients { - i := i + clientIdx := i client := client go func() { @@ -93,15 +94,12 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts } for p := range inChan { - log.Log.Debugw("Attempting to publish to", "idx", i) - outChan <- publishSingleMessageWithRetries(p, 30, time.Second, publishFn) + log.Log.Debugw("Attempting to publish to", "clientIdx", clientIdx) + outChan <- publishSingleMessageWithRetries(p, 30, clientIdx, time.Second, publishFn) } }() } - metricSendFailed := metricSentMessages.WithLabelValues("failed") - metricSendSuccess := metricSentMessages.WithLabelValues("sent") - for { select { case <-stop: @@ -113,16 +111,21 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts inChan <- p } - for _, outChan := range outChans { + for clientIdx, outChan := range outChans { err := <-outChan + clientIdxStr := strconv.FormatInt(int64(clientIdx), 10) if err != nil { - metricSendFailed.Inc() - log.Log.Errorw("Permanent error while trying to publish message; giving up", + metricSentMessages.WithLabelValues("failed", clientIdxStr).Inc() + log.Log.Errorw( + "Permanent error while trying to publish message; giving up", + "clientIdx", clientIdx, "error", err, - "message", p) + "message", p, + + ) } else { - metricSendSuccess.Inc() + metricSentMessages.WithLabelValues("sent", clientIdxStr).Inc() } } @@ -133,7 +136,7 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts } } -func publishSingleMessageWithRetries(p *Publication, maxRetries int, sleepTime time.Duration, publishFn func(p *Publication) error) error { +func publishSingleMessageWithRetries(p *Publication, maxRetries int, clientIdx int, sleepTime time.Duration, publishFn func(p *Publication) error) error { if p == nil { return errors.New("Nil Redis publication") } @@ -148,7 +151,7 @@ func publishSingleMessageWithRetries(p *Publication, maxRetries int, sleepTime t "retryNumber", retries) // failure, retry - metricTemporaryFailures.Inc() + metricTemporaryFailures.WithLabelValues("clientIdx", strconv.FormatInt(int64(clientIdx), 10)).Inc() retries++ time.Sleep(sleepTime) } else { diff --git a/lib/redispub/publisher_test.go b/lib/redispub/publisher_test.go index 0d7f39fa..3448623e 100644 --- a/lib/redispub/publisher_test.go +++ b/lib/redispub/publisher_test.go @@ -33,7 +33,7 @@ func TestPublishSingleMessageWithRetriesImmediateSuccess(t *testing.T) { return nil } - err := publishSingleMessageWithRetries(publication, 30, time.Second, publishFn) + err := publishSingleMessageWithRetries(publication, 30, 0, time.Second, publishFn) if err != nil { t.Errorf("Got unexpected error: %s", err) @@ -67,7 +67,7 @@ func TestPublishSingleMessageWithRetriesTransientFailure(t *testing.T) { return nil } - err := publishSingleMessageWithRetries(publication, 30, 0, publishFn) + err := publishSingleMessageWithRetries(publication, 30, 0, 0, publishFn) if err != nil { t.Errorf("Got unexpected error: %s", err) @@ -85,7 +85,7 @@ func TestPublishSingleMessageWithRetriesPermanentFailure(t *testing.T) { return errors.New("Some error") } - err := publishSingleMessageWithRetries(publication, 30, 0, publishFn) + err := publishSingleMessageWithRetries(publication, 30, 0, 0, publishFn) if err == nil { t.Errorf("Expected an error, but didn't get one") @@ -173,7 +173,7 @@ func TestPeriodicallyUpdateTimestamp(t *testing.T) { } func TestNilPublicationMessage(t *testing.T) { - err := publishSingleMessageWithRetries(nil, 5, 1*time.Second, func(p *Publication) error { + err := publishSingleMessageWithRetries(nil, 5, 0, 1*time.Second, func(p *Publication) error { t.Error("Should not have been called") return nil }) From 29aae485c5dfc05bf04e57a3851955a4809ca955 Mon Sep 17 00:00:00 2001 From: Tory Wheelwright Date: Mon, 1 Apr 2024 11:23:29 -0400 Subject: [PATCH 04/10] fix metric invocation --- lib/redispub/publisher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 1009b6bf..2e44bd5a 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -151,7 +151,7 @@ func publishSingleMessageWithRetries(p *Publication, maxRetries int, clientIdx i "retryNumber", retries) // failure, retry - metricTemporaryFailures.WithLabelValues("clientIdx", strconv.FormatInt(int64(clientIdx), 10)).Inc() + metricTemporaryFailures.WithLabelValues(strconv.FormatInt(int64(clientIdx), 10)).Inc() retries++ time.Sleep(sleepTime) } else { From 928c7b4b337b3f910f3ae9c3504bc5cb5952e777 Mon Sep 17 00:00:00 2001 From: Tory Wheelwright Date: Mon, 1 Apr 2024 13:04:10 -0400 Subject: [PATCH 05/10] fix test hopefully --- integration-tests/fault-injection/redisStopStart_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/integration-tests/fault-injection/redisStopStart_test.go b/integration-tests/fault-injection/redisStopStart_test.go index 155fdc09..f3320e9e 100644 --- a/integration-tests/fault-injection/redisStopStart_test.go +++ b/integration-tests/fault-injection/redisStopStart_test.go @@ -68,6 +68,7 @@ func TestRedisStopStart(t *testing.T) { nPermFail := harness.FindPromMetricCounter(metrics, "otr_redispub_processed_messages", map[string]string{ "status": "failed", + "clientIdx": "0", }) if nPermFail != 0 { t.Errorf("Metric otr_redispub_processed_messages(status: failed) = %d, expected 0", nPermFail) From 8b516dea6524ee6e23fe901959d6b696f3605222 Mon Sep 17 00:00:00 2001 From: Tory Wheelwright Date: Mon, 1 Apr 2024 13:25:52 -0400 Subject: [PATCH 06/10] fix moar --- integration-tests/fault-injection/redisStopStart_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/integration-tests/fault-injection/redisStopStart_test.go b/integration-tests/fault-injection/redisStopStart_test.go index f3320e9e..bc12d1ed 100644 --- a/integration-tests/fault-injection/redisStopStart_test.go +++ b/integration-tests/fault-injection/redisStopStart_test.go @@ -61,6 +61,7 @@ func TestRedisStopStart(t *testing.T) { nSuccess := harness.FindPromMetricCounter(metrics, "otr_redispub_processed_messages", map[string]string{ "status": "sent", + "clientIdx": "0", }) if nSuccess != 100 { t.Errorf("Metric otr_redispub_processed_messages(status: sent) = %d, expected 100", nSuccess) From e6bd128442bcb0832e8e6e71e9ca348e0e81cdfc Mon Sep 17 00:00:00 2001 From: Tory Wheelwright Date: Mon, 1 Apr 2024 14:17:51 -0400 Subject: [PATCH 07/10] fix --- integration-tests/fault-injection/redisStopStart_test.go | 4 ++-- lib/redispub/publisher.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/integration-tests/fault-injection/redisStopStart_test.go b/integration-tests/fault-injection/redisStopStart_test.go index bc12d1ed..a34b6f66 100644 --- a/integration-tests/fault-injection/redisStopStart_test.go +++ b/integration-tests/fault-injection/redisStopStart_test.go @@ -64,7 +64,7 @@ func TestRedisStopStart(t *testing.T) { "clientIdx": "0", }) if nSuccess != 100 { - t.Errorf("Metric otr_redispub_processed_messages(status: sent) = %d, expected 100", nSuccess) + t.Errorf("Metric otr_redispub_processed_messages(status: sent, clientIdx: 0) = %d, expected 100", nSuccess) } nPermFail := harness.FindPromMetricCounter(metrics, "otr_redispub_processed_messages", map[string]string{ @@ -72,7 +72,7 @@ func TestRedisStopStart(t *testing.T) { "clientIdx": "0", }) if nPermFail != 0 { - t.Errorf("Metric otr_redispub_processed_messages(status: failed) = %d, expected 0", nPermFail) + t.Errorf("Metric otr_redispub_processed_messages(status: failed, clientIdx: 0) = %d, expected 0", nPermFail) } nTempFail := harness.FindPromMetricCounter(metrics, "otr_redispub_temporary_send_failures", map[string]string{}) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 2e44bd5a..9434e588 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -81,13 +81,13 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts for i, client := range clients { clientIdx := i client := client + inChan := make(chan *Publication) + inChans = append(inChans, inChan) + outChan := make(chan error) + outChans = append(outChans, outChan) go func() { - inChan := make(chan *Publication) - inChans = append(inChans, inChan) - outChan := make(chan error) defer close(outChan) - outChans = append(outChans, outChan) publishFn := func(p *Publication) error { return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds) From f412bbbd7f3d24dedd3483cc84632fb2502ebf44 Mon Sep 17 00:00:00 2001 From: Tory Wheelwright Date: Tue, 2 Apr 2024 10:49:48 -0400 Subject: [PATCH 08/10] fix? --- lib/redispub/publisher.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 9434e588..198946f3 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -71,6 +71,8 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts var inChans []chan *Publication var outChans []chan error + var sendFailedMetrics []prometheus.Counter + var sendSucceededMetrics []prometheus.Counter defer func () { for _, c := range(inChans) { @@ -78,13 +80,16 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts } }() - for i, client := range clients { - clientIdx := i + for clientIdx, client := range clients { + clientIdx := clientIdx + clientIdxStr := strconv.FormatInt(int64(clientIdx), 10) client := client inChan := make(chan *Publication) inChans = append(inChans, inChan) outChan := make(chan error) outChans = append(outChans, outChan) + sendSucceededMetrics = append(sendSucceededMetrics, metricSentMessages.WithLabelValues("sent", clientIdxStr)) + sendFailedMetrics = append(sendFailedMetrics, metricSentMessages.WithLabelValues("failed", clientIdxStr)) go func() { defer close(outChan) @@ -113,10 +118,9 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts for clientIdx, outChan := range outChans { err := <-outChan - clientIdxStr := strconv.FormatInt(int64(clientIdx), 10) if err != nil { - metricSentMessages.WithLabelValues("failed", clientIdxStr).Inc() + sendFailedMetrics[clientIdx].Inc() log.Log.Errorw( "Permanent error while trying to publish message; giving up", "clientIdx", clientIdx, @@ -125,7 +129,7 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts ) } else { - metricSentMessages.WithLabelValues("sent", clientIdxStr).Inc() + sendSucceededMetrics[clientIdx].Inc() } } From 3e11b910843ac611ab38d9ab824bfbf3812e05ae Mon Sep 17 00:00:00 2001 From: Tory Wheelwright Date: Tue, 2 Apr 2024 19:45:38 -0400 Subject: [PATCH 09/10] fix --- lib/redispub/publisher.go | 8 +++++--- lib/redispub/publisher_test.go | 10 ++++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 198946f3..777b9d12 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -73,6 +73,7 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts var outChans []chan error var sendFailedMetrics []prometheus.Counter var sendSucceededMetrics []prometheus.Counter + var temporaryFailuresMetrics []prometheus.Counter defer func () { for _, c := range(inChans) { @@ -90,6 +91,7 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts outChans = append(outChans, outChan) sendSucceededMetrics = append(sendSucceededMetrics, metricSentMessages.WithLabelValues("sent", clientIdxStr)) sendFailedMetrics = append(sendFailedMetrics, metricSentMessages.WithLabelValues("failed", clientIdxStr)) + temporaryFailuresMetrics = append(temporaryFailuresMetrics, metricTemporaryFailures.WithLabelValues(clientIdxStr)) go func() { defer close(outChan) @@ -100,7 +102,7 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts for p := range inChan { log.Log.Debugw("Attempting to publish to", "clientIdx", clientIdx) - outChan <- publishSingleMessageWithRetries(p, 30, clientIdx, time.Second, publishFn) + outChan <- publishSingleMessageWithRetries(p, 30, clientIdx, time.Second, temporaryFailuresMetrics[clientIdx], publishFn) } }() } @@ -140,7 +142,7 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts } } -func publishSingleMessageWithRetries(p *Publication, maxRetries int, clientIdx int, sleepTime time.Duration, publishFn func(p *Publication) error) error { +func publishSingleMessageWithRetries(p *Publication, maxRetries int, clientIdx int, sleepTime time.Duration, temporaryFailuresMetric prometheus.Counter, publishFn func(p *Publication) error) error { if p == nil { return errors.New("Nil Redis publication") } @@ -155,7 +157,7 @@ func publishSingleMessageWithRetries(p *Publication, maxRetries int, clientIdx i "retryNumber", retries) // failure, retry - metricTemporaryFailures.WithLabelValues(strconv.FormatInt(int64(clientIdx), 10)).Inc() + temporaryFailuresMetric.Inc() retries++ time.Sleep(sleepTime) } else { diff --git a/lib/redispub/publisher_test.go b/lib/redispub/publisher_test.go index 3448623e..d1778417 100644 --- a/lib/redispub/publisher_test.go +++ b/lib/redispub/publisher_test.go @@ -11,6 +11,8 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" ) +var temporaryFailuresMetric = metricTemporaryFailures.WithLabelValues("0") + // We don't test PublishStream here -- it requires a real Redis server because // miniredis doesn't support PUBLISH and its lua support is spotty. It gets // tested in integration tests. @@ -33,7 +35,7 @@ func TestPublishSingleMessageWithRetriesImmediateSuccess(t *testing.T) { return nil } - err := publishSingleMessageWithRetries(publication, 30, 0, time.Second, publishFn) + err := publishSingleMessageWithRetries(publication, 30, 0, time.Second, temporaryFailuresMetric, publishFn) if err != nil { t.Errorf("Got unexpected error: %s", err) @@ -67,7 +69,7 @@ func TestPublishSingleMessageWithRetriesTransientFailure(t *testing.T) { return nil } - err := publishSingleMessageWithRetries(publication, 30, 0, 0, publishFn) + err := publishSingleMessageWithRetries(publication, 30, 0, 0, temporaryFailuresMetric, publishFn) if err != nil { t.Errorf("Got unexpected error: %s", err) @@ -85,7 +87,7 @@ func TestPublishSingleMessageWithRetriesPermanentFailure(t *testing.T) { return errors.New("Some error") } - err := publishSingleMessageWithRetries(publication, 30, 0, 0, publishFn) + err := publishSingleMessageWithRetries(publication, 30, 0, 0, temporaryFailuresMetric, publishFn) if err == nil { t.Errorf("Expected an error, but didn't get one") @@ -173,7 +175,7 @@ func TestPeriodicallyUpdateTimestamp(t *testing.T) { } func TestNilPublicationMessage(t *testing.T) { - err := publishSingleMessageWithRetries(nil, 5, 0, 1*time.Second, func(p *Publication) error { + err := publishSingleMessageWithRetries(nil, 5, 0, 1*time.Second, temporaryFailuresMetric, func(p *Publication) error { t.Error("Should not have been called") return nil }) From 5dcc6ac9860029d993e4f95537078174359c8b28 Mon Sep 17 00:00:00 2001 From: Tory Wheelwright Date: Tue, 2 Apr 2024 19:53:12 -0400 Subject: [PATCH 10/10] ay --- integration-tests/fault-injection/redisStopStart_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-tests/fault-injection/redisStopStart_test.go b/integration-tests/fault-injection/redisStopStart_test.go index a34b6f66..546e1706 100644 --- a/integration-tests/fault-injection/redisStopStart_test.go +++ b/integration-tests/fault-injection/redisStopStart_test.go @@ -75,8 +75,8 @@ func TestRedisStopStart(t *testing.T) { t.Errorf("Metric otr_redispub_processed_messages(status: failed, clientIdx: 0) = %d, expected 0", nPermFail) } - nTempFail := harness.FindPromMetricCounter(metrics, "otr_redispub_temporary_send_failures", map[string]string{}) + nTempFail := harness.FindPromMetricCounter(metrics, "otr_redispub_temporary_send_failures", map[string]string{ "clientIdx": "0" }) if nTempFail <= 0 { - t.Errorf("Metric otr_redispub_processed_messages = %d, expected >0", nTempFail) + t.Errorf("Metric otr_redispub_temporary_send_failures(clientIdx: 0) = %d, expected >0", nTempFail) } }