Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

76831-SystemNetQuicTestsQuicStreamTestsWriteCanceled_NextWriteThrows #92266

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ private enum State
private ManualResetValueTaskSourceCore<bool> _valueTaskSource;
private CancellationTokenRegistration _cancellationRegistration;
private Action<object?>? _cancellationAction;
private object? _cancellationActionLock;
private GCHandle _keepAlive;

private readonly TaskCompletionSource _finalTaskSource;
Expand All @@ -50,6 +51,11 @@ public ResettableValueTaskSource(bool runContinuationsAsynchronously = true)
/// </summary>
public Action<object?> CancellationAction { init { _cancellationAction = value; } }

/// <summary>
/// Optional lock object to be held during the <see cref="CancellationAction"/> callback. If no lock is provided, the callback will be invoked without holding any lock.
/// </summary>
public object? CancellationActionLock { init { _cancellationActionLock = value; } }

/// <summary>
/// Returns <c>true</c> is this task source has entered its final state, i.e. <see cref="TrySetResult(bool)"/> or <see cref="TrySetException(Exception, bool)"/>
/// was called with <c>final</c> set to <c>true</c> and the result was propagated.
Expand Down Expand Up @@ -91,10 +97,27 @@ public bool TryGetValueTask(out ValueTask valueTask, object? keepAlive = null, C
_cancellationRegistration = cancellationToken.UnsafeRegister(static (obj, cancellationToken) =>
{
(ResettableValueTaskSource thisRef, object? target) = ((ResettableValueTaskSource, object?))obj!;
// This will transition the state to Ready.
if (thisRef.TrySetException(new OperationCanceledException(cancellationToken)))

object? lockObject = thisRef._cancellationActionLock;
try
{
if (lockObject != null)
{
Monitor.Enter(lockObject);
}

// This will transition the state to Ready.
if (thisRef.TrySetException(new OperationCanceledException(cancellationToken)))
{
thisRef._cancellationAction?.Invoke(target);
}
}
finally
{
thisRef._cancellationAction?.Invoke(target);
if (lockObject != null)
{
Monitor.Exit(lockObject);
}
}
}, (this, keepAlive));
}
Expand Down
20 changes: 16 additions & 4 deletions src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ public sealed partial class QuicStream
private ReceiveBuffers _receiveBuffers = new ReceiveBuffers();
private int _receivedNeedsEnable;

private readonly ResettableValueTaskSource _sendTcs = new ResettableValueTaskSource()
private readonly ResettableValueTaskSource _sendTcs;
// helper function since we need to pass the _shutdownTcs to the _sendTcs
private ResettableValueTaskSource CreateSendTaskCompletionSource() => new ResettableValueTaskSource()
{
CancellationAction = target =>
{
Expand All @@ -106,7 +108,12 @@ public sealed partial class QuicStream
// when using CancellationTokenSource.CancelAfter.
// Ignore the exception
}
}
},

// since we're using _sendTcs (Abortive) and _shutdownTcs (Graceful) for controlling write direction shutdown,
// CancellationAction above can race with disposing the stream. Holding _shutdownTcs will prevent Dispose from
// performing graceful shutdown while we are Aborting it.
CancellationActionLock = _shutdownTcs
};
private MsQuicBuffers _sendBuffers = new MsQuicBuffers();
private readonly object _sendBuffersLock = new object();
Expand Down Expand Up @@ -185,6 +192,7 @@ internal unsafe QuicStream(MsQuicContextSafeHandle connectionHandle, QuicStreamT
_receiveTcs.TrySetResult(final: true);
}
_type = type;
_sendTcs = CreateSendTaskCompletionSource();
}

/// <summary>
Expand All @@ -211,6 +219,7 @@ internal unsafe QuicStream(MsQuicContextSafeHandle connectionHandle, QUIC_HANDLE
context.Free();
throw;
}
_sendTcs = CreateSendTaskCompletionSource();

_defaultErrorCode = defaultErrorCode;

Expand Down Expand Up @@ -355,15 +364,18 @@ public ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, bool completeWrites, Ca
return ValueTask.FromCanceled(cancellationToken);
}

// Concurrent call, this one lost the race.
// attempt to reserve the task. If the token was already canceled, this will also invoke Abort(Write, DefaultErrorCode) and transition to final state.
if (!_sendTcs.TryGetValueTask(out ValueTask valueTask, this, cancellationToken))
{
// Concurrent call, this one lost the race.
throw new InvalidOperationException(SR.Format(SR.net_io_invalidnestedcall, "write"));
}

// No need to call anything since we already have a result, most likely an exception.
// Task.Delay(TimeSpan.FromSeconds(1), CancellationToken.None).Wait(CancellationToken.None);

if (valueTask.IsCompleted)
{
// No need to continue since we already have a result, most likely an exception.
return valueTask;
}

Expand Down