Skip to content

Commit

Permalink
Add 'cancel on loss' send mode to MsQuicStream. (#4037)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianpick authored Jan 8, 2024
1 parent 607b174 commit 9416cd6
Show file tree
Hide file tree
Showing 13 changed files with 388 additions and 6 deletions.
6 changes: 6 additions & 0 deletions docs/Streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ When the send has been completely shut down the app will get a `QUIC_STREAM_EVEN

An app can opt in to sending stream data with 0-RTT keys (if available) by including the `QUIC_SEND_FLAG_ALLOW_0_RTT` flag on [StreamSend](api/StreamSend.md) call. MsQuic doesn't make any guarantees that the data will actually be sent with 0-RTT keys. There are several reasons it may not happen, such as keys not being available, packet loss, flow control, etc.

## Cancel On Loss

In case it is desirable to cancel a stream when packet loss is deteced instead of retransmitting the affected packets, the `QUIC_SEND_FLAG_CANCEL_ON_LOSS` can be supplied on a [StreamSend](api/StreamSend.md) call. Doing so will irreversibly switch the associated stream to this behavior. This includes *every* subsequent send call on the same stream, even if the call itself does not include the above flag.

If a stream gets canceled because it is in 'cancel on loss' mode, a `QUIC_STREAM_EVENT_CANCEL_ON_LOSS` event will get emitted. The event allows the app to provide an error code that is communicated to the peer via a `QUIC_STREAM_EVENT_PEER_SEND_ABORTED` event.

# Receiving

Data is received and delivered to apps via the `QUIC_STREAM_EVENT_RECEIVE` event. The event indicates zero, one or more contiguous buffers up to the application.
Expand Down
1 change: 1 addition & 0 deletions docs/api/StreamSend.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Value | Meaning
**QUIC_SEND_FLAG_FIN**<br>4 | Indicates the the stream send is the last or final data to be sent on the stream and should be gracefully shutdown (equivalent to calling [StreamShutdown](StreamShutdown.md) with the `QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL` flag).
**QUIC_SEND_FLAG_DGRAM_PRIORITY**<br>8 | **Unused and ignored** for `StreamSend`
**QUIC_SEND_FLAG_DELAY_SEND**<br>16 | Provides a hint to MsQuic to indicate the data does not need to be sent immediately, likely because more is soon to follow.
**QUIC_SEND_FLAG_CANCEL_ON_LOSS**<br>32 | Informs MsQuic to irreversibly mark the associated stream to be canceled when packet loss has been detected on it. I.e., all sends on a given stream are subject to this behavior from the moment the flag has been supplied for the first time.
`ClientSendContext`
Expand Down
4 changes: 3 additions & 1 deletion src/core/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ typedef union QUIC_STREAM_FLAGS {
BOOLEAN LocalCloseReset : 1; // Locally closed (locally aborted).
BOOLEAN LocalCloseResetReliable : 1; // Indicates that we should shutdown the send path once we sent/ACK'd ReliableOffsetSend bytes.
BOOLEAN LocalCloseResetReliableAcked : 1; // Indicates the peer has acknowledged we will stop sending once we sent/ACK'd ReliableOffsetSend bytes.
BOOLEAN RemoteCloseResetReliable : 1; // Indicates that the peer initiaited a reliable reset. Keep Recv path available for RecvMaxLength bytes.
BOOLEAN RemoteCloseResetReliable : 1; // Indicates that the peer initiated a reliable reset. Keep Recv path available for RecvMaxLength bytes.
BOOLEAN ReceivedStopSending : 1; // Peer sent STOP_SENDING frame.
BOOLEAN LocalCloseAcked : 1; // Any close acknowledged.
BOOLEAN FinAcked : 1; // Our FIN was acknowledged.
Expand All @@ -144,6 +144,8 @@ typedef union QUIC_STREAM_FLAGS {
BOOLEAN ReceiveCallPending : 1; // There is an uncompleted receive to the app.
BOOLEAN ReceiveCallActive : 1; // There is an active receive to the app.
BOOLEAN SendDelayed : 1; // A delayed send is currently queued.
BOOLEAN CancelOnLoss : 1; // Indicates that the stream is to be canceled
// if loss is detected.

BOOLEAN HandleSendShutdown : 1; // Send shutdown complete callback delivered.
BOOLEAN HandleShutdown : 1; // Shutdown callback delivered.
Expand Down
34 changes: 32 additions & 2 deletions src/core/stream_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ QuicStreamSendShutdown(

while (ApiSendRequests != NULL) {
//
// These sends were queued by the app after queueing a graceful
// These sends were queued by the app after queuing a graceful
// shutdown. Bad app!
//
QUIC_SEND_REQUEST* SendRequest = ApiSendRequests;
Expand All @@ -171,7 +171,7 @@ QuicStreamSendShutdown(

} else if (Stream->ReliableOffsetSend == 0 || Stream->Flags.LocalCloseResetReliable) {
//
// Enter abortive branch if we are not aborting reliablely or we have done it already.
// Enter abortive branch if we are not aborting reliably or we have done it already.
// Essentially, Reset trumps Reliable Reset, so if we have to call shutdown again, we reset.
//

Expand Down Expand Up @@ -613,6 +613,15 @@ QuicStreamSendFlush(

CXPLAT_DBG_ASSERT(!(SendRequest->Flags & QUIC_SEND_FLAG_BUFFERED));

//
// If a send has the 'cancel on loss' flag set, we irreversibly switch
// the associated stream over to that behavior.
//
if (!Stream->Flags.CancelOnLoss &&
(SendRequest->Flags & QUIC_SEND_FLAG_CANCEL_ON_LOSS) != 0) {
Stream->Flags.CancelOnLoss = TRUE;
}

if (!Stream->Flags.SendEnabled) {
//
// Only possible if they queue multiple sends, with a FIN flag set
Expand Down Expand Up @@ -1364,6 +1373,27 @@ QuicStreamOnLoss(
Done:

if (AddSendFlags != 0) {
//
// Check stream's 'cancel on loss' flag to determine how to handle
// the resends queued up at this point.
//
if (Stream->Flags.CancelOnLoss) {
QUIC_STREAM_EVENT Event;
Event.Type = QUIC_STREAM_EVENT_CANCEL_ON_LOSS;
Event.CANCEL_ON_LOSS.ErrorCode = 0;
(void)QuicStreamIndicateEvent(Stream, &Event);

//
// Immediately terminate stream (in both directions, if open)
// giving the error code from the app.
//
QuicStreamShutdown(
Stream,
QUIC_STREAM_SHUTDOWN_FLAG_ABORT,
Event.CANCEL_ON_LOSS.ErrorCode);

return FALSE; // Don't resend any data.
}

if (!Stream->Flags.InRecovery) {
Stream->Flags.InRecovery = TRUE; // TODO - Do we really need to be in recovery if no real data bytes need to be recovered?
Expand Down
20 changes: 20 additions & 0 deletions src/cs/lib/msquic_generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ internal enum QUIC_SEND_FLAGS
FIN = 0x0004,
DGRAM_PRIORITY = 0x0008,
DELAY_SEND = 0x0010,
CANCEL_ON_LOSS = 0x0020,
}

internal enum QUIC_DATAGRAM_SEND_STATE
Expand Down Expand Up @@ -2746,6 +2747,7 @@ internal enum QUIC_STREAM_EVENT_TYPE
SHUTDOWN_COMPLETE = 7,
IDEAL_SEND_BUFFER_SIZE = 8,
PEER_ACCEPTED = 9,
CANCEL_ON_LOSS = 10,
}

internal partial struct QUIC_STREAM_EVENT
Expand Down Expand Up @@ -2819,6 +2821,14 @@ internal ref _Anonymous_e__Union._IDEAL_SEND_BUFFER_SIZE_e__Struct IDEAL_SEND_BU
}
}

internal ref _Anonymous_e__Union._CANCEL_ON_LOSS_e__Struct CANCEL_ON_LOSS
{
get
{
return ref MemoryMarshal.GetReference(MemoryMarshal.CreateSpan(ref Anonymous.CANCEL_ON_LOSS, 1));
}
}

[StructLayout(LayoutKind.Explicit)]
internal partial struct _Anonymous_e__Union
{
Expand Down Expand Up @@ -2854,6 +2864,10 @@ internal partial struct _Anonymous_e__Union
[NativeTypeName("struct (anonymous struct)")]
internal _IDEAL_SEND_BUFFER_SIZE_e__Struct IDEAL_SEND_BUFFER_SIZE;

[FieldOffset(0)]
[NativeTypeName("struct (anonymous struct)")]
internal _CANCEL_ON_LOSS_e__Struct CANCEL_ON_LOSS;

internal partial struct _START_COMPLETE_e__Struct
{
[NativeTypeName("HRESULT")]
Expand Down Expand Up @@ -3011,6 +3025,12 @@ internal partial struct _IDEAL_SEND_BUFFER_SIZE_e__Struct
[NativeTypeName("uint64_t")]
internal ulong ByteCount;
}

internal partial struct _CANCEL_ON_LOSS_e__Struct
{
[NativeTypeName("QUIC_UINT62")]
internal ulong ErrorCode;
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/inc/msquic.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,14 @@ typedef enum QUIC_SEND_FLAGS {
QUIC_SEND_FLAG_FIN = 0x0004, // Indicates the request is the one last sent on the stream.
QUIC_SEND_FLAG_DGRAM_PRIORITY = 0x0008, // Indicates the datagram is higher priority than others.
QUIC_SEND_FLAG_DELAY_SEND = 0x0010, // Indicates the send should be delayed because more will be queued soon.
QUIC_SEND_FLAG_CANCEL_ON_LOSS = 0x0020, // Indicates that a stream is to be cancelled when packet loss is detected.
} QUIC_SEND_FLAGS;

DEFINE_ENUM_FLAG_OPERATORS(QUIC_SEND_FLAGS)

typedef enum QUIC_DATAGRAM_SEND_STATE {
QUIC_DATAGRAM_SEND_UNKNOWN, // Not yet sent.
QUIC_DATAGRAM_SEND_SENT, // Sent and awaiting acknowledegment
QUIC_DATAGRAM_SEND_SENT, // Sent and awaiting acknowledgment
QUIC_DATAGRAM_SEND_LOST_SUSPECT, // Suspected as lost, but still tracked
QUIC_DATAGRAM_SEND_LOST_DISCARDED, // Lost and not longer being tracked
QUIC_DATAGRAM_SEND_ACKNOWLEDGED, // Acknowledged
Expand Down Expand Up @@ -1385,6 +1386,7 @@ typedef enum QUIC_STREAM_EVENT_TYPE {
QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE = 7,
QUIC_STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE = 8,
QUIC_STREAM_EVENT_PEER_ACCEPTED = 9,
QUIC_STREAM_EVENT_CANCEL_ON_LOSS = 10,
} QUIC_STREAM_EVENT_TYPE;

typedef struct QUIC_STREAM_EVENT {
Expand Down Expand Up @@ -1430,6 +1432,9 @@ typedef struct QUIC_STREAM_EVENT {
struct {
uint64_t ByteCount;
} IDEAL_SEND_BUFFER_SIZE;
struct {
/* out */ QUIC_UINT62 ErrorCode;
} CANCEL_ON_LOSS;
};
} QUIC_STREAM_EVENT;

Expand Down
2 changes: 1 addition & 1 deletion src/inc/msquic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ struct MsQuicListener {
QUIC_STATUS
Start(
_In_ const MsQuicAlpn& Alpns,
_In_ const QUIC_ADDR* Address = nullptr
_In_opt_ const QUIC_ADDR* Address = nullptr
) noexcept {
return MsQuic->ListenerStart(Handle, Alpns, Alpns.Length(), Address);
}
Expand Down
2 changes: 2 additions & 0 deletions src/plugins/dbg/quictypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ typedef union QUIC_STREAM_FLAGS {
BOOLEAN ReceiveCallPending : 1; // There is an uncompleted receive to the app.
BOOLEAN ReceiveCallActive : 1; // There is an active receive to the app.
BOOLEAN SendDelayed : 1; // A delayed send is currently queued.
BOOLEAN CancelOnLoss : 1; // Indicates that the stream is to be canceled
// if loss is detected.

BOOLEAN HandleSendShutdown : 1; // Send shutdown complete callback delivered.
BOOLEAN HandleShutdown : 1; // Shutdown callback delivered.
Expand Down
20 changes: 19 additions & 1 deletion src/test/MsQuicTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,11 @@ QuicAbortiveTransfers(
_In_ QUIC_ABORTIVE_TRANSFER_FLAGS Flags
);

void
QuicCancelOnLossSend(
_In_ bool DropPackets
);

void
QuicTestCidUpdate(
_In_ int Family,
Expand Down Expand Up @@ -1253,4 +1258,17 @@ typedef struct {
#define IOCTL_QUIC_RUN_CONN_CLOSE_BEFORE_STREAM_CLOSE \
QUIC_CTL_CODE(117, METHOD_BUFFERED, FILE_WRITE_DATA)

#define QUIC_MAX_IOCTL_FUNC_CODE 117
#pragma pack(push)
#pragma pack(1)

typedef struct {
bool DropPackets;
} QUIC_RUN_CANCEL_ON_LOSS_PARAMS;

#pragma pack(pop)

#define IOCTL_QUIC_RUN_CANCEL_ON_LOSS \
QUIC_CTL_CODE(118, METHOD_BUFFERED, FILE_WRITE_DATA)
// QUIC_RUN_CANCEL_ON_LOSS_PARAMS

#define QUIC_MAX_IOCTL_FUNC_CODE 118
23 changes: 23 additions & 0 deletions src/test/bin/quic_gtest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1973,6 +1973,20 @@ TEST_P(WithAbortiveArgs, AbortiveShutdown) {
}
}

#if QUIC_TEST_DATAPATH_HOOKS_ENABLED
TEST_P(WithCancelOnLossArgs, CancelOnLossSend) {
TestLoggerT<ParamType> Logger("QuicCancelOnLossSend", GetParam());
if (TestingKernelMode) {
QUIC_RUN_CANCEL_ON_LOSS_PARAMS Params = {
GetParam().DropPackets
};
ASSERT_TRUE(DriverClient.Run(IOCTL_QUIC_RUN_CANCEL_ON_LOSS, Params));
} else {
QuicCancelOnLossSend(GetParam().DropPackets);
}
}
#endif

TEST_P(WithCidUpdateArgs, CidUpdate) {
TestLoggerT<ParamType> Logger("QuicTestCidUpdate", GetParam());
if (TestingKernelMode) {
Expand Down Expand Up @@ -2437,6 +2451,15 @@ INSTANTIATE_TEST_SUITE_P(
WithAbortiveArgs,
testing::ValuesIn(AbortiveArgs::Generate()));

#if QUIC_TEST_DATAPATH_HOOKS_ENABLED

INSTANTIATE_TEST_SUITE_P(
Misc,
WithCancelOnLossArgs,
testing::ValuesIn(CancelOnLossArgs::Generate()));

#endif

INSTANTIATE_TEST_SUITE_P(
Misc,
WithCidUpdateArgs,
Expand Down
18 changes: 18 additions & 0 deletions src/test/bin/quic_gtest.h
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,24 @@ class WithAbortiveArgs : public testing::Test,
public testing::WithParamInterface<AbortiveArgs> {
};

struct CancelOnLossArgs {
bool DropPackets;
static ::std::vector<CancelOnLossArgs> Generate() {
::std::vector<CancelOnLossArgs> list;
for (bool DropPackets : {false, true})
list.push_back({ DropPackets });
return list;
}
};

std::ostream& operator << (std::ostream& o, const CancelOnLossArgs& args) {
return o << "DropPackets: " << (args.DropPackets ? "true" : "false");
}

class WithCancelOnLossArgs : public testing::Test,
public testing::WithParamInterface<CancelOnLossArgs> {
};

struct CidUpdateArgs {
int Family;
uint16_t Iterations;
Expand Down
7 changes: 7 additions & 0 deletions src/test/bin/winkernel/control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ size_t QUIC_IOCTL_BUFFER_SIZES[] =
0,
sizeof(INT32),
0,
sizeof(QUIC_RUN_CANCEL_ON_LOSS_PARAMS),
};

CXPLAT_STATIC_ASSERT(
Expand All @@ -528,6 +529,7 @@ typedef union {
QUIC_RUN_ABORTIVE_SHUTDOWN_PARAMS Params4;
QUIC_RUN_CID_UPDATE_PARAMS Params5;
QUIC_RUN_RECEIVE_RESUME_PARAMS Params6;
QUIC_RUN_CANCEL_ON_LOSS_PARAMS Params7;
UINT8 EnableKeepAlive;
UINT8 StopListenerFirst;
QUIC_RUN_DRILL_INITIAL_PACKET_CID_PARAMS DrillParams;
Expand Down Expand Up @@ -1419,6 +1421,11 @@ QuicTestCtlEvtIoDeviceControl(
QuicTestCtlRun(QuicTestConnectionCloseBeforeStreamClose());
break;

case IOCTL_QUIC_RUN_CANCEL_ON_LOSS:
CXPLAT_FRE_ASSERT(Params != nullptr);
QuicTestCtlRun(QuicCancelOnLossSend(Params->Params7.DropPackets));
break;

default:
Status = STATUS_NOT_IMPLEMENTED;
break;
Expand Down
Loading

0 comments on commit 9416cd6

Please sign in to comment.