diff --git a/src/cmd/flux-exec.c b/src/cmd/flux-exec.c index 7b5223d87fa8..173f127bb107 100644 --- a/src/cmd/flux-exec.c +++ b/src/cmd/flux-exec.c @@ -32,6 +32,8 @@ #include "src/common/libsubprocess/fbuf_watcher.h" #include "ccan/str/str.h" +#define NUMCMP(a,b) ((a)==(b)?0:((a)<(b)?-1:1)) + static struct optparse_option cmdopts[] = { { .name = "rank", .key = 'r', .has_arg = 1, .arginfo = "IDSET", .usage = "Specify target ranks. Default is \"all\"" }, @@ -53,6 +55,9 @@ static struct optparse_option cmdopts[] = { { .name = "setopt", .has_arg = 1, .arginfo = "NAME=VALUE", .flags = OPTPARSE_OPT_HIDDEN, .usage = "Set subprocess option NAME to VALUE (multiple use ok)" }, + { .name = "stdin-flow", .has_arg = 1, .arginfo = "on|off", + .flags = OPTPARSE_OPT_HIDDEN, + .usage = "Forcibly enable or disable stdin flow control" }, { .name = "with-imp", .has_arg = 0, .usage = "Run args under 'flux-imp run'" }, { .name = "jobid", .key = 'j', .has_arg = 1, .arginfo = "JOBID", @@ -72,12 +77,22 @@ int exit_code = 0; zhashx_t *exitsets; struct idset *hanging; -zlist_t *subprocesses; +zlistx_t *subprocesses; +/* subprocesses ordered by credits, low to high. Exited and failed + * subprocesses are removed from the list. 
+ */ +zlistx_t *subprocess_credits; + +struct subproc_credit { + void *handle; /* handle to subprocess in credits list */ + int credits; /* bytes added by credit_cb minus bytes written by stdin_cb */ +}; optparse_t *opts = NULL; int stdin_flags; flux_watcher_t *stdin_w; +bool stdin_enable_flow_control = true; /* time to wait in between SIGINTs */ #define INTERRUPT_MILLISECS 1000.0 @@ -146,6 +161,35 @@ void completion_cb (flux_subprocess_t *p) log_err_exit ("idset_clear"); } +int subprocess_min_credits (void) +{ + /* subprocess_credits is kept ordered by credits, so the minimum is at the head */ + flux_subprocess_t *p = zlistx_head (subprocess_credits); + struct subproc_credit *spcred; + /* list possibly empty if all subprocesses failed, so return no + * credits so stdin watcher won't be started + */ + if (!p) + return 0; + spcred = flux_subprocess_aux_get (p, "credits"); + return spcred->credits; +} + +void subprocess_update_credits (flux_subprocess_t *p, int bytes, bool reorder) +{ + struct subproc_credit *spcred = flux_subprocess_aux_get (p, "credits"); + spcred->credits += bytes; /* bytes is negative when stdin_cb debits a write */ + if (reorder) + zlistx_reorder (subprocess_credits, spcred->handle, false); +} + +void subprocess_remove_credits (flux_subprocess_t *p) +{ + struct subproc_credit *spcred = flux_subprocess_aux_get (p, "credits"); + if (zlistx_delete (subprocess_credits, spcred->handle) < 0) + log_err_exit ("zlistx_delete"); +} + void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state) { if (state == FLUX_SUBPROCESS_RUNNING) { @@ -153,8 +197,10 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state) /* see FLUX_SUBPROCESS_FAILED case below */ (void)flux_subprocess_aux_set (p, "started", p, NULL); } - else if (state == FLUX_SUBPROCESS_EXITED) + else if (state == FLUX_SUBPROCESS_EXITED) { exited++; + subprocess_remove_credits (p); + } else if (state == FLUX_SUBPROCESS_FAILED) { /* FLUX_SUBPROCESS_FAILED is a catch all error case, no way to * know if process started or not. 
So we cheat with a @@ -163,11 +209,21 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state) if (flux_subprocess_aux_get (p, "started") == NULL) started++; exited++; + subprocess_remove_credits (p); } if (stdin_w) { - if (started == rank_count) - flux_watcher_start (stdin_w); + if (started == rank_count) { + /* don't start stdin_w unless all subprocesses have + * received credits to write to stdin */ + if (stdin_enable_flow_control) { + int min_credits = subprocess_min_credits (); + if (min_credits) + flux_watcher_start (stdin_w); + } + else + flux_watcher_start (stdin_w); + } if (exited == rank_count) flux_watcher_stop (stdin_w); } @@ -218,6 +274,16 @@ void output_cb (flux_subprocess_t *p, const char *stream) } } +void credit_cb (flux_subprocess_t *p, const char *stream, int bytes) +{ + subprocess_update_credits (p, bytes, true); /* stream is not examined */ + if (started == rank_count) { /* NOTE(review): stdin_w is not NULL-checked here as it is in state_cb - confirm flux_watcher_start accepts NULL */ + int min_credits = subprocess_min_credits (); + if (min_credits) + flux_watcher_start (stdin_w); + } +} + static void stdin_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, @@ -226,28 +292,44 @@ static void stdin_cb (flux_reactor_t *r, struct fbuf *fb = fbuf_read_watcher_get_buffer (w); flux_subprocess_t *p; const char *ptr; - int lenp; + int len, lenp; + int min_credits = -1; /* stays -1 when flow control is disabled */ + + if (stdin_enable_flow_control) + min_credits = subprocess_min_credits (); /* NOTE(review): lenp == 0 below is handled as EOF - confirm this cannot run with zero credits */ - if (!(ptr = fbuf_read (fb, -1, &lenp))) + if (!(ptr = fbuf_read (fb, min_credits, &lenp))) log_err_exit ("fbuf_read"); if (lenp) { - p = zlist_first (subprocesses); + p = zlistx_first (subprocesses); while (p) { if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT || flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) { - if (flux_subprocess_write (p, "stdin", ptr, lenp) < 0) + if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0) log_err_exit ("flux_subprocess_write"); + if (stdin_enable_flow_control) { + /* N.B. 
since we are subtracting the same number + * of credits from all subprocesses, the sorted + * order in the credits list should not change + */ + subprocess_update_credits (p, -1*len, false); + } } - p = zlist_next (subprocesses); + p = zlistx_next (subprocesses); + } + if (stdin_enable_flow_control) { + min_credits = subprocess_min_credits (); + if (min_credits == 0) + flux_watcher_stop (stdin_w); } } else { - p = zlist_first (subprocesses); + p = zlistx_first (subprocesses); while (p) { if (flux_subprocess_close (p, "stdin") < 0) log_err_exit ("flux_subprocess_close"); - p = zlist_next (subprocesses); + p = zlistx_next (subprocesses); } flux_watcher_stop (stdin_w); } @@ -291,9 +373,9 @@ static flux_subprocess_t *imp_kill (flux_subprocess_t *p, int signum) &ops); } -static void killall (zlist_t *l, int signum) +static void killall (zlistx_t *l, int signum) { - flux_subprocess_t *p = zlist_first (l); + flux_subprocess_t *p = zlistx_first (l); while (p) { if (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) { if (use_imp) { @@ -316,7 +398,7 @@ static void killall (zlist_t *l, int signum) flux_future_destroy (f); } } - p = zlist_next (l); + p = zlistx_next (l); } } @@ -361,12 +443,21 @@ static void signal_cb (int signum) } } -void subprocess_destroy (void *arg) +void subprocess_destroy (void **arg) { - flux_subprocess_t *p = arg; + flux_subprocess_t *p = *arg; flux_subprocess_destroy (p); } +int subprocess_credits_compare (const void *item1, const void *item2) +{ + flux_subprocess_t *p1 = (flux_subprocess_t *) item1; + flux_subprocess_t *p2 = (flux_subprocess_t *) item2; + struct subproc_credit *spcred1 = flux_subprocess_aux_get (p1, "credits"); + struct subproc_credit *spcred2 = flux_subprocess_aux_get (p2, "credits"); + return NUMCMP (spcred1->credits, spcred2->credits); /* -1, 0 or 1 by credits */ +} + /* atexit handler * This is a good faith attempt to restore stdin flags to what they were * before we set O_NONBLOCK per bug #1803. 
@@ -575,6 +666,7 @@ int main (int argc, char *argv[]) .on_channel_out = NULL, .on_stdout = output_cb, .on_stderr = output_cb, + .on_credit = credit_cb, }; struct timespec t0; const char *service_name; @@ -704,8 +796,13 @@ int main (int argc, char *argv[]) free (nodeset); } - if (!(subprocesses = zlist_new ())) - log_err_exit ("zlist_new"); + if (!(subprocesses = zlistx_new ())) + log_err_exit ("zlistx_new"); + zlistx_set_destructor (subprocesses, subprocess_destroy); + + if (!(subprocess_credits = zlistx_new ())) + log_err_exit ("zlistx_new"); + zlistx_set_comparator (subprocess_credits, subprocess_credits_compare); if (!(exitsets = zhashx_new ())) log_err_exit ("zhashx_new()"); @@ -713,9 +810,27 @@ int main (int argc, char *argv[]) service_name = optparse_get_str (opts, "service", job_service ? job_service : "rexec"); + + // sdexec stdin flow is disabled by default + if (streq (service_name, "sdexec")) + stdin_enable_flow_control = false; + + const char *stdin_flow = optparse_get_str (opts, "stdin-flow", NULL); + if (stdin_flow) { + if (streq (stdin_flow, "off")) + stdin_enable_flow_control = false; + else if (streq (stdin_flow, "on")) + stdin_enable_flow_control = true; + else + log_msg_exit ("Set --stdin-flow to on or off"); + } + if (!stdin_enable_flow_control) + ops.on_credit = NULL; + rank = idset_first (targets); while (rank != IDSET_INVALID_ID) { flux_subprocess_t *p; + struct subproc_credit *spcred; if (!(p = flux_rexec_ex (h, service_name, rank, @@ -725,10 +840,17 @@ int main (int argc, char *argv[]) NULL, NULL))) log_err_exit ("flux_rexec"); - if (zlist_append (subprocesses, p) < 0) - log_err_exit ("zlist_append"); - if (!zlist_freefn (subprocesses, p, subprocess_destroy, true)) - log_err_exit ("zlist_freefn"); + if (!(spcred = calloc (1, sizeof (*spcred)))) + log_err_exit ("calloc"); + if (!zlistx_add_end (subprocesses, p)) + log_err_exit ("zlistx_add_end"); + if (!(spcred->handle = zlistx_add_end (subprocess_credits, p))) + log_err_exit 
("zlistx_add_end"); + if (flux_subprocess_aux_set (p, + "credits", + spcred, + (flux_free_f) free) < 0) + log_err_exit ("flux_subprocess_aux_set"); rank = idset_next (targets, rank); } @@ -739,11 +861,11 @@ int main (int argc, char *argv[]) */ if (optparse_getopt (opts, "noinput", NULL) > 0) { flux_subprocess_t *p; - p = zlist_first (subprocesses); + p = zlistx_first (subprocesses); while (p) { if (flux_subprocess_close (p, "stdin") < 0) log_err_exit ("flux_subprocess_close"); - p = zlist_next (subprocesses); + p = zlistx_next (subprocesses); } } /* configure stdin watcher @@ -800,7 +922,8 @@ int main (int argc, char *argv[]) log_fini (); zhashx_destroy (&exitsets); - zlist_destroy (&subprocesses); + zlistx_destroy (&subprocesses); + zlistx_destroy (&subprocess_credits); return exit_code; } diff --git a/t/t0005-exec.t b/t/t0005-exec.t index d05c39505851..b9236e5f40b0 100755 --- a/t/t0005-exec.t +++ b/t/t0005-exec.t @@ -247,7 +247,57 @@ test_expect_success 'stdin redirect from /dev/null works without -n' ' ' test_expect_success 'stdin redirect from /dev/null works with -n' ' - test_expect_code 0 run_timeout 10 flux exec -n -r0-3 cat + test_expect_code 0 run_timeout 10 flux exec -n -r0-3 cat +' + +test_expect_success 'create large file for tests' ' + dd if=/dev/urandom of=5Mfile bs=5M count=1 +' + +test_expect_success 'create test script to redirect stdin to a file' ' + cat <<-EOT >stdin2file && + #!/bin/bash + rank=\$(flux getattr rank) + dd of=cpy.\$rank + EOT + chmod +x stdin2file +' + +# piping a 5M file using a 4K buffer should overflow if flow control +# is not functioning correctly +test_expect_success 'stdin flow control works (1 rank)' ' + cat 5Mfile | flux exec -r 0 --setopt=stdin_BUFSIZE=4096 ./stdin2file && + cmp 5Mfile cpy.0 && + rm cpy.0 +' + +test_expect_success 'stdin flow control works (all ranks)' ' + cat 5Mfile | flux exec -r 0-3 --setopt=stdin_BUFSIZE=4096 ./stdin2file && + cmp 5Mfile cpy.0 && + cmp 5Mfile cpy.1 && + cmp 5Mfile cpy.2 && + cmp 
5Mfile cpy.3 && + rm cpy.* +' + +test_expect_success 'create test script to redirect stdin to a file, one rank exits early' ' + cat <<-EOT >stdin2file && + #!/bin/bash + rank=\$(flux getattr rank) + if test \$rank -ne 0; then + dd of=cpy.\$rank + fi + EOT + chmod +x stdin2file +' + +test_expect_success 'stdin flow control works (all ranks, one rank will exit early)' ' + cat 5Mfile | flux exec -r 0-3 --setopt=stdin_BUFSIZE=4096 ./stdin2file && + test_must_fail ls cpy.0 && + cmp 5Mfile cpy.1 && + cmp 5Mfile cpy.2 && + cmp 5Mfile cpy.3 && + rm cpy.* ' test_expect_success 'stdin broadcast -- multiple lines' '