-
Notifications
You must be signed in to change notification settings - Fork 453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Transactions #689
base: master
Are you sure you want to change the base?
Transactions #689
Changes from all commits
28f5035
68f3de8
ca7607c
1ec0371
b064753
fcbbd3f
0aa4adc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
var emitter = require('../emitter'); | ||
|
||
var idCounter = 1; | ||
|
||
module.exports = Transaction; | ||
function Transaction(connection) { | ||
emitter.EventEmitter.call(this); | ||
|
||
this.connection = connection; | ||
this.id = (idCounter++).toString(); | ||
|
||
this._callbacks = []; | ||
this._writeable = true; | ||
} | ||
emitter.mixin(Transaction); | ||
|
||
Transaction.prototype.commit = function(callback) { | ||
// TODO: Catch multiple calls | ||
// TODO: Handle network changes | ||
this._callbacks.push(callback); | ||
this._writeable = false; | ||
this.connection._commitTransaction(this); | ||
}; | ||
|
||
Transaction.prototype.abort = function(callback) { | ||
this._callbacks.push(callback); | ||
this._writeable = false; | ||
}; | ||
|
||
Transaction.prototype._handleCommit = function(error) { | ||
this._writeable = false; | ||
// TODO: Handle callbacks | ||
if (error) this._localAbort(error); | ||
else this.emit('commit'); | ||
|
||
var callbacks = this._callbacks; | ||
this._callbacks = []; | ||
if (!callbacks.length) this.emit('error', error); | ||
for (var callback of callbacks) callback(error); | ||
}; | ||
|
||
Transaction.prototype._localAbort = function(error) { | ||
this._writeable = false; | ||
this.emit('abort', error); | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,8 @@ function MemoryDB(options) { | |
this.ops = Object.create(null); | ||
|
||
this.closed = false; | ||
|
||
this._transactions = Object.create(null); | ||
}; | ||
module.exports = MemoryDB; | ||
|
||
|
@@ -39,20 +41,35 @@ MemoryDB.prototype.close = function(callback) { | |
// callback(err, succeeded) | ||
MemoryDB.prototype.commit = function(collection, id, op, snapshot, options, callback) { | ||
var db = this; | ||
options = options || {}; | ||
if (typeof callback !== 'function') throw new Error('Callback required'); | ||
util.nextTick(function() { | ||
var version = db._getVersionSync(collection, id); | ||
|
||
var transaction; | ||
if (options.transaction) { | ||
transaction = db._transactions[options.transaction]; | ||
if (!transaction) return callback(null, false); | ||
version = version + transaction.commits.length; | ||
} | ||
|
||
if (snapshot.v !== version + 1) { | ||
var succeeded = false; | ||
return callback(null, succeeded); | ||
} | ||
var err = db._writeOpSync(collection, id, op); | ||
if (err) return callback(err); | ||
err = db._writeSnapshotSync(collection, id, snapshot); | ||
if (err) return callback(err); | ||
|
||
var succeeded = true; | ||
callback(null, succeeded); | ||
var commit = function() { | ||
var err = db._writeOpSync(collection, id, op); | ||
if (err) return err; | ||
err = db._writeSnapshotSync(collection, id, snapshot); | ||
if (err) return err; | ||
}; | ||
|
||
var error; | ||
if (!transaction) error = commit(); | ||
callback(error, !error); | ||
|
||
if (transaction) transaction.commits.push(commit); | ||
}); | ||
}; | ||
|
||
|
@@ -133,6 +150,47 @@ MemoryDB.prototype.query = function(collection, query, fields, options, callback | |
}); | ||
}; | ||
|
||
MemoryDB.prototype.startTransaction = function(id, callback) { | ||
if (this._transactions[id]) callback(new Error('Transaction already started')); | ||
this._transactions[id] = this._transactions[id] || {}; | ||
this._transactions[id].commits = []; | ||
callback(); | ||
}; | ||
|
||
MemoryDB.prototype.restartTransaction = function(id, callback) { | ||
delete this._transactions[id]; | ||
this.startTransaction(id, callback); | ||
}; | ||
|
||
MemoryDB.prototype.commitTransaction = function(id, callback) { | ||
var transaction = this._transactions[id]; | ||
if (!transaction) return callback(); | ||
|
||
delete this._transactions[id]; | ||
|
||
// Heavy-handed but effective approach to synchronous transaction writing: | ||
// let's just save the DB state before and restore if the transaction fails | ||
var docs = JSON.stringify(this.docs); | ||
var ops = JSON.stringify(this.ops); | ||
Comment on lines
+173
to
+174
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know if this is technically breaking — we'd clobber any references to |
||
|
||
for (var commit of transaction.commits) { | ||
var error = commit(); | ||
|
||
if (error) { | ||
this.docs = JSON.parse(docs); | ||
this.ops = JSON.parse(ops); | ||
Comment on lines
+180
to
+181
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With potentially large sets of docs/ops in tests, using a structured clone would be better for performance/memory. You mentioned rfdc is working well for you in other situations, and I've looked at in the past and it seems good. |
||
break; | ||
} | ||
} | ||
|
||
callback(error, !error); | ||
}; | ||
|
||
MemoryDB.prototype.abortTransaction = function(id, callback) { | ||
delete this._transactions[id]; | ||
callback(null); | ||
}; | ||
|
||
// For testing, it may be useful to implement the desired query | ||
// language by defining this function. Returns an object with | ||
// two properties: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I really like how much the
Doc
"knows" about the transaction concept, but maybe this is a necessary evil given that we have to pass a transaction as part of the op submit?I'd toyed with having some sort of
class TransactionConnection extends Connection {}
sort of architecture, where you get a "special" connection that has its ownDoc
s and everything in there is assumed to be part of a transaction. It seemed overly-complicated, though, and passing atransaction
as part of the submission seemed a bit more flexible. It has its own downsides, though — aDoc
that is part of a transaction cannot be part of another transaction (or non-transaction); the workaround is to create a secondConnection
, which would give you similar behaviour to the extended class approach (which is also why it feels a bit unnecessary).