test_ssh.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. # -*- coding: utf-8 -*-
  2. '''
  3. Test the ssh module
  4. '''
  5. # Import python libs
  6. from __future__ import absolute_import, unicode_literals, print_function
  7. import os
  8. import shutil
  9. # Import Salt Testing libs
  10. from tests.support.runtests import RUNTIME_VARS
  11. from tests.support.case import ModuleCase
  12. # Import salt libs
  13. import salt.utils.files
  14. import salt.utils.platform
  15. # Import 3rd-party libs
  16. import pytest
  17. from tornado.httpclient import HTTPClient
  18. GITHUB_FINGERPRINT = '9d:38:5b:83:a9:17:52:92:56:1a:5e:c4:d4:81:8e:0a:ca:51:a2:64:f1:74:20:11:2e:f8:8a:c3:a1:39:49:8f'
  19. def check_status():
  20. '''
  21. Check the status of Github for remote operations
  22. '''
  23. try:
  24. return HTTPClient().fetch('http://github.com').code == 200
  25. except Exception: # pylint: disable=broad-except
  26. return False
  27. @pytest.mark.skip_if_binaries_missing(['ssh', 'ssh-keygen'], check_all=True)
  28. @pytest.mark.windows_whitelisted
  29. class SSHModuleTest(ModuleCase):
  30. '''
  31. Test the ssh module
  32. '''
  33. @classmethod
  34. def setUpClass(cls):
  35. cls.subsalt_dir = os.path.join(RUNTIME_VARS.TMP, 'subsalt')
  36. cls.authorized_keys = os.path.join(cls.subsalt_dir, 'authorized_keys')
  37. cls.known_hosts = os.path.join(cls.subsalt_dir, 'known_hosts')
  38. def setUp(self):
  39. '''
  40. Set up the ssh module tests
  41. '''
  42. if not check_status():
  43. self.skipTest('External source, github.com is down')
  44. super(SSHModuleTest, self).setUp()
  45. if not os.path.isdir(self.subsalt_dir):
  46. os.makedirs(self.subsalt_dir)
  47. ssh_raw_path = os.path.join(RUNTIME_VARS.FILES, 'ssh', 'raw')
  48. with salt.utils.files.fopen(ssh_raw_path) as fd:
  49. self.key = fd.read().strip()
  50. def tearDown(self):
  51. '''
  52. Tear down the ssh module tests
  53. '''
  54. if os.path.isdir(self.subsalt_dir):
  55. shutil.rmtree(self.subsalt_dir)
  56. super(SSHModuleTest, self).tearDown()
  57. del self.key
  58. def test_auth_keys(self):
  59. '''
  60. test ssh.auth_keys
  61. '''
  62. shutil.copyfile(
  63. os.path.join(RUNTIME_VARS.FILES, 'ssh', 'authorized_keys'),
  64. self.authorized_keys)
  65. user = 'root'
  66. if salt.utils.platform.is_windows():
  67. user = 'Administrator'
  68. ret = self.run_function('ssh.auth_keys', [user, self.authorized_keys])
  69. self.assertEqual(len(list(ret.items())), 1) # exactly one key is found
  70. key_data = list(ret.items())[0][1]
  71. try:
  72. self.assertEqual(key_data['comment'], 'github.com')
  73. self.assertEqual(key_data['enc'], 'ssh-rsa')
  74. self.assertEqual(
  75. key_data['options'], ['command="/usr/local/lib/ssh-helper"']
  76. )
  77. self.assertEqual(key_data['fingerprint'], GITHUB_FINGERPRINT)
  78. except AssertionError as exc:
  79. raise AssertionError(
  80. 'AssertionError: {0}. Function returned: {1}'.format(
  81. exc, ret
  82. )
  83. )
  84. def test_bad_enctype(self):
  85. '''
  86. test to make sure that bad key encoding types don't generate an
  87. invalid key entry in authorized_keys
  88. '''
  89. shutil.copyfile(
  90. os.path.join(RUNTIME_VARS.FILES, 'ssh', 'authorized_badkeys'),
  91. self.authorized_keys)
  92. ret = self.run_function('ssh.auth_keys', ['root', self.authorized_keys])
  93. # The authorized_badkeys file contains a key with an invalid ssh key
  94. # encoding (dsa-sha2-nistp256 instead of ecdsa-sha2-nistp256)
  95. # auth_keys should skip any keys with invalid encodings. Internally
  96. # the minion will throw a CommandExecutionError so the
  97. # user will get an indicator of what went wrong.
  98. self.assertEqual(len(list(ret.items())), 0) # Zero keys found
  99. def test_get_known_host_entries(self):
  100. '''
  101. Check that known host information is returned from ~/.ssh/config
  102. '''
  103. shutil.copyfile(
  104. os.path.join(RUNTIME_VARS.FILES, 'ssh', 'known_hosts'),
  105. self.known_hosts)
  106. arg = ['root', 'github.com']
  107. kwargs = {'config': self.known_hosts}
  108. ret = self.run_function('ssh.get_known_host_entries', arg, **kwargs)[0]
  109. try:
  110. self.assertEqual(ret['enc'], 'ssh-rsa')
  111. self.assertEqual(ret['key'], self.key)
  112. self.assertEqual(ret['fingerprint'], GITHUB_FINGERPRINT)
  113. except AssertionError as exc:
  114. raise AssertionError(
  115. 'AssertionError: {0}. Function returned: {1}'.format(
  116. exc, ret
  117. )
  118. )
  119. def test_recv_known_host_entries(self):
  120. '''
  121. Check that known host information is returned from remote host
  122. '''
  123. ret = self.run_function('ssh.recv_known_host_entries', ['github.com'])
  124. try:
  125. self.assertNotEqual(ret, None)
  126. self.assertEqual(ret[0]['enc'], 'ssh-rsa')
  127. self.assertEqual(ret[0]['key'], self.key)
  128. self.assertEqual(ret[0]['fingerprint'], GITHUB_FINGERPRINT)
  129. except AssertionError as exc:
  130. raise AssertionError(
  131. 'AssertionError: {0}. Function returned: {1}'.format(
  132. exc, ret
  133. )
  134. )
  135. def test_check_known_host_add(self):
  136. '''
  137. Check known hosts by its fingerprint. File needs to be updated
  138. '''
  139. arg = ['root', 'github.com']
  140. kwargs = {'fingerprint': GITHUB_FINGERPRINT, 'config': self.known_hosts}
  141. ret = self.run_function('ssh.check_known_host', arg, **kwargs)
  142. self.assertEqual(ret, 'add')
  143. def test_check_known_host_update(self):
  144. '''
  145. ssh.check_known_host update verification
  146. '''
  147. shutil.copyfile(
  148. os.path.join(RUNTIME_VARS.FILES, 'ssh', 'known_hosts'),
  149. self.known_hosts)
  150. arg = ['root', 'github.com']
  151. kwargs = {'config': self.known_hosts}
  152. # wrong fingerprint
  153. ret = self.run_function('ssh.check_known_host', arg,
  154. **dict(kwargs, fingerprint='aa:bb:cc:dd'))
  155. self.assertEqual(ret, 'update')
  156. # wrong keyfile
  157. ret = self.run_function('ssh.check_known_host', arg,
  158. **dict(kwargs, key='YQ=='))
  159. self.assertEqual(ret, 'update')
  160. def test_check_known_host_exists(self):
  161. '''
  162. Verify check_known_host_exists
  163. '''
  164. shutil.copyfile(
  165. os.path.join(RUNTIME_VARS.FILES, 'ssh', 'known_hosts'),
  166. self.known_hosts)
  167. arg = ['root', 'github.com']
  168. kwargs = {'config': self.known_hosts}
  169. # wrong fingerprint
  170. ret = self.run_function('ssh.check_known_host', arg,
  171. **dict(kwargs, fingerprint=GITHUB_FINGERPRINT))
  172. self.assertEqual(ret, 'exists')
  173. # wrong keyfile
  174. ret = self.run_function('ssh.check_known_host', arg,
  175. **dict(kwargs, key=self.key))
  176. self.assertEqual(ret, 'exists')
  177. def test_rm_known_host(self):
  178. '''
  179. ssh.rm_known_host
  180. '''
  181. shutil.copyfile(
  182. os.path.join(RUNTIME_VARS.FILES, 'ssh', 'known_hosts'),
  183. self.known_hosts)
  184. arg = ['root', 'github.com']
  185. kwargs = {'config': self.known_hosts, 'key': self.key}
  186. # before removal
  187. ret = self.run_function('ssh.check_known_host', arg, **kwargs)
  188. self.assertEqual(ret, 'exists')
  189. # remove
  190. self.run_function('ssh.rm_known_host', arg, config=self.known_hosts)
  191. # after removal
  192. ret = self.run_function('ssh.check_known_host', arg, **kwargs)
  193. self.assertEqual(ret, 'add')
  194. def test_set_known_host(self):
  195. '''
  196. ssh.set_known_host
  197. '''
  198. # add item
  199. ret = self.run_function('ssh.set_known_host', ['root', 'github.com'],
  200. config=self.known_hosts)
  201. try:
  202. self.assertEqual(ret['status'], 'updated')
  203. self.assertEqual(ret['old'], None)
  204. self.assertEqual(ret['new'][0]['fingerprint'], GITHUB_FINGERPRINT)
  205. except AssertionError as exc:
  206. raise AssertionError(
  207. 'AssertionError: {0}. Function returned: {1}'.format(
  208. exc, ret
  209. )
  210. )
  211. # check that item does exist
  212. ret = self.run_function('ssh.get_known_host_entries', ['root', 'github.com'],
  213. config=self.known_hosts)[0]
  214. try:
  215. self.assertEqual(ret['fingerprint'], GITHUB_FINGERPRINT)
  216. except AssertionError as exc:
  217. raise AssertionError(
  218. 'AssertionError: {0}. Function returned: {1}'.format(
  219. exc, ret
  220. )
  221. )
  222. # add the same item once again
  223. ret = self.run_function('ssh.set_known_host', ['root', 'github.com'],
  224. config=self.known_hosts)
  225. try:
  226. self.assertEqual(ret['status'], 'exists')
  227. except AssertionError as exc:
  228. raise AssertionError(
  229. 'AssertionError: {0}. Function returned: {1}'.format(
  230. exc, ret
  231. )
  232. )