forked from marcghorayeb/event-sourcing
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.js
60 lines (45 loc) · 1.61 KB
/
consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
var AMQP = require('./amqp.js');
var ConsumingWarning = require('./error/warning');
function Consumer(config) {
if (!this.domain) throw new Error('Missing domain property');
if (!config) throw new Error('Missing amqp config');
this.amqp = new AMQP(config);
}
Consumer.prototype.consume = function (callback) {
var self = this;
var queue = this.domain;
this.amqp.connect().then(function (channel) {
self.channel = channel;
channel.assertQueue(queue);
channel.consume(queue, self.handleMessage.bind(self));
}).then(callback, callback);
};
Consumer.prototype.handleMessage = function (msg) {
var self = this;
function onProcessed(err, results) {
if (err) {
if (err instanceof ConsumingWarning) {
if (self.onConsumerWarning) self.onConsumerWarning(err, msg);
} else {
return self.onConsumerError && self.onConsumerError(err, msg);
}
}
self.channel.ack(msg);
if (!msg.properties.replyTo) return;
var reply = { err: err, results: results };
var rpcConfig = { correlationId: msg.properties.correlationId };
self.channel.sendToQueue(msg.properties.replyTo, new Buffer(JSON.stringify(reply)), rpcConfig);
}
try {
var event = JSON.parse(msg.content.toString());
this.handleEvent(event, onProcessed.bind(this));
} catch (e) {
onProcessed(e);
}
};
Consumer.prototype.handleEvent = function (event, callback) {
if (!event.type) throw new Error('Missing event type');
if (!this.projections[event.type]) return callback(new Error('No projection found for ' + event.type));
this.projections[event.type].bind(this)(event.data || event.args || {}, callback);
};
module.exports = Consumer;