1
0

test_key.py 9.0 KB


  1. # -*- coding: utf-8 -*-
  2. # Import Python libs
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. import os
  5. import shutil
  6. import tempfile
  7. import textwrap
  8. # Import Salt Testing libs
  9. from tests.support.runtests import RUNTIME_VARS
  10. from tests.support.case import ShellCase
  11. from tests.support.mixins import ShellCaseCommonTestsMixin
  12. # Import 3rd-party libs
  13. import pytest
  14. from salt.ext import six
  15. # Import Salt libs
  16. import salt.utils.files
  17. import salt.utils.platform
  18. import salt.utils.yaml
  19. USERA = 'saltdev'
  20. USERA_PWD = 'saltdev'
  21. HASHED_USERA_PWD = '$6$SALTsalt$ZZFD90fKFWq8AGmmX0L3uBtS9fXL62SrTk5zcnQ6EkD6zoiM3kB88G1Zvs0xm/gZ7WXJRs5nsTBybUvGSqZkT.'
  22. @pytest.mark.windows_whitelisted
  23. class KeyTest(ShellCase, ShellCaseCommonTestsMixin):
  24. '''
  25. Test salt-key script
  26. '''
  27. _call_binary_ = 'salt-key'
  28. def _add_user(self):
  29. '''
  30. helper method to add user
  31. '''
  32. try:
  33. add_user = self.run_call('user.add {0} createhome=False'.format(USERA))
  34. add_pwd = self.run_call('shadow.set_password {0} \'{1}\''.format(USERA,
  35. USERA_PWD if salt.utils.platform.is_darwin() else HASHED_USERA_PWD))
  36. self.assertTrue(add_user)
  37. self.assertTrue(add_pwd)
  38. user_list = self.run_call('user.list_users')
  39. self.assertIn(USERA, six.text_type(user_list))
  40. except AssertionError:
  41. self.run_call('user.delete {0} remove=True'.format(USERA))
  42. self.skipTest(
  43. 'Could not add user or password, skipping test'
  44. )
  45. def _remove_user(self):
  46. '''
  47. helper method to remove user
  48. '''
  49. user_list = self.run_call('user.list_users')
  50. for user in user_list:
  51. if USERA in user:
  52. self.run_call('user.delete {0} remove=True'.format(USERA))
  53. def test_remove_key(self):
  54. '''
  55. test salt-key -d usage
  56. '''
  57. min_name = 'minibar'
  58. pki_dir = self.master_opts['pki_dir']
  59. key = os.path.join(pki_dir, 'minions', min_name)
  60. with salt.utils.files.fopen(key, 'w') as fp:
  61. fp.write(textwrap.dedent('''\
  62. -----BEGIN PUBLIC KEY-----
  63. MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoqIZDtcQtqUNs0wC7qQz
  64. JwFhXAVNT5C8M8zhI+pFtF/63KoN5k1WwAqP2j3LquTG68WpxcBwLtKfd7FVA/Kr
  65. OF3kXDWFnDi+HDchW2lJObgfzLckWNRFaF8SBvFM2dys3CGSgCV0S/qxnRAjrJQb
  66. B3uQwtZ64ncJAlkYpArv3GwsfRJ5UUQnYPDEJwGzMskZ0pHd60WwM1gMlfYmNX5O
  67. RBEjybyNpYDzpda6e6Ypsn6ePGLkP/tuwUf+q9wpbRE3ZwqERC2XRPux+HX2rGP+
  68. mkzpmuHkyi2wV33A9pDfMgRHdln2CLX0KgfRGixUQhW1o+Kmfv2rq4sGwpCgLbTh
  69. NwIDAQAB
  70. -----END PUBLIC KEY-----
  71. '''))
  72. check_key = self.run_key('-p {0}'.format(min_name))
  73. self.assertIn('Accepted Keys:', check_key)
  74. self.assertIn('minibar: -----BEGIN PUBLIC KEY-----', check_key)
  75. remove_key = self.run_key('-d {0} -y'.format(min_name))
  76. check_key = self.run_key('-p {0}'.format(min_name))
  77. self.assertEqual([], check_key)
  78. def test_list_accepted_args(self):
  79. '''
  80. test salt-key -l for accepted arguments
  81. '''
  82. for key in ('acc', 'pre', 'den', 'un', 'rej'):
  83. # These should not trigger any error
  84. data = self.run_key('-l {0}'.format(key), catch_stderr=True)
  85. self.assertNotIn('error:', '\n'.join(data[1]))
  86. data = self.run_key('-l foo-{0}'.format(key), catch_stderr=True)
  87. self.assertIn('error:', '\n'.join(data[1]))
  88. def test_list_all(self):
  89. '''
  90. test salt-key -L
  91. '''
  92. data = self.run_key('-L')
  93. expect = None
  94. if self.master_opts['transport'] in ('zeromq', 'tcp'):
  95. expect = [
  96. 'Accepted Keys:',
  97. 'minion',
  98. 'sub_minion',
  99. 'Denied Keys:',
  100. 'Unaccepted Keys:',
  101. 'Rejected Keys:'
  102. ]
  103. self.assertEqual(data, expect)
  104. def test_list_json_out(self):
  105. '''
  106. test salt-key -L --json-out
  107. '''
  108. data = self.run_key('-L --out json')
  109. ret = {}
  110. try:
  111. import salt.utils.json
  112. ret = salt.utils.json.loads('\n'.join(data))
  113. except ValueError:
  114. pass
  115. expect = None
  116. if self.master_opts['transport'] in ('zeromq', 'tcp'):
  117. expect = {'minions_rejected': [],
  118. 'minions_denied': [],
  119. 'minions_pre': [],
  120. 'minions': ['minion', 'sub_minion']}
  121. self.assertEqual(ret, expect)
  122. def test_list_yaml_out(self):
  123. '''
  124. test salt-key -L --yaml-out
  125. '''
  126. data = self.run_key('-L --out yaml')
  127. ret = {}
  128. try:
  129. import salt.utils.yaml
  130. ret = salt.utils.yaml.safe_load('\n'.join(data))
  131. except Exception:
  132. pass
  133. expect = []
  134. if self.master_opts['transport'] in ('zeromq', 'tcp'):
  135. expect = {'minions_rejected': [],
  136. 'minions_denied': [],
  137. 'minions_pre': [],
  138. 'minions': ['minion', 'sub_minion']}
  139. self.assertEqual(ret, expect)
  140. def test_list_raw_out(self):
  141. '''
  142. test salt-key -L --raw-out
  143. '''
  144. data = self.run_key('-L --out raw')
  145. self.assertEqual(len(data), 1)
  146. ret = {}
  147. try:
  148. import ast
  149. ret = ast.literal_eval(data[0])
  150. except ValueError:
  151. pass
  152. expect = None
  153. if self.master_opts['transport'] in ('zeromq', 'tcp'):
  154. expect = {'minions_rejected': [],
  155. 'minions_denied': [],
  156. 'minions_pre': [],
  157. 'minions': ['minion', 'sub_minion']}
  158. self.assertEqual(ret, expect)
  159. def test_list_acc(self):
  160. '''
  161. test salt-key -l
  162. '''
  163. data = self.run_key('-l acc')
  164. expect = ['Accepted Keys:', 'minion', 'sub_minion']
  165. self.assertEqual(data, expect)
  166. @pytest.mark.skip_if_not_root
  167. @pytest.mark.destructive_test
  168. def test_list_acc_eauth(self):
  169. '''
  170. test salt-key -l with eauth
  171. '''
  172. self._add_user()
  173. data = self.run_key('-l acc --eauth pam --username {0} --password {1}'.format(USERA, USERA_PWD))
  174. expect = ['Accepted Keys:', 'minion', 'sub_minion']
  175. self.assertEqual(data, expect)
  176. self._remove_user()
  177. @pytest.mark.skip_if_not_root
  178. @pytest.mark.destructive_test
  179. def test_list_acc_eauth_bad_creds(self):
  180. '''
  181. test salt-key -l with eauth and bad creds
  182. '''
  183. self._add_user()
  184. data = self.run_key('-l acc --eauth pam --username {0} --password wrongpassword'.format(USERA))
  185. expect = ['Authentication failure of type "eauth" occurred for user {0}.'.format(USERA)]
  186. self.assertEqual(data, expect)
  187. self._remove_user()
  188. def test_list_acc_wrong_eauth(self):
  189. '''
  190. test salt-key -l with wrong eauth
  191. '''
  192. data = self.run_key('-l acc --eauth wrongeauth --username {0} --password {1}'.format(USERA, USERA_PWD))
  193. expect = r"^The specified external authentication system \"wrongeauth\" is not available\tAvailable eauth types: auto, .*"
  194. self.assertRegex("\t".join(data), expect)
  195. def test_list_un(self):
  196. '''
  197. test salt-key -l
  198. '''
  199. data = self.run_key('-l un')
  200. expect = ['Unaccepted Keys:']
  201. self.assertEqual(data, expect)
  202. def test_keys_generation(self):
  203. tempdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  204. arg_str = '--gen-keys minibar --gen-keys-dir {0}'.format(tempdir)
  205. self.run_key(arg_str)
  206. try:
  207. key_names = None
  208. if self.master_opts['transport'] in ('zeromq', 'tcp'):
  209. key_names = ('minibar.pub', 'minibar.pem')
  210. for fname in key_names:
  211. self.assertTrue(os.path.isfile(os.path.join(tempdir, fname)))
  212. finally:
  213. for dirname, dirs, files in os.walk(tempdir):
  214. for filename in files:
  215. os.chmod(os.path.join(dirname, filename), 0o700)
  216. shutil.rmtree(tempdir)
  217. def test_keys_generation_keysize_minmax(self):
  218. tempdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  219. arg_str = '--gen-keys minion --gen-keys-dir {0}'.format(tempdir)
  220. try:
  221. data, error = self.run_key(
  222. arg_str + ' --keysize=1024', catch_stderr=True
  223. )
  224. self.assertIn(
  225. 'error: The minimum value for keysize is 2048', '\n'.join(error)
  226. )
  227. data, error = self.run_key(
  228. arg_str + ' --keysize=32769', catch_stderr=True
  229. )
  230. self.assertIn(
  231. 'error: The maximum value for keysize is 32768',
  232. '\n'.join(error)
  233. )
  234. finally:
  235. shutil.rmtree(tempdir)