Skip to content

Commit

Permalink
Merge pull request #6370 from chu11/issue4572_flux_exec
Browse files Browse the repository at this point in the history
flux-exec: use subprocess credits to avoid overflowing stdin buffers
  • Loading branch information
mergify[bot] authored Nov 2, 2024
2 parents 0fe32a4 + 4aea6bb commit 718a9ba
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 26 deletions.
173 changes: 148 additions & 25 deletions src/cmd/flux-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include "src/common/libsubprocess/fbuf_watcher.h"
#include "ccan/str/str.h"

/* Three-way numeric comparison: evaluates to 0 when a equals b, to -1
 * when a is less than b, and to 1 otherwise.
 * N.B. arguments may be evaluated more than once, so avoid passing
 * expressions that have side effects.
 */
#define NUMCMP(a,b) \
    (((a) == (b)) ? 0 : (((a) < (b)) ? -1 : 1))

static struct optparse_option cmdopts[] = {
{ .name = "rank", .key = 'r', .has_arg = 1, .arginfo = "IDSET",
.usage = "Specify target ranks. Default is \"all\"" },
Expand All @@ -53,6 +55,9 @@ static struct optparse_option cmdopts[] = {
{ .name = "setopt", .has_arg = 1, .arginfo = "NAME=VALUE",
.flags = OPTPARSE_OPT_HIDDEN,
.usage = "Set subprocess option NAME to VALUE (multiple use ok)" },
{ .name = "stdin-flow", .has_arg = 1, .arginfo = "on|off",
.flags = OPTPARSE_OPT_HIDDEN,
.usage = "Forcibly enable or disable stdin flow control" },
{ .name = "with-imp", .has_arg = 0,
.usage = "Run args under 'flux-imp run'" },
{ .name = "jobid", .key = 'j', .has_arg = 1, .arginfo = "JOBID",
Expand All @@ -72,12 +77,22 @@ int exit_code = 0;
zhashx_t *exitsets;
struct idset *hanging;

zlist_t *subprocesses;
zlistx_t *subprocesses;
/* subprocess credits ordered low to high. Exited and failed
* subprocesses are removed from the list.
*/
zlistx_t *subprocess_credits;

/* Per-subprocess stdin flow control record.  One is allocated for each
 * subprocess and attached to it under the "credits" aux key.
 */
struct subproc_credit {
    /* Item handle returned when the subprocess was added to the
     * subprocess_credits list; required to reorder or delete that
     * list entry later.
     */
    void *handle;
    /* Stdin credits currently available for this subprocess.
     * Increased when credits are granted, decreased by the amount
     * written to the subprocess's stdin.
     */
    int credits;
};

optparse_t *opts = NULL;

int stdin_flags;
flux_watcher_t *stdin_w;
bool stdin_enable_flow_control = true;

/* time to wait in between SIGINTs */
#define INTERRUPT_MILLISECS 1000.0
Expand Down Expand Up @@ -146,15 +161,46 @@ void completion_cb (flux_subprocess_t *p)
log_err_exit ("idset_clear");
}

/* Return the lowest stdin credit count among the subprocesses still
 * tracked in subprocess_credits, or 0 if that list is empty.
 */
int subprocess_min_credits (void)
{
    /* The credits list is maintained in low-to-high order, so the
     * minimum is whatever sits at the head.
     */
    flux_subprocess_t *lowest = zlistx_head (subprocess_credits);

    /* The list may be empty if every subprocess failed.  Reporting
     * zero credits keeps callers from starting the stdin watcher.
     */
    if (lowest == NULL)
        return 0;

    struct subproc_credit *entry = flux_subprocess_aux_get (lowest, "credits");
    return entry->credits;
}

/* Adjust the stdin credits recorded for subprocess p by 'bytes'
 * (negative values subtract credits).
 *
 * If 'reorder' is true, the subprocess's entry in subprocess_credits is
 * repositioned so the list stays sorted.  Callers that apply the same
 * adjustment to every subprocess may pass false, since the relative
 * order cannot change in that case.
 */
void subprocess_update_credits (flux_subprocess_t *p, int bytes, bool reorder)
{
    struct subproc_credit *entry = flux_subprocess_aux_get (p, "credits");

    entry->credits = entry->credits + bytes;
    if (!reorder)
        return;
    zlistx_reorder (subprocess_credits, entry->handle, false);
}

/* Remove subprocess p from the subprocess_credits list so that it no
 * longer participates in the minimum credits calculation.  Called when
 * a subprocess has exited or failed.
 *
 * The call is idempotent: once the entry has been removed, the stored
 * list handle is cleared and further calls return without touching the
 * list.  Exits the program if the list deletion fails.
 */
void subprocess_remove_credits (flux_subprocess_t *p)
{
    struct subproc_credit *spcred = flux_subprocess_aux_get (p, "credits");

    /* Nothing to do if there is no credits record or the entry was
     * already removed.  N.B. a NULL handle must never be handed to
     * zlistx_delete(), which interprets it as "delete the first item".
     */
    if (!spcred || !spcred->handle)
        return;
    if (zlistx_delete (subprocess_credits, spcred->handle) < 0)
        log_err_exit ("zlistx_delete");
    /* The handle is invalid once the list entry is deleted; clear it so
     * a stale pointer is never reused.
     */
    spcred->handle = NULL;
}

void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
{
if (state == FLUX_SUBPROCESS_RUNNING) {
started++;
/* see FLUX_SUBPROCESS_FAILED case below */
(void)flux_subprocess_aux_set (p, "started", p, NULL);
}
else if (state == FLUX_SUBPROCESS_EXITED)
else if (state == FLUX_SUBPROCESS_EXITED) {
exited++;
subprocess_remove_credits (p);
}
else if (state == FLUX_SUBPROCESS_FAILED) {
/* FLUX_SUBPROCESS_FAILED is a catch all error case, no way to
* know if process started or not. So we cheat with a
Expand All @@ -163,11 +209,21 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
if (flux_subprocess_aux_get (p, "started") == NULL)
started++;
exited++;
subprocess_remove_credits (p);
}

if (stdin_w) {
if (started == rank_count)
flux_watcher_start (stdin_w);
if (started == rank_count) {
/* don't start stdin_w unless all subprocesses have
* received credits to write to stdin */
if (stdin_enable_flow_control) {
int min_credits = subprocess_min_credits ();
if (min_credits)
flux_watcher_start (stdin_w);
}
else
flux_watcher_start (stdin_w);
}
if (exited == rank_count)
flux_watcher_stop (stdin_w);
}
Expand Down Expand Up @@ -218,6 +274,16 @@ void output_cb (flux_subprocess_t *p, const char *stream)
}
}

