Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Scheduler.shield() and Scheduler.wait_and_close() #495

Merged
merged 25 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8068ef8
Add Scheduler.shield() and Scheduler.wait_and_close()
Dreamsorcerer Jul 14, 2024
f380aa6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 14, 2024
19d0585
Update _scheduler.py
Dreamsorcerer Jul 22, 2024
3b7d611
Update aiohttp.py
Dreamsorcerer Jul 22, 2024
3df7f40
Update _scheduler.py
Dreamsorcerer Jul 22, 2024
1f327f2
Update _scheduler.py
Dreamsorcerer Jul 22, 2024
c140e3c
Update _scheduler.py
Dreamsorcerer Jul 22, 2024
43ade6e
Merge branch 'master' into shield
Dreamsorcerer Jul 22, 2024
9ee2f8d
Update _scheduler.py
Dreamsorcerer Jul 22, 2024
e69f1ff
Add tests
Dreamsorcerer Jul 22, 2024
424fdb8
Merge branch 'master' into shield
Dreamsorcerer Jul 22, 2024
a867207
Type ignores
Dreamsorcerer Jul 22, 2024
9a1ce4d
Merge branch 'shield' of github.com:aio-libs/aiojobs into shield
Dreamsorcerer Jul 22, 2024
25cbd0a
Merge branch 'master' into shield
Dreamsorcerer Jul 22, 2024
a145d4d
Another test
Dreamsorcerer Jul 22, 2024
570797e
Merge branch 'shield' of github.com:aio-libs/aiojobs into shield
Dreamsorcerer Jul 22, 2024
f621cd9
Fix
Dreamsorcerer Jul 22, 2024
7fac6f7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 22, 2024
6fe84a2
Fix
Dreamsorcerer Jul 22, 2024
7dc46e6
Update _scheduler.py
Dreamsorcerer Jul 22, 2024
e774080
Docs
Dreamsorcerer Jul 23, 2024
e257a16
Merge branch 'shield' of github.com:aio-libs/aiojobs into shield
Dreamsorcerer Jul 23, 2024
1585f26
Docs
Dreamsorcerer Jul 23, 2024
452bdab
Docs
Dreamsorcerer Jul 23, 2024
61c80a4
Docs
Dreamsorcerer Jul 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,51 @@ Usage example
await asyncio.sleep(5.0)
# not all scheduled jobs are finished at the moment

# gracefully close spawned jobs
await scheduler.close()
# gracefully wait on tasks before closing any remaining spawned jobs
await scheduler.wait_and_close()

asyncio.get_event_loop().run_until_complete(main())
asyncio.run(main())

Shielding tasks with a scheduler
================================

It is typically recommended to use ``asyncio.shield`` to protect tasks
from cancellation. However, the inner shielded tasks can't be tracked and
are therefore at risk of being cancelled during application shutdown.

To resolve this issue aiojobs includes a ``aiojobs.Scheduler.shield``
method to shield tasks while also keeping track of them in the scheduler.
In combination with the ``aiojobs.Scheduler.wait_and_close`` method,
this allows shielded tasks the required time to complete successfully
during application shutdown.

For example:

.. code-block:: python

import asyncio
import aiojobs
from contextlib import suppress

async def important():
print("START")
await asyncio.sleep(5)
print("DONE")

async def run_something(scheduler):
# If we use asyncio.shield() here, then the task doesn't complete and DONE is never printed.
await scheduler.shield(important())

async def main():
scheduler = aiojobs.Scheduler()
t = asyncio.create_task(run_something(scheduler))
await asyncio.sleep(0.1)
t.cancel()
with suppress(asyncio.CancelledError):
await t
await scheduler.wait_and_close()

asyncio.run(main())


Integration with aiohttp.web
Expand Down
81 changes: 79 additions & 2 deletions aiojobs/_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import asyncio
import sys
from contextlib import suppress
from typing import (
Any,
Awaitable,
Callable,
Collection,
Coroutine,
Expand All @@ -9,14 +12,34 @@
Optional,
Set,
TypeVar,
Union,
)

from ._job import Job

if sys.version_info >= (3, 11):
from asyncio import timeout as asyncio_timeout
else:
from async_timeout import timeout as asyncio_timeout

_T = TypeVar("_T")
_FutureLike = Union["asyncio.Future[_T]", Awaitable[_T]]
ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None]


