Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separation of rabbot instances #173

Closed
wants to merge 9 commits into from
4 changes: 4 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
FROM rabbitmq:3-management

COPY docker_files/rabbitmq.config /etc/rabbitmq/

COPY docker_files/custom_definitions.json /etc/rabbitmq/

RUN rabbitmq-plugins enable --offline rabbitmq_consistent_hash_exchange
25 changes: 25 additions & 0 deletions docker_files/custom_definitions.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"users": [{
"name": "guest",
"password": "guest",
"tags": "administrator"
}],
"vhosts": [{
"name": "/"
}, {
"name": "/different"
}],
"permissions": [{
"user": "guest",
"vhost": "/",
"configure": ".*",
"write": ".*",
"read": ".*"
}, {
"user": "guest",
"vhost": "/different",
"configure": ".*",
"write": ".*",
"read": ".*"
}]
}
18 changes: 18 additions & 0 deletions docker_files/rabbitmq.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[
{rabbit,
[
{ tcp_listeners, [ 5672 ] },
{ ssl_listeners, [ ] },
{loopback_users, []}
]
},
{ rabbitmq_management, [
{load_definitions, "/etc/rabbitmq/custom_definitions.json"},
{ listener, [
{ port, 15672 },
{ ssl, false }
]
}
]
}
].
33 changes: 24 additions & 9 deletions spec/behavior/ackBatch.spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require('../setup.js');
var postal = require('postal');
var signal = postal.channel('rabbit.ack');
const uuid = require('uuid');
var AckBatch = require('../../src/ackBatch.js');
var noOp = function () {};

