-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve initial metadata read using bson instead of unmarshal #88
Changes from 4 commits
e60bdd7
4a0adb1
4dae2ad
7101ad4
47dc4fa
da48e3a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,12 +37,10 @@ type Tailer struct { | |
// Raw oplog entry from Mongo | ||
type rawOplogEntry struct { | ||
Timestamp primitive.Timestamp `bson:"ts"` | ||
HistoryID int64 `bson:"h"` | ||
MongoVersion int `bson:"v"` | ||
Operation string `bson:"op"` | ||
Namespace string `bson:"ns"` | ||
Doc bson.Raw `bson:"o"` | ||
Update rawOplogEntryID `bson:"o2"` | ||
Update bson.Raw `bson:"o2"` | ||
} | ||
|
||
// Parsed Cursor Result | ||
|
@@ -52,10 +50,6 @@ type cursorResultStatus struct { | |
DidLosePosition bool | ||
} | ||
|
||
type rawOplogEntryID struct { | ||
ID interface{} `bson:"_id"` | ||
} | ||
|
||
const requeryDuration = time.Second | ||
|
||
var ( | ||
|
@@ -214,7 +208,7 @@ func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOr | |
continue | ||
} | ||
|
||
ts, pubs, sendMetricsData := tailer.unmarshalEntry(rawData, tailer.Denylist, readOrdinal) | ||
ts, pubs, sendMetricsData := tailer.processEntry(rawData, tailer.Denylist, readOrdinal) | ||
|
||
if ts != nil { | ||
lastTimestamp = *ts | ||
|
@@ -366,16 +360,77 @@ func closeCursor(cursor *mongo.Cursor) { | |
} | ||
} | ||
|
||
// unmarshalEntry unmarshals a single entry from the oplog. | ||
// unmarshalEntryMetadata processes the top-level data from an entry and returns a rawOplogEntry object. | ||
func unmarshalEntryMetadata(rawData bson.Raw, denylist *sync.Map) *rawOplogEntry { | ||
var result rawOplogEntry | ||
var ok bool | ||
nsLookup, err := rawData.LookupErr("ns"); | ||
if err == nil { | ||
result.Namespace, ok = nsLookup.StringValueOK() | ||
if !ok { | ||
// this means there was a type mismatch | ||
log.Log.Error("Error unmarshalling oplog namespace entry") | ||
return nil | ||
} | ||
} | ||
|
||
// try to filter early if possible | ||
if len(result.Namespace) > 0 && result.Namespace != "admin.$cmd" { | ||
db, _ := parseNamespace(result.Namespace) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we're doing a parse here, since we're not unmarshalling into the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. True, but it's more complicated because while it would work for the first one, parseRawOplogEntry is recursive for transactions, so it still needs to be done there anyway. And the parse is a pretty lightweight call, so to me this seems ok for the sake of simplicity. |
||
if _, denied := denylist.Load(db); denied { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We check the deny list again in line 420, which is probably not necessary anymore. That check would only do anything if the namespace exists and is parseable so we cover that case here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The difference is that the second check is post-transaction, the way the original behavior was (though still it only checks the first entry in the transaction). But maybe we don't care about transactions for filtering? This might be good to confirm with @alex-goodisman, who originally added the denylist. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I think what you currently have is right. Rereading the parse code, it seems if there’s a command operation, the top level entry’s namespace is just |
||
log.Log.Debugw("Skipping oplog entry", "database", db) | ||
metricOplogEntriesFiltered.WithLabelValues(db).Add(1) | ||
return nil | ||
} | ||
} | ||
|
||
tsLookup, err := rawData.LookupErr("ts") | ||
if err == nil { | ||
t, i, ok := tsLookup.TimestampOK() | ||
if !ok { | ||
log.Log.Error("Error unmarshalling oplog timestamp entry") | ||
return nil | ||
} | ||
result.Timestamp = primitive.Timestamp{T: t, I: i} | ||
} | ||
|
||
opLookup, err := rawData.LookupErr("op") | ||
if err == nil { | ||
result.Operation, ok = opLookup.StringValueOK() | ||
if !ok { | ||
log.Log.Error("Error unmarshalling oplog operation entry") | ||
return nil | ||
} | ||
} | ||
|
||
oLookup, err := rawData.LookupErr("o") | ||
if err == nil { | ||
result.Doc, ok = oLookup.DocumentOK() | ||
if !ok { | ||
log.Log.Error("Error unmarshalling oplog document entry") | ||
return nil | ||
} | ||
} | ||
|
||
o2Lookup, err := rawData.LookupErr("o2") | ||
if err == nil { | ||
result.Update, ok = o2Lookup.DocumentOK() | ||
if !ok { | ||
log.Log.Error("Error unmarshalling oplog update entry") | ||
return nil | ||
} | ||
} | ||
|
||
return &result | ||
} | ||
|
||
// processEntry processes a single entry from the oplog. | ||
// | ||
// The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if it | ||
// ignored it or failed at some later step. | ||
func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) { | ||
var result rawOplogEntry | ||
|
||
err := bson.Unmarshal(rawData, &result) | ||
if err != nil { | ||
log.Log.Errorw("Error unmarshalling oplog entry", "error", err) | ||
func (tailer *Tailer) processEntry(rawData bson.Raw, denylist *sync.Map, readOrdinal int) (timestamp *primitive.Timestamp, pubs []*redispub.Publication, sendMetricsData func()) { | ||
result := unmarshalEntryMetadata(rawData, denylist) | ||
if result == nil { | ||
return | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this code, if there is any error in the unmarshal, we just returned here. Now, even if the various lookups or type checks fail, we log a message and continue. Should we just return if any step of this lookup process fails? I'm also curious about when the previous unmarshal used to error. Did it error if the struct field type didn't match the type in the document? Probably. The change I proposed above deals with that. However, did it error if a matching key in the document for the struct wasn't found? I don't think so, and doing my change would mean different behavior in that case. I think looking into when bson unmarshalling errors and making sure behavior is the same as before (splitting the lookup and type check if necessary) is a good move. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, I tested this. If a field was missing or extra, there was no error. Only if a field existed but with a different type would the original unmarshal error. So this tries to mostly follow that pattern, but does error on namespace which would seem to be a required field. However, maybe none are -- I don't know what namespace would be set to in the case of a transaction. In that case, splitting up the lookup and type conversion and exactly matching the old behavior is probably best -- I had started that way, but wanted to avoid it since it makes for terribly messy code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, it does get gnarly. Maybe put the conversion from bson to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That would work -- I'd also have to pass in the denylist to allow for the early filtering, but that's probably fine |
||
} | ||
|
||
|
@@ -483,8 +538,21 @@ func (tailer *Tailer) getStartTime(maxOrdinal int, getTimestampOfLastOplogEntry | |
return primitive.Timestamp{T: uint32(time.Now().Unix() << 32)} | ||
} | ||
|
||
func parseID(idRaw bson.RawValue) (id interface{}, err error) { | ||
if idRaw.IsZero() { | ||
log.Log.Error("failed to get objectId: _id is empty or not set") | ||
err = errors.New("empty or missing objectId") | ||
return | ||
} | ||
err = idRaw.Unmarshal(&id) | ||
if err != nil { | ||
log.Log.Errorf("failed to unmarshal objectId: %v", err) | ||
} | ||
return | ||
} | ||
|
||
// converts a rawOplogEntry to an oplogEntry | ||
func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []oplogEntry { | ||
func (tailer *Tailer) parseRawOplogEntry(entry *rawOplogEntry, txIdx *uint) []oplogEntry { | ||
if txIdx == nil { | ||
idx := uint(0) | ||
txIdx = &idx | ||
|
@@ -505,19 +573,14 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl | |
|
||
out.Database, out.Collection = parseNamespace(out.Namespace) | ||
|
||
var errID error | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: What about calling this just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, this is a pattern Yotam likes -- to be clear what each error variable is for, rather than just always |
||
if out.Operation == operationUpdate { | ||
out.DocID = entry.Update.ID | ||
out.DocID, errID = parseID(entry.Update.Lookup("_id")) | ||
} else { | ||
idLookup := entry.Doc.Lookup("_id") | ||
if idLookup.IsZero() { | ||
log.Log.Error("failed to get objectId: _id is empty or not set") | ||
return nil | ||
} | ||
err := idLookup.Unmarshal(&out.DocID) | ||
if err != nil { | ||
log.Log.Errorf("failed to unmarshal objectId: %v", err) | ||
return nil | ||
} | ||
out.DocID, errID = parseID(entry.Doc.Lookup("_id")) | ||
} | ||
if errID != nil { | ||
return nil | ||
} | ||
|
||
return []oplogEntry{out} | ||
|
@@ -540,7 +603,7 @@ func (tailer *Tailer) parseRawOplogEntry(entry rawOplogEntry, txIdx *uint) []opl | |
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think it's worth it to replace the bson unmarshalling in the transaction case as well? We'd probably have to add a new function since we're now looking at a document array. I don't think this is necessary currently, but I just thought I'd mention it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right -- I actually was intending to do this when I split off the function and then forgot after the long weekend |
||
for _, v := range txData.ApplyOps { | ||
v.Timestamp = entry.Timestamp | ||
ret = append(ret, tailer.parseRawOplogEntry(v, txIdx)...) | ||
ret = append(ret, tailer.parseRawOplogEntry(&v, txIdx)...) | ||
} | ||
|
||
return ret | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -104,16 +104,6 @@ func TestGetStartTime(t *testing.T) { | |
} | ||
} | ||
|
||
func mustRaw(t *testing.T, data interface{}) bson.Raw { | ||
b, err := bson.Marshal(data) | ||
require.NoError(t, err) | ||
|
||
var raw bson.Raw | ||
require.NoError(t, bson.Unmarshal(b, &raw)) | ||
|
||
return raw | ||
} | ||
|
||
func TestParseRawOplogEntry(t *testing.T) { | ||
tests := map[string]struct { | ||
in rawOplogEntry | ||
|
@@ -124,7 +114,7 @@ func TestParseRawOplogEntry(t *testing.T) { | |
Timestamp: primitive.Timestamp{T: 1234}, | ||
Operation: "i", | ||
Namespace: "foo.Bar", | ||
Doc: mustRaw(t, map[string]interface{}{"_id": "someid", "foo": "bar"}), | ||
Doc: rawBson(t, map[string]interface{}{"_id": "someid", "foo": "bar"}), | ||
}, | ||
want: []oplogEntry{{ | ||
Timestamp: primitive.Timestamp{T: 1234}, | ||
|
@@ -141,8 +131,8 @@ func TestParseRawOplogEntry(t *testing.T) { | |
Timestamp: primitive.Timestamp{T: 1234}, | ||
Operation: "u", | ||
Namespace: "foo.Bar", | ||
Doc: mustRaw(t, map[string]interface{}{"new": "data"}), | ||
Update: rawOplogEntryID{ID: "updateid"}, | ||
Doc: rawBson(t, map[string]interface{}{"new": "data"}), | ||
Update: rawBson(t, map[string]interface{}{"_id": "updateid"}), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see you used There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, mustRaw was already there, and I added rawBson for the other tests, so when I had to update this I used the rawBson, but I see now they're basically the same so I replaced all the mustRaw with rawBson to simplify. |
||
}, | ||
want: []oplogEntry{{ | ||
Timestamp: primitive.Timestamp{T: 1234}, | ||
|
@@ -159,7 +149,7 @@ func TestParseRawOplogEntry(t *testing.T) { | |
Timestamp: primitive.Timestamp{T: 1234}, | ||
Operation: "d", | ||
Namespace: "foo.Bar", | ||
Doc: mustRaw(t, map[string]interface{}{"_id": "someid"}), | ||
Doc: rawBson(t, map[string]interface{}{"_id": "someid"}), | ||
}, | ||
want: []oplogEntry{{ | ||
Timestamp: primitive.Timestamp{T: 1234}, | ||
|
@@ -176,7 +166,7 @@ func TestParseRawOplogEntry(t *testing.T) { | |
Timestamp: primitive.Timestamp{T: 1234}, | ||
Operation: "c", | ||
Namespace: "foo.$cmd", | ||
Doc: mustRaw(t, map[string]interface{}{"drop": "Foo"}), | ||
Doc: rawBson(t, map[string]interface{}{"drop": "Foo"}), | ||
}, | ||
want: nil, | ||
}, | ||
|
@@ -185,47 +175,51 @@ func TestParseRawOplogEntry(t *testing.T) { | |
Timestamp: primitive.Timestamp{T: 1234}, | ||
Operation: "c", | ||
Namespace: "admin.$cmd", | ||
Doc: mustRaw(t, map[string]interface{}{ | ||
Doc: rawBson(t, map[string]interface{}{ | ||
"applyOps": []rawOplogEntry{ | ||
{ | ||
Timestamp: primitive.Timestamp{T: 1234}, | ||
Operation: "c", | ||
Namespace: "admin.$cmd", | ||
Doc: mustRaw(t, map[string]interface{}{ | ||
Doc: rawBson(t, map[string]interface{}{ | ||
"applyOps": []rawOplogEntry{ | ||
{ | ||
Operation: "i", | ||
Namespace: "foo.Bar", | ||
Doc: mustRaw(t, map[string]interface{}{ | ||
Doc: rawBson(t, map[string]interface{}{ | ||
"_id": "id1", | ||
"foo": "baz", | ||
}), | ||
Update: rawBson(t, map[string]interface{}{}), | ||
}, | ||
}, | ||
}), | ||
Update: rawBson(t, map[string]interface{}{}), | ||
}, | ||
{ | ||
Operation: "i", | ||
Namespace: "foo.Bar", | ||
Doc: mustRaw(t, map[string]interface{}{ | ||
Doc: rawBson(t, map[string]interface{}{ | ||
"_id": "id1", | ||
"foo": "bar", | ||
}), | ||
Update: rawBson(t, map[string]interface{}{}), | ||
}, | ||
{ | ||
Operation: "u", | ||
Namespace: "foo.Bar", | ||
Doc: mustRaw(t, map[string]interface{}{ | ||
Doc: rawBson(t, map[string]interface{}{ | ||
"foo": "quux", | ||
}), | ||
Update: rawOplogEntryID{"id2"}, | ||
Update: rawBson(t, map[string]interface{}{"_id": "id2"}), | ||
}, | ||
{ | ||
Operation: "d", | ||
Namespace: "foo.Bar", | ||
Doc: mustRaw(t, map[string]interface{}{ | ||
Doc: rawBson(t, map[string]interface{}{ | ||
"_id": "id3", | ||
}), | ||
Update: rawBson(t, map[string]interface{}{}), | ||
}, | ||
}, | ||
}), | ||
|
@@ -287,7 +281,7 @@ func TestParseRawOplogEntry(t *testing.T) { | |
|
||
for testName, test := range tests { | ||
t.Run(testName, func(t *testing.T) { | ||
got := (&Tailer{Denylist: &sync.Map{}}).parseRawOplogEntry(test.in, nil) | ||
got := (&Tailer{Denylist: &sync.Map{}}).parseRawOplogEntry(&test.in, nil) | ||
|
||
if diff := pretty.Compare(parseEntry(t, got), parseEntry(t, test.want)); diff != "" { | ||
t.Errorf("Got incorrect result (-got +want)\n%s", diff) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be nice to say a bit more about the performance issue with the usual unmarshalling, explaining why we need to do all these manual lookups.