Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Scheduler.shield() and Scheduler.wait_and_close() #495

Merged
merged 25 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8068ef8
Add Scheduler.shield() and Scheduler.wait_and_close()
Dreamsorcerer Jul 14, 2024
f380aa6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 14, 2024
19d0585
Update _scheduler.py
Dreamsorcerer Jul 22, 2024
3b7d611
Update aiohttp.py
Dreamsorcerer Jul 22, 2024
3df7f40
Update _scheduler.py
Dreamsorcerer Jul 22, 2024
1f327f2
Update _scheduler.py
Dreamsorcerer Jul 22, 2024
c140e3c
Update _scheduler.py
Dreamsorcerer Jul 22, 2024
43ade6e
Merge branch 'master' into shield
Dreamsorcerer Jul 22, 2024
9ee2f8d
Update _scheduler.py
Dreamsorcerer Jul 22, 2024
e69f1ff
Add tests
Dreamsorcerer Jul 22, 2024
424fdb8
Merge branch 'master' into shield
Dreamsorcerer Jul 22, 2024
a867207
Type ignores
Dreamsorcerer Jul 22, 2024
9a1ce4d
Merge branch 'shield' of github.com:aio-libs/aiojobs into shield
Dreamsorcerer Jul 22, 2024
25cbd0a
Merge branch 'master' into shield
Dreamsorcerer Jul 22, 2024
a145d4d
Another test
Dreamsorcerer Jul 22, 2024
570797e
Merge branch 'shield' of github.com:aio-libs/aiojobs into shield
Dreamsorcerer Jul 22, 2024
f621cd9
Fix
Dreamsorcerer Jul 22, 2024
7fac6f7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 22, 2024
6fe84a2
Fix
Dreamsorcerer Jul 22, 2024
7dc46e6
Update _scheduler.py
Dreamsorcerer Jul 22, 2024
e774080
Docs
Dreamsorcerer Jul 23, 2024
e257a16
Merge branch 'shield' of github.com:aio-libs/aiojobs into shield
Dreamsorcerer Jul 23, 2024
1585f26
Docs
Dreamsorcerer Jul 23, 2024
452bdab
Docs
Dreamsorcerer Jul 23, 2024
61c80a4
Docs
Dreamsorcerer Jul 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 76 additions & 2 deletions aiojobs/_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import asyncio
import sys
from contextlib import suppress
from typing import (
Any,
Awaitable,
Callable,
Collection,
Coroutine,
Expand All @@ -9,14 +12,32 @@
Optional,
Set,
TypeVar,
Union,
)

from ._job import Job

if sys.version_info >= (3, 11):
from asyncio import timeout as asyncio_timeout
else:
from async_timeout import timeout as asyncio_timeout

_T = TypeVar("_T")
_FutureLike = Union["asyncio.Future[_T]", Awaitable[_T]]
ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None]


def _get_loop(fut: "asyncio.Task[object]") -> asyncio.AbstractEventLoop:
# https://github.com/python/cpython/blob/bb802db8cfa35a88582be32fae05fe1cf8f237b1/Lib/asyncio/futures.py#L300
try:
get_loop = fut.get_loop
except AttributeError:
pass

Check warning on line 35 in aiojobs/_scheduler.py

View check run for this annotation

Codecov / codecov/patch

aiojobs/_scheduler.py#L34-L35

Added lines #L34 - L35 were not covered by tests
else:
return get_loop()
return fut._loop

Check warning on line 38 in aiojobs/_scheduler.py

View check run for this annotation

Codecov / codecov/patch

aiojobs/_scheduler.py#L38

Added line #L38 was not covered by tests


class Scheduler(Collection[Job[object]]):
def __init__(
self,
Expand All @@ -33,6 +54,7 @@
)

self._jobs: Set[Job[object]] = set()
self._shields: Set[asyncio.Task[object]] = set()
self._close_timeout = close_timeout
self._limit = limit
self._exception_handler = exception_handler
Expand Down Expand Up @@ -104,19 +126,71 @@
self._jobs.add(job)
return job

def shield(self, arg: _FutureLike[_T]) -> "asyncio.Future[_T]":
inner = asyncio.ensure_future(arg)
if inner.done():
return inner

