diff --git a/src/Asv.Common.Shell/Commands/TcpTest.cs b/src/Asv.Common.Shell/Commands/TcpTest.cs index 550d317..f46fe75 100644 --- a/src/Asv.Common.Shell/Commands/TcpTest.cs +++ b/src/Asv.Common.Shell/Commands/TcpTest.cs @@ -26,10 +26,10 @@ public void Benchmark() /// [Command("tcp-test")] public async Task Run( - string server = "tcps://127.0.0.1:7341", - string client = "tcp://127.0.0.1:7341" - /*string server = "serial:COM11?br=57600", - string client = "serial:COM44?br=57600"*/ + /*string server = "tcps://127.0.0.1:7341", + string client = "tcp://127.0.0.1:7341"*/ + string server = "serial:COM11?br=57600", + string client = "serial:COM45?br=57600" ) { var loggerFactory = ConsoleAppHelper.CreateDefaultLog(); diff --git a/src/Asv.IO/Protocol/Connection/IProtocolConnection.cs b/src/Asv.IO/Protocol/Connection/IProtocolConnection.cs index 5c2d306..d12934c 100644 --- a/src/Asv.IO/Protocol/Connection/IProtocolConnection.cs +++ b/src/Asv.IO/Protocol/Connection/IProtocolConnection.cs @@ -1,7 +1,11 @@ using System; using System.Threading; using System.Threading.Tasks; +using Asv.Common; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using R3; +using ZLogger; namespace Asv.IO; @@ -17,7 +21,7 @@ public interface IProtocolConnection:ISupportTag,ISupportStatistic, IDisposable, public static class ProtocolConnectionHelper { public static Observable RxFilter(this IProtocolConnection connection) - where TMessage: IProtocolMessage, new() + where TMessage : IProtocolMessage, new() { var messageId = new TMessage().Id; return connection.OnRxMessage.Where(messageId, (raw, id) => @@ -26,13 +30,15 @@ public static Observable RxFilter(this IProtocol { return message.Id != null && message.Id.Equals(id); } + return false; - }).Cast(); + }).Cast(); } - - public static Observable RxFilter(this IProtocolConnection connection, Func filter) - where TMessage: IProtocolMessage, new() + + public static Observable RxFilter(this IProtocolConnection connection, + Func filter) + where TMessage : IProtocolMessage, new() { var messageId = new TMessage().Id; return connection.OnRxMessage.Where(messageId, (raw, id) => @@ -41,16 +47,18 @@ public static Observable RxFilter(this IProtocol { return message.Id != null && message.Id.Equals(id) && filter(message); } + return false; - }).Cast(); + }).Cast(); } - public static async Task SendAndWaitAnswer(this IProtocolConnection connection, - IProtocolMessage send, - FilterDelegate filterAndGetResult, + public static async Task SendAndWaitAnswer( + this IProtocolConnection connection, + IProtocolMessage request, + FilterDelegate filterAndGetResult, CancellationToken cancel = default) - where TMessage: IProtocolMessage, new() + where TMessage : IProtocolMessage, new() { cancel.ThrowIfCancellationRequested(); var tcs = new TaskCompletionSource(); @@ -62,12 +70,76 @@ public static async Task SendAndWaitAnswer tcs.TrySetResult(result); } }); - await connection.Send(send, cancel); + await connection.Send(request, cancel).ConfigureAwait(false); return await tcs.Task.ConfigureAwait(false); } - - + + public static async Task SendAndWaitAnswer( + this IProtocolConnection connection, + IProtocolMessage request, + FilterDelegate filterAndGetResult, + TimeSpan timeout, + CancellationToken cancel = default, + TimeProvider? timeProvider = null) + where TMessage : IProtocolMessage, new() + { + timeProvider ??= TimeProvider.System; + using var linkedCancel = CancellationTokenSource.CreateLinkedTokenSource(cancel); + linkedCancel.CancelAfter(timeout, timeProvider); + return await connection.SendAndWaitAnswer(request, filterAndGetResult, cancel); + } + + public static async Task SendAndWaitAnswer( + this IProtocolConnection connection, + TRequestMessage request, + FilterDelegate filterAndGetResult, + TimeSpan timeout, + int maxAttemptCount, + ResendMessageModifyDelegate? modifyRequestOnResend = null, + CancellationToken cancel = default, + TimeProvider? timeProvider = null, + IProgress? progress = null) + where TResultMessage : IProtocolMessage, new() + where TRequestMessage : IProtocolMessage + { + cancel.ThrowIfCancellationRequested(); + TResult? result = default; + byte currentAttempt = 0; + progress ??= new Progress(); + while (IsRetryCondition()) + { + progress.Report(currentAttempt); + if (currentAttempt != 0) + { + modifyRequestOnResend?.Invoke(request, currentAttempt); + } + + ++currentAttempt; + try + { + result = await connection.SendAndWaitAnswer(request, filterAndGetResult, timeout, cancel, timeProvider) + .ConfigureAwait(false); + break; + } + catch (OperationCanceledException) + { + if (IsRetryCondition()) + { + continue; + } + + cancel.ThrowIfCancellationRequested(); + } + } + if (result != null) return result; + throw new TimeoutException($"Timeout to execute '{request}' with {maxAttemptCount} x {timeout}'"); + + bool IsRetryCondition() => currentAttempt < maxAttemptCount; + } } public delegate bool FilterDelegate(TMessage input, out TResult result) where TMessage: IProtocolMessage; + +public delegate bool ResendMessageModifyDelegate(TMessage input, int attempt) + where TMessage: IProtocolMessage; \ No newline at end of file diff --git a/src/Asv.IO/Protocol/Parser/Exceptions/ProtocolDeserializeMessageException.cs b/src/Asv.IO/Protocol/Parser/Exceptions/ProtocolDeserializeMessageException.cs index 5c64eb5..a18a9c3 100644 --- a/src/Asv.IO/Protocol/Parser/Exceptions/ProtocolDeserializeMessageException.cs +++ b/src/Asv.IO/Protocol/Parser/Exceptions/ProtocolDeserializeMessageException.cs @@ -5,7 +5,7 @@ namespace Asv.IO; public class ProtocolDeserializeMessageException : ProtocolParserException { public ProtocolDeserializeMessageException(ProtocolInfo parser, IProtocolMessage message, Exception ex) - : base(parser, $"Deserialization {parser}.{message.Name} message error:{ex.Message}",ex) + : base(parser, $"Deserialization {parser}.{message.Name} message error.",ex) { }