diff --git a/common/lib/client/realtime.js b/common/lib/client/realtime.js index badae36bd7..694c20a159 100644 --- a/common/lib/client/realtime.js +++ b/common/lib/client/realtime.js @@ -29,13 +29,7 @@ var Realtime = (function() { this.realtime = realtime; this.all = {}; this.inProgress = {}; - var self = this; - realtime.connection.connectionManager.on('transport.active', function() { - /* nextTick to allow connectionManager to set the connection state to 'connected' if necessary */ - Utils.nextTick(function() { - self.onTransportActive(); - }); - }); + realtime.connection.connectionManager.on('transport.active', this.onTransportActive.bind(this)); } Utils.inherits(Channels, EventEmitter); @@ -71,7 +65,9 @@ var Realtime = (function() { Channels.prototype.reattach = function(reason) { for(var channelId in this.all) { var channel = this.all[channelId]; - if(channel.state === 'attaching' || channel.state === 'attached') { + /* NB this should not trigger for merely attaching channels, as they will + * be reattached anyway through the onTransportActive checkPendingState */ + if(channel.state === 'attached') { channel.requestState('attaching', reason); } } diff --git a/common/lib/client/realtimechannel.js b/common/lib/client/realtimechannel.js index 04936f7cca..464c3aac6f 100644 --- a/common/lib/client/realtimechannel.js +++ b/common/lib/client/realtimechannel.js @@ -271,7 +271,9 @@ var RealtimeChannel = (function() { /* send sync request */ var syncMessage = ProtocolMessage.fromValues({action: actions.SYNC, channel: this.name}); - syncMessage.channelSerial = this.syncChannelSerial; + if(this.syncChannelSerial) { + syncMessage.channelSerial = this.syncChannelSerial; + } connectionManager.send(syncMessage); }; @@ -476,8 +478,11 @@ var RealtimeChannel = (function() { RealtimeChannel.prototype.checkPendingState = function() { /* if can't send events, do nothing */ - if(!this.connectionManager.state.sendEvents) { - Logger.logAction(Logger.LOG_MINOR, 'RealtimeChannel.checkPendingState', 'not connected'); + var cmState = this.connectionManager.state; + /* Allow attach messages to queue up when synchronizing, since this will be + * the state we'll be in when upgrade transport.active triggers a checkpendingstate */ + if(!(cmState.sendEvents || cmState.forceQueueEvents)) { + Logger.logAction(Logger.LOG_MINOR, 'RealtimeChannel.checkPendingState', 'sendEvents is false; state is ' + this.connectionManager.state.state); return; } diff --git a/common/lib/client/realtimepresence.js b/common/lib/client/realtimepresence.js index c391726b15..2770c85c37 100644 --- a/common/lib/client/realtimepresence.js +++ b/common/lib/client/realtimepresence.js @@ -277,6 +277,7 @@ var RealtimePresence = (function() { /* RTP5c2: re-enter our own members if they haven't shown up in the sync */ this._ensureMyMembersPresent(); this.channel.setInProgress(RealtimeChannel.progressOps.sync, false); + this.channel.syncChannelSerial = null; } /* broadcast to listeners */ diff --git a/common/lib/transport/connectionmanager.js b/common/lib/transport/connectionmanager.js index 92375714b1..4e4a54c42a 100644 --- a/common/lib/transport/connectionmanager.js +++ b/common/lib/transport/connectionmanager.js @@ -87,7 +87,7 @@ var ConnectionManager = (function() { initialized: {state: 'initialized', terminal: false, queueEvents: true, sendEvents: false, failState: 'disconnected'}, connecting: {state: 'connecting', terminal: false, queueEvents: true, sendEvents: false, retryDelay: connectingTimeout, failState: 'disconnected'}, connected: {state: 'connected', terminal: false, queueEvents: false, sendEvents: true, failState: 'disconnected'}, - synchronizing: {state: 'connected', terminal: false, queueEvents: true, sendEvents: false, failState: 'disconnected'}, + synchronizing: {state: 'connected', terminal: false, queueEvents: true, sendEvents: false, forceQueueEvents: true, failState: 'disconnected'}, disconnected: {state: 'disconnected', terminal: false, queueEvents: true, sendEvents: false, retryDelay: timeouts.disconnectedRetryTimeout, failState: 'disconnected'}, suspended: {state: 'suspended', terminal: false, queueEvents: false, sendEvents: false, retryDelay: timeouts.suspendedRetryTimeout, failState: 'suspended'}, closing: {state: 'closing', terminal: false, queueEvents: false, sendEvents: false, retryDelay: timeouts.realtimeRequestTimeout, failState: 'closed'}, @@ -505,8 +505,6 @@ var ConnectionManager = (function() { }); }) - this.emit('transport.active', transport, connectionKey, transport.params); - /* If previously not connected, notify the state change (including any * error). */ if(existingState.state === this.states.connected.state) { @@ -522,6 +520,10 @@ var ConnectionManager = (function() { this.errorReason = this.realtime.connection.errorReason = error || null; } + /* Send after the connection state update, as Channels hooks into this to + * resend attaches on a new transport if necessary */ + this.emit('transport.active', transport, connectionKey, transport.params); + /* Gracefully terminate existing protocol */ if(existingActiveProtocol) { if(existingActiveProtocol.messageQueue.count() > 0) { @@ -649,8 +651,10 @@ var ConnectionManager = (function() { if(this.connectionId && this.connectionId !== connectionId) { Logger.logAction(Logger.LOG_MINOR, 'ConnectionManager.setConnection()', 'connectionId has changed; resetting msgSerial and reattaching channels'); this.msgSerial = 0; - /* Wait till next tick before reattaching channels so that connection - * state will be updated */ + /* Wait till next tick before reattaching channels, so that connection + * state will be updated and so that it will be applied after + * Channels#onTransportUpdate, else channels will not have an ATTACHED + * sent twice (once from this and once from that). */ Utils.nextTick(function() { self.realtime.channels.reattach(); }); @@ -1264,7 +1268,7 @@ var ConnectionManager = (function() { * event queueing ******************/ - ConnectionManager.prototype.send = function(msg, queueEvents, callback) { + ConnectionManager.prototype.send = function(msg, queueEvent, callback) { callback = callback || noop; var state = this.state; @@ -1273,18 +1277,17 @@ var ConnectionManager = (function() { this.sendImpl(new PendingMessage(msg, callback)); return; } - if(state.queueEvents) { - if(state == this.states.synchronizing || queueEvents) { - if (Logger.shouldLog(Logger.LOG_MICRO)) { - Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.send()', 'queueing msg; ' + ProtocolMessage.stringify(msg)); - } - this.queue(msg, callback); - } else { - var err = 'rejecting event as queueMessages was disabled; state = ' + state.state; - Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.send()', err); - callback(this.errorReason || new ErrorInfo(err, 90000, 400)); - } + var shouldQueue = (queueEvent && state.queueEvents) || state.forceQueueEvents; + if(!shouldQueue) { + var err = 'rejecting event, queueEvent was ' + queueEvent + ', state was ' + state.state; + Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.send()', err); + callback(this.errorReason || new ErrorInfo(err, 90000, 400)); + return; + } + if(Logger.shouldLog(Logger.LOG_MICRO)) { + Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.send()', 'queueing msg; ' + ProtocolMessage.stringify(msg)); } + this.queue(msg, callback); }; ConnectionManager.prototype.sendImpl = function(pendingMessage) { diff --git a/spec/realtime/channel.test.js b/spec/realtime/channel.test.js index 947cbe0fe9..0b4541cf3d 100644 --- a/spec/realtime/channel.test.js +++ b/spec/realtime/channel.test.js @@ -325,7 +325,7 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) { channel.attach(); channel.whenState('attached', function() { firedImmediately = true; - test.ok(channel.state === 'attached', 'whenState fired when attached'); + test.equal(channel.state, 'attached', 'whenState fired when attached'); closeAndFinish(test, realtime); }); test.ok(!firedImmediately, 'whenState should not fire immediately as not attached'); diff --git a/spec/realtime/connection.test.js b/spec/realtime/connection.test.js index 3eaed5cc37..a1d6faadc4 100644 --- a/spec/realtime/connection.test.js +++ b/spec/realtime/connection.test.js @@ -136,7 +136,8 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) { channel = realtime.channels.get('connectionQueuing'), connectionManager = realtime.connection.connectionManager; - connectionManager.once('transport.active', function(transport) { + realtime.connection.once('connected', function() { + var transport = connectionManager.activeProtocol.transport; channel.attach(function(err) { if(err) { test.ok(false, 'Attach failed with error: ' + helper.displayError(err)); @@ -160,7 +161,7 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) { }, function(cb) { /* After the disconnect, on reconnect, spy on transport.send again */ - connectionManager.once('transport.active', function(transport) { + connectionManager.once('transport.pending', function(transport) { var oldSend = transport.send; transport.send = function(msg, msgCb) { @@ -169,7 +170,7 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) { test.equal(msg.msgSerial, 0, 'Expect msgSerial of original message to still be 0'); test.equal(msg.messages.length, 1, 'Expect second message to not have been merged with the attempted message'); } else if(msg.messages[0].name === 'second') { - test.equal(msg.msgSerial, 1, 'Expect msgSerial of new message to be 0'); + test.equal(msg.msgSerial, 1, 'Expect msgSerial of new message to be 1'); cb(); } } diff --git a/spec/realtime/presence.test.js b/spec/realtime/presence.test.js index a14bcb1dd0..8c6190017d 100644 --- a/spec/realtime/presence.test.js +++ b/spec/realtime/presence.test.js @@ -1626,7 +1626,7 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) { * set but missing from a sync */ exports.leave_published_for_member_missing_from_sync = function(test) { test.expect(6); - var realtime = helper.AblyRealtime(), + var realtime = helper.AblyRealtime({transports: helper.availableTransports}), continuousClientId = 'continuous', goneClientId = 'gone', continuousRealtime = helper.AblyRealtime({clientId: continuousClientId}), diff --git a/spec/realtime/upgrade.test.js b/spec/realtime/upgrade.test.js index f09e3f0891..ac713e56bc 100644 --- a/spec/realtime/upgrade.test.js +++ b/spec/realtime/upgrade.test.js @@ -540,9 +540,9 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) { realtime.connection.once('connected', function() { channel.attach(function(err){ - test.ok(err.code, 90000, 'Check error code for channel attach timing out'); - test.ok(channel.state, 'suspended', 'Check channel goes into suspended state'); - test.ok(realtime.connection.state, 'connected', 'Check connection state is still connected'); + test.equal(err.code, 90007, 'Check error code for channel attach timing out'); + test.equal(channel.state, 'suspended', 'Check channel goes into suspended state'); + test.equal(realtime.connection.state, 'connected', 'Check connection state is still connected'); channel.attach(function(err){ test.ok(!err, 'Check a second attach works fine'); closeAndFinish(test, realtime)