This commit is contained in:
2024-02-08 20:52:08 +00:00
parent 4ba36b3a87
commit c17775b715
13 changed files with 579 additions and 6 deletions

View File

View File

@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
# type: ignore
from __future__ import annotations
import pytest
from .fixtures import TestingVaultSecrets
@pytest.fixture
def testing_vault_secrets() -> TestingVaultSecrets:
return TestingVaultSecrets()

View File

@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
# -*- coding: utf-8 -*-
from __future__ import annotations
from keep_it_secret.ext.vault import VaultSecrets
from keep_it_secret.fields import SecretsField
from keep_it_secret.secrets import Secrets
class TestingVaultSecrets(Secrets):
vault = SecretsField.new(VaultSecrets)

View File

@@ -0,0 +1,212 @@
# -*- coding: utf-8 -*-
# type: ignore
from __future__ import annotations
from unittest import mock
import hvac
import pytest
from pytest_mock import MockerFixture
from keep_it_secret import Secrets
from keep_it_secret.ext import vault
@pytest.fixture
def field() -> vault.VaultKV2Field:
return vault.VaultKV2Field('keep_it_secret/', 'tests/spam')
@pytest.fixture
def as_type() -> mock.Mock:
return mock.Mock()
@pytest.fixture
def field_with_as_type(as_type: mock.Mock) -> vault.VaultKV2Field:
return vault.VaultKV2Field('keep_it_secret/', 'tests/spam', as_type=as_type)
@pytest.fixture
def hvac_kv_v2_client(mocker: MockerFixture) -> mock.Mock:
result = mock.Mock()
result.secrets.kv.v2 = mock.Mock(spec=['read_secret_version'])
return result
def test_init():
# When
result = vault.VaultKV2Field('keep_it_secret/', 'tests/spam')
# Then
assert result.mount_point == 'keep_it_secret/'
assert result.path == 'tests/spam'
assert result.version is None
assert result.default is None
assert result.cast_to is None
assert result.as_type is None
def test_init_with_version():
# When
result = vault.VaultKV2Field(
'keep_it_secret/', 'tests/spam', version='1',
)
# Then
assert result.version == '1'
def test_init_with_default():
# When
result = vault.VaultKV2Field(
'keep_it_secret/', 'tests/spam', default='eggs',
)
# Then
assert result.default == 'eggs'
def test_init_with_as_type():
# When
result = vault.VaultKV2Field(
'keep_it_secret/', 'tests/spam', as_type=dict,
)
# Then
assert result.as_type == result.cast
assert result.cast_to == dict
def test_init_with_field_options():
# When
result = vault.VaultKV2Field(
'keep_it_secret/', 'tests/spam', required=False, description='eggs',
)
# Then
assert result.required is False
assert result.description == 'eggs'
def test_new(mocker: MockerFixture):
# Given
mock_init = mocker.patch.object(
vault.VaultKV2Field, '__init__', return_value=None,
)
# When
_ = vault.VaultKV2Field.new(
'keep_it_secret/',
'tests/spam',
version='1',
default=dict,
as_type=dict,
required=False,
description='eggs',
)
# Then
mock_init.assert_called_once_with(
'keep_it_secret/',
'tests/spam',
version='1',
default=dict,
as_type=dict,
required=False,
description='eggs',
)
def test_cast_not_dict(field: vault.VaultKV2Field):
# When
result = field.cast('spam')
# Then
assert result == 'spam'
def test_cast(field_with_as_type: vault.VaultKV2Field, as_type: mock.Mock):
# Given
as_type.return_value = 'spam'
# When
result = field_with_as_type.cast({
'spam': True,
'eggs': False,
})
# Then
assert result == 'spam'
as_type.assert_called_once_with(
spam=True,
eggs=False,
)
def test_get_value_vault_dependency_missing(field: vault.VaultKV2Field,
testing_secrets: Secrets):
# Given
with pytest.raises(field.DependencyMissing) as exception_info:
# When
_ = field(testing_secrets)
# Then
assert exception_info.value.args[0] == 'vault'
def test_get_value_required_value_not_found(testing_vault_secrets: Secrets,
hvac_kv_v2_client: mock.Mock,
field: vault.VaultKV2Field):
# Given
testing_vault_secrets.vault.client = hvac_kv_v2_client
hvac_kv_v2_client.secrets.kv.v2.read_secret_version.side_effect = hvac.exceptions.InvalidPath()
with pytest.raises(field.RequiredValueMissing) as exception_info:
# When
_ = field(testing_vault_secrets)
# Then
assert exception_info.value.args[0] == 'keep_it_secret/tests/spam'
def test_get_value_not_found_not_required(testing_vault_secrets: Secrets,
hvac_kv_v2_client: mock.Mock,
field: vault.VaultKV2Field):
# Given
testing_vault_secrets.vault.client = hvac_kv_v2_client
hvac_kv_v2_client.secrets.kv.v2.read_secret_version.side_effect = hvac.exceptions.InvalidPath()
field = vault.VaultKV2Field(
'keep_it_secret/', 'tests/spam', required=False,
)
result = field(testing_vault_secrets)
# Then
assert result is None
def test_get_value(testing_vault_secrets: Secrets,
hvac_kv_v2_client: mock.Mock,
field: vault.VaultKV2Field):
# Given
testing_vault_secrets.vault.client = hvac_kv_v2_client
hvac_kv_v2_client.secrets.kv.v2.read_secret_version.return_value = {
'data': {
'data': {
'spam': True,
},
},
}
# When
result = field(testing_vault_secrets)
# Then
assert result == {'spam': True}

