From 504a828962fcf162e3f000dd02824622dab53488 Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Wed, 17 Apr 2024 13:51:38 -0400 Subject: [PATCH 01/23] customerlist first draft --- lib/customers/main.go | 7 ++ lib/oplog/tail.go | 55 +++++++++----- lib/redispub/lastProcessedTime.go | 4 +- lib/redispub/publisher.go | 17 +++-- main.go | 115 +++++++++++++++++------------- 5 files changed, 117 insertions(+), 81 deletions(-) create mode 100644 lib/customers/main.go diff --git a/lib/customers/main.go b/lib/customers/main.go new file mode 100644 index 00000000..c6d39807 --- /dev/null +++ b/lib/customers/main.go @@ -0,0 +1,7 @@ +package customers + +func AllCustomers() []string { + return []string{ + "factory", + } +} diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index 0f62b032..d60f764e 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -25,10 +25,10 @@ import ( // Tailer persistently tails the oplog of a Mongo cluster, handling // reconnection and resumption of where it left off. type Tailer struct { - MongoClient *mongo.Client + MongoClient *mongo.Client RedisClients []redis.UniversalClient - RedisPrefix string - MaxCatchUp time.Duration + RedisPrefix string + MaxCatchUp time.Duration } // Raw oplog entry from Mongo @@ -107,7 +107,7 @@ func init() { // Tail begins tailing the oplog. It doesn't return unless it receives a message // on the stop channel, in which case it wraps up its work and then returns. -func (tailer *Tailer) Tail(out chan<- *redispub.Publication, stop <-chan bool) { +func (tailer *Tailer) Tail(out chan<- *redispub.Publication, stop <-chan bool, customer string) { childStopC := make(chan bool) wasStopped := false @@ -118,9 +118,9 @@ func (tailer *Tailer) Tail(out chan<- *redispub.Publication, stop <-chan bool) { }() for { - log.Log.Info("Starting oplog tailing") - tailer.tailOnce(out, childStopC) - log.Log.Info("Oplog tailing ended") + log.Log.Info("Starting oplog tailing for " + customer) + tailer.tailOnce(out, childStopC, customer) + log.Log.Info("Oplog tailing ended for " + customer) if wasStopped { return @@ -131,7 +131,7 @@ func (tailer *Tailer) Tail(out chan<- *redispub.Publication, stop <-chan bool) { } } -func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan bool) { +func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan bool, customer string) { session, err := tailer.MongoClient.StartSession() if err != nil { log.Log.Errorw("Failed to start Mongo session", "error", err) @@ -140,7 +140,7 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo oplogCollection := session.Client().Database("local").Collection("oplog.rs") - startTime := tailer.getStartTime(func() (*primitive.Timestamp, error) { + startTime := tailer.getStartTime(customer, func() (*primitive.Timestamp, error) { // Get the timestamp of the last entry in the oplog (as a position to // start from if we don't have a last-written timestamp from Redis) var entry rawOplogEntry @@ -150,7 +150,14 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo queryContext, queryContextCancel := context.WithTimeout(context.Background(), config.MongoQueryTimeout()) defer queryContextCancel() - result := oplogCollection.FindOne(queryContext, bson.M{}, findOneOpts) + timeFilter := bson.M{} + if customer != "" { + timeFilter["ns"] = bson.M{ + "$regex": customer + "\\..*", + } + } + + result := oplogCollection.FindOne(queryContext, timeFilter, findOneOpts) if result.Err() != nil { return nil, result.Err() @@ -168,7 +175,7 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo return &ts, nil }) - query, queryErr := issueOplogFindQuery(oplogCollection, startTime) + query, queryErr := issueOplogFindQuery(oplogCollection, startTime, customer) if queryErr != nil { log.Log.Errorw("Error issuing tail query", "error", queryErr) @@ -215,7 +222,7 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo // timeout after our timeout duration, and we'll create a new one. log.Log.Debug("Oplog cursor timed out, will retry") - query, queryErr = issueOplogFindQuery(oplogCollection, lastTimestamp) + query, queryErr = issueOplogFindQuery(oplogCollection, lastTimestamp, customer) if queryErr != nil { log.Log.Errorw("Error issuing tail query", "error", queryErr) @@ -226,7 +233,7 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo } else if status.DidLosePosition { // Our cursor expired. Make a new cursor to pick up from where we // left off. - query, queryErr = issueOplogFindQuery(oplogCollection, lastTimestamp) + query, queryErr = issueOplogFindQuery(oplogCollection, lastTimestamp, customer) if queryErr != nil { log.Log.Errorw("Error issuing tail query", "error", queryErr) @@ -300,7 +307,7 @@ func readNextFromCursor(cursor *mongo.Cursor) (status cursorResultStatus, err er return } -func issueOplogFindQuery(c *mongo.Collection, startTime primitive.Timestamp) (*mongo.Cursor, error) { +func issueOplogFindQuery(c *mongo.Collection, startTime primitive.Timestamp, customer string) (*mongo.Cursor, error) { queryOpts := &options.FindOptions{} queryOpts.SetSort(bson.M{"$natural": 1}) queryOpts.SetCursorType(options.TailableAwait) @@ -308,9 +315,19 @@ func issueOplogFindQuery(c *mongo.Collection, startTime primitive.Timestamp) (*m queryContext, queryContextCancel := context.WithTimeout(context.Background(), config.MongoQueryTimeout()) defer queryContextCancel() - return c.Find(queryContext, bson.M{ - "ts": bson.M{"$gt": startTime}, - }, queryOpts) + queryFilter := bson.M{ + "ts": bson.M{ + "$gt": startTime, + }, + } + + if customer != "" { + queryFilter["ns"] = bson.M{ + "$regex": customer + "\\..*", // match "{customer}.{anything}" + } + } + + return c.Find(queryContext, queryFilter, queryOpts) } func closeCursor(cursor *mongo.Cursor) { @@ -403,8 +420,8 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw) (timestamp *primitive.Tim // We take the function to get the timestamp of the last oplog entry (as a // fallback if we don't have a latest timestamp from Redis) as an arg instead // of using tailer.mongoClient directly so we can unit test this function -func (tailer *Tailer) getStartTime(getTimestampOfLastOplogEntry func() (*primitive.Timestamp, error)) primitive.Timestamp { - ts, tsTime, redisErr := redispub.LastProcessedTimestamp(tailer.RedisClients[0], tailer.RedisPrefix) +func (tailer *Tailer) getStartTime(customer string, getTimestampOfLastOplogEntry func() (*primitive.Timestamp, error)) primitive.Timestamp { + ts, tsTime, redisErr := redispub.LastProcessedTimestamp(tailer.RedisClients[0], tailer.RedisPrefix, customer) if redisErr == nil { // we have a last write time, check that it's not too far in the diff --git a/lib/redispub/lastProcessedTime.go b/lib/redispub/lastProcessedTime.go index dbfa44ba..e852dd84 100644 --- a/lib/redispub/lastProcessedTime.go +++ b/lib/redispub/lastProcessedTime.go @@ -17,8 +17,8 @@ import ( // // If oplogtoredis has not processed any messages, returns redis.Nil as an // error. -func LastProcessedTimestamp(redisClient redis.UniversalClient, metadataPrefix string) (primitive.Timestamp, time.Time, error) { - str, err := redisClient.Get(context.Background(), metadataPrefix+"lastProcessedEntry").Result() +func LastProcessedTimestamp(redisClient redis.UniversalClient, metadataPrefix string, customer string) (primitive.Timestamp, time.Time, error) { + str, err := redisClient.Get(context.Background(), metadataPrefix+"lastProcessedEntry."+customer).Result() if err != nil { return primitive.Timestamp{}, time.Unix(0, 0), err } diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index f6363eac..f98ee669 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -63,23 +63,23 @@ var metricLastCommandDuration = promauto.NewGauge(prometheus.GaugeOpts{ // PublishStream reads Publications from the given channel and publishes them // to Redis. -func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts *PublishOpts, stop <-chan bool) { +func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts *PublishOpts, stop <-chan bool, customer string) { // Start up a background goroutine for periodically updating the last-processed // timestamp timestampC := make(chan primitive.Timestamp) - for _,client := range clients { - go periodicallyUpdateTimestamp(client, timestampC, opts) + for _, client := range clients { + go periodicallyUpdateTimestamp(client, timestampC, opts, customer) } // Redis expiration is in integer seconds, so we have to convert the // time.Duration dedupeExpirationSeconds := int(opts.DedupeExpiration.Seconds()) - type PubFn func(*Publication)error + type PubFn func(*Publication) error var publishFns []PubFn - for _,client := range clients { + for _, client := range clients { client := client publishFn := func(p *Publication) error { return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds) @@ -97,11 +97,10 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts return case p := <-in: - for i,publishFn := range publishFns { + for i, publishFn := range publishFns { err := publishSingleMessageWithRetries(p, 30, time.Second, publishFn) log.Log.Debugw("Published to", "idx", i) - if err != nil { metricSendFailed.Inc() log.Log.Errorw("Permanent error while trying to publish message; giving up", @@ -181,14 +180,14 @@ func formatKey(p *Publication, prefix string) string { // channel, and this function throttles that to only update occasionally. // // This blocks forever; it should be run in a goroutine -func periodicallyUpdateTimestamp(client redis.UniversalClient, timestamps <-chan primitive.Timestamp, opts *PublishOpts) { +func periodicallyUpdateTimestamp(client redis.UniversalClient, timestamps <-chan primitive.Timestamp, opts *PublishOpts, customer string) { var lastFlush time.Time var mostRecentTimestamp primitive.Timestamp var needFlush bool flush := func() { if needFlush { - client.Set(context.Background(), opts.MetadataPrefix+"lastProcessedEntry", encodeMongoTimestamp(mostRecentTimestamp), 0) + client.Set(context.Background(), opts.MetadataPrefix+"lastProcessedEntry."+customer, encodeMongoTimestamp(mostRecentTimestamp), 0) lastFlush = time.Now() needFlush = false } diff --git a/main.go b/main.go index 7eb68144..2d477597 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ import ( "go.mongodb.org/mongo-driver/mongo/readpref" "github.com/tulip/oplogtoredis/lib/config" + "github.com/tulip/oplogtoredis/lib/customers" "github.com/tulip/oplogtoredis/lib/log" "github.com/tulip/oplogtoredis/lib/oplog" "github.com/tulip/oplogtoredis/lib/parse" @@ -68,56 +69,65 @@ func main() { }() log.Log.Info("Initialized connection to Redis") - // We crate two goroutines: - // - // The oplog.Tail goroutine reads messages from the oplog, and generates the - // messages that we need to write to redis. It then writes them to a - // buffered channel. - // - // The redispub.PublishStream goroutine reads messages from the buffered channel - // and sends them to Redis. - // - // TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2) - bufferSize := 10000 - redisPubs := make(chan *redispub.Publication, bufferSize) - - promauto.NewGaugeFunc(prometheus.GaugeOpts{ - Namespace: "otr", - Name: "buffer_available", - Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.", - }, func () float64 { - return float64(bufferSize - len(redisPubs)) - }) - - waitGroup := sync.WaitGroup{} - - stopOplogTail := make(chan bool) - waitGroup.Add(1) - go func() { - tailer := oplog.Tailer{ - MongoClient: mongoSession, - RedisClients: redisClients, - RedisPrefix: config.RedisMetadataPrefix(), - MaxCatchUp: config.MaxCatchUp(), - } - tailer.Tail(redisPubs, stopOplogTail) + stoppers := []chan bool{} + waitgroups := []*sync.WaitGroup{} + + for _, customer := range customers.AllCustomers() { + + // We crate two goroutines: + // + // The oplog.Tail goroutine reads messages from the oplog, and generates the + // messages that we need to write to redis. It then writes them to a + // buffered channel. + // + // The redispub.PublishStream goroutine reads messages from the buffered channel + // and sends them to Redis. + // + // TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2) + bufferSize := 10000 + redisPubs := make(chan *redispub.Publication, bufferSize) + + promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "otr", + Name: "buffer_available", + Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.", + }, func() float64 { + return float64(bufferSize - len(redisPubs)) + }) - log.Log.Info("Oplog tailer completed") - waitGroup.Done() - }() + waitGroup := sync.WaitGroup{} - stopRedisPub := make(chan bool) - waitGroup.Add(1) - go func() { - redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{ - FlushInterval: config.TimestampFlushInterval(), - DedupeExpiration: config.RedisDedupeExpiration(), - MetadataPrefix: config.RedisMetadataPrefix(), - }, stopRedisPub) - log.Log.Info("Redis publisher completed") - waitGroup.Done() - }() - log.Log.Info("Started up processing goroutines") + stopOplogTail := make(chan bool) + waitGroup.Add(1) + go func() { + tailer := oplog.Tailer{ + MongoClient: mongoSession, + RedisClients: redisClients, + RedisPrefix: config.RedisMetadataPrefix(), + MaxCatchUp: config.MaxCatchUp(), + } + tailer.Tail(redisPubs, stopOplogTail, customer) + + log.Log.Info("Oplog tailer completed") + waitGroup.Done() + }() + + stopRedisPub := make(chan bool) + waitGroup.Add(1) + go func() { + redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{ + FlushInterval: config.TimestampFlushInterval(), + DedupeExpiration: config.RedisDedupeExpiration(), + MetadataPrefix: config.RedisMetadataPrefix(), + }, stopRedisPub, customer) + log.Log.Info("Redis publisher completed") + waitGroup.Done() + }() + log.Log.Info("Started up processing goroutines") + + stoppers = append(stoppers, stopOplogTail, stopRedisPub) + waitgroups = append(waitgroups, &waitGroup) + } // Start one more goroutine for the HTTP server httpServer := makeHTTPServer(redisClients, mongoSession) @@ -146,8 +156,9 @@ func main() { log.Log.Warnf("Exiting cleanly due to signal %s. Interrupt again to force unclean shutdown.", sig) signal.Reset() - stopOplogTail <- true - stopRedisPub <- true + for _, stopper := range stoppers { + stopper <- true + } err = httpServer.Shutdown(context.Background()) if err != nil { @@ -155,7 +166,9 @@ func main() { "error", err) } - waitGroup.Wait() + for _, waitGroup := range waitgroups { + waitGroup.Wait() + } } // Connects to mongo From 075804e7ba5e47b077af82087373a05ba5e167f0 Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Wed, 17 Apr 2024 14:39:35 -0400 Subject: [PATCH 02/23] some lint or something --- lib/oplog/tail_test.go | 8 ++++---- lib/redispub/lastProcessedTime_test.go | 12 ++++++------ lib/redispub/publisher_test.go | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/oplog/tail_test.go b/lib/oplog/tail_test.go index 8abcca44..144ed90c 100644 --- a/lib/oplog/tail_test.go +++ b/lib/oplog/tail_test.go @@ -73,7 +73,7 @@ func TestGetStartTime(t *testing.T) { panic(err) } defer redisServer.Close() - require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry", strconv.FormatInt(int64(test.redisTimestamp.T), 10))) + require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry.test", strconv.FormatInt(int64(test.redisTimestamp.T), 10))) redisClient := []redis.UniversalClient{redis.NewUniversalClient(&redis.UniversalOptions{ Addrs: []string{redisServer.Addr()}, @@ -81,11 +81,11 @@ func TestGetStartTime(t *testing.T) { tailer := Tailer{ RedisClients: redisClient, - RedisPrefix: "someprefix.", - MaxCatchUp: maxCatchUp, + RedisPrefix: "someprefix.", + MaxCatchUp: maxCatchUp, } - actualResult := tailer.getStartTime(func() (*primitive.Timestamp, error) { + actualResult := tailer.getStartTime("test", func() (*primitive.Timestamp, error) { if test.mongoEndOfOplogErr != nil { return nil, test.mongoEndOfOplogErr } diff --git a/lib/redispub/lastProcessedTime_test.go b/lib/redispub/lastProcessedTime_test.go index ffb4b185..948279a7 100644 --- a/lib/redispub/lastProcessedTime_test.go +++ b/lib/redispub/lastProcessedTime_test.go @@ -31,9 +31,9 @@ func TestLastProcessedTimestampSuccess(t *testing.T) { redisServer, redisClient := startMiniredis() defer redisServer.Close() - require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry", encodeMongoTimestamp(nowTS))) + require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry.test", encodeMongoTimestamp(nowTS))) - gotTS, gotTime, err := LastProcessedTimestamp(redisClient, "someprefix.") + gotTS, gotTime, err := LastProcessedTimestamp(redisClient, "someprefix.", "test") if err != nil { t.Errorf("Got unexpected error: %s", err) @@ -52,7 +52,7 @@ func TestLastProcessedTimestampNoRecord(t *testing.T) { redisServer, redisClient := startMiniredis() defer redisServer.Close() - _, _, err := LastProcessedTimestamp(redisClient, "someprefix.") + _, _, err := LastProcessedTimestamp(redisClient, "someprefix.", "test") if err == nil { t.Errorf("Expected redis.Nil error, got no error") @@ -64,9 +64,9 @@ func TestLastProcessedTimestampNoRecord(t *testing.T) { func TestLastProcessedTimestampInvalidRecord(t *testing.T) { redisServer, redisClient := startMiniredis() defer redisServer.Close() - require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry", "not a number")) + require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry.test", "not a number")) - _, _, err := LastProcessedTimestamp(redisClient, "someprefix.") + _, _, err := LastProcessedTimestamp(redisClient, "someprefix.", "test") if err == nil { t.Errorf("Expected strconv error, got no error") @@ -80,7 +80,7 @@ func TestLastProcessedTimestampRedisError(t *testing.T) { Addrs: []string{"not a server"}, }) - _, _, err := LastProcessedTimestamp(redisClient, "someprefix.") + _, _, err := LastProcessedTimestamp(redisClient, "someprefix.", "test") if err == nil { t.Errorf("Expected TCP error, got no error") diff --git a/lib/redispub/publisher_test.go b/lib/redispub/publisher_test.go index 0d7f39fa..bd4d3f10 100644 --- a/lib/redispub/publisher_test.go +++ b/lib/redispub/publisher_test.go @@ -121,11 +121,11 @@ func TestPeriodicallyUpdateTimestamp(t *testing.T) { periodicallyUpdateTimestamp(redisClient, timestampC, &PublishOpts{ MetadataPrefix: "someprefix.", FlushInterval: testSpeed, - }) + }, "test") waitGroup.Done() }() - key := "someprefix.lastProcessedEntry" + key := "someprefix.lastProcessedEntry.test" // Key should be unset if redisServer.Exists(key) { From 4985057e25525d72c3bcf052635ba57a9c8836f0 Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Wed, 17 Apr 2024 14:56:03 -0400 Subject: [PATCH 03/23] head this one off at the pass? --- lib/customers/main.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/customers/main.go b/lib/customers/main.go index c6d39807..86a17112 100644 --- a/lib/customers/main.go +++ b/lib/customers/main.go @@ -3,5 +3,9 @@ package customers func AllCustomers() []string { return []string{ "factory", + "dev", + "tests", + "something", + "xxx", } } From 8217bfa08f9547d28146432ac74002fde50fbfb6 Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Wed, 17 Apr 2024 15:01:00 -0400 Subject: [PATCH 04/23] rename the buffer gauge --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 2d477597..b8b310ae 100644 --- a/main.go +++ b/main.go @@ -89,7 +89,7 @@ func main() { promauto.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: "otr", - Name: "buffer_available", + Name: "buffer_available_" + customer, Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.", }, func() float64 { return float64(bufferSize - len(redisPubs)) From 6cf53c88fe3aec2d3485eccc1dbdf93c929b1938 Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Wed, 17 Apr 2024 15:05:20 -0400 Subject: [PATCH 05/23] maybe this? --- lib/customers/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/customers/main.go b/lib/customers/main.go index 86a17112..ea755ee7 100644 --- a/lib/customers/main.go +++ b/lib/customers/main.go @@ -4,6 +4,7 @@ func AllCustomers() []string { return []string{ "factory", "dev", + "test", "tests", "something", "xxx", From 70f3d1a392e301ba5ea2bcf1323bfb9e5ccec84f Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Thu, 18 Apr 2024 14:25:57 -0400 Subject: [PATCH 06/23] try to receive admin cmds also --- lib/oplog/tail.go | 42 +++++++++++++++++++++++++++++++++++------- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index d60f764e..d07ea752 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -202,7 +202,7 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo continue } - ts, pubs := tailer.unmarshalEntry(rawData) + ts, pubs := tailer.unmarshalEntry(rawData, customer) if ts != nil { lastTimestamp = *ts @@ -322,8 +322,26 @@ func issueOplogFindQuery(c *mongo.Collection, startTime primitive.Timestamp, cus } if customer != "" { - queryFilter["ns"] = bson.M{ - "$regex": customer + "\\..*", // match "{customer}.{anything}" + queryFilter = bson.M{ + "$and": []bson.M{ + { + "ts": bson.M{ + "$gt": startTime, + }, + }, + { + "$or": []bson.M{ + { + "ns": bson.M{ + "$regex": customer + "\\..*", // match "{customer}.{anything}" + }, + }, + { + "ns": "admin.$cmd", // match exactly "admin.$cmd" + }, + }, + }, + }, } } @@ -345,7 +363,7 @@ func closeCursor(cursor *mongo.Cursor) { // // The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if it // ignored it or failed at some later step. -func (tailer *Tailer) unmarshalEntry(rawData bson.Raw) (timestamp *primitive.Timestamp, pubs []*redispub.Publication) { +func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, customer string) (timestamp *primitive.Timestamp, pubs []*redispub.Publication) { var result rawOplogEntry err := bson.Unmarshal(rawData, &result) @@ -356,7 +374,7 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw) (timestamp *primitive.Tim timestamp = &result.Timestamp - entries := tailer.parseRawOplogEntry(result, nil) + entries := tailer.parseRawOplogEntry(result, nil, customer) log.Log.Debugw("Received oplog entry", "entry", result, "processTime", time.Now().UnixMilli()) status := "ignored" @@ -451,7 +469,7 @@ func (tailer *Tailer) getStartTime(customer string, getTimestampOfLastOplogEntry } // converts a rawOplogEntry to an oplogEntry -func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []oplogEntry { +func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint, customer string) []oplogEntry { if txIdx == nil { idx := uint(0) txIdx = &idx @@ -465,6 +483,14 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl return nil } + // only return leaf nodes if they match the namespace prefix (db/customer name) + // this should normally be filtered out by the mongo query, + // but because of tx documents or other admin commands, we might get them anyway. + // so just return an empty array in that case. + if !strings.HasPrefix(entry.Namespace, customer) { + return []oplogEntry{} + } + out := oplogEntry{ Operation: entry.Operation, Timestamp: entry.Timestamp, @@ -504,7 +530,9 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl for _, v := range txData.ApplyOps { v.Timestamp = entry.Timestamp - ret = append(ret, tailer.parseRawOplogEntry(v, txIdx)...) + // in case the nested entries match this customer, recur all of them. + // when leaves eventually are parsed, the final array will only contain matched ones. + ret = append(ret, tailer.parseRawOplogEntry(v, txIdx, customer)...) } return ret From fbbe725deb85e73f45446084c17117422f530c09 Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Thu, 18 Apr 2024 14:31:45 -0400 Subject: [PATCH 07/23] fix another test --- lib/oplog/tail_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/oplog/tail_test.go b/lib/oplog/tail_test.go index 144ed90c..09f1695c 100644 --- a/lib/oplog/tail_test.go +++ b/lib/oplog/tail_test.go @@ -285,7 +285,7 @@ func TestParseRawOplogEntry(t *testing.T) { for testName, test := range tests { t.Run(testName, func(t *testing.T) { - got := (&Tailer{}).parseRawOplogEntry(test.in, nil) + got := (&Tailer{}).parseRawOplogEntry(test.in, nil, "foo") if diff := pretty.Compare(got, test.want); diff != "" { t.Errorf("Got incorrect result (-got +want)\n%s", diff) From 741600c20c86faec957e51382c401b26cd4efc6e Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Thu, 18 Apr 2024 14:34:58 -0400 Subject: [PATCH 08/23] we grasp at straws again --- lib/oplog/tail.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index d07ea752..3fe1a497 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -487,9 +487,9 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint, custo // this should normally be filtered out by the mongo query, // but because of tx documents or other admin commands, we might get them anyway. // so just return an empty array in that case. - if !strings.HasPrefix(entry.Namespace, customer) { - return []oplogEntry{} - } + // if !strings.HasPrefix(entry.Namespace, customer) { + // return []oplogEntry{} + // } out := oplogEntry{ Operation: entry.Operation, From 11dab62cafd4c0c3aeacd4df9ff4c1860a44414e Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Thu, 18 Apr 2024 14:45:04 -0400 Subject: [PATCH 09/23] try inlining the filters --- lib/oplog/tail.go | 30 ++++++------------------------ 1 file changed, 6 insertions(+), 24 deletions(-) diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index 3fe1a497..1b2752fc 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -153,7 +153,7 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo timeFilter := bson.M{} if customer != "" { timeFilter["ns"] = bson.M{ - "$regex": customer + "\\..*", + "$regex": "(admin\\.\\$cmd)|(" + customer + "\\..*)", } } @@ -322,26 +322,8 @@ func issueOplogFindQuery(c *mongo.Collection, startTime primitive.Timestamp, cus } if customer != "" { - queryFilter = bson.M{ - "$and": []bson.M{ - { - "ts": bson.M{ - "$gt": startTime, - }, - }, - { - "$or": []bson.M{ - { - "ns": bson.M{ - "$regex": customer + "\\..*", // match "{customer}.{anything}" - }, - }, - { - "ns": "admin.$cmd", // match exactly "admin.$cmd" - }, - }, - }, - }, + queryFilter["ns"] = bson.M{ + "$regex": "(admin\\.\\$cmd)|(" + customer + "\\..*)", } } @@ -487,9 +469,9 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint, custo // this should normally be filtered out by the mongo query, // but because of tx documents or other admin commands, we might get them anyway. // so just return an empty array in that case. - // if !strings.HasPrefix(entry.Namespace, customer) { - // return []oplogEntry{} - // } + if !strings.HasPrefix(entry.Namespace, customer) { + return []oplogEntry{} + } out := oplogEntry{ Operation: entry.Operation, From 9fb353b46aec088538e631c111788acdcb704514 Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Thu, 18 Apr 2024 14:53:21 -0400 Subject: [PATCH 10/23] what if there was no filter at all --- lib/oplog/tail.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index 1b2752fc..63933748 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -321,11 +321,11 @@ func issueOplogFindQuery(c *mongo.Collection, startTime primitive.Timestamp, cus }, } - if customer != "" { - queryFilter["ns"] = bson.M{ - "$regex": "(admin\\.\\$cmd)|(" + customer + "\\..*)", - } - } + // if customer != "" { + // queryFilter["ns"] = bson.M{ + // "$regex": "(admin\\.\\$cmd)|(" + customer + "\\..*)", + // } + // } return c.Find(queryContext, queryFilter, queryOpts) } From a31ebd0137cd45214e2954ac880ac6ce6e2d4983 Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Thu, 18 Apr 2024 15:03:20 -0400 Subject: [PATCH 11/23] i have no idea --- lib/customers/main.go | 5 ----- lib/oplog/tail.go | 10 +++++----- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/lib/customers/main.go b/lib/customers/main.go index ea755ee7..02fb1cfd 100644 --- a/lib/customers/main.go +++ b/lib/customers/main.go @@ -2,11 +2,6 @@ package customers func AllCustomers() []string { return []string{ - "factory", - "dev", - "test", "tests", - "something", - "xxx", } } diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index 63933748..fce12f39 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -151,11 +151,11 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo defer queryContextCancel() timeFilter := bson.M{} - if customer != "" { - timeFilter["ns"] = bson.M{ - "$regex": "(admin\\.\\$cmd)|(" + customer + "\\..*)", - } - } + // if customer != "" { + // timeFilter["ns"] = bson.M{ + // "$regex": "(admin\\.\\$cmd)|(" + customer + "\\..*)", + // } + // } result := oplogCollection.FindOne(queryContext, timeFilter, findOneOpts) From d08e6f50c5f4f2c5cfc1795624f6b96d5e348044 Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Thu, 18 Apr 2024 15:23:37 -0400 Subject: [PATCH 12/23] put the filters back in --- lib/oplog/tail.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index fce12f39..1b2752fc 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -151,11 +151,11 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo defer queryContextCancel() timeFilter := bson.M{} - // if customer != "" { - // timeFilter["ns"] = bson.M{ - // "$regex": "(admin\\.\\$cmd)|(" + customer + "\\..*)", - // } - // } + if customer != "" { + timeFilter["ns"] = bson.M{ + "$regex": "(admin\\.\\$cmd)|(" + customer + "\\..*)", + } + } result := oplogCollection.FindOne(queryContext, timeFilter, findOneOpts) @@ -321,11 +321,11 @@ func issueOplogFindQuery(c *mongo.Collection, startTime primitive.Timestamp, cus }, } - // if customer != "" { - // queryFilter["ns"] = bson.M{ - // "$regex": "(admin\\.\\$cmd)|(" + customer + "\\..*)", - // } - // } + if customer != "" { + queryFilter["ns"] = bson.M{ + "$regex": "(admin\\.\\$cmd)|(" + customer + "\\..*)", + } + } return c.Find(queryContext, queryFilter, queryOpts) } From 9ccba6ada84df3c717607e7d2f37404b56cef916 Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Thu, 18 Apr 2024 15:30:33 -0400 Subject: [PATCH 13/23] moonshot --- lib/customers/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/customers/main.go b/lib/customers/main.go index 02fb1cfd..e5178338 100644 --- a/lib/customers/main.go +++ b/lib/customers/main.go @@ -3,5 +3,6 @@ package customers func AllCustomers() []string { return []string{ "tests", + "test", } } From fd0722c62c6696bbefb9ce71fea32e86df5eceaf Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Thu, 18 Apr 2024 18:10:45 -0400 Subject: [PATCH 14/23] coroutines make my head hurt --- main.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index b8b310ae..ffc12ccd 100644 --- a/main.go +++ b/main.go @@ -99,30 +99,30 @@ func main() { stopOplogTail := make(chan bool) waitGroup.Add(1) - go func() { + go func(c string) { tailer := oplog.Tailer{ MongoClient: mongoSession, RedisClients: redisClients, RedisPrefix: config.RedisMetadataPrefix(), MaxCatchUp: config.MaxCatchUp(), } - tailer.Tail(redisPubs, stopOplogTail, customer) + tailer.Tail(redisPubs, stopOplogTail, c) log.Log.Info("Oplog tailer completed") waitGroup.Done() - }() + }(customer) stopRedisPub := make(chan bool) waitGroup.Add(1) - go func() { + go func(c string) { redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{ FlushInterval: config.TimestampFlushInterval(), DedupeExpiration: config.RedisDedupeExpiration(), MetadataPrefix: config.RedisMetadataPrefix(), - }, stopRedisPub, customer) + }, stopRedisPub, c) log.Log.Info("Redis publisher completed") waitGroup.Done() - }() + }(customer) log.Log.Info("Started up processing goroutines") stoppers = append(stoppers, stopOplogTail, stopRedisPub) From ea8643279e8cad2ef0c77441acfdb95e0ca9eef2 Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Thu, 18 Apr 2024 18:51:21 -0400 Subject: [PATCH 15/23] fix redis and comments --- lib/oplog/tail.go | 4 ++++ main.go | 42 +++++++++++++++++++++++++----------------- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index 1b2752fc..fa311cc1 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -150,6 +150,8 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo queryContext, queryContextCancel := context.WithTimeout(context.Background(), config.MongoQueryTimeout()) defer queryContextCancel() + // if we're getting timestamp from the oplog, we need to filter the same way we would filter the actual + // tail, so filter out everything except for the specififed namespace, or admin.$cmd timeFilter := bson.M{} if customer != "" { timeFilter["ns"] = bson.M{ @@ -321,6 +323,8 @@ func issueOplogFindQuery(c *mongo.Collection, startTime primitive.Timestamp, cus }, } + // to prevent each processor coroutine from blocking each other, each one should add a query filter + // for only that customer's messages. However we also have to get admin.$cmd, for transaction operations. if customer != "" { queryFilter["ns"] = bson.M{ "$regex": "(admin\\.\\$cmd)|(" + customer + "\\..*)", diff --git a/main.go b/main.go index ffc12ccd..4f095eba 100644 --- a/main.go +++ b/main.go @@ -39,6 +39,7 @@ func main() { panic("Error parsing environment variables: " + err.Error()) } + // share single mongo connection (pool) between all coroutines mongoSession, err := createMongoClient() if err != nil { panic("Error initializing oplog tailer: " + err.Error()) @@ -54,27 +55,33 @@ func main() { }() log.Log.Info("Initialized connection to Mongo") - redisClients, err := createRedisClients() - if err != nil { - panic("Error initializing Redis client: " + err.Error()) - } - defer func() { - for _, redisClient := range redisClients { - redisCloseErr := redisClient.Close() - if redisCloseErr != nil { - log.Log.Errorw("Error closing Redis client", - "error", redisCloseErr) - } - } - }() - log.Log.Info("Initialized connection to Redis") - + // accumulate from all the coroutines stoppers := []chan bool{} waitgroups := []*sync.WaitGroup{} + allRedisClients := []redis.UniversalClient{} + // TEMPORARY: hardcode list of customers and run all the processors based on that list + // TODO: have a singleton coroutine that ingests everything and uses it to detect new customers for _, customer := range customers.AllCustomers() { - // We crate two goroutines: + // each processing coroutine needs its own redis clients, so that they don't get bottlenecked + // on one customer's redis write speed + redisClients, err := createRedisClients() + if err != nil { + panic("Error initializing Redis client: " + err.Error()) + } + defer func() { + for _, redisClient := range redisClients { + redisCloseErr := redisClient.Close() + if redisCloseErr != nil { + log.Log.Errorw("Error closing Redis client", + "error", redisCloseErr) + } + } + }() + log.Log.Info("Initialized connection to Redis") + + // For each processor, we crate two goroutines: // // The oplog.Tail goroutine reads messages from the oplog, and generates the // messages that we need to write to redis. It then writes them to a @@ -127,10 +134,11 @@ func main() { stoppers = append(stoppers, stopOplogTail, stopRedisPub) waitgroups = append(waitgroups, &waitGroup) + allRedisClients = append(allRedisClients, redisClients...) } // Start one more goroutine for the HTTP server - httpServer := makeHTTPServer(redisClients, mongoSession) + httpServer := makeHTTPServer(allRedisClients, mongoSession) go func() { httpErr := httpServer.ListenAndServe() if httpErr != nil { From b43421b158a8196bf38980f3498af46d895f311e Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Thu, 18 Apr 2024 19:30:20 -0400 Subject: [PATCH 16/23] fault injection maybe uses a different name --- lib/customers/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/customers/main.go b/lib/customers/main.go index e5178338..f119be8f 100644 --- a/lib/customers/main.go +++ b/lib/customers/main.go @@ -4,5 +4,6 @@ func AllCustomers() []string { return []string{ "tests", "test", + "testdb", } } From cd5103d8e1cc9c6f5b054aa3923921f434aba69b Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Thu, 18 Apr 2024 20:20:41 -0400 Subject: [PATCH 17/23] comments and more customers --- lib/customers/main.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/lib/customers/main.go b/lib/customers/main.go index f119be8f..0168f8fd 100644 --- a/lib/customers/main.go +++ b/lib/customers/main.go @@ -1,9 +1,28 @@ package customers +/* +* +This is a NON-PRODUCTION workaround for testing the parallelism. +It will hardcode in a list of customers/mongo databases/namespace prefixes. +Each one will create a separate read/write coroutine and separate tailable mongo cursor. +This means if one db's OTR buffer fills, the others should continue processing. +This has no discovery mechanism and cannot change dynamically at all. +It is (at the moment) solely used for performance and load testing on staging. +*/ func AllCustomers() []string { return []string{ + // namespaces used by acceptance tests "tests", "test", "testdb", + // used when running in hori + "factory", + // used for misc config (probably unnecessary) + "dev", + "xxx", + "something", + // g2 staging sites + "alex", + /* TODO */ } } From 85207bcb3b9015ff58e74ff37843739204aa5572 Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Fri, 19 Apr 2024 12:57:39 -0400 Subject: [PATCH 18/23] change from hardcoded to env var --- blackbox-tests/docker-compose.yml | 1 + docker-compose.yml | 1 + .../acceptance/docker-compose.yml | 1 + .../fault-injection/harness/otr.go | 1 + integration-tests/meteor/docker-compose.yml | 1 + .../performance/docker-compose.yml | 1 + lib/customers/main.go | 28 ------------------- main.go | 5 ++-- 8 files changed, 9 insertions(+), 30 deletions(-) delete mode 100644 lib/customers/main.go diff --git a/blackbox-tests/docker-compose.yml b/blackbox-tests/docker-compose.yml index 9e5c31a1..c7b57ac9 100644 --- a/blackbox-tests/docker-compose.yml +++ b/blackbox-tests/docker-compose.yml @@ -20,6 +20,7 @@ services: - OTR_MONGO_URL=mongodb://mongo/dev - OTR_REDIS_URL=rediss://redis:6380 - OTR_OPLOG_V2_EXTRACT_SUBFIELD_CHANGES=true + - OTR_HARDCODED_CUSTOMERS=test,tests,testdb,dev ports: - 9000:9000 depends_on: diff --git a/docker-compose.yml b/docker-compose.yml index 9e3d6816..700f8066 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,6 +22,7 @@ services: - OTR_MONGO_URL=mongodb://mongo/dev - OTR_REDIS_URL=redis://redis,redis-sentinel://redis-sentinel:26379?sentinelMasterId=mymaster - OTR_LOG_DEBUG=true + - OTR_HARDCODED_CUSTOMERS=test,tests,testdb,dev ports: - 9000:9000 volumes: diff --git a/integration-tests/acceptance/docker-compose.yml b/integration-tests/acceptance/docker-compose.yml index 04ff4fa6..c661ee1d 100644 --- a/integration-tests/acceptance/docker-compose.yml +++ b/integration-tests/acceptance/docker-compose.yml @@ -42,6 +42,7 @@ services: - OTR_REDIS_URL=redis-sentinel://redis-sentinel:26379?sentinelMasterId=mymaster,redis://redis - OTR_LOG_DEBUG=true - OTR_OPLOG_V2_EXTRACT_SUBFIELD_CHANGES=true + - OTR_HARDCODED_CUSTOMERS=test,tests,testdb,dev depends_on: mongo: condition: service_healthy diff --git a/integration-tests/fault-injection/harness/otr.go b/integration-tests/fault-injection/harness/otr.go index 0eb4b683..485c51c8 100644 --- a/integration-tests/fault-injection/harness/otr.go +++ b/integration-tests/fault-injection/harness/otr.go @@ -38,6 +38,7 @@ func StartOTRProcessWithEnv(mongoURL string, redisURL string, port int, extraEnv "OTR_LOG_DEBUG=true", "OTR_METADATA_PREFIX=" + randString(16), fmt.Sprintf("OTR_HTTP_SERVER_ADDR=0.0.0.0:%d", port), + "OTR_HARDCODED_CUSTOMERS=test,tests,testdb,dev", }, extraEnv...), } diff --git a/integration-tests/meteor/docker-compose.yml b/integration-tests/meteor/docker-compose.yml index 87051a19..7e69ebee 100644 --- a/integration-tests/meteor/docker-compose.yml +++ b/integration-tests/meteor/docker-compose.yml @@ -40,6 +40,7 @@ services: - OTR_REDIS_URL=redis://redis # - OTR_LOG_DEBUG=true - OTR_OPLOG_V2_EXTRACT_SUBFIELD_CHANGES=true + - OTR_HARDCODED_CUSTOMERS=test,tests,testdb,dev depends_on: mongo: condition: service_healthy diff --git a/integration-tests/performance/docker-compose.yml b/integration-tests/performance/docker-compose.yml index d09bf424..0a98ccba 100644 --- a/integration-tests/performance/docker-compose.yml +++ b/integration-tests/performance/docker-compose.yml @@ -38,6 +38,7 @@ services: environment: - OTR_MONGO_URL=mongodb://mongo/tests - OTR_REDIS_URL=redis-sentinel://redis-sentinel:26379?sentinelMasterId=mymaster,redis://redis + - OTR_HARDCODED_CUSTOMERS=test,tests,testdb,dev depends_on: mongo: condition: service_healthy diff --git a/lib/customers/main.go b/lib/customers/main.go deleted file mode 100644 index 0168f8fd..00000000 --- a/lib/customers/main.go +++ /dev/null @@ -1,28 +0,0 @@ -package customers - -/* -* -This is a NON-PRODUCTION workaround for testing the parallelism. -It will hardcode in a list of customers/mongo databases/namespace prefixes. -Each one will create a separate read/write coroutine and separate tailable mongo cursor. -This means if one db's OTR buffer fills, the others should continue processing. -This has no discovery mechanism and cannot change dynamically at all. -It is (at the moment) solely used for performance and load testing on staging. -*/ -func AllCustomers() []string { - return []string{ - // namespaces used by acceptance tests - "tests", - "test", - "testdb", - // used when running in hori - "factory", - // used for misc config (probably unnecessary) - "dev", - "xxx", - "something", - // g2 staging sites - "alex", - /* TODO */ - } -} diff --git a/main.go b/main.go index 4f095eba..140ade46 100644 --- a/main.go +++ b/main.go @@ -17,7 +17,6 @@ import ( "go.mongodb.org/mongo-driver/mongo/readpref" "github.com/tulip/oplogtoredis/lib/config" - "github.com/tulip/oplogtoredis/lib/customers" "github.com/tulip/oplogtoredis/lib/log" "github.com/tulip/oplogtoredis/lib/oplog" "github.com/tulip/oplogtoredis/lib/parse" @@ -60,9 +59,11 @@ func main() { waitgroups := []*sync.WaitGroup{} allRedisClients := []redis.UniversalClient{} + allCustomers := strings.Split(os.Getenv("OTR_HARDCODED_CUSTOMERS"), ",") + // TEMPORARY: hardcode list of customers and run all the processors based on that list // TODO: have a singleton coroutine that ingests everything and uses it to detect new customers - for _, customer := range customers.AllCustomers() { + for _, customer := range allCustomers { // each processing coroutine needs its own redis clients, so that they don't get bottlenecked // on one customer's redis write speed From 7d1e964c4c70ae6839fe75d251d061edb076f06c Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Fri, 19 Apr 2024 13:31:50 -0400 Subject: [PATCH 19/23] future proof --- main.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 140ade46..a172537d 100644 --- a/main.go +++ b/main.go @@ -59,7 +59,12 @@ func main() { waitgroups := []*sync.WaitGroup{} allRedisClients := []redis.UniversalClient{} - allCustomers := strings.Split(os.Getenv("OTR_HARDCODED_CUSTOMERS"), ",") + customerString := os.Getenv("OTR_HARDCODED_CUSTOMERS") + if customerString == "" { + panic("OTR_HARDCODED_CUSTOMERS not specified or blank") + } + + allCustomers := strings.Split(customerString, ",") // TEMPORARY: hardcode list of customers and run all the processors based on that list // TODO: have a singleton coroutine that ingests everything and uses it to detect new customers From f2cb478fe1c774ccec9ca37a12dac43dbf811054 Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Fri, 19 Apr 2024 15:49:13 -0400 Subject: [PATCH 20/23] customer names could have dashes in them --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index a172537d..f28dcbf0 100644 --- a/main.go +++ b/main.go @@ -102,7 +102,7 @@ func main() { promauto.NewGaugeFunc(prometheus.GaugeOpts{ Namespace: "otr", - Name: "buffer_available_" + customer, + Name: "buffer_available_" + strings.ReplaceAll(customer, "-", "_"), Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.", }, func() float64 { return float64(bufferSize - len(redisPubs)) From af4b821479c36278386e4b7d8752e642e750ccda Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Fri, 19 Apr 2024 16:25:05 -0400 Subject: [PATCH 21/23] trying to see if making it a longer timeout means it can hold staging all at once --- lib/config/main.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/config/main.go b/lib/config/main.go index 0dec671d..3135fb45 100644 --- a/lib/config/main.go +++ b/lib/config/main.go @@ -4,8 +4,9 @@ package config import ( - "time" "strings" + "time" + "github.com/kelseyhightower/envconfig" ) @@ -19,7 +20,7 @@ type oplogtoredisConfiguration struct { RedisDedupeExpiration time.Duration `default:"120s" split_words:"true"` RedisMetadataPrefix string `default:"oplogtoredis::" split_words:"true"` MongoConnectTimeout time.Duration `default:"10s" split_words:"true"` - MongoQueryTimeout time.Duration `default:"5s" split_words:"true"` + MongoQueryTimeout time.Duration `default:"1m" split_words:"true"` OplogV2ExtractSubfieldChanges bool `default:"false" envconfig:"OPLOG_V2_EXTRACT_SUBFIELD_CHANGES"` } From dc424b19846425ef2d81fd391b2fad99f9160889 Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Fri, 19 Apr 2024 16:37:48 -0400 Subject: [PATCH 22/23] temporarily take out health check --- main.go | 41 ++++++++++++++++++++--------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/main.go b/main.go index f28dcbf0..7706daff 100644 --- a/main.go +++ b/main.go @@ -14,7 +14,6 @@ import ( "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/mongo/readpref" "github.com/tulip/oplogtoredis/lib/config" "github.com/tulip/oplogtoredis/lib/log" @@ -253,30 +252,30 @@ func createRedisClients() ([]redis.UniversalClient, error) { return ret, nil } -func makeHTTPServer(clients []redis.UniversalClient, mongo *mongo.Client) *http.Server { +func makeHTTPServer( /*clients []redis.UniversalClient, mongo *mongo.Client*/ ) *http.Server { mux := http.NewServeMux() mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { redisOK := true - for _, redis := range clients { - redisErr := redis.Ping(context.Background()).Err() - redisOK = (redisOK && (redisErr == nil)) - if !redisOK { - log.Log.Errorw("Error connecting to Redis during healthz check", - "error", redisErr) - } - } - - ctx, cancel := context.WithTimeout(context.Background(), config.MongoConnectTimeout()) - defer cancel() - - mongoErr := mongo.Ping(ctx, readpref.Primary()) - mongoOK := mongoErr == nil - - if !mongoOK { - log.Log.Errorw("Error connecting to Mongo during healthz check", - "error", mongoErr) - } + // for _, redis := range clients { + // redisErr := redis.Ping(context.Background()).Err() + // redisOK = (redisOK && (redisErr == nil)) + // if !redisOK { + // log.Log.Errorw("Error connecting to Redis during healthz check", + // "error", redisErr) + // } + // } + + // ctx, cancel := context.WithTimeout(context.Background(), config.MongoConnectTimeout()) + // defer cancel() + + // mongoErr := mongo.Ping(ctx, readpref.Primary()) + mongoOK := true //mongoErr == nil + + // if !mongoOK { + // log.Log.Errorw("Error connecting to Mongo during healthz check", + // "error", mongoErr) + // } if mongoOK && redisOK { w.WriteHeader(http.StatusOK) From 84ff47b0f952ad98076eb899f2d823e3403b28be Mon Sep 17 00:00:00 2001 From: Alex Goodisman Date: Fri, 19 Apr 2024 16:42:08 -0400 Subject: [PATCH 23/23] oops --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 7706daff..d6e560c5 100644 --- a/main.go +++ b/main.go @@ -252,7 +252,7 @@ func createRedisClients() ([]redis.UniversalClient, error) { return ret, nil } -func makeHTTPServer( /*clients []redis.UniversalClient, mongo *mongo.Client*/ ) *http.Server { +func makeHTTPServer(clients []redis.UniversalClient, mongo *mongo.Client) *http.Server { mux := http.NewServeMux() mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {