not ok 45 - stdin flow control works (all ranks, one rank will exit early) #6542

Open
grondo opened this issue Jan 7, 2025 · 11 comments
@grondo
Contributor

grondo commented Jan 7, 2025

I've been seeing this test fail often in CI (maybe once every 3 runs of the entire CI workflow)

expecting success: 
  	cat 5Mfile | flux exec -r 0-3 --setopt=stdin_BUFSIZE=4096 ./stdin2file &&
  	test_must_fail ls cpy.0 &&
  	cmp 5Mfile cpy.1 &&
  	cmp 5Mfile cpy.2 &&
  	cmp 5Mfile cpy.3 &&
  	rm cpy.*
  
  88+0 records in
  88+0 records out
  45056 bytes (45 kB, 44 KiB) copied, 0.000383387 s, 118 MB/s
  88+0 records in
  88+0 records out
  45056 bytes (45 kB, 44 KiB) copied, 0.00143866 s, 31.3 MB/s
  88+0 records in
  flux-exec: flux_subprocess_write: Broken pipe
  not ok 45 - stdin flow control works (all ranks, one rank will exit early)
@chu11
Member

chu11 commented Jan 8, 2025

I hit this as well recently and what's interesting is that EPIPE is returned from flux_subprocess_write() (for remote subprocesses) when

        if (p->state != FLUX_SUBPROCESS_INIT
            && p->state != FLUX_SUBPROCESS_RUNNING) {
            errno = EPIPE;
            return -1;
        }

suggesting perhaps the remote subprocess already closed/exited when the write was about to occur?

Lemme take a look. With the recent credits change, perhaps there is a small racy scenario I didn't consider in flux-exec

Edit: oh and you could get EPIPE too if the stream is already closed

@chu11 chu11 self-assigned this Jan 8, 2025
@chu11
Member

chu11 commented Jan 8, 2025

hmmmm in a bit of a surprise, in flux-exec's stdin callback

    if (lenp) {
        p = zlistx_first (subprocesses);
        while (p) {
            if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
                || flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
                if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0)
                    log_err_exit ("flux_subprocess_write");

we seem to already be protecting against the EPIPE case .... huh ... .

@chu11
Member

chu11 commented Jan 8, 2025

looking through code, something racy isn't immediately coming to mind. The check for FLUX_SUBPROCESS_INIT and FLUX_SUBPROCESS_RUNNING in stdin_cb should protect against the EPIPE. I was wondering if somehow the stream was getting closed early, but that doesn't seem to be the case either.

Reasonable efforts couldn't get this to reproduce locally. gonna see if I can get debug in CI and gimme a clue.

@chu11
Member

chu11 commented Jan 8, 2025

also as an aside, i noticed in the above output

  88+0 records in
  88+0 records out
  45056 bytes (45 kB, 44 KiB) copied, 0.000383387 s, 118 MB/s
  88+0 records in
  88+0 records out
  45056 bytes (45 kB, 44 KiB) copied, 0.00143866 s, 31.3 MB/s
  88+0 records in
  flux-exec: flux_subprocess_write: Broken pipe

The input is supposed to be 5 megs, that 45056 bytes is really odd ...

@chu11
Member

chu11 commented Jan 9, 2025

I'm going under the assumption that somehow the streams are closed and then written to again. This would explain the output above, where we are (presumably) sending < 5M of data and some processes appear to be exiting.

This could theoretically happen if the stdin_cb has an accidental min_credits == 0. However, I cannot see a path in which this happens.

Trying to prove this is the case via debug in #6544, but have yet to reproduce this again after many CI runs.

@chu11
Member

chu11 commented Jan 9, 2025

finally hit the issue again in CI, it confirmed my suspicions

expecting success: 
  	cat 5Mfile | flux exec -r 0-3 --setopt=stdin_BUFSIZE=4096 ./stdin2file &&
  	test_must_fail ls cpy.0 &&
  	cmp 5Mfile cpy.1 &&
  	cmp 5Mfile cpy.2 &&
  	cmp 5Mfile cpy.3 &&
  	rm cpy.*
  
  flux-exec: flux_subprocess_write rank=1 min_credits=4096 lenp=4096 closed_stdins=1: Broken pipe
  not ok 45 - stdin flow control works (all ranks, one rank will exit early)

the closed_stdins flag means that the stdin streams were closed and yet we're trying to write to them again.

Still a little perplexed how this could be happening. I'm wondering if it's possible the stdin_cb could be called even when it is stopped, b/c of when the stopping occurs within the libev loop. IIRC, if you're in the current loop and stop a watcher, that watcher may still run again.

@garlick
Member

garlick commented Jan 9, 2025

> IIRC, if you're in the current loop and stop a watcher, that watcher may still run again.

That's a new one on me if so! I'm pretty sure a stopped watcher cannot run.

@chu11
Member

chu11 commented Jan 10, 2025

> That's a new one on me if so! I'm pretty sure a stopped watcher cannot run.

It's possible I'm confusing myself. Maybe I'm getting confused w/ priority stuff that we've done before.

Going to the libev documentation, under ev_run we have the "mega loop" description.

   - Queue and call all prepare watchers.

and later

 - Queue all check watchers.
   - Call all queued watchers in reverse order

what if a watcher is stopped after it has already been queued? does it get booted from the queue? I would think so? hope so?

@chu11
Member

chu11 commented Jan 10, 2025

> what if a watcher is stopped after it has already been queued? does it get booted from the queue? I would think so? hope so?

Ok, I'm probably confusing myself over something from before. I wrote a quick experiment and if you stop a watcher when it is queued up, it will get properly dequeued. So all is good in the world.

@garlick
Member

garlick commented Jan 10, 2025

Whew 👍

@chu11
Member

chu11 commented Jan 10, 2025

Ok, I think I have a theory on what is happening. I don't have definitive "proof", but think this race exists.

High level code info as reminder

  • subprocess_credits is a list of active flux subprocesses, sorted based on their credits (lower towards front of list)
  • subprocess_update_credits() will update and if necessary reorder
  • subprocess_min_credits() gets the head of the list, which should be the min credits

I think the race exists here

    if (lenp) {
        p = zlistx_first (subprocesses);
        while (p) {
            if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
                || flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
                if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0) {
                    log_err_exit ("flux_subprocess_write rank=%d min_credits=%d lenp=%d closed_stdins=%d",
                                  flux_subprocess_rank (p),
                                  min_credits,
                                  lenp,
                                  closed_stdins);
                }
                if (stdin_enable_flow_control) {
                    /* N.B. since we are subtracting the same number
                     * of credits from all subprocesses, the sorted
                     * order in the credits list should not change
                     */
                    subprocess_update_credits (p, -1*len, false);
                }
            }
            p = zlistx_next (subprocesses);
        }
        if (stdin_enable_flow_control) {
            min_credits = subprocess_min_credits ();
            if (min_credits == 0)
                flux_watcher_stop (stdin_w);
        }
    }

The subprocess_update_credits() call does not re-order the subprocess credits list b/c (as it says)

  /* N.B. since we are subtracting the same number
   * of credits from all subprocesses, the sorted
   * order in the credits list should not change
   */

But this assumes we actually wrote the same number of stdin bytes to each subprocess. But as can be seen above we check if a subprocess is INIT/RUNNING before writing to it.

If some subprocess were (let's say) EXITED, then the above invariant would not hold.

Under normal operations, the subprocess_credits list removes exited/failed subprocesses from its list. So presumably the call to subprocess_min_credits() should always return correct "min credits".

BUT I think this is racy. It requires that the subprocess exit notify flux-exec via the state_cb() callback first. If that notification does not happen before stdin_cb() is called, we can reach a situation in which subprocess_min_credits() may not report the right min credits.

And that can explain getting us into the situation we're now seeing:

  • subprocess_credits list incorrectly has a non-zero subprocess at the front of the list (i.e. "min" is non zero when it should be zero)
  • stdin_w stays active when it actually shouldn't be
  • next call to state_cb() updates the subprocess_credits list to remove an exited process
  • next call to stdin_cb() sees min_credits (possibly) as 0, and then closes the stdin streams
  • next time we call flux_subprocess_write() we get EPIPE
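
The first step of that sequence can be sketched in a few lines of C. This is only a model of the theory, not flux-exec itself: `struct sim_proc`, the `running` flag, and the `sim_*` functions are hypothetical, and the model assumes the exited rank is still on the list because state_cb() has not run yet.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of one entry on the credits list. */
struct sim_proc {
    int rank;
    int credits;
    bool running;   /* false models any state other than INIT/RUNNING */
};

/* Model of the write loop in stdin_cb(): credits are subtracted only
 * from subprocesses that are still running, and the list is NOT
 * reordered afterwards.
 */
void sim_write_all (struct sim_proc *procs, size_t n, int len)
{
    for (size_t i = 0; i < n; i++) {
        if (procs[i].running)
            procs[i].credits -= len;
    }
}

/* What a head-of-list lookup reports as the minimum. */
int sim_head_min (const struct sim_proc *procs, size_t n)
{
    assert (n > 0);
    return procs[0].credits;
}

/* The minimum over subprocesses that can still accept stdin,
 * or -1 if none are running.
 */
int sim_running_min (const struct sim_proc *procs, size_t n)
{
    int min = -1;

    for (size_t i = 0; i < n; i++) {
        if (procs[i].running && (min < 0 || procs[i].credits < min))
            min = procs[i].credits;
    }
    return min;
}
```

With four ranks at 4096 credits each, rank 0 marked as not running at the head of the list, and one 4096-byte write, `sim_head_min()` still reports 4096 while `sim_running_min()` reports 0, which is the stale "min" described in the first bullet.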

There are obvious fixes that would not perform well (e.g. subprocess_min_credits() just scans the whole list every time for the min). But I'm pondering a fix that would still be decently fast.

chu11 added a commit to chu11/flux-core that referenced this issue Jan 11, 2025
Problem: When a subprocess exits / fails, it is removed from the
"subprocess credits" list.  However, it is possible this exit / failure
is not seen yet before the subprocess credits is updated during a stdin
callback.  This can lead to the subprocess credits list calculating an
invalid "min credits".  This can lead to the stdin callback being
started when it should not be.

Solution: If the number of "active" subprocesses does not equal the
number of subprocesses on the credits list, remove exited/failed credits
from the credits list in stdin_cb().

Fixes flux-framework#6542
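
A rough sketch of the approach the commit message describes, assuming the same simplified model of the credits list as an array with a `running` flag. The `fix_*` names and the struct are hypothetical and this is not the code from the referenced commit; it only illustrates "prune when the counts disagree".

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of one entry on the credits list. */
struct fix_proc {
    int rank;
    int credits;
    bool running;   /* false models an exited/failed subprocess */
};

/* Drop entries that are no longer running, preserving the order of
 * the remaining entries.  Returns the new entry count.
 */
size_t fix_prune_inactive (struct fix_proc *list, size_t n)
{
    size_t out = 0;

    assert (list != NULL);
    for (size_t i = 0; i < n; i++) {
        if (list[i].running)
            list[out++] = list[i];
    }
    return out;
}

/* Called from the (modeled) stdin callback: the full scan is paid only
 * when the number of active subprocesses disagrees with the length of
 * the credits list, so the common case stays cheap.
 */
size_t fix_sync_credits (struct fix_proc *list, size_t n,
                         size_t active_count)
{
    if (n != active_count)
        n = fix_prune_inactive (list, n);
    return n;
}
```

Because the surviving entries all had the same number of credits subtracted, their relative order is unchanged by the prune, and the head of the list is a valid minimum again.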