Skip to content

Commit

Permalink
add kill_on_cancel kwarg
Browse files Browse the repository at this point in the history
  • Loading branch information
richardsheridan committed Oct 20, 2024
1 parent 1054789 commit 23f09c7
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 22 deletions.
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ Example
# This job could take far too long, make it cancellable!
nursery.start_soon(
functools.partial(
trio_parallel.run_sync, loop, 10 ** 20, cancellable=True
trio_parallel.run_sync, loop, 10 ** 20, kill_on_cancel=True
)
)
await trio.sleep(2)
# Only explicitly cancellable jobs are killed on cancel
# Only explicit kill_on_cancel jobs are terminated
nursery.cancel_scope.cancel()
print("Total runtime:", trio.current_time() - t0)
Expand Down
4 changes: 2 additions & 2 deletions docs/source/examples/async_parallel_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
async def to_process_map_as_completed(
sync_fn,
job_aiter,
cancellable=False,
kill_on_cancel=False,
limiter=None,
*,
task_status,
Expand All @@ -28,7 +28,7 @@ async def worker(job_item, task_status):
result = await trio_parallel.run_sync(
sync_fn,
*job_item,
cancellable=cancellable,
kill_on_cancel=kill_on_cancel,
limiter=trio.CapacityLimiter(1),
)
await send_chan.send(result)
Expand Down
2 changes: 1 addition & 1 deletion docs/source/examples/cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async def amain():
await trio.to_thread.run_sync(bool)

with trio.move_on_after(0.5):
await trio_parallel.run_sync(hello_delayed_world, cancellable=True)
await trio_parallel.run_sync(hello_delayed_world, kill_on_cancel=True)

with trio.move_on_after(0.5):
await trio.to_thread.run_sync(hello_delayed_world, abandon_on_cancel=True)
Expand Down
8 changes: 4 additions & 4 deletions docs/source/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ Cancellation
~~~~~~~~~~~~

Cancellation of :func:`trio_parallel.run_sync` is modeled after
:func:`trio.to_thread.run_sync`, with a ``cancellable`` keyword argument that
:func:`trio.to_thread.run_sync`, with a ``kill_on_cancel`` keyword argument that
defaults to ``False``. Entry is an unconditional checkpoint, i.e. regardless of
the value of ``cancellable``. The only difference in behavior comes upon cancellation
when ``cancellable=True``. A Trio thread will be abandoned to run in the background
the value of ``kill_on_cancel``. The key difference in behavior comes upon cancellation
when ``kill_on_cancel=True``. A Trio thread will be abandoned to run in the background
while this package will kill the worker with ``SIGKILL``/``TerminateProcess``:

.. literalinclude:: examples/cancellation.py

We recommend to avoid using the cancellation feature
We recommend to avoid using the ``kill_on_cancel`` feature
if loss of intermediate results, writes to the filesystem, or shared memory writes
may leave the larger system in an incoherent state.

Expand Down
14 changes: 11 additions & 3 deletions trio_parallel/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ async def run_sync(
self,
sync_fn: Callable[..., T],
*args,
kill_on_cancel: bool = False,
cancellable: bool = False,
limiter: trio.CapacityLimiter = None,
) -> T:
Expand All @@ -162,7 +163,7 @@ async def run_sync(
async with limiter, self._lifetime:
self._worker_cache.prune()
while True:
with trio.CancelScope(shield=not cancellable):
with trio.CancelScope(shield=not (cancellable or kill_on_cancel)):
try:
worker = self._worker_cache.pop()
except IndexError:
Expand Down Expand Up @@ -379,6 +380,7 @@ async def open_worker_context(
async def run_sync(
sync_fn: Callable[..., T],
*args,
kill_on_cancel: bool = False,
cancellable: bool = False,
limiter: trio.CapacityLimiter = None,
) -> T:
Expand Down Expand Up @@ -406,10 +408,12 @@ async def run_sync(
limitations.
*args: Positional arguments to pass to sync_fn. If you need keyword
arguments, use :func:`functools.partial`.
cancellable (bool): Whether to allow cancellation of this operation.
kill_on_cancel (bool): Whether to allow cancellation of this operation.
Cancellation always involves abrupt termination of the worker process
with SIGKILL/TerminateProcess. To obtain correct semantics with CTRL+C,
SIGINT is ignored when raised in workers.
cancellable (bool): Alias for ``kill_on_cancel``. If both aliases are passed,
Python's ``or`` operator combines them.
limiter (None, or trio.CapacityLimiter):
An object used to limit the number of simultaneous processes. Most
commonly this will be a `~trio.CapacityLimiter`, but any async
Expand All @@ -425,7 +429,11 @@ async def run_sync(
"""
return await get_default_context().run_sync(
sync_fn, *args, cancellable=cancellable, limiter=limiter
sync_fn,
*args,
kill_on_cancel=kill_on_cancel,
cancellable=cancellable,
limiter=limiter,
)


Expand Down
20 changes: 10 additions & 10 deletions trio_parallel/_tests/test_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ async def test_run_sync(shutdown_cache):


async def test_entry_cancellation(manager, shutdown_cache):
async def child(cancellable):
async def child(kill_on_cancel):
nonlocal child_start, child_done
child_start = True
try:
return await run_sync(
_block_worker, block, worker_start, worker_done, cancellable=cancellable
_block_worker, block, worker_start, worker_done, kill_on_cancel=kill_on_cancel
)
finally:
child_done = True
Expand All @@ -75,12 +75,12 @@ async def child(cancellable):


async def test_kill_cancellation(manager, shutdown_cache):
async def child(cancellable):
async def child(kill_on_cancel):
nonlocal child_start, child_done
child_start = True
try:
return await run_sync(
_block_worker, block, worker_start, worker_done, cancellable=cancellable
_block_worker, block, worker_start, worker_done, kill_on_cancel=kill_on_cancel
)
finally:
child_done = True
Expand Down Expand Up @@ -109,11 +109,11 @@ async def child(cancellable):


async def test_uncancellable_cancellation(manager, shutdown_cache):
async def child(cancellable):
async def child(kill_on_cancel):
nonlocal child_start, child_done
child_start = True
await run_sync(
_block_worker, block, worker_start, worker_done, cancellable=cancellable
_block_worker, block, worker_start, worker_done, kill_on_cancel=kill_on_cancel
)
child_done = True

Expand Down Expand Up @@ -260,7 +260,7 @@ async def test_get_default_context_stats(): # noqa: ASYNC910
def test_sequential_runs(shutdown_cache):
async def run_with_timeout():
with trio.fail_after(20):
return await run_sync(os.getpid, cancellable=True)
return await run_sync(os.getpid, kill_on_cancel=True)

same_pid = trio.run(run_with_timeout) == trio.run(run_with_timeout)
assert same_pid
Expand All @@ -269,12 +269,12 @@ async def run_with_timeout():
async def test_concurrent_runs(shutdown_cache):
async def worker(i):
with trio.fail_after(20):
assert await run_sync(int, i, cancellable=True) == i
assert await run_sync(int, i, kill_on_cancel=True) == i
for _ in range(30):
assert await run_sync(int, i, cancellable=True) == i
assert await run_sync(int, i, kill_on_cancel=True) == i
with trio.move_on_after(0.5):
while True:
assert await run_sync(int, i, cancellable=True) == i
assert await run_sync(int, i, kill_on_cancel=True) == i

async with trio.open_nursery() as n:
for i in range(2):
Expand Down
2 changes: 2 additions & 0 deletions trio_parallel/_tests/test_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ async def test_cancellable(mock_context):
assert obsvd_deadline == float("inf")
_, _, obsvd_deadline = await run_sync(bool, cancellable=True)
assert obsvd_deadline == deadline
_, _, obsvd_deadline = await run_sync(bool, kill_on_cancel=True)
assert obsvd_deadline == deadline


async def test_cache_scope_args(mock_context):
Expand Down

0 comments on commit 23f09c7

Please sign in to comment.