-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathindex.js
93 lines (83 loc) · 2.36 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
var redis = require('haredis')
, hydration = require('hydration')
, EventEmitter = require('events').EventEmitter
exports.attach = function (options) {
var amino = this
, subscriber = new EventEmitter
, client
if (typeof options === 'string') {
var split = options.split(':');
options = {host: split[0]};
if (split[1] && split[1].match(/^\d+$/)) {
options.port = parseInt(split[1], 10);
}
}
else if (typeof options === 'number') {
options = {port: options};
}
else if (options && !Array.isArray(options) && options['0']) {
// Support for nodes as object, like optimist might produce.
var nodes = [];
Object.keys(options).forEach(function (k) {
if (k.match(/^\d+$/)) {
nodes.push(options[k]);
delete options[k];
}
});
options.nodes = nodes;
}
else if (Array.isArray(options)) {
options = {nodes: options};
}
else {
options || (options = {host: 'localhost', port: 6379});
}
if (options.nodes) {
// haredis node list
amino.redis = client = redis.createClient(options.nodes, options);
}
else {
amino.redis = client = redis.createClient(options.port, options.host, options);
}
subscriber.setMaxListeners(0);
client.on('error', amino.emit.bind(amino, 'error'));
client.on('subscribe', amino.emit.bind(amino, 'subscribe'));
client.on('unsubscribe', amino.emit.bind(amino, 'unsubscribe'));
client.on('message', function (ev, packet) {
try {
packet = JSON.parse(packet);
packet = hydration.hydrate(packet);
subscriber.emit.apply(subscriber, [ev].concat(packet.args));
}
catch (e) {
amino.emit('error', e);
}
});
amino.publish = function () {
var args = Array.prototype.slice.call(arguments)
, ev = args.shift()
try {
args = {args: args}; // (dehydration only works on objects)
args = hydration.dehydrate(args);
client.publish(ev, JSON.stringify(args));
}
catch (e) {
amino.emit('error', e);
}
};
amino.subscribe = function (ev, handler) {
subscriber.on(ev, handler);
client.subscribe(ev);
};
amino.unsubscribe = function (ev, handler) {
if (handler) {
subscriber.removeListener(ev, handler);
}
else {
subscriber.removeAllListeners(ev);
}
if (subscriber.listeners(ev).length === 0) {
client.unsubscribe(ev);
}
};
};