123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428 |
- # -*- coding: utf-8 -*-
- """
- Test case for the vault utils module
- """
- # Import python libs
- from __future__ import absolute_import, print_function, unicode_literals
- import json
- import logging
- import os
- from copy import copy
- # Import Salt libs
- import salt.utils.vault as vault
- from tests.support.mixins import LoaderModuleMockMixin
- from tests.support.mock import ANY, MagicMock, Mock, mock_open, patch
- # Import Salt Testing libs
- from tests.support.unit import TestCase
- class RequestMock(Mock):
- """
- Request Mock
- """
- def get(self, *args, **kwargs):
- return {}
- class TestVaultUtils(LoaderModuleMockMixin, TestCase):
- """
- Test case for the vault utils module
- """
- json_success = {
- "request_id": "35df4df1-c3d8-b270-0682-ddb0160c7450",
- "lease_id": "",
- "renewable": False,
- "lease_duration": 0,
- "data": {
- "data": {"something": "myvalue"},
- "metadata": {
- "created_time": "2020-05-02T07:26:12.180848003Z",
- "deletion_time": "",
- "destroyed": False,
- "version": 1,
- },
- },
- "wrap_info": None,
- "warnings": None,
- "auth": None,
- }
- json_denied = {"errors": ["permission denied"]}
- cache_single = {
- "url": "http://127.0.0.1:8200",
- "token": "test",
- "verify": None,
- "uses": 1,
- "lease_duration": 100,
- "issued": 3000,
- }
- cache_uses = {
- "url": "http://127.0.0.1:8200",
- "token": "test",
- "verify": None,
- "uses": 10,
- "lease_duration": 100,
- "issued": 3000,
- "unlimited_use_token": False,
- }
- cache_uses_last = {
- "url": "http://127.0.0.1:8200",
- "token": "test",
- "verify": None,
- "uses": 1,
- "lease_duration": 100,
- "issued": 3000,
- "unlimited_use_token": False,
- }
- cache_unlimited = {
- "url": "http://127.0.0.1:8200",
- "token": "test",
- "verify": None,
- "uses": 0,
- "lease_duration": 100,
- "issued": 3000,
- "unlimited_use_token": True,
- }
- metadata_v2 = {
- "accessor": "kv_f8731f1b",
- "config": {
- "default_lease_ttl": 0,
- "force_no_cache": False,
- "max_lease_ttl": 0,
- },
- "description": "key/value secret storage",
- "external_entropy_access": False,
- "local": False,
- "options": {"version": "2"},
- "path": "secret/",
- "seal_wrap": False,
- "type": "kv",
- "uuid": "1d9431ac-060a-9b63-4572-3ca7ffd78347",
- }
- cache_secret_meta = {"vault_secret_path_metadata": {"secret/mything": metadata_v2}}
- def setup_loader_modules(self):
- return {
- vault: {
- "__opts__": {
- "vault": {
- "url": "http://127.0.0.1",
- "auth": {
- "token": "test",
- "method": "token",
- "uses": 15,
- "ttl": 500,
- },
- },
- "file_client": "local",
- "cachedir": "somepath",
- },
- "__grains__": {"id": "test-minion"},
- "__context__": {},
- }
- }
- def _mock_json_response(self, data, status_code=200, reason=""):
- """
- Mock helper for http response
- """
- response = MagicMock()
- response.json = MagicMock(return_value=data)
- response.status_code = status_code
- response.reason = reason
- if status_code == 200:
- response.ok = True
- else:
- response.ok = False
- return Mock(return_value=response)
- def test_make_request_single_use_token_run_ok(self):
- """
- Given single use token in __context__, function should run successful secret lookup with no other modifications
- """
- mock = self._mock_json_response(self.json_success)
- supplied_context = {"vault_token": copy(self.cache_single)}
- expected_headers = {"X-Vault-Token": "test", "Content-Type": "application/json"}
- with patch.dict(vault.__context__, supplied_context):
- with patch("requests.request", mock):
- vault_return = vault.make_request("/secret/my/secret", "key")
- self.assertEqual(vault.__context__, {})
- mock.assert_called_with(
- "/secret/my/secret",
- "http://127.0.0.1:8200/key",
- headers=expected_headers,
- verify=ANY,
- )
- self.assertEqual(vault_return.json(), self.json_success)
- def test_make_request_single_use_token_run_auth_error(self):
- """
- Given single use token in __context__ and login error, function should request token and re-run
- """
- # Disable logging because simulated http failures are logged as errors
- logging.disable(logging.CRITICAL)
- mock = self._mock_json_response(self.json_denied, status_code=400)
- supplied_context = {"vault_token": copy(self.cache_single)}
- expected_headers = {"X-Vault-Token": "test", "Content-Type": "application/json"}
- with patch.dict(vault.__context__, supplied_context):
- with patch("requests.request", mock):
- with patch.object(vault, "del_cache") as mock_del_cache:
- vault_return = vault.make_request("/secret/my/secret", "key")
- self.assertEqual(vault.__context__, {})
- mock.assert_called_with(
- "/secret/my/secret",
- "http://127.0.0.1:8200/key",
- headers=expected_headers,
- verify=ANY,
- )
- self.assertEqual(vault_return.json(), self.json_denied)
- mock_del_cache.assert_called()
- self.assertEqual(mock.call_count, 2)
- logging.disable(logging.NOTSET)
- def test_multi_use_token_successful_run(self):
- """
- Given multi-use token, function should get secret and decrement token
- """
- expected_cache_write = {
- "url": "http://127.0.0.1:8200",
- "token": "test",
- "verify": None,
- "uses": 9,
- "lease_duration": 100,
- "issued": 3000,
- "unlimited_use_token": False,
- }
- mock = self._mock_json_response(self.json_success)
- expected_headers = {"X-Vault-Token": "test", "Content-Type": "application/json"}
- with patch.object(vault, "get_cache") as mock_get_cache:
- mock_get_cache.return_value = copy(self.cache_uses)
- with patch("requests.request", mock):
- with patch.object(vault, "del_cache") as mock_del_cache:
- with patch.object(vault, "write_cache") as mock_write_cache:
- vault_return = vault.make_request("/secret/my/secret", "key")
- mock.assert_called_with(
- "/secret/my/secret",
- "http://127.0.0.1:8200/key",
- headers=expected_headers,
- verify=ANY,
- )
- mock_write_cache.assert_called_with(expected_cache_write)
- self.assertEqual(vault_return.json(), self.json_success)
- self.assertEqual(mock.call_count, 1)
- def test_multi_use_token_last_use(self):
- """
- Given last use of multi-use token, function should succeed and flush token cache
- """
- mock = self._mock_json_response(self.json_success)
- expected_headers = {"X-Vault-Token": "test", "Content-Type": "application/json"}
- with patch.object(vault, "get_cache") as mock_get_cache:
- mock_get_cache.return_value = self.cache_uses_last
- with patch("requests.request", mock):
- with patch.object(vault, "del_cache") as mock_del_cache:
- with patch.object(vault, "write_cache") as mock_write_cache:
- vault_return = vault.make_request("/secret/my/secret", "key")
- mock.assert_called_with(
- "/secret/my/secret",
- "http://127.0.0.1:8200/key",
- headers=expected_headers,
- verify=ANY,
- )
- mock_del_cache.assert_called()
- self.assertEqual(vault_return.json(), self.json_success)
- self.assertEqual(mock.call_count, 1)
- def test_unlimited_use_token_no_decrement(self):
- """
- Given unlimited-use token, function should succeed not del cache or decrement
- """
- mock = self._mock_json_response(self.json_success)
- expected_headers = {"X-Vault-Token": "test", "Content-Type": "application/json"}
- with patch.object(vault, "get_cache") as mock_get_cache:
- mock_get_cache.return_value = self.cache_unlimited
- with patch("requests.request", mock):
- with patch.object(vault, "del_cache") as mock_del_cache:
- with patch.object(vault, "write_cache") as mock_write_cache:
- vault_return = vault.make_request("/secret/my/secret", "key")
- mock.assert_called_with(
- "/secret/my/secret",
- "http://127.0.0.1:8200/key",
- headers=expected_headers,
- verify=ANY,
- )
- assert (
- not mock_del_cache.called
- ), "del cache should not be called for unlimited use token"
- assert (
- not mock_write_cache.called
- ), "write cache should not be called for unlimited use token"
- self.assertEqual(vault_return.json(), self.json_success)
- self.assertEqual(mock.call_count, 1)
- def test_get_cache_standard(self):
- """
- test standard first run of no cache file. Should generate new connection and write cache
- """
- with patch.object(vault, "_read_cache_file") as mock_read_cache:
- mock_read_cache.return_value = {}
- with patch.object(
- vault, "get_vault_connection"
- ) as mock_get_vault_connection:
- mock_get_vault_connection.return_value = copy(self.cache_single)
- with patch.object(vault, "write_cache") as mock_write_cache:
- cache_result = vault.get_cache()
- mock_write_cache.assert_called_with(copy(self.cache_single))
- def test_get_cache_existing_cache_valid(self):
- """
- test standard valid cache file
- """
- with patch("time.time", return_value=1234):
- with patch.object(vault, "_read_cache_file") as mock_read_cache:
- mock_read_cache.return_value = self.cache_uses
- with patch.object(vault, "write_cache") as mock_write_cache:
- with patch.object(vault, "del_cache") as mock_del_cache:
- cache_result = vault.get_cache()
- assert not mock_write_cache.called
- assert not mock_del_cache.called
- self.assertEqual(cache_result, self.cache_uses)
- def test_get_cache_existing_cache_old(self):
- """
- test old cache file
- """
- with patch("time.time", return_value=3101):
- with patch.object(
- vault, "get_vault_connection"
- ) as mock_get_vault_connection:
- mock_get_vault_connection.return_value = self.cache_uses
- with patch.object(vault, "_read_cache_file") as mock_read_cache:
- mock_read_cache.return_value = self.cache_uses
- with patch.object(vault, "write_cache") as mock_write_cache:
- with patch.object(vault, "del_cache") as mock_del_cache:
- cache_result = vault.get_cache()
- assert mock_del_cache.called
- assert mock_write_cache.called
- self.assertEqual(cache_result, self.cache_uses)
- def test_write_cache_standard(self):
- """
- Test write cache with standard single use token
- """
- function_response = vault.write_cache(copy(self.cache_single))
- self.assertEqual(vault.__context__["vault_token"], copy(self.cache_single))
- self.assertTrue(function_response)
- def test_write_cache_multi_use_token(self):
- """
- Test write cache with multi-use token
- """
- expected_write = {
- "url": "http://127.0.0.1:8200",
- "token": "test",
- "verify": None,
- "uses": 10,
- "lease_duration": 100,
- "issued": 3000,
- "unlimited_use_token": False,
- }
- with patch("salt.utils.files.fpopen", mock_open()) as mock_fpopen:
- function_response = vault.write_cache(self.cache_uses)
- assert mock_fpopen.call_count == 1
- self.assertListEqual(
- list(mock_fpopen.filehandles),
- [os.path.join("somepath", "salt_vault_token")],
- )
- opens = mock_fpopen.filehandles[
- os.path.join("somepath", "salt_vault_token")
- ]
- write_calls_output = json.loads(opens[0].write_calls[0])
- self.assertDictEqual(write_calls_output, expected_write)
- self.assertTrue(function_response)
- def test_write_cache_unlimited_token(self):
- """
- Test write cache with unlimited use token
- """
- write_data = {
- "url": "http://127.0.0.1:8200",
- "token": "test",
- "verify": None,
- "uses": 0,
- "lease_duration": 100,
- "issued": 3000,
- }
- expected_write = {
- "url": "http://127.0.0.1:8200",
- "token": "test",
- "verify": None,
- "uses": 0,
- "lease_duration": 100,
- "issued": 3000,
- "unlimited_use_token": True,
- }
- with patch("salt.utils.files.fpopen", mock_open()) as mock_fpopen:
- function_response = vault.write_cache(write_data)
- assert mock_fpopen.call_count == 1
- self.assertListEqual(
- list(mock_fpopen.filehandles),
- [os.path.join("somepath", "salt_vault_token")],
- )
- opens = mock_fpopen.filehandles[
- os.path.join("somepath", "salt_vault_token")
- ]
- write_calls_output = json.loads(opens[0].write_calls[0])
- self.assertEqual(write_calls_output, expected_write)
- self.assertTrue(function_response)
- def test_path_is_v2(self):
- """
- Validated v2 path is detected as vault kv v2
- """
- expected_return = {
- "v2": True,
- "data": "secret/data/mything",
- "metadata": "secret/metadata/mything",
- "delete": "secret/mything",
- "type": "kv",
- "destroy": "secret/destroy/mything",
- }
- with patch.object(vault, "_get_secret_path_metadata") as mock_get_metadata:
- mock_get_metadata.return_value = self.metadata_v2
- function_return = vault.is_v2("secret/mything")
- self.assertEqual(function_return, expected_return)
- def test_get_secret_path_metadata_no_cache(self):
- """
- test with no cache file
- """
- make_request_response = {
- "request_id": "b82f2df7-a9b6-920c-0ed2-a3463b996f9e",
- "lease_id": "",
- "renewable": False,
- "lease_duration": 0,
- "data": self.metadata_v2,
- "wrap_info": None,
- "warnings": None,
- "auth": None,
- }
- cache_object = copy(self.cache_uses)
- expected_cache_object = copy(self.cache_uses)
- expected_cache_object.update(copy(self.cache_secret_meta))
- secret_path = "secret/mything"
- mock = self._mock_json_response(make_request_response)
- with patch.object(vault, "_read_cache_file") as mock_read_cache:
- mock_read_cache.return_value = cache_object
- with patch.object(vault, "write_cache") as mock_write_cache:
- with patch("salt.utils.vault.make_request", mock):
- function_result = vault._get_secret_path_metadata(secret_path)
- self.assertEqual(function_result, self.metadata_v2)
- mock_write_cache.assert_called_with(cache_object)
- self.assertEqual(cache_object, expected_cache_object)
|