123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- # coding: utf-8
- # Import python libs
- from __future__ import absolute_import
- import os
- # Import salt libs
- import salt.utils.json
- import salt.utils.stringutils
- # Import test support libs
- import tests.support.cherrypy_testclasses as cptc
- from tests.support.helpers import flaky
- # Import 3rd-party libs
- from salt.ext.six.moves.urllib.parse import urlencode # pylint: disable=no-name-in-module,import-error
class TestAuth(cptc.BaseRestCherryPyTest):
    '''
    Status-code checks for requests sent without any credentials.
    '''

    def test_get_root_noauth(self):
        '''
        A plain GET of the root URL is answered with 200
        '''
        _, resp = self.request('/')
        self.assertEqual(resp.status, '200 OK')

    def test_post_root_auth(self):
        '''
        A POST to the root URL without credentials is answered with 401
        '''
        _, resp = self.request('/', method='POST', data={})
        self.assertEqual(resp.status, '401 Unauthorized')

    def test_login_noauth(self):
        '''
        A plain GET of the login URL is answered with 200
        '''
        _, resp = self.request('/login')
        self.assertEqual(resp.status, '200 OK')

    def test_webhook_auth(self):
        '''
        A POST to the webhook URL without credentials is answered with 401
        '''
        _, resp = self.request('/hook', method='POST', data={})
        self.assertEqual(resp.status, '401 Unauthorized')
class TestLogin(cptc.BaseRestCherryPyTest):
    '''
    Tests for the /login and /logout URLs.
    '''
    auth_creds = (
        ('username', 'saltdev'),
        ('password', 'saltdev'),
        ('eauth', 'auto'))

    def _post_form(self, url, fields, extra_headers=None):
        '''
        POST url-encoded form data and return the response.

        :param url: path the request is sent to
        :param fields: mapping or sequence of pairs accepted by ``urlencode``
        :param extra_headers: optional headers sent next to the content type
        :return: the response half of the ``self.request`` result
        '''
        headers = {'content-type': 'application/x-www-form-urlencoded'}
        if extra_headers:
            headers.update(extra_headers)
        _, response = self.request(url, method='POST',
                                   body=urlencode(fields), headers=headers)
        return response

    def _login(self):
        '''
        POST ``auth_creds`` to /login and return the response.

        Kept separate from ``test_good_login`` so that test methods do not
        have to return a value or call each other.
        '''
        return self._post_form('/login', self.auth_creds)

    def test_good_login(self):
        '''
        Logging in with ``auth_creds`` is answered with 200
        '''
        self.assertEqual(self._login().status, '200 OK')

    def test_bad_login(self):
        '''
        Logging in with fields that are not credentials is answered with 401
        '''
        response = self._post_form('/login', {'totally': 'invalid_creds'})
        self.assertEqual(response.status, '401 Unauthorized')

    def test_logout(self):
        '''
        Logging out with the token handed out by /login is answered with 200
        '''
        login_response = self._login()
        # The login itself must succeed before its token can be used.
        self.assertEqual(login_response.status, '200 OK')
        token = login_response.headers['X-Auth-Token']

        response = self._post_form('/logout', {}, {'X-Auth-Token': token})
        self.assertEqual(response.status, '200 OK')
class TestRun(cptc.BaseRestCherryPyTest):
    '''
    Tests for the /run URL, which takes credentials or a token in the
    request body next to the command itself.
    '''
    auth_creds = (
        ('username', 'saltdev_auto'),
        ('password', 'saltdev'),
        ('eauth', 'auto'))

    low = (
        ('client', 'local'),
        ('tgt', '*'),
        ('fun', 'test.ping'),
    )

    def _post_run(self, extra):
        '''
        POST ``low`` merged with ``extra`` to /run and return the response.

        :param extra: dict of additional form fields; its keys are added
            after (and override) the ones from ``low``
        :return: the response half of the ``self.request`` result
        '''
        cmd = dict(self.low, **extra)
        _, response = self.request(
            '/run',
            method='POST',
            body=urlencode(cmd),
            headers={
                'content-type': 'application/x-www-form-urlencoded'
            }
        )
        return response

    def test_run_good_login(self):
        '''
        Test the run URL with good auth credentials
        '''
        response = self._post_run(dict(self.auth_creds))
        self.assertEqual(response.status, '200 OK')

    def test_run_bad_login(self):
        '''
        Test the run URL with bad auth credentials
        '''
        response = self._post_run({'totally': 'invalid_creds'})
        self.assertEqual(response.status, '401 Unauthorized')

    def test_run_empty_token(self):
        '''
        Test the run URL with empty token
        '''
        response = self._post_run({'token': ''})
        self.assertEqual(response.status, '401 Unauthorized')

    def test_run_empty_token_upercase(self):
        '''
        Test the run URL with an empty token whose key contains uppercase
        characters
        '''
        response = self._post_run({'ToKen': ''})
        self.assertEqual(response.status, '401 Unauthorized')

    def test_run_wrong_token(self):
        '''
        Test the run URL with incorrect token
        '''
        response = self._post_run({'token': 'bad'})
        self.assertEqual(response.status, '401 Unauthorized')

    def test_run_pathname_token(self):
        '''
        Test the run URL with a token that looks like the path of an
        existing file
        '''
        # NOTE(review): the joined path is relative ('etc' + 'passwd'), so
        # whether it names an existing file depends on the working
        # directory - confirm whether an absolute path was intended.
        response = self._post_run({'token': os.path.join('etc', 'passwd')})
        self.assertEqual(response.status, '401 Unauthorized')

    def test_run_pathname_not_exists_token(self):
        '''
        Test the run URL with a token that looks like the path of a file
        that does not exist
        '''
        # NOTE(review): relative path as well - see test_run_pathname_token.
        response = self._post_run(
            {'token': os.path.join('tmp', 'doesnotexist')}
        )
        self.assertEqual(response.status, '401 Unauthorized')

    def test_run_extra_parameters(self):
        '''
        Test the run URL with good auth credentials plus an additional
        ``id_`` parameter
        '''
        extra = dict(self.auth_creds)
        extra['id_'] = 'someminionname'
        response = self._post_run(extra)
        self.assertEqual(response.status, '200 OK')
