keep-it-secret/keep_it_secret/ext/vault.py
2025-10-24 11:12:23 +00:00

221 lines
6.4 KiB
Python

# -*- coding: utf-8 -*-
from __future__ import annotations
import typing
import hvac
from keep_it_secret.fields import EnvField, Field
from keep_it_secret.secrets import Secrets
class BaseVaultSecrets(Secrets):
"""
Base :py:class:`keep_it_secret.Secrets` subclass for Vault-base secrets.
"""
url: str = EnvField.new('VAULT_URL', required=True)
"""
Maps ``VAULT_URL`` environment variable.
:type: ``str``
"""
client_cert_path: str | None = EnvField.new('VAULT_CLIENT_CERT_PATH', required=False)
"""
Maps ``VAULT_CLIENT_CERT_PATH`` environment variable.
:type: ``str | None``
"""
client_key_path: str | None = EnvField.new('VAULT_CLIENT_KEY_PATH', required=False)
"""
Maps ``VAULT_CLIENT_KEY_PATH`` environment variable.
:type: ``str | None``
"""
server_cert_path: str | None = EnvField.new('VAULT_SERVER_CERT_PATH', required=False)
"""
Maps ``VAULT_SERVER_CERT_PATH`` environment variable.
:type: ``str | None``
"""
def __init__(self, parent: Secrets | None = None):
super().__init__(parent)
self.client: hvac.Client | None = None
def as_hvac_client_kwargs(self) -> dict[str, typing.Any]:
"""
Return representation of the mapped variables for use in
``hvac.Client`` constructor.
"""
result: dict[str, typing.Any] = {
'url': self.url,
}
if self.client_cert_path is not None and self.client_key_path is not None:
result['cert'] = (self.client_cert_path, self.client_key_path)
if self.server_cert_path is not None:
result['verify'] = self.server_cert_path
return result
def get_client(self) -> hvac.Client:
"""
Return the ``hvac.Client`` instance configured using the credentials.
"""
if self.client is None:
self.client = hvac.Client(
**self.as_hvac_client_kwargs(),
)
return self.client
class VaultSecrets(BaseVaultSecrets):
"""
Concrete :py:class:`BaseVaultSecrets` subclass that uses token to
authenticate with Vault.
"""
token: str = EnvField.new('VAULT_TOKEN', required=True)
"""
Maps ``VAULT_TOKEN`` environment variable.
:type: ``str``
"""
def as_hvac_client_kwargs(self) -> dict[str, typing.Any]:
result = super().as_hvac_client_kwargs()
result['token'] = self.token
return result
class AppRoleVaultSecrets(BaseVaultSecrets):
"""
Concrete :py:class:`BaseVaultSecrets` subclass that uses app role to
authenticate with Vault.
"""
role_id: str = EnvField.new('VAULT_ROLE_ID', required=True)
"""
Maps ``VAULT_ROLE_ID`` environment variable.
:type: ``str``
"""
secret_id: str = EnvField.new('VAULT_SECRET_ID', required=True)
"""
Maps ``VAULT_SECRET_ID`` environment variable.
:type: ``str``
"""
def get_client(self) -> hvac.Client:
if self.client is None:
super().get_client()
self.client.auth.approle.login( # type: ignore[attr-defined]
role_id=self.role_id,
secret_id=self.secret_id,
)
return self.client
class VaultKV2Field(Field):
"""
Concrete :py:class:`keep_it_secret.Field` subclass that uses Hashicorp
Vault KV V2 secrets engine to resolve the value.
If ``as_type`` isn't provided, the fetched value will be returned as-is (
i.e. as a dict). Otherwise, ``as_type`` should be a type which accepts
kwargs representing the value's keys in its constructor.
:param mount_point: Mount path for the secret engine.
:param path: Path to the secret to fetch.
:param version: Version identifier. Defaults to ``None`` (aka the newest
version).
:param default: Default value. Defaults to ``None``.
"""
def __init__(self,
mount_point: str,
path: str,
version: str | None = None,
default: typing.Any = None,
**field_options):
super().__init__(**field_options)
as_type: type | None = field_options.pop('as_type', None)
self.as_type = self.cast if as_type is not None else None # type: ignore[assignment]
self.mount_point = mount_point
self.path = path
self.version = version
self.default = default
self.cast_to = as_type
@classmethod
def new(cls: type[VaultKV2Field], # type: ignore[override]
mount_point: str,
path: str,
version: str | None = None,
default: typing.Any = None,
**field_options):
return cls(mount_point, path, version=version, default=default, **field_options)
def cast(self, data: typing.Any) -> typing.Any:
"""
Cast ``data`` to the type specified in ``as_type`` argument of the
constructor.
"""
if isinstance(data, dict) is False:
return data
if self.cast_to is None:
return data
return self.cast_to(**data)
def get_value(self, secrets: Secrets) -> typing.Any:
"""
Retrieve, decode and return the secret stored in a KV V2 secrets engine
mounted at *mount_path* under the path *path*.
Depends on :py:class:`VaultSecrets` to be declared in ``vault`` field
on ``secrets`` or one of its parents.
:raises DependencyMissing: Signal that ``secrets.aws`` field is
missing.
:raises RequiredValueMissing: Signal the field's value is required but
*secret_id* is not present in the secrets engine.
"""
vault_secrets: VaultSecrets = secrets.resolve_dependency(
'vault', include_parents=True,
)
if vault_secrets is secrets.UNRESOLVED_DEPENDENCY:
raise self.DependencyMissing('vault')
client = vault_secrets.get_client()
try:
secret_response = client.secrets.kv.v2.read_secret_version(
self.path,
version=self.version,
mount_point=self.mount_point,
raise_on_deleted_version=True,
)
return secret_response['data']['data']
except hvac.exceptions.InvalidPath as exception:
if self.required is True:
raise self.RequiredValueMissing(f'{self.mount_point}{self.path}') from exception
else:
return self.default