398 lines
12 KiB
Python
398 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
# django-jsonrpc-core | (c) 2022-present Tomek Wójcik | MIT License
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass
|
|
import json
|
|
import logging
|
|
import typing
|
|
|
|
from bthlabs_jsonrpc_core.exceptions import (
|
|
BaseJSONRPCError,
|
|
JSONRPCInternalError,
|
|
JSONRPCInvalidParamsError,
|
|
JSONRPCInvalidRequestError,
|
|
JSONRPCMethodNotFoundError,
|
|
JSONRPCParseError,
|
|
)
|
|
from bthlabs_jsonrpc_core.registry import MethodRegistry
|
|
from bthlabs_jsonrpc_core.serializer import JSONRPCSerializer
|
|
|
|
LOGGER = logging.getLogger('bthlabs_jsonrpc.core.executor')
|
|
|
|
|
|
class Executor:
|
|
"""
|
|
*Executor* is the main interface for the integrations. It processes the
|
|
JSONRPC request, executes the calls and returns the responses.
|
|
|
|
*namespace* will be used to look up called methods in the registry. If
|
|
omitted, it'll fall back to the default namespace.
|
|
|
|
Example:
|
|
|
|
.. code-block:: python
|
|
|
|
def rpc_handler(request):
|
|
executor = Executor()
|
|
serializer = executor.execute(request.body)
|
|
|
|
return JSONResponse(serializer.data)
|
|
"""
|
|
|
|
# pragma mark - Private class attributes
|
|
|
|
# Supported internal methods.
|
|
# These methods will be resolved and handled internally.
|
|
INTERNAL_METHODS = ('system.list_methods',)
|
|
|
|
# The method registry class to use for handler lookups.
|
|
registry = MethodRegistry
|
|
|
|
# The serializer registry class to use for response serialization.
|
|
serializer = JSONRPCSerializer
|
|
|
|
@dataclass
|
|
class CallContext:
|
|
"""
|
|
The context of a single call.
|
|
|
|
:meta private:
|
|
"""
|
|
|
|
#: Method
|
|
method: str
|
|
|
|
#: Handler
|
|
handler: typing.Callable
|
|
|
|
#: Call args
|
|
args: list[typing.Any]
|
|
|
|
#: Call kwargs
|
|
kwargs: dict
|
|
|
|
#: Call result
|
|
result: typing.Optional[typing.Any] = None
|
|
|
|
@classmethod
|
|
def invalid_context(cls):
|
|
return cls(None, None, None, None)
|
|
|
|
@property
|
|
def is_valid(self) -> bool:
|
|
"""Returns ``True`` if the context is valid."""
|
|
return all((
|
|
self.method is not None,
|
|
self.handler is not None,
|
|
self.args is not None,
|
|
self.kwargs is not None,
|
|
))
|
|
|
|
@dataclass
|
|
class ExecuteContext:
|
|
"""
|
|
The context of an execute call.
|
|
|
|
:meta private:
|
|
"""
|
|
|
|
#: List of call results.
|
|
results: list
|
|
|
|
#: The serializer instance.
|
|
serializer: typing.Optional[JSONRPCSerializer] = None
|
|
|
|
# pragma mark - Private interface
|
|
|
|
def __init__(self, namespace=None):
|
|
self.namespace = namespace or MethodRegistry.DEFAULT_NAMESPACE
|
|
|
|
def get_internal_handler(self, method: str) -> typing.Callable:
|
|
"""
|
|
Returns the internal handler for *method* or raises
|
|
``JSONRPCMethodNotFoundError``.
|
|
|
|
:meta private:
|
|
"""
|
|
match method:
|
|
case 'system.list_methods':
|
|
return self.list_methods
|
|
|
|
case _:
|
|
raise JSONRPCMethodNotFoundError()
|
|
|
|
def get_calls(self, data: typing.Union[dict, list]) -> list:
|
|
"""
|
|
Returns the list of calls.
|
|
|
|
If *data* is a list, it's returned verbatim. If it's a dict, it's
|
|
wrapped in a list.
|
|
|
|
Raises ``JSONRPCInvalidRequestError`` if the effective list of calls
|
|
is empty:
|
|
|
|
:meta private:
|
|
"""
|
|
result = list()
|
|
if isinstance(data, list):
|
|
result = data
|
|
else:
|
|
result.append(data)
|
|
|
|
if len(result) == 0:
|
|
raise JSONRPCInvalidRequestError()
|
|
|
|
return result
|
|
|
|
def get_call_spec(self,
|
|
call: typing.Any,
|
|
) -> tuple[str, typing.Callable, list, dict]:
|
|
"""
|
|
Validates and pre-processes the *call*.
|
|
|
|
Returns tuple of *method*, *handler*, *args*, *kwargs*.
|
|
|
|
:meta private:
|
|
"""
|
|
method = None
|
|
handler = None
|
|
args = []
|
|
kwargs = {}
|
|
|
|
try:
|
|
assert isinstance(call, dict), JSONRPCInvalidRequestError
|
|
assert call.get('jsonrpc', None) == '2.0', JSONRPCInvalidRequestError
|
|
|
|
method = call.get('method', None)
|
|
assert method is not None, JSONRPCInvalidRequestError
|
|
|
|
if method in self.INTERNAL_METHODS:
|
|
handler = self.get_internal_handler(method)
|
|
else:
|
|
handler = self.registry.shared_registry().get_handler(
|
|
self.namespace, method,
|
|
)
|
|
|
|
assert handler is not None, JSONRPCMethodNotFoundError
|
|
except AssertionError as exception:
|
|
klass = exception.args[0]
|
|
raise klass()
|
|
|
|
call_params = call.get('params', None)
|
|
if call_params is not None:
|
|
if isinstance(call_params, list):
|
|
args = call_params
|
|
elif isinstance(call_params, dict):
|
|
kwargs = call_params
|
|
else:
|
|
raise JSONRPCInvalidParamsError()
|
|
|
|
args = self.enrich_args(args)
|
|
kwargs = self.enrich_kwargs(kwargs)
|
|
|
|
return method, handler, args, kwargs
|
|
|
|
def process_results(self,
|
|
results: list,
|
|
) -> typing.Optional[typing.Union[list, dict]]:
|
|
"""
|
|
Post-processes the *results* and returns responses.
|
|
|
|
If *results* is a single-element list, the result is a single
|
|
response object. Otherwise, it's a list of response objects.
|
|
|
|
If the effective response is empty (e.g. all the calls were
|
|
notifications), returns ``None``.
|
|
|
|
:meta private:
|
|
"""
|
|
responses = []
|
|
for result in results:
|
|
call, call_result = result
|
|
|
|
response: dict[str, typing.Any] = {
|
|
'jsonrpc': '2.0',
|
|
}
|
|
|
|
if call is None:
|
|
response['id'] = None
|
|
response['error'] = call_result
|
|
elif call.get('id', None) is not None:
|
|
response['id'] = call['id']
|
|
|
|
if isinstance(call_result, BaseJSONRPCError):
|
|
response['error'] = call_result
|
|
else:
|
|
response['result'] = call_result
|
|
else:
|
|
continue
|
|
|
|
responses.append(response)
|
|
|
|
if len(responses) == 0:
|
|
return None
|
|
elif len(responses) == 1:
|
|
return responses[0]
|
|
|
|
return responses
|
|
|
|
@contextmanager
|
|
def call_context(self, execute_context: ExecuteContext, call: dict):
|
|
"""
|
|
The call context manager. Yields ``CallContext``, which can be
|
|
invalid invalid if there was en error processing the call.
|
|
|
|
Handles errors and the call result accordingly.
|
|
|
|
:meta private:
|
|
"""
|
|
method = None
|
|
error = None
|
|
|
|
try:
|
|
context = self.CallContext.invalid_context()
|
|
try:
|
|
method, handler, args, kwargs = self.get_call_spec(call)
|
|
context = self.CallContext(method, handler, args, kwargs)
|
|
except BaseJSONRPCError as exception:
|
|
error = exception
|
|
|
|
yield context
|
|
except Exception as exception:
|
|
if isinstance(exception, BaseJSONRPCError):
|
|
error = exception
|
|
else:
|
|
LOGGER.error(
|
|
f'Error handling RPC method: {method}!',
|
|
exc_info=exception,
|
|
)
|
|
error = JSONRPCInternalError(str(exception))
|
|
finally:
|
|
if error is not None:
|
|
execute_context.results.append((call, error))
|
|
else:
|
|
execute_context.results.append((call, context.result))
|
|
|
|
@contextmanager
|
|
def execute_context(self):
|
|
"""
|
|
The execution context. Yields ``ExecuteContext``.
|
|
|
|
Handles errors and manages the serializer post execution.
|
|
|
|
:meta private:
|
|
"""
|
|
try:
|
|
context = self.ExecuteContext([])
|
|
yield context
|
|
except Exception as exc:
|
|
if isinstance(exc, BaseJSONRPCError):
|
|
context.results = [(None, exc)]
|
|
else:
|
|
raise
|
|
|
|
responses = self.process_results(context.results)
|
|
if responses is not None:
|
|
context.serializer = self.serializer(responses)
|
|
|
|
# pragma mark - Public interface
|
|
|
|
def deserialize_data(self, data: bytes) -> typing.Any:
|
|
"""
|
|
Deserializes *data* and returns the result.
|
|
|
|
Raises :py:exc:`JSONRPCParseError` if there was an error in the process.
|
|
Subclasses should also raise this exception, so it can be resulting
|
|
response object conforms to the spec.
|
|
"""
|
|
try:
|
|
return json.loads(data)
|
|
except Exception as exception:
|
|
LOGGER.error('Error deserializing RPC call!', exc_info=exception)
|
|
raise JSONRPCParseError() from exception
|
|
|
|
def list_methods(self, *args, **kwargs) -> list[str]:
|
|
"""
|
|
The handler for ``system.list_methods`` internal method.
|
|
|
|
Returns list of methods this *Executor* can handle.
|
|
"""
|
|
result = list(self.INTERNAL_METHODS)
|
|
result.extend(MethodRegistry.shared_registry().get_methods(
|
|
self.namespace,
|
|
))
|
|
|
|
return result
|
|
|
|
def enrich_args(self, args: list) -> list:
|
|
"""
|
|
Hook for subclasses to pass additional args to the handler. The default
|
|
implementation returns the *args* verbatim.
|
|
|
|
Example:
|
|
|
|
.. code-block:: python
|
|
|
|
class ExampleExecutor(Executor):
|
|
def enrich_args(self, args):
|
|
return ['spam', *args]
|
|
"""
|
|
return [*args]
|
|
|
|
def enrich_kwargs(self, kwargs: dict) -> dict:
|
|
"""
|
|
Hook for subclasses to pass additional kwaargs to the handler.
|
|
The default implementation returns the *kwargs* verbatim.
|
|
|
|
Example:
|
|
|
|
.. code-block:: python
|
|
|
|
class ExampleExecutor(Executor):
|
|
def enrich_kwargs(self, kwargs):
|
|
return {'spam': True, **kwargs}
|
|
"""
|
|
return {**kwargs}
|
|
|
|
def before_call(self, method: str, args: list, kwargs: dict):
|
|
"""
|
|
Hook for subclasses to perform additional operations before executing
|
|
the call.
|
|
|
|
If this method raises a subclass of
|
|
:py:exc:`BaseJSONRPCError`, it'll be used to construct the response
|
|
object directly. Any other exception will be wrapped in
|
|
:py:exc:`JSONRPCInternalError`.
|
|
|
|
The default implementation does nothing.
|
|
"""
|
|
pass
|
|
|
|
def execute(self,
|
|
payload: typing.Any,
|
|
) -> typing.Optional[JSONRPCSerializer]:
|
|
"""
|
|
Executes the JSONRPC request in *payload*.
|
|
|
|
Returns an instance of :py:class:`JSONRPCSerializer` or ``None`` if
|
|
the list of responses is empty.
|
|
"""
|
|
with self.execute_context() as execute_context:
|
|
data = self.deserialize_data(payload)
|
|
|
|
calls = self.get_calls(data)
|
|
for call in calls:
|
|
with self.call_context(execute_context, call) as call_context:
|
|
if call_context.is_valid is True:
|
|
self.before_call(
|
|
call_context.method,
|
|
call_context.args,
|
|
call_context.kwargs,
|
|
)
|
|
|
|
call_context.result = call_context.handler(
|
|
*call_context.args, **call_context.kwargs,
|
|
)
|
|
|
|
return execute_context.serializer
|