From 28f50350efb5ae20f28cffb3e42e7d99c1ee08d4 Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Tue, 14 Jan 2025 17:57:45 +0000 Subject: [PATCH 1/7] wip --- lib/agent.js | 23 +++++++-- lib/backend.js | 4 ++ lib/client/connection.js | 26 ++++++++++ lib/client/doc.js | 8 ++- lib/client/transaction.js | 41 +++++++++++++++ lib/db/memory.js | 61 +++++++++++++++++++--- lib/error.js | 1 + lib/message-actions.js | 2 + lib/submit-request.js | 64 ++++++++++++++++++------ lib/transaction/aborted-transaction.js | 36 +++++++++++++ lib/transaction/committed-transaction.js | 23 +++++++++ lib/transaction/pending-transaction.js | 34 +++++++++++++ lib/transaction/transaction.js | 48 ++++++++++++++++++ test/client/transaction.js | 56 +++++++++++++++++++++ test/db-memory.js | 3 +- test/db.js | 4 ++ 16 files changed, 409 insertions(+), 25 deletions(-) create mode 100644 lib/client/transaction.js create mode 100644 lib/transaction/aborted-transaction.js create mode 100644 lib/transaction/committed-transaction.js create mode 100644 lib/transaction/pending-transaction.js create mode 100644 lib/transaction/transaction.js create mode 100644 test/client/transaction.js diff --git a/lib/agent.js b/lib/agent.js index a07650c8a..65a5bb778 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -70,6 +70,8 @@ function Agent(backend, stream) { this._firstReceivedMessage = null; this._handshakeReceived = false; + this._transactions = Object.create(null); + // Send the legacy message to initialize old clients with the random agent Id this.send(this._initMessage(ACTIONS.initLegacy)); } @@ -470,7 +472,7 @@ Agent.prototype._handleMessage = function(request, callback) { )); } if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message')); - return this._submit(request.c, request.d, op, callback); + return this._submit(request.c, request.d, op, request.t, callback); case ACTIONS.snapshotFetch: return this._fetchSnapshot(request.c, request.d, request.v, callback); case ACTIONS.snapshotFetchByTimestamp: @@ -494,6 +496,8 @@ Agent.prototype._handleMessage = function(request, callback) { return this._requestPresence(request.ch, callback); case ACTIONS.pingPong: return this._pingPong(callback); + case ACTIONS.transactionCommit: + return this._commitTransaction(request, callback); default: callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message')); } @@ -761,9 +765,12 @@ Agent.prototype._unsubscribeBulk = function(collection, ids, callback) { util.nextTick(callback); }; -Agent.prototype._submit = function(collection, id, op, callback) { +Agent.prototype._submit = function(collection, id, op, transactionId, callback) { var agent = this; - this.backend.submit(this, collection, id, op, null, function(err, ops, request) { + var options = { + transaction: transactionId + }; + this.backend.submit(this, collection, id, op, options, function(err, ops, request) { // Message to acknowledge the op was successfully submitted var ack = {src: op.src, seq: op.seq, v: op.v}; if (request._fixupOps.length) ack[ACTIONS.fixup] = request._fixupOps; @@ -993,6 +1000,16 @@ function createClientOp(request, clientId) { undefined; } +Agent.prototype._commitTransaction = function(request, callback) { + var transaction = this._transactions[request.id]; + transaction.commit(callback); +}; + +Agent.prototype._abortTransaction = function(request, callback) { + var transaction = this._transactions[request.id]; + transaction.abort(callback); +}; + function shallowCopy(object) { var out = {}; for (var key in object) { diff --git a/lib/backend.js b/lib/backend.js index dcc95aa33..b760ca6d1 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -931,6 +931,10 @@ Backend.prototype.transformPresenceToLatestVersion = function(agent, presence, c }); }; +Backend.prototype._commitTransaction = function(id, seqs, callback) { + +}; + function pluckIds(snapshots) { var ids = []; for (var i = 0; i < snapshots.length; i++) { diff --git a/lib/client/connection.js b/lib/client/connection.js index da89dfef5..cceb3de70 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -12,6 +12,7 @@ var util = require('../util'); var logger = require('../logger'); var DocPresenceEmitter = require('./presence/doc-presence-emitter'); var protocol = require('../protocol'); +var Transaction = require('./transaction'); var ERROR_CODE = ShareDBError.CODES; @@ -64,6 +65,8 @@ function Connection(socket) { // A unique message number for presence this._presenceSeq = 1; + this._transactions = Object.create(null); + // Equals agent.src on the server this.id = null; @@ -258,6 +261,8 @@ Connection.prototype.handleMessage = function(message) { return this._handlePresenceRequest(err, message); case ACTIONS.pingPong: return this._handlePingPong(err); + case ACTIONS.transactionCommit: + return this._handleTransactionCommit(err, message); default: logger.warn('Ignoring unrecognized message', message); @@ -467,6 +472,7 @@ Connection.prototype.sendOp = function(doc, op) { if (op.create) message.create = op.create; if (op.del) message.del = op.del; if (doc.submitSource) message.x.source = op.source; + if (op.transaction) message.t = op.transaction; this.send(message); }; @@ -782,6 +788,26 @@ Connection.prototype._initialize = function(message) { this._setState('connected'); }; +Connection.prototype.startTransaction = function() { + var transaction = new Transaction(this); + return this._transactions[transaction.id] = transaction; +}; + +Connection.prototype._commitTransaction = function(transaction) { + this.send({a: ACTIONS.transactionCommit, id: transaction.id}); +}; + +Connection.prototype._abortTransaction = function(transaction) { + this.send({a: ACTIONS.transactionAbort, id: transaction.id}); +}; + +Connection.prototype._handleTransactionCommit = function(error, message) { + var transaction = this._transactions[message.id]; + if (!transaction) return; + transaction._handleCommit(error, message); + delete this._transactions[message.id]; +}; + Connection.prototype.getPresence = function(channel) { var connection = this; var presence = util.digOrCreate(this._presences, channel, function() { diff --git a/lib/client/doc.js b/lib/client/doc.js index acb457374..56ab419e9 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -763,6 +763,7 @@ Doc.prototype._submit = function(op, source, callback) { this._pushOp(op, source, callback); this._otApply(op, source); } catch (error) { + // TODO: Abort transaction return this._hardRollback(error); } @@ -865,8 +866,13 @@ Doc.prototype.submitOp = function(component, options, callback) { callback = options; options = null; } + options = options || {}; var op = {op: component}; - var source = options && options.source; + if (options.transaction) { + // TODO: Allow passing just ID + op.transaction = options.transaction.id; + } + var source = options.source; this._submit(op, source, callback); }; diff --git a/lib/client/transaction.js b/lib/client/transaction.js new file mode 100644 index 000000000..5d0704e66 --- /dev/null +++ b/lib/client/transaction.js @@ -0,0 +1,41 @@ +var emitter = require('../emitter'); + +var idCounter = 1; + +var STATE = { + pending: 'pending', + committing: 'committing', + aborting: 'aborting', + committed: 'committed', + aborted: 'aborted' +} + +module.exports = Transaction; +function Transaction(connection) { + emitter.EventEmitter.call(this); + + this.connection = connection; + this.id = (idCounter++).toString(); + + this._callback = null; + this._state = STATE.pending; +} +emitter.mixin(Transaction); + +Transaction.prototype.commit = function(callback) { + // TODO: Catch multiple calls + // TODO: Handle network changes + this._state = STATE.committing; + this._callback = callback; + this.connection._commitTransaction(this); +}; + +Transaction.prototype.abort = function(callback) { + this._state = STATE.aborting; + this._callback = callback; +}; + +Transaction.prototype._handleCommit = function(error, message) { + if (error) return this._callback(error); + this._callback(); +}; diff --git a/lib/db/memory.js b/lib/db/memory.js index 2db274d6e..847751eef 100644 --- a/lib/db/memory.js +++ b/lib/db/memory.js @@ -25,6 +25,8 @@ function MemoryDB(options) { this.ops = Object.create(null); this.closed = false; + + this._transactions = Object.create(null); }; module.exports = MemoryDB; @@ -39,6 +41,7 @@ MemoryDB.prototype.close = function(callback) { // callback(err, succeeded) MemoryDB.prototype.commit = function(collection, id, op, snapshot, options, callback) { var db = this; + options = options || {}; if (typeof callback !== 'function') throw new Error('Callback required'); util.nextTick(function() { var version = db._getVersionSync(collection, id); @@ -46,13 +49,21 @@ MemoryDB.prototype.commit = function(collection, id, op, snapshot, options, call var succeeded = false; return callback(null, succeeded); } - var err = db._writeOpSync(collection, id, op); - if (err) return callback(err); - err = db._writeSnapshotSync(collection, id, snapshot); - if (err) return callback(err); - var succeeded = true; - callback(null, succeeded); + var commit = function() { + var err = db._writeOpSync(collection, id, op); + if (err) return err; + err = db._writeSnapshotSync(collection, id, snapshot); + if (err) return err; + }; + + var error; + if (!options.transaction) error = commit(); + callback(error, !error); + + if (!options.transaction) return; + var transaction = db._getTransaction(options.transaction); + transaction.commits.push(commit); }); }; @@ -133,6 +144,38 @@ MemoryDB.prototype.query = function(collection, query, fields, options, callback }); }; +MemoryDB.prototype.commitTransaction = function(id, callback) { + var transaction = this._transactions[id]; + if (!transaction) { + throw new Error('No transaction found'); + } + + delete this._transactions[id]; + // transaction.callbacks.push(callback); + + // Heavy-handed but effective approach to synchronous transaction writing: + // let's just save the DB state before and restore if the transaction fails + var docs = JSON.stringify(this.docs); + var ops = JSON.stringify(this.ops); + + for (var commit of transaction.commits) { + var error = commit(); + + if (error) { + this.docs = JSON.parse(docs); + this.ops = JSON.parse(ops); + break; + } + } + + callback(error, !error); +}; + +MemoryDB.prototype.abortTransaction = function(id, callback) { + delete this._transactions[id]; + callback(null); +}; + // For testing, it may be useful to implement the desired query // language by defining this function. Returns an object with // two properties: @@ -187,3 +230,9 @@ MemoryDB.prototype._getVersionSync = function(collection, id) { var collectionOps = this.ops[collection]; return (collectionOps && collectionOps[id] && collectionOps[id].length) || 0; }; + +MemoryDB.prototype._getTransaction = function(id) { + var transaction = this._transactions[id] = this._transactions[id] || {}; + transaction.commits = transaction.commits || []; + return transaction; +} diff --git a/lib/error.js b/lib/error.js index cde95bdbe..3c2269b32 100644 --- a/lib/error.js +++ b/lib/error.js @@ -72,6 +72,7 @@ ShareDBError.CODES = { */ ERR_SNAPSHOT_READS_REJECTED: 'ERR_SNAPSHOT_READS_REJECTED', ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND: 'ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND', + ERR_TRANSACTION_ABORTED: 'ERR_TRANSACTION_ABORTED', ERR_TYPE_CANNOT_BE_PROJECTED: 'ERR_TYPE_CANNOT_BE_PROJECTED', ERR_TYPE_DOES_NOT_SUPPORT_COMPOSE: 'ERR_TYPE_DOES_NOT_SUPPORT_COMPOSE', ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE: 'ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE', diff --git a/lib/message-actions.js b/lib/message-actions.js index e1a8e9942..ab3ca59f5 100644 --- a/lib/message-actions.js +++ b/lib/message-actions.js @@ -15,6 +15,8 @@ exports.ACTIONS = { op: 'op', snapshotFetch: 'nf', snapshotFetchByTimestamp: 'nt', + transactionAbort: 'ta', + transactionCommit: 'tc', pingPong: 'pp', presence: 'p', presenceSubscribe: 'ps', diff --git a/lib/submit-request.js b/lib/submit-request.js index 8e6715123..22ea1a67f 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -3,6 +3,8 @@ var projections = require('./projections'); var ShareDBError = require('./error'); var types = require('./types'); var protocol = require('./protocol'); +var util = require('./util'); +var Transaction = require('./transaction/transaction'); var ERROR_CODE = ShareDBError.CODES; @@ -16,7 +18,7 @@ function SubmitRequest(backend, agent, index, id, op, options) { this.collection = (projection) ? projection.target : index; this.id = id; this.op = op; - this.options = options; + this.options = options || {}; this.extra = op.x; delete op.x; @@ -43,6 +45,14 @@ function SubmitRequest(backend, agent, index, id, op, options) { this.ops = []; this.channels = null; this._fixupOps = []; + + this._succeeded = false; + if (this.options.transaction) { + this.transaction = util.digOrCreate(agent._transactions, options.transaction, function() { + return new Transaction(agent, options.transaction); + }); + this.transaction.registerSubmitRequest(this); + } } module.exports = SubmitRequest; @@ -93,11 +103,29 @@ SubmitRequest.prototype.submit = function(callback) { // With a null projection, it strips document metadata var fields = {$submit: true}; + if (request.transaction) { + var originalCallback = callback; + callback = function(error) { + request.transaction.abort(); + originalCallback(error); + } + } + var snapshotOptions = {}; snapshotOptions.agentCustom = request.agent.custom; backend.db.getSnapshot(collection, id, fields, snapshotOptions, function(err, snapshot) { if (err) return callback(err); + if (request.transaction) { + // Get other ops in the same transaction that have not yet been committed to the DB + const ops = request.transaction.pendingOps(collection, id).filter(function(o) { + return o.v < op.v; + }) + // TODO: Handle remote ops + var error = ot.applyOps(snapshot, ops); + if (error) return callback(error); + } + request.snapshot = snapshot; request._addSnapshotMeta(); @@ -225,30 +253,38 @@ SubmitRequest.prototype.commit = function(callback) { request.snapshot, request.options, function(err, succeeded) { + // TODO: Should this callback be called before committing a transaction? if (err) return callback(err); if (!succeeded) { // Between our fetch and our call to commit, another client committed an // operation. We expect this to be relatively infrequent but normal. return request.retry(callback); } - if (!request.suppressPublish) { - var op = request.op; - op.c = request.collection; - op.d = request.id; - op.m = undefined; - // Needed for agent to detect if it can ignore sending the op back to - // the client that submitted it in subscriptions - if (request.collection !== request.index) op.i = request.index; - backend.pubsub.publish(request.channels, op); - } - if (request._shouldSaveMilestoneSnapshot(request.snapshot)) { - request.backend.milestoneDb.saveMilestoneSnapshot(request.collection, request.snapshot); - } + request._succeeded = !!succeeded; + if (request.transaction) request.transaction.update(); + else request.publish(); callback(); }); }); }; +SubmitRequest.prototype.publish = function() { + if (!this.suppressPublish) { + var op = this.op; + op.c = this.collection; + op.d = this.id; + op.m = undefined; + // Needed for agent to detect if it can ignore sending the op back to + // the client that submitted it in subscriptions + if (this.collection !== this.index) op.i = this.index; + this.backend.pubsub.publish(this.channels, op); + } + + if (this._shouldSaveMilestoneSnapshot(this.snapshot)) { + this.backend.milestoneDb.saveMilestoneSnapshot(this.collection, this.snapshot); + } +}; + SubmitRequest.prototype.retry = function(callback) { this.retries++; if (this.maxRetries != null && this.retries > this.maxRetries) { diff --git a/lib/transaction/aborted-transaction.js b/lib/transaction/aborted-transaction.js new file mode 100644 index 000000000..b57ef8434 --- /dev/null +++ b/lib/transaction/aborted-transaction.js @@ -0,0 +1,36 @@ +var ShareDBError = require('../error'); + +var ERROR_CODE = ShareDBError.CODES; + +function AbortedTransaction(transaction) { + this._transaction = transaction; + + var self = this; + transaction.backend.db.abortTransaction(this.id, function(abortError) { + self._abortCallbacks(abortError && abortError.message); + }); +} +module.exports = AbortedTransaction; + +AbortedTransaction.prototype.commit = function() { + this.update(); +}; + +AbortedTransaction.prototype.abort = function() { + this.update(); +}; + +AbortedTransaction.prototype.registerSubmitRequest = function() { + // TODO: Error? +}; + +AbortedTransaction.prototype.update = function() { + this._abortCallbacks(); +}; + +AbortedTransaction.prototype._abortCallbacks = function(message) { + message = message || 'Transaction aborted'; + var error = new ShareDBError(ERROR_CODE.ERR_TRANSACTION_ABORTED, message); + this._transaction._callAndClearCallbacks(error); +}; + diff --git a/lib/transaction/committed-transaction.js b/lib/transaction/committed-transaction.js new file mode 100644 index 000000000..b7862eaed --- /dev/null +++ b/lib/transaction/committed-transaction.js @@ -0,0 +1,23 @@ +function CommittedTransaction(transaction) { + this._transaction = transaction; + // TODO: Check DB support + transaction.backend.db.commitTransaction(this.id, function(error) { + transaction._callAndClearCallbacks(error); + }); + // TODO: call request.publish() on transaction requests +} +module.exports = CommittedTransaction; + +CommittedTransaction.prototype.commit = function() { + // TODO: Error? +}; + +CommittedTransaction.prototype.abort = function() { + // TODO: Error? +}; + +CommittedTransaction.prototype.registerSubmitRequest = function() { + // TODO: Error? +}; + +CommittedTransaction.prototype.update = function() {}; diff --git a/lib/transaction/pending-transaction.js b/lib/transaction/pending-transaction.js new file mode 100644 index 000000000..8797769ab --- /dev/null +++ b/lib/transaction/pending-transaction.js @@ -0,0 +1,34 @@ +const AbortedTransaction = require("./aborted-transaction"); +const CommittedTransaction = require("./committed-transaction"); + +function PendingTransaction(transaction) { + this._transaction = transaction; + this._wantsCommit = false; +} +module.exports = PendingTransaction; + +PendingTransaction.prototype.commit = function() { + this._wantsCommit = true; + this.update(); +}; + +PendingTransaction.prototype.abort = function() { + this._transaction._state = new AbortedTransaction(this._transaction); +}; + +PendingTransaction.prototype.registerSubmitRequest = function(request) { + this._transaction._requests.push(request); + this.update(); +}; + +PendingTransaction.prototype.update = function() { + if (!this._shouldCommit()) return; + this._transaction._state = new CommittedTransaction(this._transaction); +}; + +PendingTransaction.prototype._shouldCommit = function() { + if (!this._wantsCommit) return false; + return this._transaction._requests.every(function(request) { + return request._succeeded; + }); +}; diff --git a/lib/transaction/transaction.js b/lib/transaction/transaction.js new file mode 100644 index 000000000..b31320911 --- /dev/null +++ b/lib/transaction/transaction.js @@ -0,0 +1,48 @@ +var PendingTransaction = require('./pending-transaction'); + +function Transaction(agent, id) { + this.agent = agent; + this.backend = agent.backend; + this.id = id; + + this._state = new PendingTransaction(this); + this._requests = []; + this._callbacks = []; +} +module.exports = Transaction; + +Transaction.prototype.commit = function(callback) { + this._callbacks.push(callback); + this._state.commit(); +}; + +Transaction.prototype.abort = function(callback) { + this._callbacks.push(callback); + this._state.abort(); +}; + +Transaction.prototype.registerSubmitRequest = function(request) { + this._state.registerSubmitRequest(request); +}; + +Transaction.prototype.update = function() { + this._state.update(); +}; + +Transaction.prototype.pendingOps = function(collection, id) { + var ops = []; + for (var request of this._requests) { + if (request.collection === collection && request.id === id) { + ops.push(request.op); + } + } + return ops; +}; + +Transaction.prototype._callAndClearCallbacks = function(error) { + var callbacks = this._callbacks; + this._callbacks = []; + for (var callback of callbacks) { + if (typeof callback === 'function') callback(error); + } +}; diff --git a/test/client/transaction.js b/test/client/transaction.js new file mode 100644 index 000000000..9037ad279 --- /dev/null +++ b/test/client/transaction.js @@ -0,0 +1,56 @@ +var async = require('async'); +var expect = require('chai').expect; + +module.exports = function() { + describe.only('transaction', function() { + describe('single transaction', function() { + var backend; + var connection; + + beforeEach(function() { + backend = this.backend; + connection = backend.connect(); + }); + + it('does not commit the first op if the second op fails', function(done) { + var doc = connection.get('dogs', 'gaspode'); + // Disable composing to force two submissions + doc.preventCompose = true; + + backend.use('commit', function(request, next) { + if (!request.snapshot.data.tricks) return next(); + next(new Error('fail')); + }); + + var transaction = connection.startTransaction(); + + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + doc.submitOp.bind(doc, [{p: ['age'], oi: 3}], {transaction: transaction}), + function(next) { + doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}, function(error) { + expect(error.message).to.equal('fail'); + next(); + }); + }, + function(next) { + transaction.commit(function(error) { + expect(error.code).to.equal('ERR_TRANSACTION_ABORTED'); + next(); + }); + }, + // TODO: Assert hard rollback + doc.destroy.bind(doc), + function(next) { + doc = connection.get('dogs', 'gaspode'); + doc.fetch(next); + }, + function(next) { + expect(doc.data).to.eql({name: 'Gaspode'}); + next(); + } + ], done); + }); + }); + }); +} diff --git a/test/db-memory.js b/test/db-memory.js index 8c4774418..2a6febc58 100644 --- a/test/db-memory.js +++ b/test/db-memory.js @@ -69,7 +69,8 @@ describe('MemoryDB', function() { }, getQuery: function(options) { return {filter: options.query, sort: options.sort}; - } + }, + transactions: true }); describe('deleteOps', function() { diff --git a/test/db.js b/test/db.js index 5fc8a293d..16135a0e6 100644 --- a/test/db.js +++ b/test/db.js @@ -64,6 +64,10 @@ module.exports = function(options) { require('./client/projections')({getQuery: options.getQuery}); } + if (options.transactions) { + require('./client/transaction')(); + } + require('./client/submit')(); require('./client/submit-json1')(); require('./client/subscribe')(); From 68f3de8c906637520ecdd3fc9d0d0fefa96878cb Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Wed, 15 Jan 2025 11:40:58 +0000 Subject: [PATCH 2/7] add happy case --- lib/db/memory.js | 13 ++- lib/submit-request.js | 2 +- lib/transaction/committed-transaction.js | 2 +- test/client/transaction.js | 104 +++++++++++++---------- 4 files changed, 71 insertions(+), 50 deletions(-) diff --git a/lib/db/memory.js b/lib/db/memory.js index 847751eef..3e0a8ce5f 100644 --- a/lib/db/memory.js +++ b/lib/db/memory.js @@ -45,6 +45,13 @@ MemoryDB.prototype.commit = function(collection, id, op, snapshot, options, call if (typeof callback !== 'function') throw new Error('Callback required'); util.nextTick(function() { var version = db._getVersionSync(collection, id); + + var transaction; + if (options.transaction) { + transaction = db._getTransaction(options.transaction); + version = version + transaction.commits.length; + } + if (snapshot.v !== version + 1) { var succeeded = false; return callback(null, succeeded); @@ -58,12 +65,10 @@ MemoryDB.prototype.commit = function(collection, id, op, snapshot, options, call }; var error; - if (!options.transaction) error = commit(); + if (!transaction) error = commit(); callback(error, !error); - if (!options.transaction) return; - var transaction = db._getTransaction(options.transaction); - transaction.commits.push(commit); + if (transaction) transaction.commits.push(commit); }); }; diff --git a/lib/submit-request.js b/lib/submit-request.js index 22ea1a67f..d584897bb 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -106,7 +106,7 @@ SubmitRequest.prototype.submit = function(callback) { if (request.transaction) { var originalCallback = callback; callback = function(error) { - request.transaction.abort(); + if (error) request.transaction.abort(); originalCallback(error); } } diff --git a/lib/transaction/committed-transaction.js b/lib/transaction/committed-transaction.js index b7862eaed..92850492c 100644 --- a/lib/transaction/committed-transaction.js +++ b/lib/transaction/committed-transaction.js @@ -1,7 +1,7 @@ function CommittedTransaction(transaction) { this._transaction = transaction; // TODO: Check DB support - transaction.backend.db.commitTransaction(this.id, function(error) { + transaction.backend.db.commitTransaction(this._transaction.id, function(error) { transaction._callAndClearCallbacks(error); }); // TODO: call request.publish() on transaction requests diff --git a/test/client/transaction.js b/test/client/transaction.js index 9037ad279..ac4eaab04 100644 --- a/test/client/transaction.js +++ b/test/client/transaction.js @@ -3,54 +3,70 @@ var expect = require('chai').expect; module.exports = function() { describe.only('transaction', function() { - describe('single transaction', function() { - var backend; - var connection; + var backend; + var connection; - beforeEach(function() { - backend = this.backend; - connection = backend.connect(); + beforeEach(function() { + backend = this.backend; + connection = backend.connect(); + }); + + it('commits two ops as a transaction', function(done) { + var doc = connection.get('dogs', 'gaspode'); + var remoteDoc = backend.connect().get('dogs', 'gaspode'); + var transaction = connection.startTransaction(); + + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + doc.submitOp.bind(doc, [{p: ['age'], oi: 3}], {transaction: transaction}), + doc.submitOp.bind(doc, [{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}), + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({ + name: 'Gaspode', + age: 3, + tricks: ['fetch'] + }); + next(); + } + ], done); + }); + + it('does not commit the first op if the second op fails', function(done) { + var doc = connection.get('dogs', 'gaspode'); + var remoteDoc = backend.connect().get('dogs', 'gaspode'); + + backend.use('commit', function(request, next) { + if (!request.snapshot.data.tricks) return next(); + next(new Error('fail')); }); - it('does not commit the first op if the second op fails', function(done) { - var doc = connection.get('dogs', 'gaspode'); - // Disable composing to force two submissions - doc.preventCompose = true; - - backend.use('commit', function(request, next) { - if (!request.snapshot.data.tricks) return next(); - next(new Error('fail')); - }); - - var transaction = connection.startTransaction(); - - async.series([ - doc.create.bind(doc, {name: 'Gaspode'}), - doc.submitOp.bind(doc, [{p: ['age'], oi: 3}], {transaction: transaction}), - function(next) { - doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}, function(error) { - expect(error.message).to.equal('fail'); - next(); - }); - }, - function(next) { - transaction.commit(function(error) { - expect(error.code).to.equal('ERR_TRANSACTION_ABORTED'); - next(); - }); - }, - // TODO: Assert hard rollback - doc.destroy.bind(doc), - function(next) { - doc = connection.get('dogs', 'gaspode'); - doc.fetch(next); - }, - function(next) { - expect(doc.data).to.eql({name: 'Gaspode'}); + var transaction = connection.startTransaction(); + + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + doc.submitOp.bind(doc, [{p: ['age'], oi: 3}], {transaction: transaction}), + function(next) { + doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}, function(error) { + expect(error.message).to.equal('fail'); next(); - } - ], done); - }); + }); + }, + function(next) { + transaction.commit(function(error) { + expect(error.code).to.equal('ERR_TRANSACTION_ABORTED'); + next(); + }); + }, + // TODO: Assert hard rollback + doc.destroy.bind(doc), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + next(); + } + ], done); }); }); } From ca7607c87abc987193982fecec60269f9f65ee9e Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Wed, 15 Jan 2025 14:41:20 +0000 Subject: [PATCH 3/7] hard rollback doc on abort --- lib/client/doc.js | 24 +++++++++++++++++++----- lib/client/transaction.js | 26 ++++++++++++-------------- test/client/transaction.js | 12 ++++++++++-- 3 files changed, 41 insertions(+), 21 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 56ab419e9..2b69ba3a6 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -121,6 +121,8 @@ function Doc(connection, collection, id) { // Internal counter that gets incremented every time doc.data is updated. // Used as a cheap way to check if doc.data has changed. this._dataStateVersion = 0; + + this._transaction = null; } emitter.mixin(Doc); @@ -868,10 +870,7 @@ Doc.prototype.submitOp = function(component, options, callback) { } options = options || {}; var op = {op: component}; - if (options.transaction) { - // TODO: Allow passing just ID - op.transaction = options.transaction.id; - } + this._setTransaction(op, options.transaction); var source = options.source; this._submit(op, source, callback); }; @@ -1112,4 +1111,19 @@ Doc.prototype._clearInflightOp = function(err) { if (err && !called) return this.emit('error', err); }; - +Doc.prototype._setTransaction = function(op, transaction) { + if (transaction && typeof transaction === 'object') transaction = transaction.id; + if (transaction) { + if (this._transaction && this._transaction.id !== transaction) { + throw new Error('Transaction already in progress. Commit transaction before starting a new one.'); + } + op.transaction = transaction; + this._transaction = this.connection._transactions[transaction]; + if (!this._transaction) { + throw new Error('Transaction not started'); + } + this._transaction.on('abort', this._hardRollback.bind(this)); + } else if (!this._transaction) { + throw new Error('Transaction in progress. Commit transaction before submitting ops outside transaction.'); + } +}; diff --git a/lib/client/transaction.js b/lib/client/transaction.js index 5d0704e66..548c5fa91 100644 --- a/lib/client/transaction.js +++ b/lib/client/transaction.js @@ -1,15 +1,9 @@ var emitter = require('../emitter'); +var ShareDBError = require('../error'); +var ERROR_CODE = ShareDBError.CODES; var idCounter = 1; -var STATE = { - pending: 'pending', - committing: 'committing', - aborting: 'aborting', - committed: 'committed', - aborted: 'aborted' -} - module.exports = Transaction; function Transaction(connection) { emitter.EventEmitter.call(this); @@ -18,24 +12,28 @@ function Transaction(connection) { this.id = (idCounter++).toString(); this._callback = null; - this._state = STATE.pending; } emitter.mixin(Transaction); Transaction.prototype.commit = function(callback) { // TODO: Catch multiple calls // TODO: Handle network changes - this._state = STATE.committing; this._callback = callback; this.connection._commitTransaction(this); }; Transaction.prototype.abort = function(callback) { - this._state = STATE.aborting; this._callback = callback; }; -Transaction.prototype._handleCommit = function(error, message) { - if (error) return this._callback(error); - this._callback(); +Transaction.prototype._handleCommit = function(error) { + if (typeof this._callback === 'function') this._callback(error); + else if (error) this.emit('error', error); + + if (!error) this.emit('commit'); + else if (error.code === ERROR_CODE.ERR_TRANSACTION_ABORTED) this.emit('abort', error); + + this.emit('end'); + // No more events will be emitted, so tidy up + this.removeAllListeners(); }; diff --git a/test/client/transaction.js b/test/client/transaction.js index ac4eaab04..86547bd3a 100644 --- a/test/client/transaction.js +++ b/test/client/transaction.js @@ -28,6 +28,7 @@ module.exports = function() { age: 3, tricks: ['fetch'] }); + expect(doc.data).to.eql(remoteDoc.data); next(); } ], done); @@ -37,6 +38,12 @@ module.exports = function() { var doc = connection.get('dogs', 'gaspode'); var remoteDoc = backend.connect().get('dogs', 'gaspode'); + var errorHandlerCalled = false; + doc.on('error', (error) => { + errorHandlerCalled = true; + expect(error.code).to.equal('ERR_TRANSACTION_ABORTED'); + }); + backend.use('commit', function(request, next) { if (!request.snapshot.data.tricks) return next(); next(new Error('fail')); @@ -59,11 +66,12 @@ module.exports = function() { next(); }); }, - // TODO: Assert hard rollback - doc.destroy.bind(doc), + doc.once.bind(doc, 'load'), remoteDoc.fetch.bind(remoteDoc), function(next) { + expect(errorHandlerCalled).to.be.true; expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + expect(doc.data).to.eql(remoteDoc.data); next(); } ], done); From 1ec0371bd065805d934e82eeb29395417e42099a Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Wed, 15 Jan 2025 15:48:45 +0000 Subject: [PATCH 4/7] del create test --- lib/client/doc.js | 19 ++++++++-- lib/client/transaction.js | 32 +++++++++------- test/client/transaction.js | 78 ++++++++++++++++++++++++++++++++------ 3 files changed, 102 insertions(+), 27 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 2b69ba3a6..f0d99e91b 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -339,6 +339,9 @@ Doc.prototype._handleOp = function(err, message) { // the remote, so that we're in a nice consistent state return this.fetch(this._clearInflightOp.bind(this)); } + if (this._transaction) { + return this._transaction._localAbort(err); + } if (this.inflightOp) { return this._rollback(err); } @@ -893,6 +896,7 @@ Doc.prototype.create = function(data, type, options, callback) { callback = options; options = null; } + options = options || {}; if (!type) { type = types.defaultType.uri; } @@ -902,6 +906,7 @@ Doc.prototype.create = function(data, type, options, callback) { return this.emit('error', err); } var op = {create: {type: type, data: data}}; + this._setTransaction(op, options.transaction); var source = options && options.source; this._submit(op, source, callback); }; @@ -918,12 +923,14 @@ Doc.prototype.del = function(options, callback) { callback = options; options = null; } + options = options || {}; if (!this.type) { var err = new ShareDBError(ERROR_CODE.ERR_DOC_DOES_NOT_EXIST, 'Document does not exist'); if (callback) return callback(err); return this.emit('error', err); } var op = {del: true}; + this._setTransaction(op, options.transaction); var source = options && options.source; this._submit(op, source, callback); }; @@ -1114,16 +1121,22 @@ Doc.prototype._clearInflightOp = function(err) { Doc.prototype._setTransaction = function(op, transaction) { if (transaction && typeof transaction === 'object') transaction = transaction.id; if (transaction) { - if (this._transaction && this._transaction.id !== transaction) { + op.transaction = transaction; + + if (this._transaction) { + if (this._transaction.id === transaction) return; throw new Error('Transaction already in progress. Commit transaction before starting a new one.'); } - op.transaction = transaction; this._transaction = this.connection._transactions[transaction]; if (!this._transaction) { throw new Error('Transaction not started'); } + if (!this._transaction._writeable) { + throw new Error('Transaction is no longer writeable'); + } + // TODO: Tidy up listener this._transaction.on('abort', this._hardRollback.bind(this)); - } else if (!this._transaction) { + } else if (this._transaction) { throw new Error('Transaction in progress. Commit transaction before submitting ops outside transaction.'); } }; diff --git a/lib/client/transaction.js b/lib/client/transaction.js index 548c5fa91..1cb5f6c90 100644 --- a/lib/client/transaction.js +++ b/lib/client/transaction.js @@ -1,7 +1,5 @@ var emitter = require('../emitter'); -var ShareDBError = require('../error'); -var ERROR_CODE = ShareDBError.CODES; var idCounter = 1; module.exports = Transaction; @@ -11,29 +9,37 @@ function Transaction(connection) { this.connection = connection; this.id = (idCounter++).toString(); - this._callback = null; + this._callbacks = []; + this._writeable = true; } emitter.mixin(Transaction); Transaction.prototype.commit = function(callback) { // TODO: Catch multiple calls // TODO: Handle network changes - this._callback = callback; + this._callbacks.push(callback); + this._writeable = false; this.connection._commitTransaction(this); }; Transaction.prototype.abort = function(callback) { - this._callback = callback; + this._callbacks.push(callback); + this._writeable = false; }; Transaction.prototype._handleCommit = function(error) { - if (typeof this._callback === 'function') this._callback(error); - else if (error) this.emit('error', error); - - if (!error) this.emit('commit'); - else if (error.code === ERROR_CODE.ERR_TRANSACTION_ABORTED) this.emit('abort', error); + this._writeable = false; + // TODO: Handle callbacks + if (error) this._localAbort(error); + else this.emit('commit'); + + var callbacks = this._callbacks; + this._callbacks = []; + if (!callbacks.length) this.emit('error', error); + for (var callback of callbacks) callback(error); +}; - this.emit('end'); - // No more events will be emitted, so tidy up - this.removeAllListeners(); +Transaction.prototype._localAbort = function(error) { + this._writeable = false; + this.emit('abort', error); }; diff --git a/test/client/transaction.js b/test/client/transaction.js index 86547bd3a..504d2ef1b 100644 --- a/test/client/transaction.js +++ b/test/client/transaction.js @@ -20,6 +20,11 @@ module.exports = function() { doc.create.bind(doc, {name: 'Gaspode'}), doc.submitOp.bind(doc, [{p: ['age'], oi: 3}], {transaction: transaction}), doc.submitOp.bind(doc, [{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + next(); + }, transaction.commit.bind(transaction), remoteDoc.fetch.bind(remoteDoc), function(next) { @@ -38,11 +43,10 @@ module.exports = function() { var doc = connection.get('dogs', 'gaspode'); var remoteDoc = backend.connect().get('dogs', 'gaspode'); - var errorHandlerCalled = false; - doc.on('error', (error) => { - errorHandlerCalled = true; - expect(error.code).to.equal('ERR_TRANSACTION_ABORTED'); - }); + // TODO: Discuss if this is an acceptable API? Doc will always emit error on + // a failed transaction, since the ops may have been successfully acked for this Doc, and + // we force a hard rollback with no callback, which causes an 'error' event + doc.on('error', () => {}); backend.use('commit', function(request, next) { if (!request.snapshot.data.tricks) return next(); @@ -57,19 +61,71 @@ module.exports = function() { function(next) { doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}, function(error) { expect(error.message).to.equal('fail'); - next(); }); + doc.once('load', next); + }, + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('deletes and creates as part of a transaction', function(done) { + var doc = connection.get('dogs', 'gaspode'); + var remoteDoc = backend.connect().get('dogs', 'gaspode'); + + doc.on('error', () => {}); + + var transaction = connection.startTransaction(); + + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + doc.del.bind(doc, {transaction: transaction}), + doc.create.bind(doc, {name: 'Recreated'}, 'json0', {transaction: transaction}), + remoteDoc.fetch.bind(remoteDoc), + function (next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Recreated'}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('does not delete if the following create fails', function(done) { + var doc = connection.get('dogs', 'gaspode'); + var remoteDoc = backend.connect().get('dogs', 'gaspode'); + + doc.on('error', () => {}); + + var transaction = connection.startTransaction(); + + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + function(next) { + backend.use('commit', function(request, next) { + var error = request.op.create ? new Error('Create not allowed') : null; + next(error); + }); + next(); }, + doc.del.bind(doc, {transaction: transaction}), function(next) { - transaction.commit(function(error) { - expect(error.code).to.equal('ERR_TRANSACTION_ABORTED'); - next(); + doc.create({name: 'Recreated'}, 'json0', {transaction: transaction}, function(error) { + expect(error.message).to.equal('Create not allowed'); }); + doc.once('load', next); }, - doc.once.bind(doc, 'load'), remoteDoc.fetch.bind(remoteDoc), function(next) { - expect(errorHandlerCalled).to.be.true; expect(remoteDoc.data).to.eql({name: 'Gaspode'}); expect(doc.data).to.eql(remoteDoc.data); next(); From b0647537b9a301647927eea87ec0de7106eaeddc Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Wed, 15 Jan 2025 18:12:36 +0000 Subject: [PATCH 5/7] remote race fix --- lib/client/doc.js | 2 +- lib/submit-request.js | 22 +- lib/transaction/aborted-transaction.js | 2 +- lib/transaction/pending-transaction.js | 26 +++ lib/transaction/transaction.js | 9 +- test/client/transaction.js | 307 ++++++++++++++++--------- 6 files changed, 244 insertions(+), 124 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index f0d99e91b..ed1475a90 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -1134,7 +1134,7 @@ Doc.prototype._setTransaction = function(op, transaction) { if (!this._transaction._writeable) { throw new Error('Transaction is no longer writeable'); } - // TODO: Tidy up listener + // TODO: Tidy up listener on commit/abort this._transaction.on('abort', this._hardRollback.bind(this)); } else if (this._transaction) { throw new Error('Transaction in progress. Commit transaction before submitting ops outside transaction.'); diff --git a/lib/submit-request.js b/lib/submit-request.js index d584897bb..6520979f8 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -95,6 +95,7 @@ SubmitRequest.prototype.$fixup = function(op) { SubmitRequest.prototype.submit = function(callback) { var request = this; + this._succeeded = false; var backend = this.backend; var collection = this.collection; var id = this.id; @@ -116,13 +117,11 @@ SubmitRequest.prototype.submit = function(callback) { backend.db.getSnapshot(collection, id, fields, snapshotOptions, function(err, snapshot) { if (err) return callback(err); + var transactionOps = []; if (request.transaction) { // Get other ops in the same transaction that have not yet been committed to the DB - const ops = request.transaction.pendingOps(collection, id).filter(function(o) { - return o.v < op.v; - }) - // TODO: Handle remote ops - var error = ot.applyOps(snapshot, ops); + var transactionOps = request.transaction.pendingOpsUntil(request); + var error = ot.applyOps(snapshot, transactionOps); if (error) return callback(error); } @@ -169,15 +168,18 @@ SubmitRequest.prototype.submit = function(callback) { return callback(request.newerVersionError()); } + var from = op.v - transactionOps.length; + var to = snapshot.v - transactionOps.length; // Transform the op up to the current snapshot version, then apply - var from = op.v; - backend.db.getOpsToSnapshot(collection, id, from, snapshot, {metadata: true}, function(err, ops) { + backend.db.getOps(collection, id, from, to, {metadata: true}, function(err, ops) { if (err) return callback(err); - if (ops.length !== snapshot.v - from) { + if (ops.length !== to - from) { return callback(request.missingOpsError()); } + for (var o of ops) o.v = o.v + transactionOps.length; + err = request._transformOp(ops); if (err) return callback(err); @@ -258,7 +260,9 @@ SubmitRequest.prototype.commit = function(callback) { if (!succeeded) { // Between our fetch and our call to commit, another client committed an // operation. We expect this to be relatively infrequent but normal. - return request.retry(callback); + // TODO: What if multiple docs fail and trigger a retry? + if (request.transaction) return request.transaction.retry(callback); + else return request.retry(callback); } request._succeeded = !!succeeded; if (request.transaction) request.transaction.update(); diff --git a/lib/transaction/aborted-transaction.js b/lib/transaction/aborted-transaction.js index b57ef8434..59f582e27 100644 --- a/lib/transaction/aborted-transaction.js +++ b/lib/transaction/aborted-transaction.js @@ -6,7 +6,7 @@ function AbortedTransaction(transaction) { this._transaction = transaction; var self = this; - transaction.backend.db.abortTransaction(this.id, function(abortError) { + transaction.backend.db.abortTransaction(this._transaction.id, function(abortError) { self._abortCallbacks(abortError && abortError.message); }); } diff --git a/lib/transaction/pending-transaction.js b/lib/transaction/pending-transaction.js index 8797769ab..bf3016d4b 100644 --- a/lib/transaction/pending-transaction.js +++ b/lib/transaction/pending-transaction.js @@ -4,6 +4,7 @@ const CommittedTransaction = require("./committed-transaction"); function PendingTransaction(transaction) { this._transaction = transaction; this._wantsCommit = false; + this._retryCallbacks = null; } module.exports = PendingTransaction; @@ -21,6 +22,31 @@ PendingTransaction.prototype.registerSubmitRequest = function(request) { this.update(); }; +PendingTransaction.prototype.retry = function(callback) { + if (Array.isArray(this._retryCallbacks)) return this._retryCallbacks.push(callback); + this._retryCallbacks = [callback]; + + var state = this; + var cb = function (error) { + var callbacks = state._retryCallbacks; + state._retryCallbacks = null; + for (var callback of callbacks) callback(error); + } + + var transaction = this._transaction; + transaction.backend.db.abortTransaction(this._transaction.id, function(error) { + if (error) return cb(error); + var requests = transaction._requests.slice(); + var retryNext = function(error) { + if (error) return cb(error); + var request = requests.shift(); + if (!request) return cb(); + request.retry(retryNext); + } + retryNext(); + }); +}; + PendingTransaction.prototype.update = function() { if (!this._shouldCommit()) return; this._transaction._state = new CommittedTransaction(this._transaction); diff --git a/lib/transaction/transaction.js b/lib/transaction/transaction.js index b31320911..0eff17d3e 100644 --- a/lib/transaction/transaction.js +++ b/lib/transaction/transaction.js @@ -29,9 +29,16 @@ Transaction.prototype.update = function() { this._state.update(); }; -Transaction.prototype.pendingOps = function(collection, id) { +Transaction.prototype.retry = function(callback) { + this._state.retry(callback); +}; + +Transaction.prototype.pendingOpsUntil = function(untilRequest) { + var collection = untilRequest.collection; + var id = untilRequest.id; var ops = []; for (var request of this._requests) { + if (request === untilRequest) break; if (request.collection === collection && request.id === id) { ops.push(request.op); } diff --git a/test/client/transaction.js b/test/client/transaction.js index 504d2ef1b..a2516d601 100644 --- a/test/client/transaction.js +++ b/test/client/transaction.js @@ -1,6 +1,8 @@ var async = require('async'); var expect = require('chai').expect; +var idCounter = 0; + module.exports = function() { describe.only('transaction', function() { var backend; @@ -11,126 +13,207 @@ module.exports = function() { connection = backend.connect(); }); - it('commits two ops as a transaction', function(done) { - var doc = connection.get('dogs', 'gaspode'); - var remoteDoc = backend.connect().get('dogs', 'gaspode'); - var transaction = connection.startTransaction(); - - async.series([ - doc.create.bind(doc, {name: 'Gaspode'}), - doc.submitOp.bind(doc, [{p: ['age'], oi: 3}], {transaction: transaction}), - doc.submitOp.bind(doc, [{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}), - remoteDoc.fetch.bind(remoteDoc), - function(next) { - expect(remoteDoc.data).to.eql({name: 'Gaspode'}); - next(); - }, - transaction.commit.bind(transaction), - remoteDoc.fetch.bind(remoteDoc), - function(next) { - expect(remoteDoc.data).to.eql({ - name: 'Gaspode', - age: 3, - tricks: ['fetch'] - }); - expect(doc.data).to.eql(remoteDoc.data); - next(); - } - ], done); - }); + describe('single Doc', function() { + var id; + var doc; + var remoteDoc; + var transaction; + + beforeEach(function() { + id = (idCounter++).toString(); + doc = connection.get('dogs', id); + remoteDoc = backend.connect().get('dogs', id); + transaction = connection.startTransaction(); + + // TODO: Discuss if this is an acceptable API? Doc will always emit error on + // a failed transaction, since the ops may have been successfully acked for this Doc, and + // we force a hard rollback with no callback, which causes an 'error' event + doc.on('error', function() {}); + }); - it('does not commit the first op if the second op fails', function(done) { - var doc = connection.get('dogs', 'gaspode'); - var remoteDoc = backend.connect().get('dogs', 'gaspode'); + it('commits two ops as a transaction', function(done) { + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + doc.submitOp.bind(doc, [{p: ['age'], oi: 3}], {transaction: transaction}), + doc.submitOp.bind(doc, [{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({ + name: 'Gaspode', + age: 3, + tricks: ['fetch'] + }); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); - // TODO: Discuss if this is an acceptable API? Doc will always emit error on - // a failed transaction, since the ops may have been successfully acked for this Doc, and - // we force a hard rollback with no callback, which causes an 'error' event - doc.on('error', () => {}); + it('does not commit the first op if the second op fails', function(done) { + backend.use('commit', function(request, next) { + if (!request.snapshot.data.tricks) return next(); + next(new Error('fail')); + }); + + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + doc.submitOp.bind(doc, [{p: ['age'], oi: 3}], {transaction: transaction}), + function(next) { + doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}, function(error) { + expect(error.message).to.equal('fail'); + }); + doc.once('load', next); + }, + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); - backend.use('commit', function(request, next) { - if (!request.snapshot.data.tricks) return next(); - next(new Error('fail')); + it('deletes and creates as part of a transaction', function(done) { + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + doc.del.bind(doc, {transaction: transaction}), + doc.create.bind(doc, {name: 'Recreated'}, 'json0', {transaction: transaction}), + remoteDoc.fetch.bind(remoteDoc), + function (next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Recreated'}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); }); - var transaction = connection.startTransaction(); - - async.series([ - doc.create.bind(doc, {name: 'Gaspode'}), - doc.submitOp.bind(doc, [{p: ['age'], oi: 3}], {transaction: transaction}), - function(next) { - doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}, function(error) { - expect(error.message).to.equal('fail'); - }); - doc.once('load', next); - }, - remoteDoc.fetch.bind(remoteDoc), - function(next) { - expect(remoteDoc.data).to.eql({name: 'Gaspode'}); - expect(doc.data).to.eql(remoteDoc.data); - next(); - } - ], done); - }); + it('does not delete if the following create fails', function(done) { + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + function(next) { + backend.use('commit', function(request, next) { + var error = request.op.create ? new Error('Create not allowed') : null; + next(error); + }); + next(); + }, + doc.del.bind(doc, {transaction: transaction}), + function(next) { + doc.create({name: 'Recreated'}, 'json0', {transaction: transaction}, function(error) { + expect(error.message).to.equal('Create not allowed'); + }); + doc.once('load', next); + }, + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('transaction is behind remote', function(done) { + async.series([ + doc.create.bind(doc, {tricks: ['fetch']}), + remoteDoc.fetch.bind(remoteDoc), + remoteDoc.submitOp.bind(remoteDoc, [{p: ['tricks', 0], ld: 'fetch'}]), + doc.submitOp.bind(doc, [{p: ['tricks', 1], li: 'sit'}], {transaction: transaction}), + doc.submitOp.bind(doc, [{p: ['tricks', 2], li: 'shake'}], {transaction: transaction}), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({tricks: []}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({tricks: ['sit', 'shake']}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); - it('deletes and creates as part of a transaction', function(done) { - var doc = connection.get('dogs', 'gaspode'); - var remoteDoc = backend.connect().get('dogs', 'gaspode'); - - doc.on('error', () => {}); - - var transaction = connection.startTransaction(); - - async.series([ - doc.create.bind(doc, {name: 'Gaspode'}), - doc.del.bind(doc, {transaction: transaction}), - doc.create.bind(doc, {name: 'Recreated'}, 'json0', {transaction: transaction}), - remoteDoc.fetch.bind(remoteDoc), - function (next) { - expect(remoteDoc.data).to.eql({name: 'Gaspode'}); - next(); - }, - transaction.commit.bind(transaction), - remoteDoc.fetch.bind(remoteDoc), - function(next) { - expect(remoteDoc.data).to.eql({name: 'Recreated'}); - expect(doc.data).to.eql(remoteDoc.data); - next(); - } - ], done); + it('remote submits after but commits first', function(done) { + async.series([ + doc.create.bind(doc, {tricks: ['fetch']}), + remoteDoc.fetch.bind(remoteDoc), + doc.submitOp.bind(doc, [{p: ['tricks', 1], li: 'sit'}], {transaction: transaction}), + remoteDoc.submitOp.bind(remoteDoc, [{p: ['tricks', 0], ld: 'fetch'}]), + doc.submitOp.bind(doc, [{p: ['tricks', 2], li: 'shake'}], {transaction: transaction}), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({tricks: []}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({tricks: ['sit', 'shake']}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); }); - it('does not delete if the following create fails', function(done) { - var doc = connection.get('dogs', 'gaspode'); - var remoteDoc = backend.connect().get('dogs', 'gaspode'); - - doc.on('error', () => {}); - - var transaction = connection.startTransaction(); - - async.series([ - doc.create.bind(doc, {name: 'Gaspode'}), - function(next) { - backend.use('commit', function(request, next) { - var error = request.op.create ? new Error('Create not allowed') : null; - next(error); - }); - next(); - }, - doc.del.bind(doc, {transaction: transaction}), - function(next) { - doc.create({name: 'Recreated'}, 'json0', {transaction: transaction}, function(error) { - expect(error.message).to.equal('Create not allowed'); - }); - doc.once('load', next); - }, - remoteDoc.fetch.bind(remoteDoc), - function(next) { - expect(remoteDoc.data).to.eql({name: 'Gaspode'}); - expect(doc.data).to.eql(remoteDoc.data); - next(); - } - ], done); + describe('multiple Docs', function() { + it('rolls back multiple Docs if one commit fails', function(done) { + var id1 = (idCounter++).toString(); + var id2 = (idCounter++).toString(); + var doc1 = connection.get('dogs', id1); + var doc2 = connection.get('dogs', id2); + var remoteDoc1 = backend.connect().get('dogs', id1); + var remoteDoc2 = backend.connect().get('dogs', id2); + + var transaction = connection.startTransaction(); + + // Doc1 will throw even though its op is accepted, since the + // whole transaction is rejected + doc1.on('error', function() {}); + doc2.on('error', function() {}); + + async.series([ + doc1.create.bind(doc1, {name: 'Gaspode'}), + doc2.create.bind(doc2, {name: 'Snoopy'}), + function(next) { + backend.use('commit', function(request, next) { + var error = request.id === id2 ? new Error('fail') : null; + next(error); + }); + next(); + }, + doc1.submitOp.bind(doc1, [{p: ['age'], oi: 3}], {transaction: transaction}), + function(next) { + doc2.submitOp([{p: ['age'], oi: 4}], {transaction: transaction}, function(error) { + expect(error.message).to.equal('fail'); + }); + doc2.once('load', next); + }, + remoteDoc1.fetch.bind(remoteDoc1), + remoteDoc2.fetch.bind(remoteDoc2), + function(next) { + expect(remoteDoc1.data).to.eql({name: 'Gaspode'}); + expect(remoteDoc2.data).to.eql({name: 'Snoopy'}); + expect(doc1.data).to.eql(remoteDoc1.data); + expect(doc2.data).to.eql(remoteDoc2.data); + next(); + } + ], done); + }); }); }); } From fcbbd3fd388ba7786a8bc57d0fd5a59eaa5f3175 Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Mon, 20 Jan 2025 17:08:32 +0000 Subject: [PATCH 6/7] mongodb deadlock tweaks --- lib/db/memory.js | 26 +++++++++++++++----------- lib/transaction/pending-transaction.js | 25 +++++++++++++++++-------- 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/lib/db/memory.js b/lib/db/memory.js index 3e0a8ce5f..9be9371b3 100644 --- a/lib/db/memory.js +++ b/lib/db/memory.js @@ -48,7 +48,8 @@ MemoryDB.prototype.commit = function(collection, id, op, snapshot, options, call var transaction; if (options.transaction) { - transaction = db._getTransaction(options.transaction); + transaction = db._transactions[options.transaction]; + if (!transaction) return callback(null, false); version = version + transaction.commits.length; } @@ -149,14 +150,23 @@ MemoryDB.prototype.query = function(collection, query, fields, options, callback }); }; +MemoryDB.prototype.startTransaction = function(id, callback) { + if (this._transactions[id]) callback(new Error('Transaction already started')); + this._transactions[id] = this._transactions[id] || {}; + this._transactions[id].commits = []; + callback(); +}; + +MemoryDB.prototype.restartTransaction = function(id, callback) { + delete this._transactions[id]; + this.startTransaction(id, callback); +}; + MemoryDB.prototype.commitTransaction = function(id, callback) { var transaction = this._transactions[id]; - if (!transaction) { - throw new Error('No transaction found'); - } + if (!transaction) return callback(); delete this._transactions[id]; - // transaction.callbacks.push(callback); // Heavy-handed but effective approach to synchronous transaction writing: // let's just save the DB state before and restore if the transaction fails @@ -235,9 +245,3 @@ MemoryDB.prototype._getVersionSync = function(collection, id) { var collectionOps = this.ops[collection]; return (collectionOps && collectionOps[id] && collectionOps[id].length) || 0; }; - -MemoryDB.prototype._getTransaction = function(id) { - var transaction = this._transactions[id] = this._transactions[id] || {}; - transaction.commits = transaction.commits || []; - return transaction; -} diff --git a/lib/transaction/pending-transaction.js b/lib/transaction/pending-transaction.js index bf3016d4b..c438b04dd 100644 --- a/lib/transaction/pending-transaction.js +++ b/lib/transaction/pending-transaction.js @@ -18,6 +18,11 @@ PendingTransaction.prototype.abort = function() { }; PendingTransaction.prototype.registerSubmitRequest = function(request) { + if (!this._transaction._requests.length) { + this._transaction.backend.db.startTransaction(this._transaction.id, function(error) { + // TODO: Handle errors / wait for callback + }); + } this._transaction._requests.push(request); this.update(); }; @@ -34,16 +39,20 @@ PendingTransaction.prototype.retry = function(callback) { } var transaction = this._transaction; - transaction.backend.db.abortTransaction(this._transaction.id, function(error) { + var db = transaction.backend.db; + db.abortTransaction(this._transaction.id, function(error) { if (error) return cb(error); - var requests = transaction._requests.slice(); - var retryNext = function(error) { + db.restartTransaction(transaction.id, function(error) { if (error) return cb(error); - var request = requests.shift(); - if (!request) return cb(); - request.retry(retryNext); - } - retryNext(); + var requests = transaction._requests.slice(); + var retryNext = function(error) { + if (error) return cb(error); + var request = requests.shift(); + if (!request) return cb(); + request.retry(retryNext); + } + retryNext(); + }); }); }; From 0aa4adc0a5f1d9d5d423c5785b6a2e58462c4b82 Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Tue, 21 Jan 2025 16:10:31 +0000 Subject: [PATCH 7/7] remove unused commit code --- lib/backend.js | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/backend.js b/lib/backend.js index b760ca6d1..dcc95aa33 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -931,10 +931,6 @@ Backend.prototype.transformPresenceToLatestVersion = function(agent, presence, c }); }; -Backend.prototype._commitTransaction = function(id, seqs, callback) { - -}; - function pluckIds(snapshots) { var ids = []; for (var i = 0; i < snapshots.length; i++) {