diff --git a/integration-tests/fault-injection/redisStopStart_test.go b/integration-tests/fault-injection/redisStopStart_test.go index 155fdc09..546e1706 100644 --- a/integration-tests/fault-injection/redisStopStart_test.go +++ b/integration-tests/fault-injection/redisStopStart_test.go @@ -61,20 +61,22 @@ func TestRedisStopStart(t *testing.T) { nSuccess := harness.FindPromMetricCounter(metrics, "otr_redispub_processed_messages", map[string]string{ "status": "sent", + "clientIdx": "0", }) if nSuccess != 100 { - t.Errorf("Metric otr_redispub_processed_messages(status: sent) = %d, expected 100", nSuccess) + t.Errorf("Metric otr_redispub_processed_messages(status: sent, clientIdx: 0) = %d, expected 100", nSuccess) } nPermFail := harness.FindPromMetricCounter(metrics, "otr_redispub_processed_messages", map[string]string{ "status": "failed", + "clientIdx": "0", }) if nPermFail != 0 { - t.Errorf("Metric otr_redispub_processed_messages(status: failed) = %d, expected 0", nPermFail) + t.Errorf("Metric otr_redispub_processed_messages(status: failed, clientIdx: 0) = %d, expected 0", nPermFail) } - nTempFail := harness.FindPromMetricCounter(metrics, "otr_redispub_temporary_send_failures", map[string]string{}) + nTempFail := harness.FindPromMetricCounter(metrics, "otr_redispub_temporary_send_failures", map[string]string{ "clientIdx": "0" }) if nTempFail <= 0 { - t.Errorf("Metric otr_redispub_processed_messages = %d, expected >0", nTempFail) + t.Errorf("Metric otr_redispub_temporary_send_failures(clientIdx: 0) = %d, expected >0", nTempFail) } } diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index a1acd392..777b9d12 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -7,6 +7,7 @@ package redispub import ( "context" "fmt" + "strconv" "strings" "time" @@ -45,14 +46,14 @@ var metricSentMessages = promauto.NewCounterVec(prometheus.CounterOpts{ Subsystem: "redispub", Name: "processed_messages", Help: "Messages processed by Redis publisher, partitioned by whether or not we successfully sent them", -}, []string{"status"}) +}, []string{"status", "clientIdx"}) -var metricTemporaryFailures = promauto.NewCounter(prometheus.CounterOpts{ +var metricTemporaryFailures = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "otr", Subsystem: "redispub", Name: "temporary_send_failures", Help: "Number of failures encountered when trying to send a message. We automatically retry, and only register a permanent failure (in otr_redispub_processed_messages) after 30 failures.", -}) +}, []string{"clientIdx"}) // PublishStream reads Publications from the given channel and publishes them // to Redis. @@ -68,20 +69,43 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts // time.Duration dedupeExpirationSeconds := int(opts.DedupeExpiration.Seconds()) - type PubFn func(*Publication)error + var inChans []chan *Publication + var outChans []chan error + var sendFailedMetrics []prometheus.Counter + var sendSucceededMetrics []prometheus.Counter + var temporaryFailuresMetrics []prometheus.Counter - var publishFns []PubFn + defer func () { + for _, c := range(inChans) { + close(c) + } + }() - for _,client := range clients { + for clientIdx, client := range clients { + clientIdx := clientIdx + clientIdxStr := strconv.FormatInt(int64(clientIdx), 10) client := client - publishFn := func(p *Publication) error { - return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds) - } - publishFns = append(publishFns, publishFn) - } + inChan := make(chan *Publication) + inChans = append(inChans, inChan) + outChan := make(chan error) + outChans = append(outChans, outChan) + sendSucceededMetrics = append(sendSucceededMetrics, metricSentMessages.WithLabelValues("sent", clientIdxStr)) + sendFailedMetrics = append(sendFailedMetrics, metricSentMessages.WithLabelValues("failed", clientIdxStr)) + temporaryFailuresMetrics = append(temporaryFailuresMetrics, metricTemporaryFailures.WithLabelValues(clientIdxStr)) + + go func() { + defer close(outChan) + + publishFn := func(p *Publication) error { + return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds) + } - metricSendFailed := metricSentMessages.WithLabelValues("failed") - metricSendSuccess := metricSentMessages.WithLabelValues("sent") + for p := range inChan { + log.Log.Debugw("Attempting to publish to", "clientIdx", clientIdx) + outChan <- publishSingleMessageWithRetries(p, 30, clientIdx, time.Second, temporaryFailuresMetrics[clientIdx], publishFn) + } + }() + } for { select { @@ -90,29 +114,35 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts return case p := <-in: - for i,publishFn := range publishFns { - err := publishSingleMessageWithRetries(p, 30, time.Second, publishFn) - log.Log.Debugw("Published to", "idx", i) - + for _, inChan := range inChans { + inChan <- p + } + + for clientIdx, outChan := range outChans { + err := <-outChan if err != nil { - metricSendFailed.Inc() - log.Log.Errorw("Permanent error while trying to publish message; giving up", + sendFailedMetrics[clientIdx].Inc() + log.Log.Errorw( + "Permanent error while trying to publish message; giving up", + "clientIdx", clientIdx, "error", err, - "message", p) - } else { - metricSendSuccess.Inc() + "message", p, - // We want to make sure we do this *after* we've successfully published - // the messages - timestampC <- p.OplogTimestamp + ) + } else { + sendSucceededMetrics[clientIdx].Inc() } } + + // We want to make sure we do this *after* we've successfully published + // the messages + timestampC <- p.OplogTimestamp } } } -func publishSingleMessageWithRetries(p *Publication, maxRetries int, sleepTime time.Duration, publishFn func(p *Publication) error) error { +func publishSingleMessageWithRetries(p *Publication, maxRetries int, clientIdx int, sleepTime time.Duration, temporaryFailuresMetric prometheus.Counter, publishFn func(p *Publication) error) error { if p == nil { return errors.New("Nil Redis publication") } @@ -127,7 +157,7 @@ func publishSingleMessageWithRetries(p *Publication, maxRetries int, sleepTime t "retryNumber", retries) // failure, retry - metricTemporaryFailures.Inc() + temporaryFailuresMetric.Inc() retries++ time.Sleep(sleepTime) } else { diff --git a/lib/redispub/publisher_test.go b/lib/redispub/publisher_test.go index 0d7f39fa..d1778417 100644 --- a/lib/redispub/publisher_test.go +++ b/lib/redispub/publisher_test.go @@ -11,6 +11,8 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" ) +var temporaryFailuresMetric = metricTemporaryFailures.WithLabelValues("0") + // We don't test PublishStream here -- it requires a real Redis server because // miniredis doesn't support PUBLISH and its lua support is spotty. It gets // tested in integration tests. @@ -33,7 +35,7 @@ func TestPublishSingleMessageWithRetriesImmediateSuccess(t *testing.T) { return nil } - err := publishSingleMessageWithRetries(publication, 30, time.Second, publishFn) + err := publishSingleMessageWithRetries(publication, 30, 0, time.Second, temporaryFailuresMetric, publishFn) if err != nil { t.Errorf("Got unexpected error: %s", err) @@ -67,7 +69,7 @@ func TestPublishSingleMessageWithRetriesTransientFailure(t *testing.T) { return nil } - err := publishSingleMessageWithRetries(publication, 30, 0, publishFn) + err := publishSingleMessageWithRetries(publication, 30, 0, 0, temporaryFailuresMetric, publishFn) if err != nil { t.Errorf("Got unexpected error: %s", err) @@ -85,7 +87,7 @@ func TestPublishSingleMessageWithRetriesPermanentFailure(t *testing.T) { return errors.New("Some error") } - err := publishSingleMessageWithRetries(publication, 30, 0, publishFn) + err := publishSingleMessageWithRetries(publication, 30, 0, 0, temporaryFailuresMetric, publishFn) if err == nil { t.Errorf("Expected an error, but didn't get one") @@ -173,7 +175,7 @@ func TestPeriodicallyUpdateTimestamp(t *testing.T) { } func TestNilPublicationMessage(t *testing.T) { - err := publishSingleMessageWithRetries(nil, 5, 1*time.Second, func(p *Publication) error { + err := publishSingleMessageWithRetries(nil, 5, 0, 1*time.Second, temporaryFailuresMetric, func(p *Publication) error { t.Error("Should not have been called") return nil })