1
0

test_ssh.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. # -*- coding: utf-8 -*-
  2. """
  3. Test the ssh module
  4. """
  5. from __future__ import absolute_import, print_function, unicode_literals
  6. import os
  7. import shutil
  8. import pytest
  9. import salt.utils.files
  10. import salt.utils.platform
  11. from salt.ext.tornado.httpclient import HTTPClient
  12. from tests.support.case import ModuleCase
  13. from tests.support.helpers import skip_if_binaries_missing, slowTest
  14. from tests.support.runtests import RUNTIME_VARS
  15. GITHUB_FINGERPRINT = "9d:38:5b:83:a9:17:52:92:56:1a:5e:c4:d4:81:8e:0a:ca:51:a2:64:f1:74:20:11:2e:f8:8a:c3:a1:39:49:8f"
  16. def check_status():
  17. """
  18. Check the status of Github for remote operations
  19. """
  20. try:
  21. return HTTPClient().fetch("http://github.com").code == 200
  22. except Exception: # pylint: disable=broad-except
  23. return False
  24. @skip_if_binaries_missing(["ssh", "ssh-keygen"], check_all=True)
  25. @pytest.mark.windows_whitelisted
  26. class SSHModuleTest(ModuleCase):
  27. """
  28. Test the ssh module
  29. """
  30. @classmethod
  31. def setUpClass(cls):
  32. cls.subsalt_dir = os.path.join(RUNTIME_VARS.TMP, "subsalt")
  33. cls.authorized_keys = os.path.join(cls.subsalt_dir, "authorized_keys")
  34. cls.known_hosts = os.path.join(cls.subsalt_dir, "known_hosts")
  35. def setUp(self):
  36. """
  37. Set up the ssh module tests
  38. """
  39. if not check_status():
  40. self.skipTest("External source, github.com is down")
  41. super(SSHModuleTest, self).setUp()
  42. if not os.path.isdir(self.subsalt_dir):
  43. os.makedirs(self.subsalt_dir)
  44. ssh_raw_path = os.path.join(RUNTIME_VARS.FILES, "ssh", "raw")
  45. with salt.utils.files.fopen(ssh_raw_path) as fd:
  46. self.key = fd.read().strip()
  47. def tearDown(self):
  48. """
  49. Tear down the ssh module tests
  50. """
  51. if os.path.isdir(self.subsalt_dir):
  52. shutil.rmtree(self.subsalt_dir)
  53. super(SSHModuleTest, self).tearDown()
  54. del self.key
  55. @slowTest
  56. def test_auth_keys(self):
  57. """
  58. test ssh.auth_keys
  59. """
  60. shutil.copyfile(
  61. os.path.join(RUNTIME_VARS.FILES, "ssh", "authorized_keys"),
  62. self.authorized_keys,
  63. )
  64. user = "root"
  65. if salt.utils.platform.is_windows():
  66. user = "Administrator"
  67. ret = self.run_function("ssh.auth_keys", [user, self.authorized_keys])
  68. self.assertEqual(len(list(ret.items())), 1) # exactly one key is found
  69. key_data = list(ret.items())[0][1]
  70. try:
  71. self.assertEqual(key_data["comment"], "github.com")
  72. self.assertEqual(key_data["enc"], "ssh-rsa")
  73. self.assertEqual(
  74. key_data["options"], ['command="/usr/local/lib/ssh-helper"']
  75. )
  76. self.assertEqual(key_data["fingerprint"], GITHUB_FINGERPRINT)
  77. except AssertionError as exc:
  78. raise AssertionError(
  79. "AssertionError: {0}. Function returned: {1}".format(exc, ret)
  80. )
  81. @slowTest
  82. def test_bad_enctype(self):
  83. """
  84. test to make sure that bad key encoding types don't generate an
  85. invalid key entry in authorized_keys
  86. """
  87. shutil.copyfile(
  88. os.path.join(RUNTIME_VARS.FILES, "ssh", "authorized_badkeys"),
  89. self.authorized_keys,
  90. )
  91. ret = self.run_function("ssh.auth_keys", ["root", self.authorized_keys])
  92. # The authorized_badkeys file contains a key with an invalid ssh key
  93. # encoding (dsa-sha2-nistp256 instead of ecdsa-sha2-nistp256)
  94. # auth_keys should skip any keys with invalid encodings. Internally
  95. # the minion will throw a CommandExecutionError so the
  96. # user will get an indicator of what went wrong.
  97. self.assertEqual(len(list(ret.items())), 0) # Zero keys found
  98. @slowTest
  99. def test_get_known_host_entries(self):
  100. """
  101. Check that known host information is returned from ~/.ssh/config
  102. """
  103. shutil.copyfile(
  104. os.path.join(RUNTIME_VARS.FILES, "ssh", "known_hosts"), self.known_hosts
  105. )
  106. arg = ["root", "github.com"]
  107. kwargs = {"config": self.known_hosts}
  108. ret = self.run_function("ssh.get_known_host_entries", arg, **kwargs)[0]
  109. try:
  110. self.assertEqual(ret["enc"], "ssh-rsa")
  111. self.assertEqual(ret["key"], self.key)
  112. self.assertEqual(ret["fingerprint"], GITHUB_FINGERPRINT)
  113. except AssertionError as exc:
  114. raise AssertionError(
  115. "AssertionError: {0}. Function returned: {1}".format(exc, ret)
  116. )
  117. @slowTest
  118. def test_recv_known_host_entries(self):
  119. """
  120. Check that known host information is returned from remote host
  121. """
  122. ret = self.run_function("ssh.recv_known_host_entries", ["github.com"])
  123. try:
  124. self.assertNotEqual(ret, None)
  125. self.assertEqual(ret[0]["enc"], "ssh-rsa")
  126. self.assertEqual(ret[0]["key"], self.key)
  127. self.assertEqual(ret[0]["fingerprint"], GITHUB_FINGERPRINT)
  128. except AssertionError as exc:
  129. raise AssertionError(
  130. "AssertionError: {0}. Function returned: {1}".format(exc, ret)
  131. )
  132. @slowTest
  133. def test_check_known_host_add(self):
  134. """
  135. Check known hosts by its fingerprint. File needs to be updated
  136. """
  137. arg = ["root", "github.com"]
  138. kwargs = {"fingerprint": GITHUB_FINGERPRINT, "config": self.known_hosts}
  139. ret = self.run_function("ssh.check_known_host", arg, **kwargs)
  140. self.assertEqual(ret, "add")
  141. @slowTest
  142. def test_check_known_host_update(self):
  143. """
  144. ssh.check_known_host update verification
  145. """
  146. shutil.copyfile(
  147. os.path.join(RUNTIME_VARS.FILES, "ssh", "known_hosts"), self.known_hosts
  148. )
  149. arg = ["root", "github.com"]
  150. kwargs = {"config": self.known_hosts}
  151. # wrong fingerprint
  152. ret = self.run_function(
  153. "ssh.check_known_host", arg, **dict(kwargs, fingerprint="aa:bb:cc:dd")
  154. )
  155. self.assertEqual(ret, "update")
  156. # wrong keyfile
  157. ret = self.run_function("ssh.check_known_host", arg, **dict(kwargs, key="YQ=="))
  158. self.assertEqual(ret, "update")
  159. @slowTest
  160. def test_check_known_host_exists(self):
  161. """
  162. Verify check_known_host_exists
  163. """
  164. shutil.copyfile(
  165. os.path.join(RUNTIME_VARS.FILES, "ssh", "known_hosts"), self.known_hosts
  166. )
  167. arg = ["root", "github.com"]
  168. kwargs = {"config": self.known_hosts}
  169. # wrong fingerprint
  170. ret = self.run_function(
  171. "ssh.check_known_host", arg, **dict(kwargs, fingerprint=GITHUB_FINGERPRINT)
  172. )
  173. self.assertEqual(ret, "exists")
  174. # wrong keyfile
  175. ret = self.run_function(
  176. "ssh.check_known_host", arg, **dict(kwargs, key=self.key)
  177. )
  178. self.assertEqual(ret, "exists")
  179. @slowTest
  180. def test_rm_known_host(self):
  181. """
  182. ssh.rm_known_host
  183. """
  184. shutil.copyfile(
  185. os.path.join(RUNTIME_VARS.FILES, "ssh", "known_hosts"), self.known_hosts
  186. )
  187. arg = ["root", "github.com"]
  188. kwargs = {"config": self.known_hosts, "key": self.key}
  189. # before removal
  190. ret = self.run_function("ssh.check_known_host", arg, **kwargs)
  191. self.assertEqual(ret, "exists")
  192. # remove
  193. self.run_function("ssh.rm_known_host", arg, config=self.known_hosts)
  194. # after removal
  195. ret = self.run_function("ssh.check_known_host", arg, **kwargs)
  196. self.assertEqual(ret, "add")
  197. @slowTest
  198. def test_set_known_host(self):
  199. """
  200. ssh.set_known_host
  201. """
  202. # add item
  203. ret = self.run_function(
  204. "ssh.set_known_host", ["root", "github.com"], config=self.known_hosts
  205. )
  206. try:
  207. self.assertEqual(ret["status"], "updated")
  208. self.assertEqual(ret["old"], None)
  209. self.assertEqual(ret["new"][0]["fingerprint"], GITHUB_FINGERPRINT)
  210. except AssertionError as exc:
  211. raise AssertionError(
  212. "AssertionError: {0}. Function returned: {1}".format(exc, ret)
  213. )
  214. # check that item does exist
  215. ret = self.run_function(
  216. "ssh.get_known_host_entries",
  217. ["root", "github.com"],
  218. config=self.known_hosts,
  219. )[0]
  220. try:
  221. self.assertEqual(ret["fingerprint"], GITHUB_FINGERPRINT)
  222. except AssertionError as exc:
  223. raise AssertionError(
  224. "AssertionError: {0}. Function returned: {1}".format(exc, ret)
  225. )
  226. # add the same item once again
  227. ret = self.run_function(
  228. "ssh.set_known_host", ["root", "github.com"], config=self.known_hosts
  229. )
  230. try:
  231. self.assertEqual(ret["status"], "exists")
  232. except AssertionError as exc:
  233. raise AssertionError(
  234. "AssertionError: {0}. Function returned: {1}".format(exc, ret)
  235. )