Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: parallelize by customer #66

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions blackbox-tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ services:
- OTR_MONGO_URL=mongodb://mongo/dev
- OTR_REDIS_URL=rediss://redis:6380
- OTR_OPLOG_V2_EXTRACT_SUBFIELD_CHANGES=true
- OTR_HARDCODED_CUSTOMERS=test,tests,testdb,dev
ports:
- 9000:9000
depends_on:
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ services:
- OTR_MONGO_URL=mongodb://mongo/dev
- OTR_REDIS_URL=redis://redis,redis-sentinel://redis-sentinel:26379?sentinelMasterId=mymaster
- OTR_LOG_DEBUG=true
- OTR_HARDCODED_CUSTOMERS=test,tests,testdb,dev
ports:
- 9000:9000
volumes:
Expand Down
1 change: 1 addition & 0 deletions integration-tests/acceptance/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ services:
- OTR_REDIS_URL=redis-sentinel://redis-sentinel:26379?sentinelMasterId=mymaster,redis://redis
- OTR_LOG_DEBUG=true
- OTR_OPLOG_V2_EXTRACT_SUBFIELD_CHANGES=true
- OTR_HARDCODED_CUSTOMERS=test,tests,testdb,dev
depends_on:
mongo:
condition: service_healthy
Expand Down
1 change: 1 addition & 0 deletions integration-tests/fault-injection/harness/otr.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func StartOTRProcessWithEnv(mongoURL string, redisURL string, port int, extraEnv
"OTR_LOG_DEBUG=true",
"OTR_METADATA_PREFIX=" + randString(16),
fmt.Sprintf("OTR_HTTP_SERVER_ADDR=0.0.0.0:%d", port),
"OTR_HARDCODED_CUSTOMERS=test,tests,testdb,dev",
}, extraEnv...),
}

Expand Down
1 change: 1 addition & 0 deletions integration-tests/meteor/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ services:
- OTR_REDIS_URL=redis://redis
# - OTR_LOG_DEBUG=true
- OTR_OPLOG_V2_EXTRACT_SUBFIELD_CHANGES=true
- OTR_HARDCODED_CUSTOMERS=test,tests,testdb,dev
depends_on:
mongo:
condition: service_healthy
Expand Down
1 change: 1 addition & 0 deletions integration-tests/performance/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ services:
environment:
- OTR_MONGO_URL=mongodb://mongo/tests
- OTR_REDIS_URL=redis-sentinel://redis-sentinel:26379?sentinelMasterId=mymaster,redis://redis
- OTR_HARDCODED_CUSTOMERS=test,tests,testdb,dev
depends_on:
mongo:
condition: service_healthy
Expand Down
5 changes: 3 additions & 2 deletions lib/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
package config

import (
"time"
"strings"
"time"

"github.com/kelseyhightower/envconfig"
)

Expand All @@ -19,7 +20,7 @@ type oplogtoredisConfiguration struct {
RedisDedupeExpiration time.Duration `default:"120s" split_words:"true"`
RedisMetadataPrefix string `default:"oplogtoredis::" split_words:"true"`
MongoConnectTimeout time.Duration `default:"10s" split_words:"true"`
MongoQueryTimeout time.Duration `default:"5s" split_words:"true"`
MongoQueryTimeout time.Duration `default:"1m" split_words:"true"`
OplogV2ExtractSubfieldChanges bool `default:"false" envconfig:"OPLOG_V2_EXTRACT_SUBFIELD_CHANGES"`
}

