From 269ef3213aee4c3fde0990b2df26de776c96ddb6 Mon Sep 17 00:00:00 2001 From: osn64 Date: Wed, 15 Aug 2018 20:38:28 +1000 Subject: [PATCH 1/9] Rabbot can now have separate instances --- src/index.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/index.js b/src/index.js index f01ee8d..0f61cbe 100644 --- a/src/index.js +++ b/src/index.js @@ -548,6 +548,4 @@ require('./config.js')(Broker); Monologue.mixInto(Broker); -var broker = new Broker(); - -module.exports = broker; +module.exports = Broker; From c2873136554cad1f6bdc78328e9e7f3791c33c29 Mon Sep 17 00:00:00 2001 From: osn64 Date: Wed, 15 Aug 2018 20:49:12 +1000 Subject: [PATCH 2/9] tests now create individual rabbot instances --- spec/integration/badConnection.spec.js | 3 ++- spec/integration/bulkPublish.spec.js | 3 ++- spec/integration/connection.spec.js | 3 ++- spec/integration/consistentHash.spec.js | 3 ++- spec/integration/directReplyQueue.spec.js | 3 ++- spec/integration/fanout.spec.js | 3 ++- spec/integration/mandatory.spec.js | 3 ++- spec/integration/noReplyQueue.spec.js | 3 ++- spec/integration/noack.spec.js | 3 ++- spec/integration/nobatch.spec.js | 3 ++- spec/integration/poisonMessages.spec.js | 3 ++- spec/integration/publish.spec.js | 3 ++- spec/integration/purgeQueue.spec.js | 3 ++- spec/integration/queueSpecificHandle.spec.js | 3 ++- spec/integration/randomQueue.spec.js | 3 ++- spec/integration/rejection.spec.js | 3 ++- spec/integration/request.spec.js | 3 ++- spec/integration/subscription.spec.js | 3 ++- spec/integration/topicExchange.spec.js | 3 ++- spec/integration/typeSpecific.spec.js | 3 ++- spec/integration/typeless.spec.js | 3 ++- spec/integration/unhandled.spec.js | 3 ++- spec/integration/unrouted.spec.js | 3 ++- spec/integration/wildCardTypes.spec.js | 3 ++- 24 files changed, 48 insertions(+), 24 deletions(-) diff --git a/spec/integration/badConnection.spec.js b/spec/integration/badConnection.spec.js index fa99942..1057bda 100644 --- a/spec/integration/badConnection.spec.js +++ b/spec/integration/badConnection.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); describe('Bad Connection', function () { const noop = () => {}; diff --git a/spec/integration/bulkPublish.spec.js b/spec/integration/bulkPublish.spec.js index 5e0f4ba..193ea2c 100644 --- a/spec/integration/bulkPublish.spec.js +++ b/spec/integration/bulkPublish.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); /* diff --git a/spec/integration/connection.spec.js b/spec/integration/connection.spec.js index 64b7a38..0322592 100644 --- a/spec/integration/connection.spec.js +++ b/spec/integration/connection.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); describe('Connection', function () { diff --git a/spec/integration/consistentHash.spec.js b/spec/integration/consistentHash.spec.js index 3955dc6..d26fd05 100644 --- a/spec/integration/consistentHash.spec.js +++ b/spec/integration/consistentHash.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); describe('Consistent Hash Exchange', function () { diff --git a/spec/integration/directReplyQueue.spec.js b/spec/integration/directReplyQueue.spec.js index 7775a19..22af7f0 100644 --- a/spec/integration/directReplyQueue.spec.js +++ b/spec/integration/directReplyQueue.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); describe(`Direct Reply Queue (replyQueue: 'rabbit')`, function () { diff --git a/spec/integration/fanout.spec.js b/spec/integration/fanout.spec.js index 15e770f..90480b7 100644 --- a/spec/integration/fanout.spec.js +++ b/spec/integration/fanout.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); describe('Fanout Exchange With Multiple Subscribed Queues', function () { diff --git a/spec/integration/mandatory.spec.js b/spec/integration/mandatory.spec.js index 495f63a..e588972 100644 --- a/spec/integration/mandatory.spec.js +++ b/spec/integration/mandatory.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); /* diff --git a/spec/integration/noReplyQueue.spec.js b/spec/integration/noReplyQueue.spec.js index 32bc221..f5790e8 100644 --- a/spec/integration/noReplyQueue.spec.js +++ b/spec/integration/noReplyQueue.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); describe('No Reply Queue (replyQueue: false)', function () { diff --git a/spec/integration/noack.spec.js b/spec/integration/noack.spec.js index ea0c02b..81c6514 100644 --- a/spec/integration/noack.spec.js +++ b/spec/integration/noack.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); describe('Message Acknowledgments Disabled (noAck: true)', function () { diff --git a/spec/integration/nobatch.spec.js b/spec/integration/nobatch.spec.js index b90c3cb..9352c45 100644 --- a/spec/integration/nobatch.spec.js +++ b/spec/integration/nobatch.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); describe('Batch Acknowledgments Disabled (noBatch: true)', function () { diff --git a/spec/integration/poisonMessages.spec.js b/spec/integration/poisonMessages.spec.js index 8d5070a..a859838 100644 --- a/spec/integration/poisonMessages.spec.js +++ b/spec/integration/poisonMessages.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); /* diff --git a/spec/integration/publish.spec.js b/spec/integration/publish.spec.js index 927b2d3..bc1d406 100644 --- a/spec/integration/publish.spec.js +++ b/spec/integration/publish.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); describe('Publishing Messages', function () { describe('without a connection defined', function () { diff --git a/spec/integration/purgeQueue.spec.js b/spec/integration/purgeQueue.spec.js index 9864640..7c7897f 100644 --- a/spec/integration/purgeQueue.spec.js +++ b/spec/integration/purgeQueue.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); /* diff --git a/spec/integration/queueSpecificHandle.spec.js b/spec/integration/queueSpecificHandle.spec.js index bc69d74..0204a81 100644 --- a/spec/integration/queueSpecificHandle.spec.js +++ b/spec/integration/queueSpecificHandle.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); /* diff --git a/spec/integration/randomQueue.spec.js b/spec/integration/randomQueue.spec.js index fa814c6..ea75d0d 100644 --- a/spec/integration/randomQueue.spec.js +++ b/spec/integration/randomQueue.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); /* diff --git a/spec/integration/rejection.spec.js b/spec/integration/rejection.spec.js index a241a35..7fb6a85 100644 --- a/spec/integration/rejection.spec.js +++ b/spec/integration/rejection.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); /* diff --git a/spec/integration/request.spec.js b/spec/integration/request.spec.js index 50eb6fd..d1d6e58 100644 --- a/spec/integration/request.spec.js +++ b/spec/integration/request.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); describe('Request & Response', function () { diff --git a/spec/integration/subscription.spec.js b/spec/integration/subscription.spec.js index 26d0c78..da79ad0 100644 --- a/spec/integration/subscription.spec.js +++ b/spec/integration/subscription.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); /* diff --git a/spec/integration/topicExchange.spec.js b/spec/integration/topicExchange.spec.js index 882fc0e..a68c826 100644 --- a/spec/integration/topicExchange.spec.js +++ b/spec/integration/topicExchange.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); /* diff --git a/spec/integration/typeSpecific.spec.js b/spec/integration/typeSpecific.spec.js index 4d8b493..8c6aff1 100644 --- a/spec/integration/typeSpecific.spec.js +++ b/spec/integration/typeSpecific.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); /* diff --git a/spec/integration/typeless.spec.js b/spec/integration/typeless.spec.js index 349907a..000d607 100644 --- a/spec/integration/typeless.spec.js +++ b/spec/integration/typeless.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); /* diff --git a/spec/integration/unhandled.spec.js b/spec/integration/unhandled.spec.js index 354b509..40fce33 100644 --- a/spec/integration/unhandled.spec.js +++ b/spec/integration/unhandled.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); describe('Unhandled Strategies', function () { diff --git a/spec/integration/unrouted.spec.js b/spec/integration/unrouted.spec.js index 768f3ad..028ca27 100644 --- a/spec/integration/unrouted.spec.js +++ b/spec/integration/unrouted.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); describe('Unroutable Messages - Alternate Exchanges', function () { diff --git a/spec/integration/wildCardTypes.spec.js b/spec/integration/wildCardTypes.spec.js index 8b79f9b..d4e8f7d 100644 --- a/spec/integration/wildCardTypes.spec.js +++ b/spec/integration/wildCardTypes.spec.js @@ -1,5 +1,6 @@ require('../setup'); -const rabbit = require('../../src/index.js'); +const Rabbit = require('../../src/index.js'); +const rabbit = new Rabbit(); const config = require('./configuration'); /* From a49c2a52ee26f0c93c8e528e42ea674b09c99cd9 Mon Sep 17 00:00:00 2001 From: osn64 Date: Wed, 15 Aug 2018 21:01:07 +1000 Subject: [PATCH 3/9] removed hardcoded values in integration connection test --- spec/integration/connection.spec.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spec/integration/connection.spec.js b/spec/integration/connection.spec.js index 0322592..0bf0654 100644 --- a/spec/integration/connection.spec.js +++ b/spec/integration/connection.spec.js @@ -15,7 +15,8 @@ describe('Connection', function () { }); it('should assign uri to connection', function () { - connected.uri.should.equal('amqp://guest:guest@127.0.0.1:5672/%2f?heartbeat=30'); + const con = config.connection; + connected.uri.should.equal(`amqp://${con.user}:${con.pass}@${con.host}:${con.port}/${con.vhost}?heartbeat=30`); }); after(function () { From 1ffb16058b57a8f38630bd404bd66c23c9afed14 Mon Sep 17 00:00:00 2001 From: osn64 Date: Wed, 15 Aug 2018 22:24:02 +1000 Subject: [PATCH 4/9] in memory pub sub now namespaced with uuid per rabbot instance --- src/ackBatch.js | 7 ++++--- src/amqp/queue.js | 6 +++--- src/index.js | 16 +++++++++------- src/topology.js | 1 + 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/ackBatch.js b/src/ackBatch.js index 2e1a938..b06248d 100644 --- a/src/ackBatch.js +++ b/src/ackBatch.js @@ -1,6 +1,5 @@ const postal = require('postal'); const Monologue = require('monologue.js'); -const signal = postal.channel('rabbit.ack'); const log = require('./log.js')('rabbot.acknack'); /* log @@ -21,10 +20,12 @@ const calls = { reject: '_reject' }; -const AckBatch = function (name, connectionName, resolver) { +const AckBatch = function (name, connectionName, resolver, options) { this.name = name; this.connectionName = connectionName; this.resolver = resolver; + this.options = options || {}; + this.signal = postal.channel(`rabbit.ack.${options.pubSubNamespace}`); this.reset(); }; @@ -251,7 +252,7 @@ AckBatch.prototype.ignoreSignal = function () { AckBatch.prototype.listenForSignal = function () { if (!this.signalSubscription) { - this.signalSubscription = signal.subscribe('#', () => { + this.signalSubscription = this.signal.subscribe('#', () => { this._processBatch(); }); } diff --git a/src/amqp/queue.js b/src/amqp/queue.js index 60b38a8..ddae4cc 100644 --- a/src/amqp/queue.js +++ b/src/amqp/queue.js @@ -1,7 +1,5 @@ const AckBatch = require('../ackBatch.js'); const postal = require('postal'); -const dispatch = postal.channel('rabbit.dispatch'); -const responses = postal.channel('rabbit.responses'); const info = require('../info'); const log = require('../log')('rabbot.queue'); const format = require('util').format; @@ -320,6 +318,8 @@ function resolveTags (channel, queue, connection) { } function subscribe (channelName, channel, topology, serializers, messages, options, exclusive) { + const dispatch = postal.channel(`rabbit.dispatch.${options.pubSubNamespace}`); + const responses = postal.channel(`rabbit.responses.${options.pubSubNamespace}`); var shouldAck = !options.noAck; var shouldBatch = !options.noBatch; var shouldCacheKeys = !options.noCacheKeys; @@ -456,7 +456,7 @@ module.exports = function (options, topology, serializers) { var channelName = [ 'queue', options.uniqueName ].join(':'); return topology.connection.getChannel(channelName, false, 'queue channel for ' + options.name) .then(function (channel) { - var messages = new AckBatch(options.name, topology.connection.name, resolveTags(channel, options.name, topology.connection.name)); + var messages = new AckBatch(options.name, topology.connection.name, resolveTags(channel, options.name, topology.connection.name), options); var subscriber = subscribe.bind(undefined, options.uniqueName, channel, topology, serializers, messages, options); var definer = define.bind(undefined, channel, options, subscriber, topology.connection.name); return { diff --git a/src/index.js b/src/index.js index 0f61cbe..43548a3 100644 --- a/src/index.js +++ b/src/index.js @@ -3,9 +3,6 @@ const connectionFn = require('./connectionFsm.js'); const topologyFn = require('./topology.js'); const postal = require('postal'); const uuid = require('uuid'); -const dispatch = postal.channel('rabbit.dispatch'); -const responses = postal.channel('rabbit.responses'); -const signal = postal.channel('rabbit.ack'); const log = require('./log'); const DEFAULT = 'default'; @@ -64,6 +61,7 @@ const serializers = { }; var Broker = function () { + this.pubSubNamespace = uuid.v4(); this.connections = {}; this.hasHandles = false; this.autoNack = false; @@ -71,6 +69,9 @@ var Broker = function () { this.configurations = {}; this.configuring = {}; this.log = log; + this.dispatch = postal.channel(`rabbit.dispatch.${this.pubSubNamespace}`); + this.responses = postal.channel(`rabbit.responses.${this.pubSubNamespace}`); + this.signal = postal.channel(`rabbit.ack.${this.pubSubNamespace}`); }; Broker.prototype.addConnection = function (opts) { @@ -82,6 +83,7 @@ Broker.prototype.addConnection = function (opts) { failAfter: 60 }, opts); const name = options.name; + options.pubSubNamespace = this.pubSubNamespace; let connection; const connectionPromise = new Promise((resolve, reject) => { @@ -157,7 +159,7 @@ Broker.prototype.addSerializer = function (contentType, serializer) { }; Broker.prototype.batchAck = function () { - signal.publish('ack', {}); + this.signal.publish('ack', {}); }; Broker.prototype.bindExchange = function (source, target, keys, connectionName = DEFAULT) { @@ -301,7 +303,7 @@ Broker.prototype.handle = function (messageType, handler, queueName, context) { } const target = parts.join('.'); - const subscription = dispatch.subscribe(target, options.handler.bind(options.context)); + const subscription = this.dispatch.subscribe(target, options.handler.bind(options.context)); if (options.autoNack) { subscription.catch(function (err, msg) { console.log("Handler for '" + target + "' failed with:", err.stack); @@ -445,7 +447,7 @@ Broker.prototype.request = function (exchangeName, options = {}, notify, connect const requestId = uuid.v1(); options.messageId = requestId; options.connectionName = options.connectionName || connectionName; - + const responses = this.responses; if (!this.connections[ options.connectionName ]) { return Promise.reject(new Error(`Request failed - no connection ${options.connectionName} has been configured`)); } @@ -497,7 +499,7 @@ Broker.prototype.setAckInterval = function (interval) { if (this.ackIntervalId) { this.clearAckInterval(); } - this.ackIntervalId = setInterval(this.batchAck, interval); + this.ackIntervalId = setInterval(this.batchAck.bind(this), interval); }; Broker.prototype.shutdown = function () { diff --git a/src/topology.js b/src/topology.js index 98f185f..e1634a2 100644 --- a/src/topology.js +++ b/src/topology.js @@ -232,6 +232,7 @@ Topology.prototype.createExchange = function (options) { Topology.prototype.createQueue = function (options) { options.uniqueName = this.getUniqueName(options); + options.pubSubNamespace = this.options.pubSubNamespace; return this.createPrimitive(Queue, 'queue', options); }; From 77f8a872eb9cea97ed86696b3ffdfbe75a6a4a34 Mon Sep 17 00:00:00 2001 From: osn64 Date: Wed, 15 Aug 2018 22:25:02 +1000 Subject: [PATCH 5/9] fixed broken test due to pub sub namespacing --- spec/behavior/ackBatch.spec.js | 33 ++++++++++++++++++++++++--------- spec/behavior/topology.spec.js | 9 +++++++-- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/spec/behavior/ackBatch.spec.js b/spec/behavior/ackBatch.spec.js index 15d28e0..38af4fa 100644 --- a/spec/behavior/ackBatch.spec.js +++ b/spec/behavior/ackBatch.spec.js @@ -1,6 +1,6 @@ require('../setup.js'); var postal = require('postal'); -var signal = postal.channel('rabbit.ack'); +const uuid = require('uuid'); var AckBatch = require('../../src/ackBatch.js'); var noOp = function () {}; @@ -9,7 +9,8 @@ describe('Ack Batching', function () { var batch; var messageData; before(function () { - batch = new AckBatch('test-queue', 'test-connection', noOp); + const pubSubNamespace = uuid.v4(); + batch = new AckBatch('test-queue', 'test-connection', noOp, { pubSubNamespace }); messageData = batch.getMessageOps(101); batch.addMessage(messageData); }); @@ -58,7 +59,9 @@ describe('Ack Batching', function () { status = s; done(); }; - batch = new AckBatch('test-queue', 'test-connection', resolver); + const pubSubNamespace = uuid.v4(); + const signal = postal.channel(`rabbit.ack.${pubSubNamespace}`); + batch = new AckBatch('test-queue', 'test-connection', resolver, { pubSubNamespace }); batch.listenForSignal(); signal.publish('go', {}); }); @@ -85,7 +88,9 @@ describe('Ack Batching', function () { status = s; done(); }; - batch = new AckBatch('test-queue', 'test-connection', resolver); + const pubSubNamespace = uuid.v4(); + const signal = postal.channel(`rabbit.ack.${pubSubNamespace}`); + batch = new AckBatch('test-queue', 'test-connection', resolver, { pubSubNamespace }); batch.addMessage({ tag: 101, status: 'pending' }); batch.addMessage({ tag: 102, status: 'pending' }); batch.addMessage({ tag: 103, status: 'pending' }); @@ -125,7 +130,9 @@ describe('Ack Batching', function () { status = s; done(); }; - batch = new AckBatch('test-queue', 'test-connection', resolver); + const pubSubNamespace = uuid.v4(); + const signal = postal.channel(`rabbit.ack.${pubSubNamespace}`); + batch = new AckBatch('test-queue', 'test-connection', resolver, { pubSubNamespace }); batch.addMessage({ tag: 101, status: 'pending' }); batch.addMessage({ tag: 102, status: 'pending' }); batch.addMessage({ tag: 103, status: 'ack' }); @@ -168,7 +175,9 @@ describe('Ack Batching', function () { data = d; return Promise.resolve(true); }; - batch = new AckBatch('test-queue', 'test-connection', resolver); + const pubSubNamespace = uuid.v4(); + const signal = postal.channel(`rabbit.ack.${pubSubNamespace}`); + batch = new AckBatch('test-queue', 'test-connection', resolver, { pubSubNamespace }); batch.on('empty', function () { done(); }); @@ -219,7 +228,9 @@ describe('Ack Batching', function () { data = d; return Promise.resolve(true); }; - batch = new AckBatch('test-queue', 'test-connection', resolver); + const pubSubNamespace = uuid.v4(); + const signal = postal.channel(`rabbit.ack.${pubSubNamespace}`); + batch = new AckBatch('test-queue', 'test-connection', resolver, { pubSubNamespace }); batch.on('empty', function () { done(); }); @@ -270,7 +281,9 @@ describe('Ack Batching', function () { data = d; return Promise.resolve(true); }; - batch = new AckBatch('test-queue', 'test-connection', resolver); + const pubSubNamespace = uuid.v4(); + const signal = postal.channel(`rabbit.ack.${pubSubNamespace}`); + batch = new AckBatch('test-queue', 'test-connection', resolver, { pubSubNamespace }); batch.on('empty', function () { done(); }); @@ -322,7 +335,9 @@ describe('Ack Batching', function () { data.push(d); return Promise.resolve(true); }; - batch = new AckBatch('test-queue', 'test-connection', resolver); + const pubSubNamespace = uuid.v4(); + const signal = postal.channel(`rabbit.ack.${pubSubNamespace}`); + batch = new AckBatch('test-queue', 'test-connection', resolver, { pubSubNamespace }); batch.on('empty', function () { done(); }); diff --git a/spec/behavior/topology.spec.js b/spec/behavior/topology.spec.js index 2c847b2..5d6af04 100644 --- a/spec/behavior/topology.spec.js +++ b/spec/behavior/topology.spec.js @@ -97,7 +97,7 @@ describe('Topology', function () { .once() .resolves(control); - topology = topologyFn(conn.instance, {}, {}, undefined, undefined, Exchange, Queue, 'test'); + topology = topologyFn(conn.instance, { pubSubNamespace: 'test' }, {}, undefined, undefined, Exchange, Queue, 'test'); Promise.all([ topology.createExchange({ name: 'top-ex', type: 'topic' }), topology.createQueue({ name: 'top-q', unique: 'hash' }) @@ -119,6 +119,7 @@ describe('Topology', function () { { name: 'test.response.queue', uniqueName: 'test.response.queue', + pubSubNamespace: 'test', autoDelete: true, subscribe: true } @@ -167,6 +168,7 @@ describe('Topology', function () { { name: 'test.response.queue', uniqueName: 'test.response.queue', + pubSubNamespace: 'test', autoDelete: true, subscribe: true } @@ -202,7 +204,8 @@ describe('Topology', function () { uniqueName: 'mine', autoDelete: false, subscribe: true - } + }, + pubSubNamespace: 'test' }; topology = topologyFn(conn.instance, options, {}, undefined, undefined, Exchange, Queue, 'test'); topology.once('replyQueue.ready', function (queue) { @@ -219,6 +222,7 @@ describe('Topology', function () { { name: 'mine', uniqueName: 'mine', + pubSubNamespace: 'test', autoDelete: false, subscribe: true } @@ -240,6 +244,7 @@ describe('Topology', function () { { name: 'mine', uniqueName: 'mine', + pubSubNamespace: 'test', autoDelete: false, subscribe: true } From c1d816961c9c268548446d69aaf665b81b9647d2 Mon Sep 17 00:00:00 2001 From: osn64 Date: Wed, 15 Aug 2018 22:25:32 +1000 Subject: [PATCH 6/9] added test for pub sub namespace using two vhosts --- spec/integration/configuration.js | 9 +++ spec/integration/instance.spec.js | 116 ++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 spec/integration/instance.spec.js diff --git a/spec/integration/configuration.js b/spec/integration/configuration.js index c4a45c6..501943f 100644 --- a/spec/integration/configuration.js +++ b/spec/integration/configuration.js @@ -8,6 +8,15 @@ module.exports = { vhost: '%2f', replyQueue: 'customReplyQueue' }, + differentVhost: { + name: 'differentVhost', + user: 'guest', + pass: 'guest', + host: '127.0.0.1', + port: 5672, + vhost: '%2fdifferent', + replyQueue: 'customReplyQueue' + }, noReplyQueue: { name: 'noReplyQueue', diff --git a/spec/integration/instance.spec.js b/spec/integration/instance.spec.js new file mode 100644 index 0000000..4fe3c11 --- /dev/null +++ b/spec/integration/instance.spec.js @@ -0,0 +1,116 @@ +require('../setup'); +const Rabbit = require('../../src/index.js'); +const config = require('./configuration'); + +const firstRabbit = new Rabbit(); +const secondRabbit = new Rabbit(); + +const configs = { + instance_1: { + connection: config.connection, + exchanges: [ + { + name: 'rabbot-ex.subscription', + type: 'topic', + alternate: 'rabbot-ex.alternate', + autoDelete: true + } + ], + queues: [ + { + name: 'rabbot-q.subscription', + autoDelete: true, + subscribe: true, + deadletter: 'rabbot-ex.deadletter' + } + ], + bindings: [ + { + exchange: 'rabbot-ex.subscription', + target: 'rabbot-q.subscription', + keys: 'this.is.#' + } + ] + }, + instance_2: { + connection: config.differentVhost, + exchanges: [ + { + name: 'rabbot-ex.subscription', + type: 'topic', + alternate: 'rabbot-ex.alternate', + autoDelete: true + } + ], + queues: [ + { + name: 'rabbot-q.subscription', + autoDelete: true, + subscribe: true, + deadletter: 'rabbot-ex.deadletter' + } + ], + bindings: [ + { + exchange: 'rabbot-ex.subscription', + target: 'rabbot-q.subscription', + keys: 'this.is.#' + } + ] + } +}; + +describe('Multiple Instances', function () { + var firstHarness, secondHarness; + + before(function (done) { + new Promise(function (resolve, reject) { + firstRabbit.configure(configs.instance_1).then(() => { + firstHarness.handle('topic'); + firstRabbit.startSubscription('rabbot-q.subscription'); + firstRabbit.publish('rabbot-ex.subscription', { type: 'topic', routingKey: 'this.is.a.test', body: 'broadcast' }); + }); + + firstHarness = harnessFactory(firstRabbit, resolve, 1); + }).then(function () { + secondRabbit.configure(configs.instance_2).then(() => { + secondHarness.handle('topic'); + secondRabbit.startSubscription('rabbot-q.subscription', false, 'differentVhost'); + secondRabbit.publish('rabbot-ex.subscription', { type: 'topic', routingKey: 'this.is.a.different.test', body: 'broadcast' }, 'differentVhost'); + secondRabbit.publish('rabbot-ex.subscription', { type: 'topic', routingKey: 'this.is.a.different.test2', body: 'broadcast' }, 'differentVhost'); + }); + + secondHarness = harnessFactory(secondRabbit, done, 1); + }); + }); + + it('should not recieve the same message', function () { + const filterMsg = (m) => + ({ + body: m.body, + key: m.fields.routingKey + }); + + const firstResults = firstHarness.received.map(filterMsg); + const secondResults = secondHarness.received.map(filterMsg); + + sortBy(firstResults, 'body').should.eql( + [ + { body: 'broadcast', key: 'this.is.a.test' } + ] + ); + + sortBy(secondResults, 'body').should.eql( + [ + { body: 'broadcast', key: 'this.is.a.different.test' }, + { body: 'broadcast', key: 'this.is.a.different.test2' } + ] + ); + }); + + after(function () { + return firstHarness.clean('default').then(function () { + return secondHarness.clean('differentVhost'); + }); + }); +}); From 2d7c281832eeebbd5244bfb8c16916c6301170c6 Mon Sep 17 00:00:00 2001 From: osn64 Date: Wed, 15 Aug 2018 22:43:52 +1000 Subject: [PATCH 7/9] added second test vhost to docker image --- Dockerfile | 4 ++++ docker_files/custom_definitions.json | 25 +++++++++++++++++++++++++ docker_files/rabbitmq.config | 18 ++++++++++++++++++ 3 files changed, 47 insertions(+) create mode 100644 docker_files/custom_definitions.json create mode 100644 docker_files/rabbitmq.config diff --git a/Dockerfile b/Dockerfile index 99da18e..c9ea6ac 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,3 +1,7 @@ FROM rabbitmq:3-management +COPY docker_files/rabbitmq.config /etc/rabbitmq/ + +COPY docker_files/custom_definitions.json /etc/rabbitmq/ + RUN rabbitmq-plugins enable --offline rabbitmq_consistent_hash_exchange diff --git a/docker_files/custom_definitions.json b/docker_files/custom_definitions.json new file mode 100644 index 0000000..fa772dd --- /dev/null +++ b/docker_files/custom_definitions.json @@ -0,0 +1,25 @@ +{ + "users": [{ + "name": "guest", + "password": "guest", + "tags": "administrator" + }], + "vhosts": [{ + "name": "/" + }, { + "name": "/different" + }], + "permissions": [{ + "user": "guest", + "vhost": "/", + "configure": ".*", + "write": ".*", + "read": ".*" + }, { + "user": "guest", + "vhost": "/different", + "configure": ".*", + "write": ".*", + "read": ".*" + }] +} diff --git a/docker_files/rabbitmq.config b/docker_files/rabbitmq.config new file mode 100644 index 0000000..bfcbd61 --- /dev/null +++ b/docker_files/rabbitmq.config @@ -0,0 +1,18 @@ +[ + {rabbit, + [ + { tcp_listeners, [ 5672 ] }, + { ssl_listeners, [ ] }, + {loopback_users, []} + ] + }, + { rabbitmq_management, [ + {load_definitions, "/etc/rabbitmq/custom_definitions.json"}, + { listener, [ + { port, 15672 }, + { ssl, false } + ] + } + ] + } +]. From 4bf6c6d69d5d95ddd22318469adfa2de97758d88 Mon Sep 17 00:00:00 2001 From: osn64 Date: Thu, 23 Aug 2018 01:30:07 +1000 Subject: [PATCH 8/9] added option passive to check the queue exists by name instead of asserting the queue --- src/amqp/queue.js | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/amqp/queue.js b/src/amqp/queue.js index ddae4cc..5cf8d3c 100644 --- a/src/amqp/queue.js +++ b/src/amqp/queue.js @@ -46,13 +46,18 @@ function define (channel, options, subscriber, connectionName) { }, 'subscribe', 'limit', 'noBatch', 'unique'); topLog.info("Declaring queue '%s' on connection '%s' with the options: %s", options.uniqueName, connectionName, JSON.stringify(options)); - return channel.assertQueue(options.uniqueName, valid) - .then(function (q) { - if (options.limit) { - channel.prefetch(options.limit); - } - return q; - }); + + if (options.passive){ + return channel.checkQueue(options.uniqueName); + } else { + return channel.assertQueue(options.uniqueName, valid) + .then(function (q) { + if (options.limit) { + channel.prefetch(options.limit); + } + return q; + }); + } } function finalize (channel, messages) { From 2952c203915371f3a75f16871b145725dbd29333 Mon Sep 17 00:00:00 2001 From: osn64 Date: Thu, 23 Aug 2018 02:07:03 +1000 Subject: [PATCH 9/9] Timestamp for messages now confirms to ampq 0-9-1 standard of seconds --- src/amqp/exchange.js | 3 ++- src/amqp/queue.js | 3 ++- src/index.js | 5 +++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/amqp/exchange.js b/src/amqp/exchange.js index b5f7c86..795e5e7 100644 --- a/src/amqp/exchange.js +++ b/src/amqp/exchange.js @@ -76,6 +76,7 @@ function publish (channel, options, topology, log, serializers, message) { return Promise.reject(new Error(errMessage)); } var payload = serializer.serialize(message.body); + var timestamp = Math.floor(Date.now()/1000); var publishOptions = { type: message.type || '', contentType: contentType, @@ -83,7 +84,7 @@ function publish (channel, options, topology, log, serializers, message) { correlationId: message.correlationId || '', replyTo: message.replyTo || topology.replyQueue.name || '', messageId: message.messageId || message.id || '', - timestamp: message.timestamp || Date.now(), + timestamp: message.timestamp || timestamp, appId: message.appId || info.id, headers: message.headers || {}, expiration: message.expiresAfter || undefined, diff --git a/src/amqp/queue.js b/src/amqp/queue.js index 5cf8d3c..196a9dc 100644 --- a/src/amqp/queue.js +++ b/src/amqp/queue.js @@ -128,6 +128,7 @@ function getReply (channel, serializers, raw, replyQueue, connectionName) { var replyType = options ? (options.replyType || defaultReplyType) : defaultReplyType; var contentType = getContentType(reply, options); var serializer = serializers[ contentType ]; + var timestamp = Math.floor(Date.now()/1000); if (!serializer) { var message = format('Failed to publish message with contentType %s - no serializer defined', contentType); log.error(message); @@ -143,7 +144,7 @@ function getReply (channel, serializers, raw, replyQueue, connectionName) { contentType: contentType, contentEncoding: 'utf8', correlationId: raw.properties.messageId, - timestamp: options && options.timestamp ? options.timestamp : Date.now(), + timestamp: options && options.timestamp ? options.timestamp : timestamp, replyTo: replyQueue === false ? undefined : replyQueue, headers: options && options.headers ? options.headers : {} }; diff --git a/src/index.js b/src/index.js index 43548a3..12ead6b 100644 --- a/src/index.js +++ b/src/index.js @@ -180,10 +180,11 @@ Broker.prototype.bulkPublish = function (set, connectionName = DEFAULT) { if (!this.connections[ connectionName ]) { return Promise.reject(new Error(`BulkPublish failed - no connection ${connectionName} has been configured`)); } + var timestamp = Math.floor(Date.now()/1000); const publish = (exchange, options) => { options.appId = options.appId || this.appId; - options.timestamp = options.timestamp || Date.now(); + options.timestamp = options.timestamp || timestamp; if (this.connections[ connectionName ] && this.connections[ connectionName ].options.publishTimeout) { options.connectionPublishTimeout = this.connections[ connectionName ].options.publishTimeout; } @@ -383,7 +384,7 @@ Broker.prototype.onReturned = function (handler) { }; Broker.prototype.publish = function (exchangeName, type, message, routingKey, correlationId, connectionName, sequenceNo) { - const timestamp = Date.now(); + const timestamp = Math.floor(Date.now()/1000); let options; if (typeof type === 'object') { options = type;