Skip to content

Commit

Permalink
fix double counting acks
Browse files Browse the repository at this point in the history
  • Loading branch information
alecgibson committed Jan 29, 2025
1 parent 3881cb4 commit 241863e
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 22 deletions.
25 changes: 20 additions & 5 deletions lib/client/doc.js
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ Doc.prototype._opAcknowledged = function(message) {
// The op was committed successfully. Increment the version number
this.version++;

this._clearInflight();
this._clearInflight(null, message.seq);
};

Doc.prototype._rollback = function(err) {
Expand Down Expand Up @@ -1051,6 +1051,7 @@ Doc.prototype._hardRollback = function(err) {
// Cancel all pending ops and reset if we can't invert
this._setType(null);
this.version = null;
// TODO: Clear transaction
this.inflightOp = null;
this.pendingOps = [];

Expand Down Expand Up @@ -1107,15 +1108,29 @@ Doc.prototype._hasInflight = function() {
return !!(this.inflightOp || this._transaction);
};

Doc.prototype._clearInflight = function(err) {
Doc.prototype._clearInflight = function(err, seq) {
var callbacks = [];
if (this.inflightOp) callbacks = this._clearInflightOp();
else if (this._transaction) callbacks = this._clearTransaction();
else if (this._transaction) {
if (!seq) {
callbacks = this._clearTransaction();
} else {
var i = this.pendingOps.findIndex(function(pendingOp) {
return pendingOp.seq === seq;
});
if (i >= 0) {
const op = this.pendingOps.splice(i, 1)[0];
callbacks = callbacks.concat(op.callbacks);
}
}
}

var called = util.callEach(callbacks, err);

this.flush();
this._emitNothingPending();
if (!this._transaction) {
this.flush();
this._emitNothingPending();
}

if (err && !called) return this.emit('error', err);
};
Expand Down
5 changes: 0 additions & 5 deletions lib/submit-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ SubmitRequest.prototype.submit = function(callback) {
var collection = this.collection;
var id = this.id;
var op = this.op;
console.log('submit', op);
// Send a special projection so that getSnapshot knows to return all fields.
// With a null projection, it strips document metadata
var fields = {$submit: true};
Expand All @@ -109,7 +108,6 @@ SubmitRequest.prototype.submit = function(callback) {
getSnapshot = function(collection, id, fields, snapshotOptions, callback) {
transaction.getSnapshotAndOps(request, function(error, snapshot, ops) {
if (!snapshot) return getSnapshotFromDb(collection, id, fields, snapshotOptions, callback);
console.log('>> got ops', request.op, ops);
var type = snapshot.type;
// TODO: Use this._transformOp()? Get a version mismatch, and don't want ops on this.ops though
for (var op of ops) {
Expand All @@ -122,7 +120,6 @@ SubmitRequest.prototype.submit = function(callback) {
}

getSnapshot(collection, id, fields, snapshotOptions, function(err, snapshot) {
console.log('got snapshot', request.op, snapshot);
if (err) return callback(err);

request.snapshot = snapshot;
Expand Down Expand Up @@ -170,7 +167,6 @@ SubmitRequest.prototype.submit = function(callback) {

// Transform the op up to the current snapshot version, then apply
var from = op.v;
console.log('get ops to snapshot', from, snapshot.v);
backend.db.getOpsToSnapshot(collection, id, from, snapshot, {metadata: true}, function(err, ops) {
if (err) return callback(err);

Expand Down Expand Up @@ -298,7 +294,6 @@ SubmitRequest.prototype.retry = function(callback) {
};

SubmitRequest.prototype._transformOp = function(ops) {
console.log('transform op', this.op, ops);
var type = this.snapshot.type;
for (var i = 0; i < ops.length; i++) {
var op = ops[i];
Expand Down
15 changes: 4 additions & 11 deletions lib/transaction/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@ Transaction.prototype.submit = function(callback) {
// TODO: Handle multiple calls?
this._callback = callback;

for (var collection in this._docOps) {
for (var id in this._docOps[collection]) {
for (var op of this._docOps[collection][id]) {
this._submitOp(op);
}
}
for (var op of this._ops) {
this._submitOp(op);
}
};

Expand All @@ -51,7 +47,6 @@ Transaction.prototype.ready = function(request, callback) {
docRequests.push(request);
this._requestCallbacks.push(callback);

console.log('> REQ READY', request.op, request.snapshot);
this.emit('requestReady', request);

if (this._isReady()) return this._commitTransaction();
Expand All @@ -64,19 +59,18 @@ Transaction.prototype.getSnapshotAndOps = function(request, callback) {
this._waitForPreviousOpRequest(op, function(req) {
if (!req) return callback();
var versionDiff = req.snapshot.v - request.op.v;
var ops = req.ops.slice(-versionDiff);
var ops = util.clone(req.ops.slice(-versionDiff));
if (ops.length) {
var offset = request.op.v - ops[0].v;
for (var op of ops) {
op.v = op.v + offset;
}
}
callback(null, util.clone(req.snapshot), util.clone(ops));
callback(null, util.clone(req.snapshot), ops);
});
};

Transaction.prototype._waitForPreviousOpRequest = function(op, callback) {
console.log('wait for previous', op);
var collection = op.c;
var id = op.d;

Expand All @@ -87,7 +81,6 @@ Transaction.prototype._waitForPreviousOpRequest = function(op, callback) {
previousOp = docOp;
}

console.log('previous op', previousOp);
if (!previousOp) return callback();

var requests = util.dig(this._readyRequests, collection, id) || [];
Expand Down
2 changes: 1 addition & 1 deletion test/client/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ module.exports = function() {
], done);
});

it.only('transaction is behind remote', function(done) {
it('transaction is behind remote', function(done) {
async.series([
doc.create.bind(doc, {tricks: ['fetch']}),
remoteDoc.fetch.bind(remoteDoc),
Expand Down

0 comments on commit 241863e

Please sign in to comment.