From 136216af435cea9807c416304c6b33e3fa9d32a8 Mon Sep 17 00:00:00 2001 From: alex-goodisman <42395598+alex-goodisman@users.noreply.github.com> Date: Fri, 3 May 2024 15:51:45 -0400 Subject: [PATCH] OplogToRedis: Run writer routines in parallel for each redis client. (#75) in main, count through each redis client and make a separate chan and coroutine for clientsCount * ordinalCount different entries. make some changes to aggregation in order to accomodate this. in the tailer, get the [][]chan and for each incoming publication, first index by the shard ordinal (hash of db name), then iterate through the remaining chans (should be one for each redis client) and publish to all of them. at present, the publisher stream still accepts an array, but now it's only getting one (each). --- lib/oplog/tail.go | 21 ++++++++-- main.go | 104 +++++++++++++++++++++++++++++----------------- 2 files changed, 83 insertions(+), 42 deletions(-) diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index 54a9aa70..f5376332 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -115,9 +115,15 @@ func init() { prometheus.MustRegister(metricMaxOplogEntryByMinute) } +// PublisherChannels represents a collection of intake channels for a set of Redis Publishers. +// When multiple redis URLs are specified via OTR_REDIS_URL, each one produce a redis client, +// publisher coroutine, and intake channel. Since we want every message to go to all redis +// destinations, the tailer should send each message to all channels in the array. +type PublisherChannels []chan<- *redispub.Publication + // Tail begins tailing the oplog. It doesn't return unless it receives a message // on the stop channel, in which case it wraps up its work and then returns. -func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) { +func (tailer *Tailer) Tail(out []PublisherChannels, stop <-chan bool, readOrdinal, readParallelism int) { childStopC := make(chan bool) wasStopped := false @@ -141,7 +147,10 @@ func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool, } } -func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) { +// this accepts an array of PublisherChannels instances whose size is equal to the degree of write-parallelism. +// Each incoming message will be routed to one of the PublisherChannels instances based on its parallelism key +// (hash of the database name), then sent to every channel within that PublisherChannels instance. +func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOrdinal, readParallelism int) { session, err := tailer.MongoClient.StartSession() if err != nil { log.Log.Errorw("Failed to start Mongo session", "error", err) @@ -228,9 +237,15 @@ func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan b sendMetricsData() } + // determine which shard this message should route to // inIdx and outIdx may be different if there are different #s of read and write routines outIdx := assignToShard(pub.ParallelismKey, len(out)) - out[outIdx] <- pub + // get the set of publisher channels for that shard + pubChans := out[outIdx] + // send the message to each channel on that shard + for _, pubChan := range pubChans { + pubChan <- pub + } } else { log.Log.Error("Nil Redis publication") } diff --git a/main.go b/main.go index 80d8ddc8..7bc5aca6 100644 --- a/main.go +++ b/main.go @@ -42,14 +42,19 @@ func main() { } writeParallelism := config.WriteParallelism() + // each array of redis clients holds one client for each destination (regular redis, sentinel) + // the aggregated array holds one such array for every write-parallelism shard aggregatedRedisClients := make([][]redis.UniversalClient, writeParallelism) - aggregatedRedisPubs := make([]chan<- *redispub.Publication, writeParallelism) - stopRedisPubs := make([]chan bool, writeParallelism) + // make one PublisherChannels for each parallel writer + aggregatedRedisPubs := make([]oplog.PublisherChannels, writeParallelism) + // one stopper channel corresponds to each writer, so it uses the same 2D array structure. + stopRedisPubs := make([][]chan bool, writeParallelism) bufferSize := 10000 waitGroup := sync.WaitGroup{} denylist := sync.Map{} + // this loop starts one writer shard on each pass. Repeat it a number of times equal to the write parallelism level. for i := 0; i < writeParallelism; i++ { redisClients, err := createRedisClients() if err != nil { @@ -68,42 +73,60 @@ func main() { log.Log.Infow("Initialized connection to Redis", "i", i) aggregatedRedisClients[i] = redisClients + clientsSize := len(redisClients) + + // each writer shard is going to make multiple writer coroutines, one for each redis destination, + // so we create one PublisherChannels for this shard and put each coroutine's intake channel in it. + // these will all be aggregated in the aggregatedRedisPubs 2D array and passed to the tailer. + redisPubsAggregationEntry := make(oplog.PublisherChannels, clientsSize) + stopRedisPubsEntry := make([]chan bool, clientsSize) + + for j := 0; j < clientsSize; j++ { + redisClient := redisClients[i] + + redisPubs := make(chan *redispub.Publication, bufferSize) + redisPubsAggregationEntry[j] = redisPubs + + stopRedisPub := make(chan bool) + stopRedisPubsEntry[j] = stopRedisPub + + waitGroup.Add(1) + + // We create two goroutines: + // + // The oplog.Tail goroutine reads messages from the oplog, and generates the + // messages that we need to write to redis. It then writes them to a + // buffered channel. + // + // The redispub.PublishStream goroutine reads messages from the buffered channel + // and sends them to Redis. + // + // TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2) + go func(ordinal int, clientIndex int) { + redispub.PublishStream([]redis.UniversalClient{redisClient}, redisPubs, &redispub.PublishOpts{ + FlushInterval: config.TimestampFlushInterval(), + DedupeExpiration: config.RedisDedupeExpiration(), + MetadataPrefix: config.RedisMetadataPrefix(), + }, stopRedisPub, ordinal) + log.Log.Infow("Redis publisher completed", "ordinal", ordinal, "clientIndex", clientIndex) + waitGroup.Done() + }(i, j) + log.Log.Info("Started up processing goroutines") + + promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "otr", + Name: "buffer_available", + Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.", + ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i), "clientIndex": strconv.Itoa(j)}, + }, func() float64 { + return float64(bufferSize - len(redisPubs)) + }) - // We crate two goroutines: - // - // The oplog.Tail goroutine reads messages from the oplog, and generates the - // messages that we need to write to redis. It then writes them to a - // buffered channel. - // - // The redispub.PublishStream goroutine reads messages from the buffered channel - // and sends them to Redis. - // - // TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2) - redisPubs := make(chan *redispub.Publication, bufferSize) - aggregatedRedisPubs[i] = redisPubs - - stopRedisPub := make(chan bool) - waitGroup.Add(1) - go func(ordinal int) { - redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{ - FlushInterval: config.TimestampFlushInterval(), - DedupeExpiration: config.RedisDedupeExpiration(), - MetadataPrefix: config.RedisMetadataPrefix(), - }, stopRedisPub, ordinal) - log.Log.Infow("Redis publisher completed", "i", ordinal) - waitGroup.Done() - }(i) - log.Log.Info("Started up processing goroutines") - stopRedisPubs[i] = stopRedisPub - - promauto.NewGaugeFunc(prometheus.GaugeOpts{ - Namespace: "otr", - Name: "buffer_available", - Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.", - ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i)}, - }, func() float64 { - return float64(bufferSize - len(redisPubs)) - }) + } + + // aggregate + aggregatedRedisPubs[i] = redisPubsAggregationEntry + stopRedisPubs[i] = stopRedisPubsEntry } readParallelism := config.ReadParallelism() @@ -140,6 +163,7 @@ func main() { MaxCatchUp: config.MaxCatchUp(), Denylist: &denylist, } + // pass all intake channels to the tailer, which will route messages accordingly tailer.Tail(aggregatedRedisPubs, stopOplogTail, i, readParallelism) log.Log.Info("Oplog tailer completed") @@ -177,8 +201,10 @@ func main() { for _, stopOplogTail := range stopOplogTails { stopOplogTail <- true } - for _, stopRedisPub := range stopRedisPubs { - stopRedisPub <- true + for _, stopRedisPubEntry := range stopRedisPubs { + for _, stopRedisPub := range stopRedisPubEntry { + stopRedisPub <- true + } } err = httpServer.Shutdown(context.Background())