From 55532422b06ef9066d628d2f5ac5979d3edf8c60 Mon Sep 17 00:00:00 2001 From: Phillip Hoff Date: Mon, 4 Dec 2023 15:17:18 -0800 Subject: [PATCH 1/4] Sketch no arguments with cancellation. Signed-off-by: Phillip Hoff --- src/Dapr.Actors/Runtime/ActorManager.cs | 4 ++ .../Runtime/ActorRuntimeTests.cs | 53 +++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/src/Dapr.Actors/Runtime/ActorManager.cs b/src/Dapr.Actors/Runtime/ActorManager.cs index b7ee3bf3e..9c77d72e5 100644 --- a/src/Dapr.Actors/Runtime/ActorManager.cs +++ b/src/Dapr.Actors/Runtime/ActorManager.cs @@ -152,6 +152,10 @@ async Task RequestFunc(Actor actor, CancellationToken ct) { awaitable = methodInfo.Invoke(actor, null); } + else if (parameters.Length == 1 && parameters[0].ParameterType == typeof(CancellationToken)) + { + awaitable = methodInfo.Invoke(actor, new object[] { ct }); + } else if (parameters.Length == 1) { // deserialize using stream. diff --git a/test/Dapr.Actors.Test/Runtime/ActorRuntimeTests.cs b/test/Dapr.Actors.Test/Runtime/ActorRuntimeTests.cs index 52ae4aa7b..15ea4d2dc 100644 --- a/test/Dapr.Actors.Test/Runtime/ActorRuntimeTests.cs +++ b/test/Dapr.Actors.Test/Runtime/ActorRuntimeTests.cs @@ -27,6 +27,7 @@ namespace Dapr.Actors.Test using Xunit; using Dapr.Actors.Client; using System.Reflection; + using System.Threading; public sealed class ActorRuntimeTests { @@ -109,6 +110,58 @@ public async Task NoActivateMessageFromRuntime() Assert.Contains(actorType.Name, runtime.RegisteredActors.Select(a => a.Type.ActorTypeName), StringComparer.InvariantCulture); } + public interface INotRemotedActor : IActor + { + Task NoArgumentsAsync(); + + Task NoArgumentsWithCancellationAsync(CancellationToken cancellationToken = default); + } + + public sealed class NotRemotedActor : Actor, INotRemotedActor + { + public NotRemotedActor(ActorHost host) + : base(host) + { + } + + public Task NoArgumentsAsync() + { + return Task.CompletedTask; + } + + public Task NoArgumentsWithCancellationAsync(CancellationToken cancellationToken = default) + { + return Task.CompletedTask; + } + } + + + [Fact] + public async Task NoRemotingMethodWithNoArguments() + { + var actorType = typeof(NotRemotedActor); + + var options = new ActorRuntimeOptions(); + options.Actors.RegisterActor(); + var runtime = new ActorRuntime(options, loggerFactory, activatorFactory, proxyFactory); + + using var output = new MemoryStream(); + await runtime.DispatchWithoutRemotingAsync(actorType.Name, ActorId.CreateRandom().ToString(), nameof(INotRemotedActor.NoArgumentsAsync), new MemoryStream(), output); + } + + [Fact] + public async Task NoRemotingMethodWithNoArgumentsWithCancellation() + { + var actorType = typeof(NotRemotedActor); + + var options = new ActorRuntimeOptions(); + options.Actors.RegisterActor(); + var runtime = new ActorRuntime(options, loggerFactory, activatorFactory, proxyFactory); + + using var output = new MemoryStream(); + await runtime.DispatchWithoutRemotingAsync(actorType.Name, ActorId.CreateRandom().ToString(), nameof(INotRemotedActor.NoArgumentsWithCancellationAsync), new MemoryStream(), output); + } + [Fact] public async Task Actor_UsesCustomActivator() { From d2db0d182aedfd9c314c2b2ec729b818a51d950c Mon Sep 17 00:00:00 2001 From: Phillip Hoff Date: Mon, 4 Dec 2023 15:35:02 -0800 Subject: [PATCH 2/4] Sketch the other argument permutations. Signed-off-by: Phillip Hoff --- src/Dapr.Actors/Runtime/ActorManager.cs | 12 ++--- .../Runtime/ActorRuntimeTests.cs | 52 +++++++++++++++++++ 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/src/Dapr.Actors/Runtime/ActorManager.cs b/src/Dapr.Actors/Runtime/ActorManager.cs index 9c77d72e5..d766cd485 100644 --- a/src/Dapr.Actors/Runtime/ActorManager.cs +++ b/src/Dapr.Actors/Runtime/ActorManager.cs @@ -148,20 +148,16 @@ async Task RequestFunc(Actor actor, CancellationToken ct) var parameters = methodInfo.GetParameters(); dynamic awaitable; - if (parameters.Length == 0) + if (parameters.Length == 0 || (parameters.Length == 1 && parameters[0].ParameterType == typeof(CancellationToken))) { - awaitable = methodInfo.Invoke(actor, null); + awaitable = methodInfo.Invoke(actor, parameters.Length == 0 ? null : new object[] { ct }); } - else if (parameters.Length == 1 && parameters[0].ParameterType == typeof(CancellationToken)) - { - awaitable = methodInfo.Invoke(actor, new object[] { ct }); - } - else if (parameters.Length == 1) + else if (parameters.Length == 1 || (parameters.Length == 2 && parameters[1].ParameterType == typeof(CancellationToken))) { // deserialize using stream. var type = parameters[0].ParameterType; var deserializedType = await JsonSerializer.DeserializeAsync(requestBodyStream, type, jsonSerializerOptions); - awaitable = methodInfo.Invoke(actor, new object[] { deserializedType }); + awaitable = methodInfo.Invoke(actor, parameters.Length == 1 ? new object[] { deserializedType } : new object[] { deserializedType, ct }); } else { diff --git a/test/Dapr.Actors.Test/Runtime/ActorRuntimeTests.cs b/test/Dapr.Actors.Test/Runtime/ActorRuntimeTests.cs index 15ea4d2dc..934d1a305 100644 --- a/test/Dapr.Actors.Test/Runtime/ActorRuntimeTests.cs +++ b/test/Dapr.Actors.Test/Runtime/ActorRuntimeTests.cs @@ -115,6 +115,10 @@ public interface INotRemotedActor : IActor Task NoArgumentsAsync(); Task NoArgumentsWithCancellationAsync(CancellationToken cancellationToken = default); + + Task SingleArgumentAsync(bool arg); + + Task SingleArgumentWithCancellationAsync(bool arg, CancellationToken cancellationToken = default); } public sealed class NotRemotedActor : Actor, INotRemotedActor @@ -133,6 +137,16 @@ public Task NoArgumentsWithCancellationAsync(CancellationToken cancellationToken { return Task.CompletedTask; } + + public Task SingleArgumentAsync(bool arg) + { + return Task.CompletedTask; + } + + public Task SingleArgumentWithCancellationAsync(bool arg, CancellationToken cancellationToken = default) + { + return Task.CompletedTask; + } } @@ -162,6 +176,44 @@ public async Task NoRemotingMethodWithNoArgumentsWithCancellation() await runtime.DispatchWithoutRemotingAsync(actorType.Name, ActorId.CreateRandom().ToString(), nameof(INotRemotedActor.NoArgumentsWithCancellationAsync), new MemoryStream(), output); } + [Fact] + public async Task NoRemotingMethodWithSingleArgument() + { + var actorType = typeof(NotRemotedActor); + + var options = new ActorRuntimeOptions(); + options.Actors.RegisterActor(); + var runtime = new ActorRuntime(options, loggerFactory, activatorFactory, proxyFactory); + + using var input = new MemoryStream(); + + JsonSerializer.Serialize(input, true); + + input.Seek(0, SeekOrigin.Begin); + + using var output = new MemoryStream(); + await runtime.DispatchWithoutRemotingAsync(actorType.Name, ActorId.CreateRandom().ToString(), nameof(INotRemotedActor.SingleArgumentAsync), input, output); + } + + [Fact] + public async Task NoRemotingMethodWithSingleArgumentWithCancellation() + { + var actorType = typeof(NotRemotedActor); + + var options = new ActorRuntimeOptions(); + options.Actors.RegisterActor(); + var runtime = new ActorRuntime(options, loggerFactory, activatorFactory, proxyFactory); + + using var input = new MemoryStream(); + + JsonSerializer.Serialize(input, true); + + input.Seek(0, SeekOrigin.Begin); + + using var output = new MemoryStream(); + await runtime.DispatchWithoutRemotingAsync(actorType.Name, ActorId.CreateRandom().ToString(), nameof(INotRemotedActor.SingleArgumentWithCancellationAsync), input, output); + } + [Fact] public async Task Actor_UsesCustomActivator() { From ebe2e5e3aea0107308e6ba1a139939c5af7a3283 Mon Sep 17 00:00:00 2001 From: Phillip Hoff Date: Mon, 4 Dec 2023 16:23:33 -0800 Subject: [PATCH 3/4] Refactor tests. Signed-off-by: Phillip Hoff --- .../Runtime/ActorRuntimeTests.cs | 105 +++++++++--------- 1 file changed, 53 insertions(+), 52 deletions(-) diff --git a/test/Dapr.Actors.Test/Runtime/ActorRuntimeTests.cs b/test/Dapr.Actors.Test/Runtime/ActorRuntimeTests.cs index 934d1a305..c74d0b754 100644 --- a/test/Dapr.Actors.Test/Runtime/ActorRuntimeTests.cs +++ b/test/Dapr.Actors.Test/Runtime/ActorRuntimeTests.cs @@ -112,13 +112,13 @@ public async Task NoActivateMessageFromRuntime() public interface INotRemotedActor : IActor { - Task NoArgumentsAsync(); + Task NoArgumentsAsync(); - Task NoArgumentsWithCancellationAsync(CancellationToken cancellationToken = default); + Task NoArgumentsWithCancellationAsync(CancellationToken cancellationToken = default); - Task SingleArgumentAsync(bool arg); + Task SingleArgumentAsync(bool arg); - Task SingleArgumentWithCancellationAsync(bool arg, CancellationToken cancellationToken = default); + Task SingleArgumentWithCancellationAsync(bool arg, CancellationToken cancellationToken = default); } public sealed class NotRemotedActor : Actor, INotRemotedActor @@ -128,90 +128,91 @@ public NotRemotedActor(ActorHost host) { } - public Task NoArgumentsAsync() + public Task NoArgumentsAsync() { - return Task.CompletedTask; + return Task.FromResult(nameof(NoArgumentsAsync)); } - public Task NoArgumentsWithCancellationAsync(CancellationToken cancellationToken = default) + public Task NoArgumentsWithCancellationAsync(CancellationToken cancellationToken = default) { - return Task.CompletedTask; + return Task.FromResult(nameof(NoArgumentsWithCancellationAsync)); } - public Task SingleArgumentAsync(bool arg) + public Task SingleArgumentAsync(bool arg) { - return Task.CompletedTask; + return Task.FromResult(nameof(SingleArgumentAsync)); } - public Task SingleArgumentWithCancellationAsync(bool arg, CancellationToken cancellationToken = default) + public Task SingleArgumentWithCancellationAsync(bool arg, CancellationToken cancellationToken = default) { - return Task.CompletedTask; + return Task.FromResult(nameof(SingleArgumentWithCancellationAsync)); } } - - [Fact] - public async Task NoRemotingMethodWithNoArguments() + public async Task InvokeMethod(string methodName, object arg = null) where T : Actor { - var actorType = typeof(NotRemotedActor); - var options = new ActorRuntimeOptions(); - options.Actors.RegisterActor(); + + options.Actors.RegisterActor(); + var runtime = new ActorRuntime(options, loggerFactory, activatorFactory, proxyFactory); - using var output = new MemoryStream(); - await runtime.DispatchWithoutRemotingAsync(actorType.Name, ActorId.CreateRandom().ToString(), nameof(INotRemotedActor.NoArgumentsAsync), new MemoryStream(), output); - } + using var input = new MemoryStream(); - [Fact] - public async Task NoRemotingMethodWithNoArgumentsWithCancellation() - { - var actorType = typeof(NotRemotedActor); + if (arg is not null) + { + JsonSerializer.Serialize(input, arg); - var options = new ActorRuntimeOptions(); - options.Actors.RegisterActor(); - var runtime = new ActorRuntime(options, loggerFactory, activatorFactory, proxyFactory); + input.Seek(0, SeekOrigin.Begin); + } using var output = new MemoryStream(); - await runtime.DispatchWithoutRemotingAsync(actorType.Name, ActorId.CreateRandom().ToString(), nameof(INotRemotedActor.NoArgumentsWithCancellationAsync), new MemoryStream(), output); + + await runtime.DispatchWithoutRemotingAsync(typeof(T).Name, ActorId.CreateRandom().ToString(), methodName, input, output); + + output.Seek(0, SeekOrigin.Begin); + + return JsonSerializer.Deserialize(output); } [Fact] - public async Task NoRemotingMethodWithSingleArgument() + public async Task NoRemotingMethodWithNoArguments() { - var actorType = typeof(NotRemotedActor); + string methodName = nameof(INotRemotedActor.NoArgumentsAsync); + + string result = await InvokeMethod(methodName); - var options = new ActorRuntimeOptions(); - options.Actors.RegisterActor(); - var runtime = new ActorRuntime(options, loggerFactory, activatorFactory, proxyFactory); + Assert.Equal(methodName, result); + } - using var input = new MemoryStream(); + [Fact] + public async Task NoRemotingMethodWithNoArgumentsWithCancellation() + { + string methodName = nameof(INotRemotedActor.NoArgumentsWithCancellationAsync); + + string result = await InvokeMethod(methodName); - JsonSerializer.Serialize(input, true); + Assert.Equal(methodName, result); + } - input.Seek(0, SeekOrigin.Begin); + [Fact] + public async Task NoRemotingMethodWithSingleArgument() + { + string methodName = nameof(INotRemotedActor.SingleArgumentAsync); + + string result = await InvokeMethod(methodName, true); - using var output = new MemoryStream(); - await runtime.DispatchWithoutRemotingAsync(actorType.Name, ActorId.CreateRandom().ToString(), nameof(INotRemotedActor.SingleArgumentAsync), input, output); + Assert.Equal(methodName, result); } [Fact] public async Task NoRemotingMethodWithSingleArgumentWithCancellation() { - var actorType = typeof(NotRemotedActor); - - var options = new ActorRuntimeOptions(); - options.Actors.RegisterActor(); - var runtime = new ActorRuntime(options, loggerFactory, activatorFactory, proxyFactory); - - using var input = new MemoryStream(); - - JsonSerializer.Serialize(input, true); - - input.Seek(0, SeekOrigin.Begin); + string methodName = nameof(INotRemotedActor.SingleArgumentWithCancellationAsync); + + string result = await InvokeMethod(methodName, true); - using var output = new MemoryStream(); - await runtime.DispatchWithoutRemotingAsync(actorType.Name, ActorId.CreateRandom().ToString(), nameof(INotRemotedActor.SingleArgumentWithCancellationAsync), input, output); + Assert.Equal(methodName, result); } [Fact] From c12de4e092d0b3fa5350f890b592978170adc0d1 Mon Sep 17 00:00:00 2001 From: Phillip Hoff Date: Mon, 4 Dec 2023 16:31:06 -0800 Subject: [PATCH 4/4] Push HTTP request cancellation token down into handlers. Signed-off-by: Phillip Hoff --- .../ActorsEndpointRouteBuilderExtensions.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Dapr.Actors.AspNetCore/ActorsEndpointRouteBuilderExtensions.cs b/src/Dapr.Actors.AspNetCore/ActorsEndpointRouteBuilderExtensions.cs index 55d161d9a..574a172a8 100644 --- a/src/Dapr.Actors.AspNetCore/ActorsEndpointRouteBuilderExtensions.cs +++ b/src/Dapr.Actors.AspNetCore/ActorsEndpointRouteBuilderExtensions.cs @@ -103,7 +103,7 @@ private static IEndpointConventionBuilder MapActorMethodEndpoint(this IEndpointR try { - var (header, body) = await runtime.DispatchWithRemotingAsync(actorTypeName, actorId, methodName, daprActorheader, context.Request.Body); + var (header, body) = await runtime.DispatchWithRemotingAsync(actorTypeName, actorId, methodName, daprActorheader, context.Request.Body, context.RequestAborted); // Item 1 is header , Item 2 is body if (header != string.Empty) @@ -112,14 +112,14 @@ private static IEndpointConventionBuilder MapActorMethodEndpoint(this IEndpointR context.Response.Headers[Constants.ErrorResponseHeaderName] = header; // add error header } - await context.Response.Body.WriteAsync(body, 0, body.Length); // add response message body + await context.Response.Body.WriteAsync(body, 0, body.Length, context.RequestAborted); // add response message body } catch (Exception ex) { var (header, body) = CreateExceptionResponseMessage(ex); context.Response.Headers[Constants.ErrorResponseHeaderName] = header; - await context.Response.Body.WriteAsync(body, 0, body.Length); + await context.Response.Body.WriteAsync(body, 0, body.Length, context.RequestAborted); } finally { @@ -130,7 +130,7 @@ private static IEndpointConventionBuilder MapActorMethodEndpoint(this IEndpointR { try { - await runtime.DispatchWithoutRemotingAsync(actorTypeName, actorId, methodName, context.Request.Body, context.Response.Body); + await runtime.DispatchWithoutRemotingAsync(actorTypeName, actorId, methodName, context.Request.Body, context.Response.Body, context.RequestAborted); } finally {