Skip to content

Commit

Permalink
prov/cxi: Fix CQ wait FD logic
Browse files Browse the repository at this point in the history
Implement CXI managed internal wait FDs. EPs bound
to a CQ with a wait_obj will allocate their own internal
CXI wait object and add its sysfs_notify FD to the CQ. Fix CQ
trywait logic to correctly enable h/w EQ interrupts and
include the control EQ, which may require progress to be initiated.

NETCASSINI-6749

Signed-off-by: Steve Welch <[email protected]>
  • Loading branch information
swelch authored and iziemba committed Jan 8, 2025
1 parent d4ee2cf commit 1eb88f6
Show file tree
Hide file tree
Showing 11 changed files with 646 additions and 266 deletions.
4 changes: 3 additions & 1 deletion man/fi_cxi.7.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ The CXI provider supports FI_THREAD_SAFE and FI_THREAD_DOMAIN threading models.

The CXI provider supports FI_WAIT_FD and FI_WAIT_POLLFD CQ wait object types.
FI_WAIT_UNSPEC will default to FI_WAIT_FD. However FI_WAIT_NONE should achieve
the lowest latency and reduce interrupt overhead. NOTE: A process may return
from an epoll_wait/poll when provider progress is required and a CQ event may
not be available.

## Additional Features

