import asyncio import logging import warnings from functools import partial, update_wrapper from typing import ( TYPE_CHECKING, Any, AsyncIterator, Awaitable, Callable, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence, Tuple, Type, Union, cast, ) from . import hdrs from .abc import ( AbstractAccessLogger, AbstractMatchInfo, AbstractRouter, AbstractStreamWriter, ) from .frozenlist import FrozenList from .helpers import DEBUG from .http_parser import RawRequestMessage from .log import web_logger from .signals import Signal from .streams import StreamReader from .web_log import AccessLogger from .web_middlewares import _fix_request_current_app from .web_protocol import RequestHandler from .web_request import Request from .web_response import StreamResponse from .web_routedef import AbstractRouteDef from .web_server import Server from .web_urldispatcher import ( AbstractResource, AbstractRoute, Domain, MaskDomain, MatchedSubAppResource, PrefixedSubAppResource, UrlDispatcher, ) __all__ = ("Application", "CleanupError") if TYPE_CHECKING: # pragma: no cover _AppSignal = Signal[Callable[["Application"], Awaitable[None]]] _RespPrepareSignal = Signal[Callable[[Request, StreamResponse], Awaitable[None]]] _Handler = Callable[[Request], Awaitable[StreamResponse]] _Middleware = Union[ Callable[[Request, _Handler], Awaitable[StreamResponse]], Callable[["Application", _Handler], Awaitable[_Handler]], # old-style ] _Middlewares = FrozenList[_Middleware] _MiddlewaresHandlers = Optional[Sequence[Tuple[_Middleware, bool]]] _Subapps = List["Application"] else: # No type checker mode, skip types _AppSignal = Signal _RespPrepareSignal = Signal _Handler = Callable _Middleware = Callable _Middlewares = FrozenList _MiddlewaresHandlers = Optional[Sequence] _Subapps = List class Application(MutableMapping[str, Any]): ATTRS = frozenset( [ "logger", "_debug", "_router", "_loop", "_handler_args", "_middlewares", "_middlewares_handlers", "_run_middlewares", "_state", "_frozen", "_pre_frozen", "_subapps", "_on_response_prepare", "_on_startup", "_on_shutdown", "_on_cleanup", "_client_max_size", "_cleanup_ctx", ] ) def __init__( self, *, logger: logging.Logger = web_logger, router: Optional[UrlDispatcher] = None, middlewares: Iterable[_Middleware] = (), handler_args: Optional[Mapping[str, Any]] = None, client_max_size: int = 1024 ** 2, loop: Optional[asyncio.AbstractEventLoop] = None, debug: Any = ..., # mypy doesn't support ellipsis ) -> None: if router is None: router = UrlDispatcher() else: warnings.warn( "router argument is deprecated", DeprecationWarning, stacklevel=2 ) assert isinstance(router, AbstractRouter), router if loop is not None: warnings.warn( "loop argument is deprecated", DeprecationWarning, stacklevel=2 ) if debug is not ...: warnings.warn( "debug argument is deprecated", DeprecationWarning, stacklevel=2 ) self._debug = debug self._router = router # type: UrlDispatcher self._loop = loop self._handler_args = handler_args self.logger = logger self._middlewares = FrozenList(middlewares) # type: _Middlewares # initialized on freezing self._middlewares_handlers = None # type: _MiddlewaresHandlers # initialized on freezing self._run_middlewares = None # type: Optional[bool] self._state = {} # type: Dict[str, Any] self._frozen = False self._pre_frozen = False self._subapps = [] # type: _Subapps self._on_response_prepare = Signal(self) # type: _RespPrepareSignal self._on_startup = Signal(self) # type: _AppSignal self._on_shutdown = Signal(self) # type: _AppSignal self._on_cleanup = Signal(self) # type: _AppSignal self._cleanup_ctx = CleanupContext() self._on_startup.append(self._cleanup_ctx._on_startup) self._on_cleanup.append(self._cleanup_ctx._on_cleanup) self._client_max_size = client_max_size def __init_subclass__(cls: Type["Application"]) -> None: warnings.warn( "Inheritance class {} from web.Application " "is discouraged".format(cls.__name__), DeprecationWarning, stacklevel=2, ) if DEBUG: # pragma: no cover def __setattr__(self, name: str, val: Any) -> None: if name not in self.ATTRS: warnings.warn( "Setting custom web.Application.{} attribute " "is discouraged".format(name), DeprecationWarning, stacklevel=2, ) super().__setattr__(name, val) # MutableMapping API def __eq__(self, other: object) -> bool: return self is other def __getitem__(self, key: str) -> Any: return self._state[key] def _check_frozen(self) -> None: if self._frozen: warnings.warn( "Changing state of started or joined " "application is deprecated", DeprecationWarning, stacklevel=3, ) def __setitem__(self, key: str, value: Any) -> None: self._check_frozen() self._state[key] = value def __delitem__(self, key: str) -> None: self._check_frozen() del self._state[key] def __len__(self) -> int: return len(self._state) def __iter__(self) -> Iterator[str]: return iter(self._state) ######## @property def loop(self) -> asyncio.AbstractEventLoop: # Technically the loop can be None # but we mask it by explicit type cast # to provide more convinient type annotation warnings.warn("loop property is deprecated", DeprecationWarning, stacklevel=2) return cast(asyncio.AbstractEventLoop, self._loop) def _set_loop(self, loop: Optional[asyncio.AbstractEventLoop]) -> None: if loop is None: loop = asyncio.get_event_loop() if self._loop is not None and self._loop is not loop: raise RuntimeError( "web.Application instance initialized with different loop" ) self._loop = loop # set loop debug if self._debug is ...: self._debug = loop.get_debug() # set loop to sub applications for subapp in self._subapps: subapp._set_loop(loop) @property def pre_frozen(self) -> bool: return self._pre_frozen def pre_freeze(self) -> None: if self._pre_frozen: return self._pre_frozen = True self._middlewares.freeze() self._router.freeze() self._on_response_prepare.freeze() self._cleanup_ctx.freeze() self._on_startup.freeze() self._on_shutdown.freeze() self._on_cleanup.freeze() self._middlewares_handlers = tuple(self._prepare_middleware()) # If current app and any subapp do not have middlewares avoid run all # of the code footprint that it implies, which have a middleware # hardcoded per app that sets up the current_app attribute. If no # middlewares are configured the handler will receive the proper # current_app without needing all of this code. self._run_middlewares = True if self.middlewares else False for subapp in self._subapps: subapp.pre_freeze() self._run_middlewares = self._run_middlewares or subapp._run_middlewares @property def frozen(self) -> bool: return self._frozen def freeze(self) -> None: if self._frozen: return self.pre_freeze() self._frozen = True for subapp in self._subapps: subapp.freeze() @property def debug(self) -> bool: warnings.warn("debug property is deprecated", DeprecationWarning, stacklevel=2) return self._debug def _reg_subapp_signals(self, subapp: "Application") -> None: def reg_handler(signame: str) -> None: subsig = getattr(subapp, signame) async def handler(app: "Application") -> None: await subsig.send(subapp) appsig = getattr(self, signame) appsig.append(handler) reg_handler("on_startup") reg_handler("on_shutdown") reg_handler("on_cleanup") def add_subapp(self, prefix: str, subapp: "Application") -> AbstractResource: if not isinstance(prefix, str): raise TypeError("Prefix must be str") prefix = prefix.rstrip("/") if not prefix: raise ValueError("Prefix cannot be empty") factory = partial(PrefixedSubAppResource, prefix, subapp) return self._add_subapp(factory, subapp) def _add_subapp( self, resource_factory: Callable[[], AbstractResource], subapp: "Application" ) -> AbstractResource: if self.frozen: raise RuntimeError("Cannot add sub application to frozen application") if subapp.frozen: raise RuntimeError("Cannot add frozen application") resource = resource_factory() self.router.register_resource(resource) self._reg_subapp_signals(subapp) self._subapps.append(subapp) subapp.pre_freeze() if self._loop is not None: subapp._set_loop(self._loop) return resource def add_domain(self, domain: str, subapp: "Application") -> AbstractResource: if not isinstance(domain, str): raise TypeError("Domain must be str") elif "*" in domain: rule = MaskDomain(domain) # type: Domain else: rule = Domain(domain) factory = partial(MatchedSubAppResource, rule, subapp) return self._add_subapp(factory, subapp) def add_routes(self, routes: Iterable[AbstractRouteDef]) -> List[AbstractRoute]: return self.router.add_routes(routes) @property def on_response_prepare(self) -> _RespPrepareSignal: return self._on_response_prepare @property def on_startup(self) -> _AppSignal: return self._on_startup @property def on_shutdown(self) -> _AppSignal: return self._on_shutdown @property def on_cleanup(self) -> _AppSignal: return self._on_cleanup @property def cleanup_ctx(self) -> "CleanupContext": return self._cleanup_ctx @property def router(self) -> UrlDispatcher: return self._router @property def middlewares(self) -> _Middlewares: return self._middlewares def _make_handler( self, *, loop: Optional[asyncio.AbstractEventLoop] = None, access_log_class: Type[AbstractAccessLogger] = AccessLogger, **kwargs: Any, ) -> Server: if not issubclass(access_log_class, AbstractAccessLogger): raise TypeError( "access_log_class must be subclass of " "aiohttp.abc.AbstractAccessLogger, got {}".format(access_log_class) ) self._set_loop(loop) self.freeze() kwargs["debug"] = self._debug kwargs["access_log_class"] = access_log_class if self._handler_args: for k, v in self._handler_args.items(): kwargs[k] = v return Server( self._handle, # type: ignore request_factory=self._make_request, loop=self._loop, **kwargs, ) def make_handler( self, *, loop: Optional[asyncio.AbstractEventLoop] = None, access_log_class: Type[AbstractAccessLogger] = AccessLogger, **kwargs: Any, ) -> Server: warnings.warn( "Application.make_handler(...) is deprecated, " "use AppRunner API instead", DeprecationWarning, stacklevel=2, ) return self._make_handler( loop=loop, access_log_class=access_log_class, **kwargs ) async def startup(self) -> None: """Causes on_startup signal Should be called in the event loop along with the request handler. """ await self.on_startup.send(self) async def shutdown(self) -> None: """Causes on_shutdown signal Should be called before cleanup() """ await self.on_shutdown.send(self) async def cleanup(self) -> None: """Causes on_cleanup signal Should be called after shutdown() """ await self.on_cleanup.send(self) def _make_request( self, message: RawRequestMessage, payload: StreamReader, protocol: RequestHandler, writer: AbstractStreamWriter, task: "asyncio.Task[None]", _cls: Type[Request] = Request, ) -> Request: return _cls( message, payload, protocol, writer, task, self._loop, client_max_size=self._client_max_size, ) def _prepare_middleware(self) -> Iterator[Tuple[_Middleware, bool]]: for m in reversed(self._middlewares): if getattr(m, "__middleware_version__", None) == 1: yield m, True else: warnings.warn( 'old-style middleware "{!r}" deprecated, ' "see #2252".format(m), DeprecationWarning, stacklevel=2, ) yield m, False yield _fix_request_current_app(self), True async def _handle(self, request: Request) -> StreamResponse: loop = asyncio.get_event_loop() debug = loop.get_debug() match_info = await self._router.resolve(request) if debug: # pragma: no cover if not isinstance(match_info, AbstractMatchInfo): raise TypeError( "match_info should be AbstractMatchInfo " "instance, not {!r}".format(match_info) ) match_info.add_app(self) match_info.freeze() resp = None request._match_info = match_info # type: ignore expect = request.headers.get(hdrs.EXPECT) if expect: resp = await match_info.expect_handler(request) await request.writer.drain() if resp is None: handler = match_info.handler if self._run_middlewares: for app in match_info.apps[::-1]: for m, new_style in app._middlewares_handlers: # type: ignore if new_style: handler = update_wrapper( partial(m, handler=handler), handler ) else: handler = await m(app, handler) # type: ignore resp = await handler(request) return resp def __call__(self) -> "Application": """gunicorn compatibility""" return self def __repr__(self) -> str: return "".format(id(self)) def __bool__(self) -> bool: return True class CleanupError(RuntimeError): @property def exceptions(self) -> List[BaseException]: return self.args[1] if TYPE_CHECKING: # pragma: no cover _CleanupContextBase = FrozenList[Callable[[Application], AsyncIterator[None]]] else: _CleanupContextBase = FrozenList class CleanupContext(_CleanupContextBase): def __init__(self) -> None: super().__init__() self._exits = [] # type: List[AsyncIterator[None]] async def _on_startup(self, app: Application) -> None: for cb in self: it = cb(app).__aiter__() await it.__anext__() self._exits.append(it) async def _on_cleanup(self, app: Application) -> None: errors = [] for it in reversed(self._exits): try: await it.__anext__() except StopAsyncIteration: pass except Exception as exc: errors.append(exc) else: errors.append(RuntimeError(f"{it!r} has more than one 'yield'")) if errors: if len(errors) == 1: raise errors[0] else: raise CleanupError("Multiple errors on cleanup stage", errors)