diff --git a/default.nix b/default.nix index b2494937..7bb8f374 100644 --- a/default.nix +++ b/default.nix @@ -2,7 +2,7 @@ buildGoModule { pname = "oplogtoredis"; - version = "3.6.0"; + version = "3.7.0"; src = builtins.path { path = ./.; }; postInstall = '' diff --git a/integration-tests/acceptance/harness_test.go b/integration-tests/acceptance/harness_test.go index 63c9aba0..ae87fd4f 100644 --- a/integration-tests/acceptance/harness_test.go +++ b/integration-tests/acceptance/harness_test.go @@ -18,13 +18,13 @@ import ( // This file is a simple test harness for acceptance tests type harness struct { - redisClient redis.UniversalClient + redisClient redis.UniversalClient legacyRedisClient redis.UniversalClient - subscription *redis.PubSub - subscriptionC <-chan *redis.Message + subscription *redis.PubSub + subscriptionC <-chan *redis.Message legacySubscription *redis.PubSub legacySubscriptionC <-chan *redis.Message - mongoClient *mongo.Database + mongoClient *mongo.Database } // Clears the mongo database, connects to redis, and starts a subscription to @@ -110,7 +110,7 @@ func (h *harness) verifyPub(t *testing.T, pub map[string][]helpers.OTRMessage, e helpers.SortOTRMessagesByID(pubs) } for key := range pub { - if strings.Contains(key, "sentinel"){ + if strings.Contains(key, "sentinel") { delete(pub, key) } } @@ -141,7 +141,7 @@ func (h *harness) verify(t *testing.T, expectedPubs map[string][]helpers.OTRMess } h.verifyPub(t, actualPubs, expectedPubs) h.verifyPub(t, actualLegacyPubs, expectedPubs) - // pop the __sentinel__ entry + // pop the __sentinel__ entry // Verify the correct messages were received on each channel } diff --git a/integration-tests/helpers/redis.go b/integration-tests/helpers/redis.go index 5d24145a..bda4bf27 100644 --- a/integration-tests/helpers/redis.go +++ b/integration-tests/helpers/redis.go @@ -1,16 +1,16 @@ package helpers import ( + "github.com/go-redis/redis/v8" "os" "strings" - "github.com/go-redis/redis/v8" ) -func isSentinel(url string) bool{ +func isSentinel(url string) bool { return strings.Contains(url, "sentinel") } -func createOptions(url string, sentinel bool) (redis.UniversalOptions) { +func createOptions(url string, sentinel bool) redis.UniversalOptions { redisOpts, err := redis.ParseURL(url) if err != nil { panic(err) @@ -18,13 +18,13 @@ func createOptions(url string, sentinel bool) (redis.UniversalOptions) { var clientOptions redis.UniversalOptions if sentinel { clientOptions = redis.UniversalOptions{ - Addrs: []string{redisOpts.Addr}, - DB: redisOpts.DB, - Password: redisOpts.Password, - TLSConfig: redisOpts.TLSConfig, + Addrs: []string{redisOpts.Addr}, + DB: redisOpts.DB, + Password: redisOpts.Password, + TLSConfig: redisOpts.TLSConfig, MasterName: "mymaster", } - }else{ + } else { clientOptions = redis.UniversalOptions{ Addrs: []string{redisOpts.Addr}, DB: redisOpts.DB, @@ -33,14 +33,12 @@ func createOptions(url string, sentinel bool) (redis.UniversalOptions) { } } return clientOptions - -} +} -func redisClient(sentinel bool) redis.UniversalClient{ +func redisClient(sentinel bool) redis.UniversalClient { var urls = strings.Split(os.Getenv("REDIS_URL"), ",") - - + for _, url := range urls { if isSentinel(url) == sentinel { clientOptions := createOptions(url, sentinel) @@ -54,7 +52,6 @@ func LegacyRedisClient() redis.UniversalClient { return redisClient(false) } - // RedisClient returns the second redis client to the URL specified in the REDIS_URL // The first one is the legacy fallback URL func RedisClient() redis.UniversalClient { diff --git a/integration-tests/meteor/docker-compose.yml b/integration-tests/meteor/docker-compose.yml index 87051a19..40a07958 100644 --- a/integration-tests/meteor/docker-compose.yml +++ b/integration-tests/meteor/docker-compose.yml @@ -38,7 +38,7 @@ services: environment: - OTR_MONGO_URL=mongodb://mongo/tests - OTR_REDIS_URL=redis://redis - # - OTR_LOG_DEBUG=true + - OTR_LOG_DEBUG=true - OTR_OPLOG_V2_EXTRACT_SUBFIELD_CHANGES=true depends_on: mongo: @@ -52,6 +52,10 @@ services: - --timeout=600 - testapp1:8080 - '--' + - /wait-for.sh + - --timeout=600 + - testapp2:8080 + - '--' - /bin/oplogtoredis mongo: diff --git a/lib/log/main.go b/lib/log/main.go index 60ff5b51..6a11427f 100644 --- a/lib/log/main.go +++ b/lib/log/main.go @@ -9,10 +9,10 @@ import ( "github.com/tulip/oplogtoredis/lib/config" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" "github.com/TheZeroSlave/zapsentry" "github.com/getsentry/sentry-go" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // Log is a zap Sugared logger (a logger with a convenient API). You'll almost diff --git a/lib/oplog/oplogEntry.go b/lib/oplog/oplogEntry.go index 19d5497e..0ae072dc 100644 --- a/lib/oplog/oplogEntry.go +++ b/lib/oplog/oplogEntry.go @@ -4,6 +4,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/tulip/oplogtoredis/lib/log" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -26,7 +27,7 @@ var metricUnprocessableChangedFields = promauto.NewCounter(prometheus.CounterOpt type oplogEntry struct { DocID interface{} Timestamp primitive.Timestamp - Data map[string]interface{} + Data bson.Raw Operation string Namespace string Database string @@ -52,40 +53,13 @@ func (op *oplogEntry) IsRemove() bool { // Returns whether this is an oplog update format v2 update (new in MongoDB 5.0) func (op *oplogEntry) IsV2Update() bool { - dataVersion, ok := op.Data["$v"] - if !ok { + dataVersionRaw := op.Data.Lookup("$v") + if dataVersionRaw.IsZero() { return false } - // bson unmarshals integers into interface{} differently depending on platform, - // so we handle any kind of number - var dataVersionInt int - switch t := dataVersion.(type) { - case int: - dataVersionInt = t - case int8: - dataVersionInt = int(t) - case int16: - dataVersionInt = int(t) - case int32: - dataVersionInt = int(t) - case int64: - dataVersionInt = int(t) - case uint: - dataVersionInt = int(t) - case uint8: - dataVersionInt = int(t) - case uint16: - dataVersionInt = int(t) - case uint32: - dataVersionInt = int(t) - case uint64: - dataVersionInt = int(t) - case float32: - dataVersionInt = int(t) - case float64: - dataVersionInt = int(t) - default: + dataVersionInt, ok := dataVersionRaw.AsInt64OK() + if !ok { return false } @@ -93,16 +67,16 @@ func (op *oplogEntry) IsV2Update() bool { return false } - _, ok = op.Data["diff"] - return ok + diff := op.Data.Lookup("diff") + return !diff.IsZero() } // If this oplogEntry is for an insert, returns whether that insert is a // replacement (rather than a modification) func (op *oplogEntry) UpdateIsReplace() bool { - if _, ok := op.Data["$set"]; ok { + if data := op.Data.Lookup("$set"); !data.IsZero() { return false - } else if _, ok := op.Data["$unset"]; ok { + } else if data := op.Data.Lookup("$unset"); !data.IsZero() { return false } else if op.IsV2Update() { // the v2 update format is only used for modifications @@ -113,9 +87,9 @@ func (op *oplogEntry) UpdateIsReplace() bool { } // Given an operation, returned the fields affected by that operation -func (op *oplogEntry) ChangedFields() []string { +func (op *oplogEntry) ChangedFields() ([]string, error) { if op.IsInsert() || (op.IsUpdate() && op.UpdateIsReplace()) { - return mapKeys(op.Data) + return mapKeysRaw(op.Data) } else if op.IsUpdate() && op.IsV2Update() { // New-style update. Looks like: // { $v: 2, diff: { sa: "10", sb: "20", d: { c: true } } @@ -125,27 +99,55 @@ func (op *oplogEntry) ChangedFields() []string { // { $v: 1, $set: { "a": 10, "b": 20 }, $unset: { "c": true } } fields := []string{} - for operationKey, operation := range op.Data { + elements, err := op.Data.Elements() + if err != nil { + metricUnprocessableChangedFields.Inc() + log.Log.Errorw("Oplog data for non-replacement v1 update failed to unmarshal", + "op", op, "error", err) + return []string{}, err + } + for _, element := range elements { + operationKey := element.Key() if operationKey == "$v" { // $v indicates the update document format; it's not a changed key continue } - operationMap, operationMapOK := operation.(map[string]interface{}) + operationMap, operationMapOK := element.Value().DocumentOK() if !operationMapOK { metricUnprocessableChangedFields.Inc() log.Log.Errorw("Oplog data for non-replacement v1 update contained a key with a non-map value", - "op", op) + "op", op, "operationKey", operationKey) continue } - - fields = append(fields, mapKeys(operationMap)...) + mapFields, err := mapKeysRaw(operationMap) + if err != nil { + return []string{}, err + } + fields = append(fields, mapFields...) } - return fields + return fields, nil + } + + return []string{}, nil +} + +// Given a bson.Raw object, returns the top level keys of the document it represents +func mapKeysRaw(rawData bson.Raw) ([]string, error) { + elements, err := rawData.Elements() + if err != nil { + log.Log.Errorw("Failed to unmarshal oplog data", + "error", err) + return []string{}, err + } + fields := make([]string, len(elements)) + + for i := 0; i < len(fields); i++ { + fields[i] = elements[i].Key() } - return []string{} + return fields, nil } // Given a map, returns the keys of that map diff --git a/lib/oplog/oplogEntry_test.go b/lib/oplog/oplogEntry_test.go index 055bb1fb..3c494a2e 100644 --- a/lib/oplog/oplogEntry_test.go +++ b/lib/oplog/oplogEntry_test.go @@ -8,8 +8,48 @@ import ( "testing" "github.com/tulip/oplogtoredis/lib/config" + + "go.mongodb.org/mongo-driver/bson" ) +func rawBson(t *testing.T, data interface{}) bson.Raw { + raw, err := bson.Marshal(data) + if err != nil { + t.Error("Failed to marshal test data", err) + } + return raw +} + +func arraysMatch(a, b []string) bool { + if len(a) != len(b) { + return false + } + for _, valA := range a { + found := false + for _, valB := range b { + if valA == valB { + found = true + break + } + } + if !found { + return false + } + } + return true +} + +func TestMapKeysRaw(t *testing.T) { + want := []string{"key1", "key2", "key3"} + got, err := mapKeysRaw(rawBson(t, map[string]interface{}{"key1": "one", "key2": "two", "key3": "three"})) + if err != nil { + t.Error("mapKeysRaw() error", err) + } + if !arraysMatch(got, want) { + t.Errorf("mapKeysRaw() = %v, want %v", got, want) + } +} + func TestCategorization(t *testing.T) { tests := map[string]struct { in *oplogEntry @@ -101,7 +141,7 @@ func TestUpdateIsReplace(t *testing.T) { for testName, test := range tests { t.Run(testName, func(t *testing.T) { - got := (&oplogEntry{Data: test.in}).UpdateIsReplace() + got := (&oplogEntry{Data: rawBson(t, test.in)}).UpdateIsReplace() if got != test.expectedResult { t.Errorf("UpdateIsReplace(%#v) = %t; want %t", @@ -120,10 +160,10 @@ func TestChangedFields(t *testing.T) { "Insert": { input: &oplogEntry{ Operation: "i", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "foo": "a", "bar": 10, - }, + }), }, want: []string{"foo", "bar"}, }, @@ -131,10 +171,10 @@ func TestChangedFields(t *testing.T) { "Replacement update": { input: &oplogEntry{ Operation: "u", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "foo": "a", "bar": 10, - }, + }), }, want: []string{"foo", "bar"}, }, @@ -142,10 +182,10 @@ func TestChangedFields(t *testing.T) { "Delete": { input: &oplogEntry{ Operation: "d", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "foo": "a", "bar": 10, - }, + }), }, want: []string{}, }, @@ -153,7 +193,7 @@ func TestChangedFields(t *testing.T) { "Update": { input: &oplogEntry{ Operation: "u", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "$v": "1.0", "$set": map[string]interface{}{ "foo": "a", @@ -163,7 +203,7 @@ func TestChangedFields(t *testing.T) { "$unset": map[string]interface{}{ "qax": true, }, - }, + }), }, want: []string{"foo", "bar", "baz.qux", "qax"}, }, @@ -171,10 +211,10 @@ func TestChangedFields(t *testing.T) { "Update, no operations": { input: &oplogEntry{ Operation: "u", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "$v": "1.0", "$set": map[string]interface{}{}, - }, + }), }, want: []string{}, }, @@ -182,13 +222,13 @@ func TestChangedFields(t *testing.T) { "Update, unexpected operation value type": { input: &oplogEntry{ Operation: "u", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "$v": "1.0", "weird": "thing", "$set": map[string]interface{}{ "foo": "a", }, - }, + }), }, want: []string{"foo"}, }, @@ -196,7 +236,7 @@ func TestChangedFields(t *testing.T) { "Update v2": { input: &oplogEntry{ Operation: "u", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "$v": 2, "diff": map[string]interface{}{ "i": map[string]interface{}{"a": 1, "b": "2"}, @@ -205,7 +245,7 @@ func TestChangedFields(t *testing.T) { "sg": 10, "sfoobar": map[string]interface{}{}, }, - }, + }), }, want: []string{"a", "b", "c", "d", "e", "f", "g", "foobar"}, }, @@ -213,7 +253,7 @@ func TestChangedFields(t *testing.T) { "Update v2 deep": { input: &oplogEntry{ Operation: "u", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "$v": 2, "diff": map[string]interface{}{ "i": map[string]interface{}{"a": 1, "b": "2"}, @@ -222,7 +262,7 @@ func TestChangedFields(t *testing.T) { "sg": map[string]interface{}{}, "sfoobar": map[string]interface{}{}, }, - }, + }), }, want: []string{"a", "b", "c", "d", "e", "f"}, enableV2ExtractDeepFieldChanges: true, @@ -231,10 +271,10 @@ func TestChangedFields(t *testing.T) { "Update v2, no operations": { input: &oplogEntry{ Operation: "u", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "$v": 2, "diff": map[string]interface{}{}, - }, + }), }, want: []string{}, }, @@ -242,10 +282,10 @@ func TestChangedFields(t *testing.T) { "Update v2, no operations deep": { input: &oplogEntry{ Operation: "u", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "$v": 2, "diff": map[string]interface{}{}, - }, + }), }, want: []string{}, enableV2ExtractDeepFieldChanges: true, @@ -254,7 +294,7 @@ func TestChangedFields(t *testing.T) { "Update v2, unexpected operation value type": { input: &oplogEntry{ Operation: "u", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "$v": 2, "weird": "thing", "diff": map[string]interface{}{ @@ -262,7 +302,7 @@ func TestChangedFields(t *testing.T) { "otherwierd": "thing", "sfoo": "bar", }, - }, + }), }, want: []string{"foo"}, }, @@ -270,7 +310,7 @@ func TestChangedFields(t *testing.T) { "Update v2, unexpected operation value type deep": { input: &oplogEntry{ Operation: "u", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "$v": 2, "weird": "thing", "diff": map[string]interface{}{ @@ -278,7 +318,7 @@ func TestChangedFields(t *testing.T) { "otherwierd": "thing", "sfoo": map[string]interface{}{"u": map[string]interface{}{"x": "10"}}, }, - }, + }), }, want: []string{"foo.x"}, enableV2ExtractDeepFieldChanges: true, @@ -295,7 +335,10 @@ func TestChangedFields(t *testing.T) { t.Errorf("Failed to parse env with subfield setting %t", test.enableV2ExtractDeepFieldChanges) } - got := test.input.ChangedFields() + got, err := test.input.ChangedFields() + if err != nil { + t.Error("ChangedFields() error", err) + } sort.Strings(got) sort.Strings(test.want) @@ -387,7 +430,7 @@ func TestUpdateIsV2Formatted(t *testing.T) { for testName, test := range tests { t.Run(testName, func(t *testing.T) { - got := (&oplogEntry{Data: test.in}).IsV2Update() + got := (&oplogEntry{Data: rawBson(t, test.in)}).IsV2Update() if got != test.expectedResult { t.Errorf("UpdateIsV2Formatted(%#v) = %t; want %t", diff --git a/lib/oplog/oplog_v2_converter.go b/lib/oplog/oplog_v2_converter.go index 7094f1e4..ec1f8ebd 100644 --- a/lib/oplog/oplog_v2_converter.go +++ b/lib/oplog/oplog_v2_converter.go @@ -1,11 +1,14 @@ package oplog import ( + "errors" "regexp" "strings" "github.com/tulip/oplogtoredis/lib/config" "github.com/tulip/oplogtoredis/lib/log" + + "go.mongodb.org/mongo-driver/bson" ) // Translated from https://github.com/meteor/meteor/blob/devel/packages/mongo/oplog_v2_converter.js @@ -76,7 +79,7 @@ func flatObjectKeys(prefix string, obj map[string]interface{}) []string { return acc } -func getChangedFieldsFromOplogV2UpdateDeep(diffMap map[string]interface{}, prefix string) []string { +func getChangedFieldsFromOplogV2UpdateDeep(diffMap map[string]interface{}, prefix string) ([]string, error) { fields := []string{} for operationKey, operation := range diffMap { @@ -118,7 +121,11 @@ func getChangedFieldsFromOplogV2UpdateDeep(diffMap map[string]interface{}, prefi } // indicates a sub-field set - fields = append(fields, getChangedFieldsFromOplogV2UpdateDeep(operationMap, prefix+operationKey[1:]+".")...) + subFields, err := getChangedFieldsFromOplogV2UpdateDeep(operationMap, prefix+operationKey[1:]+".") + if err != nil { + return []string{}, err + } + fields = append(fields, subFields...) } else if operationKey == "a" { // ignore continue @@ -131,23 +138,35 @@ func getChangedFieldsFromOplogV2UpdateDeep(diffMap map[string]interface{}, prefi } - return fields + return fields, nil } -func getChangedFieldsFromOplogV2UpdateShallow(diffMap map[string]interface{}) []string { +func getChangedFieldsFromOplogV2UpdateShallow(diffRaw bson.Raw) ([]string, error) { fields := []string{} - for operationKey, operation := range diffMap { + + elements, err := diffRaw.Elements() + if err != nil { + metricUnprocessableChangedFields.Inc() + log.Log.Errorw("Oplog data for non-replacement v1 update failed to unmarshal", + "op", diffRaw, "error", err) + return []string{}, err + } + for _, element := range elements { + operationKey := element.Key() if operationKey == "i" || operationKey == "u" || operationKey == "d" { // indicates an insert, update, or delete of a whole subtree - operationMap, operationMapOK := operation.(map[string]interface{}) + operationMap, operationMapOK := element.Value().DocumentOK() if !operationMapOK { metricUnprocessableChangedFields.Inc() log.Log.Errorw("Oplog data for non-replacement v2 update contained a i/u/d key with a non-map value", - "op", diffMap) + "op", diffRaw) continue } - - fields = append(fields, mapKeys(operationMap)...) + mapFields, err := mapKeysRaw(operationMap) + if err != nil { + return []string{}, err + } + fields = append(fields, mapFields...) } else if strings.HasPrefix(operationKey, "s") { // indicates a sub-field set fields = append(fields, strings.TrimPrefix(operationKey, "s")) @@ -157,37 +176,46 @@ func getChangedFieldsFromOplogV2UpdateShallow(diffMap map[string]interface{}) [] } else { metricUnprocessableChangedFields.Inc() log.Log.Errorw("Oplog data for non-replacement v2 update contained a field that was not an i/u/d or an s-prefixed field", - "op", diffMap) + "op", diffRaw) continue } } - return fields + return fields, nil } -func getChangedFieldsFromOplogV2Update(op *oplogEntry) []string { +func getChangedFieldsFromOplogV2Update(op *oplogEntry) ([]string, error) { // New-style update. Looks like: // { $v: 2, diff: { sa: "10", sb: "20", d: { c: true } } - diff, ok := op.Data["diff"] - if !ok { + diffRawElement := op.Data.Lookup("diff") + if diffRawElement.IsZero() { metricUnprocessableChangedFields.Inc() log.Log.Errorw("Oplog data for non-replacement v2 update did not have a diff field", "op", op) - return []string{} + return []string{}, errors.New("Oplog data for non-replacement v2 update did not have a diff field") } - diffMap, ok := diff.(map[string]interface{}) + diffRaw, ok := diffRawElement.DocumentOK() + if !ok { metricUnprocessableChangedFields.Inc() log.Log.Errorw("Oplog data for non-replacement v2 update had a diff that was not a map", "op", op) - return []string{} + return []string{}, errors.New("Oplog data for non-replacement v2 update had a diff that was not a map") } if config.OplogV2ExtractSubfieldChanges() { + var diffMap map[string]interface{} + err := bson.Unmarshal(diffRaw, &diffMap) + if err != nil { + metricUnprocessableChangedFields.Inc() + log.Log.Errorw("Oplog data for non-replacement v2 update had a diff that was not a map", + "op", op) + return []string{}, err + } return getChangedFieldsFromOplogV2UpdateDeep(diffMap, "") } else { - return getChangedFieldsFromOplogV2UpdateShallow(diffMap) + return getChangedFieldsFromOplogV2UpdateShallow(diffRaw) } } diff --git a/lib/oplog/oplog_v2_converter_test.go b/lib/oplog/oplog_v2_converter_test.go index dc9fdbbe..05b687c9 100644 --- a/lib/oplog/oplog_v2_converter_test.go +++ b/lib/oplog/oplog_v2_converter_test.go @@ -163,7 +163,10 @@ func TestOplogV2DeepConverter(t *testing.T) { sort.Strings(test.want) t.Run(testName, func(t *testing.T) { - got := getChangedFieldsFromOplogV2UpdateDeep(test.in, "") + got, err := getChangedFieldsFromOplogV2UpdateDeep(test.in, "") + if err != nil { + t.Error("getChangedFieldsFromOplogV2UpdateDeep() returned error", err) + } sort.Strings(got) assert.Equal(t, test.want, got) }) diff --git a/lib/oplog/processor.go b/lib/oplog/processor.go index b9ebda7b..20f23fbc 100644 --- a/lib/oplog/processor.go +++ b/lib/oplog/processor.go @@ -65,6 +65,11 @@ func processOplogEntry(op *oplogEntry) (*redispub.Publication, error) { return nil, errors.Wrapf(ErrUnsupportedDocIDType, "expected string or ObjectID, got %T instead", op.DocID) } + changedFields, errCF := op.ChangedFields() + if errCF != nil { + return nil, errors.Wrap(errCF, "error getting changed fields") + } + // Construct the JSON we're going to send to Redis // // TODO PERF: consider a specialized JSON encoder @@ -72,7 +77,7 @@ func processOplogEntry(op *oplogEntry) (*redispub.Publication, error) { msg := outgoingMessage{ Event: eventNameForOperation(op), Doc: outgoingMessageDocument{idForMessage}, - Fields: op.ChangedFields(), + Fields: changedFields, } log.Log.Debugw("Sending outgoing message", "message", msg) msgJSON, err := json.Marshal(&msg) diff --git a/lib/oplog/processor_test.go b/lib/oplog/processor_test.go index 95079b79..c90287ef 100644 --- a/lib/oplog/processor_test.go +++ b/lib/oplog/processor_test.go @@ -56,9 +56,9 @@ func TestProcessOplogEntry(t *testing.T) { Namespace: "foo.bar", Database: "foo", Collection: "bar", - Data: bson.M{ + Data: rawBson(t, bson.M{ "some": "field", - }, + }), Timestamp: primitive.Timestamp{T: 1234}, }, want: &decodedPublication{ @@ -81,10 +81,10 @@ func TestProcessOplogEntry(t *testing.T) { Namespace: "foo.bar", Database: "foo", Collection: "bar", - Data: bson.M{ + Data: rawBson(t, bson.M{ "some": "field", "new": "field", - }, + }), Timestamp: primitive.Timestamp{T: 1234}, }, want: &decodedPublication{ @@ -107,7 +107,7 @@ func TestProcessOplogEntry(t *testing.T) { Namespace: "foo.bar", Database: "foo", Collection: "bar", - Data: bson.M{ + Data: rawBson(t, bson.M{ "$v": "1.2.3", "$set": map[string]interface{}{ "a": "foo", @@ -116,7 +116,7 @@ func TestProcessOplogEntry(t *testing.T) { "$unset": map[string]interface{}{ "c": "foo", }, - }, + }), Timestamp: primitive.Timestamp{T: 1234}, }, want: &decodedPublication{ @@ -139,7 +139,7 @@ func TestProcessOplogEntry(t *testing.T) { Namespace: "foo.bar", Database: "foo", Collection: "bar", - Data: bson.M{}, + Data: rawBson(t, bson.M{}), Timestamp: primitive.Timestamp{T: 1234}, }, want: &decodedPublication{ @@ -162,9 +162,9 @@ func TestProcessOplogEntry(t *testing.T) { Namespace: "foo.bar", Database: "foo", Collection: "bar", - Data: bson.M{ + Data: rawBson(t, bson.M{ "some": "field", - }, + }), Timestamp: primitive.Timestamp{T: 1234}, }, want: &decodedPublication{ @@ -190,9 +190,9 @@ func TestProcessOplogEntry(t *testing.T) { Namespace: "foo.bar", Database: "foo", Collection: "bar", - Data: bson.M{ + Data: rawBson(t, bson.M{ "some": "field", - }, + }), Timestamp: primitive.Timestamp{T: 1234}, }, wantError: ErrUnsupportedDocIDType, @@ -205,9 +205,9 @@ func TestProcessOplogEntry(t *testing.T) { Namespace: "foo.system.indexes", Database: "foo", Collection: "system.indexes", - Data: bson.M{ + Data: rawBson(t, bson.M{ "some": "field", - }, + }), Timestamp: primitive.Timestamp{T: 1234}, }, want: nil, @@ -223,12 +223,12 @@ func TestProcessOplogEntry(t *testing.T) { Database: "config", Collection: "transactions", TxIdx: 0, - Data: bson.M{ + Data: rawBson(t, bson.M{ "_id": bson.M{ "id": bson.M{"Subtype": 4, "Data": "dGVzdA=="}, "uid": bson.M{"Subtype": 0, "Data": "MTIz"}, }, - }, + }), Timestamp: primitive.Timestamp{T: 1636616135}, }, want: nil, diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index f5376332..36242eab 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -492,17 +492,11 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl switch entry.Operation { case operationInsert, operationUpdate, operationRemove: - var data map[string]interface{} - if err := bson.Unmarshal(entry.Doc, &data); err != nil { - log.Log.Errorf("unmarshalling oplog entry data: %v", err) - return nil - } - out := oplogEntry{ Operation: entry.Operation, Timestamp: entry.Timestamp, Namespace: entry.Namespace, - Data: data, + Data: entry.Doc, TxIdx: *txIdx, } @@ -514,7 +508,24 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl if out.Operation == operationUpdate { out.DocID = entry.Update.ID } else { - out.DocID = data["_id"] + idLookup := entry.Doc.Lookup("_id") + if idLookup.IsZero() { + log.Log.Errorf("failed to get objectId: _id is empty or not set") + return nil + } + oid, ok := idLookup.ObjectIDOK() + if ok { + // this is left as ObjectID type for now so it can be properly converted in processor.go:56 + out.DocID = oid + } else { + oidString, ok := idLookup.StringValueOK() + if ok { + out.DocID = oidString + } else { + log.Log.Errorf("failed to get objectId: _id is not ObjectID or String type") + return nil + } + } } return []oplogEntry{out} diff --git a/lib/oplog/tail_test.go b/lib/oplog/tail_test.go index 651e346d..d99364db 100644 --- a/lib/oplog/tail_test.go +++ b/lib/oplog/tail_test.go @@ -130,7 +130,7 @@ func TestParseRawOplogEntry(t *testing.T) { Timestamp: primitive.Timestamp{T: 1234}, Operation: "i", Namespace: "foo.Bar", - Data: map[string]interface{}{"_id": "someid", "foo": "bar"}, + Data: rawBson(t, map[string]interface{}{"_id": "someid", "foo": "bar"}), DocID: interface{}("someid"), Database: "foo", Collection: "Bar", @@ -148,7 +148,7 @@ func TestParseRawOplogEntry(t *testing.T) { Timestamp: primitive.Timestamp{T: 1234}, Operation: "u", Namespace: "foo.Bar", - Data: map[string]interface{}{"new": "data"}, + Data: rawBson(t, map[string]interface{}{"new": "data"}), DocID: interface{}("updateid"), Database: "foo", Collection: "Bar", @@ -165,7 +165,7 @@ func TestParseRawOplogEntry(t *testing.T) { Timestamp: primitive.Timestamp{T: 1234}, Operation: "d", Namespace: "foo.Bar", - Data: map[string]interface{}{"_id": "someid"}, + Data: rawBson(t, map[string]interface{}{"_id": "someid"}), DocID: interface{}("someid"), Database: "foo", Collection: "Bar", @@ -238,10 +238,10 @@ func TestParseRawOplogEntry(t *testing.T) { Namespace: "foo.Bar", Database: "foo", Collection: "Bar", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "_id": "id1", "foo": "baz", - }, + }), TxIdx: 0, }, { @@ -251,10 +251,10 @@ func TestParseRawOplogEntry(t *testing.T) { Namespace: "foo.Bar", Database: "foo", Collection: "Bar", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "_id": "id1", "foo": "bar", - }, + }), TxIdx: 1, }, { @@ -264,9 +264,9 @@ func TestParseRawOplogEntry(t *testing.T) { Namespace: "foo.Bar", Database: "foo", Collection: "Bar", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "foo": "quux", - }, + }), TxIdx: 2, }, { @@ -276,9 +276,9 @@ func TestParseRawOplogEntry(t *testing.T) { Namespace: "foo.Bar", Database: "foo", Collection: "Bar", - Data: map[string]interface{}{ + Data: rawBson(t, map[string]interface{}{ "_id": "id3", - }, + }), TxIdx: 3, }, }, @@ -289,13 +289,46 @@ func TestParseRawOplogEntry(t *testing.T) { t.Run(testName, func(t *testing.T) { got := (&Tailer{Denylist: &sync.Map{}}).parseRawOplogEntry(test.in, nil) - if diff := pretty.Compare(got, test.want); diff != "" { + if diff := pretty.Compare(parseEntry(t, got), parseEntry(t, test.want)); diff != "" { t.Errorf("Got incorrect result (-got +want)\n%s", diff) } }) } } +type oplogEntryConverted struct { + DocID interface{} + Timestamp primitive.Timestamp + Data map[string]interface{} + Operation string + Namespace string + Database string + Collection string + + TxIdx uint +} + +func parseEntry(t *testing.T, op []oplogEntry) []oplogEntryConverted { + opc := make([]oplogEntryConverted, len(op)) + + for i := 0; i < len(op); i++ { + data := map[string]interface{}{} + err := bson.Unmarshal(op[i].Data, &data) + if err != nil { + t.Error("Error unmarshalling oplog data", err) + } + opc[i].DocID = op[i].DocID + opc[i].Timestamp = op[i].Timestamp + opc[i].Data = data + opc[i].Operation = op[i].Operation + opc[i].Namespace = op[i].Namespace + opc[i].Database = op[i].Database + opc[i].Collection = op[i].Collection + opc[i].TxIdx = op[i].TxIdx + } + return opc +} + func TestParseNamespace(t *testing.T) { tests := map[string]struct { in string diff --git a/testapp/Dockerfile b/testapp/Dockerfile index 583f5f98..84f04099 100644 --- a/testapp/Dockerfile +++ b/testapp/Dockerfile @@ -6,7 +6,7 @@ ENV METEOR_ALLOW_SUPERUSER=true RUN apt-get update && \ apt-get install -y g++ build-essential curl && \ rm -rf /var/lib/apt/lists/* && \ - curl https://install.meteor.com/ | sh + curl https://install.meteor.com/?release=2.13 | sh RUN meteor create --release 2.5.6 /throwaway && rm -rf /throwaway