class TestWebhookDisableAuth(cptc.BaseRestCherryPyTest):
    '''
    Webhook test run with ``webhook_disable_auth`` switched on.
    '''

    def __get_opts__(self):
        '''
        Return the options dict with the ``rest_cherrypy`` section used by
        this test class
        '''
        rest_cherrypy_opts = {
            'port': 8000,
            'debug': True,
            'webhook_disable_auth': True,
        }
        return {'rest_cherrypy': rest_cherrypy_opts}

    def test_webhook_noauth(self):
        '''
        A POST to the webhook URL without credentials is answered with 200
        when ``webhook_disable_auth`` is set
        '''
        form_headers = {'content-type': 'application/x-www-form-urlencoded'}
        payload = urlencode({'foo': 'Foo!'})
        _, resp = self.request(
            '/hook',
            method='POST',
            body=payload,
            headers=form_headers,
        )
        self.assertEqual(resp.status, '200 OK')
class TestArgKwarg(cptc.BaseRestCherryPyTest):
    '''
    Token-authenticated JSON request that passes ``arg`` / ``kwarg`` to the
    ``test.arg`` runner function.
    '''
    auth_creds = (
        ('username', 'saltdev'),
        ('password', 'saltdev'),
        ('eauth', 'auto'))

    low = (
        ('client', 'runner'),
        ('fun', 'test.arg'),
        # use singular form for arg and kwarg
        ('arg', [1234]),
        ('kwarg', {'ext_source': 'redis'}),
    )

    def _token(self):
        '''
        Log in with ``auth_creds`` and return the X-Auth-Token header value
        '''
        form_headers = {'content-type': 'application/x-www-form-urlencoded'}
        credentials = urlencode(self.auth_creds)
        _, login_response = self.request(
            '/login', method='POST', body=credentials, headers=form_headers)
        return login_response.headers['X-Auth-Token']

    def test_accepts_arg_kwarg_keys(self):
        '''
        The singular ``arg`` and ``kwarg`` keys sent in the request come
        back unchanged as ``args`` and ``kwargs`` in the first return entry.
        '''
        payload = salt.utils.json.dumps(dict(self.low))
        json_headers = {
            'content-type': 'application/json',
            'X-Auth-Token': self._token(),
            'Accept': 'application/json',
        }
        _, response = self.request(
            '/', method='POST', body=payload, headers=json_headers)

        raw_body = salt.utils.stringutils.to_str(response.body[0])
        first_return = salt.utils.json.loads(raw_body)['return'][0]
        self.assertEqual(first_return['args'], [1234])
        self.assertEqual(first_return['kwargs'], {'ext_source': 'redis'})
class TestJobs(cptc.BaseRestCherryPyTest):
    '''
    Test for the /jobs URL.
    '''
    auth_creds = (
        ('username', 'saltdev_auto'),
        ('password', 'saltdev'),
        ('eauth', 'auto'))

    low = (
        ('client', 'local'),
        ('tgt', '*'),
        ('fun', 'test.ping'),
    )

    def _token(self):
        '''
        Log in with ``auth_creds`` and return the X-Auth-Token header value
        '''
        form_headers = {'content-type': 'application/x-www-form-urlencoded'}
        credentials = urlencode(self.auth_creds)
        _, login_response = self.request(
            '/login', method='POST', body=credentials, headers=form_headers)
        return login_response.headers['X-Auth-Token']

    def _add_job(self):
        '''
        Send the ``low`` command with ``auth_creds`` to /run and check that
        it is answered with 200, so a job has been started before /jobs is
        queried
        '''
        form_fields = dict(self.low)
        form_fields.update(self.auth_creds)
        form_headers = {'content-type': 'application/x-www-form-urlencoded'}
        _, run_response = self.request(
            '/run',
            method='POST',
            body=urlencode(form_fields),
            headers=form_headers,
        )
        self.assertEqual(run_response.status, '200 OK')

    @flaky
    def test_all_jobs(self):
        '''
        A GET of /jobs returns data that mentions the job started before
        '''
        self._add_job()

        query_headers = {
            'Accept': 'application/json',
            'X-Auth-Token': self._token(),
        }
        _, response = self.request('/jobs', method='GET', headers=query_headers)

        raw_body = salt.utils.stringutils.to_str(response.body[0])
        decoded = salt.utils.json.loads(raw_body)
        self.assertIn('test.ping', str(decoded['return']))
        self.assertEqual(response.status, '200 OK')
|