Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve initial metadata read using bson instead of unmarshal #88

Merged
merged 6 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 91 additions & 28 deletions lib/oplog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@ type Tailer struct {
// Raw oplog entry from Mongo
type rawOplogEntry struct {
Timestamp primitive.Timestamp `bson:"ts"`
HistoryID int64 `bson:"h"`
MongoVersion int `bson:"v"`
Operation string `bson:"op"`
Namespace string `bson:"ns"`
Doc bson.Raw `bson:"o"`
Update rawOplogEntryID `bson:"o2"`
Update bson.Raw `bson:"o2"`
}

// Parsed Cursor Result
Expand All @@ -52,10 +50,6 @@ type cursorResultStatus struct {
DidLosePosition bool
}

type rawOplogEntryID struct {
ID interface{} `bson:"_id"`
}

const requeryDuration = time.Second

var (
Expand Down Expand Up @@ -214,7 +208,7 @@ func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOr
continue
}

ts, pubs, sendMetricsData := tailer.unmarshalEntry(rawData, tailer.Denylist, readOrdinal)
ts, pubs, sendMetricsData := tailer.processEntry(rawData, tailer.Denylist, readOrdinal)

if ts != nil {
lastTimestamp = *ts
Expand Down Expand Up @@ -366,16 +360,77 @@ func closeCursor(cursor *mongo.Cursor) {
}
}

// unmarshalEntry unmarshals a single entry from the oplog.
// unmarshalEntryMetadata processes the top-level data from an entry and returns a rawOplogEntry object.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be nice to say a bit more about the performance issue with the usual unmarshalling, explaining why we need to do all these manual lookups.

func unmarshalEntryMetadata(rawData bson.Raw, denylist *sync.Map) *rawOplogEntry {
var result rawOplogEntry
var ok bool
nsLookup, err := rawData.LookupErr("ns");
if err == nil {
result.Namespace, ok = nsLookup.StringValueOK()
if !ok {
// this means there was a type mismatch
log.Log.Error("Error unmarshalling oplog namespace entry")
return nil
}
}

// try to filter early if possible
if len(result.Namespace) > 0 && result.Namespace != "admin.$cmd" {
db, _ := parseNamespace(result.Namespace)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're doing a parse here, since we're not unmarshalling into the rawOplogEntry struct manually anymore, it might be worth it to store the returned database and collection name we get here in that struct, and then skipping the namespace parsing which happens later in parseRawOplogEntry by using the stored ones.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but it's more complicated because while it would work for the first one, parseRawOplogEntry is recursive for transactions, so it still needs to be done there anyway. And the parse is a pretty lightweight call, so to me this seems ok for the sake of simplicity.

if _, denied := denylist.Load(db); denied {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We check the deny list again in line 420, which is probably not necessary anymore. That check would only do anything if the namespace exists and is parseable so we cover that case here.

Copy link
Contributor Author

@eparker-tulip eparker-tulip Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference is that the second check is post-transaction, the way the original behavior was (though still it only checks the first entry in the transaction). But maybe we don't care about transactions for filtering? This might be good to confirm with @alex-goodisman, who originally added the denylist.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think what you currently have is right. Rereading the parse code, it seems if there’s a command operation, the top level entry’s namespace is just admin.$cmd, so we probably do need to check that first sub-entry in the denylist. I’m also realising that we also now check this case (i.e. checking the namespace admin.$cmd) in the new denylist check, while we never had to do that before. Should be fine since admin shouldn’t be in the denylist, but something to keep in mind.

log.Log.Debugw("Skipping oplog entry", "database", db)
metricOplogEntriesFiltered.WithLabelValues(db).Add(1)
return nil
}
}

tsLookup, err := rawData.LookupErr("ts")
if err == nil {
t, i, ok := tsLookup.TimestampOK()
if !ok {
log.Log.Error("Error unmarshalling oplog timestamp entry")
return nil
}
result.Timestamp = primitive.Timestamp{T: t, I: i}
}

opLookup, err := rawData.LookupErr("op")
if err == nil {
result.Operation, ok = opLookup.StringValueOK()
if !ok {
log.Log.Error("Error unmarshalling oplog operation entry")
return nil
}
}

oLookup, err := rawData.LookupErr("o")
if err == nil {
result.Doc, ok = oLookup.DocumentOK()
if !ok {
log.Log.Error("Error unmarshalling oplog document entry")
return nil
}
}

o2Lookup, err := rawData.LookupErr("o2")
if err == nil {
result.Update, ok = o2Lookup.DocumentOK()
if !ok {
log.Log.Error("Error unmarshalling oplog update entry")
return nil
}
}

return &result
}

// processEntry processes a single entry from the oplog.
//
// The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if it
// ignored it or failed at some later step.
func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) {
var result rawOplogEntry

err := bson.Unmarshal(rawData, &result)
if err != nil {
log.Log.Errorw("Error unmarshalling oplog entry", "error", err)
func (tailer *Tailer) processEntry(rawData bson.Raw, denylist *sync.Map, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) {
result := unmarshalEntryMetadata(rawData, denylist)
if result == nil {
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this code, if there is any error in the unmarshal, we just returned here. Now, even if the various lookups or type checks fail, we log a message and continue. Should we just return if any step of this lookup process fails?

I'm also curious about when the previous unmarshal used to error. Did it error if the struct field type didn't match the type in the document? Probably. The change I proposed above deals with that. However, did it error if a matching key in the document for the struct wasn't found? I don't think so, and doing my change would mean different behavior in that case. I think looking into when bson unmarshalling errors and making sure behavior is the same as before (splitting the lookup and type check if necessary) is a good move.

Copy link
Contributor Author

@eparker-tulip eparker-tulip Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did it error if the struct field type didn't match the type in the document?

Yes, I tested this. If a field was missing or extra, there was no error. Only if a field existed but with a different type would the original unmarshal error. So this tries to mostly follow that pattern, but does error on namespace which would seem to be a required field. However, maybe none are -- I don't know what namespace would be set to in the case of a transaction. In that case, splitting up the lookup and type conversion and exactly matching the old behavior is probably best -- I had started that way, but wanted to avoid it since it makes for terribly messy code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it does get gnarly. Maybe put the conversion from bson to rawOplogEntry in a helper function? That might help that, and also make it easier to make changes in the future if necessary,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would work -- I'd also have to pass in the denylist to allow for the early filtering, but that's probably fine

}

Expand Down Expand Up @@ -483,8 +538,21 @@ func (tailer *Tailer) getStartTime(maxOrdinal int, getTimestampOfLastOplogEntry
return primitive.Timestamp{T: uint32(time.Now().Unix() << 32)}
}

func parseID(idRaw bson.RawValue) (id interface{}, err error) {
if idRaw.IsZero() {
log.Log.Error("failed to get objectId: _id is empty or not set")
err = errors.New("empty or missing objectId")
return
}
err = idRaw.Unmarshal(&id)
if err != nil {
log.Log.Errorf("failed to unmarshal objectId: %v", err)
}
return
}

// converts a rawOplogEntry to an oplogEntry
func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []oplogEntry {
func (tailer *Tailer) parseRawOplogEntry(entry *rawOplogEntry, txIdx *uint) []oplogEntry {
if txIdx == nil {
idx := uint(0)
txIdx = &idx
Expand All @@ -505,19 +573,14 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl

out.Database, out.Collection = parseNamespace(out.Namespace)

var errID error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: What about calling this just err? errID sounds like it stores an error ID, rather than it being the error for the ID parsing part.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is a pattern Yotam likes -- to be clear what each error variable is for, rather than just always err. But I'm fine either way, and I agree this could be confusing.

if out.Operation == operationUpdate {
out.DocID = entry.Update.ID
out.DocID, errID = parseID(entry.Update.Lookup("_id"))
} else {
idLookup := entry.Doc.Lookup("_id")
if idLookup.IsZero() {
log.Log.Error("failed to get objectId: _id is empty or not set")
return nil
}
err := idLookup.Unmarshal(&out.DocID)
if err != nil {
log.Log.Errorf("failed to unmarshal objectId: %v", err)
return nil
}
out.DocID, errID = parseID(entry.Doc.Lookup("_id"))
}
if errID != nil {
return nil
}

return []oplogEntry{out}
Expand All @@ -540,7 +603,7 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it's worth it to replace the bson unmarshalling in the transaction case as well? We'd probably have to add a new function since we're now looking at a document array. I don't think this is necessary currently, but I just thought I'd mention it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right -- I actually was intending to do this when I split off the function and then forgot after the long weekend

for _, v := range txData.ApplyOps {
v.Timestamp = entry.Timestamp
ret = append(ret, tailer.parseRawOplogEntry(v, txIdx)...)
ret = append(ret, tailer.parseRawOplogEntry(&v, txIdx)...)
}

return ret
Expand Down
40 changes: 17 additions & 23 deletions lib/oplog/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,6 @@ func TestGetStartTime(t *testing.T) {
}
}

func mustRaw(t *testing.T, data interface{}) bson.Raw {
b, err := bson.Marshal(data)
require.NoError(t, err)

var raw bson.Raw
require.NoError(t, bson.Unmarshal(b, &raw))

return raw
}

func TestParseRawOplogEntry(t *testing.T) {
tests := map[string]struct {
in rawOplogEntry
Expand All @@ -124,7 +114,7 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "i",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{"_id": "someid", "foo": "bar"}),
Doc: rawBson(t, map[string]interface{}{"_id": "someid", "foo": "bar"}),
},
want: []oplogEntry{{
Timestamp: primitive.Timestamp{T: 1234},
Expand All @@ -141,8 +131,8 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "u",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{"new": "data"}),
Update: rawOplogEntryID{ID: "updateid"},
Doc: rawBson(t, map[string]interface{}{"new": "data"}),
Update: rawBson(t, map[string]interface{}{"_id": "updateid"}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you used rawBson above and mustRaw in the other test cases. Why so? The pattern I notice seems to be that mustRaw is used for the input, and rawBson for the expected result. mustRaw seems weird actually. Other than what rawBson does, it checks if a byte array can be unmarshalled into bson.Raw, but that doesn't seem helpful, because they both have the same underlying type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, mustRaw was already there, and I added rawBson for the other tests, so when I had to update this I used the rawBson, but I see now they're basically the same so I replaced all the mustRaw with rawBson to simplify.

},
want: []oplogEntry{{
Timestamp: primitive.Timestamp{T: 1234},
Expand All @@ -159,7 +149,7 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "d",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{"_id": "someid"}),
Doc: rawBson(t, map[string]interface{}{"_id": "someid"}),
},
want: []oplogEntry{{
Timestamp: primitive.Timestamp{T: 1234},
Expand All @@ -176,7 +166,7 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "c",
Namespace: "foo.$cmd",
Doc: mustRaw(t, map[string]interface{}{"drop": "Foo"}),
Doc: rawBson(t, map[string]interface{}{"drop": "Foo"}),
},
want: nil,
},
Expand All @@ -185,47 +175,51 @@ func TestParseRawOplogEntry(t *testing.T) {
Timestamp: primitive.Timestamp{T: 1234},
Operation: "c",
Namespace: "admin.$cmd",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"applyOps": []rawOplogEntry{
{
Timestamp: primitive.Timestamp{T: 1234},
Operation: "c",
Namespace: "admin.$cmd",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"applyOps": []rawOplogEntry{
{
Operation: "i",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"_id": "id1",
"foo": "baz",
}),
Update: rawBson(t, map[string]interface{}{}),
},
},
}),
Update: rawBson(t, map[string]interface{}{}),
},
{
Operation: "i",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"_id": "id1",
"foo": "bar",
}),
Update: rawBson(t, map[string]interface{}{}),
},
{
Operation: "u",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"foo": "quux",
}),
Update: rawOplogEntryID{"id2"},
Update: rawBson(t, map[string]interface{}{"_id": "id2"}),
},
{
Operation: "d",
Namespace: "foo.Bar",
Doc: mustRaw(t, map[string]interface{}{
Doc: rawBson(t, map[string]interface{}{
"_id": "id3",
}),
Update: rawBson(t, map[string]interface{}{}),
},
},
}),
Expand Down Expand Up @@ -287,7 +281,7 @@ func TestParseRawOplogEntry(t *testing.T) {

for testName, test := range tests {
t.Run(testName, func(t *testing.T) {
got := (&Tailer{Denylist: &sync.Map{}}).parseRawOplogEntry(test.in, nil)
got := (&Tailer{Denylist: &sync.Map{}}).parseRawOplogEntry(&test.in, nil)

if diff := pretty.Compare(parseEntry(t, got), parseEntry(t, test.want)); diff != "" {
t.Errorf("Got incorrect result (-got +want)\n%s", diff)
Expand Down
Loading