Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize writes when there are multiple redis sinks #60

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions integration-tests/fault-injection/redisStopStart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,22 @@ func TestRedisStopStart(t *testing.T) {

nSuccess := harness.FindPromMetricCounter(metrics, "otr_redispub_processed_messages", map[string]string{
"status": "sent",
"clientIdx": "0",
})
if nSuccess != 100 {
t.Errorf("Metric otr_redispub_processed_messages(status: sent) = %d, expected 100", nSuccess)
t.Errorf("Metric otr_redispub_processed_messages(status: sent, clientIdx: 0) = %d, expected 100", nSuccess)
}

nPermFail := harness.FindPromMetricCounter(metrics, "otr_redispub_processed_messages", map[string]string{
"status": "failed",
"clientIdx": "0",
})
if nPermFail != 0 {
t.Errorf("Metric otr_redispub_processed_messages(status: failed) = %d, expected 0", nPermFail)
t.Errorf("Metric otr_redispub_processed_messages(status: failed, clientIdx: 0) = %d, expected 0", nPermFail)
}

nTempFail := harness.FindPromMetricCounter(metrics, "otr_redispub_temporary_send_failures", map[string]string{})
nTempFail := harness.FindPromMetricCounter(metrics, "otr_redispub_temporary_send_failures", map[string]string{ "clientIdx": "0" })
if nTempFail <= 0 {
t.Errorf("Metric otr_redispub_processed_messages = %d, expected >0", nTempFail)
t.Errorf("Metric otr_redispub_temporary_send_failures(clientIdx: 0) = %d, expected >0", nTempFail)
}
}
84 changes: 57 additions & 27 deletions lib/redispub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package redispub
import (
"context"
"fmt"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -45,14 +46,14 @@ var metricSentMessages = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "redispub",
Name: "processed_messages",
Help: "Messages processed by Redis publisher, partitioned by whether or not we successfully sent them",
}, []string{"status"})
}, []string{"status", "clientIdx"})

var metricTemporaryFailures = promauto.NewCounter(prometheus.CounterOpts{
var metricTemporaryFailures = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "otr",
Subsystem: "redispub",
Name: "temporary_send_failures",
Help: "Number of failures encountered when trying to send a message. We automatically retry, and only register a permanent failure (in otr_redispub_processed_messages) after 30 failures.",
})
}, []string{"clientIdx"})

// PublishStream reads Publications from the given channel and publishes them
// to Redis.
Expand All @@ -68,20 +69,43 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts
// time.Duration
dedupeExpirationSeconds := int(opts.DedupeExpiration.Seconds())

type PubFn func(*Publication)error
var inChans []chan *Publication
var outChans []chan error
var sendFailedMetrics []prometheus.Counter
var sendSucceededMetrics []prometheus.Counter
var temporaryFailuresMetrics []prometheus.Counter

var publishFns []PubFn
defer func () {
for _, c := range(inChans) {
close(c)
}
}()

for _,client := range clients {
for clientIdx, client := range clients {
clientIdx := clientIdx
clientIdxStr := strconv.FormatInt(int64(clientIdx), 10)
client := client
publishFn := func(p *Publication) error {
return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds)
}
publishFns = append(publishFns, publishFn)
}
inChan := make(chan *Publication)
inChans = append(inChans, inChan)
outChan := make(chan error)
outChans = append(outChans, outChan)
sendSucceededMetrics = append(sendSucceededMetrics, metricSentMessages.WithLabelValues("sent", clientIdxStr))
sendFailedMetrics = append(sendFailedMetrics, metricSentMessages.WithLabelValues("failed", clientIdxStr))
temporaryFailuresMetrics = append(temporaryFailuresMetrics, metricTemporaryFailures.WithLabelValues(clientIdxStr))

go func() {
defer close(outChan)

publishFn := func(p *Publication) error {
return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds)
}

metricSendFailed := metricSentMessages.WithLabelValues("failed")
metricSendSuccess := metricSentMessages.WithLabelValues("sent")
for p := range inChan {
log.Log.Debugw("Attempting to publish to", "clientIdx", clientIdx)
outChan <- publishSingleMessageWithRetries(p, 30, clientIdx, time.Second, temporaryFailuresMetrics[clientIdx], publishFn)
}
}()
}

