# -*- coding: utf-8 -*- # bthlabs-jsonrpc-aiohttp | (c) 2022-present Tomek Wójcik | MIT License from __future__ import annotations import logging import inspect import typing from aiohttp import web from bthlabs_jsonrpc_core import Codec, Executor, JSONRPCAccessDeniedError from bthlabs_jsonrpc_core.exceptions import JSONRPCParseError from bthlabs_jsonrpc_core.serializer import JSONRPCSerializer LOGGER = logging.getLogger('bthlabs_jsonrpc.aiohttp.executor') TCanCall = typing.Callable[[web.Request, str, list, dict], typing.Awaitable[bool]] class AioHttpExecutor(Executor): """AioHttp-specific executor.""" def __init__(self, request: web.Request, can_call: TCanCall, namespace: str | None = None, codec: Codec | None = None, ): super().__init__(namespace=namespace, codec=codec) self.request = request self.can_call = can_call # pragma mark - Public interface async def list_methods(self, *args, **kwargs) -> list[str]: # type: ignore[override] return super().list_methods() async def deserialize_data(self, request: web.Request) -> typing.Any: # type: ignore[override] """ Deserializes *data* and returns the result. Raises :py:exc:`JSONRPCParseError` if there was an error in the process. """ try: payload = await request.text() result = self.codec.decode(payload) if inspect.isawaitable(result): return await result return result except Exception as exception: LOGGER.error( 'Unhandled exception when deserializing RPC call: %s', exception, exc_info=exception, ) raise JSONRPCParseError() from exception def enrich_args(self, args: list) -> list: """ Injects the current :py:class:`aiohttp.web.Request` as the first argument. """ return [self.request, *super().enrich_args(args)] async def before_call(self, method: str, args: list, kwargs: dict): """ Executes *can_call* and raises :py:exc:`JSONRPCAccessDeniedError` accordingly. """ can_call = await self.can_call(self.request, method, args, kwargs) if can_call is False: raise JSONRPCAccessDeniedError(data='can_call') async def execute(self) -> JSONRPCSerializer | None: # type: ignore[override] """ Executes the JSONRPC request. Returns an instance of :py:class:`JSONRPCSerializer` or ``None`` if the list of responses is empty. """ with self.execute_context() as execute_context: data = await self.deserialize_data(self.request) calls = self.get_calls(data) for call in calls: with self.call_context(execute_context, call) as call_context: if call_context.is_valid is True: await self.before_call( call_context.method, call_context.args, call_context.kwargs, ) call_context.result = await call_context.handler( *call_context.args, **call_context.kwargs, ) return execute_context.serializer