Skip to content

Commit

Permalink
Merge branch 'master' into actor-state-ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
halspang authored Jan 8, 2024
2 parents d9d66b6 + 7616bfa commit a1b0b00
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ namespace MyActorClient

Console.WriteLine($"Calling GetDataAsync on {actorType}:{actorId}...");
var savedData = await proxy.GetDataAsync();
Console.WriteLine($"Got response: {response}");
Console.WriteLine($"Got response: {savedData}");
}
}
}
Expand Down Expand Up @@ -458,7 +458,7 @@ The projects that you've created can now to test the sample.
Calling SetDataAsync on MyActor:1...
Got response: Success
Calling GetDataAsync on MyActor:1...
Got response: Success
Got response: PropertyA: ValueA, PropertyB: ValueB
```
> 💡 This sample relies on a few assumptions. The default listening port for an ASP.NET Core web project is 5000, which is being passed to `dapr run` as `--app-port 5000`. The default HTTP port for the Dapr sidecar is 3500. We're telling the sidecar for `MyActorService` to use 3500 so that `MyActorClient` can rely on the default value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ Console.WriteLine("Published deposit event!");
```

- For a full list of state operations visit [How-To: Publish & subscribe]({{< ref howto-publish-subscribe.md >}}).
- Visit [.NET SDK examples](https://github.com/dapr/dotnet-sdk/tree/master/examples/client/PublishSubscribe) for code samples and instructions to try out pub/sub
- Visit [.NET SDK examples](https://github.com/dapr/dotnet-sdk/tree/master/examples/Client/PublishSubscribe) for code samples and instructions to try out pub/sub

### Interact with output bindings

Expand Down
37 changes: 17 additions & 20 deletions examples/Workflow/WorkflowConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
using WorkflowConsoleApp.Models;
using WorkflowConsoleApp.Workflows;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;

const string StoreName = "statestore";
const string DaprWorkflowComponent = "dapr";

// The workflow host is a background service that connects to the sidecar over gRPC
var builder = Host.CreateDefaultBuilder(args).ConfigureServices(services =>
Expand Down Expand Up @@ -124,48 +124,46 @@
amount = 1;
}

var daprWorkflowClient = host.Services.GetRequiredService<DaprWorkflowClient>();

// Construct the order with a unique order ID
string orderId = $"{itemName.ToLowerInvariant()}-{Guid.NewGuid().ToString()[..8]}";
double totalCost = amount * item.PerItemCost;
var orderInfo = new OrderPayload(itemName.ToLowerInvariant(), totalCost, amount);

// Start the workflow using the order ID as the workflow ID
Console.WriteLine($"Starting order workflow '{orderId}' purchasing {amount} {itemName}");
await daprClient.StartWorkflowAsync(
workflowComponent: DaprWorkflowComponent,
workflowName: nameof(OrderProcessingWorkflow),
await daprWorkflowClient.ScheduleNewWorkflowAsync(
name: nameof(OrderProcessingWorkflow),
input: orderInfo,
instanceId: orderId);

// Wait for the workflow to start and confirm the input
GetWorkflowResponse state = await daprClient.WaitForWorkflowStartAsync(
instanceId: orderId,
workflowComponent: DaprWorkflowComponent);
WorkflowState state = await daprWorkflowClient.WaitForWorkflowStartAsync(
instanceId: orderId);

Console.WriteLine($"{state.WorkflowName} (ID = {orderId}) started successfully with {state.ReadInputAs<OrderPayload>()}");
Console.WriteLine($"{nameof(OrderProcessingWorkflow)} (ID = {orderId}) started successfully with {state.ReadInputAs<OrderPayload>()}");

// Wait for the workflow to complete
while (true)
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
try
{
state = await daprClient.WaitForWorkflowCompletionAsync(
state = await daprWorkflowClient.WaitForWorkflowCompletionAsync(
instanceId: orderId,
workflowComponent: DaprWorkflowComponent,
cancellationToken: cts.Token);
cancellation: cts.Token);
break;
}
catch (OperationCanceledException)
{
// Check to see if the workflow is blocked waiting for an approval
state = await daprClient.GetWorkflowAsync(
instanceId: orderId,
workflowComponent: DaprWorkflowComponent);
if (state.Properties.TryGetValue("dapr.workflow.custom_status", out string customStatus) &&
customStatus.Contains("Waiting for approval"))
state = await daprWorkflowClient.GetWorkflowStateAsync(
instanceId: orderId);

if(state.ReadCustomStatusAs<string>()?.Contains("Waiting for approval") == true)
{
Console.WriteLine($"{state.WorkflowName} (ID = {orderId}) requires approval. Approve? [Y/N]");
Console.WriteLine($"{nameof(OrderProcessingWorkflow)} (ID = {orderId}) requires approval. Approve? [Y/N]");
string approval = Console.ReadLine();
ApprovalResult approvalResult = ApprovalResult.Unspecified;
if (string.Equals(approval, "Y", StringComparison.OrdinalIgnoreCase))
Expand All @@ -182,11 +180,10 @@ await daprClient.StartWorkflowAsync(
if (approvalResult != ApprovalResult.Unspecified)
{
// Raise the workflow event to the workflow
await daprClient.RaiseWorkflowEventAsync(
await daprWorkflowClient.RaiseEventAsync(
instanceId: orderId,
workflowComponent: DaprWorkflowComponent,
eventName: "ManagerApproval",
eventData: approvalResult);
eventPayload: approvalResult);
}

// otherwise, keep waiting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private static IEndpointConventionBuilder MapActorMethodEndpoint(this IEndpointR

try
{
var (header, body) = await runtime.DispatchWithRemotingAsync(actorTypeName, actorId, methodName, daprActorheader, context.Request.Body);
var (header, body) = await runtime.DispatchWithRemotingAsync(actorTypeName, actorId, methodName, daprActorheader, context.Request.Body, context.RequestAborted);

// Item 1 is header , Item 2 is body
if (header != string.Empty)
Expand All @@ -112,14 +112,14 @@ private static IEndpointConventionBuilder MapActorMethodEndpoint(this IEndpointR
context.Response.Headers[Constants.ErrorResponseHeaderName] = header; // add error header
}

await context.Response.Body.WriteAsync(body, 0, body.Length); // add response message body
await context.Response.Body.WriteAsync(body, 0, body.Length, context.RequestAborted); // add response message body
}
catch (Exception ex)
{
var (header, body) = CreateExceptionResponseMessage(ex);

context.Response.Headers[Constants.ErrorResponseHeaderName] = header;
await context.Response.Body.WriteAsync(body, 0, body.Length);
await context.Response.Body.WriteAsync(body, 0, body.Length, context.RequestAborted);
}
finally
{
Expand All @@ -130,7 +130,7 @@ private static IEndpointConventionBuilder MapActorMethodEndpoint(this IEndpointR
{
try
{
await runtime.DispatchWithoutRemotingAsync(actorTypeName, actorId, methodName, context.Request.Body, context.Response.Body);
await runtime.DispatchWithoutRemotingAsync(actorTypeName, actorId, methodName, context.Request.Body, context.Response.Body, context.RequestAborted);
}
finally
{
Expand Down
8 changes: 4 additions & 4 deletions src/Dapr.Actors/Runtime/ActorManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,16 @@ async Task<object> RequestFunc(Actor actor, CancellationToken ct)
var parameters = methodInfo.GetParameters();
dynamic awaitable;

if (parameters.Length == 0)
if (parameters.Length == 0 || (parameters.Length == 1 && parameters[0].ParameterType == typeof(CancellationToken)))
{
awaitable = methodInfo.Invoke(actor, null);
awaitable = methodInfo.Invoke(actor, parameters.Length == 0 ? null : new object[] { ct });
}
else if (parameters.Length == 1)
else if (parameters.Length == 1 || (parameters.Length == 2 && parameters[1].ParameterType == typeof(CancellationToken)))
{
// deserialize using stream.
var type = parameters[0].ParameterType;
var deserializedType = await JsonSerializer.DeserializeAsync(requestBodyStream, type, jsonSerializerOptions);
awaitable = methodInfo.Invoke(actor, new object[] { deserializedType });
awaitable = methodInfo.Invoke(actor, parameters.Length == 1 ? new object[] { deserializedType } : new object[] { deserializedType, ct });
}
else
{
Expand Down
106 changes: 106 additions & 0 deletions test/Dapr.Actors.Test/Runtime/ActorRuntimeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace Dapr.Actors.Test
using Xunit;
using Dapr.Actors.Client;
using System.Reflection;
using System.Threading;

public sealed class ActorRuntimeTests
{
Expand Down Expand Up @@ -109,6 +110,111 @@ public async Task NoActivateMessageFromRuntime()
Assert.Contains(actorType.Name, runtime.RegisteredActors.Select(a => a.Type.ActorTypeName), StringComparer.InvariantCulture);
}

public interface INotRemotedActor : IActor
{
Task<string> NoArgumentsAsync();

Task<string> NoArgumentsWithCancellationAsync(CancellationToken cancellationToken = default);

Task<string> SingleArgumentAsync(bool arg);

Task<string> SingleArgumentWithCancellationAsync(bool arg, CancellationToken cancellationToken = default);
}

public sealed class NotRemotedActor : Actor, INotRemotedActor
{
public NotRemotedActor(ActorHost host)
: base(host)
{
}

public Task<string> NoArgumentsAsync()
{
return Task.FromResult(nameof(NoArgumentsAsync));
}

public Task<string> NoArgumentsWithCancellationAsync(CancellationToken cancellationToken = default)
{
return Task.FromResult(nameof(NoArgumentsWithCancellationAsync));
}

public Task<string> SingleArgumentAsync(bool arg)
{
return Task.FromResult(nameof(SingleArgumentAsync));
}

public Task<string> SingleArgumentWithCancellationAsync(bool arg, CancellationToken cancellationToken = default)
{
return Task.FromResult(nameof(SingleArgumentWithCancellationAsync));
}
}

public async Task<string> InvokeMethod<T>(string methodName, object arg = null) where T : Actor
{
var options = new ActorRuntimeOptions();

options.Actors.RegisterActor<T>();

var runtime = new ActorRuntime(options, loggerFactory, activatorFactory, proxyFactory);

using var input = new MemoryStream();

if (arg is not null)
{
JsonSerializer.Serialize(input, arg);

input.Seek(0, SeekOrigin.Begin);
}

using var output = new MemoryStream();

await runtime.DispatchWithoutRemotingAsync(typeof(T).Name, ActorId.CreateRandom().ToString(), methodName, input, output);

output.Seek(0, SeekOrigin.Begin);

return JsonSerializer.Deserialize<string>(output);
}

[Fact]
public async Task NoRemotingMethodWithNoArguments()
{
string methodName = nameof(INotRemotedActor.NoArgumentsAsync);

string result = await InvokeMethod<NotRemotedActor>(methodName);

Assert.Equal(methodName, result);
}

[Fact]
public async Task NoRemotingMethodWithNoArgumentsWithCancellation()
{
string methodName = nameof(INotRemotedActor.NoArgumentsWithCancellationAsync);

string result = await InvokeMethod<NotRemotedActor>(methodName);

Assert.Equal(methodName, result);
}

[Fact]
public async Task NoRemotingMethodWithSingleArgument()
{
string methodName = nameof(INotRemotedActor.SingleArgumentAsync);

string result = await InvokeMethod<NotRemotedActor>(methodName, true);

Assert.Equal(methodName, result);
}

[Fact]
public async Task NoRemotingMethodWithSingleArgumentWithCancellation()
{
string methodName = nameof(INotRemotedActor.SingleArgumentWithCancellationAsync);

string result = await InvokeMethod<NotRemotedActor>(methodName, true);

Assert.Equal(methodName, result);
}

[Fact]
public async Task Actor_UsesCustomActivator()
{
Expand Down

0 comments on commit a1b0b00

Please sign in to comment.