123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- # coding: utf-8
- from __future__ import absolute_import
- import os
- import salt.utils.json
- import salt.utils.stringutils
- import tests.support.cherrypy_testclasses as cptc
- from salt.ext.six.moves.urllib.parse import ( # pylint: disable=no-name-in-module,import-error
- urlencode,
- )
- from tests.support.helpers import flaky, slowTest
class TestAuth(cptc.BaseRestCherryPyTest):
    """
    Check the HTTP status returned by endpoints when no credentials are sent.
    """

    def test_get_root_noauth(self):
        """
        A plain GET of the root URL is answered with 200 OK
        """
        _, resp = self.request("/")
        self.assertEqual(resp.status, "200 OK")

    def test_post_root_auth(self):
        """
        A POST to the root URL without credentials is answered with
        401 Unauthorized
        """
        _, resp = self.request("/", method="POST", data={})
        self.assertEqual(resp.status, "401 Unauthorized")

    def test_login_noauth(self):
        """
        A plain GET of the login URL is answered with 200 OK
        """
        _, resp = self.request("/login")
        self.assertEqual(resp.status, "200 OK")

    def test_webhook_auth(self):
        """
        A POST to the webhook URL without credentials is answered with
        401 Unauthorized
        """
        _, resp = self.request("/hook", method="POST", data={})
        self.assertEqual(resp.status, "401 Unauthorized")
class TestLogin(cptc.BaseRestCherryPyTest):
    """
    Exercise the ``/login`` and ``/logout`` URLs.
    """

    auth_creds = (("username", "saltdev"), ("password", "saltdev"), ("eauth", "auto"))

    def _login(self, creds):
        """
        POST ``creds`` to ``/login`` as a URL-encoded form.

        :param creds: mapping or sequence of pairs accepted by ``urlencode``
        :return: the response object produced by ``self.request``
        """
        _, response = self.request(
            "/login",
            method="POST",
            body=urlencode(creds),
            headers={"content-type": "application/x-www-form-urlencoded"},
        )
        return response

    def test_good_login(self):
        """
        Test logging in with valid credentials
        """
        # Test methods should not return a value (deprecated by unittest since
        # Python 3.11); tests that need the response use ``_login`` instead.
        response = self._login(self.auth_creds)
        self.assertEqual(response.status, "200 OK")

    def test_bad_login(self):
        """
        Test logging in with invalid credentials
        """
        response = self._login({"totally": "invalid_creds"})
        self.assertEqual(response.status, "401 Unauthorized")

    def test_logout(self):
        """
        Test logging out with the token obtained from a successful login
        """
        login = self._login(self.auth_creds)
        # Same precondition the original enforced by calling test_good_login().
        self.assertEqual(login.status, "200 OK")
        token = login.headers["X-Auth-Token"]

        _, response = self.request(
            "/logout",
            method="POST",
            body=urlencode({}),
            headers={
                "content-type": "application/x-www-form-urlencoded",
                "X-Auth-Token": token,
            },
        )
        self.assertEqual(response.status, "200 OK")
class TestRun(cptc.BaseRestCherryPyTest):
    """
    Exercise the ``/run`` URL with various credential and token payloads.
    """

    auth_creds = (
        ("username", "saltdev_auto"),
        ("password", "saltdev"),
        ("eauth", "auto"),
    )
    low = (
        ("client", "local"),
        ("tgt", "*"),
        ("fun", "test.ping"),
    )

    def _post_run(self, extra):
        """
        POST the base lowstate merged with ``extra`` to ``/run``.

        :param extra: dict of additional form fields layered on top of
            ``self.low`` (later keys win)
        :return: the response object produced by ``self.request``
        """
        cmd = dict(self.low)
        cmd.update(extra)
        _, response = self.request(
            "/run",
            method="POST",
            body=urlencode(cmd),
            headers={"content-type": "application/x-www-form-urlencoded"},
        )
        return response

    @slowTest
    def test_run_good_login(self):
        """
        Test the run URL with good auth credentials
        """
        response = self._post_run(dict(self.auth_creds))
        self.assertEqual(response.status, "200 OK")

    def test_run_bad_login(self):
        """
        Test the run URL with bad auth credentials
        """
        response = self._post_run({"totally": "invalid_creds"})
        self.assertEqual(response.status, "401 Unauthorized")

    def test_run_empty_token(self):
        """
        Test the run URL with empty token
        """
        response = self._post_run({"token": ""})
        # assertEqual (not a bare assert) so the check survives ``python -O``
        # and matches the other tests in this class.
        self.assertEqual(response.status, "401 Unauthorized")

    def test_run_empty_token_upercase(self):
        """
        Test the run URL with empty token whose key has uppercase characters
        """
        response = self._post_run({"ToKen": ""})
        self.assertEqual(response.status, "401 Unauthorized")

    def test_run_wrong_token(self):
        """
        Test the run URL with incorrect token
        """
        response = self._post_run({"token": "bad"})
        self.assertEqual(response.status, "401 Unauthorized")

    def test_run_pathname_token(self):
        """
        Test the run URL with a path-like token (``etc``/``passwd``)
        """
        # NOTE(review): os.path.join("etc", "passwd") is a *relative* path;
        # confirm that is the intended shape for this check.
        response = self._post_run({"token": os.path.join("etc", "passwd")})
        self.assertEqual(response.status, "401 Unauthorized")

    def test_run_pathname_not_exists_token(self):
        """
        Test the run URL with a path-like token (``tmp``/``doesnotexist``)
        """
        response = self._post_run({"token": os.path.join("tmp", "doesnotexist")})
        self.assertEqual(response.status, "401 Unauthorized")

    @slowTest
    def test_run_extra_parameters(self):
        """
        Test the run URL with good auth credentials plus an extra ``id_``
        parameter
        """
        extra = dict(self.auth_creds)
        extra["id_"] = "someminionname"
        response = self._post_run(extra)
        self.assertEqual(response.status, "200 OK")
