test_key.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. # -*- coding: utf-8 -*-
  2. # Import Python libs
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. import os
  5. import shutil
  6. import tempfile
  7. import textwrap
  8. # Import Salt Testing libs
  9. from tests.support.runtime import RUNTIME_VARS
  10. from tests.support.case import ShellCase
  11. from tests.support.mixins import ShellCaseCommonTestsMixin
  12. # Import 3rd-party libs
  13. import pytest
  14. from salt.ext import six
  15. # Import Salt libs
  16. import salt.utils.files
  17. import salt.utils.platform
  18. import salt.utils.yaml
# Name of the throwaway system account that the eauth tests create through
# ``user.add`` and delete again through ``user.delete``.
USERA = 'saltdev'
# Plain-text password for USERA. It is passed to ``salt-key --password`` and,
# on macOS only, handed to ``shadow.set_password`` as-is.
USERA_PWD = 'saltdev'
# Value handed to ``shadow.set_password`` on every platform except macOS.
# NOTE(review): presumably the SHA-512 crypt ("$6$") hash of USERA_PWD -- confirm.
HASHED_USERA_PWD = '$6$SALTsalt$ZZFD90fKFWq8AGmmX0L3uBtS9fXL62SrTk5zcnQ6EkD6zoiM3kB88G1Zvs0xm/gZ7WXJRs5nsTBybUvGSqZkT.'
  22. class KeyTest(ShellCase, ShellCaseCommonTestsMixin):
  23. '''
  24. Test salt-key script
  25. '''
  26. _call_binary_ = 'salt-key'
  27. def _add_user(self):
  28. '''
  29. helper method to add user
  30. '''
  31. try:
  32. add_user = self.run_call('user.add {0} createhome=False'.format(USERA))
  33. add_pwd = self.run_call('shadow.set_password {0} \'{1}\''.format(USERA,
  34. USERA_PWD if salt.utils.platform.is_darwin() else HASHED_USERA_PWD))
  35. self.assertTrue(add_user)
  36. self.assertTrue(add_pwd)
  37. user_list = self.run_call('user.list_users')
  38. self.assertIn(USERA, six.text_type(user_list))
  39. except AssertionError:
  40. self.run_call('user.delete {0} remove=True'.format(USERA))
  41. self.skipTest(
  42. 'Could not add user or password, skipping test'
  43. )
  44. def _remove_user(self):
  45. '''
  46. helper method to remove user
  47. '''
  48. user_list = self.run_call('user.list_users')
  49. for user in user_list:
  50. if USERA in user:
  51. self.run_call('user.delete {0} remove=True'.format(USERA))
  52. def test_remove_key(self):
  53. '''
  54. test salt-key -d usage
  55. '''
  56. min_name = 'minibar'
  57. pki_dir = self.master_opts['pki_dir']
  58. key = os.path.join(pki_dir, 'minions', min_name)
  59. with salt.utils.files.fopen(key, 'w') as fp:
  60. fp.write(textwrap.dedent('''\
  61. -----BEGIN PUBLIC KEY-----
  62. MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoqIZDtcQtqUNs0wC7qQz
  63. JwFhXAVNT5C8M8zhI+pFtF/63KoN5k1WwAqP2j3LquTG68WpxcBwLtKfd7FVA/Kr
  64. OF3kXDWFnDi+HDchW2lJObgfzLckWNRFaF8SBvFM2dys3CGSgCV0S/qxnRAjrJQb
  65. B3uQwtZ64ncJAlkYpArv3GwsfRJ5UUQnYPDEJwGzMskZ0pHd60WwM1gMlfYmNX5O
  66. RBEjybyNpYDzpda6e6Ypsn6ePGLkP/tuwUf+q9wpbRE3ZwqERC2XRPux+HX2rGP+
  67. mkzpmuHkyi2wV33A9pDfMgRHdln2CLX0KgfRGixUQhW1o+Kmfv2rq4sGwpCgLbTh
  68. NwIDAQAB
  69. -----END PUBLIC KEY-----
  70. '''))
  71. check_key = self.run_key('-p {0}'.format(min_name))
  72. self.assertIn('Accepted Keys:', check_key)
  73. self.assertIn('minibar: -----BEGIN PUBLIC KEY-----', check_key)
  74. remove_key = self.run_key('-d {0} -y'.format(min_name))
  75. check_key = self.run_key('-p {0}'.format(min_name))
  76. self.assertEqual([], check_key)
  77. def test_list_accepted_args(self):
  78. '''
  79. test salt-key -l for accepted arguments
  80. '''
  81. for key in ('acc', 'pre', 'den', 'un', 'rej'):
  82. # These should not trigger any error
  83. data = self.run_key('-l {0}'.format(key), catch_stderr=True)
  84. self.assertNotIn('error:', '\n'.join(data[1]))
  85. data = self.run_key('-l foo-{0}'.format(key), catch_stderr=True)
  86. self.assertIn('error:', '\n'.join(data[1]))
  87. def test_list_all(self):
  88. '''
  89. test salt-key -L
  90. '''
  91. data = self.run_key('-L')
  92. expect = None
  93. if self.master_opts['transport'] in ('zeromq', 'tcp'):
  94. expect = [
  95. 'Accepted Keys:',
  96. 'minion',
  97. 'sub_minion',
  98. 'Denied Keys:',
  99. 'Unaccepted Keys:',
  100. 'Rejected Keys:'
  101. ]
  102. self.assertEqual(data, expect)
  103. def test_list_json_out(self):
  104. '''
  105. test salt-key -L --json-out
  106. '''
  107. data = self.run_key('-L --out json')
  108. ret = {}
  109. try:
  110. import salt.utils.json
  111. ret = salt.utils.json.loads('\n'.join(data))
  112. except ValueError:
  113. pass
  114. expect = None
  115. if self.master_opts['transport'] in ('zeromq', 'tcp'):
  116. expect = {'minions_rejected': [],
  117. 'minions_denied': [],
  118. 'minions_pre': [],
  119. 'minions': ['minion', 'sub_minion']}
  120. self.assertEqual(ret, expect)
  121. def test_list_yaml_out(self):
  122. '''
  123. test salt-key -L --yaml-out
  124. '''
  125. data = self.run_key('-L --out yaml')
  126. ret = {}
  127. try:
  128. import salt.utils.yaml
  129. ret = salt.utils.yaml.safe_load('\n'.join(data))
  130. except Exception:
  131. pass
  132. expect = []
  133. if self.master_opts['transport'] in ('zeromq', 'tcp'):
  134. expect = {'minions_rejected': [],
  135. 'minions_denied': [],
  136. 'minions_pre': [],
  137. 'minions': ['minion', 'sub_minion']}
  138. self.assertEqual(ret, expect)
  139. def test_list_raw_out(self):
  140. '''
  141. test salt-key -L --raw-out
  142. '''
  143. data = self.run_key('-L --out raw')
  144. self.assertEqual(len(data), 1)
  145. ret = {}
  146. try:
  147. import ast
  148. ret = ast.literal_eval(data[0])
  149. except ValueError:
  150. pass
  151. expect = None
  152. if self.master_opts['transport'] in ('zeromq', 'tcp'):
  153. expect = {'minions_rejected': [],
  154. 'minions_denied': [],
  155. 'minions_pre': [],
  156. 'minions': ['minion', 'sub_minion']}
  157. self.assertEqual(ret, expect)
  158. def test_list_acc(self):
  159. '''
  160. test salt-key -l
  161. '''
  162. data = self.run_key('-l acc')
  163. expect = ['Accepted Keys:', 'minion', 'sub_minion']
  164. self.assertEqual(data, expect)
  165. @pytest.mark.skip_if_not_root
  166. @pytest.mark.destructive_test
  167. def test_list_acc_eauth(self):
  168. '''
  169. test salt-key -l with eauth
  170. '''
  171. self._add_user()
  172. data = self.run_key('-l acc --eauth pam --username {0} --password {1}'.format(USERA, USERA_PWD))
  173. expect = ['Accepted Keys:', 'minion', 'sub_minion']
  174. self.assertEqual(data, expect)
  175. self._remove_user()
  176. @pytest.mark.skip_if_not_root
  177. @pytest.mark.destructive_test
  178. def test_list_acc_eauth_bad_creds(self):
  179. '''
  180. test salt-key -l with eauth and bad creds
  181. '''
  182. self._add_user()
  183. data = self.run_key('-l acc --eauth pam --username {0} --password wrongpassword'.format(USERA))
  184. expect = ['Authentication failure of type "eauth" occurred for user {0}.'.format(USERA)]
  185. self.assertEqual(data, expect)
  186. self._remove_user()
  187. def test_list_acc_wrong_eauth(self):
  188. '''
  189. test salt-key -l with wrong eauth
  190. '''
  191. data = self.run_key('-l acc --eauth wrongeauth --username {0} --password {1}'.format(USERA, USERA_PWD))
  192. expect = r"^The specified external authentication system \"wrongeauth\" is not available\tAvailable eauth types: auto, .*"
  193. self.assertRegex("\t".join(data), expect)
  194. def test_list_un(self):
  195. '''
  196. test salt-key -l
  197. '''
  198. data = self.run_key('-l un')
  199. expect = ['Unaccepted Keys:']
  200. self.assertEqual(data, expect)
  201. def test_keys_generation(self):
  202. tempdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  203. arg_str = '--gen-keys minibar --gen-keys-dir {0}'.format(tempdir)
  204. self.run_key(arg_str)
  205. try:
  206. key_names = None
  207. if self.master_opts['transport'] in ('zeromq', 'tcp'):
  208. key_names = ('minibar.pub', 'minibar.pem')
  209. for fname in key_names:
  210. self.assertTrue(os.path.isfile(os.path.join(tempdir, fname)))
  211. finally:
  212. for dirname, dirs, files in os.walk(tempdir):
  213. for filename in files:
  214. os.chmod(os.path.join(dirname, filename), 0o700)
  215. shutil.rmtree(tempdir)
  216. def test_keys_generation_keysize_minmax(self):
  217. tempdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  218. arg_str = '--gen-keys minion --gen-keys-dir {0}'.format(tempdir)
  219. try:
  220. data, error = self.run_key(
  221. arg_str + ' --keysize=1024', catch_stderr=True
  222. )
  223. self.assertIn(
  224. 'error: The minimum value for keysize is 2048', '\n'.join(error)
  225. )
  226. data, error = self.run_key(
  227. arg_str + ' --keysize=32769', catch_stderr=True
  228. )
  229. self.assertIn(
  230. 'error: The maximum value for keysize is 32768',
  231. '\n'.join(error)
  232. )
  233. finally:
  234. shutil.rmtree(tempdir)