1
0

test_ssh_known_hosts.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. # -*- coding: utf-8 -*-
  2. """
  3. Test the ssh_known_hosts states
  4. """
  5. # Import python libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import os
  8. import shutil
  9. import sys
  10. # Import 3rd-party libs
  11. from salt.ext import six
  12. # Import Salt Testing libs
  13. from tests.support.case import ModuleCase
  14. from tests.support.helpers import skip_if_binaries_missing, slowTest
  15. from tests.support.mixins import SaltReturnAssertsMixin
  16. from tests.support.runtests import RUNTIME_VARS
  17. GITHUB_FINGERPRINT = "9d:38:5b:83:a9:17:52:92:56:1a:5e:c4:d4:81:8e:0a:ca:51:a2:64:f1:74:20:11:2e:f8:8a:c3:a1:39:49:8f"
  18. GITHUB_IP = "192.30.253.113"
  19. @skip_if_binaries_missing(["ssh", "ssh-keygen"], check_all=True)
  20. class SSHKnownHostsStateTest(ModuleCase, SaltReturnAssertsMixin):
  21. """
  22. Validate the ssh state
  23. """
  24. @classmethod
  25. def setUpClass(cls):
  26. cls.known_hosts = os.path.join(RUNTIME_VARS.TMP, "known_hosts")
  27. def tearDown(self):
  28. if os.path.isfile(self.known_hosts):
  29. os.remove(self.known_hosts)
  30. super(SSHKnownHostsStateTest, self).tearDown()
  31. @slowTest
  32. def test_present(self):
  33. """
  34. ssh_known_hosts.present
  35. """
  36. kwargs = {
  37. "name": "github.com",
  38. "user": "root",
  39. "fingerprint": GITHUB_FINGERPRINT,
  40. "config": self.known_hosts,
  41. }
  42. # test first
  43. ret = self.run_state("ssh_known_hosts.present", test=True, **kwargs)
  44. self.assertSaltNoneReturn(ret)
  45. # save once, new key appears
  46. ret = self.run_state("ssh_known_hosts.present", **kwargs)
  47. try:
  48. self.assertSaltTrueReturn(ret)
  49. except AssertionError as err:
  50. try:
  51. self.assertInSaltComment("Unable to receive remote host key", ret)
  52. self.skipTest("Unable to receive remote host key")
  53. except AssertionError:
  54. six.reraise(*sys.exc_info())
  55. self.assertSaltStateChangesEqual(
  56. ret, GITHUB_FINGERPRINT, keys=("new", 0, "fingerprint")
  57. )
  58. # save twice, no changes
  59. self.run_state("ssh_known_hosts.present", **kwargs)
  60. # test again, nothing is about to be changed
  61. ret = self.run_state("ssh_known_hosts.present", test=True, **kwargs)
  62. self.assertSaltTrueReturn(ret)
  63. # then add a record for IP address
  64. # pylint: disable=repeated-keyword
  65. ret = self.run_state("ssh_known_hosts.present", **dict(kwargs, name=GITHUB_IP))
  66. # pylint: enable=repeated-keyword
  67. try:
  68. self.assertSaltStateChangesEqual(
  69. ret, GITHUB_FINGERPRINT, keys=("new", 0, "fingerprint")
  70. )
  71. except AssertionError as err:
  72. try:
  73. self.assertInSaltComment("Unable to receive remote host key", ret)
  74. self.skipTest("Unable to receive remote host key")
  75. except AssertionError:
  76. six.reraise(*sys.exc_info())
  77. # record for every host must be available
  78. ret = self.run_function(
  79. "ssh.get_known_host_entries",
  80. ["root", "github.com"],
  81. config=self.known_hosts,
  82. )[0]
  83. try:
  84. self.assertNotIn(ret, ("", None))
  85. except AssertionError:
  86. raise AssertionError("Salt return '{0}' is in ('', None).".format(ret))
  87. ret = self.run_function(
  88. "ssh.get_known_host_entries", ["root", GITHUB_IP], config=self.known_hosts
  89. )[0]
  90. try:
  91. self.assertNotIn(ret, ("", None, {}))
  92. except AssertionError:
  93. raise AssertionError(
  94. "Salt return '{0}' is in ('', None,".format(ret) + " {})"
  95. )
  96. @slowTest
  97. def test_present_fail(self):
  98. # save something wrong
  99. ret = self.run_state(
  100. "ssh_known_hosts.present",
  101. name="github.com",
  102. user="root",
  103. fingerprint="aa:bb:cc:dd",
  104. config=self.known_hosts,
  105. )
  106. self.assertSaltFalseReturn(ret)
  107. @slowTest
  108. def test_absent(self):
  109. """
  110. ssh_known_hosts.absent
  111. """
  112. known_hosts = os.path.join(RUNTIME_VARS.FILES, "ssh", "known_hosts")
  113. shutil.copyfile(known_hosts, self.known_hosts)
  114. if not os.path.isfile(self.known_hosts):
  115. self.skipTest(
  116. "Unable to copy {0} to {1}".format(known_hosts, self.known_hosts)
  117. )
  118. kwargs = {"name": "github.com", "user": "root", "config": self.known_hosts}
  119. # test first
  120. ret = self.run_state("ssh_known_hosts.absent", test=True, **kwargs)
  121. self.assertSaltNoneReturn(ret)
  122. # remove once, the key is gone
  123. ret = self.run_state("ssh_known_hosts.absent", **kwargs)
  124. self.assertSaltStateChangesEqual(
  125. ret, GITHUB_FINGERPRINT, keys=("old", 0, "fingerprint")
  126. )
  127. # remove twice, nothing has changed
  128. ret = self.run_state("ssh_known_hosts.absent", **kwargs)
  129. self.assertSaltStateChangesEqual(ret, {})
  130. # test again
  131. ret = self.run_state("ssh_known_hosts.absent", test=True, **kwargs)
  132. self.assertSaltTrueReturn(ret)