Fix buffer posting and FI_EAGAIN #26617

Merged · 1 commit · Feb 12, 2025
136 changes: 99 additions & 37 deletions runtime/src/comm/ofi/comm-ofi.c
@@ -3165,6 +3165,25 @@ void init_ofiForAms(void) {
// set up two of these and swap back and forth between them, to hedge
// against receiving "buffer filled and released" events out of order
// with respect to the messages stored within them.

// There are two receive buffers and we alternate between them. With only
// one buffer there could be a window during which no buffer space is
// available, because we would still be processing the last messages in the
// buffer while new messages continued to arrive. Instead, we double-buffer.
// When the free space remaining in the current buffer drops below a
// threshold (set via FI_OPT_MIN_MULTI_RECV above), libfabric tells us the
// buffer is (almost) full by setting the FI_MULTI_RECV flag in a completion
// event and switches delivery to the other buffer. In response we re-post
// the retired buffer.
//
// One issue is knowing when there are no lingering dependencies on a buffer
// so we can repost it, since doing so will cause it to be filled with new
// messages. There are two types of active messages in the buffer; some are
// handled synchronously by the active message handler itself, and some are
// handled asynchronously by calling chpl_task_startMovedTask to create a
// new task to execute the active message. chpl_task_startMovedTask copies
// its arguments, so in either case there are no lingering dependencies on
// the message buffer.
//
CHPL_CALLOC_SZ(amLZs[0], 1, amLZSize);
CHPL_CALLOC_SZ(amLZs[1], 1, amLZSize);
@@ -3190,16 +3209,16 @@ void init_ofiForAms(void) {
ofi_rxBuffer = ofi_msg_reqs[0].msg_iov->iov_base;
ofi_rxEnd = (void *) ((char *) ofi_rxBuffer +
ofi_msg_reqs[0].msg_iov->iov_len);

for (int i = 0; i < 2; i++) {
memset(ofi_msg_reqs[i].msg_iov->iov_base, '\0',
ofi_msg_reqs[i].msg_iov->iov_len);
OFI_CHK(fi_recvmsg(ofi_rxEp, &ofi_msg_reqs[i], FI_MULTI_RECV));
DBG_PRINTF(DBG_AM_BUF,
"pre-post fi_recvmsg(AMLZs %p, len %#zx)",
"post fi_recvmsg(AMLZs %p, len %#zx)",
ofi_msg_reqs[i].msg_iov->iov_base,
ofi_msg_reqs[i].msg_iov->iov_len);
}

init_amHandling();
}
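
To make the double-buffering described in the comment above concrete, here is a minimal sketch of setting the FI_OPT_MIN_MULTI_RECV threshold and pre-posting two multi-receive buffers. It is not code from this PR: the identifiers rxEp, prePostMultiRecv, NUM_BUFS, BUF_SIZE, and MIN_FREE are hypothetical, and error checking is elided.

#include <stdlib.h>
#include <sys/uio.h>
#include <rdma/fabric.h>
#include <rdma/fi_endpoint.h>

#define NUM_BUFS 2              /* two landing zones, swapped as they fill */
#define BUF_SIZE (40 * 1024)    /* hypothetical landing-zone size */
#define MIN_FREE 2048           /* hypothetical "almost full" threshold */

static struct iovec  rxIov[NUM_BUFS];
static struct fi_msg rxMsg[NUM_BUFS];

static void prePostMultiRecv(struct fid_ep* rxEp) {
  /* When the free space left in a posted buffer drops below this value,
     libfabric retires the buffer (reporting FI_MULTI_RECV in a completion)
     and delivers subsequent messages into the other posted buffer. */
  size_t minFree = MIN_FREE;
  fi_setopt(&rxEp->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV,
            &minFree, sizeof(minFree));

  for (int i = 0; i < NUM_BUFS; i++) {
    rxIov[i].iov_base = calloc(1, BUF_SIZE);
    rxIov[i].iov_len  = BUF_SIZE;
    rxMsg[i] = (struct fi_msg) { .msg_iov   = &rxIov[i],
                                 .iov_count = 1,
                                 .addr      = FI_ADDR_UNSPEC };
    /* FI_MULTI_RECV lets many incoming messages land in one posted buffer. */
    (void) fi_recvmsg(rxEp, &rxMsg[i], FI_MULTI_RECV);
  }
}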

@@ -5117,6 +5136,30 @@ void processRxAmReqCntr(void) {
ofi_rxCount += todo;
}

//
// Post (or re-post) a multi-receive buffer. Returns true if the buffer
// was posted, false if fi_recvmsg returned -FI_EAGAIN and the caller must
// retry later.
//
static
chpl_bool postBuffer(int i) {
chpl_bool posted = true;
int rc;
OFI_CHK_2(fi_recvmsg(ofi_rxEp, &ofi_msg_reqs[i], FI_MULTI_RECV), rc,
-FI_EAGAIN);
if (rc == -FI_EAGAIN) {
DBG_PRINTF(DBG_AM_BUF,
"(re)post fi_recvmsg(AMLZs %p, len %#zx) returned EAGAIN",
ofi_msg_reqs[i].msg_iov->iov_base,
ofi_msg_reqs[i].msg_iov->iov_len);
posted = false;
} else {
DBG_PRINTF(DBG_AM_BUF,
"(re)post fi_recvmsg(AMLZs %p, len %#zx) succeeded",
ofi_msg_reqs[i].msg_iov->iov_base,
ofi_msg_reqs[i].msg_iov->iov_len);
}
return posted;
}

static
void processRxAmReqCQ(void) {
//
@@ -5125,44 +5168,63 @@ void processRxAmReqCQ(void) {
struct fi_cq_data_entry cqes[5];
const size_t maxEvents = sizeof(cqes) / sizeof(cqes[0]);
ssize_t ret;
CHK_TRUE((ret = fi_cq_read(ofi_rxCQ, cqes, maxEvents)) > 0
|| ret == -FI_EAGAIN
|| ret == -FI_EAVAIL);
if (ret == -FI_EAVAIL) {
reportCQError(ofi_rxCQ);
}

const size_t numEvents = (ret == -FI_EAGAIN) ? 0 : ret;

for (int i = 0; i < numEvents; i++) {
if ((cqes[i].flags & FI_RECV) != 0) {
//
// This event is for an inbound AM request. Handle it.
//
amRequest_t* req = (amRequest_t*) cqes[i].buf;
DBG_PRINTF(DBG_AM_BUF,
"CQ rx AM req @ buffer offset %zd, sz %zd, seqId %s",
(char*) req - (char*) ofi_iov_reqs[ofi_msg_i].iov_base,
cqes[i].len, am_seqIdStr(req));
DBG_PRINTF(DBG_AM | DBG_AM_RECV,
"rx AM req: %s",
am_reqStr(chpl_nodeID, req, cqes[i].len));
(void) handleAmReq(req);
chpl_bool post = false;
do {
CHK_TRUE((ret = fi_cq_read(ofi_rxCQ, cqes, maxEvents)) > 0
|| ret == -FI_EAGAIN
|| ret == -FI_EAVAIL);
if (ret == -FI_EAVAIL) {
reportCQError(ofi_rxCQ);
}
if ((cqes[i].flags & FI_MULTI_RECV) != 0) {
//
// Multi-receive buffer filled; post the other one.
//
ofi_msg_i = 1 - ofi_msg_i;
OFI_CHK(fi_recvmsg(ofi_rxEp, &ofi_msg_reqs[ofi_msg_i], FI_MULTI_RECV));
DBG_PRINTF(DBG_AM_BUF,
"re-post fi_recvmsg(AMLZs %p, len %#zx)",
ofi_msg_reqs[ofi_msg_i].msg_iov->iov_base,
ofi_msg_reqs[ofi_msg_i].msg_iov->iov_len);

//
// Post the other buffer if we were unable to do so when we received
// FI_MULTI_RECV on a previous pass (see below).
//
if (post) {
DBG_PRINTF(DBG_AM_BUF, "post pending\n");
if (postBuffer(1-ofi_msg_i) == true) {
post = false;
}
}

CHK_TRUE((cqes[i].flags & ~(FI_MSG | FI_RECV | FI_MULTI_RECV)) == 0);
}
const size_t numEvents = (ret == -FI_EAGAIN) ? 0 : ret;

for (int i = 0; i < numEvents; i++) {
if ((cqes[i].flags & FI_RECV) != 0) {
//
// This event is for an inbound AM request. Handle it.
//
amRequest_t* req = (amRequest_t*) cqes[i].buf;
DBG_PRINTF(DBG_AM_BUF,
"CQ rx AM req @ buffer offset %zd, sz %zd, seqId %s %s",
(char*) req - (char*) ofi_iov_reqs[ofi_msg_i].iov_base,
cqes[i].len, am_seqIdStr(req),
(cqes[i].flags & FI_MULTI_RECV) ? "FI_MULTI_RECV" : "");
DBG_PRINTF(DBG_AM | DBG_AM_RECV,
"rx AM req: %s",
am_reqStr(chpl_nodeID, req, cqes[i].len));
(void) handleAmReq(req);
}
if ((cqes[i].flags & FI_MULTI_RECV) != 0) {
//
// Multi-receive buffer filled; libfabric has switched to the other
// buffer. Repost this one.
//

if (postBuffer(ofi_msg_i) == false) {
//
// The buffer was not posted due to FI_EAGAIN. Go around the outer loop
// again, which calls fi_cq_read to progress the endpoint, and then
// retry posting the buffer.
//
post = true;
}
ofi_msg_i = 1-ofi_msg_i;
}
CHK_TRUE((cqes[i].flags & ~(FI_MSG | FI_RECV | FI_MULTI_RECV)) == 0);
}
} while(post);
}
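
For orientation, the overall pattern the new processRxAmReqCQ follows can be sketched as below: drain the CQ, handle events, and when a retired multi-receive buffer cannot be reposted because fi_recvmsg returns -FI_EAGAIN, go around the loop again so fi_cq_read can progress the endpoint before retrying. This is a simplified sketch, not the runtime's code; drainRxCQ, rxCQ, rxEp, rxMsg, and handleAm are hypothetical names, and error handling is elided.

#include <stdbool.h>
#include <rdma/fabric.h>
#include <rdma/fi_errno.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_eq.h>

static void drainRxCQ(struct fid_cq* rxCQ, struct fid_ep* rxEp,
                      struct fi_msg rxMsg[2]) {
  struct fi_cq_data_entry cqes[5];
  int cur = 0;                   /* buffer libfabric is currently filling */
  bool pendingRepost = false;

  do {
    ssize_t n = fi_cq_read(rxCQ, cqes, sizeof(cqes) / sizeof(cqes[0]));
    if (n == -FI_EAGAIN) n = 0;  /* no completions yet */

    /* An earlier repost attempt failed; fi_cq_read above made progress,
       so retry before handling any new events. */
    if (pendingRepost
        && fi_recvmsg(rxEp, &rxMsg[1 - cur], FI_MULTI_RECV) != -FI_EAGAIN) {
      pendingRepost = false;
    }

    for (ssize_t i = 0; i < n; i++) {
      if (cqes[i].flags & FI_RECV) {
        /* An inbound message: handle it, e.g.
           handleAm(cqes[i].buf, cqes[i].len);   (hypothetical handler) */
      }
      if (cqes[i].flags & FI_MULTI_RECV) {
        /* This buffer has been retired and libfabric is now filling the
           other one.  Repost it; on -FI_EAGAIN defer to the next pass. */
        if (fi_recvmsg(rxEp, &rxMsg[cur], FI_MULTI_RECV) == -FI_EAGAIN) {
          pendingRepost = true;
        }
        cur = 1 - cur;
      }
    }
  } while (pendingRepost);
}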

static