diff --git a/example/mqtt_server_client.dart b/example/mqtt_server_client.dart index 87ec0f2..8033f79 100644 --- a/example/mqtt_server_client.dart +++ b/example/mqtt_server_client.dart @@ -27,6 +27,7 @@ import 'package:mqtt_client/mqtt_server_client.dart'; final client = MqttServerClient('test.mosquitto.org', ''); var pongCount = 0; // Pong counter +var pingCount = 0; // Ping counter Future main() async { /// A websocket URL must start with ws:// or wss:// or Dart will throw an exception, consult your websocket MQTT broker @@ -41,7 +42,7 @@ Future main() async { /// list so in most cases you can ignore this. /// Set logging on if needed, defaults to off - client.logging(on: true); + client.logging(on: false); /// Set the correct MQTT protocol for mosquito client.setProtocolV311(); @@ -66,9 +67,13 @@ Future main() async { client.onSubscribed = onSubscribed; /// Set a ping received callback if needed, called whenever a ping response(pong) is received - /// from the broker. + /// from the broker. Can be used for health monitoring. client.pongCallback = pong; + /// Set a ping sent callback if needed, called whenever a ping request(ping) is sent + /// by the client. Can be used for latency calculations. + client.pingCallback = ping; + /// Create a connection message to use or use the default one. The default one sets the /// client identifier, any supplied username/password and clean session, /// an example of a specific one below. @@ -160,6 +165,13 @@ Future main() async { print('EXAMPLE::Sleeping....'); await MqttUtilities.asyncSleep(60); + /// Print the ping/pong cycle latency data before disconnecting. + print('EXAMPLE::Keep alive latencies'); + print( + 'The latency of the last ping/pong cycle is ${client.lastCycleLatency} milliseconds'); + print( + 'The average latency of all the ping/pong cycles is ${client.averageCycleLatency} milliseconds'); + /// Finally, unsubscribe and exit gracefully print('EXAMPLE::Unsubscribing'); client.unsubscribe(topic); @@ -193,6 +205,11 @@ void onDisconnected() { } else { print('EXAMPLE:: Pong count is incorrect, expected 3. actual $pongCount'); } + if (pingCount == 3) { + print('EXAMPLE:: Ping count is correct'); + } else { + print('EXAMPLE:: Ping count is incorrect, expected 3. actual $pingCount'); + } } /// The successful connect callback @@ -205,4 +222,12 @@ void onConnected() { void pong() { print('EXAMPLE::Ping response client callback invoked'); pongCount++; + print( + 'EXAMPLE::Latency of this ping/pong cycle is ${client.lastCycleLatency} milliseconds'); +} + +/// Ping callback +void ping() { + print('EXAMPLE::Ping sent client callback invoked'); + pingCount++; } diff --git a/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart b/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart index 4f7b3fc..5b7e51e 100644 --- a/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart +++ b/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart @@ -10,6 +10,9 @@ part of '../../mqtt_client.dart'; /// Ping response received callback typedef PongCallback = void Function(); +/// Ping request sent callback +typedef PingCallback = void Function(); + /// Implements keep alive functionality on the Mqtt Connection, /// ensuring that the connection remains active according to the /// keep alive seconds setting. @@ -64,9 +67,24 @@ class MqttConnectionKeepAlive { /// Used to synchronise shutdown and ping operations. bool _shutdownPadlock = false; - /// Ping response received callback + /// Ping response received callback. PongCallback? pongCallback; + /// Ping request sent callback. + PingCallback? pingCallback; + + /// Latency(time between sending a ping and receiving a pong) in ms + /// of the last ping/pong cycle. Reset on disconnect. + int lastCycleLatency = 0; + + int _lastPingTime = 0; + + /// Average latency(time between sending a ping and receiving a pong) in ms + /// of all the ping/pong cycles in a connection period. Reset on disconnect. + int averageCycleLatency = 0; + + int _cycleCount = 0; + /// The event bus events.EventBus? _clientEventBus; @@ -88,6 +106,10 @@ class MqttConnectionKeepAlive { try { _connectionHandler.sendMessage(pingMsg); pinged = true; + _lastPingTime = DateTime.now().millisecondsSinceEpoch; + if (pingCallback != null) { + pingCallback!(); + } } catch (e) { MqttLogger.log( 'MqttConnectionKeepAlive::pingRequired - exception occurred'); @@ -151,10 +173,20 @@ class MqttConnectionKeepAlive { /// Processed ping response messages received from a message broker. bool pingResponseReceived(MqttMessage? pingMsg) { MqttLogger.log('MqttConnectionKeepAlive::pingResponseReceived'); + + // Calculate latencies + lastCycleLatency = DateTime.now().millisecondsSinceEpoch - _lastPingTime; + _cycleCount++; + // Average latency calculation is + // new_avg = prev_avg + ((new_value − prev_avg) ~/ n + 1) + averageCycleLatency += + (lastCycleLatency - averageCycleLatency) ~/ _cycleCount; + // Call the pong callback if not null if (pongCallback != null) { pongCallback!(); } + // Cancel the disconnect timer if needed. disconnectTimer?.cancel(); return true; @@ -168,6 +200,9 @@ class MqttConnectionKeepAlive { MqttLogger.log('MqttConnectionKeepAlive::stop - stopping keep alive'); pingTimer!.cancel(); disconnectTimer?.cancel(); + lastCycleLatency = 0; + averageCycleLatency = 0; + _cycleCount = 0; } /// Handle the disconnect timer timeout diff --git a/lib/src/mqtt_client.dart b/lib/src/mqtt_client.dart index c2bfb7b..1986e9c 100755 --- a/lib/src/mqtt_client.dart +++ b/lib/src/mqtt_client.dart @@ -250,7 +250,7 @@ class MqttClient { subscriptionsManager?.onUnsubscribed = cb; } - /// Ping response received callback. + /// Ping response(pong) received callback. /// If set when a ping response is received from the broker /// this will be called. /// Can be used for health monitoring outside of the client itself. @@ -264,6 +264,28 @@ class MqttClient { keepAlive?.pongCallback = cb; } + /// Ping request(ping) sent callback. + /// If set when a ping request is sent from the client + /// this will be called. + /// Can be used in tandem with the [pongCallback] for latency calculations. + PingCallback? _pingCallback; + + /// The ping sent callback + PingCallback? get pingCallback => _pingCallback; + + set pingCallback(PingCallback? cb) { + _pingCallback = cb; + keepAlive?.pingCallback = cb; + } + + /// The latency of the last ping/pong cycle in milliseconds. + /// Cleared on disconnect. + int? get lastCycleLatency => keepAlive?.lastCycleLatency; + + /// The average latency of all ping/pong cycles in a connection period in + /// milliseconds. Cleared on disconnect. + int? get averageCycleLatency => keepAlive?.averageCycleLatency; + /// The event bus @protected events.EventBus? clientEventBus; @@ -320,6 +342,9 @@ class MqttClient { if (pongCallback != null) { keepAlive!.pongCallback = pongCallback; } + if (pingCallback != null) { + keepAlive!.pingCallback = pingCallback; + } } else { MqttLogger.log('MqttClient::connect - keep alive is disabled'); } diff --git a/test/mqtt_client_keep_alive_test.dart b/test/mqtt_client_keep_alive_test.dart index e57e7b6..e23ce32 100644 --- a/test/mqtt_client_keep_alive_test.dart +++ b/test/mqtt_client_keep_alive_test.dart @@ -196,4 +196,97 @@ void main() { expect(ka.disconnectTimer, isNull); }); }); + group('Latency', () { + test('Ping callback', () async { + final clientEventBus = events.EventBus(); + var disconnect = false; + void disconnectOnNoPingResponse(DisconnectOnNoPingResponse event) { + disconnect = true; + } + + var pingCalled = false; + void pingCallback() { + pingCalled = true; + } + + clientEventBus + .on() + .listen(disconnectOnNoPingResponse); + final ch = MockCH( + clientEventBus, + maxConnectionAttempts: 3, + ); + ch.connectionStatus.state = MqttConnectionState.connected; + final ka = MqttConnectionKeepAlive(ch, clientEventBus, 2); + ka.pingCallback = pingCallback; + verify(() => ch.registerForMessage(MqttMessageType.pingRequest, any())) + .called(1); + verify(() => ch.registerForMessage(MqttMessageType.pingResponse, any())) + .called(1); + verify(() => ch.registerForAllSentMessages(ka.messageSent)).called(1); + expect(ka.pingTimer?.isActive, isTrue); + expect(ka.disconnectTimer, isNull); + await MqttUtilities.asyncSleep(3); + verify(() => ch.sendMessage(any())).called(1); + expect(pingCalled, isTrue); + final pingMessageRx = MqttPingResponseMessage(); + ka.pingResponseReceived(pingMessageRx); + expect(disconnect, isFalse); + ka.stop(); + expect(ka.pingTimer?.isActive, isFalse); + expect(ka.disconnectTimer, isNull); + }); + test('Latency counts', () async { + final latencies = [0, 0, 0]; + final clientEventBus = events.EventBus(); + var disconnect = false; + void disconnectOnNoPingResponse(DisconnectOnNoPingResponse event) { + disconnect = true; + } + + clientEventBus + .on() + .listen(disconnectOnNoPingResponse); + final ch = MockCH( + clientEventBus, + maxConnectionAttempts: 3, + ); + ch.connectionStatus.state = MqttConnectionState.connected; + final ka = MqttConnectionKeepAlive(ch, clientEventBus, 3); + verify(() => ch.registerForMessage(MqttMessageType.pingRequest, any())) + .called(1); + verify(() => ch.registerForMessage(MqttMessageType.pingResponse, any())) + .called(1); + verify(() => ch.registerForAllSentMessages(ka.messageSent)).called(1); + expect(ka.pingTimer?.isActive, isTrue); + expect(ka.disconnectTimer, isNull); + await MqttUtilities.asyncSleep(4); + verify(() => ch.sendMessage(any())).called(1); + final pingMessageRx = MqttPingResponseMessage(); + ka.pingResponseReceived(pingMessageRx); + latencies[0] = ka.lastCycleLatency; + expect(ka.lastCycleLatency > 1000, isTrue); + expect(ka.averageCycleLatency > 1000, isTrue); + await MqttUtilities.asyncSleep(3); + verify(() => ch.sendMessage(any())).called(1); + ka.pingResponseReceived(pingMessageRx); + latencies[1] = ka.lastCycleLatency; + expect(ka.lastCycleLatency > 1000, isTrue); + expect(ka.averageCycleLatency > 1000, isTrue); + await MqttUtilities.asyncSleep(3); + verify(() => ch.sendMessage(any())).called(1); + ka.pingResponseReceived(pingMessageRx); + latencies[2] = ka.lastCycleLatency; + expect(ka.lastCycleLatency > 1000, isTrue); + expect(ka.averageCycleLatency > 1000, isTrue); + expect(ka.averageCycleLatency, + (latencies[0] + latencies[1] + latencies[2]) ~/ 3); + expect(disconnect, isFalse); + ka.stop(); + expect(ka.averageCycleLatency, 0); + expect(ka.lastCycleLatency, 0); + expect(ka.pingTimer?.isActive, isFalse); + expect(ka.disconnectTimer, isNull); + }); + }); }