1
0
Fork 0
bthlabs-jsonrpc/packages/bthlabs-jsonrpc-core/bthlabs_jsonrpc_core/executor.py

398 lines
12 KiB
Python

# -*- coding: utf-8 -*-
# django-jsonrpc-core | (c) 2022-present Tomek Wójcik | MIT License
from contextlib import contextmanager
from dataclasses import dataclass
import json
import logging
import typing
from bthlabs_jsonrpc_core.exceptions import (
BaseJSONRPCError,
JSONRPCInternalError,
JSONRPCInvalidParamsError,
JSONRPCInvalidRequestError,
JSONRPCMethodNotFoundError,
JSONRPCParseError,
)
from bthlabs_jsonrpc_core.registry import MethodRegistry
from bthlabs_jsonrpc_core.serializer import JSONRPCSerializer
LOGGER = logging.getLogger('bthlabs_jsonrpc.core.executor')
class Executor:
"""
*Executor* is the main interface for the integrations. It processes the
JSONRPC request, executes the calls and returns the responses.
*namespace* will be used to look up called methods in the registry. If
omitted, it'll fall back to the default namespace.
Example:
.. code-block:: python
def rpc_handler(request):
executor = Executor()
serializer = executor.execute(request.body)
return JSONResponse(serializer.data)
"""
# pragma mark - Private class attributes
# Supported internal methods.
# These methods will be resolved and handled internally.
INTERNAL_METHODS = ('system.list_methods',)
# The method registry class to use for handler lookups.
registry = MethodRegistry
# The serializer registry class to use for response serialization.
serializer = JSONRPCSerializer
@dataclass
class CallContext:
"""
The context of a single call.
:meta private:
"""
#: Method
method: str
#: Handler
handler: typing.Callable
#: Call args
args: list[typing.Any]
#: Call kwargs
kwargs: dict
#: Call result
result: typing.Optional[typing.Any] = None
@classmethod
def invalid_context(cls):
return cls(None, None, None, None)
@property
def is_valid(self) -> bool:
"""Returns ``True`` if the context is valid."""
return all((
self.method is not None,
self.handler is not None,
self.args is not None,
self.kwargs is not None,
))
@dataclass
class ExecuteContext:
"""
The context of an execute call.
:meta private:
"""
#: List of call results.
results: list
#: The serializer instance.
serializer: typing.Optional[JSONRPCSerializer] = None
# pragma mark - Private interface
def __init__(self, namespace=None):
self.namespace = namespace or MethodRegistry.DEFAULT_NAMESPACE
def get_internal_handler(self, method: str) -> typing.Callable:
"""
Returns the internal handler for *method* or raises
``JSONRPCMethodNotFoundError``.
:meta private:
"""
match method:
case 'system.list_methods':
return self.list_methods
case _:
raise JSONRPCMethodNotFoundError()
def get_calls(self, data: typing.Union[dict, list]) -> list:
"""
Returns the list of calls.
If *data* is a list, it's returned verbatim. If it's a dict, it's
wrapped in a list.
Raises ``JSONRPCInvalidRequestError`` if the effective list of calls
is empty:
:meta private:
"""
result = list()
if isinstance(data, list):
result = data
else:
result.append(data)
if len(result) == 0:
raise JSONRPCInvalidRequestError()
return result
def get_call_spec(self,
call: typing.Any,
) -> tuple[str, typing.Callable, list, dict]:
"""
Validates and pre-processes the *call*.
Returns tuple of *method*, *handler*, *args*, *kwargs*.
:meta private:
"""
method = None
handler = None
args = []
kwargs = {}
try:
assert isinstance(call, dict), JSONRPCInvalidRequestError
assert call.get('jsonrpc', None) == '2.0', JSONRPCInvalidRequestError
method = call.get('method', None)
assert method is not None, JSONRPCInvalidRequestError
if method in self.INTERNAL_METHODS:
handler = self.get_internal_handler(method)
else:
handler = self.registry.shared_registry().get_handler(
self.namespace, method,
)
assert handler is not None, JSONRPCMethodNotFoundError
except AssertionError as exception:
klass = exception.args[0]
raise klass()
call_params = call.get('params', None)
if call_params is not None:
if isinstance(call_params, list):
args = call_params
elif isinstance(call_params, dict):
kwargs = call_params
else:
raise JSONRPCInvalidParamsError()
args = self.enrich_args(args)
kwargs = self.enrich_kwargs(kwargs)
return method, handler, args, kwargs
def process_results(self,
results: list,
) -> typing.Optional[typing.Union[list, dict]]:
"""
Post-processes the *results* and returns responses.
If *results* is a single-element list, the result is a single
response object. Otherwise, it's a list of response objects.
If the effective response is empty (e.g. all the calls were
notifications), returns ``None``.
:meta private:
"""
responses = []
for result in results:
call, call_result = result
response: dict[str, typing.Any] = {
'jsonrpc': '2.0',
}
if call is None:
response['id'] = None
response['error'] = call_result
elif call.get('id', None) is not None:
response['id'] = call['id']
if isinstance(call_result, BaseJSONRPCError):
response['error'] = call_result
else:
response['result'] = call_result
else:
continue
responses.append(response)
if len(responses) == 0:
return None
elif len(responses) == 1:
return responses[0]
return responses
@contextmanager
def call_context(self, execute_context: ExecuteContext, call: dict):
"""
The call context manager. Yields ``CallContext``, which can be
invalid invalid if there was en error processing the call.
Handles errors and the call result accordingly.
:meta private:
"""
method = None
error = None
try:
context = self.CallContext.invalid_context()
try:
method, handler, args, kwargs = self.get_call_spec(call)
context = self.CallContext(method, handler, args, kwargs)
except BaseJSONRPCError as exception:
error = exception
yield context
except Exception as exception:
if isinstance(exception, BaseJSONRPCError):
error = exception
else:
LOGGER.error(
f'Error handling RPC method: {method}!',
exc_info=exception,
)
error = JSONRPCInternalError(str(exception))
finally:
if error is not None:
execute_context.results.append((call, error))
else:
execute_context.results.append((call, context.result))
@contextmanager
def execute_context(self):
"""
The execution context. Yields ``ExecuteContext``.
Handles errors and manages the serializer post execution.
:meta private:
"""
try:
context = self.ExecuteContext([])
yield context
except Exception as exc:
if isinstance(exc, BaseJSONRPCError):
context.results = [(None, exc)]
else:
raise
responses = self.process_results(context.results)
if responses is not None:
context.serializer = self.serializer(responses)
# pragma mark - Public interface
def deserialize_data(self, data: bytes) -> typing.Any:
"""
Deserializes *data* and returns the result.
Raises :py:exc:`JSONRPCParseError` if there was an error in the process.
Subclasses should also raise this exception, so it can be resulting
response object conforms to the spec.
"""
try:
return json.loads(data)
except Exception as exception:
LOGGER.error('Error deserializing RPC call!', exc_info=exception)
raise JSONRPCParseError() from exception
def list_methods(self, *args, **kwargs) -> list[str]:
"""
The handler for ``system.list_methods`` internal method.
Returns list of methods this *Executor* can handle.
"""
result = list(self.INTERNAL_METHODS)
result.extend(MethodRegistry.shared_registry().get_methods(
self.namespace,
))
return result
def enrich_args(self, args: list) -> list:
"""
Hook for subclasses to pass additional args to the handler. The default
implementation returns the *args* verbatim.
Example:
.. code-block:: python
class ExampleExecutor(Executor):
def enrich_args(self, args):
return ['spam', *args]
"""
return [*args]
def enrich_kwargs(self, kwargs: dict) -> dict:
"""
Hook for subclasses to pass additional kwaargs to the handler.
The default implementation returns the *kwargs* verbatim.
Example:
.. code-block:: python
class ExampleExecutor(Executor):
def enrich_kwargs(self, kwargs):
return {'spam': True, **kwargs}
"""
return {**kwargs}
def before_call(self, method: str, args: list, kwargs: dict):
"""
Hook for subclasses to perform additional operations before executing
the call.
If this method raises a subclass of
:py:exc:`BaseJSONRPCError`, it'll be used to construct the response
object directly. Any other exception will be wrapped in
:py:exc:`JSONRPCInternalError`.
The default implementation does nothing.
"""
pass
def execute(self,
payload: typing.Any,
) -> typing.Optional[JSONRPCSerializer]:
"""
Executes the JSONRPC request in *payload*.
Returns an instance of :py:class:`JSONRPCSerializer` or ``None`` if
the list of responses is empty.
"""
with self.execute_context() as execute_context:
data = self.deserialize_data(payload)
calls = self.get_calls(data)
for call in calls:
with self.call_context(execute_context, call) as call_context:
if call_context.is_valid is True:
self.before_call(
call_context.method,
call_context.args,
call_context.kwargs,
)
call_context.result = call_context.handler(
*call_context.args, **call_context.kwargs,
)
return execute_context.serializer