Skip to content

Commit

Permalink
Rewrite callbacks to arrow functions
Browse files Browse the repository at this point in the history
  • Loading branch information
mcheshkov committed Jun 15, 2018
1 parent 3eca254 commit 953fac1
Show file tree
Hide file tree
Showing 48 changed files with 287 additions and 384 deletions.
5 changes: 4 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
"root": true,
"extends": "nodules",
"rules": {
"arrow-parens": ["error", "as-needed"],
"arrow-spacing": "error",
"no-console": 0,
"no-mixed-requires": 0,
"no-multiple-empty-lines": [2, {"max": 1}],
"no-var": "error",
"global-require": 0
"global-require": 0,
"prefer-arrow-callback": "error"
},
"env": {
"es6": true,
Expand Down
4 changes: 2 additions & 2 deletions examples/custom_master_and_ipc/master.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ if (proc.isMaster) {
* @param {WorkerWrapper} sender
* @param {*} value
*/
function(sender, value) {
proc.forEach(function(worker) {
(sender, value) => {
proc.forEach(worker => {
// repeat command to all workers except `sender`
if (worker.id !== sender.id) {
// pass sender.wid to another workers know command source
Expand Down
4 changes: 2 additions & 2 deletions examples/custom_master_and_ipc/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ counters[worker.id] = 0;

worker.registerRemoteCommand(
'updateCounter',
function(target, workerId, value) {
(target, workerId, value) => {
// update recieved counter
counters[workerId] = value;
});

http
.createServer(function(req, res) {
.createServer((req, res) => {
res.end('Worker #' + worker.id + ' at your service, sir!\n\nCounters: ' + JSON.stringify(counters));

// update counter in another workers
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/simple_extension/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ if (cluster.worker.id === 1) {
}

http
.createServer(function(req, res) {
.createServer((req, res) => {
res.end('Worker #' + cluster.worker.id + ' at your service, sir!');
})
.listen(process.env.port);
2 changes: 1 addition & 1 deletion examples/simple_server/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ if (worker.wid === 1 || worker.wid === 0) {
}

http
.createServer(function(req, res) {
.createServer((req, res) => {
res.end('Worker #' + worker.wid + ' at your service, sir!');
})
.listen(process.env.port);
22 changes: 9 additions & 13 deletions lib/cluster_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,9 @@ class ClusterProcess extends EventEmitterEx {
config = Configuration.extend(config, this.config.getBaseDir());

if (extension.configure.length > 2) {
setImmediate(function() {
extension.configure(config, self, callback);
});
setImmediate(() => extension.configure(config, self, callback));
} else {
setImmediate(function() {
setImmediate(() => {
extension.configure(config, self);
callback();
});
Expand Down Expand Up @@ -188,8 +186,8 @@ class ClusterProcess extends EventEmitterEx {
return;
}

extensions.forEach(function(name) {
this.loadExtension(name, function(error) {
extensions.forEach(name => {
this.loadExtension(name, error => {
if (error) {
return self.emit('error', error);
}
Expand All @@ -203,12 +201,10 @@ class ClusterProcess extends EventEmitterEx {
self.emit('initialized');
}
});
}, this);
});

loadTimer = setTimeout(function() {
const timeouted = extensions.filter(function(name) {
return !loadedExtensions.has(name);
}),
loadTimer = setTimeout(() => {
const timeouted = extensions.filter(name => !loadedExtensions.has(name)),
error = LusterClusterProcessError.createError(
LusterClusterProcessError.CODES.EXTENSIONS_LOAD_TIMEOUT,
{timeouted: timeouted, timeout: loadTimeout});
Expand Down Expand Up @@ -323,11 +319,11 @@ class ClusterProcess extends EventEmitterEx {
* @param {*} [data]
* @param {String} callbackId
*/
this.registerRemoteCommand(command, function(proc, data, callbackId) {
this.registerRemoteCommand(command, (proc, data, callbackId) => {
/**
* @param {*} [callbackData]
*/
return handler(function(callbackData) {
return handler(callbackData => {
proc.remoteCall(RPC.fns.callback, callbackId, callbackData);
}, data);
});
Expand Down
77 changes: 36 additions & 41 deletions lib/master.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ class Master extends ClusterProcess {
*/
_onSignalQuit() {
this
.once('shutdown', function() {
process.exit(0);
})
.once('shutdown', () => process.exit(0))
.shutdown();
}

Expand All @@ -88,14 +86,14 @@ class Master extends ClusterProcess {
}

const self = this,
inUse = this.getWorkersArray().some(function(w) {
return worker.wid !== w.wid &&
w.isRunning() &&
port.isEqualTo(w.options.port);
});
inUse = this.getWorkersArray().some(w =>
worker.wid !== w.wid &&
w.isRunning() &&
port.isEqualTo(w.options.port)
);

if (!inUse) {
port.unlink(function(err) {
port.unlink(err => {
if (err) {
self.emit('error', err);
}
Expand All @@ -109,9 +107,10 @@ class Master extends ClusterProcess {
*/
_checkWorkersAlive() {
const workers = this.getWorkersArray(),
alive = workers.reduce(function(count, w) {
return w.dead ? count - 1 : count;
}, workers.length);
alive = workers.reduce(
(count, w) => w.dead ? count - 1 : count,
workers.length
);

if (alive === 0) {
this.emit('shutdown');
Expand All @@ -126,9 +125,10 @@ class Master extends ClusterProcess {
*/
_proxyWorkerEvents(worker) {
WorkerWrapper.EVENTS
.forEach(function(eventName) {
worker.on(eventName, this.emit.bind(this, 'worker ' + eventName, worker));
}, this);
.forEach(eventName => {
const proxyEventName = 'worker ' + eventName;
worker.on(eventName, this.emit.bind(this, proxyEventName, worker));
});
}

/**
Expand Down Expand Up @@ -182,9 +182,10 @@ class Master extends ClusterProcess {
* }, master);
*/
forEach(fn) {
this.getWorkersArray().forEach(function(worker) {
fn.call(this, worker);
}, this);
this.getWorkersArray()
.forEach(
worker => fn.call(this, worker)
);

return this;
}
Expand Down Expand Up @@ -248,24 +249,23 @@ class Master extends ClusterProcess {
* @returns {Master} self
*/
waitForWorkers(wids, event, callback) {
const self = this;
const pendingWids = new Set(wids);

function onWorkerState(worker) {
const onWorkerState = worker => {
const wid = worker.wid;
if (pendingWids.has(wid)) {
pendingWids.delete(wid);
}
if (pendingWids.size === 0) {
self.removeListener(event, onWorkerState);
callback.call(self);
this.removeListener(event, onWorkerState);
callback.call(this);
}
}
};

if (wids.length > 0) {
this.on(event, onWorkerState);
} else {
setImmediate(callback.bind(self));
setImmediate(callback.bind(this));
}

return this;
Expand Down Expand Up @@ -306,9 +306,8 @@ class Master extends ClusterProcess {

this.waitForAllWorkers(
'worker ready',
function() {
this.emit('restarted');
});
() => this.emit('restarted')
);

return this;
}
Expand All @@ -322,9 +321,7 @@ class Master extends ClusterProcess {
* @fires Master#restarted when workers spawned and ready.
*/
softRestart() {
this.forEach(function(worker) {
worker.softRestart();
});
this.forEach(worker => worker.softRestart());
this._restartQueue.once('drain', this.emit.bind(this, 'restarted'));
return this;
}
Expand Down Expand Up @@ -359,11 +356,11 @@ class Master extends ClusterProcess {
* @public
*/
remoteCallToAll(name, ...args) {
this.forEach(function(worker) {
this.forEach(worker => {
if (worker.ready) {
worker.remoteCall(name, ...args);
} else {
worker.on('ready', function() {
worker.on('ready', () => {
worker.remoteCall(name, ...args);
});
}
Expand All @@ -378,7 +375,7 @@ class Master extends ClusterProcess {
* @public
*/
broadcastEventToAll(event, ...args) {
this.forEach(function(worker) {
this.forEach(worker => {
if (worker.ready) {
worker.broadcastEvent(event, ...args);
}
Expand Down Expand Up @@ -409,7 +406,7 @@ class Master extends ClusterProcess {
shutdown() {
const stoppedWorkers = [];

this.forEach(function(worker) {
this.forEach(worker => {
if (worker.isRunning()) {
worker.stop();
stoppedWorkers.push(worker.wid);
Expand All @@ -419,9 +416,8 @@ class Master extends ClusterProcess {
this.waitForWorkers(
stoppedWorkers,
'worker exit',
function() {
this.emit('shutdown');
});
() => this.emit('shutdown')
);

return this;
}
Expand All @@ -436,7 +432,7 @@ class Master extends ClusterProcess {
* @public
*/
remoteCallToAllWithCallback(opts) {
this.forEach(function(worker) {
this.forEach(worker => {
if (worker.isRunning()) {
worker.remoteCallWithCallback(opts);
}
Expand Down Expand Up @@ -473,9 +469,8 @@ Master.prototype.run = Master.whenInitialized(function() {

this.waitForAllWorkers(
'worker ready',
function() {
this.emit('running');
});
() => this.emit('running')
);

return this;
});
Expand Down
2 changes: 1 addition & 1 deletion lib/port.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class Port {
return;
}

fs.unlink(value, function(err) {
fs.unlink(value, err => {
if (err && err.code !== 'ENOENT') {
cb(LusterPortError
.createError(LusterPortError.CODES.CAN_NOT_UNLINK_UNIX_SOCKET, err)
Expand Down
6 changes: 3 additions & 3 deletions lib/restart_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ class RestartQueue extends EventEmitterEx {
return;
}

const removeWorker = function() {
const removeWorker = () => {
worker.removeListener('ready', removeWorker);
worker.removeListener('state', checkDead);
this._remove(worker);
}.bind(this);
};

const checkDead = function(state) {
const checkDead = state => {
if (state === WorkerWrapper.STATES.STOPPED && worker.dead) {
removeWorker();
}
Expand Down
13 changes: 5 additions & 8 deletions lib/rpc-callback.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,24 @@ const RPCCallback = {
* @returns {String} callbackId
*/
setCallback: function(proc, command, callback, timeout) {
const self = this,
storage = self._storage;
const storage = this._storage;

if ( ! timeout) {
timeout = 10000;
}

const callbackId = proc.wid + '_' + self._counter++;
const callbackId = proc.wid + '_' + this._counter++;

storage[callbackId] = {
callback: callback,
timeout:
setTimeout(function() {
setTimeout(() => {
storage[callbackId].callback(
proc,
LusterRPCCallbackError.createError(
LusterRPCCallbackError.CODES.REMOTE_CALL_WITH_CALLBACK_TIMEOUT,
{ command: command }));
self.removeCallback(callbackId);
this.removeCallback(callbackId);
}, timeout)
};

Expand All @@ -50,9 +49,7 @@ const RPCCallback = {
return;
}

setImmediate(function() {
stored.callback(proc, null, data);
});
setImmediate(() => stored.callback(proc, null, data));
this.removeCallback(callbackId);
},

Expand Down
Loading

0 comments on commit 953fac1

Please sign in to comment.