Expand All @@ -9,7 +9,8 @@ describe('Ack Batching', function () {
var batch;
var messageData;
before(function () {
batch = new AckBatch('test-queue', 'test-connection', noOp);
const pubSubNamespace = uuid.v4();
batch = new AckBatch('test-queue', 'test-connection', noOp, { pubSubNamespace });
messageData = batch.getMessageOps(101);
batch.addMessage(messageData);
});
Expand Down Expand Up @@ -58,7 +59,9 @@ describe('Ack Batching', function () {
status = s;
done();
};
batch = new AckBatch('test-queue', 'test-connection', resolver);
const pubSubNamespace = uuid.v4();
const signal = postal.channel(`rabbit.ack.${pubSubNamespace}`);
batch = new AckBatch('test-queue', 'test-connection', resolver, { pubSubNamespace });
batch.listenForSignal();
signal.publish('go', {});
});
Expand All @@ -85,7 +88,9 @@ describe('Ack Batching', function () {
status = s;
done();
};
batch = new AckBatch('test-queue', 'test-connection', resolver);
const pubSubNamespace = uuid.v4();
const signal = postal.channel(`rabbit.ack.${pubSubNamespace}`);
batch = new AckBatch('test-queue', 'test-connection', resolver, { pubSubNamespace });
batch.addMessage({ tag: 101, status: 'pending' });
batch.addMessage({ tag: 102, status: 'pending' });
batch.addMessage({ tag: 103, status: 'pending' });
Expand Down Expand Up @@ -125,7 +130,9 @@ describe('Ack Batching', function () {
status = s;
done();
};
batch = new AckBatch('test-queue', 'test-connection', resolver);
const pubSubNamespace = uuid.v4();
const signal = postal.channel(`rabbit.ack.${pubSubNamespace}`);
batch = new AckBatch('test-queue', 'test-connection', resolver, { pubSubNamespace });
batch.addMessage({ tag: 101, status: 'pending' });
batch.addMessage({ tag: 102, status: 'pending' });
batch.addMessage({ tag: 103, status: 'ack' });
Expand Down Expand Up @@ -168,7 +175,9 @@ describe('Ack Batching', function () {
data = d;
return Promise.resolve(true);
};
batch = new AckBatch('test-queue', 'test-connection', resolver);
const pubSubNamespace = uuid.v4();
const signal = postal.channel(`rabbit.ack.${pubSubNamespace}`);
batch = new AckBatch('test-queue', 'test-connection', resolver, { pubSubNamespace });
batch.on('empty', function () {
done();
});
Expand Down Expand Up @@ -219,7 +228,9 @@ describe('Ack Batching', function () {
data = d;
return Promise.resolve(true);
};
batch = new AckBatch('test-queue', 'test-connection', resolver);
const pubSubNamespace = uuid.v4();
const signal = postal.channel(`rabbit.ack.${pubSubNamespace}`);
batch = new AckBatch('test-queue', 'test-connection', resolver, { pubSubNamespace });
batch.on('empty', function () {
done();
});
Expand Down Expand Up @@ -270,7 +281,9 @@ describe('Ack Batching', function () {
data = d;
return Promise.resolve(true);
};
batch = new AckBatch('test-queue', 'test-connection', resolver);
const pubSubNamespace = uuid.v4();
const signal = postal.channel(`rabbit.ack.${pubSubNamespace}`);
batch = new AckBatch('test-queue', 'test-connection', resolver, { pubSubNamespace });
batch.on('empty', function () {
done();
});
Expand Down Expand Up @@ -322,7 +335,9 @@ describe('Ack Batching', function () {
data.push(d);
return Promise.resolve(true);
};
batch = new AckBatch('test-queue', 'test-connection', resolver);
const pubSubNamespace = uuid.v4();
const signal = postal.channel(`rabbit.ack.${pubSubNamespace}`);
batch = new AckBatch('test-queue', 'test-connection', resolver, { pubSubNamespace });
batch.on('empty', function () {
done();
});
Expand Down
9 changes: 7 additions & 2 deletions spec/behavior/topology.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ describe('Topology', function () {
.once()
.resolves(control);

topology = topologyFn(conn.instance, {}, {}, undefined, undefined, Exchange, Queue, 'test');
topology = topologyFn(conn.instance, { pubSubNamespace: 'test' }, {}, undefined, undefined, Exchange, Queue, 'test');
Promise.all([
topology.createExchange({ name: 'top-ex', type: 'topic' }),
topology.createQueue({ name: 'top-q', unique: 'hash' })
Expand All @@ -119,6 +119,7 @@ describe('Topology', function () {
{
name: 'test.response.queue',
uniqueName: 'test.response.queue',
pubSubNamespace: 'test',
autoDelete: true,
subscribe: true
}
Expand Down Expand Up @@ -167,6 +168,7 @@ describe('Topology', function () {
{
name: 'test.response.queue',
uniqueName: 'test.response.queue',
pubSubNamespace: 'test',
autoDelete: true,
subscribe: true
}
Expand Down Expand Up @@ -202,7 +204,8 @@ describe('Topology', function () {
uniqueName: 'mine',
autoDelete: false,
subscribe: true
}
},
pubSubNamespace: 'test'
};
topology = topologyFn(conn.instance, options, {}, undefined, undefined, Exchange, Queue, 'test');
topology.once('replyQueue.ready', function (queue) {
Expand All @@ -219,6 +222,7 @@ describe('Topology', function () {
{
name: 'mine',
uniqueName: 'mine',
pubSubNamespace: 'test',
autoDelete: false,
subscribe: true
}
Expand All @@ -240,6 +244,7 @@ describe('Topology', function () {
{
name: 'mine',
uniqueName: 'mine',
pubSubNamespace: 'test',
autoDelete: false,
subscribe: true
}
Expand Down
3 changes: 2 additions & 1 deletion spec/integration/badConnection.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require('../setup');
const rabbit = require('../../src/index.js');
const Rabbit = require('../../src/index.js');
const rabbit = new Rabbit();

describe('Bad Connection', function () {
const noop = () => {};
Expand Down
3 changes: 2 additions & 1 deletion spec/integration/bulkPublish.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require('../setup');
const rabbit = require('../../src/index.js');
const Rabbit = require('../../src/index.js');
const rabbit = new Rabbit();
const config = require('./configuration');

/*
Expand Down
9 changes: 9 additions & 0 deletions spec/integration/configuration.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ module.exports = {
vhost: '%2f',
replyQueue: 'customReplyQueue'
},
differentVhost: {
name: 'differentVhost',
user: 'guest',
pass: 'guest',
host: '127.0.0.1',
port: 5672,
vhost: '%2fdifferent',
replyQueue: 'customReplyQueue'
},

noReplyQueue: {
name: 'noReplyQueue',
Expand Down
6 changes: 4 additions & 2 deletions spec/integration/connection.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require('../setup');
const rabbit = require('../../src/index.js');
const Rabbit = require('../../src/index.js');
const rabbit = new Rabbit();
const config = require('./configuration');

describe('Connection', function () {
Expand All @@ -14,7 +15,8 @@ describe('Connection', function () {
});

it('should assign uri to connection', function () {
connected.uri.should.equal('amqp://guest:[email protected]:5672/%2f?heartbeat=30');
const con = config.connection;
connected.uri.should.equal(`amqp://${con.user}:${con.pass}@${con.host}:${con.port}/${con.vhost}?heartbeat=30`);
});

after(function () {
Expand Down
3 changes: 2 additions & 1 deletion spec/integration/consistentHash.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require('../setup');
const rabbit = require('../../src/index.js');
const Rabbit = require('../../src/index.js');
const rabbit = new Rabbit();
const config = require('./configuration');

describe('Consistent Hash Exchange', function () {
Expand Down
3 changes: 2 additions & 1 deletion spec/integration/directReplyQueue.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require('../setup');
const rabbit = require('../../src/index.js');
const Rabbit = require('../../src/index.js');
const rabbit = new Rabbit();
const config = require('./configuration');

describe(`Direct Reply Queue (replyQueue: 'rabbit')`, function () {
Expand Down
3 changes: 2 additions & 1 deletion spec/integration/fanout.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require('../setup');
const rabbit = require('../../src/index.js');
const Rabbit = require('../../src/index.js');
const rabbit = new Rabbit();
const config = require('./configuration');

describe('Fanout Exchange With Multiple Subscribed Queues', function () {
Expand Down
116 changes: 116 additions & 0 deletions spec/integration/instance.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
require('../setup');
const Rabbit = require('../../src/index.js');
const config = require('./configuration');

const firstRabbit = new Rabbit();
const secondRabbit = new Rabbit();

const configs = {
instance_1: {
connection: config.connection,
exchanges: [
{
name: 'rabbot-ex.subscription',
type: 'topic',
alternate: 'rabbot-ex.alternate',
autoDelete: true
}
],
queues: [
{
name: 'rabbot-q.subscription',
autoDelete: true,
subscribe: true,
deadletter: 'rabbot-ex.deadletter'
}
],
bindings: [
{
exchange: 'rabbot-ex.subscription',
target: 'rabbot-q.subscription',
keys: 'this.is.#'
}
]
},
instance_2: {
connection: config.differentVhost,
exchanges: [
{
name: 'rabbot-ex.subscription',
type: 'topic',
alternate: 'rabbot-ex.alternate',
autoDelete: true
}
],
queues: [
{
name: 'rabbot-q.subscription',
autoDelete: true,
subscribe: true,
deadletter: 'rabbot-ex.deadletter'
}
],
bindings: [
{
exchange: 'rabbot-ex.subscription',
target: 'rabbot-q.subscription',
keys: 'this.is.#'
}
]
}
};

describe('Multiple Instances', function () {
var firstHarness, secondHarness;

before(function (done) {
new Promise(function (resolve, reject) {
firstRabbit.configure(configs.instance_1).then(() => {
firstHarness.handle('topic');
firstRabbit.startSubscription('rabbot-q.subscription');
firstRabbit.publish('rabbot-ex.subscription', { type: 'topic', routingKey: 'this.is.a.test', body: 'broadcast' });
});

firstHarness = harnessFactory(firstRabbit, resolve, 1);
}).then(function () {
secondRabbit.configure(configs.instance_2).then(() => {
secondHarness.handle('topic');
secondRabbit.startSubscription('rabbot-q.subscription', false, 'differentVhost');
secondRabbit.publish('rabbot-ex.subscription', { type: 'topic', routingKey: 'this.is.a.different.test', body: 'broadcast' }, 'differentVhost');
secondRabbit.publish('rabbot-ex.subscription', { type: 'topic', routingKey: 'this.is.a.different.test2', body: 'broadcast' }, 'differentVhost');
});

secondHarness = harnessFactory(secondRabbit, done, 1);
});
});

it('should not recieve the same message', function () {
const filterMsg = (m) =>
({
body: m.body,
key: m.fields.routingKey
});

const firstResults = firstHarness.received.map(filterMsg);
const secondResults = secondHarness.received.map(filterMsg);

sortBy(firstResults, 'body').should.eql(
[
{ body: 'broadcast', key: 'this.is.a.test' }
]
);

sortBy(secondResults, 'body').should.eql(
[
{ body: 'broadcast', key: 'this.is.a.different.test' },
{ body: 'broadcast', key: 'this.is.a.different.test2' }
]
);
});

after(function () {
return firstHarness.clean('default').then(function () {
return secondHarness.clean('differentVhost');
});
});
});
3 changes: 2 additions & 1 deletion spec/integration/mandatory.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require('../setup');
const rabbit = require('../../src/index.js');
const Rabbit = require('../../src/index.js');
const rabbit = new Rabbit();
const config = require('./configuration');

/*
Expand Down
Loading