Check warning on line 132 in aiojobs/_scheduler.py

View check run for this annotation

Codecov / codecov/patch

aiojobs/_scheduler.py#L132

Added line #L132 was not covered by tests

# This function is a copy of asyncio.shield(), except for the addition of
# the below 2 lines.
self._shields.add(inner)
inner.add_done_callback(self._shields.discard)

loop = _get_loop(inner)
outer = loop.create_future()

def _inner_done_callback(inner: asyncio.Task[object]) -> None:
if outer.cancelled():
if not inner.cancelled():
inner.exception()
return

Check warning on line 146 in aiojobs/_scheduler.py

View check run for this annotation

Codecov / codecov/patch

aiojobs/_scheduler.py#L145-L146

Added lines #L145 - L146 were not covered by tests

if inner.cancelled():
outer.cancel()

Check warning on line 149 in aiojobs/_scheduler.py

View check run for this annotation

Codecov / codecov/patch

aiojobs/_scheduler.py#L149

Added line #L149 was not covered by tests
else:
exc = inner.exception()
if exc is not None:
outer.set_exception(exc)

Check warning on line 153 in aiojobs/_scheduler.py

View check run for this annotation

Codecov / codecov/patch

aiojobs/_scheduler.py#L153

Added line #L153 was not covered by tests
else:
outer.set_result(inner.result())

def _outer_done_callback(outer: "asyncio.Future[object]") -> None:
if not inner.done():
inner.remove_done_callback(_inner_done_callback)

inner.add_done_callback(_inner_done_callback)
outer.add_done_callback(_outer_done_callback)
return outer

async def wait_and_close(self, timeout: float = 60) -> None:
with suppress(asyncio.TimeoutError):
async with asyncio_timeout(timeout):
while self._jobs or self._shields:
await asyncio.gather(
*(job.wait() for job in self._jobs),
*self._shields,
return_exceptions=True,
)
await self.close()

async def close(self) -> None:
if self._closed:
return
self._closed = True # prevent adding new jobs

jobs = self._jobs
if jobs:
if jobs or self._shields:
# cleanup pending queue
# all job will be started on closing
while not self._pending.empty():
self._pending.get_nowait()

for f in self._shields:
f.cancel()

Check warning on line 189 in aiojobs/_scheduler.py

View check run for this annotation

Codecov / codecov/patch

aiojobs/_scheduler.py#L189

Added line #L189 was not covered by tests

await asyncio.gather(
*[job._close(self._close_timeout) for job in jobs],
*(job._close(self._close_timeout) for job in jobs),
*(asyncio.wait_for(f, self._close_timeout) for f in self._shields),
return_exceptions=True,
)
self._jobs.clear()
Expand Down
8 changes: 7 additions & 1 deletion aiojobs/aiohttp.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from functools import wraps
from typing import (
Any,
Expand All @@ -18,6 +19,7 @@
__all__ = ("setup", "spawn", "get_scheduler", "get_scheduler_from_app", "atomic")

_T = TypeVar("_T")
_FutureLike = Union["asyncio.Future[_T]", Awaitable[_T]]
_RequestView = TypeVar("_RequestView", bound=Union[web.Request, web.View])


Expand All @@ -43,6 +45,10 @@ async def spawn(request: web.Request, coro: Coroutine[object, object, _T]) -> Jo
return await get_scheduler(request).spawn(coro)


def shield(request: web.Request, arg: _FutureLike[_T]) -> "asyncio.Future[_T]":
return get_scheduler(request).shield(arg)


def atomic(
coro: Callable[[_RequestView], Coroutine[object, object, _T]]
) -> Callable[[_RequestView], Awaitable[_T]]:
Expand All @@ -65,6 +71,6 @@ def setup(app: web.Application, **kwargs: Any) -> None:
async def cleanup_context(app: web.Application) -> AsyncIterator[None]:
app[AIOJOBS_SCHEDULER] = scheduler = Scheduler(**kwargs)
yield
await scheduler.close()
await scheduler.wait_and_close()

app.cleanup_ctx.append(cleanup_context)
10 changes: 9 additions & 1 deletion tests/test_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
get_scheduler_from_app,
get_scheduler_from_request,
setup as aiojobs_setup,
shield,
spawn,
)

