Compare commits

..

No commits in common. "release" and "v1.0.0" have entirely different histories.

24 changed files with 128 additions and 984 deletions

View File

@ -1,27 +0,0 @@
name: "CI"
on:
push:
branches:
- "release"
jobs:
run-checks:
name: "Checks"
runs-on: "ubuntu-latest"
steps:
- uses: "actions/checkout@v4"
- uses: "actions/setup-python@v5"
with:
python-version: "3.10"
- uses: "Gr1N/setup-poetry@v8"
with:
poetry-version: "1.7.1"
- name: "Install deps"
run: |
set -x
poetry install
- name: "Run CI task"
run: |
set -x
poetry run inv ci

View File

@ -1,25 +1,5 @@
## Keep It Secret Changelog ## Keep It Secret Changelog
#### v1.2.2 (2024-06-05)
* TeamCity integration for private builds.
#### v1.2.1 (2024-05-29)
* Gitea Actions integration.
* `README.md` language fixes.
#### v1.2.0 (2024-02-08)
* Hashicorp Vault integration.
#### v1.1.0 (2024-01-18)
* `Secrets.resolve_dependency()` API for resolving field dependencies.
* AWS Secrets Manager client management moved from `AWSSecretsManagerField` to
`AWSSecrets`.
* Docs tweaked.
#### v1.0.0 (2024-01-04) #### v1.0.0 (2024-01-04)
* Initial public release. * Initial public release.

View File

