Skip to content

Commit

Permalink
Fixy fix
Browse files Browse the repository at this point in the history
  • Loading branch information
garazdawi committed Nov 28, 2024
1 parent eb2935e commit 7babe94
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 9 deletions.
1 change: 1 addition & 0 deletions erts/emulator/beam/erl_lock_check.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ static erts_lc_lock_order_t erts_lock_order[] = {
{"proc_btm", "pid"},
{"safe_hash", "address"},
{"pollset", "address"},
{"drv_ev_nif_select", NULL},
LEVEL,
{"erl_db_catree_route_node", "index"},
{"proc_status", "pid"},
Expand Down
15 changes: 15 additions & 0 deletions erts/emulator/beam/erl_port_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ typedef erts_atomic_t ErtsPortTaskHandle;
#if (defined(ERL_PROCESS_C__) \
|| defined(ERL_PORT_TASK_C__) \
|| defined(ERL_IO_C__) \
|| defined(ERL_CHECK_IO_C__) \
|| (ERTS_GLB_INLINE_INCL_FUNC_DEF \
&& defined(ERTS_DO_INCL_GLB_INLINE_FUNC_DEF)))
#define ERTS_INCLUDE_SCHEDULER_INTERNALS
Expand Down Expand Up @@ -138,6 +139,8 @@ ERTS_GLB_INLINE void erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched

#if defined(ERTS_INCLUDE_SCHEDULER_INTERNALS) && ERTS_POLL_USE_SCHEDULER_POLLING
ERTS_GLB_INLINE int erts_port_task_have_outstanding_io_tasks(void);
ERTS_GLB_INLINE void erts_port_task_inc_outstanding_io_tasks(void);
ERTS_GLB_INLINE void erts_port_task_dec_outstanding_io_tasks(void);
/* NOTE: Do not access any of the exported variables directly */
extern erts_atomic_t erts_port_task_outstanding_io_tasks;
#endif
Expand Down Expand Up @@ -226,6 +229,18 @@ erts_port_task_have_outstanding_io_tasks(void)
return (erts_atomic_read_acqb(&erts_port_task_outstanding_io_tasks)
!= 0);
}

ERTS_GLB_INLINE void
erts_port_task_inc_outstanding_io_tasks(void)
{
erts_atomic_inc_nob(&erts_port_task_outstanding_io_tasks);
}

ERTS_GLB_INLINE void
erts_port_task_dec_outstanding_io_tasks(void)
{
erts_atomic_dec_nob(&erts_port_task_outstanding_io_tasks);
}
#endif

#endif
Expand Down
93 changes: 85 additions & 8 deletions erts/emulator/sys/common/erl_check_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "sys.h"
#include "global.h"
#include "erl_port.h"
#include "erl_port_task.h"
#include "erl_check_io.h"
#include "erl_thr_progress.h"
#include "erl_bif_unique.h"
Expand All @@ -51,7 +52,7 @@
(STATE) ? (STATE)->fd : (ErtsSysFdType)-1, ##__VA_ARGS__, \
ev2str((STATE) ? (STATE)->events : ERTS_POLL_EV_NONE), \
ev2str((STATE) ? (STATE)->active_events : ERTS_POLL_EV_NONE), \
(STATE) ? flag2str((STATE)->flags) : ERTS_EV_FLAG_CLEAR)
(STATE) ? event_state_flag_to_str((STATE)->flags) : ERTS_EV_FLAG_CLEAR)
#define DEBUG_PRINT_MODE
#else
#define DEBUG_PRINT(...)
Expand All @@ -75,24 +76,27 @@ typedef enum {
} EventStateType;

