Skip to content

Commit

Permalink
move more logic to base impl of pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
AnakinRaW committed Jun 1, 2024
1 parent a47bc1c commit 0666b10
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline;
/// Useful, if preparation is work intensive.
/// </remarks>
public abstract class ParallelProducerConsumerPipeline : Pipeline
{
private readonly bool _failFast;
private CancellationTokenSource? _linkedCancellationTokenSource;
{
private readonly ParallelProducerConsumerRunner _runner;

private Exception? _preparationException;


/// <inheritdoc />
protected override bool FailFast { get; }

/// <summary>
/// Initializes a new instance of the <see cref="ParallelProducerConsumerPipeline"/> class.
/// </summary>
Expand All @@ -28,7 +29,7 @@ public abstract class ParallelProducerConsumerPipeline : Pipeline
/// <param name="failFast">A value indicating whether the pipeline should fail fast.</param>
protected ParallelProducerConsumerPipeline(int workerCount, bool failFast, IServiceProvider serviceProvider) : base(serviceProvider)
{
_failFast = failFast;
FailFast = failFast;
_runner = new ParallelProducerConsumerRunner(workerCount, serviceProvider);
}

Expand All @@ -41,8 +42,6 @@ public sealed override async Task RunAsync(CancellationToken token = default)
if (PrepareSuccessful is false)
return;

_linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);

if (PrepareSuccessful is null)
{
Task.Run(async () =>
Expand All @@ -53,7 +52,7 @@ public sealed override async Task RunAsync(CancellationToken token = default)
if (!result)
{
PipelineFailed = true;
_linkedCancellationTokenSource?.Cancel();
Cancel();
}
}
catch (Exception e)
Expand All @@ -70,7 +69,7 @@ public sealed override async Task RunAsync(CancellationToken token = default)

try
{
await RunCoreAsync(_linkedCancellationTokenSource.Token).ConfigureAwait(false);
await RunCoreAsync(token).ConfigureAwait(false);
}
catch (Exception)
{
Expand Down Expand Up @@ -104,11 +103,6 @@ protected override async Task RunCoreAsync(CancellationToken token)
finally
{
_runner.Error -= OnError;
if (_linkedCancellationTokenSource is not null)
{
_linkedCancellationTokenSource.Dispose();
_linkedCancellationTokenSource = null;
}
}

if (!PipelineFailed)
Expand All @@ -120,18 +114,6 @@ protected override async Task RunCoreAsync(CancellationToken token)
ThrowIfAnyStepsFailed(_runner.Steps);
}

/// <summary>
/// Called when an error occurs within a step.
/// </summary>
/// <param name="sender">The sender of the event.</param>
/// <param name="e">The event arguments.</param>
protected virtual void OnError(object sender, StepErrorEventArgs e)
{
PipelineFailed = true;
if (_failFast || e.Cancel)
_linkedCancellationTokenSource?.Cancel();
}

/// <inheritdoc />
protected override void DisposeManagedResources()
{
Expand Down
43 changes: 42 additions & 1 deletion src/CommonUtilities.SimplePipeline/src/Pipelines/Pipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline;
/// </summary>
public abstract class Pipeline : DisposableObject, IPipeline
{
private CancellationTokenSource? _linkedCancellationTokenSource;

/// <summary>
/// The service provider of the <see cref="SimplePipeline{TRunner}"/>.
/// </summary>
Expand All @@ -33,6 +35,11 @@ public abstract class Pipeline : DisposableObject, IPipeline
/// </summary>
public bool PipelineFailed { get; protected set; }

/// <summary>
/// Gets a value indicating the pipeline shall abort execution on the first received error.
/// </summary>
protected virtual bool FailFast => false;

/// <summary>
///
/// </summary>
Expand Down Expand Up @@ -62,7 +69,20 @@ public virtual async Task RunAsync(CancellationToken token = default)

try
{
await RunCoreAsync(token).ConfigureAwait(false);

try
{
_linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
await RunCoreAsync(_linkedCancellationTokenSource.Token).ConfigureAwait(false);
}
finally
{
if (_linkedCancellationTokenSource is not null)
{
_linkedCancellationTokenSource.Dispose();
_linkedCancellationTokenSource = null;
}
}
}
catch (Exception)
{
Expand All @@ -71,6 +91,14 @@ public virtual async Task RunAsync(CancellationToken token = default)
}
}

/// <summary>
/// Cancels the pipeline
/// </summary>
public void Cancel()
{
_linkedCancellationTokenSource?.Cancel();
}

/// <inheritdoc/>
public override string ToString()
{
Expand Down Expand Up @@ -104,4 +132,17 @@ protected void ThrowIfAnyStepsFailed(IEnumerable<IStep> steps)
if (failedBuildSteps.Any())
throw new StepFailureException(failedBuildSteps);
}

/// <summary>
/// The default event handler that can be used when an error occurs within a step.
/// <see cref="PipelineFailed"/> is set to <see langword="true"/>. When <see cref="FailFast"/> is <see langword="true"/>, the pipeline gets cancelled.
/// </summary>
/// <param name="sender">The sender of the event.</param>
/// <param name="e">The event arguments.</param>
protected virtual void OnError(object sender, StepErrorEventArgs e)
{
PipelineFailed = true;
if (FailFast || e.Cancel)
Cancel();
}
}
31 changes: 6 additions & 25 deletions src/CommonUtilities.SimplePipeline/src/Pipelines/SimplePipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ namespace AnakinRaW.CommonUtilities.SimplePipeline;
/// </summary>
/// <typeparam name="TRunner">The type of the step runner.</typeparam>
public abstract class SimplePipeline<TRunner> : Pipeline where TRunner : StepRunner
{
private readonly bool _failFast;

private CancellationTokenSource? _linkedCancellationTokenSource;
{
private IRunner _buildRunner = null!;

/// <inheritdoc />
protected override bool FailFast { get; }

/// <summary>
/// Initializes a new instance of the <see cref="SimplePipeline{TRunner}"/> class.
/// </summary>
Expand All @@ -27,7 +27,7 @@ public abstract class SimplePipeline<TRunner> : Pipeline where TRunner : StepRun
/// </remarks>
protected SimplePipeline(IServiceProvider serviceProvider, bool failFast = true) : base(serviceProvider)
{
_failFast = failFast;
FailFast = failFast;
}

/// <inheritdoc/>
Expand Down Expand Up @@ -66,19 +66,12 @@ protected override async Task RunCoreAsync(CancellationToken token)
{
try
{
_linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);

_buildRunner.Error += OnError;
await _buildRunner.RunAsync(_linkedCancellationTokenSource.Token).ConfigureAwait(false);
await _buildRunner.RunAsync(token).ConfigureAwait(false);
}
finally
{
_buildRunner.Error -= OnError;
if (_linkedCancellationTokenSource is not null)
{
_linkedCancellationTokenSource.Dispose();
_linkedCancellationTokenSource = null;
}
}

if (!PipelineFailed)
Expand All @@ -87,18 +80,6 @@ protected override async Task RunCoreAsync(CancellationToken token)
ThrowIfAnyStepsFailed(_buildRunner.Steps);
}

/// <summary>
/// Called when an error occurs within a step.
/// </summary>
/// <param name="sender">The sender of the event.</param>
/// <param name="e">The event arguments.</param>
protected virtual void OnError(object sender, StepErrorEventArgs e)
{
PipelineFailed = true;
if (_failFast || e.Cancel)
_linkedCancellationTokenSource?.Cancel();
}

/// <inheritdoc />
protected override void DisposeManagedResources()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using Moq.Protected;
using Xunit;

namespace AnakinRaW.CommonUtilities.SimplePipeline.Test;
namespace AnakinRaW.CommonUtilities.SimplePipeline.Test.Pipelines;

public class PipelineTest
{
Expand Down Expand Up @@ -34,12 +34,12 @@ public async Task Test_Run()
};

pipeline.Protected().Setup<Task<bool>>("PrepareCoreAsync").Returns(Task.FromResult(true));

await pipeline.Object.RunAsync();
await pipeline.Object.RunAsync();

pipeline.Protected().Verify<Task<bool>>("PrepareCoreAsync", Times.Exactly(1));
pipeline.Protected().Verify("RunCoreAsync", Times.Exactly(2), false, (CancellationToken) default);
pipeline.Protected().Verify("RunCoreAsync", Times.Exactly(2), false, ItExpr.IsAny<CancellationToken>());
}

[Fact]
Expand All @@ -58,7 +58,7 @@ public async Task Test_Prepare_Run()
await pipeline.Object.RunAsync();

pipeline.Protected().Verify<Task<bool>>("PrepareCoreAsync", Times.Exactly(1));
pipeline.Protected().Verify("RunCoreAsync", Times.Exactly(2), false, (CancellationToken)default);
pipeline.Protected().Verify("RunCoreAsync", Times.Exactly(2), false, ItExpr.IsAny<CancellationToken>());
}

[Fact]
Expand Down Expand Up @@ -89,7 +89,7 @@ public async Task Test_Prepare_Disposed_ThrowsObjectDisposedException()
pipeline.Object.Dispose();
pipeline.Object.Dispose();

await Assert.ThrowsAsync<ObjectDisposedException>(async() => await pipeline.Object.PrepareAsync());
await Assert.ThrowsAsync<ObjectDisposedException>(async () => await pipeline.Object.PrepareAsync());
}

