You've already forked keep-it-secret
Release v1.0.0
This commit is contained in:
18
keep_it_secret/__init__.py
Normal file
18
keep_it_secret/__init__.py
Normal file
@@ -0,0 +1,18 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import annotations
|
||||
|
||||
from .fields import ( # noqa: F401
|
||||
AbstractField, EnvField, Field, LiteralField, SecretsField,
|
||||
)
|
||||
from .secrets import Secrets # noqa: F401
|
||||
|
||||
__version__ = '1.0.0'
|
||||
|
||||
__all__ = [
|
||||
'AbstractField',
|
||||
'EnvField',
|
||||
'Field',
|
||||
'LiteralField',
|
||||
'SecretsField',
|
||||
'Secrets',
|
||||
]
|
||||
0
keep_it_secret/ext/__init__.py
Normal file
0
keep_it_secret/ext/__init__.py
Normal file
149
keep_it_secret/ext/aws.py
Normal file
149
keep_it_secret/ext/aws.py
Normal file
@@ -0,0 +1,149 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import typing
|
||||
|
||||
import boto3
|
||||
|
||||
from keep_it_secret.fields import EnvField, Field
|
||||
from keep_it_secret.secrets import Secrets
|
||||
|
||||
|
||||
class PAWSSecretsManagerClient(typing.Protocol):
|
||||
def get_secret_value(self,
|
||||
*,
|
||||
SecretId: str,
|
||||
VersionString: str | None = None,
|
||||
VersionStage: str | None = None) -> typing.Any:
|
||||
...
|
||||
|
||||
|
||||
class AWSSecrets(Secrets):
|
||||
"""
|
||||
Concrete :py:class:`keep_it_secret.Secrets` subclass that maps environment
|
||||
variables to AWS credentials.
|
||||
"""
|
||||
|
||||
access_key_id: str | None = EnvField.new('AWS_ACCESS_KEY_ID', required=False)
|
||||
"""
|
||||
Maps ``AWS_ACCESS_KEY_ID`` environment variable. Optional, defaults to
|
||||
``None``.
|
||||
|
||||
:type: ``str | None``
|
||||
"""
|
||||
|
||||
secret_access_key: str | None = EnvField.new('AWS_SECRET_ACCESS_KEY', required=False)
|
||||
"""
|
||||
Maps ``AWS_SECRET_ACCESS_KEY`` environment variable. Optional, defaults to
|
||||
``None``.
|
||||
|
||||
:type: ``str | None``
|
||||
"""
|
||||
|
||||
session_token: str | None = EnvField.new('AWS_SESSION_TOKEN', required=False)
|
||||
"""
|
||||
Maps ``AWS_SESSION_TOKEN`` environment variable. Optional, defaults to
|
||||
``None``.
|
||||
|
||||
:type: ``str | None``
|
||||
"""
|
||||
|
||||
region_name: str | None = EnvField.new('AWS_DEFAULT_REGION', required=False)
|
||||
"""
|
||||
Maps ``AWS_DEFAULT_REGION`` environment variable. Optional, defaults to
|
||||
``None``.
|
||||
|
||||
:type: ``str | None``
|
||||
"""
|
||||
|
||||
def as_boto3_client_kwargs(self) -> dict[str, typing.Any]:
|
||||
"""
|
||||
Return representation of the mapped variables for use in
|
||||
``boto3.client()`` call.
|
||||
"""
|
||||
result = {}
|
||||
|
||||
if self.access_key_id is not None:
|
||||
result['aws_access_key_id'] = self.access_key_id
|
||||
|
||||
if self.secret_access_key is not None:
|
||||
result['aws_secret_access_key'] = self.secret_access_key
|
||||
|
||||
if self.session_token is not None:
|
||||
result['aws_session_token'] = self.session_token
|
||||
|
||||
if self.region_name is not None:
|
||||
result['region_name'] = self.region_name
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class AWSSecretsManagerField(Field):
|
||||
"""
|
||||
Concrete :py:class:`keep_it_secret.Field` subclass that uses AWS Secrets
|
||||
Manager to resolve the value.
|
||||
|
||||
:param secret_id: ID of the secret to fetch.
|
||||
:param default: Default value. Defaults to ``None``.
|
||||
:param decoder: A callable to decode the fetched value. Defaults to
|
||||
``json.loads``.
|
||||
"""
|
||||
def __init__(self,
|
||||
secret_id: str,
|
||||
default: typing.Any = None,
|
||||
decoder: typing.Callable = json.loads,
|
||||
**field_options):
|
||||
field_options['as_type'] = None
|
||||
super().__init__(**field_options)
|
||||
self.secret_id = secret_id
|
||||
self.default = default
|
||||
self.decoder = decoder
|
||||
|
||||
self.client: PAWSSecretsManagerClient | None = None
|
||||
|
||||
@classmethod
|
||||
def new(cls: type[AWSSecretsManagerField], # type: ignore[override]
|
||||
secret_id: str,
|
||||
default: typing.Any = None,
|
||||
decoder: typing.Callable = json.loads,
|
||||
**field_options) -> AWSSecretsManagerField:
|
||||
return cls(secret_id, default=default, decoder=decoder, **field_options)
|
||||
|
||||
def get_client(self, secrets: Secrets) -> PAWSSecretsManagerClient:
|
||||
if self.client is None:
|
||||
aws_secrets = typing.cast(AWSSecrets, secrets.aws) # type: ignore[attr-defined]
|
||||
|
||||
self.client = boto3.client(
|
||||
'secretsmanager',
|
||||
**aws_secrets.as_boto3_client_kwargs(),
|
||||
)
|
||||
|
||||
return self.client
|
||||
|
||||
def get_value(self, secrets: Secrets) -> typing.Any:
|
||||
"""
|
||||
Retrieve, decode and return the secret specified by *secret_id*.
|
||||
|
||||
Depends on :py:class:`AWSSecrets` to be declared in ``secrets.aws``
|
||||
field.
|
||||
|
||||
:raises DependencyMissing: Signal that ``secrets.aws`` field is
|
||||
missing.
|
||||
:raises RequiredValueMissing: Signal the field's value is required but
|
||||
*secret_id* is not present in the Secrets Manager.
|
||||
"""
|
||||
if hasattr(secrets, 'aws') is False:
|
||||
raise self.DependencyMissing('aws')
|
||||
|
||||
client = self.get_client(secrets)
|
||||
|
||||
try:
|
||||
secret = client.get_secret_value(SecretId=self.secret_id)
|
||||
|
||||
return self.decoder(secret['SecretString'])
|
||||
except client.exceptions.ResourceNotFoundException as exception: # type: ignore[attr-defined]
|
||||
if self.required is True:
|
||||
raise self.RequiredValueMissing(self.secret_id) from exception
|
||||
else:
|
||||
return self.default
|
||||
18
keep_it_secret/ext/loader.py
Normal file
18
keep_it_secret/ext/loader.py
Normal file
@@ -0,0 +1,18 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib
|
||||
import typing
|
||||
|
||||
|
||||
def load_secrets(package: str, env: str, app: str) -> typing.Any:
|
||||
"""
|
||||
A basic secrets loader. Will attempt to import the secrets module and
|
||||
return the ``__secrets__`` attribute.
|
||||
|
||||
:param package: The package which contains the module.
|
||||
:param env: Environment identifier (e.g. ``development``).
|
||||
:param app: Application identifier (e.g. ``weather_service``).
|
||||
"""
|
||||
secrets_module = importlib.import_module(f'{package}.{env}.{app}')
|
||||
return secrets_module.__secrets__
|
||||
287
keep_it_secret/fields.py
Normal file
287
keep_it_secret/fields.py
Normal file
@@ -0,0 +1,287 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import os
|
||||
import typing
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from keep_it_secret.secrets import Secrets
|
||||
|
||||
|
||||
class Field(abc.ABC):
|
||||
"""
|
||||
Base class for fields.
|
||||
|
||||
Example:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
class SpamField(Field):
|
||||
def get_value(self, secrets):
|
||||
return 'spam'
|
||||
|
||||
class AppSecrets(Secrets):
|
||||
spam: str = SpamField.new()
|
||||
|
||||
Normal instantiation and using the ``new()`` factory method are
|
||||
functionally equal. The ``new()`` method is provided for compatibility
|
||||
with type annotations.
|
||||
|
||||
.. code-block:: pycon
|
||||
|
||||
>>> spam_one = SpamField.new()
|
||||
>>> spam_two = SpamField()
|
||||
>>> spam_one(secrets) == spam_two(secrets)
|
||||
True
|
||||
|
||||
The field is evaluated when its instance is called. This is done during
|
||||
initialization of the :py:class:`keep_it_secret.Secrets` subclass in which
|
||||
the field was used. The result of this is either the value cast to
|
||||
``as_type`` (if specified) or ``None``.
|
||||
|
||||
Note that the base class doesn't enforce the ``required`` flag. Its
|
||||
behaviour is implementation specific.
|
||||
|
||||
:param as_type: Type to cast the value to. If ``None``, no casting will be
|
||||
done. Defaults to ``str``.
|
||||
:param required: Required flag. Defaults to ``True``.
|
||||
:param description: Human readable description. Defaults to ``None``.
|
||||
"""
|
||||
|
||||
class KeepItSecretFieldError(Exception):
|
||||
"""Base class for field exceptions."""
|
||||
pass
|
||||
|
||||
class RequiredValueMissing(KeepItSecretFieldError):
|
||||
"""Raised when the field's value is required but missing."""
|
||||
pass
|
||||
|
||||
class DependencyMissing(KeepItSecretFieldError):
|
||||
"""Raised when the field depends on another, which isn't defined."""
|
||||
pass
|
||||
|
||||
def __init__(self, *, as_type: type | None = str, required: bool = True, description: str | None = None):
|
||||
self.as_type = as_type
|
||||
self.required = required
|
||||
self.description = description
|
||||
self.name = str(self)
|
||||
|
||||
@classmethod
|
||||
def new(cls: type[Field], **field_options) -> typing.Any:
|
||||
"""
|
||||
The field factory. Constructs the field in a manner which is compatible
|
||||
with type annotations.
|
||||
|
||||
Positional arguments, keyword arguments and *field_options* are passed
|
||||
to the constructor.
|
||||
"""
|
||||
return cls(**field_options)
|
||||
|
||||
def __call__(self, secrets: Secrets) -> typing.Any:
|
||||
"""Evaluate the field and return the value."""
|
||||
value = self.get_value(secrets)
|
||||
|
||||
if value is None:
|
||||
return None
|
||||
|
||||
if self.as_type is not None:
|
||||
return self.as_type(value)
|
||||
|
||||
return value
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_value(self, secrets: Secrets) -> typing.Any:
|
||||
"""
|
||||
Get and return the field's value. Subclasses must implement this
|
||||
method.
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
class LiteralField(Field):
|
||||
"""
|
||||
Concrete :py:class:`keep_it_secret.Field` subclass that wraps a literal
|
||||
value.
|
||||
|
||||
Example:
|
||||
|
||||
.. code-block:: pycon
|
||||
|
||||
>>> spam = LiteralField('spam')
|
||||
>>> spam(secrets)
|
||||
'spam'
|
||||
>>> one = LiteralField(1)
|
||||
>>> one()
|
||||
>>> one(secrets)
|
||||
1
|
||||
>>> anything_works = LiteralField(RuntimeError('BOOM'))
|
||||
>>> anything_works(secrets)
|
||||
RuntimeError('BOOM')
|
||||
|
||||
:param value: The value to wrap.
|
||||
"""
|
||||
|
||||
def __init__(self, value: typing.Any, **field_options):
|
||||
field_options['as_type'] = None
|
||||
super().__init__(**field_options)
|
||||
|
||||
self.value = value
|
||||
|
||||
@classmethod
|
||||
def new(cls: type[LiteralField], value: typing.Any, **field_options) -> typing.Any: # type: ignore[override]
|
||||
return cls(value, **field_options)
|
||||
|
||||
def get_value(self, secrets: Secrets) -> typing.Any:
|
||||
"""Returns the wrapped value."""
|
||||
return self.value
|
||||
|
||||
|
||||
class EnvField(Field):
|
||||
"""
|
||||
Concrete :py:class:`keep_it_secret.Field` subclass that uses ``os.environ``
|
||||
to resolve the value.
|
||||
|
||||
Example:
|
||||
|
||||
.. code-block:: pycon
|
||||
|
||||
>>> db_password = EnvField('APP_DB_PASSWORD')
|
||||
>>> db_password(secrets)
|
||||
Traceback (most recent call last):
|
||||
File "<stdin>", line 1, in <module>
|
||||
File "/Users/bilbo/Projects/PLAYG/keep_it_secret/keep_it_secret/fields.py", line 83, in __call__
|
||||
if value is None:
|
||||
File "/Users/bilbo/Projects/PLAYG/keep_it_secret/keep_it_secret/fields.py", line 129, in get_value
|
||||
def new(cls: type[EnvField], key: str, default: typing.Any = None, **field_options) -> typing.Any:
|
||||
keep_it_secret.fields.Field.RequiredValueMissing: APP_DB_PASSWORD
|
||||
>>> os.environ['APP_DB_PASSWORD'] = 'spam'
|
||||
>>> db_password(secrets)
|
||||
'spam'
|
||||
|
||||
:param key: Environment dictionary key.
|
||||
:param default: Default value. Defaults to ``None``.
|
||||
"""
|
||||
def __init__(self, key: str, default: typing.Any = None, **field_options):
|
||||
super().__init__(**field_options)
|
||||
self.key = key
|
||||
self.default = default
|
||||
|
||||
@classmethod
|
||||
def new(cls: type[EnvField], # type: ignore[override]
|
||||
key: str,
|
||||
default: typing.Any = None,
|
||||
**field_options) -> typing.Any:
|
||||
return cls(key, default=default, **field_options)
|
||||
|
||||
def get_value(self, secrets: Secrets) -> typing.Any:
|
||||
"""
|
||||
Resolve the value using ``os.environ``.
|
||||
|
||||
:raises RequiredValueMissing: Signal the field's value is required but
|
||||
*key* is not present in the environment.
|
||||
"""
|
||||
if self.required is True and self.key not in os.environ:
|
||||
raise self.RequiredValueMissing(self.key)
|
||||
|
||||
return os.environ.get(self.key, self.default)
|
||||
|
||||
|
||||
class SecretsField(Field):
|
||||
"""
|
||||
Concrete :py:class:`keep_it_secret.Field` subclass that wraps a
|
||||
:py:class:`keep_it_secret.Secrets` subclass. Provides API to declare
|
||||
complex secret structures.
|
||||
|
||||
Example:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
class WeatherAPICredentials(Secrets):
|
||||
username: str = LiteralField('spam')
|
||||
password: str = EnvField.new('APP_WEATHER_API_PASSWORD', required=True)
|
||||
|
||||
class AppSecrets(Secrets):
|
||||
db_password: str = EnvField.new('APP_DB_PASSWORD', required=True)
|
||||
weather_api: WeatherAPICredentials = SecretsField(WeatherAPICredentials)
|
||||
|
||||
.. code-block:: pycon
|
||||
|
||||
>>> secrets = AppSecrets()
|
||||
>>> secrets.weather_api.password
|
||||
'eggs'
|
||||
>>> secrets.weather_api.__secrets_parent__ == secrets
|
||||
True
|
||||
|
||||
:param klass: :py:class:`keep_it_secret.Secrets` subclass to wrap.
|
||||
"""
|
||||
def __init__(self, klass: type[Secrets], **field_options):
|
||||
field_options['as_type'] = None
|
||||
super().__init__(**field_options)
|
||||
|
||||
self.klass = klass
|
||||
|
||||
@classmethod
|
||||
def new(cls: type[SecretsField], klass: type[Secrets], **field_options) -> typing.Any: # type: ignore[override]
|
||||
return cls(klass, **field_options)
|
||||
|
||||
def get_value(self, secrets: Secrets) -> typing.Any:
|
||||
"""
|
||||
Instantiate the wrapped *klass* and return it. *secrets* will be
|
||||
passed as ``parent`` to the instance.
|
||||
"""
|
||||
return self.klass(parent=secrets)
|
||||
|
||||
|
||||
class AbstractField(Field):
|
||||
"""
|
||||
The "placeholder" field. Use it in a :py:class:`keep_it_secret.Secrets`
|
||||
subclass to indicate that the field must be overloaded by a subclass.
|
||||
|
||||
Instances will raise :py:exc:`NotImplementedError` during evaluation.
|
||||
|
||||
Example:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
class BaseSecrets(Secrets):
|
||||
secret_key: str = AbstractField.new()
|
||||
|
||||
class DevelopmentSecrets(BaseSecrets):
|
||||
secret_key: str = LiteralField.new('thisisntsecure')
|
||||
|
||||
class ProductionSecrets(BaseSecrets):
|
||||
secret_key: str = EnvField.new('APP_SECRET_KEY', required=True)
|
||||
|
||||
Trying to instantiate ``BaseSecrets`` will fail:
|
||||
|
||||
.. code-block:: pycon
|
||||
|
||||
>>> secrets = BaseSecrets()
|
||||
Traceback (most recent call last):
|
||||
File "<stdin>", line 1, in <module>
|
||||
File "/Users/bilbo/Projects/PLAYG/keep_it_secret/keep_it_secret/secrets.py", line 105, in __init__
|
||||
self.__secrets_data__[field_name] = getattr(self, field_name)
|
||||
File "/Users/bilbo/Projects/PLAYG/keep_it_secret/keep_it_secret/secrets.py", line 27, in getter
|
||||
instance.__secrets_data__[field_name] = field(instance)
|
||||
File "/Users/bilbo/Projects/PLAYG/keep_it_secret/keep_it_secret/fields.py", line 83, in __call__
|
||||
value = self.get_value(secrets)
|
||||
File "/Users/bilbo/Projects/PLAYG/keep_it_secret/keep_it_secret/fields.py", line 171, in get_value
|
||||
raise NotImplementedError('Abstract field must be overloaded: `%s`' % self.name)
|
||||
NotImplementedError: Abstract field must be overloaded: `BaseSecrets.secret_key`
|
||||
|
||||
Instantiating ``DevelopmentSecrets`` will work as expected:
|
||||
|
||||
.. code-block:: pycon
|
||||
|
||||
>>> secrets = DevelopmentSecrets()
|
||||
>>> secrets.secret_key
|
||||
'thisisntsecure'
|
||||
"""
|
||||
|
||||
def get_value(self, secrets: Secrets) -> typing.Any:
|
||||
"""
|
||||
:raises NotImplementedError: Signal that the field needs to be
|
||||
overloaded.
|
||||
"""
|
||||
raise NotImplementedError('Abstract field must be overloaded: `%s`' % self.name)
|
||||
0
keep_it_secret/py.typed
Normal file
0
keep_it_secret/py.typed
Normal file
106
keep_it_secret/secrets.py
Normal file
106
keep_it_secret/secrets.py
Normal file
@@ -0,0 +1,106 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
|
||||
from keep_it_secret.fields import Field
|
||||
|
||||
TFields: typing.TypeAlias = dict[str, Field]
|
||||
|
||||
|
||||
def process_class_attributes(attributes) -> tuple[dict[str, typing.Any], TFields]:
|
||||
new_attributes: dict[str, typing.Any] = {}
|
||||
fields: TFields = {}
|
||||
|
||||
for attribute_name, attribute in attributes.items():
|
||||
if isinstance(attribute, Field) is True:
|
||||
fields[attribute_name] = attribute
|
||||
else:
|
||||
new_attributes[attribute_name] = attribute
|
||||
|
||||
return new_attributes, fields
|
||||
|
||||
|
||||
def field_property_factory(field_name: str, field: Field) -> property:
|
||||
def getter(instance: Secrets) -> typing.Any:
|
||||
if field_name not in instance.__secrets_data__:
|
||||
field: Field = instance.__secrets_fields__[field_name]
|
||||
instance.__secrets_data__[field_name] = field(instance)
|
||||
|
||||
return instance.__secrets_data__[field_name]
|
||||
|
||||
return property(fget=getter, doc=field.description)
|
||||
|
||||
|
||||
class SecretsBase(type):
|
||||
def __new__(cls, name, bases, attributes, **kwargs):
|
||||
super_new = super().__new__
|
||||
|
||||
fields = {}
|
||||
for base in bases:
|
||||
if isinstance(base, SecretsBase) is True:
|
||||
fields.update(base.__secrets_fields__)
|
||||
|
||||
new_attributes, cls_fields = process_class_attributes(attributes)
|
||||
|
||||
final_fields = {**fields, **cls_fields}
|
||||
new_attributes['__secrets_fields__'] = final_fields
|
||||
|
||||
new_class = super_new(cls, name, bases, new_attributes, **kwargs)
|
||||
|
||||
for field_name, field in final_fields.items():
|
||||
field.name = f'{name}.{field_name}'
|
||||
|
||||
setattr(
|
||||
new_class,
|
||||
field_name,
|
||||
field_property_factory(field_name, field),
|
||||
)
|
||||
|
||||
return new_class
|
||||
|
||||
|
||||
class Secrets(metaclass=SecretsBase):
|
||||
"""
|
||||
The base Secrets class, used to declare application-specfic secrets
|
||||
containers.
|
||||
|
||||
Example:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
class AppSecrets(Secrets):
|
||||
secret_key: str = AbstractField.new()
|
||||
db_password: str = EnvField.new('APP_DB_PASSWORD', required=True)
|
||||
|
||||
not_a_secret = 'spam'
|
||||
|
||||
def do_something(self) -> bool:
|
||||
return 'eggs'
|
||||
|
||||
When instantiated, ``AppSecrets`` will evaluate each of the fields and fill
|
||||
in instance properties with the appropriate values. Attributes which don't
|
||||
evaluate to :py:class:`Field` instances will not be modified.
|
||||
|
||||
Secrets classes retain their behaviour when they're subclassed. This allows
|
||||
the developer to compose env-specific secrets:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
class DevelopmentSecrets(AppSecrets):
|
||||
secret_key: str = LiteralField.new('thisisntsecure')
|
||||
|
||||
In this case, the ``secret_key`` field gets overloaded, while all the
|
||||
others remain as declared in ``AppSecrets``.
|
||||
|
||||
:param parent: The parent :py:class:`Secrets` subclass or ``None``.
|
||||
"""
|
||||
__secrets_fields__: TFields
|
||||
|
||||
def __init__(self, parent: Secrets | None = None):
|
||||
self.__secrets_parent__ = parent
|
||||
|
||||
self.__secrets_data__: dict[str, typing.Any] = {}
|
||||
|
||||
for field_name, field in self.__secrets_fields__.items():
|
||||
self.__secrets_data__[field_name] = getattr(self, field_name)
|
||||
Reference in New Issue
Block a user