typedef enum {
ERTS_EV_FLAG_CLEAR = 0,
ERTS_EV_FLAG_USED = 1, /* ERL_DRV_USE has been turned on */
ERTS_EV_FLAG_CLEAR = 0x0,
ERTS_EV_FLAG_USED = 0x1, /* ERL_DRV_USE has been turned on */
#if ERTS_POLL_USE_SCHEDULER_POLLING
ERTS_EV_FLAG_SCHEDULER = 2, /* Set when the fd has been migrated
ERTS_EV_FLAG_SCHEDULER = 0x2, /* Set when the fd has been migrated
to scheduler pollset */
ERTS_EV_FLAG_IN_SCHEDULER = 4, /* Set when the fd is currently in
ERTS_EV_FLAG_IN_SCHEDULER = 0x4, /* Set when the fd is currently in
scheduler pollset */
#else
ERTS_EV_FLAG_SCHEDULER = ERTS_EV_FLAG_CLEAR,
ERTS_EV_FLAG_IN_SCHEDULER = ERTS_EV_FLAG_CLEAR,
#endif
#ifdef ERTS_POLL_USE_FALLBACK
ERTS_EV_FLAG_FALLBACK = 8, /* Set when kernel poll rejected fd
ERTS_EV_FLAG_FALLBACK = 0x8, /* Set when kernel poll rejected fd
and it was put in the nkp version */
#else
ERTS_EV_FLAG_FALLBACK = ERTS_EV_FLAG_CLEAR,
#endif
ERTS_EV_FLAG_WANT_ERROR = 0x10, /* ERL_NIF_SELECT_ERROR turned on */
#if ERTS_POLL_USE_SCHEDULER_POLLING
ERTS_EV_FLAG_NIF_SELECT = 0x20, /* Set if a nif select message is in-flight */
#endif

/* Combinations, defined only to be displayed by debugger (gdb) */
ERTS_EV_FLAG_USED_FALLBACK = ERTS_EV_FLAG_USED | ERTS_EV_FLAG_FALLBACK,
Expand Down Expand Up @@ -171,6 +175,10 @@ typedef struct {
without being deselected. */
} ErtsDrvEventState;

static erts_mtx_t nif_select_fds_mtx;
static ErtsSysFdType nif_select_fds[256];
static Uint16 nif_select_cnt = 0;

struct drv_ev_state_shared {

union {
Expand Down Expand Up @@ -697,16 +705,29 @@ static void prepare_select_msg(struct erts_nif_select_event* e,

Eterm
erts_io_handle_nif_select(ErtsMessage *sig) {
ErtsDrvEventState *state;
ErtsNifSelectSignalData* xsig = (ErtsNifSelectSignalData*) (char *) (&sig->hfrag.mem[0]
+ sig->hfrag.used_size);
ASSERT(sig->hfrag.used_size < sig->hfrag.alloc_size);

#if ERTS_POLL_USE_SCHEDULER_POLLING
if (sched_pollset) {
erts_mtx_lock(&nif_select_fds_mtx);
erts_port_task_dec_outstanding_io_tasks();
// erts_fprintf(stderr, "Enqueue %d\r\n", xsig->evt);
nif_select_fds[nif_select_cnt++] = xsig->evt;
erts_mtx_unlock(&nif_select_fds_mtx);
}
#endif
return xsig->message;
}

static ERTS_INLINE void send_select_msg(struct erts_nif_select_event* e)
{
if (!erts_proc_sig_send_nif_select(e->pid, e->mp)) {
erts_cleanup_messages(e->mp);
} else {
erts_port_task_inc_outstanding_io_tasks();
}
}

Expand Down Expand Up @@ -1192,11 +1213,26 @@ enif_select_x(ErlNifEnv* env,
old_events = state->events;

if (on) {
if (state->type == ERTS_EV_TYPE_NONE)
ctl_op = ERTS_POLL_OP_ADD;
else {
if (!(state->flags & ERTS_EV_FLAG_IN_SCHEDULER) &&
(ctl_events == ERTS_POLL_EV_IN)) {
if (!(state->flags & ERTS_EV_FLAG_FALLBACK) &&
(state->flags & ERTS_EV_FLAG_NIF_SELECT) &&
erts_sched_poll_enabled())
state->count++;
if (state->count > 10 && erts_sched_poll_enabled()) {
if (!(state->flags & ERTS_EV_FLAG_SCHEDULER))
ctl_op = ERTS_POLL_OP_ADD;
state->flags |= ERTS_EV_FLAG_IN_SCHEDULER|ERTS_EV_FLAG_SCHEDULER;
DEBUG_PRINT_FD("moving to scheduler ps", state);
}
}
}
ctl_events &= ~old_events;
state->events |= ctl_events;
state->active_events |= ctl_events;
if (state->type == ERTS_EV_TYPE_NONE)
ctl_op = ERTS_POLL_OP_ADD;
if (ctl_events & ERTS_POLL_EV_ERR)
state->flags |= ERTS_EV_FLAG_WANT_ERROR;
}
Expand Down Expand Up @@ -1727,6 +1763,33 @@ erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time, int poll_only
int poll_ret, i;
ERTS_MSACC_PUSH_AND_SET_STATE(ERTS_MSACC_STATE_CHECK_IO);

ErtsSchedulerData *esdp = erts_get_scheduler_data();
int is_normal_sched = !!esdp && esdp->type == ERTS_SCHED_NORMAL;

if (is_normal_sched) {
ErtsSysFdType fds[256];
Uint16 cnt;
erts_mtx_lock(&nif_select_fds_mtx);
memcpy(fds, nif_select_fds, nif_select_cnt * sizeof(ErtsSysFdType));
cnt = nif_select_cnt;
// if (nif_select_cnt)
// erts_fprintf(stderr, "%d: Need to clear %d fds\r\n", esdp->no, nif_select_cnt);
nif_select_cnt = 0;
erts_mtx_unlock(&nif_select_fds_mtx);
for (int i = 0; i < cnt; i++) {
ErtsDrvEventState *state;
ErtsSysFdType fd = fds[i];
erts_mtx_t *mtx = fd_mtx(fd);
erts_mtx_lock(mtx);
state = get_drv_ev_state(fd);
if (state) {
// erts_fprintf(stderr, "Clearing %d\r\n", fd);
state->flags &= ~ERTS_EV_FLAG_NIF_SELECT;
}
erts_mtx_unlock(mtx);
}
}

restart:

#ifdef ERTS_ENABLE_LOCK_CHECK
Expand Down Expand Up @@ -1773,6 +1836,9 @@ erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time, int poll_only
erl_errno_id(poll_ret), poll_ret);
erts_send_error_to_logger_nogl(dsbufp);
}
// if (is_normal_sched) {
// erts_fprintf(stderr, "%d: woke up\r\n", esdp->no);
// }
ERTS_MSACC_POP_STATE();
return;
}
Expand Down Expand Up @@ -1914,6 +1980,11 @@ erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time, int poll_only
check_fd_cleanup(state, &free_select, &free_nif);
}

#if ERTS_POLL_USE_SCHEDULER_POLLING
// erts_fprintf(stderr, "Setting flag on %d\r\n", fd);
state->flags |= ERTS_EV_FLAG_NIF_SELECT;
#endif

erts_mtx_unlock(fd_mtx(fd));

if (is_not_nil(in_ev.pid)) {
Expand Down Expand Up @@ -2002,6 +2073,10 @@ erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time, int poll_only
sizeof(ErtsPollResFd) * psi->pollres_len);
}

