Connectors/opcua/high load (for comparison) #1511

Draft: wants to merge 48 commits into base: master

Commits (48)
6e22db9
maxPayloadSizeBytes 1000000
smatvienko-tb Aug 15, 2024
316764d
__min_pack_size_to_send workout, logs added to debug the mqtt client
smatvienko-tb Aug 15, 2024
e7900ae
log = TbLogger('service', gateway=self, level='DEBUG')
smatvienko-tb Aug 15, 2024
4773e9b
asyncua requirements
smatvienko-tb Aug 16, 2024
2900eb8
docker python12 and cleanup
smatvienko-tb Aug 16, 2024
0e7271c
revert back default logger to INFO
smatvienko-tb Aug 16, 2024
480a238
Merge remote-tracking branch 'upstream/connectors/opcua/high-load' in…
smatvienko-tb Aug 17, 2024
fade666
Merge remote-tracking branch 'upstream/connectors/opcua/high-load' in…
smatvienko-tb Aug 17, 2024
41494b8
Merge remote-tracking branch 'upstream/connectors/opcua/high-load' in…
smatvienko-tb Aug 17, 2024
a80630a
log debug Converted devices, attr, telemetry
smatvienko-tb Aug 18, 2024
dea7868
Merge remote-tracking branch 'upstream/connectors/opcua/high-load' in…
smatvienko-tb Aug 19, 2024
29420ef
Merge remote-tracking branch 'upstream/connectors/opcua/high-load' in…
smatvienko-tb Aug 19, 2024
9e7c1df
echo version tag after finish on docker build script
smatvienko-tb Aug 19, 2024
c8c8284
Merge remote-tracking branch 'upstream/master' into connectors/opcua/…
smatvienko-tb Aug 27, 2024
cffd0c8
set get_max_payload_size_bytes custom value 1_000_000
smatvienko-tb Aug 27, 2024
7db6cdc
custom value minPackSendDelayMS = 10
smatvienko-tb Aug 27, 2024
2ede099
Merge remote-tracking branch 'upstream/master' into connectors/opcua/…
smatvienko-tb Sep 5, 2024
0af3183
Merge remote-tracking branch 'upstream/master' into connectors/opcua/…
smatvienko-tb Sep 5, 2024
6977d22
Merge remote-tracking branch 'upstream/master' into connectors/opcua/…
smatvienko-tb Sep 5, 2024
fc4f1d8
Merge remote-tracking branch 'upstream/master' into connectors/opcua/…
smatvienko-tb Sep 10, 2024
a788bb7
Merge remote-tracking branch 'upstream/master' into connectors/opcua/…
smatvienko-tb Sep 10, 2024
66c4464
OPCUA converter timestamp = int(time() * 1000)
smatvienko-tb Sep 10, 2024
e29c971
opcua_json_uplink_converter.py
smatvienko-tb Sep 12, 2024
a6082ea
opcua_json_uplink_converter timestamp = int(time()) * 1_000 # round u…
smatvienko-tb Sep 12, 2024
b435ef8
Merge remote-tracking branch 'upstream/master' into connectors/opcua/…
smatvienko-tb Sep 12, 2024
542855f
self.__min_pack_send_delay_ms = 1000 self.__min_pack_size_to_send = 50
smatvienko-tb Sep 13, 2024
26e1596
Merge remote-tracking branch 'origin/master' into connectors/opcua/hi…
imbeacon Sep 16, 2024
858433f
Adjusted OPCUA JSON uplink converter to work with new logic
imbeacon Sep 16, 2024
5f184d9
Merge remote-tracking branch 'upstream/master' into connectors/opcua/…
smatvienko-tb Sep 19, 2024
2a08ef7
Merge remote-tracking branch 'upstream/master' into connectors/opcua/…
smatvienko-tb Sep 21, 2024
54961e3
Merge remote-tracking branch 'upstream/master' into connectors/opcua/…
smatvienko-tb Sep 23, 2024
934ba5d
Merge branch 'master' of github.com:thingsboard/thingsboard-gateway i…
imbeacon Sep 23, 2024
ea02c46
Changed timestamp to get grouping and less messages in rule chain
imbeacon Sep 23, 2024
4d11b34
Moved logs to trace
imbeacon Sep 23, 2024
8cc42a3
ts round 100ms, subscr by batches, subscr logs info and warn; trace l…
smatvienko-tb Sep 23, 2024
5af9509
subscription_batch_size
smatvienko-tb Sep 23, 2024
69b89a1
trace: New data change event
smatvienko-tb Sep 23, 2024
876cfaa
Merge branch 'master' of github.com:thingsboard/thingsboard-gateway i…
imbeacon Sep 24, 2024
892b551
Added additional logs
imbeacon Sep 24, 2024
576b8c3
Added additional logs
imbeacon Sep 24, 2024
7e43fed
Added additional logs
imbeacon Sep 24, 2024
35a6da5
subscriptionBatchSize 100_000
smatvienko-tb Sep 24, 2024
b44eb84
Merge remote-tracking branch 'upstream/master' into connectors/opcua/…
smatvienko-tb Sep 25, 2024
551d467
self.__subscription_batch_size = self.__server_conf.get('subscription…
smatvienko-tb Sep 25, 2024
634762e
cleanup
smatvienko-tb Sep 25, 2024
045faa7
Merge remote-tracking branch 'upstream/master' into connectors/opcua/…
smatvienko-tb Sep 25, 2024
a96d5b5
Merge remote-tracking branch 'upstream/master' into connectors/opcua/…
smatvienko-tb Sep 26, 2024
d11c271
Merge remote-tracking branch 'upstream/master' into connectors/opcua/…
smatvienko-tb Oct 1, 2024
9 changes: 6 additions & 3 deletions build_latest_docker.sh
@@ -25,9 +25,12 @@ VERSION_TAG="${BRANCH_NAME}-${COMMIT_ID}"
 echo "$(date) Building project with version tag $VERSION_TAG ..."
 set -x

+BUILDKIT_PROGRESS=plain \
+docker build . -t sevlamat/tb-gateway:$VERSION_TAG -f docker/Dockerfile -o type=registry
+
 # multi arch
-DOCKER_CLI_EXPERIMENTAL=enabled \
-docker buildx build . -t tb-gateway:$VERSION_TAG -f docker/Dockerfile --platform=linux/amd64,linux/arm64 -o type=registry
+#DOCKER_CLI_EXPERIMENTAL=enabled \
+#docker buildx build . -t sevlamat/tb-gateway:$VERSION_TAG -f docker/Dockerfile --platform=linux/amd64,linux/arm64,linux/386 -o type=registry

 set +x
-echo "$(date) Done."
+echo "$(date) Done. ${VERSION_TAG}"
6 changes: 3 additions & 3 deletions docker/Dockerfile
@@ -29,8 +29,6 @@ RUN mkdir -p /default-config/config /default-config/extensions/ && \
         echo "Unsupported platform detected. Trying to use default value...";; \
     esac && \
     curl https://sh.rustup.rs -sSf | sh -s -- -y --default-host=$DEFAULT_HOST --profile minimal && \
-    apt-get clean && \
-    rm -rf /var/lib/apt/lists/* /tmp/* && \
     echo '#!/bin/sh\n\
 # Main start script\n\
 CONF_FOLDER="/thingsboard_gateway/config"\n\
@@ -46,9 +44,11 @@ python /thingsboard_gateway/tb_gateway.py' > /start-gateway.sh && chmod +x /star
     python3 -m pip install --no-cache-dir --upgrade pip setuptools wheel && \
     python3 -m pip install --no-cache-dir cryptography && \
     python3 -m pip install --no-cache-dir -r requirements.txt && \
+    rustup self uninstall -y && \
     apt-get remove --purge -y gcc python3-dev build-essential libssl-dev libffi-dev zlib1g-dev pkg-config && \
     apt-get autoremove -y && \
-    rustup self uninstall -y
+    apt-get clean && \
+    rm -rf /var/lib/apt/lists/* /tmp/*

 VOLUME ["${configs}", "${extensions}", "${logs}"]
1 change: 1 addition & 0 deletions requirements.txt
@@ -18,3 +18,4 @@ cachetools
 tb-mqtt-client>=1.10.3
 service-identity
 psutil
+asyncua
6 changes: 5 additions & 1 deletion thingsboard_gateway/connectors/opcua/opcua_connector.py
@@ -101,7 +101,7 @@ def __init__(self, gateway, config, connector_type):
         # Batch size for data change subscription, the gateway will process this amount of data, received from subscriptions, or less in one iteration
         self.__sub_data_max_batch_size = self.__server_conf.get("subDataMaxBatchSize", 1000)
         self.__sub_data_min_batch_creation_time = max(self.__server_conf.get("subDataMinBatchCreationTimeMs", 200), 100) / 1000
-        self.__subscription_batch_size = self.__server_conf.get('subscriptionProcessBatchSize', 2000)
+        self.__subscription_batch_size = self.__server_conf.get('subscriptionBatchSize', 100_000)

         self.__sub_data_to_convert = Queue(-1)
         self.__data_to_convert = Queue(-1)
@@ -723,6 +723,10 @@ def __thread_pool_executor_processor(self):
     def __convert_retrieved_data(self, values, received_ts, data_retrieving_started):
         try:
             converted_nodes_count = 0
+            converted_data_dev_count = 0
+            converted_data_attr_count = 0
+            converted_telemetry_ts_count = 0
+
             for device in self.__device_nodes:
                 nodes_count = len(device.nodes)
                 device_values = values[converted_nodes_count:converted_nodes_count + nodes_count]
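The new `subscriptionBatchSize` setting caps how many nodes go into a single subscription request. A minimal sketch of that batching idea, assuming asyncua's `Subscription.subscribe_data_change` accepts a list of nodes; the helper names (`chunk_nodes`, `subscribe_in_batches`) are illustrative, not the connector's actual code:

```python
def chunk_nodes(nodes, batch_size):
    """Yield successive batches of at most `batch_size` nodes."""
    for start in range(0, len(nodes), batch_size):
        yield nodes[start:start + batch_size]


async def subscribe_in_batches(subscription, nodes, batch_size=100_000):
    # Each call carries one bounded batch instead of one huge request,
    # which keeps individual OPC UA service calls a manageable size.
    handles = []
    for batch in chunk_nodes(nodes, batch_size):
        handles.extend(await subscription.subscribe_data_change(batch))
    return handles
```

With a smaller `batch_size`, the same node set simply turns into more, smaller service calls; the trade-off is round trips versus per-request size.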
178 changes: 178 additions & 0 deletions thingsboard_gateway/extensions/opcua/opcua_json_uplink_converter.py
@@ -0,0 +1,178 @@
# Copyright 2024. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent.futures import ThreadPoolExecutor
from datetime import timezone
from time import time

from thingsboard_gateway.connectors.converter import Converter
from asyncua.ua.uatypes import VariantType

from thingsboard_gateway.connectors.opcua.opcua_converter import OpcUaConverter
from thingsboard_gateway.gateway.constants import TELEMETRY_PARAMETER, ATTRIBUTES_PARAMETER
from thingsboard_gateway.gateway.entities.converted_data import ConvertedData
from thingsboard_gateway.gateway.entities.telemetry_entry import TelemetryEntry
from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService

DATA_TYPES = {
'attributes': ATTRIBUTES_PARAMETER,
'timeseries': TELEMETRY_PARAMETER
}

VARIANT_TYPE_HANDLERS = {
VariantType.ExtensionObject: lambda data: str(data),
VariantType.DateTime: lambda data: data.replace(
tzinfo=timezone.utc).isoformat() if data.tzinfo is None else data.isoformat(),
VariantType.StatusCode: lambda data: data.name,
VariantType.QualifiedName: lambda data: data.to_string(),
VariantType.NodeId: lambda data: data.to_string(),
VariantType.ExpandedNodeId: lambda data: data.to_string(),
VariantType.ByteString: lambda data: data.hex(),
VariantType.XmlElement: lambda data: data.decode('utf-8'),
VariantType.Guid: lambda data: str(data),
VariantType.DiagnosticInfo: lambda data: data.to_string(),
VariantType.Null: lambda data: None
}

ERROR_MSG_TEMPLATE = "Bad status code: {} for node: {} with description {}"

class OpcuaJsonUplinkConverter(Converter):
def __init__(self, config, logger):
self._log = logger
self.__config = config

@staticmethod
def process_datapoint(config, val, basic_timestamp):
try:
error = None
data = val.Value.Value
if isinstance(data, list):
data = [str(item) for item in data]
else:
handler = VARIANT_TYPE_HANDLERS.get(val.Value.VariantType, lambda d: str(d) if not hasattr(d, 'to_string') else d.to_string())
value = handler(data)
try:
data = {
'Value': {
'value': value,
'VariantType': val.Value.VariantType._name_,
'IsArray': val.Value.IsArray if hasattr(val.Value, 'IsArray') else 'Unknown',
},
'SourceTimestamp': val.SourceTimestamp.isoformat(),
'ServerTimestamp': val.ServerTimestamp.isoformat(),
'SourcePicoseconds': val.SourcePicoseconds,
'ServerPicoseconds': val.ServerPicoseconds,
'DataType': {
'Identifier': val.data_type.Identifier,
'NamespaceIndex': val.data_type.NamespaceIndex,
'NodeIdType': '{}:{}'.format(val.data_type.NodeIdType._name_, val.data_type.NodeIdType._name_),
},
'Encoding': val.Encoding,
'PlatformKey': config.get('key'),
'Node': config['node'].__str__() if config.get('node') else 'Can\'t get node string representation',
}
except Exception as e:
data = str(e)
error = True

if data is None and val.StatusCode.is_bad():
data = str.format(ERROR_MSG_TEMPLATE,val.StatusCode.name, val.data_type, val.StatusCode.doc)
error = True

timestamp_location = config.get('timestampLocation', 'gateway').lower()
timestamp = basic_timestamp # Default timestamp
if timestamp_location == 'sourcetimestamp' and val.SourceTimestamp is not None:
timestamp = val.SourceTimestamp.timestamp() * 1000
elif timestamp_location == 'servertimestamp' and val.ServerTimestamp is not None:
timestamp = val.ServerTimestamp.timestamp() * 1000

section = DATA_TYPES[config['section']]
if section == TELEMETRY_PARAMETER:
return TelemetryEntry({config['key']: data}, ts=timestamp), error
elif section == ATTRIBUTES_PARAMETER:
return {config['key']: data}, error
except Exception as e:
return None, str(e)

def convert(self, configs, values) -> ConvertedData:
StatisticsService.count_connector_message(self._log.name, 'convertersMsgProcessed')
basic_timestamp = int(time() * 1000)
basic_timestamp = round(basic_timestamp / 100) * 100

is_debug_enabled = self._log.isEnabledFor(5)

try:
if not isinstance(configs, list):
configs = [configs]
if not isinstance(values, list):
values = [values]

if is_debug_enabled:
start_iteration = basic_timestamp
converted_data = ConvertedData(device_name=self.__config['device_name'], device_type=self.__config['device_type'])

max_workers = min(8, len(values))

with ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(self.process_datapoint, configs, values, [basic_timestamp] * len(values)))

telemetry_batch = []
attributes_batch = []

if is_debug_enabled:
filling_start_time = int(time() * 1000)

with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_telemetry = executor.submit(self.fill_telemetry, results)
future_attributes = executor.submit(self.fill_attributes, results)
telemetry_batch = future_telemetry.result()
attributes_batch = future_attributes.result()

converted_data.add_to_telemetry(telemetry_batch)
for attr in attributes_batch:
converted_data.add_to_attributes(attr)

if is_debug_enabled:
converted_data_fill_time = int(time() * 1000) - filling_start_time
total_datapoints_in_converted_data = converted_data.telemetry_datapoints_count + converted_data.attributes_datapoints_count

# if is_debug_enabled:
# self._log.trace("Iteration took %d ms", int(time() * 1000) - start_iteration)
# self._log.trace("Filling took %d ms", converted_data_fill_time)
# self._log.trace("Average time per iteration: %2f ms", (float(int(time() * 1000)) - start_iteration) / float(len(values)))
# self._log.trace("Average filling time: %2f ms", float(converted_data_fill_time) / float(total_datapoints_in_converted_data))
# self._log.trace("Total datapoints in converted data: %d", total_datapoints_in_converted_data)

StatisticsService.count_connector_message(self._log.name, 'convertersAttrProduced', count=converted_data.attributes_datapoints_count)
StatisticsService.count_connector_message(self._log.name, 'convertersTsProduced', count=converted_data.telemetry_datapoints_count)

return converted_data
except Exception as e:
self._log.exception("Error occurred while converting data: ", exc_info=e)
StatisticsService.count_connector_message(self._log.name, 'convertersMsgDropped')

@staticmethod
def fill_telemetry(results):
telemetry_batch = []
for result, error in results:
if isinstance(result, TelemetryEntry):
telemetry_batch.append(result)
return telemetry_batch

@staticmethod
def fill_attributes(results):
attributes_batch = []
for result, error in results:
if isinstance(result, dict):
attributes_batch.append(result)
return attributes_batch
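The converter above rounds its base timestamp to the nearest 100 ms (`round(basic_timestamp / 100) * 100`) so datapoints converted close together share one timestamp and collapse into fewer messages downstream. A standalone sketch of that bucketing; `grouping_timestamp` is a hypothetical helper name, not part of the PR:

```python
from time import time


def grouping_timestamp(ts_ms=None, round_to_ms=100):
    """Round an epoch-millisecond timestamp to the nearest `round_to_ms`
    bucket, so nearby datapoints share a timestamp and can be grouped
    into one telemetry message in the rule chain."""
    if ts_ms is None:
        ts_ms = int(time() * 1000)
    return round(ts_ms / round_to_ms) * round_to_ms
```

Note that Python's `round` uses banker's rounding on exact ties, which is harmless here since only the bucket boundary shifts by one bucket.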
12 changes: 6 additions & 6 deletions thingsboard_gateway/gateway/tb_gateway_service.py
@@ -232,10 +232,10 @@ def __init__(self, config_file=None):

         self.init_statistics_service(self.__config['thingsboard'].get('statistics', DEFAULT_STATISTIC))

-        self.__min_pack_send_delay_ms = self.__config['thingsboard'].get('minPackSendDelayMS', 50)
+        self.__min_pack_send_delay_ms = 1000 #self.__config['thingsboard'].get('minPackSendDelayMS', 50)
         self.__min_pack_send_delay_ms = self.__min_pack_send_delay_ms / 1000.0
-        self.__min_pack_size_to_send = self.__config['thingsboard'].get('minPackSizeToSend', 500)
-        self.__max_payload_size_in_bytes = self.__config["thingsboard"].get("maxPayloadSizeBytes", 8196)
+        self.__min_pack_size_to_send = 50 #self.__config['thingsboard'].get('minPackSizeToSend', 50)
+        self.__max_payload_size_in_bytes = 1_000_000 #self.__config["thingsboard"].get("maxPayloadSizeBytes", 400)

         self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True,
                                    name="Send data to Thingsboard Thread")
@@ -1303,8 +1303,8 @@ def check_size(self, devices_data_in_event_pack, current_data_pack_size, item_si
     def __read_data_from_storage(self):
         devices_data_in_event_pack = {}
         global log
-        log.debug("Send data Thread has been started successfully.")
-        log.debug("Maximal size of the client message queue is: %r",
+        log.info("Send data Thread has been started successfully.")
+        log.info("Maximal size of the client message queue is: %r",
                   self.tb_client.client._client._max_queued_messages)  # noqa pylint: disable=protected-access
         current_event_pack_data_size = 0
         logger_get_time = 0
@@ -1956,7 +1956,7 @@ def get_max_payload_size_bytes(self):
         if hasattr(self, '_TBGatewayService__max_payload_size_in_bytes'):
             return int(self.__max_payload_size_in_bytes * 0.9)

-        return 8196
+        return 1_000_000

     # ----------------------------
     # Storage --------------------
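The hard-coded `1_000_000` limit is consumed via `get_max_payload_size_bytes()`, which keeps roughly 10% headroom below the configured MQTT payload cap (`int(... * 0.9)`). A rough sketch of how such a limit can drive pack assembly; `effective_max_payload` and `pack_events` are illustrative names, not the gateway's actual `__read_data_from_storage` logic:

```python
def effective_max_payload(configured_bytes=1_000_000, headroom=0.9):
    """Keep ~10% below the configured payload limit, mirroring the
    intent of get_max_payload_size_bytes()."""
    return int(configured_bytes * headroom)


def pack_events(serialized_events, max_bytes):
    """Greedily pack serialized events into payloads not exceeding
    max_bytes. A single oversized event still gets its own pack."""
    packs, current, size = [], [], 0
    for event in serialized_events:
        if current and size + len(event) > max_bytes:
            packs.append(current)
            current, size = [], 0
        current.append(event)
        size += len(event)
    if current:
        packs.append(current)
    return packs
```

Raising `maxPayloadSizeBytes` (and hence the effective cap) means fewer, larger MQTT publishes for the same event stream, which is presumably the point of this high-load comparison branch.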