123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538 |
- # -*- coding: utf-8 -*-
- # Import Python libs
- from __future__ import absolute_import
- import pytest
- # Import Salt libs
- import salt.config
- import salt.master
- from tests.support.mock import MagicMock, patch
- # Import Salt Testing Libs
- from tests.support.unit import TestCase
class ClearFuncsTestCase(TestCase):
    """
    TestCase for salt.master.ClearFuncs class

    Each test hands a hand-built load to ``ClearFuncs.runner``,
    ``ClearFuncs.wheel`` or ``ClearFuncs.publish`` and compares the returned
    error dictionary with the expected one. The runner and wheel tests cover
    the same scenarios, so the scenario bodies live in the private
    ``_check_*`` helpers and the tests only choose which method to call.
    """

    # Arbitrary token value shared by the token based tests.
    _TOKEN = "asdfasdfasdfasdf"

    @classmethod
    def setUpClass(cls):
        """
        Build the single ClearFuncs instance shared by every test, using the
        options returned by ``salt.config.master_config(None)``.
        """
        opts = salt.config.master_config(None)
        cls.clear_funcs = salt.master.ClearFuncs(opts, {})

    @classmethod
    def tearDownClass(cls):
        """
        Drop the shared ClearFuncs instance once all tests have run.
        """
        del cls.clear_funcs

    # helpers

    @staticmethod
    def _error(name, message):
        """
        Return the ``{"error": {...}}`` payload the tests compare against.
        """
        return {"error": {"name": name, "message": message}}

    def _patch(self, target, return_value):
        """
        Replace ``target`` with a MagicMock returning ``return_value`` for the
        remainder of the current test; the patch is undone via addCleanup.
        """
        patcher = patch(target, MagicMock(return_value=return_value))
        patcher.start()
        self.addCleanup(patcher.stop)

    def _patch_acl_checks_passing(self):
        """
        Make both publisher ACL blacklist checks report "not blacklisted".
        """
        self._patch("salt.acl.PublisherACL.user_is_blacklisted", False)
        self._patch("salt.acl.PublisherACL.cmd_is_blacklisted", False)

    def _call_with_token(self, func, fun, auth_list):
        """
        Call ``func`` with a token load for ``fun`` while token authentication
        is mocked to succeed and ``get_auth_list`` is mocked to ``auth_list``.
        """
        clear_load = {"token": self._TOKEN, "fun": fun}
        mock_token = {"token": self._TOKEN, "eauth": "foo", "name": "test"}
        self._patch("salt.auth.LoadAuth.authenticate_token", mock_token)
        self._patch("salt.auth.LoadAuth.get_auth_list", auth_list)
        return func(clear_load)

    def _call_with_eauth(self, func, fun, auth_list):
        """
        Call ``func`` with an eauth load for ``fun`` while eauth authentication
        is mocked to succeed and ``get_auth_list`` is mocked to ``auth_list``.
        """
        clear_load = {"eauth": "foo", "username": "test", "fun": fun}
        self._patch("salt.auth.LoadAuth.authenticate_eauth", True)
        self._patch("salt.auth.LoadAuth.get_auth_list", auth_list)
        return func(clear_load)

    def _check_token_not_authenticated(self, func):
        """
        Shared body: an unknown token yields a TokenAuthenticationError.
        """
        expected = self._error(
            "TokenAuthenticationError",
            'Authentication failure of type "token" occurred.',
        )
        ret = func({"token": self._TOKEN})
        self.assertDictEqual(expected, ret)

    def _check_token_authorization_error(self, func):
        """
        Shared body: a valid token with an empty auth list yields a
        TokenAuthenticationError naming the user.
        """
        expected = self._error(
            "TokenAuthenticationError",
            'Authentication failure of type "token" occurred for user test.',
        )
        ret = self._call_with_token(func, "test.arg", [])
        self.assertDictEqual(expected, ret)

    def _check_token_salt_invocation_error(self, func):
        """
        Shared body: a valid token with a bad function name yields a
        SaltInvocationError.
        """
        expected = self._error(
            "SaltInvocationError",
            "A command invocation error occurred: Check syntax.",
        )
        ret = self._call_with_token(func, "badtestarg", ["testing"])
        self.assertDictEqual(expected, ret)

    def _check_eauth_not_authenticated(self, func):
        """
        Shared body: an eauth load without credentials yields an
        EauthAuthenticationError for user UNKNOWN.
        """
        expected = self._error(
            "EauthAuthenticationError",
            'Authentication failure of type "eauth" occurred for user UNKNOWN.',
        )
        ret = func({"eauth": "foo"})
        self.assertDictEqual(expected, ret)

    def _check_eauth_authorization_error(self, func):
        """
        Shared body: an authenticated eauth user with an empty auth list
        yields an EauthAuthenticationError naming the user.
        """
        expected = self._error(
            "EauthAuthenticationError",
            'Authentication failure of type "eauth" occurred for user test.',
        )
        ret = self._call_with_eauth(func, "test.arg", [])
        self.assertDictEqual(expected, ret)

    def _check_eauth_salt_invocation_error(self, func):
        """
        Shared body: an authenticated eauth user with a bad function name
        yields a SaltInvocationError.
        """
        expected = self._error(
            "SaltInvocationError",
            "A command invocation error occurred: Check syntax.",
        )
        ret = self._call_with_eauth(func, "bad.test.arg.func", ["testing"])
        self.assertDictEqual(expected, ret)

    def _check_user_not_authenticated(self, func):
        """
        Shared body: an empty load yields a UserAuthenticationError.
        """
        expected = self._error(
            "UserAuthenticationError",
            'Authentication failure of type "user" occurred',
        )
        ret = func({})
        self.assertDictEqual(expected, ret)

    # runner tests

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_runner_token_not_authenticated(self):
        """
        Asserts that a TokenAuthenticationError is returned when the token can't authenticate.
        """
        self._check_token_not_authenticated(self.clear_funcs.runner)

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_runner_token_authorization_error(self):
        """
        Asserts that a TokenAuthenticationError is returned when the token authenticates, but is
        not authorized.
        """
        self._check_token_authorization_error(self.clear_funcs.runner)

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_runner_token_salt_invocation_error(self):
        """
        Asserts that a SaltInvocationError is returned when the token authenticates, but the
        command is malformed.
        """
        self._check_token_salt_invocation_error(self.clear_funcs.runner)

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_runner_eauth_not_authenticated(self):
        """
        Asserts that an EauthAuthenticationError is returned when the user can't authenticate.
        """
        self._check_eauth_not_authenticated(self.clear_funcs.runner)

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_runner_eauth_authorization_error(self):
        """
        Asserts that an EauthAuthenticationError is returned when the user authenticates, but is
        not authorized.
        """
        self._check_eauth_authorization_error(self.clear_funcs.runner)

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_runner_eauth_salt_invocation_error(self):
        """
        Asserts that a SaltInvocationError is returned when the user authenticates, but the
        command is malformed.
        """
        self._check_eauth_salt_invocation_error(self.clear_funcs.runner)

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_runner_user_not_authenticated(self):
        """
        Asserts that an UserAuthenticationError is returned when the user can't authenticate.
        """
        self._check_user_not_authenticated(self.clear_funcs.runner)

    # wheel tests

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_wheel_token_not_authenticated(self):
        """
        Asserts that a TokenAuthenticationError is returned when the token can't authenticate.
        """
        self._check_token_not_authenticated(self.clear_funcs.wheel)

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_wheel_token_authorization_error(self):
        """
        Asserts that a TokenAuthenticationError is returned when the token authenticates, but is
        not authorized.
        """
        self._check_token_authorization_error(self.clear_funcs.wheel)

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_wheel_token_salt_invocation_error(self):
        """
        Asserts that a SaltInvocationError is returned when the token authenticates, but the
        command is malformed.
        """
        self._check_token_salt_invocation_error(self.clear_funcs.wheel)

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_wheel_eauth_not_authenticated(self):
        """
        Asserts that an EauthAuthenticationError is returned when the user can't authenticate.
        """
        self._check_eauth_not_authenticated(self.clear_funcs.wheel)

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_wheel_eauth_authorization_error(self):
        """
        Asserts that an EauthAuthenticationError is returned when the user authenticates, but is
        not authorized.
        """
        self._check_eauth_authorization_error(self.clear_funcs.wheel)

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_wheel_eauth_salt_invocation_error(self):
        """
        Asserts that a SaltInvocationError is returned when the user authenticates, but the
        command is malformed.
        """
        self._check_eauth_salt_invocation_error(self.clear_funcs.wheel)

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_wheel_user_not_authenticated(self):
        """
        Asserts that an UserAuthenticationError is returned when the user can't authenticate.
        """
        self._check_user_not_authenticated(self.clear_funcs.wheel)

    # publish tests

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_publish_user_is_blacklisted(self):
        """
        Asserts that an AuthorizationError is returned when the user has been blacklisted.
        """
        expected = self._error("AuthorizationError", "Authorization error occurred.")
        # Only the user check is patched here; the command check is left alone.
        self._patch("salt.acl.PublisherACL.user_is_blacklisted", True)
        self.assertEqual(
            expected, self.clear_funcs.publish({"user": "foo", "fun": "test.arg"})
        )

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_publish_cmd_blacklisted(self):
        """
        Asserts that an AuthorizationError is returned when the command has been blacklisted.
        """
        expected = self._error("AuthorizationError", "Authorization error occurred.")
        self._patch("salt.acl.PublisherACL.user_is_blacklisted", False)
        self._patch("salt.acl.PublisherACL.cmd_is_blacklisted", True)
        self.assertEqual(
            expected, self.clear_funcs.publish({"user": "foo", "fun": "test.arg"})
        )

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_publish_token_not_authenticated(self):
        """
        Asserts that an AuthenticationError is returned when the token can't authenticate.
        """
        expected = self._error("AuthenticationError", "Authentication error occurred.")
        load = {
            "user": "foo",
            "fun": "test.arg",
            "tgt": "test_minion",
            "kwargs": {"token": self._TOKEN},
        }
        self._patch_acl_checks_passing()
        self.assertEqual(expected, self.clear_funcs.publish(load))

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_publish_token_authorization_error(self):
        """
        Asserts that an AuthorizationError is returned when the token authenticates, but is not
        authorized.
        """
        load = {
            "user": "foo",
            "fun": "test.arg",
            "tgt": "test_minion",
            "arg": "bar",
            "kwargs": {"token": self._TOKEN},
        }
        mock_token = {"token": self._TOKEN, "eauth": "foo", "name": "test"}
        expected = self._error("AuthorizationError", "Authorization error occurred.")
        self._patch_acl_checks_passing()
        self._patch("salt.auth.LoadAuth.authenticate_token", mock_token)
        self._patch("salt.auth.LoadAuth.get_auth_list", [])
        self.assertEqual(expected, self.clear_funcs.publish(load))

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_publish_eauth_not_authenticated(self):
        """
        Asserts that an AuthenticationError is returned when the user can't authenticate.
        """
        load = {
            "user": "test",
            "fun": "test.arg",
            "tgt": "test_minion",
            "kwargs": {"eauth": "foo"},
        }
        expected = self._error("AuthenticationError", "Authentication error occurred.")
        self._patch_acl_checks_passing()
        self.assertEqual(expected, self.clear_funcs.publish(load))

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_publish_eauth_authorization_error(self):
        """
        Asserts that an AuthorizationError is returned when the user authenticates, but is not
        authorized.
        """
        load = {
            "user": "test",
            "fun": "test.arg",
            "tgt": "test_minion",
            "kwargs": {"eauth": "foo"},
            "arg": "bar",
        }
        expected = self._error("AuthorizationError", "Authorization error occurred.")
        self._patch_acl_checks_passing()
        self._patch("salt.auth.LoadAuth.authenticate_eauth", True)
        self._patch("salt.auth.LoadAuth.get_auth_list", [])
        self.assertEqual(expected, self.clear_funcs.publish(load))

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_publish_user_not_authenticated(self):
        """
        Asserts that an AuthenticationError is returned when the user can't authenticate.
        """
        load = {"user": "test", "fun": "test.arg", "tgt": "test_minion"}
        expected = self._error("AuthenticationError", "Authentication error occurred.")
        self._patch_acl_checks_passing()
        self.assertEqual(expected, self.clear_funcs.publish(load))

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_publish_user_authenticated_missing_auth_list(self):
        """
        Asserts that an AuthenticationError is returned when the user has an effective user id and is
        authenticated, but the auth_list is empty.
        """
        load = {
            "user": "test",
            "fun": "test.arg",
            "tgt": "test_minion",
            "kwargs": {"user": "test"},
            "arg": "foo",
        }
        expected = self._error("AuthenticationError", "Authentication error occurred.")
        self._patch_acl_checks_passing()
        self._patch("salt.auth.LoadAuth.authenticate_key", "fake-user-key")
        self._patch("salt.utils.master.get_values_of_matching_keys", [])
        self.assertEqual(expected, self.clear_funcs.publish(load))

    @pytest.mark.slow_test(seconds=5)  # Test takes >1 and <=5 seconds
    def test_publish_user_authorization_error(self):
        """
        Asserts that an AuthorizationError is returned when the user authenticates, but is not
        authorized.
        """
        load = {
            "user": "test",
            "fun": "test.arg",
            "tgt": "test_minion",
            "kwargs": {"user": "test"},
            "arg": "foo",
        }
        expected = self._error("AuthorizationError", "Authorization error occurred.")
        self._patch_acl_checks_passing()
        self._patch("salt.auth.LoadAuth.authenticate_key", "fake-user-key")
        self._patch("salt.utils.master.get_values_of_matching_keys", ["test"])
        self._patch("salt.utils.minions.CkMinions.auth_check", False)
        self.assertEqual(expected, self.clear_funcs.publish(load))
|