From 4262dd0e7f71d6db7d1db66cffdc3c736f9bbb87 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Tue, 28 Jan 2025 15:26:16 -0700 Subject: [PATCH] Fix buffer posting and FI_EAGAIN When a receive buffer is flagged as full via FI_MULTI_RECV re-post it immediately while libfabric fills the other buffer. Also, when fi_recvmsg returns FI_EAGAIN try again after calling fi_cq_read to progress the endpoint. Signed-off-by: John H. Hartman --- runtime/src/comm/ofi/comm-ofi.c | 136 +++++++++++++++++++++++--------- 1 file changed, 99 insertions(+), 37 deletions(-) diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index 12d055b80386..974e72b1983b 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -3165,6 +3165,25 @@ void init_ofiForAms(void) { // set up two of these and swap back and forth between them, to hedge // against receiving "buffer filled and released" events out of order // with respect to the messages stored within them. + + // There are two receive buffers and we alternate between them. If there + // were only one buffer then there might be a window during which there is + // no available buffer space because we are processing the last message in + // the buffer while new messages are still being sent. Instead, we + // double-buffer. When the current buffer has been consumed up to a + // threshold (defined by FI_OPT_MIN_MULTI_RECV above), libfabric will tell + // us the buffer is (almost) full by setting the FI_MULTI_RECV flag in a + // completion event and switch to the other buffer. In response we re-post + // the buffer. + // + // One issue is knowing when there are no lingering dependencies on a buffer + // so we can repost it, since doing so will cause it to be filled with new + // messages. 
There are two types of active messages in the buffer; some are + // handled synchronously by the active message handler itself, and some are + // handled asynchronously by calling chpl_task_startMovedTask to create a + // new task to execute the active message. chpl_task_startMovedTask copies + // its arguments, so in either case there are no lingering dependencies on + // the message buffer. // CHPL_CALLOC_SZ(amLZs[0], 1, amLZSize); CHPL_CALLOC_SZ(amLZs[1], 1, amLZSize); @@ -3190,16 +3209,16 @@ void init_ofiForAms(void) { ofi_rxBuffer = ofi_msg_reqs[0].msg_iov->iov_base; ofi_rxEnd = (void *) ((char *) ofi_rxBuffer + ofi_msg_reqs[0].msg_iov->iov_len); + for (int i = 0; i < 2; i++) { memset(ofi_msg_reqs[i].msg_iov->iov_base, '\0', ofi_msg_reqs[i].msg_iov->iov_len); OFI_CHK(fi_recvmsg(ofi_rxEp, &ofi_msg_reqs[i], FI_MULTI_RECV)); DBG_PRINTF(DBG_AM_BUF, - "pre-post fi_recvmsg(AMLZs %p, len %#zx)", + "post fi_recvmsg(AMLZs %p, len %#zx)", ofi_msg_reqs[i].msg_iov->iov_base, ofi_msg_reqs[i].msg_iov->iov_len); } - init_amHandling(); } @@ -5117,6 +5136,30 @@ void processRxAmReqCntr(void) { ofi_rxCount += todo; } +// +// Post a receive buffer.
+// +static +chpl_bool postBuffer(int i) { + chpl_bool posted = true; + int rc; + OFI_CHK_2(fi_recvmsg(ofi_rxEp, &ofi_msg_reqs[i], FI_MULTI_RECV), rc, + -FI_EAGAIN); + if (rc == -FI_EAGAIN) { + DBG_PRINTF(DBG_AM_BUF, + "(re)post fi_recvmsg(AMLZs %p, len %#zx) returned EAGAIN", + ofi_msg_reqs[i].msg_iov->iov_base, + ofi_msg_reqs[i].msg_iov->iov_len); + posted = false; + } else { + DBG_PRINTF(DBG_AM_BUF, + "(re)post fi_recvmsg(AMLZs %p, len %#zx) succeeded", + ofi_msg_reqs[i].msg_iov->iov_base, + ofi_msg_reqs[i].msg_iov->iov_len); + } + return posted; +} + static void processRxAmReqCQ(void) { // @@ -5125,44 +5168,63 @@ void processRxAmReqCQ(void) { struct fi_cq_data_entry cqes[5]; const size_t maxEvents = sizeof(cqes) / sizeof(cqes[0]); ssize_t ret; - CHK_TRUE((ret = fi_cq_read(ofi_rxCQ, cqes, maxEvents)) > 0 - || ret == -FI_EAGAIN - || ret == -FI_EAVAIL); - if (ret == -FI_EAVAIL) { - reportCQError(ofi_rxCQ); - } - - const size_t numEvents = (ret == -FI_EAGAIN) ? 0 : ret; - - for (int i = 0; i < numEvents; i++) { - if ((cqes[i].flags & FI_RECV) != 0) { - // - // This event is for an inbound AM request. Handle it. - // - amRequest_t* req = (amRequest_t*) cqes[i].buf; - DBG_PRINTF(DBG_AM_BUF, - "CQ rx AM req @ buffer offset %zd, sz %zd, seqId %s", - (char*) req - (char*) ofi_iov_reqs[ofi_msg_i].iov_base, - cqes[i].len, am_seqIdStr(req)); - DBG_PRINTF(DBG_AM | DBG_AM_RECV, - "rx AM req: %s", - am_reqStr(chpl_nodeID, req, cqes[i].len)); - (void) handleAmReq(req); + chpl_bool post = false; + do { + CHK_TRUE((ret = fi_cq_read(ofi_rxCQ, cqes, maxEvents)) > 0 + || ret == -FI_EAGAIN + || ret == -FI_EAVAIL); + if (ret == -FI_EAVAIL) { + reportCQError(ofi_rxCQ); } - if ((cqes[i].flags & FI_MULTI_RECV) != 0) { - // - // Multi-receive buffer filled; post the other one. 
- // - ofi_msg_i = 1 - ofi_msg_i; - OFI_CHK(fi_recvmsg(ofi_rxEp, &ofi_msg_reqs[ofi_msg_i], FI_MULTI_RECV)); - DBG_PRINTF(DBG_AM_BUF, - "re-post fi_recvmsg(AMLZs %p, len %#zx)", - ofi_msg_reqs[ofi_msg_i].msg_iov->iov_base, - ofi_msg_reqs[ofi_msg_i].msg_iov->iov_len); + + // + // Post the other buffer if we were unable to do it when we received + // FI_MULTI_RECV below. + // + if (post) { + DBG_PRINTF(DBG_AM_BUF, "post pending\n"); + if (postBuffer(1-ofi_msg_i) == true) { + post = false; + } } - CHK_TRUE((cqes[i].flags & ~(FI_MSG | FI_RECV | FI_MULTI_RECV)) == 0); - } + const size_t numEvents = (ret == -FI_EAGAIN) ? 0 : ret; + + for (int i = 0; i < numEvents; i++) { + if ((cqes[i].flags & FI_RECV) != 0) { + // + // This event is for an inbound AM request. Handle it. + // + amRequest_t* req = (amRequest_t*) cqes[i].buf; + DBG_PRINTF(DBG_AM_BUF, + "CQ rx AM req @ buffer offset %zd, sz %zd, seqId %s %s", + (char*) req - (char*) ofi_iov_reqs[ofi_msg_i].iov_base, + cqes[i].len, am_seqIdStr(req), + (cqes[i].flags & FI_MULTI_RECV) ? "FI_MULTI_RECV" : ""); + DBG_PRINTF(DBG_AM | DBG_AM_RECV, + "rx AM req: %s", + am_reqStr(chpl_nodeID, req, cqes[i].len)); + (void) handleAmReq(req); + } + if ((cqes[i].flags & FI_MULTI_RECV) != 0) { + // + // Multi-receive buffer filled; libfabric has switched to the other + // buffer. Repost this one. + // + + if (postBuffer(ofi_msg_i) == false) { + // + // Buffer was not posted due to FI_EAGAIN. Go around the outer loop + // again which will call fi_cq_read to progress the endpoint and + // then try reposting the buffer. + // + post = true; + } + ofi_msg_i = 1-ofi_msg_i; + } + CHK_TRUE((cqes[i].flags & ~(FI_MSG | FI_RECV | FI_MULTI_RECV)) == 0); + } + } while(post); } static