Skip to content

Commit

Permalink
reap worker after rare race
Browse files Browse the repository at this point in the history
  • Loading branch information
richardsheridan committed Oct 20, 2024
1 parent 9f49d53 commit c3112a2
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 3 deletions.
2 changes: 1 addition & 1 deletion CHEATSHEET.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ To run black


To update pinned requirements
------------
-----------------------------

* Run ``pip install pip-compile-multi`` if necessary.

Expand Down
5 changes: 4 additions & 1 deletion _trio_parallel_workers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
MAX_TIMEOUT = 24.0 * 60.0 * 60.0
ACK = b"\x06"

EAGER_CLEANUP = True # test_retire hook


def handle_job(job):
try:
Expand Down Expand Up @@ -94,7 +96,8 @@ def worker_behavior(recv_pipe, send_pipe, idle_timeout, init, retire):
else:
# Clean idle shutdown or retirement: close recv_pipe first to minimize
# subsequent race.
recv_pipe.close()
if EAGER_CLEANUP: # Set False to maximize race for test_retire
recv_pipe.close()
# Race condition: it is possible to sneak a write through in the main process
# between the while loop predicate and recv_pipe.close(). Naively, this would
# make a clean shutdown look like a broken worker. By sending a sentinel
Expand Down
4 changes: 4 additions & 0 deletions _trio_parallel_workers/_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ def _bad_retire_fn():
def _init_run_twice():
global _NUM_RUNS_LEFT
_NUM_RUNS_LEFT = 2
# increase coverage in worker_behavior cleanup and SpawnProcWorker.run_sync
import _trio_parallel_workers

_trio_parallel_workers.EAGER_CLEANUP = False


def _retire_run_twice():
Expand Down
9 changes: 8 additions & 1 deletion trio_parallel/_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async def run_sync(self, sync_fn: Callable, *args) -> Optional[Outcome]:
return None

try:
return loads(await self._receive_chan.receive())
result = loads(await self._receive_chan.receive())
except trio.EndOfChannel:
self._send_pipe.close() # edge case: free proc spinning on recv_bytes
with trio.CancelScope(shield=True):
Expand All @@ -124,6 +124,13 @@ async def run_sync(self, sync_fn: Callable, *args) -> Optional[Outcome]:
await self.wait() # noqa: ASYNC102
raise

if result is None:
# race in worker_behavior cleanup was triggered
with trio.CancelScope(shield=True):
await self.wait() # noqa: ASYNC102

return result

def is_alive(self):
# if the proc is alive, there is a race condition where it could be
# dying. This call reaps zombie children on Unix.
Expand Down

0 comments on commit c3112a2

Please sign in to comment.