-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathworker.js
209 lines (178 loc) · 5.79 KB
/
worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
const cluster = require('cluster'),
RPC = require('./rpc'),
RPCCallback = require('./rpc-callback'),
ClusterProcess = require('./cluster_process'),
LusterWorkerError = require('./errors').LusterWorkerError;
// Worker identifier exposed through the `Worker#wid` getter. It is parsed once, at module
// load, from the LUSTER_WID environment variable (base 10) and evaluates to NaN when the
// variable is unset or does not start with a number.
// NOTE(review): presumably the master sets LUSTER_WID when it forks the worker — confirm.
const wid = parseInt(process.env.LUSTER_WID, 10);
/**
 * Worker-side cluster process: relays its lifecycle events to the master, accepts
 * remote commands from the master and loads the application script.
 * @constructor
 * @class Worker
 * @augments ClusterProcess
 */
class Worker extends ClusterProcess {
    constructor() {
        super();

        // Resolved on the first 'foreign properties received' event; `_run` waits for it
        // before the application script is required.
        this._foreignPropertiesReceivedPromise = new Promise(resolve => {
            this.once('foreign properties received', () => resolve());
        });

        // Lifecycle events that are forwarded to the master via `_broadcastEvent`.
        const relayedEvents = ['configured', 'extension loaded', 'initialized', 'loaded', 'ready'];

        for (const eventName of relayedEvents) {
            this.on(eventName, this._broadcastEvent.bind(this, eventName));
        }

        // Re-emit the native cluster worker 'disconnect' event on this instance.
        cluster.worker.on('disconnect', this.emit.bind(this, 'disconnect'));

        this._ready = false;

        this.registerRemoteCommand(RPC.fns.worker.applyForeignProperties, this.applyForeignProperties.bind(this));
        this.registerRemoteCommand(RPC.fns.worker.broadcastMasterEvent, this.broadcastMasterEvent.bind(this));

        this._suspendFunctions = [];
        this.registerRemoteCommandWithCallback(RPC.fns.worker.suspend, this._suspend.bind(this));
        // Set by the first `_suspend` call and reused by every later one.
        this._suspendPromise = null;
    }

    /**
     * @memberOf {Worker}
     * @property {Number} Persistent Worker identifier
     * @readonly
     * @public
     */
    get wid() {
        return wid;
    }

    /**
     * Worker id (alias for cluster.worker.id)
     * @memberOf {Worker}
     * @property {Number} id
     * @readonly
     * @public
     */
    get id() {
        return cluster.worker.id;
    }

    /**
     * Emit an event received from the master as 'master <event>'.
     * @param {*} proc Not used by this handler
     * @param {Array} emitArgs Event name followed by the event arguments
     */
    broadcastMasterEvent(proc, emitArgs) {
        const [eventName, ...eventArgs] = emitArgs;

        this.emit(`master ${eventName}`, ...eventArgs);
    }

    /**
     * Transmit worker event to master, which plays as relay,
     * retransmitting it as 'worker <event>' to all master-side listeners.
     * @param {String} event Event name
     * @param {...*} args
     * @private
     */
    _broadcastEvent(event, ...args) {
        this.remoteCall(RPC.fns.master.broadcastWorkerEvent, event, ...args);
    }

    /**
     * Extend {Worker} properties with passed by {Master}.
     * Each own enumerable property of `props` is defined on the worker as an enumerable,
     * read-only property, then 'foreign properties received' is emitted.
     * @param {ClusterProcess} proc Not used by this handler
     * @param {*} props
     */
    applyForeignProperties(proc, props) {
        for (const [propName, value] of Object.entries(props)) {
            Object.defineProperty(this, propName, {
                value,
                enumerable: true
            });
        }

        this.emit('foreign properties received');
    }

    /**
     * @returns {Promise<void>} resolved once 'foreign properties received' has been emitted
     * @public
     */
    whenForeignPropertiesReceived() {
        return this._foreignPropertiesReceivedPromise;
    }

    /**
     * @override
     * @see ClusterProcess
     * @private
     */
    _setupIPCMessagesHandler() {
        process.on('message', this._onMessage.bind(this, this));
    }

    /**
     * Turns worker to `ready` state. Must be called by worker
     * if option `control.triggerReadyStateManually` set `true`.
     * @returns {Worker} self
     * @throws {LusterWorkerError} if the worker is already in the `ready` state
     * @public
     */
    ready() {
        if (this._ready) {
            throw new LusterWorkerError(LusterWorkerError.CODES.ALREADY_READY);
        }

        this._ready = true;
        this.emit('ready');

        return this;
    }

    /**
     * Do a remote call to master, wait for master to handle it, then execute registered callback
     * @method
     * @param {String} opts.command
     * @param {Function} opts.callback
     * @param {Number} [opts.timeout] in milliseconds
     * @param {*} [opts.data]
     * @public
     */
    remoteCallWithCallback(opts) {
        const callbackId = RPCCallback.setCallback(this, opts.command, opts.callback, opts.timeout);

        this.remoteCall(opts.command, opts.data, callbackId);
    }

    /**
     * Start-up sequence behind `run()`: waits for initialization and for the foreign
     * properties, requires the script resolved from the `app` config key, emits 'loaded'
     * and, unless `control.triggerReadyStateManually` is set, schedules `ready()`.
     * @returns {Promise<void>}
     * @private
     */
    async _run() {
        await this.whenInitialized();
        await this.whenForeignPropertiesReceived();

        const workerBase = this.config.resolve('app');

        require(workerBase);
        this.emit('loaded', workerBase);

        if (!this.config.get('control.triggerReadyStateManually', false)) {
            setImmediate(this.ready.bind(this));
        }
    }

    /**
     * `Require` application main script.
     * Execution will be delayed until Worker became initialized
     * and the foreign properties have been received.
     * A failure of the start-up sequence is emitted as an 'error' event.
     * @returns {Worker} self
     * @public
     */
    run() {
        // Do not leave the promise floating: report a failed start-up the same way
        // `_suspend` reports failures.
        this._run().catch(error => {
            this.emit('error', error);
        });

        return this;
    }

    /**
     * @callback SuspendFunction
     * @returns void|Promise<void>
     */

    /**
     * This adds new function that will be called before stopping worker.
     * Worker will wait for returned promise to resolve and then report to master it suspended successfully.
     * Rejects will emit 'error' event, no report to master will happen.
     * All suspend functions are called simultaneously.
     * Suspend function will not be called more than once.
     * @param {SuspendFunction} func
     * @public
     */
    registerSuspendFunction(func) {
        this._suspendFunctions.push(func);
    }

    /**
     * Handler of the `suspend` remote command.
     * The first call runs every registered suspend function; later calls reuse the same promise.
     * @param {Function} callback Invoked with the array of suspend function results
     *      once all of them have completed successfully
     * @private
     */
    _suspend(callback) {
        if (!this._suspendPromise) {
            // The async wrapper turns a synchronous throw into a rejection, so the promise
            // is always stored and no suspend function is ever invoked a second time.
            this._suspendPromise = Promise.all(
                this._suspendFunctions.map(async func => func())
            );
        }

        this._suspendPromise.then(
            callback,
            error => {
                this.emit('error', error);
            }
        );
    }
}
/**
 * Call Master method via RPC.
 * The caller is created once by `RPC.createCaller(process)` and installed on the prototype,
 * so it is shared by every Worker instance.
 * @method
 * @param {String} name Name of the command to call in the master
 * @param {...*} args Arguments passed along with the command
 */
Worker.prototype.remoteCall = RPC.createCaller(process);
module.exports = Worker;