Skip to content

Commit

Permalink
Adds workflow replay-safe logger (dapr#1434)
Browse files Browse the repository at this point in the history
* Removed obsolete type

Signed-off-by: Whit Waldo <[email protected]>

* Added missing using

Signed-off-by: Whit Waldo <[email protected]>

* Adding interface for IWorkflowContext for replayability concerns

Signed-off-by: Whit Waldo <[email protected]>

* Removed unused IConfiguration

Signed-off-by: Whit Waldo <[email protected]>

* Added ReplaySafeLogger type

Signed-off-by: Whit Waldo <[email protected]>

* Building out functionality to expose ReplayLogger in workflow context

Signed-off-by: Whit Waldo <[email protected]>

* Added license information to file

Signed-off-by: Whit Waldo <[email protected]>

* Removed unnecessary file

Signed-off-by: Whit Waldo <[email protected]>

* Updated copyright header for different project, made some tweaks for nullability errors

Signed-off-by: Whit Waldo <[email protected]>

* Added virtual methods that use the already-available ILoggerFactory to create the ReplaySafeLogger on the WorkflowContext

Signed-off-by: Whit Waldo <[email protected]>

* Removed unnecessary registration

Signed-off-by: Whit Waldo <[email protected]>

* Updated example to demonstrate using ReplaySafeLogger in the orchestration context

Signed-off-by: Whit Waldo <[email protected]>

* Tweaks on visibility and abstraction so that the methods are available in the context made visible to workflow developers

Signed-off-by: Whit Waldo <[email protected]>

* Removed obsolete type registrations

Signed-off-by: Whit Waldo <[email protected]>

* Simplified argument null check

Signed-off-by: Whit Waldo <[email protected]>

* Removed since-removed code leftover from merge

Signed-off-by: Whit Waldo <[email protected]>

* Added documentation demonstrating how to access the replay-safe logger

Signed-off-by: Whit Waldo <[email protected]>

* Removed unnecessary and separate ReplaySafeLogger in favor of method to create it off the TaskOrchestrationContext (innerContext)

Signed-off-by: Whit Waldo <[email protected]>

---------

Signed-off-by: Whit Waldo <[email protected]>
  • Loading branch information
WhitWaldo authored and jev-e committed Dec 26, 2024
1 parent 38532ea commit 6bde7db
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,64 @@ builder.Services.AddDaprWorkflow(options => {

var app = builder.Build();
await app.RunAsync();
```
```

## Injecting Services into Workflow Activities
Workflow activities support the same dependency injection that developers have come to expect of modern C# applications. Assuming a proper
registration at startup, any such type can be injected into the constructor of the workflow activity and available to utilize during
the execution of the workflow. This makes it simple to add logging via an injected `ILogger` or access to other Dapr
building blocks by injecting `DaprClient` or `DaprJobsClient`, for example.

```csharp
internal sealed class SquareNumberActivity : WorkflowActivity<int, int>
{
private readonly ILogger _logger;

public MyActivity(ILogger logger)
{
this._logger = logger;
}

public override Task<int> RunAsync(WorkflowActivityContext context, int input)
{
this._logger.LogInformation("Squaring the value {number}", input);
var result = input * input;
this._logger.LogInformation("Got a result of {squareResult}", result);

return Task.FromResult(result);
}
}
```

### Using ILogger in Workflow
Because workflows must be deterministic, it is not possible to inject arbitrary services into them. For example,
if you were able to inject a standard `ILogger` into a workflow and it needed to be replayed because of an error,
subsequent replay from the event source log would result in the log recording additional operations that didn't actually
take place a second or third time because their results were sourced from the log. This has the potential to introduce
a significant amount of confusion. Rather, a replay-safe logger is made available for use within workflows. It will only
log events the first time the workflow runs and will not log anything whenever the workflow is being replaced.

This logger can be retrieved from a method present on the `WorkflowContext` available on your workflow instance and
otherwise used precisely as you might otherwise use an `ILogger` instance.

An end-to-end sample demonstrating this can be seen in the
[.NET SDK repository](https://github.com/dapr/dotnet-sdk/blob/master/examples/Workflow/WorkflowConsoleApp/Workflows/OrderProcessingWorkflow.cs)
but a brief extraction of this sample is available below.

```csharp
public class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
{
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
string orderId = context.InstanceId;
var logger = context.CreateReplaySafeLogger<OrderProcessingWorkflow>(); //Use this method to access the logger instance
logger.LogInformation("Received order {orderId} for {quantity} {name} at ${totalCost}", orderId, order.Quantity, order.Name, order.TotalCost);

//...
}
}
```



Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Dapr.Workflow;
using Microsoft.Extensions.Logging;
using WorkflowConsoleApp.Activities;

namespace WorkflowConsoleApp.Workflows
Expand All @@ -16,7 +17,10 @@ public class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
string orderId = context.InstanceId;
var logger = context.CreateReplaySafeLogger<OrderProcessingWorkflow>();

logger.LogInformation("Received order {orderId} for {quantity} {name} at ${totalCost}", orderId, order.Quantity, order.Name, order.TotalCost);

// Notify the user that an order has come through
await context.CallActivityAsync(
nameof(NotifyActivity),
Expand All @@ -31,6 +35,8 @@ await context.CallActivityAsync(
// If there is insufficient inventory, fail and let the user know
if (!result.Success)
{
logger.LogError("Insufficient inventory for {orderName}", order.Name);

// End the workflow here since we don't have sufficient inventory
await context.CallActivityAsync(
nameof(NotifyActivity),
Expand All @@ -39,8 +45,10 @@ await context.CallActivityAsync(
}

// Require orders over a certain threshold to be approved
if (order.TotalCost > 50000)
const int threshold = 50000;
if (order.TotalCost > threshold)
{
logger.LogInformation("Requesting manager approval since total cost {totalCost} exceeds threshold {threshold}", order.TotalCost, threshold);
// Request manager approval for the order
await context.CallActivityAsync(nameof(RequestApprovalActivity), order);

Expand All @@ -51,9 +59,13 @@ await context.CallActivityAsync(
ApprovalResult approvalResult = await context.WaitForExternalEventAsync<ApprovalResult>(
eventName: "ManagerApproval",
timeout: TimeSpan.FromSeconds(30));

logger.LogInformation("Approval result: {approvalResult}", approvalResult);
context.SetCustomStatus($"Approval result: {approvalResult}");
if (approvalResult == ApprovalResult.Rejected)
{
logger.LogWarning("Order was rejected by approver");

// The order was rejected, end the workflow here
await context.CallActivityAsync(
nameof(NotifyActivity),
Expand All @@ -63,6 +75,8 @@ await context.CallActivityAsync(
}
catch (TaskCanceledException)
{
logger.LogError("Cancelling order because it didn't receive an approval");

// An approval timeout results in automatic order cancellation
await context.CallActivityAsync(
nameof(NotifyActivity),
Expand All @@ -72,6 +86,7 @@ await context.CallActivityAsync(
}

// There is enough inventory available so the user can purchase the item(s). Process their payment
logger.LogInformation("Processing payment as sufficient inventory is available");
await context.CallActivityAsync(
nameof(ProcessPaymentActivity),
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost),
Expand All @@ -88,13 +103,15 @@ await context.CallActivityAsync(
catch (WorkflowTaskFailedException e)
{
// Let them know their payment processing failed
logger.LogError("Order {orderId} failed! Details: {errorMessage}", orderId, e.FailureDetails.ErrorMessage);
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Order {orderId} Failed! Details: {e.FailureDetails.ErrorMessage}"));
return new OrderResult(Processed: false);
}

// Let them know their payment was processed
logger.LogError("Order {orderId} has completed!", orderId);
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Order {orderId} has completed!"));
Expand Down
23 changes: 22 additions & 1 deletion src/Dapr.Workflow/DaprWorkflowContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// limitations under the License.
// ------------------------------------------------------------------------

using Microsoft.Extensions.Logging;

namespace Dapr.Workflow
{
using System;
Expand All @@ -34,7 +36,7 @@ internal DaprWorkflowContext(TaskOrchestrationContext innerContext)
public override DateTime CurrentUtcDateTime => this.innerContext.CurrentUtcDateTime;

public override bool IsReplaying => this.innerContext.IsReplaying;

public override Task CallActivityAsync(string name, object? input = null, WorkflowTaskOptions? options = null)
{
return WrapExceptions(this.innerContext.CallActivityAsync(name, input, options?.ToDurableTaskOptions()));
Expand Down Expand Up @@ -95,6 +97,25 @@ public override Guid NewGuid()
return this.innerContext.NewGuid();
}

/// <summary>
/// Returns an instance of <see cref="ILogger"/> that is replay-safe, meaning that the logger only
/// writes logs when the orchestrator is not replaying previous history.
/// </summary>
/// <param name="categoryName">The logger's category name.</param>
/// <returns>An instance of <see cref="ILogger"/> that is replay-safe.</returns>
public override ILogger CreateReplaySafeLogger(string categoryName) =>
this.innerContext.CreateReplaySafeLogger(categoryName);

/// <inheritdoc cref="CreateReplaySafeLogger(string)" />
/// <param name="type">The type to derive the category name from.</param>
public override ILogger CreateReplaySafeLogger(Type type) =>
this.innerContext.CreateReplaySafeLogger(type);

/// <inheritdoc cref="CreateReplaySafeLogger(string)" />
/// <typeparam name="T">The type to derive category name from.</typeparam>
public override ILogger CreateReplaySafeLogger<T>() =>
this.innerContext.CreateReplaySafeLogger<T>();

static async Task WrapExceptions(Task task)
{
try
Expand Down
21 changes: 21 additions & 0 deletions src/Dapr.Workflow/IWorkflowContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
namespace Dapr.Workflow;

/// <summary>
/// Provides functionality available to orchestration code.
/// </summary>
public interface IWorkflowContext
{
/// <summary>
/// Gets a value indicating whether the orchestration or operation is currently replaying itself.
/// </summary>
/// <remarks>
/// This property is useful when there is logic that needs to run only when *not* replaying. For example,
/// certain types of application logging may become too noisy when duplicated as part of replay. The
/// application code could check to see whether the function is being replayed and then issue
/// the log statements when this value is <c>false</c>.
/// </remarks>
/// <value>
/// <c>true</c> if the orchestration or operation is currently being replayed; otherwise <c>false</c>.
/// </value>
bool IsReplaying { get; }
}
22 changes: 20 additions & 2 deletions src/Dapr.Workflow/WorkflowContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// limitations under the License.
// ------------------------------------------------------------------------

using Microsoft.Extensions.Logging;

namespace Dapr.Workflow
{
using System;
Expand All @@ -21,13 +23,13 @@ namespace Dapr.Workflow
/// Context object used by workflow implementations to perform actions such as scheduling activities, durable timers, waiting for
/// external events, and for getting basic information about the current workflow instance.
/// </summary>
public abstract class WorkflowContext
public abstract class WorkflowContext : IWorkflowContext
{
/// <summary>
/// Gets the name of the current workflow.
/// </summary>
public abstract string Name { get; }

/// <summary>
/// Gets the instance ID of the current workflow.
/// </summary>
Expand Down Expand Up @@ -271,6 +273,22 @@ public virtual Task CallChildWorkflowAsync(
{
return this.CallChildWorkflowAsync<object>(workflowName, input, options);
}

/// <summary>
/// Returns an instance of <see cref="ILogger"/> that is replay-safe, meaning that the logger only
/// writes logs when the orchestrator is not replaying previous history.
/// </summary>
/// <param name="categoryName">The logger's category name.</param>
/// <returns>An instance of <see cref="ILogger"/> that is replay-safe.</returns>
public abstract ILogger CreateReplaySafeLogger(string categoryName);

/// <inheritdoc cref="CreateReplaySafeLogger(string)" />
/// <param name="type">The type to derive the category name from.</param>
public abstract ILogger CreateReplaySafeLogger(Type type);

/// <inheritdoc cref="CreateReplaySafeLogger(string)" />
/// <typeparam name="T">The type to derive category name from.</typeparam>
public abstract ILogger CreateReplaySafeLogger<T>();

/// <summary>
/// Restarts the workflow with a new input and clears its history.
Expand Down
34 changes: 0 additions & 34 deletions src/Dapr.Workflow/WorkflowEngineClient.cs

This file was deleted.

3 changes: 1 addition & 2 deletions src/Dapr.Workflow/WorkflowLoggingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ internal sealed class WorkflowLoggingService : IHostedService
private static readonly HashSet<string> registeredWorkflows = new();
private static readonly HashSet<string> registeredActivities = new();

public WorkflowLoggingService(ILogger<WorkflowLoggingService> logger, IConfiguration configuration)
public WorkflowLoggingService(ILogger<WorkflowLoggingService> logger)
{
this.logger = logger;

}
public Task StartAsync(CancellationToken cancellationToken)
{
Expand Down
22 changes: 5 additions & 17 deletions src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,50 +35,38 @@ public static IServiceCollection AddDaprWorkflow(
Action<WorkflowRuntimeOptions> configure,
ServiceLifetime lifetime = ServiceLifetime.Singleton)
{
if (serviceCollection == null)
{
throw new ArgumentNullException(nameof(serviceCollection));
}
ArgumentNullException.ThrowIfNull(serviceCollection, nameof(serviceCollection));

serviceCollection.AddDaprClient(lifetime: lifetime);
serviceCollection.AddHttpClient();
serviceCollection.AddHostedService<WorkflowLoggingService>();

switch (lifetime)
{
case ServiceLifetime.Singleton:
#pragma warning disable CS0618 // Type or member is obsolete - keeping around temporarily - replaced by DaprWorkflowClient
serviceCollection.TryAddSingleton<WorkflowEngineClient>();
#pragma warning restore CS0618 // Type or member is obsolete
serviceCollection.TryAddSingleton<DaprWorkflowClient>();
serviceCollection.TryAddSingleton<WorkflowRuntimeOptions>();
break;
case ServiceLifetime.Scoped:
#pragma warning disable CS0618 // Type or member is obsolete - keeping around temporarily - replaced by DaprWorkflowClient
serviceCollection.TryAddScoped<WorkflowEngineClient>();
#pragma warning restore CS0618 // Type or member is obsolete
serviceCollection.TryAddScoped<DaprWorkflowClient>();
serviceCollection.TryAddScoped<WorkflowRuntimeOptions>();
break;
case ServiceLifetime.Transient:
#pragma warning disable CS0618 // Type or member is obsolete - keeping around temporarily - replaced by DaprWorkflowClient
serviceCollection.TryAddTransient<WorkflowEngineClient>();
#pragma warning restore CS0618 // Type or member is obsolete
serviceCollection.TryAddTransient<DaprWorkflowClient>();
serviceCollection.TryAddTransient<WorkflowRuntimeOptions>();
break;
default:
throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, null);
}

serviceCollection.AddOptions<WorkflowRuntimeOptions>().Configure(configure);

//Register the factory and force resolution so the Durable Task client and worker can be registered
using (var scope = serviceCollection.BuildServiceProvider().CreateScope())
{
var httpClientFactory = scope.ServiceProvider.GetRequiredService<IHttpClientFactory>();
var configuration = scope.ServiceProvider.GetService<IConfiguration>();

var factory = new DaprWorkflowClientBuilderFactory(configuration, httpClientFactory);
factory.CreateClientBuilder(serviceCollection, configure);
}
Expand Down

0 comments on commit 6bde7db

Please sign in to comment.