diff --git a/libxcm/active_fd.c b/libxcm/active_fd.c index 3b5d30eb8..2ea514c61 100644 --- a/libxcm/active_fd.c +++ b/libxcm/active_fd.c @@ -5,48 +5,102 @@ #include #include +#include + +/* + * The Linux kernel limits the number of epoll instances fds the + * always-active fd can be registered in. See source/fs/eventpoll.c in + * for details. Thus, to support a large number of connections, many + * always-active fds must be allowed. + */ + +#define MAX_USERS_PER_FD (100) + +struct active_fd +{ + int fd; + int cnt; + LIST_ENTRY(active_fd) elem; +}; + +LIST_HEAD(active_fd_list, active_fd); -/* socket id, unique on a per-process basis */ static pthread_mutex_t active_fd_lock = PTHREAD_MUTEX_INITIALIZER; -static int active_fd = -1; -static int active_fd_ref_cnt = 0; +static struct active_fd_list active_fds = LIST_HEAD_INITIALIZER(&active_fds); + +static struct active_fd *fd_retrieve(void) +{ + struct active_fd *active_fd; + LIST_FOREACH(active_fd, &active_fds, elem) + if (active_fd->cnt < MAX_USERS_PER_FD) { + active_fd->cnt++; + return active_fd; + } + return NULL; +} + + +static struct active_fd *fd_create(void) +{ + int fd = eventfd(1, EFD_NONBLOCK); + if (fd < 0) { + LOG_ACTIVE_FD_FAILED(errno); + return NULL; + } + + struct active_fd *active_fd = ut_malloc(sizeof(struct active_fd)); + + *active_fd = (struct active_fd) { + .fd = fd, + .cnt = 1 + }; + + LIST_INSERT_HEAD(&active_fds, active_fd, elem); + + LOG_ACTIVE_FD_CREATED(active_fd->fd); + + return active_fd; +} int active_fd_get(void) { ut_mutex_lock(&active_fd_lock); - ut_assert(active_fd_ref_cnt >= 0); + struct active_fd *active_fd = fd_retrieve(); - if (active_fd_ref_cnt == 0) { - active_fd = eventfd(1, EFD_NONBLOCK); - if (active_fd < 0) { - LOG_ACTIVE_FD_FAILED(errno); - goto out; - } - LOG_ACTIVE_FD_CREATED(active_fd); - } + if (active_fd != NULL) + goto out; - active_fd_ref_cnt++; + active_fd = fd_create(); out: ut_mutex_unlock(&active_fd_lock); - return active_fd; + return active_fd != NULL ? active_fd->fd : -1; } -void active_fd_put(void) +void active_fd_put(int fd) { ut_mutex_lock(&active_fd_lock); - ut_assert(active_fd_ref_cnt >= 0); + struct active_fd *active_fd; + LIST_FOREACH(active_fd, &active_fds, elem) + if (active_fd->fd == fd) { + active_fd->cnt--; - active_fd_ref_cnt--; + if (active_fd->cnt == 0) { + LIST_REMOVE(active_fd, elem); + UT_PROTECT_ERRNO(close(active_fd->fd)); + LOG_ACTIVE_FD_CLOSED(active_fd->fd); + ut_free(active_fd); + } - if (active_fd_ref_cnt == 0) { - UT_PROTECT_ERRNO(close(active_fd)); - LOG_ACTIVE_FD_CLOSED(active_fd); - } + goto out; + } + + ut_assert(0); +out: ut_mutex_unlock(&active_fd_lock); } diff --git a/libxcm/active_fd.h b/libxcm/active_fd.h index 7c7f9b620..ec6db2937 100644 --- a/libxcm/active_fd.h +++ b/libxcm/active_fd.h @@ -2,6 +2,6 @@ #define ACTIVE_FD int active_fd_get(void); -void active_fd_put(void); +void active_fd_put(int fd); #endif diff --git a/libxcm/xcm_tp_sctp.c b/libxcm/xcm_tp_sctp.c index a95c88c9b..50d04589a 100644 --- a/libxcm/xcm_tp_sctp.c +++ b/libxcm/xcm_tp_sctp.c @@ -198,8 +198,9 @@ static void deinit(struct xcm_socket *s) { if (s->type == xcm_socket_type_conn) { struct sctp_socket *ss = TOSCTP(s); + int active_fd = ss->conn.active_fd_reg.fd; epoll_reg_reset(&ss->conn.active_fd_reg); - active_fd_put(); + active_fd_put(active_fd); xcm_dns_query_free(TOSCTP(s)->conn.query); } } diff --git a/libxcm/xcm_tp_tcp.c b/libxcm/xcm_tp_tcp.c index 7bb931afb..c291630e8 100644 --- a/libxcm/xcm_tp_tcp.c +++ b/libxcm/xcm_tp_tcp.c @@ -218,8 +218,9 @@ static void deinit(struct xcm_socket *s) { if (s->type == xcm_socket_type_conn) { struct tcp_socket *ts = TOTCP(s); + int active_fd = ts->conn.active_fd_reg.fd; epoll_reg_reset(&ts->conn.active_fd_reg); - active_fd_put(); + active_fd_put(active_fd); xcm_dns_query_free(ts->conn.query); mbuf_deinit(&ts->conn.send_mbuf); mbuf_deinit(&ts->conn.receive_mbuf); diff --git a/libxcm/xcm_tp_tls.c b/libxcm/xcm_tp_tls.c index 6b30ebd2b..f93c1a5fc 100644 --- a/libxcm/xcm_tp_tls.c +++ b/libxcm/xcm_tp_tls.c @@ -309,8 +309,9 @@ static int conn_deinit(struct xcm_socket *s, bool owner) ts->conn.ssl = NULL; } + int active_fd = ts->conn.active_fd_reg.fd; epoll_reg_reset(&ts->conn.active_fd_reg); - active_fd_put(); + active_fd_put(active_fd); xcm_dns_query_free(ts->conn.query); mbuf_deinit(&ts->conn.send_mbuf); mbuf_deinit(&ts->conn.receive_mbuf);