1
0
Fork 0
bthlabs-jsonrpc/packages/bthlabs-jsonrpc-aiohttp/bthlabs_jsonrpc_aiohttp/executor.py

101 lines
3.4 KiB
Python

# -*- coding: utf-8 -*-
# bthlabs-jsonrpc-aiohttp | (c) 2022-present Tomek Wójcik | MIT License
from __future__ import annotations
import logging
import inspect
import typing
from aiohttp import web
from bthlabs_jsonrpc_core import Codec, Executor, JSONRPCAccessDeniedError
from bthlabs_jsonrpc_core.exceptions import JSONRPCParseError
from bthlabs_jsonrpc_core.serializer import JSONRPCSerializer
# Module-level logger; the logger name is spelled out explicitly.
LOGGER = logging.getLogger('bthlabs_jsonrpc.aiohttp.executor')

# Type of the access-check hook: a callable that receives the request, the
# method name, the positional arguments and the keyword arguments, and whose
# awaited result is a bool.
TCanCall = typing.Callable[
    [web.Request, str, list, dict],
    typing.Awaitable[bool],
]
class AioHttpExecutor(Executor):
    """
    Executor that runs JSONRPC calls on behalf of an aiohttp request.

    The instance keeps the incoming :py:class:`aiohttp.web.Request` and an
    awaitable *can_call* hook. The hook is awaited before every call and
    the request is passed to handlers as their first argument.
    """

    def __init__(
        self,
        request: web.Request,
        can_call: TCanCall,
        namespace: str | None = None,
        codec: Codec | None = None,
    ):
        """
        Stores *request* and *can_call* on the instance.

        *namespace* and *codec* are forwarded unchanged to the base class
        initializer.
        """
        super().__init__(namespace=namespace, codec=codec)
        self.request = request
        self.can_call = can_call

    # pragma mark - Public interface

    async def list_methods(self, *args, **kwargs) -> list[str]:  # type: ignore[override]
        """
        Returns whatever the base class' ``list_methods()`` returns.

        Positional and keyword arguments are accepted and ignored.
        """
        method_names = super().list_methods()
        return method_names

    async def deserialize_data(self, request: web.Request) -> typing.Any:  # type: ignore[override]
        """
        Reads the body of *request* as text, decodes it with the executor's
        codec and returns the decoded value. If the codec returns an
        awaitable, it is awaited before returning.

        Raises :py:exc:`JSONRPCParseError` if any of these steps fails.
        """
        try:
            body = await request.text()
            decoded = self.codec.decode(body)
            # The codec may be synchronous or asynchronous.
            if inspect.isawaitable(decoded):
                decoded = await decoded
        except Exception as exception:
            LOGGER.error(
                'Unhandled exception when deserializing RPC call: %s',
                exception,
                exc_info=exception,
            )
            raise JSONRPCParseError() from exception
        else:
            return decoded

    def enrich_args(self, args: list) -> list:
        """
        Returns the arguments produced by the base class with the current
        :py:class:`aiohttp.web.Request` placed in front of them.
        """
        request = self.request
        enriched = super().enrich_args(args)
        return [request, *enriched]

    async def before_call(self, method: str, args: list, kwargs: dict):
        """
        Awaits *can_call* with the request, *method*, *args* and *kwargs*.

        Raises :py:exc:`JSONRPCAccessDeniedError` if the hook's result is
        ``False``.
        """
        verdict = await self.can_call(self.request, method, args, kwargs)

        # NOTE(review): only a literal ``False`` denies the call; any other
        # result (``None`` included) lets it through.
        if verdict is not False:
            return

        raise JSONRPCAccessDeniedError(data='can_call')

    async def execute(self) -> JSONRPCSerializer | None:  # type: ignore[override]
        """
        Executes the JSONRPC request.

        Deserializes the request body, then for every valid call awaits
        :py:meth:`before_call` followed by the call's handler and stores
        the handler's result on the call context.

        Returns an instance of :py:class:`JSONRPCSerializer` or ``None`` if
        the list of responses is empty.
        """
        with self.execute_context() as exec_ctx:
            data = await self.deserialize_data(self.request)

            for call in self.get_calls(data):
                with self.call_context(exec_ctx, call) as ctx:
                    # Calls that failed validation are not dispatched.
                    if ctx.is_valid is not True:
                        continue

                    await self.before_call(ctx.method, ctx.args, ctx.kwargs)

                    handler = ctx.handler
                    ctx.result = await handler(*ctx.args, **ctx.kwargs)

        # NOTE(review): the serializer is read after the execute context
        # has exited - confirm this placement against the upstream layout.
        return exec_ctx.serializer