From 16556ff41efc05cab961b0db11ca7ae74a8aa6c4 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 21 Jan 2021 02:20:49 +0100 Subject: [PATCH] Add possibility to acquire bulk readings in JSON format --- CHANGES.rst | 5 +++ kotori/daq/graphing/grafana/dashboard.py | 13 +++++- kotori/daq/storage/influx.py | 9 ++++ test/test_daq_grafana.py | 52 ++++++++++++++++++++++++ test/test_daq_mqtt.py | 34 ++++++++++++++++ 5 files changed, 112 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index 3a096fa3..17725d5f 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -3,6 +3,11 @@ Changelog ********* +incomplete +========== +- Add possibility to acquire bulk readings in JSON format + + in progress =========== - Add basic information about RabbitMQ diff --git a/kotori/daq/graphing/grafana/dashboard.py b/kotori/daq/graphing/grafana/dashboard.py index 7fa92ab3..6173f30d 100644 --- a/kotori/daq/graphing/grafana/dashboard.py +++ b/kotori/daq/graphing/grafana/dashboard.py @@ -275,8 +275,19 @@ def collect_fields(data, prefixes=None, sorted=True): # time is from intercom.mqtt blacklist = ['_hex_', 'time'] + # Compute list of unique attribute names. + if isinstance(data, dict): + keys = data.keys() + elif isinstance(data, list): + keys = set() + for item in data: + for key in item.keys(): + keys.add(key) + else: + raise ValueError(f"Type of data {type(data)} not accepted") + fields = [] - for field in data.keys(): + for field in keys: if field in blacklist: continue diff --git a/kotori/daq/storage/influx.py b/kotori/daq/storage/influx.py index d48f9f6a..7a0ebcbb 100644 --- a/kotori/daq/storage/influx.py +++ b/kotori/daq/storage/influx.py @@ -64,6 +64,15 @@ def is_udp_database(self, name): return False def write(self, meta, data): + if isinstance(data, dict): + self.write_single(meta, data) + elif isinstance(data, list): + for item in data: + self.write_single(meta, item) + else: + raise ValueError(f"Type of data {type(data)} not accepted") + + def write_single(self, meta, data): meta_copy = deepcopy(dict(meta)) data_copy = deepcopy(data) diff --git a/test/test_daq_grafana.py b/test/test_daq_grafana.py index 7f5418bb..b6c13f93 100644 --- a/test/test_daq_grafana.py +++ b/test/test_daq_grafana.py @@ -171,3 +171,55 @@ def test_mqtt_to_grafana_two_dashboards(machinery, create_influxdb, reset_influx titles = grafana.get_dashboard_titles() assert settings.grafana_dashboards[0] in titles assert settings.grafana_dashboards[1] in titles + + +@pytest_twisted.inlineCallbacks +@pytest.mark.grafana +def test_mqtt_to_grafana_bulk(machinery, create_influxdb, reset_influxdb, reset_grafana): + """ + Publish multiple readings in JSON format to MQTT broker and proof + that a corresponding datasource and a dashboard was created in Grafana. + """ + + # Submit multiple measurements, without timestamp. + data = [ + { + 'temperature': 21.42, + 'humidity': 41.55, + }, + { + 'temperature': 42.84, + 'humidity': 83.1, + 'voltage': 4.2, + }, + { + 'weight': 10.10, + }, + ] + yield mqtt_json_sensor(settings.mqtt_topic_json, data) + + # Wait for some time to process the message. + yield sleep(PROCESS_DELAY_MQTT) + yield sleep(PROCESS_DELAY_MQTT) + yield sleep(PROCESS_DELAY_MQTT) + + # Proof that Grafana is well provisioned. + logger.info('Grafana: Checking datasource') + datasource_names = [] + for datasource in grafana.client.datasources.get(): + datasource_names.append(datasource['name']) + assert settings.influx_database in datasource_names + + logger.info('Grafana: Checking dashboard') + dashboard_name = settings.grafana_dashboards[0] + dashboard = grafana.get_dashboard_by_name(dashboard_name) + targets = dashboard['rows'][0]['panels'][0]['targets'] + + # Validate table name. + assert targets[0]['measurement'] == settings.influx_measurement_sensors + + # Validate field names. + fields = set() + for target in targets: + fields.add(target["fields"][0]["name"]) + assert fields == set(["temperature", "humidity", "weight", "voltage"]) diff --git a/test/test_daq_mqtt.py b/test/test_daq_mqtt.py index a92b970d..3ca23683 100644 --- a/test/test_daq_mqtt.py +++ b/test/test_daq_mqtt.py @@ -37,6 +37,40 @@ def test_mqtt_to_influxdb_json_single(machinery, create_influxdb, reset_influxdb yield record +@pytest_twisted.inlineCallbacks +@pytest.mark.mqtt +def test_mqtt_to_influxdb_json_bulk(machinery, create_influxdb, reset_influxdb): + """ + Publish multiple readings in JSON format to MQTT broker + and proof it is stored in the InfluxDB database. + """ + + # Submit multiple measurements, without timestamp. + data = [ + { + 'temperature': 21.42, + 'humidity': 41.55, + }, + { + 'temperature': 42.84, + 'humidity': 83.1, + }, + ] + yield threads.deferToThread(mqtt_json_sensor, settings.mqtt_topic_json, data) + + # Wait for some time to process the message. + yield sleep(PROCESS_DELAY_MQTT) + + # Proof that data arrived in InfluxDB. + record = influx_sensors.get_record(index=0) + del record['time'] + assert record == {u'temperature': 21.42, u'humidity': 41.55} + + record = influx_sensors.get_record(index=1) + del record['time'] + assert record == {u'temperature': 42.84, u'humidity': 83.1} + + @pytest_twisted.inlineCallbacks @pytest.mark.mqtt @pytest.mark.legacy