Skip to content

Commit

Permalink
more pipeline improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
AnakinRaW committed Apr 20, 2024
1 parent 15e7627 commit 35b449a
Show file tree
Hide file tree
Showing 20 changed files with 715 additions and 432 deletions.
14 changes: 13 additions & 1 deletion src/CommonUtilities.SimplePipeline/src/IRunner.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -7,7 +8,7 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline;
/// <summary>
/// Execution engine to run one or many <see cref="IStep"/>s.
/// </summary>
public interface IRunner : IStepQueue, IDisposable
public interface IRunner : IDisposable
{
/// <summary>
/// Gets raised when the execution of an <see cref="IStep"/> failed with an exception.
Expand All @@ -20,4 +21,15 @@ public interface IRunner : IStepQueue, IDisposable
/// <param name="token">The cancellation token, allowing the runner to cancel the operation.</param>
/// <returns>A task that represents the completion of the operation.</returns>
Task RunAsync(CancellationToken token);

/// <summary>
/// List of only those steps which are scheduled for execution of an <see cref="IRunner"/>.
/// </summary>
public IReadOnlyList<IStep> Steps { get; }

/// <summary>
/// Adds an <see cref="IStep"/> to the <see cref="IRunner"/>.
/// </summary>
/// <param name="activity">The step to app.</param>
void AddStep(IStep activity);
}
20 changes: 0 additions & 20 deletions src/CommonUtilities.SimplePipeline/src/IStepQueue.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System;
using System.Linq;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using AnakinRaW.CommonUtilities.SimplePipeline.Runners;
Expand All @@ -12,79 +12,94 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline;
/// <remarks>
/// Useful, if preparation is work intensive.
/// </remarks>
public abstract class ParallelProducerConsumerPipeline : DisposableObject, IPipeline
public abstract class ParallelProducerConsumerPipeline : Pipeline
{
private readonly bool _failFast;
private CancellationTokenSource? _linkedCancellationTokenSource;
private readonly ParallelProducerConsumerRunner _runner;

private bool? _prepareSuccessful;

/// <summary>
/// Gets or sets a value indicating whether the pipeline has encountered a failure.
/// </summary>
protected bool PipelineFailed { get; set; }


private Exception? _preparationException;

/// <summary>
/// Initializes a new instance of the <see cref="ParallelProducerConsumerPipeline"/> class.
/// </summary>
/// <param name="serviceProvider">The service provider for dependency injection within the pipeline.</param>
/// <param name="workerCount">The number of worker threads to be used for parallel execution.</param>
/// <param name="failFast">A value indicating whether the pipeline should fail fast.</param>
protected ParallelProducerConsumerPipeline(IServiceProvider serviceProvider, int workerCount = 4, bool failFast = true)
protected ParallelProducerConsumerPipeline(int workerCount, bool failFast, IServiceProvider serviceProvider) : base(serviceProvider)
{
_failFast = failFast;
_runner = new ParallelProducerConsumerRunner(workerCount, serviceProvider);
}

/// <inheritdoc/>
public async Task<bool> PrepareAsync()
{
ThrowIfDisposed();
if (_prepareSuccessful.HasValue)
return _prepareSuccessful.Value;

await BuildSteps(_runner).ConfigureAwait(false);

_prepareSuccessful = true;
return _prepareSuccessful.Value;
}

/// <inheritdoc/>
public async Task RunAsync(CancellationToken token = default)
public sealed override async Task RunAsync(CancellationToken token = default)
{
ThrowIfDisposed();
token.ThrowIfCancellationRequested();

if (_prepareSuccessful is false)
if (PrepareSuccessful is false)
return;

try
{
_linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
_linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);

if (_prepareSuccessful is null)
if (PrepareSuccessful is null)
{
Task.Run(async () =>
{
Task.Run(async () =>
try
{
try
{
var result = await PrepareAsync().ConfigureAwait(false);
if (!result)
_linkedCancellationTokenSource?.Cancel();
}
finally
var result = await PrepareAsync().ConfigureAwait(false);
if (!result)
{
_runner.Finish();
PipelineFailed = true;
_linkedCancellationTokenSource?.Cancel();
}
}, token).Forget();
}
}
catch (Exception e)
{
PipelineFailed = true;
_preparationException = e;
}
finally
{
_runner.Finish();
}
}, token).Forget();
}

try
{
await RunCoreAsync(_linkedCancellationTokenSource.Token).ConfigureAwait(false);
}
catch (Exception)
{
PipelineFailed = true;
throw;
}
}

/// <summary>
/// Builds the steps in the order they should be executed within the pipeline.
/// </summary>
/// <returns>A list of steps in the order they should be executed.</returns>
protected abstract IAsyncEnumerable<IStep> BuildSteps();

/// <inheritdoc/>
protected override async Task<bool> PrepareCoreAsync()
{
await foreach (var step in BuildSteps().ConfigureAwait(false))
_runner.AddStep(step);
return true;
}

