Skip to content

Commit

Permalink
Merge PR #386 'multiple-attaches'
Browse files Browse the repository at this point in the history
  • Loading branch information
SimonWoolf committed Mar 13, 2017
2 parents 44c63f7 + 9cf0340 commit b1f55b1
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 36 deletions.
12 changes: 4 additions & 8 deletions common/lib/client/realtime.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,7 @@ var Realtime = (function() {
this.realtime = realtime;
this.all = {};
this.inProgress = {};
var self = this;
realtime.connection.connectionManager.on('transport.active', function() {
/* nextTick to allow connectionManager to set the connection state to 'connected' if necessary */
Utils.nextTick(function() {
self.onTransportActive();
});
});
realtime.connection.connectionManager.on('transport.active', this.onTransportActive.bind(this));
}
Utils.inherits(Channels, EventEmitter);

Expand Down Expand Up @@ -71,7 +65,9 @@ var Realtime = (function() {
Channels.prototype.reattach = function(reason) {
for(var channelId in this.all) {
var channel = this.all[channelId];
if(channel.state === 'attaching' || channel.state === 'attached') {
/* NB this should not trigger for merely attaching channels, as they will
* be reattached anyway through the onTransportActive checkPendingState */
if(channel.state === 'attached') {
channel.requestState('attaching', reason);
}
}
Expand Down
11 changes: 8 additions & 3 deletions common/lib/client/realtimechannel.js
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ var RealtimeChannel = (function() {

/* send sync request */
var syncMessage = ProtocolMessage.fromValues({action: actions.SYNC, channel: this.name});
syncMessage.channelSerial = this.syncChannelSerial;
if(this.syncChannelSerial) {
syncMessage.channelSerial = this.syncChannelSerial;
}
connectionManager.send(syncMessage);
};

Expand Down Expand Up @@ -476,8 +478,11 @@ var RealtimeChannel = (function() {

RealtimeChannel.prototype.checkPendingState = function() {
/* if can't send events, do nothing */
if(!this.connectionManager.state.sendEvents) {
Logger.logAction(Logger.LOG_MINOR, 'RealtimeChannel.checkPendingState', 'not connected');
var cmState = this.connectionManager.state;
/* Allow attach messages to queue up when synchronizing, since this will be
* the state we'll be in when upgrade transport.active triggers a checkpendingstate */
if(!(cmState.sendEvents || cmState.forceQueueEvents)) {
Logger.logAction(Logger.LOG_MINOR, 'RealtimeChannel.checkPendingState', 'sendEvents is false; state is ' + this.connectionManager.state.state);
return;
}

Expand Down
1 change: 1 addition & 0 deletions common/lib/client/realtimepresence.js
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ var RealtimePresence = (function() {
/* RTP5c2: re-enter our own members if they haven't shown up in the sync */
this._ensureMyMembersPresent();
this.channel.setInProgress(RealtimeChannel.progressOps.sync, false);
this.channel.syncChannelSerial = null;
}

/* broadcast to listeners */
Expand Down
37 changes: 20 additions & 17 deletions common/lib/transport/connectionmanager.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ var ConnectionManager = (function() {
initialized: {state: 'initialized', terminal: false, queueEvents: true, sendEvents: false, failState: 'disconnected'},
connecting: {state: 'connecting', terminal: false, queueEvents: true, sendEvents: false, retryDelay: connectingTimeout, failState: 'disconnected'},
connected: {state: 'connected', terminal: false, queueEvents: false, sendEvents: true, failState: 'disconnected'},
synchronizing: {state: 'connected', terminal: false, queueEvents: true, sendEvents: false, failState: 'disconnected'},
synchronizing: {state: 'connected', terminal: false, queueEvents: true, sendEvents: false, forceQueueEvents: true, failState: 'disconnected'},
disconnected: {state: 'disconnected', terminal: false, queueEvents: true, sendEvents: false, retryDelay: timeouts.disconnectedRetryTimeout, failState: 'disconnected'},
suspended: {state: 'suspended', terminal: false, queueEvents: false, sendEvents: false, retryDelay: timeouts.suspendedRetryTimeout, failState: 'suspended'},
closing: {state: 'closing', terminal: false, queueEvents: false, sendEvents: false, retryDelay: timeouts.realtimeRequestTimeout, failState: 'closed'},
Expand Down Expand Up @@ -505,8 +505,6 @@ var ConnectionManager = (function() {
});
})

this.emit('transport.active', transport, connectionKey, transport.params);

/* If previously not connected, notify the state change (including any
* error). */
if(existingState.state === this.states.connected.state) {
Expand All @@ -522,6 +520,10 @@ var ConnectionManager = (function() {
this.errorReason = this.realtime.connection.errorReason = error || null;
}

/* Send after the connection state update, as Channels hooks into this to
* resend attaches on a new transport if necessary */
this.emit('transport.active', transport, connectionKey, transport.params);

/* Gracefully terminate existing protocol */
if(existingActiveProtocol) {
if(existingActiveProtocol.messageQueue.count() > 0) {
Expand Down Expand Up @@ -649,8 +651,10 @@ var ConnectionManager = (function() {
if(this.connectionId && this.connectionId !== connectionId) {
Logger.logAction(Logger.LOG_MINOR, 'ConnectionManager.setConnection()', 'connectionId has changed; resetting msgSerial and reattaching channels');
this.msgSerial = 0;
/* Wait till next tick before reattaching channels so that connection
* state will be updated */
/* Wait till next tick before reattaching channels, so that connection
* state will be updated and so that it will be applied after
* Channels#onTransportUpdate, else channels will not have an ATTACHED
* sent twice (once from this and once from that). */
Utils.nextTick(function() {
self.realtime.channels.reattach();
});
Expand Down Expand Up @@ -1264,7 +1268,7 @@ var ConnectionManager = (function() {
* event queueing
******************/

ConnectionManager.prototype.send = function(msg, queueEvents, callback) {
ConnectionManager.prototype.send = function(msg, queueEvent, callback) {
callback = callback || noop;
var state = this.state;

Expand All @@ -1273,18 +1277,17 @@ var ConnectionManager = (function() {
this.sendImpl(new PendingMessage(msg, callback));
return;
}
if(state.queueEvents) {
if(state == this.states.synchronizing || queueEvents) {
if (Logger.shouldLog(Logger.LOG_MICRO)) {
Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.send()', 'queueing msg; ' + ProtocolMessage.stringify(msg));
}
this.queue(msg, callback);
} else {
var err = 'rejecting event as queueMessages was disabled; state = ' + state.state;
Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.send()', err);
callback(this.errorReason || new ErrorInfo(err, 90000, 400));
}
var shouldQueue = (queueEvent && state.queueEvents) || state.forceQueueEvents;
if(!shouldQueue) {
var err = 'rejecting event, queueEvent was ' + queueEvent + ', state was ' + state.state;
Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.send()', err);
callback(this.errorReason || new ErrorInfo(err, 90000, 400));
return;
}
if(Logger.shouldLog(Logger.LOG_MICRO)) {
Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.send()', 'queueing msg; ' + ProtocolMessage.stringify(msg));
}
this.queue(msg, callback);
};

ConnectionManager.prototype.sendImpl = function(pendingMessage) {
Expand Down
2 changes: 1 addition & 1 deletion spec/realtime/channel.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) {
channel.attach();
channel.whenState('attached', function() {
firedImmediately = true;
test.ok(channel.state === 'attached', 'whenState fired when attached');
test.equal(channel.state, 'attached', 'whenState fired when attached');
closeAndFinish(test, realtime);
});
test.ok(!firedImmediately, 'whenState should not fire immediately as not attached');
Expand Down
7 changes: 4 additions & 3 deletions spec/realtime/connection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) {
channel = realtime.channels.get('connectionQueuing'),
connectionManager = realtime.connection.connectionManager;

connectionManager.once('transport.active', function(transport) {
realtime.connection.once('connected', function() {
var transport = connectionManager.activeProtocol.transport;
channel.attach(function(err) {
if(err) {
test.ok(false, 'Attach failed with error: ' + helper.displayError(err));
Expand All @@ -160,7 +161,7 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) {
},
function(cb) {
/* After the disconnect, on reconnect, spy on transport.send again */
connectionManager.once('transport.active', function(transport) {
connectionManager.once('transport.pending', function(transport) {
var oldSend = transport.send;

transport.send = function(msg, msgCb) {
Expand All @@ -169,7 +170,7 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) {
test.equal(msg.msgSerial, 0, 'Expect msgSerial of original message to still be 0');
test.equal(msg.messages.length, 1, 'Expect second message to not have been merged with the attempted message');
} else if(msg.messages[0].name === 'second') {
test.equal(msg.msgSerial, 1, 'Expect msgSerial of new message to be 0');
test.equal(msg.msgSerial, 1, 'Expect msgSerial of new message to be 1');
cb();
}
}
Expand Down
2 changes: 1 addition & 1 deletion spec/realtime/presence.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,7 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) {
* set but missing from a sync */
exports.leave_published_for_member_missing_from_sync = function(test) {
test.expect(6);
var realtime = helper.AblyRealtime(),
var realtime = helper.AblyRealtime({transports: helper.availableTransports}),
continuousClientId = 'continuous',
goneClientId = 'gone',
continuousRealtime = helper.AblyRealtime({clientId: continuousClientId}),
Expand Down
6 changes: 3 additions & 3 deletions spec/realtime/upgrade.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,9 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) {

realtime.connection.once('connected', function() {
channel.attach(function(err){
test.ok(err.code, 90000, 'Check error code for channel attach timing out');
test.ok(channel.state, 'suspended', 'Check channel goes into suspended state');
test.ok(realtime.connection.state, 'connected', 'Check connection state is still connected');
test.equal(err.code, 90007, 'Check error code for channel attach timing out');
test.equal(channel.state, 'suspended', 'Check channel goes into suspended state');
test.equal(realtime.connection.state, 'connected', 'Check connection state is still connected');
channel.attach(function(err){
test.ok(!err, 'Check a second attach works fine');
closeAndFinish(test, realtime)
Expand Down

0 comments on commit b1f55b1

Please sign in to comment.