You've already forked hotpocket
96 lines
2.6 KiB
Python
96 lines
2.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
import logging
|
|
import uuid
|
|
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils.timezone import now
|
|
import uuid6
|
|
|
|
from hotpocket_backend.apps.accounts.models import AuthKey
|
|
from hotpocket_backend.apps.core.conf import settings
|
|
from hotpocket_soa.exceptions.backend import (
|
|
InternalError,
|
|
Invalid as InvalidError,
|
|
NotFound as NotFoundError,
|
|
)
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class AuthKeysService:
|
|
class AuthKeysServiceError(Exception):
|
|
pass
|
|
|
|
class Invalid(InvalidError, AuthKeysServiceError):
|
|
pass
|
|
|
|
class NotFound(NotFoundError, AuthKeysServiceError):
|
|
pass
|
|
|
|
class Expired(InternalError, AuthKeysServiceError):
|
|
pass
|
|
|
|
def create(self, *, account_uuid: uuid.UUID) -> AuthKey:
|
|
try:
|
|
key = str(uuid6.uuid7())
|
|
|
|
return AuthKey.objects.create(
|
|
account_uuid=account_uuid,
|
|
key=key,
|
|
)
|
|
except ValidationError as exception:
|
|
raise self.Invalid.from_django_validation_error(exception)
|
|
|
|
def get(self, *, pk: uuid.UUID) -> AuthKey:
|
|
try:
|
|
query_set = AuthKey.active_objects
|
|
|
|
return query_set.get(pk=pk)
|
|
except AuthKey.DoesNotExist as exception:
|
|
raise self.NotFound(
|
|
f'Auth Key not found: pk=`{pk}`',
|
|
) from exception
|
|
|
|
def get_by_key(self, *, key: str, ttl: int | None = None) -> AuthKey:
|
|
try:
|
|
query_set = AuthKey.active_objects
|
|
|
|
result = query_set.get(key=key)
|
|
|
|
if ttl is None:
|
|
ttl = settings.AUTH_KEY_TTL
|
|
|
|
if ttl > 0:
|
|
if result.created_at < now() - datetime.timedelta(seconds=ttl):
|
|
raise self.Expired(
|
|
f'Auth Key expired: pk=`{key}`',
|
|
)
|
|
|
|
if result.consumed_at is not None:
|
|
raise self.Expired(
|
|
f'Auth Key already consumed: pk=`{key}`',
|
|
)
|
|
|
|
return result
|
|
except AuthKey.DoesNotExist as exception:
|
|
raise self.NotFound(
|
|
f'Auth Key not found: key=`{key}`',
|
|
) from exception
|
|
|
|
def clean_expired_auth_keys(self) -> int:
|
|
current_timestamp = now()
|
|
cutoff_timestamp = current_timestamp - datetime.timedelta(
|
|
seconds=(settings.AUTH_KEY_TTL + 5),
|
|
)
|
|
|
|
deleted, _ = AuthKey.active_objects.\
|
|
filter(
|
|
created_at__lte=cutoff_timestamp,
|
|
).\
|
|
delete()
|
|
|
|
return deleted
|