/* Subprocess credit callback: 'bytes' additional credits have been
 * granted for writing to the given stream of subprocess p.
 *
 * Records the new credits (re-sorting the credits list) and, once every
 * subprocess has started and the lowest credit count across tracked
 * subprocesses is nonzero, (re)starts the stdin watcher so that more
 * input can be read and forwarded.
 */
void credit_cb (flux_subprocess_t *p, const char *stream, int bytes)
{
    subprocess_update_credits (p, bytes, true);

    /* The stdin watcher may not exist; state_cb() applies the same
     * guard before starting or stopping it.
     */
    if (!stdin_w)
        return;
    /* Do not read stdin until all subprocesses have been started. */
    if (started != rank_count)
        return;
    if (subprocess_min_credits () != 0)
        flux_watcher_start (stdin_w);
}

static void stdin_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
Expand All @@ -226,28 +292,44 @@ static void stdin_cb (flux_reactor_t *r,
struct fbuf *fb = fbuf_read_watcher_get_buffer (w);
flux_subprocess_t *p;
const char *ptr;
int lenp;
int len, lenp;
int min_credits = -1;

if (stdin_enable_flow_control)
min_credits = subprocess_min_credits ();

if (!(ptr = fbuf_read (fb, -1, &lenp)))
if (!(ptr = fbuf_read (fb, min_credits, &lenp)))
log_err_exit ("fbuf_read");

if (lenp) {
p = zlist_first (subprocesses);
p = zlistx_first (subprocesses);
while (p) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
|| flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
if (flux_subprocess_write (p, "stdin", ptr, lenp) < 0)
if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0)
log_err_exit ("flux_subprocess_write");
if (stdin_enable_flow_control) {
/* N.B. since we are subtracting the same number
* of credits from all subprocesses, the sorted
* order in the credits list should not change
*/
subprocess_update_credits (p, -1*len, false);
}
}
p = zlist_next (subprocesses);
p = zlistx_next (subprocesses);
}
if (stdin_enable_flow_control) {
min_credits = subprocess_min_credits ();
if (min_credits == 0)
flux_watcher_stop (stdin_w);
}
}
else {
p = zlist_first (subprocesses);
p = zlistx_first (subprocesses);
while (p) {
if (flux_subprocess_close (p, "stdin") < 0)
log_err_exit ("flux_subprocess_close");
p = zlist_next (subprocesses);
p = zlistx_next (subprocesses);
}
flux_watcher_stop (stdin_w);
}
Expand Down Expand Up @@ -291,9 +373,9 @@ static flux_subprocess_t *imp_kill (flux_subprocess_t *p, int signum)
&ops);
}

