Skip to content

Commit

Permalink
Setup Short Run Mqtt5 Canary (#503)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera authored Nov 24, 2023
1 parent b294263 commit a91e55a
Show file tree
Hide file tree
Showing 6 changed files with 1,868 additions and 109 deletions.
271 changes: 163 additions & 108 deletions canary/mqtt5/canary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0.
*/

import {ICrtError, mqtt5, mqtt5_packet} from "aws-crt";
import {ICrtError, mqtt5} from "aws-crt";
import {once} from "events";
import {v4 as uuid} from "uuid";
var weightedRandom = require('weighted-random');
Expand All @@ -13,207 +13,262 @@ type Args = { [index: string]: any };
const yargs = require('yargs');

yargs.command('*', false, (yargs: any) => {
yargs.option('duration', {
description: 'INT: time in seconds to run the canary',
type: 'number',
default: 3600,
})
yargs.option({
'duration': {
description: 'INT: time in seconds to run the canary',
type: 'number',
default: 120,
},
'endpoint': {
description: 'STR: endpoint to connect to',
type: 'string',
default: 'localhost',
},
'port': {
description: 'INT: port to connect to',
type: 'number',
default: 1883,
},
'tps': {
description: 'INT: transaction per second',
type: 'number',
default: 0,
},
'clients': {
description: 'INT: concurrent running clients',
type: 'number',
default: 10,
}
});
}, main).parse();

let RECEIVED_TOPIC : string = "Canary/Received/Topic";
let RECEIVED_TOPIC: string = "Canary/Received/Topic";

interface CanaryMqttStatistics {
clientsUsed : number;
clientsUsed: number;
publishesReceived: number;
subscribesAttempted : number;
subscribesSucceeded : number;
subscribesFailed : number;
unsubscribesAttempted : number;
unsubscribesSucceeded : number;
unsubscribesFailed : number;
publishesAttempted : number;
publishesSucceeded : number;
publishesFailed : number;
subscribesAttempted: number;
subscribesSucceeded: number;
subscribesFailed: number;
unsubscribesAttempted: number;
unsubscribesSucceeded: number;
unsubscribesFailed: number;
publishesAttempted: number;
publishesSucceeded: number;
publishesFailed: number;
totalOperation: number;
}

interface TestContext {
duration: number;
hostname: string;
port: number;
tps_sleep_time: number;
clients: number;
}

interface CanaryContext {
client : mqtt5.Mqtt5Client;
clients: mqtt5.Mqtt5Client[];

mqttStats: CanaryMqttStatistics;

subscriptions: string[][];
}

mqttStats : CanaryMqttStatistics;
function sleep(millisecond: number) {
return new Promise((resolve) => setInterval(resolve, millisecond));
}

subscriptions: string[];
function getRandomIndex(clients : mqtt5.Mqtt5Client[]): number
{
return Math.floor(Math.random() * clients.length);
}

function createCanaryClient(mqttStats : CanaryMqttStatistics) : mqtt5.Mqtt5Client {
const client_config : mqtt5.Mqtt5ClientConfig = {
hostName : process.env.AWS_TEST_MQTT5_DIRECT_MQTT_HOST ?? "localhost",
port : parseInt(process.env.AWS_TEST_MQTT5_DIRECT_MQTT_PORT ?? "0")
function createCanaryClients(testContext: TestContext, mqttStats: CanaryMqttStatistics): mqtt5.Mqtt5Client[] {
const client_config: mqtt5.Mqtt5ClientConfig = {
hostName: testContext.hostname,
port: testContext.port
};

let client : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(client_config);
const clients = [];

client.on('error', (error: ICrtError) => {});
client.on("messageReceived",(message: mqtt5_packet.PublishPacket) : void => {
mqttStats.publishesReceived++;
});
for (let i = 0; i < testContext.clients; i++) {
let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(client_config);

client.on('error', (error: ICrtError) => { });
client.on("messageReceived", (eventData: mqtt5.MessageReceivedEvent): void => {
mqttStats.publishesReceived++;
});

++mqttStats.clientsUsed;

return client;
clients.push(client);
}

return clients;
}

async function doSubscribe(context : CanaryContext) {
async function doSubscribe(context: CanaryContext) {
let topicFilter: string = `Mqtt5/Canary/RandomSubscribe${uuid()}`;

let index = getRandomIndex(context.clients);
try {
context.mqttStats.subscribesAttempted++;

await context.client.subscribe({
await context.clients[index].subscribe({
subscriptions: [
{topicFilter: RECEIVED_TOPIC, qos: mqtt5_packet.QoS.AtLeastOnce}
{ topicFilter: RECEIVED_TOPIC, qos: mqtt5.QoS.AtLeastOnce }
]
});

context.subscriptions.push(topicFilter);
context.mqttStats.subscribesSucceeded++;
} catch (err) {
context.mqttStats.subscribesFailed++;
context.subscriptions.filter(entry => entry !== topicFilter);
return;
}

context.subscriptions[index].push(topicFilter);
context.mqttStats.subscribesSucceeded++;
}

async function doUnsubscribe(context : CanaryContext) {
if (context.subscriptions.length == 0) {
async function doUnsubscribe(context: CanaryContext) {
let index = getRandomIndex(context.clients);
if (context.subscriptions[index].length == 0) {
return;
}

let topicFilter: string = context.subscriptions.pop() ?? "canthappen";
let topicFilter: string = context.subscriptions[index].pop() ?? "canthappen";

try {
context.mqttStats.unsubscribesAttempted++;

await context.client.unsubscribe({
topicFilters: [ topicFilter ]
await context.clients[index].unsubscribe({
topicFilters: [topicFilter]
});

context.mqttStats.unsubscribesSucceeded++;
} catch (err) {
context.mqttStats.unsubscribesFailed++;
context.subscriptions.push(topicFilter);
context.subscriptions[index].push(topicFilter);
}
}

async function doPublish(context : CanaryContext, qos: mqtt5_packet.QoS) {
async function doPublish(context: CanaryContext, qos: mqtt5.QoS) {
try {
context.mqttStats.publishesAttempted++;

await context.client.publish({
// Generate random binary payload data
let payload = Buffer.alloc(10000, 'a', "utf-8");
let index = getRandomIndex(context.clients);
await context.clients[index].publish({
topicName: RECEIVED_TOPIC,
qos: qos,
payload: Buffer.alloc(10000),
payload: payload,
retain: false,
payloadFormat: mqtt5_packet.PayloadFormatIndicator.Utf8,
payloadFormat: mqtt5.PayloadFormatIndicator.Utf8,
messageExpiryIntervalSeconds: 60,
responseTopic: "talk/to/me",
correlationData: Buffer.alloc(3000),
contentType: "not-json",
userProperties: [
{name: "name", value: "value"}
{ name: "name", value: "value" }
]
});

context.mqttStats.publishesSucceeded++;
} catch (err) {
context.mqttStats.publishesFailed++;
console.log("Publish Failed with " + err);
}
}

async function runCanaryIteration(endTime: Date, mqttStats : CanaryMqttStatistics) {
async function runCanary(testContext: TestContext, mqttStats: CanaryMqttStatistics) {
let startTime: Date = new Date();
let currentTime: Date = startTime;
let secondsElapsed: number = 0;

let context : CanaryContext = {
client : createCanaryClient(mqttStats),
mqttStats : mqttStats,
subscriptions : []
let context: CanaryContext = {
clients: createCanaryClients(testContext, mqttStats),
mqttStats: mqttStats,
subscriptions: []
};

mqttStats.clientsUsed++;
// Start clients
context.clients.forEach( async client => {
client.start();
const connectionSuccess = once(client, "connectionSuccess");

await connectionSuccess;

await client.subscribe({
subscriptions: [
{ topicFilter: RECEIVED_TOPIC, qos: mqtt5.QoS.AtLeastOnce }
]
});
// setup empty subscription string array
context.subscriptions.push(new Array());
});

let operationTable = [
{ weight : 1, op: async () => { await doSubscribe(context); }},
{ weight : 1, op: async () => { await doUnsubscribe(context); }},
{ weight : 20, op: async () => { await doPublish(context, mqtt5_packet.QoS.AtMostOnce); }},
{ weight : 20, op: async () => { await doPublish(context, mqtt5_packet.QoS.AtLeastOnce); }}
{ weight : 20, op: async () => { await doPublish(context, mqtt5.QoS.AtMostOnce); }},
{ weight : 20, op: async () => { await doPublish(context, mqtt5.QoS.AtLeastOnce); }}
];

var weightedOperations = operationTable.map(function (operation) {
return operation.weight;
});

const connectionSuccess = once(context.client, "connectionSuccess");

context.client.start();

await connectionSuccess;
while (secondsElapsed < testContext.duration) {

await context.client.subscribe({
subscriptions: [
{ topicFilter: RECEIVED_TOPIC, qos: mqtt5_packet.QoS.AtLeastOnce }
]
});

let currentTime : Date = new Date();
while (currentTime.getTime() < endTime.getTime()) {
let index : number = weightedRandom(weightedOperations);
let index: number = weightedRandom(weightedOperations);

await (operationTable[index].op)();

++context.mqttStats.totalOperation;
await sleep(testContext.tps_sleep_time);
currentTime = new Date();
}

const stopped = once(context.client, "stopped");

context.client.stop();

await stopped;

context.client.close();
}

async function runCanary(durationInSeconds: number, mqttStats : CanaryMqttStatistics) {
let startTime: Date = new Date();
let currentTime: Date = startTime;
let secondsElapsed : number = 0;
let iteration : number = 0;
secondsElapsed = (currentTime.getTime() - startTime.getTime()) / 1000;
}

while (secondsElapsed < durationInSeconds) {
let iterationTime : number = Math.min(durationInSeconds - secondsElapsed, 60);
let iterationEnd = new Date(currentTime.getTime() + iterationTime * 1000);
await runCanaryIteration(iterationEnd, mqttStats);

iteration++;
console.log(`Iteration ${iteration} stats: ${JSON.stringify(mqttStats)}`);
// Stop and close clients
context.clients.forEach( async client => {
const stopped = once(client, "stopped");
client.stop();
await stopped;
client.close();
});

currentTime = new Date();
secondsElapsed = (currentTime.getTime() - startTime.getTime()) / 1000;
}
}

async function main(args : Args){
let mqttStats : CanaryMqttStatistics = {
clientsUsed : 0,
let mqttStats : CanaryMqttStatistics = {
clientsUsed: 0,
publishesReceived: 0,
subscribesAttempted : 0,
subscribesSucceeded : 0,
subscribesFailed : 0,
unsubscribesAttempted : 0,
unsubscribesSucceeded : 0,
unsubscribesFailed : 0,
publishesAttempted : 0,
publishesSucceeded : 0,
publishesFailed : 0
subscribesAttempted: 0,
subscribesSucceeded: 0,
subscribesFailed: 0,
unsubscribesAttempted: 0,
unsubscribesSucceeded: 0,
unsubscribesFailed: 0,
publishesAttempted: 0,
publishesSucceeded: 0,
publishesFailed: 0,
totalOperation: 0,
};

await runCanary(args.duration, mqttStats);
let testContext: TestContext = {
duration: args.duration,
hostname: args.endpoint,
port: args.port,
tps_sleep_time: args.tps == 0 ? 0 : (1000 / args.tps),
clients: args.clients,
}

await runCanary(testContext, mqttStats);

console.log(`Final Stats: ${JSON.stringify(mqttStats)}`)
console.log(`Final Stats: ${JSON.stringify(mqttStats)}`);
console.log(`Operation TPS: ${mqttStats.totalOperation / testContext.duration}`);

process.exit(0);

Expand Down
2 changes: 1 addition & 1 deletion canary/mqtt5/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"homepage": "https://github.com/awslabs/aws-crt-nodejs#readme",
"devDependencies": {
"@types/node": "^10.17.17",
"typescript": "^3.8.3"
"typescript": "^4.7.4"
},
"dependencies": {
"aws-crt": "file:../../",
Expand Down
Loading

0 comments on commit a91e55a

Please sign in to comment.