def _get_loop( # pragma: no cover
fut: "asyncio.Task[object]",
) -> asyncio.AbstractEventLoop:
# https://github.com/python/cpython/blob/bb802db8cfa35a88582be32fae05fe1cf8f237b1/Lib/asyncio/futures.py#L300
try:
get_loop = fut.get_loop
except AttributeError:
pass
else:
return get_loop()
return fut._loop


class Scheduler(Collection[Job[object]]):
def __init__(
self,
Expand All @@ -33,6 +56,7 @@
)

self._jobs: Set[Job[object]] = set()
self._shields: Set[asyncio.Task[object]] = set()
self._close_timeout = close_timeout
self._limit = limit
self._exception_handler = exception_handler
Expand Down Expand Up @@ -104,19 +128,72 @@
self._jobs.add(job)
return job

def shield(self, arg: _FutureLike[_T]) -> "asyncio.Future[_T]":
inner = asyncio.ensure_future(arg)
if inner.done():
return inner

Check warning on line 134 in aiojobs/_scheduler.py

View check run for this annotation

Codecov / codecov/patch

aiojobs/_scheduler.py#L134

Added line #L134 was not covered by tests

# This function is a copy of asyncio.shield(), except for the addition of
# the below 2 lines.
self._shields.add(inner)
inner.add_done_callback(self._shields.discard)

loop = _get_loop(inner)
outer = loop.create_future()

def _inner_done_callback(inner: "asyncio.Task[object]") -> None:
if outer.cancelled():
if not inner.cancelled():
inner.exception()

Check warning on line 147 in aiojobs/_scheduler.py

View check run for this annotation

Codecov / codecov/patch

aiojobs/_scheduler.py#L147

Added line #L147 was not covered by tests
return

if inner.cancelled():
outer.cancel()

Check warning on line 151 in aiojobs/_scheduler.py

View check run for this annotation

Codecov / codecov/patch

aiojobs/_scheduler.py#L151

Added line #L151 was not covered by tests
else:
exc = inner.exception()
if exc is not None:
outer.set_exception(exc)

Check warning on line 155 in aiojobs/_scheduler.py

View check run for this annotation

Codecov / codecov/patch

aiojobs/_scheduler.py#L155

Added line #L155 was not covered by tests
else:
outer.set_result(inner.result())

def _outer_done_callback(outer: "asyncio.Future[object]") -> None:
if not inner.done():
inner.remove_done_callback(_inner_done_callback)

inner.add_done_callback(_inner_done_callback)
outer.add_done_callback(_outer_done_callback)
return outer

async def wait_and_close(self, timeout: float = 60) -> None:
with suppress(asyncio.TimeoutError):
async with asyncio_timeout(timeout):
while self._jobs or self._shields:
gather = asyncio.gather(
*(job.wait() for job in self._jobs),
*self._shields,
return_exceptions=True,
)
await asyncio.shield(gather)
await self.close()

async def close(self) -> None:
if self._closed:
return
self._closed = True # prevent adding new jobs

jobs = self._jobs
if jobs:
if jobs or self._shields:
# cleanup pending queue
# all job will be started on closing
while not self._pending.empty():
self._pending.get_nowait()

for f in self._shields:
f.cancel()

