diff --git a/src/cmd/flux-exec.c b/src/cmd/flux-exec.c
index 0507b1e1f5fb..0af649c9a52c 100644
--- a/src/cmd/flux-exec.c
+++ b/src/cmd/flux-exec.c
@@ -190,8 +190,14 @@ void subprocess_update_credits (flux_subprocess_t *p, int bytes, bool reorder)
 void subprocess_remove_credits (flux_subprocess_t *p)
 {
     struct subproc_credit *spcred = flux_subprocess_aux_get (p, "credits");
-    if (zlistx_delete (subprocess_credits, spcred->handle) < 0)
-        log_err_exit ("zlistx_delete");
+    /* May be called more than once for the same subprocess; the handle
+     * is cleared after removal so repeat calls are no-ops.
+     */
+    if (spcred && spcred->handle) {
+        if (zlistx_delete (subprocess_credits, spcred->handle) < 0)
+            log_err_exit ("zlistx_delete");
+        spcred->handle = NULL;
+    }
 }
 
 void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
@@ -306,6 +312,7 @@ static void stdin_cb (flux_reactor_t *r,
         log_err_exit ("fbuf_read");
 
     if (lenp) {
+        size_t write_count = 0;
         p = zlistx_first (subprocesses);
         while (p) {
             if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
@@ -313,16 +320,40 @@ static void stdin_cb (flux_reactor_t *r,
                 if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0)
                     log_err_exit ("flux_subprocess_write");
                 if (stdin_enable_flow_control) {
-                    /* N.B. since we are subtracting the same number
-                     * of credits from all subprocesses, the sorted
-                     * order in the credits list should not change
+                    /* N.B. normally we are subtracting the same
+                     * number of credits from all active subprocesses,
+                     * so the sorted order in the credits list should
+                     * not change. See possible corner case below.
                      */
                     subprocess_update_credits (p, -1*len, false);
                 }
+                write_count++;
             }
             p = zlistx_next (subprocesses);
         }
         if (stdin_enable_flow_control) {
+            /* N.B. under a racy scenario stdin_cb() could be called
+             * before state_cb(), so that a subprocess that has exited
+             * has not been "recognized" as exited.
+             *
+             * This can lead to subprocesses in the subprocess_credits
+             * list not having their credits updated above and
+             * subprocess_min_credits() returning an invalid value.
+             *
+             * Correct this by removing exited/failed subprocesses
+             * from the credits list, just like in state_cb().
+             * A mismatch between the number of subprocesses written
+             * to and the size of the credits list triggers the scan.
+             */
+            if (write_count != zlistx_size (subprocess_credits)) {
+                p = zlistx_first (subprocesses);
+                while (p) {
+                    if (flux_subprocess_state (p) == FLUX_SUBPROCESS_EXITED
+                        || flux_subprocess_state (p) == FLUX_SUBPROCESS_FAILED)
+                        subprocess_remove_credits (p);
+                    p = zlistx_next (subprocesses);
+                }
+            }
             min_credits = subprocess_min_credits ();
             if (min_credits == 0)
                 flux_watcher_stop (stdin_w);