Skip to content

Commit

Permalink
feat(pubsub): migrate from paho-mqtt to mqtt.js
Browse files Browse the repository at this point in the history
  • Loading branch information
kyle-seongwoo-jun committed Feb 25, 2025
1 parent 677f466 commit e947379
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 3,001 deletions.
34 changes: 34 additions & 0 deletions packages/pubsub/__mocks__/mqtt.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
const mqttClientMockCache: Record<string, any> = {};

const mockMqttClient = (clientId: string) => {
if (mqttClientMockCache[clientId]) {
return mqttClientMockCache[clientId];
}

const eventHandlers: Record<string, (...args: any[]) => void> = {};

const client = {
on: jest.fn((event, handler) => {
eventHandlers[event] = handler;
}),
publish: jest.fn((topic, message) => {
eventHandlers.message(topic, message);
}),
subscribe: jest.fn(),
unsubscribe: jest.fn(),
connected: true,
end: jest.fn(),
};

mqttClientMockCache[clientId] = client;

return client;
};

const mqtt = {
connectAsync: jest.fn((_, { clientId }) =>
Promise.resolve(mockMqttClient(clientId)),
),
};

export default mqtt;
19 changes: 0 additions & 19 deletions packages/pubsub/__mocks__/paho-mqtt.js

This file was deleted.

61 changes: 0 additions & 61 deletions packages/pubsub/__tests__/PubSub.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,51 +21,12 @@ jest.mock('@aws-amplify/core', () => ({
}));

import { Reachability } from '@aws-amplify/core/internals/utils';
import * as Paho from '../src/vendor/paho-mqtt';
import { ConnectionState, PubSub as IotPubSub, mqttTopicMatch } from '../src';
import { PubSub as MqttPubSub } from '../src/clients/mqtt';
import { HubConnectionListener } from './helpers';
import { Observable, Observer } from 'rxjs';
import * as constants from '../src/Providers/constants';

const pahoClientMockCache = {};

const mockConnect = jest.fn(options => {
options.onSuccess();
});

const pahoClientMock = jest.fn().mockImplementation((host, clientId) => {
if (pahoClientMockCache[clientId]) {
return pahoClientMockCache[clientId];
}

var client = {} as any;

client.connect = mockConnect;
client.send = jest.fn((topic, message) => {
client.onMessageArrived({ destinationName: topic, payloadString: message });
});
client.subscribe = jest.fn((topics, options) => {});
client.unsubscribe = jest.fn(() => {});
client.onMessageArrived = jest.fn(() => {});

client.isConnected = jest.fn(() => true);
client.disconnect = jest.fn(() => {});

pahoClientMockCache[clientId] = client;

return client;
});

jest.mock('../src/vendor/paho-mqtt', () => ({
__esModule: true,
...jest.requireActual('../src/vendor/paho-mqtt'),
Client: {},
}));

// @ts-ignore
Paho.Client = pahoClientMock;

const testPubSubAsync = (
pubsub,
topic,
Expand Down Expand Up @@ -414,28 +375,6 @@ describe('PubSub', () => {
});
});

describe('MqttOverWSProvider local testing config', () => {
test('ssl should be disabled in the case of local testing', async () => {
mockConnect.mockClear();
const pubsub = new MqttPubSubTest({
region: 'region',
aws_appsync_dangerously_connect_to_http_endpoint_for_testing: true,
});

await testPubSubAsync(pubsub, 'topicA', 'my message MqttOverWSProvider', {
provider: 'MqttOverWSProvider',
});

expect(pubsub['isSSLEnabled']).toBe(false);
expect(mockConnect).toHaveBeenCalledWith({
useSSL: false,
mqttVersion: 3,
onSuccess: expect.any(Function),
onFailure: expect.any(Function),
});
});
});

