1
0

test_cmdmod.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. # -*- coding: utf-8 -*-
  2. # Import python libs
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. from contextlib import contextmanager
  5. import os
  6. import sys
  7. import tempfile
  8. import textwrap
  9. # Import Salt Testing libs
  10. from tests.support.case import ModuleCase
  11. from tests.support.runtests import RUNTIME_VARS
  12. from tests.support.unit import skipIf
  13. # Import salt libs
  14. import salt.utils.path
  15. import salt.utils.platform
  16. # Import 3rd-party libs
  17. import pytest
  18. from salt.ext import six
  # Interpreter handed to the cmd.has_exec / cmd.exec_code tests below.
  # The original candidate names come first, so hosts where one of them is
  # found keep resolving to the same binary. 'python3' and finally the
  # interpreter running the test suite are fallbacks for hosts that ship
  # neither an unversioned nor a Python 2 binary; without them the lookup
  # would find nothing there.
  # NOTE(review): assumes which_bin returns the first candidate it can locate
  # and a falsy value when none is found -- confirm against salt.utils.path.
  AVAILABLE_PYTHON_EXECUTABLE = salt.utils.path.which_bin([
      'python',
      'python2',
      'python2.6',
      'python2.7',
      'python3',
  ]) or sys.executable
  @pytest.mark.windows_whitelisted
  class CMDModuleTest(ModuleCase):
      '''
      Validate the cmd module
      '''

      def setUp(self):
          '''
          Choose the account the ``runas`` tests execute commands as:
          ``macsalttest`` on macOS, ``nobody`` everywhere else.
          '''
          if salt.utils.platform.is_darwin():
              self.runas_usr = 'macsalttest'
          else:
              self.runas_usr = 'nobody'

      @contextmanager
      def _ensure_user_exists(self, name):
          '''
          Context manager making sure the user ``name`` exists while the
          ``with`` body runs.

          A user that is already reported by ``user.info`` is left untouched.
          Otherwise the user is created with ``user.add`` and deleted again
          (``user.delete`` with ``remove=True``) once the body finishes, even
          when the body raised.
          '''
          if name in self.run_function('user.info', [name]).values():
              # User already exists; don't touch
              yield
              return

          # Need to create user for test
          self.run_function('user.add', [name])
          try:
              yield
          finally:
              self.run_function('user.delete', [name], remove=True)

      def test_run(self):
          '''
          cmd.run
          '''
          shell = os.environ.get('SHELL')
          if shell is None:
              # Failed to get the SHELL var, don't run
              self.skipTest('Unable to get the SHELL environment variable')

          self.assertTrue(self.run_function('cmd.run', ['echo $SHELL']))

          # An explicit shell= argument must be the shell that runs the command
          self.assertEqual(
              self.run_function(
                  'cmd.run',
                  ['echo $SHELL', 'shell={0}'.format(shell)],
                  python_shell=True).rstrip(),
              shell)

          # Pipes are honoured when python_shell=True
          self.assertEqual(
              self.run_function(
                  'cmd.run',
                  ['ls / | grep etc'],
                  python_shell=True),
              'etc')

          # The command line is rendered through jinja before execution
          self.assertEqual(
              self.run_function(
                  'cmd.run',
                  ['echo {{grains.id}} | awk "{print $1}"'],
                  template='jinja',
                  python_shell=True),
              'minion')

          # stdin is fed to the command
          self.assertEqual(
              self.run_function(
                  'cmd.run',
                  ['grep f'],
                  stdin='one\ntwo\nthree\nfour\nfive\n'),
              'four\nfive')

          self.assertEqual(
              self.run_function(
                  'cmd.run',
                  ['echo "a=b" | sed -e s/=/:/g'],
                  python_shell=True),
              'a:b')

      def test_stdout(self):
          '''
          cmd.run_stdout
          '''
          # On Windows the double quotes are echoed back verbatim
          expected = 'cheese' if not salt.utils.platform.is_windows() else '"cheese"'
          out = self.run_function('cmd.run_stdout', ['echo "cheese"'])
          self.assertEqual(out.rstrip(), expected)

      def test_stderr(self):
          '''
          cmd.run_stderr
          '''
          if sys.platform.startswith(('freebsd', 'openbsd')):
              shell = '/bin/sh'
          else:
              shell = '/bin/bash'

          # On Windows the double quotes are echoed back verbatim
          expected = 'cheese' if not salt.utils.platform.is_windows() else '"cheese"'
          out = self.run_function(
              'cmd.run_stderr',
              ['echo "cheese" 1>&2', 'shell={0}'.format(shell)],
              python_shell=True)
          self.assertEqual(out.rstrip(), expected)

      def test_run_all(self):
          '''
          cmd.run_all
          '''
          if sys.platform.startswith(('freebsd', 'openbsd')):
              shell = '/bin/sh'
          else:
              shell = '/bin/bash'

          ret = self.run_function(
              'cmd.run_all',
              ['echo "cheese" 1>&2', 'shell={0}'.format(shell)],
              python_shell=True)

          # The result must carry all four keys ...
          for key in ('pid', 'retcode', 'stdout', 'stderr'):
              self.assertIn(key, ret)

          # ... with the expected value types
          self.assertIsInstance(ret.get('pid'), int)
          self.assertIsInstance(ret.get('retcode'), int)
          self.assertIsInstance(ret.get('stdout'), six.string_types)
          self.assertIsInstance(ret.get('stderr'), six.string_types)

          # On Windows the double quotes are echoed back verbatim
          expected = 'cheese' if not salt.utils.platform.is_windows() else '"cheese"'
          self.assertEqual(ret.get('stderr').rstrip(), expected)

      def test_retcode(self):
          '''
          cmd.retcode
          '''
          self.assertEqual(self.run_function('cmd.retcode', ['exit 0'], python_shell=True), 0)
          self.assertEqual(self.run_function('cmd.retcode', ['exit 1'], python_shell=True), 1)

      def test_run_all_with_success_retcodes(self):
          '''
          cmd.run_all with success_retcodes: an exit status listed in
          success_retcodes is reported as retcode 0
          '''
          ret = self.run_function(
              'cmd.run_all',
              ['exit 42'],
              success_retcodes=[42],
              python_shell=True)

          self.assertIn('retcode', ret)
          self.assertEqual(ret.get('retcode'), 0)

      def test_retcode_with_success_retcodes(self):
          '''
          cmd.retcode with success_retcodes: an exit status listed in
          success_retcodes is reported as 0
          '''
          ret = self.run_function(
              'cmd.retcode',
              ['exit 42'],
              success_retcodes=[42],
              python_shell=True)

          self.assertEqual(ret, 0)

      def test_blacklist_glob(self):
          '''
          cmd_blacklist_glob
          '''
          out = self.run_function('cmd.run', ['bad_command --foo'])
          self.assertEqual(
              out.rstrip(),
              'ERROR: The shell command "bad_command --foo" is not permitted')

      def test_script(self):
          '''
          cmd.script
          '''
          args = 'saltines crackers biscuits=yes'
          script = 'salt://script.py'
          ret = self.run_function('cmd.script', [script, args])
          self.assertEqual(ret['stdout'], args)

      def test_script_retcode(self):
          '''
          cmd.script_retcode
          '''
          script = 'salt://script.py'
          ret = self.run_function('cmd.script_retcode', [script])
          self.assertEqual(ret, 0)

      def test_script_cwd(self):
          '''
          cmd.script with cwd
          '''
          tmp_cwd = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
          args = 'saltines crackers biscuits=yes'
          script = 'salt://script.py'
          ret = self.run_function('cmd.script', [script, args], cwd=tmp_cwd)
          self.assertEqual(ret['stdout'], args)

      def test_script_cwd_with_space(self):
          '''
          cmd.script with a cwd whose path contains a space
          '''
          tmp_cwd = "{0}{1}test 2".format(tempfile.mkdtemp(dir=RUNTIME_VARS.TMP), os.path.sep)
          os.mkdir(tmp_cwd)

          args = 'saltines crackers biscuits=yes'
          script = 'salt://script.py'
          ret = self.run_function('cmd.script', [script, args], cwd=tmp_cwd)
          self.assertEqual(ret['stdout'], args)

      @pytest.mark.destructive_test
      def test_tty(self):
          '''
          cmd.tty
          '''
          # Only terminals that actually exist under /dev are exercised
          for tty in ('tty0', 'pts3'):
              if os.path.exists(os.path.join('/dev', tty)):
                  ret = self.run_function('cmd.tty', [tty, 'apply salt liberally'])
                  self.assertIn('Success', ret)

      @pytest.mark.skip_if_binaries_missing(['which'])
      def test_which(self):
          '''
          cmd.which
          '''
          from_module = self.run_function('cmd.which', ['cat']).rstrip()
          from_shell = self.run_function('cmd.run', ['which cat']).rstrip()
          self.assertEqual(from_module, from_shell)

      @pytest.mark.skip_if_binaries_missing(['which'])
      def test_which_bin(self):
          '''
          cmd.which_bin
          '''
          cmds = ['pip3', 'pip2', 'pip', 'pip-python']
          ret = self.run_function('cmd.which_bin', [cmds])
          # Whatever path came back, its file name must be one of the candidates
          self.assertIn(os.path.basename(ret), cmds)

      def test_has_exec(self):
          '''
          cmd.has_exec
          '''
          self.assertTrue(
              self.run_function('cmd.has_exec', [AVAILABLE_PYTHON_EXECUTABLE]))
          self.assertFalse(
              self.run_function('cmd.has_exec', ['alllfsdfnwieulrrh9123857ygf']))

      def test_exec_code(self):
          '''
          cmd.exec_code
          '''
          code = textwrap.dedent('''\
                 import sys
                 sys.stdout.write('cheese')''')
          out = self.run_function(
              'cmd.exec_code',
              [AVAILABLE_PYTHON_EXECUTABLE, code])
          self.assertEqual(out.rstrip(), 'cheese')

      def test_exec_code_with_single_arg(self):
          '''
          cmd.exec_code with args given as a single string
          '''
          code = textwrap.dedent('''\
                 import sys
                 sys.stdout.write(sys.argv[1])''')
          arg = 'cheese'
          out = self.run_function(
              'cmd.exec_code',
              [AVAILABLE_PYTHON_EXECUTABLE, code],
              args=arg)
          self.assertEqual(out.rstrip(), arg)

      def test_exec_code_with_multiple_args(self):
          '''
          cmd.exec_code with args given as a list
          '''
          code = textwrap.dedent('''\
                 import sys
                 sys.stdout.write(sys.argv[1])''')
          arg = 'cheese'
          out = self.run_function(
              'cmd.exec_code',
              [AVAILABLE_PYTHON_EXECUTABLE, code],
              args=[arg, 'test'])
          self.assertEqual(out.rstrip(), arg)

      def test_quotes(self):
          '''
          cmd.run with quoted command
          '''
          cmd = '''echo 'SELECT * FROM foo WHERE bar="baz"' '''
          if salt.utils.platform.is_windows():
              # On Windows the surrounding single quotes are echoed back as well
              expected_result = '\'SELECT * FROM foo WHERE bar="baz"\''
          else:
              expected_result = 'SELECT * FROM foo WHERE bar="baz"'

          result = self.run_function('cmd.run_stdout', [cmd]).strip()
          self.assertEqual(result, expected_result)

      @pytest.mark.skip_if_not_root
      @skipIf(salt.utils.platform.is_windows(), 'skip windows, requires password')
      def test_quotes_runas(self):
          '''
          cmd.run with quoted command, executed through runas
          '''
          cmd = '''echo 'SELECT * FROM foo WHERE bar="baz"' '''
          expected_result = 'SELECT * FROM foo WHERE bar="baz"'
          runas = RUNTIME_VARS.RUNNING_TESTS_USER

          result = self.run_function('cmd.run_stdout', [cmd], runas=runas).strip()
          self.assertEqual(result, expected_result)

      @pytest.mark.destructive_test
      @pytest.mark.skip_if_not_root
      @skipIf(salt.utils.platform.is_windows(), 'skip windows, uses unix commands')
      def test_avoid_injecting_shell_code_as_root(self):
          '''
          cmd.run should execute the whole command as the "runas" user, not
          running substitutions as root.
          '''
          cmd = 'echo $(id -u)'

          root_id = self.run_function('cmd.run_stdout', [cmd])
          runas_root_id = self.run_function(
              'cmd.run_stdout', [cmd], runas=RUNTIME_VARS.RUNNING_TESTS_USER)
          with self._ensure_user_exists(self.runas_usr):
              user_id = self.run_function('cmd.run_stdout', [cmd], runas=self.runas_usr)

          # The $(id -u) substitution must be evaluated as the runas user
          self.assertNotEqual(user_id, root_id)
          self.assertNotEqual(user_id, runas_root_id)
          self.assertEqual(root_id, runas_root_id)

      @pytest.mark.destructive_test
      @pytest.mark.skip_if_not_root
      @skipIf(salt.utils.platform.is_windows(), 'skip windows, uses unix commands')
      def test_cwd_runas(self):
          '''
          cmd.run should be able to change working directory correctly, whether
          or not runas is in use.
          '''
          cmd = 'pwd'
          tmp_cwd = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
          # 0o711 adds execute (search) permission for group and others on top
          # of the owner's full access
          os.chmod(tmp_cwd, 0o711)

          cwd_normal = self.run_function('cmd.run_stdout', [cmd], cwd=tmp_cwd).rstrip('\n')
          self.assertEqual(tmp_cwd, cwd_normal)

          with self._ensure_user_exists(self.runas_usr):
              cwd_runas = self.run_function(
                  'cmd.run_stdout', [cmd], cwd=tmp_cwd, runas=self.runas_usr).rstrip('\n')
          self.assertEqual(tmp_cwd, cwd_runas)

      @pytest.mark.destructive_test
      @pytest.mark.skip_if_not_root
      @skipIf(not salt.utils.platform.is_darwin(), 'applicable to MacOS only')
      def test_runas_env(self):
          '''
          cmd.run with runas should hand the command a PATH that differs from
          the bare "/bin:/usr/bin" the environment starts out with.
          '''
          with self._ensure_user_exists(self.runas_usr):
              user_path = self.run_function(
                  'cmd.run_stdout', ['printf %s "$PATH"'], runas=self.runas_usr)

          # XXX: Not sure of a better way. Environment starts out with
          # /bin:/usr/bin and should be populated by path helper and the bash
          # profile.
          self.assertNotEqual("/bin:/usr/bin", user_path)

      @pytest.mark.destructive_test
      @pytest.mark.skip_if_not_root
      @skipIf(not salt.utils.platform.is_darwin(), 'applicable to MacOS only')
      def test_runas_complex_command_bad_cwd(self):
          '''
          cmd.run should not accidentally run parts of a complex command when
          given a cwd which cannot be used by the user the command is run as.

          Due to the need to use `su -l` to login to another user on MacOS, we
          cannot cd into directories that the target user themselves does not
          have execute permission for. To an extent, this test is testing that
          buggy behaviour, but its purpose is to ensure that the greater bug of
          running commands after failing to cd does not occur.
          '''
          tmp_cwd = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
          # 0o700 grants no permissions at all to group and others
          os.chmod(tmp_cwd, 0o700)

          with self._ensure_user_exists(self.runas_usr):
              cmd_result = self.run_function(
                  'cmd.run_all',
                  ['pwd; pwd; : $(echo "You have failed the test" >&2)'],
                  cwd=tmp_cwd,
                  runas=self.runas_usr)

          # Nothing after the failed cd may have run
          self.assertEqual("", cmd_result['stdout'])
          self.assertNotIn("You have failed the test", cmd_result['stderr'])
          self.assertNotEqual(0, cmd_result['retcode'])

      @skipIf(salt.utils.platform.is_windows(), 'minion is windows')
      @pytest.mark.skip_if_not_root
      @pytest.mark.destructive_test
      def test_runas(self):
          '''
          Ensure that the env is the runas user's
          '''
          with self._ensure_user_exists(self.runas_usr):
              out = self.run_function('cmd.run', ['env'], runas=self.runas_usr).splitlines()
          self.assertIn('USER={0}'.format(self.runas_usr), out)

      @skipIf(not salt.utils.path.which_bin('sleep'), 'sleep cmd not installed')
      def test_timeout(self):
          '''
          cmd.run trigger timeout
          '''
          out = self.run_function(
              'cmd.run',
              ['sleep 2 && echo hello'],
              f_timeout=1,
              python_shell=True)
          self.assertIn('Timed out', out)

      @skipIf(not salt.utils.path.which_bin('sleep'), 'sleep cmd not installed')
      def test_timeout_success(self):
          '''
          cmd.run sufficient timeout to succeed
          '''
          out = self.run_function(
              'cmd.run',
              ['sleep 1 && echo hello'],
              f_timeout=2,
              python_shell=True)
          self.assertEqual(out, 'hello')

      def test_hide_output(self):
          '''
          Test the hide_output argument
          '''
          if salt.utils.platform.is_windows():
              ls_command = ['dir', 'c:\\']
          else:
              ls_command = ['ls', '/']
          error_command = ['thiscommanddoesnotexist']

          # cmd.run
          out = self.run_function('cmd.run', ls_command, hide_output=True)
          self.assertEqual(out, '')

          # cmd.shell
          out = self.run_function('cmd.shell', ls_command, hide_output=True)
          self.assertEqual(out, '')

          # cmd.run_stdout
          out = self.run_function('cmd.run_stdout', ls_command, hide_output=True)
          self.assertEqual(out, '')

          # cmd.run_stderr (this section previously called cmd.shell a second
          # time, so cmd.run_stderr itself was never checked)
          out = self.run_function('cmd.run_stderr', error_command, hide_output=True)
          self.assertEqual(out, '')

          # cmd.run_all (command should have produced stdout)
          out = self.run_function('cmd.run_all', ls_command, hide_output=True)
          self.assertEqual(out['stdout'], '')
          self.assertEqual(out['stderr'], '')

          # cmd.run_all (command should have produced stderr)
          out = self.run_function('cmd.run_all', error_command, hide_output=True)
          self.assertEqual(out['stdout'], '')
          self.assertEqual(out['stderr'], '')

      def test_cmd_run_whoami(self):
          '''
          test return of whoami
          '''
          cmd = self.run_function('cmd.run', ['whoami'])
          if salt.utils.platform.is_windows():
              self.assertIn('administrator', cmd)
          else:
              self.assertEqual('root', cmd)

      @skipIf(not salt.utils.platform.is_windows(), 'minion is not windows')
      def test_windows_env_handling(self):
          '''
          Ensure that nt.environ is used properly with cmd.run*
          '''
          # Two variables differing only in case must both reach the command
          out = self.run_function(
              'cmd.run', ['set'], env={"abc": "123", "ABC": "456"}).splitlines()
          self.assertIn('abc=123', out)
          self.assertIn('ABC=456', out)