test_key.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, print_function, unicode_literals
  3. import os
  4. import shutil
  5. import tempfile
  6. import textwrap
  7. import pytest
  8. import salt.utils.files
  9. import salt.utils.platform
  10. import salt.utils.yaml
  11. from tests.support.case import ShellCase
  12. from tests.support.helpers import (
  13. destructiveTest,
  14. skip_if_not_root,
  15. slowTest,
  16. with_system_user,
  17. )
  18. from tests.support.mixins import ShellCaseCommonTestsMixin
  19. from tests.support.runtests import RUNTIME_VARS
  20. from tests.support.unit import skipIf
  21. USERA = "saltdev"
  22. USERA_PWD = "saltdev"
  23. PUB_KEY = textwrap.dedent(
  24. """\
  25. -----BEGIN PUBLIC KEY-----
  26. MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoqIZDtcQtqUNs0wC7qQz
  27. JwFhXAVNT5C8M8zhI+pFtF/63KoN5k1WwAqP2j3LquTG68WpxcBwLtKfd7FVA/Kr
  28. OF3kXDWFnDi+HDchW2lJObgfzLckWNRFaF8SBvFM2dys3CGSgCV0S/qxnRAjrJQb
  29. B3uQwtZ64ncJAlkYpArv3GwsfRJ5UUQnYPDEJwGzMskZ0pHd60WwM1gMlfYmNX5O
  30. RBEjybyNpYDzpda6e6Ypsn6ePGLkP/tuwUf+q9wpbRE3ZwqERC2XRPux+HX2rGP+
  31. mkzpmuHkyi2wV33A9pDfMgRHdln2CLX0KgfRGixUQhW1o+Kmfv2rq4sGwpCgLbTh
  32. NwIDAQAB
  33. -----END PUBLIC KEY-----
  34. """
  35. )
  36. MIN_NAME = "minibar"
  37. @pytest.mark.windows_whitelisted
  38. @pytest.mark.usefixtures("salt_sub_minion")
  39. class KeyTest(ShellCase, ShellCaseCommonTestsMixin):
  40. """
  41. Test salt-key script
  42. """
  43. _call_binary_ = "salt-key"
  44. @slowTest
  45. def test_remove_key(self):
  46. """
  47. test salt-key -d usage
  48. """
  49. pki_dir = self.master_opts["pki_dir"]
  50. key = os.path.join(pki_dir, "minions", MIN_NAME)
  51. with salt.utils.files.fopen(key, "w") as fp:
  52. fp.write(PUB_KEY)
  53. check_key = self.run_key("-p {0}".format(MIN_NAME))
  54. self.assertIn("Accepted Keys:", check_key)
  55. self.assertIn("{0}: -----BEGIN PUBLIC KEY-----".format(MIN_NAME), check_key)
  56. remove_key = self.run_key("-d {0} -y".format(MIN_NAME))
  57. check_key = self.run_key("-p {0}".format(MIN_NAME))
  58. self.assertEqual([], check_key)
  59. @skip_if_not_root
  60. @destructiveTest
  61. @skipIf(salt.utils.platform.is_windows(), "PAM eauth not available on Windows")
  62. @with_system_user(USERA, password=USERA_PWD)
  63. @slowTest
  64. def test_remove_key_eauth(self, username):
  65. """
  66. test salt-key -d usage
  67. """
  68. pki_dir = self.master_opts["pki_dir"]
  69. key = os.path.join(pki_dir, "minions", MIN_NAME)
  70. with salt.utils.files.fopen(key, "w") as fp:
  71. fp.write(PUB_KEY)
  72. check_key = self.run_key("-p {0}".format(MIN_NAME))
  73. self.assertIn("Accepted Keys:", check_key)
  74. self.assertIn("{0}: -----BEGIN PUBLIC KEY-----".format(MIN_NAME), check_key)
  75. remove_key = self.run_key(
  76. "-d {0} -y --eauth pam --username {1} --password {2}".format(
  77. MIN_NAME, username, USERA_PWD
  78. )
  79. )
  80. check_key = self.run_key("-p {0}".format(MIN_NAME))
  81. self.assertEqual([], check_key)
  82. @slowTest
  83. def test_list_accepted_args(self):
  84. """
  85. test salt-key -l for accepted arguments
  86. """
  87. for key in ("acc", "pre", "den", "un", "rej"):
  88. # These should not trigger any error
  89. data = self.run_key("-l {0}".format(key), catch_stderr=True)
  90. self.assertNotIn("error:", "\n".join(data[1]))
  91. data = self.run_key("-l foo-{0}".format(key), catch_stderr=True)
  92. self.assertIn("error:", "\n".join(data[1]))
  93. @slowTest
  94. def test_list_all(self):
  95. """
  96. test salt-key -L
  97. """
  98. data = self.run_key("-L")
  99. expect = None
  100. if self.master_opts["transport"] in ("zeromq", "tcp"):
  101. expect = [
  102. "Accepted Keys:",
  103. "minion",
  104. "sub_minion",
  105. "Denied Keys:",
  106. "Unaccepted Keys:",
  107. "Rejected Keys:",
  108. ]
  109. self.assertEqual(data, expect)
  110. @slowTest
  111. def test_list_json_out(self):
  112. """
  113. test salt-key -L --json-out
  114. """
  115. data = self.run_key("-L --out json")
  116. ret = {}
  117. try:
  118. import salt.utils.json
  119. ret = salt.utils.json.loads("\n".join(data))
  120. except ValueError:
  121. pass
  122. expect = None
  123. if self.master_opts["transport"] in ("zeromq", "tcp"):
  124. expect = {
  125. "minions_rejected": [],
  126. "minions_denied": [],
  127. "minions_pre": [],
  128. "minions": ["minion", "sub_minion"],
  129. }
  130. self.assertEqual(ret, expect)
  131. @slowTest
  132. def test_list_yaml_out(self):
  133. """
  134. test salt-key -L --yaml-out
  135. """
  136. data = self.run_key("-L --out yaml")
  137. ret = {}
  138. try:
  139. import salt.utils.yaml
  140. ret = salt.utils.yaml.safe_load("\n".join(data))
  141. except Exception: # pylint: disable=broad-except
  142. pass
  143. expect = []
  144. if self.master_opts["transport"] in ("zeromq", "tcp"):
  145. expect = {
  146. "minions_rejected": [],
  147. "minions_denied": [],
  148. "minions_pre": [],
  149. "minions": ["minion", "sub_minion"],
  150. }
  151. self.assertEqual(ret, expect)
  152. @slowTest
  153. def test_list_raw_out(self):
  154. """
  155. test salt-key -L --raw-out
  156. """
  157. data = self.run_key("-L --out raw")
  158. self.assertEqual(len(data), 1)
  159. ret = {}
  160. try:
  161. import ast
  162. ret = ast.literal_eval(data[0])
  163. except ValueError:
  164. pass
  165. expect = None
  166. if self.master_opts["transport"] in ("zeromq", "tcp"):
  167. expect = {
  168. "minions_rejected": [],
  169. "minions_denied": [],
  170. "minions_pre": [],
  171. "minions": ["minion", "sub_minion"],
  172. }
  173. self.assertEqual(ret, expect)
  174. @slowTest
  175. def test_list_acc(self):
  176. """
  177. test salt-key -l
  178. """
  179. data = self.run_key("-l acc")
  180. expect = ["Accepted Keys:", "minion", "sub_minion"]
  181. self.assertEqual(data, expect)
  182. @skip_if_not_root
  183. @destructiveTest
  184. @skipIf(salt.utils.platform.is_windows(), "PAM eauth not available on Windows")
  185. @with_system_user(USERA, password=USERA_PWD)
  186. @slowTest
  187. def test_list_acc_eauth(self, username):
  188. """
  189. test salt-key -l with eauth
  190. """
  191. data = self.run_key(
  192. "-l acc --eauth pam --username {0} --password {1}".format(
  193. username, USERA_PWD
  194. )
  195. )
  196. expect = ["Accepted Keys:", "minion", "sub_minion"]
  197. self.assertEqual(data, expect)
  198. @skip_if_not_root
  199. @destructiveTest
  200. @skipIf(salt.utils.platform.is_windows(), "PAM eauth not available on Windows")
  201. @with_system_user(USERA, password=USERA_PWD)
  202. @skipIf(True, "SLOWTEST skip")
  203. @slowTest
  204. def test_list_acc_eauth_bad_creds(self, username):
  205. """
  206. test salt-key -l with eauth and bad creds
  207. """
  208. data = self.run_key(
  209. "-l acc --eauth pam --username {0} --password wrongpassword".format(
  210. username
  211. )
  212. )
  213. expect = [
  214. 'Authentication failure of type "eauth" occurred for user {0}.'.format(
  215. username
  216. )
  217. ]
  218. self.assertEqual(data, expect)
  219. @slowTest
  220. def test_list_acc_wrong_eauth(self):
  221. """
  222. test salt-key -l with wrong eauth
  223. """
  224. data = self.run_key(
  225. "-l acc --eauth wrongeauth --username {0} --password {1}".format(
  226. USERA, USERA_PWD
  227. )
  228. )
  229. expect = r"^The specified external authentication system \"wrongeauth\" is not available\tAvailable eauth types: auto, .*"
  230. self.assertRegex("\t".join(data), expect)
  231. @slowTest
  232. def test_list_un(self):
  233. """
  234. test salt-key -l
  235. """
  236. data = self.run_key("-l un")
  237. expect = ["Unaccepted Keys:"]
  238. self.assertEqual(data, expect)
  239. @slowTest
  240. def test_keys_generation(self):
  241. tempdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  242. arg_str = "--gen-keys {0} --gen-keys-dir {1}".format(MIN_NAME, tempdir)
  243. self.run_key(arg_str)
  244. try:
  245. key_names = None
  246. if self.master_opts["transport"] in ("zeromq", "tcp"):
  247. key_names = ("{0}.pub".format(MIN_NAME), "{0}.pem".format(MIN_NAME))
  248. for fname in key_names:
  249. self.assertTrue(os.path.isfile(os.path.join(tempdir, fname)))
  250. finally:
  251. for dirname, dirs, files in os.walk(tempdir):
  252. for filename in files:
  253. os.chmod(os.path.join(dirname, filename), 0o700)
  254. shutil.rmtree(tempdir)
  255. @slowTest
  256. def test_keys_generation_keysize_minmax(self):
  257. tempdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  258. arg_str = "--gen-keys minion --gen-keys-dir {0}".format(tempdir)
  259. try:
  260. data, error = self.run_key(arg_str + " --keysize=1024", catch_stderr=True)
  261. self.assertIn(
  262. "error: The minimum value for keysize is 2048", "\n".join(error)
  263. )
  264. data, error = self.run_key(arg_str + " --keysize=32769", catch_stderr=True)
  265. self.assertIn(
  266. "error: The maximum value for keysize is 32768", "\n".join(error)
  267. )
  268. finally:
  269. shutil.rmtree(tempdir)