-
Notifications
You must be signed in to change notification settings - Fork 419
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix AwaitCompletion to yield results during source iteration #505
Changes from 20 commits
1ba1162
a30df54
d973670
86c94f9
4a4e08d
3111a90
688c2ee
616241b
4514060
da262bc
03419d3
91f4896
98ddd41
cf71a25
af05bd6
5f6a23b
5c759e6
69d5341
8dbdcbe
57212f2
bda7187
e956b46
ec6e365
467ba7c
4a5cfd3
30478c0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -416,45 +416,124 @@ public static IAwaitQuery<TResult> AwaitCompletion<T, TTaskResult, TResult>( | |
|
||
return | ||
AwaitQuery.Create( | ||
options => _(options.MaxConcurrency ?? int.MaxValue, | ||
options => _(options.MaxConcurrency, | ||
options.Scheduler ?? TaskScheduler.Default, | ||
options.PreserveOrder)); | ||
|
||
IEnumerable<TResult> _(int maxConcurrency, TaskScheduler scheduler, bool ordered) | ||
IEnumerable<TResult> _(int? maxConcurrency, TaskScheduler scheduler, bool ordered) | ||
{ | ||
// A separate task will enumerate the source and launch tasks. | ||
// It will post all progress as notices to the collection below. | ||
// A notice is essentially a discriminated union like: | ||
// | ||
// type Notice<'a, 'b> = | ||
// | End | ||
// | Result of (int * 'a * Task<'b>) | ||
// | Error of ExceptionDispatchInfo | ||
// | ||
// Note that BlockingCollection.CompleteAdding is never used to | ||
// mark the end (which has its own notice above) because | ||
// BlockingCollection.Add throws if called after CompleteAdding | ||
// and we want to deliberately tolerate the race condition. | ||
|
||
var notices = new BlockingCollection<(Notice, (int, T, Task<TTaskResult>), ExceptionDispatchInfo)>(); | ||
var cancellationTokenSource = new CancellationTokenSource(); | ||
var cancellationToken = cancellationTokenSource.Token; | ||
var completed = false; | ||
|
||
var enumerator = | ||
source.Index() | ||
.Select(e => (e.Key, Item: e.Value, Task: evaluator(e.Value, cancellationToken))) | ||
.GetEnumerator(); | ||
var consumerCancellationTokenSource = new CancellationTokenSource(); | ||
(Exception, Exception) lastCriticalErrors = default; | ||
|
||
void PostNotice(Notice notice, | ||
(int, T, Task<TTaskResult>) item, | ||
Exception error) | ||
{ | ||
// If a notice fails to post then assume critical error | ||
// conditions (like low memory), capture the error without | ||
// further allocation of resources and trip the cancellation | ||
// token source used by the main loop waiting on notices. | ||
// Note that only the "last" critical error is reported | ||
// as maintaining a list would incur allocations. The idea | ||
// here is to make a best effort attempt to report any of | ||
// the error conditions that may be occurring, which is still | ||
// better than nothing. | ||
|
||
try | ||
{ | ||
var edi = error != null | ||
? ExceptionDispatchInfo.Capture(error) | ||
: null; | ||
notices.Add((notice, item, edi)); | ||
} | ||
catch (Exception e) | ||
{ | ||
// Don't use ExceptionDispatchInfo.Capture here to avoid | ||
// inducing allocations if already under low memory | ||
// conditions. | ||
|
||
lastCriticalErrors = (e, error); | ||
consumerCancellationTokenSource.Cancel(); | ||
throw; | ||
} | ||
} | ||
|
||
var completed = false; | ||
var cancellationTokenSource = new CancellationTokenSource(); | ||
|
||
var enumerator = source.Index().GetEnumerator(); | ||
IDisposable disposable = enumerator; // disables AccessToDisposedClosure warnings | ||
|
||
try | ||
{ | ||
var cancellationToken = cancellationTokenSource.Token; | ||
|
||
// Fire-up a parallel loop to iterate through the source and | ||
// launch tasks, posting a result-notice as each task | ||
// completes and another, an end-notice, when all tasks have | ||
// completed. | ||
|
||
Task.Factory.StartNew( | ||
() => | ||
CollectToAsync( | ||
enumerator, | ||
e => e.Task, | ||
notices, | ||
(e, r) => (Notice.Result, (e.Key, e.Item, e.Task), default), | ||
ex => (Notice.Error, default, ExceptionDispatchInfo.Capture(ex)), | ||
(Notice.End, default, default), | ||
maxConcurrency, cancellationTokenSource), | ||
async () => | ||
{ | ||
try | ||
{ | ||
await enumerator.StartAsync( | ||
e => evaluator(e.Value, cancellationToken), | ||
(e, r) => PostNotice(Notice.Result, (e.Key, e.Value, r), default), | ||
() => PostNotice(Notice.End, default, default), | ||
maxConcurrency, cancellationToken); | ||
} | ||
catch (Exception e) | ||
{ | ||
PostNotice(Notice.Error, default, e); | ||
} | ||
}, | ||
CancellationToken.None, | ||
TaskCreationOptions.DenyChildAttach, | ||
scheduler); | ||
|
||
// Remainder here is the main loop that waits for and | ||
// processes notices. | ||
|
||
var nextKey = 0; | ||
var holds = ordered ? new List<(int, T, Task<TTaskResult>)>() : null; | ||
|
||
foreach (var (kind, result, error) in notices.GetConsumingEnumerable()) | ||
using (var notice = notices.GetConsumingEnumerable(consumerCancellationTokenSource.Token) | ||
.GetEnumerator()) | ||
while (true) | ||
{ | ||
try | ||
{ | ||
if (!notice.MoveNext()) | ||
break; | ||
} | ||
catch (OperationCanceledException e) when (e.CancellationToken == consumerCancellationTokenSource.Token) | ||
{ | ||
var (error1, error2) = lastCriticalErrors; | ||
throw new Exception("One or more critical errors have occurred.", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the purpose of wrapping the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The rationale behind this is that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not following. This exception is telling me one or more errors occured. And this is precisely what There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
error2 != null ? new AggregateException(error1, error2) | ||
: new AggregateException(error1)); | ||
} | ||
|
||
var (kind, result, error) = notice.Current; | ||
|
||
if (kind == Notice.Error) | ||
error.Throw(); | ||
|
||
|
@@ -531,149 +610,82 @@ IEnumerable<TResult> _(int maxConcurrency, TaskScheduler scheduler, bool ordered | |
} | ||
} | ||
|
||
enum Notice { Result, Error, End } | ||
|
||
static async Task CollectToAsync<T, TResult, TNotice>( | ||
this IEnumerator<T> e, | ||
Func<T, Task<TResult>> taskSelector, | ||
BlockingCollection<TNotice> collection, | ||
Func<T, Task<TResult>, TNotice> completionNoticeSelector, | ||
Func<Exception, TNotice> errorNoticeSelector, | ||
TNotice endNotice, | ||
int maxConcurrency, | ||
CancellationTokenSource cancellationTokenSource) | ||
enum Notice { End, Result, Error } | ||
|
||
static async Task StartAsync<T, TResult>( | ||
this IEnumerator<T> enumerator, | ||
Func<T, Task<TResult>> starter, | ||
Action<T, Task<TResult>> onCompletion, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is probably better named There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can understand why you'd say that and you'll notice that I got side-tracked in d973670 by the observer pattern here. Later in da262bc, I reverted as allowing harmless race conditions meant I couldn't guarantee the pattern's semantics. I find There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, I see why you don't want |
||
Action onEnd, | ||
int? maxConcurrency, | ||
CancellationToken cancellationToken) | ||
{ | ||
Reader<T> reader = null; | ||
if (enumerator == null) throw new ArgumentNullException(nameof(enumerator)); | ||
if (starter == null) throw new ArgumentNullException(nameof(starter)); | ||
if (onCompletion == null) throw new ArgumentNullException(nameof(onCompletion)); | ||
if (onEnd == null) throw new ArgumentNullException(nameof(onEnd)); | ||
if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency)); | ||
|
||
try | ||
using (enumerator) | ||
{ | ||
reader = new Reader<T>(e); | ||
var pendingCount = 1; // terminator | ||
|
||
var cancellationToken = cancellationTokenSource.Token; | ||
var cancellationTaskSource = new TaskCompletionSource<bool>(); | ||
cancellationToken.Register(() => cancellationTaskSource.TrySetResult(true)); | ||
|
||
var tasks = new List<(T Item, Task<TResult> Task)>(); | ||
|
||
for (var i = 0; i < maxConcurrency; i++) | ||
void OnPendingCompleted() | ||
{ | ||
if (!reader.TryRead(out var item)) | ||
break; | ||
tasks.Add((item, taskSelector(item))); | ||
if (Interlocked.Decrement(ref pendingCount) == 0) | ||
onEnd(); | ||
} | ||
|
||
while (tasks.Count > 0) | ||
var semaphore = maxConcurrency is int count | ||
? new SemaphoreSlim(count, count) | ||
: null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Finally the docs don't lie 😉 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Huh? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. previously, using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh I see. Yeah. 😃 |
||
|
||
while (enumerator.MoveNext()) | ||
{ | ||
// Task.WaitAny is synchronous and blocking but allows the | ||
// waiting to be cancelled via a CancellationToken. | ||
// Task.WhenAny can be awaited so it is better since the | ||
// thread won't be blocked and can return to the pool. | ||
// However, it doesn't support cancellation so instead a | ||
// task is built on top of the CancellationToken that | ||
// completes when the CancellationToken trips. | ||
// | ||
// Also, Task.WhenAny returns the task (Task) object that | ||
// completed but task objects may not be unique due to | ||
// caching, e.g.: | ||
// | ||
// async Task<bool> Foo() => true; | ||
// async Task<bool> Bar() => true; | ||
// var foo = Foo(); | ||
// var bar = Bar(); | ||
// var same = foo.Equals(bar); // == true | ||
// | ||
// In this case, the task returned by Task.WhenAny will | ||
// match `foo` and `bar`: | ||
// | ||
// var done = Task.WhenAny(foo, bar); | ||
// | ||
// Logically speaking, the uniqueness of a task does not | ||
// matter but here it does, especially when Await (the main | ||
// user of CollectAsync) needs to return results ordered. | ||
// Fortunately, we compose our own task on top of the | ||
// original that links each item with the task result and as | ||
// a consequence generate new and unique task objects. | ||
|
||
var completedTask = await | ||
Task.WhenAny(tasks.Select(it => (Task) it.Task).Concat(cancellationTaskSource.Task)) | ||
.ConfigureAwait(continueOnCapturedContext: false); | ||
|
||
if (completedTask == cancellationTaskSource.Task) | ||
if (semaphore != null) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think a wrapper class would make the code simpler: class SemaphoreSlimWrapper {
private readonly SemaphoreSlim semaphore;
public SemaphoreSlimWrapper(int? maxConcurrency) {
semaphore = maxConcurrency is int count
? new SemaphoreSlim(count, count)
: null;
}
public Task WaitAsync() => semaphore?.WaitAsync() ?? Task.CompletedTask;
public void Release() => semaphore?.Release();
} And then this if can be avoided. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me know what you make of it now with e956b46. |
||
{ | ||
// Cancellation during the wait means the enumeration | ||
// has been stopped by the user so the results of the | ||
// remaining tasks are no longer needed. Those tasks | ||
// should cancel as a result of sharing the same | ||
// cancellation token and provided that they passed it | ||
// on to any downstream asynchronous operations. Either | ||
// way, this loop is done so exit hard here. | ||
|
||
return; | ||
try | ||
{ | ||
await semaphore.WaitAsync(cancellationToken); | ||
} | ||
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
{ | ||
return; | ||
} | ||
} | ||
|
||
var i = tasks.FindIndex(it => it.Task.Equals(completedTask)); | ||
if (cancellationToken.IsCancellationRequested) | ||
return; | ||
|
||
{ | ||
var (item, task) = tasks[i]; | ||
tasks.RemoveAt(i); | ||
Interlocked.Increment(ref pendingCount); | ||
|
||
// Await the task rather than using its result directly | ||
// to avoid having the task's exception bubble up as | ||
// AggregateException if the task failed. | ||
var item = enumerator.Current; | ||
var task = starter(item); | ||
|
||
collection.Add(completionNoticeSelector(item, task)); | ||
} | ||
// Add a continuation that notifies completion of the task, | ||
// along with the necessary housekeeping, in case it | ||
// completes before maximum concurrency is reached. | ||
|
||
{ | ||
if (reader.TryRead(out var item)) | ||
tasks.Add((item, taskSelector(item))); | ||
} | ||
} | ||
#pragma warning disable 4014 // https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/compiler-messages/cs4014 | ||
|
||
collection.Add(endNotice); | ||
} | ||
catch (Exception ex) | ||
{ | ||
cancellationTokenSource.Cancel(); | ||
collection.Add(errorNoticeSelector(ex)); | ||
} | ||
finally | ||
{ | ||
reader?.Dispose(); | ||
} | ||
|
||
collection.CompleteAdding(); | ||
} | ||
task.ContinueWith(cancellationToken: cancellationToken, | ||
continuationOptions: TaskContinuationOptions.ExecuteSynchronously, | ||
scheduler: TaskScheduler.Current, | ||
continuationAction: t => | ||
{ | ||
semaphore?.Release(); | ||
|
||
sealed class Reader<T> : IDisposable | ||
{ | ||
IEnumerator<T> _enumerator; | ||
if (cancellationToken.IsCancellationRequested) | ||
return; | ||
|
||
public Reader(IEnumerator<T> enumerator) => | ||
_enumerator = enumerator; | ||
onCompletion(item, t); | ||
OnPendingCompleted(); | ||
}); | ||
|
||
public bool TryRead(out T item) | ||
{ | ||
var ended = false; | ||
if (_enumerator == null || (ended = !_enumerator.MoveNext())) | ||
{ | ||
if (ended) | ||
Dispose(); | ||
item = default; | ||
return false; | ||
#pragma warning restore 4014 | ||
} | ||
|
||
item = _enumerator.Current; | ||
return true; | ||
} | ||
|
||
public void Dispose() | ||
{ | ||
var e = _enumerator; | ||
if (e == null) | ||
return; | ||
_enumerator = null; | ||
e.Dispose(); | ||
OnPendingCompleted(); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/Remainde/Remainder/