diff --git a/lib/agent.js b/lib/agent.js index a07650c8a..65a5bb778 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -70,6 +70,8 @@ function Agent(backend, stream) { this._firstReceivedMessage = null; this._handshakeReceived = false; + this._transactions = Object.create(null); + // Send the legacy message to initialize old clients with the random agent Id this.send(this._initMessage(ACTIONS.initLegacy)); } @@ -470,7 +472,7 @@ Agent.prototype._handleMessage = function(request, callback) { )); } if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message')); - return this._submit(request.c, request.d, op, callback); + return this._submit(request.c, request.d, op, request.t, callback); case ACTIONS.snapshotFetch: return this._fetchSnapshot(request.c, request.d, request.v, callback); case ACTIONS.snapshotFetchByTimestamp: @@ -494,6 +496,8 @@ Agent.prototype._handleMessage = function(request, callback) { return this._requestPresence(request.ch, callback); case ACTIONS.pingPong: return this._pingPong(callback); + case ACTIONS.transactionCommit: + return this._commitTransaction(request, callback); default: callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message')); } @@ -761,9 +765,12 @@ Agent.prototype._unsubscribeBulk = function(collection, ids, callback) { util.nextTick(callback); }; -Agent.prototype._submit = function(collection, id, op, callback) { +Agent.prototype._submit = function(collection, id, op, transactionId, callback) { var agent = this; - this.backend.submit(this, collection, id, op, null, function(err, ops, request) { + var options = { + transaction: transactionId + }; + this.backend.submit(this, collection, id, op, options, function(err, ops, request) { // Message to acknowledge the op was successfully submitted var ack = {src: op.src, seq: op.seq, v: op.v}; if (request._fixupOps.length) ack[ACTIONS.fixup] = request._fixupOps; @@ -993,6 +1000,16 @@ function createClientOp(request, clientId) { undefined; } +Agent.prototype._commitTransaction = function(request, callback) { + var transaction = this._transactions[request.id]; + transaction.commit(callback); +}; + +Agent.prototype._abortTransaction = function(request, callback) { + var transaction = this._transactions[request.id]; + transaction.abort(callback); +}; + function shallowCopy(object) { var out = {}; for (var key in object) { diff --git a/lib/client/connection.js b/lib/client/connection.js index da89dfef5..cceb3de70 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -12,6 +12,7 @@ var util = require('../util'); var logger = require('../logger'); var DocPresenceEmitter = require('./presence/doc-presence-emitter'); var protocol = require('../protocol'); +var Transaction = require('./transaction'); var ERROR_CODE = ShareDBError.CODES; @@ -64,6 +65,8 @@ function Connection(socket) { // A unique message number for presence this._presenceSeq = 1; + this._transactions = Object.create(null); + // Equals agent.src on the server this.id = null; @@ -258,6 +261,8 @@ Connection.prototype.handleMessage = function(message) { return this._handlePresenceRequest(err, message); case ACTIONS.pingPong: return this._handlePingPong(err); + case ACTIONS.transactionCommit: + return this._handleTransactionCommit(err, message); default: logger.warn('Ignoring unrecognized message', message); @@ -467,6 +472,7 @@ Connection.prototype.sendOp = function(doc, op) { if (op.create) message.create = op.create; if (op.del) message.del = op.del; if (doc.submitSource) message.x.source = op.source; + if (op.transaction) message.t = op.transaction; this.send(message); }; @@ -782,6 +788,26 @@ Connection.prototype._initialize = function(message) { this._setState('connected'); }; +Connection.prototype.startTransaction = function() { + var transaction = new Transaction(this); + return this._transactions[transaction.id] = transaction; +}; + +Connection.prototype._commitTransaction = function(transaction) { + this.send({a: ACTIONS.transactionCommit, id: transaction.id}); +}; + +Connection.prototype._abortTransaction = function(transaction) { + this.send({a: ACTIONS.transactionAbort, id: transaction.id}); +}; + +Connection.prototype._handleTransactionCommit = function(error, message) { + var transaction = this._transactions[message.id]; + if (!transaction) return; + transaction._handleCommit(error, message); + delete this._transactions[message.id]; +}; + Connection.prototype.getPresence = function(channel) { var connection = this; var presence = util.digOrCreate(this._presences, channel, function() { diff --git a/lib/client/doc.js b/lib/client/doc.js index acb457374..ed1475a90 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -121,6 +121,8 @@ function Doc(connection, collection, id) { // Internal counter that gets incremented every time doc.data is updated. // Used as a cheap way to check if doc.data has changed. this._dataStateVersion = 0; + + this._transaction = null; } emitter.mixin(Doc); @@ -337,6 +339,9 @@ Doc.prototype._handleOp = function(err, message) { // the remote, so that we're in a nice consistent state return this.fetch(this._clearInflightOp.bind(this)); } + if (this._transaction) { + return this._transaction._localAbort(err); + } if (this.inflightOp) { return this._rollback(err); } @@ -763,6 +768,7 @@ Doc.prototype._submit = function(op, source, callback) { this._pushOp(op, source, callback); this._otApply(op, source); } catch (error) { + // TODO: Abort transaction return this._hardRollback(error); } @@ -865,8 +871,10 @@ Doc.prototype.submitOp = function(component, options, callback) { callback = options; options = null; } + options = options || {}; var op = {op: component}; - var source = options && options.source; + this._setTransaction(op, options.transaction); + var source = options.source; this._submit(op, source, callback); }; @@ -888,6 +896,7 @@ Doc.prototype.create = function(data, type, options, callback) { callback = options; options = null; } + options = options || {}; if (!type) { type = types.defaultType.uri; } @@ -897,6 +906,7 @@ Doc.prototype.create = function(data, type, options, callback) { return this.emit('error', err); } var op = {create: {type: type, data: data}}; + this._setTransaction(op, options.transaction); var source = options && options.source; this._submit(op, source, callback); }; @@ -913,12 +923,14 @@ Doc.prototype.del = function(options, callback) { callback = options; options = null; } + options = options || {}; if (!this.type) { var err = new ShareDBError(ERROR_CODE.ERR_DOC_DOES_NOT_EXIST, 'Document does not exist'); if (callback) return callback(err); return this.emit('error', err); } var op = {del: true}; + this._setTransaction(op, options.transaction); var source = options && options.source; this._submit(op, source, callback); }; @@ -1106,4 +1118,25 @@ Doc.prototype._clearInflightOp = function(err) { if (err && !called) return this.emit('error', err); }; +Doc.prototype._setTransaction = function(op, transaction) { + if (transaction && typeof transaction === 'object') transaction = transaction.id; + if (transaction) { + op.transaction = transaction; + if (this._transaction) { + if (this._transaction.id === transaction) return; + throw new Error('Transaction already in progress. Commit transaction before starting a new one.'); + } + this._transaction = this.connection._transactions[transaction]; + if (!this._transaction) { + throw new Error('Transaction not started'); + } + if (!this._transaction._writeable) { + throw new Error('Transaction is no longer writeable'); + } + // TODO: Tidy up listener on commit/abort + this._transaction.on('abort', this._hardRollback.bind(this)); + } else if (this._transaction) { + throw new Error('Transaction in progress. Commit transaction before submitting ops outside transaction.'); + } +}; diff --git a/lib/client/transaction.js b/lib/client/transaction.js new file mode 100644 index 000000000..1cb5f6c90 --- /dev/null +++ b/lib/client/transaction.js @@ -0,0 +1,45 @@ +var emitter = require('../emitter'); + +var idCounter = 1; + +module.exports = Transaction; +function Transaction(connection) { + emitter.EventEmitter.call(this); + + this.connection = connection; + this.id = (idCounter++).toString(); + + this._callbacks = []; + this._writeable = true; +} +emitter.mixin(Transaction); + +Transaction.prototype.commit = function(callback) { + // TODO: Catch multiple calls + // TODO: Handle network changes + this._callbacks.push(callback); + this._writeable = false; + this.connection._commitTransaction(this); +}; + +Transaction.prototype.abort = function(callback) { + this._callbacks.push(callback); + this._writeable = false; +}; + +Transaction.prototype._handleCommit = function(error) { + this._writeable = false; + // TODO: Handle callbacks + if (error) this._localAbort(error); + else this.emit('commit'); + + var callbacks = this._callbacks; + this._callbacks = []; + if (!callbacks.length) this.emit('error', error); + for (var callback of callbacks) callback(error); +}; + +Transaction.prototype._localAbort = function(error) { + this._writeable = false; + this.emit('abort', error); +}; diff --git a/lib/db/memory.js b/lib/db/memory.js index 2db274d6e..9be9371b3 100644 --- a/lib/db/memory.js +++ b/lib/db/memory.js @@ -25,6 +25,8 @@ function MemoryDB(options) { this.ops = Object.create(null); this.closed = false; + + this._transactions = Object.create(null); }; module.exports = MemoryDB; @@ -39,20 +41,35 @@ MemoryDB.prototype.close = function(callback) { // callback(err, succeeded) MemoryDB.prototype.commit = function(collection, id, op, snapshot, options, callback) { var db = this; + options = options || {}; if (typeof callback !== 'function') throw new Error('Callback required'); util.nextTick(function() { var version = db._getVersionSync(collection, id); + + var transaction; + if (options.transaction) { + transaction = db._transactions[options.transaction]; + if (!transaction) return callback(null, false); + version = version + transaction.commits.length; + } + if (snapshot.v !== version + 1) { var succeeded = false; return callback(null, succeeded); } - var err = db._writeOpSync(collection, id, op); - if (err) return callback(err); - err = db._writeSnapshotSync(collection, id, snapshot); - if (err) return callback(err); - var succeeded = true; - callback(null, succeeded); + var commit = function() { + var err = db._writeOpSync(collection, id, op); + if (err) return err; + err = db._writeSnapshotSync(collection, id, snapshot); + if (err) return err; + }; + + var error; + if (!transaction) error = commit(); + callback(error, !error); + + if (transaction) transaction.commits.push(commit); }); }; @@ -133,6 +150,47 @@ MemoryDB.prototype.query = function(collection, query, fields, options, callback }); }; +MemoryDB.prototype.startTransaction = function(id, callback) { + if (this._transactions[id]) callback(new Error('Transaction already started')); + this._transactions[id] = this._transactions[id] || {}; + this._transactions[id].commits = []; + callback(); +}; + +MemoryDB.prototype.restartTransaction = function(id, callback) { + delete this._transactions[id]; + this.startTransaction(id, callback); +}; + +MemoryDB.prototype.commitTransaction = function(id, callback) { + var transaction = this._transactions[id]; + if (!transaction) return callback(); + + delete this._transactions[id]; + + // Heavy-handed but effective approach to synchronous transaction writing: + // let's just save the DB state before and restore if the transaction fails + var docs = JSON.stringify(this.docs); + var ops = JSON.stringify(this.ops); + + for (var commit of transaction.commits) { + var error = commit(); + + if (error) { + this.docs = JSON.parse(docs); + this.ops = JSON.parse(ops); + break; + } + } + + callback(error, !error); +}; + +MemoryDB.prototype.abortTransaction = function(id, callback) { + delete this._transactions[id]; + callback(null); +}; + // For testing, it may be useful to implement the desired query // language by defining this function. Returns an object with // two properties: diff --git a/lib/error.js b/lib/error.js index cde95bdbe..3c2269b32 100644 --- a/lib/error.js +++ b/lib/error.js @@ -72,6 +72,7 @@ ShareDBError.CODES = { */ ERR_SNAPSHOT_READS_REJECTED: 'ERR_SNAPSHOT_READS_REJECTED', ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND: 'ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND', + ERR_TRANSACTION_ABORTED: 'ERR_TRANSACTION_ABORTED', ERR_TYPE_CANNOT_BE_PROJECTED: 'ERR_TYPE_CANNOT_BE_PROJECTED', ERR_TYPE_DOES_NOT_SUPPORT_COMPOSE: 'ERR_TYPE_DOES_NOT_SUPPORT_COMPOSE', ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE: 'ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE', diff --git a/lib/message-actions.js b/lib/message-actions.js index e1a8e9942..ab3ca59f5 100644 --- a/lib/message-actions.js +++ b/lib/message-actions.js @@ -15,6 +15,8 @@ exports.ACTIONS = { op: 'op', snapshotFetch: 'nf', snapshotFetchByTimestamp: 'nt', + transactionAbort: 'ta', + transactionCommit: 'tc', pingPong: 'pp', presence: 'p', presenceSubscribe: 'ps', diff --git a/lib/submit-request.js b/lib/submit-request.js index 8e6715123..6520979f8 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -3,6 +3,8 @@ var projections = require('./projections'); var ShareDBError = require('./error'); var types = require('./types'); var protocol = require('./protocol'); +var util = require('./util'); +var Transaction = require('./transaction/transaction'); var ERROR_CODE = ShareDBError.CODES; @@ -16,7 +18,7 @@ function SubmitRequest(backend, agent, index, id, op, options) { this.collection = (projection) ? projection.target : index; this.id = id; this.op = op; - this.options = options; + this.options = options || {}; this.extra = op.x; delete op.x; @@ -43,6 +45,14 @@ function SubmitRequest(backend, agent, index, id, op, options) { this.ops = []; this.channels = null; this._fixupOps = []; + + this._succeeded = false; + if (this.options.transaction) { + this.transaction = util.digOrCreate(agent._transactions, options.transaction, function() { + return new Transaction(agent, options.transaction); + }); + this.transaction.registerSubmitRequest(this); + } } module.exports = SubmitRequest; @@ -85,6 +95,7 @@ SubmitRequest.prototype.$fixup = function(op) { SubmitRequest.prototype.submit = function(callback) { var request = this; + this._succeeded = false; var backend = this.backend; var collection = this.collection; var id = this.id; @@ -93,11 +104,27 @@ SubmitRequest.prototype.submit = function(callback) { // With a null projection, it strips document metadata var fields = {$submit: true}; + if (request.transaction) { + var originalCallback = callback; + callback = function(error) { + if (error) request.transaction.abort(); + originalCallback(error); + } + } + var snapshotOptions = {}; snapshotOptions.agentCustom = request.agent.custom; backend.db.getSnapshot(collection, id, fields, snapshotOptions, function(err, snapshot) { if (err) return callback(err); + var transactionOps = []; + if (request.transaction) { + // Get other ops in the same transaction that have not yet been committed to the DB + var transactionOps = request.transaction.pendingOpsUntil(request); + var error = ot.applyOps(snapshot, transactionOps); + if (error) return callback(error); + } + request.snapshot = snapshot; request._addSnapshotMeta(); @@ -141,15 +168,18 @@ SubmitRequest.prototype.submit = function(callback) { return callback(request.newerVersionError()); } + var from = op.v - transactionOps.length; + var to = snapshot.v - transactionOps.length; // Transform the op up to the current snapshot version, then apply - var from = op.v; - backend.db.getOpsToSnapshot(collection, id, from, snapshot, {metadata: true}, function(err, ops) { + backend.db.getOps(collection, id, from, to, {metadata: true}, function(err, ops) { if (err) return callback(err); - if (ops.length !== snapshot.v - from) { + if (ops.length !== to - from) { return callback(request.missingOpsError()); } + for (var o of ops) o.v = o.v + transactionOps.length; + err = request._transformOp(ops); if (err) return callback(err); @@ -225,30 +255,40 @@ SubmitRequest.prototype.commit = function(callback) { request.snapshot, request.options, function(err, succeeded) { + // TODO: Should this callback be called before committing a transaction? if (err) return callback(err); if (!succeeded) { // Between our fetch and our call to commit, another client committed an // operation. We expect this to be relatively infrequent but normal. - return request.retry(callback); - } - if (!request.suppressPublish) { - var op = request.op; - op.c = request.collection; - op.d = request.id; - op.m = undefined; - // Needed for agent to detect if it can ignore sending the op back to - // the client that submitted it in subscriptions - if (request.collection !== request.index) op.i = request.index; - backend.pubsub.publish(request.channels, op); - } - if (request._shouldSaveMilestoneSnapshot(request.snapshot)) { - request.backend.milestoneDb.saveMilestoneSnapshot(request.collection, request.snapshot); + // TODO: What if multiple docs fail and trigger a retry? + if (request.transaction) return request.transaction.retry(callback); + else return request.retry(callback); } + request._succeeded = !!succeeded; + if (request.transaction) request.transaction.update(); + else request.publish(); callback(); }); }); }; +SubmitRequest.prototype.publish = function() { + if (!this.suppressPublish) { + var op = this.op; + op.c = this.collection; + op.d = this.id; + op.m = undefined; + // Needed for agent to detect if it can ignore sending the op back to + // the client that submitted it in subscriptions + if (this.collection !== this.index) op.i = this.index; + this.backend.pubsub.publish(this.channels, op); + } + + if (this._shouldSaveMilestoneSnapshot(this.snapshot)) { + this.backend.milestoneDb.saveMilestoneSnapshot(this.collection, this.snapshot); + } +}; + SubmitRequest.prototype.retry = function(callback) { this.retries++; if (this.maxRetries != null && this.retries > this.maxRetries) { diff --git a/lib/transaction/aborted-transaction.js b/lib/transaction/aborted-transaction.js new file mode 100644 index 000000000..59f582e27 --- /dev/null +++ b/lib/transaction/aborted-transaction.js @@ -0,0 +1,36 @@ +var ShareDBError = require('../error'); + +var ERROR_CODE = ShareDBError.CODES; + +function AbortedTransaction(transaction) { + this._transaction = transaction; + + var self = this; + transaction.backend.db.abortTransaction(this._transaction.id, function(abortError) { + self._abortCallbacks(abortError && abortError.message); + }); +} +module.exports = AbortedTransaction; + +AbortedTransaction.prototype.commit = function() { + this.update(); +}; + +AbortedTransaction.prototype.abort = function() { + this.update(); +}; + +AbortedTransaction.prototype.registerSubmitRequest = function() { + // TODO: Error? +}; + +AbortedTransaction.prototype.update = function() { + this._abortCallbacks(); +}; + +AbortedTransaction.prototype._abortCallbacks = function(message) { + message = message || 'Transaction aborted'; + var error = new ShareDBError(ERROR_CODE.ERR_TRANSACTION_ABORTED, message); + this._transaction._callAndClearCallbacks(error); +}; + diff --git a/lib/transaction/committed-transaction.js b/lib/transaction/committed-transaction.js new file mode 100644 index 000000000..92850492c --- /dev/null +++ b/lib/transaction/committed-transaction.js @@ -0,0 +1,23 @@ +function CommittedTransaction(transaction) { + this._transaction = transaction; + // TODO: Check DB support + transaction.backend.db.commitTransaction(this._transaction.id, function(error) { + transaction._callAndClearCallbacks(error); + }); + // TODO: call request.publish() on transaction requests +} +module.exports = CommittedTransaction; + +CommittedTransaction.prototype.commit = function() { + // TODO: Error? +}; + +CommittedTransaction.prototype.abort = function() { + // TODO: Error? +}; + +CommittedTransaction.prototype.registerSubmitRequest = function() { + // TODO: Error? +}; + +CommittedTransaction.prototype.update = function() {}; diff --git a/lib/transaction/pending-transaction.js b/lib/transaction/pending-transaction.js new file mode 100644 index 000000000..c438b04dd --- /dev/null +++ b/lib/transaction/pending-transaction.js @@ -0,0 +1,69 @@ +const AbortedTransaction = require("./aborted-transaction"); +const CommittedTransaction = require("./committed-transaction"); + +function PendingTransaction(transaction) { + this._transaction = transaction; + this._wantsCommit = false; + this._retryCallbacks = null; +} +module.exports = PendingTransaction; + +PendingTransaction.prototype.commit = function() { + this._wantsCommit = true; + this.update(); +}; + +PendingTransaction.prototype.abort = function() { + this._transaction._state = new AbortedTransaction(this._transaction); +}; + +PendingTransaction.prototype.registerSubmitRequest = function(request) { + if (!this._transaction._requests.length) { + this._transaction.backend.db.startTransaction(this._transaction.id, function(error) { + // TODO: Handle errors / wait for callback + }); + } + this._transaction._requests.push(request); + this.update(); +}; + +PendingTransaction.prototype.retry = function(callback) { + if (Array.isArray(this._retryCallbacks)) return this._retryCallbacks.push(callback); + this._retryCallbacks = [callback]; + + var state = this; + var cb = function (error) { + var callbacks = state._retryCallbacks; + state._retryCallbacks = null; + for (var callback of callbacks) callback(error); + } + + var transaction = this._transaction; + var db = transaction.backend.db; + db.abortTransaction(this._transaction.id, function(error) { + if (error) return cb(error); + db.restartTransaction(transaction.id, function(error) { + if (error) return cb(error); + var requests = transaction._requests.slice(); + var retryNext = function(error) { + if (error) return cb(error); + var request = requests.shift(); + if (!request) return cb(); + request.retry(retryNext); + } + retryNext(); + }); + }); +}; + +PendingTransaction.prototype.update = function() { + if (!this._shouldCommit()) return; + this._transaction._state = new CommittedTransaction(this._transaction); +}; + +PendingTransaction.prototype._shouldCommit = function() { + if (!this._wantsCommit) return false; + return this._transaction._requests.every(function(request) { + return request._succeeded; + }); +}; diff --git a/lib/transaction/transaction.js b/lib/transaction/transaction.js new file mode 100644 index 000000000..0eff17d3e --- /dev/null +++ b/lib/transaction/transaction.js @@ -0,0 +1,55 @@ +var PendingTransaction = require('./pending-transaction'); + +function Transaction(agent, id) { + this.agent = agent; + this.backend = agent.backend; + this.id = id; + + this._state = new PendingTransaction(this); + this._requests = []; + this._callbacks = []; +} +module.exports = Transaction; + +Transaction.prototype.commit = function(callback) { + this._callbacks.push(callback); + this._state.commit(); +}; + +Transaction.prototype.abort = function(callback) { + this._callbacks.push(callback); + this._state.abort(); +}; + +Transaction.prototype.registerSubmitRequest = function(request) { + this._state.registerSubmitRequest(request); +}; + +Transaction.prototype.update = function() { + this._state.update(); +}; + +Transaction.prototype.retry = function(callback) { + this._state.retry(callback); +}; + +Transaction.prototype.pendingOpsUntil = function(untilRequest) { + var collection = untilRequest.collection; + var id = untilRequest.id; + var ops = []; + for (var request of this._requests) { + if (request === untilRequest) break; + if (request.collection === collection && request.id === id) { + ops.push(request.op); + } + } + return ops; +}; + +Transaction.prototype._callAndClearCallbacks = function(error) { + var callbacks = this._callbacks; + this._callbacks = []; + for (var callback of callbacks) { + if (typeof callback === 'function') callback(error); + } +}; diff --git a/test/client/transaction.js b/test/client/transaction.js new file mode 100644 index 000000000..a2516d601 --- /dev/null +++ b/test/client/transaction.js @@ -0,0 +1,219 @@ +var async = require('async'); +var expect = require('chai').expect; + +var idCounter = 0; + +module.exports = function() { + describe.only('transaction', function() { + var backend; + var connection; + + beforeEach(function() { + backend = this.backend; + connection = backend.connect(); + }); + + describe('single Doc', function() { + var id; + var doc; + var remoteDoc; + var transaction; + + beforeEach(function() { + id = (idCounter++).toString(); + doc = connection.get('dogs', id); + remoteDoc = backend.connect().get('dogs', id); + transaction = connection.startTransaction(); + + // TODO: Discuss if this is an acceptable API? Doc will always emit error on + // a failed transaction, since the ops may have been successfully acked for this Doc, and + // we force a hard rollback with no callback, which causes an 'error' event + doc.on('error', function() {}); + }); + + it('commits two ops as a transaction', function(done) { + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + doc.submitOp.bind(doc, [{p: ['age'], oi: 3}], {transaction: transaction}), + doc.submitOp.bind(doc, [{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({ + name: 'Gaspode', + age: 3, + tricks: ['fetch'] + }); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('does not commit the first op if the second op fails', function(done) { + backend.use('commit', function(request, next) { + if (!request.snapshot.data.tricks) return next(); + next(new Error('fail')); + }); + + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + doc.submitOp.bind(doc, [{p: ['age'], oi: 3}], {transaction: transaction}), + function(next) { + doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {transaction: transaction}, function(error) { + expect(error.message).to.equal('fail'); + }); + doc.once('load', next); + }, + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('deletes and creates as part of a transaction', function(done) { + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + doc.del.bind(doc, {transaction: transaction}), + doc.create.bind(doc, {name: 'Recreated'}, 'json0', {transaction: transaction}), + remoteDoc.fetch.bind(remoteDoc), + function (next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Recreated'}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('does not delete if the following create fails', function(done) { + async.series([ + doc.create.bind(doc, {name: 'Gaspode'}), + function(next) { + backend.use('commit', function(request, next) { + var error = request.op.create ? new Error('Create not allowed') : null; + next(error); + }); + next(); + }, + doc.del.bind(doc, {transaction: transaction}), + function(next) { + doc.create({name: 'Recreated'}, 'json0', {transaction: transaction}, function(error) { + expect(error.message).to.equal('Create not allowed'); + }); + doc.once('load', next); + }, + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({name: 'Gaspode'}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('transaction is behind remote', function(done) { + async.series([ + doc.create.bind(doc, {tricks: ['fetch']}), + remoteDoc.fetch.bind(remoteDoc), + remoteDoc.submitOp.bind(remoteDoc, [{p: ['tricks', 0], ld: 'fetch'}]), + doc.submitOp.bind(doc, [{p: ['tricks', 1], li: 'sit'}], {transaction: transaction}), + doc.submitOp.bind(doc, [{p: ['tricks', 2], li: 'shake'}], {transaction: transaction}), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({tricks: []}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({tricks: ['sit', 'shake']}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + + it('remote submits after but commits first', function(done) { + async.series([ + doc.create.bind(doc, {tricks: ['fetch']}), + remoteDoc.fetch.bind(remoteDoc), + doc.submitOp.bind(doc, [{p: ['tricks', 1], li: 'sit'}], {transaction: transaction}), + remoteDoc.submitOp.bind(remoteDoc, [{p: ['tricks', 0], ld: 'fetch'}]), + doc.submitOp.bind(doc, [{p: ['tricks', 2], li: 'shake'}], {transaction: transaction}), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({tricks: []}); + next(); + }, + transaction.commit.bind(transaction), + remoteDoc.fetch.bind(remoteDoc), + function(next) { + expect(remoteDoc.data).to.eql({tricks: ['sit', 'shake']}); + expect(doc.data).to.eql(remoteDoc.data); + next(); + } + ], done); + }); + }); + + describe('multiple Docs', function() { + it('rolls back multiple Docs if one commit fails', function(done) { + var id1 = (idCounter++).toString(); + var id2 = (idCounter++).toString(); + var doc1 = connection.get('dogs', id1); + var doc2 = connection.get('dogs', id2); + var remoteDoc1 = backend.connect().get('dogs', id1); + var remoteDoc2 = backend.connect().get('dogs', id2); + + var transaction = connection.startTransaction(); + + // Doc1 will throw even though its op is accepted, since the + // whole transaction is rejected + doc1.on('error', function() {}); + doc2.on('error', function() {}); + + async.series([ + doc1.create.bind(doc1, {name: 'Gaspode'}), + doc2.create.bind(doc2, {name: 'Snoopy'}), + function(next) { + backend.use('commit', function(request, next) { + var error = request.id === id2 ? new Error('fail') : null; + next(error); + }); + next(); + }, + doc1.submitOp.bind(doc1, [{p: ['age'], oi: 3}], {transaction: transaction}), + function(next) { + doc2.submitOp([{p: ['age'], oi: 4}], {transaction: transaction}, function(error) { + expect(error.message).to.equal('fail'); + }); + doc2.once('load', next); + }, + remoteDoc1.fetch.bind(remoteDoc1), + remoteDoc2.fetch.bind(remoteDoc2), + function(next) { + expect(remoteDoc1.data).to.eql({name: 'Gaspode'}); + expect(remoteDoc2.data).to.eql({name: 'Snoopy'}); + expect(doc1.data).to.eql(remoteDoc1.data); + expect(doc2.data).to.eql(remoteDoc2.data); + next(); + } + ], done); + }); + }); + }); +} diff --git a/test/db-memory.js b/test/db-memory.js index 8c4774418..2a6febc58 100644 --- a/test/db-memory.js +++ b/test/db-memory.js @@ -69,7 +69,8 @@ describe('MemoryDB', function() { }, getQuery: function(options) { return {filter: options.query, sort: options.sort}; - } + }, + transactions: true }); describe('deleteOps', function() { diff --git a/test/db.js b/test/db.js index 5fc8a293d..16135a0e6 100644 --- a/test/db.js +++ b/test/db.js @@ -64,6 +64,10 @@ module.exports = function(options) { require('./client/projections')({getQuery: options.getQuery}); } + if (options.transactions) { + require('./client/transaction')(); + } + require('./client/submit')(); require('./client/submit-json1')(); require('./client/subscribe')();