[Fact]
Expand All @@ -106,4 +106,49 @@ public async Task Test_Run_Disposed_ThrowsObjectDisposedException()

await Assert.ThrowsAsync<ObjectDisposedException>(async () => await pipeline.Object.RunAsync());
}

[Fact]
public async Task Test_Cancel()
{
var sp = new Mock<IServiceProvider>().Object;
var pipeline = new Mock<Pipeline>(sp)
{
CallBase = true
};

var callbackRunTsc = new TaskCompletionSource<int>();
var cancelledTsc = new TaskCompletionSource<int>();

CancellationToken runToken = default;

var callbackTask = Task.Run(async () =>
{
await callbackRunTsc.Task;
Assert.True(runToken.CanBeCanceled);
await cancelledTsc.Task;
Assert.True(runToken.IsCancellationRequested);
});

pipeline.Protected().Setup<Task<bool>>("PrepareCoreAsync").Returns(Task.FromResult(true));
pipeline.Protected().Setup<Task>("RunCoreAsync", ItExpr.IsAny<CancellationToken>())
.Callback((CancellationToken token) =>
{
runToken = token;
callbackRunTsc.SetResult(0);
}).Returns(callbackTask);


var pipelineTask = pipeline.Object.RunAsync();

pipeline.Object.Cancel();
cancelledTsc.SetResult(0);

await pipelineTask;

Assert.True(callbackTask.IsCompleted);

pipeline.Protected().Verify<Task<bool>>("PrepareCoreAsync", Times.Exactly(1));
pipeline.Protected().Verify("RunCoreAsync", Times.Exactly(1), false, ItExpr.IsAny<CancellationToken>());
}
}

0 comments on commit 0666b10

Please sign in to comment.