Skip to content

Commit

Permalink
PROTON-2790: finer grained session flow control
Browse files Browse the repository at this point in the history
  • Loading branch information
Cliff Jansen committed Oct 9, 2024
1 parent dec9fdc commit 60ab050
Show file tree
Hide file tree
Showing 9 changed files with 533 additions and 30 deletions.
2 changes: 1 addition & 1 deletion c/docs/buffering.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ gets a @ref PN_LINK_FLOW event.

The AMQP protocol allows peers to exchange session limits so they can predict
their buffering requirements for incoming data (
`pn_session_set_incoming_capacity()` and
`pn_session_set_incoming_incoming_window_and_lwm()` and
`pn_session_set_outgoing_window()`). Proton will not exceed those limits when
sending to or receiving from the peer. However proton does *not* limit the
amount of data buffered in local memory at the request of the application. It
Expand Down
82 changes: 82 additions & 0 deletions c/include/proton/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ PN_EXTERN void pn_session_open(pn_session_t *session);
PN_EXTERN void pn_session_close(pn_session_t *session);

/**
* **Deprecated** - Use ::pn_session_incoming_window().
*
* Get the incoming capacity of the session measured in bytes.
*
* The incoming capacity of a session determines how much incoming
Expand All @@ -205,6 +207,8 @@ PN_EXTERN void pn_session_close(pn_session_t *session);
PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session);

/**
* **Deprecated** - Use ::pn_session_set_incoming_window_and_lwm().
*
* Set the incoming capacity for a session object.
*
* The incoming capacity of a session determines how much incoming message
Expand All @@ -223,6 +227,84 @@ PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session);
*/
PN_EXTERN void pn_session_set_incoming_capacity(pn_session_t *session, size_t capacity);

/**
* Get the maximum incoming window window for a session object.
*
* The maximum incoming window can be set by ::pn_session_set_incoming_window_and_lwm.
*
* @param[in] session the session object
* @return the maximum size of the incoming window or 0 if not set.
**/
PN_EXTERN pn_frame_count_t pn_session_incoming_window(pn_session_t *session);

/**
* Get the low water mark for the session incoming window.
*
* The low water mark governs how frequently the session updates the remote
* peer with changes to the incoming window.
*
* A value of zero indicates that Proton will choose a default strategy for
* updating the peer.
*
* The low water mark can be set by ::pn_session_set_incoming_window_and_lwm.
*
* @param[in] session the session object
* @return the low water mark of incoming window.
**/
PN_EXTERN pn_frame_count_t pn_session_incoming_window_lwm(pn_session_t *session);

/**
* Set the maximum incoming window and low water mark for a session object.
*
* The session incoming window is a count of the number of AMQP transfer frames
* that can be accepted and buffered locally by the session object until
* processed by the application (i.e. consumed by ::pn_link_recv or dropped by
* ::pn_link_advance). The maximum bytes buffered by the session will never
* exceed (max_incoming_window * max_frame_size). The incoming window frame count
* decreases 1-1 with incoming AMQP transfer frames. Whenever the application
* processes the buffered incoming bytes, the incoming window increases to the
* largest frame count that can be used by the peer without causing the local
* buffered bytes to exceed the maximum stated above.
*
* The session will defer updating the peer with a changed incoming window until
* it drops below the low water mark (lwm). Too many updates can delay
* other traffic on the connection without providing improved performance on the
* session. Too few can leave a remote sender frequently unable to send due
* to a closed window. The best balance is application specific. Note that the
* session incoming window is always updated along with the link credit on any
* of its child links, so the frequency of link credit updates is also a
* consideration when choosing a low water mark.
*
* The low water mark must be less than or equal to the incoming window. If
* set to zero, Proton will choose a default strategy for updating the
* incoming window.
*
* This call is only valid before the call to ::pn_session_open on the session.
* Subsequently, the settings are fixed for the life of the session and only
* have effect if a max frame size is also set on the session's connection.
*
* @param[in] session the session object
* @param[in] window the maximum incoming window buffered by the session
* @param[in] lwm the low water mark (or 0 for default window updating)
*
* @return 0 on success, PN_ARG_ERR if window is zero or lwm is greater than
* window, or PN_STATE_ERR if the session is already open.
*/
PN_EXTERN int pn_session_set_incoming_window_and_lwm(pn_session_t *session, pn_frame_count_t window, pn_frame_count_t lwm);

