# forked from RWTH-EBC/FiLiP
# -
# Notifications
# You must be signed in to change notification settings - Fork 0
# /
# Copy path: e09_ngsi_v2_iota_filip_mqtt.py
# 313 lines (255 loc) · 10 KB
# /
# e09_ngsi_v2_iota_filip_mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
"""
# This example shows in more detail how to interact with a device over MQTT
# using FiLiP's IoTA-MQTT Client. This client comes along with a convenient
# API for handling MQTT communication with FIWARE's IoT-Agent
"""
# ## Import packages
import logging
import random
import time
import paho.mqtt.client as mqtt
from filip.config import settings
from urllib.parse import urlparse
from filip.clients.mqtt import IoTAMQTTClient
from filip.models import FiwareHeader
from filip.models.ngsi_v2.iot import \
Device, \
DeviceAttribute, \
DeviceCommand, \
ServiceGroup, \
PayloadProtocol
from filip.models.ngsi_v2.context import NamedCommand
# ## Parameters
#
# Running this example requires a working FIWARE v2 setup consisting of a
# context broker, an IoT-Agent and an MQTT broker. The addresses are read
# from FiLiP's `settings` object and can be overridden here.
#
# Context Broker host address
CB_URL = settings.CB_URL
# IoT-Agent host address
IOTA_URL = settings.IOTA_URL
# MQTT broker host address, converted to a plain string
MQTT_BROKER_URL = str(settings.MQTT_BROKER_URL)

# The FIWARE service used by the example may be changed here as well.
# FIWARE-Service
SERVICE = 'filip'
# FIWARE-Service path
SERVICE_PATH = '/example'

# The ApiKey information may also be changed.
# ApiKey of the ServiceGroup
SERVICE_GROUP_APIKEY = 'filip-example-service-group'

