Skip to content

Commit

Permalink
Issue523 (#543)
Browse files Browse the repository at this point in the history
* Issue 523

* Issue 523

* Issue 523

* Issue 523

* Issue 523

* Issue 523
  • Loading branch information
shamblett authored Jul 5, 2024
1 parent f511b15 commit e9c6819
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 27 deletions.
105 changes: 105 additions & 0 deletions example/mqtt_server_client_failed_connection.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Package : mqtt_client
* Author : S. Hamblett <[email protected]>
* Date : 31/05/2017
* Copyright : S.Hamblett
*/

import 'dart:async';
import 'dart:io';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

/// An annotated connection attempt failed usage example for mqtt_server_client.
///
/// To run this example on a linux host please execute 'netcat -l 1883' at the command line.
/// Use a suitably equivalent command for other hosts.
///
/// First create a client, the client is constructed with a broker name, client identifier
/// and port if needed. The client identifier (short ClientId) is an identifier of each MQTT
/// client connecting to a MQTT broker. As the word identifier already suggests, it should be unique per broker.
/// The broker uses it for identifying the client and the current state of the client. If you don’t need a state
/// to be hold by the broker, in MQTT 3.1.1 you can set an empty ClientId, which results in a connection without any state.
/// A condition is that clean session connect flag is true, otherwise the connection will be rejected.
/// The client identifier can be a maximum length of 23 characters. If a port is not specified the standard port
/// of 1883 is used.
/// Connect to a resolvable host that is not running a broker, hence the connection will fail.
/// Set the maximum connection attempts to 3.
final client = MqttServerClient('localhost', '', maxConnectionAttempts: 3);

Future<int> main() async {
/// Set logging on if needed, defaults to off
client.logging(on: false);

/// Set the correct MQTT protocol for mosquito
client.setProtocolV311();

/// The connection timeout period can be set if needed, the default is 5 seconds.
client.connectTimeoutPeriod = 2000; // milliseconds

/// Add the unsolicited disconnection callback
client.onDisconnected = onDisconnected;

/// Add the failed connection attempt callback.
/// This callback will be called on every failed connection attempt, in the case of this
/// example it will be called 3 times at a period of 2 seconds.
client.onFailedConnectionAttempt = failedConnectionAttemptCallback;

/// Create a connection message to use or use the default one. The default one sets the
/// client identifier, any supplied username/password and clean session,
/// an example of a specific one below.
final connMess = MqttConnectMessage()
.withClientIdentifier('Mqtt_MyClientUniqueId')
.withWillTopic('willtopic') // If you set this you must set a will message
.withWillMessage('My Will message')
.startClean() // Non persistent session for testing
.withWillQos(MqttQos.atLeastOnce);
print('EXAMPLE::Mosquitto client connecting....');
client.connectionMessage = connMess;

/// Connect the client, any errors here are communicated via the failed
/// connection attempts callback
try {
await client.connect();
} on NoConnectionException catch (e) {
// Raised by the client when connection fails.
print('EXAMPLE::client exception - $e');
client.disconnect();
exit(-1);
} on SocketException catch (e) {
// Raised by the socket layer
print('EXAMPLE::socket exception - $e');
client.disconnect();
exit(-1);
}

/// Check we are not connected
if (client.connectionStatus!.state != MqttConnectionState.connected) {
print('EXAMPLE::Mosquitto client not connected');
}

exit(0);
}

/// Failed connection attempt callback
void failedConnectionAttemptCallback(int attempt) {
print('EXAMPLE::onFailedConnectionAttempt, attempt number is $attempt');
if (attempt == 3) {
client.disconnect();
}
}

/// The unsolicited disconnect callback
void onDisconnected() {
print('EXAMPLE::OnDisconnected client callback - Client disconnection');
if (client.connectionStatus!.disconnectionOrigin ==
MqttDisconnectionOrigin.solicited) {
print('EXAMPLE::OnDisconnected callback is solicited, this is correct');
} else {
print(
'EXAMPLE::OnDisconnected callback is unsolicited or none, this is incorrect - exiting');
exit(-1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,45 @@ class SynchronousMqttBrowserConnectionHandler
// We're the sync connection handler so we need to wait for the
// brokers acknowledgement of the connections
await connectTimer.sleep();
connectionAttempts++;
MqttLogger.log(
'SynchronousMqttBrowserConnectionHandler::internalConnect - '
'post sleep, state = $connectionStatus');
if (connectionStatus.state != MqttConnectionState.connected) {
if (!autoReconnectInProgress) {
MqttLogger.log(
'SynchronousMqttBrowserConnectionHandler::internalConnect failed, attempt $connectionAttempts');
if (onFailedConnectionAttempt != null) {
MqttLogger.log(
'SynchronousMqttBrowserConnectionHandler::calling onFailedConnectionAttempt');
onFailedConnectionAttempt!(connectionAttempts);
}
}
}
} while (connectionStatus.state != MqttConnectionState.connected &&
++connectionAttempts < maxConnectionAttempts!);
connectionAttempts < maxConnectionAttempts!);
// If we've failed to handshake with the broker, throw an exception.
if (connectionStatus.state != MqttConnectionState.connected) {
if (!autoReconnectInProgress) {
MqttLogger.log(
'SynchronousMqttBrowserConnectionHandler::internalConnect failed');
if (connectionStatus.returnCode ==
MqttConnectReturnCode.noneSpecified) {
throw NoConnectionException('The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message '
'(Missing Connection Acknowledgement?');
if (onFailedConnectionAttempt == null) {
if (connectionStatus.returnCode ==
MqttConnectReturnCode.noneSpecified) {
throw NoConnectionException(
'The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message '
'(Missing Connection Acknowledgement?');
} else {
throw NoConnectionException(
'The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message correctly '
'The return code is ${connectionStatus.returnCode}');
}
} else {
throw NoConnectionException('The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message correctly '
'The return code is ${connectionStatus.returnCode}');
connectionStatus.state = MqttConnectionState.faulted;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ abstract class IMqttConnectionHandler {
/// Auto reconnected callback
AutoReconnectCompleteCallback? onAutoReconnected;

/// Failed connection attempt callback
FailedConnectionAttemptCallback? onFailedConnectionAttempt;

/// Auto reconnect in progress
bool autoReconnectInProgress = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ abstract class MqttConnectionHandlerBase implements IMqttConnectionHandler {
@override
AutoReconnectCompleteCallback? onAutoReconnected;

/// Failed connection attempt callback
@override
FailedConnectionAttemptCallback? onFailedConnectionAttempt;

/// Auto reconnect in progress
@override
bool autoReconnectInProgress = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,27 +108,45 @@ class SynchronousMqttServerConnectionHandler
// We're the sync connection handler so we need to wait for the
// brokers acknowledgement of the connections
await connectTimer.sleep();
connectionAttempts++;
MqttLogger.log(
'SynchronousMqttServerConnectionHandler::internalConnect - '
'post sleep, state = $connectionStatus');
if (connectionStatus.state != MqttConnectionState.connected) {
if (!autoReconnectInProgress) {
MqttLogger.log(
'SynchronousMqttServerConnectionHandler::internalConnect failed, attempt $connectionAttempts');
if (onFailedConnectionAttempt != null) {
MqttLogger.log(
'SynchronousMqttServerConnectionHandler::calling onFailedConnectionAttempt');
onFailedConnectionAttempt!(connectionAttempts);
}
}
}
} while (connectionStatus.state != MqttConnectionState.connected &&
++connectionAttempts < maxConnectionAttempts!);
connectionAttempts < maxConnectionAttempts!);
// If we've failed to handshake with the broker, throw an exception.
if (connectionStatus.state != MqttConnectionState.connected) {
if (!autoReconnectInProgress) {
MqttLogger.log(
'SynchronousMqttServerConnectionHandler::internalConnect failed');
if (connectionStatus.returnCode ==
MqttConnectReturnCode.noneSpecified) {
throw NoConnectionException('The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message '
'(Missing Connection Acknowledgement?');
if (onFailedConnectionAttempt == null) {
if (connectionStatus.returnCode ==
MqttConnectReturnCode.noneSpecified) {
throw NoConnectionException(
'The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message '
'(Missing Connection Acknowledgement?');
} else {
throw NoConnectionException(
'The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message correctly '
'The return code is ${connectionStatus.returnCode}');
}
} else {
throw NoConnectionException('The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message correctly '
'The return code is ${connectionStatus.returnCode}');
connectionStatus.state = MqttConnectionState.faulted;
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions lib/src/mqtt_browser_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class MqttBrowserClient extends MqttClient {
MqttBrowserClient(
super.server,
super.clientIdentifier, {
this.maxConnectionAttempts = 3,
this.maxConnectionAttempts =
MqttClientConstants.defaultMaxConnectionAttempts,
});

/// Initializes a new instance of the MqttServerClient class using
Expand All @@ -27,7 +28,8 @@ class MqttBrowserClient extends MqttClient {
super.server,
super.clientIdentifier,
super.port, {
this.maxConnectionAttempts = 3,
this.maxConnectionAttempts =
MqttClientConstants.defaultMaxConnectionAttempts,
}) : super.withPort();

/// Max connection attempts
Expand Down
13 changes: 13 additions & 0 deletions lib/src/mqtt_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ typedef AutoReconnectCallback = void Function();
/// The client auto reconnect complete callback type
typedef AutoReconnectCompleteCallback = void Function();

/// The client failed connection attempt callback
typedef FailedConnectionAttemptCallback = void Function(int attemptNumber);

/// A client class for interacting with MQTT Data Packets.
/// Do not instantiate this class directly, instead instantiate
/// either a [MqttClientServer] class or an [MqttBrowserClient] as needed.
Expand Down Expand Up @@ -201,6 +204,14 @@ class MqttClient {
/// perform any post auto reconnect actions.
AutoReconnectCompleteCallback? onAutoReconnected;

/// Failed Connection attempt callback.
/// Called on every failed connection attempt, if [maxConnectionAttempts] is
/// set to 5 say this will be called 5 times if the connection fails,
/// one for every failed attempt. Note this is never called
/// if [autoReconnect] is set, also the [NoConnectionException] is not raised
/// if this callback is supplied.
FailedConnectionAttemptCallback? onFailedConnectionAttempt;

/// Subscribed callback, function returns a void and takes a
/// string parameter, the topic that has been subscribed to.
SubscribeCallback? _onSubscribed;
Expand Down Expand Up @@ -288,6 +299,8 @@ class MqttClient {
connectionHandler.onConnected = onConnected;
connectionHandler.onAutoReconnect = onAutoReconnect;
connectionHandler.onAutoReconnected = onAutoReconnected;
connectionHandler.onFailedConnectionAttempt = onFailedConnectionAttempt;

MqttLogger.log(
'MqttClient::connect - Connection timeout period is $connectTimeoutPeriod milliseconds');
publishingManager = PublishingManager(connectionHandler, clientEventBus);
Expand Down
5 changes: 4 additions & 1 deletion lib/src/mqtt_client_constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ class MqttClientConstants {

/// Default keep alive in seconds.
/// The default of 0 disables keep alive.
static int defaultKeepAlive = 0;
static const int defaultKeepAlive = 0;

/// Default maximum connection attempts
static const int defaultMaxConnectionAttempts = 3;

/// Protocol variants
/// V3
Expand Down
6 changes: 4 additions & 2 deletions lib/src/mqtt_server_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class MqttServerClient extends MqttClient {
MqttServerClient(
super.server,
super.clientIdentifier, {
this.maxConnectionAttempts = 3,
this.maxConnectionAttempts =
MqttClientConstants.defaultMaxConnectionAttempts,
});

/// Initializes a new instance of the MqttServerClient class using
Expand All @@ -27,7 +28,8 @@ class MqttServerClient extends MqttClient {
super.server,
super.clientIdentifier,
super.port, {
this.maxConnectionAttempts = 3,
this.maxConnectionAttempts =
MqttClientConstants.defaultMaxConnectionAttempts,
}) : super.withPort();

/// The security context for secure usage
Expand Down
7 changes: 7 additions & 0 deletions test/mqtt_client_connection_autoreconnect_nobroker_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ void main() {
await IOOverrides.runZoned(() async {
var autoReconnectCallbackCalled = false;
var disconnectCallbackCalled = false;
var connectionFailedCallbackCalled = false;

void autoReconnect() {
autoReconnectCallbackCalled = true;
Expand All @@ -30,6 +31,10 @@ void main() {
disconnectCallbackCalled = true;
}

void connectionFailed(int attempt) {
connectionFailedCallbackCalled = true;
}

final client = MqttServerClient('localhost', testClientId);
client.logging(on: true);
client.keepAlivePeriod = 1;
Expand All @@ -38,6 +43,7 @@ void main() {
client.socketOptions.add(socketOption);
client.onAutoReconnect = autoReconnect;
client.onDisconnected = disconnect;
client.onFailedConnectionAttempt = connectionFailed;
const username = 'unused 4';
print(username);
const password = 'password 4';
Expand All @@ -48,6 +54,7 @@ void main() {
await MqttUtilities.asyncSleep(2);
expect(autoReconnectCallbackCalled, isTrue);
expect(disconnectCallbackCalled, isFalse);
expect(connectionFailedCallbackCalled, isFalse);
expect(client.connectionStatus!.state == MqttConnectionState.connecting,
isTrue);
},
Expand Down
Loading

0 comments on commit e9c6819

Please sign in to comment.