1
0

test_key.py 9.6 KB


  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, print_function, unicode_literals
  3. import os
  4. import shutil
  5. import tempfile
  6. import textwrap
  7. import pytest
  8. import salt.utils.files
  9. import salt.utils.platform
  10. import salt.utils.yaml
  11. from salt.ext import six
  12. from tests.support.case import ShellCase
  13. from tests.support.helpers import destructiveTest, skip_if_not_root
  14. from tests.support.mixins import ShellCaseCommonTestsMixin
  15. from tests.support.runtests import RUNTIME_VARS
  16. from tests.support.unit import skipIf
  17. USERA = "saltdev"
  18. USERA_PWD = "saltdev"
  19. HASHED_USERA_PWD = "$6$SALTsalt$ZZFD90fKFWq8AGmmX0L3uBtS9fXL62SrTk5zcnQ6EkD6zoiM3kB88G1Zvs0xm/gZ7WXJRs5nsTBybUvGSqZkT."
  20. @pytest.mark.windows_whitelisted
  21. class KeyTest(ShellCase, ShellCaseCommonTestsMixin):
  22. """
  23. Test salt-key script
  24. """
  25. _call_binary_ = "salt-key"
  26. def _add_user(self):
  27. """
  28. helper method to add user
  29. """
  30. try:
  31. add_user = self.run_call("user.add {0} createhome=False".format(USERA))
  32. add_pwd = self.run_call(
  33. "shadow.set_password {0} '{1}'".format(
  34. USERA,
  35. USERA_PWD if salt.utils.platform.is_darwin() else HASHED_USERA_PWD,
  36. )
  37. )
  38. self.assertTrue(add_user)
  39. self.assertTrue(add_pwd)
  40. user_list = self.run_call("user.list_users")
  41. self.assertIn(USERA, six.text_type(user_list))
  42. except AssertionError:
  43. self.run_call("user.delete {0} remove=True".format(USERA))
  44. self.skipTest("Could not add user or password, skipping test")
  45. def _remove_user(self):
  46. """
  47. helper method to remove user
  48. """
  49. user_list = self.run_call("user.list_users")
  50. for user in user_list:
  51. if USERA in user:
  52. self.run_call("user.delete {0} remove=True".format(USERA))
  53. @skipIf(True, "SLOWTEST skip")
  54. def test_remove_key(self):
  55. """
  56. test salt-key -d usage
  57. """
  58. min_name = "minibar"
  59. pki_dir = self.master_opts["pki_dir"]
  60. key = os.path.join(pki_dir, "minions", min_name)
  61. with salt.utils.files.fopen(key, "w") as fp:
  62. fp.write(
  63. textwrap.dedent(
  64. """\
  65. -----BEGIN PUBLIC KEY-----
  66. MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoqIZDtcQtqUNs0wC7qQz
  67. JwFhXAVNT5C8M8zhI+pFtF/63KoN5k1WwAqP2j3LquTG68WpxcBwLtKfd7FVA/Kr
  68. OF3kXDWFnDi+HDchW2lJObgfzLckWNRFaF8SBvFM2dys3CGSgCV0S/qxnRAjrJQb
  69. B3uQwtZ64ncJAlkYpArv3GwsfRJ5UUQnYPDEJwGzMskZ0pHd60WwM1gMlfYmNX5O
  70. RBEjybyNpYDzpda6e6Ypsn6ePGLkP/tuwUf+q9wpbRE3ZwqERC2XRPux+HX2rGP+
  71. mkzpmuHkyi2wV33A9pDfMgRHdln2CLX0KgfRGixUQhW1o+Kmfv2rq4sGwpCgLbTh
  72. NwIDAQAB
  73. -----END PUBLIC KEY-----
  74. """
  75. )
  76. )
  77. check_key = self.run_key("-p {0}".format(min_name))
  78. self.assertIn("Accepted Keys:", check_key)
  79. self.assertIn("minibar: -----BEGIN PUBLIC KEY-----", check_key)
  80. remove_key = self.run_key("-d {0} -y".format(min_name))
  81. check_key = self.run_key("-p {0}".format(min_name))
  82. self.assertEqual([], check_key)
  83. @skipIf(True, "SLOWTEST skip")
  84. def test_list_accepted_args(self):
  85. """
  86. test salt-key -l for accepted arguments
  87. """
  88. for key in ("acc", "pre", "den", "un", "rej"):
  89. # These should not trigger any error
  90. data = self.run_key("-l {0}".format(key), catch_stderr=True)
  91. self.assertNotIn("error:", "\n".join(data[1]))
  92. data = self.run_key("-l foo-{0}".format(key), catch_stderr=True)
  93. self.assertIn("error:", "\n".join(data[1]))
  94. @skipIf(True, "SLOWTEST skip")
  95. def test_list_all(self):
  96. """
  97. test salt-key -L
  98. """
  99. data = self.run_key("-L")
  100. expect = None
  101. if self.master_opts["transport"] in ("zeromq", "tcp"):
  102. expect = [
  103. "Accepted Keys:",
  104. "minion",
  105. "sub_minion",
  106. "Denied Keys:",
  107. "Unaccepted Keys:",
  108. "Rejected Keys:",
  109. ]
  110. self.assertEqual(data, expect)
  111. @skipIf(True, "SLOWTEST skip")
  112. def test_list_json_out(self):
  113. """
  114. test salt-key -L --json-out
  115. """
  116. data = self.run_key("-L --out json")
  117. ret = {}
  118. try:
  119. import salt.utils.json
  120. ret = salt.utils.json.loads("\n".join(data))
  121. except ValueError:
  122. pass
  123. expect = None
  124. if self.master_opts["transport"] in ("zeromq", "tcp"):
  125. expect = {
  126. "minions_rejected": [],
  127. "minions_denied": [],
  128. "minions_pre": [],
  129. "minions": ["minion", "sub_minion"],
  130. }
  131. self.assertEqual(ret, expect)
  132. @skipIf(True, "SLOWTEST skip")
  133. def test_list_yaml_out(self):
  134. """
  135. test salt-key -L --yaml-out
  136. """
  137. data = self.run_key("-L --out yaml")
  138. ret = {}
  139. try:
  140. import salt.utils.yaml
  141. ret = salt.utils.yaml.safe_load("\n".join(data))
  142. except Exception: # pylint: disable=broad-except
  143. pass
  144. expect = []
  145. if self.master_opts["transport"] in ("zeromq", "tcp"):
  146. expect = {
  147. "minions_rejected": [],
  148. "minions_denied": [],
  149. "minions_pre": [],
  150. "minions": ["minion", "sub_minion"],
  151. }
  152. self.assertEqual(ret, expect)
  153. @skipIf(True, "SLOWTEST skip")
  154. def test_list_raw_out(self):
  155. """
  156. test salt-key -L --raw-out
  157. """
  158. data = self.run_key("-L --out raw")
  159. self.assertEqual(len(data), 1)
  160. ret = {}
  161. try:
  162. import ast
  163. ret = ast.literal_eval(data[0])
  164. except ValueError:
  165. pass
  166. expect = None
  167. if self.master_opts["transport"] in ("zeromq", "tcp"):
  168. expect = {
  169. "minions_rejected": [],
  170. "minions_denied": [],
  171. "minions_pre": [],
  172. "minions": ["minion", "sub_minion"],
  173. }
  174. self.assertEqual(ret, expect)
  175. @skipIf(True, "SLOWTEST skip")
  176. def test_list_acc(self):
  177. """
  178. test salt-key -l
  179. """
  180. data = self.run_key("-l acc")
  181. expect = ["Accepted Keys:", "minion", "sub_minion"]
  182. self.assertEqual(data, expect)
  183. @skip_if_not_root
  184. @destructiveTest
  185. @skipIf(True, "SLOWTEST skip")
  186. def test_list_acc_eauth(self):
  187. """
  188. test salt-key -l with eauth
  189. """
  190. self._add_user()
  191. data = self.run_key(
  192. "-l acc --eauth pam --username {0} --password {1}".format(USERA, USERA_PWD)
  193. )
  194. expect = ["Accepted Keys:", "minion", "sub_minion"]
  195. self.assertEqual(data, expect)
  196. self._remove_user()
  197. @skip_if_not_root
  198. @destructiveTest
  199. @skipIf(True, "SLOWTEST skip")
  200. def test_list_acc_eauth_bad_creds(self):
  201. """
  202. test salt-key -l with eauth and bad creds
  203. """
  204. self._add_user()
  205. data = self.run_key(
  206. "-l acc --eauth pam --username {0} --password wrongpassword".format(USERA)
  207. )
  208. expect = [
  209. 'Authentication failure of type "eauth" occurred for user {0}.'.format(
  210. USERA
  211. )
  212. ]
  213. self.assertEqual(data, expect)
  214. self._remove_user()
  215. @skipIf(True, "SLOWTEST skip")
  216. def test_list_acc_wrong_eauth(self):
  217. """
  218. test salt-key -l with wrong eauth
  219. """
  220. data = self.run_key(
  221. "-l acc --eauth wrongeauth --username {0} --password {1}".format(
  222. USERA, USERA_PWD
  223. )
  224. )
  225. expect = r"^The specified external authentication system \"wrongeauth\" is not available\tAvailable eauth types: auto, .*"
  226. self.assertRegex("\t".join(data), expect)
  227. @skipIf(True, "SLOWTEST skip")
  228. def test_list_un(self):
  229. """
  230. test salt-key -l
  231. """
  232. data = self.run_key("-l un")
  233. expect = ["Unaccepted Keys:"]
  234. self.assertEqual(data, expect)
  235. @skipIf(True, "SLOWTEST skip")
  236. def test_keys_generation(self):
  237. tempdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  238. arg_str = "--gen-keys minibar --gen-keys-dir {0}".format(tempdir)
  239. self.run_key(arg_str)
  240. try:
  241. key_names = None
  242. if self.master_opts["transport"] in ("zeromq", "tcp"):
  243. key_names = ("minibar.pub", "minibar.pem")
  244. for fname in key_names:
  245. self.assertTrue(os.path.isfile(os.path.join(tempdir, fname)))
  246. finally:
  247. for dirname, dirs, files in os.walk(tempdir):
  248. for filename in files:
  249. os.chmod(os.path.join(dirname, filename), 0o700)
  250. shutil.rmtree(tempdir)
  251. @skipIf(True, "SLOWTEST skip")
  252. def test_keys_generation_keysize_minmax(self):
  253. tempdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  254. arg_str = "--gen-keys minion --gen-keys-dir {0}".format(tempdir)
  255. try:
  256. data, error = self.run_key(arg_str + " --keysize=1024", catch_stderr=True)
  257. self.assertIn(
  258. "error: The minimum value for keysize is 2048", "\n".join(error)
  259. )
  260. data, error = self.run_key(arg_str + " --keysize=32769", catch_stderr=True)
  261. self.assertIn(
  262. "error: The maximum value for keysize is 32768", "\n".join(error)
  263. )
  264. finally:
  265. shutil.rmtree(tempdir)