Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oplogtoredis: performance improvements by raw oplog processing #84

Merged
merged 19 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

buildGoModule {
pname = "oplogtoredis";
version = "3.6.0";
version = "3.7.0";
src = builtins.path { path = ./.; };

postInstall = ''
Expand Down
12 changes: 6 additions & 6 deletions integration-tests/acceptance/harness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
// This file is a simple test harness for acceptance tests

type harness struct {
redisClient redis.UniversalClient
redisClient redis.UniversalClient
legacyRedisClient redis.UniversalClient
subscription *redis.PubSub
subscriptionC <-chan *redis.Message
subscription *redis.PubSub
subscriptionC <-chan *redis.Message
legacySubscription *redis.PubSub
legacySubscriptionC <-chan *redis.Message
mongoClient *mongo.Database
mongoClient *mongo.Database
}

// Clears the mongo database, connects to redis, and starts a subscription to
Expand Down Expand Up @@ -110,7 +110,7 @@ func (h *harness) verifyPub(t *testing.T, pub map[string][]helpers.OTRMessage, e
helpers.SortOTRMessagesByID(pubs)
}
for key := range pub {
if strings.Contains(key, "sentinel"){
if strings.Contains(key, "sentinel") {
delete(pub, key)
}
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func (h *harness) verify(t *testing.T, expectedPubs map[string][]helpers.OTRMess
}
h.verifyPub(t, actualPubs, expectedPubs)
h.verifyPub(t, actualLegacyPubs, expectedPubs)
// pop the __sentinel__ entry
// pop the __sentinel__ entry

// Verify the correct messages were received on each channel
}
25 changes: 11 additions & 14 deletions integration-tests/helpers/redis.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
package helpers

import (
"github.com/go-redis/redis/v8"
"os"
"strings"
"github.com/go-redis/redis/v8"
)

func isSentinel(url string) bool{
func isSentinel(url string) bool {
return strings.Contains(url, "sentinel")
}

func createOptions(url string, sentinel bool) (redis.UniversalOptions) {
func createOptions(url string, sentinel bool) redis.UniversalOptions {
redisOpts, err := redis.ParseURL(url)
if err != nil {
panic(err)
}
var clientOptions redis.UniversalOptions
if sentinel {
clientOptions = redis.UniversalOptions{
Addrs: []string{redisOpts.Addr},
DB: redisOpts.DB,
Password: redisOpts.Password,
TLSConfig: redisOpts.TLSConfig,
Addrs: []string{redisOpts.Addr},
DB: redisOpts.DB,
Password: redisOpts.Password,
TLSConfig: redisOpts.TLSConfig,
MasterName: "mymaster",
}
}else{
} else {
clientOptions = redis.UniversalOptions{
Addrs: []string{redisOpts.Addr},
DB: redisOpts.DB,
Expand All @@ -33,14 +33,12 @@ func createOptions(url string, sentinel bool) (redis.UniversalOptions) {
}
}
return clientOptions

}

}

func redisClient(sentinel bool) redis.UniversalClient{
func redisClient(sentinel bool) redis.UniversalClient {
var urls = strings.Split(os.Getenv("REDIS_URL"), ",")



for _, url := range urls {
if isSentinel(url) == sentinel {
clientOptions := createOptions(url, sentinel)
Expand All @@ -54,7 +52,6 @@ func LegacyRedisClient() redis.UniversalClient {
return redisClient(false)
}


// RedisClient returns the second redis client to the URL specified in the REDIS_URL
// The first one is the legacy fallback URL
func RedisClient() redis.UniversalClient {
Expand Down
6 changes: 5 additions & 1 deletion integration-tests/meteor/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ services:
environment:
- OTR_MONGO_URL=mongodb://mongo/tests
- OTR_REDIS_URL=redis://redis
# - OTR_LOG_DEBUG=true
- OTR_LOG_DEBUG=true
- OTR_OPLOG_V2_EXTRACT_SUBFIELD_CHANGES=true
depends_on:
mongo:
Expand All @@ -52,6 +52,10 @@ services:
- --timeout=600
- testapp1:8080
- '--'
- /wait-for.sh
- --timeout=600
- testapp2:8080
- '--'
- /bin/oplogtoredis

mongo:
Expand Down
4 changes: 2 additions & 2 deletions lib/log/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (

"github.com/tulip/oplogtoredis/lib/config"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/TheZeroSlave/zapsentry"
"github.com/getsentry/sentry-go"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// Log is a zap Sugared logger (a logger with a convenient API). You'll almost
Expand Down
92 changes: 47 additions & 45 deletions lib/oplog/oplogEntry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/tulip/oplogtoredis/lib/log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)

Expand All @@ -26,7 +27,7 @@ var metricUnprocessableChangedFields = promauto.NewCounter(prometheus.CounterOpt
type oplogEntry struct {
DocID interface{}
Timestamp primitive.Timestamp
Data map[string]interface{}
Data bson.Raw
Operation string
Namespace string
Database string
Expand All @@ -52,57 +53,30 @@ func (op *oplogEntry) IsRemove() bool {

// Returns whether this is an oplog update format v2 update (new in MongoDB 5.0)
func (op *oplogEntry) IsV2Update() bool {
dataVersion, ok := op.Data["$v"]
if !ok {
dataVersionRaw := op.Data.Lookup("$v")
if dataVersionRaw.IsZero() {
return false
}

// bson unmarshals integers into interface{} differently depending on platform,
// so we handle any kind of number
var dataVersionInt int
switch t := dataVersion.(type) {
case int:
dataVersionInt = t
case int8:
dataVersionInt = int(t)
case int16:
dataVersionInt = int(t)
case int32:
dataVersionInt = int(t)
case int64:
dataVersionInt = int(t)
case uint:
dataVersionInt = int(t)
case uint8:
dataVersionInt = int(t)
case uint16:
dataVersionInt = int(t)
case uint32:
dataVersionInt = int(t)
case uint64:
dataVersionInt = int(t)
case float32:
dataVersionInt = int(t)
case float64:
dataVersionInt = int(t)
default:
dataVersionInt, ok := dataVersionRaw.AsInt64OK()
if !ok {
return false
}

if dataVersionInt != 2 {
return false
}

_, ok = op.Data["diff"]
return ok
diff := op.Data.Lookup("diff")
return !diff.IsZero()
}

// If this oplogEntry is for an insert, returns whether that insert is a
// replacement (rather than a modification)
func (op *oplogEntry) UpdateIsReplace() bool {
if _, ok := op.Data["$set"]; ok {
if data := op.Data.Lookup("$set"); !data.IsZero() {
return false
} else if _, ok := op.Data["$unset"]; ok {
} else if data := op.Data.Lookup("$unset"); !data.IsZero() {
return false
} else if op.IsV2Update() {
// the v2 update format is only used for modifications
Expand All @@ -113,9 +87,9 @@ func (op *oplogEntry) UpdateIsReplace() bool {
}

// Given an operation, returned the fields affected by that operation
func (op *oplogEntry) ChangedFields() []string {
func (op *oplogEntry) ChangedFields() ([]string, error) {
if op.IsInsert() || (op.IsUpdate() && op.UpdateIsReplace()) {
return mapKeys(op.Data)
return mapKeysRaw(op.Data)
} else if op.IsUpdate() && op.IsV2Update() {
// New-style update. Looks like:
// { $v: 2, diff: { sa: "10", sb: "20", d: { c: true } }
Expand All @@ -125,27 +99,55 @@ func (op *oplogEntry) ChangedFields() []string {
// { $v: 1, $set: { "a": 10, "b": 20 }, $unset: { "c": true } }

fields := []string{}
for operationKey, operation := range op.Data {
elements, err := op.Data.Elements()
if err != nil {
metricUnprocessableChangedFields.Inc()
log.Log.Errorw("Oplog data for non-replacement v1 update failed to unmarshal",
"op", op, "error", err)
return []string{}, err
}
for _, element := range elements {
operationKey := element.Key()
if operationKey == "$v" {
// $v indicates the update document format; it's not a changed key
continue
}

operationMap, operationMapOK := operation.(map[string]interface{})
operationMap, operationMapOK := element.Value().DocumentOK()
if !operationMapOK {
metricUnprocessableChangedFields.Inc()
log.Log.Errorw("Oplog data for non-replacement v1 update contained a key with a non-map value",
"op", op)
"op", op, "operationKey", operationKey)
continue
}

fields = append(fields, mapKeys(operationMap)...)
mapFields, err := mapKeysRaw(operationMap)
if err != nil {
return []string{}, err
}
fields = append(fields, mapFields...)
}

return fields
return fields, nil
}

return []string{}, nil
}

// Given a bson.Raw object, returns the top level keys of the document it represents
func mapKeysRaw(rawData bson.Raw) ([]string, error) {
elements, err := rawData.Elements()
if err != nil {
log.Log.Errorw("Failed to unmarshal oplog data",
"error", err)
return []string{}, err
}
fields := make([]string, len(elements))

for i := 0; i < len(fields); i++ {
fields[i] = elements[i].Key()
}

return []string{}
return fields, nil
}

// Given a map, returns the keys of that map
Expand Down
Loading
Loading