await asyncio.gather(
*[job._close(self._close_timeout) for job in jobs],
*(job._close(self._close_timeout) for job in jobs),
*(asyncio.wait_for(f, self._close_timeout) for f in self._shields),
return_exceptions=True,
)
self._jobs.clear()
Expand Down
8 changes: 7 additions & 1 deletion aiojobs/aiohttp.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from functools import wraps
from typing import (
Any,
Expand All @@ -18,6 +19,7 @@
__all__ = ("setup", "spawn", "get_scheduler", "get_scheduler_from_app", "atomic")

_T = TypeVar("_T")
_FutureLike = Union["asyncio.Future[_T]", Awaitable[_T]]
_RequestView = TypeVar("_RequestView", bound=Union[web.Request, web.View])


Expand All @@ -43,6 +45,10 @@ async def spawn(request: web.Request, coro: Coroutine[object, object, _T]) -> Jo
return await get_scheduler(request).spawn(coro)


def shield(request: web.Request, arg: _FutureLike[_T]) -> "asyncio.Future[_T]":
return get_scheduler(request).shield(arg)


def atomic(
coro: Callable[[_RequestView], Coroutine[object, object, _T]]
) -> Callable[[_RequestView], Awaitable[_T]]:
Expand All @@ -65,6 +71,6 @@ def setup(app: web.Application, **kwargs: Any) -> None:
async def cleanup_context(app: web.Application) -> AsyncIterator[None]:
app[AIOJOBS_SCHEDULER] = scheduler = Scheduler(**kwargs)
yield
await scheduler.close()
await scheduler.wait_and_close()

app.cleanup_ctx.append(cleanup_context)
34 changes: 30 additions & 4 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,31 @@ Scheduler

The method respects :attr:`pending_limit` now.

.. py:method:: shield(coro)
:async:

Protect an awaitable from being cancelled.

This is a drop-in replacement for :func:`asyncio.shield`, with the
addition of tracking the shielded task in the scheduler. This can be
used to ensure that shielded tasks will actually be completed on
application shutdown.

.. py:method:: wait_and_close(timeout=60)
:async:

Wait for currently scheduled tasks to finish gracefully for the given
*timeout*. Then proceed with closing the scheduler, where any
remaining tasks will be cancelled.

.. py:method:: close()
:async:

Close scheduler and all its jobs.
Close scheduler and all its jobs by cancelling the tasks and then
waiting on them.

It finishing time for particular job exceeds
:attr:`close_timeout` this job is logged by
:meth:`call_exception_handler`.
It finishing time for a particular job exceeds :attr:`close_timeout`
the job is logged by :meth:`call_exception_handler`.


.. attribute:: exception_handler
Expand Down Expand Up @@ -221,6 +238,15 @@ jobs.

Return :class:`aiojobs.Job` instance

.. function:: shield(request, coro)
:async:

Protect an awaitable from being cancelled while registering the shielded
task into the registered scheduler.

Any shielded tasks will then be run to completion when the web app shuts
down (assuming it doesn't exceed the shutdown timeout).


Helpers

Expand Down
46 changes: 44 additions & 2 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,56 @@ Usage example
await asyncio.sleep(5.0)
# not all scheduled jobs are finished at the moment

# gracefully close spawned jobs
await scheduler.close()
# gracefully wait on tasks before closing any remaining spawned jobs
await scheduler.wait_and_close()

asyncio.run(main())

For further information read :ref:`aiojobs-quickstart`,
:ref:`aiojobs-intro` and :ref:`aiojobs-api`.

Shielding tasks with a scheduler
--------------------------------

It is typically recommended to use :func:`asyncio.shield` to protect tasks
from cancellation. However, the inner shielded tasks can't be tracked and
are therefore at risk of being cancelled during application shutdown.

To resolve this issue aiojobs includes a :meth:`aiojobs.Scheduler.shield`
method to shield tasks while also keeping track of them in the scheduler.
In combination with the :meth:`aiojobs.Scheduler.wait_and_close` method,
this allows shielded tasks the required time to complete successfully
during application shutdown.

For example:

.. code-block:: python

import asyncio
import aiojobs
from contextlib import suppress

async def important():
print("START")
await asyncio.sleep(5)
print("DONE")

async def run_something(scheduler):
# If we use asyncio.shield() here, then the task doesn't complete and DONE is never printed.
await scheduler.shield(important())

async def main():
scheduler = aiojobs.Scheduler()
t = asyncio.create_task(run_something(scheduler))
await asyncio.sleep(0.1)
t.cancel()
with suppress(asyncio.CancelledError):
await t
await scheduler.wait_and_close()

asyncio.run(main())


Integration with aiohttp.web
----------------------------

Expand Down
10 changes: 9 additions & 1 deletion tests/test_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
get_scheduler_from_app,
get_scheduler_from_request,
setup as aiojobs_setup,
shield,
spawn,
)

Expand All @@ -23,6 +24,10 @@
async def test_plugin(aiohttp_client: _Client) -> None:
job = None

async def shielded() -> str:
await asyncio.sleep(0)
return "TEST"

async def coro() -> None:
await asyncio.sleep(10)

Expand All @@ -31,7 +36,9 @@ async def handler(request: web.Request) -> web.Response:

job = await spawn(request, coro())
assert not job.closed
return web.Response()

res = await shield(request, shielded())
return web.Response(text=res)

app = web.Application()
app.router.add_get("/", handler)
Expand All @@ -40,6 +47,7 @@ async def handler(request: web.Request) -> web.Response:
client = await aiohttp_client(app)
resp = await client.get("/")
assert resp.status == 200
assert await resp.text() == "TEST"

assert job is not None
assert job.active
Expand Down
Loading