Files
hotpocket/services/packages/soa/hotpocket_soa/services/auth_keys.py
Tomek Wójcik 8b86145519 BTHLABS-61: Service layer refactoring
A journey to fix `ValidationError` in Pocket imports turned service
layer refactoring :D
2025-10-12 20:54:00 +02:00

101 lines
2.8 KiB
Python

# -*- coding: utf-8 -*-
from __future__ import annotations
import http
import uuid
from hotpocket_backend.apps.accounts.services import (
AuthKeysService as BackendAuthKeysService,
)
from hotpocket_soa.dto.accounts import AuthKeyOut
from hotpocket_soa.exceptions.backend import NotFound
from .base import ProxyService, SOAError
class AuthKeysService(ProxyService):
    """SOA-level facade over the backend auth keys service.

    Backend results are converted to ``AuthKeyOut`` DTOs and the backend
    ``NotFound`` error is translated to this service's own ``NotFound``.
    """

    class AuthKeysServiceError(SOAError):
        """Base class for every error raised by ``AuthKeysService``."""

    # NOTE: This nested class only shadows the imported backend `NotFound`
    # inside the class namespace. Within method bodies the bare name
    # `NotFound` still resolves to the module-level (backend) exception.
    class NotFound(AuthKeysServiceError):
        """Raised when the backend reports that the auth key is missing."""

    class AccessDenied(AuthKeysServiceError):
        """Raised when the auth key belongs to a different account."""

    def __init__(self) -> None:
        super().__init__()
        self.backend_auth_keys_service = BackendAuthKeysService()

    def _check_auth_key_access(
        self,
        auth_key: AuthKeyOut,
        account_uuid: uuid.UUID | None,
    ) -> bool:
        """Return whether `account_uuid` may access `auth_key`.

        A `None` account UUID disables the ownership check, so access is
        always granted in that case.
        """
        if account_uuid is not None:
            return auth_key.account_uuid == account_uuid

        return True

    def create(
        self,
        *,
        account_uuid: uuid.UUID,
    ) -> AuthKeyOut:
        """Create an auth key for `account_uuid` via the backend service.

        Returns:
            The backend's `create` result validated into an `AuthKeyOut`.
        """
        return AuthKeyOut.model_validate(
            self.call(
                self.backend_auth_keys_service,
                'create',
                account_uuid=account_uuid,
            ),
            from_attributes=True,
        )

    def get(
        self,
        *,
        account_uuid: uuid.UUID,
        pk: uuid.UUID,
    ) -> AuthKeyOut:
        """Fetch the auth key identified by `pk`.

        Returns:
            The backend's `get` result validated into an `AuthKeyOut`.

        Raises:
            AuthKeysService.NotFound: The backend raised `NotFound`.
            AuthKeysService.AccessDenied: The key's account UUID differs
                from `account_uuid`.
        """
        try:
            result = AuthKeyOut.model_validate(
                self.call(
                    self.backend_auth_keys_service,
                    'get',
                    pk=pk,
                ),
                from_attributes=True,
            )
        except NotFound as exception:
            # Explicit chaining marks the backend error as the direct cause
            # instead of looking like a failure inside this handler.
            raise self.NotFound.from_backend_error(exception) from exception

        # The ownership check cannot raise the backend `NotFound`, so it
        # lives outside the `try` block.
        if not self._check_auth_key_access(result, account_uuid):
            raise self.AccessDenied(
                http.HTTPStatus.FORBIDDEN.value,
                f'account_uuid=`{account_uuid}` pk=`{pk}`',
            )

        return result

    def get_by_key(
        self,
        *,
        account_uuid: uuid.UUID | None,
        key: str,
    ) -> AuthKeyOut:
        """Fetch the auth key whose key string equals `key`.

        Passing `account_uuid=None` skips the ownership check.

        Returns:
            The backend's `get_by_key` result validated into an
            `AuthKeyOut`.

        Raises:
            AuthKeysService.NotFound: The backend raised `NotFound`.
            AuthKeysService.AccessDenied: `account_uuid` is given and
                differs from the key's account UUID.
        """
        try:
            result = AuthKeyOut.model_validate(
                self.call(
                    self.backend_auth_keys_service,
                    'get_by_key',
                    key=key,
                ),
                from_attributes=True,
            )
        except NotFound as exception:
            # Explicit chaining marks the backend error as the direct cause
            # instead of looking like a failure inside this handler.
            raise self.NotFound.from_backend_error(exception) from exception

        # The ownership check cannot raise the backend `NotFound`, so it
        # lives outside the `try` block.
        if not self._check_auth_key_access(result, account_uuid):
            raise self.AccessDenied(
                http.HTTPStatus.FORBIDDEN.value,
                f'account_uuid=`{account_uuid}` key=`{key}`',
            )

        return result