Skip to content

Commit

Permalink
feat(io): add SendAndWaitAnswer helper for connection
Browse files Browse the repository at this point in the history
  • Loading branch information
asvol committed Nov 27, 2024
1 parent 51aa5f6 commit 97d23bc
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 18 deletions.
8 changes: 4 additions & 4 deletions src/Asv.Common.Shell/Commands/TcpTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ public void Benchmark()
/// </summary>
[Command("tcp-test")]
public async Task<int> Run(
string server = "tcps://127.0.0.1:7341",
string client = "tcp://127.0.0.1:7341"
/*string server = "serial:COM11?br=57600",
string client = "serial:COM44?br=57600"*/
/*string server = "tcps://127.0.0.1:7341",
string client = "tcp://127.0.0.1:7341"*/
string server = "serial:COM11?br=57600",
string client = "serial:COM45?br=57600"
)
{
var loggerFactory = ConsoleAppHelper.CreateDefaultLog();
Expand Down
98 changes: 85 additions & 13 deletions src/Asv.IO/Protocol/Connection/IProtocolConnection.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Asv.Common;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using R3;
using ZLogger;

namespace Asv.IO;

Expand All @@ -17,7 +21,7 @@ public interface IProtocolConnection:ISupportTag,ISupportStatistic, IDisposable,
public static class ProtocolConnectionHelper
{
public static Observable<TMessage> RxFilter<TMessage, TMessageId>(this IProtocolConnection connection)
where TMessage: IProtocolMessage<TMessageId>, new()
where TMessage : IProtocolMessage<TMessageId>, new()
{
var messageId = new TMessage().Id;
return connection.OnRxMessage.Where(messageId, (raw, id) =>
Expand All @@ -26,13 +30,15 @@ public static Observable<TMessage> RxFilter<TMessage, TMessageId>(this IProtocol
{
return message.Id != null && message.Id.Equals(id);
}

return false;

}).Cast<IProtocolMessage,TMessage>();
}).Cast<IProtocolMessage, TMessage>();
}

public static Observable<TMessage> RxFilter<TMessage, TMessageId>(this IProtocolConnection connection, Func<TMessage, bool> filter)
where TMessage: IProtocolMessage<TMessageId>, new()

public static Observable<TMessage> RxFilter<TMessage, TMessageId>(this IProtocolConnection connection,
Func<TMessage, bool> filter)
where TMessage : IProtocolMessage<TMessageId>, new()
{
var messageId = new TMessage().Id;
return connection.OnRxMessage.Where(messageId, (raw, id) =>
Expand All @@ -41,16 +47,18 @@ public static Observable<TMessage> RxFilter<TMessage, TMessageId>(this IProtocol
{
return message.Id != null && message.Id.Equals(id) && filter(message);
}

return false;

}).Cast<IProtocolMessage,TMessage>();
}).Cast<IProtocolMessage, TMessage>();
}

public static async Task<TResult> SendAndWaitAnswer<TResult,TMessage,TMessageId>(this IProtocolConnection connection,
IProtocolMessage send,
FilterDelegate<TResult, TMessage,TMessageId> filterAndGetResult,
public static async Task<TResult> SendAndWaitAnswer<TResult, TMessage, TMessageId>(
this IProtocolConnection connection,
IProtocolMessage request,
FilterDelegate<TResult, TMessage, TMessageId> filterAndGetResult,
CancellationToken cancel = default)
where TMessage: IProtocolMessage<TMessageId>, new()
where TMessage : IProtocolMessage<TMessageId>, new()
{
cancel.ThrowIfCancellationRequested();
var tcs = new TaskCompletionSource<TResult>();
Expand All @@ -62,12 +70,76 @@ public static async Task<TResult> SendAndWaitAnswer<TResult,TMessage,TMessageId>
tcs.TrySetResult(result);
}
});
await connection.Send(send, cancel);
await connection.Send(request, cancel).ConfigureAwait(false);
return await tcs.Task.ConfigureAwait(false);
}



public static async Task<TResult> SendAndWaitAnswer<TResult, TMessage, TMessageId>(
this IProtocolConnection connection,
IProtocolMessage request,
FilterDelegate<TResult, TMessage, TMessageId> filterAndGetResult,
TimeSpan timeout,
CancellationToken cancel = default,
TimeProvider? timeProvider = null)
where TMessage : IProtocolMessage<TMessageId>, new()
{
timeProvider ??= TimeProvider.System;
using var linkedCancel = CancellationTokenSource.CreateLinkedTokenSource(cancel);
linkedCancel.CancelAfter(timeout, timeProvider);
return await connection.SendAndWaitAnswer(request, filterAndGetResult, cancel);
}

public static async Task<TResult> SendAndWaitAnswer<TResult, TRequestMessage, TResultMessage, TMessageId>(
this IProtocolConnection connection,
TRequestMessage request,
FilterDelegate<TResult, TResultMessage, TMessageId> filterAndGetResult,
TimeSpan timeout,
int maxAttemptCount,
ResendMessageModifyDelegate<TRequestMessage, TMessageId>? modifyRequestOnResend = null,
CancellationToken cancel = default,
TimeProvider? timeProvider = null,
IProgress<int>? progress = null)
where TResultMessage : IProtocolMessage<TMessageId>, new()
where TRequestMessage : IProtocolMessage<TMessageId>
{
cancel.ThrowIfCancellationRequested();
TResult? result = default;
byte currentAttempt = 0;
progress ??= new Progress<int>();
while (IsRetryCondition())
{
progress.Report(currentAttempt);
if (currentAttempt != 0)
{
modifyRequestOnResend?.Invoke(request, currentAttempt);
}

++currentAttempt;
try
{
result = await connection.SendAndWaitAnswer(request, filterAndGetResult, timeout, cancel, timeProvider)
.ConfigureAwait(false);
break;
}
catch (OperationCanceledException)
{
if (IsRetryCondition())
{
continue;
}

cancel.ThrowIfCancellationRequested();
}
}
if (result != null) return result;
throw new TimeoutException($"Timeout to execute '{request}' with {maxAttemptCount} x {timeout}'");

bool IsRetryCondition() => currentAttempt < maxAttemptCount;
}
}

public delegate bool FilterDelegate<TResult, in TMessage,TMessageId>(TMessage input, out TResult result)
where TMessage: IProtocolMessage<TMessageId>;

public delegate bool ResendMessageModifyDelegate<in TMessage,TMessageId>(TMessage input, int attempt)
where TMessage: IProtocolMessage<TMessageId>;
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Asv.IO;
public class ProtocolDeserializeMessageException : ProtocolParserException
{
public ProtocolDeserializeMessageException(ProtocolInfo parser, IProtocolMessage message, Exception ex)
: base(parser, $"Deserialization {parser}.{message.Name} message error:{ex.Message}",ex)
: base(parser, $"Deserialization {parser}.{message.Name} message error.",ex)
{

}
Expand Down

0 comments on commit 97d23bc

Please sign in to comment.