Skip to content

Commit

Permalink
Merge pull request #18 from ph0bos/added-destory
Browse files Browse the repository at this point in the history
increment version: 0.5.4
  • Loading branch information
ph0bos authored Nov 3, 2016
2 parents c9522c8 + a1e6124 commit 4627be6
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 56 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ election.on('topologyChange', function (data) {

election.on('error', function (err) {
console.log('Error: ' + err);

// Restart election listener.
election.start(function(err){
console.log("Election restarting!");
});
});

setInterval(function () {
Expand Down
11 changes: 8 additions & 3 deletions example/leader-election.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,25 @@ election.on('groupLeader', function () {
console.log('I am the leader, watch me lead!');
});

election.on('leader', function (myLeader) {
election.on('myLeader', function (myLeader) {
console.log('My leader is', myLeader);
});

election.on('follower', function (myFollower) {
election.on('myFollower', function (myFollower) {
console.log('My follower is', myFollower);
});

election.on('topology', function (data) {
election.on('topologyChange', function (data) {
console.log('Topology Change: ' + data);
});

election.on('error', function (err) {
console.log('Error: ' + err);

// Restart election listener.
election.start(function(err){
console.log("Election restarting!");
});
});

setInterval(function () {
Expand Down
150 changes: 98 additions & 52 deletions lib/LeaderElection.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@
* @module node-service-discovery
*/

var log = require('bunyan').createLogger({name: 'service-resolver'});
var _ = require('underscore');
var async = require('async');
var util = require('util');
var customErr = require('custom-error-generator');
var cache = require('memory-cache');
var log = require('bunyan').createLogger({name: 'service-resolver'});
var _ = require('underscore');
var async = require('async');
var util = require('util');
var cache = require('memory-cache');

var EventEmitter = require('events').EventEmitter;
var zkClient = require('node-zookeeper-client');
var CreateMode = require('node-zookeeper-client').CreateMode;

var zkClient = require('node-zookeeper-client');

/**
* Custom Errors
*/
var NotFoundError = customErr('NotFoundError', Error);
var ZooKeeperError = customErr('ZooKeeperError', Error);
var NotFoundError = require('custom-error-generator')('NotFoundError', Error);
var ZooKeeperError = require('custom-error-generator')('ZooKeeperError', Error);

/**
* Inherit from EventEmitter.
Expand All @@ -34,27 +34,25 @@ function LeaderElection(client, path, id, callback) {
var self = this;

self.client = client;
self.path = path;
self.id = id;
self.path = path + '/' + id;
self.znode = null;

self._isGroupLeader = false;

/**
* Override the connection callback.
*/
self.connectionCallback = callback || function (err, node) {
if (err){
log.error('connection error on start up', self.id);
}
if (err) log.error('connection error on start up', self.path);
else log.debug('connection started', self.znode);
};

if (self.client.started === true) {
self._start(self.connectionCallback);
self.start(self.connectionCallback);
}

self.client.on('connected', function () {
self._start(self.connectionCallback);
self.start(self.connectionCallback);
});

self.client.on('disconnected', function () {
Expand All @@ -63,9 +61,11 @@ function LeaderElection(client, path, id, callback) {
});
}

LeaderElection.prototype._start = function (callback) {
LeaderElection.prototype.start = function (callback) {
var self = this;

self._isWithdrawn = false;

if (self.znode) {
this._watch();
return callback(new Error('already part of an election'));
Expand All @@ -88,7 +88,7 @@ LeaderElection.prototype._start = function (callback) {
self.client
.getClient()
.transaction()
.create(self.path + '/' + self.id + '-', new Buffer(JSON.stringify({})), null, CreateMode.EPHEMERAL_SEQUENTIAL)
.create(self.path + '/node-', new Buffer(JSON.stringify({})), null, zkClient.CreateMode.EPHEMERAL_SEQUENTIAL)
.commit(cb);
}

Expand All @@ -105,24 +105,85 @@ LeaderElection.prototype._start = function (callback) {
}
};

LeaderElection.prototype.withdraw = function (callback) {
var self = this;
self._disconnect();

if(!this.znode){
log.warn("can not withdraw a znode that doesn't exist");
return callback(new Error("znode that does not exist"));
}

var serviceInstancePath = [this.path, this.znode].join('/');

self.client
.getClient()
.transaction()
.check(serviceInstancePath)
.remove(serviceInstancePath)
.commit(function(err){
self.znode = null;
callback(err);
});
};

LeaderElection.prototype.destroy = function (callback) {
var self = this;

self.client
.getClient()
.getChildren(self.path, function(err, data){
async.each(data, function(node, cb){
var serviceInstancePath = [self.path, node].join('/');

self.client
.getClient()
.transaction()
.check(serviceInstancePath)
.remove(serviceInstancePath)
.commit(function(err){
if(err) log.warn(err, "there was an issue removing a name from the path on destroy.");
cb();
});

}, function(){
self.client
.getClient()
.remove(self.path, callback);
});
})
};

LeaderElection.prototype.hasLeadership = function() {
return this._isGroupLeader;
};

LeaderElection.prototype._watch = function () {
var self = this;

function getChildrenCallback(err, data) {

if (self.hasWithdrawn) {
// If we have withdrawn this election, just return;
if (self._isWithdrawn) return;

// If the topology contains no nodes, then reset the election and error;
if(data !== undefined && data.length === 0){
self._disconnect();
self.znode = null;
self.emit('error', new Error("all nodes have gone"));
log.error({err: err}, 'all nodes have gone');
return;
}

if (err) {
// Do nothing on these particular errors since we proxy these
// client states back to the consumer already in the constructor.
if (err.getCode() !== zkClient.Exception.CONNECTION_LOSS &&
err.getCode() !== zkClient.Exception.SESSION_EXPIRED) {
log.error({err: err}, 'got zk error getting election nodes');
self._disconnect();
self.emit('error', err);
if (err.getCode() !== zkClient.Exception.CONNECTION_LOSS && err.getCode() !== zkClient.Exception.SESSION_EXPIRED) {
self.withdraw(function(err) {
log.error({err: err}, 'got zk error getting election nodes');
self.emit('error', new Error("error getting election nodes"));
});
return;
}
} else {

Expand All @@ -132,13 +193,17 @@ LeaderElection.prototype._watch = function () {
var myIndex = data.indexOf(self.znode);

if (myIndex === -1) {
self.emit('error', new Error('my own znode not found in zk'));
self.withdraw(function(err){
log.error({err: err}, 'my own znode not found in zk');
self.emit('error', new Error('my own znode not found in zk'));
});
return;
}

var myLeader = (myIndex === 0 ? null : data[myIndex - 1]);
var myFollower = ((myIndex + 1 === data.length) ? null : data[myIndex + 1]);

log.info({
log.debug({
currentMyLeader: self._myLeader,
currentMyFollower: self._myFollower,
currentIsGroupLeader: self._isGroupLeader,
Expand All @@ -147,20 +212,17 @@ LeaderElection.prototype._watch = function () {
newMyFollower: myFollower
}, 'Election.watch: determining new election state.');

if (myIndex === -1) {
return self.emit('error', new mod_verror.VError('my own znode not found in zk.'));
}

if (myIndex === 0 && !self._isGroupLeader) {
self._myLeader = null;
self._isGroupLeader = true;
log.info('emitting groupLeader');
log.debug('emitting groupLeader');
self.emit('groupLeader');
}

if (self._myFollower !== myFollower) {
self._myFollower = myFollower;
log.info({follower: myFollower}, 'emitting follower');
log.debug({follower: myFollower}, 'emitting follower');
self.emit('myFollower', self._myFollower);
}

Expand All @@ -170,7 +232,7 @@ LeaderElection.prototype._watch = function () {

if (self._myLeader !== myLeader) {
self._myLeader = myLeader;
log.info({leader: myLeader}, 'emitting myLeader');
log.debug({leader: myLeader}, 'emitting myLeader');
self.emit('myLeader', self._myLeader);
}
}
Expand All @@ -179,7 +241,7 @@ LeaderElection.prototype._watch = function () {
log.debug({newTopology: data, oldTopology: self._topology}, 'checking topology');

if (!_.isEqual(data, self._topology)) {
log.info({newTopology: data, oldTopology: self._topology}, 'topology changed, emitting topology event');
log.debug({newTopology: data, oldTopology: self._topology}, 'topology changed, emitting topology event');

self._topology = data;
self.emit('topologyChange', data);
Expand All @@ -200,20 +262,8 @@ LeaderElection.prototype._watch = function () {
});
};

LeaderElection.prototype.withdraw = function (callback) {

var serviceInstancePath = [this.path, this.znode].join('/');

this.hasWithdrawn = true;

this.client
.getClient()
.transaction()
.remove(serviceInstancePath)
.commit(callback);
};

LeaderElection.prototype._disconnect = function () {
LeaderElection.prototype._disconnect = function() {
this._isWithdrawn = true;
this._myLeader = null;
this._myFollower = null;
this._isGroupLeader = false;
Expand All @@ -225,8 +275,4 @@ function compare(a, b) {
return (seqA - seqB);
}

LeaderElection.prototype.hasLeadership = function () {
return this._isGroupLeader;
};

module.exports = LeaderElection;
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "zoologist",
"version": "0.5.3",
"version": "0.5.4",
"description": "A Curator-esque framework for ZooKeeper",
"main": "index.js",
"keywords": [
Expand Down
Loading

0 comments on commit 4627be6

Please sign in to comment.