From eccc9f1e1cb3d76bfa4524b4891884c44375d0ba Mon Sep 17 00:00:00 2001 From: AnakinRaW Date: Sun, 31 Mar 2024 14:09:03 +0200 Subject: [PATCH 1/9] use disposableobject --- .../src/Steps/PipelineStep.cs | 28 +------------------ .../src/Steps/SynchronizedStep.cs | 18 +++--------- 2 files changed, 5 insertions(+), 41 deletions(-) diff --git a/src/CommonUtilities.SimplePipeline/src/Steps/PipelineStep.cs b/src/CommonUtilities.SimplePipeline/src/Steps/PipelineStep.cs index 5564d50..9048917 100644 --- a/src/CommonUtilities.SimplePipeline/src/Steps/PipelineStep.cs +++ b/src/CommonUtilities.SimplePipeline/src/Steps/PipelineStep.cs @@ -9,10 +9,8 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline.Steps; /// /// Base implementation for an /// -public abstract class PipelineStep : IStep +public abstract class PipelineStep : DisposableObject, IStep { - internal bool IsDisposed { get; private set; } - /// /// The service provider of this step. /// @@ -37,20 +35,7 @@ protected PipelineStep(IServiceProvider serviceProvider) Services = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); Logger = serviceProvider.GetService()?.CreateLogger(GetType()); } - - /// - ~PipelineStep() - { - Dispose(false); - } - /// - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - /// public void Run(CancellationToken token) { @@ -101,17 +86,6 @@ public override string ToString() /// Provided to allow cancellation. protected abstract void RunCore(CancellationToken token); - /// - /// Disposes managed resources of this instance. - /// - /// is this instance gets disposed; if it get's finalized. - protected virtual void Dispose(bool disposing) - { - if (IsDisposed) - return; - IsDisposed = true; - } - private void LogFaultException(Exception ex) { Error = ex; diff --git a/src/CommonUtilities.SimplePipeline/src/Steps/SynchronizedStep.cs b/src/CommonUtilities.SimplePipeline/src/Steps/SynchronizedStep.cs index 7ac591f..20c6d2e 100644 --- a/src/CommonUtilities.SimplePipeline/src/Steps/SynchronizedStep.cs +++ b/src/CommonUtilities.SimplePipeline/src/Steps/SynchronizedStep.cs @@ -24,12 +24,6 @@ protected SynchronizedStep(IServiceProvider serviceProvider) : base(serviceProvi _handle = new ManualResetEvent(false); } - /// - ~SynchronizedStep() - { - Dispose(false); - } - /// /// Waits until the predefined runner has finished. /// @@ -55,15 +49,11 @@ public void Wait(TimeSpan timeout) /// protected abstract void SynchronizedInvoke(CancellationToken token); - /// - /// Disposes managed resources of this instance. - /// - /// is this instance gets disposed; if it get's finalized. - protected override void Dispose(bool disposing) + /// + protected override void DisposeManagedResources() { - if (disposing) - _handle.Dispose(); - base.Dispose(disposing); + base.DisposeManagedResources(); + _handle.Dispose(); } /// From 16b91dde9fc6b66288d6f308b0e2612926c25222 Mon Sep 17 00:00:00 2001 From: AnakinRaW Date: Sun, 31 Mar 2024 14:40:10 +0200 Subject: [PATCH 2/9] implement async step --- .../src/Steps/AsyncStep.cs | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 src/CommonUtilities.SimplePipeline/src/Steps/AsyncStep.cs diff --git a/src/CommonUtilities.SimplePipeline/src/Steps/AsyncStep.cs b/src/CommonUtilities.SimplePipeline/src/Steps/AsyncStep.cs new file mode 100644 index 0000000..9351c0c --- /dev/null +++ b/src/CommonUtilities.SimplePipeline/src/Steps/AsyncStep.cs @@ -0,0 +1,86 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; + +namespace AnakinRaW.CommonUtilities.SimplePipeline.Steps; + +/// +/// +/// +public abstract class AsyncStep : DisposableObject, IStep +{ + private readonly TaskCompletionSource _taskCompletion = new(); + + /// + /// Gets the service provider of this step. + /// + protected IServiceProvider Services { get; } + + /// + /// Gets the logger of this step. + /// + protected ILogger? Logger { get; } + + + /// + public Exception? Error + { + get + { + if (_taskCompletion.Task.IsFaulted) + return _taskCompletion.Task.Exception?.InnerException; + + if (!_taskCompletion.Task.IsCompleted) + return null; + + if (_taskCompletion.Task.Result.IsFaulted) + return _taskCompletion.Task.Result.Exception?.InnerException; + + return null; + } + } + + /// + /// + /// + /// + protected AsyncStep(IServiceProvider serviceProvider) + { + Services = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); + Logger = serviceProvider.GetService()?.CreateLogger(GetType()); + } + + /// + /// + /// + /// + public TaskAwaiter GetAwaiter() + { + if (_taskCompletion.Task.IsCompleted) + return _taskCompletion.Task.Result.GetAwaiter(); + + return Task.Run(async () => + { + var task = await _taskCompletion.Task.ConfigureAwait(false); + await task.ConfigureAwait(false); + }).GetAwaiter(); + } + + /// + /// + /// + /// + /// + protected abstract Task RunAsync(CancellationToken token); + + /// + public void Run(CancellationToken token) + { + Logger?.LogTrace($"BEGIN on thread-pool: {this}"); + var task = RunAsync(token); + _taskCompletion.SetResult(task); + } +} \ No newline at end of file From 2d4399214ac72839c4028dac2c97eaded037827c Mon Sep 17 00:00:00 2001 From: AnakinRaW Date: Sun, 31 Mar 2024 14:40:16 +0200 Subject: [PATCH 3/9] rename method --- .../src/IRunner.cs | 8 ++++---- .../src/Pipelines/SimplePipeline.cs | 2 +- .../src/Runners/ParallelBlockingRunner.cs | 2 +- .../src/Runners/StepRunner.cs | 4 ++-- .../test/Runners/ParallelBlockingRunnerTest.cs | 18 +++++++++--------- .../test/Runners/ParallelRunnerTest.cs | 16 ++++++++-------- .../test/Runners/TaskRunnerTest.cs | 4 ++-- 7 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/CommonUtilities.SimplePipeline/src/IRunner.cs b/src/CommonUtilities.SimplePipeline/src/IRunner.cs index 83662ea..3272a54 100644 --- a/src/CommonUtilities.SimplePipeline/src/IRunner.cs +++ b/src/CommonUtilities.SimplePipeline/src/IRunner.cs @@ -10,7 +10,7 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline; public interface IRunner : IEnumerable { /// - /// Event which gets raised if the execution of an failed with an exception. + /// Gets raised when the execution of an failed with an exception. /// event EventHandler? Error; @@ -21,8 +21,8 @@ public interface IRunner : IEnumerable void Run(CancellationToken token); /// - /// Queues an for execution. + /// Adds an to the . /// - /// The step to queue. - void Queue(IStep activity); + /// The step to app. + void AddStep(IStep activity); } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/Pipelines/SimplePipeline.cs b/src/CommonUtilities.SimplePipeline/src/Pipelines/SimplePipeline.cs index 8040ac1..ad6ab14 100644 --- a/src/CommonUtilities.SimplePipeline/src/Pipelines/SimplePipeline.cs +++ b/src/CommonUtilities.SimplePipeline/src/Pipelines/SimplePipeline.cs @@ -65,7 +65,7 @@ protected sealed override bool PrepareCore() _buildRunner = CreateRunner() ?? throw new InvalidOperationException("RunnerFactory created null value!"); var steps = BuildStepsOrdered(); foreach (var step in steps) - _buildRunner.Queue(step); + _buildRunner.AddStep(step); return true; } diff --git a/src/CommonUtilities.SimplePipeline/src/Runners/ParallelBlockingRunner.cs b/src/CommonUtilities.SimplePipeline/src/Runners/ParallelBlockingRunner.cs index 28525f6..efb3f62 100644 --- a/src/CommonUtilities.SimplePipeline/src/Runners/ParallelBlockingRunner.cs +++ b/src/CommonUtilities.SimplePipeline/src/Runners/ParallelBlockingRunner.cs @@ -104,7 +104,7 @@ public void FinishAndWait(bool throwsException = false) } /// - public void Queue(IStep step) + public void AddStep(IStep step) { if (step is null) throw new ArgumentNullException(nameof(step)); diff --git a/src/CommonUtilities.SimplePipeline/src/Runners/StepRunner.cs b/src/CommonUtilities.SimplePipeline/src/Runners/StepRunner.cs index 84aed08..faebe71 100644 --- a/src/CommonUtilities.SimplePipeline/src/Runners/StepRunner.cs +++ b/src/CommonUtilities.SimplePipeline/src/Runners/StepRunner.cs @@ -23,7 +23,7 @@ public class StepRunner : IRunner protected readonly List StepList; /// - /// Queue of all to be performed steps. + /// AddStep of all to be performed steps. /// protected ConcurrentQueue StepQueue { get; } @@ -60,7 +60,7 @@ public void Run(CancellationToken token) } /// - public void Queue(IStep activity) + public void AddStep(IStep activity) { if (activity == null) throw new ArgumentNullException(nameof(activity)); diff --git a/src/CommonUtilities.SimplePipeline/test/Runners/ParallelBlockingRunnerTest.cs b/src/CommonUtilities.SimplePipeline/test/Runners/ParallelBlockingRunnerTest.cs index 0669ef7..ec63fc2 100644 --- a/src/CommonUtilities.SimplePipeline/test/Runners/ParallelBlockingRunnerTest.cs +++ b/src/CommonUtilities.SimplePipeline/test/Runners/ParallelBlockingRunnerTest.cs @@ -18,8 +18,8 @@ public void Test_Run_WaitNotFinished() var s1 = new Mock(); var s2 = new Mock(); - runner.Queue(s1.Object); - runner.Queue(s2.Object); + runner.AddStep(s1.Object); + runner.AddStep(s2.Object); var ran1 = false; s1.Setup(t => t.Run(default)).Callback(() => @@ -49,8 +49,8 @@ public void Test_Wait_Finished() var s1 = new Mock(); var s2 = new Mock(); - runner.Queue(s1.Object); - runner.Queue(s2.Object); + runner.AddStep(s1.Object); + runner.AddStep(s2.Object); var ran1 = false; s1.Setup(t => t.Run(default)).Callback(() => @@ -87,8 +87,8 @@ public void Test_FinishAndWait() var s1 = new Mock(); var s2 = new Mock(); - runner.Queue(s1.Object); - runner.Queue(s2.Object); + runner.AddStep(s1.Object); + runner.AddStep(s2.Object); var ran1 = false; s1.Setup(t => t.Run(default)).Callback(() => @@ -120,8 +120,8 @@ public void Test_Run_AddDelayed() var s2 = new Mock(); var s3 = new Mock(); - runner.Queue(s1.Object); - runner.Queue(s2.Object); + runner.AddStep(s1.Object); + runner.AddStep(s2.Object); var ran1 = false; s1.Setup(t => t.Run(default)).Callback(() => @@ -145,7 +145,7 @@ public void Test_Run_AddDelayed() Task.Run(() => { - runner.Queue(s3.Object); + runner.AddStep(s3.Object); Task.Delay(1000); runner.Finish(); }); diff --git a/src/CommonUtilities.SimplePipeline/test/Runners/ParallelRunnerTest.cs b/src/CommonUtilities.SimplePipeline/test/Runners/ParallelRunnerTest.cs index 9c11f29..c5b914f 100644 --- a/src/CommonUtilities.SimplePipeline/test/Runners/ParallelRunnerTest.cs +++ b/src/CommonUtilities.SimplePipeline/test/Runners/ParallelRunnerTest.cs @@ -19,8 +19,8 @@ public void Test_Wait() var s1 = new Mock(); var s2 = new Mock(); - runner.Queue(s1.Object); - runner.Queue(s2.Object); + runner.AddStep(s1.Object); + runner.AddStep(s2.Object); var ran1 = false; s1.Setup(t => t.Run(default)).Callback(() => @@ -54,8 +54,8 @@ public void Test_Run_NoWait() var b = new ManualResetEvent(false); - runner.Queue(s1.Object); - runner.Queue(s2.Object); + runner.AddStep(s1.Object); + runner.AddStep(s2.Object); var ran1 = false; s1.Setup(t => t.Run(default)).Callback(() => @@ -87,7 +87,7 @@ public void Test_Wait_Timeout_ThrowsTimeoutException() var b = new ManualResetEvent(false); - runner.Queue(s1.Object); + runner.AddStep(s1.Object); s1.Setup(t => t.Run(default)).Callback(() => { @@ -118,7 +118,7 @@ public void Test_Run_WithError() ran = true; }).Throws(); - runner.Queue(step.Object); + runner.AddStep(step.Object); runner.Run(default); runner.Wait(Timeout.InfiniteTimeSpan); @@ -157,8 +157,8 @@ public void Test_Run_Cancelled() b.WaitOne(); }); - runner.Queue(t1.Object); - runner.Queue(t2.Object); + runner.AddStep(t1.Object); + runner.AddStep(t2.Object); runner.Run(cts.Token); runner.Wait(Timeout.InfiniteTimeSpan); diff --git a/src/CommonUtilities.SimplePipeline/test/Runners/TaskRunnerTest.cs b/src/CommonUtilities.SimplePipeline/test/Runners/TaskRunnerTest.cs index 4933941..492f8c8 100644 --- a/src/CommonUtilities.SimplePipeline/test/Runners/TaskRunnerTest.cs +++ b/src/CommonUtilities.SimplePipeline/test/Runners/TaskRunnerTest.cs @@ -43,7 +43,7 @@ public void Test_Run_Cancelled() ran = true; }); - runner.Queue(step.Object); + runner.AddStep(step.Object); runner.Run(cts.Token); Assert.True(hasError); @@ -71,7 +71,7 @@ public void Test_Run_WithError() ran = true; }).Throws(); - runner.Queue(step.Object); + runner.AddStep(step.Object); runner.Run(default); Assert.True(hasError); From c3758478f5d24ed270e30173ba01bcf174b76c28 Mon Sep 17 00:00:00 2001 From: AnakinRaW Date: Sun, 31 Mar 2024 14:52:18 +0200 Subject: [PATCH 4/9] rename method --- .../src/Steps/PipelineStep.cs | 6 +++--- .../src/Steps/SynchronizedStep.cs | 6 +++--- .../test/Steps/SynchronizedStepTest.cs | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/CommonUtilities.SimplePipeline/src/Steps/PipelineStep.cs b/src/CommonUtilities.SimplePipeline/src/Steps/PipelineStep.cs index 9048917..057a1d0 100644 --- a/src/CommonUtilities.SimplePipeline/src/Steps/PipelineStep.cs +++ b/src/CommonUtilities.SimplePipeline/src/Steps/PipelineStep.cs @@ -12,17 +12,17 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline.Steps; public abstract class PipelineStep : DisposableObject, IStep { /// - /// The service provider of this step. + /// Gets the service provider of this step. /// protected IServiceProvider Services { get; } /// - /// The logger of this step. + /// Gets the logger of this step. /// protected ILogger? Logger { get; } /// - /// The exception, if any, that occurred during execution. + /// Gets the exception that occurred during execution or if no error occurred. /// public Exception? Error { get; internal set; } diff --git a/src/CommonUtilities.SimplePipeline/src/Steps/SynchronizedStep.cs b/src/CommonUtilities.SimplePipeline/src/Steps/SynchronizedStep.cs index 20c6d2e..d9680df 100644 --- a/src/CommonUtilities.SimplePipeline/src/Steps/SynchronizedStep.cs +++ b/src/CommonUtilities.SimplePipeline/src/Steps/SynchronizedStep.cs @@ -4,7 +4,7 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline.Steps; /// -/// An awaitable step implementation. +/// A step that can be waited for. /// public abstract class SynchronizedStep : PipelineStep { @@ -47,7 +47,7 @@ public void Wait(TimeSpan timeout) /// Executes this step. /// /// - protected abstract void SynchronizedInvoke(CancellationToken token); + protected abstract void RunSynchronized(CancellationToken token); /// protected override void DisposeManagedResources() @@ -61,7 +61,7 @@ protected sealed override void RunCore(CancellationToken token) { try { - SynchronizedInvoke(token); + RunSynchronized(token); } catch (Exception ex) { diff --git a/src/CommonUtilities.SimplePipeline/test/Steps/SynchronizedStepTest.cs b/src/CommonUtilities.SimplePipeline/test/Steps/SynchronizedStepTest.cs index 7e26a1c..06cb6b4 100644 --- a/src/CommonUtilities.SimplePipeline/test/Steps/SynchronizedStepTest.cs +++ b/src/CommonUtilities.SimplePipeline/test/Steps/SynchronizedStepTest.cs @@ -33,7 +33,7 @@ public void Test_Run_ThrowsWait() }; - step.Protected().Setup("SynchronizedInvoke", false, (CancellationToken)default) + step.Protected().Setup("RunSynchronized", false, (CancellationToken)default) .Callback(() => throw new Exception()); Assert.Throws(() => step.Object.Run(default)); @@ -60,7 +60,7 @@ public void Test_Run_Cancelled_ThrowsOperationCanceledException() cts.Cancel(); - step.Protected().Setup("SynchronizedInvoke", false, cts.Token) + step.Protected().Setup("RunSynchronized", false, cts.Token) .Callback(t => t.ThrowIfCancellationRequested()); Assert.Throws(() => step.Object.Run(cts.Token)); @@ -87,7 +87,7 @@ public TestSync(IServiceProvider serviceProvider) : base(serviceProvider) { } - protected override void SynchronizedInvoke(CancellationToken token) + protected override void RunSynchronized(CancellationToken token) { Task.Delay(500, token); Flag = true; @@ -103,7 +103,7 @@ public void Test_Wait_WithTimeout() CallBase = true }; - step.Protected().Setup("SynchronizedInvoke", false, (CancellationToken)default) + step.Protected().Setup("RunSynchronized", false, (CancellationToken)default) .Callback(() => { Task.Delay(1000).Wait(); From c4a422275d72600da69e7878d6705b53e113e130 Mon Sep 17 00:00:00 2001 From: AnakinRaW Date: Thu, 18 Apr 2024 10:42:10 +0200 Subject: [PATCH 5/9] merge --- ...ommonUtilities.DownloadManager.Test.csproj | 8 +- .../CommonUtilities.FileSystem.Test.csproj | 8 +- .../test/CommonUtilities.Registry.Test.csproj | 4 +- .../src/IParallelRunner.cs | 22 -- .../src/IRunner.cs | 17 +- .../src/Pipelines/IPipeline.cs | 14 +- .../src/Pipelines/ParallelPipeline.cs | 14 -- .../src/Pipelines/Pipeline.cs | 17 +- .../src/Pipelines/SimplePipeline.cs | 38 ++-- .../src/Runners/ParallelBlockingRunner.cs | 174 ---------------- .../src/Runners/ParallelRunner.cs | 10 +- .../src/Runners/StepRunner.cs | 33 +-- .../src/Steps/WaitStep.cs | 6 +- ...CommonUtilities.SimplePipeline.Test.csproj | 4 +- .../test/ParallelPipelineTests.cs | 51 ++++- .../test/PipelineTest.cs | 51 ++--- .../Runners/ParallelBlockingRunnerTest.cs | 159 -------------- .../test/Runners/ParallelRunnerTest.cs | 28 ++- .../test/Runners/TaskRunnerTest.cs | 15 +- .../test/SequentialPipelineTests.cs | 50 ++++- .../test/SimplePipelineTests.cs | 196 ++++++++++++++++-- .../test/Steps/WaitStepTest.cs | 3 +- .../CommonUtilities.TestingUtilities.csproj | 4 +- src/CommonUtilities/src/AwaitExtensions.cs | 10 +- .../test/AwaitExtensionsTests.cs | 19 +- .../test/CommonUtilities.Test.csproj | 8 +- 26 files changed, 419 insertions(+), 544 deletions(-) delete mode 100644 src/CommonUtilities.SimplePipeline/src/IParallelRunner.cs delete mode 100644 src/CommonUtilities.SimplePipeline/src/Runners/ParallelBlockingRunner.cs delete mode 100644 src/CommonUtilities.SimplePipeline/test/Runners/ParallelBlockingRunnerTest.cs diff --git a/src/CommonUtilities.DownloadManager/test/CommonUtilities.DownloadManager.Test.csproj b/src/CommonUtilities.DownloadManager/test/CommonUtilities.DownloadManager.Test.csproj index db16efb..9ce1493 100644 --- a/src/CommonUtilities.DownloadManager/test/CommonUtilities.DownloadManager.Test.csproj +++ b/src/CommonUtilities.DownloadManager/test/CommonUtilities.DownloadManager.Test.csproj @@ -21,10 +21,10 @@ - - - - + + + + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/CommonUtilities.FileSystem/test/CommonUtilities.FileSystem.Test.csproj b/src/CommonUtilities.FileSystem/test/CommonUtilities.FileSystem.Test.csproj index 9f51146..cce7be3 100644 --- a/src/CommonUtilities.FileSystem/test/CommonUtilities.FileSystem.Test.csproj +++ b/src/CommonUtilities.FileSystem/test/CommonUtilities.FileSystem.Test.csproj @@ -27,10 +27,10 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - - - + + + + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/CommonUtilities.Registry/test/CommonUtilities.Registry.Test.csproj b/src/CommonUtilities.Registry/test/CommonUtilities.Registry.Test.csproj index 6c1ca89..87592ac 100644 --- a/src/CommonUtilities.Registry/test/CommonUtilities.Registry.Test.csproj +++ b/src/CommonUtilities.Registry/test/CommonUtilities.Registry.Test.csproj @@ -18,8 +18,8 @@ runtime; build; native; contentfiles; analyzers; buildtransitive - - + + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/CommonUtilities.SimplePipeline/src/IParallelRunner.cs b/src/CommonUtilities.SimplePipeline/src/IParallelRunner.cs deleted file mode 100644 index 3542ab0..0000000 --- a/src/CommonUtilities.SimplePipeline/src/IParallelRunner.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; - -namespace AnakinRaW.CommonUtilities.SimplePipeline; - -/// -/// Specialized which allows -/// -public interface IParallelRunner : IRunner -{ - /// - /// Synchronously waits for this runner for all of its steps to be finished. - /// - /// If any of the steps failed with an exception. - void Wait(); - - /// - /// Synchronously waits for this runner for all of its steps to be finished. - /// - /// The time duration to wait. - /// If expired. - void Wait(TimeSpan waitDuration); -} \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/IRunner.cs b/src/CommonUtilities.SimplePipeline/src/IRunner.cs index 3272a54..11d440c 100644 --- a/src/CommonUtilities.SimplePipeline/src/IRunner.cs +++ b/src/CommonUtilities.SimplePipeline/src/IRunner.cs @@ -1,13 +1,13 @@ using System; -using System.Collections.Generic; using System.Threading; +using System.Threading.Tasks; namespace AnakinRaW.CommonUtilities.SimplePipeline; /// /// Execution engine to run one or many s. /// -public interface IRunner : IEnumerable +public interface IRunner : IStepQueue, IDisposable { /// /// Gets raised when the execution of an failed with an exception. @@ -15,14 +15,9 @@ public interface IRunner : IEnumerable event EventHandler? Error; /// - /// Runs all queued steps + /// Runs all queued steps. /// - /// Provided to allow cancellation. - void Run(CancellationToken token); - - /// - /// Adds an to the . - /// - /// The step to app. - void AddStep(IStep activity); + /// The cancellation token, allowing the runner to cancel the operation. + /// A task that represents the completion of the operation. + Task RunAsync(CancellationToken token); } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/Pipelines/IPipeline.cs b/src/CommonUtilities.SimplePipeline/src/Pipelines/IPipeline.cs index facac4b..0bfeb02 100644 --- a/src/CommonUtilities.SimplePipeline/src/Pipelines/IPipeline.cs +++ b/src/CommonUtilities.SimplePipeline/src/Pipelines/IPipeline.cs @@ -1,5 +1,6 @@ using System; using System.Threading; +using System.Threading.Tasks; namespace AnakinRaW.CommonUtilities.SimplePipeline; @@ -7,21 +8,22 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline; /// A pipeline can run multiple operations in sequence or simultaneously, based on how it was prepared. /// public interface IPipeline : IDisposable -{ +{ /// /// Prepares this instance for execution. /// /// /// Preparation can only be done once per instance. /// - /// if the preparation was successful; otherwise. - bool Prepare(); - + /// A task that represents whether the preparation was successful. + Task PrepareAsync(); + /// - /// Runs pipeline. + /// Runs pipeline synchronously. /// /// Provided to allow cancellation. + /// A task that represents the operation completion. /// If was requested for cancellation. /// The pipeline may throw this exception if one or many steps failed. - void Run(CancellationToken token = default); + Task RunAsync(CancellationToken token = default); } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/Pipelines/ParallelPipeline.cs b/src/CommonUtilities.SimplePipeline/src/Pipelines/ParallelPipeline.cs index 23d281d..4666ce0 100644 --- a/src/CommonUtilities.SimplePipeline/src/Pipelines/ParallelPipeline.cs +++ b/src/CommonUtilities.SimplePipeline/src/Pipelines/ParallelPipeline.cs @@ -5,7 +5,6 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline; /// /// A simple pipeline that runs all steps on the thread pool in parallel. -/// The will block until the pipeline has finished all steps. /// public abstract class ParallelPipeline : SimplePipeline { @@ -27,17 +26,4 @@ protected sealed override ParallelRunner CreateRunner() { return new ParallelRunner(_workerCount, ServiceProvider); } - - /// - protected sealed override void OnRunning(ParallelRunner buildRunner) - { - try - { - buildRunner.Wait(); - } - catch - { - // Ignore - } - } } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/Pipelines/Pipeline.cs b/src/CommonUtilities.SimplePipeline/src/Pipelines/Pipeline.cs index 263b9f4..e90afd9 100644 --- a/src/CommonUtilities.SimplePipeline/src/Pipelines/Pipeline.cs +++ b/src/CommonUtilities.SimplePipeline/src/Pipelines/Pipeline.cs @@ -1,5 +1,6 @@ using System; using System.Threading; +using System.Threading.Tasks; namespace AnakinRaW.CommonUtilities.SimplePipeline; @@ -9,39 +10,39 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline; public abstract class Pipeline : DisposableObject, IPipeline { private bool? _prepareSuccessful; - + /// - public bool Prepare() + public async Task PrepareAsync() { if (IsDisposed) throw new ObjectDisposedException("Pipeline already disposed"); if (_prepareSuccessful.HasValue) return _prepareSuccessful.Value; - _prepareSuccessful = PrepareCore(); + _prepareSuccessful = await PrepareCoreAsync().ConfigureAwait(false); return _prepareSuccessful.Value; } /// - public void Run(CancellationToken token = default) + public async Task RunAsync(CancellationToken token = default) { if (IsDisposed) throw new ObjectDisposedException("Pipeline already disposed"); token.ThrowIfCancellationRequested(); - if (!Prepare()) + if (!await PrepareAsync().ConfigureAwait(false)) return; - RunCore(token); + await RunCoreAsync(token).ConfigureAwait(false); } /// /// Performs the actual preparation of this instance. /// /// if the planning was successful; otherwise. - protected abstract bool PrepareCore(); + protected abstract Task PrepareCoreAsync(); /// /// Implements the run logic of this instance. /// /// It's assured this instance is already prepared when this method gets called. /// Provided to allow cancellation. - protected abstract void RunCore(CancellationToken token); + protected abstract Task RunCoreAsync(CancellationToken token); } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/Pipelines/SimplePipeline.cs b/src/CommonUtilities.SimplePipeline/src/Pipelines/SimplePipeline.cs index ad6ab14..500df64 100644 --- a/src/CommonUtilities.SimplePipeline/src/Pipelines/SimplePipeline.cs +++ b/src/CommonUtilities.SimplePipeline/src/Pipelines/SimplePipeline.cs @@ -2,15 +2,15 @@ using System.Collections.Generic; using System.Linq; using System.Threading; -using AnakinRaW.CommonUtilities.SimplePipeline.Runners; +using System.Threading.Tasks; namespace AnakinRaW.CommonUtilities.SimplePipeline; /// -/// Base class for a simple pipeline implementation utilizing one runner and then waits for the execution to finish. +/// Base class for a simple pipeline implementation utilizing one runner. /// /// The type of the step runner. -public abstract class SimplePipeline : Pipeline where TRunner : StepRunner +public abstract class SimplePipeline : Pipeline where TRunner : IRunner { /// /// The service provider within the pipeline. @@ -20,7 +20,7 @@ public abstract class SimplePipeline : Pipeline where TRunner : StepRun private readonly bool _failFast; private CancellationTokenSource? _linkedCancellationTokenSource; - private TRunner _buildRunner = null!; + private IRunner _buildRunner = null!; /// /// Gets or sets a value indicating whether the pipeline has encountered a failure. @@ -54,32 +54,33 @@ public override string ToString() protected abstract TRunner CreateRunner(); /// - /// Builds the steps in the order they should be executed within the pipeline. + /// Builds the steps that should be executed within the pipeline. /// - /// A list of steps in the order they should be executed. - protected abstract IList BuildStepsOrdered(); + /// + /// The order of the steps might be relevant, depending on the type of . + /// + /// A task that returns a list of steps. + protected abstract Task> BuildSteps(); /// - protected sealed override bool PrepareCore() + protected override async Task PrepareCoreAsync() { _buildRunner = CreateRunner() ?? throw new InvalidOperationException("RunnerFactory created null value!"); - var steps = BuildStepsOrdered(); + var steps = await BuildSteps().ConfigureAwait(false); foreach (var step in steps) _buildRunner.AddStep(step); return true; } /// - protected sealed override void RunCore(CancellationToken token) + protected override async Task RunCoreAsync(CancellationToken token) { try { _linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); _buildRunner.Error += OnError; - _buildRunner.Run(_linkedCancellationTokenSource.Token); - - OnRunning(_buildRunner); + await _buildRunner.RunAsync(_linkedCancellationTokenSource.Token).ConfigureAwait(false); } finally { @@ -94,21 +95,14 @@ protected sealed override void RunCore(CancellationToken token) if (!PipelineFailed) return; - var failedBuildSteps = _buildRunner.Steps.Where(p => p.Error != null && !p.Error.IsExceptionType()) + var failedBuildSteps = _buildRunner.Steps + .Where(p => p.Error != null && !p.Error.IsExceptionType()) .ToList(); if (failedBuildSteps.Any()) throw new StepFailureException(failedBuildSteps); } - /// - /// Called after the step runner was started. - /// - /// The step runner. - protected virtual void OnRunning(TRunner buildRunner) - { - } - /// /// Called when an error occurs within a step. /// diff --git a/src/CommonUtilities.SimplePipeline/src/Runners/ParallelBlockingRunner.cs b/src/CommonUtilities.SimplePipeline/src/Runners/ParallelBlockingRunner.cs deleted file mode 100644 index efb3f62..0000000 --- a/src/CommonUtilities.SimplePipeline/src/Runners/ParallelBlockingRunner.cs +++ /dev/null @@ -1,174 +0,0 @@ -using System; -using System.Collections; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; - -namespace AnakinRaW.CommonUtilities.SimplePipeline.Runners; - -/// -/// Runner engine, which executes all queued _steps parallel. Steps may be queued while step execution has been started. -/// The execution is finished only if or was called explicitly. -/// -public sealed class ParallelBlockingRunner : IParallelRunner -{ - /// - public event EventHandler? Error; - - private readonly ConcurrentBag _exceptions; - private readonly ConcurrentBag _steps; - private readonly ILogger? _logger; - private readonly int _workerCount; - private readonly Task[] _runnerTasks; - private CancellationToken _cancel; - - private BlockingCollection StepQueue { get; } - - /// - /// Aggregates all step exceptions, if any happened. - /// - public AggregateException? Exception => _exceptions.Count > 0 ? new AggregateException(_exceptions) : null; - - internal bool IsCancelled { get; private set; } - - /// - /// Initializes a new instance. - /// - /// The number of parallel workers. - /// The service provider. - /// If the number of workers is below 1. - public ParallelBlockingRunner(int workerCount, IServiceProvider serviceProvider) - { - if (workerCount is < 1 or >= 64) - throw new ArgumentException("invalid parallel worker count"); - _workerCount = workerCount; - _runnerTasks = new Task[_workerCount]; - _steps = new ConcurrentBag(); - StepQueue = new BlockingCollection(); - _logger = serviceProvider.GetService()?.CreateLogger(GetType()); - _exceptions = new ConcurrentBag(); - } - - /// - public void Run(CancellationToken token) - { - token.ThrowIfCancellationRequested(); - _cancel = token; - for (var index = 0; index < _workerCount; ++index) - _runnerTasks[index] = Task.Factory.StartNew(RunThreaded, CancellationToken.None); - } - - /// - public void Wait() - { - Wait(Timeout.InfiniteTimeSpan); - var exception = Exception; - if (exception != null) - throw exception; - } - - /// - public void Wait(TimeSpan timeout) - { - if (!Task.WaitAll(_runnerTasks, timeout)) - throw new TimeoutException(); - } - - /// - /// Signals, this instance does not expect any more steps. - /// - public void Finish() - { - StepQueue.CompleteAdding(); - } - - /// - /// Signals, this instance does not expect any more steps and waits for finished execution. - /// - /// - public void FinishAndWait(bool throwsException = false) - { - Finish(); - try - { - Wait(); - } - catch - { - if (throwsException) - throw; - } - } - - /// - public void AddStep(IStep step) - { - if (step is null) - throw new ArgumentNullException(nameof(step)); - StepQueue.Add(step, CancellationToken.None); - } - - /// - public IEnumerator GetEnumerator() - { - return _steps.GetEnumerator(); - } - - private void RunThreaded() - { - var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_cancel); - var canceled = false; - foreach (var step in StepQueue.GetConsumingEnumerable()) - { - try - { - _cancel.ThrowIfCancellationRequested(); - _steps.Add(step); - step.Run(_cancel); - } - catch (StopRunnerException) - { - _logger?.LogTrace("Stop subsequent steps"); - StepQueue.CompleteAdding(); - break; - } - catch (Exception ex) - { - _exceptions.Add(ex); - if (!canceled) - { - if (ex.IsExceptionType()) - _logger?.LogTrace($"Activity threw exception {ex.GetType()}: {ex.Message}" + Environment.NewLine + $"{ex.StackTrace}"); - else - _logger?.LogTrace(ex, $"Activity threw exception {ex.GetType()}: {ex.Message}"); - } - var e = new StepErrorEventArgs(step) - { - Cancel = _cancel.IsCancellationRequested || IsCancelled || ex.IsExceptionType() - }; - OnError(e); - if (e.Cancel) - { - canceled = true; - linkedTokenSource.Cancel(); - } - } - } - } - - IEnumerator IEnumerable.GetEnumerator() - { - return GetEnumerator(); - } - - private void OnError(StepErrorEventArgs e) - { - Error?.Invoke(this, e); - if (!e.Cancel) - return; - IsCancelled |= e.Cancel; - } -} \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/Runners/ParallelRunner.cs b/src/CommonUtilities.SimplePipeline/src/Runners/ParallelRunner.cs index 80afd39..026ec24 100644 --- a/src/CommonUtilities.SimplePipeline/src/Runners/ParallelRunner.cs +++ b/src/CommonUtilities.SimplePipeline/src/Runners/ParallelRunner.cs @@ -9,7 +9,7 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline.Runners; /// /// Runner engine, which executes all queued steps parallel. /// -public class ParallelRunner: StepRunner, IParallelRunner +public class ParallelRunner: StepRunner, ISynchronizedRunner { private readonly ConcurrentBag _exceptions; private readonly Task[] _tasks; @@ -40,6 +40,13 @@ public ParallelRunner(int workerCount, IServiceProvider serviceProvider) : base( _tasks = new Task[workerCount]; } + /// + public override Task RunAsync(CancellationToken token) + { + Invoke(token); + return Task.WhenAll(_tasks); + } + /// public void Wait() { @@ -75,6 +82,7 @@ private void InvokeThreaded() var canceled = false; while (StepQueue.TryDequeue(out var step)) { + ThrowIfDisposed(); try { ThrowIfCancelled(_cancel); diff --git a/src/CommonUtilities.SimplePipeline/src/Runners/StepRunner.cs b/src/CommonUtilities.SimplePipeline/src/Runners/StepRunner.cs index faebe71..e55ac33 100644 --- a/src/CommonUtilities.SimplePipeline/src/Runners/StepRunner.cs +++ b/src/CommonUtilities.SimplePipeline/src/Runners/StepRunner.cs @@ -1,9 +1,9 @@ using System; -using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Threading; +using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -12,7 +12,7 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline.Runners; /// /// Runner engine, which executes all queued sequentially in the order they are queued. /// -public class StepRunner : IRunner +public class StepRunner : DisposableObject, IRunner { /// public event EventHandler? Error; @@ -35,7 +35,7 @@ public class StepRunner : IRunner /// /// List of all steps scheduled for execution. /// - /// Steps queued *after* was called, are not included. + /// Steps queued *after* was called, are not included. public IReadOnlyList Steps => new ReadOnlyCollection(StepList); internal bool IsCancelled { get; private set; } @@ -54,9 +54,12 @@ public StepRunner(IServiceProvider services) } /// - public void Run(CancellationToken token) + public virtual Task RunAsync(CancellationToken token) { - Invoke(token); + return Task.Run(() => + { + Invoke(token); + }, default); } /// @@ -67,17 +70,6 @@ public void AddStep(IStep activity) StepQueue.Enqueue(activity); } - /// - public IEnumerator GetEnumerator() - { - return StepList.GetEnumerator(); - } - - IEnumerator IEnumerable.GetEnumerator() - { - return StepList.GetEnumerator(); - } - /// /// Sequentially runs all queued steps. Faulted steps will raise the event. /// @@ -88,6 +80,7 @@ protected virtual void Invoke(CancellationToken token) StepList.AddRange(StepQueue); while (StepQueue.TryDequeue(out var step)) { + ThrowIfDisposed(); try { ThrowIfCancelled(token); @@ -143,4 +136,12 @@ protected void ThrowIfCancelled(CancellationToken token) if (IsCancelled) throw new OperationCanceledException(token); } + + /// + protected override void DisposeManagedResources() + { + base.DisposeManagedResources(); + foreach (var step in Steps) + step.Dispose(); + } } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/Steps/WaitStep.cs b/src/CommonUtilities.SimplePipeline/src/Steps/WaitStep.cs index 3e31b7e..b696413 100644 --- a/src/CommonUtilities.SimplePipeline/src/Steps/WaitStep.cs +++ b/src/CommonUtilities.SimplePipeline/src/Steps/WaitStep.cs @@ -5,18 +5,18 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline.Steps; /// -/// A step that waits for a given to finish. +/// A step that waits for a given to finish. /// public sealed class WaitStep : PipelineStep { - private readonly IParallelRunner _runner; + private readonly ISynchronizedRunner _runner; /// /// Initializes a new . /// /// The awaitable step runner /// The service provider. - public WaitStep(IParallelRunner runner, IServiceProvider serviceProvider) : base(serviceProvider) + public WaitStep(ISynchronizedRunner runner, IServiceProvider serviceProvider) : base(serviceProvider) { _runner = runner ?? throw new ArgumentNullException(nameof(runner)); } diff --git a/src/CommonUtilities.SimplePipeline/test/CommonUtilities.SimplePipeline.Test.csproj b/src/CommonUtilities.SimplePipeline/test/CommonUtilities.SimplePipeline.Test.csproj index 9d64d16..14a63c3 100644 --- a/src/CommonUtilities.SimplePipeline/test/CommonUtilities.SimplePipeline.Test.csproj +++ b/src/CommonUtilities.SimplePipeline/test/CommonUtilities.SimplePipeline.Test.csproj @@ -21,8 +21,8 @@ - - + + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/CommonUtilities.SimplePipeline/test/ParallelPipelineTests.cs b/src/CommonUtilities.SimplePipeline/test/ParallelPipelineTests.cs index 6e58de5..93bcff8 100644 --- a/src/CommonUtilities.SimplePipeline/test/ParallelPipelineTests.cs +++ b/src/CommonUtilities.SimplePipeline/test/ParallelPipelineTests.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; @@ -11,7 +12,7 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline.Test; public class ParallelPipelineTests { [Fact] - public void Test_Run_Waits() + public async Task Test_Run() { var sc = new ServiceCollection(); @@ -38,15 +39,55 @@ public void Test_Run_Waits() CallBase = true }; - pipelineMock.Protected().Setup>("BuildStepsOrdered").Returns(new List + pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List { s1.Object, s2.Object - }); + })); var pipeline = pipelineMock.Object; - pipeline.Run(); + await pipeline.RunAsync(); Assert.Equal(2, j); } + + [Fact] + public async Task Test_Run_WithError() + { + var sc = new ServiceCollection(); + + var sp = sc.BuildServiceProvider(); + + var j = 0; + + var s1 = new Mock(); + s1.Setup(i => i.Run(It.IsAny())).Callback(() => + { + Task.Delay(1000); + Interlocked.Increment(ref j); + }); + + var s2 = new Mock(); + s2.SetupGet(s => s.Error).Returns(new Exception()); + s2.Setup(i => i.Run(It.IsAny())).Callback(() => + { + Task.Delay(1000); + }).Throws(); + + var pipelineMock = new Mock(sp, 2, true) + { + CallBase = true + }; + + pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List + { + s1.Object, + s2.Object + })); + + var pipeline = pipelineMock.Object; + + await Assert.ThrowsAsync(async () => await pipeline.RunAsync()); + Assert.Equal(1, j); + } } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/test/PipelineTest.cs b/src/CommonUtilities.SimplePipeline/test/PipelineTest.cs index 4087a0d..a695349 100644 --- a/src/CommonUtilities.SimplePipeline/test/PipelineTest.cs +++ b/src/CommonUtilities.SimplePipeline/test/PipelineTest.cs @@ -1,5 +1,6 @@ using System; using System.Threading; +using System.Threading.Tasks; using Moq; using Moq.Protected; using Xunit; @@ -9,71 +10,71 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline.Test; public class PipelineTest { [Fact] - public void Test_Prepare() + public async Task Test_Prepare() { var pipeline = new Mock { CallBase = true }; - pipeline.Object.Prepare(); - pipeline.Object.Prepare(); + await pipeline.Object.PrepareAsync(); + await pipeline.Object.PrepareAsync(); - pipeline.Protected().Verify("PrepareCore", Times.Exactly(1)); + pipeline.Protected().Verify>("PrepareCoreAsync", Times.Exactly(1)); } [Fact] - public void Test_Run() + public async Task Test_Run() { var pipeline = new Mock { CallBase = true }; - pipeline.Protected().Setup("PrepareCore").Returns(true); + pipeline.Protected().Setup>("PrepareCoreAsync").Returns(Task.FromResult(true)); - pipeline.Object.Run(); - pipeline.Object.Run(); + await pipeline.Object.RunAsync(); + await pipeline.Object.RunAsync(); - pipeline.Protected().Verify("PrepareCore", Times.Exactly(1)); - pipeline.Protected().Verify("RunCore", Times.Exactly(2), false, (CancellationToken) default); + pipeline.Protected().Verify>("PrepareCoreAsync", Times.Exactly(1)); + pipeline.Protected().Verify("RunCoreAsync", Times.Exactly(2), false, (CancellationToken) default); } [Fact] - public void Test_Prepare_Run() + public async Task Test_Prepare_Run() { var pipeline = new Mock { CallBase = true }; - pipeline.Protected().Setup("PrepareCore").Returns(true); + pipeline.Protected().Setup>("PrepareCoreAsync").Returns(Task.FromResult(true)); - pipeline.Object.Prepare(); - pipeline.Object.Run(); - pipeline.Object.Run(); + await pipeline.Object.PrepareAsync(); + await pipeline.Object.RunAsync(); + await pipeline.Object.RunAsync(); - pipeline.Protected().Verify("PrepareCore", Times.Exactly(1)); - pipeline.Protected().Verify("RunCore", Times.Exactly(2), false, (CancellationToken)default); + pipeline.Protected().Verify>("PrepareCoreAsync", Times.Exactly(1)); + pipeline.Protected().Verify("RunCoreAsync", Times.Exactly(2), false, (CancellationToken)default); } [Fact] - public void Test_Run_Cancelled_ThrowsOperationCanceledException() + public async Task Test_Run_Cancelled_ThrowsOperationCanceledException() { var pipeline = new Mock { CallBase = true }; - pipeline.Protected().Setup("PrepareCore").Returns(true); + pipeline.Protected().Setup>("PrepareCoreAsync").Returns(Task.FromResult(true)); var cts = new CancellationTokenSource(); cts.Cancel(); - Assert.Throws(() => pipeline.Object.Run(cts.Token)); + await Assert.ThrowsAsync(async () => await pipeline.Object.RunAsync(cts.Token)); } [Fact] - public void Test_Prepare_Disposed_ThrowsObjectDisposedException() + public async Task Test_Prepare_Disposed_ThrowsObjectDisposedException() { var pipeline = new Mock { @@ -83,20 +84,20 @@ public void Test_Prepare_Disposed_ThrowsObjectDisposedException() pipeline.Object.Dispose(); pipeline.Object.Dispose(); - Assert.Throws(() => pipeline.Object.Prepare()); + await Assert.ThrowsAsync(async() => await pipeline.Object.PrepareAsync()); } [Fact] - public void Test_Run_Disposed_ThrowsObjectDisposedException() + public async Task Test_Run_Disposed_ThrowsObjectDisposedException() { var pipeline = new Mock { CallBase = true }; - pipeline.Object.Prepare(); + await pipeline.Object.PrepareAsync(); pipeline.Object.Dispose(); - Assert.Throws(() => pipeline.Object.Run()); + await Assert.ThrowsAsync(async () => await pipeline.Object.RunAsync()); } } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/test/Runners/ParallelBlockingRunnerTest.cs b/src/CommonUtilities.SimplePipeline/test/Runners/ParallelBlockingRunnerTest.cs deleted file mode 100644 index ec63fc2..0000000 --- a/src/CommonUtilities.SimplePipeline/test/Runners/ParallelBlockingRunnerTest.cs +++ /dev/null @@ -1,159 +0,0 @@ -using System; -using System.Threading.Tasks; -using AnakinRaW.CommonUtilities.SimplePipeline.Runners; -using Microsoft.Extensions.DependencyInjection; -using Moq; -using Xunit; - -namespace AnakinRaW.CommonUtilities.SimplePipeline.Test.Runners; - -public class ParallelBlockingRunnerTest -{ - [Fact] - public void Test_Run_WaitNotFinished() - { - var sc = new ServiceCollection(); - var runner = new ParallelBlockingRunner(2, sc.BuildServiceProvider()); - - var s1 = new Mock(); - var s2 = new Mock(); - - runner.AddStep(s1.Object); - runner.AddStep(s2.Object); - - var ran1 = false; - s1.Setup(t => t.Run(default)).Callback(() => - { - ran1 = true; - }); - var ran2 = false; - s2.Setup(t => t.Run(default)).Callback(() => - { - ran2 = true; - }); - - - runner.Run(default); - Assert.Throws(() => runner.Wait(TimeSpan.FromSeconds(2))); - - Assert.True(ran1); - Assert.True(ran2); - } - - [Fact] - public void Test_Wait_Finished() - { - var sc = new ServiceCollection(); - var runner = new ParallelBlockingRunner(2, sc.BuildServiceProvider()); - - var s1 = new Mock(); - var s2 = new Mock(); - - runner.AddStep(s1.Object); - runner.AddStep(s2.Object); - - var ran1 = false; - s1.Setup(t => t.Run(default)).Callback(() => - { - ran1 = true; - }); - var ran2 = false; - s2.Setup(t => t.Run(default)).Callback(() => - { - ran2 = true; - }); - - - runner.Run(default); - - Task.Run(() => - { - Task.Delay(1000); - runner.Finish(); - }); - - runner.Wait(); - - Assert.True(ran1); - Assert.True(ran2); - } - - [Fact] - public void Test_FinishAndWait() - { - var sc = new ServiceCollection(); - var runner = new ParallelBlockingRunner(2, sc.BuildServiceProvider()); - - var s1 = new Mock(); - var s2 = new Mock(); - - runner.AddStep(s1.Object); - runner.AddStep(s2.Object); - - var ran1 = false; - s1.Setup(t => t.Run(default)).Callback(() => - { - ran1 = true; - }); - var ran2 = false; - s2.Setup(t => t.Run(default)).Callback(() => - { - ran2 = true; - }); - - - runner.Run(default); - - runner.FinishAndWait(); - - Assert.True(ran1); - Assert.True(ran2); - } - - [Fact] - public void Test_Run_AddDelayed() - { - var sc = new ServiceCollection(); - var runner = new ParallelBlockingRunner(2, sc.BuildServiceProvider()); - - var s1 = new Mock(); - var s2 = new Mock(); - var s3 = new Mock(); - - runner.AddStep(s1.Object); - runner.AddStep(s2.Object); - - var ran1 = false; - s1.Setup(t => t.Run(default)).Callback(() => - { - ran1 = true; - }); - var ran2 = false; - s2.Setup(t => t.Run(default)).Callback(() => - { - ran2 = true; - }); - - var ran3 = false; - s3.Setup(t => t.Run(default)).Callback(() => - { - ran3 = true; - }); - - - runner.Run(default); - - Task.Run(() => - { - runner.AddStep(s3.Object); - Task.Delay(1000); - runner.Finish(); - }); - - runner.Wait(); - - Assert.True(ran1); - Assert.True(ran2); - Assert.True(ran3); - } -} \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/test/Runners/ParallelRunnerTest.cs b/src/CommonUtilities.SimplePipeline/test/Runners/ParallelRunnerTest.cs index c5b914f..0242555 100644 --- a/src/CommonUtilities.SimplePipeline/test/Runners/ParallelRunnerTest.cs +++ b/src/CommonUtilities.SimplePipeline/test/Runners/ParallelRunnerTest.cs @@ -36,7 +36,7 @@ public void Test_Wait() }); - runner.Run(default); + _ = runner.RunAsync(default); runner.Wait(); Assert.True(ran1); @@ -44,7 +44,7 @@ public void Test_Wait() } [Fact] - public void Test_Run_NoWait() + public async Task Test_Run_NoWait() { var sc = new ServiceCollection(); var runner = new ParallelRunner(2, sc.BuildServiceProvider()); @@ -71,14 +71,19 @@ public void Test_Run_NoWait() }); - runner.Run(default); + var runTask = runner.RunAsync(default); Assert.False(ran1); Assert.False(ran2); b.Set(); + + await runTask; + + Assert.True(ran1); + Assert.True(ran2); } [Fact] - public void Test_Wait_Timeout_ThrowsTimeoutException() + public async Task Test_Wait_Timeout_ThrowsTimeoutException() { var sc = new ServiceCollection(); var runner = new ParallelRunner(2, sc.BuildServiceProvider()); @@ -94,13 +99,16 @@ public void Test_Wait_Timeout_ThrowsTimeoutException() b.WaitOne(); }); - runner.Run(default); + var runnerTask = runner.RunAsync(default); + Assert.Throws(() => runner.Wait(TimeSpan.FromMilliseconds(100))); b.Set(); + + await runnerTask; } [Fact] - public void Test_Run_WithError() + public async Task Test_Run_WithError() { var sc = new ServiceCollection(); var runner = new ParallelRunner(2, sc.BuildServiceProvider()); @@ -119,8 +127,7 @@ public void Test_Run_WithError() }).Throws(); runner.AddStep(step.Object); - runner.Run(default); - runner.Wait(Timeout.InfiniteTimeSpan); + await runner.RunAsync(default); Assert.True(hasError); Assert.True(ran); @@ -128,7 +135,7 @@ public void Test_Run_WithError() } [Fact] - public void Test_Run_Cancelled() + public async Task Test_Run_Cancelled() { var sc = new ServiceCollection(); var runner = new ParallelRunner(1, sc.BuildServiceProvider()); @@ -159,8 +166,7 @@ public void Test_Run_Cancelled() runner.AddStep(t1.Object); runner.AddStep(t2.Object); - runner.Run(cts.Token); - runner.Wait(Timeout.InfiniteTimeSpan); + await runner.RunAsync(cts.Token); Assert.True(hasError); Assert.True(ran); diff --git a/src/CommonUtilities.SimplePipeline/test/Runners/TaskRunnerTest.cs b/src/CommonUtilities.SimplePipeline/test/Runners/TaskRunnerTest.cs index 492f8c8..99f8628 100644 --- a/src/CommonUtilities.SimplePipeline/test/Runners/TaskRunnerTest.cs +++ b/src/CommonUtilities.SimplePipeline/test/Runners/TaskRunnerTest.cs @@ -1,5 +1,6 @@ using System; using System.Threading; +using System.Threading.Tasks; using AnakinRaW.CommonUtilities.SimplePipeline.Runners; using Microsoft.Extensions.DependencyInjection; using Moq; @@ -10,19 +11,18 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline.Test.Runners; public class TaskRunnerTest { [Fact] - public void Test_Run_Empty() + public async Task Test_Run_Empty() { var sc = new ServiceCollection(); var runner = new StepRunner(sc.BuildServiceProvider()); - runner.Run(default); + await runner.RunAsync(default); Assert.Empty(runner.Steps); - Assert.Empty(runner); } [Fact] - public void Test_Run_Cancelled() + public async Task Test_Run_Cancelled() { var sc = new ServiceCollection(); var runner = new StepRunner(sc.BuildServiceProvider()); @@ -44,16 +44,15 @@ public void Test_Run_Cancelled() }); runner.AddStep(step.Object); - runner.Run(cts.Token); + await runner.RunAsync(cts.Token); Assert.True(hasError); Assert.False(ran); Assert.Single(runner.Steps); - Assert.Single(runner); } [Fact] - public void Test_Run_WithError() + public async Task Test_Run_WithError() { var sc = new ServiceCollection(); var runner = new StepRunner(sc.BuildServiceProvider()); @@ -72,7 +71,7 @@ public void Test_Run_WithError() }).Throws(); runner.AddStep(step.Object); - runner.Run(default); + await runner.RunAsync(default); Assert.True(hasError); Assert.True(ran); diff --git a/src/CommonUtilities.SimplePipeline/test/SequentialPipelineTests.cs b/src/CommonUtilities.SimplePipeline/test/SequentialPipelineTests.cs index f7c069c..ca2379a 100644 --- a/src/CommonUtilities.SimplePipeline/test/SequentialPipelineTests.cs +++ b/src/CommonUtilities.SimplePipeline/test/SequentialPipelineTests.cs @@ -1,6 +1,8 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Text; using System.Threading; +using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Moq; using Moq.Protected; @@ -11,7 +13,7 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline.Test; public class SequentialPipelineTests { [Fact] - public void Test_Run_SequentialPipeline_RunsInSequence() + public async Task Test_Run_SequentialPipeline_RunsInSequence() { var sc = new ServiceCollection(); @@ -36,15 +38,53 @@ public void Test_Run_SequentialPipeline_RunsInSequence() CallBase = true }; - pipelineMock.Protected().Setup>("BuildStepsOrdered").Returns(new List + pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List { s1.Object, s2.Object - }); + })); var pipeline = pipelineMock.Object; - pipeline.Run(); + await pipeline.RunAsync(); Assert.Equal("ab", sb.ToString()); } + + [Theory] + [InlineData(true, "")] + [InlineData(false, "b")] + public async Task Test_Run_WithError(bool failFast, string result) + { + var sc = new ServiceCollection(); + + var sp = sc.BuildServiceProvider(); + + var sb = new StringBuilder(); + + var s1 = new Mock(); + s1.SetupGet(s => s.Error).Returns(new Exception()); + s1.Setup(i => i.Run(It.IsAny())).Throws(); + + var s2 = new Mock(); + s2.Setup(i => i.Run(It.IsAny())).Callback(() => + { + sb.Append('b'); + }); + + var pipelineMock = new Mock(sp, failFast) + { + CallBase = true + }; + + pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List + { + s1.Object, + s2.Object + })); + + var pipeline = pipelineMock.Object; + + await Assert.ThrowsAsync(async () => await pipeline.RunAsync()); + Assert.Equal(result, sb.ToString()); + } } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/test/SimplePipelineTests.cs b/src/CommonUtilities.SimplePipeline/test/SimplePipelineTests.cs index aa1f9c0..fc27911 100644 --- a/src/CommonUtilities.SimplePipeline/test/SimplePipelineTests.cs +++ b/src/CommonUtilities.SimplePipeline/test/SimplePipelineTests.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Threading; +using System.Threading.Tasks; using AnakinRaW.CommonUtilities.SimplePipeline.Runners; using Microsoft.Extensions.DependencyInjection; using Moq; @@ -9,10 +10,171 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline.Test; +public class ParallelProducerConsumerPipelineTest +{ + [Fact] + public async Task Test_Prepare() + { + var sp = new Mock(); + var pipeline = new Mock(sp.Object, 4, true) + { + CallBase = true + }; + + await pipeline.Object.PrepareAsync(); + await pipeline.Object.PrepareAsync(); + + pipeline.Protected().Verify("BuildSteps", Times.Once(), ItExpr.IsAny()); + } + + [Fact] + public async Task Test_Run_RunsNormally() + { + var sp = new Mock(); + var pipeline = new Mock(sp.Object, 4, true) + { + CallBase = true + }; + + var stepRun = false; + var s = new Mock(); + s.Setup(i => i.Run(It.IsAny())).Callback(() => stepRun = true); + + pipeline.Protected().Setup("BuildSteps", ItExpr.IsAny()).Callback((IStepQueue q) => + { + q.AddStep(s.Object); + }); + + await pipeline.Object.RunAsync(); + + Assert.True(stepRun); + pipeline.Protected().Verify("BuildSteps", Times.Once(), ItExpr.IsAny()); + } + + [Fact] + public async Task Test_Run_DelayedAdd() + { + var sp = new Mock(); + var pipeline = new Mock(sp.Object, 4, true) + { + CallBase = true + }; + + var mre = new ManualResetEventSlim(); + + var s1 = new Mock(); + s1.Setup(i => i.Run(It.IsAny())).Callback(() => + { + { + Task.Delay(3000).Wait(); + mre.Set(); + } + }); + + + var s2Run = false; + var s2 = new Mock(); + s2.Setup(i => i.Run(It.IsAny())).Callback(() => + { + { + s2Run = true; + } + }); + + pipeline.Protected().Setup("BuildSteps", ItExpr.IsAny()).Callback((IStepQueue q) => + { + q.AddStep(s1.Object); + mre.Wait(); + q.AddStep(s2.Object); + + }); + + await pipeline.Object.RunAsync(); + + Assert.True(s2Run); + pipeline.Protected().Verify("BuildSteps", Times.Once(), ItExpr.IsAny()); + } + + [Fact] + public async Task Test_Run_DelayedAdd_PrepareFails() + { + var sp = new Mock(); + var pipeline = new Mock(sp.Object, 4, true) + { + CallBase = true + }; + + var s1 = new Mock(); + s1.Setup(i => i.Run(It.IsAny())); + + var s2 = new Mock(); + s2.Setup(i => i.Run(It.IsAny())); + + pipeline.Protected().Setup("BuildSteps", ItExpr.IsAny()) + .Callback((IStepQueue q) => + { + q.AddStep(s1.Object); + q.AddStep(s2.Object); + }) + .Throws(); + + await Assert.ThrowsAsync(async () => await pipeline.Object.RunAsync()); + + pipeline.Protected().Verify("BuildSteps", Times.Once(), ItExpr.IsAny()); + } + + + [Fact] + public async Task Test_Run_PrepareCancelled() + { + var sp = new Mock(); + var pipeline = new Mock(sp.Object, 4, true) + { + CallBase = true + }; + + var cts = new CancellationTokenSource(); + var mre = new ManualResetEventSlim(); + + var s1 = new Mock(); + s1.Setup(i => i.Run(It.IsAny())).Callback(() => + { + { + Task.Delay(3000).Wait(); + mre.Set(); + } + }); + + + var s2Run = false; + var s2 = new Mock(); + s2.Setup(i => i.Run(It.IsAny())).Callback(() => + { + { + s2Run = true; + } + }); + + pipeline.Protected().Setup("BuildSteps", ItExpr.IsAny()).Callback((IStepQueue q) => + { + q.AddStep(s1.Object); + mre.Wait(); + cts.Cancel(); + q.AddStep(s2.Object); + + }); + + await pipeline.Object.RunAsync(cts.Token); + + Assert.True(s2Run); + pipeline.Protected().Verify("BuildSteps", Times.Once(), ItExpr.IsAny()); + } +} + public class SimplePipelineTests { [Fact] - public void Test_Run_SimplePipelineRunsNormally() + public async Task Test_Run_SimplePipelineRunsNormally() { var sc = new ServiceCollection(); @@ -23,19 +185,19 @@ public void Test_Run_SimplePipelineRunsNormally() CallBase = true }; pipelineMock.Protected().Setup("CreateRunner").Returns(new StepRunner(sp)); - pipelineMock.Protected().Setup>("BuildStepsOrdered").Returns(new List + pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List { new TestStep(1, "123") - }); + })); var pipeline = pipelineMock.Object; var cancellationTokenSource = new CancellationTokenSource(); - pipeline.Run(cancellationTokenSource.Token); + await pipeline.RunAsync(cancellationTokenSource.Token); } [Fact] - public void Test_Run_SimplePipelineFails_ThrowsStepFailureException() + public async Task Test_Run_SimplePipelineFails_ThrowsStepFailureException() { var sc = new ServiceCollection(); @@ -51,19 +213,19 @@ public void Test_Run_SimplePipelineFails_ThrowsStepFailureException() }; pipelineMock.Protected().Setup("CreateRunner").Returns(new StepRunner(sp)); - pipelineMock.Protected().Setup>("BuildStepsOrdered").Returns(new List + pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List { - s.Object - }); + s.Object, + })); var pipeline = pipelineMock.Object; var cancellationTokenSource = new CancellationTokenSource(); - Assert.Throws(() => pipeline.Run(cancellationTokenSource.Token)); + await Assert.ThrowsAsync(async () => await pipeline.RunAsync(cancellationTokenSource.Token)); } [Fact] - public void Test_Run_SimplePipelineFailsSlow_ThrowsStepFailureException() + public async Task Test_Run_SimplePipelineFailsSlow_ThrowsStepFailureException() { var sc = new ServiceCollection(); @@ -83,20 +245,20 @@ public void Test_Run_SimplePipelineFailsSlow_ThrowsStepFailureException() }; pipelineMock.Protected().Setup("CreateRunner").Returns(new StepRunner(sp)); - pipelineMock.Protected().Setup>("BuildStepsOrdered").Returns(new List + pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List { s1.Object, s2.Object - }); + })); var pipeline = pipelineMock.Object; - Assert.Throws(() => pipeline.Run()); + await Assert.ThrowsAsync(async () => await pipeline.RunAsync()); Assert.True(flag); } [Fact] - public void Test_Run_SimplePipelineFailsFast_ThrowsStepFailureException() + public async Task Test_Run_SimplePipelineFailsFast_ThrowsStepFailureException() { var sc = new ServiceCollection(); @@ -116,15 +278,15 @@ public void Test_Run_SimplePipelineFailsFast_ThrowsStepFailureException() }; pipelineMock.Protected().Setup("CreateRunner").Returns(new StepRunner(sp)); - pipelineMock.Protected().Setup>("BuildStepsOrdered").Returns(new List + pipelineMock.Protected().Setup>>("BuildSteps").Returns(Task.FromResult>(new List { s1.Object, s2.Object - }); + })); var pipeline = pipelineMock.Object; - Assert.Throws(() => pipeline.Run()); + await Assert.ThrowsAsync(async () => await pipeline.RunAsync()); Assert.False(flag); } } \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/test/Steps/WaitStepTest.cs b/src/CommonUtilities.SimplePipeline/test/Steps/WaitStepTest.cs index c3a20a7..93d92d7 100644 --- a/src/CommonUtilities.SimplePipeline/test/Steps/WaitStepTest.cs +++ b/src/CommonUtilities.SimplePipeline/test/Steps/WaitStepTest.cs @@ -10,7 +10,8 @@ public class WaitStepTest [Fact] public void Test_Wait() { - var runner = new Mock(); + var runner = new Mock(); + var sc = new ServiceCollection(); var step = new WaitStep(runner.Object, sc.BuildServiceProvider()); diff --git a/src/CommonUtilities.TestingUtilities/CommonUtilities.TestingUtilities.csproj b/src/CommonUtilities.TestingUtilities/CommonUtilities.TestingUtilities.csproj index ec3057b..c065950 100644 --- a/src/CommonUtilities.TestingUtilities/CommonUtilities.TestingUtilities.csproj +++ b/src/CommonUtilities.TestingUtilities/CommonUtilities.TestingUtilities.csproj @@ -22,8 +22,8 @@ runtime; build; native; contentfiles; analyzers; buildtransitive - - + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/CommonUtilities/src/AwaitExtensions.cs b/src/CommonUtilities/src/AwaitExtensions.cs index 9f3ff9b..83bda78 100644 --- a/src/CommonUtilities/src/AwaitExtensions.cs +++ b/src/CommonUtilities/src/AwaitExtensions.cs @@ -1,5 +1,4 @@ -#if !NET5_0_OR_GREATER -using System; +using System; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -22,7 +21,7 @@ public static class AwaitExtensions /// /// A task whose result is the of the . // From https://github.com/dotnet/runtime - public static async Task WaitForExitAsync(this Process process, CancellationToken cancellationToken = default) + public static async Task WaitForExitAsyncEx(this Process process, CancellationToken cancellationToken = default) { if (process == null) throw new ArgumentNullException(nameof(process)); @@ -50,14 +49,11 @@ public static async Task WaitForExitAsync(this Process process, Cancellatio return process.ExitCode; using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken))) - { return await tcs.Task.ConfigureAwait(false); - } } finally { process.Exited -= Handler!; } } -} -#endif \ No newline at end of file +} \ No newline at end of file diff --git a/src/CommonUtilities/test/AwaitExtensionsTests.cs b/src/CommonUtilities/test/AwaitExtensionsTests.cs index 88e0adb..1cfc1c4 100644 --- a/src/CommonUtilities/test/AwaitExtensionsTests.cs +++ b/src/CommonUtilities/test/AwaitExtensionsTests.cs @@ -1,6 +1,4 @@ -#if !NET5_0_OR_GREATER - -using System; +using System; using System.Diagnostics; using System.Runtime.InteropServices; using System.Threading; @@ -16,7 +14,7 @@ public class AwaitExtensionsTests [Fact] public async Task Test_WaitForExitAsync_NullArgument() { - await Assert.ThrowsAsync(() => AwaitExtensions.WaitForExitAsync(null!)); + await Assert.ThrowsAsync(() => AwaitExtensions.WaitForExitAsyncEx(null!)); } [PlatformSpecificFact(TestPlatformIdentifier.Windows)] @@ -28,7 +26,7 @@ public async Task Test_WaitForExitAsync_ExitCode_Windows() CreateNoWindow = true, WindowStyle = ProcessWindowStyle.Hidden, })!; - var exitCode = await p.WaitForExitAsync(); + var exitCode = await p.WaitForExitAsyncEx(); Assert.Equal(55, exitCode); } @@ -42,7 +40,7 @@ public void Test_WaitForExitAsync_AlreadyExited_Windows() WindowStyle = ProcessWindowStyle.Hidden, })!; p.WaitForExit(); - var t = p.WaitForExitAsync(); + var t = p.WaitForExitAsyncEx(); Assert.True(t.IsCompleted); Assert.Equal(55, t.Result); } @@ -54,7 +52,7 @@ public async Task Test_WaitForExitAsync_UnstartedProcess() var process = new Process(); process.StartInfo.FileName = processName; process.StartInfo.CreateNoWindow = true; - await Assert.ThrowsAsync(() => process.WaitForExitAsync()); + await Assert.ThrowsAsync(() => process.WaitForExitAsyncEx()); } [Fact] @@ -65,7 +63,7 @@ public async Task Test_WaitForExitAsync_DoesNotCompleteTillKilled() var p = Process.Start(new ProcessStartInfo(processName) { CreateNoWindow = true, WindowStyle = ProcessWindowStyle.Hidden })!; try { - var t = p.WaitForExitAsync(); + var t = p.WaitForExitAsyncEx(); Assert.False(t.IsCompleted); p.Kill(); var exitCode = await t; @@ -94,7 +92,7 @@ public async Task Test_WaitForExitAsync_Canceled() try { var cts = new CancellationTokenSource(); - var t = p.WaitForExitAsync(cts.Token); + var t = p.WaitForExitAsyncEx(cts.Token); Assert.False(t.IsCompleted); cts.Cancel(); await Assert.ThrowsAsync(() => t); @@ -104,5 +102,4 @@ public async Task Test_WaitForExitAsync_Canceled() p.Kill(); } } -} -#endif \ No newline at end of file +} \ No newline at end of file diff --git a/src/CommonUtilities/test/CommonUtilities.Test.csproj b/src/CommonUtilities/test/CommonUtilities.Test.csproj index 6dd2867..e66bc5f 100644 --- a/src/CommonUtilities/test/CommonUtilities.Test.csproj +++ b/src/CommonUtilities/test/CommonUtilities.Test.csproj @@ -22,10 +22,10 @@ - - - - + + + + runtime; build; native; contentfiles; analyzers; buildtransitive all From c07aed9591c2eec259216c500cdc6eebf0a76dc5 Mon Sep 17 00:00:00 2001 From: AnakinRaW Date: Thu, 18 Apr 2024 10:43:35 +0200 Subject: [PATCH 6/9] to async pipeline --- .../src/IStepQueue.cs | 20 ++ .../src/ISynchronizedRunner.cs | 23 ++ .../ParallelProducerConsumerPipeline.cs | 130 ++++++++++ .../Runners/ParallelProducerConsumerRunner.cs | 159 ++++++++++++ .../ParallelProducerConsumerRunnerTest.cs | 244 ++++++++++++++++++ src/CommonUtilities/src/TaskExtensions.cs | 66 +++++ .../test/TaskExtensionsTests.cs | 91 +++++++ 7 files changed, 733 insertions(+) create mode 100644 src/CommonUtilities.SimplePipeline/src/IStepQueue.cs create mode 100644 src/CommonUtilities.SimplePipeline/src/ISynchronizedRunner.cs create mode 100644 src/CommonUtilities.SimplePipeline/src/Pipelines/ParallelProducerConsumerPipeline.cs create mode 100644 src/CommonUtilities.SimplePipeline/src/Runners/ParallelProducerConsumerRunner.cs create mode 100644 src/CommonUtilities.SimplePipeline/test/Runners/ParallelProducerConsumerRunnerTest.cs create mode 100644 src/CommonUtilities/src/TaskExtensions.cs create mode 100644 src/CommonUtilities/test/TaskExtensionsTests.cs diff --git a/src/CommonUtilities.SimplePipeline/src/IStepQueue.cs b/src/CommonUtilities.SimplePipeline/src/IStepQueue.cs new file mode 100644 index 0000000..68fc484 --- /dev/null +++ b/src/CommonUtilities.SimplePipeline/src/IStepQueue.cs @@ -0,0 +1,20 @@ +using System.Collections.Generic; + +namespace AnakinRaW.CommonUtilities.SimplePipeline; + +/// +/// A queue of . +/// +public interface IStepQueue +{ + /// + /// List of only those steps which are scheduled for execution of an . + /// + public IReadOnlyList Steps { get; } + + /// + /// Adds an to the . + /// + /// The step to app. + void AddStep(IStep activity); +} \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/ISynchronizedRunner.cs b/src/CommonUtilities.SimplePipeline/src/ISynchronizedRunner.cs new file mode 100644 index 0000000..349a431 --- /dev/null +++ b/src/CommonUtilities.SimplePipeline/src/ISynchronizedRunner.cs @@ -0,0 +1,23 @@ +using System; + +namespace AnakinRaW.CommonUtilities.SimplePipeline; + +/// +/// Specialized which allows for synchronous waiting. +/// +public interface ISynchronizedRunner : IRunner +{ + /// + /// Synchronously waits for this runner for all of its steps to be finished. + /// + /// If any of the steps failed with an exception. + void Wait(); + + /// + /// Synchronously waits for this runner for all of its steps to be finished. + /// + /// The time duration to wait. + /// If expired. + /// If any of the steps failed with an exception. + void Wait(TimeSpan waitDuration); +} \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/Pipelines/ParallelProducerConsumerPipeline.cs b/src/CommonUtilities.SimplePipeline/src/Pipelines/ParallelProducerConsumerPipeline.cs new file mode 100644 index 0000000..db8d300 --- /dev/null +++ b/src/CommonUtilities.SimplePipeline/src/Pipelines/ParallelProducerConsumerPipeline.cs @@ -0,0 +1,130 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using AnakinRaW.CommonUtilities.SimplePipeline.Runners; + +namespace AnakinRaW.CommonUtilities.SimplePipeline; + +/// +/// A simple pipeline that runs all steps on the thread pool in parallel. Allows to run the pipeline even if preparation is not completed. +/// +/// +/// Useful, if preparation is work intensive. +/// +public abstract class ParallelProducerConsumerPipeline : DisposableObject, IPipeline +{ + private readonly bool _failFast; + private CancellationTokenSource? _linkedCancellationTokenSource; + private readonly ParallelProducerConsumerRunner _runner; + + private bool? _prepareSuccessful; + + /// + /// Gets or sets a value indicating whether the pipeline has encountered a failure. + /// + protected bool PipelineFailed { get; set; } + + + /// + /// Initializes a new instance of the class. + /// + /// The service provider for dependency injection within the pipeline. + /// The number of worker threads to be used for parallel execution. + /// A value indicating whether the pipeline should fail fast. + protected ParallelProducerConsumerPipeline(IServiceProvider serviceProvider, int workerCount = 4, bool failFast = true) + { + _failFast = failFast; + _runner = new ParallelProducerConsumerRunner(workerCount, serviceProvider); + } + + /// + public async Task PrepareAsync() + { + ThrowIfDisposed(); + if (_prepareSuccessful.HasValue) + return _prepareSuccessful.Value; + + await BuildSteps(_runner).ConfigureAwait(false); + + _prepareSuccessful = true; + return _prepareSuccessful.Value; + } + + /// + public async Task RunAsync(CancellationToken token = default) + { + ThrowIfDisposed(); + token.ThrowIfCancellationRequested(); + + if (_prepareSuccessful is false) + return; + + try + { + _linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); + + if (_prepareSuccessful is null) + { + Task.Run(async () => + { + try + { + var result = await PrepareAsync().ConfigureAwait(false); + if (!result) + _linkedCancellationTokenSource?.Cancel(); + } + finally + { + _runner.Finish(); + } + }, token).Forget(); + } + + + + _runner.Error += OnError; + await _runner.RunAsync(_linkedCancellationTokenSource.Token).ConfigureAwait(false); + } + finally + { + _runner.Error -= OnError; + if (_linkedCancellationTokenSource is not null) + { + _linkedCancellationTokenSource.Dispose(); + _linkedCancellationTokenSource = null; + } + } + + if (!PipelineFailed && _prepareSuccessful.HasValue && _prepareSuccessful.Value) + return; + + if (_prepareSuccessful is not true) + throw new InvalidOperationException("Preparation of the pipeline failed."); + + var failedBuildSteps = _runner.Steps + .Where(p => p.Error != null && !p.Error.IsExceptionType()) + .ToList(); + + if (failedBuildSteps.Any()) + throw new StepFailureException(failedBuildSteps); + } + + /// + /// Builds the steps in the order they should be executed within the pipeline. + /// + /// A list of steps in the order they should be executed. + protected abstract Task BuildSteps(IStepQueue queue); + + /// + /// Called when an error occurs within a step. + /// + /// The sender of the event. + /// The event arguments. + protected virtual void OnError(object sender, StepErrorEventArgs e) + { + PipelineFailed = true; + if (_failFast || e.Cancel) + _linkedCancellationTokenSource?.Cancel(); + } +} \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/src/Runners/ParallelProducerConsumerRunner.cs b/src/CommonUtilities.SimplePipeline/src/Runners/ParallelProducerConsumerRunner.cs new file mode 100644 index 0000000..ac21b86 --- /dev/null +++ b/src/CommonUtilities.SimplePipeline/src/Runners/ParallelProducerConsumerRunner.cs @@ -0,0 +1,159 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace AnakinRaW.CommonUtilities.SimplePipeline.Runners; + +/// +/// Runner engine, which executes all queued _steps parallel. Steps may be queued while step execution has been started. +/// The execution can finish only if was called explicitly. +/// +public sealed class ParallelProducerConsumerRunner : DisposableObject, ISynchronizedRunner +{ + /// + public event EventHandler? Error; + + private readonly ConcurrentBag _exceptions; + private readonly ConcurrentBag _steps; + private readonly ILogger? _logger; + private readonly int _workerCount; + private readonly Task[] _runnerTasks; + private CancellationToken _cancel; + + /// + public IReadOnlyList Steps => _steps.ToArray(); + + private BlockingCollection StepQueue { get; } + + /// + /// Aggregates all step exceptions, if any happened. + /// + public AggregateException? Exception => _exceptions.Count > 0 ? new AggregateException(_exceptions) : null; + + internal bool IsCancelled { get; private set; } + + /// + /// Initializes a new instance. + /// + /// The number of parallel workers. + /// The service provider. + /// If the number of workers is below 1. + public ParallelProducerConsumerRunner(int workerCount, IServiceProvider serviceProvider) + { + if (workerCount is < 1 or >= 64) + throw new ArgumentException("invalid parallel worker count"); + _workerCount = workerCount; + _runnerTasks = new Task[_workerCount]; + _steps = new ConcurrentBag(); + StepQueue = new BlockingCollection(); + _logger = serviceProvider.GetService()?.CreateLogger(GetType()); + _exceptions = new ConcurrentBag(); + } + + + /// + public Task RunAsync(CancellationToken token) + { + token.ThrowIfCancellationRequested(); + _cancel = token; + for (var index = 0; index < _workerCount; ++index) + _runnerTasks[index] = Task.Factory.StartNew(RunThreaded, CancellationToken.None); + return Task.WhenAll(_runnerTasks).WaitAsync(token); + } + + /// + public void Wait() + { + Wait(Timeout.InfiniteTimeSpan); + var exception = Exception; + if (exception != null) + throw exception; + } + + /// + public void Wait(TimeSpan timeout) + { + if (!Task.WaitAll(_runnerTasks, timeout)) + throw new TimeoutException(); + } + + /// + /// Signals, this instance does not expect any more steps. + /// + public void Finish() + { + StepQueue.CompleteAdding(); + } + + /// + public void AddStep(IStep step) + { + if (step is null) + throw new ArgumentNullException(nameof(step)); + StepQueue.Add(step, CancellationToken.None); + } + + /// + protected override void DisposeManagedResources() + { + base.DisposeManagedResources(); + StepQueue.Dispose(); + foreach (var step in Steps) + step.Dispose(); + } + + private void RunThreaded() + { + var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_cancel); + var canceled = false; + foreach (var step in StepQueue.GetConsumingEnumerable()) + { + ThrowIfDisposed(); + try + { + _cancel.ThrowIfCancellationRequested(); + _steps.Add(step); + step.Run(_cancel); + } + catch (StopRunnerException) + { + _logger?.LogTrace("Stop subsequent steps"); + StepQueue.CompleteAdding(); + break; + } + catch (Exception ex) + { + _exceptions.Add(ex); + if (!canceled) + { + if (ex.IsExceptionType()) + _logger?.LogTrace($"Activity threw exception {ex.GetType()}: {ex.Message}" + Environment.NewLine + $"{ex.StackTrace}"); + else + _logger?.LogTrace(ex, $"Activity threw exception {ex.GetType()}: {ex.Message}"); + } + var e = new StepErrorEventArgs(step) + { + Cancel = _cancel.IsCancellationRequested || IsCancelled || ex.IsExceptionType() + }; + OnError(e); + if (e.Cancel) + { + canceled = true; + linkedTokenSource.Cancel(); + } + } + } + } + + private void OnError(StepErrorEventArgs e) + { + Error?.Invoke(this, e); + if (!e.Cancel) + return; + IsCancelled |= e.Cancel; + } +} \ No newline at end of file diff --git a/src/CommonUtilities.SimplePipeline/test/Runners/ParallelProducerConsumerRunnerTest.cs b/src/CommonUtilities.SimplePipeline/test/Runners/ParallelProducerConsumerRunnerTest.cs new file mode 100644 index 0000000..1b3e4d5 --- /dev/null +++ b/src/CommonUtilities.SimplePipeline/test/Runners/ParallelProducerConsumerRunnerTest.cs @@ -0,0 +1,244 @@ +using System; +using System.Threading.Tasks; +using AnakinRaW.CommonUtilities.SimplePipeline.Runners; +using Microsoft.Extensions.DependencyInjection; +using Moq; +using Xunit; + +namespace AnakinRaW.CommonUtilities.SimplePipeline.Test.Runners; + +public class ParallelProducerConsumerRunnerTest +{ + [Fact] + public void Test_Run_WaitNotFinished() + { + var sc = new ServiceCollection(); + var runner = new ParallelProducerConsumerRunner(2, sc.BuildServiceProvider()); + + var s1 = new Mock(); + var s2 = new Mock(); + + runner.AddStep(s1.Object); + runner.AddStep(s2.Object); + + var ran1 = false; + s1.Setup(t => t.Run(default)).Callback(() => + { + ran1 = true; + }); + var ran2 = false; + s2.Setup(t => t.Run(default)).Callback(() => + { + ran2 = true; + }); + + + _ = runner.RunAsync(default); + + Assert.Throws(() => runner.Wait(TimeSpan.FromSeconds(2))); + + Assert.True(ran1); + Assert.True(ran2); + } + + [Fact] + public async Task Test_Run_AwaitDoesNotThrow() + { + var sc = new ServiceCollection(); + var runner = new ParallelProducerConsumerRunner(2, sc.BuildServiceProvider()); + + var s1 = new Mock(); + var s2 = new Mock(); + + runner.AddStep(s1.Object); + runner.AddStep(s2.Object); + + runner.Finish(); + + s1.Setup(t => t.Run(default)).Throws(); + + var ran2 = false; + s2.Setup(t => t.Run(default)).Callback(() => + { + ran2 = true; + }); + + + await runner.RunAsync(default); + Assert.NotNull(runner.Exception); + Assert.True(ran2); + } + + [Fact] + public void Test_Run_SyncWaitDoesThrow() + { + var sc = new ServiceCollection(); + var runner = new ParallelProducerConsumerRunner(2, sc.BuildServiceProvider()); + + var s1 = new Mock(); + var s2 = new Mock(); + + runner.AddStep(s1.Object); + runner.AddStep(s2.Object); + + runner.Finish(); + + s1.Setup(t => t.Run(default)).Throws(); + + var ran2 = false; + s2.Setup(t => t.Run(default)).Callback(() => + { + ran2 = true; + }); + + + runner.RunAsync(default); + + Assert.Throws(() => runner.Wait()); + + Assert.NotNull(runner.Exception); + Assert.True(ran2); + } + + [Fact] + public void Test_Run_AddDelayed() + { + var sc = new ServiceCollection(); + var runner = new ParallelProducerConsumerRunner(2, sc.BuildServiceProvider()); + + var s1 = new Mock(); + var s2 = new Mock(); + var s3 = new Mock(); + + runner.AddStep(s1.Object); + runner.AddStep(s2.Object); + + var ran1 = false; + s1.Setup(t => t.Run(default)).Callback(() => + { + ran1 = true; + }); + var ran2 = false; + s2.Setup(t => t.Run(default)).Callback(() => + { + ran2 = true; + }); + + var ran3 = false; + s3.Setup(t => t.Run(default)).Callback(() => + { + ran3 = true; + }); + + + _ = runner.RunAsync(default); + + Task.Run(() => + { + runner.AddStep(s3.Object); + Task.Delay(1000); + runner.Finish(); + }); + + runner.Wait(); + + Assert.True(ran1); + Assert.True(ran2); + Assert.True(ran3); + } + + [Fact] + public async Task Test_Run_AddDelayed_Await() + { + var sc = new ServiceCollection(); + var runner = new ParallelProducerConsumerRunner(2, sc.BuildServiceProvider()); + + var s1 = new Mock(); + var s2 = new Mock(); + var s3 = new Mock(); + + runner.AddStep(s1.Object); + runner.AddStep(s2.Object); + + var ran1 = false; + s1.Setup(t => t.Run(default)).Callback(() => + { + ran1 = true; + }); + var ran2 = false; + s2.Setup(t => t.Run(default)).Callback(() => + { + ran2 = true; + }); + + var ran3 = false; + s3.Setup(t => t.Run(default)).Callback(() => + { + ran3 = true; + }); + + + var runTask = runner.RunAsync(default); + + Task.Run(() => + { + runner.AddStep(s3.Object); + Task.Delay(1000); + runner.Finish(); + }); + + await runTask; + + Assert.True(ran1); + Assert.True(ran2); + Assert.True(ran3); + } + + [Fact] + public async Task Test_Run_AddDelayed_AwaitAndWait() + { + var sc = new ServiceCollection(); + var runner = new ParallelProducerConsumerRunner(2, sc.BuildServiceProvider()); + + var s1 = new Mock(); + var s2 = new Mock(); + var s3 = new Mock(); + + runner.AddStep(s1.Object); + runner.AddStep(s2.Object); + + var ran1 = false; + s1.Setup(t => t.Run(default)).Callback(() => + { + ran1 = true; + }); + var ran2 = false; + s2.Setup(t => t.Run(default)).Callback(() => + { + ran2 = true; + }); + + var ran3 = false; + s3.Setup(t => t.Run(default)).Callback(() => + { + ran3 = true; + }); + + + var runTask = runner.RunAsync(default); + + Task.Run(() => + { + runner.AddStep(s3.Object); + Task.Delay(1000); + runner.Finish(); + }); + + await runTask; + runner.Wait(); + + Assert.True(ran1); + Assert.True(ran2); + Assert.True(ran3); + } +} \ No newline at end of file diff --git a/src/CommonUtilities/src/TaskExtensions.cs b/src/CommonUtilities/src/TaskExtensions.cs new file mode 100644 index 0000000..823cf54 --- /dev/null +++ b/src/CommonUtilities/src/TaskExtensions.cs @@ -0,0 +1,66 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace AnakinRaW.CommonUtilities; + +/// +/// Provides extension methods for the and types. +/// +public static class TaskExtensions +{ + /// + /// Consumes a task and doesn't do anything with it. Useful for fire-and-forget calls to async methods within async methods. + /// + /// The task whose result is to be ignored. + public static void Forget(this Task? task) + { + } + + /// + /// Asynchronously waits for the task to complete, or for the cancellation token to be canceled. + /// + /// The task to wait for. May not be null. + /// The cancellation token that cancels the wait. + public static Task WaitAsync(this Task task, CancellationToken cancellationToken) + { + if (task == null) + throw new ArgumentNullException(nameof(task)); + + if (!cancellationToken.CanBeCanceled) + return task; + if (cancellationToken.IsCancellationRequested) + return Task.FromCanceled(cancellationToken); + return DoWaitAsync(task, cancellationToken); + } + + private static async Task DoWaitAsync(Task task, CancellationToken cancellationToken) + { + using var cancelTaskSource = new CancellationTokenTaskSource(cancellationToken); + await (await Task.WhenAny(task, cancelTaskSource.Task).ConfigureAwait(false)).ConfigureAwait(false); + } + + internal sealed class CancellationTokenTaskSource : IDisposable + { + private readonly IDisposable? _registration; + + public Task Task { get; } + + public CancellationTokenTaskSource(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + Task = System.Threading.Tasks.Task.FromCanceled(cancellationToken); + return; + } + var tcs = new TaskCompletionSource(); + _registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken), useSynchronizationContext: false); + Task = tcs.Task; + } + + public void Dispose() + { + _registration?.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/CommonUtilities/test/TaskExtensionsTests.cs b/src/CommonUtilities/test/TaskExtensionsTests.cs new file mode 100644 index 0000000..28077e1 --- /dev/null +++ b/src/CommonUtilities/test/TaskExtensionsTests.cs @@ -0,0 +1,91 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace AnakinRaW.CommonUtilities.Test; + +public class TaskExtensionsTests +{ + [Fact] + public void WaitAsyncTResult_TokenThatCannotCancel_ReturnsSourceTask() + { + var tcs = new TaskCompletionSource(); + var task = tcs.Task.WaitAsync(CancellationToken.None); + + Assert.Same(tcs.Task, task); + } + + [Fact] + public void WaitAsyncTResult_AlreadyCanceledToken_ReturnsSynchronouslyCanceledTask() + { + var tcs = new TaskCompletionSource(); + var token = new CancellationToken(true); + var task = tcs.Task.WaitAsync(token); + + Assert.True(task.IsCanceled); + Assert.Equal(token, GetCancellationTokenFromTask(task)); + } + + [Fact] + public async Task WaitAsyncTResult_TokenCanceled_CancelsTask() + { + var tcs = new TaskCompletionSource(); + var cts = new CancellationTokenSource(); + var task = tcs.Task.WaitAsync(cts.Token); + Assert.False(task.IsCompleted); + + cts.Cancel(); + + await Assert.ThrowsAnyAsync(async () => await task); + Assert.Equal(cts.Token, GetCancellationTokenFromTask(task)); + } + + [Fact] + public void WaitAsync_TokenThatCannotCancel_ReturnsSourceTask() + { + var tcs = new TaskCompletionSource(); + var task = ((Task)tcs.Task).WaitAsync(CancellationToken.None); + + Assert.Same(tcs.Task, task); + } + + [Fact] + public void WaitAsync_AlreadyCanceledToken_ReturnsSynchronouslyCanceledTask() + { + var tcs = new TaskCompletionSource(); + var token = new CancellationToken(true); + var task = ((Task)tcs.Task).WaitAsync(token); + + Assert.True(task.IsCanceled); + Assert.Equal(token, GetCancellationTokenFromTask(task)); + } + + [Fact] + public async Task WaitAsync_TokenCanceled_CancelsTask() + { + var tcs = new TaskCompletionSource(); + var cts = new CancellationTokenSource(); + var task = ((Task)tcs.Task).WaitAsync(cts.Token); + Assert.False(task.IsCompleted); + + cts.Cancel(); + + await Assert.ThrowsAnyAsync(async () => await task); + Assert.Equal(cts.Token, GetCancellationTokenFromTask(task)); + } + + private static CancellationToken GetCancellationTokenFromTask(Task task) + { + try + { + task.Wait(); + } + catch (AggregateException ex) + { + if (ex.InnerException is OperationCanceledException oce) + return oce.CancellationToken; + } + return CancellationToken.None; + } +} \ No newline at end of file From e9724bc7d6e3b0beb932e785030097037cb7f86c Mon Sep 17 00:00:00 2001 From: AnakinRaW Date: Fri, 19 Apr 2024 10:44:58 +0200 Subject: [PATCH 7/9] make beta --- version.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.json b/version.json index ad503a5..53346e6 100644 --- a/version.json +++ b/version.json @@ -1,6 +1,6 @@ { "$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/master/src/NerdBank.GitVersioning/version.schema.json", - "version": "11.1", + "version": "12.0-beta", "assemblyVersion": { "precision": "major" }, From ef2cc14d8faea744a6e93a5aa374dec7af8e29c9 Mon Sep 17 00:00:00 2001 From: AnakinRaW Date: Fri, 19 Apr 2024 10:58:33 +0200 Subject: [PATCH 8/9] fix test --- .../test/SimplePipelineTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/CommonUtilities.SimplePipeline/test/SimplePipelineTests.cs b/src/CommonUtilities.SimplePipeline/test/SimplePipelineTests.cs index fc27911..019b7e4 100644 --- a/src/CommonUtilities.SimplePipeline/test/SimplePipelineTests.cs +++ b/src/CommonUtilities.SimplePipeline/test/SimplePipelineTests.cs @@ -164,9 +164,9 @@ public async Task Test_Run_PrepareCancelled() }); - await pipeline.Object.RunAsync(cts.Token); + await Assert.ThrowsAnyAsync(async () => await pipeline.Object.RunAsync(cts.Token)); - Assert.True(s2Run); + Assert.False(s2Run); pipeline.Protected().Verify("BuildSteps", Times.Once(), ItExpr.IsAny()); } } From 4773c69ea008128b33e26e7abf9e22fda5ae94ca Mon Sep 17 00:00:00 2001 From: AnakinRaW Date: Fri, 19 Apr 2024 11:05:18 +0200 Subject: [PATCH 9/9] try fix --- src/CommonUtilities/src/AwaitExtensions.cs | 20 ++++++++------ .../test/AwaitExtensionsTests.cs | 26 ++++++++++--------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/CommonUtilities/src/AwaitExtensions.cs b/src/CommonUtilities/src/AwaitExtensions.cs index 83bda78..312d5b1 100644 --- a/src/CommonUtilities/src/AwaitExtensions.cs +++ b/src/CommonUtilities/src/AwaitExtensions.cs @@ -1,4 +1,5 @@ -using System; +#if !NET5_0_OR_GREATER +using System; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -21,7 +22,7 @@ public static class AwaitExtensions /// /// A task whose result is the of the . // From https://github.com/dotnet/runtime - public static async Task WaitForExitAsyncEx(this Process process, CancellationToken cancellationToken = default) + public static async Task WaitForExitAsync(this Process process, CancellationToken cancellationToken = default) { if (process == null) throw new ArgumentNullException(nameof(process)); @@ -36,24 +37,27 @@ public static async Task WaitForExitAsyncEx(this Process process, Cancellat catch (InvalidOperationException) { if (process.HasExited) - return process.ExitCode; + return; throw; } - var tcs = new TaskCompletionSource(); - void Handler(object o, EventArgs eventArgs) => tcs.TrySetResult(process.ExitCode); + var tcs = new TaskCompletionSource(); + void Handler(object o, EventArgs eventArgs) => tcs.TrySetResult(default); try { process.Exited += Handler!; if (process.HasExited) - return process.ExitCode; + return; using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken))) - return await tcs.Task.ConfigureAwait(false); + await tcs.Task.ConfigureAwait(false); } finally { process.Exited -= Handler!; } } -} \ No newline at end of file + + private readonly struct EmptyStruct; +} +#endif \ No newline at end of file diff --git a/src/CommonUtilities/test/AwaitExtensionsTests.cs b/src/CommonUtilities/test/AwaitExtensionsTests.cs index 1cfc1c4..73ac52d 100644 --- a/src/CommonUtilities/test/AwaitExtensionsTests.cs +++ b/src/CommonUtilities/test/AwaitExtensionsTests.cs @@ -1,4 +1,5 @@ -using System; +#if !NET5_0_OR_GREATER +using System; using System.Diagnostics; using System.Runtime.InteropServices; using System.Threading; @@ -14,7 +15,7 @@ public class AwaitExtensionsTests [Fact] public async Task Test_WaitForExitAsync_NullArgument() { - await Assert.ThrowsAsync(() => AwaitExtensions.WaitForExitAsyncEx(null!)); + await Assert.ThrowsAsync(() => AwaitExtensions.WaitForExitAsync(null!)); } [PlatformSpecificFact(TestPlatformIdentifier.Windows)] @@ -26,8 +27,8 @@ public async Task Test_WaitForExitAsync_ExitCode_Windows() CreateNoWindow = true, WindowStyle = ProcessWindowStyle.Hidden, })!; - var exitCode = await p.WaitForExitAsyncEx(); - Assert.Equal(55, exitCode); + await p.WaitForExitAsync(); + Assert.Equal(55, p.ExitCode); } [PlatformSpecificFact(TestPlatformIdentifier.Windows)] @@ -40,9 +41,9 @@ public void Test_WaitForExitAsync_AlreadyExited_Windows() WindowStyle = ProcessWindowStyle.Hidden, })!; p.WaitForExit(); - var t = p.WaitForExitAsyncEx(); + var t = p.WaitForExitAsync(); Assert.True(t.IsCompleted); - Assert.Equal(55, t.Result); + Assert.Equal(55, p.ExitCode); } [Fact] @@ -52,7 +53,7 @@ public async Task Test_WaitForExitAsync_UnstartedProcess() var process = new Process(); process.StartInfo.FileName = processName; process.StartInfo.CreateNoWindow = true; - await Assert.ThrowsAsync(() => process.WaitForExitAsyncEx()); + await Assert.ThrowsAsync(() => process.WaitForExitAsync()); } [Fact] @@ -63,11 +64,11 @@ public async Task Test_WaitForExitAsync_DoesNotCompleteTillKilled() var p = Process.Start(new ProcessStartInfo(processName) { CreateNoWindow = true, WindowStyle = ProcessWindowStyle.Hidden })!; try { - var t = p.WaitForExitAsyncEx(); + var t = p.WaitForExitAsync(); Assert.False(t.IsCompleted); p.Kill(); - var exitCode = await t; - Assert.Equal(expectedExitCode, exitCode); + await t; + Assert.Equal(expectedExitCode, p.ExitCode); } catch { @@ -92,7 +93,7 @@ public async Task Test_WaitForExitAsync_Canceled() try { var cts = new CancellationTokenSource(); - var t = p.WaitForExitAsyncEx(cts.Token); + var t = p.WaitForExitAsync(cts.Token); Assert.False(t.IsCompleted); cts.Cancel(); await Assert.ThrowsAsync(() => t); @@ -102,4 +103,5 @@ public async Task Test_WaitForExitAsync_Canceled() p.Kill(); } } -} \ No newline at end of file +} +#endif \ No newline at end of file