Expand All @@ -23,6 +24,10 @@
async def test_plugin(aiohttp_client: _Client) -> None:
job = None

async def shielded() -> str:
await asyncio.sleep(0)
return "TEST"

async def coro() -> None:
await asyncio.sleep(10)

Expand All @@ -31,7 +36,9 @@ async def handler(request: web.Request) -> web.Response:

job = await spawn(request, coro())
assert not job.closed
return web.Response()

res = await shield(request, shielded())
return web.Response(text=res)

app = web.Application()
app.router.add_get("/", handler)
Expand All @@ -40,6 +47,7 @@ async def handler(request: web.Request) -> web.Response:
client = await aiohttp_client(app)
resp = await client.get("/")
assert resp.status == 200
assert await resp.text() == "TEST"

assert job is not None
assert job.active
Expand Down
112 changes: 112 additions & 0 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,118 @@
del coro


async def test_shield(scheduler: Scheduler) -> None:
async def coro() -> str:
await asyncio.sleep(0)
return "TEST"

result = await scheduler.shield(coro())
assert result == "TEST"
assert len(scheduler._shields) == 0


async def test_shielded_task_continues(scheduler: Scheduler) -> None:
completed = False

async def inner() -> None:
nonlocal completed
await asyncio.sleep(0.1)
completed = True

async def outer() -> None:
await scheduler.shield(inner())

t = asyncio.create_task(outer())
await asyncio.sleep(0)
t.cancel()
with pytest.raises(asyncio.CancelledError):
await t
Dismissed Show dismissed Hide dismissed

assert not completed
assert len(scheduler._shields) == 1
await asyncio.sleep(0.11)
assert completed
assert len(scheduler._shields) == 0


async def test_wait_and_close(scheduler: Scheduler) -> None:
inner_done = outer_done = False

async def inner() -> None:
nonlocal inner_done
await asyncio.sleep(0.1)
inner_done = True

async def outer() -> None:
nonlocal outer_done
await scheduler.shield(inner())
await asyncio.sleep(0.1)
outer_done = True

await scheduler.spawn(outer())
await asyncio.sleep(0)
assert not inner_done and not outer_done
assert len(scheduler._shields) == 1
assert len(scheduler._jobs) == 1

await scheduler.wait_and_close()
assert inner_done and outer_done
assert len(scheduler._shields) == 0
assert len(scheduler._jobs) == 0
assert scheduler.closed


async def test_wait_and_close_timeout(scheduler: Scheduler) -> None:
inner_done = outer_cancelled = False

async def inner() -> None:
nonlocal inner_done
await asyncio.sleep(0.1)
inner_done = True

async def outer() -> None:
nonlocal outer_cancelled
await scheduler.shield(inner())
try:
await asyncio.sleep(0.5)
except asyncio.CancelledError:
outer_cancelled = True

await scheduler.spawn(outer())
await asyncio.sleep(0)
assert not inner_done and not outer_cancelled
assert len(scheduler._shields) == 1
assert len(scheduler._jobs) == 1

await scheduler.wait_and_close(0.2)
assert inner_done and outer_cancelled
assert len(scheduler._shields) == 0
assert len(scheduler._jobs) == 0
assert scheduler.closed


async def test_wait_and_close_spawn(scheduler: Scheduler) -> None:
another_spawned = another_done = False

async def another() -> None:
nonlocal another_done
await scheduler.shield(asyncio.sleep(0.1))
another_done = True

async def coro() -> None:
nonlocal another_spawned
await asyncio.sleep(0.1)
another_spawned = True
await scheduler.spawn(another())

await scheduler.spawn(coro())
await asyncio.sleep(0)

assert not another_spawned and not another_done
await scheduler.wait_and_close()
assert another_spawned and another_done


def test_scheduler_must_be_created_within_running_loop() -> None:
with pytest.raises(RuntimeError) as exc_info:
Scheduler(close_timeout=0, limit=0, pending_limit=0, exception_handler=None)
Expand Down
Loading