Skip to content

Commit

Permalink
Improve handling of the client closed error.
Browse files Browse the repository at this point in the history
  • Loading branch information
vishnureddy17 authored and anthonyvercolano committed Oct 4, 2022
1 parent 7ecac87 commit 0d6bff2
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 155 deletions.
22 changes: 15 additions & 7 deletions device/core/src/device_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ function safeCallback(callback?: (err?: Error, result?: any) => void, error?: Er
* to create an IoT Hub device client.
*/
export class Client extends InternalClient {
private _c2dFeature: boolean;
private _userRegisteredC2dListener: boolean;
private _deviceDisconnectHandler: (err?: Error, result?: any) => void;
private _blobUploadClient: BlobUploadClient;
private _fileUploadApi: FileUploadInterface;
Expand All @@ -46,19 +46,19 @@ export class Client extends InternalClient {
constructor(transport: DeviceTransport, connStr?: string, blobUploadClient?: BlobUploadClient, fileUploadApi?: FileUploadInterface) {
super(transport, connStr);
this._blobUploadClient = blobUploadClient;
this._c2dFeature = false;
this._userRegisteredC2dListener = false;
this._fileUploadApi = fileUploadApi;

this.on('removeListener', (eventName) => {
if (eventName === 'message' && this.listeners('message').length === 0) {
this.on('removeListener', () => {
if (this.listenerCount('message') === 0) {
this._userRegisteredC2dListener = false;
/*Codes_SRS_NODE_DEVICE_CLIENT_16_005: [The client shall stop listening for messages from the service whenever the last listener unsubscribes from the `message` event.]*/
debug('in removeListener, disabling C2D.');
this._disableC2D((err) => {
if (err) {
debugErrors('in removeListener, error disabling C2D: ' + err);
this.emit('error', err);
} else {
this._c2dFeature = false;
debug('removeListener successfully disabled C2D.');
}
});
Expand All @@ -67,14 +67,22 @@ export class Client extends InternalClient {

this.on('newListener', (eventName) => {
if (eventName === 'message') {
//
// We want to always retain that the we want to have this feature enabled because the API (.on) doesn't really
// provide for the capability to say it failed. It can certainly fail because a network operation is required to
// enable.
// By saving this off, we are strictly honoring that the feature is enabled. If it doesn't turn on we signal via
// the emitted 'error' that something bad happened.
// But if we ever again attain a connected state, this feature will be operational.
//
this._userRegisteredC2dListener = true;
/*Codes_SRS_NODE_DEVICE_CLIENT_16_004: [The client shall start listening for messages from the service whenever there is a listener subscribed to the `message` event.]*/
debug('in newListener, enabling C2D.');
this._enableC2D((err) => {
if (err) {
debugErrors('in newListener, error enabling C2D: ' + err);
this.emit('error', err);
} else {
this._c2dFeature = true;
debug('in newListener, successfully enabled C2D');
}
});
Expand All @@ -96,7 +104,7 @@ export class Client extends InternalClient {
if (err && this._retryPolicy.shouldRetry(err)) {
debugErrors('reconnect policy specifies a reconnect on error');
/*Codes_SRS_NODE_DEVICE_CLIENT_16_097: [If the transport emits a `disconnect` event while the client is subscribed to c2d messages the retry policy shall be used to reconnect and re-enable the feature using the transport `enableC2D` method.]*/
if (this._c2dFeature) {
if (this._userRegisteredC2dListener) {
// turn on C2D
debug('disconnectHandler re-enabling C2D');
this._enableC2D((err) => {
Expand Down
44 changes: 26 additions & 18 deletions device/core/src/internal_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ export abstract class InternalClient extends EventEmitter {

private _methodCallbackMap: any;
private _disconnectHandler: (err?: Error, result?: any) => void;
private _methodsEnabled: boolean;
private _userRegisteredMethodListener: boolean;

constructor(transport: DeviceTransport, connStr?: string) {
/*Codes_SRS_NODE_INTERNAL_CLIENT_05_001: [The Client constructor shall throw ReferenceError if the transport argument is falsy.]*/
if (!transport) throw new ReferenceError('transport is \'' + transport + '\'');

super();
this._methodsEnabled = false;
this._userRegisteredMethodListener = false;

if (connStr) {
throw new errors.InvalidOperationError('the connectionString parameter of the constructor is not used - users of the SDK should be using the `fromConnectionString` factory method.');
Expand All @@ -92,8 +92,7 @@ export abstract class InternalClient extends EventEmitter {
}
if (err && this._retryPolicy.shouldRetry(err)) {
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_098: [If the transport emits a `disconnect` event while the client is subscribed to direct methods the retry policy shall be used to reconnect and re-enable the feature using the transport `enableMethods` method.]*/
if (this._methodsEnabled) {
this._methodsEnabled = false;
if (this._userRegisteredMethodListener) {
debug('re-enabling Methods link');
this._enableMethods((err) => {
if (err) {
Expand All @@ -105,7 +104,7 @@ export abstract class InternalClient extends EventEmitter {
}

/*Codes_SRS_NODE_INTERNAL_CLIENT_16_099: [If the transport emits a `disconnect` event while the client is subscribed to desired properties updates the retry policy shall be used to reconnect and re-enable the feature using the transport `enableTwinDesiredPropertiesUpdates` method.]*/
if (this._twin && this._twin.desiredPropertiesUpdatesEnabled) {
if (this._twin && this._twin.userRegisteredDesiredPropertiesListener) {
debug('re-enabling Twin');
this._twin.enableTwinDesiredPropertiesUpdates((err) => {
if (err) {
Expand Down Expand Up @@ -348,6 +347,16 @@ export abstract class InternalClient extends EventEmitter {
}

protected _onDeviceMethod(methodName: string, callback: (request: DeviceMethodRequest, response: DeviceMethodResponse) => void): void {
//
// We want to always retain that the we want to have this feature enabled because the API (.on) doesn't really
// provide for the capability to say it failed. It can certainly fail because a network operation is required to
// enable.
// By saving this off, we are strictly honoring that the feature is enabled. If it doesn't turn on we signal via
// the emitted 'error' that something bad happened.
// But if we ever again attain a connected state, this feature will be operational.
//
this._userRegisteredMethodListener = true;

// validate input args
this._validateDeviceMethodInputs(methodName, callback);

Expand Down Expand Up @@ -426,19 +435,18 @@ export abstract class InternalClient extends EventEmitter {
}

private _enableMethods(callback: (err?: Error) => void): void {
if (!this._methodsEnabled) {
const retryOp = new RetryOperation('_enableMethods', this._retryPolicy, this._maxOperationTimeout);
retryOp.retry((opCallback) => {
this._transport.enableMethods(opCallback);
}, (err) => {
if (!err) {
this._methodsEnabled = true;
}
callback(err);
});
} else {
callback();
}
debug('enabling methods');
const retryOp = new RetryOperation('_enableMethods', this._retryPolicy, this._maxOperationTimeout);
retryOp.retry((opCallback) => {
this._transport.enableMethods(opCallback);
}, (err) => {
if (!err) {
debug('enabled methods');
} else {
debugErrors('Error while enabling methods: ' + err);
}
callback(err);
});
}

// Currently there is no code making use of this function, because there is no "removeDeviceMethod" corresponding to "onDeviceMethod"
Expand Down
75 changes: 44 additions & 31 deletions device/core/src/module_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ function safeCallback(callback?: (err?: Error, result?: any) => void, error?: Er
* to create an IoT Hub device client.
*/
export class ModuleClient extends InternalClient {
private _inputMessagesEnabled: boolean;
private _userRegisteredInputMessageListener: boolean;
private _moduleDisconnectHandler: (err?: Error, result?: any) => void;
private _methodClient: MethodClient;

Expand All @@ -44,7 +44,7 @@ export class ModuleClient extends InternalClient {
*/
constructor(transport: DeviceTransport, methodClient: MethodClient) {
super(transport, undefined);
this._inputMessagesEnabled = false;
this._userRegisteredInputMessageListener = false;
this._methodClient = methodClient;

/* Codes_SRS_NODE_MODULE_CLIENT_18_012: [ The `inputMessage` event shall be emitted when an inputMessage is received from the IoT Hub service. ]*/
Expand All @@ -53,24 +53,41 @@ export class ModuleClient extends InternalClient {
this.emit('inputMessage', inputName, msg);
});

this.on('removeListener', (eventName) => {
if (eventName === 'inputMessage' && this.listeners('inputMessage').length === 0) {
this.on('removeListener', () => {
if (this.listenerCount('inputMessage') === 0) {
this._userRegisteredInputMessageListener = false;
/* Codes_SRS_NODE_MODULE_CLIENT_18_015: [ The client shall stop listening for messages from the service whenever the last listener unsubscribes from the `inputMessage` event. ]*/
debug('in removeListener, disabling input messages');
this._disableInputMessages((err) => {
if (err) {
debugErrors('in removeListener, error disabling input messages: ' + err);
this.emit('error', err);
} else {
debug('removeListener successfully disabled input messages.');
}
});
}
});

this.on('newListener', (eventName) => {
if (eventName === 'inputMessage') {
//
// We want to always retain that the we want to have this feature enabled because the API (.on) doesn't really
// provide for the capability to say it failed. It can certainly fail because a network operation is required to
// enable.
// By saving this off, we are strictly honoring that the feature is enabled. If it doesn't turn on we signal via
// the emitted 'error' that something bad happened.
// But if we ever again attain a connected state, this feature will be operational.
//
this._userRegisteredInputMessageListener = true;
/* Codes_SRS_NODE_MODULE_CLIENT_18_014: [ The client shall start listening for messages from the service whenever there is a listener subscribed to the `inputMessage` event. ]*/
debug('in newListener, enabling input messages');
this._enableInputMessages((err) => {
if (err) {
/*Codes_SRS_NODE_MODULE_CLIENT_18_017: [The client shall emit an `error` if connecting the transport fails while subscribing to `inputMessage` events.]*/
debugErrors('in newListener, error enabling input messages: ' + err);
this.emit('error', err);
} else {
debug('in newListener, successfully enabled input messages');
}
});
}
Expand All @@ -83,8 +100,7 @@ export class ModuleClient extends InternalClient {
debug('transport disconnect event: no error');
}
if (err && this._retryPolicy.shouldRetry(err)) {
if (this._inputMessagesEnabled) {
this._inputMessagesEnabled = false;
if (this._userRegisteredInputMessageListener) {
debug('re-enabling input message link');
this._enableInputMessages((err) => {
if (err) {
Expand Down Expand Up @@ -251,33 +267,30 @@ export class ModuleClient extends InternalClient {
}

private _disableInputMessages(callback: (err?: Error) => void): void {
if (this._inputMessagesEnabled) {
this._transport.disableInputMessages((err) => {
if (!err) {
this._inputMessagesEnabled = false;
}
callback(err);
});
} else {
callback();
}
debug('disabling input messages');
this._transport.disableInputMessages((err) => {
if (!err) {
debug('disabled input messages');
} else {
debugErrors('Error while disabling input messages: ' + err);
}
callback(err);
});
}

private _enableInputMessages(callback: (err?: Error) => void): void {
if (!this._inputMessagesEnabled) {
const retryOp = new RetryOperation('_enableInputMessages', this._retryPolicy, this._maxOperationTimeout);
retryOp.retry((opCallback) => {
/* Codes_SRS_NODE_MODULE_CLIENT_18_016: [ The client shall connect the transport if needed in order to receive inputMessages. ]*/
this._transport.enableInputMessages(opCallback);
}, (err) => {
if (!err) {
this._inputMessagesEnabled = true;
}
callback(err);
});
} else {
callback();
}
debug('enabling input messages');
const retryOp = new RetryOperation('_enableInputMessages', this._retryPolicy, this._maxOperationTimeout);
retryOp.retry((opCallback) => {
this._transport.enableInputMessages(opCallback);
}, (err) => {
if (!err) {
debug('enabled input messages');
} else {
debugErrors('Error while enabling input messages: ' + err);
}
callback(err);
});
}

/**
Expand Down
28 changes: 21 additions & 7 deletions device/core/src/twin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class Twin extends EventEmitter {
* The desired and reported properties dictionaries (respectively in `properties.desired` and `properties.reported`).
*/
properties: TwinProperties;
desiredPropertiesUpdatesEnabled: boolean;
userRegisteredDesiredPropertiesListener: boolean;
private _transport: DeviceTransport;
private _retryPolicy: RetryPolicy;
private _maxOperationTimeout: number;
Expand All @@ -68,7 +68,7 @@ export class Twin extends EventEmitter {
this._transport = transport;
this._retryPolicy = retryPolicy;
this._maxOperationTimeout = maxTimeout;
this.desiredPropertiesUpdatesEnabled = false;
this.userRegisteredDesiredPropertiesListener = false;
this.on('newListener', this._handleNewListener.bind(this));
/*Codes_SRS_NODE_DEVICE_TWIN_16_001: [The `Twin` constructor shall subscribe to the `twinDesiredPropertiesUpdate` event off the `transport` object.]*/
this._transport.on('twinDesiredPropertiesUpdate', this._onDesiredPropertiesUpdate.bind(this));
Expand Down Expand Up @@ -119,13 +119,18 @@ export class Twin extends EventEmitter {
* @private
*/
enableTwinDesiredPropertiesUpdates(callback: (err?: Error) => void): void {
debug('enabling twin desired properties updates');
const retryOp = new RetryOperation('enableTwinDesiredPropertiesUpdates', this._retryPolicy, this._maxOperationTimeout);
retryOp.retry((opCallback) => {
this._transport.enableTwinDesiredPropertiesUpdates((err) => {
this.desiredPropertiesUpdatesEnabled = !err;
opCallback(err);
});
}, callback);
this._transport.enableTwinDesiredPropertiesUpdates(opCallback);
}, (err) => {
if (!err) {
debug('enabled twin desired properties updates');
} else {
debugErrors('Error while enabling twin desired properties updates: ' + err);
}
callback(err);
});
}

// Note: Since we currently don't keep track of listeners, so we don't "disable" the twin properties updates when no one is listening.
Expand Down Expand Up @@ -212,6 +217,15 @@ export class Twin extends EventEmitter {
self.emit(eventName, propertyValue);
});
}
//
// We want to always retain that the we want to have this feature enabled because the API (.on) doesn't really
// provide for the capability to say it failed. It can certainly fail because a network operation is required to
// enable.
// By saving this off, we are strictly honoring that the feature is enabled. If it doesn't turn on we signal via
// the emitted 'error' that something bad happened.
// But if we ever again attain a connected state, this feature will be operational.
//
this.userRegisteredDesiredPropertiesListener = true;

/*Codes_SRS_NODE_DEVICE_TWIN_16_010: [When a listener is added for the first time on an event which name starts with `properties.desired`, the twin shall call the `enableTwinDesiredPropertiesUpdates` method of the `Transport` object.]*/
this.enableTwinDesiredPropertiesUpdates((err) => {
Expand Down
5 changes: 1 addition & 4 deletions device/core/test/_module_client_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -615,11 +615,8 @@ describe('ModuleClient', function () {
sinon.spy(fakeTransport, testConfig.enableFunc);
var client = new ModuleClient(fakeTransport);

// Calling 'on' twice to make sure it's called only once on the receiver.
// It works because the test will fail if the test callback is called multiple times, and it's called for every time the testConfig.eventName event is subscribed on the receiver.
client.on(testConfig.eventName, function () {});
client.on(testConfig.eventName, function () {});
assert.isTrue(fakeTransport[testConfig.enableFunc].calledOnce);
assert.strictEqual(fakeTransport[testConfig.enableFunc].callCount, 1);
});

/*Tests_SRS_NODE_MODULE_CLIENT_18_015: [ The client shall stop listening for messages from the service whenever the last listener unsubscribes from the `inputMessage` event. ]*/
Expand Down
Loading

0 comments on commit 0d6bff2

Please sign in to comment.