Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix AwaitCompletion to yield results during source iteration #505

Merged
merged 26 commits into from
Jun 4, 2018
Merged
Changes from 20 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
1ba1162
Use continuations to report tasks to support slow sources
atifaziz May 28, 2018
a30df54
End notice unnecessary
atifaziz May 28, 2018
d973670
Refactor to observable
atifaziz May 28, 2018
86c94f9
Move general helpers down
atifaziz May 28, 2018
4a4e08d
To-do notes about exceptional cases
atifaziz May 28, 2018
3111a90
Add arg validation to StartAsync
atifaziz May 28, 2018
688c2ee
Don't allocate semaphore if unbounded concurrency
atifaziz May 28, 2018
616241b
Fold lazy start semantics into StartAsync
atifaziz May 28, 2018
4514060
Consolidate pending completion actions
atifaziz May 28, 2018
da262bc
Revert "Refactor to observable"
atifaziz May 28, 2018
03419d3
Don't treat cancellation as completion; return early
atifaziz May 28, 2018
91f4896
Filter token when catching OperationCanceledException
atifaziz May 28, 2018
98ddd41
Add back (missing) end notice break
atifaziz May 28, 2018
cf71a25
Crititcal error handling
atifaziz May 28, 2018
af05bd6
Minor reivew/clean-up of formatting and names
atifaziz May 29, 2018
5f6a23b
Move notification & error handling in main iterator
atifaziz May 29, 2018
5c759e6
Remove reundant cancellation for simplicity
atifaziz May 29, 2018
69d5341
Move EDI capture into critical section
atifaziz May 29, 2018
8dbdcbe
Capture original error with error notification failure
atifaziz May 29, 2018
57212f2
Lots of comments
atifaziz May 29, 2018
bda7187
Fix typo (s/Remainde/Remainder/)
atifaziz May 29, 2018
e956b46
Model a concurrency gate around the semaphore
atifaziz May 29, 2018
ec6e365
Re-format conditional expression
atifaziz May 29, 2018
467ba7c
Move CompletedTask singleton into own class
atifaziz May 29, 2018
4a5cfd3
Fix ConcurrencyGate.EnterAsync to handle cancellation
atifaziz May 29, 2018
30478c0
Rename task completion parameter for clarity
atifaziz May 30, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 153 additions & 141 deletions MoreLinq/Experimental/Await.cs
Original file line number Diff line number Diff line change
Expand Up @@ -416,45 +416,124 @@ public static IAwaitQuery<TResult> AwaitCompletion<T, TTaskResult, TResult>(

return
AwaitQuery.Create(
options => _(options.MaxConcurrency ?? int.MaxValue,
options => _(options.MaxConcurrency,
options.Scheduler ?? TaskScheduler.Default,
options.PreserveOrder));

IEnumerable<TResult> _(int maxConcurrency, TaskScheduler scheduler, bool ordered)
IEnumerable<TResult> _(int? maxConcurrency, TaskScheduler scheduler, bool ordered)
{
// A separate task will enumerate the source and launch tasks.
// It will post all progress as notices to the collection below.
// A notice is essentially a discriminated union like:
//
// type Notice<'a, 'b> =
// | End
// | Result of (int * 'a * Task<'b>)
// | Error of ExceptionDispatchInfo
//
// Note that BlockingCollection.CompleteAdding is never used to
// to mark the end (which its own notice above) because
// BlockingCollection.Add throws if called after CompleteAdding
// and we want to deliberately tolerate the race condition.

var notices = new BlockingCollection<(Notice, (int, T, Task<TTaskResult>), ExceptionDispatchInfo)>();
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
var completed = false;

var enumerator =
source.Index()
.Select(e => (e.Key, Item: e.Value, Task: evaluator(e.Value, cancellationToken)))
.GetEnumerator();
var consumerCancellationTokenSource = new CancellationTokenSource();
(Exception, Exception) lastCriticalErrors = default;

void PostNotice(Notice notice,
(int, T, Task<TTaskResult>) item,
Exception error)
{
// If a notice fails to post then assume critical error
// conditions (like low memory), capture the error without
// further allocation of resources and trip the cancellation
// token source used by the main loop waiting on notices.
// Note that only the "last" critical error is reported
// as maintaining a list would incur allocations. The idea
// here is to make a best effort attempt to report any of
// the error conditions that may be occuring, which is still
// better than nothing.

try
{
var edi = error != null
? ExceptionDispatchInfo.Capture(error)
: null;
notices.Add((notice, item, edi));
}
catch (Exception e)
{
// Don't use ExceptionDispatchInfo.Capture here to avoid
// inducing allocations if already under low memory
// conditions.

lastCriticalErrors = (e, error);
consumerCancellationTokenSource.Cancel();
throw;
}
}

var completed = false;
var cancellationTokenSource = new CancellationTokenSource();

var enumerator = source.Index().GetEnumerator();
IDisposable disposable = enumerator; // disables AccessToDisposedClosure warnings

try
{
var cancellationToken = cancellationTokenSource.Token;

// Fire-up a parallel loop to iterate through the source and
// launch tasks, posting a result-notice as each task
// completes and another, an end-notice, when all tasks have
// completed.

Task.Factory.StartNew(
() =>
CollectToAsync(
enumerator,
e => e.Task,
notices,
(e, r) => (Notice.Result, (e.Key, e.Item, e.Task), default),
ex => (Notice.Error, default, ExceptionDispatchInfo.Capture(ex)),
(Notice.End, default, default),
maxConcurrency, cancellationTokenSource),
async () =>
{
try
{
await enumerator.StartAsync(
e => evaluator(e.Value, cancellationToken),
(e, r) => PostNotice(Notice.Result, (e.Key, e.Value, r), default),
() => PostNotice(Notice.End, default, default),
maxConcurrency, cancellationToken);
}
catch (Exception e)
{
PostNotice(Notice.Error, default, e);
}
},
CancellationToken.None,
TaskCreationOptions.DenyChildAttach,
scheduler);

// Remainde here is the main loop that waits for and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Remainde/Remainder/

// processes notices.

var nextKey = 0;
var holds = ordered ? new List<(int, T, Task<TTaskResult>)>() : null;

foreach (var (kind, result, error) in notices.GetConsumingEnumerable())
using (var notice = notices.GetConsumingEnumerable(consumerCancellationTokenSource.Token)
.GetEnumerator())
while (true)
{
try
{
if (!notice.MoveNext())
break;
}
catch (OperationCanceledException e) when (e.CancellationToken == consumerCancellationTokenSource.Token)
{
var (error1, error2) = lastCriticalErrors;
throw new Exception("One or more critical errors have occurred.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of wrapping the AggregateException with an Exception?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rationale behind this is that the AggregateException is an informational detail of the critical exception that's actually being raised. The exception raised here is due to a critical condition that made it impossible for the method to guarantee correct operation and report the original exception(s) (in the detail, as an aggregate).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following. This exception is telling me one or more errors occured. And this is precisely what AggregateException is for.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AggregateException is common with tasks and could confuse someone if AwaitCompletion throws it too, and if they have a catch for AggregateException. My intention was to make a clear distinction here. This is a single exception due to critical conditions, period. The aggregate within is for informational and diagnostic purposes only. Perhaps this is a futile attempt at semantics that exist just in my head?

error2 != null ? new AggregateException(error1, error2)
: new AggregateException(error1));
}

var (kind, result, error) = notice.Current;

if (kind == Notice.Error)
error.Throw();

Expand Down Expand Up @@ -531,149 +610,82 @@ IEnumerable<TResult> _(int maxConcurrency, TaskScheduler scheduler, bool ordered
}
}

enum Notice { Result, Error, End }

static async Task CollectToAsync<T, TResult, TNotice>(
this IEnumerator<T> e,
Func<T, Task<TResult>> taskSelector,
BlockingCollection<TNotice> collection,
Func<T, Task<TResult>, TNotice> completionNoticeSelector,
Func<Exception, TNotice> errorNoticeSelector,
TNotice endNotice,
int maxConcurrency,
CancellationTokenSource cancellationTokenSource)
enum Notice { End, Result, Error }

static async Task StartAsync<T, TResult>(
this IEnumerator<T> enumerator,
Func<T, Task<TResult>> starter,
Action<T, Task<TResult>> onCompletion,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably better named onNext

Copy link
Member Author

@atifaziz atifaziz May 29, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can understand why you'd say that and you'll notice that I got side-tracked in d973670 by the observer pattern here. Later in da262bc, I reverted as allowing harmless race conditions meant I couldn't guarantee the pattern's semantics. I find onNext will mislead in the same direction once again. Tasks complete so completion here is in that sense rather than completion of the loop. I felt that the task as a parameter of the function would be sufficient to say what's completing in case there's any confusion. I guess that's not how you read it. I would prefer onTaskCompletion than onNext if you still feel strong about it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I see why you don't want onNext. onTaskCompletion would be a better name. The main issue I think is that onCompletion is too easy to confuse with completing the entire task instead of completing each iteration. But your proposed name works for me.

Action onEnd,
int? maxConcurrency,
CancellationToken cancellationToken)
{
Reader<T> reader = null;
if (enumerator == null) throw new ArgumentNullException(nameof(enumerator));
if (starter == null) throw new ArgumentNullException(nameof(starter));
if (onCompletion == null) throw new ArgumentNullException(nameof(onCompletion));
if (onEnd == null) throw new ArgumentNullException(nameof(onEnd));
if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency));

try
using (enumerator)
{
reader = new Reader<T>(e);
var pendingCount = 1; // terminator

var cancellationToken = cancellationTokenSource.Token;
var cancellationTaskSource = new TaskCompletionSource<bool>();
cancellationToken.Register(() => cancellationTaskSource.TrySetResult(true));

var tasks = new List<(T Item, Task<TResult> Task)>();

for (var i = 0; i < maxConcurrency; i++)
void OnPendingCompleted()
{
if (!reader.TryRead(out var item))
break;
tasks.Add((item, taskSelector(item)));
if (Interlocked.Decrement(ref pendingCount) == 0)
onEnd();
}

while (tasks.Count > 0)
var semaphore = maxConcurrency is int count
? new SemaphoreSlim(count, count)
: null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally the docs don't lie 😉

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previously, using UnboundedConcurrency actually meant int.MaxValue concurrency. Now it really means unbounded.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see. Yeah. 😃


while (enumerator.MoveNext())
{
// Task.WaitAny is synchronous and blocking but allows the
// waiting to be cancelled via a CancellationToken.
// Task.WhenAny can be awaited so it is better since the
// thread won't be blocked and can return to the pool.
// However, it doesn't support cancellation so instead a
// task is built on top of the CancellationToken that
// completes when the CancellationToken trips.
//
// Also, Task.WhenAny returns the task (Task) object that
// completed but task objects may not be unique due to
// caching, e.g.:
//
// async Task<bool> Foo() => true;
// async Task<bool> Bar() => true;
// var foo = Foo();
// var bar = Bar();
// var same = foo.Equals(bar); // == true
//
// In this case, the task returned by Task.WhenAny will
// match `foo` and `bar`:
//
// var done = Task.WhenAny(foo, bar);
//
// Logically speaking, the uniqueness of a task does not
// matter but here it does, especially when Await (the main
// user of CollectAsync) needs to return results ordered.
// Fortunately, we compose our own task on top of the
// original that links each item with the task result and as
// a consequence generate new and unique task objects.

var completedTask = await
Task.WhenAny(tasks.Select(it => (Task) it.Task).Concat(cancellationTaskSource.Task))
.ConfigureAwait(continueOnCapturedContext: false);

if (completedTask == cancellationTaskSource.Task)
if (semaphore != null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a wrapper class would make the code simpler:

class SemaphoreSlimWrapper {
  private readonly SemaphoreSlim semaphore;
  public SemaphoreSlimWrapper(int? maxConcurrency) {
      semaphore = maxConcurrency is int count
                              ? new SemaphoreSlim(count, count)
                             : null;
  }
  public Task WaitAsync() => semaphore?.WaitAsync() ?? Task.CompletedTask;
  public void Release() => semaphore?.Release();
}

And then this if can be avoided.

Copy link
Member Author

@atifaziz atifaziz May 29, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know what you make of it now with e956b46.

{
// Cancellation during the wait means the enumeration
// has been stopped by the user so the results of the
// remaining tasks are no longer needed. Those tasks
// should cancel as a result of sharing the same
// cancellation token and provided that they passed it
// on to any downstream asynchronous operations. Either
// way, this loop is done so exit hard here.

return;
try
{
await semaphore.WaitAsync(cancellationToken);
}
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This catch block could be moved back up into the caller, and below make change the cancellation token check to cancellationToken.ThrowIfCancellationRequested(); . This should make this function simple, while not making the caller too complicated as it is already catching exceptions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The catch in the caller is for exceptional issues in StartAsync, like invocation of starter throwing (task fails to start). The returns are normal conditions or exits that are accounted for.

{
return;
}
}

var i = tasks.FindIndex(it => it.Task.Equals(completedTask));
if (cancellationToken.IsCancellationRequested)
return;

{
var (item, task) = tasks[i];
tasks.RemoveAt(i);
Interlocked.Increment(ref pendingCount);

// Await the task rather than using its result directly
// to avoid having the task's exception bubble up as
// AggregateException if the task failed.
var item = enumerator.Current;
var task = starter(item);

collection.Add(completionNoticeSelector(item, task));
}
// Add a continutation that notifies completion of the task,
// along with the necessary housekeeping, in case it
// completes before maximum concurrency is reached.

{
if (reader.TryRead(out var item))
tasks.Add((item, taskSelector(item)));
}
}
#pragma warning disable 4014 // https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/compiler-messages/cs4014

collection.Add(endNotice);
}
catch (Exception ex)
{
cancellationTokenSource.Cancel();
collection.Add(errorNoticeSelector(ex));
}
finally
{
reader?.Dispose();
}

collection.CompleteAdding();
}
task.ContinueWith(cancellationToken: cancellationToken,
continuationOptions: TaskContinuationOptions.ExecuteSynchronously,
scheduler: TaskScheduler.Current,
continuationAction: t =>
{
semaphore?.Release();

sealed class Reader<T> : IDisposable
{
IEnumerator<T> _enumerator;
if (cancellationToken.IsCancellationRequested)
return;

public Reader(IEnumerator<T> enumerator) =>
_enumerator = enumerator;
onCompletion(item, t);
OnPendingCompleted();
});

public bool TryRead(out T item)
{
var ended = false;
if (_enumerator == null || (ended = !_enumerator.MoveNext()))
{
if (ended)
Dispose();
item = default;
return false;
#pragma warning restore 4014
}

item = _enumerator.Current;
return true;
}

public void Dispose()
{
var e = _enumerator;
if (e == null)
return;
_enumerator = null;
e.Dispose();
OnPendingCompleted();
}
}

Expand Down