From 35b449ae38c72b0d9e56416ccca04f88151f3897 Mon Sep 17 00:00:00 2001 From: AnakinRaW Date: Sat, 20 Apr 2024 15:31:30 +0200 Subject: [PATCH] more pipeline improvements --- .../src/IRunner.cs | 14 +- .../src/IStepQueue.cs | 20 -- .../ParallelProducerConsumerPipeline.cs | 125 ++++---- .../src/Pipelines/Pipeline.cs | 83 ++++- .../src/Pipelines/SimplePipeline.cs | 35 +-- .../Runners/ParallelProducerConsumerRunner.cs | 7 +- .../src/Runners/ParallelRunner.cs | 7 +- .../src/Steps/RunPipelineStep.cs | 41 +++ ...CommonUtilities.SimplePipeline.Test.csproj | 2 + .../test/PipelineTest.cs | 18 +- .../{ => Pipelines}/ParallelPipelineTests.cs | 2 +- .../ParallelProducerConsumerPipelineTest.cs | 187 +++++++++++ .../SequentialPipelineTests.cs | 2 +- .../test/Pipelines/SimplePipelineTests.cs | 131 ++++++++ .../ParallelProducerConsumerRunnerTest.cs | 54 +++- .../test/Runners/ParallelRunnerTest.cs | 20 +- .../{TaskRunnerTest.cs => StepRunnerTest.cs} | 35 ++- .../test/SimplePipelineTests.cs | 292 ------------------ .../test/Steps/RunPipelineStepTest.cs | 63 ++++ .../test/TestStep.cs | 9 +- 20 files changed, 715 insertions(+), 432 deletions(-) delete mode 100644 src/CommonUtilities.SimplePipeline/src/IStepQueue.cs create mode 100644 src/CommonUtilities.SimplePipeline/src/Steps/RunPipelineStep.cs rename src/CommonUtilities.SimplePipeline/test/{ => Pipelines}/ParallelPipelineTests.cs (97%) create mode 100644 src/CommonUtilities.SimplePipeline/test/Pipelines/ParallelProducerConsumerPipelineTest.cs rename src/CommonUtilities.SimplePipeline/test/{ => Pipelines}/SequentialPipelineTests.cs (97%) create mode 100644 src/CommonUtilities.SimplePipeline/test/Pipelines/SimplePipelineTests.cs rename src/CommonUtilities.SimplePipeline/test/Runners/{TaskRunnerTest.cs => StepRunnerTest.cs} (69%) delete mode 100644 src/CommonUtilities.SimplePipeline/test/SimplePipelineTests.cs create mode 100644 src/CommonUtilities.SimplePipeline/test/Steps/RunPipelineStepTest.cs diff --git a/src/CommonUtilities.SimplePipeline/src/IRunner.cs b/src/CommonUtilities.SimplePipeline/src/IRunner.cs index 11d440c..920fc54 100644 --- a/src/CommonUtilities.SimplePipeline/src/IRunner.cs +++ b/src/CommonUtilities.SimplePipeline/src/IRunner.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -7,7 +8,7 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline; /// /// Execution engine to run one or many s. /// -public interface IRunner : IStepQueue, IDisposable +public interface IRunner : IDisposable { /// /// Gets raised when the execution of an failed with an exception. @@ -20,4 +21,15 @@ public interface IRunner : IStepQueue, IDisposable /// The cancellation token, allowing the runner to cancel the operation. /// A task that represents the completion of the operation. Task RunAsync(CancellationToken token); + + /// + /// List of only those steps which are scheduled for execution of an . + /// + public IReadOnlyList Steps { get; } + + /// + /// Adds an to the . + /// + /// The step to app. + void AddStep(IStep activity); } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/IStepQueue.cs b/src/CommonUtilities.SimplePipeline/src/IStepQueue.cs deleted file mode 100644 index 68fc484..0000000 --- a/src/CommonUtilities.SimplePipeline/src/IStepQueue.cs +++ /dev/null @@ -1,20 +0,0 @@ -using System.Collections.Generic; - -namespace AnakinRaW.CommonUtilities.SimplePipeline; - -/// -/// A queue of . -/// -public interface IStepQueue -{ - /// - /// List of only those steps which are scheduled for execution of an . - /// - public IReadOnlyList Steps { get; } - - /// - /// Adds an to the . - /// - /// The step to app. - void AddStep(IStep activity); -} \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/Pipelines/ParallelProducerConsumerPipeline.cs b/src/CommonUtilities.SimplePipeline/src/Pipelines/ParallelProducerConsumerPipeline.cs index db8d300..efd4a49 100644 --- a/src/CommonUtilities.SimplePipeline/src/Pipelines/ParallelProducerConsumerPipeline.cs +++ b/src/CommonUtilities.SimplePipeline/src/Pipelines/ParallelProducerConsumerPipeline.cs @@ -1,5 +1,5 @@ using System; -using System.Linq; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using AnakinRaW.CommonUtilities.SimplePipeline.Runners; @@ -12,79 +12,94 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline; /// /// Useful, if preparation is work intensive. /// -public abstract class ParallelProducerConsumerPipeline : DisposableObject, IPipeline +public abstract class ParallelProducerConsumerPipeline : Pipeline { private readonly bool _failFast; private CancellationTokenSource? _linkedCancellationTokenSource; private readonly ParallelProducerConsumerRunner _runner; - private bool? _prepareSuccessful; - - /// - /// Gets or sets a value indicating whether the pipeline has encountered a failure. - /// - protected bool PipelineFailed { get; set; } - - + private Exception? _preparationException; + /// /// Initializes a new instance of the class. /// /// The service provider for dependency injection within the pipeline. /// The number of worker threads to be used for parallel execution. /// A value indicating whether the pipeline should fail fast. - protected ParallelProducerConsumerPipeline(IServiceProvider serviceProvider, int workerCount = 4, bool failFast = true) + protected ParallelProducerConsumerPipeline(int workerCount, bool failFast, IServiceProvider serviceProvider) : base(serviceProvider) { _failFast = failFast; _runner = new ParallelProducerConsumerRunner(workerCount, serviceProvider); } /// - public async Task PrepareAsync() - { - ThrowIfDisposed(); - if (_prepareSuccessful.HasValue) - return _prepareSuccessful.Value; - - await BuildSteps(_runner).ConfigureAwait(false); - - _prepareSuccessful = true; - return _prepareSuccessful.Value; - } - - /// - public async Task RunAsync(CancellationToken token = default) + public sealed override async Task RunAsync(CancellationToken token = default) { ThrowIfDisposed(); token.ThrowIfCancellationRequested(); - if (_prepareSuccessful is false) + if (PrepareSuccessful is false) return; - try - { - _linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); + _linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); - if (_prepareSuccessful is null) + if (PrepareSuccessful is null) + { + Task.Run(async () => { - Task.Run(async () => + try { - try - { - var result = await PrepareAsync().ConfigureAwait(false); - if (!result) - _linkedCancellationTokenSource?.Cancel(); - } - finally + var result = await PrepareAsync().ConfigureAwait(false); + if (!result) { - _runner.Finish(); + PipelineFailed = true; + _linkedCancellationTokenSource?.Cancel(); } - }, token).Forget(); - } + } + catch (Exception e) + { + PipelineFailed = true; + _preparationException = e; + } + finally + { + _runner.Finish(); + } + }, token).Forget(); + } + try + { + await RunCoreAsync(_linkedCancellationTokenSource.Token).ConfigureAwait(false); + } + catch (Exception) + { + PipelineFailed = true; + throw; + } + } + + /// + /// Builds the steps in the order they should be executed within the pipeline. + /// + /// A list of steps in the order they should be executed. + protected abstract IAsyncEnumerable BuildSteps(); + /// + protected override async Task PrepareCoreAsync() + { + await foreach (var step in BuildSteps().ConfigureAwait(false)) + _runner.AddStep(step); + return true; + } + /// + protected override async Task RunCoreAsync(CancellationToken token) + { + try + { _runner.Error += OnError; - await _runner.RunAsync(_linkedCancellationTokenSource.Token).ConfigureAwait(false); + await _runner.RunAsync(token).ConfigureAwait(false); } finally { @@ -96,26 +111,15 @@ public async Task RunAsync(CancellationToken token = default) } } - if (!PipelineFailed && _prepareSuccessful.HasValue && _prepareSuccessful.Value) + if (!PipelineFailed) return; - if (_prepareSuccessful is not true) - throw new InvalidOperationException("Preparation of the pipeline failed."); - - var failedBuildSteps = _runner.Steps - .Where(p => p.Error != null && !p.Error.IsExceptionType()) - .ToList(); + if (_preparationException is not null) + throw _preparationException; - if (failedBuildSteps.Any()) - throw new StepFailureException(failedBuildSteps); + ThrowIfAnyStepsFailed(_runner.Steps); } - /// - /// Builds the steps in the order they should be executed within the pipeline. - /// - /// A list of steps in the order they should be executed. - protected abstract Task BuildSteps(IStepQueue queue); - /// /// Called when an error occurs within a step. /// @@ -127,4 +131,11 @@ protected virtual void OnError(object sender, StepErrorEventArgs e) if (_failFast || e.Cancel) _linkedCancellationTokenSource?.Cancel(); } + + /// + protected override void DisposeManagedResources() + { + base.DisposeManagedResources(); + _runner.Dispose(); + } } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/Pipelines/Pipeline.cs b/src/CommonUtilities.SimplePipeline/src/Pipelines/Pipeline.cs index e90afd9..78ac591 100644 --- a/src/CommonUtilities.SimplePipeline/src/Pipelines/Pipeline.cs +++ b/src/CommonUtilities.SimplePipeline/src/Pipelines/Pipeline.cs @@ -1,4 +1,8 @@ -using System; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -9,28 +13,68 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline; /// public abstract class Pipeline : DisposableObject, IPipeline { - private bool? _prepareSuccessful; + /// + /// The service provider of the . + /// + protected readonly IServiceProvider ServiceProvider; + + /// + /// The logger of the . + /// + protected readonly ILogger? Logger; + + /// + /// Gets a value indicating whether the preparation of the was successful. + /// + protected bool? PrepareSuccessful { get; private set; } + + /// + /// Gets or sets a value indicating whether the pipeline has encountered a failure. + /// + public bool PipelineFailed { get; protected set; } + + /// + /// + /// + protected Pipeline(IServiceProvider serviceProvider) + { + ServiceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); + Logger = serviceProvider.GetService()?.CreateLogger(GetType()); + } /// public async Task PrepareAsync() { - if (IsDisposed) - throw new ObjectDisposedException("Pipeline already disposed"); - if (_prepareSuccessful.HasValue) - return _prepareSuccessful.Value; - _prepareSuccessful = await PrepareCoreAsync().ConfigureAwait(false); - return _prepareSuccessful.Value; + ThrowIfDisposed(); + if (PrepareSuccessful.HasValue) + return PrepareSuccessful.Value; + PrepareSuccessful = await PrepareCoreAsync().ConfigureAwait(false); + return PrepareSuccessful.Value; } /// - public async Task RunAsync(CancellationToken token = default) + public virtual async Task RunAsync(CancellationToken token = default) { - if (IsDisposed) - throw new ObjectDisposedException("Pipeline already disposed"); + ThrowIfDisposed(); token.ThrowIfCancellationRequested(); if (!await PrepareAsync().ConfigureAwait(false)) return; - await RunCoreAsync(token).ConfigureAwait(false); + + try + { + await RunCoreAsync(token).ConfigureAwait(false); + } + catch (Exception) + { + PipelineFailed = true; + throw; + } + } + + /// + public override string ToString() + { + return GetType().Name; } /// @@ -45,4 +89,19 @@ public async Task RunAsync(CancellationToken token = default) /// It's assured this instance is already prepared when this method gets called. /// Provided to allow cancellation. protected abstract Task RunCoreAsync(CancellationToken token); + + /// + /// Throws an if any of the passed steps ended with an error that is not the result of cancellation. + /// + /// The steps that were executed by the pipeline. + /// If any of has an error that is not the result of cancellation. + protected void ThrowIfAnyStepsFailed(IEnumerable steps) + { + var failedBuildSteps = steps + .Where(p => p.Error != null && !p.Error.IsExceptionType()) + .ToList(); + + if (failedBuildSteps.Any()) + throw new StepFailureException(failedBuildSteps); + } } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/Pipelines/SimplePipeline.cs b/src/CommonUtilities.SimplePipeline/src/Pipelines/SimplePipeline.cs index 500df64..9b1f8a5 100644 --- a/src/CommonUtilities.SimplePipeline/src/Pipelines/SimplePipeline.cs +++ b/src/CommonUtilities.SimplePipeline/src/Pipelines/SimplePipeline.cs @@ -1,32 +1,22 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; +using AnakinRaW.CommonUtilities.SimplePipeline.Runners; namespace AnakinRaW.CommonUtilities.SimplePipeline; /// -/// Base class for a simple pipeline implementation utilizing one runner. +/// Base class for a simple pipeline implementation utilizing one . /// /// The type of the step runner. -public abstract class SimplePipeline : Pipeline where TRunner : IRunner +public abstract class SimplePipeline : Pipeline where TRunner : StepRunner { - /// - /// The service provider within the pipeline. - /// - protected readonly IServiceProvider ServiceProvider; - - private readonly bool _failFast; + private readonly bool _failFast; private CancellationTokenSource? _linkedCancellationTokenSource; private IRunner _buildRunner = null!; - /// - /// Gets or sets a value indicating whether the pipeline has encountered a failure. - /// - protected bool PipelineFailed { get; set; } - /// /// Initializes a new instance of the class. /// @@ -35,9 +25,8 @@ public abstract class SimplePipeline : Pipeline where TRunner : IRunner /// /// The parameter determines whether the pipeline should stop executing immediately upon encountering the first failure. /// - protected SimplePipeline(IServiceProvider serviceProvider, bool failFast = true) + protected SimplePipeline(IServiceProvider serviceProvider, bool failFast = true) : base(serviceProvider) { - ServiceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); _failFast = failFast; } @@ -95,12 +84,7 @@ protected override async Task RunCoreAsync(CancellationToken token) if (!PipelineFailed) return; - var failedBuildSteps = _buildRunner.Steps - .Where(p => p.Error != null && !p.Error.IsExceptionType()) - .ToList(); - - if (failedBuildSteps.Any()) - throw new StepFailureException(failedBuildSteps); + ThrowIfAnyStepsFailed(_buildRunner.Steps); } /// @@ -114,4 +98,11 @@ protected virtual void OnError(object sender, StepErrorEventArgs e) if (_failFast || e.Cancel) _linkedCancellationTokenSource?.Cancel(); } + + /// + protected override void DisposeManagedResources() + { + base.DisposeManagedResources(); + _buildRunner.Dispose(); + } } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/Runners/ParallelProducerConsumerRunner.cs b/src/CommonUtilities.SimplePipeline/src/Runners/ParallelProducerConsumerRunner.cs index ac21b86..1a97489 100644 --- a/src/CommonUtilities.SimplePipeline/src/Runners/ParallelProducerConsumerRunner.cs +++ b/src/CommonUtilities.SimplePipeline/src/Runners/ParallelProducerConsumerRunner.cs @@ -69,9 +69,6 @@ public Task RunAsync(CancellationToken token) public void Wait() { Wait(Timeout.InfiniteTimeSpan); - var exception = Exception; - if (exception != null) - throw exception; } /// @@ -79,6 +76,10 @@ public void Wait(TimeSpan timeout) { if (!Task.WaitAll(_runnerTasks, timeout)) throw new TimeoutException(); + + var exception = Exception; + if (exception != null) + throw exception; } /// diff --git a/src/CommonUtilities.SimplePipeline/src/Runners/ParallelRunner.cs b/src/CommonUtilities.SimplePipeline/src/Runners/ParallelRunner.cs index 026ec24..7ac369d 100644 --- a/src/CommonUtilities.SimplePipeline/src/Runners/ParallelRunner.cs +++ b/src/CommonUtilities.SimplePipeline/src/Runners/ParallelRunner.cs @@ -51,9 +51,6 @@ public override Task RunAsync(CancellationToken token) public void Wait() { Wait(Timeout.InfiniteTimeSpan); - var exception = Exception; - if (exception != null) - throw exception; } /// @@ -61,6 +58,10 @@ public void Wait(TimeSpan timeout) { if (!Task.WaitAll(_tasks, timeout)) throw new TimeoutException(); + + var exception = Exception; + if (exception != null) + throw exception; } /// diff --git a/src/CommonUtilities.SimplePipeline/src/Steps/RunPipelineStep.cs b/src/CommonUtilities.SimplePipeline/src/Steps/RunPipelineStep.cs new file mode 100644 index 0000000..a0a3adf --- /dev/null +++ b/src/CommonUtilities.SimplePipeline/src/Steps/RunPipelineStep.cs @@ -0,0 +1,41 @@ +using System; +using System.Linq; +using System.Threading; +using Microsoft.Extensions.Logging; + +namespace AnakinRaW.CommonUtilities.SimplePipeline.Steps; + +/// +/// A step that executes a pipeline and waits for it to end +/// +/// The pipeline to execute. +/// The service provider +public class RunPipelineStep(IPipeline pipeline, IServiceProvider serviceProvider) : SynchronizedStep(serviceProvider) +{ + private readonly IPipeline _pipeline = pipeline ?? throw new ArgumentNullException(nameof(pipeline)); + + /// + protected override void RunSynchronized(CancellationToken token) + { + Logger?.LogInformation($"Running {_pipeline}..."); + try + { + _pipeline.RunAsync(token).Wait(); + Logger?.LogInformation($"Finished {_pipeline}"); + } + catch (AggregateException e) + { + var root = e.InnerExceptions.FirstOrDefault(); + if (root is not null) + throw root; + throw; + } + } + + /// + protected override void DisposeManagedResources() + { + base.DisposeManagedResources(); + _pipeline.Dispose(); + } +} \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/test/CommonUtilities.SimplePipeline.Test.csproj b/src/CommonUtilities.SimplePipeline/test/CommonUtilities.SimplePipeline.Test.csproj index 14a63c3..a54128b 100644 --- a/src/CommonUtilities.SimplePipeline/test/CommonUtilities.SimplePipeline.Test.csproj +++ b/src/CommonUtilities.SimplePipeline/test/CommonUtilities.SimplePipeline.Test.csproj @@ -17,10 +17,12 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + + runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/CommonUtilities.SimplePipeline/test/PipelineTest.cs b/src/CommonUtilities.SimplePipeline/test/PipelineTest.cs index a695349..ae69cdd 100644 --- a/src/CommonUtilities.SimplePipeline/test/PipelineTest.cs +++ b/src/CommonUtilities.SimplePipeline/test/PipelineTest.cs @@ -12,7 +12,8 @@ public class PipelineTest [Fact] public async Task Test_Prepare() { - var pipeline = new Mock + var sp = new Mock().Object; + var pipeline = new Mock(sp) { CallBase = true }; @@ -26,7 +27,8 @@ public async Task Test_Prepare() [Fact] public async Task Test_Run() { - var pipeline = new Mock + var sp = new Mock().Object; + var pipeline = new Mock(sp) { CallBase = true }; @@ -43,7 +45,8 @@ public async Task Test_Run() [Fact] public async Task Test_Prepare_Run() { - var pipeline = new Mock + var sp = new Mock().Object; + var pipeline = new Mock(sp) { CallBase = true }; @@ -61,7 +64,8 @@ public async Task Test_Prepare_Run() [Fact] public async Task Test_Run_Cancelled_ThrowsOperationCanceledException() { - var pipeline = new Mock + var sp = new Mock().Object; + var pipeline = new Mock(sp) { CallBase = true }; @@ -76,7 +80,8 @@ public async Task Test_Run_Cancelled_ThrowsOperationCanceledException() [Fact] public async Task Test_Prepare_Disposed_ThrowsObjectDisposedException() { - var pipeline = new Mock + var sp = new Mock().Object; + var pipeline = new Mock(sp) { CallBase = true }; @@ -90,7 +95,8 @@ public async Task Test_Prepare_Disposed_ThrowsObjectDisposedException() [Fact] public async Task Test_Run_Disposed_ThrowsObjectDisposedException() { - var pipeline = new Mock + var sp = new Mock().Object; + var pipeline = new Mock(sp) { CallBase = true }; diff --git a/src/CommonUtilities.SimplePipeline/test/ParallelPipelineTests.cs b/src/CommonUtilities.SimplePipeline/test/Pipelines/ParallelPipelineTests.cs similarity index 97% rename from src/CommonUtilities.SimplePipeline/test/ParallelPipelineTests.cs rename to src/CommonUtilities.SimplePipeline/test/Pipelines/ParallelPipelineTests.cs index 93bcff8..cd7240e 100644 --- a/src/CommonUtilities.SimplePipeline/test/ParallelPipelineTests.cs +++ b/src/CommonUtilities.SimplePipeline/test/Pipelines/ParallelPipelineTests.cs @@ -7,7 +7,7 @@ using Moq.Protected; using Xunit; -namespace AnakinRaW.CommonUtilities.SimplePipeline.Test; +namespace AnakinRaW.CommonUtilities.SimplePipeline.Test.Pipelines; public class ParallelPipelineTests { diff --git a/src/CommonUtilities.SimplePipeline/test/Pipelines/ParallelProducerConsumerPipelineTest.cs b/src/CommonUtilities.SimplePipeline/test/Pipelines/ParallelProducerConsumerPipelineTest.cs new file mode 100644 index 0000000..af72308 --- /dev/null +++ b/src/CommonUtilities.SimplePipeline/test/Pipelines/ParallelProducerConsumerPipelineTest.cs @@ -0,0 +1,187 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Moq; +using Moq.Protected; +using Xunit; + +namespace AnakinRaW.CommonUtilities.SimplePipeline.Test.Pipelines; + +public class ParallelProducerConsumerPipelineTest +{ + [Fact] + public async Task Test_Prepare() + { + var sp = new Mock(); + var pipeline = new Mock(4, true, sp.Object) + { + CallBase = true + }; + + var steps = new List(); + pipeline.Protected().Setup>("BuildSteps").Returns(steps.ToAsyncEnumerable()); + + await pipeline.Object.PrepareAsync(); + await pipeline.Object.PrepareAsync(); + + pipeline.Protected().Verify>("BuildSteps", Times.Once()); + pipeline.Protected().Verify>("PrepareCoreAsync", Times.Once()); + } + + [Fact] + public async Task Test_Run_RunsNormally() + { + var sp = new Mock(); + var pipeline = new Mock(4, true, sp.Object) + { + CallBase = true + }; + + var stepRun = false; + var s = new Mock(); + s.Setup(i => i.Run(It.IsAny())).Callback(() => stepRun = true); + + var steps = new List{s.Object}; + pipeline.Protected().Setup>("BuildSteps").Returns(steps.ToAsyncEnumerable()); + + await pipeline.Object.RunAsync(); + + Assert.True(stepRun); + + pipeline.Protected().Verify>("BuildSteps", Times.Once()); + pipeline.Protected().Verify>("PrepareCoreAsync", Times.Once()); + } + + [Fact] + public async Task Test_Run_DelayedAdd() + { + var sp = new Mock(); + var pipeline = new Mock(4, true, sp.Object) + { + CallBase = true + }; + + var tsc = new TaskCompletionSource(); + + var s1 = new Mock(); + s1.Setup(i => i.Run(It.IsAny())).Callback(() => + { + { + Task.Delay(3000).Wait(); + tsc.SetResult(0); + } + }); + + + var s2Run = false; + var s2 = new Mock(); + s2.Setup(i => i.Run(It.IsAny())).Callback(() => + { + { + s2Run = true; + } + }); + + + pipeline.Protected().Setup>("BuildSteps").Returns(ValueFunction); + + await pipeline.Object.RunAsync(); + + Assert.True(s2Run); + + pipeline.Protected().Verify>("BuildSteps", Times.Once()); + pipeline.Protected().Verify>("PrepareCoreAsync", Times.Once()); + return; + + async IAsyncEnumerable ValueFunction() + { + yield return s1.Object; + await tsc.Task; + yield return s2.Object; + } + } + + [Fact] + public async Task Test_Run_DelayedAdd_PrepareFails() + { + var sp = new Mock(); + var pipeline = new Mock(4, true, sp.Object) + { + CallBase = true + }; + + var s1 = new Mock(); + s1.Setup(i => i.Run(It.IsAny())); + + var s2 = new Mock(); + s2.Setup(i => i.Run(It.IsAny())); + + pipeline.Protected().Setup>("BuildSteps").Returns(ValueFunction); + + await Assert.ThrowsAsync(async () => await pipeline.Object.RunAsync()); + + pipeline.Protected().Verify>("BuildSteps", Times.Once()); + pipeline.Protected().Verify>("PrepareCoreAsync", Times.Once()); + return; + + async IAsyncEnumerable ValueFunction() + { + yield return s1.Object; + yield return s2.Object; + throw new ApplicationException("test"); + } + } + + + [Fact] + public async Task Test_Run_PrepareCancelled() + { + var sp = new Mock(); + var pipeline = new Mock(4, true, sp.Object) + { + CallBase = true + }; + + var cts = new CancellationTokenSource(); + var tcs = new TaskCompletionSource(); + + var s1 = new Mock(); + s1.Setup(i => i.Run(It.IsAny())).Callback(() => + { + { + Task.Delay(3000).Wait(); + tcs.SetResult(0); + } + }); + + + var s2Run = false; + var s2 = new Mock(); + s2.Setup(i => i.Run(It.IsAny())).Callback(() => + { + { + s2Run = true; + } + }); + + pipeline.Protected().Setup>("BuildSteps").Returns(ValueFunction); + + await Assert.ThrowsAnyAsync(async () => await pipeline.Object.RunAsync(cts.Token)); + + Assert.False(s2Run); + + pipeline.Protected().Verify>("BuildSteps", Times.Once()); + pipeline.Protected().Verify>("PrepareCoreAsync", Times.Once()); + return; + + async IAsyncEnumerable ValueFunction() + { + yield return s1.Object; + await tcs.Task; + cts.Cancel(); + yield return s2.Object; + } + } +} \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/test/SequentialPipelineTests.cs b/src/CommonUtilities.SimplePipeline/test/Pipelines/SequentialPipelineTests.cs similarity index 97% rename from src/CommonUtilities.SimplePipeline/test/SequentialPipelineTests.cs rename to src/CommonUtilities.SimplePipeline/test/Pipelines/SequentialPipelineTests.cs index ca2379a..46b1a81 100644 --- a/src/CommonUtilities.SimplePipeline/test/SequentialPipelineTests.cs +++ b/src/CommonUtilities.SimplePipeline/test/Pipelines/SequentialPipelineTests.cs @@ -8,7 +8,7 @@ using Moq.Protected; using Xunit; -namespace AnakinRaW.CommonUtilities.SimplePipeline.Test; +namespace AnakinRaW.CommonUtilities.SimplePipeline.Test.Pipelines; public class SequentialPipelineTests { diff --git a/src/CommonUtilities.SimplePipeline/test/Pipelines/SimplePipelineTests.cs b/src/CommonUtilities.SimplePipeline/test/Pipelines/SimplePipelineTests.cs new file mode 100644 index 0000000..5f40c7a --- /dev/null +++ b/src/CommonUtilities.SimplePipeline/test/Pipelines/SimplePipelineTests.cs @@ -0,0 +1,131 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using AnakinRaW.CommonUtilities.SimplePipeline.Runners; +using Microsoft.Extensions.DependencyInjection; +using Moq; +using Moq.Protected; +using Xunit; + +namespace AnakinRaW.CommonUtilities.SimplePipeline.Test.Pipelines; + +public class SimplePipelineTests +{ + [Fact] + public async Task Test_Run_SimplePipelineRunsNormally() + { + var sc = new ServiceCollection(); + + var sp = sc.BuildServiceProvider(); + + var pipelineMock = new Mock>(sp, false) + { + CallBase = true + }; + pipelineMock.Protected().Setup("CreateRunner").Returns(new StepRunner(sp)); + pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List + { + new TestStep(1, "123") + })); + + var pipeline = pipelineMock.Object; + + var cancellationTokenSource = new CancellationTokenSource(); + await pipeline.RunAsync(cancellationTokenSource.Token); + } + + [Fact] + public async Task Test_Run_SimplePipelineFails_ThrowsStepFailureException() + { + var sc = new ServiceCollection(); + + var sp = sc.BuildServiceProvider(); + + var s = new Mock(); + s.Setup(i => i.Run(It.IsAny())).Throws(); + s.Setup(i => i.Error).Returns(new Exception()); + + var pipelineMock = new Mock>(sp, false) + { + CallBase = true + }; + + pipelineMock.Protected().Setup("CreateRunner").Returns(new StepRunner(sp)); + pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List + { + s.Object, + })); + + var pipeline = pipelineMock.Object; + + var cancellationTokenSource = new CancellationTokenSource(); + await Assert.ThrowsAsync(async () => await pipeline.RunAsync(cancellationTokenSource.Token)); + } + + [Fact] + public async Task Test_Run_SimplePipelineFailsSlow_ThrowsStepFailureException() + { + var sc = new ServiceCollection(); + + var sp = sc.BuildServiceProvider(); + + var s1 = new Mock(); + s1.Setup(i => i.Run(It.IsAny())).Throws(); + s1.Setup(i => i.Error).Returns(new Exception()); + + var flag = false; + var s2 = new Mock(); + s2.Setup(i => i.Run(It.IsAny())).Callback(() => flag = true); + + var pipelineMock = new Mock>(sp, false) + { + CallBase = true + }; + + pipelineMock.Protected().Setup("CreateRunner").Returns(new StepRunner(sp)); + pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List + { + s1.Object, + s2.Object + })); + + var pipeline = pipelineMock.Object; + + await Assert.ThrowsAsync(async () => await pipeline.RunAsync()); + Assert.True(flag); + } + + [Fact] + public async Task Test_Run_SimplePipelineFailsFast_ThrowsStepFailureException() + { + var sc = new ServiceCollection(); + + var sp = sc.BuildServiceProvider(); + + var s1 = new Mock(); + s1.Setup(i => i.Run(It.IsAny())).Throws(); + s1.Setup(i => i.Error).Returns(new Exception()); + + var flag = false; + var s2 = new Mock(); + s2.Setup(i => i.Run(It.IsAny())).Callback(() => flag = true); + + var pipelineMock = new Mock>(sp, true) + { + CallBase = true + }; + + pipelineMock.Protected().Setup("CreateRunner").Returns(new StepRunner(sp)); + pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List + { + s1.Object, + s2.Object + })); + + var pipeline = pipelineMock.Object; + + await Assert.ThrowsAsync(async () => await pipeline.RunAsync()); + Assert.False(flag); + } +} \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/test/Runners/ParallelProducerConsumerRunnerTest.cs b/src/CommonUtilities.SimplePipeline/test/Runners/ParallelProducerConsumerRunnerTest.cs index 1b3e4d5..eb800cf 100644 --- a/src/CommonUtilities.SimplePipeline/test/Runners/ParallelProducerConsumerRunnerTest.cs +++ b/src/CommonUtilities.SimplePipeline/test/Runners/ParallelProducerConsumerRunnerTest.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using AnakinRaW.CommonUtilities.SimplePipeline.Runners; using Microsoft.Extensions.DependencyInjection; @@ -67,6 +69,8 @@ public async Task Test_Run_AwaitDoesNotThrow() await runner.RunAsync(default); Assert.NotNull(runner.Exception); Assert.True(ran2); + + Assert.Equal([s1.Object, s2.Object], new HashSet(runner.Steps)); } [Fact] @@ -180,10 +184,10 @@ public async Task Test_Run_AddDelayed_Await() var runTask = runner.RunAsync(default); - Task.Run(() => + Task.Run(async () => { runner.AddStep(s3.Object); - Task.Delay(1000); + await Task.Delay(1000); runner.Finish(); }); @@ -241,4 +245,50 @@ public async Task Test_Run_AddDelayed_AwaitAndWait() Assert.True(ran2); Assert.True(ran3); } + + [Fact] + public async Task Test_Run_AddDelayed_Cancelled() + { + var sc = new ServiceCollection(); + var runner = new ParallelProducerConsumerRunner(2, sc.BuildServiceProvider()); + + var tcs = new TaskCompletionSource(); + + var s1 = new Mock(); + var s2 = new Mock(); + + runner.AddStep(s1.Object); + + var cts = new CancellationTokenSource(); + + var ran1 = false; + s1.Setup(t => t.Run(cts.Token)).Callback(() => + { + ran1 = true; + tcs.SetResult(0); + }); + + var ran2 = false; + s2.Setup(t => t.Run(cts.Token)).Callback(() => + { + ran2 = true; + }); + + var runTask = runner.RunAsync(cts.Token); + + Task.Run(async () => + { + await tcs.Task.ConfigureAwait(false); + cts.Cancel(); + runner.AddStep(s2.Object); + runner.Finish(); + }).Forget(); + + await Assert.ThrowsAnyAsync(async () => await runTask); + + Assert.True(ran1); + Assert.False(ran2); + + Assert.Equal([s1.Object], runner.Steps); + } } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/test/Runners/ParallelRunnerTest.cs b/src/CommonUtilities.SimplePipeline/test/Runners/ParallelRunnerTest.cs index 0242555..724bad4 100644 --- a/src/CommonUtilities.SimplePipeline/test/Runners/ParallelRunnerTest.cs +++ b/src/CommonUtilities.SimplePipeline/test/Runners/ParallelRunnerTest.cs @@ -49,27 +49,26 @@ public async Task Test_Run_NoWait() var sc = new ServiceCollection(); var runner = new ParallelRunner(2, sc.BuildServiceProvider()); - var s1 = new Mock(); - var s2 = new Mock(); var b = new ManualResetEvent(false); - runner.AddStep(s1.Object); - runner.AddStep(s2.Object); - var ran1 = false; - s1.Setup(t => t.Run(default)).Callback(() => + var s1 = new TestStep(_ => { b.WaitOne(); ran1 = true; }); + var ran2 = false; - s2.Setup(t => t.Run(default)).Callback(() => + var s2 = new TestStep(_ => { b.WaitOne(); ran2 = true; }); + runner.AddStep(s1); + runner.AddStep(s2); + var runTask = runner.RunAsync(default); Assert.False(ran1); @@ -80,6 +79,13 @@ public async Task Test_Run_NoWait() Assert.True(ran1); Assert.True(ran2); + + runner.Dispose(); + + Assert.True(s1.IsDisposed); + Assert.True(s2.IsDisposed); + + Assert.Equal([s1, s2], runner.Steps); } [Fact] diff --git a/src/CommonUtilities.SimplePipeline/test/Runners/TaskRunnerTest.cs b/src/CommonUtilities.SimplePipeline/test/Runners/StepRunnerTest.cs similarity index 69% rename from src/CommonUtilities.SimplePipeline/test/Runners/TaskRunnerTest.cs rename to src/CommonUtilities.SimplePipeline/test/Runners/StepRunnerTest.cs index 99f8628..b3ffe6e 100644 --- a/src/CommonUtilities.SimplePipeline/test/Runners/TaskRunnerTest.cs +++ b/src/CommonUtilities.SimplePipeline/test/Runners/StepRunnerTest.cs @@ -8,7 +8,7 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline.Test.Runners; -public class TaskRunnerTest +public class StepRunnerTest { [Fact] public async Task Test_Run_Empty() @@ -76,4 +76,37 @@ public async Task Test_Run_WithError() Assert.True(hasError); Assert.True(ran); } + + [Fact] + public async Task Test_Run() + { + var sc = new ServiceCollection(); + var runner = new StepRunner(sc.BuildServiceProvider()); + + var hasError = false; + runner.Error += (_, __) => + { + hasError = true; + }; + + var ran1 = false; + var ran2 = false; + var step1 = new TestStep(_ => ran1 = true); + var step2 = new TestStep(_ => ran2 = true); + + runner.AddStep(step1); + runner.AddStep(step2); + await runner.RunAsync(default); + + Assert.False(hasError); + Assert.True(ran1); + Assert.True(ran2); + + runner.Dispose(); + + Assert.True(step1.IsDisposed); + Assert.True(step2.IsDisposed); + + Assert.Equal([step1, step2], runner.Steps); + } } diff --git a/src/CommonUtilities.SimplePipeline/test/SimplePipelineTests.cs b/src/CommonUtilities.SimplePipeline/test/SimplePipelineTests.cs deleted file mode 100644 index 019b7e4..0000000 --- a/src/CommonUtilities.SimplePipeline/test/SimplePipelineTests.cs +++ /dev/null @@ -1,292 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using AnakinRaW.CommonUtilities.SimplePipeline.Runners; -using Microsoft.Extensions.DependencyInjection; -using Moq; -using Moq.Protected; -using Xunit; - -namespace AnakinRaW.CommonUtilities.SimplePipeline.Test; - -public class ParallelProducerConsumerPipelineTest -{ - [Fact] - public async Task Test_Prepare() - { - var sp = new Mock(); - var pipeline = new Mock(sp.Object, 4, true) - { - CallBase = true - }; - - await pipeline.Object.PrepareAsync(); - await pipeline.Object.PrepareAsync(); - - pipeline.Protected().Verify("BuildSteps", Times.Once(), ItExpr.IsAny()); - } - - [Fact] - public async Task Test_Run_RunsNormally() - { - var sp = new Mock(); - var pipeline = new Mock(sp.Object, 4, true) - { - CallBase = true - }; - - var stepRun = false; - var s = new Mock(); - s.Setup(i => i.Run(It.IsAny())).Callback(() => stepRun = true); - - pipeline.Protected().Setup("BuildSteps", ItExpr.IsAny()).Callback((IStepQueue q) => - { - q.AddStep(s.Object); - }); - - await pipeline.Object.RunAsync(); - - Assert.True(stepRun); - pipeline.Protected().Verify("BuildSteps", Times.Once(), ItExpr.IsAny()); - } - - [Fact] - public async Task Test_Run_DelayedAdd() - { - var sp = new Mock(); - var pipeline = new Mock(sp.Object, 4, true) - { - CallBase = true - }; - - var mre = new ManualResetEventSlim(); - - var s1 = new Mock(); - s1.Setup(i => i.Run(It.IsAny())).Callback(() => - { - { - Task.Delay(3000).Wait(); - mre.Set(); - } - }); - - - var s2Run = false; - var s2 = new Mock(); - s2.Setup(i => i.Run(It.IsAny())).Callback(() => - { - { - s2Run = true; - } - }); - - pipeline.Protected().Setup("BuildSteps", ItExpr.IsAny()).Callback((IStepQueue q) => - { - q.AddStep(s1.Object); - mre.Wait(); - q.AddStep(s2.Object); - - }); - - await pipeline.Object.RunAsync(); - - Assert.True(s2Run); - pipeline.Protected().Verify("BuildSteps", Times.Once(), ItExpr.IsAny()); - } - - [Fact] - public async Task Test_Run_DelayedAdd_PrepareFails() - { - var sp = new Mock(); - var pipeline = new Mock(sp.Object, 4, true) - { - CallBase = true - }; - - var s1 = new Mock(); - s1.Setup(i => i.Run(It.IsAny())); - - var s2 = new Mock(); - s2.Setup(i => i.Run(It.IsAny())); - - pipeline.Protected().Setup("BuildSteps", ItExpr.IsAny()) - .Callback((IStepQueue q) => - { - q.AddStep(s1.Object); - q.AddStep(s2.Object); - }) - .Throws(); - - await Assert.ThrowsAsync(async () => await pipeline.Object.RunAsync()); - - pipeline.Protected().Verify("BuildSteps", Times.Once(), ItExpr.IsAny()); - } - - - [Fact] - public async Task Test_Run_PrepareCancelled() - { - var sp = new Mock(); - var pipeline = new Mock(sp.Object, 4, true) - { - CallBase = true - }; - - var cts = new CancellationTokenSource(); - var mre = new ManualResetEventSlim(); - - var s1 = new Mock(); - s1.Setup(i => i.Run(It.IsAny())).Callback(() => - { - { - Task.Delay(3000).Wait(); - mre.Set(); - } - }); - - - var s2Run = false; - var s2 = new Mock(); - s2.Setup(i => i.Run(It.IsAny())).Callback(() => - { - { - s2Run = true; - } - }); - - pipeline.Protected().Setup("BuildSteps", ItExpr.IsAny()).Callback((IStepQueue q) => - { - q.AddStep(s1.Object); - mre.Wait(); - cts.Cancel(); - q.AddStep(s2.Object); - - }); - - await Assert.ThrowsAnyAsync(async () => await pipeline.Object.RunAsync(cts.Token)); - - Assert.False(s2Run); - pipeline.Protected().Verify("BuildSteps", Times.Once(), ItExpr.IsAny()); - } -} - -public class SimplePipelineTests -{ - [Fact] - public async Task Test_Run_SimplePipelineRunsNormally() - { - var sc = new ServiceCollection(); - - var sp = sc.BuildServiceProvider(); - - var pipelineMock = new Mock>(sp, false) - { - CallBase = true - }; - pipelineMock.Protected().Setup("CreateRunner").Returns(new StepRunner(sp)); - pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List - { - new TestStep(1, "123") - })); - - var pipeline = pipelineMock.Object; - - var cancellationTokenSource = new CancellationTokenSource(); - await pipeline.RunAsync(cancellationTokenSource.Token); - } - - [Fact] - public async Task Test_Run_SimplePipelineFails_ThrowsStepFailureException() - { - var sc = new ServiceCollection(); - - var sp = sc.BuildServiceProvider(); - - var s = new Mock(); - s.Setup(i => i.Run(It.IsAny())).Throws(); - s.Setup(i => i.Error).Returns(new Exception()); - - var pipelineMock = new Mock>(sp, false) - { - CallBase = true - }; - - pipelineMock.Protected().Setup("CreateRunner").Returns(new StepRunner(sp)); - pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List - { - s.Object, - })); - - var pipeline = pipelineMock.Object; - - var cancellationTokenSource = new CancellationTokenSource(); - await Assert.ThrowsAsync(async () => await pipeline.RunAsync(cancellationTokenSource.Token)); - } - - [Fact] - public async Task Test_Run_SimplePipelineFailsSlow_ThrowsStepFailureException() - { - var sc = new ServiceCollection(); - - var sp = sc.BuildServiceProvider(); - - var s1 = new Mock(); - s1.Setup(i => i.Run(It.IsAny())).Throws(); - s1.Setup(i => i.Error).Returns(new Exception()); - - var flag = false; - var s2 = new Mock(); - s2.Setup(i => i.Run(It.IsAny())).Callback(() => flag = true); - - var pipelineMock = new Mock>(sp, false) - { - CallBase = true - }; - - pipelineMock.Protected().Setup("CreateRunner").Returns(new StepRunner(sp)); - pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List - { - s1.Object, - s2.Object - })); - - var pipeline = pipelineMock.Object; - - await Assert.ThrowsAsync(async () => await pipeline.RunAsync()); - Assert.True(flag); - } - - [Fact] - public async Task Test_Run_SimplePipelineFailsFast_ThrowsStepFailureException() - { - var sc = new ServiceCollection(); - - var sp = sc.BuildServiceProvider(); - - var s1 = new Mock(); - s1.Setup(i => i.Run(It.IsAny())).Throws(); - s1.Setup(i => i.Error).Returns(new Exception()); - - var flag = false; - var s2 = new Mock(); - s2.Setup(i => i.Run(It.IsAny())).Callback(() => flag = true); - - var pipelineMock = new Mock>(sp, true) - { - CallBase = true - }; - - pipelineMock.Protected().Setup("CreateRunner").Returns(new StepRunner(sp)); - pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List - { - s1.Object, - s2.Object - })); - - var pipeline = pipelineMock.Object; - - await Assert.ThrowsAsync(async () => await pipeline.RunAsync()); - Assert.False(flag); - } -} \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/test/Steps/RunPipelineStepTest.cs b/src/CommonUtilities.SimplePipeline/test/Steps/RunPipelineStepTest.cs new file mode 100644 index 0000000..f4a3500 --- /dev/null +++ b/src/CommonUtilities.SimplePipeline/test/Steps/RunPipelineStepTest.cs @@ -0,0 +1,63 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using AnakinRaW.CommonUtilities.SimplePipeline.Steps; +using Moq; +using Xunit; + +namespace AnakinRaW.CommonUtilities.SimplePipeline.Test.Steps; + +public class RunPipelineStepTest +{ + [Fact] + public void Test_Run() + { + var pipeline = new Mock(); + + var run = false; + pipeline.Setup(p => p.RunAsync(default)).Returns(Task.Run(async () => + { + await Task.Delay(3000).ConfigureAwait(false); + run = true; + })); + + var step = new RunPipelineStep(pipeline.Object, new Mock().Object); + + step.Run(default); + Assert.True(run); + } + + [Fact] + public void Test_Run_PipelineFails() + { + var pipeline = new Mock(); + + pipeline.Setup(p => p.RunAsync(default)) + .Returns(() => Task.Run(() => throw new ApplicationException("test"))); + + var step = new RunPipelineStep(pipeline.Object, new Mock().Object); + + Assert.Throws(() => step.Run(default)); + } + + [Fact] + public void Test_Run_Cancel() + { + var pipeline = new Mock(); + + var run = false; + + var cts = new CancellationTokenSource(); + pipeline.Setup(p => p.RunAsync(cts.Token)) + .Returns(() => Task.Run(() => + { + cts.Token.ThrowIfCancellationRequested(); + })); + + var step = new RunPipelineStep(pipeline.Object, new Mock().Object); + + + cts.Cancel(); + Assert.Throws(() => step.Run(cts.Token)); + } +} \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/test/TestStep.cs b/src/CommonUtilities.SimplePipeline/test/TestStep.cs index ea55aa7..d2391e1 100644 --- a/src/CommonUtilities.SimplePipeline/test/TestStep.cs +++ b/src/CommonUtilities.SimplePipeline/test/TestStep.cs @@ -4,8 +4,9 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline.Test; -public class TestStep : IProgressStep +public class TestStep : DisposableObject, IProgressStep { + private readonly Action _action; public ProgressType Type => new() { Id = "test", DisplayName = "Test" }; public IStepProgressReporter ProgressReporter { get; } @@ -22,13 +23,13 @@ public TestStep(long size, string text) Text = text; } - public void Dispose() + public TestStep(Action action) { - throw new NotImplementedException(); + _action = action; } public void Run(CancellationToken token) { - throw new NotImplementedException(); + _action?.Invoke(token); } } \ No newline at end of file