Skip to content

Commit

Permalink
replace channel abuse with a lifetime manager class
Browse files Browse the repository at this point in the history
  • Loading branch information
richardsheridan committed Oct 8, 2021
1 parent af5dd7c commit 21ae1fd
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 47 deletions.
98 changes: 57 additions & 41 deletions trio_parallel/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,39 @@ def current_default_worker_limiter():
``WorkerType.FORK`` is available on POSIX for experimentation, but not recommended."""


class NullClonableContext:
def __enter__(self):
pass

def __exit__(self, exc_type, exc_val, exc_tb):
pass

def clone(self):
return self
@attr.s(auto_attribs=True, slots=True, eq=False)
class ContextLifetimeManager:
running: int = 0
closed: bool = False
task: Any = None

async def __aenter__(self):
# only async to save indentation
if self.closed:
import trio

raise trio.ClosedResourceError
self.running += 1

async def __aexit__(self, exc_type, exc_val, exc_tb):
# only async to save indentation
self.running -= 1
if self.running == 0 and self.task:
import trio

trio.lowlevel.reschedule(self.task)

async def wait_and_close(self):
assert not self.closed
self.closed = True
if self.running != 0:
import trio

self.task = trio.lowlevel.current_task()
await trio.lowlevel.wait_task_rescheduled(
lambda _: trio.lowlevel.Abort.FAILED # pragma: no cover
)
self.task = None


def check_non_negative(instance, attribute, value):
Expand Down Expand Up @@ -104,9 +128,9 @@ class WorkerContext(metaclass=NoPublicConstructor):
)
_worker_class: Type[AbstractWorker] = attr.ib(repr=False, init=False)
_worker_cache: WorkerCache = attr.ib(repr=False, init=False)
# These are externally initialized in open_worker_context
_sem_chan: Any = attr.ib(default=NullClonableContext(), repr=False, init=False)
_wait_chan: Any = attr.ib(default=None, repr=False, init=False)
_lifetime: ContextLifetimeManager = attr.ib(
factory=ContextLifetimeManager, repr=False, init=False
)

def __attrs_post_init__(self):
worker_class, worker_cache_class = WORKER_MAP[self.worker_type]
Expand All @@ -126,41 +150,36 @@ async def run_sync(self, sync_fn, *args, cancellable=False, limiter=None):
if limiter is None:
limiter = current_default_worker_limiter()

with self._sem_chan.clone():
async with limiter:
self._worker_cache.prune()
while True:
try:
worker = self._worker_cache.pop()
except IndexError:
worker = self._worker_class(
self.idle_timeout, self.init, self.retire
)

with trio.CancelScope(shield=not cancellable):
result = await worker.run_sync(sync_fn, *args)

if result is None:
# Prevent uninterruptible loop
# when KI-protected & cancellable=False
await trio.lowlevel.checkpoint_if_cancelled()
else:
self._worker_cache.append(worker)
return result.unwrap()
async with limiter, self._lifetime:
self._worker_cache.prune()
while True:
try:
worker = self._worker_cache.pop()
except IndexError:
worker = self._worker_class(
self.idle_timeout, self.init, self.retire
)

with trio.CancelScope(shield=not cancellable):
result = await worker.run_sync(sync_fn, *args)

if result is None:
# Prevent uninterruptible loop
# when KI-protected & cancellable=False
await trio.lowlevel.checkpoint_if_cancelled()
else:
self._worker_cache.append(worker)
return result.unwrap()

async def _aclose(self):
"""Wait for all workers to become idle, then disable the context and shut down
all workers.
Subsequent calls to :meth:`run_sync` will raise `trio.ClosedResourceError`."""
self._sem_chan.close()
import trio

with trio.CancelScope(shield=True):
try:
await self._wait_chan.receive()
except trio.EndOfChannel:
pass
await self._lifetime.wait_and_close()
await trio.to_thread.run_sync(
self._worker_cache.shutdown, self.grace_period
)
Expand Down Expand Up @@ -226,10 +245,7 @@ async def open_worker_context(
:func:`WorkerContext.run_sync` call.
"""
import trio

ctx = WorkerContext._create(idle_timeout, init, retire, grace_period, worker_type)
ctx._sem_chan, ctx._wait_chan = trio.open_memory_channel(0)
try:
yield ctx
finally:
Expand Down
14 changes: 8 additions & 6 deletions trio_parallel/_tests/test_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,22 @@ async def test_aclose():
async def test_context_waits(manager):
# TODO: convert this to a collaboration test
finished = False
ev = manager.Event()
block = manager.Event()
start = manager.Event()
done = manager.Event()

async def child(task_status):
async def child():
nonlocal finished
task_status.started()
try:
await ctx.run_sync(ev.wait)
await ctx.run_sync(_block_worker, block, start, done)
finally:
finished = True

async with trio.open_nursery() as nursery:
async with open_worker_context() as ctx:
await nursery.start(child)
ev.set()
nursery.start_soon(child)
await trio.to_thread.run_sync(start.wait, cancellable=True)
block.set()
assert finished


Expand Down

0 comments on commit 21ae1fd

Please sign in to comment.