You've already forked hotpocket
BTHLABS-58: Share Extension in Apple Apps
This commit is contained in:
109
services/packages/soa/hotpocket_soa/services/auth_keys.py
Normal file
109
services/packages/soa/hotpocket_soa/services/auth_keys.py
Normal file
@@ -0,0 +1,109 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
|
||||
from hotpocket_backend.apps.accounts.services import (
|
||||
AuthKeysService as BackendAuthKeysService,
|
||||
)
|
||||
from hotpocket_soa.dto.accounts import AuthKeyOut
|
||||
|
||||
from .base import ProxyService, SOAError
|
||||
|
||||
|
||||
class AuthKeysService(ProxyService):
|
||||
class AuthKeysServiceError(SOAError):
|
||||
pass
|
||||
|
||||
class AuthKeyNotFound(AuthKeysServiceError):
|
||||
pass
|
||||
|
||||
class AuthKeyAccessDenied(AuthKeysServiceError):
|
||||
pass
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.backend_auth_keys_service = BackendAuthKeysService()
|
||||
|
||||
def wrap_exception(self, exception: Exception) -> Exception:
|
||||
new_exception_args = []
|
||||
if len(exception.args) > 0:
|
||||
new_exception_args = [exception.args[0]]
|
||||
|
||||
return self.AuthKeysServiceError(*new_exception_args)
|
||||
|
||||
def _check_auth_key_access(self,
|
||||
auth_key: AuthKeyOut,
|
||||
account_uuid: uuid.UUID | None,
|
||||
) -> bool:
|
||||
if account_uuid is not None:
|
||||
return auth_key.account_uuid == account_uuid
|
||||
|
||||
return True
|
||||
|
||||
def create(self,
|
||||
*,
|
||||
account_uuid: uuid.UUID,
|
||||
) -> AuthKeyOut:
|
||||
return AuthKeyOut.model_validate(
|
||||
self.call(
|
||||
self.backend_auth_keys_service,
|
||||
'create',
|
||||
account_uuid=account_uuid,
|
||||
),
|
||||
from_attributes=True,
|
||||
)
|
||||
|
||||
def get(self,
|
||||
*,
|
||||
account_uuid: uuid.UUID,
|
||||
pk: uuid.UUID,
|
||||
) -> AuthKeyOut:
|
||||
try:
|
||||
result = AuthKeyOut.model_validate(
|
||||
self.call(
|
||||
self.backend_auth_keys_service,
|
||||
'get',
|
||||
pk=pk,
|
||||
),
|
||||
from_attributes=True,
|
||||
)
|
||||
|
||||
if self._check_auth_key_access(result, account_uuid) is False:
|
||||
raise self.AuthKeyAccessDenied(
|
||||
f'account_uuid=`{account_uuid}` pk=`{pk}`',
|
||||
)
|
||||
|
||||
return result
|
||||
except SOAError as exception:
|
||||
if isinstance(exception.__cause__, BackendAuthKeysService.AuthKeyNotFound) is True:
|
||||
raise self.AuthKeyNotFound(*exception.args) from exception
|
||||
else:
|
||||
raise
|
||||
|
||||
def get_by_key(self,
|
||||
*,
|
||||
account_uuid: uuid.UUID | None,
|
||||
key: str,
|
||||
) -> AuthKeyOut:
|
||||
try:
|
||||
result = AuthKeyOut.model_validate(
|
||||
self.call(
|
||||
self.backend_auth_keys_service,
|
||||
'get_by_key',
|
||||
key=key,
|
||||
),
|
||||
from_attributes=True,
|
||||
)
|
||||
|
||||
if self._check_auth_key_access(result, account_uuid) is False:
|
||||
raise self.AuthKeyAccessDenied(
|
||||
f'account_uuid=`{account_uuid}` key=`{key}`',
|
||||
)
|
||||
|
||||
return result
|
||||
except SOAError as exception:
|
||||
if isinstance(exception.__cause__, BackendAuthKeysService.AuthKeyNotFound) is True:
|
||||
raise self.AuthKeyNotFound(*exception.args) from exception
|
||||
else:
|
||||
raise
|
||||
Reference in New Issue
Block a user