Skip to content

Commit

Permalink
Merge pull request #6544 from chu11/issue6542_flux_exec_epipe
Browse files Browse the repository at this point in the history
flux-exec: fix credits list update race
  • Loading branch information
mergify[bot] authored Jan 13, 2025
2 parents 619cccf + e38bb34 commit 2bbe885
Showing 1 changed file with 31 additions and 5 deletions.
36 changes: 31 additions & 5 deletions src/cmd/flux-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,11 @@ void subprocess_update_credits (flux_subprocess_t *p, int bytes, bool reorder)
/* Remove subprocess `p` from the global subprocess_credits list.
 *
 * The per-subprocess credit record is looked up via the "credits" aux
 * key on `p`.  If the record still holds a list handle, the entry is
 * deleted from subprocess_credits and the handle is cleared, so calling
 * this function again for the same subprocess is a harmless no-op.
 * That matters because both state_cb() and stdin_cb() may try to remove
 * the same exited/failed subprocess.
 *
 * Exits the program via log_err_exit() if zlistx_delete() fails.
 *
 * NOTE(review): assumes the "credits" aux item was attached to `p`
 * before this is called (spcred is dereferenced unchecked) - confirm
 * against the code that creates the subprocess.
 */
void subprocess_remove_credits (flux_subprocess_t *p)
{
    struct subproc_credit *spcred = flux_subprocess_aux_get (p, "credits");

    /* Only delete while the handle is still live; an unconditional
     * delete here would remove the same list entry twice (or pass a
     * NULL handle to zlistx_delete()) on a repeated call.
     */
    if (spcred->handle) {
        if (zlistx_delete (subprocess_credits, spcred->handle) < 0)
            log_err_exit ("zlistx_delete");
        spcred->handle = NULL;
    }
}

void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
Expand Down Expand Up @@ -306,23 +309,46 @@ static void stdin_cb (flux_reactor_t *r,
log_err_exit ("fbuf_read");

if (lenp) {
int write_count = 0;
p = zlistx_first (subprocesses);
while (p) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
|| flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0)
log_err_exit ("flux_subprocess_write");
if (stdin_enable_flow_control) {
/* N.B. normally we are subtracting the same
 * number of credits from all active subprocesses,
 * so the sorted order in the credits list should
 * not change. See possible corner case below.
 */
subprocess_update_credits (p, -1*len, false);
}
write_count++;
}
p = zlistx_next (subprocesses);
}
if (stdin_enable_flow_control) {
/* N.B. under a racy scenario stdin_cb() could be called
* before state_cb(), so that a subprocess that has exited
* has not been "recognized" as exited.
*
* This can lead to subprocesses in the subprocess_credits
* list not having their credits updated above and
* subprocess_min_credits() returning an invalid value.
*
* Correct this by removing exited/failed subprocesses
* from the credits list, just like in state_cb().
*/
if (write_count != zlistx_size (subprocess_credits)) {
p = zlistx_first (subprocesses);
while (p) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_EXITED
|| flux_subprocess_state (p) == FLUX_SUBPROCESS_FAILED)
subprocess_remove_credits (p);
p = zlistx_next (subprocesses);
}
}
min_credits = subprocess_min_credits ();
if (min_credits == 0)
flux_watcher_stop (stdin_w);
Expand Down

0 comments on commit 2bbe885

Please sign in to comment.