From 8068ef87a68e8665a62e338d1a2c32611e487d07 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sun, 14 Jul 2024 21:48:36 +0100 Subject: [PATCH 01/19] Add Scheduler.shield() and Scheduler.wait_and_close() --- aiojobs/_scheduler.py | 77 +++++++++++++++++++++++++++++++++++++++++-- aiojobs/aiohttp.py | 9 ++++- 2 files changed, 83 insertions(+), 3 deletions(-) diff --git a/aiojobs/_scheduler.py b/aiojobs/_scheduler.py index d7d08cef..ccf23f4a 100644 --- a/aiojobs/_scheduler.py +++ b/aiojobs/_scheduler.py @@ -1,6 +1,8 @@ import asyncio +import sys from typing import ( Any, + Awaitable, Callable, Collection, Coroutine, @@ -9,14 +11,32 @@ Optional, Set, TypeVar, + Union ) from ._job import Job +if sys.version_info >= (3, 11): + from asyncio import timeout as asyncio_timeout +else: + from async_timeout import timeout as asyncio_timeout + _T = TypeVar("_T") +_FutureLike = Union[asyncio.Future[_T], Awaitable[_T]] ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None] +def _get_loop(fut): + # https://github.com/python/cpython/blob/bb802db8cfa35a88582be32fae05fe1cf8f237b1/Lib/asyncio/futures.py#L300 + try: + get_loop = fut.get_loop + except AttributeError: + pass + else: + return get_loop() + return fut._loop + + class Scheduler(Collection[Job[object]]): def __init__( self, @@ -33,6 +53,7 @@ def __init__( ) self._jobs: Set[Job[object]] = set() + self._shields: Set[asyncio.Future[object]] = set() self._close_timeout = close_timeout self._limit = limit self._exception_handler = exception_handler @@ -104,19 +125,71 @@ async def spawn( self._jobs.add(job) return job + def shield(self, arg: _FutureLike[_T]) -> asyncio.Future[_T]: + inner = asyncio.ensure_future(arg) + if inner.done(): + return inner + + # This function is a copy of asyncio.shield(), except for the addition of + # the below 2 lines. + self._shields.add(inner) + inner.add_done_callback(self._shields.discard) + + loop = _get_loop(inner) + outer = loop.create_future() + + def _inner_done_callback(inner): + if outer.cancelled(): + if not inner.cancelled(): + inner.exception() + return + + if inner.cancelled(): + outer.cancel() + else: + exc = inner.exception() + if exc is not None: + outer.set_exception(exc) + else: + outer.set_result(inner.result()) + + + def _outer_done_callback(outer): + if not inner.done(): + inner.remove_done_callback(_inner_done_callback) + + inner.add_done_callback(_inner_done_callback) + outer.add_done_callback(_outer_done_callback) + return outer + + async def wait_and_close(self, timeout: float = 60) -> None: + async with asyncio_timeout(timeout): + while self._jobs or self._shields: + await asyncio.gather( + *(job.wait() for job in self._jobs), + *self._shields, + return_exceptions=True + ) + await self.close() + async def close(self) -> None: if self._closed: return self._closed = True # prevent adding new jobs jobs = self._jobs - if jobs: + if jobs or self._shields: # cleanup pending queue # all job will be started on closing while not self._pending.empty(): self._pending.get_nowait() + + for f in self._shields: + f.cancel() + await asyncio.gather( - *[job._close(self._close_timeout) for job in jobs], + *(job._close(self._close_timeout) for job in jobs), + *(asyncio.wait_for(f, self._close_timeout) for f in self._shields), return_exceptions=True, ) self._jobs.clear() diff --git a/aiojobs/aiohttp.py b/aiojobs/aiohttp.py index 9d7eed1e..4597de17 100644 --- a/aiojobs/aiohttp.py +++ b/aiojobs/aiohttp.py @@ -1,6 +1,8 @@ +import asyncio from functools import wraps from typing import ( Any, + Awaitable, AsyncIterator, Awaitable, Callable, @@ -18,6 +20,7 @@ __all__ = ("setup", "spawn", "get_scheduler", "get_scheduler_from_app", "atomic") _T = TypeVar("_T") +_FutureLike = Union[asyncio.Future[_T], Awaitable[_T]] _RequestView = TypeVar("_RequestView", bound=Union[web.Request, web.View]) @@ -43,6 +46,10 @@ async def spawn(request: web.Request, coro: Coroutine[object, object, _T]) -> Jo return await get_scheduler(request).spawn(coro) +def shield(arg: _FutureLike[_T]) -> asyncio.Future[_T]: + return get_scheduler(request).shield(arg) + + def atomic( coro: Callable[[_RequestView], Coroutine[object, object, _T]] ) -> Callable[[_RequestView], Awaitable[_T]]: @@ -65,6 +72,6 @@ def setup(app: web.Application, **kwargs: Any) -> None: async def cleanup_context(app: web.Application) -> AsyncIterator[None]: app[AIOJOBS_SCHEDULER] = scheduler = Scheduler(**kwargs) yield - await scheduler.close() + await scheduler.wait_and_close() app.cleanup_ctx.append(cleanup_context) From f380aa66c08f9a0176f04f42f3fe34629a1ad4ec Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 14 Jul 2024 20:50:39 +0000 Subject: [PATCH 02/19] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- aiojobs/_scheduler.py | 5 ++--- aiojobs/aiohttp.py | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/aiojobs/_scheduler.py b/aiojobs/_scheduler.py index ccf23f4a..509fcd66 100644 --- a/aiojobs/_scheduler.py +++ b/aiojobs/_scheduler.py @@ -11,7 +11,7 @@ Optional, Set, TypeVar, - Union + Union, ) from ._job import Job @@ -153,7 +153,6 @@ def _inner_done_callback(inner): else: outer.set_result(inner.result()) - def _outer_done_callback(outer): if not inner.done(): inner.remove_done_callback(_inner_done_callback) @@ -168,7 +167,7 @@ async def wait_and_close(self, timeout: float = 60) -> None: await asyncio.gather( *(job.wait() for job in self._jobs), *self._shields, - return_exceptions=True + return_exceptions=True, ) await self.close() diff --git a/aiojobs/aiohttp.py b/aiojobs/aiohttp.py index 4597de17..4ca2158d 100644 --- a/aiojobs/aiohttp.py +++ b/aiojobs/aiohttp.py @@ -2,7 +2,6 @@ from functools import wraps from typing import ( Any, - Awaitable, AsyncIterator, Awaitable, Callable, From 19d0585579fb01262527f3bdd742d1e65c5751f1 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Mon, 22 Jul 2024 20:57:03 +0100 Subject: [PATCH 03/19] Update _scheduler.py --- aiojobs/_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiojobs/_scheduler.py b/aiojobs/_scheduler.py index 509fcd66..38b42f5e 100644 --- a/aiojobs/_scheduler.py +++ b/aiojobs/_scheduler.py @@ -22,7 +22,7 @@ from async_timeout import timeout as asyncio_timeout _T = TypeVar("_T") -_FutureLike = Union[asyncio.Future[_T], Awaitable[_T]] +_FutureLike = Union["asyncio.Future[_T]", Awaitable[_T]] ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None] From 3b7d611de53c9e5e3c1dfffc5747cda164736ba8 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Mon, 22 Jul 2024 20:59:21 +0100 Subject: [PATCH 04/19] Update aiohttp.py --- aiojobs/aiohttp.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aiojobs/aiohttp.py b/aiojobs/aiohttp.py index 4ca2158d..f0c82501 100644 --- a/aiojobs/aiohttp.py +++ b/aiojobs/aiohttp.py @@ -19,7 +19,7 @@ __all__ = ("setup", "spawn", "get_scheduler", "get_scheduler_from_app", "atomic") _T = TypeVar("_T") -_FutureLike = Union[asyncio.Future[_T], Awaitable[_T]] +_FutureLike = Union["asyncio.Future[_T]", Awaitable[_T]] _RequestView = TypeVar("_RequestView", bound=Union[web.Request, web.View]) @@ -45,7 +45,7 @@ async def spawn(request: web.Request, coro: Coroutine[object, object, _T]) -> Jo return await get_scheduler(request).spawn(coro) -def shield(arg: _FutureLike[_T]) -> asyncio.Future[_T]: +def shield(request: web.Request, arg: _FutureLike[_T]) -> "asyncio.Future[_T]": return get_scheduler(request).shield(arg) From 3df7f40e21d77ee1a6dca43c987052dcebcdd3c6 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Mon, 22 Jul 2024 21:00:30 +0100 Subject: [PATCH 05/19] Update _scheduler.py --- aiojobs/_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiojobs/_scheduler.py b/aiojobs/_scheduler.py index 38b42f5e..4d720a4b 100644 --- a/aiojobs/_scheduler.py +++ b/aiojobs/_scheduler.py @@ -125,7 +125,7 @@ async def spawn( self._jobs.add(job) return job - def shield(self, arg: _FutureLike[_T]) -> asyncio.Future[_T]: + def shield(self, arg: _FutureLike[_T]) -> "asyncio.Future[_T]": inner = asyncio.ensure_future(arg) if inner.done(): return inner From 1f327f297df9d3bfdbcf6f8ab51edeac4b848845 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Mon, 22 Jul 2024 21:15:18 +0100 Subject: [PATCH 06/19] Update _scheduler.py --- aiojobs/_scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aiojobs/_scheduler.py b/aiojobs/_scheduler.py index 4d720a4b..491c545a 100644 --- a/aiojobs/_scheduler.py +++ b/aiojobs/_scheduler.py @@ -26,7 +26,7 @@ ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None] -def _get_loop(fut): +def _get_loop(fut: "asyncio.Future[object]") -> asyncio.AbstractEventLoop: # https://github.com/python/cpython/blob/bb802db8cfa35a88582be32fae05fe1cf8f237b1/Lib/asyncio/futures.py#L300 try: get_loop = fut.get_loop @@ -53,7 +53,7 @@ def __init__( ) self._jobs: Set[Job[object]] = set() - self._shields: Set[asyncio.Future[object]] = set() + self._shields: Set[asyncio.Task[object]] = set() self._close_timeout = close_timeout self._limit = limit self._exception_handler = exception_handler @@ -138,7 +138,7 @@ def shield(self, arg: _FutureLike[_T]) -> "asyncio.Future[_T]": loop = _get_loop(inner) outer = loop.create_future() - def _inner_done_callback(inner): + def _inner_done_callback(inner: asyncio.Task[object]) -> None: if outer.cancelled(): if not inner.cancelled(): inner.exception() @@ -153,7 +153,7 @@ def _inner_done_callback(inner): else: outer.set_result(inner.result()) - def _outer_done_callback(outer): + def _outer_done_callback(outer: "asyncio.Future[object]") -> None: if not inner.done(): inner.remove_done_callback(_inner_done_callback) From c140e3cb537987f9dcf7109daea10504ee3e8c3e Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Mon, 22 Jul 2024 21:20:29 +0100 Subject: [PATCH 07/19] Update _scheduler.py --- aiojobs/_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiojobs/_scheduler.py b/aiojobs/_scheduler.py index 491c545a..d4a9d330 100644 --- a/aiojobs/_scheduler.py +++ b/aiojobs/_scheduler.py @@ -26,7 +26,7 @@ ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None] -def _get_loop(fut: "asyncio.Future[object]") -> asyncio.AbstractEventLoop: +def _get_loop(fut: asyncio.Task[object]) -> asyncio.AbstractEventLoop: # https://github.com/python/cpython/blob/bb802db8cfa35a88582be32fae05fe1cf8f237b1/Lib/asyncio/futures.py#L300 try: get_loop = fut.get_loop From 9ee2f8d38f016b82a13f79fb65293a308afa854c Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Mon, 22 Jul 2024 21:21:39 +0100 Subject: [PATCH 08/19] Update _scheduler.py --- aiojobs/_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiojobs/_scheduler.py b/aiojobs/_scheduler.py index d9cc5459..f6979b59 100644 --- a/aiojobs/_scheduler.py +++ b/aiojobs/_scheduler.py @@ -26,7 +26,7 @@ ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None] -def _get_loop(fut: asyncio.Task[object]) -> asyncio.AbstractEventLoop: +def _get_loop(fut: "asyncio.Task[object]") -> asyncio.AbstractEventLoop: # https://github.com/python/cpython/blob/bb802db8cfa35a88582be32fae05fe1cf8f237b1/Lib/asyncio/futures.py#L300 try: get_loop = fut.get_loop From e69f1ff302d6ed1ab04cae3d66ed0032893438c7 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Mon, 22 Jul 2024 22:33:15 +0100 Subject: [PATCH 09/19] Add tests --- aiojobs/_scheduler.py | 16 +++--- tests/test_aiohttp.py | 10 +++- tests/test_scheduler.py | 112 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 130 insertions(+), 8 deletions(-) diff --git a/aiojobs/_scheduler.py b/aiojobs/_scheduler.py index f6979b59..e398339a 100644 --- a/aiojobs/_scheduler.py +++ b/aiojobs/_scheduler.py @@ -1,5 +1,6 @@ import asyncio import sys +from contextlib import suppress from typing import ( Any, Awaitable, @@ -162,13 +163,14 @@ def _outer_done_callback(outer: "asyncio.Future[object]") -> None: return outer async def wait_and_close(self, timeout: float = 60) -> None: - async with asyncio_timeout(timeout): - while self._jobs or self._shields: - await asyncio.gather( - *(job.wait() for job in self._jobs), - *self._shields, - return_exceptions=True, - ) + with suppress(asyncio.TimeoutError): + async with asyncio_timeout(timeout): + while self._jobs or self._shields: + await asyncio.gather( + *(job.wait() for job in self._jobs), + *self._shields, + return_exceptions=True, + ) await self.close() async def close(self) -> None: diff --git a/tests/test_aiohttp.py b/tests/test_aiohttp.py index deb1b3d6..15d40264 100644 --- a/tests/test_aiohttp.py +++ b/tests/test_aiohttp.py @@ -12,6 +12,7 @@ get_scheduler_from_app, get_scheduler_from_request, setup as aiojobs_setup, + shield, spawn, ) @@ -23,6 +24,10 @@ async def test_plugin(aiohttp_client: _Client) -> None: job = None + async def shielded() -> str: + await asyncio.sleep(0) + return "TEST" + async def coro() -> None: await asyncio.sleep(10) @@ -31,7 +36,9 @@ async def handler(request: web.Request) -> web.Response: job = await spawn(request, coro()) assert not job.closed - return web.Response() + + res = await shield(request, shielded()) + return web.Response(text=res) app = web.Application() app.router.add_get("/", handler) @@ -40,6 +47,7 @@ async def handler(request: web.Request) -> web.Response: client = await aiohttp_client(app) resp = await client.get("/") assert resp.status == 200 + assert await resp.text() == "TEST" assert job is not None assert job.active diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 5e2a3a1c..57297f4d 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -426,6 +426,118 @@ async def f() -> None: del coro +async def test_shield(scheduler: Scheduler) -> None: + async def coro() -> str: + await asyncio.sleep(0) + return "TEST" + + result = await scheduler.shield(coro()) + assert result == "TEST" + assert len(scheduler._shields) == 0 + + +async def test_shielded_task_continues(scheduler: Scheduler) -> None: + completed = False + + async def inner() -> None: + nonlocal completed + await asyncio.sleep(0.1) + completed = True + + async def outer() -> None: + await scheduler.shield(inner()) + + t = asyncio.create_task(outer()) + await asyncio.sleep(0) + t.cancel() + with pytest.raises(asyncio.CancelledError): + await t + + assert not completed + assert len(scheduler._shields) == 1 + await asyncio.sleep(0.11) + assert completed + assert len(scheduler._shields) == 0 + + +async def test_wait_and_close(scheduler: Scheduler) -> None: + inner_done = outer_done = False + + async def inner() -> None: + nonlocal inner_done + await asyncio.sleep(0.1) + inner_done = True + + async def outer() -> None: + nonlocal outer_done + await scheduler.shield(inner()) + await asyncio.sleep(0.1) + outer_done = True + + await scheduler.spawn(outer()) + await asyncio.sleep(0) + assert not inner_done and not outer_done + assert len(scheduler._shields) == 1 + assert len(scheduler._jobs) == 1 + + await scheduler.wait_and_close() + assert inner_done and outer_done + assert len(scheduler._shields) == 0 + assert len(scheduler._jobs) == 0 + assert scheduler.closed + + +async def test_wait_and_close_timeout(scheduler: Scheduler) -> None: + inner_done = outer_cancelled = False + + async def inner() -> None: + nonlocal inner_done + await asyncio.sleep(0.1) + inner_done = True + + async def outer() -> None: + nonlocal outer_cancelled + await scheduler.shield(inner()) + try: + await asyncio.sleep(0.5) + except asyncio.CancelledError: + outer_cancelled = True + + await scheduler.spawn(outer()) + await asyncio.sleep(0) + assert not inner_done and not outer_cancelled + assert len(scheduler._shields) == 1 + assert len(scheduler._jobs) == 1 + + await scheduler.wait_and_close(0.2) + assert inner_done and outer_cancelled + assert len(scheduler._shields) == 0 + assert len(scheduler._jobs) == 0 + assert scheduler.closed + + +async def test_wait_and_close_spawn(scheduler: Scheduler) -> None: + another_spawned = another_done = False + + async def another() -> None: + nonlocal another_done + await scheduler.shield(asyncio.sleep(0.1)) + another_done = True + + async def coro() -> None: + nonlocal another_spawned + await asyncio.sleep(0.1) + another_spawned = True + await scheduler.spawn(another()) + + await scheduler.spawn(coro()) + await asyncio.sleep(0) + + assert not another_spawned and not another_done + await scheduler.wait_and_close() + assert another_spawned and another_done + + def test_scheduler_must_be_created_within_running_loop() -> None: with pytest.raises(RuntimeError) as exc_info: Scheduler(close_timeout=0, limit=0, pending_limit=0, exception_handler=None) From a867207118ac95544afd7d37f4529a7ca7d1b1c1 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Mon, 22 Jul 2024 22:46:49 +0100 Subject: [PATCH 10/19] Type ignores --- tests/test_scheduler.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 57297f4d..c234d929 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -457,7 +457,7 @@ async def outer() -> None: assert len(scheduler._shields) == 1 await asyncio.sleep(0.11) assert completed - assert len(scheduler._shields) == 0 + assert len(scheduler._shields) == 0 # type: ignore[unreachable] async def test_wait_and_close(scheduler: Scheduler) -> None: @@ -481,8 +481,8 @@ async def outer() -> None: assert len(scheduler._jobs) == 1 await scheduler.wait_and_close() - assert inner_done and outer_done - assert len(scheduler._shields) == 0 + assert inner_done and outer_done # type: ignore[unreachable] + assert len(scheduler._shields) == 0 # type: ignore[unreachable] assert len(scheduler._jobs) == 0 assert scheduler.closed @@ -510,8 +510,8 @@ async def outer() -> None: assert len(scheduler._jobs) == 1 await scheduler.wait_and_close(0.2) - assert inner_done and outer_cancelled - assert len(scheduler._shields) == 0 + assert inner_done and outer_cancelled # type: ignore[unreachable] + assert len(scheduler._shields) == 0 # type: ignore[unreachable] assert len(scheduler._jobs) == 0 assert scheduler.closed @@ -535,7 +535,7 @@ async def coro() -> None: assert not another_spawned and not another_done await scheduler.wait_and_close() - assert another_spawned and another_done + assert another_spawned and another_done # type: ignore[unreachable] def test_scheduler_must_be_created_within_running_loop() -> None: From a145d4dc1741b58c79ecb305ab1600d16b7239e9 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Mon, 22 Jul 2024 22:56:21 +0100 Subject: [PATCH 11/19] Another test --- tests/test_scheduler.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index c234d929..b365c8f4 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -516,6 +516,37 @@ async def outer() -> None: assert scheduler.closed +async def test_wait_and_close_timeout_shield(scheduler: Scheduler) -> None: + inner_cancelled = outer_cancelled = False + + async def inner() -> None: + nonlocal inner_cancelled + try: + await asyncio.sleep(0.5) + except asyncio.CancelledError: + inner_cancelled = True + raise + + async def outer() -> None: + nonlocal outer_cancelled + try: + await scheduler.shield(inner()) + except asyncio.CancelledError: + outer_cancelled = True + + await scheduler.spawn(outer()) + await asyncio.sleep(0) + assert not inner_cancelled and not outer_cancelled + assert len(scheduler._shields) == 1 + assert len(scheduler._jobs) == 1 + + await scheduler.wait_and_close(0.1) + assert inner_cancelled and outer_cancelled # type: ignore[unreachable] + assert len(scheduler._shields) == 0 # type: ignore[unreachable] + assert len(scheduler._jobs) == 0 + assert scheduler.closed + + async def test_wait_and_close_spawn(scheduler: Scheduler) -> None: another_spawned = another_done = False From f621cd945a2788c293bada82702c158a706a3b8d Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Mon, 22 Jul 2024 22:58:47 +0100 Subject: [PATCH 12/19] Fix --- aiojobs/_scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aiojobs/_scheduler.py b/aiojobs/_scheduler.py index e398339a..d9ef7ed9 100644 --- a/aiojobs/_scheduler.py +++ b/aiojobs/_scheduler.py @@ -27,7 +27,7 @@ ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None] -def _get_loop(fut: "asyncio.Task[object]") -> asyncio.AbstractEventLoop: +def _get_loop(fut: "asyncio.Task[object]") -> asyncio.AbstractEventLoop: # pragma: no cov # https://github.com/python/cpython/blob/bb802db8cfa35a88582be32fae05fe1cf8f237b1/Lib/asyncio/futures.py#L300 try: get_loop = fut.get_loop @@ -139,7 +139,7 @@ def shield(self, arg: _FutureLike[_T]) -> "asyncio.Future[_T]": loop = _get_loop(inner) outer = loop.create_future() - def _inner_done_callback(inner: asyncio.Task[object]) -> None: + def _inner_done_callback(inner: "asyncio.Task[object]") -> None: if outer.cancelled(): if not inner.cancelled(): inner.exception() From 7fac6f7db84dfdc297c86c783b18ace42f2c0708 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 22 Jul 2024 21:58:58 +0000 Subject: [PATCH 13/19] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- aiojobs/_scheduler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/aiojobs/_scheduler.py b/aiojobs/_scheduler.py index d9ef7ed9..f21db786 100644 --- a/aiojobs/_scheduler.py +++ b/aiojobs/_scheduler.py @@ -27,7 +27,9 @@ ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None] -def _get_loop(fut: "asyncio.Task[object]") -> asyncio.AbstractEventLoop: # pragma: no cov +def _get_loop( + fut: "asyncio.Task[object]", +) -> asyncio.AbstractEventLoop: # pragma: no cov # https://github.com/python/cpython/blob/bb802db8cfa35a88582be32fae05fe1cf8f237b1/Lib/asyncio/futures.py#L300 try: get_loop = fut.get_loop From 6fe84a2e80f8dc52263fa0dba33a3fbfe3816e0b Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Mon, 22 Jul 2024 23:14:44 +0100 Subject: [PATCH 14/19] Fix --- aiojobs/_scheduler.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/aiojobs/_scheduler.py b/aiojobs/_scheduler.py index f21db786..f51f2e4d 100644 --- a/aiojobs/_scheduler.py +++ b/aiojobs/_scheduler.py @@ -27,9 +27,9 @@ ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None] -def _get_loop( +def _get_loop( # pragma: no cov fut: "asyncio.Task[object]", -) -> asyncio.AbstractEventLoop: # pragma: no cov +) -> asyncio.AbstractEventLoop: # https://github.com/python/cpython/blob/bb802db8cfa35a88582be32fae05fe1cf8f237b1/Lib/asyncio/futures.py#L300 try: get_loop = fut.get_loop @@ -168,11 +168,12 @@ async def wait_and_close(self, timeout: float = 60) -> None: with suppress(asyncio.TimeoutError): async with asyncio_timeout(timeout): while self._jobs or self._shields: - await asyncio.gather( + gather = asyncio.gather( *(job.wait() for job in self._jobs), *self._shields, return_exceptions=True, ) + await asyncio.shield(gather) await self.close() async def close(self) -> None: From 7dc46e6d684a35f63498ea37eb0491fee71ed3e4 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Mon, 22 Jul 2024 23:19:00 +0100 Subject: [PATCH 15/19] Update _scheduler.py --- aiojobs/_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiojobs/_scheduler.py b/aiojobs/_scheduler.py index f51f2e4d..0a4abc6b 100644 --- a/aiojobs/_scheduler.py +++ b/aiojobs/_scheduler.py @@ -27,7 +27,7 @@ ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None] -def _get_loop( # pragma: no cov +def _get_loop( # pragma: no cover fut: "asyncio.Task[object]", ) -> asyncio.AbstractEventLoop: # https://github.com/python/cpython/blob/bb802db8cfa35a88582be32fae05fe1cf8f237b1/Lib/asyncio/futures.py#L300 From e7740801455ce67bbafdb3af84f8e14d40dab1bc Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Tue, 23 Jul 2024 21:37:58 +0100 Subject: [PATCH 16/19] Docs --- docs/api.rst | 34 ++++++++++++++++++++++++++++++---- docs/index.rst | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 4 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index a4ce8b9d..8a4fb02d 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -103,14 +103,31 @@ Scheduler The method respects :attr:`pending_limit` now. + .. py:method:: shield(coro) + :async: + + Protect an awaitable from being cancelled. + + This is a drop-in replacement for :func:`asyncio.shield`, with the + addition of tracking the shielded task in the scheduler. This can be + used to ensure that shielded tasks will actually be completed on + application shutdown. + + .. py:method:: wait_and_close(timeout=60) + :async: + + Wait for currently scheduled tasks to finish gracefully for the given + *timeout*. Then proceed with closing the scheduler, where any + remaining tasks will be cancelled. + .. py:method:: close() :async: - Close scheduler and all its jobs. + Close scheduler and all its jobs by cancelling the tasks and then + waiting on them. - It finishing time for particular job exceeds - :attr:`close_timeout` this job is logged by - :meth:`call_exception_handler`. + It finishing time for a particular job exceeds :attr:`close_timeout` + the job is logged by :meth:`call_exception_handler`. .. attribute:: exception_handler @@ -221,6 +238,15 @@ jobs. Return :class:`aiojobs.Job` instance +.. function:: shield(request, coro) + :async: + + Protect an awaitable from being cancelled while registering the shielded + task into the registered scheduler. + + Any shielded tasks will then be run to completion when the web app shuts + down (assuming it doesn't exceed the shutdown timeout). + Helpers diff --git a/docs/index.rst b/docs/index.rst index 1660ae11..0fa39c33 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -44,6 +44,48 @@ Usage example For further information read :ref:`aiojobs-quickstart`, :ref:`aiojobs-intro` and :ref:`aiojobs-api`. +Shielding tasks with a scheduler +-------------------------------- + +It is typically recommended to use :func:`asyncio.shield` to protect tasks +from cancellation. However, the inner shielded tasks can't be tracked and +are therefore at risk of being cancelled during application shutdown. + +To resolve this issue aiojobs includes a :meth:`aiojobs.Scheduler.shield` +method to shield tasks while also keeping track of them in the scheduler. +In combination with the :meth:`aiojobs.Scheduler.wait_and_close` method, +this allows shielded tasks the required time to complete successfully +during application shutdown. + +For example: + +.. code-block:: python + + import asyncio + import aiojobs + from contextlib import suppress + + async def important(): + print("START") + await asyncio.sleep(5) + print("DONE") + + async def run_something(scheduler): + # If we use asyncio.shield() here, then the task doesn't complete and DONE is never printed. + await scheduler.shield(important()) + + async def main(): + scheduler = aiojobs.Scheduler() + t = asyncio.create_task(run_something(scheduler)) + await asyncio.sleep(0.1) + t.cancel() + with suppress(asyncio.CancelledError): + await t + await scheduler.wait_and_close() + + asyncio.run(main()) + + Integration with aiohttp.web ---------------------------- From 1585f26c18d6cd03f59fd7c70ba38167c07eefa4 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Tue, 23 Jul 2024 21:51:51 +0100 Subject: [PATCH 17/19] Docs --- README.rst | 47 ++++++++++++++++++++++++++++++++++++++++++++--- docs/index.rst | 4 ++-- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/README.rst b/README.rst index 938bf38b..0d0d5936 100644 --- a/README.rst +++ b/README.rst @@ -47,10 +47,51 @@ Usage example await asyncio.sleep(5.0) # not all scheduled jobs are finished at the moment - # gracefully close spawned jobs - await scheduler.close() + # gracefully wait on tasks before closing any remaining spawned jobs + await scheduler.wait_and_close() - asyncio.get_event_loop().run_until_complete(main()) + asyncio.run(main()) + +Shielding tasks with a scheduler +================================ + +It is typically recommended to use :func:`asyncio.shield` to protect tasks +from cancellation. However, the inner shielded tasks can't be tracked and +are therefore at risk of being cancelled during application shutdown. + +To resolve this issue aiojobs includes a :meth:`aiojobs.Scheduler.shield` +method to shield tasks while also keeping track of them in the scheduler. +In combination with the :meth:`aiojobs.Scheduler.wait_and_close` method, +this allows shielded tasks the required time to complete successfully +during application shutdown. + +For example: + +.. code-block:: python + + import asyncio + import aiojobs + from contextlib import suppress + + async def important(): + print("START") + await asyncio.sleep(5) + print("DONE") + + async def run_something(scheduler): + # If we use asyncio.shield() here, then the task doesn't complete and DONE is never printed. + await scheduler.shield(important()) + + async def main(): + scheduler = aiojobs.Scheduler() + t = asyncio.create_task(run_something(scheduler)) + await asyncio.sleep(0.1) + t.cancel() + with suppress(asyncio.CancelledError): + await t + await scheduler.wait_and_close() + + asyncio.run(main()) Integration with aiohttp.web diff --git a/docs/index.rst b/docs/index.rst index 0fa39c33..e1ab465a 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -36,8 +36,8 @@ Usage example await asyncio.sleep(5.0) # not all scheduled jobs are finished at the moment - # gracefully close spawned jobs - await scheduler.close() + # gracefully wait on tasks before closing any remaining spawned jobs + await scheduler.wait_and_close() asyncio.run(main()) From 452bdab3ed138998a08803187f4d59d4a697b695 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Tue, 23 Jul 2024 22:09:59 +0100 Subject: [PATCH 18/19] Docs --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 0d0d5936..7ecbd40c 100644 --- a/README.rst +++ b/README.rst @@ -55,7 +55,7 @@ Usage example Shielding tasks with a scheduler ================================ -It is typically recommended to use :func:`asyncio.shield` to protect tasks +It is typically recommended to use :meth:`asyncio.shield` to protect tasks from cancellation. However, the inner shielded tasks can't be tracked and are therefore at risk of being cancelled during application shutdown. From 61c80a43b24e27f2fe0e0b0528da962a34760ca2 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Tue, 23 Jul 2024 22:12:56 +0100 Subject: [PATCH 19/19] Docs --- README.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.rst b/README.rst index 7ecbd40c..f0a759ba 100644 --- a/README.rst +++ b/README.rst @@ -55,13 +55,13 @@ Usage example Shielding tasks with a scheduler ================================ -It is typically recommended to use :meth:`asyncio.shield` to protect tasks +It is typically recommended to use ``asyncio.shield`` to protect tasks from cancellation. However, the inner shielded tasks can't be tracked and are therefore at risk of being cancelled during application shutdown. -To resolve this issue aiojobs includes a :meth:`aiojobs.Scheduler.shield` +To resolve this issue aiojobs includes a ``aiojobs.Scheduler.shield`` method to shield tasks while also keeping track of them in the scheduler. -In combination with the :meth:`aiojobs.Scheduler.wait_and_close` method, +In combination with the ``aiojobs.Scheduler.wait_and_close`` method, this allows shielded tasks the required time to complete successfully during application shutdown.