Skip to content

Commit

Permalink
comments and types
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-goodisman committed May 2, 2024
1 parent 353882c commit 12f7215
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 5 deletions.
16 changes: 14 additions & 2 deletions lib/oplog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,15 @@ func init() {
prometheus.MustRegister(metricMaxOplogEntryByMinute)
}

// PublisherChannels represents a collection of intake channels for a set of Redis Publishers.
// When multiple redis URLs are specified via OTR_REDIS_URL, each one produce a redis client,
// publisher coroutine, and intake channel. Since we want every message to go to all redis
// destinations, the tailer should send each message to all channels in the array.
type PublisherChannels []chan<- *redispub.Publication

// Tail begins tailing the oplog. It doesn't return unless it receives a message
// on the stop channel, in which case it wraps up its work and then returns.
func (tailer *Tailer) Tail(out [][]chan<- *redispub.Publication, stop <-chan bool) {
func (tailer *Tailer) Tail(out []PublisherChannels, stop <-chan bool) {
childStopC := make(chan bool)
wasStopped := false

Expand All @@ -140,7 +146,10 @@ func (tailer *Tailer) Tail(out [][]chan<- *redispub.Publication, stop <-chan boo
}
}

func (tailer *Tailer) tailOnce(out [][]chan<- *redispub.Publication, stop <-chan bool) {
// this accepts an array of PublisherChannels instances whose size is equal to the degree of write-parallelism.
// Each incoming message will be routed to one of the PublisherChannels instances based on its parallelism key
// (hash of the database name), then sent to every channel within that PublisherChannels instance.
func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool) {
parallelismSize := len(out)

session, err := tailer.MongoClient.StartSession()
Expand Down Expand Up @@ -214,8 +223,11 @@ func (tailer *Tailer) tailOnce(out [][]chan<- *redispub.Publication, stop <-chan

for _, pub := range pubs {
if pub != nil {
// determine which shard this message should route to
outIdx := (pub.ParallelismKey%parallelismSize + parallelismSize) % parallelismSize
// get the set of publisher channels for that shard
pubChans := out[outIdx]
// send the message to each channel on that shard
for _, pubChan := range pubChans {
pubChan <- pub
}
Expand Down
16 changes: 13 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,19 @@ func main() {
log.Log.Info("Initialized connection to Mongo")

parallelism := config.WriteParallelism()
// each array of redis clients holds one client for each destination (regular redis, sentinel)
// the aggregated array holds one such array for every write-parallelism shard
aggregatedRedisClients := make([][]redis.UniversalClient, parallelism)
aggregatedRedisPubs := make([][]chan<- *redispub.Publication, parallelism)
// make one PublisherChannels for each parallel writer
aggregatedRedisPubs := make([]oplog.PublisherChannels, parallelism)
// one stopper channel corresponds to each writer, so it uses the same 2D array structure.
stopRedisPubs := make([][]chan bool, parallelism)

bufferSize := 10000
waitGroup := sync.WaitGroup{}
denylist := sync.Map{}

// this loop starts one writer shard on each pass. Repeat it a number of times equal to the write parallelism level.
for i := 0; i < config.WriteParallelism(); i++ {
redisClients, err := createRedisClients()
if err != nil {
Expand All @@ -85,7 +90,10 @@ func main() {
aggregatedRedisClients[i] = redisClients
clientsSize := len(redisClients)

redisPubsAggregationEntry := make([]chan<- *redispub.Publication, clientsSize)
// each writer shard is going to make multiple writer coroutines, one for each redis destination,
// so we create one PublisherChannels for this shard and put each coroutine's intake channel in it.
// these will all be aggregated in the aggregatedRedisPubs 2D array and passed to the tailer.
redisPubsAggregationEntry := make(oplog.PublisherChannels, clientsSize)
stopRedisPubsEntry := make([]chan bool, clientsSize)

for j := 0; j < clientsSize; j++ {
Expand All @@ -99,7 +107,7 @@ func main() {

waitGroup.Add(1)

// We crate two goroutines:
// We create two goroutines:
//
// The oplog.Tail goroutine reads messages from the oplog, and generates the
// messages that we need to write to redis. It then writes them to a
Expand Down Expand Up @@ -131,6 +139,7 @@ func main() {

}

// aggregate
aggregatedRedisPubs[i] = redisPubsAggregationEntry
stopRedisPubs[i] = stopRedisPubsEntry
}
Expand All @@ -146,6 +155,7 @@ func main() {
MaxCatchUp: config.MaxCatchUp(),
Denylist: &denylist,
}
// pass all intake channels to the tailer, which will route messages accordingly
tailer.Tail(aggregatedRedisPubs, stopOplogTail)

log.Log.Info("Oplog tailer completed")
Expand Down

0 comments on commit 12f7215

Please sign in to comment.