diff --git a/.github/workflows/ReleaseNotes.md b/.github/workflows/ReleaseNotes.md index b5b84be15..6d7c38411 100644 --- a/.github/workflows/ReleaseNotes.md +++ b/.github/workflows/ReleaseNotes.md @@ -7,6 +7,7 @@ * [Client] Exposed _UseDefaultCredentials_ and more for Web Socket options and proxy options (#1734, thanks to @impworks). * [Client] Exposed more TLS options (#1729). * [Client] Fixed wrong return code conversion (#1729). +* [Client] Added an option to avoid throwing an exception when the server returned a proper non success (but valid) response (#1681). * [Server] Improved performance by changing internal locking strategy for subscriptions (#1716, thanks to @zeheng). * [Server] Fixed exceptions when clients are connecting and disconnecting very fast while accessing the client status for connection validation (#1742). * [Server] Exposed more properties in _ClientConnectedEventArgs_ (#1738). diff --git a/Source/MQTTnet.Tests/Clients/MqttClient/MqttClient_Connection_Tests.cs b/Source/MQTTnet.Tests/Clients/MqttClient/MqttClient_Connection_Tests.cs index 4a2e1f063..d38fc6926 100644 --- a/Source/MQTTnet.Tests/Clients/MqttClient/MqttClient_Connection_Tests.cs +++ b/Source/MQTTnet.Tests/Clients/MqttClient/MqttClient_Connection_Tests.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -10,6 +11,7 @@ using MQTTnet.Exceptions; using MQTTnet.Formatter; using MQTTnet.Internal; +using MQTTnet.Packets; using MQTTnet.Protocol; using MQTTnet.Server; @@ -128,7 +130,7 @@ public async Task Disconnect_Clean_With_Custom_Reason() await client.DisconnectAsync(disconnectOptions); await LongTestDelay(); - + Assert.IsNotNull(eventArgs); Assert.AreEqual(MqttDisconnectReasonCode.MessageRateTooHigh, eventArgs.ReasonCode); } @@ -156,7 +158,7 @@ public async Task Disconnect_Clean_With_User_Properties() await client.DisconnectAsync(disconnectOptions); await LongTestDelay(); - + Assert.IsNotNull(eventArgs); Assert.IsNotNull(eventArgs.UserProperties); Assert.AreEqual(1, eventArgs.UserProperties.Count); @@ -164,5 +166,35 @@ public async Task Disconnect_Clean_With_User_Properties() Assert.AreEqual("test_value", eventArgs.UserProperties[0].Value); } } + + [TestMethod] + public async Task Return_Non_Success() + { + using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500)) + { + var server = await testEnvironment.StartServer(); + + server.ValidatingConnectionAsync += args => + { + args.ResponseUserProperties = new List + { + new MqttUserProperty("Property", "Value") + }; + + args.ReasonCode = MqttConnectReasonCode.QuotaExceeded; + + return CompletedTask.Instance; + }; + + var client = testEnvironment.CreateClient(); + + var response = await client.ConnectAsync(testEnvironment.CreateDefaultClientOptionsBuilder().WithoutThrowOnNonSuccessfulConnectResponse().Build()); + + Assert.IsNotNull(response); + Assert.AreEqual(MqttClientConnectResultCode.QuotaExceeded, response.ResultCode); + Assert.AreEqual(response.UserProperties[0].Name, "Property"); + Assert.AreEqual(response.UserProperties[0].Value, "Value"); + } + } } } \ No newline at end of file diff --git a/Source/MQTTnet/Client/Internal/MqttClientResultFactory.cs b/Source/MQTTnet/Client/Internal/MqttClientResultFactory.cs index ef2800fe7..6629657eb 100644 --- a/Source/MQTTnet/Client/Internal/MqttClientResultFactory.cs +++ b/Source/MQTTnet/Client/Internal/MqttClientResultFactory.cs @@ -6,6 +6,7 @@ namespace MQTTnet.Client.Internal { public static class MqttClientResultFactory { + public static readonly MqttClientConnectResultFactory ConnectResult = new MqttClientConnectResultFactory(); public static readonly MqttClientPublishResultFactory PublishResult = new MqttClientPublishResultFactory(); public static readonly MqttClientSubscribeResultFactory SubscribeResult = new MqttClientSubscribeResultFactory(); public static readonly MqttClientUnsubscribeResultFactory UnsubscribeResult = new MqttClientUnsubscribeResultFactory(); diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 365a2f91c..12c282028 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -137,6 +137,12 @@ public async Task ConnectAsync(MqttClientOptions option } } + if (connectResult.ResultCode != MqttClientConnectResultCode.Success) + { + _logger.Warning("Connecting failed: {0}", connectResult.ResultCode); + return connectResult; + } + _lastPacketSentTimestamp = DateTime.UtcNow; var keepAliveInterval = Options.KeepAlivePeriod; @@ -434,8 +440,7 @@ async Task Authenticate(IMqttChannelAdapter channelAdap if (receivedPacket is MqttConnAckPacket connAckPacket) { - var clientConnectResultFactory = new MqttClientConnectResultFactory(); - result = clientConnectResultFactory.Create(connAckPacket, channelAdapter.PacketFormatterAdapter.ProtocolVersion); + result = MqttClientResultFactory.ConnectResult.Create(connAckPacket, _adapter.PacketFormatterAdapter.ProtocolVersion); } else { @@ -447,9 +452,18 @@ async Task Authenticate(IMqttChannelAdapter channelAdap throw new MqttConnectingFailedException($"Error while authenticating. {exception.Message}", exception, null); } - if (result.ResultCode != MqttClientConnectResultCode.Success) + // This is no feature. It is basically a backward compatibility option and should be removed in the future. + // The client should not throw any exception if the transport layer connection was successful and the server + // did send a proper ACK packet with a non success response. + if (options.ThrowOnNonSuccessfulConnectResponse) { - throw new MqttConnectingFailedException($"Connecting with MQTT server failed ({result.ResultCode}).", null, result); + _logger.Warning( + "Client will now throw an _MqttConnectingFailedException_. This is obsolete and will be removed in the future. Consider setting _ThrowOnNonSuccessfulResponseFromServer=False_ in client options."); + + if (result.ResultCode != MqttClientConnectResultCode.Success) + { + throw new MqttConnectingFailedException($"Connecting with MQTT server failed ({result.ResultCode}).", null, result); + } } _logger.Verbose("Authenticated MQTT connection with server established."); @@ -498,9 +512,11 @@ async Task ConnectInternal(IMqttChannelAdapter channelA _publishPacketReceiverQueue = new AsyncQueue(); var connectResult = await Authenticate(channelAdapter, Options, effectiveCancellationToken.Token).ConfigureAwait(false); - - _publishPacketReceiverTask = Task.Run(() => ProcessReceivedPublishPackets(backgroundCancellationToken), backgroundCancellationToken); - _packetReceiverTask = Task.Run(() => ReceivePacketsLoop(backgroundCancellationToken), backgroundCancellationToken); + if (connectResult.ResultCode == MqttClientConnectResultCode.Success) + { + _publishPacketReceiverTask = Task.Run(() => ProcessReceivedPublishPackets(backgroundCancellationToken), backgroundCancellationToken); + _packetReceiverTask = Task.Run(() => ReceivePacketsLoop(backgroundCancellationToken), backgroundCancellationToken); + } return connectResult; } @@ -754,6 +770,65 @@ async Task Receive(CancellationToken cancellationToken) return packet; } + async Task ReceivePacketsLoop(CancellationToken cancellationToken) + { + try + { + _logger.Verbose("Start receiving packets."); + + while (!cancellationToken.IsCancellationRequested) + { + var packet = await Receive(cancellationToken).ConfigureAwait(false); + + if (cancellationToken.IsCancellationRequested) + { + return; + } + + if (packet == null) + { + await DisconnectInternal(_packetReceiverTask, null, null).ConfigureAwait(false); + + return; + } + + await TryProcessReceivedPacket(packet, cancellationToken).ConfigureAwait(false); + } + } + catch (Exception exception) + { + if (_cleanDisconnectInitiated) + { + return; + } + + if (exception is AggregateException aggregateException) + { + exception = aggregateException.GetBaseException(); + } + + if (exception is OperationCanceledException) + { + } + else if (exception is MqttCommunicationException) + { + _logger.Warning(exception, "Communication error while receiving packets."); + } + else + { + _logger.Error(exception, "Error while receiving packets."); + } + + _packetDispatcher.FailAll(exception); + + await DisconnectInternal(_packetReceiverTask, exception, null).ConfigureAwait(false); + } + finally + { + _logger.Verbose("Stopped receiving packets."); + } + } + async Task Request(MqttPacket requestPacket, CancellationToken cancellationToken) where TResponsePacket : MqttPacket { cancellationToken.ThrowIfCancellationRequested(); @@ -920,65 +995,6 @@ async Task TryProcessReceivedPacket(MqttPacket packet, CancellationToken cancell } } - async Task ReceivePacketsLoop(CancellationToken cancellationToken) - { - try - { - _logger.Verbose("Start receiving packets."); - - while (!cancellationToken.IsCancellationRequested) - { - var packet = await Receive(cancellationToken).ConfigureAwait(false); - - if (cancellationToken.IsCancellationRequested) - { - return; - } - - if (packet == null) - { - await DisconnectInternal(_packetReceiverTask, null, null).ConfigureAwait(false); - - return; - } - - await TryProcessReceivedPacket(packet, cancellationToken).ConfigureAwait(false); - } - } - catch (Exception exception) - { - if (_cleanDisconnectInitiated) - { - return; - } - - if (exception is AggregateException aggregateException) - { - exception = aggregateException.GetBaseException(); - } - - if (exception is OperationCanceledException) - { - } - else if (exception is MqttCommunicationException) - { - _logger.Warning(exception, "Communication error while receiving packets."); - } - else - { - _logger.Error(exception, "Error while receiving packets."); - } - - _packetDispatcher.FailAll(exception); - - await DisconnectInternal(_packetReceiverTask, exception, null).ConfigureAwait(false); - } - finally - { - _logger.Verbose("Stopped receiving packets."); - } - } - async Task TrySendKeepAliveMessages(CancellationToken cancellationToken) { try diff --git a/Source/MQTTnet/Client/Options/MqttClientOptions.cs b/Source/MQTTnet/Client/Options/MqttClientOptions.cs index f296777f4..614548aad 100644 --- a/Source/MQTTnet/Client/Options/MqttClientOptions.cs +++ b/Source/MQTTnet/Client/Options/MqttClientOptions.cs @@ -12,6 +12,14 @@ namespace MQTTnet.Client { public sealed class MqttClientOptions { + /// + /// Usually the MQTT packets can be send partially. This is done by using multiple TCP packets + /// or WebSocket frames etc. Unfortunately not all brokers (like Amazon Web Services (AWS)) do support this feature and + /// will close the connection when receiving such packets. If such a service is used this flag must + /// be set to _false_. + /// + public bool AllowPacketFragmentation { get; set; } = true; + /// /// Gets or sets the authentication data. /// MQTT 5.0.0+ feature. @@ -24,14 +32,6 @@ public sealed class MqttClientOptions /// public string AuthenticationMethod { get; set; } - /// - /// Usually the MQTT packets can be send partially. This is done by using multiple TCP packets - /// or WebSocket frames etc. Unfortunately not all brokers (like Amazon Web Services (AWS)) do support this feature and - /// will close the connection when receiving such packets. If such a service is used this flag must - /// be set to _false_. - /// - public bool AllowPacketFragmentation { get; set; } = true; - public IMqttClientChannelOptions ChannelOptions { get; set; } /// @@ -100,6 +100,11 @@ public sealed class MqttClientOptions /// public uint SessionExpiryInterval { get; set; } + /// + /// Gets or sets whether an exception should be thrown when the server has sent a non success ACK packet. + /// + public bool ThrowOnNonSuccessfulConnectResponse { get; set; } = true; + /// /// Gets or sets the timeout which will be applied at socket level and internal operations. /// The default value is the same as for sockets in .NET in general. @@ -222,4 +227,4 @@ public sealed class MqttClientOptions /// public int WriterBufferSizeMax { get; set; } = 65535; } -} +} \ No newline at end of file diff --git a/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs b/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs index 3f9f3adb3..e6bc39681 100644 --- a/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs +++ b/Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs @@ -78,7 +78,17 @@ public MqttClientOptions Build() return _options; } - + + /// + /// The client will not throw an exception when the MQTT server responses with a non success ACK packet. + /// This will become the default behavior in future versions of the library. + /// + public MqttClientOptionsBuilder WithoutThrowOnNonSuccessfulConnectResponse() + { + _options.ThrowOnNonSuccessfulConnectResponse = false; + return this; + } + public MqttClientOptionsBuilder WithAuthentication(string method, byte[] data) { _options.AuthenticationMethod = method;