Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement web.Runner context manager #8723

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/8723.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement web.Runner context manager -- by :user:`DavidRomanovizc`
261 changes: 234 additions & 27 deletions aiohttp/web.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import asyncio
import contextvars
import enum
import functools
import logging
import os
import signal
import socket
import sys
import threading
import warnings
from argparse import ArgumentParser
from asyncio import Task, constants, coroutines, events, exceptions, tasks
from collections.abc import Iterable
from importlib import import_module
from types import FrameType, TracebackType
from typing import (
Any,
Awaitable,
Expand All @@ -18,6 +25,7 @@
Type,
Union,
cast,
final,
)

from .abc import AbstractAccessLogger
Expand Down Expand Up @@ -263,9 +271,9 @@
"WSMsgType",
# web
"run_app",
"Runner",
)


try:
from ssl import SSLContext
except ImportError: # pragma: no cover
Expand All @@ -277,6 +285,222 @@
HostSequence = TypingIterable[str]


class _State(enum.Enum):
CREATED = "created"
INITIALIZED = "initialized"
CLOSED = "closed"


@final
class Runner:
"""A context manager that controls event loop life cycle"""
Dreamsorcerer marked this conversation as resolved.
Show resolved Hide resolved

def __init__(
self,
*,
debug: Optional[bool] = None,
loop_factory: Optional[Callable[[], asyncio.AbstractEventLoop]] = None,
):
self._state = _State.CREATED
self._debug = debug
self._loop_factory = loop_factory
self._loop = None
self._context = None
self._interrupt_count = 0
self._set_event_loop = False

def __enter__(self) -> "Runner":
self._lazy_init()
return self

