Skip to content

Commit

Permalink
Make Pipeline async (#155) (#156)
Browse files Browse the repository at this point in the history
* use disposableobject

* implement async step

* rename method

* rename method

* merge

* to async pipeline

* make beta

* fix test

* try fix
  • Loading branch information
AnakinRaW authored Apr 19, 2024
1 parent 7f4beec commit cfcfdfd
Show file tree
Hide file tree
Showing 36 changed files with 1,108 additions and 444 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="Moq" Version="4.20.70" />
<PackageReference Include="Testably.Abstractions" Version="3.1.0" />
<PackageReference Include="Testably.Abstractions.Testing" Version="3.1.0" />
<PackageReference Include="xunit" Version="2.7.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.7">
<PackageReference Include="Testably.Abstractions" Version="3.1.2" />
<PackageReference Include="Testably.Abstractions.Testing" Version="3.1.2" />
<PackageReference Include="xunit" Version="2.7.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.8">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Testably.Abstractions" Version="3.1.0" />
<PackageReference Include="Testably.Abstractions.Testing" Version="3.1.0" />
<PackageReference Include="xunit" Version="2.7.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.7">
<PackageReference Include="Testably.Abstractions" Version="3.1.2" />
<PackageReference Include="Testably.Abstractions.Testing" Version="3.1.2" />
<PackageReference Include="xunit" Version="2.7.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.8">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="xunit" Version="2.7.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.7">
<PackageReference Include="xunit" Version="2.7.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.8">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
19 changes: 7 additions & 12 deletions src/CommonUtilities.SimplePipeline/src/IRunner.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,23 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace AnakinRaW.CommonUtilities.SimplePipeline;

/// <summary>
/// Execution engine to run one or many <see cref="IStep"/>s.
/// </summary>
public interface IRunner : IEnumerable<IStep>
public interface IRunner : IStepQueue, IDisposable
{
/// <summary>
/// Event which gets raised if the execution of an <see cref="IStep"/> failed with an exception.
/// Gets raised when the execution of an <see cref="IStep"/> failed with an exception.
/// </summary>
event EventHandler<StepErrorEventArgs>? Error;

/// <summary>
/// Runs all queued steps
/// Runs all queued steps.
/// </summary>
/// <param name="token">Provided <see cref="CancellationToken"/> to allow cancellation.</param>
void Run(CancellationToken token);

/// <summary>
/// Queues an <see cref="IStep"/> for execution.
/// </summary>
/// <param name="activity">The step to queue.</param>
void Queue(IStep activity);
/// <param name="token">The cancellation token, allowing the runner to cancel the operation.</param>
/// <returns>A task that represents the completion of the operation.</returns>
Task RunAsync(CancellationToken token);
}
20 changes: 20 additions & 0 deletions src/CommonUtilities.SimplePipeline/src/IStepQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Collections.Generic;

namespace AnakinRaW.CommonUtilities.SimplePipeline;

/// <summary>
/// A queue of <see cref="IStep"/>.
/// </summary>
public interface IStepQueue
{
/// <summary>
/// List of only those steps which are scheduled for execution of an <see cref="IRunner"/>.
/// </summary>
public IReadOnlyList<IStep> Steps { get; }

/// <summary>
/// Adds an <see cref="IStep"/> to the <see cref="IRunner"/>.
/// </summary>
/// <param name="activity">The step to app.</param>
void AddStep(IStep activity);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
namespace AnakinRaW.CommonUtilities.SimplePipeline;

/// <summary>
/// Specialized <see cref="IRunner"/> which allows
/// Specialized <see cref="IRunner"/> which allows for synchronous waiting.
/// </summary>
public interface IParallelRunner : IRunner
public interface ISynchronizedRunner : IRunner
{
/// <summary>
/// Synchronously waits for this runner for all of its steps to be finished.
Expand All @@ -18,5 +18,6 @@ public interface IParallelRunner : IRunner
/// </summary>
/// <param name="waitDuration">The time duration to wait.</param>
/// <exception cref="TimeoutException">If <paramref name="waitDuration"/> expired.</exception>
/// <exception cref="AggregateException">If any of the steps failed with an exception.</exception>
void Wait(TimeSpan waitDuration);
}
14 changes: 8 additions & 6 deletions src/CommonUtilities.SimplePipeline/src/Pipelines/IPipeline.cs
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace AnakinRaW.CommonUtilities.SimplePipeline;

/// <summary>
/// A pipeline can run multiple operations in sequence or simultaneously, based on how it was prepared.
/// </summary>
public interface IPipeline : IDisposable
{
{
/// <summary>
/// Prepares this instance for execution.
/// </summary>
/// <remarks>
/// Preparation can only be done once per instance.
/// </remarks>
/// <returns><see langword="true"/> if the preparation was successful; <see langword="false"/> otherwise.</returns>
bool Prepare();

/// <returns>A task that represents whether the preparation was successful.</returns>
Task<bool> PrepareAsync();
/// <summary>
/// Runs pipeline.
/// Runs pipeline synchronously.
/// </summary>
/// <param name="token">Provided <see cref="CancellationToken"/> to allow cancellation.</param>
/// <returns>A task that represents the operation completion.</returns>
/// <exception cref="OperationCanceledException">If <paramref name="token"/> was requested for cancellation.</exception>
/// <exception cref="StepFailureException">The pipeline may throw this exception if one or many steps failed.</exception>
void Run(CancellationToken token = default);
Task RunAsync(CancellationToken token = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline;

/// <summary>
/// A simple pipeline that runs all steps on the thread pool in parallel.
/// The <see cref="Pipeline.Run"/> will block until the pipeline has finished all steps.
/// </summary>
public abstract class ParallelPipeline : SimplePipeline<ParallelRunner>
{
Expand All @@ -27,17 +26,4 @@ protected sealed override ParallelRunner CreateRunner()
{
return new ParallelRunner(_workerCount, ServiceProvider);
}

/// <inheritdoc/>
protected sealed override void OnRunning(ParallelRunner buildRunner)
{
try
{
buildRunner.Wait();
}
catch
{
// Ignore
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using AnakinRaW.CommonUtilities.SimplePipeline.Runners;

namespace AnakinRaW.CommonUtilities.SimplePipeline;

/// <summary>
/// A simple pipeline that runs all steps on the thread pool in parallel. Allows to run the pipeline even if preparation is not completed.
/// </summary>
/// <remarks>
/// Useful, if preparation is work intensive.
/// </remarks>
public abstract class ParallelProducerConsumerPipeline : DisposableObject, IPipeline
{
private readonly bool _failFast;
private CancellationTokenSource? _linkedCancellationTokenSource;
private readonly ParallelProducerConsumerRunner _runner;

private bool? _prepareSuccessful;

/// <summary>
/// Gets or sets a value indicating whether the pipeline has encountered a failure.
/// </summary>
protected bool PipelineFailed { get; set; }


/// <summary>
/// Initializes a new instance of the <see cref="ParallelProducerConsumerPipeline"/> class.
/// </summary>
/// <param name="serviceProvider">The service provider for dependency injection within the pipeline.</param>
/// <param name="workerCount">The number of worker threads to be used for parallel execution.</param>
/// <param name="failFast">A value indicating whether the pipeline should fail fast.</param>
protected ParallelProducerConsumerPipeline(IServiceProvider serviceProvider, int workerCount = 4, bool failFast = true)
{
_failFast = failFast;
_runner = new ParallelProducerConsumerRunner(workerCount, serviceProvider);
}

/// <inheritdoc/>
public async Task<bool> PrepareAsync()
{
ThrowIfDisposed();
if (_prepareSuccessful.HasValue)
return _prepareSuccessful.Value;

await BuildSteps(_runner).ConfigureAwait(false);

_prepareSuccessful = true;
return _prepareSuccessful.Value;
}

/// <inheritdoc/>
public async Task RunAsync(CancellationToken token = default)
{
ThrowIfDisposed();
token.ThrowIfCancellationRequested();

if (_prepareSuccessful is false)
return;

try
{
_linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);

if (_prepareSuccessful is null)
{
Task.Run(async () =>
{
try
{
var result = await PrepareAsync().ConfigureAwait(false);
if (!result)
_linkedCancellationTokenSource?.Cancel();
}
finally
{
_runner.Finish();
}
}, token).Forget();
}



_runner.Error += OnError;
await _runner.RunAsync(_linkedCancellationTokenSource.Token).ConfigureAwait(false);
}
finally
{
_runner.Error -= OnError;
if (_linkedCancellationTokenSource is not null)
{
_linkedCancellationTokenSource.Dispose();
_linkedCancellationTokenSource = null;
}
}

if (!PipelineFailed && _prepareSuccessful.HasValue && _prepareSuccessful.Value)
return;

if (_prepareSuccessful is not true)
throw new InvalidOperationException("Preparation of the pipeline failed.");

var failedBuildSteps = _runner.Steps
.Where(p => p.Error != null && !p.Error.IsExceptionType<OperationCanceledException>())
.ToList();

if (failedBuildSteps.Any())
throw new StepFailureException(failedBuildSteps);
}

/// <summary>
/// Builds the steps in the order they should be executed within the pipeline.
/// </summary>
/// <returns>A list of steps in the order they should be executed.</returns>
protected abstract Task BuildSteps(IStepQueue queue);

/// <summary>
/// Called when an error occurs within a step.
/// </summary>
/// <param name="sender">The sender of the event.</param>
/// <param name="e">The event arguments.</param>
protected virtual void OnError(object sender, StepErrorEventArgs e)
{
PipelineFailed = true;
if (_failFast || e.Cancel)
_linkedCancellationTokenSource?.Cancel();
}
}
17 changes: 9 additions & 8 deletions src/CommonUtilities.SimplePipeline/src/Pipelines/Pipeline.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace AnakinRaW.CommonUtilities.SimplePipeline;

Expand All @@ -9,39 +10,39 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline;
public abstract class Pipeline : DisposableObject, IPipeline
{
private bool? _prepareSuccessful;

/// <inheritdoc/>
public bool Prepare()
public async Task<bool> PrepareAsync()
{
if (IsDisposed)
throw new ObjectDisposedException("Pipeline already disposed");
if (_prepareSuccessful.HasValue)
return _prepareSuccessful.Value;
_prepareSuccessful = PrepareCore();
_prepareSuccessful = await PrepareCoreAsync().ConfigureAwait(false);
return _prepareSuccessful.Value;
}

/// <inheritdoc/>
public void Run(CancellationToken token = default)
public async Task RunAsync(CancellationToken token = default)
{
if (IsDisposed)
throw new ObjectDisposedException("Pipeline already disposed");
token.ThrowIfCancellationRequested();
if (!Prepare())
if (!await PrepareAsync().ConfigureAwait(false))
return;
RunCore(token);
await RunCoreAsync(token).ConfigureAwait(false);
}

/// <summary>
/// Performs the actual preparation of this instance.
/// </summary>
/// <returns><see langword="true"/> if the planning was successful; <see langword="false"/> otherwise.</returns>
protected abstract bool PrepareCore();
protected abstract Task<bool> PrepareCoreAsync();

/// <summary>
/// Implements the run logic of this instance.
/// </summary>
/// <remarks>It's assured this instance is already prepared when this method gets called.</remarks>
/// <param name="token">Provided <see cref="CancellationToken"/> to allow cancellation.</param>
protected abstract void RunCore(CancellationToken token);
protected abstract Task RunCoreAsync(CancellationToken token);
}
Loading

0 comments on commit cfcfdfd

Please sign in to comment.