for {
select {
Expand All @@ -90,29 +114,35 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts
return

case p := <-in:
for i,publishFn := range publishFns {
err := publishSingleMessageWithRetries(p, 30, time.Second, publishFn)
log.Log.Debugw("Published to", "idx", i)

for _, inChan := range inChans {
inChan <- p
}

for clientIdx, outChan := range outChans {
err := <-outChan

if err != nil {
metricSendFailed.Inc()
log.Log.Errorw("Permanent error while trying to publish message; giving up",
sendFailedMetrics[clientIdx].Inc()
log.Log.Errorw(
"Permanent error while trying to publish message; giving up",
"clientIdx", clientIdx,
"error", err,
"message", p)
} else {
metricSendSuccess.Inc()
"message", p,

// We want to make sure we do this *after* we've successfully published
// the messages
timestampC <- p.OplogTimestamp
)
} else {
sendSucceededMetrics[clientIdx].Inc()
}
}

// We want to make sure we do this *after* we've successfully published
// the messages
timestampC <- p.OplogTimestamp
}
}
}

func publishSingleMessageWithRetries(p *Publication, maxRetries int, sleepTime time.Duration, publishFn func(p *Publication) error) error {
func publishSingleMessageWithRetries(p *Publication, maxRetries int, clientIdx int, sleepTime time.Duration, temporaryFailuresMetric prometheus.Counter, publishFn func(p *Publication) error) error {
if p == nil {
return errors.New("Nil Redis publication")
}
Expand All @@ -127,7 +157,7 @@ func publishSingleMessageWithRetries(p *Publication, maxRetries int, sleepTime t
"retryNumber", retries)

// failure, retry
metricTemporaryFailures.Inc()
temporaryFailuresMetric.Inc()
retries++
time.Sleep(sleepTime)
} else {
Expand Down
10 changes: 6 additions & 4 deletions lib/redispub/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
)

var temporaryFailuresMetric = metricTemporaryFailures.WithLabelValues("0")

// We don't test PublishStream here -- it requires a real Redis server because
// miniredis doesn't support PUBLISH and its lua support is spotty. It gets
// tested in integration tests.
Expand All @@ -33,7 +35,7 @@ func TestPublishSingleMessageWithRetriesImmediateSuccess(t *testing.T) {
return nil
}

err := publishSingleMessageWithRetries(publication, 30, time.Second, publishFn)
err := publishSingleMessageWithRetries(publication, 30, 0, time.Second, temporaryFailuresMetric, publishFn)

if err != nil {
t.Errorf("Got unexpected error: %s", err)
Expand Down Expand Up @@ -67,7 +69,7 @@ func TestPublishSingleMessageWithRetriesTransientFailure(t *testing.T) {
return nil
}

err := publishSingleMessageWithRetries(publication, 30, 0, publishFn)
err := publishSingleMessageWithRetries(publication, 30, 0, 0, temporaryFailuresMetric, publishFn)

if err != nil {
t.Errorf("Got unexpected error: %s", err)
Expand All @@ -85,7 +87,7 @@ func TestPublishSingleMessageWithRetriesPermanentFailure(t *testing.T) {
return errors.New("Some error")
}

err := publishSingleMessageWithRetries(publication, 30, 0, publishFn)
err := publishSingleMessageWithRetries(publication, 30, 0, 0, temporaryFailuresMetric, publishFn)

if err == nil {
t.Errorf("Expected an error, but didn't get one")
Expand Down Expand Up @@ -173,7 +175,7 @@ func TestPeriodicallyUpdateTimestamp(t *testing.T) {
}

func TestNilPublicationMessage(t *testing.T) {
err := publishSingleMessageWithRetries(nil, 5, 1*time.Second, func(p *Publication) error {
err := publishSingleMessageWithRetries(nil, 5, 0, 1*time.Second, temporaryFailuresMetric, func(p *Publication) error {
t.Error("Should not have been called")
return nil
})
Expand Down
Loading