static void killall (zlist_t *l, int signum)
static void killall (zlistx_t *l, int signum)
{
flux_subprocess_t *p = zlist_first (l);
flux_subprocess_t *p = zlistx_first (l);
while (p) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
if (use_imp) {
Expand All @@ -316,7 +398,7 @@ static void killall (zlist_t *l, int signum)
flux_future_destroy (f);
}
}
p = zlist_next (l);
p = zlistx_next (l);
}
}

Expand Down Expand Up @@ -361,12 +443,21 @@ static void signal_cb (int signum)
}
}

void subprocess_destroy (void *arg)
void subprocess_destroy (void **arg)
{
flux_subprocess_t *p = arg;
flux_subprocess_t *p = *arg;
flux_subprocess_destroy (p);
}

/* Comparator for the subprocess_credits list.  Both items are
 * subprocesses; they are ordered by the credit count stored in each
 * one's "credits" aux record.  Returns -1, 0, or 1 when item1 has
 * fewer, equal, or more credits than item2, respectively.
 */
int subprocess_credits_compare (const void *item1, const void *item2)
{
    struct subproc_credit *lhs =
        flux_subprocess_aux_get ((flux_subprocess_t *) item1, "credits");
    struct subproc_credit *rhs =
        flux_subprocess_aux_get ((flux_subprocess_t *) item2, "credits");

    return NUMCMP (lhs->credits, rhs->credits);
}

