diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index 00ded64..2b13091 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -208,7 +208,7 @@ func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOr continue } - ts, pubs, sendMetricsData := tailer.unmarshalEntry(rawData, tailer.Denylist, readOrdinal) + ts, pubs, sendMetricsData := tailer.processEntry(rawData, tailer.Denylist, readOrdinal) if ts != nil { lastTimestamp = *ts @@ -360,48 +360,80 @@ func closeCursor(cursor *mongo.Cursor) { } } -// unmarshalEntry unmarshals a single entry from the oplog. -// -// The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if it -// ignored it or failed at some later step. -func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) { +// unmarshalEntryMetadata processes the top-level data from an entry and returns a rawOplogEntry object. +func unmarshalEntryMetadata(rawData bson.Raw, denylist *sync.Map) *rawOplogEntry { var result rawOplogEntry var ok bool - result.Namespace, ok = rawData.Lookup("ns").StringValueOK() - if !ok { - log.Log.Error("Error unmarshalling oplog namespace entry") + nsLookup, err := rawData.LookupErr("ns"); + if err == nil { + result.Namespace, ok = nsLookup.StringValueOK() + if !ok { + // this means there was a type mismatch + log.Log.Error("Error unmarshalling oplog namespace entry") + return nil + } } - if len(result.Namespace) > 0 { // try to filter early if possible + // try to filter early if possible + if len(result.Namespace) > 0 { db, _ := parseNamespace(result.Namespace) if _, denied := denylist.Load(db); denied { log.Log.Debugw("Skipping oplog entry", "database", db) metricOplogEntriesFiltered.WithLabelValues(db).Add(1) - return + return nil } } - t, i, ok := rawData.Lookup("ts").TimestampOK() - if !ok { - log.Log.Warn("Error unmarshalling oplog timestamp entry") + tsLookup, err := rawData.LookupErr("ts") + if err == nil { + t, i, ok := tsLookup.TimestampOK() + if !ok { + log.Log.Error("Error unmarshalling oplog timestamp entry") + return nil + } + result.Timestamp = primitive.Timestamp{T: t, I: i} } - result.Timestamp = primitive.Timestamp{T: t, I: i} - result.Operation, ok = rawData.Lookup("op").StringValueOK() - if !ok { - log.Log.Warn("Error unmarshalling oplog operation entry") + opLookup, err := rawData.LookupErr("op") + if err == nil { + result.Operation, ok = opLookup.StringValueOK() + if !ok { + log.Log.Error("Error unmarshalling oplog operation entry") + return nil + } } - result.Doc, ok = rawData.Lookup("o").DocumentOK() - if !ok { - log.Log.Warn("Error unmarshalling oplog document entry") + oLookup, err := rawData.LookupErr("o") + if err == nil { + result.Doc, ok = oLookup.DocumentOK() + if !ok { + log.Log.Error("Error unmarshalling oplog document entry") + return nil + } + } + + o2Lookup, err := rawData.LookupErr("o2") + if err == nil { + result.Update, ok = o2Lookup.DocumentOK() + if !ok { + log.Log.Error("Error unmarshalling oplog update entry") + return nil + } } - result.Update, ok = rawData.Lookup("o2").DocumentOK() - if !ok { - log.Log.Debug("Error unmarshalling oplog update entry") + return &result +} + +// processEntry processes a single entry from the oplog. +// +// The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if it +// ignored it or failed at some later step. +func (tailer *Tailer) processEntry(rawData bson.Raw, denylist *sync.Map, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) { + result := unmarshalEntryMetadata(rawData, denylist) + if result == nil { + return } timestamp = &result.Timestamp @@ -522,7 +554,7 @@ func parseID(idRaw bson.RawValue) (id interface{}, err error) { } // converts a rawOplogEntry to an oplogEntry -func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []oplogEntry { +func (tailer *Tailer) parseRawOplogEntry(entry *rawOplogEntry, txIdx *uint) []oplogEntry { if txIdx == nil { idx := uint(0) txIdx = &idx @@ -573,7 +605,7 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl for _, v := range txData.ApplyOps { v.Timestamp = entry.Timestamp - ret = append(ret, tailer.parseRawOplogEntry(v, txIdx)...) + ret = append(ret, tailer.parseRawOplogEntry(&v, txIdx)...) } return ret diff --git a/lib/oplog/tail_test.go b/lib/oplog/tail_test.go index 1504623..68e5e05 100644 --- a/lib/oplog/tail_test.go +++ b/lib/oplog/tail_test.go @@ -104,16 +104,6 @@ func TestGetStartTime(t *testing.T) { } } -func mustRaw(t *testing.T, data interface{}) bson.Raw { - b, err := bson.Marshal(data) - require.NoError(t, err) - - var raw bson.Raw - require.NoError(t, bson.Unmarshal(b, &raw)) - - return raw -} - func TestParseRawOplogEntry(t *testing.T) { tests := map[string]struct { in rawOplogEntry @@ -124,7 +114,7 @@ func TestParseRawOplogEntry(t *testing.T) { Timestamp: primitive.Timestamp{T: 1234}, Operation: "i", Namespace: "foo.Bar", - Doc: mustRaw(t, map[string]interface{}{"_id": "someid", "foo": "bar"}), + Doc: rawBson(t, map[string]interface{}{"_id": "someid", "foo": "bar"}), }, want: []oplogEntry{{ Timestamp: primitive.Timestamp{T: 1234}, @@ -141,7 +131,7 @@ func TestParseRawOplogEntry(t *testing.T) { Timestamp: primitive.Timestamp{T: 1234}, Operation: "u", Namespace: "foo.Bar", - Doc: mustRaw(t, map[string]interface{}{"new": "data"}), + Doc: rawBson(t, map[string]interface{}{"new": "data"}), Update: rawBson(t, map[string]interface{}{"_id": "updateid"}), }, want: []oplogEntry{{ @@ -159,7 +149,7 @@ func TestParseRawOplogEntry(t *testing.T) { Timestamp: primitive.Timestamp{T: 1234}, Operation: "d", Namespace: "foo.Bar", - Doc: mustRaw(t, map[string]interface{}{"_id": "someid"}), + Doc: rawBson(t, map[string]interface{}{"_id": "someid"}), }, want: []oplogEntry{{ Timestamp: primitive.Timestamp{T: 1234}, @@ -176,7 +166,7 @@ func TestParseRawOplogEntry(t *testing.T) { Timestamp: primitive.Timestamp{T: 1234}, Operation: "c", Namespace: "foo.$cmd", - Doc: mustRaw(t, map[string]interface{}{"drop": "Foo"}), + Doc: rawBson(t, map[string]interface{}{"drop": "Foo"}), }, want: nil, }, @@ -185,51 +175,51 @@ func TestParseRawOplogEntry(t *testing.T) { Timestamp: primitive.Timestamp{T: 1234}, Operation: "c", Namespace: "admin.$cmd", - Doc: mustRaw(t, map[string]interface{}{ + Doc: rawBson(t, map[string]interface{}{ "applyOps": []rawOplogEntry{ { Timestamp: primitive.Timestamp{T: 1234}, Operation: "c", Namespace: "admin.$cmd", - Doc: mustRaw(t, map[string]interface{}{ + Doc: rawBson(t, map[string]interface{}{ "applyOps": []rawOplogEntry{ { Operation: "i", Namespace: "foo.Bar", - Doc: mustRaw(t, map[string]interface{}{ + Doc: rawBson(t, map[string]interface{}{ "_id": "id1", "foo": "baz", }), - Update: mustRaw(t, map[string]interface{}{}), + Update: rawBson(t, map[string]interface{}{}), }, }, }), - Update: mustRaw(t, map[string]interface{}{}), + Update: rawBson(t, map[string]interface{}{}), }, { Operation: "i", Namespace: "foo.Bar", - Doc: mustRaw(t, map[string]interface{}{ + Doc: rawBson(t, map[string]interface{}{ "_id": "id1", "foo": "bar", }), - Update: mustRaw(t, map[string]interface{}{}), + Update: rawBson(t, map[string]interface{}{}), }, { Operation: "u", Namespace: "foo.Bar", - Doc: mustRaw(t, map[string]interface{}{ + Doc: rawBson(t, map[string]interface{}{ "foo": "quux", }), - Update: mustRaw(t, map[string]interface{}{"_id": "id2"}), + Update: rawBson(t, map[string]interface{}{"_id": "id2"}), }, { Operation: "d", Namespace: "foo.Bar", - Doc: mustRaw(t, map[string]interface{}{ + Doc: rawBson(t, map[string]interface{}{ "_id": "id3", }), - Update: mustRaw(t, map[string]interface{}{}), + Update: rawBson(t, map[string]interface{}{}), }, }, }), @@ -291,7 +281,7 @@ func TestParseRawOplogEntry(t *testing.T) { for testName, test := range tests { t.Run(testName, func(t *testing.T) { - got := (&Tailer{Denylist: &sync.Map{}}).parseRawOplogEntry(test.in, nil) + got := (&Tailer{Denylist: &sync.Map{}}).parseRawOplogEntry(&test.in, nil) if diff := pretty.Compare(parseEntry(t, got), parseEntry(t, test.want)); diff != "" { t.Errorf("Got incorrect result (-got +want)\n%s", diff)