From 60ab050bd4da40fd845a0a329bb134bdb4e3903a Mon Sep 17 00:00:00 2001
From: Cliff Jansen
Date: Tue, 8 Oct 2024 17:22:34 -0700
Subject: [PATCH] PROTON-2790: finer grained session flow control

---
 c/docs/buffering.md                |   2 +-
 c/include/proton/session.h         |  82 ++++++++
 c/include/proton/transport.h       |   8 +-
 c/include/proton/types.h           |   7 +
 c/src/core/engine-internal.h       |   6 +
 c/src/core/engine.c                |  86 ++++++++-
 c/src/core/transport.c             |  75 ++++++--
 c/tests/connection_driver_test.cpp |   7 +-
 c/tests/engine_test.cpp            | 290 +++++++++++++++++++++++++++++
 9 files changed, 533 insertions(+), 30 deletions(-)

diff --git a/c/docs/buffering.md b/c/docs/buffering.md
index 71dafbf7bd..32567a2f06 100644
--- a/c/docs/buffering.md
+++ b/c/docs/buffering.md
@@ -16,7 +16,7 @@ gets a @ref PN_LINK_FLOW event.
 
 The AMQP protocol allows peers to exchange session limits so they can predict
 their buffering requirements for incoming data (
-`pn_session_set_incoming_capacity()` and
+`pn_session_set_incoming_window_and_lwm()` and
 `pn_session_set_outgoing_window()`). Proton will not exceed those limits when
 sending to or receiving from the peer. However proton does *not* limit the
 amount of data buffered in local memory at the request of the application. It
diff --git a/c/include/proton/session.h b/c/include/proton/session.h
index e09d41113f..ac30ccd2b7 100644
--- a/c/include/proton/session.h
+++ b/c/include/proton/session.h
@@ -194,6 +194,8 @@ PN_EXTERN void pn_session_open(pn_session_t *session);
 PN_EXTERN void pn_session_close(pn_session_t *session);
 
 /**
+ * **Deprecated** - Use ::pn_session_incoming_window().
+ *
  * Get the incoming capacity of the session measured in bytes.
  *
  * The incoming capacity of a session determines how much incoming
@@ -205,6 +207,8 @@ PN_EXTERN void pn_session_close(pn_session_t *session);
 PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session);
 
 /**
+ * **Deprecated** - Use ::pn_session_set_incoming_window_and_lwm().
+ *
  * Set the incoming capacity for a session object.
  *
  * The incoming capacity of a session determines how much incoming message
@@ -223,6 +227,84 @@ PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session);
  */
 PN_EXTERN void pn_session_set_incoming_capacity(pn_session_t *session, size_t capacity);
 
+/**
+ * Get the maximum incoming window for a session object.
+ *
+ * The maximum incoming window can be set by ::pn_session_set_incoming_window_and_lwm.
+ *
+ * @param[in] session the session object
+ * @return the maximum size of the incoming window or 0 if not set.
+ **/
+PN_EXTERN pn_frame_count_t pn_session_incoming_window(pn_session_t *session);
+
+/**
+ * Get the low water mark for the session incoming window.
+ *
+ * The low water mark governs how frequently the session updates the remote
+ * peer with changes to the incoming window.
+ *
+ * A value of zero indicates that Proton will choose a default strategy for
+ * updating the peer.
+ *
+ * The low water mark can be set by ::pn_session_set_incoming_window_and_lwm.
+ *
+ * @param[in] session the session object
+ * @return the low water mark of the incoming window.
+ **/
+PN_EXTERN pn_frame_count_t pn_session_incoming_window_lwm(pn_session_t *session);
+
+/**
+ * Set the maximum incoming window and low water mark for a session object.
+ *
+ * The session incoming window is a count of the number of AMQP transfer frames
+ * that can be accepted and buffered locally by the session object until
+ * processed by the application (i.e. consumed by ::pn_link_recv or dropped by
+ * ::pn_link_advance).
The maximum bytes buffered by the session will never + * exceed (max_incoming_window * max_frame_size). The incoming window frame count + * decreases 1-1 with incoming AMQP transfer frames. Whenever the application + * processes the buffered incoming bytes, the incoming window increases to the + * largest frame count that can be used by the peer without causing the local + * buffered bytes to exceed the maximum stated above. + * + * The session will defer updating the peer with a changed incoming window until + * it drops below the low water mark (lwm). Too many updates can delay + * other traffic on the connection without providing improved performance on the + * session. Too few can leave a remote sender frequently unable to send due + * to a closed window. The best balance is application specific. Note that the + * session incoming window is always updated along with the link credit on any + * of its child links, so the frequency of link credit updates is also a + * consideration when choosing a low water mark. + * + * The low water mark must be less than or equal to the incoming window. If + * set to zero, Proton will choose a default strategy for updating the + * incoming window. + * + * This call is only valid before the call to ::pn_session_open on the session. + * Subsequently, the settings are fixed for the life of the session and only + * have effect if a max frame size is also set on the session's connection. + * + * @param[in] session the session object + * @param[in] window the maximum incoming window buffered by the session + * @param[in] lwm the low water mark (or 0 for default window updating) + * + * @return 0 on success, PN_ARG_ERR if window is zero or lwm is greater than + * window, or PN_STATE_ERR if the session is already open. + */ +PN_EXTERN int pn_session_set_incoming_window_and_lwm(pn_session_t *session, pn_frame_count_t window, pn_frame_count_t lwm); + +/** + * Get the remote view of the incoming window for the session. + * + * This evaluates to the most recent incoming window value communicated by the + * peer minus any subsequent transfer frames for the session that have been + * sent. It does not include transfer frames that may be created in future + * for locally buffered content tracked by @ref pn_session_outgoing_bytes. + * + * @param[in] session the session object + * @return the remote incoming window + */ +PN_EXTERN pn_frame_count_t pn_session_remote_incoming_window(pn_session_t *session); + /** * Get the outgoing window for a session object. * diff --git a/c/include/proton/transport.h b/c/include/proton/transport.h index 8b8b66403d..3aea0f2816 100644 --- a/c/include/proton/transport.h +++ b/c/include/proton/transport.h @@ -432,10 +432,16 @@ PN_EXTERN uint32_t pn_transport_get_max_frame(pn_transport_t *transport); /** * Set the maximum frame size of a transport. * + * The negotiated frame size cannot change over the life of the transport. After + * the transport has started sending AMQP frames to the peer, this function call + * has no effect. Typically, the maximum frame size is set when the transport is + * created. + * * @param[in] transport a transport object * @param[in] size the maximum frame size for the transport object * - * @internal XXX Deprecate when moved to connection + * @internal XXX Deprecate when moved to connection, note size can change on + * reconnect with new transport, consider status return on new API. 
 */
 PN_EXTERN void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size);
 
diff --git a/c/include/proton/types.h b/c/include/proton/types.h
index 2fed2b74d6..b0b8faae2d 100644
--- a/c/include/proton/types.h
+++ b/c/include/proton/types.h
@@ -141,6 +141,13 @@ extern "C" {
  */
 typedef uint32_t pn_sequence_t;
 
+/**
+ * A count or limit of AMQP transfer frames.
+ *
+ * @ingroup api_types
+ */
+typedef uint32_t pn_frame_count_t;
+
 /**
  * A span of time in milliseconds.
  *
diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index 2e836b8cf9..62aa124345 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -272,6 +272,11 @@ struct pn_session_t {
   pn_sequence_t incoming_deliveries;
   pn_sequence_t outgoing_deliveries;
   pn_sequence_t outgoing_window;
+  pn_frame_count_t incoming_window_lwm;
+  pn_frame_count_t max_incoming_window;
+  bool check_flow;
+  bool need_flow;
+  bool lwm_default;
 };
 
 struct pn_terminus_t {
@@ -395,6 +400,7 @@ void pn_ep_incref(pn_endpoint_t *endpoint);
 void pn_ep_decref(pn_endpoint_t *endpoint);
 
 ssize_t pni_transport_grow_capacity(pn_transport_t *transport, size_t n);
+void pni_session_update_incoming_lwm(pn_session_t *ssn);
 
 #if __cplusplus
 }
diff --git a/c/src/core/engine.c b/c/src/core/engine.c
index 3d3de7ade3..4ad8c4b05e 100644
--- a/c/src/core/engine.c
+++ b/c/src/core/engine.c
@@ -1049,6 +1049,10 @@ pn_session_t *pn_session(pn_connection_t *conn)
   ssn->outgoing_deliveries = 0;
   ssn->outgoing_window = AMQP_MAX_WINDOW_SIZE;
   ssn->local_handle_max = PN_IMPL_HANDLE_MAX;
+  ssn->incoming_window_lwm = 1;
+  ssn->check_flow = false;
+  ssn->need_flow = false;
+  ssn->lwm_default = true;
 
   // begin transport state
   memset(&ssn->state, 0, sizeof(ssn->state));
@@ -1095,11 +1099,74 @@ size_t pn_session_get_incoming_capacity(pn_session_t *ssn)
   return ssn->incoming_capacity;
 }
 
+// Update required when (re)set by user or when session started (proxy: BEGIN frame). No
+// session flow control actually means flow control with huge window, so set lwm to 1. There is
+// low probability of a stall. Any link credit flow frame will update session credit too.
+void pni_session_update_incoming_lwm(pn_session_t *ssn) {
+  if (ssn->incoming_capacity) {
+    // Old API.
+    if (!ssn->connection->transport)
+      return; // Defer until called again from BEGIN frame setup with max frame known.
+    if (ssn->connection->transport->local_max_frame) {
+      ssn->incoming_window_lwm = (ssn->incoming_capacity / ssn->connection->transport->local_max_frame) / 2;
+      if (!ssn->incoming_window_lwm)
+        ssn->incoming_window_lwm = 1; // Zero may hang.
+    } else {
+      ssn->incoming_window_lwm = 1;
+    }
+  } else if (ssn->max_incoming_window) {
+    // New API.
+    // Only need to deal with default. Called when sending BEGIN frame.
+    if (ssn->connection->transport && ssn->connection->transport->local_max_frame && ssn->lwm_default) {
+      ssn->incoming_window_lwm = (ssn->max_incoming_window + 1) / 2;
+    }
+  } else {
+    ssn->incoming_window_lwm = 1;
+  }
+  assert(ssn->incoming_window_lwm != 0); // 0 allows session flow to hang
+}
+
 void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity)
 {
   assert(ssn);
-  // XXX: should this trigger a flow?
   ssn->incoming_capacity = capacity;
+  ssn->max_incoming_window = 0;
+  ssn->incoming_window_lwm = 1;
+  ssn->lwm_default = true;
+  if (ssn->connection->transport) {
+    ssn->check_flow = true;
+    ssn->need_flow = true;
+    pn_modified(ssn->connection, &ssn->endpoint, false);
+  }
+  pni_session_update_incoming_lwm(ssn);
+  // If capacity invalid, failure occurs when transport calculates value of incoming window.
+}
+
+int pn_session_set_incoming_window_and_lwm(pn_session_t *ssn, pn_frame_count_t window, pn_frame_count_t lwm) {
+  assert(ssn);
+  if (!window || (lwm && lwm > window))
+    return PN_ARG_ERR;
+  // Settings fixed after session open for simplicity. AMQP actually allows dynamic change with risk
+  // of overflow if window reduced while transfers in flight.
+  if (ssn->endpoint.state & PN_LOCAL_ACTIVE)
+    return PN_STATE_ERR;
+  ssn->incoming_capacity = 0;
+  ssn->max_incoming_window = window;
+  ssn->lwm_default = (lwm == 0);
+  ssn->incoming_window_lwm = lwm;
+  return 0;
+}
+
+pn_frame_count_t pn_session_incoming_window(pn_session_t *ssn) {
+  return ssn->max_incoming_window;
+}
+
+pn_frame_count_t pn_session_incoming_window_lwm(pn_session_t *ssn) {
+  return (!ssn->max_incoming_window || ssn->lwm_default) ? 0 : ssn->incoming_window_lwm;
+}
+
+pn_frame_count_t pn_session_remote_incoming_window(pn_session_t *ssn) {
+  return ssn->state.remote_incoming_window;
 }
 
 size_t pn_session_get_outgoing_window(pn_session_t *ssn)
@@ -1873,11 +1940,16 @@ static void pni_advance_receiver(pn_link_t *link)
   link->session->incoming_deliveries--;
 
   pn_delivery_t *current = link->current;
-  link->session->incoming_bytes -= pn_buffer_size(current->bytes);
+  size_t drop_count = pn_buffer_size(current->bytes);
   pn_buffer_clear(current->bytes);
 
-  if (!link->session->state.incoming_window) {
-    pni_add_tpwork(current);
+  if (drop_count) {
+    pn_session_t *ssn = link->session;
+    ssn->incoming_bytes -= drop_count;
+    if (!ssn->check_flow && ssn->state.incoming_window < ssn->incoming_window_lwm) {
+      ssn->check_flow = true;
+      pni_add_tpwork(current);
+    }
   }
 
   link->current = link->current->unsettled_next;
@@ -2025,8 +2097,10 @@ ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n)
   size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
   pn_buffer_trim(delivery->bytes, size, 0);
   if (size) {
-    receiver->session->incoming_bytes -= size;
-    if (!receiver->session->state.incoming_window) {
+    pn_session_t *ssn = receiver->session;
+    ssn->incoming_bytes -= size;
+    if (!ssn->check_flow && ssn->state.incoming_window < ssn->incoming_window_lwm) {
+      ssn->check_flow = true;
       pni_add_tpwork(delivery);
     }
     return size;
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index 89a50f70b5..d896257972 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -1101,6 +1101,7 @@ int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
   } else {
     ssn = pn_session(transport->connection);
   }
+  ssn->state.remote_incoming_window = incoming_window;
   ssn->state.incoming_transfer_count = next;
   if (handle_max_q) {
     ssn->state.remote_handle_max = handle_max;
@@ -1459,9 +1460,11 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
   ssn->state.incoming_transfer_count++;
   ssn->state.incoming_window--;
 
-  // XXX: need better policy for when to refresh window
-  if (!ssn->state.incoming_window && (int32_t) link->state.local_handle >= 0) {
-    pni_post_flow(transport, ssn, link);
+  if ((int32_t) link->state.local_handle >= 0 && ssn->state.incoming_window < ssn->incoming_window_lwm) {
+    if (!ssn->check_flow) {
+      ssn->check_flow = true;
+      pn_modified(ssn->connection, &link->endpoint, false);
+    }
   }
 
   return 0;
@@ -1873,23 +1876,35 @@ static size_t pni_session_outgoing_window(pn_session_t *ssn)
   return ssn->outgoing_window;
 }
 
-static size_t pni_session_incoming_window(pn_session_t *ssn)
+// Calculate the available incoming window
+static pn_frame_count_t pni_session_incoming_window(pn_session_t *ssn)
 {
   pn_transport_t *t = ssn->connection->transport;
   uint32_t size = t->local_max_frame;
   size_t capacity = ssn->incoming_capacity;
-  if (!size || !capacity) { /* session flow control is not enabled */
+  if (!size || (!capacity && !ssn->max_incoming_window)) { /* session flow control is not enabled */
     return AMQP_MAX_WINDOW_SIZE;
-  } else if (capacity >= size) { /* precondition */
-    return (capacity - ssn->incoming_bytes) / size;
-  } else { /* error: we will never have a non-zero window */
-    pn_condition_format(
-      pn_transport_condition(t),
-      "amqp:internal-error",
-      "session capacity %zu is less than frame size %" PRIu32,
-      capacity, size);
-    pn_transport_close_tail(t);
-    return 0;
+  }
+  // Calculate depending on whether application specified capacity or a frame count.
+  assert(capacity || ssn->max_incoming_window);
+  if (capacity) {
+    // Old API
+    if (capacity >= size) { /* precondition */
+      return capacity > ssn->incoming_bytes ? (pn_frame_count_t) (capacity - ssn->incoming_bytes) / size : 0;
+    } else { /* error: we will never have a non-zero window */
+      pn_condition_format(
+        pn_transport_condition(t),
+        "amqp:internal-error",
+        "session capacity %zu is less than frame size %" PRIu32,
+        capacity, size);
+      pn_transport_close_tail(t);
+      return 0;
+    }
+  } else {
+    // New API
+    // Find smallest number of frames that could have sent the buffered bytes.
+    pn_frame_count_t nominal_fc = (ssn->incoming_bytes + size - 1) / size;
+    return nominal_fc >= ssn->max_incoming_window ? 0 : ssn->max_incoming_window - nominal_fc;
   }
 }
 
@@ -1920,6 +1935,8 @@ static int pni_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpo
     pn_logger_logf(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_WARNING, "unable to find an open available channel within limit of %u", transport->channel_max );
     return PN_ERR;
   }
+  if (ssn->incoming_capacity || ssn->max_incoming_window)
+    pni_session_update_incoming_lwm(ssn);
   state->incoming_window = pni_session_incoming_window(ssn);
   state->outgoing_window = pni_session_outgoing_window(ssn);
   /* "DL[?HIIII]" */
@@ -2055,10 +2072,12 @@ static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endp
 
 static int pni_post_flow(pn_transport_t *transport, pn_session_t *ssn, pn_link_t *link)
 {
+  ssn->check_flow = false;
+  ssn->need_flow = false;
   ssn->state.incoming_window = pni_session_incoming_window(ssn);
   ssn->state.outgoing_window = pni_session_outgoing_window(ssn);
   bool linkq = (bool) link;
-  pn_link_state_t *state = &link->state;
+  pn_link_state_t *state = linkq ?
&link->state : NULL; /* "DL[?IIII?I?I?In?o]" */ pn_bytes_t buf = pn_amqp_encode_DLEQIIIIQIQIQInQoe(&transport->scratch_space, FLOW, (int16_t) ssn->state.remote_channel >= 0, ssn->state.incoming_transfer_count, @@ -2072,6 +2091,17 @@ static int pni_post_flow(pn_transport_t *transport, pn_session_t *ssn, pn_link_t return pn_framing_send_amqp(transport, ssn->state.local_channel, buf); } +static inline bool pni_session_need_flow(pn_session_t *ssn) { + if (ssn->need_flow) + return true; + if (ssn->check_flow && ssn->state.incoming_window < ssn->incoming_window_lwm && + pni_session_incoming_window(ssn) > ssn->state.incoming_window) + return true; + + ssn->check_flow = false; + return false; +} + static int pni_process_flow_receiver(pn_transport_t *transport, pn_endpoint_t *endpoint) { if (endpoint->type == RECEIVER && endpoint->state & PN_LOCAL_ACTIVE) @@ -2081,7 +2111,7 @@ static int pni_process_flow_receiver(pn_transport_t *transport, pn_endpoint_t *e pn_link_state_t *state = &rcv->state; if ((int16_t) ssn->state.local_channel >= 0 && (int32_t) state->local_handle >= 0 && - ((rcv->drain || state->link_credit != rcv->credit - rcv->queued) || !ssn->state.incoming_window)) { + ((rcv->drain || state->link_credit != rcv->credit - rcv->queued) || pni_session_need_flow(ssn))) { state->link_credit = rcv->credit - rcv->queued; return pni_post_flow(transport, ssn, rcv); } @@ -2244,8 +2274,7 @@ static int pni_process_tpwork_receiver(pn_transport_t *transport, pn_delivery_t if (err) return err; } - // XXX: need to centralize this policy and improve it - if (!ssn->state.incoming_window) { + if (pni_session_need_flow(ssn)) { int err = pni_post_flow(transport, ssn, link); if (err) return err; } @@ -2299,6 +2328,10 @@ static int pni_process_flush_disp(pn_transport_t *transport, pn_endpoint_t *endp { int err = pni_flush_disp(transport, session); if (err) return err; + if (session->need_flow) { + err = pni_post_flow(transport, session, NULL); + if (err) return err; + } } } @@ -2832,6 +2865,10 @@ uint32_t pn_transport_get_max_frame(pn_transport_t *transport) void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size) { + if (transport->open_sent) { + pn_logger_logf(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_WARNING, "Cannot change local max-frame after OPEN frame sent."); + return; + } // if size == 0, no advertised limit to input frame size. 
if (size && size < AMQP_MIN_MAX_FRAME_SIZE) size = AMQP_MIN_MAX_FRAME_SIZE; diff --git a/c/tests/connection_driver_test.cpp b/c/tests/connection_driver_test.cpp index 46dcb9c599..ff1b597c0a 100644 --- a/c/tests/connection_driver_test.cpp +++ b/c/tests/connection_driver_test.cpp @@ -397,7 +397,8 @@ TEST_CASE("driver_message_abort_mixed") { static void set_capacity_and_max_frame(size_t capacity, size_t max_frame, pn_test::driver_pair &d, const char *data) { - pn_transport_set_max_frame(d.client.transport, max_frame); + if (pn_transport_get_max_frame(d.client.transport) != max_frame) + pn_transport_set_max_frame(d.client.transport, max_frame); pn_connection_open(d.client.connection); pn_session_t *ssn = pn_session(d.client.connection); pn_session_set_incoming_capacity(ssn, capacity); @@ -444,11 +445,11 @@ TEST_CASE("driver_session_flow_control") { } /* Capacity smaller than frame size is an error */ - set_capacity_and_max_frame(1234, 12345, d, "foo"); + set_capacity_and_max_frame(1233, 1234, d, "foo"); CHECK_THAT( *client.last_condition, cond_matches("amqp:internal-error", - "session capacity 1234 is less than frame size 12345")); + "session capacity 1233 is less than frame size 1234")); free(buf.start); } diff --git a/c/tests/engine_test.cpp b/c/tests/engine_test.cpp index f8b174e7cd..2a48255458 100644 --- a/c/tests/engine_test.cpp +++ b/c/tests/engine_test.cpp @@ -22,6 +22,7 @@ #include "./pn_test.hpp" #include +#include using namespace pn_test; @@ -366,3 +367,292 @@ TEST_CASE("link_properties)") { pn_transport_free(t2); pn_connection_free(c2); } + +static ssize_t link_send(pn_link_t *s, size_t n) { + char buf[5120]; + if (n > 5120) return PN_ARG_ERR; + memset(buf, 'x', n); + return pn_link_send(s, buf, n); +} + +static ssize_t link_recv(pn_link_t *r, size_t n) { + char buf[5120]; + if (n > 5120) return PN_ARG_ERR; + return pn_link_recv(r, buf, n); +} + +TEST_CASE("session_capacity") { + pn_connection_t *c1 = pn_connection(); + pn_transport_t *t1 = pn_transport(); + pn_transport_bind(t1, c1); + + pn_connection_t *c2 = pn_connection(); + pn_transport_t *t2 = pn_transport(); + pn_transport_set_server(t2); + // Use 1K max frame size for test. + pn_transport_set_max_frame(t2, 1024); + pn_transport_bind(t2, c2); + + pn_connection_open(c1); + pn_connection_open(c2); + + pn_session_t *s1 = pn_session(c1); + REQUIRE(pn_session_get_incoming_capacity(s1) == 0); + pn_session_open(s1); + + pn_link_t *tx = pn_sender(s1, "tx"); + pn_link_open(tx); + + while (pump(t1, t2)) { + process_endpoints(c1); + process_endpoints(c2); + } + + // session and link should be up, c2 should have a receiver link: + REQUIRE(pn_link_state(tx) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)); + pn_session_t *s2 = pn_session_head(c2, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)); + pn_link_t *rx = pn_link_head(c2, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)); + REQUIRE(s2 != NULL); + REQUIRE(rx != NULL); + + REQUIRE(pn_session_get_incoming_capacity(s2) == 0); + + // Don't count partial max frame. + pn_session_set_incoming_capacity(s2, (1024 * 4) + 512); // 4.5 max frame + pn_link_flow(rx, 1); + while (pump(t1, t2)); + REQUIRE(pn_session_remote_incoming_window(s1) == 4); + REQUIRE(pn_link_credit(tx) > 0); + + // Send frames and check window. + + // This is complicated by messy accounting: max_frame_size is a proxy for frames buffered on the + // receiver side, but payload per transfer frame is strictly less than max frame size due to + // frame headers. For this test 997 bytes of payload fits in a 1024 byte transfer frame. 
+ // Senders and receivers count/update frames a bit differently. + + size_t payloadsz = 997; + size_t onefrm = 1 * payloadsz; + size_t fourfrm = 4 * payloadsz; + size_t fivefrm = 5 * payloadsz; + + pn_delivery_t *d1 = pn_delivery(tx, pn_dtag("tag-1", 6)); + REQUIRE(link_send(tx, fivefrm) == (ssize_t) fivefrm); + while (pump(t1, t2)); + // Expect 4 frames sent and 1 remaining, window 0 + pn_delivery_t *d2 = pn_link_current(rx); + REQUIRE(d2); + REQUIRE(pn_delivery_pending(d2) == fourfrm); + REQUIRE(pn_delivery_partial(d2)); + REQUIRE(pn_delivery_pending(d1) == onefrm); + REQUIRE(pn_session_remote_incoming_window(s1) == 0); + + // Extract 3 frames. tx can send remaining bytes. + REQUIRE(link_recv(rx, 3072) == 3072); + while (pump(t1, t2)); + // Window should be 2 + REQUIRE(pn_delivery_pending(d1) == 0); + int remaining = pn_delivery_pending(d2); + REQUIRE(link_recv(rx, 5120) == remaining); + while (pump(t1, t2)); + + pn_transport_unbind(t1); + pn_transport_free(t1); + pn_connection_free(c1); + + pn_transport_unbind(t2); + pn_transport_free(t2); + pn_connection_free(c2); +} + +TEST_CASE("session_window") { + // 1 = client/sender, 2=server/receiver + // "a" = default lwm, "b" = user specified lwm + pn_connection_t *c1 = pn_connection(); + pn_transport_t *t1 = pn_transport(); + pn_transport_bind(t1, c1); + + pn_connection_t *c2 = pn_connection(); + pn_transport_t *t2 = pn_transport(); + pn_transport_set_server(t2); + // Use 1K max frame size for test. + pn_transport_set_max_frame(t2, 1024); + pn_transport_bind(t2, c2); + pn_connection_open(c1); + pn_connection_open(c2); + + // s0 not used for transfers, only for non-runtime checks. + pn_session_t *s0 = pn_session(c1); + REQUIRE(pn_session_incoming_window(s0) == 0); + REQUIRE(pn_session_incoming_window_lwm(s0) == 0); + // Incoming window arg 0 + REQUIRE(pn_session_set_incoming_window_and_lwm(s0, 0, 0) == PN_ARG_ERR); + // lwm > incoming window + REQUIRE(pn_session_set_incoming_window_and_lwm(s0, 5, 6) == PN_ARG_ERR); + REQUIRE(pn_session_set_incoming_window_and_lwm(s0, 6, 5) == 0); + pn_session_open(s0); + // Check can't change after open + REQUIRE(pn_session_set_incoming_window_and_lwm(s0, 7, 3) == PN_STATE_ERR); + REQUIRE(pn_session_incoming_window(s0) == 6); + REQUIRE(pn_session_incoming_window_lwm(s0) == 5); + + // Set up sessions for transfers + pn_session_t *s2a = pn_session(c2); + pn_session_t *s2b = pn_session(c2); + // Test relies on knowing implemented default lwm is ((max_incoming_window + 1) / 2) + REQUIRE(pn_session_set_incoming_window_and_lwm(s2a, 4, 0) == 0); // lwm will be 2 + REQUIRE(pn_session_set_incoming_window_and_lwm(s2b, 4, 3) == 0); + pn_link_t *rxa = pn_receiver(s2a, "linka"); + pn_link_t *rxb = pn_receiver(s2b, "linkb"); + pn_session_open(s2a); + pn_session_open(s2b); + pn_link_open(rxa); + pn_link_open(rxb); + pn_link_flow(rxa, 1); + pn_link_flow(rxb, 1); + + + while (pump(t1, t2)) { + process_endpoints(c1); + process_endpoints(c2); + } + + // sessions and links should be up, c2 should have two receiver links + REQUIRE(pn_link_state(rxa) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)); + REQUIRE(pn_link_state(rxb) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)); + pn_link_t *txa = pn_link_head(c1, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)); + REQUIRE(strcmp("linka", pn_link_name(txa)) == 0); + pn_link_t *txb = pn_link_next(txa, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)); + REQUIRE(strcmp("linkb", pn_link_name(txb)) == 0); + pn_session_t *s1a = pn_link_session(txa); + pn_session_t *s1b = pn_link_session(txb); + 
REQUIRE(pn_session_remote_incoming_window(s1a) == 4); + REQUIRE(pn_session_remote_incoming_window(s1b) == 4); + + // Send frames and check window. + + // This is complicated by messy accounting: max_frame_size is a proxy for frames buffered on the + // receiver side, but payload per transfer frame is strictly less than max frame size due to + // frame headers. For this test 997 bytes of payload fits in a 1024 byte transfer frame. + // Senders and receivers count/update frames a bit differently. + + size_t payloadsz = 997; + size_t onefrm = 1 * payloadsz; + size_t fourfrm = 4 * payloadsz; + size_t fivefrm = 5 * payloadsz; + + REQUIRE(pn_link_credit(txa) > 0); + REQUIRE(pn_link_credit(txb) > 0); + pn_delivery_t *dta1 = pn_delivery(txa, pn_dtag("dt-a1", 6)); + pn_delivery_t *dtb1 = pn_delivery(txb, pn_dtag("dt-b1", 6)); + REQUIRE(link_send(txa, fivefrm) == (ssize_t) fivefrm); + REQUIRE(link_send(txb, fivefrm) == (ssize_t) fivefrm); + while (pump(t1, t2)); + + // Expect 4 frames sent and 1 remaining, window 0 + // linka + pn_delivery_t *dra1 = pn_link_current(rxa); + REQUIRE(dra1); + REQUIRE(pn_delivery_pending(dra1) == fourfrm); + REQUIRE(pn_delivery_partial(dra1)); + REQUIRE(pn_delivery_pending(dta1) == onefrm); + REQUIRE(pn_session_remote_incoming_window(s1a) == 0); + // linkb + pn_delivery_t *drb1 = pn_link_current(rxb); + REQUIRE(drb1); + REQUIRE(pn_delivery_pending(drb1) == fourfrm); + REQUIRE(pn_delivery_partial(drb1)); + REQUIRE(pn_delivery_pending(dtb1) == onefrm); + REQUIRE(pn_session_remote_incoming_window(s1b) == 0); + + // Extract 3 frames, tx can send remaining bytes in one frame. + REQUIRE(link_recv(rxa, 3072) == 3072); + REQUIRE(link_recv(rxb, 3072) == 3072); + while (pump(t1, t2)); + REQUIRE(pn_delivery_pending(dta1) == 0); + REQUIRE(pn_delivery_pending(dtb1) == 0); + // Window should be 2 as seen by sender + REQUIRE(pn_session_remote_incoming_window(s1a) == 2); + REQUIRE(pn_session_remote_incoming_window(s1b) == 2); + + // Drain receivers. "b" is below lwm so peer gets update. Opposite for "a". + int remaining = pn_delivery_pending(dra1); + REQUIRE(link_recv(rxa, 5120) == remaining); + remaining = pn_delivery_pending(drb1); + REQUIRE(link_recv(rxb, 5120) == remaining); + while (pump(t1, t2)); + REQUIRE(pn_session_remote_incoming_window(s1a) == 2); + REQUIRE(pn_session_remote_incoming_window(s1b) == 4); + + // Send and consume one more frame. Now "a" incoming_window drops below lwm but "b" does not. + REQUIRE(link_send(txa, onefrm) == (ssize_t) onefrm); + REQUIRE(link_send(txb, onefrm) == (ssize_t) onefrm); + REQUIRE(xfer(t1,t2) > 0); + REQUIRE(pn_session_remote_incoming_window(s1a) == 1); + REQUIRE(pn_session_remote_incoming_window(s1b) == 3); + remaining = pn_delivery_pending(dra1); + REQUIRE(link_recv(rxa, 5120) == remaining); + remaining = pn_delivery_pending(drb1); + REQUIRE(link_recv(rxb, 5120) == remaining); + while (pump(t1, t2)); + REQUIRE(pn_session_remote_incoming_window(s1a) == 4); + REQUIRE(pn_session_remote_incoming_window(s1b) == 3); + + pn_transport_unbind(t1); + pn_transport_free(t1); + pn_connection_free(c1); + + pn_transport_unbind(t2); + pn_transport_free(t2); + pn_connection_free(c2); +} + +TEST_CASE("max_frame") { + const uint32_t amqp_min_max_frame_size = 512; + pn_connection_t *c1 = pn_connection(); + pn_transport_t *t1 = pn_transport(); + pn_transport_bind(t1, c1); + + pn_connection_t *c2 = pn_connection(); + pn_transport_t *t2 = pn_transport(); + pn_transport_set_server(t2); + + // Can set to zero, i.e. 
no max frame + pn_transport_set_max_frame(t2, 0); + REQUIRE(pn_transport_get_max_frame(t2) == 0); + // Restricted to AMQP minimum. + pn_transport_set_max_frame(t2, 1); + REQUIRE(pn_transport_get_max_frame(t2) == amqp_min_max_frame_size); + // Otherwise OK + pn_transport_set_max_frame(t2, amqp_min_max_frame_size + 1); + REQUIRE(pn_transport_get_max_frame(t2) == amqp_min_max_frame_size + 1); + pn_transport_set_max_frame(t2, UINT32_MAX); + REQUIRE(pn_transport_get_max_frame(t2) == UINT32_MAX); + + // Can still change post bind + pn_transport_bind(t2, c2); + pn_transport_set_max_frame(t2, 4096); + REQUIRE(pn_transport_get_max_frame(t2) == 4096); + + pn_connection_open(c1); + pn_connection_open(c2); + while (pump(t1, t2)) { + process_endpoints(c1); + process_endpoints(c2); + } + REQUIRE(pn_connection_state(c2) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)); + + // No change allowed after peers have negotiated OPEN frame. + pn_transport_set_max_frame(t2, 4097); // Should be silently ignored. + REQUIRE(pn_transport_get_max_frame(t2) == 4096); + pn_transport_set_max_frame(t2, 0); // Can't turn off either. + REQUIRE(pn_transport_get_max_frame(t2) == 4096); + + pn_transport_unbind(t1); + pn_transport_free(t1); + pn_connection_free(c1); + + pn_transport_unbind(t2); + pn_transport_free(t2); + pn_connection_free(c2); +}
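
The sketch below is a usage note, not part of the diff: it shows how an application might
adopt the finer grained session flow control added by this patch. The helper name, the 64 KiB
max frame size, and the window/low-water-mark values (32 frames, refresh once fewer than 8
remain) are illustrative assumptions only; error handling and connection driver wiring are
omitted.

    #include <proton/connection.h>
    #include <proton/link.h>
    #include <proton/session.h>
    #include <proton/transport.h>

    /* Configure a receiving session to buffer at most 32 max-frame-sized transfer
       frames and to advertise an updated incoming window once fewer than 8 remain. */
    static pn_session_t *make_flow_controlled_session(pn_connection_t *conn,
                                                      pn_transport_t *transport) {
      /* The incoming window only takes effect if a local max frame size is set
         before the transport sends its OPEN frame. */
      pn_transport_set_max_frame(transport, 64 * 1024);

      pn_session_t *ssn = pn_session(conn);
      /* Must be called before pn_session_open(); returns 0 on success,
         PN_ARG_ERR for a bad window/lwm pair, PN_STATE_ERR if already open. */
      if (pn_session_set_incoming_window_and_lwm(ssn, 32, 8) != 0)
        return NULL;
      pn_session_open(ssn);

      pn_link_t *rx = pn_receiver(ssn, "example-receiver");
      pn_link_open(rx);
      pn_link_flow(rx, 10);  /* link credit still governs message flow as before */
      return ssn;
    }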