123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- # encoding: utf-8
- from __future__ import absolute_import, print_function, unicode_literals
- import logging
- import os
- import time
- import pytest
- import salt.config
- import salt.netapi
- from salt.exceptions import EauthAuthenticationError
- from tests.support.case import SSHCase
- from tests.support.helpers import (
- SaveRequestsPostHandler,
- Webserver,
- requires_sshd_server,
- slowTest,
- )
- from tests.support.mock import patch
- from tests.support.paths import TMP, TMP_CONF_DIR
- from tests.support.runtests import RUNTIME_VARS
- from tests.support.unit import TestCase, skipIf
- log = logging.getLogger(__name__)
- @pytest.mark.usefixtures("salt_sub_minion")
- class NetapiClientTest(TestCase):
- eauth_creds = {
- "username": "saltdev_auto",
- "password": "saltdev",
- "eauth": "auto",
- }
- def setUp(self):
- """
- Set up a NetapiClient instance
- """
- opts = salt.config.client_config(
- os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "master")
- )
- self.netapi = salt.netapi.NetapiClient(opts)
- def tearDown(self):
- del self.netapi
- @slowTest
- def test_local(self):
- low = {"client": "local", "tgt": "*", "fun": "test.ping", "timeout": 300}
- low.update(self.eauth_creds)
- ret = self.netapi.run(low)
- # If --proxy is set, it will cause an extra minion_id to be in the
- # response. Since there's not a great way to know if the test
- # runner's proxy minion is running, and we're not testing proxy
- # minions here anyway, just remove it from the response.
- ret.pop("proxytest", None)
- self.assertEqual(ret, {"minion": True, "sub_minion": True})
- @slowTest
- def test_local_batch(self):
- low = {"client": "local_batch", "tgt": "*", "fun": "test.ping", "timeout": 300}
- low.update(self.eauth_creds)
- ret = self.netapi.run(low)
- rets = []
- for _ret in ret:
- rets.append(_ret)
- self.assertIn({"sub_minion": True}, rets)
- self.assertIn({"minion": True}, rets)
- def test_local_async(self):
- low = {"client": "local_async", "tgt": "*", "fun": "test.ping"}
- low.update(self.eauth_creds)
- ret = self.netapi.run(low)
- # Remove all the volatile values before doing the compare.
- self.assertIn("jid", ret)
- ret.pop("jid", None)
- ret["minions"] = sorted(ret["minions"])
- try:
- # If --proxy is set, it will cause an extra minion_id to be in the
- # response. Since there's not a great way to know if the test
- # runner's proxy minion is running, and we're not testing proxy
- # minions here anyway, just remove it from the response.
- ret["minions"].remove("proxytest")
- except ValueError:
- pass
- self.assertEqual(ret, {"minions": sorted(["minion", "sub_minion"])})
- def test_local_unauthenticated(self):
- low = {"client": "local", "tgt": "*", "fun": "test.ping"}
- with self.assertRaises(EauthAuthenticationError) as excinfo:
- ret = self.netapi.run(low)
- @slowTest
- def test_wheel(self):
- low = {"client": "wheel", "fun": "key.list_all"}
- low.update(self.eauth_creds)
- ret = self.netapi.run(low)
- # Remove all the volatile values before doing the compare.
- self.assertIn("tag", ret)
- ret.pop("tag")
- data = ret.get("data", {})
- self.assertIn("jid", data)
- data.pop("jid", None)
- self.assertIn("tag", data)
- data.pop("tag", None)
- ret.pop("_stamp", None)
- data.pop("_stamp", None)
- self.maxDiff = None
- self.assertTrue(
- set(["master.pem", "master.pub"]).issubset(
- set(ret["data"]["return"]["local"])
- )
- )
- @slowTest
- def test_wheel_async(self):
- # Give this test a little breathing room
- time.sleep(3)
- low = {"client": "wheel_async", "fun": "key.list_all"}
- low.update(self.eauth_creds)
- ret = self.netapi.run(low)
- self.assertIn("jid", ret)
- self.assertIn("tag", ret)
- def test_wheel_unauthenticated(self):
- low = {"client": "wheel", "tgt": "*", "fun": "test.ping"}
- with self.assertRaises(EauthAuthenticationError) as excinfo:
- ret = self.netapi.run(low)
- @skipIf(True, "This is not testing anything. Skipping for now.")
- def test_runner(self):
- # TODO: fix race condition in init of event-- right now the event class
- # will finish init even if the underlying zmq socket hasn't connected yet
- # this is problematic for the runnerclient's master_call method if the
- # runner is quick
- # low = {'client': 'runner', 'fun': 'cache.grains'}
- low = {"client": "runner", "fun": "test.sleep", "arg": [2]}
- low.update(self.eauth_creds)
- ret = self.netapi.run(low)
- @skipIf(True, "This is not testing anything. Skipping for now.")
- def test_runner_async(self):
- low = {"client": "runner", "fun": "cache.grains"}
- low.update(self.eauth_creds)
- ret = self.netapi.run(low)
- def test_runner_unauthenticated(self):
- low = {"client": "runner", "tgt": "*", "fun": "test.ping"}
- with self.assertRaises(EauthAuthenticationError) as excinfo:
- ret = self.netapi.run(low)
- @requires_sshd_server
- class NetapiSSHClientTest(SSHCase):
- eauth_creds = {
- "username": "saltdev_auto",
- "password": "saltdev",
- "eauth": "auto",
- }
- def setUp(self):
- """
- Set up a NetapiClient instance
- """
- opts = salt.config.client_config(os.path.join(TMP_CONF_DIR, "master"))
- self.netapi = salt.netapi.NetapiClient(opts)
- self.priv_file = os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "key_test")
- self.rosters = os.path.join(RUNTIME_VARS.TMP_CONF_DIR)
- # Initialize salt-ssh
- self.run_function("test.ping")
- def tearDown(self):
- del self.netapi
- @classmethod
- def setUpClass(cls):
- cls.post_webserver = Webserver(handler=SaveRequestsPostHandler)
- cls.post_webserver.start()
- cls.post_web_root = cls.post_webserver.web_root
- cls.post_web_handler = cls.post_webserver.handler
- @classmethod
- def tearDownClass(cls):
- cls.post_webserver.stop()
- del cls.post_webserver
- @slowTest
- def test_ssh(self):
- low = {
- "client": "ssh",
- "tgt": "localhost",
- "fun": "test.ping",
- "ignore_host_keys": True,
- "roster_file": "roster",
- "rosters": [self.rosters],
- "ssh_priv": self.priv_file,
- }
- low.update(self.eauth_creds)
- ret = self.netapi.run(low)
- self.assertIn("localhost", ret)
- self.assertIn("return", ret["localhost"])
- self.assertEqual(ret["localhost"]["return"], True)
- self.assertEqual(ret["localhost"]["id"], "localhost")
- self.assertEqual(ret["localhost"]["fun"], "test.ping")
- @slowTest
- def test_ssh_unauthenticated(self):
- low = {"client": "ssh", "tgt": "localhost", "fun": "test.ping"}
- with self.assertRaises(EauthAuthenticationError) as excinfo:
- ret = self.netapi.run(low)
- @slowTest
- def test_ssh_unauthenticated_raw_shell_curl(self):
- fun = "-o ProxyCommand curl {0}".format(self.post_web_root)
- low = {"client": "ssh", "tgt": "localhost", "fun": fun, "raw_shell": True}
- ret = None
- with self.assertRaises(EauthAuthenticationError) as excinfo:
- ret = self.netapi.run(low)
- self.assertEqual(self.post_web_handler.received_requests, [])
- self.assertEqual(ret, None)
- @slowTest
- def test_ssh_unauthenticated_raw_shell_touch(self):
- badfile = os.path.join(TMP, "badfile.txt")
- fun = "-o ProxyCommand touch {0}".format(badfile)
- low = {"client": "ssh", "tgt": "localhost", "fun": fun, "raw_shell": True}
- ret = None
- with self.assertRaises(EauthAuthenticationError) as excinfo:
- ret = self.netapi.run(low)
- self.assertEqual(ret, None)
- self.assertFalse(os.path.exists("badfile.txt"))
- @slowTest
- def test_ssh_authenticated_raw_shell_disabled(self):
- badfile = os.path.join(TMP, "badfile.txt")
- fun = "-o ProxyCommand touch {0}".format(badfile)
- low = {"client": "ssh", "tgt": "localhost", "fun": fun, "raw_shell": True}
- low.update(self.eauth_creds)
- ret = None
- with patch.dict(self.netapi.opts, {"netapi_allow_raw_shell": False}):
- with self.assertRaises(EauthAuthenticationError) as excinfo:
- ret = self.netapi.run(low)
- self.assertEqual(ret, None)
- self.assertFalse(os.path.exists("badfile.txt"))
|