From 90226f52a8fdc1bddf2f7e1835f6ba579076439b Mon Sep 17 00:00:00 2001 From: Kris De Volder Date: Mon, 12 Nov 2012 17:36:43 -0800 Subject: [PATCH 1/6] Add an 'id' to the protocol. All messages now 'type,topic,id,payload'. Use id to identify connection. Use topic only to identify registered channel on server side. --- multiplex_client.js | 37 ++++++++++++++++++---------- multiplex_server.js | 60 ++++++++++++++++++++++++--------------------- 2 files changed, 56 insertions(+), 41 deletions(-) diff --git a/multiplex_client.js b/multiplex_client.js index d007b63..c630a1c 100644 --- a/multiplex_client.js +++ b/multiplex_client.js @@ -1,5 +1,13 @@ +/*global setTimeout escape*/ var WebSocketMultiplex = (function(){ + var newId = (function () { + var count = 0; + return function () { + return count++; + }; + }()); + // **** @@ -30,16 +38,17 @@ var WebSocketMultiplex = (function(){ this.ws = ws; this.channels = {}; this.ws.addEventListener('message', function(e) { + //uns and msg should use channel id not channel name to identify their target var t = e.data.split(','); - var type = t.shift(), name = t.shift(), payload = t.join(); - if(!(name in that.channels)) { + var type = t.shift(), topic = t.shift(), id = t.shift(), payload = t.join(); + if(!(id in that.channels)) { return; } - var sub = that.channels[name]; + var sub = that.channels[id]; switch(type) { case 'uns': - delete that.channels[name]; + delete that.channels[id]; sub.emit('close', {}); break; case 'msg': @@ -49,19 +58,21 @@ var WebSocketMultiplex = (function(){ }); }; WebSocketMultiplex.prototype.channel = function(raw_name) { - return this.channels[escape(raw_name)] = - new Channel(this.ws, escape(raw_name), this.channels); + var id = newId(); + var ch = new Channel(this.ws, escape(raw_name), id, this.channels); + this.channels[id] = ch; + return ch; }; - - var Channel = function(ws, name, channels) { + var Channel = function(ws, name, id, channels) { DumbEventTarget.call(this); var that = this; this.ws = ws; this.name = name; + this.id = id; this.channels = channels; var onopen = function() { - that.ws.send('sub,' + that.name); + that.ws.send('sub,' + that.name+','+that.id); that.emit('open'); }; if(ws.readyState > 0) { @@ -70,15 +81,15 @@ var WebSocketMultiplex = (function(){ this.ws.addEventListener('open', onopen); } }; - Channel.prototype = new DumbEventTarget() + Channel.prototype = new DumbEventTarget(); Channel.prototype.send = function(data) { - this.ws.send('msg,' + this.name + ',' + data); + this.ws.send('msg,' + this.name + ',' +this.id + ',' + data); }; Channel.prototype.close = function() { var that = this; - this.ws.send('uns,' + this.name); - delete this.channels[this.name]; + this.ws.send('uns,' + this.name + ',' + this.id); + delete this.channels[this.id]; setTimeout(function(){that.emit('close', {});},0); }; diff --git a/multiplex_server.js b/multiplex_server.js index f841246..8def83a 100644 --- a/multiplex_server.js +++ b/multiplex_server.js @@ -1,8 +1,8 @@ +/*global exports require process escape*/ var events = require('events'); var stream = require('stream'); - -exports.MultiplexServer = MultiplexServer = function(service) { +var MultiplexServer = function(service) { var that = this; this.registered_channels = {}; this.service = service; @@ -10,36 +10,33 @@ exports.MultiplexServer = MultiplexServer = function(service) { var channels = {}; conn.on('data', function(message) { + var sub; var t = message.split(','); - var type = t.shift(), topic = t.shift(), payload = t.join(); - if (!(topic in that.registered_channels)) { - return; - } - if (topic in channels) { - var sub = channels[topic]; - + var type = t.shift(), topic = t.shift(), id=t.shift(), payload = t.join(); + if (type==='sub') { + if (!(topic in that.registered_channels)) { + return; + } + sub = channels[id] = new Channel(conn, topic, id, channels); + that.registered_channels[topic].emit('connection', sub); + } else if (id in channels) { + sub = channels[id]; switch(type) { case 'uns': - delete channels[topic]; + delete channels[id]; sub.emit('close'); break; case 'msg': sub.emit('data', payload); break; } - } else { - switch(type) { - case 'sub': - var sub = channels[topic] = new Channel(conn, topic, - channels); - that.registered_channels[topic].emit('connection', sub) - break; - } } }); conn.on('close', function() { - for (topic in channels) { - channels[topic].emit('close'); + for (var id in channels) { + if (channels.hasOwnProperty(id)) { + channels[id].emit('close'); + } } channels = {}; }); @@ -47,27 +44,31 @@ exports.MultiplexServer = MultiplexServer = function(service) { }; MultiplexServer.prototype.registerChannel = function(name) { - return this.registered_channels[escape(name)] = new events.EventEmitter(); + var emitter = new events.EventEmitter(); + this.registered_channels[escape(name)] = emitter; + return emitter; }; - -var Channel = function(conn, topic, channels) { +var Channel = function(conn, topic, id, channels) { this.conn = conn; this.topic = topic; + this.id = id; this.channels = channels; stream.Stream.call(this); }; Channel.prototype = new stream.Stream(); Channel.prototype.write = function(data) { - this.conn.write('msg,' + this.topic + ',' + data); + this.conn.write('msg,' + this.topic + ',' + this.id + ',' + data); }; Channel.prototype.end = function(data) { var that = this; - if (data) this.write(data); - if (this.topic in this.channels) { - this.conn.write('uns,' + this.topic); - delete this.channels[this.topic]; + if (data) { + this.write(data); + } + if (this.id in this.channels) { + this.conn.write('uns,' + this.topic + ',' + this.id); + delete this.channels[this.id]; process.nextTick(function(){that.emit('close');}); } }; @@ -76,3 +77,6 @@ Channel.prototype.destroy = Channel.prototype.destroySoon = this.removeAllListeners(); this.end(); }; + + +exports.MultiplexServer = MultiplexServer; \ No newline at end of file From 8cf4613347c3cdb70474c931a40850db1f4d6e2a Mon Sep 17 00:00:00 2001 From: Kris De Volder Date: Mon, 12 Nov 2012 17:43:37 -0800 Subject: [PATCH 2/6] Update example to make use of new capabilities. Also make it use the local/patched copy of multiplex implementation instead of the 'official' one. --- examples/sockjs/index.html | 37 +++++++++++++++++++++++++++++------- examples/sockjs/package.json | 3 +-- examples/sockjs/server.js | 5 +++-- 3 files changed, 34 insertions(+), 11 deletions(-) diff --git a/examples/sockjs/index.html b/examples/sockjs/index.html index f3be5c3..32e62d8 100644 --- a/examples/sockjs/index.html +++ b/examples/sockjs/index.html @@ -2,7 +2,7 @@ - +