From e9c6819eec8ee382fa3d3455f1ba58e50a2480d8 Mon Sep 17 00:00:00 2001 From: Steve Hamblett Date: Fri, 5 Jul 2024 13:48:42 +0100 Subject: [PATCH] Issue523 (#543) * Issue 523 * Issue 523 * Issue 523 * Issue 523 * Issue 523 * Issue 523 --- .../mqtt_server_client_failed_connection.dart | 105 ++++++++++++++++++ ...onous_mqtt_browser_connection_handler.dart | 40 +++++-- .../mqtt_client_imqtt_connection_handler.dart | 3 + ...t_client_mqtt_connection_handler_base.dart | 4 + ...ronous_mqtt_server_connection_handler.dart | 40 +++++-- lib/src/mqtt_browser_client.dart | 6 +- lib/src/mqtt_client.dart | 13 +++ lib/src/mqtt_client_constants.dart | 5 +- lib/src/mqtt_server_client.dart | 6 +- ...onnection_autoreconnect_nobroker_test.dart | 7 ++ .../mqtt_client_connection_unsecure_test.dart | 84 ++++++++++++++ 11 files changed, 286 insertions(+), 27 deletions(-) create mode 100644 example/mqtt_server_client_failed_connection.dart diff --git a/example/mqtt_server_client_failed_connection.dart b/example/mqtt_server_client_failed_connection.dart new file mode 100644 index 0000000..0c8de95 --- /dev/null +++ b/example/mqtt_server_client_failed_connection.dart @@ -0,0 +1,105 @@ +/* + * Package : mqtt_client + * Author : S. Hamblett + * Date : 31/05/2017 + * Copyright : S.Hamblett + */ + +import 'dart:async'; +import 'dart:io'; +import 'package:mqtt_client/mqtt_client.dart'; +import 'package:mqtt_client/mqtt_server_client.dart'; + +/// An annotated connection attempt failed usage example for mqtt_server_client. +/// +/// To run this example on a linux host please execute 'netcat -l 1883' at the command line. +/// Use a suitably equivalent command for other hosts. +/// +/// First create a client, the client is constructed with a broker name, client identifier +/// and port if needed. The client identifier (short ClientId) is an identifier of each MQTT +/// client connecting to a MQTT broker. As the word identifier already suggests, it should be unique per broker. +/// The broker uses it for identifying the client and the current state of the client. If you don’t need a state +/// to be hold by the broker, in MQTT 3.1.1 you can set an empty ClientId, which results in a connection without any state. +/// A condition is that clean session connect flag is true, otherwise the connection will be rejected. +/// The client identifier can be a maximum length of 23 characters. If a port is not specified the standard port +/// of 1883 is used. + +/// Connect to a resolvable host that is not running a broker, hence the connection will fail. +/// Set the maximum connection attempts to 3. +final client = MqttServerClient('localhost', '', maxConnectionAttempts: 3); + +Future main() async { + /// Set logging on if needed, defaults to off + client.logging(on: false); + + /// Set the correct MQTT protocol for mosquito + client.setProtocolV311(); + + /// The connection timeout period can be set if needed, the default is 5 seconds. + client.connectTimeoutPeriod = 2000; // milliseconds + + /// Add the unsolicited disconnection callback + client.onDisconnected = onDisconnected; + + /// Add the failed connection attempt callback. + /// This callback will be called on every failed connection attempt, in the case of this + /// example it will be called 3 times at a period of 2 seconds. + client.onFailedConnectionAttempt = failedConnectionAttemptCallback; + + /// Create a connection message to use or use the default one. The default one sets the + /// client identifier, any supplied username/password and clean session, + /// an example of a specific one below. + final connMess = MqttConnectMessage() + .withClientIdentifier('Mqtt_MyClientUniqueId') + .withWillTopic('willtopic') // If you set this you must set a will message + .withWillMessage('My Will message') + .startClean() // Non persistent session for testing + .withWillQos(MqttQos.atLeastOnce); + print('EXAMPLE::Mosquitto client connecting....'); + client.connectionMessage = connMess; + + /// Connect the client, any errors here are communicated via the failed + /// connection attempts callback + + try { + await client.connect(); + } on NoConnectionException catch (e) { + // Raised by the client when connection fails. + print('EXAMPLE::client exception - $e'); + client.disconnect(); + exit(-1); + } on SocketException catch (e) { + // Raised by the socket layer + print('EXAMPLE::socket exception - $e'); + client.disconnect(); + exit(-1); + } + + /// Check we are not connected + if (client.connectionStatus!.state != MqttConnectionState.connected) { + print('EXAMPLE::Mosquitto client not connected'); + } + + exit(0); +} + +/// Failed connection attempt callback +void failedConnectionAttemptCallback(int attempt) { + print('EXAMPLE::onFailedConnectionAttempt, attempt number is $attempt'); + if (attempt == 3) { + client.disconnect(); + } +} + +/// The unsolicited disconnect callback +void onDisconnected() { + print('EXAMPLE::OnDisconnected client callback - Client disconnection'); + if (client.connectionStatus!.disconnectionOrigin == + MqttDisconnectionOrigin.solicited) { + print('EXAMPLE::OnDisconnected callback is solicited, this is correct'); + } else { + print( + 'EXAMPLE::OnDisconnected callback is unsolicited or none, this is incorrect - exiting'); + exit(-1); + } +} diff --git a/lib/src/connectionhandling/browser/mqtt_client_synchronous_mqtt_browser_connection_handler.dart b/lib/src/connectionhandling/browser/mqtt_client_synchronous_mqtt_browser_connection_handler.dart index 927a60c..5ca3818 100644 --- a/lib/src/connectionhandling/browser/mqtt_client_synchronous_mqtt_browser_connection_handler.dart +++ b/lib/src/connectionhandling/browser/mqtt_client_synchronous_mqtt_browser_connection_handler.dart @@ -78,27 +78,45 @@ class SynchronousMqttBrowserConnectionHandler // We're the sync connection handler so we need to wait for the // brokers acknowledgement of the connections await connectTimer.sleep(); + connectionAttempts++; MqttLogger.log( 'SynchronousMqttBrowserConnectionHandler::internalConnect - ' 'post sleep, state = $connectionStatus'); + if (connectionStatus.state != MqttConnectionState.connected) { + if (!autoReconnectInProgress) { + MqttLogger.log( + 'SynchronousMqttBrowserConnectionHandler::internalConnect failed, attempt $connectionAttempts'); + if (onFailedConnectionAttempt != null) { + MqttLogger.log( + 'SynchronousMqttBrowserConnectionHandler::calling onFailedConnectionAttempt'); + onFailedConnectionAttempt!(connectionAttempts); + } + } + } } while (connectionStatus.state != MqttConnectionState.connected && - ++connectionAttempts < maxConnectionAttempts!); + connectionAttempts < maxConnectionAttempts!); // If we've failed to handshake with the broker, throw an exception. if (connectionStatus.state != MqttConnectionState.connected) { if (!autoReconnectInProgress) { MqttLogger.log( 'SynchronousMqttBrowserConnectionHandler::internalConnect failed'); - if (connectionStatus.returnCode == - MqttConnectReturnCode.noneSpecified) { - throw NoConnectionException('The maximum allowed connection attempts ' - '({$maxConnectionAttempts}) were exceeded. ' - 'The broker is not responding to the connection request message ' - '(Missing Connection Acknowledgement?'); + if (onFailedConnectionAttempt == null) { + if (connectionStatus.returnCode == + MqttConnectReturnCode.noneSpecified) { + throw NoConnectionException( + 'The maximum allowed connection attempts ' + '({$maxConnectionAttempts}) were exceeded. ' + 'The broker is not responding to the connection request message ' + '(Missing Connection Acknowledgement?'); + } else { + throw NoConnectionException( + 'The maximum allowed connection attempts ' + '({$maxConnectionAttempts}) were exceeded. ' + 'The broker is not responding to the connection request message correctly ' + 'The return code is ${connectionStatus.returnCode}'); + } } else { - throw NoConnectionException('The maximum allowed connection attempts ' - '({$maxConnectionAttempts}) were exceeded. ' - 'The broker is not responding to the connection request message correctly ' - 'The return code is ${connectionStatus.returnCode}'); + connectionStatus.state = MqttConnectionState.faulted; } } } diff --git a/lib/src/connectionhandling/mqtt_client_imqtt_connection_handler.dart b/lib/src/connectionhandling/mqtt_client_imqtt_connection_handler.dart index 9e663c9..5ddec5e 100644 --- a/lib/src/connectionhandling/mqtt_client_imqtt_connection_handler.dart +++ b/lib/src/connectionhandling/mqtt_client_imqtt_connection_handler.dart @@ -27,6 +27,9 @@ abstract class IMqttConnectionHandler { /// Auto reconnected callback AutoReconnectCompleteCallback? onAutoReconnected; + /// Failed connection attempt callback + FailedConnectionAttemptCallback? onFailedConnectionAttempt; + /// Auto reconnect in progress bool autoReconnectInProgress = false; diff --git a/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler_base.dart b/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler_base.dart index b710d67..fde412a 100644 --- a/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler_base.dart +++ b/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler_base.dart @@ -30,6 +30,10 @@ abstract class MqttConnectionHandlerBase implements IMqttConnectionHandler { @override AutoReconnectCompleteCallback? onAutoReconnected; + /// Failed connection attempt callback + @override + FailedConnectionAttemptCallback? onFailedConnectionAttempt; + /// Auto reconnect in progress @override bool autoReconnectInProgress = false; diff --git a/lib/src/connectionhandling/server/mqtt_client_synchronous_mqtt_server_connection_handler.dart b/lib/src/connectionhandling/server/mqtt_client_synchronous_mqtt_server_connection_handler.dart index 306dc3a..c8abf3b 100644 --- a/lib/src/connectionhandling/server/mqtt_client_synchronous_mqtt_server_connection_handler.dart +++ b/lib/src/connectionhandling/server/mqtt_client_synchronous_mqtt_server_connection_handler.dart @@ -108,27 +108,45 @@ class SynchronousMqttServerConnectionHandler // We're the sync connection handler so we need to wait for the // brokers acknowledgement of the connections await connectTimer.sleep(); + connectionAttempts++; MqttLogger.log( 'SynchronousMqttServerConnectionHandler::internalConnect - ' 'post sleep, state = $connectionStatus'); + if (connectionStatus.state != MqttConnectionState.connected) { + if (!autoReconnectInProgress) { + MqttLogger.log( + 'SynchronousMqttServerConnectionHandler::internalConnect failed, attempt $connectionAttempts'); + if (onFailedConnectionAttempt != null) { + MqttLogger.log( + 'SynchronousMqttServerConnectionHandler::calling onFailedConnectionAttempt'); + onFailedConnectionAttempt!(connectionAttempts); + } + } + } } while (connectionStatus.state != MqttConnectionState.connected && - ++connectionAttempts < maxConnectionAttempts!); + connectionAttempts < maxConnectionAttempts!); // If we've failed to handshake with the broker, throw an exception. if (connectionStatus.state != MqttConnectionState.connected) { if (!autoReconnectInProgress) { MqttLogger.log( 'SynchronousMqttServerConnectionHandler::internalConnect failed'); - if (connectionStatus.returnCode == - MqttConnectReturnCode.noneSpecified) { - throw NoConnectionException('The maximum allowed connection attempts ' - '({$maxConnectionAttempts}) were exceeded. ' - 'The broker is not responding to the connection request message ' - '(Missing Connection Acknowledgement?'); + if (onFailedConnectionAttempt == null) { + if (connectionStatus.returnCode == + MqttConnectReturnCode.noneSpecified) { + throw NoConnectionException( + 'The maximum allowed connection attempts ' + '({$maxConnectionAttempts}) were exceeded. ' + 'The broker is not responding to the connection request message ' + '(Missing Connection Acknowledgement?'); + } else { + throw NoConnectionException( + 'The maximum allowed connection attempts ' + '({$maxConnectionAttempts}) were exceeded. ' + 'The broker is not responding to the connection request message correctly ' + 'The return code is ${connectionStatus.returnCode}'); + } } else { - throw NoConnectionException('The maximum allowed connection attempts ' - '({$maxConnectionAttempts}) were exceeded. ' - 'The broker is not responding to the connection request message correctly ' - 'The return code is ${connectionStatus.returnCode}'); + connectionStatus.state = MqttConnectionState.faulted; } } } diff --git a/lib/src/mqtt_browser_client.dart b/lib/src/mqtt_browser_client.dart index 21346a5..a06f5c8 100644 --- a/lib/src/mqtt_browser_client.dart +++ b/lib/src/mqtt_browser_client.dart @@ -15,7 +15,8 @@ class MqttBrowserClient extends MqttClient { MqttBrowserClient( super.server, super.clientIdentifier, { - this.maxConnectionAttempts = 3, + this.maxConnectionAttempts = + MqttClientConstants.defaultMaxConnectionAttempts, }); /// Initializes a new instance of the MqttServerClient class using @@ -27,7 +28,8 @@ class MqttBrowserClient extends MqttClient { super.server, super.clientIdentifier, super.port, { - this.maxConnectionAttempts = 3, + this.maxConnectionAttempts = + MqttClientConstants.defaultMaxConnectionAttempts, }) : super.withPort(); /// Max connection attempts diff --git a/lib/src/mqtt_client.dart b/lib/src/mqtt_client.dart index a4becca..c2bfb7b 100755 --- a/lib/src/mqtt_client.dart +++ b/lib/src/mqtt_client.dart @@ -19,6 +19,9 @@ typedef AutoReconnectCallback = void Function(); /// The client auto reconnect complete callback type typedef AutoReconnectCompleteCallback = void Function(); +/// The client failed connection attempt callback +typedef FailedConnectionAttemptCallback = void Function(int attemptNumber); + /// A client class for interacting with MQTT Data Packets. /// Do not instantiate this class directly, instead instantiate /// either a [MqttClientServer] class or an [MqttBrowserClient] as needed. @@ -201,6 +204,14 @@ class MqttClient { /// perform any post auto reconnect actions. AutoReconnectCompleteCallback? onAutoReconnected; + /// Failed Connection attempt callback. + /// Called on every failed connection attempt, if [maxConnectionAttempts] is + /// set to 5 say this will be called 5 times if the connection fails, + /// one for every failed attempt. Note this is never called + /// if [autoReconnect] is set, also the [NoConnectionException] is not raised + /// if this callback is supplied. + FailedConnectionAttemptCallback? onFailedConnectionAttempt; + /// Subscribed callback, function returns a void and takes a /// string parameter, the topic that has been subscribed to. SubscribeCallback? _onSubscribed; @@ -288,6 +299,8 @@ class MqttClient { connectionHandler.onConnected = onConnected; connectionHandler.onAutoReconnect = onAutoReconnect; connectionHandler.onAutoReconnected = onAutoReconnected; + connectionHandler.onFailedConnectionAttempt = onFailedConnectionAttempt; + MqttLogger.log( 'MqttClient::connect - Connection timeout period is $connectTimeoutPeriod milliseconds'); publishingManager = PublishingManager(connectionHandler, clientEventBus); diff --git a/lib/src/mqtt_client_constants.dart b/lib/src/mqtt_client_constants.dart index 42ff4c2..b8531e5 100644 --- a/lib/src/mqtt_client_constants.dart +++ b/lib/src/mqtt_client_constants.dart @@ -26,7 +26,10 @@ class MqttClientConstants { /// Default keep alive in seconds. /// The default of 0 disables keep alive. - static int defaultKeepAlive = 0; + static const int defaultKeepAlive = 0; + + /// Default maximum connection attempts + static const int defaultMaxConnectionAttempts = 3; /// Protocol variants /// V3 diff --git a/lib/src/mqtt_server_client.dart b/lib/src/mqtt_server_client.dart index f9f98ac..fe022b7 100644 --- a/lib/src/mqtt_server_client.dart +++ b/lib/src/mqtt_server_client.dart @@ -15,7 +15,8 @@ class MqttServerClient extends MqttClient { MqttServerClient( super.server, super.clientIdentifier, { - this.maxConnectionAttempts = 3, + this.maxConnectionAttempts = + MqttClientConstants.defaultMaxConnectionAttempts, }); /// Initializes a new instance of the MqttServerClient class using @@ -27,7 +28,8 @@ class MqttServerClient extends MqttClient { super.server, super.clientIdentifier, super.port, { - this.maxConnectionAttempts = 3, + this.maxConnectionAttempts = + MqttClientConstants.defaultMaxConnectionAttempts, }) : super.withPort(); /// The security context for secure usage diff --git a/test/mqtt_client_connection_autoreconnect_nobroker_test.dart b/test/mqtt_client_connection_autoreconnect_nobroker_test.dart index 53f71f1..488a596 100644 --- a/test/mqtt_client_connection_autoreconnect_nobroker_test.dart +++ b/test/mqtt_client_connection_autoreconnect_nobroker_test.dart @@ -21,6 +21,7 @@ void main() { await IOOverrides.runZoned(() async { var autoReconnectCallbackCalled = false; var disconnectCallbackCalled = false; + var connectionFailedCallbackCalled = false; void autoReconnect() { autoReconnectCallbackCalled = true; @@ -30,6 +31,10 @@ void main() { disconnectCallbackCalled = true; } + void connectionFailed(int attempt) { + connectionFailedCallbackCalled = true; + } + final client = MqttServerClient('localhost', testClientId); client.logging(on: true); client.keepAlivePeriod = 1; @@ -38,6 +43,7 @@ void main() { client.socketOptions.add(socketOption); client.onAutoReconnect = autoReconnect; client.onDisconnected = disconnect; + client.onFailedConnectionAttempt = connectionFailed; const username = 'unused 4'; print(username); const password = 'password 4'; @@ -48,6 +54,7 @@ void main() { await MqttUtilities.asyncSleep(2); expect(autoReconnectCallbackCalled, isTrue); expect(disconnectCallbackCalled, isFalse); + expect(connectionFailedCallbackCalled, isFalse); expect(client.connectionStatus!.state == MqttConnectionState.connecting, isTrue); }, diff --git a/test/mqtt_client_connection_unsecure_test.dart b/test/mqtt_client_connection_unsecure_test.dart index 4b8a3b6..4f22d78 100644 --- a/test/mqtt_client_connection_unsecure_test.dart +++ b/test/mqtt_client_connection_unsecure_test.dart @@ -121,6 +121,90 @@ void main() { timeout: timeout)); }); + test('Connect no connect ack onFailedConnectionAttempt callback set', + () async { + await IOOverrides.runZoned(() async { + bool connectionFailed = false; + int tAttempt = 0; + final lAttempt = []; + void onFailedConnectionAttempt(int attempt) { + tAttempt++; + lAttempt.add(attempt); + connectionFailed = true; + } + + final clientEventBus = events.EventBus(); + final ch = SynchronousMqttServerConnectionHandler(clientEventBus, + maxConnectionAttempts: 3, socketOptions: socketOptions); + ch.onFailedConnectionAttempt = onFailedConnectionAttempt; + final start = DateTime.now(); + try { + await ch.connect(mockBrokerAddress, mockBrokerPort, + MqttConnectMessage().withClientIdentifier(testClientId)); + } on Exception catch (e) { + expect(e is NoConnectionException, isTrue); + } + expect(connectionFailed, isTrue); + expect(tAttempt, 3); + expect(lAttempt, [1, 2, 3]); + expect(ch.connectionStatus.state, MqttConnectionState.faulted); + expect(ch.connectionStatus.returnCode, + MqttConnectReturnCode.noneSpecified); + final end = DateTime.now(); + expect(end.difference(start).inSeconds > 4, true); + }, + socketConnect: (dynamic host, int port, + {dynamic sourceAddress, + int sourcePort = 0, + Duration? timeout}) => + MqttMockSocketSimpleConnectNoAck.connect(host, port, + sourceAddress: sourceAddress, + sourcePort: sourcePort, + timeout: timeout)); + }); + + test('Connect no connect ack onFailedConnectionAttempt callback set', + () async { + await IOOverrides.runZoned(() async { + bool connectionFailed = false; + int tAttempt = 0; + final lAttempt = []; + void onFailedConnectionAttempt(int attempt) { + tAttempt++; + lAttempt.add(attempt); + connectionFailed = true; + } + + final clientEventBus = events.EventBus(); + final ch = SynchronousMqttServerConnectionHandler(clientEventBus, + maxConnectionAttempts: 3, socketOptions: socketOptions); + ch.onFailedConnectionAttempt = onFailedConnectionAttempt; + final start = DateTime.now(); + try { + await ch.connect(mockBrokerAddress, mockBrokerPort, + MqttConnectMessage().withClientIdentifier(testClientId)); + } on Exception catch (e) { + expect(e is NoConnectionException, isTrue); + } + expect(connectionFailed, isTrue); + expect(tAttempt, 3); + expect(lAttempt, [1, 2, 3]); + expect(ch.connectionStatus.state, MqttConnectionState.faulted); + expect(ch.connectionStatus.returnCode, + MqttConnectReturnCode.noneSpecified); + final end = DateTime.now(); + expect(end.difference(start).inSeconds > 4, true); + }, + socketConnect: (dynamic host, int port, + {dynamic sourceAddress, + int sourcePort = 0, + Duration? timeout}) => + MqttMockSocketSimpleConnectNoAck.connect(host, port, + sourceAddress: sourceAddress, + sourcePort: sourcePort, + timeout: timeout)); + }); + test('1000ms connect period', () async { await IOOverrides.runZoned(() async { final clientEventBus = events.EventBus();