flux-exec: use subprocess credits to avoid overflowing stdin buffers #6370

Merged
merged 5 commits into from
Nov 2, 2024

chu11
Member

@chu11 chu11 commented Oct 15, 2024

built on top of #6353, splitting it off since it's a separate issue

@chu11 chu11 changed the title from "WIP: flux-exec: use subprocess credits to avoid overflowing stdin buffers" to "flux-exec: use subprocess credits to avoid overflowing stdin buffers" Oct 15, 2024
@chu11
Member Author

chu11 commented Oct 15, 2024

removed WIP, I think the tweak to flux-exec is good. I wish we could avoid the looping calculation of min_credits, but I'm not sure that updating to a min-heap kind of data structure is worth the effort.

longer out, a no credits kind of option could be on the table if that loop poses some slowdown issue

@chu11 chu11 force-pushed the issue4572_flux_exec branch 2 times, most recently from 5b7f812 to ee93c43 Compare October 17, 2024 20:32
@chu11 chu11 force-pushed the issue4572_flux_exec branch 2 times, most recently from a8256a5 to 5da21f7 Compare October 24, 2024 23:33
@chu11
Member Author

chu11 commented Oct 25, 2024

as an aside, I looked into using "negative credits" to get the flux-exec stdin going faster. For the moment, decided not to add it. It would require some "did I send the first wave of data" flags, and didn't think it super worth it.

@garlick
Member

garlick commented Oct 27, 2024

I noticed two problems here:

  • doesn't work with sdexec because sdexec doesn't yet accept the write-credit flag added to RFC 42.
  • if one subprocess fails with zero credit, and the procs are reading stdin, the whole exec could deadlock as the failed proc credits will never be replenished

Also I worry a bit about iterating over all subprocs to find the minimum each time on_credit is called (think el cap size).

On the first problem, we could make stdin flow control optional, and then turn it off if the service is sdexec (for now). That would also allow us to more carefully evaluate the performance vs what we had before.

On the performance concern, maybe subprocesses could go in a zlistx ordered by credit count. When on_credit changes the credit value of one proc, call zlistx_reorder with low_value=false. When it changes for all after a write, the sort order shouldn't change since all procs' credit changes by the same amount. Call zlistx_first() to find the minimum value. Failed/exited procs could be removed from the list (triggering a check for the new minimum).
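The deadlock described in the second bullet can be seen in a toy model. This is a minimal sketch in plain C, not flux-exec code: `struct proc`, its `exited` flag, and `min_credits()` are hypothetical names used only for illustration. It uses the simple linear scan and shows that an exited proc left at zero credit pins the minimum at zero unless it is dropped from consideration.

```c
#include <assert.h>
#include <limits.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a remote subprocess's stdin credit state. */
struct proc {
    int credits;   /* bytes that may still be written to this proc's stdin */
    bool exited;   /* true once the proc has failed or exited */
};

/* Linear scan for the smallest credit value.
 * If skip_exited is false, exited procs are still counted, so a single
 * exited proc with zero credits holds the result at zero.
 * Returns 0 when no proc is eligible.
 */
int min_credits (const struct proc *procs, size_t n, bool skip_exited)
{
    int min = INT_MAX;
    bool found = false;

    assert (procs != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        if (skip_exited && procs[i].exited)
            continue;
        if (procs[i].credits < min)
            min = procs[i].credits;
        found = true;
    }
    return found ? min : 0;
}
```

With procs holding 4096, 0 (exited), and 1024 credits, the scan that still counts the exited proc reports 0, so stdin would never be read again; skipping it reports 1024.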

@garlick
Member

garlick commented Oct 28, 2024

FWIW this adds an option to enable/disable flow control and disables flow control by default if the service is sdexec: 6ede342

@chu11
Member Author

chu11 commented Oct 28, 2024

On the performance concern, maybe subprocesses could go in a zlistx ordered by credit count. When on_credit changes the credit value of one proc, call zlistx_reorder with low_value=false. When it changes for all after a write, the sort order shouldn't change since all procs' credit changes by the same amount. Call zlistx_first() to find the minimum value. Failed/exited procs could be removed from the list (triggering a check for the new minimum).

Thinking about this, this probably would be faster, but it appears to be a slightly faster O(n) vs a slightly slower O(n) (i.e. the reorders should average n/2 iterations of the list vs n for min-credits, and when stdin_cb is called, we don't iterate the credits list).

Per my comment above, maybe we want to go down the min-heap data structure path? It seems ccan has a heap data structure. So now we're guaranteed O(log n) for every min-credits reorder.

Edit: as I am skimming the ccan heap data structure, I think this is a good move, i'm gonna give it a whirl

Edit2: Oh crap, I cannot remove an element from the ccan heap ...

@garlick
Member

garlick commented Oct 28, 2024 via email

@chu11
Member Author

chu11 commented Oct 28, 2024

Since we only reorder after adding credits (not when subtracting), it seems like the list should be O(1) for the common case where the same number of credits is being added across many subprocesses. IOW there will be a bunch of clients that all end up with the same credit value at the end of the list, so if we tell zlistx_reorder() to search for the new position at the tail of the list after an add-credit response, it is likely to be added as the last item, O(1).

Ahhh, I think you're right. I got the low-value logic mixed up.
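The tail-search argument can be checked with a small model. The sketch below is an assumption-laden illustration, not the czmq implementation: it uses an ascending `int` array in place of the zlistx, and `reorder_from_tail()` is a hypothetical helper that re-seats one element after its value has grown, counting how many comparisons the search from the tail needs.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Re-seat the element at index idx of an ascending array after its value
 * has increased, scanning from the tail for the new position.
 * Returns the number of comparisons made during the scan.
 */
size_t reorder_from_tail (int *a, size_t n, size_t idx)
{
    int v;
    size_t pos;
    size_t steps = 0;

    assert (a != NULL && idx < n);
    v = a[idx];
    /* close the gap left by the element being moved */
    memmove (&a[idx], &a[idx + 1], (n - idx - 1) * sizeof (a[0]));
    /* walk back from the tail until an element <= v is found */
    pos = n - 1;
    while (pos > 0) {
        steps++;
        if (a[pos - 1] <= v)
            break;
        pos--;
    }
    /* open a slot at pos and drop the element in */
    memmove (&a[pos + 1], &a[pos], (n - 1 - pos) * sizeof (a[0]));
    a[pos] = v;
    return steps;
}
```

When every proc receives the same refill in turn, each call finds its slot at the tail after one comparison. The scan only gets long when a refilled value still belongs near the front of the list.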

@chu11
Member Author

chu11 commented Oct 28, 2024

if one subprocess fails with zero credit, and the procs are reading stdin, the whole exec could deadlock as the failed proc credits will never be replenished

Skimming through the code, we don't handle this generically for stdin and subprocesses right now. i.e. we can call flux_subprocess_write() for a subprocess that already failed. So we should probably fix that first. I'll write up an issue.

@chu11 chu11 force-pushed the issue4572_flux_exec branch 2 times, most recently from 9c0eefa to 57671eb Compare October 29, 2024 20:39
@chu11
Member Author

chu11 commented Oct 29, 2024

re-pushed with fixes per discussion above, and added a tweaked version of the --stdin-flow option on top.

No tests at the moment. Wondering if we could use a similar hidden option to create a very small buffer and run something similar to the original cat bigfile | flux exec .... reproducer.

@garlick
Member

garlick commented Oct 29, 2024

and added a tweaked version of the --stdin-flow option on top.

What was tweaked? I'm not spotting it.

Wondering if we could use a similar hidden option to create a very small buffer and run something similar to the original cat bigfile | flux exec .... reproducer.

Sounds good! Note there is a hidden --setopt function that can be used to set any subprocess option including buffer sizes.

Member

@garlick garlick left a comment

Couple quick initial comments

Comment on lines -364 to 450
-void subprocess_destroy (void *arg)
+void subprocess_destroy (void **arg)
 {
-    flux_subprocess_t *p = arg;
+    flux_subprocess_t *p = *arg;
     flux_subprocess_destroy (p);
 }
Member

Set *arg = NULL after free.
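A minimal sketch of that pattern, assuming a czmq-style destructor that receives the address of the caller's pointer. `struct item` and `item_destroy()` are hypothetical stand-ins, not the flux-exec code:

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical item type standing in for flux_subprocess_t. */
struct item {
    int id;
};

/* Destructor that takes the address of the caller's pointer, frees the
 * item, then clears the caller's pointer so it cannot be reused or
 * freed a second time.
 */
void item_destroy (void **arg)
{
    if (arg && *arg) {
        struct item *it = *arg;
        free (it);
        *arg = NULL;
    }
}
```

Because the pointer is cleared, a second call on the same variable is a harmless no-op rather than a double free.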

Comment on lines 810 to 848
if (!(handle = zlistx_add_end (subprocess_credits, credits)))
    log_err_exit ("zlistx_add_end");
Member

@garlick garlick Oct 29, 2024

Huh I would have thought you would leave the credits in the subprocess aux container like you had before and put the subprocess in the zlistx, not put credits in the zlistx and the list handle in the subprocess aux container.

The only worry I have about caching list handles is that one has to remember the undocumented fact that the handles remain valid on a zlistx_reorder() but are invalidated by zlistx_sort(). What if someone who doesn't remember this comes along and changes something and needs to sort the list? The result might be a hard to detect bug.

If you want to keep it this way better add a comment like

Beware: calling zlistx_sort() on subprocess_credits invalidates cached list handles.

Member Author

Huh I would have thought you would leave the credits in the subprocess aux container like you had before and put the subprocess in the zlistx, not put credits in the zlistx and the list handle in the subprocess aux container.

Ya know, originally I stored the integer credits in the zlistx but disliked the fact I also had to store the "handle" as an aux as well. It's sort of fallout from the fact that most uses of zlistx have a struct that contains the item + handle in one handy location. But I didn't think of storing the subprocess pointer in the "credits" list. Let me try it that way.

Member

Just realized this is the first go and "like you had it before" was just how I imagined it :-)

@chu11
Member Author

chu11 commented Oct 29, 2024

What was tweaked? I'm not spotting it.

Sorry, only minor tweaks (fixed a typo, fix conflicts), nothing major.

Sounds good! Note there is a hidden --setopt function that can be used to set any subprocess option including buffer sizes.

Oh sweet! Didn't see that.

@chu11
Member Author

chu11 commented Oct 29, 2024

re-pushed, tweaked things by putting the subprocess pointer into the "credits" list. Wrapping all of the "get handle" and "get credits" pointers into functions and hiding all those details makes it not as annoying as I thought it would be.

Also added a test. Piping a 50 meg file via a 4096 stdin buffer. If flow control wasn't working, would definitely overflow. Hopefully this is not too large of a test for the CI. We'll see. Could scale it down if it turns out to be.

Member

@garlick garlick left a comment

Looking good!

Maybe it would be good to add a variant of the test you added where one remote proc exits immediately but the other procs are checked to ensure they still get all the data?

Comment on lines 813 to 828
if (!(credits = calloc (1, sizeof (int))))
    log_err_exit ("calloc");
if (!zlistx_add_end (subprocesses, p))
    log_err_exit ("zlistx_add_end");
if (!(handle = zlistx_add_end (subprocess_credits, p)))
    log_err_exit ("zlistx_add_end");
if (flux_subprocess_aux_set (p,
                             "credits",
                             credits,
                             (flux_free_f) free) < 0)
    log_err_exit ("flux_subprocess_aux_set");
if (flux_subprocess_aux_set (p,
                             "credits_handle",
                             handle,
                             NULL) < 0)
    log_err_exit ("flux_subprocess_aux_set");
Member

Suggestion: put credits and handle in a struct and alloc the struct. Then you have only one aux_set/aux_get to do to access, and fewer memory allocs.
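One way the suggestion could look, as a sketch only. The struct name and its two fields match the `struct subproc_credit` that shows up in the later diff in this thread; the `subproc_credit_create()` helper and the `void *` handle type are assumptions made for illustration:

```c
#include <assert.h>
#include <stdlib.h>

/* One allocation holds both pieces of per-subprocess state, so a single
 * aux item is enough to reach either of them.
 */
struct subproc_credit {
    int credits;    /* stdin credits currently available */
    void *handle;   /* list handle for this subprocess's list entry */
};

/* Hypothetical helper: allocate zeroed credit state and record the
 * list handle. Returns NULL on allocation failure.
 */
struct subproc_credit *subproc_credit_create (void *handle)
{
    struct subproc_credit *spcred;

    if (!(spcred = calloc (1, sizeof (*spcred))))
        return NULL;
    spcred->handle = handle;
    return spcred;
}
```

The struct can then be attached with one aux_set call using `free` as the destructor, replacing the two separate aux entries.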

if (!p)
    return 0;
credits = flux_subprocess_aux_get (p, "credits");
assert (credits);
Member

I don't think the assertions that the aux items exist are particularly helpful since we treat failing to set one as a fatal error during initialization.

t/t0005-exec.t Outdated
Comment on lines 260 to 261
cat 50Mfile | flux exec -r 0 --setopt=stdin_BUFSIZE=4096 sed -n "w 50Mcpy" &&
diff 50Mfile 50Mcpy
Member

Use cmp instead of diff since the 50Mfile may not be text.

Maybe dd of=50Mcpy would be clearer than the sed call?

Comment on lines 438 to 459
int subprocess_credits_compare (const void *item1, const void *item2)
{
    flux_subprocess_t *p1 = (flux_subprocess_t *) item1;
    flux_subprocess_t *p2 = (flux_subprocess_t *) item2;
    int *credits1 = flux_subprocess_aux_get (p1, "credits");
    int *credits2 = flux_subprocess_aux_get (p2, "credits");
    return (*credits1 - *credits2);
}
Member

This doesn't strictly follow instructions of returning -1, 0, or 1 from the zlistx docs:

//  Set a user-defined comparator for zlistx_find and zlistx_sort; the method
//  must return -1, 0, or 1 depending on whether item1 is less than, equal to,
//  or greater than, item2.

Member Author

ahhh, good catch. I was actually copying from another place in flux-core. Perhaps we should audit and correct that elsewhere.

@chu11
Member Author

chu11 commented Oct 30, 2024

re-pushed, fixing per comments above and adding some flux-exec to > 1 rank tests, and per comment above, one test where the subprocess exits early. It's a good test, caught a corner case!

Member

@garlick garlick left a comment

Just a couple of minor comments but I think this looks pretty good!

Comment on lines 277 to 278
else
    flux_watcher_stop (stdin_w);
Member

I don't think this else clause is needed since this only adds credits, so the current min_credits is never reduced over what might have come before when the watcher was started?

Comment on lines 292 to +301

-if (!(ptr = fbuf_read (fb, -1, &lenp)))
+if (!(ptr = fbuf_read (fb, min_credits, &lenp)))
Member

I think this is a yes but are we certain min_credits is always > 0 here?

Member Author

hmmmmm. It should be impossible, discounting unknown or future bugs. Think we should assert check just in case?

Member

Maybe nothing is needed, but I suppose it could just stop the watcher and return if credits is zero.

Member Author

that sounds like a decent idea.

Member Author

oh wait! i think it is possible to be 0, although extremely unlikely, when the stream is closed. should add some smarts in there for that case.

Member Author

@chu11 chu11 Oct 31, 2024

ok, i had to think about it a bit. I think it is possible min_credits could be zero if the remote subprocess has a fatal error (like libsubprocess ENOMEM kinda thing), so we're at 0 credits locally and never get credits back. But since this is only in an error scenario, I don't think it matters. And since the stdin_w is already stopped, it doesn't matter in another way :-)

Although, this did make me think of a corner case to fix. If I call subprocess_remove_credits(), it's possible that min_credits may no longer be zero. So I should re-check and start the stdin_w if necessary.

Edit: actually i don't need to make a change, the code actually does this already by chance

Comment on lines 310 to 311
if (updated_credits == 0)
    flux_watcher_stop (stdin_w);
Member

I would just not have subprocess_update_credits() return anything and make a call to subprocess_min_credits() once after the loop and stop the watcher there if zero, since that's now a cheapish operation.

@chu11
Member Author

chu11 commented Oct 31, 2024

re-pushed per comments above, only minor tweaks

diff --git a/src/cmd/flux-exec.c b/src/cmd/flux-exec.c
index 44db06352..914d15abb 100644
--- a/src/cmd/flux-exec.c
+++ b/src/cmd/flux-exec.c
@@ -175,13 +175,12 @@ int subprocess_min_credits (void)
     return spcred->credits;
 }
 
-int subprocess_update_credits (flux_subprocess_t *p, int bytes, bool reorder)
+void subprocess_update_credits (flux_subprocess_t *p, int bytes, bool reorder)
 {
     struct subproc_credit *spcred = flux_subprocess_aux_get (p, "credits");
     spcred->credits += bytes;
     if (reorder)
         zlistx_reorder (subprocess_credits, spcred->handle, false);
-    return spcred->credits;
 }
 
 void subprocess_remove_credits (flux_subprocess_t *p)
@@ -282,8 +281,6 @@ void credit_cb (flux_subprocess_t *p, const char *stream, int bytes)
         int min_credits = subprocess_min_credits ();
         if (min_credits)
             flux_watcher_start (stdin_w);
-        else
-            flux_watcher_stop (stdin_w);
     }
 }
 
@@ -312,21 +309,19 @@ static void stdin_cb (flux_reactor_t *r,
                 if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0)
                     log_err_exit ("flux_subprocess_write");
                 if (stdin_enable_flow_control) {
-                    int updated_credits;
                     /* N.B. since we are subtracting the same number
                      * of credits from all subprocesses, the sorted
                      * order in the credits list should not change
                      */
-                    updated_credits = subprocess_update_credits (p,
-                                                                 -1*len,
-                                                                 false);
-                    /* if one subprocess has no more credits, stop stdin watcher */
-                    if (updated_credits == 0)
-                        flux_watcher_stop (stdin_w);
+                    subprocess_update_credits (p, -1*len, false);
                 }
             }
             p = zlistx_next (subprocesses);
         }
+        min_credits = subprocess_min_credits ();
+        /* if one subprocess has no more credits, stop stdin watcher */
+        if (min_credits == 0)
+            flux_watcher_stop (stdin_w);
     }
     else {
         p = zlistx_first (subprocesses);

@chu11 chu11 force-pushed the issue4572_flux_exec branch 2 times, most recently from 39041e4 to a21ae85 Compare October 31, 2024 21:47
@garlick
Member

garlick commented Nov 1, 2024

LGTM, although this comment is not really applicable anymore and could be deleted

/* if one subprocess has no more credits, stop stdin watcher */

@chu11
Member Author

chu11 commented Nov 1, 2024

thanks ... and i see I accidentally added a "fdsafsdf" commit while I was fixing the other issue, oops! I'll re-push and add MWP

Problem: One test in t0005-exec.t uses spaces instead of a tab
for indentation.

Fix tabbing for consistency.

Problem: In the near future, additional code in flux-exec will use the
zlistx_t data structure.  Some other code in flux-exec already uses
the zhashx_t data structure.  One lingering data structure still
uses the zlist_t data structure.

For consistency going forward, convert the lingering use of zlist_t
to zlistx_t.
@chu11
Member Author

chu11 commented Nov 1, 2024

it appears the system coverage builder has failed on the 60m timeout 3 times in a row. Unclear to me if this is due to the extra tests, which are "beefier" with all of the 50 meg file copies. Could be slowing things down a lot there or eating up too much disk.

I'm going to temporarily cut those down to 5 megs, see if that makes a difference (locally running tests, 5 megs is still enough to trigger buffer overflow on a 4K buffer without flow control). Edit: locally even 1 meg is enough to trigger the overflow.

@grondo
Contributor

grondo commented Nov 1, 2024

it appears the system coverage builder has failed on the 60m timeout 3 times in a row. Unclear to me if this is due to the extra tests, which are "beefier" with all of the 50 meg file copies. Could be slowing things down a lot there or eating up too much disk.

Only tests with ci=system are run in the system builder, so I don't think that test is even run there. Could be some other hang?

Also, just noticed the commit message t: cover flow control is a bit non-specific. Maybe t: cover libsubprocess flow control?

@chu11
Member Author

chu11 commented Nov 1, 2024

Only tests with ci=system are run in the system builder, so I don't think that test is even run there. Could be some other hang?

Didn't think of that, should try locally in docker.

Also, just noticed the commit message t: cover flow control is a bit non-specific. Maybe t: cover libsubprocess flow control?

Sounds good

@chu11
Member Author

chu11 commented Nov 2, 2024

hmm, ok at least got a clue now (I'm not sure why the timeout message is before the last test message, possible output race)

  ./t2409-sdexec.t timed out after 300.0s
  ok 11 - sdexec stdout+stderr works
  PASS: t2409-sdexec.t 11 - sdexec stdout+stderr works

Update: The test immediately after test 11 in t2409-sdexec.t is sdexec stdin works, so it's a flux-exec issue for stdin with sdexec.

Update2: reproduces in docker, so that's step 1 :-)

@chu11
Member Author

chu11 commented Nov 2, 2024

doh, took me like 3 seconds to realize the bug

diff --git a/src/cmd/flux-exec.c b/src/cmd/flux-exec.c
index 2b51d84ef..173f127bb 100644
--- a/src/cmd/flux-exec.c
+++ b/src/cmd/flux-exec.c
@@ -318,9 +318,11 @@ static void stdin_cb (flux_reactor_t *r,
             }
             p = zlistx_next (subprocesses);
         }
-        min_credits = subprocess_min_credits ();
-        if (min_credits == 0)
-            flux_watcher_stop (stdin_w);
+        if (stdin_enable_flow_control) {
+            min_credits = subprocess_min_credits ();
+            if (min_credits == 0)
+                flux_watcher_stop (stdin_w);
+        }
     }
     else {
         p = zlistx_first (subprocesses);

will push a fix. Also, I'll keep the 5M file size instead of the 50M file size, since it still reproduces reliably with 5M and makes things run faster.

chu11 and others added 3 commits November 2, 2024 07:38
Problem: libsubprocess now supports stdin flow control via credits,
but that is not used in flux-exec.

Support credits and flow control in flux-exec to avoid overflowing
the stdin buffer.

Fixes flux-framework#4572

Problem: sdexec does not yet support flow control

Disable flow control if the service is set to sdexec.
Add a --stdin-flow=on|off hidden option to force it either way.

Problem: There is no coverage to ensure that stdin flow control
truly works with flux-exec.

Add a test to t0005-exec.t that sends a 5 meg file while only using
a 4K buffer.  A buffer overflow would almost certainly happen
if flow control was not working.
@mergify mergify bot merged commit 718a9ba into flux-framework:master Nov 2, 2024
34 checks passed

codecov bot commented Nov 2, 2024

Codecov Report

Attention: Patch coverage is 86.66667% with 12 lines in your changes missing coverage. Please review.

Project coverage is 83.60%. Comparing base (0fe32a4) to head (4aea6bb).
Report is 6 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/cmd/flux-exec.c | 86.66% | 12 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #6370      +/-   ##
==========================================
+ Coverage   83.57%   83.60%   +0.02%     
==========================================
  Files         524      524              
  Lines       87634    87700      +66     
==========================================
+ Hits        73244    73320      +76     
+ Misses      14390    14380      -10     
| Files with missing lines | Coverage Δ |
|---|---|
| src/cmd/flux-exec.c | 78.24% <86.66%> (+1.37%) ⬆️ |

... and 16 files with indirect coverage changes
