keep-it-secret/keep_it_secret/ext/vault.py

178 lines
5.4 KiB
Python
Raw Permalink Normal View History

2024-02-08 20:52:08 +00:00
# -*- coding: utf-8 -*-
from __future__ import annotations
import typing
import hvac
from keep_it_secret.fields import EnvField, Field
from keep_it_secret.secrets import Secrets
class VaultSecrets(Secrets):
"""
Concrete :py:class:`keep_it_secret.Secrets` subclass that maps environment
variables to Vault credentials.
"""
url: str = EnvField.new('VAULT_URL', required=True)
"""
Maps ``VAULT_URL`` environment variable.
:type: ``str``
"""
token: str = EnvField.new('VAULT_TOKEN', required=True)
"""
Maps ``VAULT_TOKEN`` environment variable.
:type: ``str``
"""
client_cert_path: str | None = EnvField.new('VAULT_CLIENT_CERT_PATH', required=False)
"""
Maps ``VAULT_CLIENT_CERT_PATH`` environment variable.
:type: ``str | None``
"""
client_key_path: str | None = EnvField.new('VAULT_CLIENT_KEY_PATH', required=False)
"""
Maps ``VAULT_CLIENT_KEY_PATH`` environment variable.
:type: ``str | None``
"""
server_cert_path: str | None = EnvField.new('VAULT_SERVER_CERT_PATH', required=False)
"""
Maps ``VAULT_SERVER_CERT_PATH`` environment variable.
:type: ``str | None``
"""
def __init__(self, parent: Secrets | None = None):
super().__init__(parent)
self.client: hvac.Client | None = None
def as_hvac_client_kwargs(self) -> dict[str, typing.Any]:
"""
Return representation of the mapped variables for use in
``hvac.Client`` constructor.
"""
result: dict[str, typing.Any] = {
'url': self.url,
'token': self.token,
}
if self.client_cert_path is not None and self.client_key_path is not None:
result['cert'] = (self.client_cert_path, self.client_key_path)
if self.server_cert_path is not None:
result['verify'] = self.server_cert_path
return result
def get_client(self) -> hvac.Client:
"""
Return the ``hvac.Client`` instance configured using the credentials.
"""
if self.client is None:
self.client = hvac.Client(
**self.as_hvac_client_kwargs(),
)
return self.client
class VaultKV2Field(Field):
"""
Concrete :py:class:`keep_it_secret.Field` subclass that uses Hashicorp
Vault KV V2 secrets engine to resolve the value.
If ``as_type`` isn't provided, the fetched value will be returned as-is (
i.e. as a dict). Otherwise, ``as_type`` should be a type which accepts
kwargs representing the value's keys in its constructor.
:param mount_point: Mount path for the secret engine.
:param path: Path to the secret to fetch.
:param version: Version identifier. Defaults to ``None`` (aka the newest
version).
:param default: Default value. Defaults to ``None``.
"""
def __init__(self,
mount_point: str,
path: str,
version: str | None = None,
default: typing.Any = None,
**field_options):
super().__init__(**field_options)
as_type: type | None = field_options.pop('as_type', None)
self.as_type = self.cast if as_type is not None else None # type: ignore[assignment]
self.mount_point = mount_point
self.path = path
self.version = version
self.default = default
self.cast_to = as_type
@classmethod
def new(cls: type[VaultKV2Field], # type: ignore[override]
mount_point: str,
path: str,
version: str | None = None,
default: typing.Any = None,
**field_options):
return cls(mount_point, path, version=version, default=default, **field_options)
def cast(self, data: typing.Any) -> typing.Any:
"""
Cast ``data`` to the type specified in ``as_type`` argument of the
constructor.
"""
if isinstance(data, dict) is False:
return data
if self.cast_to is None:
return data
return self.cast_to(**data)
def get_value(self, secrets: Secrets) -> typing.Any:
"""
Retrieve, decode and return the secret stored in a KV V2 secrets engine
mounted at *mount_path* under the path *path*.
Depends on :py:class:`VaultSecrets` to be declared in ``vault`` field
on ``secrets`` or one of its parents.
:raises DependencyMissing: Signal that ``secrets.aws`` field is
missing.
:raises RequiredValueMissing: Signal the field's value is required but
*secret_id* is not present in the secrets engine.
"""
vault_secrets: VaultSecrets = secrets.resolve_dependency(
'vault', include_parents=True,
)
if vault_secrets is secrets.UNRESOLVED_DEPENDENCY:
raise self.DependencyMissing('vault')
client = vault_secrets.get_client()
try:
secret_response = client.secrets.kv.v2.read_secret_version(
self.path,
version=self.version,
mount_point=self.mount_point,
raise_on_deleted_version=True,
)
return secret_response['data']['data']
except hvac.exceptions.InvalidPath as exception:
if self.required is True:
raise self.RequiredValueMissing(f'{self.mount_point}{self.path}') from exception
else:
return self.default