View File

@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
# type: ignore
from __future__ import annotations
import os
from unittest import mock
import pytest
from pytest_mock import MockerFixture
from keep_it_secret.ext import vault
@pytest.fixture
def mock_hvac_client(mocker: MockerFixture) -> mock.Mock:
return mocker.patch.object(vault.hvac, 'Client')
@pytest.fixture
def hvac_client() -> mock.Mock:
return mock.Mock()
def test_init():
# When
result = vault.VaultSecrets()
# Then
assert result.client is None
@mock.patch.dict(
os.environ,
{
'VAULT_URL': 'https://vault.work/',
'VAULT_TOKEN': 'test_vault_token',
'VAULT_CLIENT_CERT_PATH': '/tmp/vault_client_cert.pem',
'VAULT_CLIENT_KEY_PATH': '/tmp/vault_client_key.pem',
'VAULT_SERVER_CERT_PATH': '/tmp/vault_server_cert.pem',
},
)
def test_as_hvac_client_kwargs():
# Given
secrets = vault.VaultSecrets()
# When
result = secrets.as_hvac_client_kwargs()
# Then
assert result == {
'url': 'https://vault.work/',
'token': 'test_vault_token',
'cert': ('/tmp/vault_client_cert.pem', '/tmp/vault_client_key.pem'),
'verify': '/tmp/vault_server_cert.pem',
}
@mock.patch.dict(
os.environ,
{
'VAULT_URL': 'https://vault.work/',
'VAULT_TOKEN': 'test_vault_token',
},
)
def test_as_hvac_client_kwargs_without_optional_fields():
# Given
secrets = vault.VaultSecrets()
# When
result = secrets.as_hvac_client_kwargs()
# Then
assert result == {
'url': 'https://vault.work/',
'token': 'test_vault_token',
}
def test_get_client_cache_miss(mock_hvac_client: mock.Mock,
hvac_client: mock.Mock):
# Given
mock_hvac_client.return_value = hvac_client
secrets = vault.VaultSecrets()
# When
result = secrets.get_client()
# Then
assert result == hvac_client
assert secrets.client == hvac_client
mock_hvac_client.assert_called_once_with(**secrets.as_hvac_client_kwargs())
def test_get_client_cache_hit(mock_hvac_client: mock.Mock,
hvac_client: mock.Mock):
# Given
secrets = vault.VaultSecrets()
secrets.client = hvac_client
# When
result = secrets.get_client()
# Then
assert result == hvac_client
mock_hvac_client.assert_not_called()