diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c index 3ec2b1dffa18..42af68f05842 100644 --- a/erts/emulator/beam/erl_lock_check.c +++ b/erts/emulator/beam/erl_lock_check.c @@ -143,6 +143,7 @@ static erts_lc_lock_order_t erts_lock_order[] = { {"proc_btm", "pid"}, {"safe_hash", "address"}, {"pollset", "address"}, + {"drv_ev_nif_select", NULL}, LEVEL, {"erl_db_catree_route_node", "index"}, {"proc_status", "pid"}, diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h index dfc9b4416ce8..7fbf615c4a9d 100644 --- a/erts/emulator/beam/erl_port_task.h +++ b/erts/emulator/beam/erl_port_task.h @@ -44,6 +44,7 @@ typedef erts_atomic_t ErtsPortTaskHandle; #if (defined(ERL_PROCESS_C__) \ || defined(ERL_PORT_TASK_C__) \ || defined(ERL_IO_C__) \ + || defined(ERL_CHECK_IO_C__) \ || (ERTS_GLB_INLINE_INCL_FUNC_DEF \ && defined(ERTS_DO_INCL_GLB_INLINE_FUNC_DEF))) #define ERTS_INCLUDE_SCHEDULER_INTERNALS @@ -138,6 +139,8 @@ ERTS_GLB_INLINE void erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched #if defined(ERTS_INCLUDE_SCHEDULER_INTERNALS) && ERTS_POLL_USE_SCHEDULER_POLLING ERTS_GLB_INLINE int erts_port_task_have_outstanding_io_tasks(void); +ERTS_GLB_INLINE void erts_port_task_inc_outstanding_io_tasks(void); +ERTS_GLB_INLINE void erts_port_task_dec_outstanding_io_tasks(void); /* NOTE: Do not access any of the exported variables directly */ extern erts_atomic_t erts_port_task_outstanding_io_tasks; #endif @@ -226,6 +229,18 @@ erts_port_task_have_outstanding_io_tasks(void) return (erts_atomic_read_acqb(&erts_port_task_outstanding_io_tasks) != 0); } + +ERTS_GLB_INLINE void +erts_port_task_inc_outstanding_io_tasks(void) +{ + erts_atomic_inc_nob(&erts_port_task_outstanding_io_tasks); +} + +ERTS_GLB_INLINE void +erts_port_task_dec_outstanding_io_tasks(void) +{ + erts_atomic_dec_nob(&erts_port_task_outstanding_io_tasks); +} #endif #endif diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c index 8453547ee867..c9fe5a0f5cb9 100644 --- a/erts/emulator/sys/common/erl_check_io.c +++ b/erts/emulator/sys/common/erl_check_io.c @@ -35,6 +35,7 @@ #include "sys.h" #include "global.h" #include "erl_port.h" +#include "erl_port_task.h" #include "erl_check_io.h" #include "erl_thr_progress.h" #include "erl_bif_unique.h" @@ -51,7 +52,7 @@ (STATE) ? (STATE)->fd : (ErtsSysFdType)-1, ##__VA_ARGS__, \ ev2str((STATE) ? (STATE)->events : ERTS_POLL_EV_NONE), \ ev2str((STATE) ? (STATE)->active_events : ERTS_POLL_EV_NONE), \ - (STATE) ? flag2str((STATE)->flags) : ERTS_EV_FLAG_CLEAR) + (STATE) ? event_state_flag_to_str((STATE)->flags) : ERTS_EV_FLAG_CLEAR) #define DEBUG_PRINT_MODE #else #define DEBUG_PRINT(...) @@ -75,24 +76,27 @@ typedef enum { } EventStateType; typedef enum { - ERTS_EV_FLAG_CLEAR = 0, - ERTS_EV_FLAG_USED = 1, /* ERL_DRV_USE has been turned on */ + ERTS_EV_FLAG_CLEAR = 0x0, + ERTS_EV_FLAG_USED = 0x1, /* ERL_DRV_USE has been turned on */ #if ERTS_POLL_USE_SCHEDULER_POLLING - ERTS_EV_FLAG_SCHEDULER = 2, /* Set when the fd has been migrated + ERTS_EV_FLAG_SCHEDULER = 0x2, /* Set when the fd has been migrated to scheduler pollset */ - ERTS_EV_FLAG_IN_SCHEDULER = 4, /* Set when the fd is currently in + ERTS_EV_FLAG_IN_SCHEDULER = 0x4, /* Set when the fd is currently in scheduler pollset */ #else ERTS_EV_FLAG_SCHEDULER = ERTS_EV_FLAG_CLEAR, ERTS_EV_FLAG_IN_SCHEDULER = ERTS_EV_FLAG_CLEAR, #endif #ifdef ERTS_POLL_USE_FALLBACK - ERTS_EV_FLAG_FALLBACK = 8, /* Set when kernel poll rejected fd + ERTS_EV_FLAG_FALLBACK = 0x8, /* Set when kernel poll rejected fd and it was put in the nkp version */ #else ERTS_EV_FLAG_FALLBACK = ERTS_EV_FLAG_CLEAR, #endif ERTS_EV_FLAG_WANT_ERROR = 0x10, /* ERL_NIF_SELECT_ERROR turned on */ +#if ERTS_POLL_USE_SCHEDULER_POLLING + ERTS_EV_FLAG_NIF_SELECT = 0x20, /* Set if a nif select message is in-flight */ +#endif /* Combinations, defined only to be displayed by debugger (gdb) */ ERTS_EV_FLAG_USED_FALLBACK = ERTS_EV_FLAG_USED | ERTS_EV_FLAG_FALLBACK, @@ -171,6 +175,10 @@ typedef struct { without being deselected. */ } ErtsDrvEventState; +static erts_mtx_t nif_select_fds_mtx; +static ErtsSysFdType nif_select_fds[256]; +static Uint16 nif_select_cnt = 0; + struct drv_ev_state_shared { union { @@ -697,9 +705,20 @@ static void prepare_select_msg(struct erts_nif_select_event* e, Eterm erts_io_handle_nif_select(ErtsMessage *sig) { + ErtsDrvEventState *state; ErtsNifSelectSignalData* xsig = (ErtsNifSelectSignalData*) (char *) (&sig->hfrag.mem[0] + sig->hfrag.used_size); ASSERT(sig->hfrag.used_size < sig->hfrag.alloc_size); + +#if ERTS_POLL_USE_SCHEDULER_POLLING + if (sched_pollset) { + erts_mtx_lock(&nif_select_fds_mtx); + erts_port_task_dec_outstanding_io_tasks(); +// erts_fprintf(stderr, "Enqueue %d\r\n", xsig->evt); + nif_select_fds[nif_select_cnt++] = xsig->evt; + erts_mtx_unlock(&nif_select_fds_mtx); + } +#endif return xsig->message; } @@ -707,6 +726,8 @@ static ERTS_INLINE void send_select_msg(struct erts_nif_select_event* e) { if (!erts_proc_sig_send_nif_select(e->pid, e->mp)) { erts_cleanup_messages(e->mp); + } else { + erts_port_task_inc_outstanding_io_tasks(); } } @@ -1192,11 +1213,26 @@ enif_select_x(ErlNifEnv* env, old_events = state->events; if (on) { + if (state->type == ERTS_EV_TYPE_NONE) + ctl_op = ERTS_POLL_OP_ADD; + else { + if (!(state->flags & ERTS_EV_FLAG_IN_SCHEDULER) && + (ctl_events == ERTS_POLL_EV_IN)) { + if (!(state->flags & ERTS_EV_FLAG_FALLBACK) && + (state->flags & ERTS_EV_FLAG_NIF_SELECT) && + erts_sched_poll_enabled()) + state->count++; + if (state->count > 10 && erts_sched_poll_enabled()) { + if (!(state->flags & ERTS_EV_FLAG_SCHEDULER)) + ctl_op = ERTS_POLL_OP_ADD; + state->flags |= ERTS_EV_FLAG_IN_SCHEDULER|ERTS_EV_FLAG_SCHEDULER; + DEBUG_PRINT_FD("moving to scheduler ps", state); + } + } + } ctl_events &= ~old_events; state->events |= ctl_events; state->active_events |= ctl_events; - if (state->type == ERTS_EV_TYPE_NONE) - ctl_op = ERTS_POLL_OP_ADD; if (ctl_events & ERTS_POLL_EV_ERR) state->flags |= ERTS_EV_FLAG_WANT_ERROR; } @@ -1727,6 +1763,33 @@ erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time, int poll_only int poll_ret, i; ERTS_MSACC_PUSH_AND_SET_STATE(ERTS_MSACC_STATE_CHECK_IO); + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + int is_normal_sched = !!esdp && esdp->type == ERTS_SCHED_NORMAL; + + if (is_normal_sched) { + ErtsSysFdType fds[256]; + Uint16 cnt; + erts_mtx_lock(&nif_select_fds_mtx); + memcpy(fds, nif_select_fds, nif_select_cnt * sizeof(ErtsSysFdType)); + cnt = nif_select_cnt; + // if (nif_select_cnt) + // erts_fprintf(stderr, "%d: Need to clear %d fds\r\n", esdp->no, nif_select_cnt); + nif_select_cnt = 0; + erts_mtx_unlock(&nif_select_fds_mtx); + for (int i = 0; i < cnt; i++) { + ErtsDrvEventState *state; + ErtsSysFdType fd = fds[i]; + erts_mtx_t *mtx = fd_mtx(fd); + erts_mtx_lock(mtx); + state = get_drv_ev_state(fd); + if (state) { + // erts_fprintf(stderr, "Clearing %d\r\n", fd); + state->flags &= ~ERTS_EV_FLAG_NIF_SELECT; + } + erts_mtx_unlock(mtx); + } + } + restart: #ifdef ERTS_ENABLE_LOCK_CHECK @@ -1773,6 +1836,9 @@ erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time, int poll_only erl_errno_id(poll_ret), poll_ret); erts_send_error_to_logger_nogl(dsbufp); } + // if (is_normal_sched) { + // erts_fprintf(stderr, "%d: woke up\r\n", esdp->no); + // } ERTS_MSACC_POP_STATE(); return; } @@ -1914,6 +1980,11 @@ erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time, int poll_only check_fd_cleanup(state, &free_select, &free_nif); } +#if ERTS_POLL_USE_SCHEDULER_POLLING + // erts_fprintf(stderr, "Setting flag on %d\r\n", fd); + state->flags |= ERTS_EV_FLAG_NIF_SELECT; +#endif + erts_mtx_unlock(fd_mtx(fd)); if (is_not_nil(in_ev.pid)) { @@ -2002,6 +2073,10 @@ erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time, int poll_only sizeof(ErtsPollResFd) * psi->pollres_len); } + // if (is_normal_sched) { + // erts_fprintf(stderr, "%d: woke up\r\n", esdp->no); + // } + ERTS_MSACC_POP_STATE(); } @@ -2315,6 +2390,8 @@ erts_init_check_io(int *argc, char **argv) #endif if (erts_sched_poll_enabled()) { + erts_mtx_init(&nif_select_fds_mtx, "drv_ev_nif_select", NIL, + ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_IO); psiv[0].pollres_len = ERTS_CHECK_IO_POLL_RES_LEN; psiv[0].pollres = erts_alloc(ERTS_ALC_T_POLLSET, sizeof(ErtsPollResFd) * ERTS_CHECK_IO_POLL_RES_LEN); diff --git a/lib/kernel/src/inet.erl b/lib/kernel/src/inet.erl index ba1d303c1f83..c2d6c7028db7 100644 --- a/lib/kernel/src/inet.erl +++ b/lib/kernel/src/inet.erl @@ -483,7 +483,7 @@ See `t:gen_tcp:socket/0` and `t:gen_udp:socket/0`. -doc """ Implementation backend selector for `t:socket/0`. -Selects the implementation backend for [sockets](`t:socket/0`. +Selects the implementation backend for [sockets](`t:socket/0`). The current default is `inet` which uses `inet_drv.c` to call the platform's socket API. The value `socket` instead uses the `m:socket` module and its NIF implementation.