12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819 |
- # -*- coding: utf-8 -*-
- """
- :codeauthor: Mike Place <mp@saltstack.com>
- """
- # Import python libs
- from __future__ import absolute_import, print_function, unicode_literals
- import time
- import pytest
- # Import Salt libraries
- import salt.master
- import salt.utils.platform
- from salt import auth
- from salt.exceptions import SaltDeserializationError
- from tests.support.case import ModuleCase
- from tests.support.mock import MagicMock, call, patch
- # Import Salt Testing libs
- from tests.support.unit import TestCase, skipIf
class LoadAuthTestCase(TestCase):
    """
    Unit tests for ``salt.auth.LoadAuth`` run against mocked ``salt.loader``
    auth and eauth-token loaders.
    """

    def setUp(self):  # pylint: disable=W0221
        """
        Patch the serializer and both loaders, then build the ``LoadAuth``
        instance under test from empty opts.
        """
        patches = (
            ("salt.payload.Serial", None),
            (
                "salt.loader.auth",
                dict(
                    return_value={
                        "pam.auth": "fake_func_str",
                        "pam.groups": "fake_groups_function_str",
                    }
                ),
            ),
            (
                "salt.loader.eauth_tokens",
                dict(
                    return_value={
                        "localfs.mk_token": "fake_func_mktok",
                        "localfs.get_token": "fake_func_gettok",
                        # Was misspelled "rm_roken"; sibling keys are *_token.
                        "localfs.rm_token": "fake_func_rmtok",
                    }
                ),
            ),
        )
        for mod, mock in patches:
            # A falsy second element means "patch with the default MagicMock".
            if mock:
                patcher = patch(mod, **mock)
            else:
                patcher = patch(mod)
            patcher.start()
            self.addCleanup(patcher.stop)
        self.lauth = auth.LoadAuth({})  # Load with empty opts

    def _get_tok_with(self, fake_get_token, token_name="fnord"):
        """
        Call ``self.lauth.get_tok(token_name)`` with ``opts["eauth_tokens"]``
        set to ``"testfs"``, ``fake_get_token`` registered under the
        ``"testfs.get_token"`` key and ``rm_token`` replaced by a mock.

        :param fake_get_token: callable used as the ``testfs.get_token`` entry
        :param token_name: value passed to ``get_tok``
        :return: ``(result_of_get_tok, rm_token_mock)``
        """
        mock_rm_token = MagicMock()
        patch_opts = patch.dict(self.lauth.opts, {"eauth_tokens": "testfs"})
        patch_get_token = patch.dict(
            self.lauth.tokens, {"testfs.get_token": fake_get_token},
        )
        patch_rm_token = patch.object(self.lauth, "rm_token", mock_rm_token)
        with patch_opts, patch_get_token, patch_rm_token:
            token = self.lauth.get_tok(token_name)
        return token, mock_rm_token

    def test_get_tok_with_broken_file_will_remove_bad_token(self):
        """
        ``rm_token`` is called with the token name when ``get_token`` raises
        ``SaltDeserializationError``.
        """
        fake_get_token = MagicMock(side_effect=SaltDeserializationError("hi"))
        _, mock_rm_token = self._get_tok_with(fake_get_token, "fnord")
        mock_rm_token.assert_called_with("fnord")

    def test_get_tok_with_no_expiration_should_remove_bad_token(self):
        """
        ``rm_token`` is called with the token name when the stored token data
        has no ``expire`` key.
        """
        fake_get_token = MagicMock(return_value={"no_expire_here": "Nope"})
        _, mock_rm_token = self._get_tok_with(fake_get_token, "fnord")
        mock_rm_token.assert_called_with("fnord")

    def test_get_tok_with_expire_before_current_time_should_remove_token(self):
        """
        ``rm_token`` is called with the token name when ``expire`` lies in the
        past.
        """
        fake_get_token = MagicMock(return_value={"expire": time.time() - 1})
        _, mock_rm_token = self._get_tok_with(fake_get_token, "fnord")
        mock_rm_token.assert_called_with("fnord")

    def test_get_tok_with_valid_expiration_should_return_token(self):
        """
        A token whose ``expire`` lies in the future is returned as the very
        same object and ``rm_token`` is never called.
        """
        expected_token = {"expire": time.time() + 1}
        fake_get_token = MagicMock(return_value=expected_token)
        actual_token, mock_rm_token = self._get_tok_with(fake_get_token, "fnord")
        mock_rm_token.assert_not_called()
        # assertIs instead of a bare assert, which is stripped under ``-O``.
        self.assertIs(actual_token, expected_token, "Token was not returned")

    def test_load_name(self):
        """
        ``load_name`` returns ``""`` for a load without ``eauth`` and the
        username for a complete load.
        """
        valid_eauth_load = {
            "username": "test_user",
            "show_timeout": False,
            "test_password": "",
            "eauth": "pam",
        }

        # Test a case where the loader auth doesn't have the auth type
        without_auth_type = dict(valid_eauth_load)
        without_auth_type.pop("eauth")
        ret = self.lauth.load_name(without_auth_type)
        self.assertEqual(
            ret, "", "Did not bail when the auth loader didn't have the auth type."
        )

        # Test a case with valid params
        with patch(
            "salt.utils.args.arg_lookup",
            MagicMock(return_value={"args": ["username", "password"]}),
        ) as arg_lookup_mock:
            expected_ret = call("fake_func_str")
            ret = self.lauth.load_name(valid_eauth_load)
            arg_lookup_mock.assert_has_calls((expected_ret,), any_order=True)
            self.assertEqual(ret, "test_user")

    def test_get_groups(self):
        """
        ``get_groups`` calls ``salt.utils.args.format_call`` with the mocked
        ``pam.groups`` entry, the load and the internal auth keywords.
        """
        valid_eauth_load = {
            "username": "test_user",
            "show_timeout": False,
            "test_password": "",
            "eauth": "pam",
        }
        with patch("salt.utils.args.format_call") as format_call_mock:
            expected_ret = call(
                "fake_groups_function_str",
                {
                    "username": "test_user",
                    "test_password": "",
                    "show_timeout": False,
                    "eauth": "pam",
                },
                expected_extra_kws=auth.AUTH_INTERNAL_KEYWORDS,
            )
            self.lauth.get_groups(valid_eauth_load)
            format_call_mock.assert_has_calls((expected_ret,), any_order=True)
