123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500 |
- # -*- coding: utf-8 -*-
- # Import python libs
- from __future__ import absolute_import, unicode_literals
- import sys
- # Import Salt Libs
- import salt.utils.minions
- from tests.support.mock import MagicMock, patch
- # Import Salt Testing Libs
- from tests.support.unit import TestCase, skipIf
- NODEGROUPS = {
- "group1": "L@host1,host2,host3",
- "group2": ["G@foo:bar", "or", "web1*"],
- "group3": ["N@group1", "or", "N@group2"],
- "group4": ["host4", "host5", "host6"],
- "group5": "N@group4",
- "group6": "N@group3",
- "group7": ["host1"],
- }
- EXPECTED = {
- "group1": ["L@host1,host2,host3"],
- "group2": ["G@foo:bar", "or", "web1*"],
- "group3": [
- "(",
- "(",
- "L@host1,host2,host3",
- ")",
- "or",
- "(",
- "G@foo:bar",
- "or",
- "web1*",
- ")",
- ")",
- ],
- "group4": ["L@host4,host5,host6"],
- "group5": ["(", "L@host4,host5,host6", ")"],
- "group6": [
- "(",
- "(",
- "(",
- "L@host1,host2,host3",
- ")",
- "or",
- "(",
- "G@foo:bar",
- "or",
- "web1*",
- ")",
- ")",
- ")",
- ],
- "group7": ["L@host1"],
- }
- class MinionsTestCase(TestCase):
- """
- TestCase for salt.utils.minions module functions
- """
- def test_nodegroup_comp(self):
- """
- Test a simple string nodegroup
- """
- for nodegroup in NODEGROUPS:
- expected = EXPECTED[nodegroup]
- ret = salt.utils.minions.nodegroup_comp(nodegroup, NODEGROUPS)
- self.assertEqual(ret, expected)
- class CkMinionsTestCase(TestCase):
- """
- TestCase for salt.utils.minions.CkMinions class
- """
- def setUp(self):
- self.ckminions = salt.utils.minions.CkMinions({"minion_data_cache": True})
- def test_spec_check(self):
- # Test spec-only rule
- auth_list = ["@runner"]
- ret = self.ckminions.spec_check(auth_list, "test.arg", {}, "runner")
- self.assertTrue(ret)
- ret = self.ckminions.spec_check(auth_list, "test.arg", {}, "wheel")
- self.assertFalse(ret)
- ret = self.ckminions.spec_check(auth_list, "testarg", {}, "runner")
- mock_ret = {
- "error": {
- "name": "SaltInvocationError",
- "message": "A command invocation error occurred: Check syntax.",
- }
- }
- self.assertDictEqual(mock_ret, ret)
- # Test spec in plural form
- auth_list = ["@runners"]
- ret = self.ckminions.spec_check(auth_list, "test.arg", {}, "runner")
- self.assertTrue(ret)
- ret = self.ckminions.spec_check(auth_list, "test.arg", {}, "wheel")
- self.assertFalse(ret)
- # Test spec with module.function restriction
- auth_list = [{"@runner": "test.arg"}]
- ret = self.ckminions.spec_check(auth_list, "test.arg", {}, "runner")
- self.assertTrue(ret)
- ret = self.ckminions.spec_check(auth_list, "test.arg", {}, "wheel")
- self.assertFalse(ret)
- ret = self.ckminions.spec_check(auth_list, "tes.arg", {}, "runner")
- self.assertFalse(ret)
- ret = self.ckminions.spec_check(auth_list, "test.ar", {}, "runner")
- self.assertFalse(ret)
- # Test function name is a regex
- auth_list = [{"@runner": "test.arg.*some"}]
- ret = self.ckminions.spec_check(auth_list, "test.arg", {}, "runner")
- self.assertFalse(ret)
- ret = self.ckminions.spec_check(auth_list, "test.argsome", {}, "runner")
- self.assertTrue(ret)
- ret = self.ckminions.spec_check(auth_list, "test.arg_aaa_some", {}, "runner")
- self.assertTrue(ret)
- # Test a list of funcs
- auth_list = [{"@runner": ["test.arg", "jobs.active"]}]
- ret = self.ckminions.spec_check(auth_list, "test.arg", {}, "runner")
- self.assertTrue(ret)
- ret = self.ckminions.spec_check(auth_list, "jobs.active", {}, "runner")
- self.assertTrue(ret)
- ret = self.ckminions.spec_check(auth_list, "test.active", {}, "runner")
- self.assertFalse(ret)
- ret = self.ckminions.spec_check(auth_list, "jobs.arg", {}, "runner")
- self.assertFalse(ret)
- # Test args-kwargs rules
- auth_list = [
- {
- "@runner": {
- "test.arg": {
- "args": ["1", "2"],
- "kwargs": {"aaa": "bbb", "ccc": "ddd"},
- }
- }
- }
- ]
- ret = self.ckminions.spec_check(auth_list, "test.arg", {}, "runner")
- self.assertFalse(ret)
- args = {"arg": ["1", "2"], "kwarg": {"aaa": "bbb", "ccc": "ddd"}}
- ret = self.ckminions.spec_check(auth_list, "test.arg", args, "runner")
- self.assertTrue(ret)
- args = {"arg": ["1", "2", "3"], "kwarg": {"aaa": "bbb", "ccc": "ddd"}}
- ret = self.ckminions.spec_check(auth_list, "test.arg", args, "runner")
- self.assertTrue(ret)
- args = {"arg": ["1", "2"], "kwarg": {"aaa": "bbb", "ccc": "ddd", "zzz": "zzz"}}
- ret = self.ckminions.spec_check(auth_list, "test.arg", args, "runner")
- self.assertTrue(ret)
- args = {"arg": ["1", "2"], "kwarg": {"aaa": "bbb", "ccc": "ddc"}}
- ret = self.ckminions.spec_check(auth_list, "test.arg", args, "runner")
- self.assertFalse(ret)
- args = {"arg": ["1", "2"], "kwarg": {"aaa": "bbb"}}
- ret = self.ckminions.spec_check(auth_list, "test.arg", args, "runner")
- self.assertFalse(ret)
- args = {"arg": ["1", "3"], "kwarg": {"aaa": "bbb", "ccc": "ddd"}}
- ret = self.ckminions.spec_check(auth_list, "test.arg", args, "runner")
- self.assertFalse(ret)
- args = {"arg": ["1"], "kwarg": {"aaa": "bbb", "ccc": "ddd"}}
- ret = self.ckminions.spec_check(auth_list, "test.arg", args, "runner")
- self.assertFalse(ret)
- args = {"kwarg": {"aaa": "bbb", "ccc": "ddd"}}
- ret = self.ckminions.spec_check(auth_list, "test.arg", args, "runner")
- self.assertFalse(ret)
- args = {
- "arg": ["1", "2"],
- }
- ret = self.ckminions.spec_check(auth_list, "test.arg", args, "runner")
- self.assertFalse(ret)
- # Test kwargs only
- auth_list = [
- {"@runner": {"test.arg": {"kwargs": {"aaa": "bbb", "ccc": "ddd"}}}}
- ]
- ret = self.ckminions.spec_check(auth_list, "test.arg", {}, "runner")
- self.assertFalse(ret)
- args = {"arg": ["1", "2"], "kwarg": {"aaa": "bbb", "ccc": "ddd"}}
- ret = self.ckminions.spec_check(auth_list, "test.arg", args, "runner")
- self.assertTrue(ret)
- # Test args only
- auth_list = [{"@runner": {"test.arg": {"args": ["1", "2"]}}}]
- ret = self.ckminions.spec_check(auth_list, "test.arg", {}, "runner")
- self.assertFalse(ret)
- args = {"arg": ["1", "2"], "kwarg": {"aaa": "bbb", "ccc": "ddd"}}
- ret = self.ckminions.spec_check(auth_list, "test.arg", args, "runner")
- self.assertTrue(ret)
- # Test list of args
- auth_list = [
- {
- "@runner": [
- {
- "test.arg": [
- {
- "args": ["1", "2"],
- "kwargs": {"aaa": "bbb", "ccc": "ddd"},
- },
- {
- "args": ["2", "3"],
- "kwargs": {"aaa": "aaa", "ccc": "ccc"},
- },
- ]
- }
- ]
- }
- ]
- args = {"arg": ["1", "2"], "kwarg": {"aaa": "bbb", "ccc": "ddd"}}
- ret = self.ckminions.spec_check(auth_list, "test.arg", args, "runner")
- self.assertTrue(ret)
- args = {"arg": ["2", "3"], "kwarg": {"aaa": "aaa", "ccc": "ccc"}}
- ret = self.ckminions.spec_check(auth_list, "test.arg", args, "runner")
- self.assertTrue(ret)
- # Test @module form
- auth_list = ["@jobs"]
- ret = self.ckminions.spec_check(auth_list, "jobs.active", {}, "runner")
- self.assertTrue(ret)
- ret = self.ckminions.spec_check(auth_list, "jobs.active", {}, "wheel")
- self.assertTrue(ret)
- ret = self.ckminions.spec_check(auth_list, "test.arg", {}, "runner")
- self.assertFalse(ret)
- ret = self.ckminions.spec_check(auth_list, "job.arg", {}, "runner")
- self.assertFalse(ret)
- # Test @module: function
- auth_list = [{"@jobs": "active"}]
- ret = self.ckminions.spec_check(auth_list, "jobs.active", {}, "runner")
- self.assertTrue(ret)
- ret = self.ckminions.spec_check(auth_list, "jobs.active", {}, "wheel")
- self.assertTrue(ret)
- ret = self.ckminions.spec_check(auth_list, "jobs.active_jobs", {}, "runner")
- self.assertTrue(ret)
- ret = self.ckminions.spec_check(auth_list, "jobs.activ", {}, "runner")
- self.assertFalse(ret)
- # Test @module: [functions]
- auth_list = [{"@jobs": ["active", "li"]}]
- ret = self.ckminions.spec_check(auth_list, "jobs.active", {}, "runner")
- self.assertTrue(ret)
- ret = self.ckminions.spec_check(auth_list, "jobs.list_jobs", {}, "runner")
- self.assertTrue(ret)
- ret = self.ckminions.spec_check(auth_list, "jobs.last_run", {}, "runner")
- self.assertFalse(ret)
- # Test @module: function with args
- auth_list = [
- {"@jobs": {"active": {"args": ["1", "2"], "kwargs": {"a": "b", "c": "d"}}}}
- ]
- args = {"arg": ["1", "2"], "kwarg": {"a": "b", "c": "d"}}
- ret = self.ckminions.spec_check(auth_list, "jobs.active", args, "runner")
- self.assertTrue(ret)
- ret = self.ckminions.spec_check(auth_list, "jobs.active", {}, "runner")
- self.assertFalse(ret)
- @patch(
- "salt.utils.minions.CkMinions._pki_minions",
- MagicMock(return_value=["alpha", "beta", "gamma"]),
- )
- def test_auth_check(self):
- # Test function-only rule
- auth_list = ["test.ping"]
- ret = self.ckminions.auth_check(auth_list, "test.ping", None, "alpha")
- self.assertTrue(ret)
- ret = self.ckminions.auth_check(auth_list, "test.arg", None, "alpha")
- self.assertFalse(ret)
- # Test minion and function
- auth_list = [{"alpha": "test.ping"}]
- ret = self.ckminions.auth_check(auth_list, "test.ping", None, "alpha")
- self.assertTrue(ret)
- ret = self.ckminions.auth_check(auth_list, "test.arg", None, "alpha")
- self.assertFalse(ret)
- ret = self.ckminions.auth_check(auth_list, "test.ping", None, "beta")
- self.assertFalse(ret)
- # Test function list
- auth_list = [{"*": ["test.*", "saltutil.cmd"]}]
- ret = self.ckminions.auth_check(auth_list, "test.arg", None, "alpha")
- self.assertTrue(ret)
- ret = self.ckminions.auth_check(auth_list, "test.ping", None, "beta")
- self.assertTrue(ret)
- ret = self.ckminions.auth_check(auth_list, "saltutil.cmd", None, "gamma")
- self.assertTrue(ret)
- ret = self.ckminions.auth_check(auth_list, "saltutil.running", None, "gamma")
- self.assertFalse(ret)
- # Test an args and kwargs rule
- auth_list = [
- {
- "alpha": {
- "test.arg": {
- "args": ["1", "2"],
- "kwargs": {"aaa": "bbb", "ccc": "ddd"},
- }
- }
- }
- ]
- ret = self.ckminions.auth_check(auth_list, "test.arg", None, "runner")
- self.assertFalse(ret)
- ret = self.ckminions.auth_check(auth_list, "test.arg", [], "runner")
- self.assertFalse(ret)
- args = ["1", "2", {"aaa": "bbb", "ccc": "ddd", "__kwarg__": True}]
- ret = self.ckminions.auth_check(auth_list, "test.arg", args, "runner")
- self.assertTrue(ret)
- args = [
- "1",
- "2",
- "3",
- {"aaa": "bbb", "ccc": "ddd", "eee": "fff", "__kwarg__": True},
- ]
- ret = self.ckminions.auth_check(auth_list, "test.arg", args, "runner")
- self.assertTrue(ret)
- args = ["1", {"aaa": "bbb", "ccc": "ddd", "__kwarg__": True}]
- ret = self.ckminions.auth_check(auth_list, "test.arg", args, "runner")
- self.assertFalse(ret)
- args = ["1", "2", {"aaa": "bbb", "__kwarg__": True}]
- ret = self.ckminions.auth_check(auth_list, "test.arg", args, "runner")
- self.assertFalse(ret)
- args = ["1", "3", {"aaa": "bbb", "ccc": "ddd", "__kwarg__": True}]
- ret = self.ckminions.auth_check(auth_list, "test.arg", args, "runner")
- self.assertFalse(ret)
- args = ["1", "2", {"aaa": "bbb", "ccc": "fff", "__kwarg__": True}]
- ret = self.ckminions.auth_check(auth_list, "test.arg", args, "runner")
- self.assertFalse(ret)
- # Test kwargs only rule
- auth_list = [{"alpha": {"test.arg": {"kwargs": {"aaa": "bbb", "ccc": "ddd"}}}}]
- args = ["1", "2", {"aaa": "bbb", "ccc": "ddd", "__kwarg__": True}]
- ret = self.ckminions.auth_check(auth_list, "test.arg", args, "runner")
- self.assertTrue(ret)
- args = [{"aaa": "bbb", "ccc": "ddd", "eee": "fff", "__kwarg__": True}]
- ret = self.ckminions.auth_check(auth_list, "test.arg", args, "runner")
- self.assertTrue(ret)
- # Test args only rule
- auth_list = [{"alpha": {"test.arg": {"args": ["1", "2"]}}}]
- args = ["1", "2", {"aaa": "bbb", "ccc": "ddd", "__kwarg__": True}]
- ret = self.ckminions.auth_check(auth_list, "test.arg", args, "runner")
- self.assertTrue(ret)
- args = ["1", "2"]
- ret = self.ckminions.auth_check(auth_list, "test.arg", args, "runner")
- self.assertTrue(ret)
- @skipIf(
- sys.version_info < (2, 7), "Python 2.7 needed for dictionary equality assertions"
- )
- class TargetParseTestCase(TestCase):
- def test_parse_grains_target(self):
- """
- Ensure proper parsing for grains
- """
- g_tgt = "G@a:b"
- ret = salt.utils.minions.parse_target(g_tgt)
- self.assertDictEqual(ret, {"engine": "G", "pattern": "a:b", "delimiter": None})
- def test_parse_grains_pcre_target(self):
- """
- Ensure proper parsing for grains PCRE matching
- """
- p_tgt = "P@a:b"
- ret = salt.utils.minions.parse_target(p_tgt)
- self.assertDictEqual(ret, {"engine": "P", "pattern": "a:b", "delimiter": None})
- def test_parse_pillar_pcre_target(self):
- """
- Ensure proper parsing for pillar PCRE matching
- """
- j_tgt = "J@a:b"
- ret = salt.utils.minions.parse_target(j_tgt)
- self.assertDictEqual(ret, {"engine": "J", "pattern": "a:b", "delimiter": None})
- def test_parse_list_target(self):
- """
- Ensure proper parsing for list matching
- """
- l_tgt = "L@a:b"
- ret = salt.utils.minions.parse_target(l_tgt)
- self.assertDictEqual(ret, {"engine": "L", "pattern": "a:b", "delimiter": None})
- def test_parse_nodegroup_target(self):
- """
- Ensure proper parsing for pillar matching
- """
- n_tgt = "N@a:b"
- ret = salt.utils.minions.parse_target(n_tgt)
- self.assertDictEqual(ret, {"engine": "N", "pattern": "a:b", "delimiter": None})
- def test_parse_subnet_target(self):
- """
- Ensure proper parsing for subnet matching
- """
- s_tgt = "S@a:b"
- ret = salt.utils.minions.parse_target(s_tgt)
- self.assertDictEqual(ret, {"engine": "S", "pattern": "a:b", "delimiter": None})
- def test_parse_minion_pcre_target(self):
- """
- Ensure proper parsing for minion PCRE matching
- """
- e_tgt = "E@a:b"
- ret = salt.utils.minions.parse_target(e_tgt)
- self.assertDictEqual(ret, {"engine": "E", "pattern": "a:b", "delimiter": None})
- def test_parse_range_target(self):
- """
- Ensure proper parsing for range matching
- """
- r_tgt = "R@a:b"
- ret = salt.utils.minions.parse_target(r_tgt)
- self.assertDictEqual(ret, {"engine": "R", "pattern": "a:b", "delimiter": None})
- def test_parse_multiword_target(self):
- """
- Ensure proper parsing for multi-word targets
- Refs https://github.com/saltstack/salt/issues/37231
- """
- mw_tgt = "G@a:b c"
- ret = salt.utils.minions.parse_target(mw_tgt)
- self.assertEqual(ret["pattern"], "a:b c")
- class NodegroupCompTest(TestCase):
- """
- Test nodegroup comparisons found in
- salt.utils.minions.nodgroup_comp()
- """
- def test_simple_nodegroup(self):
- """
- Smoke test a very simple nodegroup. No recursion.
- """
- simple_nodegroup = {
- "group1": "L@foo.domain.com,bar.domain.com,baz.domain.com or bl*.domain.com"
- }
- ret = salt.utils.minions.nodegroup_comp("group1", simple_nodegroup)
- expected_ret = [
- "L@foo.domain.com,bar.domain.com,baz.domain.com",
- "or",
- "bl*.domain.com",
- ]
- self.assertListEqual(ret, expected_ret)
- def test_simple_expression_nodegroup(self):
- """
- Smoke test a nodegroup with a simple expression. No recursion.
- """
- simple_nodegroup = {"group1": "[foo,bar,baz].domain.com"}
- ret = salt.utils.minions.nodegroup_comp("group1", simple_nodegroup)
- expected_ret = ["E@[foo,bar,baz].domain.com"]
- self.assertListEqual(ret, expected_ret)
- def test_simple_recurse(self):
- """
- Test a case where one nodegroup contains a second nodegroup
- """
- referenced_nodegroups = {
- "group1": "L@foo.domain.com,bar.domain.com,baz.domain.com or bl*.domain.com",
- "group2": "G@os:Debian and N@group1",
- }
- ret = salt.utils.minions.nodegroup_comp("group2", referenced_nodegroups)
- expected_ret = [
- "(",
- "G@os:Debian",
- "and",
- "(",
- "L@foo.domain.com,bar.domain.com,baz.domain.com",
- "or",
- "bl*.domain.com",
- ")",
- ")",
- ]
- self.assertListEqual(ret, expected_ret)
- def test_circular_nodegroup_reference(self):
- """
- Test to see what happens if A refers to B
- and B in turn refers back to A
- """
- referenced_nodegroups = {"group1": "N@group2", "group2": "N@group1"}
- # If this works, it should also print an error to the console
- ret = salt.utils.minions.nodegroup_comp("group1", referenced_nodegroups)
- self.assertEqual(ret, [])
|