Skip to content

Commit

Permalink
Limit the number of epoll instances per active fd
Browse files Browse the repository at this point in the history
XCM uses shared, always active, event fds to signal to the application
that a socket is active when, for example, a message is buffered or
the connection is closed. Such fds are added to the respective
socket-internal epoll instance, when needed.

The Linux kernel limits the number of epoll instances a file object
can be registered in. See source/fs/eventpoll.c in the kernel for
details.

Before this change, XCM allowed the registration of a single
always-active event fd in an arbitrary number of epoll instances. In
certain situations (e.g, when all connections were lost), where a
process had many (500+) TLS and/or TCP transport connections open, an
epoll_ctl() call which is not supposed to fail with EINVAL did,
triggering an abort.

With this change, XCM grows the number of always-active event fds as
needed, limiting the maximum number of connections using the same
event fd to 100.

Signed-off-by: Mattias Rönnblom <[email protected]>
  • Loading branch information
m-ronnblom committed Sep 9, 2021
1 parent 262e21d commit 65d3fb9
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 25 deletions.
96 changes: 75 additions & 21 deletions libxcm/active_fd.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,102 @@

#include <pthread.h>
#include <sys/eventfd.h>
#include <sys/queue.h>

/*
* The Linux kernel limits the number of epoll instances fds the
* always-active fd can be registered in. See source/fs/eventpoll.c in
* for details. Thus, to support a large number of connections, many
* always-active fds must be allowed.
*/

#define MAX_USERS_PER_FD (100)

struct active_fd
{
int fd;
int cnt;
LIST_ENTRY(active_fd) elem;
};

LIST_HEAD(active_fd_list, active_fd);

/* socket id, unique on a per-process basis */
static pthread_mutex_t active_fd_lock = PTHREAD_MUTEX_INITIALIZER;
static int active_fd = -1;
static int active_fd_ref_cnt = 0;
static struct active_fd_list active_fds = LIST_HEAD_INITIALIZER(&active_fds);

static struct active_fd *fd_retrieve(void)
{
struct active_fd *active_fd;
LIST_FOREACH(active_fd, &active_fds, elem)
if (active_fd->cnt < MAX_USERS_PER_FD) {
active_fd->cnt++;
return active_fd;
}
return NULL;
}


static struct active_fd *fd_create(void)
{
int fd = eventfd(1, EFD_NONBLOCK);
if (fd < 0) {
LOG_ACTIVE_FD_FAILED(errno);
return NULL;
}

struct active_fd *active_fd = ut_malloc(sizeof(struct active_fd));

*active_fd = (struct active_fd) {
.fd = fd,
.cnt = 1
};

LIST_INSERT_HEAD(&active_fds, active_fd, elem);

LOG_ACTIVE_FD_CREATED(active_fd->fd);

return active_fd;
}

int active_fd_get(void)
{
ut_mutex_lock(&active_fd_lock);

ut_assert(active_fd_ref_cnt >= 0);
struct active_fd *active_fd = fd_retrieve();

if (active_fd_ref_cnt == 0) {
active_fd = eventfd(1, EFD_NONBLOCK);
if (active_fd < 0) {
LOG_ACTIVE_FD_FAILED(errno);
goto out;
}
LOG_ACTIVE_FD_CREATED(active_fd);
}
if (active_fd != NULL)
goto out;

active_fd_ref_cnt++;
active_fd = fd_create();

out:
ut_mutex_unlock(&active_fd_lock);

return active_fd;
return active_fd != NULL ? active_fd->fd : -1;
}

void active_fd_put(void)
void active_fd_put(int fd)
{
ut_mutex_lock(&active_fd_lock);

ut_assert(active_fd_ref_cnt >= 0);
struct active_fd *active_fd;
LIST_FOREACH(active_fd, &active_fds, elem)
if (active_fd->fd == fd) {
active_fd->cnt--;

active_fd_ref_cnt--;
if (active_fd->cnt == 0) {
LIST_REMOVE(active_fd, elem);
UT_PROTECT_ERRNO(close(active_fd->fd));
LOG_ACTIVE_FD_CLOSED(active_fd->fd);
ut_free(active_fd);
}

if (active_fd_ref_cnt == 0) {
UT_PROTECT_ERRNO(close(active_fd));
LOG_ACTIVE_FD_CLOSED(active_fd);
}
goto out;
}

ut_assert(0);

out:
ut_mutex_unlock(&active_fd_lock);
}

2 changes: 1 addition & 1 deletion libxcm/active_fd.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
#define ACTIVE_FD

int active_fd_get(void);
void active_fd_put(void);
void active_fd_put(int fd);

#endif
3 changes: 2 additions & 1 deletion libxcm/xcm_tp_sctp.c
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,9 @@ static void deinit(struct xcm_socket *s)
{
if (s->type == xcm_socket_type_conn) {
struct sctp_socket *ss = TOSCTP(s);
int active_fd = ss->conn.active_fd_reg.fd;
epoll_reg_reset(&ss->conn.active_fd_reg);
active_fd_put();
active_fd_put(active_fd);
xcm_dns_query_free(TOSCTP(s)->conn.query);
}
}
Expand Down
3 changes: 2 additions & 1 deletion libxcm/xcm_tp_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,9 @@ static void deinit(struct xcm_socket *s)
{
if (s->type == xcm_socket_type_conn) {
struct tcp_socket *ts = TOTCP(s);
int active_fd = ts->conn.active_fd_reg.fd;
epoll_reg_reset(&ts->conn.active_fd_reg);
active_fd_put();
active_fd_put(active_fd);
xcm_dns_query_free(ts->conn.query);
mbuf_deinit(&ts->conn.send_mbuf);
mbuf_deinit(&ts->conn.receive_mbuf);
Expand Down
3 changes: 2 additions & 1 deletion libxcm/xcm_tp_tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,9 @@ static int conn_deinit(struct xcm_socket *s, bool owner)
ts->conn.ssl = NULL;
}

int active_fd = ts->conn.active_fd_reg.fd;
epoll_reg_reset(&ts->conn.active_fd_reg);
active_fd_put();
active_fd_put(active_fd);
xcm_dns_query_free(ts->conn.query);
mbuf_deinit(&ts->conn.send_mbuf);
mbuf_deinit(&ts->conn.receive_mbuf);
Expand Down

0 comments on commit 65d3fb9

Please sign in to comment.