@ -1,88 +0,0 @@
ARG APP_USER_UID=10001
ARG APP_USER_GID=10001
ARG IMAGE_TAG=development.000000
FROM python:3.10.14-slim-bookworm AS base
ARG APP_USER_UID
ARG APP_USER_GID
ARG IMAGE_TAG
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PIP_NO_CACHE_DIR=off \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=100 \
PIP_INDEX_URL="https://nexus.bthlabs.pl/repository/pypi/simple/" \
POETRY_VERSION=1.7.1 \
POETRY_HOME="/srv/poetry" \
POETRY_NO_INTERACTION=1 \
VIRTUAL_ENV="/srv/venv" \
KEEP_IT_SECRET_IMAGE_TAG=${IMAGE_TAG}
RUN if [ ! $(getent group ${APP_USER_GID}) ];then groupadd -g ${APP_USER_GID} app; fi && \
useradd -m -d /home/app -u ${APP_USER_UID} -g ${APP_USER_GID} app && \
apt-get update && \
apt-get install -y --no-install-recommends wait-for-it dumb-init curl && \
(curl -sSL https://install.python-poetry.org | python -) && \
python3.10 -m venv ${VIRTUAL_ENV} && \
mkdir /srv/app /srv/bin /srv/lib /srv/log /srv/run && \
chown -R ${APP_USER_UID}:${APP_USER_GID} /srv
ENV PATH="${VIRTUAL_ENV}/bin:/srv/bin:/srv/poetry/bin:${PATH}"
USER app
WORKDIR /srv/app
FROM base AS development
ARG APP_USER_UID
ARG APP_USER_GID
ARG IMAGE_TAG
USER app
WORKDIR /srv/app
FROM development AS deployment-build
ARG APP_USER_UID
ARG APP_USER_GID
ARG IMAGE_TAG
ADD --chown=$APP_USER_UID:$APP_USER_GID . /srv/app
RUN poetry install --no-dev
FROM deployment-build AS ci
ARG APP_USER_UID
ARG APP_USER_GID
ARG IMAGE_TAG
RUN poetry install
FROM base AS deployment
ARG APP_USER_UID
ARG APP_USER_GID
ARG IMAGE_TAG
COPY --from=deployment-build /srv/app /srv/app
COPY --from=deployment-build /srv/venv /srv/venv
RUN chown -R $APP_USER_UID:$APP_USER_GID /srv
USER root
RUN apt-get clean autoclean && \
apt-get autoremove --yes && \
rm -rf /var/lib/apt /var/lib/dpkg && \
rm -rf /home/app/.cache
USER app
ENV PYTHONPATH="/srv/app"
ENV DJANGO_SETTINGS_MODULE="settings"
EXPOSE 8000
ENTRYPOINT ["/usr/bin/dumb-init"]
CMD ["echo NOOP"]

View File

@ -65,7 +65,7 @@ to provide secrets suitable for the development environment:
``` ```
The `ProductionSecrets` class uses environment variables and AWS Secrets The `ProductionSecrets` class uses environment variables and AWS Secrets
Manager to provide secrets suitable for the production environment: Manager to provide secrets suitable for the development environment:
``` ```
>>> production_secrets = ProductionSecrets() >>> production_secrets = ProductionSecrets()

View File

@ -11,10 +11,10 @@
project = 'Keep It Secret' project = 'Keep It Secret'
copyright = '2023-present Tomek Wójcik' copyright = '2023-present Tomek Wójcik'
author = 'Tomek Wójcik' author = 'Tomek Wójcik'
version = '1.2.2' version = '1.0.0'
# The full version, including alpha/beta/rc tags # The full version, including alpha/beta/rc tags
release = '1.2.2' release = '1.0.0'
# -- General configuration --------------------------------------------------- # -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
@ -30,8 +30,5 @@ exclude_patterns = []
# -- Options for HTML output ------------------------------------------------- # -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output # https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
html_theme = 'furo' html_theme = 'sphinx_rtd_theme'
html_static_path = ['_static'] html_static_path = ['_static']
# -- Options for autodoc -----------------------------------------------------
autodoc_member_order = 'bysource'

View File

@ -25,26 +25,6 @@ to be installed:
.. autoclass:: keep_it_secret.ext.aws.AWSSecretsManagerField .. autoclass:: keep_it_secret.ext.aws.AWSSecretsManagerField
:members: :members:
Hashicorp Vault Wrapper
-----------------------
**Installation**
Since Vault extension has external dependencies it needs to be explicitly named
to be installed:
.. code-block:: shell
$ pip install keep_it_secret[vault]
**API**
.. autoclass:: keep_it_secret.ext.vault.VaultSecrets
:members:
.. autoclass:: keep_it_secret.ext.vault.VaultKV2Field
:members:
Basic secrets loader Basic secrets loader
-------------------- --------------------

View File

@ -6,7 +6,7 @@ from .fields import ( # noqa: F401
) )
from .secrets import Secrets # noqa: F401 from .secrets import Secrets # noqa: F401
__version__ = '1.2.2' __version__ = '1.0.0'
__all__ = [ __all__ = [
'AbstractField', 'AbstractField',

View File

@ -57,11 +57,6 @@ class AWSSecrets(Secrets):
:type: ``str | None`` :type: ``str | None``
""" """
def __init__(self, parent: Secrets | None = None):
super().__init__(parent=parent)
self.client: PAWSSecretsManagerClient | None = None
def as_boto3_client_kwargs(self) -> dict[str, typing.Any]: def as_boto3_client_kwargs(self) -> dict[str, typing.Any]:
""" """
Return representation of the mapped variables for use in Return representation of the mapped variables for use in
@ -83,15 +78,6 @@ class AWSSecrets(Secrets):
return result return result
def get_client(self) -> PAWSSecretsManagerClient:
if self.client is None:
self.client = boto3.client(
'secretsmanager',
**self.as_boto3_client_kwargs(),
)
return self.client
class AWSSecretsManagerField(Field): class AWSSecretsManagerField(Field):
""" """
@ -101,7 +87,7 @@ class AWSSecretsManagerField(Field):
:param secret_id: ID of the secret to fetch. :param secret_id: ID of the secret to fetch.
:param default: Default value. Defaults to ``None``. :param default: Default value. Defaults to ``None``.
:param decoder: A callable to decode the fetched value. Defaults to :param decoder: A callable to decode the fetched value. Defaults to
:py:func:`json.loads`. ``json.loads``.
""" """
def __init__(self, def __init__(self,
secret_id: str, secret_id: str,
@ -114,6 +100,8 @@ class AWSSecretsManagerField(Field):
self.default = default self.default = default
self.decoder = decoder self.decoder = decoder
self.client: PAWSSecretsManagerClient | None = None
@classmethod @classmethod
def new(cls: type[AWSSecretsManagerField], # type: ignore[override] def new(cls: type[AWSSecretsManagerField], # type: ignore[override]
secret_id: str, secret_id: str,
@ -122,25 +110,33 @@ class AWSSecretsManagerField(Field):
**field_options) -> AWSSecretsManagerField: **field_options) -> AWSSecretsManagerField:
return cls(secret_id, default=default, decoder=decoder, **field_options) return cls(secret_id, default=default, decoder=decoder, **field_options)
def get_client(self, secrets: Secrets) -> PAWSSecretsManagerClient:
if self.client is None:
aws_secrets = typing.cast(AWSSecrets, secrets.aws) # type: ignore[attr-defined]
self.client = boto3.client(
'secretsmanager',
**aws_secrets.as_boto3_client_kwargs(),
)
return self.client
def get_value(self, secrets: Secrets) -> typing.Any: def get_value(self, secrets: Secrets) -> typing.Any:
""" """
Retrieve, decode and return the secret specified by *secret_id*. Retrieve, decode and return the secret specified by *secret_id*.
Depends on :py:class:`AWSSecrets` to be declared in ``aws`` field on Depends on :py:class:`AWSSecrets` to be declared in ``secrets.aws``
``secrets`` or one of its parents. field.
:raises DependencyMissing: Signal that ``secrets.aws`` field is :raises DependencyMissing: Signal that ``secrets.aws`` field is
missing. missing.
:raises RequiredValueMissing: Signal the field's value is required but :raises RequiredValueMissing: Signal the field's value is required but
*secret_id* is not present in the Secrets Manager. *secret_id* is not present in the Secrets Manager.
""" """
aws_secrets: AWSSecrets = secrets.resolve_dependency( if hasattr(secrets, 'aws') is False:
'aws', include_parents=True,
)
if aws_secrets is secrets.UNRESOLVED_DEPENDENCY:
raise self.DependencyMissing('aws') raise self.DependencyMissing('aws')
client = aws_secrets.get_client() client = self.get_client(secrets)
try: try:
secret = client.get_secret_value(SecretId=self.secret_id) secret = client.get_secret_value(SecretId=self.secret_id)

View File

@ -1,177 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import typing
import hvac
from keep_it_secret.fields import EnvField, Field
from keep_it_secret.secrets import Secrets
class VaultSecrets(Secrets):
"""
Concrete :py:class:`keep_it_secret.Secrets` subclass that maps environment
variables to Vault credentials.
"""
url: str = EnvField.new('VAULT_URL', required=True)
"""
Maps ``VAULT_URL`` environment variable.
:type: ``str``
"""
token: str = EnvField.new('VAULT_TOKEN', required=True)
"""
Maps ``VAULT_TOKEN`` environment variable.
:type: ``str``
"""
client_cert_path: str | None = EnvField.new('VAULT_CLIENT_CERT_PATH', required=False)
"""
Maps ``VAULT_CLIENT_CERT_PATH`` environment variable.
:type: ``str | None``
"""
client_key_path: str | None = EnvField.new('VAULT_CLIENT_KEY_PATH', required=False)
"""
Maps ``VAULT_CLIENT_KEY_PATH`` environment variable.
:type: ``str | None``
"""
server_cert_path: str | None = EnvField.new('VAULT_SERVER_CERT_PATH', required=False)
"""
Maps ``VAULT_SERVER_CERT_PATH`` environment variable.
:type: ``str | None``
"""
def __init__(self, parent: Secrets | None = None):
super().__init__(parent)
self.client: hvac.Client | None = None
def as_hvac_client_kwargs(self) -> dict[str, typing.Any]:
"""
Return representation of the mapped variables for use in
``hvac.Client`` constructor.
"""
result: dict[str, typing.Any] = {
'url': self.url,
'token': self.token,
}
if self.client_cert_path is not None and self.client_key_path is not None:
result['cert'] = (self.client_cert_path, self.client_key_path)
if self.server_cert_path is not None:
result['verify'] = self.server_cert_path
return result
def get_client(self) -> hvac.Client:
"""
Return the ``hvac.Client`` instance configured using the credentials.
"""
if self.client is None:
self.client = hvac.Client(
**self.as_hvac_client_kwargs(),
)
return self.client
class VaultKV2Field(Field):
"""
Concrete :py:class:`keep_it_secret.Field` subclass that uses Hashicorp
Vault KV V2 secrets engine to resolve the value.
If ``as_type`` isn't provided, the fetched value will be returned as-is (
i.e. as a dict). Otherwise, ``as_type`` should be a type which accepts
kwargs representing the value's keys in its constructor.
:param mount_point: Mount path for the secret engine.
:param path: Path to the secret to fetch.
:param version: Version identifier. Defaults to ``None`` (aka the newest
version).
:param default: Default value. Defaults to ``None``.
"""
def __init__(self,
mount_point: str,
path: str,
version: str | None = None,
default: typing.Any = None,
**field_options):
super().__init__(**field_options)
as_type: type | None = field_options.pop('as_type', None)
self.as_type = self.cast if as_type is not None else None # type: ignore[assignment]
self.mount_point = mount_point
self.path = path
self.version = version
self.default = default
self.cast_to = as_type
@classmethod
def new(cls: type[VaultKV2Field], # type: ignore[override]
mount_point: str,
path: str,
version: str | None = None,
default: typing.Any = None,
**field_options):
return cls(mount_point, path, version=version, default=default, **field_options)
def cast(self, data: typing.Any) -> typing.Any:
"""
Cast ``data`` to the type specified in ``as_type`` argument of the
constructor.
"""
if isinstance(data, dict) is False:
return data
if self.cast_to is None:
return data
return self.cast_to(**data)
def get_value(self, secrets: Secrets) -> typing.Any:
"""
Retrieve, decode and return the secret stored in a KV V2 secrets engine
mounted at *mount_path* under the path *path*.
Depends on :py:class:`VaultSecrets` to be declared in ``vault`` field
on ``secrets`` or one of its parents.
:raises DependencyMissing: Signal that ``secrets.aws`` field is
missing.
:raises RequiredValueMissing: Signal the field's value is required but
*secret_id* is not present in the secrets engine.
"""
vault_secrets: VaultSecrets = secrets.resolve_dependency(
'vault', include_parents=True,
)
if vault_secrets is secrets.UNRESOLVED_DEPENDENCY:
raise self.DependencyMissing('vault')
client = vault_secrets.get_client()
try:
secret_response = client.secrets.kv.v2.read_secret_version(
self.path,
version=self.version,
mount_point=self.mount_point,
raise_on_deleted_version=True,
)
return secret_response['data']['data']
except hvac.exceptions.InvalidPath as exception:
if self.required is True:
raise self.RequiredValueMissing(f'{self.mount_point}{self.path}') from exception
else:
return self.default

View File

@ -95,10 +95,6 @@ class Secrets(metaclass=SecretsBase):
:param parent: The parent :py:class:`Secrets` subclass or ``None``. :param parent: The parent :py:class:`Secrets` subclass or ``None``.
""" """
#: Sentinel for unresolved dependency.
UNRESOLVED_DEPENDENCY: list[typing.Any] = []
__secrets_fields__: TFields __secrets_fields__: TFields
def __init__(self, parent: Secrets | None = None): def __init__(self, parent: Secrets | None = None):
@ -108,24 +104,3 @@ class Secrets(metaclass=SecretsBase):
for field_name, field in self.__secrets_fields__.items(): for field_name, field in self.__secrets_fields__.items():
self.__secrets_data__[field_name] = getattr(self, field_name) self.__secrets_data__[field_name] = getattr(self, field_name)
def resolve_dependency(self,
name: str,
*,
include_parents: bool = True) -> typing.Any:
"""
Resolve a dependency field identified by *name* and return its value.
returns :py:attr:`keep_it_secret.Secrets.UNRESOLVED_DEPENDENCY` if
the value can't be resolved.
:param include_parents: Recursively include parents, if any.
"""
if name in self.__secrets_fields__:
return getattr(self, name)
if include_parents is True and self.__secrets_parent__ is not None:
return self.__secrets_parent__.resolve_dependency(
name, include_parents=include_parents,
)
return self.UNRESOLVED_DEPENDENCY

103
poetry.lock generated
View File

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. # This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
[[package]] [[package]]
name = "alabaster" name = "alabaster"
@ -43,27 +43,6 @@ files = [
[package.extras] [package.extras]
dev = ["freezegun (>=1.0,<2.0)", "pytest (>=6.0)", "pytest-cov"] dev = ["freezegun (>=1.0,<2.0)", "pytest (>=6.0)", "pytest-cov"]
[[package]]
name = "beautifulsoup4"
version = "4.12.3"
description = "Screen-scraping library"
optional = false
python-versions = ">=3.6.0"
files = [
{file = "beautifulsoup4-4.12.3-py3-none-any.whl", hash = "sha256:b80878c9f40111313e55da8ba20bdba06d8fa3969fc68304167741bbf9e082ed"},
{file = "beautifulsoup4-4.12.3.tar.gz", hash = "sha256:74e3d1928edc070d21748185c46e3fb33490f22f52a3addee9aee0f4f7781051"},
]
[package.dependencies]
soupsieve = ">1.2"
[package.extras]
cchardet = ["cchardet"]
chardet = ["chardet"]
charset-normalizer = ["charset-normalizer"]
html5lib = ["html5lib"]
lxml = ["lxml"]
[[package]] [[package]]
name = "boto3" name = "boto3"
version = "1.34.8" version = "1.34.8"
@ -412,40 +391,6 @@ files = [
[package.dependencies] [package.dependencies]
flake8 = ">=2" flake8 = ">=2"
[[package]]
name = "furo"
version = "2023.9.10"
description = "A clean customisable Sphinx documentation theme."
optional = false
python-versions = ">=3.8"
files = [
{file = "furo-2023.9.10-py3-none-any.whl", hash = "sha256:513092538537dc5c596691da06e3c370714ec99bc438680edc1debffb73e5bfc"},
{file = "furo-2023.9.10.tar.gz", hash = "sha256:5707530a476d2a63b8cad83b4f961f3739a69f4b058bcf38a03a39fa537195b2"},
]
[package.dependencies]
beautifulsoup4 = "*"
pygments = ">=2.7"
sphinx = ">=6.0,<8.0"
sphinx-basic-ng = "*"
[[package]]
name = "hvac"
version = "2.1.0"
description = "HashiCorp Vault API client"
optional = false
python-versions = ">=3.8,<4.0"
files = [
{file = "hvac-2.1.0-py3-none-any.whl", hash = "sha256:73bc91e58c3fc7c6b8107cdaca9cb71fa0a893dfd80ffbc1c14e20f24c0c29d7"},
{file = "hvac-2.1.0.tar.gz", hash = "sha256:b48bcda11a4ab0a7b6c47232c7ba7c87fda318ae2d4a7662800c465a78742894"},
]
[package.dependencies]
requests = ">=2.27.1,<3.0.0"
[package.extras]
parser = ["pyhcl (>=0.4.4,<0.5.0)"]
[[package]] [[package]]
name = "idna" name = "idna"
version = "3.6" version = "3.6"
@ -1392,17 +1337,6 @@ files = [
{file = "snowballstemmer-2.2.0.tar.gz", hash = "sha256:09b16deb8547d3412ad7b590689584cd0fe25ec8db3be37788be3810cbf19cb1"}, {file = "snowballstemmer-2.2.0.tar.gz", hash = "sha256:09b16deb8547d3412ad7b590689584cd0fe25ec8db3be37788be3810cbf19cb1"},
] ]
[[package]]
name = "soupsieve"
version = "2.5"
description = "A modern CSS selector implementation for Beautiful Soup."
optional = false
python-versions = ">=3.8"
files = [
{file = "soupsieve-2.5-py3-none-any.whl", hash = "sha256:eaa337ff55a1579b6549dc679565eac1e3d000563bcb1c8ab0d0fefbc0c2cdc7"},
{file = "soupsieve-2.5.tar.gz", hash = "sha256:5663d5a7b3bfaeee0bc4372e7fc48f9cff4940b3eec54a6451cc5299f1097690"},
]
[[package]] [[package]]
name = "sphinx" name = "sphinx"
version = "7.2.6" version = "7.2.6"
@ -1438,21 +1372,23 @@ lint = ["docutils-stubs", "flake8 (>=3.5.0)", "flake8-simplify", "isort", "mypy
test = ["cython (>=3.0)", "filelock", "html5lib", "pytest (>=4.6)", "setuptools (>=67.0)"] test = ["cython (>=3.0)", "filelock", "html5lib", "pytest (>=4.6)", "setuptools (>=67.0)"]
[[package]] [[package]]
name = "sphinx-basic-ng" name = "sphinx-rtd-theme"
version = "1.0.0b2" version = "2.0.0"
description = "A modern skeleton for Sphinx themes." description = "Read the Docs theme for Sphinx"
optional = false optional = false
python-versions = ">=3.7" python-versions = ">=3.6"
files = [ files = [
{file = "sphinx_basic_ng-1.0.0b2-py3-none-any.whl", hash = "sha256:eb09aedbabfb650607e9b4b68c9d240b90b1e1be221d6ad71d61c52e29f7932b"}, {file = "sphinx_rtd_theme-2.0.0-py2.py3-none-any.whl", hash = "sha256:ec93d0856dc280cf3aee9a4c9807c60e027c7f7b461b77aeffed682e68f0e586"},
{file = "sphinx_basic_ng-1.0.0b2.tar.gz", hash = "sha256:9ec55a47c90c8c002b5960c57492ec3021f5193cb26cebc2dc4ea226848651c9"}, {file = "sphinx_rtd_theme-2.0.0.tar.gz", hash = "sha256:bd5d7b80622406762073a04ef8fadc5f9151261563d47027de09910ce03afe6b"},
] ]
[package.dependencies] [package.dependencies]
sphinx = ">=4.0" docutils = "<0.21"
sphinx = ">=5,<8"
sphinxcontrib-jquery = ">=4,<5"
[package.extras] [package.extras]
docs = ["furo", "ipython", "myst-parser", "sphinx-copybutton", "sphinx-inline-tabs"] dev = ["bump2version", "sphinxcontrib-httpdomain", "transifex-client", "wheel"]
[[package]] [[package]]
name = "sphinxcontrib-applehelp" name = "sphinxcontrib-applehelp"
@ -1508,6 +1444,20 @@ Sphinx = ">=5"
lint = ["docutils-stubs", "flake8", "mypy"] lint = ["docutils-stubs", "flake8", "mypy"]
test = ["html5lib", "pytest"] test = ["html5lib", "pytest"]
[[package]]
name = "sphinxcontrib-jquery"
version = "4.1"
description = "Extension to include jQuery on newer Sphinx releases"
optional = false
python-versions = ">=2.7"
files = [
{file = "sphinxcontrib-jquery-4.1.tar.gz", hash = "sha256:1620739f04e36a2c779f1a131a2dfd49b2fd07351bf1968ced074365933abc7a"},
{file = "sphinxcontrib_jquery-4.1-py2.py3-none-any.whl", hash = "sha256:f936030d7d0147dd026a4f2b5a57343d233f1fc7b363f68b3d4f1cb0993878ae"},
]
[package.dependencies]
Sphinx = ">=1.8"
[[package]] [[package]]
name = "sphinxcontrib-jsmath" name = "sphinxcontrib-jsmath"
version = "1.0.1" version = "1.0.1"
@ -1709,9 +1659,8 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
[extras] [extras]
aws = ["boto3"] aws = ["boto3"]
vault = ["hvac"]
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = "^3.10" python-versions = "^3.10"
content-hash = "e4959d95b206c17013e548f015fbf288d9bc754584bfc128458cadead137a863" content-hash = "10bea99532b925eac76461f71e685103a771c5549deeaff65f9a96df9e585341"

View File

@ -1,26 +1,19 @@
[tool.poetry] [tool.poetry]
name = "keep-it-secret" name = "keep-it-secret"
version = "1.2.2" version = "1.0.0"
description = "Keep It Secret by BTHLabs" description = "Keep It Secret by BTHLabs"
authors = ["Tomek Wójcik <contact@bthlabs.pl>"] authors = ["Tomek Wójcik <contact@bthlabs.pl>"]
maintainers = ["BTHLabs <contact@bthlabs.pl>"]
license = "MIT" license = "MIT"
readme = "README.md" readme = "README.md"
homepage = "https://projects.bthlabs.pl/keep-it-secret/"
repository = "https://git.bthlabs.pl/tomekwojcik/keep-it-secret/"
documentation = "https://projects.bthlabs.pl/keep-it-secret/"
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.10" python = "^3.10"
boto3 = {version = ">=1.34.0", optional = true} boto3 = {version = ">=1.34.0", optional = true}
hvac = {version = ">=2.1.0", optional = true}
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
boto3 = "1.34.8" boto3 = "1.34.8"
flake8 = "6.1.0" flake8 = "6.1.0"
flake8-commas = "2.1.0" flake8-commas = "2.1.0"
furo = "2023.9.10"
hvac = "2.1.0"
invoke = "1.7.3" invoke = "1.7.3"
ipdb = "0.13.13" ipdb = "0.13.13"
ipython = "8.19.0" ipython = "8.19.0"
@ -30,11 +23,11 @@ pytest = "7.4.3"
pytest-env = "1.1.3" pytest-env = "1.1.3"
pytest-mock = "3.12.0" pytest-mock = "3.12.0"
sphinx = "7.2.6" sphinx = "7.2.6"
sphinx-rtd-theme = "2.0.0"
twine = "4.0.2" twine = "4.0.2"
[tool.poetry.extras] [tool.poetry.extras]
aws = ["boto3"] aws = ["boto3"]
vault = ["hvac"]
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]

View File

@ -6,15 +6,12 @@ hang-closing = False
[tool:pytest] [tool:pytest]
addopts = --disable-warnings addopts = --disable-warnings
junit_suite_name = keep_it_secret
env = env =
KEEP_IT_SECRET_TESTS_SPAM=spam KEEP_IT_SECRET_TESTS_SPAM=spam
AWS_ACCESS_KEY_ID=thisisntright AWS_ACCESS_KEY_ID=thisisntright
AWS_SECRET_ACCESS_KEY=thisisntright AWS_SECRET_ACCESS_KEY=thisisntright
AWS_SESSION_TOKEN=thisisntright AWS_SESSION_TOKEN=thisisntright
AWS_DEFAULT_REGION=eu-central-1 AWS_DEFAULT_REGION=eu-central-1
VAULT_URL=http://thisisntright:8200/
VAULT_TOKEN=thisisntright
[mypy] [mypy]
@ -24,8 +21,5 @@ ignore_missing_imports = True
[mypy-boto3.*] [mypy-boto3.*]
ignore_missing_imports = True ignore_missing_imports = True
[mypy-hvac.*]
ignore_missing_imports = True
[mypy-moto.*] [mypy-moto.*]
ignore_missing_imports = True ignore_missing_imports = True

View File

@ -1,7 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# type: ignore # type: ignore
import os
from invoke import task from invoke import task
try: try:
@ -22,25 +20,13 @@ def mypy(ctx, warn=False):
@task @task
def tests(ctx, warn=False): def tests(ctx, warn=False):
pytest_command_line = [ return ctx.run('pytest -v', warn=warn)
'pytest',
'-v',
]
if 'KEEP_IT_SECRET_JUNIT_XML_PATH' in os.environ:
pytest_command_line.append(
f"--junit-xml={os.environ['KEEP_IT_SECRET_JUNIT_XML_PATH']}",
)
return ctx.run(' '.join(pytest_command_line), warn=warn)
@task @task
def ci(ctx): def ci(ctx):
result = True result = True
ctx.run('mkdir -p build')
if flake8(ctx, warn=True).exited != 0: if flake8(ctx, warn=True).exited != 0:
result = False result = False

View File

@ -1,12 +0,0 @@
# -*- coding: utf-8 -*-
# type: ignore
from __future__ import annotations
import pytest
from .fixtures import TestingAWSSecrets
@pytest.fixture
def testing_aws_secrets() -> TestingAWSSecrets:
return TestingAWSSecrets()

View File

@ -1,10 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from keep_it_secret.ext.aws import AWSSecrets
from keep_it_secret.fields import SecretsField
from keep_it_secret.secrets import Secrets
class TestingAWSSecrets(Secrets):
aws = SecretsField.new(AWSSecrets)

View File

@ -5,31 +5,8 @@ from __future__ import annotations
import os import os
from unittest import mock from unittest import mock
import pytest
from pytest_mock import MockerFixture
from keep_it_secret.ext import aws from keep_it_secret.ext import aws
from .fixtures import TestingAWSSecrets
@pytest.fixture
def mock_boto3_client(mocker: MockerFixture) -> mock.Mock:
return mocker.patch.object(aws.boto3, 'client')
@pytest.fixture
def aws_secrets_manager_client() -> mock.Mock:
return mock.Mock()
def test_init():
# When
result = aws.AWSSecrets()
# Then
result.client is None
@mock.patch.dict( @mock.patch.dict(
os.environ, os.environ,
@ -66,40 +43,3 @@ def test_as_boto3_client_kwargs_empty():
# Then # Then
assert result == {} assert result == {}
def test_get_client_cache_miss(mock_boto3_client: mock.Mock,
aws_secrets_manager_client: mock.Mock,
testing_aws_secrets: TestingAWSSecrets):
# Given
mock_boto3_client.return_value = aws_secrets_manager_client
field = aws.AWSSecrets()
# When
result = field.get_client()
# Then
assert result == aws_secrets_manager_client
assert field.client == aws_secrets_manager_client
mock_boto3_client.assert_called_once_with(
'secretsmanager', **testing_aws_secrets.aws.as_boto3_client_kwargs(),
)
def test_get_client_cache_hit(mock_boto3_client: mock.Mock,
aws_secrets_manager_client: mock.Mock,
testing_aws_secrets: TestingAWSSecrets):
# Given
field = aws.AWSSecrets()
field.client = aws_secrets_manager_client
# When
result = field.get_client()
# Then
assert result == aws_secrets_manager_client
mock_boto3_client.assert_not_called()

View File

@ -4,6 +4,7 @@ from __future__ import annotations
import datetime import datetime
import json import json
from unittest import mock
import boto3 import boto3
import moto import moto
@ -14,6 +15,25 @@ from keep_it_secret.ext import aws
from keep_it_secret.secrets import Secrets from keep_it_secret.secrets import Secrets
class TestingAWSSecrets(Secrets):
aws = aws.AWSSecrets()
@pytest.fixture
def aws_secrets_manager_client() -> mock.Mock:
return mock.Mock(spec=['get_secret_value'])
@pytest.fixture
def mock_boto3_client(mocker: MockerFixture) -> mock.Mock:
return mocker.patch.object(aws.boto3, 'client')
@pytest.fixture
def testing_aws_secrets() -> TestingAWSSecrets:
return TestingAWSSecrets()
def test_init(): def test_init():
# When # When
result = aws.AWSSecretsManagerField('keep_it_secret/tests/spam') result = aws.AWSSecretsManagerField('keep_it_secret/tests/spam')
@ -22,6 +42,7 @@ def test_init():
assert result.secret_id == 'keep_it_secret/tests/spam' assert result.secret_id == 'keep_it_secret/tests/spam'
assert result.default is None assert result.default is None
assert result.decoder == json.loads assert result.decoder == json.loads
assert result.client is None
def test_init_with_default(): def test_init_with_default():
@ -86,11 +107,53 @@ def test_new(mocker: MockerFixture):
) )
def test_get_client_cache_miss(mock_boto3_client: mock.Mock,
aws_secrets_manager_client: mock.Mock,
testing_aws_secrets: TestingAWSSecrets):
# Given
mock_boto3_client.return_value = aws_secrets_manager_client
field = aws.AWSSecretsManagerField('keep_it_secret/tests/spam')
# When
result = field.get_client(testing_aws_secrets)
# Then
assert result == aws_secrets_manager_client
assert field.client == aws_secrets_manager_client
mock_boto3_client.assert_called_once_with(
'secretsmanager', **testing_aws_secrets.aws.as_boto3_client_kwargs(),
)
def test_get_client_cache_hit(mock_boto3_client: mock.Mock,
aws_secrets_manager_client: mock.Mock,
testing_aws_secrets: TestingAWSSecrets):
# Given
field = aws.AWSSecretsManagerField('keep_it_secret/tests/spam')
field.client = aws_secrets_manager_client
# When
result = field.get_client(testing_aws_secrets)
# Then
assert result == aws_secrets_manager_client
mock_boto3_client.assert_not_called()
def test_get_value_aws_dependency_missing(mocker: MockerFixture, def test_get_value_aws_dependency_missing(mocker: MockerFixture,
aws_secrets_manager_client: mock.Mock,
testing_secrets: Secrets): testing_secrets: Secrets):
# Given # Given
field = aws.AWSSecretsManagerField('keep_it_secret/tests/spam') field = aws.AWSSecretsManagerField('keep_it_secret/tests/spam')
mock_field_get_client = mocker.patch.object(
field, 'get_client', return_value=aws_secrets_manager_client,
)
with pytest.raises(field.DependencyMissing) as exception_info: with pytest.raises(field.DependencyMissing) as exception_info:
# When # When
_ = field(testing_secrets) _ = field(testing_secrets)
@ -98,6 +161,10 @@ def test_get_value_aws_dependency_missing(mocker: MockerFixture,
# Then # Then
assert exception_info.value.args[0] == 'aws' assert exception_info.value.args[0] == 'aws'
mock_field_get_client.assert_not_called()
aws_secrets_manager_client.get_secret_value.assert_not_called()
@moto.mock_secretsmanager @moto.mock_secretsmanager
def test_get_value_required_value_not_found(testing_aws_secrets: Secrets): def test_get_value_required_value_not_found(testing_aws_secrets: Secrets):

View File

@ -1,12 +0,0 @@
# -*- coding: utf-8 -*-
# type: ignore
from __future__ import annotations
import pytest
from .fixtures import TestingVaultSecrets
@pytest.fixture
def testing_vault_secrets() -> TestingVaultSecrets:
return TestingVaultSecrets()

View File

@ -1,13 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
# -*- coding: utf-8 -*-
from __future__ import annotations
from keep_it_secret.ext.vault import VaultSecrets
from keep_it_secret.fields import SecretsField
from keep_it_secret.secrets import Secrets
class TestingVaultSecrets(Secrets):
vault = SecretsField.new(VaultSecrets)

View File

@ -1,212 +0,0 @@
# -*- coding: utf-8 -*-
# type: ignore
from __future__ import annotations
from unittest import mock
import hvac
import pytest
from pytest_mock import MockerFixture
from keep_it_secret import Secrets
from keep_it_secret.ext import vault
@pytest.fixture
def field() -> vault.VaultKV2Field:
return vault.VaultKV2Field('keep_it_secret/', 'tests/spam')
@pytest.fixture
def as_type() -> mock.Mock:
return mock.Mock()
@pytest.fixture
def field_with_as_type(as_type: mock.Mock) -> vault.VaultKV2Field:
return vault.VaultKV2Field('keep_it_secret/', 'tests/spam', as_type=as_type)
@pytest.fixture
def hvac_kv_v2_client(mocker: MockerFixture) -> mock.Mock:
result = mock.Mock()
result.secrets.kv.v2 = mock.Mock(spec=['read_secret_version'])
return result
def test_init():
# When
result = vault.VaultKV2Field('keep_it_secret/', 'tests/spam')
# Then
assert result.mount_point == 'keep_it_secret/'
assert result.path == 'tests/spam'
assert result.version is None
assert result.default is None
assert result.cast_to is None
assert result.as_type is None
def test_init_with_version():
# When
result = vault.VaultKV2Field(
'keep_it_secret/', 'tests/spam', version='1',
)
# Then
assert result.version == '1'
def test_init_with_default():
# When
result = vault.VaultKV2Field(
'keep_it_secret/', 'tests/spam', default='eggs',
)
# Then
assert result.default == 'eggs'
def test_init_with_as_type():
# When
result = vault.VaultKV2Field(
'keep_it_secret/', 'tests/spam', as_type=dict,
)
# Then
assert result.as_type == result.cast
assert result.cast_to == dict
def test_init_with_field_options():
# When
result = vault.VaultKV2Field(
'keep_it_secret/', 'tests/spam', required=False, description='eggs',
)
# Then
assert result.required is False
assert result.description == 'eggs'
def test_new(mocker: MockerFixture):
# Given
mock_init = mocker.patch.object(
vault.VaultKV2Field, '__init__', return_value=None,
)
# When
_ = vault.VaultKV2Field.new(
'keep_it_secret/',
'tests/spam',
version='1',
default=dict,
as_type=dict,
required=False,
description='eggs',
)
# Then
mock_init.assert_called_once_with(
'keep_it_secret/',
'tests/spam',
version='1',
default=dict,
as_type=dict,
required=False,
description='eggs',
)
def test_cast_not_dict(field: vault.VaultKV2Field):
# When
result = field.cast('spam')
# Then
assert result == 'spam'
def test_cast(field_with_as_type: vault.VaultKV2Field, as_type: mock.Mock):
# Given
as_type.return_value = 'spam'
# When
result = field_with_as_type.cast({
'spam': True,
'eggs': False,
})
# Then
assert result == 'spam'
as_type.assert_called_once_with(
spam=True,
eggs=False,
)
def test_get_value_vault_dependency_missing(field: vault.VaultKV2Field,
testing_secrets: Secrets):
# Given
with pytest.raises(field.DependencyMissing) as exception_info:
# When
_ = field(testing_secrets)
# Then
assert exception_info.value.args[0] == 'vault'
def test_get_value_required_value_not_found(testing_vault_secrets: Secrets,
hvac_kv_v2_client: mock.Mock,
field: vault.VaultKV2Field):
# Given
testing_vault_secrets.vault.client = hvac_kv_v2_client
hvac_kv_v2_client.secrets.kv.v2.read_secret_version.side_effect = hvac.exceptions.InvalidPath()
with pytest.raises(field.RequiredValueMissing) as exception_info:
# When
_ = field(testing_vault_secrets)
# Then
assert exception_info.value.args[0] == 'keep_it_secret/tests/spam'
def test_get_value_not_found_not_required(testing_vault_secrets: Secrets,
hvac_kv_v2_client: mock.Mock,
field: vault.VaultKV2Field):
# Given
testing_vault_secrets.vault.client = hvac_kv_v2_client
hvac_kv_v2_client.secrets.kv.v2.read_secret_version.side_effect = hvac.exceptions.InvalidPath()
field = vault.VaultKV2Field(
'keep_it_secret/', 'tests/spam', required=False,
)
result = field(testing_vault_secrets)
# Then
assert result is None
def test_get_value(testing_vault_secrets: Secrets,
hvac_kv_v2_client: mock.Mock,
field: vault.VaultKV2Field):
# Given
testing_vault_secrets.vault.client = hvac_kv_v2_client
hvac_kv_v2_client.secrets.kv.v2.read_secret_version.return_value = {
'data': {
'data': {
'spam': True,
},
},
}
# When
result = field(testing_vault_secrets)
# Then
assert result == {'spam': True}

View File

@ -1,109 +0,0 @@
# -*- coding: utf-8 -*-
# type: ignore
from __future__ import annotations
import os
from unittest import mock
import pytest
from pytest_mock import MockerFixture
from keep_it_secret.ext import vault
@pytest.fixture
def mock_hvac_client(mocker: MockerFixture) -> mock.Mock:
return mocker.patch.object(vault.hvac, 'Client')
@pytest.fixture
def hvac_client() -> mock.Mock:
return mock.Mock()
def test_init():
# When
result = vault.VaultSecrets()
# Then
assert result.client is None
@mock.patch.dict(
os.environ,
{
'VAULT_URL': 'https://vault.work/',
'VAULT_TOKEN': 'test_vault_token',
'VAULT_CLIENT_CERT_PATH': '/tmp/vault_client_cert.pem',
'VAULT_CLIENT_KEY_PATH': '/tmp/vault_client_key.pem',
'VAULT_SERVER_CERT_PATH': '/tmp/vault_server_cert.pem',
},
)
def test_as_hvac_client_kwargs():
# Given
secrets = vault.VaultSecrets()
# When
result = secrets.as_hvac_client_kwargs()
# Then
assert result == {
'url': 'https://vault.work/',
'token': 'test_vault_token',
'cert': ('/tmp/vault_client_cert.pem', '/tmp/vault_client_key.pem'),
'verify': '/tmp/vault_server_cert.pem',
}
@mock.patch.dict(
os.environ,
{
'VAULT_URL': 'https://vault.work/',
'VAULT_TOKEN': 'test_vault_token',
},
)
def test_as_hvac_client_kwargs_without_optional_fields():
# Given
secrets = vault.VaultSecrets()
# When
result = secrets.as_hvac_client_kwargs()
# Then
assert result == {
'url': 'https://vault.work/',
'token': 'test_vault_token',
}
def test_get_client_cache_miss(mock_hvac_client: mock.Mock,
hvac_client: mock.Mock):
# Given
mock_hvac_client.return_value = hvac_client
secrets = vault.VaultSecrets()
# When
result = secrets.get_client()
# Then
assert result == hvac_client
assert secrets.client == hvac_client
mock_hvac_client.assert_called_once_with(**secrets.as_hvac_client_kwargs())
def test_get_client_cache_hit(mock_hvac_client: mock.Mock,
hvac_client: mock.Mock):
# Given
secrets = vault.VaultSecrets()
secrets.client = hvac_client
# When
result = secrets.get_client()
# Then
assert result == hvac_client
mock_hvac_client.assert_not_called()

View File

@ -1,15 +1,14 @@
# -*- coding: utf-8 -*-x # -*- coding: utf-8 -*-
# type: ignore # type: ignore
from __future__ import annotations from __future__ import annotations
from keep_it_secret import secrets from keep_it_secret import secrets
from keep_it_secret.fields import LiteralField
from tests.fixtures import TestingSecrets from tests.fixtures import TestingSecrets
class NestedSecrets(secrets.Secrets): class ParentSecrets(secrets.Secrets):
spameggs: str = LiteralField.new('spameggs') pass
def test_init(): def test_init():
@ -22,12 +21,15 @@ def test_init():
assert result.__secrets_data__ == {'spam': 'spam', 'eggs': 'eggs'} assert result.__secrets_data__ == {'spam': 'spam', 'eggs': 'eggs'}
def test_init_with_parent(testing_secrets: TestingSecrets): def test_init_with_parent():
# Given
parent_secrets = ParentSecrets()
# When # When
result = NestedSecrets(parent=testing_secrets) result = TestingSecrets(parent=parent_secrets)
# Then # Then
assert result.__secrets_parent__ is testing_secrets assert result.__secrets_parent__ is parent_secrets
def test_field_property(testing_secrets: TestingSecrets): def test_field_property(testing_secrets: TestingSecrets):
@ -36,58 +38,3 @@ def test_field_property(testing_secrets: TestingSecrets):
# Then # Then
assert result == testing_secrets.__secrets_data__['spam'] assert result == testing_secrets.__secrets_data__['spam']
def test_resolve_dependency_from_self():
# Given
secrets = NestedSecrets()
# When
result = secrets.resolve_dependency('spameggs')
# Then
assert result == secrets.spameggs
def test_resolve_dependency_from_parent(testing_secrets: TestingSecrets):
# Given
secrets = NestedSecrets(parent=testing_secrets)
# When
result = secrets.resolve_dependency('spam')
# Then
assert result == testing_secrets.spam
def test_resolve_dependency_unresolved_from_self():
# Given
secrets = NestedSecrets()
# When
result = secrets.resolve_dependency('thisisntright')
# Then
assert result is secrets.UNRESOLVED_DEPENDENCY
def test_resolve_dependency_unresolved_with_parent(testing_secrets: TestingSecrets):
# Given
secrets = NestedSecrets(parent=testing_secrets)
# When
result = secrets.resolve_dependency('thisisntright')
# Then
assert result is secrets.UNRESOLVED_DEPENDENCY
def test_resolve_dependency_unresolved_dont_include_parents(testing_secrets: TestingSecrets):
# Given
secrets = NestedSecrets(parent=testing_secrets)
# When
result = secrets.resolve_dependency('spam', include_parents=False)
# Then
assert result is secrets.UNRESOLVED_DEPENDENCY