-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.js
147 lines (120 loc) · 4.04 KB
/
main.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
const DotEnv = require('dotenv')
const MQTTPattern = require('mqtt-pattern')
const env = require('./src/env')
const querystring = require('querystring')
// Load config
DotEnv.config()
// Grab configured MQTT, Openfaas, and Express instances
const openfaas = require('./src/openfaas')
const mqtt = require('./src/mqtt')
const express = require('./src/express')
// Setup the topics (openfaas/invoke/function-name and openfaas/result/function-name)
const mqttInvokeTopic = env('MQTT_TOPIC', 'openfaas') + '/invoke/+'
const mqttInvokeTopicFormat = env('MQTT_TOPIC', 'openfaas') + '/invoke/+functionName'
const mqttResultTopic = env('MQTT_TOPIC', 'openfaas') + '/result'
// Wait for a connection
mqtt.on('connect', function () {
// Subscribe
mqtt.subscribe(mqttInvokeTopic)
console.log('Connected to MQTT listening to topic ' + mqttInvokeTopic)
})
// Accept messages
mqtt.on('message', async function (topic, message) {
// Parse the JSON
try {
var data = JSON.parse(message.toString())
} catch (err) {
console.error('Incoming MQTT messages must be valid JSON')
return false
}
// Parse the topic
const topicParams = MQTTPattern.exec(mqttInvokeTopicFormat, topic)
// Get the function name
const functionName = topicParams.functionName
// Process the input and set a default content type
var input
let contentType
if (typeof (data.input) === 'string') {
input = data.input
contentType = 'text/plain'
} else {
input = JSON.stringify(data.input)
contentType = 'application/json'
}
// Setup defaults and allow overriding
const headers = Object.assign({
'Content-Type': contentType
}, data.headers)
// Default callback params with access token and function name
var params = Object.assign({
access_token: env('WEBHOOK_AUTH'),
function_name: functionName
}, data.callbackParams)
// Build a callback URL
const callbackUrl = env('WEBHOOK_URL') + ':' +
env('WEBHOOK_PORT') +
'?' + querystring.stringify(params)
console.log('- Invoking ' + functionName + ' on OpenFaas')
// Invoke the function on OpenFaas
openfaas.invoke(functionName, input, {
callbackUrl,
headers
})
})
// Handle errors or reconnects
mqtt.on('error', function () {
console.error('Could not connect to MQTT at: ' + env('MQTT_URL', 'n/a'))
})
mqtt.on('reconnect', function () {
console.error('Reconnect to MQTT ' + env('MQTT_URL', 'n/a') + ' trying again')
})
// Setup the webhook server
express.post('/', async function (req, res) {
// Authorize using a token
if (req.token !== env('WEBHOOK_AUTH')) {
return res.sendStatus(401)
}
var payload
// Figure out how it was parsed
if (req.body.constructor.name === 'String') {
// Plain text
payload = req.body
} else if (req.body.constructor.name === 'Object') {
// JSON
payload = req.body
} else if (req.body.constructor.name === 'Buffer') {
// Raw binary
// We need to be able to send back params data as well, and since
// JSON doesn't support binary data, we need to base64 encode them
payload = req.body.toString('base64')
} else {
// We don't understand the body
console.error('Unable to parse the response body')
return res.sendStatus(400)
}
// Remove the access_token
delete req.query.access_token
// Build the actual message
const message = JSON.stringify({
params: req.query,
output: payload
})
// Require a function name
if (typeof req.query.function_name === 'undefined') {
return res.status(400).send('Bad request - function_name parameter is required')
}
// Find the function name to build the topic
const functionName = req.query.function_name
// Publish the body Buffer on MQTT
try {
console.log('- Publishing result of', functionName)
await mqtt.publish(mqttResultTopic + '/' + functionName, message)
} catch (err) {
console.err(err)
}
return res.sendStatus(200)
})
// Start listening
express.listen(env('WEBHOOK_PORT'), env('WEBHOOK_BIND'), null, function () {
console.log('Webhook server is listening on ' + env('WEBHOOK_BIND') + ':' + env('WEBHOOK_PORT'))
})