class MasterACLTestCase(ModuleCase):
    """
    A class to check various aspects of the publisher ACL system
    """

    def setUp(self):
        """
        Build a ``salt.master.ClearFuncs`` instance with its collaborators
        mocked, a PAM ``external_auth`` ACL table and a baseline publish load
        (``self.valid_clear_load``) that individual tests mutate.
        """
        self.fire_event_mock = MagicMock(return_value="dummy_tag")
        self.addCleanup(delattr, self, "fire_event_mock")
        opts = self.get_temp_config("master")

        patches = (
            ("zmq.Context", MagicMock()),
            ("salt.payload.Serial.dumps", MagicMock()),
            ("salt.master.tagify", MagicMock()),
            ("salt.utils.event.SaltEvent.fire_event", self.fire_event_mock),
            ("salt.auth.LoadAuth.time_auth", MagicMock(return_value=True)),
            ("salt.minion.MasterMinion", MagicMock()),
            ("salt.utils.verify.check_path_traversal", MagicMock()),
            ("salt.client.get_local_client", MagicMock(return_value=opts["conf_file"])),
        )
        for mod, mock in patches:
            patcher = patch(mod, mock)
            patcher.start()
            self.addCleanup(patcher.stop)

        opts["publisher_acl"] = {}
        opts["publisher_acl_blacklist"] = {}
        opts["master_job_cache"] = ""
        opts["sign_pub_messages"] = False
        opts["con_cache"] = ""
        opts["external_auth"] = {}
        opts["external_auth"]["pam"] = {
            "test_user": [
                {"*": ["test.ping"]},
                {"minion_glob*": ["foo.bar"]},
                {"minion_func_test": ["func_test.*"]},
            ],
            "test_group%": [{"*": ["test.echo"]}],
            "test_user_mminion": [{"target_minion": ["test.ping"]}],
            "*": [{"my_minion": ["my_mod.my_func"]}],
            "test_user_func": [
                {
                    "*": [
                        {"test.echo": {"args": ["MSG:.*"]}},
                        {
                            "test.echo": {
                                "kwargs": {
                                    "text": "KWMSG:.*",
                                    "anything": ".*",
                                    "none": None,
                                }
                            }
                        },
                        {
                            "my_mod.*": {
                                "args": ["a.*", "b.*"],
                                "kwargs": {"kwa": "kwa.*", "kwb": "kwb"},
                            }
                        },
                    ]
                },
                {
                    "minion1": [
                        {"test.echo": {"args": ["TEST", None, "TEST.*"]}},
                        {"test.empty": {}},
                    ]
                },
            ],
        }
        self.clear = salt.master.ClearFuncs(opts, MagicMock())
        self.addCleanup(delattr, self, "clear")

        # overwrite the _send_pub method so we don't have to serialize MagicMock
        self.clear._send_pub = lambda payload: True

        # make sure to return a JID, instead of a mock
        self.clear.mminion.returners = {".prep_jid": lambda x: 1}

        self.valid_clear_load = {
            "tgt_type": "glob",
            "jid": "",
            "cmd": "publish",
            "tgt": "test_minion",
            "kwargs": {
                "username": "test_user",
                "password": "test_password",
                "show_timeout": False,
                "eauth": "pam",
                "show_jid": False,
            },
            "ret": "",
            "user": "test_user",
            "key": "",
            "arg": "",
            "fun": "test.ping",
        }
        self.addCleanup(delattr, self, "valid_clear_load")

    @staticmethod
    def _patch_check_minions(minions):
        """
        Return a patcher that makes ``CkMinions.check_minions`` report
        ``minions`` as matched and nothing as missing.
        """
        return patch(
            "salt.utils.minions.CkMinions.check_minions",
            MagicMock(return_value={"minions": minions, "missing": []}),
        )

    def _as_func_user(self, **overrides):
        """
        Switch ``self.valid_clear_load`` to the ``test_user_func`` user and
        apply ``overrides`` (for example ``tgt``, ``fun``, ``arg``) to it.
        """
        self.valid_clear_load["kwargs"].update({"username": "test_user_func"})
        self.valid_clear_load.update({"user": "test_user_func"})
        self.valid_clear_load.update(overrides)

    def _assert_arg_denied(self, arg):
        """
        Publish the current load with ``arg`` and assert that no event has
        been fired.
        """
        self.valid_clear_load["arg"] = arg
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(self.fire_event_mock.mock_calls, [])

    @skipIf(salt.utils.platform.is_windows(), "PAM eauth not available on Windows")
    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_master_publish_name(self):
        """
        Test to ensure a simple name can auth against a given function.
        This tests to ensure test_user can access test.ping but *not* sys.doc
        """
        with self._patch_check_minions(["some_minions"]):
            # Can we access test.ping?
            self.clear.publish(self.valid_clear_load)
            self.assertEqual(self.fire_event_mock.call_args[0][0]["fun"], "test.ping")

            # Are we denied access to sys.doc?
            sys_doc_load = self.valid_clear_load
            sys_doc_load["fun"] = "sys.doc"
            self.clear.publish(sys_doc_load)
            self.assertNotEqual(
                self.fire_event_mock.call_args[0][0]["fun"], "sys.doc"
            )  # If sys.doc were to fire, this would match

    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_master_publish_group(self):
        """
        Tests to ensure test_group can access test.echo but *not* sys.doc
        """
        with self._patch_check_minions(["some_minions"]):
            # NOTE(review): this sets kwargs["user"], not kwargs["username"];
            # kept as in the original -- confirm which key was intended.
            self.valid_clear_load["kwargs"]["user"] = "new_user"
            self.valid_clear_load["fun"] = "test.echo"
            self.valid_clear_load["arg"] = "hello"
            with patch(
                "salt.auth.LoadAuth.get_groups",
                return_value=["test_group", "second_test_group"],
            ):
                self.clear.publish(self.valid_clear_load)
                # Did we fire test.echo?
                self.assertEqual(
                    self.fire_event_mock.call_args[0][0]["fun"], "test.echo"
                )

                # Request sys.doc. The original never re-published after
                # changing "fun", so the check below could not fail.
                self.valid_clear_load["fun"] = "sys.doc"
                self.clear.publish(self.valid_clear_load)
                # Did we fire it?
                self.assertNotEqual(
                    self.fire_event_mock.call_args[0][0]["fun"], "sys.doc"
                )

    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_master_publish_some_minions(self):
        """
        Tests to ensure we can only target minions for which we
        have permission with publisher acl.

        Note that in order for these sorts of tests to run correctly that
        you should NOT patch check_minions!
        """
        self.valid_clear_load["kwargs"]["username"] = "test_user_mminion"
        self.valid_clear_load["user"] = "test_user_mminion"
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(self.fire_event_mock.mock_calls, [])

    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_master_not_user_glob_all(self):
        """
        Test to ensure that we DO NOT grant access to a given
        function to all users with publisher acl. ex:

        '*':
            my_minion:
                - my_func

        Yes, this seems like a bit of a no-op test but it's
        here to document that this functionality
        is NOT supported currently.

        WARNING: Do not patch this wit

        NOTE(review): the warning above was already cut off mid-sentence in
        the original; presumably it means check_minions must stay unpatched
        here, as in test_master_publish_some_minions -- confirm.
        """
        self.valid_clear_load["kwargs"]["username"] = "NOT_A_VALID_USERNAME"
        self.valid_clear_load["user"] = "NOT_A_VALID_USERNAME"
        self.valid_clear_load["fun"] = "test.ping"
        self.clear.publish(self.valid_clear_load)
        self.assertEqual(self.fire_event_mock.mock_calls, [])

    @skipIf(salt.utils.platform.is_windows(), "PAM eauth not available on Windows")
    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_master_minion_glob(self):
        """
        Test to ensure we can allow access to a given
        function for a user to a subset of minions
        selected by a glob. ex:

        test_user:
            'minion_glob*':
              - foo.bar

        This test is a bit tricky, because ultimately the real functionality
        lies in what's returned from check_minions, but this checks a limited
        amount of logic on the way there as well. Note the inline patch.
        """
        requested_function = "foo.bar"
        requested_tgt = "minion_glob1"
        self.valid_clear_load["tgt"] = requested_tgt
        self.valid_clear_load["fun"] = requested_function
        # Assume that there is a listening minion match
        with self._patch_check_minions(["minion_glob1"]):
            self.clear.publish(self.valid_clear_load)
        self.assertTrue(
            self.fire_event_mock.called,
            "Did not fire {0} for minion tgt {1}".format(
                requested_function, requested_tgt
            ),
        )
        self.assertEqual(
            self.fire_event_mock.call_args[0][0]["fun"],
            requested_function,
            "Did not fire {0} for minion glob".format(requested_function),
        )

    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_master_function_glob(self):
        """
        Test to ensure that we can allow access to a given
        set of functions in an execution module as selected
        by a glob. ex:

        my_user:
            my_minion:
                'test.*'
        """
        # Unimplemented

    @skipIf(salt.utils.platform.is_windows(), "PAM eauth not available on Windows")
    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_args_empty_spec(self):
        """
        Test simple arg restriction allowed.

        'test_user_func':
            minion1:
                - test.empty:
        """
        with self._patch_check_minions(["minion1"]):
            self._as_func_user(tgt="minion1", fun="test.empty", arg=["TEST"])
            self.clear.publish(self.valid_clear_load)
            self.assertEqual(self.fire_event_mock.call_args[0][0]["fun"], "test.empty")

    @skipIf(salt.utils.platform.is_windows(), "PAM eauth not available on Windows")
    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_args_simple_match(self):
        """
        Test simple arg restriction allowed.

        'test_user_func':
            minion1:
                - test.echo:
                    args:
                        - 'TEST'
                        - 'TEST.*'
        """
        with self._patch_check_minions(["minion1"]):
            self._as_func_user(
                tgt="minion1", fun="test.echo", arg=["TEST", "any", "TEST ABC"]
            )
            self.clear.publish(self.valid_clear_load)
            self.assertEqual(self.fire_event_mock.call_args[0][0]["fun"], "test.echo")

    @skipIf(salt.utils.platform.is_windows(), "PAM eauth not available on Windows")
    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_args_more_args(self):
        """
        Test simple arg restriction allowed to pass unlisted args.

        'test_user_func':
            minion1:
                - test.echo:
                    args:
                        - 'TEST'
                        - 'TEST.*'
        """
        with self._patch_check_minions(["minion1"]):
            self._as_func_user(
                tgt="minion1",
                fun="test.echo",
                arg=[
                    "TEST",
                    "any",
                    "TEST ABC",
                    "arg 3",
                    {"kwarg1": "val1", "__kwarg__": True},
                ],
            )
            self.clear.publish(self.valid_clear_load)
            self.assertEqual(self.fire_event_mock.call_args[0][0]["fun"], "test.echo")

    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_args_simple_forbidden(self):
        """
        Test simple arg restriction forbidden.

        'test_user_func':
            minion1:
                - test.echo:
                    args:
                        - 'TEST'
                        - 'TEST.*'
        """
        with self._patch_check_minions(["minion1"]):
            self._as_func_user(tgt="minion1", fun="test.echo")
            # Wrong last arg
            self._assert_arg_denied(["TEST", "any", "TESLA"])
            # Wrong first arg
            self._assert_arg_denied(["TES", "any", "TEST1234"])
            # Missing the last arg
            self._assert_arg_denied(["TEST", "any"])
            # No args
            self._assert_arg_denied([])

    @skipIf(salt.utils.platform.is_windows(), "PAM eauth not available on Windows")
    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_args_kwargs_match(self):
        """
        Test simple kwargs restriction allowed.

        'test_user_func':
            '*':
                - test.echo:
                    kwargs:
                        text: 'KWMSG:.*'
        """
        with self._patch_check_minions(["some_minions"]):
            self._as_func_user(
                tgt="*",
                fun="test.echo",
                arg=[
                    {
                        "text": "KWMSG: a message",
                        "anything": "hello all",
                        "none": "hello none",
                        "__kwarg__": True,
                    }
                ],
            )
            self.clear.publish(self.valid_clear_load)
            self.assertEqual(self.fire_event_mock.call_args[0][0]["fun"], "test.echo")

    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_args_kwargs_mismatch(self):
        """
        Test simple kwargs restriction forbidden.

        'test_user_func':
            '*':
                - test.echo:
                    kwargs:
                        text: 'KWMSG:.*'
        """
        with self._patch_check_minions(["some_minions"]):
            self._as_func_user(tgt="*", fun="test.echo")
            # Wrong kwarg value
            self._assert_arg_denied(
                [
                    {
                        "text": "KWMSG a message",
                        "anything": "hello all",
                        "none": "hello none",
                        "__kwarg__": True,
                    }
                ]
            )
            # Missing kwarg value
            self._assert_arg_denied(
                [{"anything": "hello all", "none": "hello none", "__kwarg__": True}]
            )
            self._assert_arg_denied([{"__kwarg__": True}])
            self._assert_arg_denied([{}])
            self._assert_arg_denied([])
            # Missing kwarg allowing any value
            self._assert_arg_denied(
                [{"text": "KWMSG: a message", "none": "hello none", "__kwarg__": True}]
            )
            self._assert_arg_denied(
                [
                    {
                        "text": "KWMSG: a message",
                        "anything": "hello all",
                        "__kwarg__": True,
                    }
                ]
            )

    @skipIf(salt.utils.platform.is_windows(), "PAM eauth not available on Windows")
    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_args_mixed_match(self):
        """
        Test mixed args and kwargs restriction allowed.

        'test_user_func':
            '*':
                - 'my_mod.*':
                    args:
                        - 'a.*'
                        - 'b.*'
                    kwargs:
                        'kwa': 'kwa.*'
                        'kwb': 'kwb'
        """
        with self._patch_check_minions(["some_minions"]):
            self._as_func_user(
                tgt="*",
                fun="my_mod.some_func",
                arg=[
                    "alpha",
                    "beta",
                    "gamma",
                    {
                        "kwa": "kwarg #1",
                        "kwb": "kwb",
                        "one_more": "just one more",
                        "__kwarg__": True,
                    },
                ],
            )
            self.clear.publish(self.valid_clear_load)
            self.assertEqual(
                self.fire_event_mock.call_args[0][0]["fun"], "my_mod.some_func"
            )

    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_args_mixed_mismatch(self):
        """
        Test mixed args and kwargs restriction forbidden.

        'test_user_func':
            '*':
                - 'my_mod.*':
                    args:
                        - 'a.*'
                        - 'b.*'
                    kwargs:
                        'kwa': 'kwa.*'
                        'kwb': 'kwb'
        """
        with self._patch_check_minions(["some_minions"]):
            self._as_func_user(tgt="*", fun="my_mod.some_func")
            # Wrong arg value
            self._assert_arg_denied(
                [
                    "alpha",
                    "gamma",
                    {
                        "kwa": "kwarg #1",
                        "kwb": "kwb",
                        "one_more": "just one more",
                        "__kwarg__": True,
                    },
                ]
            )
            # Wrong kwarg value
            self._assert_arg_denied(
                [
                    "alpha",
                    "beta",
                    "gamma",
                    {
                        "kwa": "kkk",
                        "kwb": "kwb",
                        "one_more": "just one more",
                        "__kwarg__": True,
                    },
                ]
            )
            # Missing arg
            self._assert_arg_denied(
                [
                    "alpha",
                    {
                        "kwa": "kwarg #1",
                        "kwb": "kwb",
                        "one_more": "just one more",
                        "__kwarg__": True,
                    },
                ]
            )
            # Missing kwarg
            self._assert_arg_denied(
                [
                    "alpha",
                    "beta",
                    "gamma",
                    {"kwa": "kwarg #1", "one_more": "just one more", "__kwarg__": True},
                ]
            )