/// <inheritdoc/>
protected override async Task RunCoreAsync(CancellationToken token)
{
try
{
_runner.Error += OnError;
await _runner.RunAsync(_linkedCancellationTokenSource.Token).ConfigureAwait(false);
await _runner.RunAsync(token).ConfigureAwait(false);
}
finally
{
Expand All @@ -96,26 +111,15 @@ public async Task RunAsync(CancellationToken token = default)
}
}

if (!PipelineFailed && _prepareSuccessful.HasValue && _prepareSuccessful.Value)
if (!PipelineFailed)
return;

if (_prepareSuccessful is not true)
throw new InvalidOperationException("Preparation of the pipeline failed.");

var failedBuildSteps = _runner.Steps
.Where(p => p.Error != null && !p.Error.IsExceptionType<OperationCanceledException>())
.ToList();
if (_preparationException is not null)
throw _preparationException;

if (failedBuildSteps.Any())
throw new StepFailureException(failedBuildSteps);
ThrowIfAnyStepsFailed(_runner.Steps);
}

/// <summary>
/// Builds the steps in the order they should be executed within the pipeline.
/// </summary>
/// <returns>A list of steps in the order they should be executed.</returns>
protected abstract Task BuildSteps(IStepQueue queue);

/// <summary>
/// Called when an error occurs within a step.
/// </summary>
Expand All @@ -127,4 +131,11 @@ protected virtual void OnError(object sender, StepErrorEventArgs e)
if (_failFast || e.Cancel)
_linkedCancellationTokenSource?.Cancel();
}

/// <inheritdoc />
protected override void DisposeManagedResources()
{
base.DisposeManagedResources();
_runner.Dispose();
}
}
83 changes: 71 additions & 12 deletions src/CommonUtilities.SimplePipeline/src/Pipelines/Pipeline.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -9,28 +13,68 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline;
/// </summary>
public abstract class Pipeline : DisposableObject, IPipeline
{
private bool? _prepareSuccessful;
/// <summary>
/// The service provider of the <see cref="SimplePipeline{TRunner}"/>.
/// </summary>
protected readonly IServiceProvider ServiceProvider;

/// <summary>
/// The logger of the <see cref="SimplePipeline{TRunner}"/>.
/// </summary>
protected readonly ILogger? Logger;

/// <summary>
/// Gets a value indicating whether the preparation of the <see cref="Pipeline"/> was successful.
/// </summary>
protected bool? PrepareSuccessful { get; private set; }

/// <summary>
/// Gets or sets a value indicating whether the pipeline has encountered a failure.
/// </summary>
public bool PipelineFailed { get; protected set; }

/// <summary>
///
/// </summary>
protected Pipeline(IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
Logger = serviceProvider.GetService<ILoggerFactory>()?.CreateLogger(GetType());
}

/// <inheritdoc/>
public async Task<bool> PrepareAsync()
{
if (IsDisposed)
throw new ObjectDisposedException("Pipeline already disposed");
if (_prepareSuccessful.HasValue)
return _prepareSuccessful.Value;
_prepareSuccessful = await PrepareCoreAsync().ConfigureAwait(false);
return _prepareSuccessful.Value;
ThrowIfDisposed();
if (PrepareSuccessful.HasValue)
return PrepareSuccessful.Value;
PrepareSuccessful = await PrepareCoreAsync().ConfigureAwait(false);
return PrepareSuccessful.Value;
}

/// <inheritdoc/>
public async Task RunAsync(CancellationToken token = default)
public virtual async Task RunAsync(CancellationToken token = default)
{
if (IsDisposed)
throw new ObjectDisposedException("Pipeline already disposed");
ThrowIfDisposed();
token.ThrowIfCancellationRequested();
if (!await PrepareAsync().ConfigureAwait(false))
return;
await RunCoreAsync(token).ConfigureAwait(false);

try
{
await RunCoreAsync(token).ConfigureAwait(false);
}
catch (Exception)
{
PipelineFailed = true;
throw;
}
}

/// <inheritdoc/>
public override string ToString()
{
return GetType().Name;
}

/// <summary>
Expand All @@ -45,4 +89,19 @@ public async Task RunAsync(CancellationToken token = default)
/// <remarks>It's assured this instance is already prepared when this method gets called.</remarks>
/// <param name="token">Provided <see cref="CancellationToken"/> to allow cancellation.</param>
protected abstract Task RunCoreAsync(CancellationToken token);

/// <summary>
/// Throws an <see cref="StepFailureException"/> if any of the passed steps ended with an error that is not the result of cancellation.
/// </summary>
/// <param name="steps">The steps that were executed by the pipeline.</param>
/// <exception cref="StepFailureException">If any of <paramref name="steps"/> has an error that is not the result of cancellation.</exception>
protected void ThrowIfAnyStepsFailed(IEnumerable<IStep> steps)
{
var failedBuildSteps = steps
.Where(p => p.Error != null && !p.Error.IsExceptionType<OperationCanceledException>())
.ToList();

if (failedBuildSteps.Any())
throw new StepFailureException(failedBuildSteps);
}
}
Loading

0 comments on commit 35b449a

Please sign in to comment.