Skip to content
This repository has been archived by the owner on Sep 29, 2021. It is now read-only.

Commit

Permalink
fix: only reset backoff once datafeed has been successfully polled
Browse files Browse the repository at this point in the history
fixes #56
  • Loading branch information
jonfreedman committed Jun 11, 2017
1 parent 8c9b7e6 commit 8955400
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 27 deletions.
68 changes: 42 additions & 26 deletions src/adapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,18 @@ import {V2Message} from './message';

const entities = new XmlEntities();

type AdapterOptionsType = {
type AdapterOptionsTypeWithNulls = {
failConnectAfter?: number,
backoff?: Backoff,
shutdownFunc?: () => void
};

type AdapterOptionsType = {
failConnectAfter: number,
backoff: Backoff,
shutdownFunc: () => void
};

type SimpleMessageEnvelopeType = {
room: string
};
Expand All @@ -44,8 +51,8 @@ type MessageEnvelopeType = {
};

type MessageType = {
text: string,
format: string
text: string,
format: string
};

type MessageTypeOrString = MessageType | string;
Expand All @@ -64,15 +71,15 @@ type HubotUserType = {
class SymphonyAdapter extends Adapter {
robot: Robot;
symphony: Symphony;
expBackoff: Backoff;
backoff: Backoff;
_userLookup: (GetUserArgsType, ?string) => Promise<Object>;

/**
* @param {Robot} robot Hubot robot
* @param {AdapterOptionsType} options Configuration options that may be overridden for testing
* @constructor
*/
constructor(robot: Robot, options: AdapterOptionsType = {}) {
constructor(robot: Robot, options: AdapterOptionsType) {
super(robot);
this.robot = robot;

Expand All @@ -89,47 +96,45 @@ class SymphonyAdapter extends Adapter {
throw new Error('HUBOT_SYMPHONY_PASSPHRASE undefined');
}

this.expBackoff = backoff.exponential({
initialDelay: 10,
maxDelay: 60000,
});
this.expBackoff.on('backoff', (num, delay) => {
this.backoff = options.backoff;
this.backoff.on('backoff', (num, delay) => {
if (num > 0) {
this.robot.logger.info(`Re-attempting to create datafeed - attempt ${num} after ${delay}ms`);
}
});
this.expBackoff.on('ready', () => {
this.backoff.on('ready', () => {
this.symphony.createDatafeed()
.then((response) => {
if (response.id) {
this.robot.logger.info(`Created datafeed: ${response.id}`);
this.removeAllListeners('poll');
this.on('poll', this._pollDatafeed);
this.emit('poll', response.id);
this.emit('poll', response.id, () => {
this.backoff.reset();
this.robot.logger.debug('Successfully polled datafeed so resetting backoff');
});
this.robot.logger.debug('First poll event emitted');
this.emit('connected');
this.robot.logger.debug('connected event emitted');
this.expBackoff.reset();
} else {
this.robot.emit('error', new Error(`Unable to create datafeed: ${JSON.stringify(response)}`));
this.expBackoff.backoff();
this.backoff.backoff();
}
})
.catch((error) => {
this.robot.emit('error', new Error(`Unable to create datafeed: ${error}`));
this.expBackoff.backoff();
this.backoff.backoff();
});
});
this.expBackoff.on('fail', () => {
this.backoff.on('fail', () => {
this.robot.logger.info('Shutting down...');
if (options.shutdownFunc) {
options.shutdownFunc();
}
});
// will time out reconnecting after ~10min
const failAfter = options.failConnectAfter || 23;
this.robot.logger.info(`Reconnect attempts = ${failAfter}`);
this.expBackoff.failAfter(failAfter);
this.robot.logger.info(`Reconnect attempts = ${options.failConnectAfter}`);
this.backoff.failAfter(options.failConnectAfter);
}

/**
Expand Down Expand Up @@ -240,7 +245,7 @@ class SymphonyAdapter extends Adapter {
run() {
this.robot.logger.info('Initialising...');

const getEnv = function(key: string, defaultVal: ?string): string {
const getEnv = function (key: string, defaultVal: ?string): string {
const value = process.env[key];
if (value) {
return value;
Expand Down Expand Up @@ -274,7 +279,7 @@ class SymphonyAdapter extends Adapter {
});
// cache user details for an hour
const hourlyRefresh = memoize(this._getUser.bind(this), {maxAge: 3600000, length: 2});
this._userLookup = function(query: GetUserArgsType, streamId: ?string): Promise<Object> {
this._userLookup = function (query: GetUserArgsType, streamId: ?string): Promise<Object> {
return hourlyRefresh(query, streamId);
};
this._createDatafeed();
Expand All @@ -293,16 +298,17 @@ class SymphonyAdapter extends Adapter {
* @private
*/
_createDatafeed() {
this.expBackoff.backoff();
this.backoff.backoff();
}

/**
* Poll datafeed for zero or more messages. Ignores anything that is not a V2Message.
*
* @param {string} id Datafeed id
* @param {?function} onSuccess no-arg callback called if poll completes without error
* @private
*/
_pollDatafeed(id: string) {
_pollDatafeed(id: string, onSuccess: () => void) {
// defer execution to ensure we don't go into an infinite polling loop
const self = this;
process.nextTick(() => {
Expand All @@ -318,6 +324,9 @@ class SymphonyAdapter extends Adapter {
}
}
this.emit('poll', id);
if (onSuccess !== undefined) {
onSuccess();
}
})
.catch((error) => {
self.robot.emit('error', new Error(`Unable to read datafeed ${id}: ${error}`));
Expand Down Expand Up @@ -373,9 +382,16 @@ class SymphonyAdapter extends Adapter {
}
}

exports.use = (robot: Robot, options: AdapterOptionsType = {}) => {
options.shutdownFunc = options.shutdownFunc || function(): void {
process.exit(1);
exports.use = (robot: Robot, optionsWithNulls: AdapterOptionsTypeWithNulls = {}) => {
const options = {
failConnectAfter: optionsWithNulls.failConnectAfter || 23,
backoff: optionsWithNulls.backoff || backoff.exponential({
initialDelay: 10,
maxDelay: 60000,
}),
shutdownFunc: optionsWithNulls.shutdownFunc || function (): void {
process.exit(1);
},
};
return new SymphonyAdapter(robot, options);
};
2 changes: 1 addition & 1 deletion test/adapter-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ describe('Adapter test suite', () => {
nock.datafeedCreateHttp400Count = 1;
let robot = new FakeRobot();
let adapter = SymphonyAdapter.use(robot, {
shutdownFunc: () => done(),
failConnectAfter: 1,
shutdownFunc: () => done(),
});
adapter.run();
});
Expand Down

0 comments on commit 8955400

Please sign in to comment.