Skip to content

Commit

Permalink
avoid injecting an exit checkpoint with sync cm channel interface
Browse files Browse the repository at this point in the history
  • Loading branch information
richardsheridan committed Oct 5, 2021
1 parent 5e45cf7 commit 912770c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 24 deletions.
49 changes: 25 additions & 24 deletions trio_parallel/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ def current_default_worker_limiter():
``WorkerType.FORK`` is available on POSIX for experimentation, but not recommended."""


class NullClonableAsyncContext:
async def __aenter__(self):
class NullClonableContext:
def __enter__(self):
pass

async def __aexit__(self, exc_type, exc_val, exc_tb):
def __exit__(self, exc_type, exc_val, exc_tb):
pass

def clone(self):
Expand Down Expand Up @@ -105,7 +105,7 @@ class WorkerContext(metaclass=NoPublicConstructor):
_worker_class: Type[AbstractWorker] = attr.ib(repr=False, init=False)
_worker_cache: WorkerCache = attr.ib(repr=False, init=False)
# These are externally initialized in open_worker_context
_sem_chan: Any = attr.ib(default=NullClonableAsyncContext(), repr=False, init=False)
_sem_chan: Any = attr.ib(default=NullClonableContext(), repr=False, init=False)
_wait_chan: Any = attr.ib(default=None, repr=False, init=False)

def __attrs_post_init__(self):
Expand All @@ -126,26 +126,27 @@ async def run_sync(self, sync_fn, *args, cancellable=False, limiter=None):
if limiter is None:
limiter = current_default_worker_limiter()

sem = self._sem_chan.clone()
async with sem, limiter:
self._worker_cache.prune()
while True:
try:
worker = self._worker_cache.pop()
except IndexError:
worker = self._worker_class(
self.idle_timeout, self.init, self.retire
)

with trio.CancelScope(shield=not cancellable):
result = await worker.run_sync(sync_fn, *args)

if result is None:
# Prevent uninterruptible loop when KI-protected & cancellable=False
await trio.lowlevel.checkpoint_if_cancelled()
else:
self._worker_cache.append(worker)
return result.unwrap()
with self._sem_chan.clone():
async with limiter:
self._worker_cache.prune()
while True:
try:
worker = self._worker_cache.pop()
except IndexError:
worker = self._worker_class(
self.idle_timeout, self.init, self.retire
)

with trio.CancelScope(shield=not cancellable):
result = await worker.run_sync(sync_fn, *args)

if result is None:
# Prevent uninterruptible loop
# when KI-protected & cancellable=False
await trio.lowlevel.checkpoint_if_cancelled()
else:
self._worker_cache.append(worker)
return result.unwrap()

async def _aclose(self):
"""Wait for all workers to become idle, then disable the context and shut down
Expand Down
2 changes: 2 additions & 0 deletions trio_parallel/_tests/test_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ async def child(cancellable):
return await run_sync(
_block_worker, block, worker_start, worker_done, cancellable=cancellable
)
except trio.Cancelled:
assert False
finally:
child_done = True

Expand Down

0 comments on commit 912770c

Please sign in to comment.