From b69e9b6a46f28e06f8191cc40faf1e836831cf2c Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 21 Jan 2021 03:10:28 +0100 Subject: [PATCH] Add possibility to acquire bulk readings in compact JSON format The "compact JSON" format is currently defined with timestamps as keys. Example: { "1611082554": { "temperature": 21.42, "humidity": 41.55 }, "1611082568": { "temperature": 42.84, "humidity": 83.1 } } --- CHANGES.rst | 2 ++ kotori/daq/decoder/__init__.py | 7 ++++++ kotori/daq/decoder/json.py | 44 ++++++++++++++++++++++++++++++++++ test/settings/mqttkit.py | 1 + test/test_daq_mqtt.py | 34 ++++++++++++++++++++++++++ 5 files changed, 88 insertions(+) create mode 100644 kotori/daq/decoder/json.py diff --git a/CHANGES.rst b/CHANGES.rst index d41615d3..93d35f7c 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -10,6 +10,8 @@ in progress - CI: Add testing against Python 3.9 - CI: Run tests against different versions of Mosquitto, InfluxDB and Grafana - Add possibility to acquire bulk readings in JSON format +- Add possibility to acquire bulk readings in compact JSON format, + with timestamps as keys .. _kotori-0.26.8: diff --git a/kotori/daq/decoder/__init__.py b/kotori/daq/decoder/__init__.py index 35b0e3fe..f9a8a874 100644 --- a/kotori/daq/decoder/__init__.py +++ b/kotori/daq/decoder/__init__.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # (c) 2019-2021 Andreas Motl from kotori.daq.decoder.airrohr import AirrohrDecoder +from kotori.daq.decoder.json import CompactTimestampedJsonDecoder from kotori.daq.decoder.tasmota import TasmotaSensorDecoder, TasmotaStateDecoder from kotori.daq.decoder.schema import MessageType @@ -23,6 +24,12 @@ def probe(self): if 'slot' not in self.topology: return False + # Compact JSON format, with timestamps as keys + if self.topology.slot.endswith('tc.json'): + self.info.message_type = MessageType.DATA_CONTAINER + self.info.decoder = CompactTimestampedJsonDecoder + return True + # Airrohr if self.topology.slot.endswith('airrohr.json'): self.info.message_type = MessageType.DATA_CONTAINER diff --git a/kotori/daq/decoder/json.py b/kotori/daq/decoder/json.py new file mode 100644 index 00000000..5e66de69 --- /dev/null +++ b/kotori/daq/decoder/json.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# (c) 2021 Andreas Motl +import json + + +class CompactTimestampedJsonDecoder: + """ + Decode JSON payloads in compact format, with timestamps as keys. + + Documentation + ============= + - https://getkotori.org/docs/handbook/decoders/json.html (not yet) + - https://github.com/daq-tools/kotori/issues/39 + + Example + ======= + :: + + { + "1611082554": { + "temperature": 21.42, + "humidity": 41.55 + }, + "1611082568": { + "temperature": 42.84, + "humidity": 83.1 + } + } + + """ + + @staticmethod + def decode(payload): + + # Decode from JSON. + message = json.loads(payload) + + # Create list of data dictionaries. + data = [] + for timestamp, item in message.items(): + item["time"] = timestamp + data.append(item) + + return data diff --git a/test/settings/mqttkit.py b/test/settings/mqttkit.py index f99a921d..14caf6b9 100644 --- a/test/settings/mqttkit.py +++ b/test/settings/mqttkit.py @@ -26,6 +26,7 @@ class TestSettings: mqtt2_topic_json = 'mqttkit-1/itest/foo/bar2/data.json' mqtt_topic_event = 'mqttkit-1/itest/foo/bar/event.json' mqtt_topic_homie = 'mqttkit-1/itest/foo/bar/data/__json__' + mqtt_topic_json_compact = 'mqttkit-1/itest/foo/bar/tc.json' mqtt_topic_json_legacy = 'mqttkit-1/itest/foo/bar/message-json' # HTTP channel settings. diff --git a/test/test_daq_mqtt.py b/test/test_daq_mqtt.py index 3ca23683..beff2b9b 100644 --- a/test/test_daq_mqtt.py +++ b/test/test_daq_mqtt.py @@ -71,6 +71,40 @@ def test_mqtt_to_influxdb_json_bulk(machinery, create_influxdb, reset_influxdb): assert record == {u'temperature': 42.84, u'humidity': 83.1} +@pytest_twisted.inlineCallbacks +@pytest.mark.mqtt +def test_mqtt_to_influxdb_json_compact_bulk(machinery, create_influxdb, reset_influxdb): + """ + Publish multiple readings in compact JSON format to MQTT broker + and proof they are stored in the InfluxDB database. + + https://github.com/daq-tools/kotori/issues/39 + """ + + # Submit multiple measurements, with timestamp. + data = { + "1611082554": { + "temperature": 21.42, + "humidity": 41.55, + }, + "1611082568": { + "temperature": 42.84, + "humidity": 83.1, + }, + } + yield threads.deferToThread(mqtt_json_sensor, settings.mqtt_topic_json_compact, data) + + # Wait for some time to process the message. + yield sleep(PROCESS_DELAY) + + # Proof that data arrived in InfluxDB. + record = influx_sensors.get_record(index=0) + assert record == {u'time': '2021-01-19T18:55:54Z', u'temperature': 21.42, u'humidity': 41.55} + + record = influx_sensors.get_record(index=1) + assert record == {u'time': '2021-01-19T18:56:08Z', u'temperature': 42.84, u'humidity': 83.1} + + @pytest_twisted.inlineCallbacks @pytest.mark.mqtt @pytest.mark.legacy