Skip to content

Commit

Permalink
(fix): Special case dt-subject. issue#1141 & #1135
Browse files Browse the repository at this point in the history
  • Loading branch information
anthonyvercolano committed Oct 14, 2022
1 parent 0d6bff2 commit ddfebf7
Show file tree
Hide file tree
Showing 11 changed files with 53 additions and 18 deletions.
23 changes: 17 additions & 6 deletions common/transport/amqp/src/amqp_message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,26 @@ export class AmqpMessage {
const props = message.properties;
const propsCount = props.count();
if (propsCount > 0) {
if (!amqpMessage.application_properties) {
ensureApplicationPropertiesCreated();
}
const DT_SUBJECT = 'dt-subject';
for (let index = 0; index < propsCount; index++) {
const item = props.getItem(index);
if (!!item) {
/*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_013: [If one of the property key is `IoThub-status`, this property is reserved and shall be forced to an `int` `rhea` type.]*/
const val = (item.key === 'IoThub-status') ? rheaTypes.wrap_int(parseInt(item.value)) : item.value;
amqpMessage.application_properties[item.key] = val;
if (item.key === DT_SUBJECT) {
if (!amqpMessage.message_annotations) {
amqpMessage.message_annotations = {
[DT_SUBJECT]: item.value
};
} else {
amqpMessage.message_annotations[DT_SUBJECT] = item.value;
}
} else {
if (!amqpMessage.application_properties) {
ensureApplicationPropertiesCreated();
}
/*Codes_SRS_NODE_IOTHUB_AMQPMSG_16_013: [If one of the property key is `IoThub-status`, this property is reserved and shall be forced to an `int` `rhea` type.]*/
const val = (item.key === 'IoThub-status') ? rheaTypes.wrap_int(parseInt(item.value)) : item.value;
amqpMessage.application_properties[item.key] = val;
}
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions common/transport/amqp/test/_amqp_message_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ describe('AmqpMessage', function () {
}
});

it('SPECIAL cases the message property \'dt-subject\' to be placed into the message annotations.', function() {
var message = new Message();
const DT_SUBJECT = 'dt-subject';
const DT_VALUE = 'abcd';
message.properties.add(DT_SUBJECT, DT_VALUE);

var amqpMessage = AmqpMessage.fromMessage(message);
assert.isOk(amqpMessage.message_annotations[DT_SUBJECT]);
assert.strictEqual(amqpMessage.message_annotations[DT_SUBJECT], DT_VALUE);

});

/*Tests_SRS_NODE_IOTHUB_AMQPMSG_16_013: [If one of the property key is `IoThub-status`, this property is reserved and shall be forced to an `int` `rhea` type.]*/
it('forces the IoThub-status property encoding to \'int\' if it exists', function() {
var message = new Message();
Expand Down
2 changes: 1 addition & 1 deletion common/transport/http/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"uuid": "^8.3.2"
},
"devDependencies": {
"@azure/core-http": "1.2.3",
"@azure/core-http": "^1.2.3",
"@types/node": "^16.10.2",
"chai": "^4.3.3",
"mocha": "^9.2.1",
Expand Down
5 changes: 3 additions & 2 deletions device/transport/http/src/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { DeviceClientOptions, HttpReceiverOptions } from 'azure-iot-device';
import { getUserAgentString } from 'azure-iot-device';

const MESSAGE_PROP_HEADER_PREFIX = 'iothub-app-';
const MESSAGE_PROP_DT_SUBJECT = 'dt-subject';

/*Codes_SRS_NODE_DEVICE_HTTP_05_009: [When any Http method receives an HTTP response with a status code >= 300, it shall invoke the done callback function with the following arguments:
err - the standard JavaScript Error object, with the Node.js http.ServerResponse object attached as the property response]*/
Expand Down Expand Up @@ -174,7 +175,7 @@ export class Http extends EventEmitter implements DeviceTransport {
for (let i = 0; i < message.properties.count(); i++) {
const propItem = message.properties.getItem(i);
/*Codes_SRS_NODE_DEVICE_HTTP_13_001: [ sendEvent shall add message properties as HTTP headers and prefix the key name with the string iothub-app. ]*/
httpHeaders[MESSAGE_PROP_HEADER_PREFIX + propItem.key] = propItem.value;
httpHeaders[((propItem.key !== MESSAGE_PROP_DT_SUBJECT) ? (MESSAGE_PROP_HEADER_PREFIX) : ('')) + propItem.key] = propItem.value;
}

if (message.messageId) {
Expand Down Expand Up @@ -275,7 +276,7 @@ export class Http extends EventEmitter implements DeviceTransport {
property += ',';
const propItem = message.properties.getItem(propertyIdx);
/*Codes_SRS_NODE_DEVICE_HTTP_13_002: [ sendEventBatch shall prefix the key name for all message properties with the string iothub-app. ]*/
property += '\"' + MESSAGE_PROP_HEADER_PREFIX + propItem.key + '\":\"' + propItem.value + '\"';
property += '\"' + ((propItem.key !== MESSAGE_PROP_DT_SUBJECT) ? (MESSAGE_PROP_HEADER_PREFIX) : '') + propItem.key + '\":\"' + propItem.value + '\"';
}
if (propertyIdx > 0) {
property += '}';
Expand Down
11 changes: 8 additions & 3 deletions device/transport/http/test/_http_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,15 @@ describe('Http', function () {
});
transport._http = MockHttp;

const dtSubject = 'dt-subject';
const dtValue = 'value';
var msg = new Message("boo");
var i;
var propsCount = 3;
for(i = 1; i <= propsCount; ++i) {
var propsCount = 4;
for(i = 1; i <= propsCount-1; ++i) {
msg.properties.add('k' + i.toString(), 'v' + i.toString());
}
msg.properties.add(dtSubject, dtValue);

// act
transport.sendEvent(msg, function() {});
Expand All @@ -234,11 +237,13 @@ describe('Http', function () {
assert.isOk(spy.args[0]);
assert.isOk(spy.args[0][2]);
var headers = spy.args[0][2];
for(i = 1; i <= propsCount; ++i) {
for(i = 1; i <= propsCount-1; ++i) {
var key = 'iothub-app-k' + i.toString();
assert.isOk(headers[key]);
assert.strictEqual(headers[key], 'v' + i.toString());
}
assert.isOk(headers[dtSubject]);
assert.strictEqual(headers[dtSubject],dtValue);

// cleanup
done();
Expand Down
5 changes: 4 additions & 1 deletion device/transport/mqtt/src/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,7 @@ export class Mqtt extends EventEmitter implements DeviceTransport {

private _getEventTopicFromMessage(message: Message, extraSystemProperties?: { [key: string]: string }): string {

const DT_SUBJECT = 'dt-subject';
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_008: [The `sendEvent` method shall use a topic formatted using the following convention: `devices/<deviceId>/messages/events/`.]*/
/*Codes_SRS_NODE_DEVICE_MQTT_18_036: [ If a `moduleId` was not specified in the transport connection, the `sendOutputEvent` method shall use a topic formatted using the following convention: `devices/<deviceId>/messages/events/`. ]*/
/*Codes_SRS_NODE_DEVICE_MQTT_18_037: [ If a `moduleId` was specified in the transport connection, the `sendOutputEvent` method shall use a topic formatted using the following convention: `devices/<deviceId>/<moduleId>/messages/events/`. ]*/
Expand Down Expand Up @@ -1002,7 +1003,9 @@ export class Mqtt extends EventEmitter implements DeviceTransport {
if (message.properties && message.properties.count() > 0) {
for (let i = 0; i < message.properties.count(); i++) {
if (i > 0 || sysPropString) topic += '&';
topic += encodeURIComponent(message.properties.propertyList[i].key) + '=' + encodeURIComponent(message.properties.propertyList[i].value);
const keyName = message.properties.propertyList[i].key;
topic += encodeURIComponent(((keyName === DT_SUBJECT) ? ('$.sub') : (keyName)))
+ '=' + encodeURIComponent(message.properties.propertyList[i].value);
}
}

Expand Down
3 changes: 2 additions & 1 deletion device/transport/mqtt/test/_mqtt_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,12 @@ describe('Mqtt', function () {
testMessage.properties.add('key1', 'value1');
testMessage.properties.add('key2', 'value2');
testMessage.properties.add('key$', 'value$');
testMessage.properties.add('dt-subject','abcd$');

const transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase);
transport.connect(function () {
testConfig.sendFunc(transport, testMessage, function () {
assert.equal(fakeMqttBase.publish.firstCall.args[0], testConfig.baseTopicWithProps+'key1=value1&key2=value2&key%24=value%24' + testConfig.topicEnder);
assert.equal(fakeMqttBase.publish.firstCall.args[0], testConfig.baseTopicWithProps+'key1=value1&key2=value2&key%24=value%24&%24.sub=abcd%24' + testConfig.topicEnder);
done();
});
});
Expand Down
2 changes: 2 additions & 0 deletions e2etests/test/device_service.js
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,13 @@ function device_service_tests(deviceTransport, createDeviceMethod) {
debug(' message max send: maximum message size ' + maximumMessageSize);
message.messageId = uuidData;
message.properties.add('a', uuidData);
message.properties.add('dt-subject', uuidData);
buffer.fill(uuidData);

var onEventHubMessage = function (eventData) {
if (eventData.annotations['iothub-connection-device-id'] === provisionedDevice.deviceId) {
if ((eventData.body.length === bufferSize) && (eventData.body.indexOf(uuidData) === 0)) {
assert.strictEqual(eventData.annotations['dt-subject'], uuidData);
debug('trying to finish from the receiving side');
rdv.imDone('ehClient');
} else {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"codespaceinstall": "(cd device/samples/javascript && npm install & cd ../../../) && (cd device/samples/javascript/pnp && npm install & cd ../../../../) && (cd device/samples/typescript && npm install && npm run build)"
},
"devDependencies": {
"lerna": "^4.0.0",
"lerna": "^5.0.0",
"mocha": "^9.2.1"
},
"engines": {
Expand Down
2 changes: 1 addition & 1 deletion provisioning/service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"azure-iot-common": "1.13.0",
"azure-iot-http-base": "1.12.0",
"debug": "^4.3.1",
"@azure/core-http": "1.2.3"
"@azure/core-http": "^1.2.3"
},
"devDependencies": {
"@types/debug": "^4.1.5",
Expand Down
4 changes: 2 additions & 2 deletions service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
"dependencies": {
"async": "^3.2.3",
"es5-ext": "0.10.53",
"@azure/core-http": "1.2.3",
"@azure/identity": "2.0.0",
"@azure/core-http": "^1.2.3",
"@azure/identity": "^2.0.0",
"@azure/ms-rest-js": "^2.0.5",
"azure-iot-amqp-base": "2.5.0",
"azure-iot-common": "1.13.0",
Expand Down

0 comments on commit ddfebf7

Please sign in to comment.