/* atexit handler
* This is a good faith attempt to restore stdin flags to what they were
* before we set O_NONBLOCK per bug #1803.
Expand Down Expand Up @@ -575,6 +666,7 @@ int main (int argc, char *argv[])
.on_channel_out = NULL,
.on_stdout = output_cb,
.on_stderr = output_cb,
.on_credit = credit_cb,
};
struct timespec t0;
const char *service_name;
Expand Down Expand Up @@ -704,18 +796,41 @@ int main (int argc, char *argv[])
free (nodeset);
}

if (!(subprocesses = zlist_new ()))
log_err_exit ("zlist_new");
if (!(subprocesses = zlistx_new ()))
log_err_exit ("zlistx_new");
zlistx_set_destructor (subprocesses, subprocess_destroy);

if (!(subprocess_credits = zlistx_new ()))
log_err_exit ("zlistx_new");
zlistx_set_comparator (subprocess_credits, subprocess_credits_compare);

if (!(exitsets = zhashx_new ()))
log_err_exit ("zhashx_new()");

service_name = optparse_get_str (opts,
"service",
job_service ? job_service : "rexec");

// sdexec stdin flow is disabled by default
if (streq (service_name, "sdexec"))
stdin_enable_flow_control = false;

const char *stdin_flow = optparse_get_str (opts, "stdin-flow", NULL);
if (stdin_flow) {
if (streq (stdin_flow, "off"))
stdin_enable_flow_control = false;
else if (streq (stdin_flow, "on"))
stdin_enable_flow_control = true;
else
log_msg_exit ("Set --stdin-flow to on or off");
}
if (!stdin_enable_flow_control)
ops.on_credit = NULL;

rank = idset_first (targets);
while (rank != IDSET_INVALID_ID) {
flux_subprocess_t *p;
struct subproc_credit *spcred;
if (!(p = flux_rexec_ex (h,
service_name,
rank,
Expand All @@ -725,10 +840,17 @@ int main (int argc, char *argv[])
NULL,
NULL)))
log_err_exit ("flux_rexec");
if (zlist_append (subprocesses, p) < 0)
log_err_exit ("zlist_append");
if (!zlist_freefn (subprocesses, p, subprocess_destroy, true))
log_err_exit ("zlist_freefn");
if (!(spcred = calloc (1, sizeof (*spcred))))
log_err_exit ("calloc");
if (!zlistx_add_end (subprocesses, p))
log_err_exit ("zlistx_add_end");
if (!(spcred->handle = zlistx_add_end (subprocess_credits, p)))
log_err_exit ("zlistx_add_end");
if (flux_subprocess_aux_set (p,
"credits",
spcred,
(flux_free_f) free) < 0)
log_err_exit ("flux_subprocess_aux_set");
rank = idset_next (targets, rank);
}

Expand All @@ -739,11 +861,11 @@ int main (int argc, char *argv[])
*/
if (optparse_getopt (opts, "noinput", NULL) > 0) {
flux_subprocess_t *p;
p = zlist_first (subprocesses);
p = zlistx_first (subprocesses);
while (p) {
if (flux_subprocess_close (p, "stdin") < 0)
log_err_exit ("flux_subprocess_close");
p = zlist_next (subprocesses);
p = zlistx_next (subprocesses);
}
}
/* configure stdin watcher
Expand Down Expand Up @@ -800,7 +922,8 @@ int main (int argc, char *argv[])
log_fini ();

zhashx_destroy (&exitsets);
zlist_destroy (&subprocesses);
zlistx_destroy (&subprocesses);
zlistx_destroy (&subprocess_credits);

return exit_code;
}
Expand Down
52 changes: 51 additions & 1 deletion t/t0005-exec.t
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,57 @@ test_expect_success 'stdin redirect from /dev/null works without -n' '
'

test_expect_success 'stdin redirect from /dev/null works with -n' '
test_expect_code 0 run_timeout 10 flux exec -n -r0-3 cat
test_expect_code 0 run_timeout 10 flux exec -n -r0-3 cat
'

# Generate the input used by the stdin flow control tests below: a file
# named 5Mfile filled from /dev/urandom using a single 5M dd block.
test_expect_success 'create large file for tests' '
dd if=/dev/urandom of=5Mfile bs=5M count=1
'

# Write the helper script stdin2file and make it executable.  When run,
# the script looks up the broker rank with "flux getattr rank" and uses
# dd to copy its stdin to cpy.<rank> in the current directory.
test_expect_success 'create test script to redirect stdin to a file' '
cat <<-EOT >stdin2file &&
#!/bin/bash
rank=\$(flux getattr rank)
dd of=cpy.\$rank
EOT
chmod +x stdin2file
'

# Pipe the 5M file through flux exec to rank 0 with the subprocess stdin
# buffer limited to 4K (stdin_BUFSIZE=4096).  The buffer should overflow
# if flow control is not functioning correctly; on success the copy
# written by stdin2file must be identical to the original file.
test_expect_success 'stdin flow control works (1 rank)' '
cat 5Mfile | flux exec -r 0 --setopt=stdin_BUFSIZE=4096 ./stdin2file &&
cmp 5Mfile cpy.0 &&
rm cpy.0
'

# Same check as the single rank case, but stdin is broadcast to ranks
# 0-3.  Each rank's copy (cpy.0 .. cpy.3) must match the original file.
test_expect_success 'stdin flow control works (all ranks)' '
cat 5Mfile | flux exec -r 0-3 --setopt=stdin_BUFSIZE=4096 ./stdin2file &&
cmp 5Mfile cpy.0 &&
cmp 5Mfile cpy.1 &&
cmp 5Mfile cpy.2 &&
cmp 5Mfile cpy.3 &&
rm cpy.*
'

# Replace stdin2file with a variant in which rank 0 skips the dd copy,
# so that rank's script finishes without consuming stdin.  All other
# ranks still copy stdin to cpy.<rank>.
test_expect_success 'create test script to redirect stdin to a file, one rank exits early' '
cat <<-EOT >stdin2file &&
#!/bin/bash
rank=\$(flux getattr rank)
if test \$rank -ne 0; then
dd of=cpy.\$rank
fi
EOT
chmod +x stdin2file
'

# Run the early-exit variant of stdin2file on ranks 0-3 with a 4K stdin
# buffer.  Rank 0 must not produce a copy, while the copies from ranks
# 1-3 must still match the original file, i.e. the remaining ranks keep
# receiving stdin after one subprocess has exited.
test_expect_success 'stdin flow control works (all ranks, one rank will exit early)' '
cat 5Mfile | flux exec -r 0-3 --setopt=stdin_BUFSIZE=4096 ./stdin2file &&
test_must_fail ls cpy.0 &&
cmp 5Mfile cpy.1 &&
cmp 5Mfile cpy.2 &&
cmp 5Mfile cpy.3 &&
rm cpy.*
'

test_expect_success 'stdin broadcast -- multiple lines' '
Expand Down

0 comments on commit 718a9ba

Please sign in to comment.