From 6e22db986b0172e402129a25fa59b40ec3f17992 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 15 Aug 2024 17:05:28 +0200 Subject: [PATCH 01/26] maxPayloadSizeBytes 1000000 --- thingsboard_gateway/gateway/tb_gateway_service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 96881c001..8322ff1d0 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -1140,7 +1140,7 @@ def __send_data_pack_to_storage(self, data, connector_name, connector_id=None): def check_size(self, devices_data_in_event_pack): if (self.__get_data_size(devices_data_in_event_pack) - >= self.__config["thingsboard"].get("maxPayloadSizeBytes", 400)): + >= 1000000): #self.__config["thingsboard"].get("maxPayloadSizeBytes", 400)): self.__send_data(devices_data_in_event_pack) for device in devices_data_in_event_pack: devices_data_in_event_pack[device]["telemetry"] = [] @@ -1690,7 +1690,7 @@ def ping(self): return self.name def get_max_payload_size_bytes(self): - return self.__config["thingsboard"].get("maxPayloadSizeBytes", 400) + return 1000000 #self.__config["thingsboard"].get("maxPayloadSizeBytes", 400) # ---------------------------- # Storage -------------------- From 316764dbac44a756c78fc877547337f4892e21eb Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 15 Aug 2024 17:39:32 +0200 Subject: [PATCH 02/26] __min_pack_size_to_send workout, logs added to debug the mqtt client --- thingsboard_gateway/gateway/tb_gateway_service.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 8322ff1d0..4df5dbdd0 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -224,7 +224,7 @@ def __init__(self, config_file=None): self.__min_pack_send_delay_ms = 10#self.__config['thingsboard'].get('minPackSendDelayMS', 200) self.__min_pack_send_delay_ms = self.__min_pack_send_delay_ms / 1000.0 - self.__min_pack_size_to_send = 1000 #self.__config['thingsboard'].get('minPackSizeToSend', 50) + self.__min_pack_size_to_send = 10 #self.__config['thingsboard'].get('minPackSizeToSend', 50) self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True, name="Send data to Thingsboard Thread") @@ -1148,8 +1148,8 @@ def check_size(self, devices_data_in_event_pack): def __read_data_from_storage(self): devices_data_in_event_pack = {} - log.debug("Send data Thread has been started successfully.") - log.debug("Maximal size of the client message queue is: %r", + log.info("Send data Thread has been started successfully.") + log.info("Maximal size of the client message queue is: %r", self.tb_client.client._client._max_queued_messages) # noqa pylint: disable=protected-access while not self.stopped: @@ -1190,8 +1190,10 @@ def __read_data_from_storage(self): if devices_data_in_event_pack: if not self.tb_client.is_connected(): continue + log.debug("Await self.__rpc_reply_sent...") while self.__rpc_reply_sent: sleep(.01) + log.debug("__send_data...") self.__send_data(devices_data_in_event_pack) # noqa if self.tb_client.is_connected() and ( @@ -1253,17 +1255,21 @@ def __send_data(self, devices_data_in_event_pack): if devices_data_in_event_pack[device].get("attributes"): if device == self.name or device == "currentThingsBoardGateway": + log.debug("tb_client.client.send_attributes") self._published_events.put( self.tb_client.client.send_attributes(devices_data_in_event_pack[device]["attributes"])) else: + log.debug("tb_client.client.gw_send_attributes") self._published_events.put(self.tb_client.client.gw_send_attributes(final_device_name, devices_data_in_event_pack[ device]["attributes"])) if devices_data_in_event_pack[device].get("telemetry"): if device == self.name or device == "currentThingsBoardGateway": + log.debug("tb_client.client.send_telemetry") self._published_events.put( self.tb_client.client.send_telemetry(devices_data_in_event_pack[device]["telemetry"])) else: + log.debug("tb_client.client.gw_send_telemetry") self._published_events.put(self.tb_client.client.gw_send_telemetry(final_device_name, devices_data_in_event_pack[ device]["telemetry"])) From e7900ae29d2fa8e3c170cec4374a8953e6be4e02 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 15 Aug 2024 17:46:37 +0200 Subject: [PATCH 03/26] log = TbLogger('service', gateway=self, level='DEBUG') --- thingsboard_gateway/gateway/tb_gateway_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 4df5dbdd0..fe997e11e 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -154,7 +154,7 @@ def __init__(self, config_file=None): logging_error = e global log - log = TbLogger('service', gateway=self, level='INFO') + log = TbLogger('service', gateway=self, level='DEBUG') global main_handler self.main_handler = main_handler log.addHandler(self.main_handler) From 4773e9b63a1d9d2f6bbdc5adbb8f601176e44a3f Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 16 Aug 2024 14:04:11 +0200 Subject: [PATCH 04/26] asyncua requirements --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 5aceb640b..45111ba6f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,4 @@ cachetools tb-mqtt-client>=1.9.9 service-identity psutil +asyncua From 2900eb89047a65dac7b6b97bd6cecbd32477615e Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 16 Aug 2024 14:19:15 +0200 Subject: [PATCH 05/26] docker python12 and cleanup --- build_docker.sh | 7 ++++--- docker/Dockerfile | 12 ++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/build_docker.sh b/build_docker.sh index 2e1cadd70..57b3d7c5b 100755 --- a/build_docker.sh +++ b/build_docker.sh @@ -25,11 +25,12 @@ VERSION_TAG="${BRANCH_NAME}-${COMMIT_ID}" echo "$(date) Building project with version tag $VERSION_TAG ..." set -x -#docker build . -t sevlamat/tb-gateway:$VERSION_TAG -f docker/Dockerfile -o type=registry +BUILDKIT_PROGRESS=plain \ +docker build . -t sevlamat/tb-gateway:$VERSION_TAG -f docker/Dockerfile -o type=registry # multi arch - DOCKER_CLI_EXPERIMENTAL=enabled \ - docker buildx build . -t sevlamat/tb-gateway:$VERSION_TAG -f docker/Dockerfile --platform=linux/amd64,linux/arm64 -o type=registry +#DOCKER_CLI_EXPERIMENTAL=enabled \ +#docker buildx build . -t sevlamat/tb-gateway:$VERSION_TAG -f docker/Dockerfile --platform=linux/amd64,linux/arm64,linux/386 -o type=registry set +x echo "$(date) Done." diff --git a/docker/Dockerfile b/docker/Dockerfile index 916be6a30..da579311c 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -29,11 +29,6 @@ RUN mkdir -p /default-config/config /default-config/extensions/ && \ echo "Unsupported platform detected. Trying to use default value...";; \ esac && \ curl https://sh.rustup.rs -sSf | sh -s -- -y --default-host=$DEFAULT_HOST --profile minimal && \ - apt-get remove --purge -y \ - gcc python3-dev build-essential libssl-dev libffi-dev zlib1g-dev pkg-config && \ - apt-get autoremove -y && \ - apt-get clean && \ - rm -rf /var/lib/apt/lists/* /tmp/* && \ echo '#!/bin/sh\n\ # Main start script\n\ CONF_FOLDER="/thingsboard_gateway/config"\n\ @@ -49,7 +44,12 @@ python /thingsboard_gateway/tb_gateway.py' > /start-gateway.sh && chmod +x /star python3 -m pip install --no-cache-dir --upgrade pip setuptools wheel && \ python3 -m pip install --no-cache-dir cryptography && \ python3 -m pip install --no-cache-dir -r requirements.txt && \ - rustup self uninstall -y + rustup self uninstall -y && \ + apt-get remove --purge -y \ + gcc python3-dev build-essential libssl-dev libffi-dev zlib1g-dev pkg-config && \ + apt-get autoremove -y && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* /tmp/* VOLUME ["${configs}", "${extensions}", "${logs}"] From 0e7271c1983306df9801bd31727fd52d3c7bb0e7 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 16 Aug 2024 14:19:50 +0200 Subject: [PATCH 06/26] revert back default logger to INFO --- thingsboard_gateway/gateway/tb_gateway_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index fe997e11e..4df5dbdd0 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -154,7 +154,7 @@ def __init__(self, config_file=None): logging_error = e global log - log = TbLogger('service', gateway=self, level='DEBUG') + log = TbLogger('service', gateway=self, level='INFO') global main_handler self.main_handler = main_handler log.addHandler(self.main_handler) From a80630acb36a8e306f1c992010e2133a33ee35cc Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Sun, 18 Aug 2024 21:20:47 +0200 Subject: [PATCH 07/26] log debug Converted devices, attr, telemetry --- .../connectors/opcua/opcua_connector.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 71e34226c..93bbce59a 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -510,6 +510,10 @@ async def __poll_nodes(self): values = await self.__client.read_attributes(all_nodes) converted_nodes_count = 0 + converted_data_dev_count = 0 + converted_data_attr_count = 0 + converted_telemetry_ts_count = 0 + for device in self.__device_nodes: nodes_count = len(device.nodes) device_values = values[converted_nodes_count:converted_nodes_count + nodes_count] @@ -517,11 +521,19 @@ async def __poll_nodes(self): device.converter.convert(device.nodes, device_values) converter_data = device.converter.get_data() if converter_data: + converted_data_dev_count = len(converter_data) + for data_entry in converter_data: + converted_data_attr_count += len(data_entry['attributes']) + converted_telemetry_ts_count += len(data_entry['telemetry']) + self.__data_to_send.put(*converter_data) device.converter.clear_data() - self.__log.debug('Converted nodes values count: %s', converted_nodes_count) + self.__log.debug('Converted nodes %s: devices %s, attr %s, telemetry %s', + converted_nodes_count, converted_data_dev_count, + converted_data_attr_count, converted_telemetry_ts_count) + else: self.__log.info('No nodes to poll') From 9e7c1dfb0363ae0c73f9e46e318546d82f8cf8e6 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 19 Aug 2024 15:30:28 +0200 Subject: [PATCH 08/26] echo version tag after finish on docker build script --- build_docker.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build_docker.sh b/build_docker.sh index 57b3d7c5b..a5da626fb 100755 --- a/build_docker.sh +++ b/build_docker.sh @@ -33,4 +33,4 @@ docker build . -t sevlamat/tb-gateway:$VERSION_TAG -f docker/Dockerfile -o type= #docker buildx build . -t sevlamat/tb-gateway:$VERSION_TAG -f docker/Dockerfile --platform=linux/amd64,linux/arm64,linux/386 -o type=registry set +x -echo "$(date) Done." +echo "$(date) Done. ${VERSION_TAG}" From cffd0c848d7f3735fa08daea68b5d5de1267bd50 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 27 Aug 2024 09:43:48 +0200 Subject: [PATCH 09/26] set get_max_payload_size_bytes custom value 1_000_000 --- thingsboard_gateway/gateway/tb_gateway_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 0d4283dc4..9262f2538 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -1692,7 +1692,7 @@ def get_max_payload_size_bytes(self): if hasattr(self, '_TBGatewayService__max_payload_size_in_bytes'): return self.__max_payload_size_in_bytes - return 8196 + return 1_000_000 # ---------------------------- # Storage -------------------- From 7db6cdc5007efa1af37ca548e8ba7c15542bcb20 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 27 Aug 2024 09:52:01 +0200 Subject: [PATCH 10/26] custom value minPackSendDelayMS = 10 --- thingsboard_gateway/gateway/tb_gateway_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 9262f2538..334e9af9a 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -223,7 +223,7 @@ def __init__(self, config_file=None): self.init_statistics_service(self.__config['thingsboard'].get('statistics', DEFAULT_STATISTIC)) - self.__min_pack_send_delay_ms = self.__config['thingsboard'].get('minPackSendDelayMS', 50) + self.__min_pack_send_delay_ms = 10 #self.__config['thingsboard'].get('minPackSendDelayMS', 50) self.__min_pack_send_delay_ms = self.__min_pack_send_delay_ms / 1000.0 self.__min_pack_size_to_send = 10 #self.__config['thingsboard'].get('minPackSizeToSend', 50) self.__max_payload_size_in_bytes = 1_000_000 #self.__config["thingsboard"].get("maxPayloadSizeBytes", 400) From 66c44646cfe11ca0ee2c937362ae0b776a0295af Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 10 Sep 2024 17:17:38 +0200 Subject: [PATCH 11/26] OPCUA converter timestamp = int(time() * 1000) --- .../connectors/opcua/opcua_uplink_converter.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py index 9af4d8a83..9690fba30 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py @@ -67,7 +67,7 @@ def convert(self, configs, values): telemetry_datapoints_count = 0 attributes_datapoints_count = 0 - basic_timestamp = int(time() * 1000) + timestamp = int(time() * 1000) for val, config in zip(values, configs): if not val: @@ -91,16 +91,6 @@ def convert(self, configs, values): section = DATA_TYPES[config['section']] - if val.SourceTimestamp is not None: - if abs(basic_timestamp - val.SourceTimestamp.timestamp()) > 3600: - self._log.warning("Timestamps are not in sync for incoming value: %r. " - "Value timestamp: %s, current timestamp: %s", - val, val.SourceTimestamp.timestamp(), basic_timestamp) - else: - basic_timestamp = val.SourceTimestamp.timestamp() * 1000 - - timestamp = basic_timestamp - if val.SourceTimestamp and section == TELEMETRY_PARAMETER: telemetry_datapoints_count += 1 if timestamp in telemetry_datapoints: From e29c971162ca918c39e4d0537c287600f54f4c70 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 12 Sep 2024 14:31:51 +0200 Subject: [PATCH 12/26] opcua_json_uplink_converter.py --- .../opcua/opcua_json_uplink_converter.py | 163 ++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py diff --git a/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py b/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py new file mode 100644 index 000000000..61f18151d --- /dev/null +++ b/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py @@ -0,0 +1,163 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import timezone +from time import time + +from thingsboard_gateway.connectors.converter import Converter +from asyncua.ua.uatypes import VariantType + +from thingsboard_gateway.gateway.constants import TELEMETRY_PARAMETER, ATTRIBUTES_PARAMETER, DEVICE_NAME_PARAMETER, \ + DEVICE_TYPE_PARAMETER, TELEMETRY_VALUES_PARAMETER, TELEMETRY_TIMESTAMP_PARAMETER +from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService + +DATA_TYPES = { + 'attributes': ATTRIBUTES_PARAMETER, + 'timeseries': TELEMETRY_PARAMETER +} + +VARIANT_TYPE_HANDLERS = { + VariantType.ExtensionObject: lambda data: str(data), + VariantType.DateTime: lambda data: data.replace( + tzinfo=timezone.utc).isoformat() if data.tzinfo is None else data.isoformat(), + VariantType.StatusCode: lambda data: data.name, + VariantType.QualifiedName: lambda data: data.to_string(), + VariantType.NodeId: lambda data: data.to_string(), + VariantType.ExpandedNodeId: lambda data: data.to_string(), + VariantType.ByteString: lambda data: data.hex(), + VariantType.XmlElement: lambda data: data.decode('utf-8'), + VariantType.Guid: lambda data: str(data), + VariantType.DiagnosticInfo: lambda data: data.to_string(), + VariantType.Null: lambda data: None +} + + +class OpcuaJsonUplinkConverter(Converter): + def __init__(self, config, logger): + self._log = logger + self.__config = config + + def __get_parsed_value(self, val): + value = val.Value.Value + + if isinstance(value, list): + value = [str(item) for item in value] + elif value is not None and not isinstance(value, (int, float, str, bool, dict, type(None))): + handler = VARIANT_TYPE_HANDLERS.get(val.Value.VariantType, + lambda value: str(value) if not hasattr(value, + 'to_string') else value.to_string()) + value = handler(value) + + if value is None and val.StatusCode.is_bad(): + msg = "Bad status code: %r for node: %r with description %r" % (val.StatusCode.name, + val.data_type, + val.StatusCode.doc) + self._log.warning(msg) + value = msg + + return value + + def __get_data_values(self, config, val): + try: + value = self.__get_parsed_value(val) + + data = { + 'Value': { + 'value': value, + 'VariantType': val.Value.VariantType._name_, + 'IsArray': val.Value.IsArray if hasattr(val.Value, 'IsArray') else 'Unknown', + }, + 'SourceTimestamp': val.SourceTimestamp.isoformat(), + 'ServerTimestamp': val.ServerTimestamp.isoformat(), + 'SourcePicoseconds': val.SourcePicoseconds, + 'ServerPicoseconds': val.ServerPicoseconds, + 'DataType': { + 'Identifier': val.data_type.Identifier, + 'NamespaceIndex': val.data_type.NamespaceIndex, + 'NodeIdType': '{}:{}'.format(val.data_type.NodeIdType._name_, val.data_type.NodeIdType._name_), + }, + 'Encoding': val.Encoding, + 'PlatformKey': config.get('key'), + 'Node': config['node'].__str__() if config.get('node') else 'Can\'t get node string representation', + } + + return data + except Exception as e: + self._log.error('Cannot get data values: %s' % str(e)) + return {} + + def convert(self, configs, values): + StatisticsService.count_connector_message(self._log.name, 'convertersMsgProcessed') + + try: + if not isinstance(configs, list): + configs = [configs] + if not isinstance(values, list): + values = [values] + + result_data = { + DEVICE_NAME_PARAMETER: self.__config['device_name'], + DEVICE_TYPE_PARAMETER: self.__config['device_type'], + ATTRIBUTES_PARAMETER: [], + TELEMETRY_PARAMETER: []} + telemetry_datapoints = {} + + telemetry_datapoints_count = 0 + attributes_datapoints_count = 0 + + basic_timestamp = int(time() * 1000) + + for val, config in zip(values, configs): + if not val: + continue + + data = self.__get_data_values(config, val) + + section = DATA_TYPES[config['section']] + + if val.SourceTimestamp is not None: + if abs(basic_timestamp - val.SourceTimestamp.timestamp()) > 3600: + self._log.warning("Timestamps are not in sync for incoming value: %r. " + "Value timestamp: %s, current timestamp: %s", + val, val.SourceTimestamp.timestamp(), basic_timestamp) + else: + basic_timestamp = val.SourceTimestamp.timestamp() * 1000 + + timestamp = basic_timestamp + + if val.SourceTimestamp and section == TELEMETRY_PARAMETER: + telemetry_datapoints_count += 1 + if timestamp in telemetry_datapoints: + telemetry_datapoints[timestamp].update({config['key']: data}) + else: + telemetry_datapoints[timestamp] = {config['key']: data} + else: + attributes_datapoints_count += 1 + result_data[section].append({config['key']: data}) + + if telemetry_datapoints: + result_data[TELEMETRY_PARAMETER].extend( + {TELEMETRY_TIMESTAMP_PARAMETER: timestamp, TELEMETRY_VALUES_PARAMETER: datapoints} + for timestamp, datapoints in telemetry_datapoints.items() + ) + + StatisticsService.count_connector_message(self._log.name, 'convertersAttrProduced', + count=attributes_datapoints_count) + StatisticsService.count_connector_message(self._log.name, 'convertersTsProduced', + count=telemetry_datapoints_count) + + return result_data + except Exception as e: + self._log.exception(e) + StatisticsService.count_connector_message(self._log.name, 'convertersMsgDropped') From a6082ea2ae5e444e0086f5151e17ba4b1a330b3e Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 12 Sep 2024 14:40:28 +0200 Subject: [PATCH 13/26] opcua_json_uplink_converter timestamp = int(time()) * 1_000 # round up to 1 second --- .../extensions/opcua/opcua_json_uplink_converter.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py b/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py index 61f18151d..663c4db23 100644 --- a/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py +++ b/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py @@ -116,7 +116,7 @@ def convert(self, configs, values): telemetry_datapoints_count = 0 attributes_datapoints_count = 0 - basic_timestamp = int(time() * 1000) + timestamp = int(time()) * 1_000 # round up to 1 second for val, config in zip(values, configs): if not val: @@ -126,16 +126,6 @@ def convert(self, configs, values): section = DATA_TYPES[config['section']] - if val.SourceTimestamp is not None: - if abs(basic_timestamp - val.SourceTimestamp.timestamp()) > 3600: - self._log.warning("Timestamps are not in sync for incoming value: %r. " - "Value timestamp: %s, current timestamp: %s", - val, val.SourceTimestamp.timestamp(), basic_timestamp) - else: - basic_timestamp = val.SourceTimestamp.timestamp() * 1000 - - timestamp = basic_timestamp - if val.SourceTimestamp and section == TELEMETRY_PARAMETER: telemetry_datapoints_count += 1 if timestamp in telemetry_datapoints: From 542855ff47df61d44dddd6e1be7b9a319ec23fde Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 13 Sep 2024 10:13:58 +0200 Subject: [PATCH 14/26] self.__min_pack_send_delay_ms = 1000 self.__min_pack_size_to_send = 50 --- thingsboard_gateway/gateway/tb_gateway_service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 52bd89658..9e239eb20 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -226,9 +226,9 @@ def __init__(self, config_file=None): self.init_statistics_service(self.__config['thingsboard'].get('statistics', DEFAULT_STATISTIC)) - self.__min_pack_send_delay_ms = 10 #self.__config['thingsboard'].get('minPackSendDelayMS', 50) + self.__min_pack_send_delay_ms = 1000 #self.__config['thingsboard'].get('minPackSendDelayMS', 50) self.__min_pack_send_delay_ms = self.__min_pack_send_delay_ms / 1000.0 - self.__min_pack_size_to_send = 10 #self.__config['thingsboard'].get('minPackSizeToSend', 50) + self.__min_pack_size_to_send = 50 #self.__config['thingsboard'].get('minPackSizeToSend', 50) self.__max_payload_size_in_bytes = 1_000_000 #self.__config["thingsboard"].get("maxPayloadSizeBytes", 400) self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True, From 858433fca0064c53ec3e90b9861ec3d2bcc9e6c6 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Mon, 16 Sep 2024 09:40:19 +0300 Subject: [PATCH 15/26] Adjusted OPCUA JSON uplink converter to work with new logic --- .../opcua/opcua_json_uplink_converter.py | 192 ++++++++++-------- 1 file changed, 108 insertions(+), 84 deletions(-) diff --git a/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py b/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py index 663c4db23..bdb7d9c4a 100644 --- a/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py +++ b/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py @@ -11,15 +11,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +from concurrent.futures import ThreadPoolExecutor from datetime import timezone from time import time from thingsboard_gateway.connectors.converter import Converter from asyncua.ua.uatypes import VariantType -from thingsboard_gateway.gateway.constants import TELEMETRY_PARAMETER, ATTRIBUTES_PARAMETER, DEVICE_NAME_PARAMETER, \ - DEVICE_TYPE_PARAMETER, TELEMETRY_VALUES_PARAMETER, TELEMETRY_TIMESTAMP_PARAMETER +from thingsboard_gateway.connectors.opcua.opcua_converter import OpcUaConverter +from thingsboard_gateway.gateway.constants import TELEMETRY_PARAMETER, ATTRIBUTES_PARAMETER +from thingsboard_gateway.gateway.entities.converted_data import ConvertedData +from thingsboard_gateway.gateway.entities.telemetry_entry import TelemetryEntry from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService DATA_TYPES = { @@ -42,63 +44,71 @@ VariantType.Null: lambda data: None } +ERROR_MSG_TEMPLATE = "Bad status code: {} for node: {} with description {}" class OpcuaJsonUplinkConverter(Converter): def __init__(self, config, logger): self._log = logger self.__config = config - def __get_parsed_value(self, val): - value = val.Value.Value - - if isinstance(value, list): - value = [str(item) for item in value] - elif value is not None and not isinstance(value, (int, float, str, bool, dict, type(None))): - handler = VARIANT_TYPE_HANDLERS.get(val.Value.VariantType, - lambda value: str(value) if not hasattr(value, - 'to_string') else value.to_string()) - value = handler(value) - - if value is None and val.StatusCode.is_bad(): - msg = "Bad status code: %r for node: %r with description %r" % (val.StatusCode.name, - val.data_type, - val.StatusCode.doc) - self._log.warning(msg) - value = msg - - return value - - def __get_data_values(self, config, val): + @staticmethod + def process_datapoint(config, val, basic_timestamp): try: - value = self.__get_parsed_value(val) - - data = { - 'Value': { - 'value': value, - 'VariantType': val.Value.VariantType._name_, - 'IsArray': val.Value.IsArray if hasattr(val.Value, 'IsArray') else 'Unknown', - }, - 'SourceTimestamp': val.SourceTimestamp.isoformat(), - 'ServerTimestamp': val.ServerTimestamp.isoformat(), - 'SourcePicoseconds': val.SourcePicoseconds, - 'ServerPicoseconds': val.ServerPicoseconds, - 'DataType': { - 'Identifier': val.data_type.Identifier, - 'NamespaceIndex': val.data_type.NamespaceIndex, - 'NodeIdType': '{}:{}'.format(val.data_type.NodeIdType._name_, val.data_type.NodeIdType._name_), - }, - 'Encoding': val.Encoding, - 'PlatformKey': config.get('key'), - 'Node': config['node'].__str__() if config.get('node') else 'Can\'t get node string representation', - } - - return data + error = None + data = val.Value.Value + if isinstance(data, list): + data = [str(item) for item in data] + else: + handler = VARIANT_TYPE_HANDLERS.get(val.Value.VariantType, lambda d: str(d) if not hasattr(d, 'to_string') else d.to_string()) + value = handler(data) + try: + data = { + 'Value': { + 'value': value, + 'VariantType': val.Value.VariantType._name_, + 'IsArray': val.Value.IsArray if hasattr(val.Value, 'IsArray') else 'Unknown', + }, + 'SourceTimestamp': val.SourceTimestamp.isoformat(), + 'ServerTimestamp': val.ServerTimestamp.isoformat(), + 'SourcePicoseconds': val.SourcePicoseconds, + 'ServerPicoseconds': val.ServerPicoseconds, + 'DataType': { + 'Identifier': val.data_type.Identifier, + 'NamespaceIndex': val.data_type.NamespaceIndex, + 'NodeIdType': '{}:{}'.format(val.data_type.NodeIdType._name_, val.data_type.NodeIdType._name_), + }, + 'Encoding': val.Encoding, + 'PlatformKey': config.get('key'), + 'Node': config['node'].__str__() if config.get('node') else 'Can\'t get node string representation', + } + except Exception as e: + data = str(e) + error = True + + if data is None and val.StatusCode.is_bad(): + data = str.format(ERROR_MSG_TEMPLATE,val.StatusCode.name, val.data_type, val.StatusCode.doc) + error = True + + timestamp_location = config.get('timestampLocation', 'gateway').lower() + timestamp = basic_timestamp # Default timestamp + if timestamp_location == 'sourcetimestamp' and val.SourceTimestamp is not None: + timestamp = val.SourceTimestamp.timestamp() * 1000 + elif timestamp_location == 'servertimestamp' and val.ServerTimestamp is not None: + timestamp = val.ServerTimestamp.timestamp() * 1000 + + section = DATA_TYPES[config['section']] + if section == TELEMETRY_PARAMETER: + return TelemetryEntry({config['key']: data}, ts=timestamp), error + elif section == ATTRIBUTES_PARAMETER: + return {config['key']: data}, error except Exception as e: - self._log.error('Cannot get data values: %s' % str(e)) - return {} + return None, str(e) - def convert(self, configs, values): + def convert(self, configs, values) -> ConvertedData: StatisticsService.count_connector_message(self._log.name, 'convertersMsgProcessed') + basic_timestamp = int(time() * 1000) + + is_debug_enabled = self._log.isEnabledFor(10) try: if not isinstance(configs, list): @@ -106,48 +116,62 @@ def convert(self, configs, values): if not isinstance(values, list): values = [values] - result_data = { - DEVICE_NAME_PARAMETER: self.__config['device_name'], - DEVICE_TYPE_PARAMETER: self.__config['device_type'], - ATTRIBUTES_PARAMETER: [], - TELEMETRY_PARAMETER: []} - telemetry_datapoints = {} + if is_debug_enabled: + start_iteration = basic_timestamp + converted_data = ConvertedData(device_name=self.__config['device_name'], device_type=self.__config['device_type']) - telemetry_datapoints_count = 0 - attributes_datapoints_count = 0 + max_workers = min(8, len(values)) - timestamp = int(time()) * 1_000 # round up to 1 second + with ThreadPoolExecutor(max_workers=max_workers) as executor: + results = list(executor.map(self.process_datapoint, configs, values, [basic_timestamp] * len(values))) - for val, config in zip(values, configs): - if not val: - continue + telemetry_batch = [] + attributes_batch = [] - data = self.__get_data_values(config, val) + if is_debug_enabled: + filling_start_time = int(time() * 1000) - section = DATA_TYPES[config['section']] + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_telemetry = executor.submit(self.fill_telemetry, results) + future_attributes = executor.submit(self.fill_attributes, results) + telemetry_batch = future_telemetry.result() + attributes_batch = future_attributes.result() - if val.SourceTimestamp and section == TELEMETRY_PARAMETER: - telemetry_datapoints_count += 1 - if timestamp in telemetry_datapoints: - telemetry_datapoints[timestamp].update({config['key']: data}) - else: - telemetry_datapoints[timestamp] = {config['key']: data} - else: - attributes_datapoints_count += 1 - result_data[section].append({config['key']: data}) + converted_data.add_to_telemetry(telemetry_batch) + for attr in attributes_batch: + converted_data.add_to_attributes(attr) - if telemetry_datapoints: - result_data[TELEMETRY_PARAMETER].extend( - {TELEMETRY_TIMESTAMP_PARAMETER: timestamp, TELEMETRY_VALUES_PARAMETER: datapoints} - for timestamp, datapoints in telemetry_datapoints.items() - ) + if is_debug_enabled: + converted_data_fill_time = int(time() * 1000) - filling_start_time + total_datapoints_in_converted_data = converted_data.telemetry_datapoints_count + converted_data.attributes_datapoints_count - StatisticsService.count_connector_message(self._log.name, 'convertersAttrProduced', - count=attributes_datapoints_count) - StatisticsService.count_connector_message(self._log.name, 'convertersTsProduced', - count=telemetry_datapoints_count) + if is_debug_enabled: + self._log.debug("Iteration took %d ms", int(time() * 1000) - start_iteration) + self._log.debug("Filling took %d ms", converted_data_fill_time) + self._log.debug("Average time per iteration: %2f ms", (float(int(time() * 1000)) - start_iteration) / float(len(values))) + self._log.debug("Average filling time: %2f ms", float(converted_data_fill_time) / float(total_datapoints_in_converted_data)) + self._log.debug("Total datapoints in converted data: %d", total_datapoints_in_converted_data) - return result_data + StatisticsService.count_connector_message(self._log.name, 'convertersAttrProduced', count=converted_data.attributes_datapoints_count) + StatisticsService.count_connector_message(self._log.name, 'convertersTsProduced', count=converted_data.telemetry_datapoints_count) + + return converted_data except Exception as e: - self._log.exception(e) + self._log.exception("Error occurred while converting data: ", exc_info=e) StatisticsService.count_connector_message(self._log.name, 'convertersMsgDropped') + + @staticmethod + def fill_telemetry(results): + telemetry_batch = [] + for result, error in results: + if isinstance(result, TelemetryEntry): + telemetry_batch.append(result) + return telemetry_batch + + @staticmethod + def fill_attributes(results): + attributes_batch = [] + for result, error in results: + if isinstance(result, dict): + attributes_batch.append(result) + return attributes_batch From ea02c46df56edf74b8821c2676142d89bfdb6561 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Mon, 23 Sep 2024 14:53:13 +0300 Subject: [PATCH 16/26] Changed timestamp to get grouping and less messages in rule chain --- .../extensions/opcua/opcua_json_uplink_converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py b/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py index bdb7d9c4a..9a6745d82 100644 --- a/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py +++ b/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py @@ -106,7 +106,7 @@ def process_datapoint(config, val, basic_timestamp): def convert(self, configs, values) -> ConvertedData: StatisticsService.count_connector_message(self._log.name, 'convertersMsgProcessed') - basic_timestamp = int(time() * 1000) + basic_timestamp = int(int(time()) * 1000) is_debug_enabled = self._log.isEnabledFor(10) From 4d11b34b8a40c7022c743d022010ac344dbc8065 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Mon, 23 Sep 2024 16:58:08 +0300 Subject: [PATCH 17/26] Moved logs to trace --- .../extensions/opcua/opcua_json_uplink_converter.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py b/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py index 9a6745d82..76239140c 100644 --- a/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py +++ b/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py @@ -108,7 +108,7 @@ def convert(self, configs, values) -> ConvertedData: StatisticsService.count_connector_message(self._log.name, 'convertersMsgProcessed') basic_timestamp = int(int(time()) * 1000) - is_debug_enabled = self._log.isEnabledFor(10) + is_debug_enabled = self._log.isEnabledFor(5) try: if not isinstance(configs, list): @@ -146,11 +146,11 @@ def convert(self, configs, values) -> ConvertedData: total_datapoints_in_converted_data = converted_data.telemetry_datapoints_count + converted_data.attributes_datapoints_count if is_debug_enabled: - self._log.debug("Iteration took %d ms", int(time() * 1000) - start_iteration) - self._log.debug("Filling took %d ms", converted_data_fill_time) - self._log.debug("Average time per iteration: %2f ms", (float(int(time() * 1000)) - start_iteration) / float(len(values))) - self._log.debug("Average filling time: %2f ms", float(converted_data_fill_time) / float(total_datapoints_in_converted_data)) - self._log.debug("Total datapoints in converted data: %d", total_datapoints_in_converted_data) + self._log.trace("Iteration took %d ms", int(time() * 1000) - start_iteration) + self._log.trace("Filling took %d ms", converted_data_fill_time) + self._log.trace("Average time per iteration: %2f ms", (float(int(time() * 1000)) - start_iteration) / float(len(values))) + self._log.trace("Average filling time: %2f ms", float(converted_data_fill_time) / float(total_datapoints_in_converted_data)) + self._log.trace("Total datapoints in converted data: %d", total_datapoints_in_converted_data) StatisticsService.count_connector_message(self._log.name, 'convertersAttrProduced', count=converted_data.attributes_datapoints_count) StatisticsService.count_connector_message(self._log.name, 'convertersTsProduced', count=converted_data.telemetry_datapoints_count) From 8cc42a3c6089611a6e21a4b43ac6444f66b1958b Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 23 Sep 2024 18:12:16 +0200 Subject: [PATCH 18/26] ts round 100ms, subscr by batches, subscr logs info and warn; trace log removed --- .../connectors/opcua/opcua_connector.py | 6 +++--- .../opcua/opcua_json_uplink_converter.py | 15 ++++++++------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index be6d451ec..1fb161429 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -606,7 +606,7 @@ async def _load_devices_nodes(self): if nodes_to_subscribe: nodes_data_change_subscriptions = await self._subscribe_for_node_updates_in_batches(device, nodes_to_subscribe, - 100) + 1000) subs = [] for subs_batch in nodes_data_change_subscriptions: subs.extend(subs_batch) @@ -627,9 +627,9 @@ async def _subscribe_for_node_updates_in_batches(self, device, nodes, batch_size batch = nodes[i:i + batch_size] try: result.append(await device.subscription.subscribe_data_change(batch)) - self.__log.debug(f"Subscribed to batch {i // batch_size + 1} with {len(batch)} nodes.") + self.__log.info(f"Subscribed to batch {i // batch_size + 1} with {len(batch)} nodes.") except Exception as e: - self.__log.debug(f"Error subscribing to batch {i // batch_size + 1}: {e}") + self.__log.warn(f"Error subscribing to batch {i // batch_size + 1}: {e}") break return result diff --git a/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py b/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py index 76239140c..84cbd2477 100644 --- a/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py +++ b/thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py @@ -106,7 +106,8 @@ def process_datapoint(config, val, basic_timestamp): def convert(self, configs, values) -> ConvertedData: StatisticsService.count_connector_message(self._log.name, 'convertersMsgProcessed') - basic_timestamp = int(int(time()) * 1000) + basic_timestamp = int(time() * 1000) + basic_timestamp = round(basic_timestamp / 100) * 100 is_debug_enabled = self._log.isEnabledFor(5) @@ -145,12 +146,12 @@ def convert(self, configs, values) -> ConvertedData: converted_data_fill_time = int(time() * 1000) - filling_start_time total_datapoints_in_converted_data = converted_data.telemetry_datapoints_count + converted_data.attributes_datapoints_count - if is_debug_enabled: - self._log.trace("Iteration took %d ms", int(time() * 1000) - start_iteration) - self._log.trace("Filling took %d ms", converted_data_fill_time) - self._log.trace("Average time per iteration: %2f ms", (float(int(time() * 1000)) - start_iteration) / float(len(values))) - self._log.trace("Average filling time: %2f ms", float(converted_data_fill_time) / float(total_datapoints_in_converted_data)) - self._log.trace("Total datapoints in converted data: %d", total_datapoints_in_converted_data) + # if is_debug_enabled: + # self._log.trace("Iteration took %d ms", int(time() * 1000) - start_iteration) + # self._log.trace("Filling took %d ms", converted_data_fill_time) + # self._log.trace("Average time per iteration: %2f ms", (float(int(time() * 1000)) - start_iteration) / float(len(values))) + # self._log.trace("Average filling time: %2f ms", float(converted_data_fill_time) / float(total_datapoints_in_converted_data)) + # self._log.trace("Total datapoints in converted data: %d", total_datapoints_in_converted_data) StatisticsService.count_connector_message(self._log.name, 'convertersAttrProduced', count=converted_data.attributes_datapoints_count) StatisticsService.count_connector_message(self._log.name, 'convertersTsProduced', count=converted_data.telemetry_datapoints_count) From 5af9509b3735405ac06609ed2c457844c9839c8e Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 23 Sep 2024 18:25:10 +0200 Subject: [PATCH 19/26] subscription_batch_size --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 1fb161429..5d650903e 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -603,10 +603,11 @@ async def _load_devices_nodes(self): except ValueError: pass + subscription_batch_size = self.__config.get('subscriptionBatchSize', 2000) if nodes_to_subscribe: nodes_data_change_subscriptions = await self._subscribe_for_node_updates_in_batches(device, nodes_to_subscribe, - 1000) + subscription_batch_size) subs = [] for subs_batch in nodes_data_change_subscriptions: subs.extend(subs_batch) From 69b89a1c6a79e3ec0877e4790bad1fbd548f61fa Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 23 Sep 2024 18:45:07 +0200 Subject: [PATCH 20/26] trace: New data change event --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 5d650903e..034495e4f 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -903,5 +903,5 @@ def __init__(self, queue, logger): self.__queue = queue def datachange_notification(self, node, _, data): - self.__log.debug("New data change event %s %s", node, data) + self.__log.trace("New data change event %s %s", node, data) self.__queue.put((node, data, int(time() * 1000))) From 892b551e5ad80f20611d1c8393ec8ac1dcbec600 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Tue, 24 Sep 2024 10:34:16 +0300 Subject: [PATCH 21/26] Added additional logs --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index ff103e8a2..7d16b1343 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -641,7 +641,9 @@ async def _subscribe_for_node_updates_in_batches(self, device, nodes, batch_size result.append(await device.subscription.subscribe_data_change(batch)) self.__log.info(f"Subscribed to batch {i // batch_size + 1} with {len(batch)} nodes.") except Exception as e: - self.__log.warn(f"Error subscribing to batch {i // batch_size + 1}: {e}") + self.__log.warn(f"Error subscribing to batch {i // batch_size + 1} with {len(batch)} : {e}") + self.__log.error(f"{batch}") + self.__log.exception(exc_info=e) break return result From 576b8c31cbed40b74ce80171e1d3e6123b620708 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Tue, 24 Sep 2024 10:35:26 +0300 Subject: [PATCH 22/26] Added additional logs --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 7d16b1343..4bdae462b 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -643,7 +643,7 @@ async def _subscribe_for_node_updates_in_batches(self, device, nodes, batch_size except Exception as e: self.__log.warn(f"Error subscribing to batch {i // batch_size + 1} with {len(batch)} : {e}") self.__log.error(f"{batch}") - self.__log.exception(exc_info=e) + self.__log.exception("exception", exc_info=e) break return result From 7e43fedbf54d4203d5121892dc211e52e4c3327c Mon Sep 17 00:00:00 2001 From: imbeacon Date: Tue, 24 Sep 2024 10:36:09 +0300 Subject: [PATCH 23/26] Added additional logs --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 4bdae462b..11d27ee4b 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -638,11 +638,12 @@ async def _subscribe_for_node_updates_in_batches(self, device, nodes, batch_size for i in range(0, total_nodes, batch_size): batch = nodes[i:i + batch_size] try: + self.__log.info(f"Subscribing to batch {i // batch_size + 1} with {len(batch)} nodes.") result.append(await device.subscription.subscribe_data_change(batch)) self.__log.info(f"Subscribed to batch {i // batch_size + 1} with {len(batch)} nodes.") except Exception as e: self.__log.warn(f"Error subscribing to batch {i // batch_size + 1} with {len(batch)} : {e}") - self.__log.error(f"{batch}") + # self.__log.error(f"{batch}") self.__log.exception("exception", exc_info=e) break return result From 35a6da570e131739e5551db1ea1c81f293c305e1 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 24 Sep 2024 14:55:11 +0200 Subject: [PATCH 24/26] subscriptionBatchSize 100_000 --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 11d27ee4b..4b4a4aa82 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -614,7 +614,7 @@ async def _load_devices_nodes(self): except ValueError: pass - subscription_batch_size = self.__config.get('subscriptionBatchSize', 2000) + subscription_batch_size = self.__config.get('subscriptionBatchSize', 100_000) if nodes_to_subscribe: nodes_data_change_subscriptions = await self._subscribe_for_node_updates_in_batches(device, nodes_to_subscribe, From 551d46713e3e7ac42aa942db96d30d792d7e56f7 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 25 Sep 2024 10:22:37 +0200 Subject: [PATCH 25/26] self.__subscription_batch_size = self.__server_conf.get('subscriptionBatchSize', 100_000) --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 9b0debe76..21014b217 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -101,7 +101,7 @@ def __init__(self, gateway, config, connector_type): # Batch size for data change subscription, the gateway will process this amount of data, received from subscriptions, or less in one iteration self.__sub_data_max_batch_size = self.__server_conf.get("subDataMaxBatchSize", 1000) self.__sub_data_min_batch_creation_time = max(self.__server_conf.get("subDataMinBatchCreationTimeMs", 200), 100) / 1000 - self.__subscription_batch_size = self.__server_conf.get('subscriptionProcessBatchSize', 2000) + self.__subscription_batch_size = self.__server_conf.get('subscriptionBatchSize', 100_000) self.__sub_data_to_convert = Queue(-1) self.__data_to_convert = Queue(-1) From 634762ee8fdcd36c160c8e4f2e96ca3584382b30 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 25 Sep 2024 10:23:45 +0200 Subject: [PATCH 26/26] cleanup --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 1 - 1 file changed, 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 21014b217..b68bfbdb7 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -656,7 +656,6 @@ async def _load_devices_nodes(self): except ValueError: pass - subscription_batch_size = self.__config.get('subscriptionBatchSize', 100_000) if nodes_to_subscribe: nodes_data_change_subscriptions = await self._subscribe_for_node_updates_in_batches(device, nodes_to_subscribe,