123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664 |
- # -*- coding: utf-8 -*-
- '''
- :codeauthor: Mike Place <mp@saltstack.com>
- '''
- # Import pytohn libs
- from __future__ import absolute_import, print_function, unicode_literals
- import time
- # Import Salt Testing libs
- from tests.support.unit import TestCase, skipIf
- from tests.support.mock import patch, call, NO_MOCK, NO_MOCK_REASON, MagicMock
- # Import Salt libraries
- import salt.master
- from tests.support.case import ModuleCase
- from salt import auth
- from salt.exceptions import SaltDeserializationError
- import salt.utils.platform
- @skipIf(NO_MOCK, NO_MOCK_REASON)
- class LoadAuthTestCase(TestCase):
- def setUp(self): # pylint: disable=W0221
- patches = (
- ('salt.payload.Serial', None),
- ('salt.loader.auth', dict(return_value={'pam.auth': 'fake_func_str', 'pam.groups': 'fake_groups_function_str'})),
- ('salt.loader.eauth_tokens', dict(return_value={'localfs.mk_token': 'fake_func_mktok',
- 'localfs.get_token': 'fake_func_gettok',
- 'localfs.rm_roken': 'fake_func_rmtok'}))
- )
- for mod, mock in patches:
- if mock:
- patcher = patch(mod, **mock)
- else:
- patcher = patch(mod)
- patcher.start()
- self.addCleanup(patcher.stop)
- self.lauth = auth.LoadAuth({}) # Load with empty opts
- def test_get_tok_with_broken_file_will_remove_bad_token(self):
- fake_get_token = MagicMock(side_effect=SaltDeserializationError('hi'))
- patch_opts = patch.dict(self.lauth.opts, {'eauth_tokens': 'testfs'})
- patch_get_token = patch.dict(
- self.lauth.tokens,
- {
- 'testfs.get_token': fake_get_token
- },
- )
- mock_rm_token = MagicMock()
- patch_rm_token = patch.object(self.lauth, 'rm_token', mock_rm_token)
- with patch_opts, patch_get_token, patch_rm_token:
- expected_token = 'fnord'
- self.lauth.get_tok(expected_token)
- mock_rm_token.assert_called_with(expected_token)
- def test_get_tok_with_no_expiration_should_remove_bad_token(self):
- fake_get_token = MagicMock(return_value={'no_expire_here': 'Nope'})
- patch_opts = patch.dict(self.lauth.opts, {'eauth_tokens': 'testfs'})
- patch_get_token = patch.dict(
- self.lauth.tokens,
- {
- 'testfs.get_token': fake_get_token
- },
- )
- mock_rm_token = MagicMock()
- patch_rm_token = patch.object(self.lauth, 'rm_token', mock_rm_token)
- with patch_opts, patch_get_token, patch_rm_token:
- expected_token = 'fnord'
- self.lauth.get_tok(expected_token)
- mock_rm_token.assert_called_with(expected_token)
- def test_get_tok_with_expire_before_current_time_should_remove_token(self):
- fake_get_token = MagicMock(return_value={'expire': time.time()-1})
- patch_opts = patch.dict(self.lauth.opts, {'eauth_tokens': 'testfs'})
- patch_get_token = patch.dict(
- self.lauth.tokens,
- {
- 'testfs.get_token': fake_get_token
- },
- )
- mock_rm_token = MagicMock()
- patch_rm_token = patch.object(self.lauth, 'rm_token', mock_rm_token)
- with patch_opts, patch_get_token, patch_rm_token:
- expected_token = 'fnord'
- self.lauth.get_tok(expected_token)
- mock_rm_token.assert_called_with(expected_token)
- def test_get_tok_with_valid_expiration_should_return_token(self):
- expected_token = {'expire': time.time()+1}
- fake_get_token = MagicMock(return_value=expected_token)
- patch_opts = patch.dict(self.lauth.opts, {'eauth_tokens': 'testfs'})
- patch_get_token = patch.dict(
- self.lauth.tokens,
- {
- 'testfs.get_token': fake_get_token
- },
- )
- mock_rm_token = MagicMock()
- patch_rm_token = patch.object(self.lauth, 'rm_token', mock_rm_token)
- with patch_opts, patch_get_token, patch_rm_token:
- token_name = 'fnord'
- actual_token = self.lauth.get_tok(token_name)
- mock_rm_token.assert_not_called()
- assert expected_token is actual_token, 'Token was not returned'
- def test_load_name(self):
- valid_eauth_load = {'username': 'test_user',
- 'show_timeout': False,
- 'test_password': '',
- 'eauth': 'pam'}
- # Test a case where the loader auth doesn't have the auth type
- without_auth_type = dict(valid_eauth_load)
- without_auth_type.pop('eauth')
- ret = self.lauth.load_name(without_auth_type)
- self.assertEqual(ret, '', "Did not bail when the auth loader didn't have the auth type.")
- # Test a case with valid params
- with patch('salt.utils.args.arg_lookup',
- MagicMock(return_value={'args': ['username', 'password']})) as format_call_mock:
- expected_ret = call('fake_func_str')
- ret = self.lauth.load_name(valid_eauth_load)
- format_call_mock.assert_has_calls((expected_ret,), any_order=True)
- self.assertEqual(ret, 'test_user')
- def test_get_groups(self):
- valid_eauth_load = {'username': 'test_user',
- 'show_timeout': False,
- 'test_password': '',
- 'eauth': 'pam'}
- with patch('salt.utils.args.format_call') as format_call_mock:
- expected_ret = call('fake_groups_function_str', {
- 'username': 'test_user',
- 'test_password': '',
- 'show_timeout': False,
- 'eauth': 'pam'
- }, expected_extra_kws=auth.AUTH_INTERNAL_KEYWORDS)
- self.lauth.get_groups(valid_eauth_load)
- format_call_mock.assert_has_calls((expected_ret,), any_order=True)
- class MasterACLTestCase(ModuleCase):
- '''
- A class to check various aspects of the publisher ACL system
- '''
- def setUp(self):
- self.fire_event_mock = MagicMock(return_value='dummy_tag')
- self.addCleanup(delattr, self, 'fire_event_mock')
- opts = self.get_temp_config('master')
- patches = (
- ('zmq.Context', MagicMock()),
- ('salt.payload.Serial.dumps', MagicMock()),
- ('salt.master.tagify', MagicMock()),
- ('salt.utils.event.SaltEvent.fire_event', self.fire_event_mock),
- ('salt.auth.LoadAuth.time_auth', MagicMock(return_value=True)),
- ('salt.minion.MasterMinion', MagicMock()),
- ('salt.utils.verify.check_path_traversal', MagicMock()),
- ('salt.client.get_local_client', MagicMock(return_value=opts['conf_file'])),
- )
- for mod, mock in patches:
- patcher = patch(mod, mock)
- patcher.start()
- self.addCleanup(patcher.stop)
- opts['publisher_acl'] = {}
- opts['publisher_acl_blacklist'] = {}
- opts['master_job_cache'] = ''
- opts['sign_pub_messages'] = False
- opts['con_cache'] = ''
- opts['external_auth'] = {}
- opts['external_auth']['pam'] = \
- {'test_user': [{'*': ['test.ping']},
- {'minion_glob*': ['foo.bar']},
- {'minion_func_test': ['func_test.*']}],
- 'test_group%': [{'*': ['test.echo']}],
- 'test_user_mminion': [{'target_minion': ['test.ping']}],
- '*': [{'my_minion': ['my_mod.my_func']}],
- 'test_user_func': [{'*': [{'test.echo': {'args': ['MSG:.*']}},
- {'test.echo': {'kwargs': {'text': 'KWMSG:.*',
- 'anything': '.*',
- 'none': None}}},
- {'my_mod.*': {'args': ['a.*', 'b.*'],
- 'kwargs': {'kwa': 'kwa.*',
- 'kwb': 'kwb'}}}]},
- {'minion1': [{'test.echo': {'args': ['TEST',
- None,
- 'TEST.*']}},
- {'test.empty': {}}]}
- ]
- }
- self.clear = salt.master.ClearFuncs(opts, MagicMock())
- self.addCleanup(delattr, self, 'clear')
- # overwrite the _send_pub method so we don't have to serialize MagicMock
- self.clear._send_pub = lambda payload: True
- # make sure to return a JID, instead of a mock
- self.clear.mminion.returners = {'.prep_jid': lambda x: 1}
- self.valid_clear_load = {'tgt_type': 'glob',
- 'jid': '',
- 'cmd': 'publish',
- 'tgt': 'test_minion',
- 'kwargs':
- {'username': 'test_user',
- 'password': 'test_password',
- 'show_timeout': False,
- 'eauth': 'pam',
- 'show_jid': False},
- 'ret': '',
- 'user': 'test_user',
- 'key': '',
- 'arg': '',
- 'fun': 'test.ping',
- }
- self.addCleanup(delattr, self, 'valid_clear_load')
- @skipIf(salt.utils.platform.is_windows(), 'PAM eauth not available on Windows')
- def test_master_publish_name(self):
- '''
- Test to ensure a simple name can auth against a given function.
- This tests to ensure test_user can access test.ping but *not* sys.doc
- '''
- _check_minions_return = {'minions': ['some_minions'], 'missing': []}
- with patch('salt.utils.minions.CkMinions.check_minions', MagicMock(return_value=_check_minions_return)):
- # Can we access test.ping?
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.call_args[0][0]['fun'], 'test.ping')
- # Are we denied access to sys.doc?
- sys_doc_load = self.valid_clear_load
- sys_doc_load['fun'] = 'sys.doc'
- self.clear.publish(sys_doc_load)
- self.assertNotEqual(self.fire_event_mock.call_args[0][0]['fun'], 'sys.doc') # If sys.doc were to fire, this would match
- def test_master_publish_group(self):
- '''
- Tests to ensure test_group can access test.echo but *not* sys.doc
- '''
- _check_minions_return = {'minions': ['some_minions'], 'missing': []}
- with patch('salt.utils.minions.CkMinions.check_minions', MagicMock(return_value=_check_minions_return)):
- self.valid_clear_load['kwargs']['user'] = 'new_user'
- self.valid_clear_load['fun'] = 'test.echo'
- self.valid_clear_load['arg'] = 'hello'
- with patch('salt.auth.LoadAuth.get_groups', return_value=['test_group', 'second_test_group']):
- self.clear.publish(self.valid_clear_load)
- # Did we fire test.echo?
- self.assertEqual(self.fire_event_mock.call_args[0][0]['fun'], 'test.echo')
- # Request sys.doc
- self.valid_clear_load['fun'] = 'sys.doc'
- # Did we fire it?
- self.assertNotEqual(self.fire_event_mock.call_args[0][0]['fun'], 'sys.doc')
- def test_master_publish_some_minions(self):
- '''
- Tests to ensure we can only target minions for which we
- have permission with publisher acl.
- Note that in order for these sorts of tests to run correctly that
- you should NOT patch check_minions!
- '''
- self.valid_clear_load['kwargs']['username'] = 'test_user_mminion'
- self.valid_clear_load['user'] = 'test_user_mminion'
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- def test_master_not_user_glob_all(self):
- '''
- Test to ensure that we DO NOT access to a given
- function to all users with publisher acl. ex:
- '*':
- my_minion:
- - my_func
- Yes, this seems like a bit of a no-op test but it's
- here to document that this functionality
- is NOT supported currently.
- WARNING: Do not patch this wit
- '''
- self.valid_clear_load['kwargs']['username'] = 'NOT_A_VALID_USERNAME'
- self.valid_clear_load['user'] = 'NOT_A_VALID_USERNAME'
- self.valid_clear_load['fun'] = 'test.ping'
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- @skipIf(salt.utils.platform.is_windows(), 'PAM eauth not available on Windows')
- def test_master_minion_glob(self):
- '''
- Test to ensure we can allow access to a given
- function for a user to a subset of minions
- selected by a glob. ex:
- test_user:
- 'minion_glob*':
- - glob_mod.glob_func
- This test is a bit tricky, because ultimately the real functionality
- lies in what's returned from check_minions, but this checks a limited
- amount of logic on the way there as well. Note the inline patch.
- '''
- requested_function = 'foo.bar'
- requested_tgt = 'minion_glob1'
- self.valid_clear_load['tgt'] = requested_tgt
- self.valid_clear_load['fun'] = requested_function
- _check_minions_return = {'minions': ['minion_glob1'], 'missing': []}
- with patch('salt.utils.minions.CkMinions.check_minions', MagicMock(return_value=_check_minions_return)): # Assume that there is a listening minion match
- self.clear.publish(self.valid_clear_load)
- self.assertTrue(self.fire_event_mock.called, 'Did not fire {0} for minion tgt {1}'.format(requested_function, requested_tgt))
- self.assertEqual(self.fire_event_mock.call_args[0][0]['fun'], requested_function, 'Did not fire {0} for minion glob'.format(requested_function))
- def test_master_function_glob(self):
- '''
- Test to ensure that we can allow access to a given
- set of functions in an execution module as selected
- by a glob. ex:
- my_user:
- my_minion:
- 'test.*'
- '''
- # Unimplemented
- pass
- @skipIf(salt.utils.platform.is_windows(), 'PAM eauth not available on Windows')
- def test_args_empty_spec(self):
- '''
- Test simple arg restriction allowed.
- 'test_user_func':
- minion1:
- - test.empty:
- '''
- _check_minions_return = {'minions': ['minion1'], 'missing': []}
- with patch('salt.utils.minions.CkMinions.check_minions', MagicMock(return_value=_check_minions_return)):
- self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
- self.valid_clear_load.update({'user': 'test_user_func',
- 'tgt': 'minion1',
- 'fun': 'test.empty',
- 'arg': ['TEST']})
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.call_args[0][0]['fun'], 'test.empty')
- @skipIf(salt.utils.platform.is_windows(), 'PAM eauth not available on Windows')
- def test_args_simple_match(self):
- '''
- Test simple arg restriction allowed.
- 'test_user_func':
- minion1:
- - test.echo:
- args:
- - 'TEST'
- - 'TEST.*'
- '''
- _check_minions_return = {'minions': ['minion1'], 'missing': []}
- with patch('salt.utils.minions.CkMinions.check_minions', MagicMock(return_value=_check_minions_return)):
- self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
- self.valid_clear_load.update({'user': 'test_user_func',
- 'tgt': 'minion1',
- 'fun': 'test.echo',
- 'arg': ['TEST', 'any', 'TEST ABC']})
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.call_args[0][0]['fun'], 'test.echo')
- @skipIf(salt.utils.platform.is_windows(), 'PAM eauth not available on Windows')
- def test_args_more_args(self):
- '''
- Test simple arg restriction allowed to pass unlisted args.
- 'test_user_func':
- minion1:
- - test.echo:
- args:
- - 'TEST'
- - 'TEST.*'
- '''
- _check_minions_return = {'minions': ['minion1'], 'missing': []}
- with patch('salt.utils.minions.CkMinions.check_minions', MagicMock(return_value=_check_minions_return)):
- self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
- self.valid_clear_load.update({'user': 'test_user_func',
- 'tgt': 'minion1',
- 'fun': 'test.echo',
- 'arg': ['TEST',
- 'any',
- 'TEST ABC',
- 'arg 3',
- {'kwarg1': 'val1',
- '__kwarg__': True}]})
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.call_args[0][0]['fun'], 'test.echo')
- def test_args_simple_forbidden(self):
- '''
- Test simple arg restriction forbidden.
- 'test_user_func':
- minion1:
- - test.echo:
- args:
- - 'TEST'
- - 'TEST.*'
- '''
- _check_minions_return = {'minions': ['minion1'], 'missing': []}
- with patch('salt.utils.minions.CkMinions.check_minions', MagicMock(return_value=_check_minions_return)):
- self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
- # Wrong last arg
- self.valid_clear_load.update({'user': 'test_user_func',
- 'tgt': 'minion1',
- 'fun': 'test.echo',
- 'arg': ['TEST', 'any', 'TESLA']})
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- # Wrong first arg
- self.valid_clear_load['arg'] = ['TES', 'any', 'TEST1234']
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- # Missing the last arg
- self.valid_clear_load['arg'] = ['TEST', 'any']
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- # No args
- self.valid_clear_load['arg'] = []
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- @skipIf(salt.utils.platform.is_windows(), 'PAM eauth not available on Windows')
- def test_args_kwargs_match(self):
- '''
- Test simple kwargs restriction allowed.
- 'test_user_func':
- '*':
- - test.echo:
- kwargs:
- text: 'KWMSG:.*'
- '''
- _check_minions_return = {'minions': ['some_minions'], 'missing': []}
- with patch('salt.utils.minions.CkMinions.check_minions', MagicMock(return_value=_check_minions_return)):
- self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
- self.valid_clear_load.update({'user': 'test_user_func',
- 'tgt': '*',
- 'fun': 'test.echo',
- 'arg': [{'text': 'KWMSG: a message',
- 'anything': 'hello all',
- 'none': 'hello none',
- '__kwarg__': True}]})
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.call_args[0][0]['fun'], 'test.echo')
- def test_args_kwargs_mismatch(self):
- '''
- Test simple kwargs restriction allowed.
- 'test_user_func':
- '*':
- - test.echo:
- kwargs:
- text: 'KWMSG:.*'
- '''
- _check_minions_return = {'minions': ['some_minions'], 'missing': []}
- with patch('salt.utils.minions.CkMinions.check_minions', MagicMock(return_value=_check_minions_return)):
- self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
- self.valid_clear_load.update({'user': 'test_user_func',
- 'tgt': '*',
- 'fun': 'test.echo'})
- # Wrong kwarg value
- self.valid_clear_load['arg'] = [{'text': 'KWMSG a message',
- 'anything': 'hello all',
- 'none': 'hello none',
- '__kwarg__': True}]
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- # Missing kwarg value
- self.valid_clear_load['arg'] = [{'anything': 'hello all',
- 'none': 'hello none',
- '__kwarg__': True}]
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- self.valid_clear_load['arg'] = [{'__kwarg__': True}]
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- self.valid_clear_load['arg'] = [{}]
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- self.valid_clear_load['arg'] = []
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- # Missing kwarg allowing any value
- self.valid_clear_load['arg'] = [{'text': 'KWMSG: a message',
- 'none': 'hello none',
- '__kwarg__': True}]
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- self.valid_clear_load['arg'] = [{'text': 'KWMSG: a message',
- 'anything': 'hello all',
- '__kwarg__': True}]
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- @skipIf(salt.utils.platform.is_windows(), 'PAM eauth not available on Windows')
- def test_args_mixed_match(self):
- '''
- Test mixed args and kwargs restriction allowed.
- 'test_user_func':
- '*':
- - 'my_mod.*':
- args:
- - 'a.*'
- - 'b.*'
- kwargs:
- 'kwa': 'kwa.*'
- 'kwb': 'kwb'
- '''
- _check_minions_return = {'minions': ['some_minions'], 'missing': []}
- with patch('salt.utils.minions.CkMinions.check_minions', MagicMock(return_value=_check_minions_return)):
- self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
- self.valid_clear_load.update({'user': 'test_user_func',
- 'tgt': '*',
- 'fun': 'my_mod.some_func',
- 'arg': ['alpha',
- 'beta',
- 'gamma',
- {'kwa': 'kwarg #1',
- 'kwb': 'kwb',
- 'one_more': 'just one more',
- '__kwarg__': True}]})
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.call_args[0][0]['fun'],
- 'my_mod.some_func')
- def test_args_mixed_mismatch(self):
- '''
- Test mixed args and kwargs restriction forbidden.
- 'test_user_func':
- '*':
- - 'my_mod.*':
- args:
- - 'a.*'
- - 'b.*'
- kwargs:
- 'kwa': 'kwa.*'
- 'kwb': 'kwb'
- '''
- _check_minions_return = {'minions': ['some_minions'], 'missing': []}
- with patch('salt.utils.minions.CkMinions.check_minions', MagicMock(return_value=_check_minions_return)):
- self.valid_clear_load['kwargs'].update({'username': 'test_user_func'})
- self.valid_clear_load.update({'user': 'test_user_func',
- 'tgt': '*',
- 'fun': 'my_mod.some_func'})
- # Wrong arg value
- self.valid_clear_load['arg'] = ['alpha',
- 'gamma',
- {'kwa': 'kwarg #1',
- 'kwb': 'kwb',
- 'one_more': 'just one more',
- '__kwarg__': True}]
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- # Wrong kwarg value
- self.valid_clear_load['arg'] = ['alpha',
- 'beta',
- 'gamma',
- {'kwa': 'kkk',
- 'kwb': 'kwb',
- 'one_more': 'just one more',
- '__kwarg__': True}]
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- # Missing arg
- self.valid_clear_load['arg'] = ['alpha',
- {'kwa': 'kwarg #1',
- 'kwb': 'kwb',
- 'one_more': 'just one more',
- '__kwarg__': True}]
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- # Missing kwarg
- self.valid_clear_load['arg'] = ['alpha',
- 'beta',
- 'gamma',
- {'kwa': 'kwarg #1',
- 'one_more': 'just one more',
- '__kwarg__': True}]
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.fire_event_mock.mock_calls, [])
- class AuthACLTestCase(ModuleCase):
- '''
- A class to check various aspects of the publisher ACL system
- '''
- def setUp(self):
- self.auth_check_mock = MagicMock(return_value=True)
- opts = self.get_temp_config('master')
- patches = (
- ('salt.minion.MasterMinion', MagicMock()),
- ('salt.utils.verify.check_path_traversal', MagicMock()),
- ('salt.utils.minions.CkMinions.auth_check', self.auth_check_mock),
- ('salt.auth.LoadAuth.time_auth', MagicMock(return_value=True)),
- ('salt.client.get_local_client', MagicMock(return_value=opts['conf_file'])),
- )
- for mod, mock in patches:
- patcher = patch(mod, mock)
- patcher.start()
- self.addCleanup(patcher.stop)
- self.addCleanup(delattr, self, 'auth_check_mock')
- opts['publisher_acl'] = {}
- opts['publisher_acl_blacklist'] = {}
- opts['master_job_cache'] = ''
- opts['sign_pub_messages'] = False
- opts['con_cache'] = ''
- opts['external_auth'] = {}
- opts['external_auth']['pam'] = {'test_user': [{'alpha_minion': ['test.ping']}]}
- self.clear = salt.master.ClearFuncs(opts, MagicMock())
- self.addCleanup(delattr, self, 'clear')
- # overwrite the _send_pub method so we don't have to serialize MagicMock
- self.clear._send_pub = lambda payload: True
- # make sure to return a JID, instead of a mock
- self.clear.mminion.returners = {'.prep_jid': lambda x: 1}
- self.valid_clear_load = {'tgt_type': 'glob',
- 'jid': '',
- 'cmd': 'publish',
- 'tgt': 'test_minion',
- 'kwargs':
- {'username': 'test_user',
- 'password': 'test_password',
- 'show_timeout': False,
- 'eauth': 'pam',
- 'show_jid': False},
- 'ret': '',
- 'user': 'test_user',
- 'key': '',
- 'arg': '',
- 'fun': 'test.ping',
- }
- self.addCleanup(delattr, self, 'valid_clear_load')
- @skipIf(salt.utils.platform.is_windows(), 'PAM eauth not available on Windows')
- def test_acl_simple_allow(self):
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.auth_check_mock.call_args[0][0],
- [{'alpha_minion': ['test.ping']}])
- def test_acl_simple_deny(self):
- with patch('salt.auth.LoadAuth.get_auth_list', MagicMock(return_value=[{'beta_minion': ['test.ping']}])):
- self.clear.publish(self.valid_clear_load)
- self.assertEqual(self.auth_check_mock.call_args[0][0],
- [{'beta_minion': ['test.ping']}])
|