Skip to content

Commit

Permalink
Parallelize writes when there are multiple redis sinks
Browse files Browse the repository at this point in the history
  • Loading branch information
torywheelwright committed Mar 30, 2024
1 parent bf16521 commit b66afa4
Showing 1 changed file with 45 additions and 15 deletions.
60 changes: 45 additions & 15 deletions lib/redispub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/tulip/oplogtoredis/lib/log"
Expand Down Expand Up @@ -68,16 +69,43 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts
// time.Duration
dedupeExpirationSeconds := int(opts.DedupeExpiration.Seconds())

type PubFn func(*Publication)error
type PubFn func(*Publication) error

Check failure on line 72 in lib/redispub/publisher.go

View workflow job for this annotation

GitHub Actions / lint_and_units

type `PubFn` is unused (unused)

var publishFns []PubFn
var wg sync.WaitGroup
defer wg.Wait()

for _,client := range clients {
client := client
publishFn := func(p *Publication) error {
return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds)
var inChans []chan *Publication
var outChans []chan error

defer func () {
for _, c := range(inChans) {
close(c)
}
publishFns = append(publishFns, publishFn)
}()

for i, client := range clients {
go func() {
wg.Add(1)

Check failure on line 88 in lib/redispub/publisher.go

View workflow job for this annotation

GitHub Actions / lint_and_units

SA2000: should call wg.Add(1) before starting the goroutine to avoid a race (staticcheck)
defer wg.Done()

inChan := make(chan *Publication)
inChans = append(inChans, inChan)
outChan := make(chan error)
defer close(outChan)
outChans = append(outChans, outChan)
chanIdx := i

Check failure on line 96 in lib/redispub/publisher.go

View workflow job for this annotation

GitHub Actions / lint_and_units

loopclosure: loop variable i captured by func literal (govet)

client := client

Check failure on line 98 in lib/redispub/publisher.go

View workflow job for this annotation

GitHub Actions / lint_and_units

loopclosure: loop variable client captured by func literal (govet)

publishFn := func(p *Publication) error {
return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds)
}

for p := range inChan {
log.Log.Debugw("Attempting to publish to", "idx", chanIdx)
outChan <- publishSingleMessageWithRetries(p, 30, time.Second, publishFn)
}
}()
}

metricSendFailed := metricSentMessages.WithLabelValues("failed")
Expand All @@ -90,10 +118,12 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts
return

case p := <-in:
for i,publishFn := range publishFns {
err := publishSingleMessageWithRetries(p, 30, time.Second, publishFn)
log.Log.Debugw("Published to", "idx", i)

for _, inChan := range inChans {
inChan <- p
}

for _, outChan := range outChans {
err := <-outChan

if err != nil {
metricSendFailed.Inc()
Expand All @@ -102,12 +132,12 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts
"message", p)
} else {
metricSendSuccess.Inc()

// We want to make sure we do this *after* we've successfully published
// the messages
timestampC <- p.OplogTimestamp
}
}

// We want to make sure we do this *after* we've successfully published
// the messages
timestampC <- p.OplogTimestamp
}
}
}
Expand Down

0 comments on commit b66afa4

Please sign in to comment.