describe('multiple providers', () => {
test('subscribe and publish to specific provider', async () => {
const iotClient = new IotPubSub({
Expand Down
7 changes: 4 additions & 3 deletions packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
"test:size": "size-limit",
"build-with-test": "npm run clean && npm run build",
"build:umd": "webpack && webpack --config ./webpack.config.dev.js",
"build:esm-cjs": "rollup --forceExit -c rollup.config.mjs && cp -R src/vendor dist/cjs/vendor && cp -R src/vendor dist/esm/vendor",
"build:watch": "mkdirp dist/esm/vendor && mkdirp dist/cjs/vendor && cp -R src/vendor dist/cjs/vendor && cp -R src/vendor dist/esm/vendor && rollup --forceExit -c rollup.config.mjs --watch",
"build:esm-cjs": "rollup --forceExit -c rollup.config.mjs",
"build:watch": "rollup --forceExit -c rollup.config.mjs --watch",
"build": "npm run clean && npm run build:esm-cjs && npm run build:umd",
"clean": "npm run clean:size && rimraf dist lib lib-esm",
"clean:size": "rimraf dual-publish-tmp tmp*",
"format": "echo \"Not implemented\"",
"lint": "eslint '**/*.{ts,tsx}' && npm run ts-coverage",
"lint:fix": "eslint '**/*.{ts,tsx}' --fix",
"ts-coverage": "typescript-coverage-report -p ./tsconfig.build.json -t 93.0 -i src/vendor/paho-mqtt.js"
"ts-coverage": "typescript-coverage-report -p ./tsconfig.build.json -t 93.0"
},
"typesVersions": {
">=4.2": {
Expand Down Expand Up @@ -76,6 +76,7 @@
"@aws-amplify/auth": "6.11.1",
"buffer": "4.9.2",
"graphql": "15.8.0",
"mqtt": "^5.10.3",
"rxjs": "^7.8.1",
"tslib": "^2.5.0",
"url": "0.11.0"
Expand Down
7 changes: 0 additions & 7 deletions packages/pubsub/rollup.config.mjs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { fileURLToPath } from 'node:url';
import { defineConfig } from 'rollup';
import typescript from '@rollup/plugin-typescript';
import { getInputForGlob } from '../../rollup/utils.mjs';
Expand All @@ -18,18 +17,12 @@ const config = defineConfig([
// CJS config
{
input: input,
external: [
fileURLToPath(new URL('src/vendor/paho-mqtt.js', import.meta.url)),
],
output: cjsOutput,
plugins: [typescript(cjsTSOptions)],
},
// ESM config
{
input: input,
external: [
fileURLToPath(new URL('src/vendor/paho-mqtt.js', import.meta.url)),
],
output: esmOutput,
plugins: [typescript(esmTSOptions)],
},
Expand Down
104 changes: 32 additions & 72 deletions packages/pubsub/src/Providers/MqttOverWS.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import { Observable, Observer, SubscriptionLike as Subscription } from 'rxjs';
import { ConsoleLogger, Hub, HubPayload } from '@aws-amplify/core';
import { amplifyUuid } from '@aws-amplify/core/internals/utils';
import mqtt, { MqttClient } from 'mqtt';

import {
ConnectionState,
Expand All @@ -13,9 +14,6 @@ import {
PublishInput,
SubscribeInput,
} from '../types/PubSub';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore this module is expected to not have declaration file
import * as Paho from '../vendor/paho-mqtt.js';
import {
CONNECTION_CHANGE,
ConnectionStateMonitor,
Expand Down Expand Up @@ -51,28 +49,12 @@ export interface MqttOptions extends PubSubOptions {
endpoint?: string;
}

interface PahoClient {
onMessageArrived(params: {
destinationName: string;
payloadString: string;
}): void;
onConnectionLost(params: { errorCode: number }): void;
connect(
params: Record<string, string | number | boolean | (() => void)>,
): void;
disconnect(): void;
isConnected(): boolean;
subscribe(topic: string): void;
unsubscribe(topic: string): void;
send(topic: string, message: string): void;
}

class ClientsQueue {
private promises = new Map<string, Promise<PahoClient | undefined>>();
private promises = new Map<string, Promise<MqttClient | undefined>>();

async get(
clientId: string,
clientFactory?: (input: string) => Promise<PahoClient | undefined>,
clientFactory?: (input: string) => Promise<MqttClient | undefined>,
) {
const cachedPromise = this.promises.get(clientId);
if (cachedPromise) return cachedPromise;
Expand Down Expand Up @@ -149,11 +131,6 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
return this._clientsQueue;
}

protected get isSSLEnabled() {
return !this.options
.aws_appsync_dangerously_connect_to_http_endpoint_for_testing;
}

public onDisconnect({
clientId,
errorCode,
Expand All @@ -176,61 +153,44 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
}
}

public async newClient({ url, clientId }: MqttOptions): Promise<PahoClient> {
public async newClient({
url,
clientId,
}: MqttOptions): Promise<MqttClient | undefined> {
logger.debug('Creating new MQTT client', clientId);

this.connectionStateMonitor.record(CONNECTION_CHANGE.OPENING_CONNECTION);
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore this module is expected to not have declaration file
const client = new Paho.Client(url, clientId) as PahoClient;

client.onMessageArrived = ({
destinationName: topic,
payloadString: msg,
}: {
destinationName: string;
payloadString: string;
}) => {
this._onMessage(topic, msg);
};
client.onConnectionLost = ({
errorCode,
...args
}: {
errorCode: number;
}) => {
this.onDisconnect({ clientId, errorCode, ...args });

let client: MqttClient;
try {
client = await mqtt.connectAsync(url!, { clientId, protocolVersion: 3 });
} catch (e) {
if (clientId) this._clientsQueue.remove(clientId);
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
};

const connected = await new Promise((resolve, _reject) => {
client.connect({
useSSL: this.isSSLEnabled,
mqttVersion: 3,
onSuccess: () => {
resolve(true);
},
onFailure: () => {
if (clientId) this._clientsQueue.remove(clientId);
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
resolve(false);
},
});
});

if (connected) {
this.connectionStateMonitor.record(
CONNECTION_CHANGE.CONNECTION_ESTABLISHED,
);
return undefined;
}

client.on('message', (topic, payload) => {
this._onMessage(topic, payload.toString());
});
client.on('disconnect', packet => {
const { reasonCode, properties } = packet;
this.onDisconnect({ clientId, errorCode: reasonCode, ...properties });
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
});

this.connectionStateMonitor.record(
CONNECTION_CHANGE.CONNECTION_ESTABLISHED,
);

return client;
}

protected async connect(
clientId: string,
options: MqttOptions = {},
): Promise<PahoClient | undefined> {
): Promise<MqttClient | undefined> {
return this.clientsQueue.get(clientId, async inputClientId => {
const client = await this.newClient({
...options,
Expand All @@ -253,8 +213,8 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
protected async disconnect(clientId: string): Promise<void> {
const client = await this.clientsQueue.get(clientId);

if (client && client.isConnected()) {
client.disconnect();
if (client && client.connected) {
client.end();
}
this.clientsQueue.remove(clientId);
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
Expand All @@ -269,7 +229,7 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
if (client) {
logger.debug('Publishing to topic(s)', targetTopics.join(','), message);
targetTopics.forEach(topic => {
client.send(topic, msg);
client.publish(topic, msg);
});
} else {
logger.debug(
Expand Down Expand Up @@ -396,7 +356,7 @@ export class MqttOverWS extends AbstractPubSub<MqttOptions> {
// if no observers exists for the topic, topic should be removed
if (observersForTopic.size === 0) {
this._topicObservers.delete(topic);
if (client.isConnected()) {
if (client.connected) {
client.unsubscribe(topic);
}
}
Expand Down
Loading

0 comments on commit e947379

Please sign in to comment.