test_cmdmod.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, print_function, unicode_literals
import os
import shutil
import sys
import tempfile
import textwrap
from contextlib import contextmanager

import pytest

import salt.utils.path
import salt.utils.platform
from salt.ext import six
from tests.support.case import ModuleCase
from tests.support.runtests import RUNTIME_VARS
from tests.support.unit import skipIf
# Interpreter handed to the cmd.has_exec / cmd.exec_code tests: whatever
# salt.utils.path.which_bin returns for this list of candidate names.
# NOTE(review): presumably None when no candidate is on PATH (e.g. a
# python3-only host) -- confirm against which_bin.
AVAILABLE_PYTHON_EXECUTABLE = salt.utils.path.which_bin(
    ["python", "python2", "python2.6", "python2.7"]
)
  18. @pytest.mark.windows_whitelisted
  19. class CMDModuleTest(ModuleCase):
  20. """
  21. Validate the cmd module
  22. """
  23. def setUp(self):
  24. self.runas_usr = "nobody"
  25. if salt.utils.platform.is_darwin():
  26. self.runas_usr = "macsalttest"
  27. @contextmanager
  28. def _ensure_user_exists(self, name):
  29. if name in self.run_function("user.info", [name]).values():
  30. # User already exists; don't touch
  31. yield
  32. else:
  33. # Need to create user for test
  34. self.run_function("user.add", [name])
  35. try:
  36. yield
  37. finally:
  38. self.run_function("user.delete", [name], remove=True)
  39. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  40. def test_run(self):
  41. """
  42. cmd.run
  43. """
  44. shell = os.environ.get("SHELL")
  45. if shell is None:
  46. # Failed to get the SHELL var, don't run
  47. self.skipTest("Unable to get the SHELL environment variable")
  48. self.assertTrue(self.run_function("cmd.run", ["echo $SHELL"]))
  49. self.assertEqual(
  50. self.run_function(
  51. "cmd.run", ["echo $SHELL", "shell={0}".format(shell)], python_shell=True
  52. ).rstrip(),
  53. shell,
  54. )
  55. self.assertEqual(
  56. self.run_function("cmd.run", ["ls / | grep etc"], python_shell=True), "etc"
  57. )
  58. self.assertEqual(
  59. self.run_function(
  60. "cmd.run",
  61. ['echo {{grains.id}} | awk "{print $1}"'],
  62. template="jinja",
  63. python_shell=True,
  64. ),
  65. "minion",
  66. )
  67. self.assertEqual(
  68. self.run_function(
  69. "cmd.run", ["grep f"], stdin="one\ntwo\nthree\nfour\nfive\n"
  70. ),
  71. "four\nfive",
  72. )
  73. self.assertEqual(
  74. self.run_function(
  75. "cmd.run", ['echo "a=b" | sed -e s/=/:/g'], python_shell=True
  76. ),
  77. "a:b",
  78. )
  79. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  80. def test_stdout(self):
  81. """
  82. cmd.run_stdout
  83. """
  84. self.assertEqual(
  85. self.run_function("cmd.run_stdout", ['echo "cheese"']).rstrip(),
  86. "cheese" if not salt.utils.platform.is_windows() else '"cheese"',
  87. )
  88. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  89. def test_stderr(self):
  90. """
  91. cmd.run_stderr
  92. """
  93. if sys.platform.startswith(("freebsd", "openbsd")):
  94. shell = "/bin/sh"
  95. else:
  96. shell = "/bin/bash"
  97. self.assertEqual(
  98. self.run_function(
  99. "cmd.run_stderr",
  100. ['echo "cheese" 1>&2', "shell={0}".format(shell)],
  101. python_shell=True,
  102. ).rstrip(),
  103. "cheese" if not salt.utils.platform.is_windows() else '"cheese"',
  104. )
  105. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  106. def test_run_all(self):
  107. """
  108. cmd.run_all
  109. """
  110. if sys.platform.startswith(("freebsd", "openbsd")):
  111. shell = "/bin/sh"
  112. else:
  113. shell = "/bin/bash"
  114. ret = self.run_function(
  115. "cmd.run_all",
  116. ['echo "cheese" 1>&2', "shell={0}".format(shell)],
  117. python_shell=True,
  118. )
  119. self.assertTrue("pid" in ret)
  120. self.assertTrue("retcode" in ret)
  121. self.assertTrue("stdout" in ret)
  122. self.assertTrue("stderr" in ret)
  123. self.assertTrue(isinstance(ret.get("pid"), int))
  124. self.assertTrue(isinstance(ret.get("retcode"), int))
  125. self.assertTrue(isinstance(ret.get("stdout"), six.string_types))
  126. self.assertTrue(isinstance(ret.get("stderr"), six.string_types))
  127. self.assertEqual(
  128. ret.get("stderr").rstrip(),
  129. "cheese" if not salt.utils.platform.is_windows() else '"cheese"',
  130. )
  131. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  132. def test_retcode(self):
  133. """
  134. cmd.retcode
  135. """
  136. self.assertEqual(
  137. self.run_function("cmd.retcode", ["exit 0"], python_shell=True), 0
  138. )
  139. self.assertEqual(
  140. self.run_function("cmd.retcode", ["exit 1"], python_shell=True), 1
  141. )
  142. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  143. def test_run_all_with_success_retcodes(self):
  144. """
  145. cmd.run with success_retcodes
  146. """
  147. ret = self.run_function(
  148. "cmd.run_all", ["exit 42"], success_retcodes=[42], python_shell=True
  149. )
  150. self.assertTrue("retcode" in ret)
  151. self.assertEqual(ret.get("retcode"), 0)
  152. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  153. def test_retcode_with_success_retcodes(self):
  154. """
  155. cmd.run with success_retcodes
  156. """
  157. ret = self.run_function(
  158. "cmd.retcode", ["exit 42"], success_retcodes=[42], python_shell=True
  159. )
  160. self.assertEqual(ret, 0)
  161. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  162. def test_blacklist_glob(self):
  163. """
  164. cmd_blacklist_glob
  165. """
  166. self.assertEqual(
  167. self.run_function("cmd.run", ["bad_command --foo"]).rstrip(),
  168. 'ERROR: The shell command "bad_command --foo" is not permitted',
  169. )
  170. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  171. def test_script(self):
  172. """
  173. cmd.script
  174. """
  175. args = "saltines crackers biscuits=yes"
  176. script = "salt://script.py"
  177. ret = self.run_function("cmd.script", [script, args])
  178. self.assertEqual(ret["stdout"], args)
  179. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  180. def test_script_retcode(self):
  181. """
  182. cmd.script_retcode
  183. """
  184. script = "salt://script.py"
  185. ret = self.run_function("cmd.script_retcode", [script])
  186. self.assertEqual(ret, 0)
  187. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  188. def test_script_cwd(self):
  189. """
  190. cmd.script with cwd
  191. """
  192. tmp_cwd = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  193. args = "saltines crackers biscuits=yes"
  194. script = "salt://script.py"
  195. ret = self.run_function("cmd.script", [script, args], cwd=tmp_cwd)
  196. self.assertEqual(ret["stdout"], args)
  197. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  198. def test_script_cwd_with_space(self):
  199. """
  200. cmd.script with cwd
  201. """
  202. tmp_cwd = "{0}{1}test 2".format(
  203. tempfile.mkdtemp(dir=RUNTIME_VARS.TMP), os.path.sep
  204. )
  205. os.mkdir(tmp_cwd)
  206. args = "saltines crackers biscuits=yes"
  207. script = "salt://script.py"
  208. ret = self.run_function("cmd.script", [script, args], cwd=tmp_cwd)
  209. self.assertEqual(ret["stdout"], args)
  210. @pytest.mark.destructive_test
  211. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  212. def test_tty(self):
  213. """
  214. cmd.tty
  215. """
  216. for tty in ("tty0", "pts3"):
  217. if os.path.exists(os.path.join("/dev", tty)):
  218. ret = self.run_function("cmd.tty", [tty, "apply salt liberally"])
  219. self.assertTrue("Success" in ret)
  220. @pytest.mark.skip_if_binaries_missing("which")
  221. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  222. def test_which(self):
  223. """
  224. cmd.which
  225. """
  226. self.assertEqual(
  227. self.run_function("cmd.which", ["cat"]).rstrip(),
  228. self.run_function("cmd.run", ["which cat"]).rstrip(),
  229. )
  230. @pytest.mark.skip_if_binaries_missing("which")
  231. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  232. def test_which_bin(self):
  233. """
  234. cmd.which_bin
  235. """
  236. cmds = ["pip3", "pip2", "pip", "pip-python"]
  237. ret = self.run_function("cmd.which_bin", [cmds])
  238. self.assertTrue(os.path.split(ret)[1] in cmds)
  239. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  240. def test_has_exec(self):
  241. """
  242. cmd.has_exec
  243. """
  244. self.assertTrue(
  245. self.run_function("cmd.has_exec", [AVAILABLE_PYTHON_EXECUTABLE])
  246. )
  247. self.assertFalse(
  248. self.run_function("cmd.has_exec", ["alllfsdfnwieulrrh9123857ygf"])
  249. )
  250. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  251. def test_exec_code(self):
  252. """
  253. cmd.exec_code
  254. """
  255. code = textwrap.dedent(
  256. """\
  257. import sys
  258. sys.stdout.write('cheese')"""
  259. )
  260. self.assertEqual(
  261. self.run_function(
  262. "cmd.exec_code", [AVAILABLE_PYTHON_EXECUTABLE, code]
  263. ).rstrip(),
  264. "cheese",
  265. )
  266. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  267. def test_exec_code_with_single_arg(self):
  268. """
  269. cmd.exec_code
  270. """
  271. code = textwrap.dedent(
  272. """\
  273. import sys
  274. sys.stdout.write(sys.argv[1])"""
  275. )
  276. arg = "cheese"
  277. self.assertEqual(
  278. self.run_function(
  279. "cmd.exec_code", [AVAILABLE_PYTHON_EXECUTABLE, code], args=arg
  280. ).rstrip(),
  281. arg,
  282. )
  283. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  284. def test_exec_code_with_multiple_args(self):
  285. """
  286. cmd.exec_code
  287. """
  288. code = textwrap.dedent(
  289. """\
  290. import sys
  291. sys.stdout.write(sys.argv[1])"""
  292. )
  293. arg = "cheese"
  294. self.assertEqual(
  295. self.run_function(
  296. "cmd.exec_code", [AVAILABLE_PYTHON_EXECUTABLE, code], args=[arg, "test"]
  297. ).rstrip(),
  298. arg,
  299. )
  300. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  301. def test_quotes(self):
  302. """
  303. cmd.run with quoted command
  304. """
  305. cmd = """echo 'SELECT * FROM foo WHERE bar="baz"' """
  306. expected_result = 'SELECT * FROM foo WHERE bar="baz"'
  307. if salt.utils.platform.is_windows():
  308. expected_result = "'SELECT * FROM foo WHERE bar=\"baz\"'"
  309. result = self.run_function("cmd.run_stdout", [cmd]).strip()
  310. self.assertEqual(result, expected_result)
  311. @pytest.mark.skip_if_not_root
  312. @skipIf(salt.utils.platform.is_windows(), "skip windows, requires password")
  313. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  314. def test_quotes_runas(self):
  315. """
  316. cmd.run with quoted command
  317. """
  318. cmd = """echo 'SELECT * FROM foo WHERE bar="baz"' """
  319. expected_result = 'SELECT * FROM foo WHERE bar="baz"'
  320. runas = RUNTIME_VARS.RUNNING_TESTS_USER
  321. result = self.run_function("cmd.run_stdout", [cmd], runas=runas).strip()
  322. self.assertEqual(result, expected_result)
  323. @pytest.mark.destructive_test
  324. @pytest.mark.skip_if_not_root
  325. @skipIf(salt.utils.platform.is_windows(), "skip windows, uses unix commands")
  326. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  327. def test_avoid_injecting_shell_code_as_root(self):
  328. """
  329. cmd.run should execute the whole command as the "runas" user, not
  330. running substitutions as root.
  331. """
  332. cmd = "echo $(id -u)"
  333. root_id = self.run_function("cmd.run_stdout", [cmd])
  334. runas_root_id = self.run_function(
  335. "cmd.run_stdout", [cmd], runas=RUNTIME_VARS.RUNNING_TESTS_USER
  336. )
  337. with self._ensure_user_exists(self.runas_usr):
  338. user_id = self.run_function("cmd.run_stdout", [cmd], runas=self.runas_usr)
  339. self.assertNotEqual(user_id, root_id)
  340. self.assertNotEqual(user_id, runas_root_id)
  341. self.assertEqual(root_id, runas_root_id)
  342. @pytest.mark.destructive_test
  343. @pytest.mark.skip_if_not_root
  344. @skipIf(salt.utils.platform.is_windows(), "skip windows, uses unix commands")
  345. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  346. def test_cwd_runas(self):
  347. """
  348. cmd.run should be able to change working directory correctly, whether
  349. or not runas is in use.
  350. """
  351. cmd = "pwd"
  352. tmp_cwd = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  353. os.chmod(tmp_cwd, 0o711)
  354. cwd_normal = self.run_function("cmd.run_stdout", [cmd], cwd=tmp_cwd).rstrip(
  355. "\n"
  356. )
  357. self.assertEqual(tmp_cwd, cwd_normal)
  358. with self._ensure_user_exists(self.runas_usr):
  359. cwd_runas = self.run_function(
  360. "cmd.run_stdout", [cmd], cwd=tmp_cwd, runas=self.runas_usr
  361. ).rstrip("\n")
  362. self.assertEqual(tmp_cwd, cwd_runas)
  363. @pytest.mark.destructive_test
  364. @pytest.mark.skip_if_not_root
  365. @skipIf(not salt.utils.platform.is_darwin(), "applicable to MacOS only")
  366. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  367. def test_runas_env(self):
  368. """
  369. cmd.run should be able to change working directory correctly, whether
  370. or not runas is in use.
  371. """
  372. with self._ensure_user_exists(self.runas_usr):
  373. user_path = self.run_function(
  374. "cmd.run_stdout", ['printf %s "$PATH"'], runas=self.runas_usr
  375. )
  376. # XXX: Not sure of a better way. Environment starts out with
  377. # /bin:/usr/bin and should be populated by path helper and the bash
  378. # profile.
  379. self.assertNotEqual("/bin:/usr/bin", user_path)
  380. @pytest.mark.destructive_test
  381. @pytest.mark.skip_if_not_root
  382. @skipIf(not salt.utils.platform.is_darwin(), "applicable to MacOS only")
  383. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  384. def test_runas_complex_command_bad_cwd(self):
  385. """
  386. cmd.run should not accidentally run parts of a complex command when
  387. given a cwd which cannot be used by the user the command is run as.
  388. Due to the need to use `su -l` to login to another user on MacOS, we
  389. cannot cd into directories that the target user themselves does not
  390. have execute permission for. To an extent, this test is testing that
  391. buggy behaviour, but its purpose is to ensure that the greater bug of
  392. running commands after failing to cd does not occur.
  393. """
  394. tmp_cwd = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  395. os.chmod(tmp_cwd, 0o700)
  396. with self._ensure_user_exists(self.runas_usr):
  397. cmd_result = self.run_function(
  398. "cmd.run_all",
  399. ['pwd; pwd; : $(echo "You have failed the test" >&2)'],
  400. cwd=tmp_cwd,
  401. runas=self.runas_usr,
  402. )
  403. self.assertEqual("", cmd_result["stdout"])
  404. self.assertNotIn("You have failed the test", cmd_result["stderr"])
  405. self.assertNotEqual(0, cmd_result["retcode"])
  406. @skipIf(salt.utils.platform.is_windows(), "minion is windows")
  407. @pytest.mark.skip_if_not_root
  408. @pytest.mark.destructive_test
  409. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  410. def test_runas(self):
  411. """
  412. Ensure that the env is the runas user's
  413. """
  414. with self._ensure_user_exists(self.runas_usr):
  415. out = self.run_function(
  416. "cmd.run", ["env"], runas=self.runas_usr
  417. ).splitlines()
  418. self.assertIn("USER={0}".format(self.runas_usr), out)
  419. @skipIf(not salt.utils.path.which_bin("sleep"), "sleep cmd not installed")
  420. def test_timeout(self):
  421. """
  422. cmd.run trigger timeout
  423. """
  424. out = self.run_function(
  425. "cmd.run", ["sleep 2 && echo hello"], f_timeout=1, python_shell=True
  426. )
  427. self.assertTrue("Timed out" in out)
  428. @skipIf(not salt.utils.path.which_bin("sleep"), "sleep cmd not installed")
  429. def test_timeout_success(self):
  430. """
  431. cmd.run sufficient timeout to succeed
  432. """
  433. out = self.run_function(
  434. "cmd.run", ["sleep 1 && echo hello"], f_timeout=2, python_shell=True
  435. )
  436. self.assertEqual(out, "hello")
  437. @pytest.mark.slow_test(seconds=120) # Test takes >60 and <=120 seconds
  438. def test_hide_output(self):
  439. """
  440. Test the hide_output argument
  441. """
  442. ls_command = (
  443. ["ls", "/"] if not salt.utils.platform.is_windows() else ["dir", "c:\\"]
  444. )
  445. error_command = ["thiscommanddoesnotexist"]
  446. # cmd.run
  447. out = self.run_function("cmd.run", ls_command, hide_output=True)
  448. self.assertEqual(out, "")
  449. # cmd.shell
  450. out = self.run_function("cmd.shell", ls_command, hide_output=True)
  451. self.assertEqual(out, "")
  452. # cmd.run_stdout
  453. out = self.run_function("cmd.run_stdout", ls_command, hide_output=True)
  454. self.assertEqual(out, "")
  455. # cmd.run_stderr
  456. out = self.run_function("cmd.shell", error_command, hide_output=True)
  457. self.assertEqual(out, "")
  458. # cmd.run_all (command should have produced stdout)
  459. out = self.run_function("cmd.run_all", ls_command, hide_output=True)
  460. self.assertEqual(out["stdout"], "")
  461. self.assertEqual(out["stderr"], "")
  462. # cmd.run_all (command should have produced stderr)
  463. out = self.run_function("cmd.run_all", error_command, hide_output=True)
  464. self.assertEqual(out["stdout"], "")
  465. self.assertEqual(out["stderr"], "")
  466. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  467. def test_cmd_run_whoami(self):
  468. """
  469. test return of whoami
  470. """
  471. cmd = self.run_function("cmd.run", ["whoami"])
  472. if salt.utils.platform.is_windows():
  473. self.assertIn("administrator", cmd)
  474. else:
  475. self.assertEqual("root", cmd)
  476. @skipIf(not salt.utils.platform.is_windows(), "minion is not windows")
  477. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  478. def test_windows_env_handling(self):
  479. """
  480. Ensure that nt.environ is used properly with cmd.run*
  481. """
  482. out = self.run_function(
  483. "cmd.run", ["set"], env={"abc": "123", "ABC": "456"}
  484. ).splitlines()
  485. self.assertIn("abc=123", out)
  486. self.assertIn("ABC=456", out)
  487. @skipIf(not salt.utils.platform.is_windows(), "minion is not windows")
  488. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  489. def test_windows_powershell_script_args(self):
  490. """
  491. Ensure that powershell processes inline script in args
  492. """
  493. val = "i like cheese"
  494. args = '-SecureString (ConvertTo-SecureString -String "{0}" -AsPlainText -Force) -ErrorAction Stop'.format(
  495. val
  496. )
  497. script = "salt://issue-56195/test.ps1"
  498. ret = self.run_function("cmd.script", [script], args=args, shell="powershell")
  499. self.assertEqual(ret["stdout"], val)