2022-06-04 08:41:53 +00:00
|
|
|
# -*- coding: utf-8 -*-
|
2024-01-15 20:20:10 +00:00
|
|
|
# bthlabs-jsonrpc-django | (c) 2022-present Tomek Wójcik | MIT License
|
|
|
|
from __future__ import annotations
|
|
|
|
|
2022-06-04 08:41:53 +00:00
|
|
|
import typing
|
|
|
|
|
2024-01-15 20:20:10 +00:00
|
|
|
from bthlabs_jsonrpc_core import Codec, Executor, JSONRPCAccessDeniedError
|
2022-06-04 08:41:53 +00:00
|
|
|
from django.http import HttpRequest
|
|
|
|
|
|
|
|
from bthlabs_jsonrpc_django.serializer import DjangoJSONRPCSerializer
|
|
|
|
|
2024-01-15 20:20:10 +00:00
|
|
|
# Type of the ``can_call`` predicate accepted by ``DjangoExecutor``. It is
# invoked as ``can_call(request, method, args, kwargs)`` and returns a bool.
TCanCall = typing.Callable[[HttpRequest, str, list, dict], bool]
|
|
|
|
|
2022-06-04 08:41:53 +00:00
|
|
|
|
|
|
|
class DjangoExecutor(Executor):
    """
    Executor bound to a single Django request.

    Holds on to the :py:class:`django.http.HttpRequest` being handled and to
    a *can_call* predicate. The request is prepended to the arguments in
    :py:meth:`enrich_args`, and the predicate is consulted in
    :py:meth:`before_call`.
    """

    serializer = DjangoJSONRPCSerializer

    def __init__(
        self,
        request: HttpRequest,
        can_call: TCanCall,
        namespace: str | None = None,
        codec: Codec | None = None,
    ):
        """
        Set up the executor for *request*.

        :param request: the Django request being handled.
        :param can_call: predicate invoked from :py:meth:`before_call` as
            ``can_call(request, method, args, kwargs)``.
        :param namespace: forwarded unchanged to the base executor.
        :param codec: forwarded unchanged to the base executor.
        """
        super().__init__(namespace=namespace, codec=codec)
        self.request = request
        self.can_call = can_call

    # pragma mark - Public interface

    def enrich_args(self, args: list) -> list:
        """
        Return the argument list with the current
        :py:class:`django.http.HttpRequest` placed in front of whatever the
        base implementation produces for *args*.
        """
        request = self.request
        inherited_args = super().enrich_args(args)
        return [request, *inherited_args]

    def before_call(self, method: str, args: list, kwargs: dict):
        """
        Ask *can_call* whether *method* may be invoked with *args* and
        *kwargs* for the current request.

        Only an explicit ``False`` result denies the call; any other return
        value lets it proceed.

        :raises JSONRPCAccessDeniedError: when *can_call* returns ``False``.
        """
        verdict = self.can_call(self.request, method, args, kwargs)
        if verdict is not False:
            return

        raise JSONRPCAccessDeniedError(data='can_call')
|