From e60bdd732fe8ded69d92075ff1c8ea75c5953d63 Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Wed, 30 Oct 2024 13:39:05 -0500 Subject: [PATCH 1/6] initial commit --- lib/oplog/tail.go | 77 ++++++++++++++++++++++++++++++------------ lib/oplog/tail_test.go | 8 +++-- 2 files changed, 61 insertions(+), 24 deletions(-) diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index 84e6da3..705bc50 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -37,12 +37,10 @@ type Tailer struct { // Raw oplog entry from Mongo type rawOplogEntry struct { Timestamp primitive.Timestamp `bson:"ts"` - HistoryID int64 `bson:"h"` - MongoVersion int `bson:"v"` Operation string `bson:"op"` Namespace string `bson:"ns"` Doc bson.Raw `bson:"o"` - Update rawOplogEntryID `bson:"o2"` + Update bson.Raw `bson:"o2"` } // Parsed Cursor Result @@ -52,10 +50,6 @@ type cursorResultStatus struct { DidLosePosition bool } -type rawOplogEntryID struct { - ID interface{} `bson:"_id"` -} - const requeryDuration = time.Second var ( @@ -372,11 +366,42 @@ func closeCursor(cursor *mongo.Cursor) { // ignored it or failed at some later step. func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) { var result rawOplogEntry + var ok bool + result.Namespace, ok = rawData.Lookup("ns").StringValueOK() + if !ok { + log.Log.Error("Error unmarshalling oplog namespace entry") + } - err := bson.Unmarshal(rawData, &result) - if err != nil { - log.Log.Errorw("Error unmarshalling oplog entry", "error", err) - return + if len(result.Namespace) > 0 { // try to filter early if possible + db, _ := parseNamespace(result.Namespace) + + if _, denied := denylist.Load(db); denied { + log.Log.Debugw("Skipping oplog entry", "database", db) + metricOplogEntriesFiltered.WithLabelValues(db).Add(1) + + return + } + } + + t, i, ok := rawData.Lookup("ts").TimestampOK() + if !ok { + log.Log.Warn("Error unmarshalling oplog timestamp entry") + } + result.Timestamp = primitive.Timestamp{T: t, I: i} + + result.Operation, ok = rawData.Lookup("op").StringValueOK() + if !ok { + log.Log.Warn("Error unmarshalling oplog operation entry") + } + + result.Doc, ok = rawData.Lookup("o").DocumentOK() + if !ok { + log.Log.Warn("Error unmarshalling oplog document entry") + } + + result.Update, ok = rawData.Lookup("o2").DocumentOK() + if !ok { + log.Log.Debug("Error unmarshalling oplog update entry") } timestamp = &result.Timestamp @@ -483,6 +508,19 @@ func (tailer *Tailer) getStartTime(maxOrdinal int, getTimestampOfLastOplogEntry return primitive.Timestamp{T: uint32(time.Now().Unix() << 32)} } +func parseID(idRaw bson.RawValue) (id interface{}, err error) { + if idRaw.IsZero() { + log.Log.Error("failed to get objectId: _id is empty or not set") + err = errors.New("empty or missing objectId") + return + } + err = idRaw.Unmarshal(&id) + if err != nil { + log.Log.Errorf("failed to unmarshal objectId: %v", err) + } + return +} + // converts a rawOplogEntry to an oplogEntry func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []oplogEntry { if txIdx == nil { @@ -505,19 +543,14 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl out.Database, out.Collection = parseNamespace(out.Namespace) + var errID error if out.Operation == operationUpdate { - out.DocID = entry.Update.ID + out.DocID, errID = parseID(entry.Update.Lookup("_id")) } else { - idLookup := entry.Doc.Lookup("_id") - if idLookup.IsZero() { - log.Log.Error("failed to get objectId: _id is empty or not set") - return nil - } - err := idLookup.Unmarshal(&out.DocID) - if err != nil { - log.Log.Errorf("failed to unmarshal objectId: %v", err) - return nil - } + out.DocID, errID = parseID(entry.Doc.Lookup("_id")) + } + if(errID != nil) { + return nil } return []oplogEntry{out} diff --git a/lib/oplog/tail_test.go b/lib/oplog/tail_test.go index d99364d..1504623 100644 --- a/lib/oplog/tail_test.go +++ b/lib/oplog/tail_test.go @@ -142,7 +142,7 @@ func TestParseRawOplogEntry(t *testing.T) { Operation: "u", Namespace: "foo.Bar", Doc: mustRaw(t, map[string]interface{}{"new": "data"}), - Update: rawOplogEntryID{ID: "updateid"}, + Update: rawBson(t, map[string]interface{}{"_id": "updateid"}), }, want: []oplogEntry{{ Timestamp: primitive.Timestamp{T: 1234}, @@ -200,9 +200,11 @@ func TestParseRawOplogEntry(t *testing.T) { "_id": "id1", "foo": "baz", }), + Update: mustRaw(t, map[string]interface{}{}), }, }, }), + Update: mustRaw(t, map[string]interface{}{}), }, { Operation: "i", @@ -211,6 +213,7 @@ func TestParseRawOplogEntry(t *testing.T) { "_id": "id1", "foo": "bar", }), + Update: mustRaw(t, map[string]interface{}{}), }, { Operation: "u", @@ -218,7 +221,7 @@ func TestParseRawOplogEntry(t *testing.T) { Doc: mustRaw(t, map[string]interface{}{ "foo": "quux", }), - Update: rawOplogEntryID{"id2"}, + Update: mustRaw(t, map[string]interface{}{"_id": "id2"}), }, { Operation: "d", @@ -226,6 +229,7 @@ func TestParseRawOplogEntry(t *testing.T) { Doc: mustRaw(t, map[string]interface{}{ "_id": "id3", }), + Update: mustRaw(t, map[string]interface{}{}), }, }, }), From 4a0adb1ea18d943d20ec8310dd400cabf73b0aa3 Mon Sep 17 00:00:00 2001 From: Elijah Parker <114941210+eparker-tulip@users.noreply.github.com> Date: Fri, 8 Nov 2024 15:29:36 -0600 Subject: [PATCH 2/6] Update lib/oplog/tail.go Co-authored-by: sannivasreddy <94570377+sannivasreddy@users.noreply.github.com> --- lib/oplog/tail.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index 705bc50..00ded64 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -549,7 +549,7 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl } else { out.DocID, errID = parseID(entry.Doc.Lookup("_id")) } - if(errID != nil) { + if errID != nil { return nil } From 4dae2ad393f851e9c99d4865f6d7fcecab81e8b4 Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Tue, 12 Nov 2024 10:38:39 -0600 Subject: [PATCH 3/6] refactor of metadata processing function --- lib/oplog/tail.go | 84 +++++++++++++++++++++++++++++------------- lib/oplog/tail_test.go | 42 ++++++++------------- 2 files changed, 74 insertions(+), 52 deletions(-) diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index 00ded64..2b13091 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -208,7 +208,7 @@ func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOr continue } - ts, pubs, sendMetricsData := tailer.unmarshalEntry(rawData, tailer.Denylist, readOrdinal) + ts, pubs, sendMetricsData := tailer.processEntry(rawData, tailer.Denylist, readOrdinal) if ts != nil { lastTimestamp = *ts @@ -360,48 +360,80 @@ func closeCursor(cursor *mongo.Cursor) { } } -// unmarshalEntry unmarshals a single entry from the oplog. -// -// The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if it -// ignored it or failed at some later step. -func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) { +// unmarshalEntryMetadata processes the top-level data from an entry and returns a rawOplogEntry object. +func unmarshalEntryMetadata(rawData bson.Raw, denylist *sync.Map) *rawOplogEntry { var result rawOplogEntry var ok bool - result.Namespace, ok = rawData.Lookup("ns").StringValueOK() - if !ok { - log.Log.Error("Error unmarshalling oplog namespace entry") + nsLookup, err := rawData.LookupErr("ns"); + if err == nil { + result.Namespace, ok = nsLookup.StringValueOK() + if !ok { + // this means there was a type mismatch + log.Log.Error("Error unmarshalling oplog namespace entry") + return nil + } } - if len(result.Namespace) > 0 { // try to filter early if possible + // try to filter early if possible + if len(result.Namespace) > 0 { db, _ := parseNamespace(result.Namespace) if _, denied := denylist.Load(db); denied { log.Log.Debugw("Skipping oplog entry", "database", db) metricOplogEntriesFiltered.WithLabelValues(db).Add(1) - return + return nil } } - t, i, ok := rawData.Lookup("ts").TimestampOK() - if !ok { - log.Log.Warn("Error unmarshalling oplog timestamp entry") + tsLookup, err := rawData.LookupErr("ts") + if err == nil { + t, i, ok := tsLookup.TimestampOK() + if !ok { + log.Log.Error("Error unmarshalling oplog timestamp entry") + return nil + } + result.Timestamp = primitive.Timestamp{T: t, I: i} } - result.Timestamp = primitive.Timestamp{T: t, I: i} - result.Operation, ok = rawData.Lookup("op").StringValueOK() - if !ok { - log.Log.Warn("Error unmarshalling oplog operation entry") + opLookup, err := rawData.LookupErr("op") + if err == nil { + result.Operation, ok = opLookup.StringValueOK() + if !ok { + log.Log.Error("Error unmarshalling oplog operation entry") + return nil + } } - result.Doc, ok = rawData.Lookup("o").DocumentOK() - if !ok { - log.Log.Warn("Error unmarshalling oplog document entry") + oLookup, err := rawData.LookupErr("o") + if err == nil { + result.Doc, ok = oLookup.DocumentOK() + if !ok { + log.Log.Error("Error unmarshalling oplog document entry") + return nil + } + } + + o2Lookup, err := rawData.LookupErr("o2") + if err == nil { + result.Update, ok = o2Lookup.DocumentOK() + if !ok { + log.Log.Error("Error unmarshalling oplog update entry") + return nil + } } - result.Update, ok = rawData.Lookup("o2").DocumentOK() - if !ok { - log.Log.Debug("Error unmarshalling oplog update entry") + return &result +} + +// processEntry processes a single entry from the oplog. +// +// The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if it +// ignored it or failed at some later step. +func (tailer *Tailer) processEntry(rawData bson.Raw, denylist *sync.Map, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) { + result := unmarshalEntryMetadata(rawData, denylist) + if result == nil { + return } timestamp = &result.Timestamp @@ -522,7 +554,7 @@ func parseID(idRaw bson.RawValue) (id interface{}, err error) { } // converts a rawOplogEntry to an oplogEntry -func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []oplogEntry { +func (tailer *Tailer) parseRawOplogEntry(entry *rawOplogEntry, txIdx *uint) []oplogEntry { if txIdx == nil { idx := uint(0) txIdx = &idx @@ -573,7 +605,7 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl for _, v := range txData.ApplyOps { v.Timestamp = entry.Timestamp - ret = append(ret, tailer.parseRawOplogEntry(v, txIdx)...) + ret = append(ret, tailer.parseRawOplogEntry(&v, txIdx)...) } return ret diff --git a/lib/oplog/tail_test.go b/lib/oplog/tail_test.go index 1504623..68e5e05 100644 --- a/lib/oplog/tail_test.go +++ b/lib/oplog/tail_test.go @@ -104,16 +104,6 @@ func TestGetStartTime(t *testing.T) { } } -func mustRaw(t *testing.T, data interface{}) bson.Raw { - b, err := bson.Marshal(data) - require.NoError(t, err) - - var raw bson.Raw - require.NoError(t, bson.Unmarshal(b, &raw)) - - return raw -} - func TestParseRawOplogEntry(t *testing.T) { tests := map[string]struct { in rawOplogEntry @@ -124,7 +114,7 @@ func TestParseRawOplogEntry(t *testing.T) { Timestamp: primitive.Timestamp{T: 1234}, Operation: "i", Namespace: "foo.Bar", - Doc: mustRaw(t, map[string]interface{}{"_id": "someid", "foo": "bar"}), + Doc: rawBson(t, map[string]interface{}{"_id": "someid", "foo": "bar"}), }, want: []oplogEntry{{ Timestamp: primitive.Timestamp{T: 1234}, @@ -141,7 +131,7 @@ func TestParseRawOplogEntry(t *testing.T) { Timestamp: primitive.Timestamp{T: 1234}, Operation: "u", Namespace: "foo.Bar", - Doc: mustRaw(t, map[string]interface{}{"new": "data"}), + Doc: rawBson(t, map[string]interface{}{"new": "data"}), Update: rawBson(t, map[string]interface{}{"_id": "updateid"}), }, want: []oplogEntry{{ @@ -159,7 +149,7 @@ func TestParseRawOplogEntry(t *testing.T) { Timestamp: primitive.Timestamp{T: 1234}, Operation: "d", Namespace: "foo.Bar", - Doc: mustRaw(t, map[string]interface{}{"_id": "someid"}), + Doc: rawBson(t, map[string]interface{}{"_id": "someid"}), }, want: []oplogEntry{{ Timestamp: primitive.Timestamp{T: 1234}, @@ -176,7 +166,7 @@ func TestParseRawOplogEntry(t *testing.T) { Timestamp: primitive.Timestamp{T: 1234}, Operation: "c", Namespace: "foo.$cmd", - Doc: mustRaw(t, map[string]interface{}{"drop": "Foo"}), + Doc: rawBson(t, map[string]interface{}{"drop": "Foo"}), }, want: nil, }, @@ -185,51 +175,51 @@ func TestParseRawOplogEntry(t *testing.T) { Timestamp: primitive.Timestamp{T: 1234}, Operation: "c", Namespace: "admin.$cmd", - Doc: mustRaw(t, map[string]interface{}{ + Doc: rawBson(t, map[string]interface{}{ "applyOps": []rawOplogEntry{ { Timestamp: primitive.Timestamp{T: 1234}, Operation: "c", Namespace: "admin.$cmd", - Doc: mustRaw(t, map[string]interface{}{ + Doc: rawBson(t, map[string]interface{}{ "applyOps": []rawOplogEntry{ { Operation: "i", Namespace: "foo.Bar", - Doc: mustRaw(t, map[string]interface{}{ + Doc: rawBson(t, map[string]interface{}{ "_id": "id1", "foo": "baz", }), - Update: mustRaw(t, map[string]interface{}{}), + Update: rawBson(t, map[string]interface{}{}), }, }, }), - Update: mustRaw(t, map[string]interface{}{}), + Update: rawBson(t, map[string]interface{}{}), }, { Operation: "i", Namespace: "foo.Bar", - Doc: mustRaw(t, map[string]interface{}{ + Doc: rawBson(t, map[string]interface{}{ "_id": "id1", "foo": "bar", }), - Update: mustRaw(t, map[string]interface{}{}), + Update: rawBson(t, map[string]interface{}{}), }, { Operation: "u", Namespace: "foo.Bar", - Doc: mustRaw(t, map[string]interface{}{ + Doc: rawBson(t, map[string]interface{}{ "foo": "quux", }), - Update: mustRaw(t, map[string]interface{}{"_id": "id2"}), + Update: rawBson(t, map[string]interface{}{"_id": "id2"}), }, { Operation: "d", Namespace: "foo.Bar", - Doc: mustRaw(t, map[string]interface{}{ + Doc: rawBson(t, map[string]interface{}{ "_id": "id3", }), - Update: mustRaw(t, map[string]interface{}{}), + Update: rawBson(t, map[string]interface{}{}), }, }, }), @@ -291,7 +281,7 @@ func TestParseRawOplogEntry(t *testing.T) { for testName, test := range tests { t.Run(testName, func(t *testing.T) { - got := (&Tailer{Denylist: &sync.Map{}}).parseRawOplogEntry(test.in, nil) + got := (&Tailer{Denylist: &sync.Map{}}).parseRawOplogEntry(&test.in, nil) if diff := pretty.Compare(parseEntry(t, got), parseEntry(t, test.want)); diff != "" { t.Errorf("Got incorrect result (-got +want)\n%s", diff) From 7101ad417b9d02d031c8169544ef1f7f8c82ea35 Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Tue, 12 Nov 2024 10:44:57 -0600 Subject: [PATCH 4/6] check for special case admin. namespace before filtering --- lib/oplog/tail.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index 2b13091..665ce15 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -375,13 +375,11 @@ func unmarshalEntryMetadata(rawData bson.Raw, denylist *sync.Map) *rawOplogEntry } // try to filter early if possible - if len(result.Namespace) > 0 { + if len(result.Namespace) > 0 && result.Namespace != "admin.$cmd" { db, _ := parseNamespace(result.Namespace) - if _, denied := denylist.Load(db); denied { log.Log.Debugw("Skipping oplog entry", "database", db) metricOplogEntriesFiltered.WithLabelValues(db).Add(1) - return nil } } From 47dc4fad41a841e21aa0819a833df977d86d0fa7 Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Tue, 12 Nov 2024 13:35:23 -0600 Subject: [PATCH 5/6] refactor denylist, use raw processor for transactions as well --- lib/oplog/tail.go | 179 ++++++++++++++++++++++++---------------------- 1 file changed, 93 insertions(+), 86 deletions(-) diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index 665ce15..84fdc73 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -208,7 +208,7 @@ func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOr continue } - ts, pubs, sendMetricsData := tailer.processEntry(rawData, tailer.Denylist, readOrdinal) + ts, pubs, sendMetricsData := tailer.processEntry(rawData, readOrdinal) if ts != nil { lastTimestamp = *ts @@ -360,76 +360,12 @@ func closeCursor(cursor *mongo.Cursor) { } } -// unmarshalEntryMetadata processes the top-level data from an entry and returns a rawOplogEntry object. -func unmarshalEntryMetadata(rawData bson.Raw, denylist *sync.Map) *rawOplogEntry { - var result rawOplogEntry - var ok bool - nsLookup, err := rawData.LookupErr("ns"); - if err == nil { - result.Namespace, ok = nsLookup.StringValueOK() - if !ok { - // this means there was a type mismatch - log.Log.Error("Error unmarshalling oplog namespace entry") - return nil - } - } - - // try to filter early if possible - if len(result.Namespace) > 0 && result.Namespace != "admin.$cmd" { - db, _ := parseNamespace(result.Namespace) - if _, denied := denylist.Load(db); denied { - log.Log.Debugw("Skipping oplog entry", "database", db) - metricOplogEntriesFiltered.WithLabelValues(db).Add(1) - return nil - } - } - - tsLookup, err := rawData.LookupErr("ts") - if err == nil { - t, i, ok := tsLookup.TimestampOK() - if !ok { - log.Log.Error("Error unmarshalling oplog timestamp entry") - return nil - } - result.Timestamp = primitive.Timestamp{T: t, I: i} - } - - opLookup, err := rawData.LookupErr("op") - if err == nil { - result.Operation, ok = opLookup.StringValueOK() - if !ok { - log.Log.Error("Error unmarshalling oplog operation entry") - return nil - } - } - - oLookup, err := rawData.LookupErr("o") - if err == nil { - result.Doc, ok = oLookup.DocumentOK() - if !ok { - log.Log.Error("Error unmarshalling oplog document entry") - return nil - } - } - - o2Lookup, err := rawData.LookupErr("o2") - if err == nil { - result.Update, ok = o2Lookup.DocumentOK() - if !ok { - log.Log.Error("Error unmarshalling oplog update entry") - return nil - } - } - - return &result -} - // processEntry processes a single entry from the oplog. // // The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if it // ignored it or failed at some later step. -func (tailer *Tailer) processEntry(rawData bson.Raw, denylist *sync.Map, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) { - result := unmarshalEntryMetadata(rawData, denylist) +func (tailer *Tailer) processEntry(rawData bson.Raw, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) { + result := tailer.unmarshalEntryMetadata(rawData) if result == nil { return } @@ -453,17 +389,6 @@ func (tailer *Tailer) processEntry(rawData bson.Raw, denylist *sync.Map, readOrd metricLastReceivedStaleness.WithLabelValues(strconv.Itoa(readOrdinal)).Set(float64(time.Since(time.Unix(int64(timestamp.T), 0)))) } - if len(entries) > 0 { - database = entries[0].Database - } - - if _, denied := denylist.Load(database); denied { - log.Log.Debugw("Skipping oplog entry", "database", database) - metricOplogEntriesFiltered.WithLabelValues(database).Add(1) - - return - } - type errEntry struct { err error op *oplogEntry @@ -551,6 +476,72 @@ func parseID(idRaw bson.RawValue) (id interface{}, err error) { return } +// unmarshalEntryMetadata processes the top-level data from an entry and returns a rawOplogEntry object. +// This avoids using bson.Unmarshal on the whole document as that has very poor performance, even with the +// bson.Raw type to limit depth. While messy, using these raw bson methods here provides far better performance. +func (tailer *Tailer) unmarshalEntryMetadata(rawData bson.Raw) *rawOplogEntry { + var result rawOplogEntry + var ok bool + nsLookup, err := rawData.LookupErr("ns"); + if err == nil { + result.Namespace, ok = nsLookup.StringValueOK() + if !ok { + // this means there was a type mismatch + log.Log.Error("Error unmarshalling oplog namespace entry") + return nil + } + } + + // filter if db is in denylist + if len(result.Namespace) > 0 && result.Namespace != "admin.$cmd" { + db, _ := parseNamespace(result.Namespace) + if _, denied := tailer.Denylist.Load(db); denied { + log.Log.Debugw("Skipping oplog entry", "database", db) + metricOplogEntriesFiltered.WithLabelValues(db).Add(1) + return nil + } + } + + tsLookup, err := rawData.LookupErr("ts") + if err == nil { + t, i, ok := tsLookup.TimestampOK() + if !ok { + log.Log.Error("Error unmarshalling oplog timestamp entry") + return nil + } + result.Timestamp = primitive.Timestamp{T: t, I: i} + } + + opLookup, err := rawData.LookupErr("op") + if err == nil { + result.Operation, ok = opLookup.StringValueOK() + if !ok { + log.Log.Error("Error unmarshalling oplog operation entry") + return nil + } + } + + oLookup, err := rawData.LookupErr("o") + if err == nil { + result.Doc, ok = oLookup.DocumentOK() + if !ok { + log.Log.Error("Error unmarshalling oplog document entry") + return nil + } + } + + o2Lookup, err := rawData.LookupErr("o2") + if err == nil { + result.Update, ok = o2Lookup.DocumentOK() + if !ok { + log.Log.Error("Error unmarshalling oplog update entry") + return nil + } + } + + return &result +} + // converts a rawOplogEntry to an oplogEntry func (tailer *Tailer) parseRawOplogEntry(entry *rawOplogEntry, txIdx *uint) []oplogEntry { if txIdx == nil { @@ -590,20 +581,36 @@ func (tailer *Tailer) parseRawOplogEntry(entry *rawOplogEntry, txIdx *uint) []op return nil } - var txData struct { - ApplyOps []rawOplogEntry `bson:"applyOps"` + applyOpsLookup, err := entry.Doc.LookupErr("applyOps") + if err != nil { + log.Log.Errorf("Looking up transaction data: %v", err) + return nil } - if err := bson.Unmarshal(entry.Doc, &txData); err != nil { - log.Log.Errorf("unmarshaling transaction data: %v", err) + applyOpsValues, ok := applyOpsLookup.ArrayOK() + if !ok { + log.Log.Error("Failed to access transaction data as array") return nil } - var ret []oplogEntry + applyOpsArray, err := applyOpsValues.Values() + if err != nil { + log.Log.Errorf("Getting transaction ops array: %v", err) + return nil + } - for _, v := range txData.ApplyOps { - v.Timestamp = entry.Timestamp - ret = append(ret, tailer.parseRawOplogEntry(&v, txIdx)...) + var ret []oplogEntry + for _, rawEntry := range applyOpsArray { + rawDoc, ok := rawEntry.DocumentOK() + if ok { + v := tailer.unmarshalEntryMetadata(rawDoc) + if v != nil { + v.Timestamp = entry.Timestamp + ret = append(ret, tailer.parseRawOplogEntry(v, txIdx)...) + } + } else { + log.Log.Error("Getting transaction op doc") + } } return ret From da48e3a8654fb8942826e1b30dabda9ea98179b1 Mon Sep 17 00:00:00 2001 From: eparker-tulip Date: Tue, 12 Nov 2024 13:50:48 -0600 Subject: [PATCH 6/6] populate db var for metrics --- lib/oplog/tail.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index 84fdc73..7ca30e9 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -378,6 +378,10 @@ func (tailer *Tailer) processEntry(rawData bson.Raw, readOrdinal int) (timestamp status := "ignored" database := "(no database)" messageLen := float64(len(rawData)) + + if len(entries) > 0 { + database = entries[0].Database + } sendMetricsData = func() { // TODO: remove these in a future version