Expand Down
19 changes: 13 additions & 6 deletions prov/cxi/include/cxip.h
Original file line number Diff line number Diff line change
Expand Up @@ -1420,8 +1420,8 @@ struct cxip_cq {
*/
struct ofi_genlock ep_list_lock;

/* Internal CXI wait object allocated only if required. */
struct cxil_wait_obj *priv_wait;
/* CXI CQ wait object EPs are maintained in epoll FD */
int ep_fd;

/* CXI specific fields. */
struct cxip_domain *domain;
Expand Down Expand Up @@ -2428,6 +2428,10 @@ struct cxip_ep_obj {
struct cxip_txc *txc;
struct cxip_rxc *rxc;

/* Internal support for CQ wait object */
struct cxil_wait_obj *priv_wait;
int wait_fd;

/* ASIC version associated with EP/Domain */
enum cassini_version asic_ver;

Expand Down Expand Up @@ -3148,7 +3152,8 @@ static inline bool cxip_cmdq_match(struct cxip_cmdq *cmdq, uint16_t vni,
}

int cxip_evtq_init(struct cxip_evtq *evtq, struct cxip_cq *cq,
size_t num_events, size_t num_fc_events);
size_t num_events, size_t num_fc_events,
struct cxil_wait_obj *priv_wait);
void cxip_evtq_fini(struct cxip_evtq *eq);

int cxip_domain(struct fid_fabric *fabric, struct fi_info *info,
Expand Down Expand Up @@ -3228,16 +3233,19 @@ int cxip_cq_req_complete_addr(struct cxip_req *req, fi_addr_t src);
int cxip_cq_req_error(struct cxip_req *req, size_t olen,
int err, int prov_errno, void *err_data,
size_t err_data_size, fi_addr_t src_addr);
int cxip_cq_add_wait_fd(struct cxip_cq *cq, int wait_fd, int events);
void cxip_cq_del_wait_fd(struct cxip_cq *cq, int wait_fd);

int proverr2errno(int err);
struct cxip_req *cxip_evtq_req_alloc(struct cxip_evtq *evtq,
int remap, void *req_ctx);
void cxip_evtq_req_free(struct cxip_req *req);
void cxip_evtq_progress(struct cxip_evtq *evtq);

void cxip_ep_progress(struct fid *fid);
int cxip_ep_peek(struct fid *fid);
void cxip_ep_flush_trig_reqs(struct cxip_ep_obj *ep_obj);

int cxip_cq_trywait(struct cxip_cq *cq);
void cxip_cq_progress(struct cxip_cq *cq);
void cxip_util_cq_progress(struct util_cq *util_cq);
int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
Expand Down Expand Up @@ -3266,8 +3274,7 @@ void cxip_ep_tgt_ctrl_progress(struct cxip_ep_obj *ep_obj);
void cxip_ep_tgt_ctrl_progress_locked(struct cxip_ep_obj *ep_obj);
int cxip_ep_ctrl_init(struct cxip_ep_obj *ep_obj);
void cxip_ep_ctrl_fini(struct cxip_ep_obj *ep_obj);
void cxip_ep_ctrl_del_wait(struct cxip_ep_obj *ep_obj);
int cxip_ep_ctrl_trywait(void *arg);
int cxip_ep_trywait(struct cxip_ep_obj *ep_obj, struct cxip_cq *cq);

int cxip_av_set(struct fid_av *av, struct fi_av_set_attr *attr,
struct fid_av_set **av_set_fid, void * context);
Expand Down
223 changes: 151 additions & 72 deletions prov/cxi/src/cxip_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,37 +184,33 @@ static const char *cxip_cq_strerror(struct fid_cq *cq, int prov_errno,
return errmsg;
}

/*
* cxip_cq_trywait - Return success if able to block waiting for CQ events.
*/
static int cxip_cq_trywait(void *arg)
int cxip_cq_trywait(struct cxip_cq *cq)
{
struct cxip_cq *cq = (struct cxip_cq *)arg;
struct fid_list_entry *fid_entry;
struct dlist_entry *item;
struct cxip_ep *ep;

assert(cq->util_cq.wait);

if (!cq->priv_wait) {
if (cq->ep_fd < 0) {
CXIP_WARN("No CXI wait object\n");
return -FI_EINVAL;
}

ofi_genlock_lock(&cq->util_cq.cq_lock);
if (!ofi_cirque_isempty(cq->util_cq.cirq)) {
ofi_genlock_unlock(&cq->util_cq.cq_lock);
return -FI_EAGAIN;
}
ofi_genlock_unlock(&cq->util_cq.cq_lock);

ofi_genlock_lock(&cq->ep_list_lock);
dlist_foreach(&cq->util_cq.ep_list, item) {
fid_entry = container_of(item, struct fid_list_entry, entry);
if (cxip_ep_peek(fid_entry->fid)) {
ofi_genlock_unlock(&cq->ep_list_lock);
ep = container_of(fid_entry->fid, struct cxip_ep, ep.fid);

return -FI_EAGAIN;
}
}
if (!ep->ep_obj->priv_wait)
continue;

/* Clear wait, and check for any events */
cxil_clear_wait_obj(cq->priv_wait);
dlist_foreach(&cq->util_cq.ep_list, item) {
fid_entry = container_of(item, struct fid_list_entry, entry);
if (cxip_ep_peek(fid_entry->fid)) {
if (cxip_ep_trywait(ep->ep_obj, cq)) {
ofi_genlock_unlock(&cq->ep_list_lock);

return -FI_EAGAIN;
Expand Down Expand Up @@ -256,21 +252,12 @@ static int cxip_cq_close(struct fid *fid)
{
struct cxip_cq *cq = container_of(fid, struct cxip_cq,
util_cq.cq_fid.fid);
int ret;

if (ofi_atomic_get32(&cq->util_cq.ref))
return -FI_EBUSY;

if (cq->priv_wait) {
ret = ofi_wait_del_fd(cq->util_cq.wait,
cxil_get_wait_obj_fd(cq->priv_wait));
if (ret)
CXIP_WARN("Wait FD delete error: %d\n", ret);

ret = cxil_destroy_wait_obj(cq->priv_wait);
if (ret)
CXIP_WARN("Release CXI wait object failed: %d\n", ret);
}
if (cq->ep_fd >= 0)
close(cq->ep_fd);

ofi_cq_cleanup(&cq->util_cq);
ofi_genlock_destroy(&cq->ep_list_lock);
Expand All @@ -281,14 +268,116 @@ static int cxip_cq_close(struct fid *fid)
return 0;
}

/*
 * cxip_cq_signal() - fi_cq_signal() handler.
 *
 * Waking a thread blocked in fi_cq_sread() is not supported by the
 * CXI provider; always reports -FI_ENOSYS.
 */
static int cxip_cq_signal(struct fid_cq *cq_fid)
{
	return -FI_ENOSYS;
}

/*
 * cxip_cq_control() - fi_control() handler for the CQ.
 *
 * Supports FI_GETWAIT (export the CQ epoll FD either directly or as a
 * struct fi_wait_pollfd) and FI_GETWAITOBJ (report the wait object type).
 */
static int cxip_cq_control(fid_t fid, int command, void *arg)
{
	struct cxip_cq *cq = container_of(fid, struct cxip_cq, util_cq.cq_fid);
	struct fi_wait_pollfd *pollfd;

	switch (command) {
	case FI_GETWAIT:
		/* No wait object was allocated for this CQ. */
		if (cq->ep_fd < 0)
			return -FI_ENODATA;

		if (cq->attr.wait_obj == FI_WAIT_FD) {
			*(int *) arg = cq->ep_fd;
			return FI_SUCCESS;
		}

		/* FI_WAIT_POLLFD: caller supplies a pollfd array. */
		pollfd = arg;
		if (pollfd->nfds < 1)
			return -FI_ETOOSMALL;

		pollfd->fd[0].fd = cq->ep_fd;
		pollfd->fd[0].events = POLLIN;
		pollfd->nfds = 1;
		return FI_SUCCESS;
	case FI_GETWAITOBJ:
		*(enum fi_wait_obj *) arg = cq->attr.wait_obj;
		return FI_SUCCESS;
	default:
		return -FI_ENOSYS;
	}
}

/*
 * cxip_cq_sreadfrom() - fi_cq_sreadfrom() handler supporting blocking reads.
 *
 * Polls the CQ; when empty and it is safe to block (cxip_cq_trywait()
 * succeeds), waits on the CQ epoll FD (cq->ep_fd) until an EP wait FD is
 * signaled or the timeout expires.
 *
 * Returns the number of completions read, or -FI_EAGAIN on timeout.
 */
static ssize_t cxip_cq_sreadfrom(struct fid_cq *cq_fid, void *buf,
				 size_t count, fi_addr_t *src_addr,
				 const void *cond, int timeout)
{
	struct cxip_cq *cq = container_of(cq_fid, struct cxip_cq,
					  util_cq.cq_fid);
	struct epoll_event ev;
	uint64_t endtime;
	ssize_t ret;

	/* Blocking reads require a wait object (wait_obj != FI_WAIT_NONE). */
	if (!cq->attr.wait_obj)
		return -FI_EINVAL;

	endtime = ofi_timeout_time(timeout);

	do {
		/* Progress and read; only consider blocking when empty. */
		ret = fi_cq_readfrom(cq_fid, buf, count, src_addr);
		if (ret != -FI_EAGAIN)
			break;

		if (ofi_adjust_timeout(endtime, &timeout))
			return -FI_EAGAIN;

		/* trywait returning -FI_EAGAIN means events may already be
		 * pending, so re-poll instead of blocking.
		 */
		ret = cxip_cq_trywait(cq);
		if (ret == -FI_EAGAIN) {
			ret = 0;
			continue;
		}
		assert(ret == FI_SUCCESS);

		memset(&ev, 0, sizeof(ev));
		/* NOTE(review): an epoll_wait() error (-1) falls out of the
		 * loop and is returned unmapped; a 0 return (timeout)
		 * re-polls and is caught by ofi_adjust_timeout() above.
		 */
		ret = epoll_wait(cq->ep_fd, &ev, 1, timeout);
		if (ret > 0)
			ret = 0;

	} while (!ret);

	return ret == -FI_ETIMEDOUT ? -FI_EAGAIN : ret;
}

/*
 * cxip_cq_sread() - fi_cq_sread() handler.
 *
 * Blocking CQ read without source addressing; delegates to
 * cxip_cq_sreadfrom() with a NULL src_addr.
 */
static ssize_t cxip_cq_sread(struct fid_cq *cq_fid, void *buf, size_t count,
			     const void *cond, int timeout)
{
	return cxip_cq_sreadfrom(cq_fid, buf, count, NULL, cond, timeout);
}

/* CQ fid operations. control must be the provider-specific
 * cxip_cq_control() so FI_GETWAIT exports the CXI managed epoll FD.
 * (The scraped diff left a duplicate ".control = ofi_cq_control,"
 * initializer in place; only the new handler is kept.)
 */
static struct fi_ops cxip_cq_fi_ops = {
	.size = sizeof(struct fi_ops),
	.close = cxip_cq_close,
	.bind = fi_no_bind,
	.control = cxip_cq_control,
	.ops_open = fi_no_ops_open,
};

/* CQ operations. sread/sreadfrom are provider specific so blocking reads
 * can wait on the CXI managed epoll FD; signal reports -FI_ENOSYS.
 */
static struct fi_ops_cq cxip_cq_ops = {
	.size = sizeof(struct fi_ops_cq),
	.read = ofi_cq_read,
	.readfrom = ofi_cq_readfrom,
	.readerr = ofi_cq_readerr,
	.sread = cxip_cq_sread,
	.sreadfrom = cxip_cq_sreadfrom,
	.signal = cxip_cq_signal,
	.strerror = ofi_cq_strerror,
};

static struct fi_cq_attr cxip_cq_def_attr = {
.flags = 0,
.format = FI_CQ_FORMAT_CONTEXT,
Expand Down Expand Up @@ -348,50 +437,35 @@ static int cxip_cq_verify_attr(struct fi_cq_attr *attr)
return FI_SUCCESS;
}

/*
* cxip_cq_alloc_priv_wait - Allocate an internal wait channel for the CQ.
*/
static int cxip_cq_alloc_priv_wait(struct cxip_cq *cq)
/*
 * cxip_cq_add_wait_fd() - EP adds a wait FD to the CQ epoll FD.
 *
 * @cq: CQ with an allocated epoll FD (cq->ep_fd)
 * @wait_fd: EP FD to monitor
 * @events: epoll event mask for the FD (e.g. EPOLLIN)
 *
 * Returns FI_SUCCESS, or -FI_EINVAL if the FD could not be added.
 *
 * NOTE(review): the scraped diff interleaved deleted lines of the removed
 * cxip_cq_alloc_priv_wait() here (a duplicate "int wait_fd;" shadowing the
 * parameter and stray cleanup code); this is the intended new function.
 */
int cxip_cq_add_wait_fd(struct cxip_cq *cq, int wait_fd, int events)
{
	struct epoll_event ev = {
		.events = events,
	};
	int ret;

	ret = epoll_ctl(cq->ep_fd, EPOLL_CTL_ADD, wait_fd, &ev);
	if (ret < 0) {
		ret = errno;
		CXIP_WARN("EP wait FD add to CQ failed %d\n", ret);

		return -FI_EINVAL;
	}

	return FI_SUCCESS;
}
/*
 * cxip_cq_del_wait_fd() - EP deletes a wait FD from the CQ epoll FD.
 *
 * Best-effort teardown helper: an epoll_ctl() failure is logged only.
 *
 * NOTE(review): the scraped diff left a stray "return ret;" before the
 * epoll_ctl() call (invalid in a void function and it would skip the
 * delete); it is removed here.
 */
void cxip_cq_del_wait_fd(struct cxip_cq *cq, int wait_fd)
{
	int ret;

	ret = epoll_ctl(cq->ep_fd, EPOLL_CTL_DEL, wait_fd, NULL);
	if (ret < 0) {
		ret = errno;
		CXIP_WARN("EP wait FD delete from CQ failed %d\n", ret);
	}
}

/*
Expand All @@ -402,6 +476,7 @@ int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
{
struct cxip_domain *cxi_dom;
struct cxip_cq *cxi_cq;
struct fi_cq_attr temp_attr;
int ret;

if (!domain || !cq)
Expand All @@ -425,7 +500,10 @@ int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
cxi_cq->attr = *attr;
}

ret = ofi_cq_init(&cxip_prov, domain, &cxi_cq->attr, &cxi_cq->util_cq,
/* CXI does not use common code internal wait object */
temp_attr = cxi_cq->attr;
temp_attr.wait_obj = FI_WAIT_NONE;
ret = ofi_cq_init(&cxip_prov, domain, &temp_attr, &cxi_cq->util_cq,
cxip_util_cq_progress, context);
if (ret != FI_SUCCESS) {
CXIP_WARN("ofi_cq_init() failed: %d\n", ret);
Expand All @@ -434,9 +512,10 @@ int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,

cxi_cq->util_cq.cq_fid.ops->strerror = &cxip_cq_strerror;
cxi_cq->util_cq.cq_fid.fid.ops = &cxip_cq_fi_ops;

cxi_cq->util_cq.cq_fid.ops = &cxip_cq_ops;
cxi_cq->domain = cxi_dom;
cxi_cq->ack_batch_size = cxip_env.eq_ack_batch_size;
cxi_cq->ep_fd = -1;

/* Optimize locking when possible */
if (cxi_dom->util_domain.threading == FI_THREAD_DOMAIN ||
Expand All @@ -445,11 +524,11 @@ int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
else
ofi_genlock_init(&cxi_cq->ep_list_lock, OFI_LOCK_SPINLOCK);

if (cxi_cq->util_cq.wait) {
ret = cxip_cq_alloc_priv_wait(cxi_cq);
if (ret != FI_SUCCESS) {
CXIP_WARN("Unable to allocate CXI wait obj: %d\n",
ret);
if (cxi_cq->attr.wait_obj) {
cxi_cq->ep_fd = epoll_create1(0);
if (cxi_cq->ep_fd < 0) {
CXIP_WARN("Unable to open epoll FD: %s\n",
strerror(errno));
goto err_wait_alloc;
}
}
Expand Down
Loading

0 comments on commit 1eb88f6

Please sign in to comment.