123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- # -*- coding: utf-8 -*-
- '''
- Unit tests for the Vault runner
- '''
- # Import Python Libs
- from __future__ import absolute_import, print_function, unicode_literals
- import logging
- # Import Salt Testing Libs
- from tests.support.mixins import LoaderModuleMockMixin
- from tests.support.unit import TestCase
- from tests.support.mock import (
- MagicMock,
- Mock,
- patch,
- ANY,
- call
- )
- # Import salt libs
- from salt.ext import six
- import salt.runners.vault as vault
- log = logging.getLogger(__name__)
- class VaultTest(TestCase, LoaderModuleMockMixin):
- '''
- Tests for the runner module of the Vault integration
- '''
- def setup_loader_modules(self):
- return {vault: {}}
- def setUp(self):
- self.grains = {
- 'id': 'test-minion',
- 'roles': ['web', 'database'],
- 'aux': ['foo', 'bar'],
- 'deep': {
- 'foo': {
- 'bar': {
- 'baz': [
- 'hello',
- 'world'
- ]
- }
- }
- },
- 'mixedcase': 'UP-low-UP'
- }
- def tearDown(self):
- del self.grains
- def test_pattern_list_expander(self):
- '''
- Ensure _expand_pattern_lists works as intended:
- - Expand list-valued patterns
- - Do not change non-list-valued tokens
- '''
- cases = {
- 'no-tokens-to-replace': ['no-tokens-to-replace'],
- 'single-dict:{minion}': ['single-dict:{minion}'],
- 'single-list:{grains[roles]}': ['single-list:web', 'single-list:database'],
- 'multiple-lists:{grains[roles]}+{grains[aux]}': [
- 'multiple-lists:web+foo',
- 'multiple-lists:web+bar',
- 'multiple-lists:database+foo',
- 'multiple-lists:database+bar',
- ],
- 'single-list-with-dicts:{grains[id]}+{grains[roles]}+{grains[id]}': [
- 'single-list-with-dicts:{grains[id]}+web+{grains[id]}',
- 'single-list-with-dicts:{grains[id]}+database+{grains[id]}'
- ],
- 'deeply-nested-list:{grains[deep][foo][bar][baz]}': [
- 'deeply-nested-list:hello',
- 'deeply-nested-list:world'
- ]
- }
- # The mappings dict is assembled in _get_policies, so emulate here
- mappings = {'minion': self.grains['id'], 'grains': self.grains}
- for case, correct_output in six.iteritems(cases):
- output = vault._expand_pattern_lists(case, **mappings) # pylint: disable=protected-access
- diff = set(output).symmetric_difference(set(correct_output))
- if diff:
- log.debug('Test %s failed', case)
- log.debug('Expected:\n\t%s\nGot\n\t%s', output, correct_output)
- log.debug('Difference:\n\t%s', diff)
- self.assertEqual(output, correct_output)
- def test_get_policies_for_nonexisting_minions(self):
- minion_id = 'salt_master'
- # For non-existing minions, or the master-minion, grains will be None
- cases = {
- 'no-tokens-to-replace': ['no-tokens-to-replace'],
- 'single-dict:{minion}': ['single-dict:{0}'.format(minion_id)],
- 'single-list:{grains[roles]}': []
- }
- with patch('salt.utils.minions.get_minion_data',
- MagicMock(return_value=(None, None, None))):
- for case, correct_output in six.iteritems(cases):
- test_config = {'policies': [case]}
- output = vault._get_policies(minion_id, test_config) # pylint: disable=protected-access
- diff = set(output).symmetric_difference(set(correct_output))
- if diff:
- log.debug('Test %s failed', case)
- log.debug('Expected:\n\t%s\nGot\n\t%s', output, correct_output)
- log.debug('Difference:\n\t%s', diff)
- self.assertEqual(output, correct_output)
- def test_get_policies(self):
- '''
- Ensure _get_policies works as intended, including expansion of lists
- '''
- cases = {
- 'no-tokens-to-replace': ['no-tokens-to-replace'],
- 'single-dict:{minion}': ['single-dict:test-minion'],
- 'single-list:{grains[roles]}': ['single-list:web', 'single-list:database'],
- 'multiple-lists:{grains[roles]}+{grains[aux]}': [
- 'multiple-lists:web+foo',
- 'multiple-lists:web+bar',
- 'multiple-lists:database+foo',
- 'multiple-lists:database+bar',
- ],
- 'single-list-with-dicts:{grains[id]}+{grains[roles]}+{grains[id]}': [
- 'single-list-with-dicts:test-minion+web+test-minion',
- 'single-list-with-dicts:test-minion+database+test-minion'
- ],
- 'deeply-nested-list:{grains[deep][foo][bar][baz]}': [
- 'deeply-nested-list:hello',
- 'deeply-nested-list:world'
- ],
- 'should-not-cause-an-exception,but-result-empty:{foo}': [],
- 'Case-Should-Be-Lowered:{grains[mixedcase]}': [
- 'case-should-be-lowered:up-low-up'
- ]
- }
- with patch('salt.utils.minions.get_minion_data',
- MagicMock(return_value=(None, self.grains, None))):
- for case, correct_output in six.iteritems(cases):
- test_config = {'policies': [case]}
- output = vault._get_policies('test-minion', test_config) # pylint: disable=protected-access
- diff = set(output).symmetric_difference(set(correct_output))
- if diff:
- log.debug('Test %s failed', case)
- log.debug('Expected:\n\t%s\nGot\n\t%s', output, correct_output)
- log.debug('Difference:\n\t%s', diff)
- self.assertEqual(output, correct_output)
- def test_get_token_create_url(self):
- '''
- Ensure _get_token_create_url parses config correctly
- '''
- self.assertEqual(vault._get_token_create_url( # pylint: disable=protected-access
- {"url": "http://127.0.0.1"}),
- "http://127.0.0.1/v1/auth/token/create")
- self.assertEqual(vault._get_token_create_url( # pylint: disable=protected-access
- {"url": "https://127.0.0.1/"}),
- "https://127.0.0.1/v1/auth/token/create")
- self.assertEqual(vault._get_token_create_url( # pylint: disable=protected-access
- {"url": "http://127.0.0.1:8200", "role_name": "therole"}),
- "http://127.0.0.1:8200/v1/auth/token/create/therole")
- self.assertEqual(vault._get_token_create_url( # pylint: disable=protected-access
- {"url": "https://127.0.0.1/test", "role_name": "therole"}),
- "https://127.0.0.1/test/v1/auth/token/create/therole")
- def _mock_json_response(data, status_code=200, reason=""):
- '''
- Mock helper for http response
- '''
- response = MagicMock()
- response.json = MagicMock(return_value=data)
- response.status_code = status_code
- response.reason = reason
- return Mock(return_value=response)
- class VaultTokenAuthTest(TestCase, LoaderModuleMockMixin):
- '''
- Tests for the runner module of the Vault with token setup
- '''
- def setup_loader_modules(self):
- return {
- vault: {
- '__opts__': {
- 'vault': {
- 'url': "http://127.0.0.1",
- "auth": {
- 'token': 'test',
- 'method': 'token'
- }
- }
- }
- }
- }
- @patch('salt.runners.vault._validate_signature', MagicMock(return_value=None))
- @patch('salt.runners.vault._get_token_create_url', MagicMock(return_value="http://fake_url"))
- def test_generate_token(self):
- '''
- Basic tests for test_generate_token: all exits
- '''
- mock = _mock_json_response({'auth': {'client_token': 'test'}})
- with patch('requests.post', mock):
- result = vault.generate_token('test-minion', 'signature')
- log.debug('generate_token result: %s', result)
- self.assertTrue(isinstance(result, dict))
- self.assertFalse('error' in result)
- self.assertTrue('token' in result)
- self.assertEqual(result['token'], 'test')
- mock.assert_called_with("http://fake_url", headers=ANY, json=ANY, verify=ANY)
- mock = _mock_json_response({}, status_code=403, reason="no reason")
- with patch('requests.post', mock):
- result = vault.generate_token('test-minion', 'signature')
- self.assertTrue(isinstance(result, dict))
- self.assertTrue('error' in result)
- self.assertEqual(result['error'], "no reason")
- with patch('salt.runners.vault._get_policies', MagicMock(return_value=[])):
- result = vault.generate_token('test-minion', 'signature')
- self.assertTrue(isinstance(result, dict))
- self.assertTrue('error' in result)
- self.assertEqual(result['error'], 'No policies matched minion')
- with patch('requests.post',
- MagicMock(side_effect=Exception('Test Exception Reason'))):
- result = vault.generate_token('test-minion', 'signature')
- self.assertTrue(isinstance(result, dict))
- self.assertTrue('error' in result)
- self.assertEqual(result['error'], 'Test Exception Reason')
- class VaultAppRoleAuthTest(TestCase, LoaderModuleMockMixin):
- '''
- Tests for the runner module of the Vault with approle setup
- '''
- def setup_loader_modules(self):
- return {
- vault: {
- '__opts__': {
- 'vault': {
- 'url': "http://127.0.0.1",
- "auth": {
- 'method': 'approle',
- 'role_id': 'role',
- 'secret_id': 'secret'
- }
- }
- }
- }
- }
- @patch('salt.runners.vault._validate_signature', MagicMock(return_value=None))
- @patch('salt.runners.vault._get_token_create_url', MagicMock(return_value="http://fake_url"))
- def test_generate_token(self):
- '''
- Basic test for test_generate_token with approle (two vault calls)
- '''
- mock = _mock_json_response({'auth': {'client_token': 'test'}})
- with patch('requests.post', mock):
- result = vault.generate_token('test-minion', 'signature')
- log.debug('generate_token result: %s', result)
- self.assertTrue(isinstance(result, dict))
- self.assertFalse('error' in result)
- self.assertTrue('token' in result)
- self.assertEqual(result['token'], 'test')
- calls = [
- call("http://127.0.0.1/v1/auth/approle/login", json=ANY, verify=ANY),
- call("http://fake_url", headers=ANY, json=ANY, verify=ANY)
- ]
- mock.assert_has_calls(calls)
|