An easy-to-use and flexible routing library for MQTT.
@pera-swarm/mqtt-router is a library for handling MQTT publish/subscribe capabilities with a straightforward routing architecture.
It is a Node.js library available on both the npm registry and the GitHub Package Registry.
Install the library using the npm install command:
npm i --save @pera-swarm/mqtt-router
You also need to install the mqtt library:
npm i --save mqtt
Create routes for subscribe and publish:
// Sample dynamic route list with handler functions
const SAMPLE_ROUTES = [
{
topic: 'v1/sample',
allowRetained: true,
subscribe: true,
publish: false,
handler: (msg) => {
const data = JSON.parse(msg);
console.log('Sample subscription picked up the topic', data);
}
}
];
module.exports = SAMPLE_ROUTES;
You should configure your own mqttOptions according to your mqtt broker and application settings.
// Configure mqttClient
const mqttClient = require('mqtt');
const mqttOptions = {
port: 1883,
clientId: process.env.MQTT_CLIENT,
username: process.env.MQTT_USER || '',
password: process.env.MQTT_PASS || ''
};
const mqtt = mqttClient.connect(process.env.MQTT_HOST, mqttOptions);
// Import MQTTRouter from mqtt-router
const { MQTTRouter } = require('@pera-swarm/mqtt-router');
const routes = require('./routes');
var router;
// Sample MQTT Message Options
const SAMPLE_OPTIONS = { qos: 2, rap: true, rh: true };
// Sample setup function that runs on connect
const SAMPLE_SETUP_FN = () => {
console.log('sample setup fn');
};
// Sample MQTT Error handler function
const SAMPLE_ON_ERROR_FN = (err) => {
console.log('error: mqtt');
console.log(err);
};
router = new MQTTRouter(
mqtt,
routes,
SAMPLE_OPTIONS,
SAMPLE_SETUP_FN,
SAMPLE_ON_ERROR_FN
);
router.start();
router.start() listens on the routes that are marked subscribe: true in the route specification. When the subscriber picks up a message for an associated topic, the MQTTRouter calls the relevant handler function.
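The dispatch behaviour described here can be pictured with a small, self-contained sketch. It does not use the library: `dispatch` is a hypothetical helper that only illustrates how routes marked subscribe: true could be matched to an incoming topic, and it assumes exact topic matching.

```javascript
// Hypothetical illustration only; this is not the MQTTRouter source.
const received = [];

const routes = [
    {
        topic: 'v1/sample',
        subscribe: true,
        handler: (msg) => received.push(JSON.parse(msg))
    },
    {
        topic: 'v1/ignored',
        subscribe: false,
        handler: (msg) => received.push(msg)
    }
];

// Call the handler of every subscribed route whose topic equals the incoming topic.
// Returns the number of handlers that were called.
function dispatch(routeList, topic, message) {
    let called = 0;
    for (const route of routeList) {
        if (route.subscribe && route.topic === topic) {
            route.handler(message);
            called += 1;
        }
    }
    return called;
}

const handledSample = dispatch(routes, 'v1/sample', '{"id": 1}');
const handledIgnored = dispatch(routes, 'v1/ignored', 'hello');
console.log(handledSample, handledIgnored, received);
```

Only the first route reacts, because the second one is not flagged for subscription.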
You can also wrap the routes with the wrapper function to pass an additional higher-level attribute to each handler function.
// Import MQTTRouter and wrapper from mqtt-router
const { MQTTRouter, wrapper } = require('@pera-swarm/mqtt-router');
const routes = require('./routes');
var router;
// Sample MQTT Message Options
const SAMPLE_OPTIONS = { qos: 2, rap: true, rh: true };
// Sample setup function that runs on connect
const SAMPLE_SETUP_FN = () => {
console.log('sample setup fn');
};
// Sample MQTT Error handler function
const SAMPLE_ON_ERROR_FN = (err) => {
console.log('error: mqtt');
console.log(err);
};
// Sample higher level attribute for the handler function
const sampleAttrib = {
time: Date.now()
};
router = new MQTTRouter(
mqtt,
wrapper(routes, sampleAttrib),
SAMPLE_OPTIONS,
SAMPLE_SETUP_FN,
SAMPLE_ON_ERROR_FN
);
router.start();
// Sample dynamic route list with handler functions.
// sampleAttrib will be added to the handler function as the second parameter.
const SAMPLE_ROUTES = [
{
topic: 'v1/sample',
allowRetained: true,
subscribe: true,
publish: false,
handler: (msg, attrib) => {
const data = JSON.parse(msg);
// console.log(attrib);
console.log('Sample subscription picked up the topic', data);
}
}
];
module.exports = SAMPLE_ROUTES;
Note: You can also configure a topic prefix by setting the environment variable MQTT_CHANNEL (for example, MQTT_CHANNEL=beta in a local .env file).
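As a rough illustration of what a topic prefix means, the sketch below prepends a channel value to a route topic. `withChannel` is a hypothetical helper, and both the `/` separator and the behaviour when no prefix is set are assumptions; check the library source for how the prefix is actually applied.

```javascript
// Hypothetical helper, not part of @pera-swarm/mqtt-router.
// Assumption: the prefix and the topic are joined with '/'.
function withChannel(topic, channel = process.env.MQTT_CHANNEL) {
    // Assumption: with no prefix configured, the topic is returned unchanged.
    return channel ? `${channel}/${topic}` : topic;
}

const prefixed = withChannel('sample', 'beta');
const unprefixed = withChannel('sample', '');
console.log(prefixed);   // beta/sample
console.log(unprefixed); // sample
```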
Install project dependencies.
npm install
Note: Before running the test cases, you should configure the environment variables MQTT_HOST, MQTT_USER, MQTT_PASS, and MQTT_CLIENT. Please refer to the sample.nodemon.json file for the nodemon environment variable configuration.
Manually run the test cases.
node test/index.js
Or you can use the nodemon script once the environment variables are configured correctly.
npm run client
A route definition for handling route subscription logic. The following properties are supported on the Route definition:
- topic: string
  The Route topic
- type: 'String' | 'JSON'
  Payload type (default: String)
- allowRetained: boolean
  Retain allowance
- subscribe: boolean
  Subscribe flag
- publish: boolean
  Publish flag
- handler: Function
  The default subscribe handler function, called when subscribe:true, packet.retain:true|false and allowRetained:true. Retained messages and new messages will be handled by default.
- fallbackRetainHandler?: Function
  Subscribe handler function for retained messages only, intended for route-specific custom logic. Called when subscribe:true, packet.retain:true and allowRetained:false.
Note: If the fallbackRetainHandler function is specified, it will be called for those retained messages. If it is not specified, those retained messages will be discarded.
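The retained-message rules for handler and fallbackRetainHandler can be summarised as a small decision function. This is a sketch written from the property descriptions, not the library's code; `pickHandler` is a hypothetical name, and it assumes that messages which are not retained always go to handler.

```javascript
// Hypothetical illustration of the retained-message rules;
// this is not the library's source code.
// Assumption: messages that are not retained always go to `handler`.
function pickHandler(route, packet) {
    if (!route.subscribe) {
        return null; // route is not subscribed, nothing to call
    }
    if (!packet.retain || route.allowRetained) {
        return route.handler; // new message, or retained message that is allowed
    }
    // Retained message on a route with allowRetained: false
    return route.fallbackRetainHandler || null; // null means "discard"
}

const strictRoute = {
    topic: 'v1/sample',
    subscribe: true,
    allowRetained: false,
    handler: () => 'handler',
    fallbackRetainHandler: () => 'fallback'
};
const openRoute = { ...strictRoute, allowRetained: true };
const noFallbackRoute = { ...strictRoute, fallbackRetainHandler: undefined };

const results = [
    pickHandler(openRoute, { retain: true })(),
    pickHandler(strictRoute, { retain: false })(),
    pickHandler(strictRoute, { retain: true })(),
    pickHandler(noFallbackRoute, { retain: true })
];
console.log(results); // [ 'handler', 'handler', 'fallback', null ]
```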
The main entrypoint of mqtt-router that defines the router logic.
You have to import the MQTTRouter class and instantiate it with an mqtt client.
Parameters supported in the constructor:
- mqttConnection {MqttClient}: mqtt connection
- routes {Route[]}: routes with mqtt topic, handler and allowRetained properties
- options {IClientSubscribeOptions}: mqtt message options
- setup {Function}: setup function that runs on successful connection
- onError {Function}: error handler function
The method for starting the mqtt router.
You must call this method once, after an MQTTRouter object has been instantiated.
Add a message to the publish queue so that it is scheduled for publishing. Parameters supported:
- topic {string}: message topic
- data {string|Buffer}: message data
Add a route to the subscriber routes list. Parameter supported:
- route {Route}: route object to be added to the subscriber list
Remove a route from the subscriber routes list. Parameter supported:
- topic {string}: route topic
Wrap an array of Route objects with a higher-order property (for example, the property can be this from the calling class) or a separate attribute, which is passed to the handler function as a second parameter in each route object.
Parameters supported:
- routes {Route[]}: routes array
- property {any}: property to be wrapped with the routes
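To make the idea of wrapping concrete, here is a minimal sketch of a wrapper-style function. `wrapRoutes` is a hypothetical stand-in written for this example; the real wrapper exported by the library may be implemented differently.

```javascript
// Hypothetical sketch of a wrapper-style function; the real `wrapper`
// exported by the library may be implemented differently.
function wrapRoutes(routes, property) {
    return routes.map((route) => ({
        ...route,
        // Forward the message and pass `property` as the second argument
        handler: (msg) => route.handler(msg, property)
    }));
}

const seen = [];
const sampleRoutes = [
    {
        topic: 'v1/sample',
        subscribe: true,
        handler: (msg, attrib) => seen.push({ msg, attrib })
    }
];

const sampleAttrib = { time: 1700000000000 };
const wrapped = wrapRoutes(sampleRoutes, sampleAttrib);
wrapped[0].handler('hello');
console.log(seen);
```

Each wrapped handler still receives the message first, so existing single-argument handlers keep working.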
A Queue implementation for the mqtt-router with a scheduler that acts as a "Publish Queue".
Parameters supported in the constructor:
- mqttClient {MqttClient}: mqtt connection
- options {IClientSubscribeOptions}: mqtt message options
- number {number}: interval
Begin the queue processing (scheduler).
Note: You must call this method once, after a Queue object has been instantiated.
Add a message to the publish queue. This message will be published by the scheduler. Parameters supported:
- topic {string}: message topic
- data {string|Buffer}: message data
Remove a message in the queue by a given topic. Parameter supported:
- topic {string}: message topic
Find a message with the given topic in the queue. Returns -1 if it is not found in the queue.
Parameter supported:
- topic {string}: message topic
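The queue operations described in this section can be sketched with a small in-memory class. Everything here is hypothetical: the class name, the method names, and the choice to return an array index from `find` are assumptions made for illustration, and the publish step is a plain callback rather than a real mqtt client.

```javascript
// Hypothetical publish queue; class and method names are chosen for this
// sketch and are not taken from the library.
class SketchQueue {
    constructor(publishFn, intervalMs) {
        this.publishFn = publishFn; // called as publishFn(topic, data)
        this.intervalMs = intervalMs;
        this.items = [];
        this.timer = null;
    }

    // Start the scheduler; each tick publishes everything that is queued.
    begin() {
        this.timer = setInterval(() => this.flush(), this.intervalMs);
    }

    // Stop the scheduler started by begin().
    stop() {
        clearInterval(this.timer);
        this.timer = null;
    }

    add(topic, data) {
        this.items.push({ topic, data });
    }

    // Remove the first queued message with the given topic, if any.
    remove(topic) {
        const index = this.find(topic);
        if (index !== -1) {
            this.items.splice(index, 1);
        }
    }

    // Index of the first queued message for `topic`, or -1 if there is none.
    find(topic) {
        return this.items.findIndex((item) => item.topic === topic);
    }

    // Publish and drop every queued message, oldest first.
    flush() {
        while (this.items.length > 0) {
            const { topic, data } = this.items.shift();
            this.publishFn(topic, data);
        }
    }
}

const published = [];
const queue = new SketchQueue((topic, data) => published.push([topic, data]), 1000);
queue.add('v1/a', 'first');
queue.add('v1/b', 'second');
const foundIndex = queue.find('v1/b');
queue.remove('v1/b');
const missingIndex = queue.find('v1/b');
queue.flush(); // run one scheduler tick by hand instead of waiting for the timer
console.log(foundIndex, missingIndex, published);
```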
Subscribe to a given topic with options. Parameters supported:
- mqtt {MqttClient}: mqtt connection
- topic {string}: message topic
- options {IClientSubscribeOptions}: mqtt message options
Publish a message to a given message topic with options and a callback function. Parameters supported:
- mqtt {MqttClient}: mqtt connection
- topic {string}: message topic
- message {string}: message data
- options {IClientSubscribeOptions}: mqtt message options
- callback {Function}: callback function
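The two helpers take their parameters in the order listed for them. The sketch below mirrors that shape with made-up names (`subscribeTo`, `publishTo`) and a fake client object, so it runs without a broker; it relies only on the `subscribe(topic, options)` and `publish(topic, message, options, callback)` call forms of an MQTT.js client.

```javascript
// Hypothetical helpers mirroring the parameter lists; the names
// `subscribeTo` and `publishTo` are made up for this sketch.
function subscribeTo(mqtt, topic, options) {
    mqtt.subscribe(topic, options);
}

function publishTo(mqtt, topic, message, options, callback) {
    mqtt.publish(topic, message, options, callback);
}

// Minimal stand-in for an MQTT.js client so the sketch runs without a broker.
const calls = [];
const fakeClient = {
    subscribe: (topic, options) => calls.push(['subscribe', topic, options]),
    publish: (topic, message, options, callback) => {
        calls.push(['publish', topic, message, options]);
        if (callback) callback(null); // report success to the caller
    }
};

let publishError = 'not called';
subscribeTo(fakeClient, 'v1/sample', { qos: 2 });
publishTo(fakeClient, 'v1/sample', 'hello', { qos: 2 }, (err) => {
    publishError = err;
});
console.log(calls, publishError);
```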
Generates a cron interval that is triggered every given number of seconds. Parameter supported:
- interval {number}: interval in seconds
Generates a cron interval that is triggered every given number of minutes. Parameter supported:
- interval {number}: interval in minutes
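For readers unfamiliar with cron intervals, the sketch below shows what such generated expressions could look like. The function names and the exact strings are assumptions, not the library's output: the seconds variant uses a six-field expression with a leading seconds field (as accepted by schedulers such as node-cron), and the minutes variant uses the standard five-field form.

```javascript
// Hypothetical helpers; the exact strings returned by the library are an assumption.
// Six-field expression (seconds field first): run every `interval` seconds.
function everySeconds(interval) {
    return `*/${interval} * * * * *`;
}

// Standard five-field expression: run every `interval` minutes.
function everyMinutes(interval) {
    return `*/${interval} * * * *`;
}

const secondsExpr = everySeconds(10);
const minutesExpr = everyMinutes(5);
console.log(secondsExpr); // */10 * * * * *
console.log(minutesExpr); // */5 * * * *
```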
- Fix duplicate topic support for routing.
This project is licensed under the LGPL-2.1 Licence.