diff --git a/WorkflowCore.sln b/WorkflowCore.sln index e7fb81a2e..aa8e3f37e 100644 --- a/WorkflowCore.sln +++ b/WorkflowCore.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.29509.3 +# Visual Studio Version 17 +VisualStudioVersion = 17.9.34518.117 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{EF47161E-E399-451C-BDE8-E92AAD3BD761}" EndProject @@ -154,6 +154,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample19", "sr EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Persistence.RavenDB", "src\providers\WorkflowCore.Persistence.RavenDB\WorkflowCore.Persistence.RavenDB.csproj", "{AF205715-C8B7-42EF-BF14-AFC9E7F27242}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.TestScope", "src\samples\WorkflowCore.TestScope\WorkflowCore.TestScope.csproj", "{5F9E0E2C-BA1E-4F8A-8A46-D52050402B8A}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -376,6 +378,10 @@ Global {AF205715-C8B7-42EF-BF14-AFC9E7F27242}.Debug|Any CPU.Build.0 = Debug|Any CPU {AF205715-C8B7-42EF-BF14-AFC9E7F27242}.Release|Any CPU.ActiveCfg = Release|Any CPU {AF205715-C8B7-42EF-BF14-AFC9E7F27242}.Release|Any CPU.Build.0 = Release|Any CPU + {5F9E0E2C-BA1E-4F8A-8A46-D52050402B8A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5F9E0E2C-BA1E-4F8A-8A46-D52050402B8A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5F9E0E2C-BA1E-4F8A-8A46-D52050402B8A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5F9E0E2C-BA1E-4F8A-8A46-D52050402B8A}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -438,6 +444,7 @@ Global {54DE20BA-EBA7-4BF0-9BD9-F03766849716} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB} {1223ED47-3E5E-4960-B70D-DFAF550F6666} = {5080DB09-CBE8-4C45-9957-C3BB7651755E} {AF205715-C8B7-42EF-BF14-AFC9E7F27242} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2} + {5F9E0E2C-BA1E-4F8A-8A46-D52050402B8A} = {5080DB09-CBE8-4C45-9957-C3BB7651755E} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {DC0FA8D3-6449-4FDA-BB46-ECF58FAD23B4} diff --git a/src/WorkflowCore/Interface/IWorkflowController.cs b/src/WorkflowCore/Interface/IWorkflowController.cs index 924925d2c..48d60a113 100644 --- a/src/WorkflowCore/Interface/IWorkflowController.cs +++ b/src/WorkflowCore/Interface/IWorkflowController.cs @@ -1,4 +1,5 @@ -using System; +using Microsoft.Extensions.DependencyInjection; +using System; using System.Threading.Tasks; namespace WorkflowCore.Interface @@ -9,6 +10,10 @@ public interface IWorkflowController Task StartWorkflow(string workflowId, int? version, object data = null, string reference=null); Task StartWorkflow(string workflowId, TData data = null, string reference=null) where TData : class, new(); Task StartWorkflow(string workflowId, int? version, TData data = null, string reference=null) where TData : class, new(); + Task StartWorkflowWithScope(IServiceScope scope, string workflowId, object data = null, string reference = null); + Task StartWorkflowWithScope(IServiceScope scope, string workflowId, int? version, object data = null, string reference = null); + Task StartWorkflowWithScope(IServiceScope scope, string workflowId, TData data = null, string reference = null) where TData : class, new(); + Task StartWorkflowWithScope(IServiceScope scope, string workflowId, int? version, TData data = null, string reference = null) where TData : class, new(); Task PublishEvent(string eventName, string eventKey, object eventData, DateTime? effectiveDate = null); void RegisterWorkflow() where TWorkflow : IWorkflow; diff --git a/src/WorkflowCore/Models/WorkflowInstance.cs b/src/WorkflowCore/Models/WorkflowInstance.cs index 71f1e3c28..6877d46a3 100644 --- a/src/WorkflowCore/Models/WorkflowInstance.cs +++ b/src/WorkflowCore/Models/WorkflowInstance.cs @@ -1,4 +1,5 @@ -using System; +using Microsoft.Extensions.DependencyInjection; +using System; using System.Linq; namespace WorkflowCore.Models @@ -27,6 +28,8 @@ public class WorkflowInstance public DateTime? CompleteTime { get; set; } + public IServiceScope CurrentServiceScope { get; set; } + public bool IsBranchComplete(string parentId) { return ExecutionPointers diff --git a/src/WorkflowCore/Services/WorkflowController.cs b/src/WorkflowCore/Services/WorkflowController.cs index 79272e084..fbc72b1aa 100755 --- a/src/WorkflowCore/Services/WorkflowController.cs +++ b/src/WorkflowCore/Services/WorkflowController.cs @@ -37,12 +37,12 @@ public WorkflowController(IPersistenceProvider persistenceStore, IDistributedLoc _dateTimeProvider = dateTimeProvider; } - public Task StartWorkflow(string workflowId, object data = null, string reference=null) + public Task StartWorkflow(string workflowId, object data = null, string reference = null) { return StartWorkflow(workflowId, null, data, reference); } - public Task StartWorkflow(string workflowId, int? version, object data = null, string reference=null) + public Task StartWorkflow(string workflowId, int? version, object data = null, string reference = null) { return StartWorkflow(workflowId, version, data, reference); } @@ -53,10 +53,37 @@ public Task StartWorkflow(string workflowId, TData data = null, s return StartWorkflow(workflowId, null, data, reference); } - public async Task StartWorkflow(string workflowId, int? version, TData data = null, string reference=null) + public Task StartWorkflow(string workflowId, int? version, TData data = null, string reference = null) where TData : class, new() { + return StartWorkflowCore(workflowId, version, data, reference); + } + + public Task StartWorkflowWithScope(IServiceScope scope, string workflowId, object data = null, string reference = null) + { + return StartWorkflowWithScope(scope, workflowId, null, data, reference); + } + + public Task StartWorkflowWithScope(IServiceScope scope, string workflowId, int? version, object data = null, string reference = null) + { + return StartWorkflowWithScope(scope, workflowId, version, data, reference); + } + + public Task StartWorkflowWithScope(IServiceScope scope, string workflowId, TData data = null, string reference = null) + where TData : class, new() + { + return StartWorkflowWithScope(scope, workflowId, null, data, reference); + } + public Task StartWorkflowWithScope(IServiceScope scope, string workflowId, int? version, TData data = null, string reference = null) + where TData : class, new() + { + return StartWorkflowCore(workflowId, version, data, reference, scope); + } + + private async Task StartWorkflowCore(string workflowId, int? version, TData data, string reference, IServiceScope workflowScope = null) + where TData : class, new() + { var def = _registry.GetDefinition(workflowId, version); if (def == null) { @@ -72,7 +99,8 @@ public async Task StartWorkflow(string workflowId, int? version, NextExecution = 0, CreateTime = _dateTimeProvider.UtcNow, Status = WorkflowStatus.Runnable, - Reference = reference + Reference = reference, + CurrentServiceScope = workflowScope }; if ((def.DataType != null) && (data == null)) diff --git a/src/WorkflowCore/Services/WorkflowExecutor.cs b/src/WorkflowCore/Services/WorkflowExecutor.cs index da3e9cd85..437b8445b 100755 --- a/src/WorkflowCore/Services/WorkflowExecutor.cs +++ b/src/WorkflowCore/Services/WorkflowExecutor.cs @@ -156,50 +156,60 @@ private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, Exe CancellationToken = cancellationToken }; - using (var scope = _scopeProvider.CreateScope(context)) + var shouldDisposeScope = false; + var scope = context.Workflow.CurrentServiceScope; + if (scope == null) { - _logger.LogDebug("Starting step {StepName} on workflow {WorkflowId}", step.Name, workflow.Id); + scope = _scopeProvider.CreateScope(context); + shouldDisposeScope = true; + } + + _logger.LogDebug("Starting step {StepName} on workflow {WorkflowId}", step.Name, workflow.Id); - IStepBody body = step.ConstructBody(scope.ServiceProvider); - var stepExecutor = scope.ServiceProvider.GetRequiredService(); + IStepBody body = step.ConstructBody(scope.ServiceProvider); + var stepExecutor = scope.ServiceProvider.GetRequiredService(); - if (body == null) + if (body == null) + { + _logger.LogError("Unable to construct step body {BodyType}", step.BodyType.ToString()); + pointer.SleepUntil = _datetimeProvider.UtcNow.Add(_options.ErrorRetryInterval); + wfResult.Errors.Add(new ExecutionError { - _logger.LogError("Unable to construct step body {BodyType}", step.BodyType.ToString()); - pointer.SleepUntil = _datetimeProvider.UtcNow.Add(_options.ErrorRetryInterval); - wfResult.Errors.Add(new ExecutionError - { - WorkflowId = workflow.Id, - ExecutionPointerId = pointer.Id, - ErrorTime = _datetimeProvider.UtcNow, - Message = $"Unable to construct step body {step.BodyType}" - }); - return; - } + WorkflowId = workflow.Id, + ExecutionPointerId = pointer.Id, + ErrorTime = _datetimeProvider.UtcNow, + Message = $"Unable to construct step body {step.BodyType}" + }); + return; + } - foreach (var input in step.Inputs) - input.AssignInput(workflow.Data, body, context); + foreach (var input in step.Inputs) + input.AssignInput(workflow.Data, body, context); - switch (step.BeforeExecute(wfResult, context, pointer, body)) - { - case ExecutionPipelineDirective.Defer: - return; - case ExecutionPipelineDirective.EndWorkflow: - workflow.Status = WorkflowStatus.Complete; - workflow.CompleteTime = _datetimeProvider.UtcNow; - return; - } + switch (step.BeforeExecute(wfResult, context, pointer, body)) + { + case ExecutionPipelineDirective.Defer: + return; + case ExecutionPipelineDirective.EndWorkflow: + workflow.Status = WorkflowStatus.Complete; + workflow.CompleteTime = _datetimeProvider.UtcNow; + return; + } - var result = await stepExecutor.ExecuteStep(context, body); + var result = await stepExecutor.ExecuteStep(context, body); - if (result.Proceed) - { - foreach (var output in step.Outputs) - output.AssignOutput(workflow.Data, body, context); - } + if (result.Proceed) + { + foreach (var output in step.Outputs) + output.AssignOutput(workflow.Data, body, context); + } - _executionResultProcessor.ProcessExecutionResult(workflow, def, pointer, step, result, wfResult); - step.AfterExecute(wfResult, context, result, pointer); + _executionResultProcessor.ProcessExecutionResult(workflow, def, pointer, step, result, wfResult); + step.AfterExecute(wfResult, context, result, pointer); + + if (shouldDisposeScope) + { + scope.Dispose(); } } diff --git a/src/WorkflowCore/Services/WorkflowHost.cs b/src/WorkflowCore/Services/WorkflowHost.cs index 73c8850fa..5d3beecc6 100644 --- a/src/WorkflowCore/Services/WorkflowHost.cs +++ b/src/WorkflowCore/Services/WorkflowHost.cs @@ -3,6 +3,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using OpenTelemetry.Trace; using WorkflowCore.Interface; @@ -49,7 +50,7 @@ public WorkflowHost(IPersistenceProvider persistenceStore, IQueueProvider queueP _activityController = activityController; _lifeCycleEventHub = lifeCycleEventHub; } - + public Task StartWorkflow(string workflowId, object data = null, string reference=null) { return _workflowController.StartWorkflow(workflowId, data, reference); @@ -65,13 +66,35 @@ public Task StartWorkflow(string workflowId, TData data = null, s { return _workflowController.StartWorkflow(workflowId, null, data, reference); } - + public Task StartWorkflow(string workflowId, int? version, TData data = null, string reference=null) where TData : class, new() { return _workflowController.StartWorkflow(workflowId, version, data, reference); } + public Task StartWorkflowWithScope(IServiceScope scope, string workflowId, object data = null, string reference = null) + { + return _workflowController.StartWorkflowWithScope(scope, workflowId, data, reference); + } + + public Task StartWorkflowWithScope(IServiceScope scope, string workflowId, int? version, object data = null, string reference = null) + { + return _workflowController.StartWorkflowWithScope(scope, workflowId, version, data, reference); + } + + public Task StartWorkflowWithScope(IServiceScope scope, string workflowId, TData data = null, string reference = null) + where TData : class, new() + { + return _workflowController.StartWorkflowWithScope(scope, workflowId, null, data, reference); + } + + public Task StartWorkflowWithScope(IServiceScope scope, string workflowId, int? version, TData data = null, string reference = null) + where TData : class, new() + { + return _workflowController.StartWorkflowWithScope(scope, workflowId, version, data, reference); + } + public Task PublishEvent(string eventName, string eventKey, object eventData, DateTime? effectiveDate = null) { return _workflowController.PublishEvent(eventName, eventKey, eventData, effectiveDate); @@ -81,7 +104,7 @@ public void Start() { StartAsync(CancellationToken.None).Wait(); } - + public async Task StartAsync(CancellationToken cancellationToken) { var activity = WorkflowActivity.StartHost(); @@ -118,7 +141,7 @@ public void Stop() { StopAsync(CancellationToken.None).Wait(); } - + public async Task StopAsync(CancellationToken cancellationToken) { _shutdown = true; diff --git a/src/samples/WorkflowCore.TestScope/NUnitTest.cs b/src/samples/WorkflowCore.TestScope/NUnitTest.cs new file mode 100644 index 000000000..0273a8d5b --- /dev/null +++ b/src/samples/WorkflowCore.TestScope/NUnitTest.cs @@ -0,0 +1,101 @@ +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; +using NUnit.Framework; +using System; +using System.Collections.Generic; +using System.Threading; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using WorkflowCore.Testing; +using WorkflowCore.TestScope.Workflow; + +namespace WorkflowCore.TestScope +{ + [TestFixture] + public class NUnitTest : IDisposable + { + private IServiceProvider _serviceProvider; + private IWorkflowHost _host; + private IPersistenceProvider _persistenceProvider; + private List _unhandledStepErrors = new(); + + [SetUp] + protected void Setup() + { + IServiceCollection services = new ServiceCollection(); + services.AddLogging(); + services.AddWorkflow(options => options.UsePollInterval(TimeSpan.FromSeconds(3))); + services.AddScoped(); + services.AddTransient(); + services.AddTransient(); + + _serviceProvider = services.BuildServiceProvider(); + + _persistenceProvider = _serviceProvider.GetService(); + _host = _serviceProvider.GetService(); + _host.RegisterWorkflow(); + _host.RegisterWorkflow(); + + _host.OnStepError += Host_OnStepError; + _host.Start(); + } + + [Test] + public void NUnit_workflow_scope_test_sample() + { + using var scope1 = _serviceProvider.CreateScope(); + var countService1 = scope1.ServiceProvider.GetRequiredService(); + using var scope2 = _serviceProvider.CreateScope(); + var countService2 = scope2.ServiceProvider.GetRequiredService(); + + var helloWorldWorkflowId = _host.StartWorkflowWithScope(scope1, "HelloWorld", new WorkflowData { ExecuteTimes = 6 }).Result; + + var goodbyeWorldWorkflowId = _host.StartWorkflowWithScope(scope2, "GoodbyeWorld", new WorkflowData { ExecuteTimes = 8 }).Result; + + WaitForWorkflowToComplete(helloWorldWorkflowId, TimeSpan.FromSeconds(30)); + WaitForWorkflowToComplete(goodbyeWorldWorkflowId, TimeSpan.FromSeconds(30)); + + GetStatus(helloWorldWorkflowId).Should().Be(WorkflowStatus.Complete); + countService1.Count.Should().Be(6); + + GetStatus(goodbyeWorldWorkflowId).Should().Be(WorkflowStatus.Complete); + countService2.Count.Should().Be(8); + + _unhandledStepErrors.Count.Should().Be(0); + } + + private void Host_OnStepError(WorkflowInstance workflow, WorkflowStep step, Exception exception) + { + _unhandledStepErrors.Add(new StepError + { + Exception = exception, + Step = step, + Workflow = workflow + }); + } + + private void WaitForWorkflowToComplete(string workflowId, TimeSpan timeOut) + { + var status = GetStatus(workflowId); + var counter = 0; + while ((status == WorkflowStatus.Runnable) && (counter < (timeOut.TotalMilliseconds / 100))) + { + Thread.Sleep(100); + counter++; + status = GetStatus(workflowId); + } + } + + private WorkflowStatus GetStatus(string workflowId) + { + var instance = _persistenceProvider.GetWorkflowInstance(workflowId).Result; + return instance.Status; + } + + public void Dispose() + { + _host.Stop(); + } + } +} + diff --git a/src/samples/WorkflowCore.TestScope/Workflow/CountService.cs b/src/samples/WorkflowCore.TestScope/Workflow/CountService.cs new file mode 100644 index 000000000..19a86c1fa --- /dev/null +++ b/src/samples/WorkflowCore.TestScope/Workflow/CountService.cs @@ -0,0 +1,12 @@ +namespace WorkflowCore.TestScope.Workflow +{ + public class CountService + { + public int Count { get; private set; } + + public void Increment() + { + Count++; + } + } +} diff --git a/src/samples/WorkflowCore.TestScope/Workflow/GoodbyeWorld.cs b/src/samples/WorkflowCore.TestScope/Workflow/GoodbyeWorld.cs new file mode 100644 index 000000000..938462852 --- /dev/null +++ b/src/samples/WorkflowCore.TestScope/Workflow/GoodbyeWorld.cs @@ -0,0 +1,23 @@ +using System; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.TestScope.Workflow +{ + public class GoodbyeWorld : StepBody + { + private readonly CountService _countService; + + public GoodbyeWorld(CountService countService) + { + _countService = countService; + } + + public override ExecutionResult Run(IStepExecutionContext context) + { + _countService.Increment(); + Console.WriteLine($"[{_countService.Count}] Goodbye world"); + return ExecutionResult.Next(); + } + } +} diff --git a/src/samples/WorkflowCore.TestScope/Workflow/GoodbyeWorldWorkflow.cs b/src/samples/WorkflowCore.TestScope/Workflow/GoodbyeWorldWorkflow.cs new file mode 100644 index 000000000..7a9a58a03 --- /dev/null +++ b/src/samples/WorkflowCore.TestScope/Workflow/GoodbyeWorldWorkflow.cs @@ -0,0 +1,21 @@ +using System; +using System.Linq; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.TestScope.Workflow +{ + public class GoodbyeWorldWorkflow : IWorkflow + { + public string Id => "GoodbyeWorld"; + public int Version => 1; + + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(_ => Console.WriteLine("GoodbyeWorld started")) + .ForEach(data => Enumerable.Range(1, data.ExecuteTimes)) + .Do(x => x.StartWith()); + } + } +} diff --git a/src/samples/WorkflowCore.TestScope/Workflow/HelloWorld.cs b/src/samples/WorkflowCore.TestScope/Workflow/HelloWorld.cs new file mode 100644 index 000000000..2bcce9fbe --- /dev/null +++ b/src/samples/WorkflowCore.TestScope/Workflow/HelloWorld.cs @@ -0,0 +1,23 @@ +using System; +using WorkflowCore.Interface; +using WorkflowCore.Models; + +namespace WorkflowCore.TestScope.Workflow +{ + public class HelloWorld : StepBody + { + private readonly CountService _countService; + + public HelloWorld(CountService countService) + { + _countService = countService; + } + + public override ExecutionResult Run(IStepExecutionContext context) + { + _countService.Increment(); + Console.WriteLine($"[{_countService.Count}] Hello world"); + return ExecutionResult.Next(); + } + } +} diff --git a/src/samples/WorkflowCore.TestScope/Workflow/HelloWorldWorkflow.cs b/src/samples/WorkflowCore.TestScope/Workflow/HelloWorldWorkflow.cs new file mode 100644 index 000000000..f4c27394c --- /dev/null +++ b/src/samples/WorkflowCore.TestScope/Workflow/HelloWorldWorkflow.cs @@ -0,0 +1,20 @@ +using System; +using System.Linq; +using WorkflowCore.Interface; + +namespace WorkflowCore.TestScope.Workflow +{ + public class HelloWorldWorkflow : IWorkflow + { + public string Id => "HelloWorld"; + public int Version => 1; + + public void Build(IWorkflowBuilder builder) + { + builder + .StartWith(_ => Console.WriteLine("HelloWorld started")) + .ForEach(data => Enumerable.Range(1, data.ExecuteTimes)) + .Do(x => x.StartWith()); + } + } +} diff --git a/src/samples/WorkflowCore.TestScope/Workflow/WorkflowData.cs b/src/samples/WorkflowCore.TestScope/Workflow/WorkflowData.cs new file mode 100644 index 000000000..aabf4be8a --- /dev/null +++ b/src/samples/WorkflowCore.TestScope/Workflow/WorkflowData.cs @@ -0,0 +1,6 @@ +namespace WorkflowCore.TestScope.Workflow; + +public class WorkflowData +{ + public int ExecuteTimes { get; set; } +} diff --git a/src/samples/WorkflowCore.TestScope/WorkflowCore.TestScope.csproj b/src/samples/WorkflowCore.TestScope/WorkflowCore.TestScope.csproj new file mode 100644 index 000000000..255380123 --- /dev/null +++ b/src/samples/WorkflowCore.TestScope/WorkflowCore.TestScope.csproj @@ -0,0 +1,22 @@ + + + + + + + + + + + + + + + + + + + + + +