Skip to content

Commit

Permalink
Merge pull request #73 from kuzzleio/refactor-mq-protocol-controller
Browse files Browse the repository at this point in the history
Refactor WS/MQ controllers : use only one listenning topic for ALL controllers
  • Loading branch information
scottinet committed Nov 17, 2015
2 parents fa9c3d2 + 07018f9 commit ffd8f29
Show file tree
Hide file tree
Showing 12 changed files with 1,087 additions and 1,345 deletions.
515 changes: 179 additions & 336 deletions docs/API.AMQP.md

Large diffs are not rendered by default.

403 changes: 161 additions & 242 deletions docs/API.MQTT.md

Large diffs are not rendered by default.

467 changes: 175 additions & 292 deletions docs/API.STOMP.md

Large diffs are not rendered by default.

309 changes: 137 additions & 172 deletions docs/API.WebSocket.md

Large diffs are not rendered by default.

133 changes: 86 additions & 47 deletions features/support/apiAMQP.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,205 +41,244 @@ module.exports = {

create: function (body, persist) {
var
topic = ['write', this.world.fakeCollection, 'create'].join('.'),
msg = {
controller: 'write',
collection: this.world.fakeCollection,
action: 'create',
persist: persist,
body: body
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

createOrUpdate: function (body) {
var
topic = ['write', this.world.fakeCollection, 'createOrUpdate'].join('.'),
msg = {
controller: 'write',
collection: this.world.fakeCollection,
action: 'createOrUpdate',
body: body
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

get: function (id) {
var
topic = ['read', this.world.fakeCollection, 'get'].join('.'),
msg = {
controller: 'read',
collection: this.world.fakeCollection,
action: 'get',
_id: id
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

search: function (filters) {
var
topic = ['read', this.world.fakeCollection, 'search'].join('.'),
msg = {
controller: 'read',
collection: this.world.fakeCollection,
action: 'search',
body: filters
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

count: function (filters) {
var
topic = ['read', this.world.fakeCollection, 'count'].join('.'),
msg = {
controller: 'read',
collection: this.world.fakeCollection,
action: 'count',
body: filters
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

update: function (id, body) {
var
topic = ['write', this.world.fakeCollection, 'update'].join('.'),
msg = {
controller: 'write',
collection: this.world.fakeCollection,
action: 'update',
_id: id,
body: body
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

deleteById: function (id) {
var
topic = ['write', this.world.fakeCollection, 'delete'].join('.'),
msg = {
controller: 'write',
collection: this.world.fakeCollection,
action: 'delete',
_id: id
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

deleteByQuery: function (filters) {
var
topic = ['write', this.world.fakeCollection, 'deleteByQuery'].join('.'),
msg = {
controller: 'write',
collection: this.world.fakeCollection,
action: 'deleteByQuery',
body: filters
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

deleteCollection: function () {
var
topic = ['admin', this.world.fakeCollection, 'deleteCollection'].join('.'),
msg = {};
msg = {
controller: 'admin',
collection: this.world.fakeCollection,
action: 'deleteCollection',
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

putMapping: function () {
var
topic = ['admin', this.world.fakeCollection, 'putMapping'].join('.'),
msg = {
controller: 'admin',
collection: this.world.fakeCollection,
action: 'putMapping',
body: this.world.schema
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

bulkImport: function (bulk) {
var
topic = ['bulk', this.world.fakeCollection, 'import'].join('.'),
msg = {
controller: 'bulk',
collection: this.world.fakeCollection,
action: 'import',
body: bulk
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

globalBulkImport: function (bulk) {
var
topic = ['bulk', '', 'import'].join('.'),
msg = {
controller: 'bulk',
action: 'import',
body: bulk
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

subscribe: function (filters) {
var
topic = ['subscribe', this.world.fakeCollection, 'on'].join('.'),
msg = {
controller: 'subscribe',
collection: this.world.fakeCollection,
action: 'on',
body: null
};

if (filters) {
msg.body = filters;
}

return publishAndListen.call(this, topic, msg);
return publishAndListen.call(this, msg);
},

unsubscribe: function (room, clientId) {
var
topic = ['subscribe', this.world.fakeCollection, 'off'].join('.'),
msg = {
clientId: clientId,
controller: 'subscribe',
collection: this.world.fakeCollection,
action: 'off',
body: { roomId: room }
};

this.subscribedRooms[clientId][room].close();
delete this.subscribedRooms[clientId];

return publish.call(this, topic, msg, false);
return publish.call(this, msg, false);
},

countSubscription: function () {
var
topic = ['subscribe', this.world.fakeCollection, 'count'].join('.'),
clients = Object.keys(this.subscribedRooms),
rooms = Object.keys(this.subscribedRooms[clients[0]]),
msg = {
controller: 'subscribe',
collection: this.world.fakeCollection,
action: 'count',
body: {
roomId: rooms[0]
}
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

getStats: function () {
var
topic = ['admin', '', 'getStats'].join('.'),
msg = {};
msg = {
controller: 'admin',
action: 'getStats'
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

getAllStats: function () {
var
topic = ['admin', '', 'getAllStats'].join('.'),
msg = {};
msg = {
controller: 'admin',
action: 'getStats'
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

listCollections: function () {
var
topic = ['read', '', 'listCollections'].join('.'),
msg = {};
msg = {
controller: 'read',
action: 'listCollections'
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
},

now: function () {
var
topic = ['read', '', 'now'].join('.'),
msg = {};
msg = {
controller: 'read',
action: 'now'
};

return publish.call(this, topic, msg);
return publish.call(this, msg);
}
};

var publish = function (topic, message, waitForAnswer) {
var publish = function (message, waitForAnswer) {
var
deferred = q.defer(),
topic = 'kuzzle',
listen = (waitForAnswer !== undefined) ? waitForAnswer : true;

if ( !message.clientId ) {
if (!message.clientId) {
message.clientId = this.clientId;
}

Expand Down Expand Up @@ -277,14 +316,14 @@ var publish = function (topic, message, waitForAnswer) {
return deferred.promise;
};

var publishAndListen = function (topic, message) {
var publishAndListen = function (message) {
var
deferred = q.defer();

message.clientId = uuid.v1();
this.subscribedRooms[message.clientId] = {};

publish.call(this, topic, message)
publish.call(this, message)
.then(response => {
this.amqpClient.then(connection => { return connection.createChannel(); })
.then(channel => {
Expand Down
Loading

0 comments on commit ffd8f29

Please sign in to comment.