Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactions #689

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ function Agent(backend, stream) {
this._firstReceivedMessage = null;
this._handshakeReceived = false;

this._transactions = Object.create(null);

// Send the legacy message to initialize old clients with the random agent Id
this.send(this._initMessage(ACTIONS.initLegacy));
}
Expand Down Expand Up @@ -470,7 +472,7 @@ Agent.prototype._handleMessage = function(request, callback) {
));
}
if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message'));
return this._submit(request.c, request.d, op, callback);
return this._submit(request.c, request.d, op, request.t, callback);
case ACTIONS.snapshotFetch:
return this._fetchSnapshot(request.c, request.d, request.v, callback);
case ACTIONS.snapshotFetchByTimestamp:
Expand All @@ -494,6 +496,8 @@ Agent.prototype._handleMessage = function(request, callback) {
return this._requestPresence(request.ch, callback);
case ACTIONS.pingPong:
return this._pingPong(callback);
case ACTIONS.transactionCommit:
return this._commitTransaction(request, callback);
default:
callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message'));
}
Expand Down Expand Up @@ -761,9 +765,12 @@ Agent.prototype._unsubscribeBulk = function(collection, ids, callback) {
util.nextTick(callback);
};

Agent.prototype._submit = function(collection, id, op, callback) {
Agent.prototype._submit = function(collection, id, op, transactionId, callback) {
var agent = this;
this.backend.submit(this, collection, id, op, null, function(err, ops, request) {
var options = {
transaction: transactionId
};
this.backend.submit(this, collection, id, op, options, function(err, ops, request) {
// Message to acknowledge the op was successfully submitted
var ack = {src: op.src, seq: op.seq, v: op.v};
if (request._fixupOps.length) ack[ACTIONS.fixup] = request._fixupOps;
Expand Down Expand Up @@ -993,6 +1000,16 @@ function createClientOp(request, clientId) {
undefined;
}

Agent.prototype._commitTransaction = function(request, callback) {
var transaction = this._transactions[request.id];
transaction.commit(callback);
};

Agent.prototype._abortTransaction = function(request, callback) {
var transaction = this._transactions[request.id];
transaction.abort(callback);
};

function shallowCopy(object) {
var out = {};
for (var key in object) {
Expand Down
26 changes: 26 additions & 0 deletions lib/client/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var util = require('../util');
var logger = require('../logger');
var DocPresenceEmitter = require('./presence/doc-presence-emitter');
var protocol = require('../protocol');
var Transaction = require('./transaction');

var ERROR_CODE = ShareDBError.CODES;

Expand Down Expand Up @@ -64,6 +65,8 @@ function Connection(socket) {
// A unique message number for presence
this._presenceSeq = 1;

this._transactions = Object.create(null);

// Equals agent.src on the server
this.id = null;

Expand Down Expand Up @@ -258,6 +261,8 @@ Connection.prototype.handleMessage = function(message) {
return this._handlePresenceRequest(err, message);
case ACTIONS.pingPong:
return this._handlePingPong(err);
case ACTIONS.transactionCommit:
return this._handleTransactionCommit(err, message);

default:
logger.warn('Ignoring unrecognized message', message);
Expand Down Expand Up @@ -467,6 +472,7 @@ Connection.prototype.sendOp = function(doc, op) {
if (op.create) message.create = op.create;
if (op.del) message.del = op.del;
if (doc.submitSource) message.x.source = op.source;
if (op.transaction) message.t = op.transaction;
this.send(message);
};

Expand Down Expand Up @@ -782,6 +788,26 @@ Connection.prototype._initialize = function(message) {
this._setState('connected');
};

Connection.prototype.startTransaction = function() {
var transaction = new Transaction(this);
return this._transactions[transaction.id] = transaction;
};

Connection.prototype._commitTransaction = function(transaction) {
this.send({a: ACTIONS.transactionCommit, id: transaction.id});
};

Connection.prototype._abortTransaction = function(transaction) {
this.send({a: ACTIONS.transactionAbort, id: transaction.id});
};

Connection.prototype._handleTransactionCommit = function(error, message) {
var transaction = this._transactions[message.id];
if (!transaction) return;
transaction._handleCommit(error, message);
delete this._transactions[message.id];
};

Connection.prototype.getPresence = function(channel) {
var connection = this;
var presence = util.digOrCreate(this._presences, channel, function() {
Expand Down
35 changes: 34 additions & 1 deletion lib/client/doc.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ function Doc(connection, collection, id) {
// Internal counter that gets incremented every time doc.data is updated.
// Used as a cheap way to check if doc.data has changed.
this._dataStateVersion = 0;

this._transaction = null;
}
emitter.mixin(Doc);

Expand Down Expand Up @@ -337,6 +339,9 @@ Doc.prototype._handleOp = function(err, message) {
// the remote, so that we're in a nice consistent state
return this.fetch(this._clearInflightOp.bind(this));
}
if (this._transaction) {
return this._transaction._localAbort(err);
}
if (this.inflightOp) {
return this._rollback(err);
}
Expand Down Expand Up @@ -763,6 +768,7 @@ Doc.prototype._submit = function(op, source, callback) {
this._pushOp(op, source, callback);
this._otApply(op, source);
} catch (error) {
// TODO: Abort transaction
return this._hardRollback(error);
}

Expand Down Expand Up @@ -865,8 +871,10 @@ Doc.prototype.submitOp = function(component, options, callback) {
callback = options;
options = null;
}
options = options || {};
var op = {op: component};
var source = options && options.source;
this._setTransaction(op, options.transaction);
var source = options.source;
this._submit(op, source, callback);
};

Expand All @@ -888,6 +896,7 @@ Doc.prototype.create = function(data, type, options, callback) {
callback = options;
options = null;
}
options = options || {};
if (!type) {
type = types.defaultType.uri;
}
Expand All @@ -897,6 +906,7 @@ Doc.prototype.create = function(data, type, options, callback) {
return this.emit('error', err);
}
var op = {create: {type: type, data: data}};
this._setTransaction(op, options.transaction);
var source = options && options.source;
this._submit(op, source, callback);
};
Expand All @@ -913,12 +923,14 @@ Doc.prototype.del = function(options, callback) {
callback = options;
options = null;
}
options = options || {};
if (!this.type) {
var err = new ShareDBError(ERROR_CODE.ERR_DOC_DOES_NOT_EXIST, 'Document does not exist');
if (callback) return callback(err);
return this.emit('error', err);
}
var op = {del: true};
this._setTransaction(op, options.transaction);
var source = options && options.source;
this._submit(op, source, callback);
};
Expand Down Expand Up @@ -1106,4 +1118,25 @@ Doc.prototype._clearInflightOp = function(err) {
if (err && !called) return this.emit('error', err);
};

Doc.prototype._setTransaction = function(op, transaction) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I really like how much the Doc "knows" about the transaction concept, but maybe this is a necessary evil given that we have to pass a transaction as part of the op submit?

I'd toyed with having some sort of class TransactionConnection extends Connection {} sort of architecture, where you get a "special" connection that has its own Docs and everything in there is assumed to be part of a transaction. It seemed overly-complicated, though, and passing a transaction as part of the submission seemed a bit more flexible. It has its own downsides, though — a Doc that is part of a transaction cannot be part of another transaction (or non-transaction); the workaround is to create a second Connection, which would give you similar behaviour to the extended class approach (which is also why it feels a bit unnecessary).

if (transaction && typeof transaction === 'object') transaction = transaction.id;
if (transaction) {
op.transaction = transaction;

if (this._transaction) {
if (this._transaction.id === transaction) return;
throw new Error('Transaction already in progress. Commit transaction before starting a new one.');
}
this._transaction = this.connection._transactions[transaction];
if (!this._transaction) {
throw new Error('Transaction not started');
}
if (!this._transaction._writeable) {
throw new Error('Transaction is no longer writeable');
}
// TODO: Tidy up listener on commit/abort
this._transaction.on('abort', this._hardRollback.bind(this));
} else if (this._transaction) {
throw new Error('Transaction in progress. Commit transaction before submitting ops outside transaction.');
}
};
45 changes: 45 additions & 0 deletions lib/client/transaction.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
var emitter = require('../emitter');

var idCounter = 1;

module.exports = Transaction;
function Transaction(connection) {
emitter.EventEmitter.call(this);

this.connection = connection;
this.id = (idCounter++).toString();

this._callbacks = [];
this._writeable = true;
}
emitter.mixin(Transaction);

Transaction.prototype.commit = function(callback) {
// TODO: Catch multiple calls
// TODO: Handle network changes
this._callbacks.push(callback);
this._writeable = false;
this.connection._commitTransaction(this);
};

Transaction.prototype.abort = function(callback) {
this._callbacks.push(callback);
this._writeable = false;
};

Transaction.prototype._handleCommit = function(error) {
this._writeable = false;
// TODO: Handle callbacks
if (error) this._localAbort(error);
else this.emit('commit');

var callbacks = this._callbacks;
this._callbacks = [];
if (!callbacks.length) this.emit('error', error);
for (var callback of callbacks) callback(error);
};

Transaction.prototype._localAbort = function(error) {
this._writeable = false;
this.emit('abort', error);
};
70 changes: 64 additions & 6 deletions lib/db/memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ function MemoryDB(options) {
this.ops = Object.create(null);

this.closed = false;

this._transactions = Object.create(null);
};
module.exports = MemoryDB;

Expand All @@ -39,20 +41,35 @@ MemoryDB.prototype.close = function(callback) {
// callback(err, succeeded)
MemoryDB.prototype.commit = function(collection, id, op, snapshot, options, callback) {
var db = this;
options = options || {};
if (typeof callback !== 'function') throw new Error('Callback required');
util.nextTick(function() {
var version = db._getVersionSync(collection, id);

var transaction;
if (options.transaction) {
transaction = db._transactions[options.transaction];
if (!transaction) return callback(null, false);
version = version + transaction.commits.length;
}

if (snapshot.v !== version + 1) {
var succeeded = false;
return callback(null, succeeded);
}
var err = db._writeOpSync(collection, id, op);
if (err) return callback(err);
err = db._writeSnapshotSync(collection, id, snapshot);
if (err) return callback(err);

var succeeded = true;
callback(null, succeeded);
var commit = function() {
var err = db._writeOpSync(collection, id, op);
if (err) return err;
err = db._writeSnapshotSync(collection, id, snapshot);
if (err) return err;
};

var error;
if (!transaction) error = commit();
callback(error, !error);

if (transaction) transaction.commits.push(commit);
});
};

Expand Down Expand Up @@ -133,6 +150,47 @@ MemoryDB.prototype.query = function(collection, query, fields, options, callback
});
};

MemoryDB.prototype.startTransaction = function(id, callback) {
if (this._transactions[id]) callback(new Error('Transaction already started'));
this._transactions[id] = this._transactions[id] || {};
this._transactions[id].commits = [];
callback();
};

MemoryDB.prototype.restartTransaction = function(id, callback) {
delete this._transactions[id];
this.startTransaction(id, callback);
};

MemoryDB.prototype.commitTransaction = function(id, callback) {
var transaction = this._transactions[id];
if (!transaction) return callback();

delete this._transactions[id];

// Heavy-handed but effective approach to synchronous transaction writing:
// let's just save the DB state before and restore if the transaction fails
var docs = JSON.stringify(this.docs);
var ops = JSON.stringify(this.ops);
Comment on lines +173 to +174
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if this is technically breaking — we'd clobber any references to db.docs or db.ops by doing this, but I think I consider these object private implementation details of this in-memory DB?


for (var commit of transaction.commits) {
var error = commit();

if (error) {
this.docs = JSON.parse(docs);
this.ops = JSON.parse(ops);
Comment on lines +180 to +181
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With potentially large sets of docs/ops in tests, using a structured clone would be better for performance/memory. You mentioned rfdc is working well for you in other situations, and I've looked at in the past and it seems good.

break;
}
}

callback(error, !error);
};

MemoryDB.prototype.abortTransaction = function(id, callback) {
delete this._transactions[id];
callback(null);
};

// For testing, it may be useful to implement the desired query
// language by defining this function. Returns an object with
// two properties:
Expand Down
1 change: 1 addition & 0 deletions lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ ShareDBError.CODES = {
*/
ERR_SNAPSHOT_READS_REJECTED: 'ERR_SNAPSHOT_READS_REJECTED',
ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND: 'ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND',
ERR_TRANSACTION_ABORTED: 'ERR_TRANSACTION_ABORTED',
ERR_TYPE_CANNOT_BE_PROJECTED: 'ERR_TYPE_CANNOT_BE_PROJECTED',
ERR_TYPE_DOES_NOT_SUPPORT_COMPOSE: 'ERR_TYPE_DOES_NOT_SUPPORT_COMPOSE',
ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE: 'ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE',
Expand Down
2 changes: 2 additions & 0 deletions lib/message-actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ exports.ACTIONS = {
op: 'op',
snapshotFetch: 'nf',
snapshotFetchByTimestamp: 'nt',
transactionAbort: 'ta',
transactionCommit: 'tc',
pingPong: 'pp',
presence: 'p',
presenceSubscribe: 'ps',
Expand Down
Loading
Loading