class TestWebhookDisableAuth(cptc.BaseRestCherryPyTest):
    """
    Check the webhook URL when ``webhook_disable_auth`` is switched on.
    """

    def __get_opts__(self):
        """
        Return the options dict for this test case: ``rest_cherrypy`` on port
        8000 with debug and ``webhook_disable_auth`` enabled
        """
        cherrypy_opts = {
            "port": 8000,
            "debug": True,
            "webhook_disable_auth": True,
        }
        return {"rest_cherrypy": cherrypy_opts}

    def test_webhook_noauth(self):
        """
        A POST to the webhook URL without credentials is answered with 200 OK
        """
        form_headers = {"content-type": "application/x-www-form-urlencoded"}
        payload = urlencode({"foo": "Foo!"})
        _, resp = self.request(
            "/hook", method="POST", body=payload, headers=form_headers
        )
        self.assertEqual(resp.status, "200 OK")
class TestArgKwarg(cptc.BaseRestCherryPyTest):
    """
    Check that a runner lowstate using the ``arg``/``kwarg`` keys is honoured.
    """

    auth_creds = (("username", "saltdev"), ("password", "saltdev"), ("eauth", "auto"))
    low = (
        ("client", "runner"),
        ("fun", "test.arg"),
        # use singular form for arg and kwarg
        ("arg", [1234]),
        ("kwarg", {"ext_source": "redis"}),
    )

    def _token(self):
        """
        Log in with ``auth_creds`` and return the ``X-Auth-Token`` response
        header
        """
        form_headers = {"content-type": "application/x-www-form-urlencoded"}
        _, login = self.request(
            "/login",
            method="POST",
            body=urlencode(self.auth_creds),
            headers=form_headers,
        )
        return login.headers["X-Auth-Token"]

    @slowTest
    def test_accepts_arg_kwarg_keys(self):
        """
        POST the lowstate as JSON to the root URL and check that the first
        return entry echoes the ``arg`` list and ``kwarg`` dict.
        """
        payload = salt.utils.json.dumps(dict(self.low))
        json_headers = {
            "content-type": "application/json",
            "X-Auth-Token": self._token(),
            "Accept": "application/json",
        }
        _, reply = self.request("/", method="POST", body=payload, headers=json_headers)

        raw = salt.utils.stringutils.to_str(reply.body[0])
        first = salt.utils.json.loads(raw)["return"][0]
        self.assertEqual(first["args"], [1234])
        self.assertEqual(first["kwargs"], {"ext_source": "redis"})
class TestJobs(cptc.BaseRestCherryPyTest):
    """
    Exercise the ``/jobs`` URL.
    """

    auth_creds = (
        ("username", "saltdev_auto"),
        ("password", "saltdev"),
        ("eauth", "auto"),
    )
    low = (
        ("client", "local"),
        ("tgt", "*"),
        ("fun", "test.ping"),
    )

    def _token(self):
        """
        Log in with ``auth_creds`` and return the ``X-Auth-Token`` response
        header
        """
        _, response = self.request(
            "/login",
            method="POST",
            body=urlencode(self.auth_creds),
            headers={"content-type": "application/x-www-form-urlencoded"},
        )
        return response.headers["X-Auth-Token"]

    def _add_job(self):
        """
        Helper function to add a job to the job cache: POST a ``test.ping``
        lowstate with credentials to ``/run`` and require 200 OK
        """
        cmd = dict(self.low, **dict(self.auth_creds))
        _, response = self.request(
            "/run",
            method="POST",
            body=urlencode(cmd),
            headers={"content-type": "application/x-www-form-urlencoded"},
        )
        self.assertEqual(response.status, "200 OK")

    @flaky
    @slowTest
    def test_all_jobs(self):
        """
        test query to /jobs returns job data
        """
        self._add_job()

        _, response = self.request(
            "/jobs",
            method="GET",
            headers={"Accept": "application/json", "X-Auth-Token": self._token()},
        )

        # Check the status before touching the body so a failed request is
        # reported as a status mismatch rather than a JSON/KeyError.
        self.assertEqual(response.status, "200 OK")
        resp = salt.utils.json.loads(salt.utils.stringutils.to_str(response.body[0]))
        self.assertIn("test.ping", str(resp["return"]))
|