/**
* Get the remote view of the incoming window for the session.
*
* This evaluates to the most recent incoming window value communicated by the
* peer minus any subsequent transfer frames for the session that have been
* sent. It does not include transfer frames that may be created in future
* for locally buffered content tracked by @ref pn_session_outgoing_bytes.
*
* @param[in] session the session object
* @return the remote incoming window
*/
PN_EXTERN pn_frame_count_t pn_session_remote_incoming_window(pn_session_t *session);

/**
* Get the outgoing window for a session object.
*
Expand Down
8 changes: 7 additions & 1 deletion c/include/proton/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,16 @@ PN_EXTERN uint32_t pn_transport_get_max_frame(pn_transport_t *transport);
/**
* Set the maximum frame size of a transport.
*
* The negotiated frame size cannot change over the life of the transport. After
* the transport has started sending AMQP frames to the peer, this function call
* has no effect. Typically, the maximum frame size is set when the transport is
* created.
*
* @param[in] transport a transport object
* @param[in] size the maximum frame size for the transport object
*
* @internal XXX Deprecate when moved to connection
* @internal XXX Deprecate when moved to connection, note size can change on
* reconnect with new transport, consider status return on new API.
*/
PN_EXTERN void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size);

Expand Down
7 changes: 7 additions & 0 deletions c/include/proton/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ extern "C" {
*/
typedef uint32_t pn_sequence_t;

/**
* A count or limit of AMQP transfer frames.
*
* @ingroup api_types
*/
typedef uint32_t pn_frame_count_t;

/**
* A span of time in milliseconds.
*
Expand Down
6 changes: 6 additions & 0 deletions c/src/core/engine-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ struct pn_session_t {
pn_sequence_t incoming_deliveries;
pn_sequence_t outgoing_deliveries;
pn_sequence_t outgoing_window;
pn_frame_count_t incoming_window_lwm;
pn_frame_count_t max_incoming_window;
bool check_flow;
bool need_flow;
bool lwm_default;
};

struct pn_terminus_t {
Expand Down Expand Up @@ -395,6 +400,7 @@ void pn_ep_incref(pn_endpoint_t *endpoint);
void pn_ep_decref(pn_endpoint_t *endpoint);

ssize_t pni_transport_grow_capacity(pn_transport_t *transport, size_t n);
void pni_session_update_incoming_lwm(pn_session_t *ssn);

#if __cplusplus
}
Expand Down
86 changes: 80 additions & 6 deletions c/src/core/engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,10 @@ pn_session_t *pn_session(pn_connection_t *conn)
ssn->outgoing_deliveries = 0;
ssn->outgoing_window = AMQP_MAX_WINDOW_SIZE;
ssn->local_handle_max = PN_IMPL_HANDLE_MAX;
ssn->incoming_window_lwm = 1;
ssn->check_flow = false;
ssn->need_flow = false;
ssn->lwm_default = true;

// begin transport state
memset(&ssn->state, 0, sizeof(ssn->state));
Expand Down Expand Up @@ -1095,11 +1099,74 @@ size_t pn_session_get_incoming_capacity(pn_session_t *ssn)
return ssn->incoming_capacity;
}

// Update required when (re)set by user or when session started (proxy: BEGIN frame). No
// session flow control actually means flow control with huge window, so set lwm to 1. There is
// low probability of a stall. Any link credit flow frame will update session credit too.
void pni_session_update_incoming_lwm(pn_session_t *ssn) {
if (ssn->incoming_capacity) {
// Old API.
if (!ssn->connection->transport)
return; // Defer until called again from BEGIN frame setup with max frame known.
if (ssn->connection->transport->local_max_frame) {
ssn->incoming_window_lwm = (ssn->incoming_capacity / ssn->connection->transport->local_max_frame) / 2;
if (!ssn->incoming_window_lwm)
ssn->incoming_window_lwm = 1; // Zero may hang.
} else {
ssn->incoming_window_lwm = 1;
}
} else if (ssn->max_incoming_window) {
// New API.
// Only need to deal with default. Called whensending BEGIN frame.
if (ssn->connection->transport && ssn->connection->transport->local_max_frame && ssn->lwm_default) {
ssn->incoming_window_lwm = (ssn->max_incoming_window + 1) / 2;
}
} else {
ssn->incoming_window_lwm = 1;
}
assert(ssn->incoming_window_lwm != 0); // 0 allows session flow to hang
}

void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity)
{
assert(ssn);
// XXX: should this trigger a flow?
ssn->incoming_capacity = capacity;
ssn->max_incoming_window = 0;
ssn->incoming_window_lwm = 1;
ssn->lwm_default = true;
if (ssn->connection->transport) {
ssn->check_flow = true;
ssn->need_flow = true;
pn_modified(ssn->connection, &ssn->endpoint, false);
}
pni_session_update_incoming_lwm(ssn);
// If capacity invalid, failure occurs when transport calculates value of incoming window.
}

int pn_session_set_incoming_window_and_lwm(pn_session_t *ssn, pn_frame_count_t window, pn_frame_count_t lwm) {
assert(ssn);
if (!window || (lwm && lwm > window))
return PN_ARG_ERR;
// Settings fixed after session open for simplicity. AMPQ actually allows dynamic change with risk
// of overflow if window reduced while transfers in flight.
if (ssn->endpoint.state & PN_LOCAL_ACTIVE)
return PN_STATE_ERR;
ssn->incoming_capacity = 0;
ssn->max_incoming_window = window;
ssn->lwm_default = (lwm == 0);
ssn->incoming_window_lwm = lwm;
return 0;
}

pn_frame_count_t pn_session_incoming_window(pn_session_t *ssn) {
return ssn->max_incoming_window;
}

pn_frame_count_t pn_session_incoming_window_lwm(pn_session_t *ssn) {
return (!ssn->max_incoming_window || ssn->lwm_default) ? 0 : ssn->incoming_window_lwm;
}

pn_frame_count_t pn_session_remote_incoming_window(pn_session_t *ssn) {
return ssn->state.remote_incoming_window;
}

size_t pn_session_get_outgoing_window(pn_session_t *ssn)
Expand Down Expand Up @@ -1873,11 +1940,16 @@ static void pni_advance_receiver(pn_link_t *link)
link->session->incoming_deliveries--;

pn_delivery_t *current = link->current;
link->session->incoming_bytes -= pn_buffer_size(current->bytes);
size_t drop_count = pn_buffer_size(current->bytes);
pn_buffer_clear(current->bytes);

if (!link->session->state.incoming_window) {
pni_add_tpwork(current);
if (drop_count) {
pn_session_t *ssn = link->session;
ssn->incoming_bytes -= drop_count;
if (!ssn->check_flow && ssn->state.incoming_window < ssn->incoming_window_lwm) {
ssn->check_flow = true;
pni_add_tpwork(current);
}
}

link->current = link->current->unsettled_next;
Expand Down Expand Up @@ -2025,8 +2097,10 @@ ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n)
size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
pn_buffer_trim(delivery->bytes, size, 0);
if (size) {
receiver->session->incoming_bytes -= size;
if (!receiver->session->state.incoming_window) {
pn_session_t *ssn = receiver->session;
ssn->incoming_bytes -= size;
if (!ssn->check_flow && ssn->state.incoming_window < ssn->incoming_window_lwm) {
ssn->check_flow = true;
pni_add_tpwork(delivery);
}
return size;
Expand Down
Loading

0 comments on commit 60ab050

Please sign in to comment.