def __exit__(
self,
exc_type: Optional[type],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.close()

def close(self) -> None:
"""Shutdown and close event loop."""
if self._state is not _State.INITIALIZED:
return
loop = self._loop
try:
_cancel_tasks(tasks.all_tasks(loop), loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(
loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT)
)
finally:
if self._set_event_loop:
events.set_event_loop(None)
loop.close()
self._loop = None
self._state = _State.CLOSED

def get_loop(self) -> asyncio.AbstractEventLoop:
"""Return embedded event loop."""
self._lazy_init()
return self._loop

def run(
self, coro: Awaitable, *, context: Optional[contextvars.Context] = None
) -> Any:
"""Run a coroutine inside the embedded event loop."""
if not coroutines.iscoroutine(coro):
raise ValueError(f"a coroutine was expected, got {coro!r}")

if events._get_running_loop() is not None:
# fail fast with short traceback
raise RuntimeError(
"Runner.run() cannot be called from a running event loop"
)

self._lazy_init()

if context is None:
context = self._context
task = self._loop.create_task(coro, context=context)

if (
threading.current_thread() is threading.main_thread()
and signal.getsignal(signal.SIGINT) is signal.default_int_handler
):
sigint_handler = functools.partial(self._on_sigint, main_task=task)
try:
signal.signal(signal.SIGINT, sigint_handler)
except ValueError:
# `signal.signal` may throw if `threading.main_thread` does
# not support signals (e.g. embedded interpreter with signals
# not registered - see gh-91880)
sigint_handler = None
else:
sigint_handler = None
Comment on lines +366 to +379
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this SIGINT code needs to be copied to run_app()...
Will have to do some more testing.


self._interrupt_count = 0
try:
return self._loop.run_until_complete(task)
except exceptions.CancelledError:
if self._interrupt_count > 0:
uncancel = getattr(task, "uncancel", None)
if uncancel is not None and uncancel() == 0:
raise KeyboardInterrupt()
raise # CancelledError
finally:
if (
sigint_handler is not None
and signal.getsignal(signal.SIGINT) is sigint_handler
):
signal.signal(signal.SIGINT, signal.default_int_handler)

def run_app(
self,
app: Union[Application, Awaitable[Application]],
*,
host: Optional[Union[str, HostSequence]] = None,
port: Optional[int] = None,
path: Union[PathLike, TypingIterable[PathLike], None] = None,
sock: Optional[Union[socket.socket, TypingIterable[socket.socket]]] = None,
shutdown_timeout: float = 60.0,
keepalive_timeout: float = 75.0,
ssl_context: Optional[SSLContext] = None,
print: Optional[Callable[..., None]] = print,
backlog: int = 128,
access_log_class: Type[AbstractAccessLogger] = AccessLogger,
access_log_format: str = AccessLogger.LOG_FORMAT,
access_log: Optional[logging.Logger] = access_logger,
handle_signals: bool = True,
reuse_address: Optional[bool] = None,
reuse_port: Optional[bool] = None,
handler_cancellation: bool = False,
) -> None:
"""Run an app locally"""
self._lazy_init()

self._loop.set_debug(self._debug)

Comment on lines +421 to +422
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already done in _laze_init().

Suggested change
self._loop.set_debug(self._debug)

if (
self._loop.get_debug()
and access_log
and access_log.name == "aiohttp.access"
):
if access_log.level == logging.NOTSET:
access_log.setLevel(logging.DEBUG)
if not access_log.hasHandlers():
access_log.addHandler(logging.StreamHandler())

main_task = self._loop.create_task(
_run_app(
app,
host=host,
port=port,
path=path,
sock=sock,
shutdown_timeout=shutdown_timeout,
keepalive_timeout=keepalive_timeout,
ssl_context=ssl_context,
print=print,
backlog=backlog,
access_log_class=access_log_class,
access_log_format=access_log_format,
access_log=access_log,
handle_signals=handle_signals,
reuse_address=reuse_address,
reuse_port=reuse_port,
handler_cancellation=handler_cancellation,
)
)

try:
if self._set_event_loop:
asyncio.set_event_loop(self._loop)
Comment on lines +456 to +457
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also in _lazy_init()

Suggested change
if self._set_event_loop:
asyncio.set_event_loop(self._loop)

self._loop.run_until_complete(main_task)
except (GracefulExit, KeyboardInterrupt): # pragma: no cover
pass
finally:
_cancel_tasks({main_task}, self._loop)
_cancel_tasks(asyncio.all_tasks(self._loop), self._loop)
self._loop.run_until_complete(self._loop.shutdown_asyncgens())
self.close()
asyncio.set_event_loop(None)
Comment on lines +462 to +466
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already done in .close():

Suggested change
_cancel_tasks({main_task}, self._loop)
_cancel_tasks(asyncio.all_tasks(self._loop), self._loop)
self._loop.run_until_complete(self._loop.shutdown_asyncgens())
self.close()
asyncio.set_event_loop(None)
_cancel_tasks({main_task}, self._loop)
self.close()


def _lazy_init(self) -> None:
if self._state is _State.CLOSED:
raise RuntimeError("Runner is closed")
if self._state is _State.INITIALIZED:
return
if self._loop_factory is None:
self._loop = events.new_event_loop()
if not self._set_event_loop:
# Call set_event_loop only once to avoid calling
# attach_loop multiple times on child watchers
events.set_event_loop(self._loop)
self._set_event_loop = True
else:
try:
self._loop = self._loop_factory()
except RuntimeError:
Dreamsorcerer marked this conversation as resolved.
Show resolved Hide resolved
self._loop = events.new_event_loop()
events.set_event_loop(self._loop)
self._set_event_loop = True
if self._debug is not None:
self._loop.set_debug(self._debug)
self._context = contextvars.copy_context()
self._state = _State.INITIALIZED

def _on_sigint(
self, signum: int, frame: Optional[FrameType], main_task: Task
) -> None:
self._interrupt_count += 1
if self._interrupt_count == 1 and not main_task.done():
main_task.cancel()
# wakeup loop if it is blocked by select() with long timeout
self._loop.call_soon_threadsafe(lambda: None)
return
raise KeyboardInterrupt()


async def _run_app(
app: Union[Application, Awaitable[Application]],
*,
Expand Down Expand Up @@ -462,19 +686,15 @@ def run_app(
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> None:
"""Run an app locally"""
if loop is None:
loop = asyncio.new_event_loop()
loop.set_debug(debug)

# Configure if and only if in debugging mode and using the default logger
if loop.get_debug() and access_log and access_log.name == "aiohttp.access":
if access_log.level == logging.NOTSET:
access_log.setLevel(logging.DEBUG)
if not access_log.hasHandlers():
access_log.addHandler(logging.StreamHandler())

main_task = loop.create_task(
_run_app(
if loop is not None:

def loop_factory():
return loop

else:
loop_factory = events.get_running_loop
Comment on lines +689 to +695
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think loop_factory should default to None (maybe this is the cause of the RuntimeError above?).

Suggested change
if loop is not None:
def loop_factory():
return loop
else:
loop_factory = events.get_running_loop
loop_factory = None if loop is None else lambda: loop

with Runner(debug=debug, loop_factory=loop_factory) as runner:
runner.run_app(
app,
host=host,
port=port,
Expand All @@ -493,19 +713,6 @@ def run_app(
reuse_port=reuse_port,
handler_cancellation=handler_cancellation,
)
)

try:
asyncio.set_event_loop(loop)
loop.run_until_complete(main_task)
except (GracefulExit, KeyboardInterrupt): # pragma: no cover
pass
finally:
_cancel_tasks({main_task}, loop)
_cancel_tasks(asyncio.all_tasks(loop), loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
asyncio.set_event_loop(None)


def main(argv: List[str]) -> None:
Expand Down
Loading
Loading