Expand Down
79 changes: 55 additions & 24 deletions lib/oplog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import (
// Tailer persistently tails the oplog of a Mongo cluster, handling
// reconnection and resumption of where it left off.
type Tailer struct {
MongoClient *mongo.Client
MongoClient *mongo.Client
RedisClients []redis.UniversalClient
RedisPrefix string
MaxCatchUp time.Duration
RedisPrefix string
MaxCatchUp time.Duration
}

// Raw oplog entry from Mongo
Expand Down Expand Up @@ -107,7 +107,7 @@ func init() {

// Tail begins tailing the oplog. It doesn't return unless it receives a message
// on the stop channel, in which case it wraps up its work and then returns.
func (tailer *Tailer) Tail(out chan<- *redispub.Publication, stop <-chan bool) {
func (tailer *Tailer) Tail(out chan<- *redispub.Publication, stop <-chan bool, customer string) {
childStopC := make(chan bool)
wasStopped := false

Expand All @@ -118,9 +118,9 @@ func (tailer *Tailer) Tail(out chan<- *redispub.Publication, stop <-chan bool) {
}()

for {
log.Log.Info("Starting oplog tailing")
tailer.tailOnce(out, childStopC)
log.Log.Info("Oplog tailing ended")
log.Log.Info("Starting oplog tailing for " + customer)
tailer.tailOnce(out, childStopC, customer)
log.Log.Info("Oplog tailing ended for " + customer)

if wasStopped {
return
Expand All @@ -131,7 +131,7 @@ func (tailer *Tailer) Tail(out chan<- *redispub.Publication, stop <-chan bool) {
}
}

func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan bool) {
func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan bool, customer string) {
session, err := tailer.MongoClient.StartSession()
if err != nil {
log.Log.Errorw("Failed to start Mongo session", "error", err)
Expand All @@ -140,7 +140,7 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo

oplogCollection := session.Client().Database("local").Collection("oplog.rs")

startTime := tailer.getStartTime(func() (*primitive.Timestamp, error) {
startTime := tailer.getStartTime(customer, func() (*primitive.Timestamp, error) {
// Get the timestamp of the last entry in the oplog (as a position to
// start from if we don't have a last-written timestamp from Redis)
var entry rawOplogEntry
Expand All @@ -150,7 +150,16 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo
queryContext, queryContextCancel := context.WithTimeout(context.Background(), config.MongoQueryTimeout())
defer queryContextCancel()

result := oplogCollection.FindOne(queryContext, bson.M{}, findOneOpts)
// if we're getting timestamp from the oplog, we need to filter the same way we would filter the actual
// tail, so filter out everything except for the specififed namespace, or admin.$cmd
timeFilter := bson.M{}
if customer != "" {
timeFilter["ns"] = bson.M{
"$regex": "(admin\\.\\$cmd)|(" + customer + "\\..*)",
}
}

result := oplogCollection.FindOne(queryContext, timeFilter, findOneOpts)

if result.Err() != nil {
return nil, result.Err()
Expand All @@ -168,7 +177,7 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo
return &ts, nil
})

query, queryErr := issueOplogFindQuery(oplogCollection, startTime)
query, queryErr := issueOplogFindQuery(oplogCollection, startTime, customer)

if queryErr != nil {
log.Log.Errorw("Error issuing tail query", "error", queryErr)
Expand All @@ -195,7 +204,7 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo
continue
}

ts, pubs := tailer.unmarshalEntry(rawData)
ts, pubs := tailer.unmarshalEntry(rawData, customer)

if ts != nil {
lastTimestamp = *ts
Expand All @@ -215,7 +224,7 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo
// timeout after our timeout duration, and we'll create a new one.
log.Log.Debug("Oplog cursor timed out, will retry")

query, queryErr = issueOplogFindQuery(oplogCollection, lastTimestamp)
query, queryErr = issueOplogFindQuery(oplogCollection, lastTimestamp, customer)

if queryErr != nil {
log.Log.Errorw("Error issuing tail query", "error", queryErr)
Expand All @@ -226,7 +235,7 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan boo
} else if status.DidLosePosition {
// Our cursor expired. Make a new cursor to pick up from where we
// left off.
query, queryErr = issueOplogFindQuery(oplogCollection, lastTimestamp)
query, queryErr = issueOplogFindQuery(oplogCollection, lastTimestamp, customer)

if queryErr != nil {
log.Log.Errorw("Error issuing tail query", "error", queryErr)
Expand Down Expand Up @@ -300,17 +309,29 @@ func readNextFromCursor(cursor *mongo.Cursor) (status cursorResultStatus, err er
return
}

func issueOplogFindQuery(c *mongo.Collection, startTime primitive.Timestamp) (*mongo.Cursor, error) {
func issueOplogFindQuery(c *mongo.Collection, startTime primitive.Timestamp, customer string) (*mongo.Cursor, error) {
queryOpts := &options.FindOptions{}
queryOpts.SetSort(bson.M{"$natural": 1})
queryOpts.SetCursorType(options.TailableAwait)

queryContext, queryContextCancel := context.WithTimeout(context.Background(), config.MongoQueryTimeout())
defer queryContextCancel()

return c.Find(queryContext, bson.M{
"ts": bson.M{"$gt": startTime},
}, queryOpts)
queryFilter := bson.M{
"ts": bson.M{
"$gt": startTime,
},
}

// to prevent each processor coroutine from blocking each other, each one should add a query filter
// for only that customer's messages. However we also have to get admin.$cmd, for transaction operations.
if customer != "" {
queryFilter["ns"] = bson.M{
"$regex": "(admin\\.\\$cmd)|(" + customer + "\\..*)",
}
}

return c.Find(queryContext, queryFilter, queryOpts)
}

func closeCursor(cursor *mongo.Cursor) {
Expand All @@ -328,7 +349,7 @@ func closeCursor(cursor *mongo.Cursor) {
//
// The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if it
// ignored it or failed at some later step.
func (tailer *Tailer) unmarshalEntry(rawData bson.Raw) (timestamp *primitive.Timestamp, pubs []*redispub.Publication) {
func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, customer string) (timestamp *primitive.Timestamp, pubs []*redispub.Publication) {
var result rawOplogEntry

err := bson.Unmarshal(rawData, &result)
Expand All @@ -339,7 +360,7 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw) (timestamp *primitive.Tim

timestamp = &result.Timestamp

entries := tailer.parseRawOplogEntry(result, nil)
entries := tailer.parseRawOplogEntry(result, nil, customer)
log.Log.Debugw("Received oplog entry", "entry", result, "processTime", time.Now().UnixMilli())

status := "ignored"
Expand Down Expand Up @@ -403,8 +424,8 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw) (timestamp *primitive.Tim
// We take the function to get the timestamp of the last oplog entry (as a
// fallback if we don't have a latest timestamp from Redis) as an arg instead
// of using tailer.mongoClient directly so we can unit test this function
func (tailer *Tailer) getStartTime(getTimestampOfLastOplogEntry func() (*primitive.Timestamp, error)) primitive.Timestamp {
ts, tsTime, redisErr := redispub.LastProcessedTimestamp(tailer.RedisClients[0], tailer.RedisPrefix)
func (tailer *Tailer) getStartTime(customer string, getTimestampOfLastOplogEntry func() (*primitive.Timestamp, error)) primitive.Timestamp {
ts, tsTime, redisErr := redispub.LastProcessedTimestamp(tailer.RedisClients[0], tailer.RedisPrefix, customer)

if redisErr == nil {
// we have a last write time, check that it's not too far in the
Expand Down Expand Up @@ -434,7 +455,7 @@ func (tailer *Tailer) getStartTime(getTimestampOfLastOplogEntry func() (*primiti
}

// converts a rawOplogEntry to an oplogEntry
func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []oplogEntry {
func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint, customer string) []oplogEntry {
if txIdx == nil {
idx := uint(0)
txIdx = &idx
Expand All @@ -448,6 +469,14 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl
return nil
}

// only return leaf nodes if they match the namespace prefix (db/customer name)
// this should normally be filtered out by the mongo query,
// but because of tx documents or other admin commands, we might get them anyway.
// so just return an empty array in that case.
if !strings.HasPrefix(entry.Namespace, customer) {
return []oplogEntry{}
}

out := oplogEntry{
Operation: entry.Operation,
Timestamp: entry.Timestamp,
Expand Down Expand Up @@ -487,7 +516,9 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl

for _, v := range txData.ApplyOps {
v.Timestamp = entry.Timestamp
ret = append(ret, tailer.parseRawOplogEntry(v, txIdx)...)
// in case the nested entries match this customer, recur all of them.
// when leaves eventually are parsed, the final array will only contain matched ones.
ret = append(ret, tailer.parseRawOplogEntry(v, txIdx, customer)...)
}

return ret
Expand Down
10 changes: 5 additions & 5 deletions lib/oplog/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,19 @@ func TestGetStartTime(t *testing.T) {
panic(err)
}
defer redisServer.Close()
require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry", strconv.FormatInt(int64(test.redisTimestamp.T), 10)))
require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry.test", strconv.FormatInt(int64(test.redisTimestamp.T), 10)))

redisClient := []redis.UniversalClient{redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: []string{redisServer.Addr()},
})}

tailer := Tailer{
RedisClients: redisClient,
RedisPrefix: "someprefix.",
MaxCatchUp: maxCatchUp,
RedisPrefix: "someprefix.",
MaxCatchUp: maxCatchUp,
}

actualResult := tailer.getStartTime(func() (*primitive.Timestamp, error) {
actualResult := tailer.getStartTime("test", func() (*primitive.Timestamp, error) {
if test.mongoEndOfOplogErr != nil {
return nil, test.mongoEndOfOplogErr
}
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestParseRawOplogEntry(t *testing.T) {

for testName, test := range tests {
t.Run(testName, func(t *testing.T) {
got := (&Tailer{}).parseRawOplogEntry(test.in, nil)
got := (&Tailer{}).parseRawOplogEntry(test.in, nil, "foo")

if diff := pretty.Compare(got, test.want); diff != "" {
t.Errorf("Got incorrect result (-got +want)\n%s", diff)
Expand Down
4 changes: 2 additions & 2 deletions lib/redispub/lastProcessedTime.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
//
// If oplogtoredis has not processed any messages, returns redis.Nil as an
// error.
func LastProcessedTimestamp(redisClient redis.UniversalClient, metadataPrefix string) (primitive.Timestamp, time.Time, error) {
str, err := redisClient.Get(context.Background(), metadataPrefix+"lastProcessedEntry").Result()
func LastProcessedTimestamp(redisClient redis.UniversalClient, metadataPrefix string, customer string) (primitive.Timestamp, time.Time, error) {
str, err := redisClient.Get(context.Background(), metadataPrefix+"lastProcessedEntry."+customer).Result()
if err != nil {
return primitive.Timestamp{}, time.Unix(0, 0), err
}
Expand Down
12 changes: 6 additions & 6 deletions lib/redispub/lastProcessedTime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ func TestLastProcessedTimestampSuccess(t *testing.T) {
redisServer, redisClient := startMiniredis()
defer redisServer.Close()

require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry", encodeMongoTimestamp(nowTS)))
require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry.test", encodeMongoTimestamp(nowTS)))

gotTS, gotTime, err := LastProcessedTimestamp(redisClient, "someprefix.")
gotTS, gotTime, err := LastProcessedTimestamp(redisClient, "someprefix.", "test")

if err != nil {
t.Errorf("Got unexpected error: %s", err)
Expand All @@ -52,7 +52,7 @@ func TestLastProcessedTimestampNoRecord(t *testing.T) {
redisServer, redisClient := startMiniredis()
defer redisServer.Close()

_, _, err := LastProcessedTimestamp(redisClient, "someprefix.")
_, _, err := LastProcessedTimestamp(redisClient, "someprefix.", "test")

if err == nil {
t.Errorf("Expected redis.Nil error, got no error")
Expand All @@ -64,9 +64,9 @@ func TestLastProcessedTimestampNoRecord(t *testing.T) {
func TestLastProcessedTimestampInvalidRecord(t *testing.T) {
redisServer, redisClient := startMiniredis()
defer redisServer.Close()
require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry", "not a number"))
require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry.test", "not a number"))

_, _, err := LastProcessedTimestamp(redisClient, "someprefix.")
_, _, err := LastProcessedTimestamp(redisClient, "someprefix.", "test")

if err == nil {
t.Errorf("Expected strconv error, got no error")
Expand All @@ -80,7 +80,7 @@ func TestLastProcessedTimestampRedisError(t *testing.T) {
Addrs: []string{"not a server"},
})

_, _, err := LastProcessedTimestamp(redisClient, "someprefix.")
_, _, err := LastProcessedTimestamp(redisClient, "someprefix.", "test")

if err == nil {
t.Errorf("Expected TCP error, got no error")
Expand Down
Loading
Loading