Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix AwaitCompletion to yield results during source iteration #505

Merged
merged 26 commits into from
Jun 4, 2018

Conversation

atifaziz
Copy link
Member

@atifaziz atifaziz commented May 29, 2018

This PR addresses #502.

Changes:

  • A continuation is used to post task completions to the main loop so they can be reported and yielded while the source is still being iterated
  • A semaphore is used to gate concurrency (when limited)

The source iteration loop has been refactored considerably. It is more complex since it uses asynchronous notification and requires synchronisation of shared state. Previously, the naïve approach was to loop through the entire source, launching tasks and then wait in another loop to post notifications as they complete. That unfortunately led to the behaviour documented in #502.

@atifaziz atifaziz requested a review from fsateler May 29, 2018 13:58
@atifaziz
Copy link
Member Author

Note that there are race conditions permitted, like some tasks could potentially post completion notices even after an error and termination of the main loop. It could cause some waste like keeping and adding to the notices collection but the impact should be very marginal. An optimisation could be added in the future by nulling out the notices collection so it's available for GC and notices are dropped if the collection is no longer (immaterial because no one would be listening on the other end of the line). In this PR, I am most interested in focusing on fixing the issue raised in #502 from a behavioural perspective (as long as the race conditions are harmless).

Copy link
Member

@fsateler fsateler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately we don't have tests for this operator... so we can't check this hasn't introduced regressions...

catch (OperationCanceledException e) when (e.CancellationToken == consumerCancellationTokenSource.Token)
{
var (error1, error2) = lastCriticalErrors;
throw new Exception("One or more critical errors have occurred.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of wrapping the AggregateException with an Exception?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rationale behind this is that the AggregateException is an informational detail of the critical exception that's actually being raised. The exception raised here is due to a critical condition that made it impossible for the method to guarantee correct operation and report the original exception(s) (in the detail, as an aggregate).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following. This exception is telling me one or more errors occured. And this is precisely what AggregateException is for.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AggregateException is common with tasks and could confuse someone if AwaitCompletion throws it too, and if they have a catch for AggregateException. My intention was to make a clear distinction here. This is a single exception due to critical conditions, period. The aggregate within is for informational and diagnostic purposes only. Perhaps this is a futile attempt at semantics that exist just in my head?

while (tasks.Count > 0)
var semaphore = maxConcurrency is int count
? new SemaphoreSlim(count, count)
: null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally the docs don't lie 😉

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previously, using UnboundedConcurrency actually meant int.MaxValue concurrency. Now it really means unbounded.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see. Yeah. 😃

static async Task StartAsync<T, TResult>(
this IEnumerator<T> enumerator,
Func<T, Task<TResult>> starter,
Action<T, Task<TResult>> onCompletion,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably better named onNext

Copy link
Member Author

@atifaziz atifaziz May 29, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can understand why you'd say that and you'll notice that I got side-tracked in d973670 by the observer pattern here. Later in da262bc, I reverted as allowing harmless race conditions meant I couldn't guarantee the pattern's semantics. I find onNext will mislead in the same direction once again. Tasks complete so completion here is in that sense rather than completion of the loop. I felt that the task as a parameter of the function would be sufficient to say what's completing in case there's any confusion. I guess that's not how you read it. I would prefer onTaskCompletion than onNext if you still feel strong about it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I see why you don't want onNext. onTaskCompletion would be a better name. The main issue I think is that onCompletion is too easy to confuse with completing the entire task instead of completing each iteration. But your proposed name works for me.

.ConfigureAwait(continueOnCapturedContext: false);

if (completedTask == cancellationTaskSource.Task)
if (semaphore != null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a wrapper class would make the code simpler:

class SemaphoreSlimWrapper {
  private readonly SemaphoreSlim semaphore;
  public SemaphoreSlimWrapper(int? maxConcurrency) {
      semaphore = maxConcurrency is int count
                              ? new SemaphoreSlim(count, count)
                             : null;
  }
  public Task WaitAsync() => semaphore?.WaitAsync() ?? Task.CompletedTask;
  public void Release() => semaphore?.Release();
}

And then this if can be avoided.

Copy link
Member Author

@atifaziz atifaziz May 29, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know what you make of it now with e956b46.

{
await semaphore.WaitAsync(cancellationToken);
}
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This catch block could be moved back up into the caller, and below make change the cancellation token check to cancellationToken.ThrowIfCancellationRequested(); . This should make this function simple, while not making the caller too complicated as it is already catching exceptions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The catch in the caller is for exceptional issues in StartAsync, like invocation of starter throwing (task fails to start). The returns are normal conditions or exits that are accounted for.

CancellationToken.None,
TaskCreationOptions.DenyChildAttach,
scheduler);

// Remainde here is the main loop that waits for and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Remainde/Remainder/

@atifaziz
Copy link
Member Author

atifaziz commented May 29, 2018

Unfortunately we don't have tests for this operator... so we can't check this hasn't introduced regressions...

Aye, thus the experimental status. I just haven't got round to thinking about good tests although #439 is somewhat of a sprint/start at that.

@atifaziz
Copy link
Member Author

@fsateler Thanks for your review. We good to merge this?

@atifaziz
Copy link
Member Author

atifaziz commented Jun 3, 2018

@fsateler I'd like to keep the momentum towards releasing 3.0. I feel guilty merging this since you started a review but haven't approved the changes you requested, which I hope have addressed with follow-up commits and comments. If you're busy then just give me a sign that I should go ahead. I want to publish a release candidate and then let it simmer for a week before considering it golden. That should still leave some time to iron out any kinks should you spot any.

Also bear in mind that this is an experimental feature and while we do want to do our very best to get it right, I wouldn't want a minor issue like the choice of exception thrown (AggregateException embedded in an Exception versus not) under very critical circumstance to hold back the release. We can fork that into a separate issue to review if we want to give it some more thought. I don't think anyone will ever be catching that case to apply any sort of remedy except log and abort the request/program. The important thing is that we trip the main loop so a program doesn't enter a zombie state.

@fsateler
Copy link
Member

fsateler commented Jun 3, 2018

Yes, go ahead. I think my concerns are addressed

@atifaziz
Copy link
Member Author

atifaziz commented Jun 4, 2018

Yes, go ahead.

Thanks, mark approved then?

Copy link
Member

@fsateler fsateler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks Good

@atifaziz atifaziz merged commit 201dbb3 into morelinq:master Jun 4, 2018
@atifaziz atifaziz deleted the await-with-slow-source branch June 4, 2018 13:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants