123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- # -*- coding: utf-8 -*-
- # Import python libs
- from __future__ import absolute_import, print_function, unicode_literals
- # Import Salt Libs
- import salt.pillar.consul_pillar as consul_pillar
- # Import 3rd-party libs
- from salt.ext import six
- # Import Salt Testing libs
- from tests.support.mixins import LoaderModuleMockMixin
- from tests.support.mock import MagicMock, patch
- from tests.support.unit import TestCase, skipIf
# Master options installed as ``__opts__`` on consul_pillar by the test case
# below; the "consul_config" profile name is what the tests pass to ext_pillar.
OPTS = {
    "consul_config": {
        "consul.port": 8500,
        "consul.host": "172.17.0.15",
    },
}

# Key/value records handed back by the mocked ``consul_fetch``. Every key
# ending in "/" carries a ``None`` value, as does the "blankvalue" leaf.
PILLAR_DATA = [
    # --- test-shared/sites/testsite1 ---
    {
        "Value": "/path/to/certs/testsite1.crt",
        "Key": "test-shared/sites/testsite1/ssl/certs/SSLCertificateFile",
    },
    {
        "Value": "/path/to/certs/testsite1.key",
        "Key": "test-shared/sites/testsite1/ssl/certs/SSLCertificateKeyFile",
    },
    {
        "Value": None,
        "Key": "test-shared/sites/testsite1/ssl/certs/",
    },
    {
        "Value": "True",
        "Key": "test-shared/sites/testsite1/ssl/force",
    },
    {
        "Value": None,
        "Key": "test-shared/sites/testsite1/ssl/",
    },
    {
        "Value": "salt://sites/testsite1.tmpl",
        "Key": "test-shared/sites/testsite1/template",
    },
    {
        "Value": "test.example.com",
        "Key": "test-shared/sites/testsite1/uri",
    },
    {
        "Value": None,
        "Key": "test-shared/sites/testsite1/",
    },
    {
        "Value": None,
        "Key": "test-shared/sites/",
    },
    # --- test-shared/user ---
    {
        "Value": "Test User",
        "Key": "test-shared/user/full_name",
    },
    {
        "Value": "adm\nwww-data\nmlocate",
        "Key": "test-shared/user/groups",
    },
    {
        # Wrapped in literal double quotes; test_value_parsing expects a string.
        "Value": '"adm\nwww-data\nmlocate"',
        "Key": "test-shared/user/dontsplit",
    },
    {
        # YAML-looking text; test_non_expansion expects it to stay a string
        # when expand_keys=false.
        "Value": "yaml:\n key: value\n",
        "Key": "test-shared/user/dontexpand",
    },
    {
        "Value": None,
        "Key": "test-shared/user/blankvalue",
    },
    {
        "Value": "test",
        "Key": "test-shared/user/login",
    },
    {
        "Value": None,
        "Key": "test-shared/user/",
    },
]

# Minimal nested mapping used by the dict_merge test.
SIMPLE_DICT = {
    "key1": {
        "key2": "val1",
    },
}
@skipIf(not consul_pillar.consul, "python-consul module not installed")
class ConsulPillarTestCase(TestCase, LoaderModuleMockMixin):
    """
    Test cases for salt.pillar.consul_pillar
    """

    def setup_loader_modules(self):
        """
        Return the globals to install on ``consul_pillar`` for each test:
        ``OPTS`` as ``__opts__`` and a ``get_conn`` mock that yields the
        placeholder string ``"consul_connection"``.
        """
        return {
            consul_pillar: {
                "__opts__": OPTS,
                "get_conn": MagicMock(return_value="consul_connection"),
            }
        }

    def _run_ext_pillar(self, conf):
        """
        Call ``consul_pillar.ext_pillar`` for minion ``"testminion"`` with an
        empty pillar and the given ``conf`` string, while ``grains.get`` and
        ``consul_fetch`` are mocked.

        ``consul_fetch`` is replaced by a mock returning
        ``("2232", PILLAR_DATA)``; ``grains.get`` returns an empty dict.

        Returns a ``(pillar_data, fetch_mock)`` tuple so callers can assert
        both on the resulting pillar and on how ``consul_fetch`` was called.
        """
        fetch_mock = MagicMock(return_value=("2232", PILLAR_DATA))
        grains_mock = MagicMock(return_value=({}))
        # Nested ``with`` blocks (not a parenthesised multi-item ``with``)
        # keep the module importable on the old interpreters implied by the
        # ``__future__``/``six`` imports at the top of the file.
        with patch.dict(consul_pillar.__salt__, {"grains.get": grains_mock}):
            with patch.object(consul_pillar, "consul_fetch", fetch_mock):
                pillar_data = consul_pillar.ext_pillar("testminion", {}, conf)
        return pillar_data, fetch_mock

    def test_connection(self):
        """
        ext_pillar must request a connection exactly once, passing the
        options and the profile name taken from the conf string.
        """
        self._run_ext_pillar("consul_config root=test-shared/")
        consul_pillar.get_conn.assert_called_once_with(OPTS, "consul_config")

    def test_pillar_data(self):
        """
        With ``root=test-shared/`` the fetch is issued for ``"test-shared"``
        and the pillar exposes ``sites`` and ``user`` at the top level; the
        ``None``-valued ``blankvalue`` key is not present under ``user``.
        """
        pillar_data, fetch_mock = self._run_ext_pillar(
            "consul_config root=test-shared/"
        )
        fetch_mock.assert_called_once_with("consul_connection", "test-shared")
        self.assertEqual(sorted(pillar_data), ["sites", "user"])
        self.assertNotIn("blankvalue", pillar_data["user"])

    def test_blank_root(self):
        """
        Without a ``root=`` option the fetch is issued for the empty path and
        the pillar is keyed by the full top-level prefix.
        """
        pillar_data, fetch_mock = self._run_ext_pillar("consul_config")
        fetch_mock.assert_called_once_with("consul_connection", "")
        self.assertEqual(sorted(pillar_data), ["test-shared"])

    def test_pillar_nest(self):
        """
        ``pillar_root=nested-key/`` places the fetched tree under the
        ``nested-key`` pillar key.
        """
        pillar_data, _ = self._run_ext_pillar(
            "consul_config pillar_root=nested-key/ root=test-shared/ "
        )
        self.assertEqual(sorted(pillar_data["nested-key"]), ["sites", "user"])
        self.assertNotIn("blankvalue", pillar_data["nested-key"]["user"])

    def test_value_parsing(self):
        """
        The quoted multi-line value stored under ``user/dontsplit`` must come
        back as a string.
        """
        pillar_data, _ = self._run_ext_pillar("consul_config root=test-shared/")
        self.assertIsInstance(pillar_data["user"]["dontsplit"], six.string_types)

    def test_non_expansion(self):
        """
        With ``expand_keys=false`` the YAML-looking value under
        ``user/dontexpand`` must remain a string.
        """
        pillar_data, _ = self._run_ext_pillar(
            "consul_config root=test-shared/ expand_keys=false"
        )
        self.assertIsInstance(pillar_data["user"]["dontexpand"], six.string_types)

    def test_dict_merge(self):
        """
        ``dict_merge`` must return the merged mapping both when the two
        inputs are equal and when the first input holds an extra nested key.
        """
        test_dict = {}
        # patch.dict populates test_dict only for the duration of each block.
        with patch.dict(test_dict, SIMPLE_DICT):
            self.assertDictEqual(
                consul_pillar.dict_merge(test_dict, SIMPLE_DICT), SIMPLE_DICT
            )
        with patch.dict(test_dict, {"key1": {"key3": {"key4": "value"}}}):
            self.assertDictEqual(
                consul_pillar.dict_merge(test_dict, SIMPLE_DICT),
                {"key1": {"key2": "val1", "key3": {"key4": "value"}}},
            )
|