test_ssh.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. # -*- coding: utf-8 -*-
  2. '''
  3. Test the ssh module
  4. '''
  5. # Import python libs
  6. from __future__ import absolute_import, unicode_literals, print_function
  7. import os
  8. import shutil
  9. # Import Salt Testing libs
  10. from tests.support.runtime import RUNTIME_VARS
  11. from tests.support.case import ModuleCase
  12. # Import salt libs
  13. import salt.utils.files
  14. import salt.utils.platform
  15. # Import 3rd-party libs
  16. import pytest
  17. from tornado.httpclient import HTTPClient
  # Expected value of the 'fingerprint' field reported for github.com's
  # ssh-rsa host key by the ssh module functions exercised in this file.
  # It is 32 colon-separated hex bytes, i.e. a 256-bit digest.
  # NOTE(review): presumably a SHA-256 fingerprint -- confirm against the
  # ssh module's default fingerprint hash type.
  18. GITHUB_FINGERPRINT = '9d:38:5b:83:a9:17:52:92:56:1a:5e:c4:d4:81:8e:0a:ca:51:a2:64:f1:74:20:11:2e:f8:8a:c3:a1:39:49:8f'
  19. def check_status():
  20. '''
  21. Check the status of Github for remote operations
  22. '''
  23. try:
  24. return HTTPClient().fetch('http://github.com').code == 200
  25. except Exception: # pylint: disable=broad-except
  26. return False
  27. @pytest.mark.skip_if_binaries_missing(['ssh', 'ssh-keygen'], check_all=True)
  28. class SSHModuleTest(ModuleCase):
  29. '''
  30. Test the ssh module
  31. '''
  32. @classmethod
  33. def setUpClass(cls):
  34. cls.subsalt_dir = os.path.join(RUNTIME_VARS.TMP, 'subsalt')
  35. cls.authorized_keys = os.path.join(cls.subsalt_dir, 'authorized_keys')
  36. cls.known_hosts = os.path.join(cls.subsalt_dir, 'known_hosts')
  37. def setUp(self):
  38. '''
  39. Set up the ssh module tests
  40. '''
  41. if not check_status():
  42. self.skipTest('External source, github.com is down')
  43. super(SSHModuleTest, self).setUp()
  44. if not os.path.isdir(self.subsalt_dir):
  45. os.makedirs(self.subsalt_dir)
  46. ssh_raw_path = os.path.join(RUNTIME_VARS.FILES, 'ssh', 'raw')
  47. with salt.utils.files.fopen(ssh_raw_path) as fd:
  48. self.key = fd.read().strip()
  49. def tearDown(self):
  50. '''
  51. Tear down the ssh module tests
  52. '''
  53. if os.path.isdir(self.subsalt_dir):
  54. shutil.rmtree(self.subsalt_dir)
  55. super(SSHModuleTest, self).tearDown()
  56. del self.key
  57. def test_auth_keys(self):
  58. '''
  59. test ssh.auth_keys
  60. '''
  61. shutil.copyfile(
  62. os.path.join(RUNTIME_VARS.FILES, 'ssh', 'authorized_keys'),
  63. self.authorized_keys)
  64. user = 'root'
  65. if salt.utils.platform.is_windows():
  66. user = 'Administrator'
  67. ret = self.run_function('ssh.auth_keys', [user, self.authorized_keys])
  68. self.assertEqual(len(list(ret.items())), 1) # exactly one key is found
  69. key_data = list(ret.items())[0][1]
  70. try:
  71. self.assertEqual(key_data['comment'], 'github.com')
  72. self.assertEqual(key_data['enc'], 'ssh-rsa')
  73. self.assertEqual(
  74. key_data['options'], ['command="/usr/local/lib/ssh-helper"']
  75. )
  76. self.assertEqual(key_data['fingerprint'], GITHUB_FINGERPRINT)
  77. except AssertionError as exc:
  78. raise AssertionError(
  79. 'AssertionError: {0}. Function returned: {1}'.format(
  80. exc, ret
  81. )
  82. )
  83. def test_bad_enctype(self):
  84. '''
  85. test to make sure that bad key encoding types don't generate an
  86. invalid key entry in authorized_keys
  87. '''
  88. shutil.copyfile(
  89. os.path.join(RUNTIME_VARS.FILES, 'ssh', 'authorized_badkeys'),
  90. self.authorized_keys)
  91. ret = self.run_function('ssh.auth_keys', ['root', self.authorized_keys])
  92. # The authorized_badkeys file contains a key with an invalid ssh key
  93. # encoding (dsa-sha2-nistp256 instead of ecdsa-sha2-nistp256)
  94. # auth_keys should skip any keys with invalid encodings. Internally
  95. # the minion will throw a CommandExecutionError so the
  96. # user will get an indicator of what went wrong.
  97. self.assertEqual(len(list(ret.items())), 0) # Zero keys found
  98. def test_get_known_host_entries(self):
  99. '''
  100. Check that known host information is returned from ~/.ssh/config
  101. '''
  102. shutil.copyfile(
  103. os.path.join(RUNTIME_VARS.FILES, 'ssh', 'known_hosts'),
  104. self.known_hosts)
  105. arg = ['root', 'github.com']
  106. kwargs = {'config': self.known_hosts}
  107. ret = self.run_function('ssh.get_known_host_entries', arg, **kwargs)[0]
  108. try:
  109. self.assertEqual(ret['enc'], 'ssh-rsa')
  110. self.assertEqual(ret['key'], self.key)
  111. self.assertEqual(ret['fingerprint'], GITHUB_FINGERPRINT)
  112. except AssertionError as exc:
  113. raise AssertionError(
  114. 'AssertionError: {0}. Function returned: {1}'.format(
  115. exc, ret
  116. )
  117. )
  118. def test_recv_known_host_entries(self):
  119. '''
  120. Check that known host information is returned from remote host
  121. '''
  122. ret = self.run_function('ssh.recv_known_host_entries', ['github.com'])
  123. try:
  124. self.assertNotEqual(ret, None)
  125. self.assertEqual(ret[0]['enc'], 'ssh-rsa')
  126. self.assertEqual(ret[0]['key'], self.key)
  127. self.assertEqual(ret[0]['fingerprint'], GITHUB_FINGERPRINT)
  128. except AssertionError as exc:
  129. raise AssertionError(
  130. 'AssertionError: {0}. Function returned: {1}'.format(
  131. exc, ret
  132. )
  133. )
  134. def test_check_known_host_add(self):
  135. '''
  136. Check known hosts by its fingerprint. File needs to be updated
  137. '''
  138. arg = ['root', 'github.com']
  139. kwargs = {'fingerprint': GITHUB_FINGERPRINT, 'config': self.known_hosts}
  140. ret = self.run_function('ssh.check_known_host', arg, **kwargs)
  141. self.assertEqual(ret, 'add')
  142. def test_check_known_host_update(self):
  143. '''
  144. ssh.check_known_host update verification
  145. '''
  146. shutil.copyfile(
  147. os.path.join(RUNTIME_VARS.FILES, 'ssh', 'known_hosts'),
  148. self.known_hosts)
  149. arg = ['root', 'github.com']
  150. kwargs = {'config': self.known_hosts}
  151. # wrong fingerprint
  152. ret = self.run_function('ssh.check_known_host', arg,
  153. **dict(kwargs, fingerprint='aa:bb:cc:dd'))
  154. self.assertEqual(ret, 'update')
  155. # wrong keyfile
  156. ret = self.run_function('ssh.check_known_host', arg,
  157. **dict(kwargs, key='YQ=='))
  158. self.assertEqual(ret, 'update')
  159. def test_check_known_host_exists(self):
  160. '''
  161. Verify check_known_host_exists
  162. '''
  163. shutil.copyfile(
  164. os.path.join(RUNTIME_VARS.FILES, 'ssh', 'known_hosts'),
  165. self.known_hosts)
  166. arg = ['root', 'github.com']
  167. kwargs = {'config': self.known_hosts}
  168. # wrong fingerprint
  169. ret = self.run_function('ssh.check_known_host', arg,
  170. **dict(kwargs, fingerprint=GITHUB_FINGERPRINT))
  171. self.assertEqual(ret, 'exists')
  172. # wrong keyfile
  173. ret = self.run_function('ssh.check_known_host', arg,
  174. **dict(kwargs, key=self.key))
  175. self.assertEqual(ret, 'exists')
  176. def test_rm_known_host(self):
  177. '''
  178. ssh.rm_known_host
  179. '''
  180. shutil.copyfile(
  181. os.path.join(RUNTIME_VARS.FILES, 'ssh', 'known_hosts'),
  182. self.known_hosts)
  183. arg = ['root', 'github.com']
  184. kwargs = {'config': self.known_hosts, 'key': self.key}
  185. # before removal
  186. ret = self.run_function('ssh.check_known_host', arg, **kwargs)
  187. self.assertEqual(ret, 'exists')
  188. # remove
  189. self.run_function('ssh.rm_known_host', arg, config=self.known_hosts)
  190. # after removal
  191. ret = self.run_function('ssh.check_known_host', arg, **kwargs)
  192. self.assertEqual(ret, 'add')
  193. def test_set_known_host(self):
  194. '''
  195. ssh.set_known_host
  196. '''
  197. # add item
  198. ret = self.run_function('ssh.set_known_host', ['root', 'github.com'],
  199. config=self.known_hosts)
  200. try:
  201. self.assertEqual(ret['status'], 'updated')
  202. self.assertEqual(ret['old'], None)
  203. self.assertEqual(ret['new'][0]['fingerprint'], GITHUB_FINGERPRINT)
  204. except AssertionError as exc:
  205. raise AssertionError(
  206. 'AssertionError: {0}. Function returned: {1}'.format(
  207. exc, ret
  208. )
  209. )
  210. # check that item does exist
  211. ret = self.run_function('ssh.get_known_host_entries', ['root', 'github.com'],
  212. config=self.known_hosts)[0]
  213. try:
  214. self.assertEqual(ret['fingerprint'], GITHUB_FINGERPRINT)
  215. except AssertionError as exc:
  216. raise AssertionError(
  217. 'AssertionError: {0}. Function returned: {1}'.format(
  218. exc, ret
  219. )
  220. )
  221. # add the same item once again
  222. ret = self.run_function('ssh.set_known_host', ['root', 'github.com'],
  223. config=self.known_hosts)
  224. try:
  225. self.assertEqual(ret['status'], 'exists')
  226. except AssertionError as exc:
  227. raise AssertionError(
  228. 'AssertionError: {0}. Function returned: {1}'.format(
  229. exc, ret
  230. )
  231. )