Fix AwaitCompletion to yield results during source iteration #505
Especially if we fail to notify under low memory conditions!
This removes the need for Lazy allocations
This reverts commit d973670.
Note that some race conditions are permitted: for example, some tasks could post completion notices even after an error and termination of the main loop. That could cause some waste, like keeping and adding to the notices collection, but the impact should be very marginal. An optimisation could be added in the future by nulling out the notices collection so it's available for GC and notices are dropped if the collection is no longer available (immaterial because no one would be listening on the other end of the line). In this PR, I am most interested in focusing on fixing the issue raised in #502 from a behavioural perspective (as long as the race conditions are harmless).
Unfortunately we don't have tests for this operator... so we can't check this hasn't introduced regressions...
catch (OperationCanceledException e) when (e.CancellationToken == consumerCancellationTokenSource.Token)
{
    var (error1, error2) = lastCriticalErrors;
    throw new Exception("One or more critical errors have occurred.",
What is the purpose of wrapping the AggregateException with an Exception?
The rationale behind this is that the AggregateException is an informational detail of the critical exception that's actually being raised. The exception raised here is due to a critical condition that made it impossible for the method to guarantee correct operation and report the original exception(s) (in the detail, as an aggregate).
I'm not following. This exception is telling me one or more errors occurred. And this is precisely what AggregateException is for.
AggregateException is common with tasks and could confuse someone if AwaitCompletion throws it too, and if they have a catch for AggregateException. My intention was to make a clear distinction here. This is a single exception due to critical conditions, period. The aggregate within is for informational and diagnostic purposes only. Perhaps this is a futile attempt at semantics that exist just in my head?
MoreLinq/Experimental/Await.cs
Outdated
while (tasks.Count > 0)
var semaphore = maxConcurrency is int count
    ? new SemaphoreSlim(count, count)
    : null;
Finally the docs don't lie 😉
Huh?
Previously, using UnboundedConcurrency actually meant int.MaxValue concurrency. Now it really means unbounded.
Oh I see. Yeah. 😃
MoreLinq/Experimental/Await.cs
Outdated
static async Task StartAsync<T, TResult>(
    this IEnumerator<T> enumerator,
    Func<T, Task<TResult>> starter,
    Action<T, Task<TResult>> onCompletion,
This is probably better named onNext
I can understand why you'd say that and you'll notice that I got side-tracked in d973670 by the observer pattern here. Later in da262bc, I reverted as allowing harmless race conditions meant I couldn't guarantee the pattern's semantics. I find onNext will mislead in the same direction once again. Tasks complete, so completion here is meant in that sense rather than completion of the loop. I felt that the task as a parameter of the function would be sufficient to say what's completing in case there's any confusion. I guess that's not how you read it. I would prefer onTaskCompletion to onNext if you still feel strongly about it.
OK, I see why you don't want onNext. onTaskCompletion would be a better name. The main issue I think is that onCompletion is too easy to confuse with completing the entire task instead of completing each iteration. But your proposed name works for me.
MoreLinq/Experimental/Await.cs
Outdated
.ConfigureAwait(continueOnCapturedContext: false);

if (completedTask == cancellationTaskSource.Task)
if (semaphore != null)
I think a wrapper class would make the code simpler:
class SemaphoreSlimWrapper {
    private readonly SemaphoreSlim semaphore;
    public SemaphoreSlimWrapper(int? maxConcurrency) {
        semaphore = maxConcurrency is int count
            ? new SemaphoreSlim(count, count)
            : null;
    }
    public Task WaitAsync() => semaphore?.WaitAsync() ?? Task.CompletedTask;
    public void Release() => semaphore?.Release();
}
And then this if can be avoided.
Let me know what you make of it now with e956b46.
MoreLinq/Experimental/Await.cs
Outdated
{
    await semaphore.WaitAsync(cancellationToken);
}
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
This catch block could be moved back up into the caller, and the cancellation token check below changed to cancellationToken.ThrowIfCancellationRequested(). This should make this function simpler, while not making the caller too complicated as it is already catching exceptions.
The catch in the caller is for exceptional issues in StartAsync, like invocation of starter throwing (task fails to start). The returns are normal conditions or exits that are accounted for.
MoreLinq/Experimental/Await.cs
Outdated
CancellationToken.None,
TaskCreationOptions.DenyChildAttach,
scheduler);

// Remainde here is the main loop that waits for and
s/Remainde/Remainder/
Aye, thus the experimental status. I just haven't got round to thinking about good tests although #439 is somewhat of a sprint/start at that.
This unties its initialization with ConcurrencyGate.
@fsateler Thanks for your review. We good to merge this?
@fsateler I'd like to keep the momentum towards releasing 3.0. I feel guilty merging this since you started a review but haven't approved the changes you requested, which I hope I have addressed with follow-up commits and comments. If you're busy then just give me a sign that I should go ahead. I want to publish a release candidate and then let it simmer for a week before considering it golden. That should still leave some time to iron out any kinks should you spot any. Also bear in mind that this is an experimental feature and while we do want to do our very best to get it right, I wouldn't want a minor issue like the choice of exception thrown (
Yes, go ahead. I think my concerns are addressed.
Thanks, mark approved then?
Looks Good
This PR addresses #502.
Changes:
The source iteration loop has been refactored considerably. It is more complex since it uses asynchronous notification and requires synchronisation of shared state. Previously, the naïve approach was to loop through the entire source, launching tasks and then wait in another loop to post notifications as they complete. That unfortunately led to the behaviour documented in #502.
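To illustrate the behavioural change described above, here is a minimal sketch of yielding results while the source is still being iterated. It is not the code in MoreLinq/Experimental/Await.cs: the names AwaitSketch, YieldAsCompleted, notices and pump are hypothetical, it uses a BlockingCollection as the notification channel purely as an assumption, and it leaves out concurrency limits, cancellation and the error handling discussed in the review.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class AwaitSketch
{
    // Hypothetical helper for illustration only; not the MoreLinq implementation.
    public static IEnumerable<TResult> YieldAsCompleted<T, TResult>(
        IEnumerable<T> source, Func<T, Task<TResult>> starter)
    {
        // Completed tasks are posted here as "notices" for the consumer loop.
        var notices = new BlockingCollection<Task<TResult>>();

        // Iterate the source on a separate task so that the consumer loop
        // below can yield results before the source has been exhausted.
        var pump = Task.Run(() =>
        {
            var pending = new List<Task>();
            try
            {
                foreach (var item in source)
                {
                    var task = starter(item);
                    // Post a notice as soon as this particular task completes.
                    pending.Add(task.ContinueWith(
                        t => notices.Add(t),
                        TaskContinuationOptions.ExecuteSynchronously));
                }
                // Wait until every notice has been posted.
                Task.WaitAll(pending.ToArray());
            }
            finally
            {
                // Signal the consumer that no more notices will arrive.
                notices.CompleteAdding();
            }
        });

        // Main loop: yield each result in completion order as notices arrive.
        foreach (var task in notices.GetConsumingEnumerable())
            yield return task.GetAwaiter().GetResult();

        // Surface any error raised while iterating the source or starting tasks.
        pump.GetAwaiter().GetResult();
    }
}
```

A call such as AwaitSketch.YieldAsCompleted(new[] { 300, 100, 200 }, async ms => { await Task.Delay(ms); return ms; }) would be expected to produce results roughly in completion order rather than source order, although the exact order depends on timing. If the consumer stops enumerating early, this sketch keeps running the remaining tasks, which is one of the things a real implementation has to handle.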