test_x509.py 12 KB


  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, unicode_literals
  3. import datetime
  4. import os
  5. import textwrap
  6. import pytest
  7. import salt.utils.files
  8. from salt.ext import six
  9. from tests.support.case import ModuleCase
  10. from tests.support.helpers import with_tempfile
  11. from tests.support.mixins import SaltReturnAssertsMixin
  12. from tests.support.runtests import RUNTIME_VARS
  13. from tests.support.unit import skipIf
  14. try:
  15. import M2Crypto # pylint: disable=W0611
  16. HAS_M2CRYPTO = True
  17. except ImportError:
  18. HAS_M2CRYPTO = False
  19. @skipIf(not HAS_M2CRYPTO, "Skip when no M2Crypto found")
  20. class x509Test(ModuleCase, SaltReturnAssertsMixin):
  21. @classmethod
  22. def setUpClass(cls):
  23. cert_path = os.path.join(RUNTIME_VARS.BASE_FILES, "x509_test.crt")
  24. with salt.utils.files.fopen(cert_path) as fp:
  25. cls.x509_cert_text = fp.read()
  26. def setUp(self):
  27. with salt.utils.files.fopen(
  28. os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "signing_policies.sls"), "w"
  29. ) as fp:
  30. fp.write(
  31. textwrap.dedent(
  32. """\
  33. x509_signing_policies:
  34. ca_policy:
  35. - minions: '*'
  36. - signing_private_key: {0}/pki/ca.key
  37. - signing_cert: {0}/pki/ca.crt
  38. - O: Test Company
  39. - basicConstraints: "CA:false"
  40. - keyUsage: "critical digitalSignature, keyEncipherment"
  41. - extendedKeyUsage: "critical serverAuth, clientAuth"
  42. - subjectKeyIdentifier: hash
  43. - authorityKeyIdentifier: keyid
  44. - days_valid: 730
  45. - copypath: {0}/pki
  46. """.format(
  47. RUNTIME_VARS.TMP
  48. )
  49. )
  50. )
  51. with salt.utils.files.fopen(
  52. os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "top.sls"), "w"
  53. ) as fp:
  54. fp.write(
  55. textwrap.dedent(
  56. """\
  57. base:
  58. '*':
  59. - signing_policies
  60. """
  61. )
  62. )
  63. self.run_function("saltutil.refresh_pillar")
  64. def tearDown(self):
  65. os.remove(os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "signing_policies.sls"))
  66. os.remove(os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "top.sls"))
  67. certs_path = os.path.join(RUNTIME_VARS.TMP, "pki")
  68. if os.path.exists(certs_path):
  69. salt.utils.files.rm_rf(certs_path)
  70. self.run_function("saltutil.refresh_pillar")
  71. @with_tempfile(suffix=".pem", create=False)
  72. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  73. def test_issue_49027(self, pemfile):
  74. ret = self.run_state("x509.pem_managed", name=pemfile, text=self.x509_cert_text)
  75. assert isinstance(ret, dict), ret
  76. ret = ret[next(iter(ret))]
  77. assert ret.get("result") is True, ret
  78. with salt.utils.files.fopen(pemfile) as fp:
  79. result = fp.readlines()
  80. self.assertEqual(self.x509_cert_text.splitlines(True), result)
  81. @with_tempfile(suffix=".crt", create=False)
  82. @with_tempfile(suffix=".key", create=False)
  83. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  84. def test_issue_49008(self, keyfile, crtfile):
  85. ret = self.run_function(
  86. "state.apply",
  87. ["issue-49008"],
  88. pillar={"keyfile": keyfile, "crtfile": crtfile},
  89. )
  90. assert isinstance(ret, dict), ret
  91. for state_result in six.itervalues(ret):
  92. assert state_result["result"] is True, state_result
  93. assert os.path.exists(keyfile)
  94. assert os.path.exists(crtfile)
  95. @pytest.mark.slow_test(seconds=10) # Test takes >5 and <=10 seconds
  96. def test_cert_signing(self):
  97. ret = self.run_function(
  98. "state.apply", ["x509.cert_signing"], pillar={"tmp_dir": RUNTIME_VARS.TMP}
  99. )
  100. key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
  101. RUNTIME_VARS.TMP
  102. )
  103. assert key in ret
  104. assert "changes" in ret[key]
  105. assert "Certificate" in ret[key]["changes"]
  106. assert "New" in ret[key]["changes"]["Certificate"]
  107. @with_tempfile(suffix=".crt", create=False)
  108. @with_tempfile(suffix=".key", create=False)
  109. def test_self_signed_cert(self, keyfile, crtfile):
  110. """
  111. Self-signed certificate, no CA.
  112. Run the state twice to confirm the cert is only created once
  113. and its contents don't change.
  114. """
  115. first_run = self.run_function(
  116. "state.apply",
  117. ["x509.self_signed"],
  118. pillar={"keyfile": keyfile, "crtfile": crtfile},
  119. )
  120. key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile)
  121. self.assertIn("New", first_run[key]["changes"]["Certificate"])
  122. self.assertEqual(
  123. "Certificate is valid and up to date",
  124. first_run[key]["changes"]["Status"]["New"],
  125. )
  126. self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
  127. with salt.utils.files.fopen(crtfile, "r") as first_cert:
  128. cert_contents = first_cert.read()
  129. second_run = self.run_function(
  130. "state.apply",
  131. ["x509.self_signed"],
  132. pillar={"keyfile": keyfile, "crtfile": crtfile},
  133. )
  134. self.assertEqual({}, second_run[key]["changes"])
  135. with salt.utils.files.fopen(crtfile, "r") as second_cert:
  136. self.assertEqual(
  137. cert_contents,
  138. second_cert.read(),
  139. "Certificate contents should not have changed.",
  140. )
  141. @with_tempfile(suffix=".crt", create=False)
  142. @with_tempfile(suffix=".key", create=False)
  143. def test_old_self_signed_cert_is_recreated(self, keyfile, crtfile):
  144. """
  145. Self-signed certificate, no CA.
  146. First create a cert that expires in 30 days, then recreate
  147. the cert because the second state run requires days_remaining
  148. to be at least 90.
  149. """
  150. first_run = self.run_function(
  151. "state.apply",
  152. ["x509.self_signed_expiry"],
  153. pillar={
  154. "keyfile": keyfile,
  155. "crtfile": crtfile,
  156. "days_valid": 30,
  157. "days_remaining": 10,
  158. },
  159. )
  160. key = "x509_|-self_signed_cert_|-{0}_|-certificate_managed".format(crtfile)
  161. self.assertEqual(
  162. "Certificate is valid and up to date",
  163. first_run[key]["changes"]["Status"]["New"],
  164. )
  165. expiry = datetime.datetime.strptime(
  166. first_run[key]["changes"]["Certificate"]["New"]["Not After"],
  167. "%Y-%m-%d %H:%M:%S",
  168. )
  169. self.assertEqual(29, (expiry - datetime.datetime.now()).days)
  170. self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
  171. with salt.utils.files.fopen(crtfile, "r") as first_cert:
  172. cert_contents = first_cert.read()
  173. second_run = self.run_function(
  174. "state.apply",
  175. ["x509.self_signed_expiry"],
  176. pillar={
  177. "keyfile": keyfile,
  178. "crtfile": crtfile,
  179. "days_valid": 180,
  180. "days_remaining": 90,
  181. },
  182. )
  183. self.assertEqual(
  184. "Certificate needs renewal: 29 days remaining but it needs to be at least 90",
  185. second_run[key]["changes"]["Status"]["Old"],
  186. )
  187. expiry = datetime.datetime.strptime(
  188. second_run[key]["changes"]["Certificate"]["New"]["Not After"],
  189. "%Y-%m-%d %H:%M:%S",
  190. )
  191. self.assertEqual(179, (expiry - datetime.datetime.now()).days)
  192. with salt.utils.files.fopen(crtfile, "r") as second_cert:
  193. self.assertNotEqual(
  194. cert_contents,
  195. second_cert.read(),
  196. "Certificate contents should have changed.",
  197. )
  198. @with_tempfile(suffix=".crt", create=False)
  199. @with_tempfile(suffix=".key", create=False)
  200. def test_mismatched_self_signed_cert_is_recreated(self, keyfile, crtfile):
  201. """
  202. Self-signed certificate, no CA.
  203. First create a cert, then run the state again with a different
  204. subjectAltName. The cert should be recreated.
  205. Finally, run once more with the same subjectAltName as the
  206. second run. Nothing should change.
  207. """
  208. first_run = self.run_function(
  209. "state.apply",
  210. ["x509.self_signed_different_properties"],
  211. pillar={
  212. "keyfile": keyfile,
  213. "crtfile": crtfile,
  214. "subjectAltName": "DNS:alt.service.local",
  215. },
  216. )
  217. key = "x509_|-self_signed_cert_|-{0}_|-certificate_managed".format(crtfile)
  218. self.assertEqual(
  219. "Certificate is valid and up to date",
  220. first_run[key]["changes"]["Status"]["New"],
  221. )
  222. sans = first_run[key]["changes"]["Certificate"]["New"]["X509v3 Extensions"][
  223. "subjectAltName"
  224. ]
  225. self.assertEqual("DNS:alt.service.local", sans)
  226. self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
  227. with salt.utils.files.fopen(crtfile, "r") as first_cert:
  228. first_cert_contents = first_cert.read()
  229. second_run_pillar = {
  230. "keyfile": keyfile,
  231. "crtfile": crtfile,
  232. "subjectAltName": "DNS:alt1.service.local, DNS:alt2.service.local",
  233. }
  234. second_run = self.run_function(
  235. "state.apply",
  236. ["x509.self_signed_different_properties"],
  237. pillar=second_run_pillar,
  238. )
  239. self.assertEqual(
  240. "Certificate properties are different: X509v3 Extensions",
  241. second_run[key]["changes"]["Status"]["Old"],
  242. )
  243. sans = second_run[key]["changes"]["Certificate"]["New"]["X509v3 Extensions"][
  244. "subjectAltName"
  245. ]
  246. self.assertEqual("DNS:alt1.service.local, DNS:alt2.service.local", sans)
  247. with salt.utils.files.fopen(crtfile, "r") as second_cert:
  248. second_cert_contents = second_cert.read()
  249. self.assertNotEqual(
  250. first_cert_contents,
  251. second_cert_contents,
  252. "Certificate contents should have changed.",
  253. )
  254. third_run = self.run_function(
  255. "state.apply",
  256. ["x509.self_signed_different_properties"],
  257. pillar=second_run_pillar,
  258. )
  259. self.assertEqual({}, third_run[key]["changes"])
  260. with salt.utils.files.fopen(crtfile, "r") as third_cert:
  261. self.assertEqual(
  262. second_cert_contents,
  263. third_cert.read(),
  264. "Certificate contents should not have changed.",
  265. )
  266. @with_tempfile(suffix=".crt", create=False)
  267. @with_tempfile(suffix=".key", create=False)
  268. def test_certificate_managed_with_managed_private_key_does_not_error(
  269. self, keyfile, crtfile
  270. ):
  271. """
  272. Test using the deprecated managed_private_key arg in certificate_managed does not throw an error.
  273. TODO: Remove this test in Aluminium when the arg is removed.
  274. """
  275. self.run_state("x509.private_key_managed", name=keyfile, bits=4096)
  276. ret = self.run_state(
  277. "x509.certificate_managed",
  278. name=crtfile,
  279. CN="localhost",
  280. signing_private_key=keyfile,
  281. managed_private_key={"name": keyfile, "bits": 4096},
  282. )
  283. key = "x509_|-{0}_|-{0}_|-certificate_managed".format(crtfile)
  284. self.assertEqual(True, ret[key]["result"])