// if (is_normal_sched) {
// erts_fprintf(stderr, "%d: woke up\r\n", esdp->no);
// }

ERTS_MSACC_POP_STATE();
}

Expand Down Expand Up @@ -2315,6 +2390,8 @@ erts_init_check_io(int *argc, char **argv)
#endif

if (erts_sched_poll_enabled()) {
erts_mtx_init(&nif_select_fds_mtx, "drv_ev_nif_select", NIL,
ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_IO);
psiv[0].pollres_len = ERTS_CHECK_IO_POLL_RES_LEN;
psiv[0].pollres = erts_alloc(ERTS_ALC_T_POLLSET,
sizeof(ErtsPollResFd) * ERTS_CHECK_IO_POLL_RES_LEN);
Expand Down
2 changes: 1 addition & 1 deletion lib/kernel/src/inet.erl
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ See `t:gen_tcp:socket/0` and `t:gen_udp:socket/0`.
-doc """
Implementation backend selector for `t:socket/0`.
Selects the implementation backend for [sockets](`t:socket/0`.
Selects the implementation backend for [sockets](`t:socket/0`).
The current default is `inet` which uses `inet_drv.c` to call
the platform's socket API. The value `socket` instead uses
the `m:socket` module and its NIF implementation.
Expand Down

0 comments on commit 7babe94

Please sign in to comment.