From 2192c49d9c3795d45dfe0e93d40c8f9d9fc505c5 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Mon, 20 Jan 2025 09:26:19 +0200 Subject: [PATCH] Removed processing for sendDataOnlyOnChange as deprecated module, use reportStrategy ON_CHANGE instead --- .../connectors/mqtt/mqtt_connector.py | 34 ++++--------------- 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py index 92b76b0c..453efdc1 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py @@ -140,15 +140,9 @@ def __init__(self, gateway, config, connector_type): # Extract main sections from configuration --------------------------------------------------------------------- self.__broker = config.get('broker') - self.__send_data_only_on_change = self.__broker.get(SEND_ON_CHANGE_PARAMETER, - self.config.get(SEND_ON_CHANGE_PARAMETER, - DEFAULT_SEND_ON_CHANGE_VALUE)) - self.__send_data_only_on_change_ttl = self.__broker.get(SEND_ON_CHANGE_TTL_PARAMETER, - self.config.get(SEND_ON_CHANGE_TTL_PARAMETER, - DEFAULT_SEND_ON_CHANGE_INFINITE_TTL_VALUE)) # noqa - - # for sendDataOnlyOnChange param - self.__topic_content = {} + if not self.__broker: + self.__log.error('Broker configuration is missing!') + return self.__mapping = [] self.__server_side_rpc = [] @@ -259,27 +253,21 @@ def __init__(self, gateway, config, connector_type): self.__stop_event = Event() self.daemon = True - self.__msg_queue = Queue(config['broker'].get('maxMessageQueue', 1000000000)) + self.__msg_queue = Queue(self.__broker.get('maxMessageQueue', 1000000000)) self.__workers_thread_pool = [] - self.__max_msg_number_for_worker = config['broker'].get('maxMessageNumberPerWorker', 10) - self.__max_number_of_workers = config['broker'].get('maxNumberOfWorkers', 100) + self.__max_msg_number_for_worker = self.__broker.get('maxMessageNumberPerWorker', 10) + self.__max_number_of_workers = self.__broker.get('maxNumberOfWorkers', 100) - self._on_message_queue = Queue(config['broker'].get('maxProcessingMessageQueue', 1000000000)) + self._on_message_queue = Queue(self.__broker.get('maxProcessingMessageQueue', 1000000000)) self._on_message_thread = Thread(name='On Message', target=self._process_on_message, daemon=True) self._on_message_thread.start() - def is_filtering_enable(self, device_name): - return self.__send_data_only_on_change - def get_config(self): return self.config def get_type(self): return self._connector_type - def get_ttl_for_duplicates(self, device_name): - return self.__send_data_only_on_change_ttl - @staticmethod def __add_ts_to_test_message(msg, ts_name): msg = simplejson.loads(msg.decode('utf-8').replace("'", '"')) @@ -638,14 +626,6 @@ def _process_on_message(self): available_converters = self.__mapping_sub_topics[topic] for converter in available_converters: try: - # check if data is equal - if converter.config.get('sendDataOnlyOnChange', False) and self.__topic_content.get( - message.topic) == content: - request_handled = True - continue - - self.__topic_content[message.topic] = content - request_handled = self.put_data_to_convert(converter, message, content) except Exception as e: self.__log.exception(e)