Skip to content

Commit

Permalink
Add possibility to acquire bulk readings in compact JSON format
Browse files Browse the repository at this point in the history
The "compact JSON" format is currently defined with timestamps as keys.

Example:
{
  "1611082554": {
    "temperature": 21.42,
    "humidity": 41.55
  },
  "1611082568": {
    "temperature": 42.84,
    "humidity": 83.1
  }
}
  • Loading branch information
amotl committed Jul 28, 2021
1 parent a6b252b commit b69e9b6
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ in progress
- CI: Add testing against Python 3.9
- CI: Run tests against different versions of Mosquitto, InfluxDB and Grafana
- Add possibility to acquire bulk readings in JSON format
- Add possibility to acquire bulk readings in compact JSON format,
with timestamps as keys


.. _kotori-0.26.8:
Expand Down
7 changes: 7 additions & 0 deletions kotori/daq/decoder/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# (c) 2019-2021 Andreas Motl <[email protected]>
from kotori.daq.decoder.airrohr import AirrohrDecoder
from kotori.daq.decoder.json import CompactTimestampedJsonDecoder
from kotori.daq.decoder.tasmota import TasmotaSensorDecoder, TasmotaStateDecoder
from kotori.daq.decoder.schema import MessageType

Expand All @@ -23,6 +24,12 @@ def probe(self):
if 'slot' not in self.topology:
return False

# Compact JSON format, with timestamps as keys
if self.topology.slot.endswith('tc.json'):
self.info.message_type = MessageType.DATA_CONTAINER
self.info.decoder = CompactTimestampedJsonDecoder
return True

# Airrohr
if self.topology.slot.endswith('airrohr.json'):
self.info.message_type = MessageType.DATA_CONTAINER
Expand Down
44 changes: 44 additions & 0 deletions kotori/daq/decoder/json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# (c) 2021 Andreas Motl <[email protected]>
import json


class CompactTimestampedJsonDecoder:
"""
Decode JSON payloads in compact format, with timestamps as keys.
Documentation
=============
- https://getkotori.org/docs/handbook/decoders/json.html (not yet)
- https://github.com/daq-tools/kotori/issues/39
Example
=======
::
{
"1611082554": {
"temperature": 21.42,
"humidity": 41.55
},
"1611082568": {
"temperature": 42.84,
"humidity": 83.1
}
}
"""

@staticmethod
def decode(payload):

# Decode from JSON.
message = json.loads(payload)

# Create list of data dictionaries.
data = []
for timestamp, item in message.items():
item["time"] = timestamp
data.append(item)

return data
1 change: 1 addition & 0 deletions test/settings/mqttkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class TestSettings:
mqtt2_topic_json = 'mqttkit-1/itest/foo/bar2/data.json'
mqtt_topic_event = 'mqttkit-1/itest/foo/bar/event.json'
mqtt_topic_homie = 'mqttkit-1/itest/foo/bar/data/__json__'
mqtt_topic_json_compact = 'mqttkit-1/itest/foo/bar/tc.json'
mqtt_topic_json_legacy = 'mqttkit-1/itest/foo/bar/message-json'

# HTTP channel settings.
Expand Down
34 changes: 34 additions & 0 deletions test/test_daq_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,40 @@ def test_mqtt_to_influxdb_json_bulk(machinery, create_influxdb, reset_influxdb):
assert record == {u'temperature': 42.84, u'humidity': 83.1}


@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
def test_mqtt_to_influxdb_json_compact_bulk(machinery, create_influxdb, reset_influxdb):
"""
Publish multiple readings in compact JSON format to MQTT broker
and proof they are stored in the InfluxDB database.
https://github.com/daq-tools/kotori/issues/39
"""

# Submit multiple measurements, with timestamp.
data = {
"1611082554": {
"temperature": 21.42,
"humidity": 41.55,
},
"1611082568": {
"temperature": 42.84,
"humidity": 83.1,
},
}
yield threads.deferToThread(mqtt_json_sensor, settings.mqtt_topic_json_compact, data)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY)

# Proof that data arrived in InfluxDB.
record = influx_sensors.get_record(index=0)
assert record == {u'time': '2021-01-19T18:55:54Z', u'temperature': 21.42, u'humidity': 41.55}

record = influx_sensors.get_record(index=1)
assert record == {u'time': '2021-01-19T18:56:08Z', u'temperature': 42.84, u'humidity': 83.1}


@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
@pytest.mark.legacy
Expand Down

0 comments on commit b69e9b6

Please sign in to comment.