# Logging setup: timestamp, logger name and level precede every message.
logging.basicConfig(level='INFO',
                    datefmt='%d-%m-%Y %H:%M:%S',
                    format='%(asctime)s %(name)s %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)
if __name__ == '__main__':
    # The complete example runs inside the script guard. Every statement and
    # every nested callback below is part of the guard's body.

    # # 1 Setup
    #
    # ## 1.1 FiwareHeader
    #
    # Since we want to use the multi-tenancy concept of FIWARE, we always
    # start by creating a FIWARE header.
    fiware_header = FiwareHeader(service=SERVICE,
                                 service_path=SERVICE_PATH)

    # ## 1.2 Device configuration
    #
    # One service group per resource. Both use the service path without its
    # slashes as API key.
    # NOTE(review): SERVICE_GROUP_APIKEY is defined at module level but is
    # not used here - confirm which API key is intended.
    service_group_json = ServiceGroup(
        apikey=SERVICE_PATH.strip('/'),
        resource="/iot/json")
    service_group_ul = ServiceGroup(
        apikey=SERVICE_PATH.strip('/'),
        resource="/iot/d")

    # A temperature attribute and a heater command, shared by both devices.
    device_attr = DeviceAttribute(name='temperature',
                                  object_id='t',
                                  type="Number")
    device_command = DeviceCommand(name='heater', type="Boolean")

    device_json = Device(device_id='my_json_device',
                         entity_name='my_json_device',
                         entity_type='Thing',
                         protocol='IoTA-JSON',
                         transport='MQTT',
                         apikey=service_group_json.apikey,
                         attributes=[device_attr],
                         commands=[device_command])

    device_ul = Device(device_id='my_ul_device',
                       entity_name='my_ul_device',
                       entity_type='Thing',
                       protocol='PDI-IoTA-UltraLight',
                       transport='MQTT',
                       apikey=service_group_ul.apikey,
                       attributes=[device_attr],
                       commands=[device_command])

    # ## 1.3 IoTAMQTTClient
    #
    mqttc = IoTAMQTTClient()

    def on_connect(mqttc, obj, flags, rc, properties=None):
        """Log the reason code passed to the connect callback."""
        mqttc.logger.info(f"on_connect callback function: Reason code: {rc}")

    def on_connect_fail(mqttc, obj):
        """Log that a connection attempt failed."""
        mqttc.logger.info("Connect failed")

    def on_publish(mqttc, obj, mid, rc, properties=None):
        """Log the message identifier passed to the publish callback."""
        mqttc.logger.info(
            f"on_publish callback function: Message identifier: {mid}")

    def on_subscribe(mqttc, obj, mid, granted_qos, properties=None):
        """Log the first entry of `granted_qos` and the message identifier."""
        mqttc.logger.info(
            f"Subscribed: {granted_qos[0]}, message identifier: {mid}")

    def on_log(mqttc, obj, level, string):
        """Forward the client's log messages to its logger at INFO level."""
        mqttc.logger.info(string)

    mqttc.on_connect = on_connect
    mqttc.on_connect_fail = on_connect_fail
    mqttc.on_publish = on_publish
    mqttc.on_subscribe = on_subscribe
    mqttc.on_log = on_log

    # # 2 Normal client behaviour
    #
    # This section demonstrates normal client behaviour.
    # For additional examples on how to use the client please check:
    # https://github.com/eclipse/paho.mqtt.python/tree/master/examples
    #
    first_topic = f"/filip/{SERVICE_PATH.strip('/')}/first"
    second_topic = f"/filip/{SERVICE_PATH.strip('/')}/second"
    first_payload = "filip_test_1"
    second_payload = "filip_test_2"

    def on_message_first(mqttc, obj, msg, properties=None):
        """Placeholder callback for messages on the first topic."""
        pass
        # do something

    def on_message_second(mqttc, obj, msg, properties=None):
        """Placeholder callback for messages on the second topic."""
        pass
        # do something

    mqttc.message_callback_add(sub=first_topic,
                               callback=on_message_first)
    mqttc.message_callback_add(sub=second_topic,
                               callback=on_message_second)

    mqtt_broker_url = urlparse(MQTT_BROKER_URL)
    # `urlparse(...).port` is None when the URL carries no explicit port;
    # fall back to the standard unencrypted MQTT port in that case.
    mqtt_broker_port = mqtt_broker_url.port or 1883

    mqttc.connect(host=mqtt_broker_url.hostname,
                  port=mqtt_broker_port,
                  keepalive=60,
                  bind_address="",
                  bind_port=0,
                  clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY,
                  properties=None)
    mqttc.subscribe(topic=first_topic)

    # create a non-blocking loop
    mqttc.loop_start()
    # publish the payloads defined above (they were previously unused)
    mqttc.publish(topic=first_topic, payload=first_payload)

    # add additional subscription to the connection
    mqttc.subscribe(topic=second_topic)
    mqttc.publish(topic=second_topic, payload=second_payload)

    # remove subscriptions and callbacks
    mqttc.message_callback_remove(first_topic)
    mqttc.message_callback_remove(second_topic)
    mqttc.unsubscribe(first_topic)
    mqttc.unsubscribe(second_topic)

    # stop the network loop and disconnect cleanly
    # close the mqtt listening thread
    mqttc.loop_stop()
    # disconnect the mqtt device
    mqttc.disconnect()

    # # 3 Devices provisioning
    #
    # ## 3.1 Service groups
    #
    # ### 3.1.1 Create service groups
    #
    mqttc.add_service_group(service_group=service_group_json)

    # ### 3.1.2 Interact with service groups
    #
    mqttc.get_service_group(service_group_json.apikey)
    mqttc.update_service_group(service_group=service_group_json)

    # ### 3.1.3 Delete service groups
    #
    mqttc.delete_service_group(apikey=service_group_json.apikey)

    # ## 3.2 Devices
    #
    # ### 3.2.1 Create device
    #
    mqttc.add_device(device=device_json)

    # ### 3.2.2 Interact with device
    #
    mqttc.get_device(device_json.device_id)
    mqttc.update_device(device=device_json)

    # ### 3.2.3 Delete device
    #
    mqttc.delete_device(device_id=device_json.device_id)

    # # 4 Commands
    #
    # This example is written for the JSON MQTT client, but it can easily be
    # adapted for Ultralight by replacing all JSON/json variable name parts
    # with the corresponding UL/ul parts.
    #
    # ## 4.1 Setup MQTT client
    #
    # Small clean up. Iterate over snapshots so that deleting entries cannot
    # disturb the iteration, whatever container the client exposes.
    for group in list(mqttc.service_groups):
        mqttc.delete_service_group(group.apikey)

    for device in list(mqttc.devices):
        mqttc.delete_device(device.device_id)

    def on_command(client, obj, msg):
        """Decode an incoming JSON command message and acknowledge it.

        The message is decoded with the client's IoTA-JSON encoder and the
        decoded payload is published back for the same device.
        """
        apikey, device_id, payload = \
            client.get_encoder(PayloadProtocol.IOTA_JSON).decode_message(
                msg=msg)

        # Acknowledge a command. Commands are usually single messages here,
        # so the first key of the payload equals the command's name.
        client.publish(device_id=device_id,
                       command_name=next(iter(payload)),
                       payload=payload)

    mqttc.add_service_group(service_group_json)
    mqttc.add_device(device_json)
    mqttc.add_command_callback(device_id=device_json.device_id,
                               callback=on_command)

    # The HTTP client is only needed from this point on, so it is imported
    # here, next to its first use.
    from filip.clients.ngsi_v2 import HttpClient, HttpClientConfig
    httpc_config = HttpClientConfig(cb_url=CB_URL,
                                    iota_url=IOTA_URL)
    httpc = HttpClient(fiware_header=fiware_header,
                       config=httpc_config)
    httpc.iota.post_group(service_group=service_group_json, update=True)
    httpc.iota.post_device(device=device_json, update=True)

    # Reconnect using the broker address parsed in section 2.
    mqttc.connect(host=mqtt_broker_url.hostname,
                  port=mqtt_broker_port,
                  keepalive=60,
                  bind_address="",
                  bind_port=0,
                  clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY,
                  properties=None)
    # No topic is given here - presumably the client then subscribes to the
    # topics of its registered devices; see IoTAMQTTClient.subscribe.
    mqttc.subscribe()
    mqttc.loop_start()

    # ## 4.2 Command
    #
    entity = httpc.cb.get_entity(entity_id=device_json.device_id,
                                 entity_type=device_json.entity_type)
    context_command = NamedCommand(name=device_json.commands[0].name,
                                   value=False)
    httpc.cb.post_command(entity_id=entity.id,
                          entity_type=entity.type,
                          command=context_command)

    # Wait briefly before re-reading the entity (presumably so that the
    # acknowledgement sent by `on_command` can be processed).
    time.sleep(2)
    entity = httpc.cb.get_entity(entity_id=device_json.device_id,
                                 entity_type=device_json.entity_type)

    # The entity.heater_status.value should now have the status 'OK'
    print(f"Heater status value: {entity.heater_status.value}")

    # ## 4.3 Publish
    #
    # First publish: the payload is keyed by the attribute's object id.
    payload = random.randint(0, 30)
    mqttc.publish(device_id=device_json.device_id,
                  payload={device_json.attributes[0].object_id: payload})

    time.sleep(1)
    entity = httpc.cb.get_entity(entity_id=device_json.device_id,
                                 entity_type=device_json.entity_type)

    # Temperature value read after the first publish; the label "before"
    # refers to the second publish below.
    print(f"Entity temperature value before publishing: {entity.temperature.value}")

    # Second publish: the attribute is addressed by its name instead.
    payload = random.randint(0, 30)
    mqttc.publish(device_id=device_json.device_id,
                  attribute_name="temperature",
                  payload=payload)

    time.sleep(1)
    entity = httpc.cb.get_entity(entity_id=device_json.device_id,
                                 entity_type=device_json.entity_type)

    # Changed temperature value
    print(f"Entity temperature value after publishing: {entity.temperature.value}")

    # ## 4.4 Close Client
    # stop network loop and disconnect cleanly
    # close the mqtt listening thread
    mqttc.loop_stop()
    # disconnect the mqtt device
    mqttc.disconnect()