test_ssh.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. # -*- coding: utf-8 -*-
  2. '''
  3. Test the ssh module
  4. '''
  5. # Import python libs
  6. from __future__ import absolute_import, unicode_literals, print_function
  7. import os
  8. import shutil
  9. # Import Salt Testing libs
  10. from tests.support.case import ModuleCase
  11. from tests.support.paths import FILES, TMP
  12. from tests.support.helpers import skip_if_binaries_missing
  13. # Import salt libs
  14. import salt.utils.files
  15. import salt.utils.platform
  16. # Import 3rd-party libs
  17. from tornado.httpclient import HTTPClient
# Scratch directory the test case creates in setUp and removes in tearDown.
SUBSALT_DIR = os.path.join(TMP, 'subsalt')
# Per-test copies of the authorized_keys / known_hosts files live inside
# the scratch directory.
AUTHORIZED_KEYS = os.path.join(SUBSALT_DIR, 'authorized_keys')
KNOWN_HOSTS = os.path.join(SUBSALT_DIR, 'known_hosts')
# Fingerprint the tests expect for the github.com host key.
GITHUB_FINGERPRINT = '9d:38:5b:83:a9:17:52:92:56:1a:5e:c4:d4:81:8e:0a:ca:51:a2:64:f1:74:20:11:2e:f8:8a:c3:a1:39:49:8f'
  22. def check_status():
  23. '''
  24. Check the status of Github for remote operations
  25. '''
  26. try:
  27. return HTTPClient().fetch('http://github.com').code == 200
  28. except Exception: # pylint: disable=broad-except
  29. return False
  30. @skip_if_binaries_missing(['ssh', 'ssh-keygen'], check_all=True)
  31. class SSHModuleTest(ModuleCase):
  32. '''
  33. Test the ssh module
  34. '''
  35. def setUp(self):
  36. '''
  37. Set up the ssh module tests
  38. '''
  39. if not check_status():
  40. self.skipTest('External source, github.com is down')
  41. super(SSHModuleTest, self).setUp()
  42. if not os.path.isdir(SUBSALT_DIR):
  43. os.makedirs(SUBSALT_DIR)
  44. ssh_raw_path = os.path.join(FILES, 'ssh', 'raw')
  45. with salt.utils.files.fopen(ssh_raw_path) as fd:
  46. self.key = fd.read().strip()
  47. def tearDown(self):
  48. '''
  49. Tear down the ssh module tests
  50. '''
  51. if os.path.isdir(SUBSALT_DIR):
  52. shutil.rmtree(SUBSALT_DIR)
  53. super(SSHModuleTest, self).tearDown()
  54. del self.key
  55. def test_auth_keys(self):
  56. '''
  57. test ssh.auth_keys
  58. '''
  59. shutil.copyfile(
  60. os.path.join(FILES, 'ssh', 'authorized_keys'),
  61. AUTHORIZED_KEYS)
  62. user = 'root'
  63. if salt.utils.platform.is_windows():
  64. user = 'Administrator'
  65. ret = self.run_function('ssh.auth_keys', [user, AUTHORIZED_KEYS])
  66. self.assertEqual(len(list(ret.items())), 1) # exactly one key is found
  67. key_data = list(ret.items())[0][1]
  68. try:
  69. self.assertEqual(key_data['comment'], 'github.com')
  70. self.assertEqual(key_data['enc'], 'ssh-rsa')
  71. self.assertEqual(
  72. key_data['options'], ['command="/usr/local/lib/ssh-helper"']
  73. )
  74. self.assertEqual(key_data['fingerprint'], GITHUB_FINGERPRINT)
  75. except AssertionError as exc:
  76. raise AssertionError(
  77. 'AssertionError: {0}. Function returned: {1}'.format(
  78. exc, ret
  79. )
  80. )
  81. def test_bad_enctype(self):
  82. '''
  83. test to make sure that bad key encoding types don't generate an
  84. invalid key entry in authorized_keys
  85. '''
  86. shutil.copyfile(
  87. os.path.join(FILES, 'ssh', 'authorized_badkeys'),
  88. AUTHORIZED_KEYS)
  89. ret = self.run_function('ssh.auth_keys', ['root', AUTHORIZED_KEYS])
  90. # The authorized_badkeys file contains a key with an invalid ssh key
  91. # encoding (dsa-sha2-nistp256 instead of ecdsa-sha2-nistp256)
  92. # auth_keys should skip any keys with invalid encodings. Internally
  93. # the minion will throw a CommandExecutionError so the
  94. # user will get an indicator of what went wrong.
  95. self.assertEqual(len(list(ret.items())), 0) # Zero keys found
  96. def test_get_known_host_entries(self):
  97. '''
  98. Check that known host information is returned from ~/.ssh/config
  99. '''
  100. shutil.copyfile(
  101. os.path.join(FILES, 'ssh', 'known_hosts'),
  102. KNOWN_HOSTS)
  103. arg = ['root', 'github.com']
  104. kwargs = {'config': KNOWN_HOSTS}
  105. ret = self.run_function('ssh.get_known_host_entries', arg, **kwargs)[0]
  106. try:
  107. self.assertEqual(ret['enc'], 'ssh-rsa')
  108. self.assertEqual(ret['key'], self.key)
  109. self.assertEqual(ret['fingerprint'], GITHUB_FINGERPRINT)
  110. except AssertionError as exc:
  111. raise AssertionError(
  112. 'AssertionError: {0}. Function returned: {1}'.format(
  113. exc, ret
  114. )
  115. )
  116. def test_recv_known_host_entries(self):
  117. '''
  118. Check that known host information is returned from remote host
  119. '''
  120. ret = self.run_function('ssh.recv_known_host_entries', ['github.com'])
  121. try:
  122. self.assertNotEqual(ret, None)
  123. self.assertEqual(ret[0]['enc'], 'ssh-rsa')
  124. self.assertEqual(ret[0]['key'], self.key)
  125. self.assertEqual(ret[0]['fingerprint'], GITHUB_FINGERPRINT)
  126. except AssertionError as exc:
  127. raise AssertionError(
  128. 'AssertionError: {0}. Function returned: {1}'.format(
  129. exc, ret
  130. )
  131. )
  132. def test_check_known_host_add(self):
  133. '''
  134. Check known hosts by its fingerprint. File needs to be updated
  135. '''
  136. arg = ['root', 'github.com']
  137. kwargs = {'fingerprint': GITHUB_FINGERPRINT, 'config': KNOWN_HOSTS}
  138. ret = self.run_function('ssh.check_known_host', arg, **kwargs)
  139. self.assertEqual(ret, 'add')
  140. def test_check_known_host_update(self):
  141. '''
  142. ssh.check_known_host update verification
  143. '''
  144. shutil.copyfile(
  145. os.path.join(FILES, 'ssh', 'known_hosts'),
  146. KNOWN_HOSTS)
  147. arg = ['root', 'github.com']
  148. kwargs = {'config': KNOWN_HOSTS}
  149. # wrong fingerprint
  150. ret = self.run_function('ssh.check_known_host', arg,
  151. **dict(kwargs, fingerprint='aa:bb:cc:dd'))
  152. self.assertEqual(ret, 'update')
  153. # wrong keyfile
  154. ret = self.run_function('ssh.check_known_host', arg,
  155. **dict(kwargs, key='YQ=='))
  156. self.assertEqual(ret, 'update')
  157. def test_check_known_host_exists(self):
  158. '''
  159. Verify check_known_host_exists
  160. '''
  161. shutil.copyfile(
  162. os.path.join(FILES, 'ssh', 'known_hosts'),
  163. KNOWN_HOSTS)
  164. arg = ['root', 'github.com']
  165. kwargs = {'config': KNOWN_HOSTS}
  166. # wrong fingerprint
  167. ret = self.run_function('ssh.check_known_host', arg,
  168. **dict(kwargs, fingerprint=GITHUB_FINGERPRINT))
  169. self.assertEqual(ret, 'exists')
  170. # wrong keyfile
  171. ret = self.run_function('ssh.check_known_host', arg,
  172. **dict(kwargs, key=self.key))
  173. self.assertEqual(ret, 'exists')
  174. def test_rm_known_host(self):
  175. '''
  176. ssh.rm_known_host
  177. '''
  178. shutil.copyfile(
  179. os.path.join(FILES, 'ssh', 'known_hosts'),
  180. KNOWN_HOSTS)
  181. arg = ['root', 'github.com']
  182. kwargs = {'config': KNOWN_HOSTS, 'key': self.key}
  183. # before removal
  184. ret = self.run_function('ssh.check_known_host', arg, **kwargs)
  185. self.assertEqual(ret, 'exists')
  186. # remove
  187. self.run_function('ssh.rm_known_host', arg, config=KNOWN_HOSTS)
  188. # after removal
  189. ret = self.run_function('ssh.check_known_host', arg, **kwargs)
  190. self.assertEqual(ret, 'add')
  191. def test_set_known_host(self):
  192. '''
  193. ssh.set_known_host
  194. '''
  195. # add item
  196. ret = self.run_function('ssh.set_known_host', ['root', 'github.com'],
  197. config=KNOWN_HOSTS)
  198. try:
  199. self.assertEqual(ret['status'], 'updated')
  200. self.assertEqual(ret['old'], None)
  201. self.assertEqual(ret['new'][0]['fingerprint'], GITHUB_FINGERPRINT)
  202. except AssertionError as exc:
  203. raise AssertionError(
  204. 'AssertionError: {0}. Function returned: {1}'.format(
  205. exc, ret
  206. )
  207. )
  208. # check that item does exist
  209. ret = self.run_function('ssh.get_known_host_entries', ['root', 'github.com'],
  210. config=KNOWN_HOSTS)[0]
  211. try:
  212. self.assertEqual(ret['fingerprint'], GITHUB_FINGERPRINT)
  213. except AssertionError as exc:
  214. raise AssertionError(
  215. 'AssertionError: {0}. Function returned: {1}'.format(
  216. exc, ret
  217. )
  218. )
  219. # add the same item once again
  220. ret = self.run_function('ssh.set_known_host', ['root', 'github.com'],
  221. config=KNOWN_HOSTS)
  222. try:
  223. self.assertEqual(ret['status'], 'exists')
  224. except AssertionError as exc:
  225. raise AssertionError(
  226. 'AssertionError: {0}. Function returned: {1}'.format(
  227. exc, ret
  228. )
  229. )