Skip to content

Commit

Permalink
refactor of metadata processing function
Browse files Browse the repository at this point in the history
  • Loading branch information
eparker-tulip committed Nov 12, 2024
1 parent 4a0adb1 commit 4dae2ad
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 52 deletions.
84 changes: 58 additions & 26 deletions lib/oplog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOr
continue
}

ts, pubs, sendMetricsData := tailer.unmarshalEntry(rawData, tailer.Denylist, readOrdinal)
ts, pubs, sendMetricsData := tailer.processEntry(rawData, tailer.Denylist, readOrdinal)

if ts != nil {
lastTimestamp = *ts
Expand Down Expand Up @@ -360,48 +360,80 @@ func closeCursor(cursor *mongo.Cursor) {
}
}

// unmarshalEntry unmarshals a single entry from the oplog.
//
// The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if it
// ignored it or failed at some later step.
func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) {
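// unmarshalEntryMetadata processes the top-level data from an entry and returns a pointer to a rawOplogEntry.
// It returns nil if a field that is present has an unexpected type, or if the entry's database is on the denylist.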
func unmarshalEntryMetadata(rawData bson.Raw, denylist *sync.Map) *rawOplogEntry {
var result rawOplogEntry
var ok bool
result.Namespace, ok = rawData.Lookup("ns").StringValueOK()
if !ok {
log.Log.Error("Error unmarshalling oplog namespace entry")
nsLookup, err := rawData.LookupErr("ns");
if err == nil {
result.Namespace, ok = nsLookup.StringValueOK()
if !ok {
// this means there was a type mismatch
log.Log.Error("Error unmarshalling oplog namespace entry")
return nil
}
}

if len(result.Namespace) > 0 { // try to filter early if possible
// try to filter early if possible
if len(result.Namespace) > 0 {
db, _ := parseNamespace(result.Namespace)

if _, denied := denylist.Load(db); denied {
log.Log.Debugw("Skipping oplog entry", "database", db)
metricOplogEntriesFiltered.WithLabelValues(db).Add(1)

return
return nil
}
}

t, i, ok := rawData.Lookup("ts").TimestampOK()
if !ok {
log.Log.Warn("Error unmarshalling oplog timestamp entry")
tsLookup, err := rawData.LookupErr("ts")
if err == nil {
t, i, ok := tsLookup.TimestampOK()
if !ok {
log.Log.Error("Error unmarshalling oplog timestamp entry")
return nil
}
result.Timestamp = primitive.Timestamp{T: t, I: i}
}
result.Timestamp = primitive.Timestamp{T: t, I: i}

result.Operation, ok = rawData.Lookup("op").StringValueOK()
if !ok {
log.Log.Warn("Error unmarshalling oplog operation entry")
opLookup, err := rawData.LookupErr("op")
if err == nil {
result.Operation, ok = opLookup.StringValueOK()
if !ok {
log.Log.Error("Error unmarshalling oplog operation entry")
return nil
}
}

result.Doc, ok = rawData.Lookup("o").DocumentOK()
if !ok {
log.Log.Warn("Error unmarshalling oplog document entry")
oLookup, err := rawData.LookupErr("o")
if err == nil {
result.Doc, ok = oLookup.DocumentOK()
if !ok {
log.Log.Error("Error unmarshalling oplog document entry")
return nil
}
}

o2Lookup, err := rawData.LookupErr("o2")
if err == nil {
result.Update, ok = o2Lookup.DocumentOK()
if !ok {
log.Log.Error("Error unmarshalling oplog update entry")
return nil
}
}

result.Update, ok = rawData.Lookup("o2").DocumentOK()
if !ok {
log.Log.Debug("Error unmarshalling oplog update entry")
return &result
}

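// processEntry processes a single entry from the oplog.
//
// The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if a
// later processing step ignores the entry or fails. If the entry's metadata cannot be unmarshalled, or its database
// is on the denylist, all return values (including the timestamp) are nil.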
func (tailer *Tailer) processEntry(rawData bson.Raw, denylist *sync.Map, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) {
result := unmarshalEntryMetadata(rawData, denylist)
if result == nil {
return
}

timestamp = &result.Timestamp
Expand Down Expand Up @@ -522,7 +554,7 @@ func parseID(idRaw bson.RawValue) (id interface{}, err error) {
}

// parseRawOplogEntry converts a rawOplogEntry into a slice of oplogEntry values.
func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []oplogEntry {
func (tailer *Tailer) parseRawOplogEntry(entry *rawOplogEntry, txIdx *uint) []oplogEntry {
if txIdx == nil {
idx := uint(0)
txIdx = &idx
Expand Down Expand Up @@ -573,7 +605,7 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl

for _, v := range txData.ApplyOps {
v.Timestamp = entry.Timestamp
ret = append(ret, tailer.parseRawOplogEntry(v, txIdx)...)
ret = append(ret, tailer.parseRawOplogEntry(&v, txIdx)...)
}

return ret
Expand Down
42 changes: 16 additions & 26 deletions lib/oplog/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,6 @@ func TestGetStartTime(t *testing.T) {
}
}

// mustRaw round-trips data through BSON: it marshals data with bson.Marshal
// and unmarshals the resulting bytes into a bson.Raw, which it returns.
// A failure in either step fails the calling test via require.NoError.
func mustRaw(t *testing.T, data interface{}) bson.Raw {
	// Mark this function as a test helper so that failures are attributed to
	// the line in the calling test rather than to this function.
	t.Helper()

	b, err := bson.Marshal(data)
	require.NoError(t, err)

	var raw bson.Raw
	require.NoError(t, bson.Unmarshal(b, &raw))

	return raw
}

func TestParseRawOplogEntry(t *testing.T) {
tests := map[string]struct {
in rawOplogEntry
Expand All @@ -124,7 +114,7 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "i",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{"_id": "someid", "foo": "bar"}),
Doc: rawBson(t, map[string]interface{}{"_id": "someid", "foo": "bar"}),
},
want: []oplogEntry{{
Timestamp: primitive.Timestamp{T: 1234},
Expand All @@ -141,7 +131,7 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "u",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{"new": "data"}),
Doc: rawBson(t, map[string]interface{}{"new": "data"}),
Update: rawBson(t, map[string]interface{}{"_id": "updateid"}),
},
want: []oplogEntry{{
Expand All @@ -159,7 +149,7 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "d",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{"_id": "someid"}),
Doc: rawBson(t, map[string]interface{}{"_id": "someid"}),
},
want: []oplogEntry{{
Timestamp: primitive.Timestamp{T: 1234},
Expand All @@ -176,7 +166,7 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "c",
Namespace: "foo.$cmd",
Doc: mustRaw(t, map[string]interface{}{"drop": "Foo"}),
Doc: rawBson(t, map[string]interface{}{"drop": "Foo"}),
},
want: nil,
},
Expand All @@ -185,51 +175,51 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "c",
Namespace: "admin.$cmd",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"applyOps": []rawOplogEntry{
{
Timestamp: primitive.Timestamp{T: 1234},
Operation: "c",
Namespace: "admin.$cmd",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"applyOps": []rawOplogEntry{
{
Operation: "i",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"_id": "id1",
"foo": "baz",
}),
Update: mustRaw(t, map[string]interface{}{}),
Update: rawBson(t, map[string]interface{}{}),
},
},
}),
Update: mustRaw(t, map[string]interface{}{}),
Update: rawBson(t, map[string]interface{}{}),
},
{
Operation: "i",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"_id": "id1",
"foo": "bar",
}),
Update: mustRaw(t, map[string]interface{}{}),
Update: rawBson(t, map[string]interface{}{}),
},
{
Operation: "u",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"foo": "quux",
}),
Update: mustRaw(t, map[string]interface{}{"_id": "id2"}),
Update: rawBson(t, map[string]interface{}{"_id": "id2"}),
},
{
Operation: "d",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"_id": "id3",
}),
Update: mustRaw(t, map[string]interface{}{}),
Update: rawBson(t, map[string]interface{}{}),
},
},
}),
Expand Down Expand Up @@ -291,7 +281,7 @@ func TestParseRawOplogEntry(t *testing.T) {

for testName, test := range tests {
t.Run(testName, func(t *testing.T) {
got := (&Tailer{Denylist: &sync.Map{}}).parseRawOplogEntry(test.in, nil)
got := (&Tailer{Denylist: &sync.Map{}}).parseRawOplogEntry(&test.in, nil)

if diff := pretty.Compare(parseEntry(t, got), parseEntry(t, test.want)); diff != "" {
t.Errorf("Got incorrect result (-got +want)\n%s", diff)
Expand Down

0 comments on commit 4dae2ad

Please sign in to comment.