class AuthACLTestCase(ModuleCase):
    """
    Check which ACL list ``ClearFuncs.publish`` hands to the mocked
    ``CkMinions.auth_check``.
    """

    def setUp(self):
        """
        Build a ``salt.master.ClearFuncs`` with ``auth_check`` replaced by a
        mock that returns ``True``, a single-entry PAM ACL for ``test_user``
        and a baseline publish load.
        """
        self.auth_check_mock = MagicMock(return_value=True)
        opts = self.get_temp_config("master")

        replacements = (
            ("salt.minion.MasterMinion", MagicMock()),
            ("salt.utils.verify.check_path_traversal", MagicMock()),
            ("salt.utils.minions.CkMinions.auth_check", self.auth_check_mock),
            ("salt.auth.LoadAuth.time_auth", MagicMock(return_value=True)),
            ("salt.client.get_local_client", MagicMock(return_value=opts["conf_file"])),
        )
        for target, replacement in replacements:
            active_patch = patch(target, replacement)
            active_patch.start()
            self.addCleanup(active_patch.stop)
        self.addCleanup(delattr, self, "auth_check_mock")

        # Master options the tests rely on, applied in the original order.
        option_overrides = (
            ("publisher_acl", {}),
            ("publisher_acl_blacklist", {}),
            ("master_job_cache", ""),
            ("sign_pub_messages", False),
            ("con_cache", ""),
            ("external_auth", {}),
        )
        for option, value in option_overrides:
            opts[option] = value
        opts["external_auth"]["pam"] = {"test_user": [{"alpha_minion": ["test.ping"]}]}

        self.clear = salt.master.ClearFuncs(opts, MagicMock())
        self.addCleanup(delattr, self, "clear")

        # overwrite the _send_pub method so we don't have to serialize MagicMock
        self.clear._send_pub = lambda payload: True

        # make sure to return a JID, instead of a mock
        self.clear.mminion.returners = {".prep_jid": lambda x: 1}

        eauth_kwargs = {
            "username": "test_user",
            "password": "test_password",
            "show_timeout": False,
            "eauth": "pam",
            "show_jid": False,
        }
        self.valid_clear_load = {
            "tgt_type": "glob",
            "jid": "",
            "cmd": "publish",
            "tgt": "test_minion",
            "kwargs": eauth_kwargs,
            "ret": "",
            "user": "test_user",
            "key": "",
            "arg": "",
            "fun": "test.ping",
        }
        self.addCleanup(delattr, self, "valid_clear_load")

    @skipIf(salt.utils.platform.is_windows(), "PAM eauth not available on Windows")
    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_acl_simple_allow(self):
        """
        Publishing as ``test_user`` passes that user's configured ACL list to
        ``auth_check`` as its first positional argument.
        """
        self.clear.publish(self.valid_clear_load)
        checked_acl = self.auth_check_mock.call_args[0][0]
        self.assertEqual(checked_acl, [{"alpha_minion": ["test.ping"]}])

    @pytest.mark.slow_test(seconds=1)  # Test takes >0.1 and <=1 seconds
    def test_acl_simple_deny(self):
        """
        With ``LoadAuth.get_auth_list`` patched to return a ``beta_minion``
        ACL, that list is what ``auth_check`` receives.
        """
        fake_get_auth_list = MagicMock(return_value=[{"beta_minion": ["test.ping"]}])
        with patch("salt.auth.LoadAuth.get_auth_list", fake_get_auth_list):
            self.clear.publish(self.valid_clear_load)
            checked_acl = self.auth_check_mock.call_args[0][0]
            self.assertEqual(checked_acl, [{"beta_minion": ["test.ping"]}])
|