test_x509.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, unicode_literals
  3. import datetime
  4. import hashlib
  5. import os
  6. import textwrap
  7. import salt.utils.files
  8. from salt.ext import six
  9. from tests.support.case import ModuleCase
  10. from tests.support.helpers import slowTest, with_tempfile
  11. from tests.support.mixins import SaltReturnAssertsMixin
  12. from tests.support.runtests import RUNTIME_VARS
  13. from tests.support.unit import skipIf
  14. try:
  15. import M2Crypto # pylint: disable=W0611
  16. HAS_M2CRYPTO = True
  17. except ImportError:
  18. HAS_M2CRYPTO = False
@skipIf(not HAS_M2CRYPTO, "Skip when no M2Crypto found")
class x509Test(ModuleCase, SaltReturnAssertsMixin):
    """
    Tests that drive the x509 states through ``state.apply`` / ``run_state``.

    The whole class is skipped when ``M2Crypto`` cannot be imported.
    """
  21. @classmethod
  22. def setUpClass(cls):
  23. cert_path = os.path.join(RUNTIME_VARS.BASE_FILES, "x509_test.crt")
  24. with salt.utils.files.fopen(cert_path) as fp:
  25. cls.x509_cert_text = fp.read()
    def setUp(self):
        """
        Prepare pillar data and grains before each test.

        Writes ``signing_policies.sls`` and ``top.sls`` into the temporary
        pillar tree, refreshes the pillar, then sets ``x509_test_grain`` to
        ``correct_value`` on ``sub_minion`` and ``not_correct_value`` on
        ``minion``.
        """
        # Pillar with two signing policies. "{0}" is replaced by
        # RUNTIME_VARS.TMP, so the CA key, CA cert and copypath all point
        # below "<tmp>/pki".
        # NOTE(review): the YAML nesting inside the literal is significant;
        # confirm it matches the pillar layout the x509 module expects.
        with salt.utils.files.fopen(
            os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "signing_policies.sls"), "w"
        ) as fp:
            fp.write(
                textwrap.dedent(
                    """\
                    x509_signing_policies:
                      ca_policy:
                        - minions: '*'
                        - signing_private_key: {0}/pki/ca.key
                        - signing_cert: {0}/pki/ca.crt
                        - O: Test Company
                        - basicConstraints: "CA:false"
                        - keyUsage: "critical digitalSignature, keyEncipherment"
                        - extendedKeyUsage: "critical serverAuth, clientAuth"
                        - subjectKeyIdentifier: hash
                        - authorityKeyIdentifier: keyid
                        - days_valid: 730
                        - copypath: {0}/pki
                      compound_match:
                        - minions: 'G@x509_test_grain:correct_value'
                        - signing_private_key: {0}/pki/ca.key
                        - signing_cert: {0}/pki/ca.crt
                        - O: Test Company
                        - basicConstraints: "CA:false"
                        - keyUsage: "critical digitalSignature, keyEncipherment"
                        - extendedKeyUsage: "critical serverAuth, clientAuth"
                        - subjectKeyIdentifier: hash
                        - authorityKeyIdentifier: keyid
                        - days_valid: 730
                        - copypath: {0}/pki
                    """.format(
                        RUNTIME_VARS.TMP
                    )
                )
            )
        # Top file assigning the signing_policies pillar to every minion.
        with salt.utils.files.fopen(
            os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "top.sls"), "w"
        ) as fp:
            fp.write(
                textwrap.dedent(
                    """\
                    base:
                      '*':
                        - signing_policies
                    """
                )
            )
        # Make the freshly written pillar files visible to the minions.
        self.run_function("saltutil.refresh_pillar")
        # Only sub_minion gets the grain value that the "compound_match"
        # policy targets (G@x509_test_grain:correct_value).
        self.run_function(
            "grains.set", ["x509_test_grain", "correct_value"], minion_tgt="sub_minion"
        )
        self.run_function(
            "grains.set", ["x509_test_grain", "not_correct_value"], minion_tgt="minion"
        )
  82. def tearDown(self):
  83. os.remove(os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "signing_policies.sls"))
  84. os.remove(os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "top.sls"))
  85. certs_path = os.path.join(RUNTIME_VARS.TMP, "pki")
  86. if os.path.exists(certs_path):
  87. salt.utils.files.rm_rf(certs_path)
  88. self.run_function("saltutil.refresh_pillar")
  89. self.run_function("grains.delkey", ["x509_test_grain"], minion_tgt="sub_minion")
  90. self.run_function("grains.delkey", ["x509_test_grain"], minion_tgt="minion")
  91. def run_function(self, *args, **kwargs): # pylint: disable=arguments-differ
  92. ret = super(x509Test, self).run_function(*args, **kwargs)
  93. return ret
  94. @staticmethod
  95. def file_checksum(path):
  96. hash = hashlib.sha1()
  97. with salt.utils.files.fopen(path, "rb") as f:
  98. for block in iter(lambda: f.read(4096), b""):
  99. hash.update(block)
  100. return hash.hexdigest()
  101. @with_tempfile(suffix=".pem", create=False)
  102. @slowTest
  103. def test_issue_49027(self, pemfile):
  104. ret = self.run_state("x509.pem_managed", name=pemfile, text=self.x509_cert_text)
  105. assert isinstance(ret, dict), ret
  106. ret = ret[next(iter(ret))]
  107. assert ret.get("result") is True, ret
  108. with salt.utils.files.fopen(pemfile) as fp:
  109. result = fp.readlines()
  110. self.assertEqual(self.x509_cert_text.splitlines(True), result)
  111. @with_tempfile(suffix=".crt", create=False)
  112. @with_tempfile(suffix=".key", create=False)
  113. @slowTest
  114. def test_issue_49008(self, keyfile, crtfile):
  115. ret = self.run_function(
  116. "state.apply",
  117. ["issue-49008"],
  118. pillar={"keyfile": keyfile, "crtfile": crtfile},
  119. )
  120. assert isinstance(ret, dict), ret
  121. for state_result in six.itervalues(ret):
  122. assert state_result["result"] is True, state_result
  123. assert os.path.exists(keyfile)
  124. assert os.path.exists(crtfile)
  125. @slowTest
  126. def test_cert_signing(self):
  127. ret = self.run_function(
  128. "state.apply", ["x509.cert_signing"], pillar={"tmp_dir": RUNTIME_VARS.TMP}
  129. )
  130. key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
  131. RUNTIME_VARS.TMP
  132. )
  133. assert key in ret
  134. assert "changes" in ret[key]
  135. assert "Certificate" in ret[key]["changes"]
  136. assert "New" in ret[key]["changes"]["Certificate"]
  137. def test_cert_issue_not_before_not_after(self):
  138. ret = self.run_function(
  139. "state.apply",
  140. ["test_cert_not_before_not_after"],
  141. pillar={"tmp_dir": RUNTIME_VARS.TMP},
  142. )
  143. key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
  144. RUNTIME_VARS.TMP
  145. )
  146. assert key in ret
  147. assert "changes" in ret[key]
  148. assert "Certificate" in ret[key]["changes"]
  149. assert "New" in ret[key]["changes"]["Certificate"]
  150. assert "Not Before" in ret[key]["changes"]["Certificate"]["New"]
  151. assert "Not After" in ret[key]["changes"]["Certificate"]["New"]
  152. not_before = ret[key]["changes"]["Certificate"]["New"]["Not Before"]
  153. not_after = ret[key]["changes"]["Certificate"]["New"]["Not After"]
  154. assert not_before == "2019-05-05 00:00:00"
  155. assert not_after == "2020-05-05 14:30:00"
  156. def test_cert_issue_not_before(self):
  157. ret = self.run_function(
  158. "state.apply",
  159. ["test_cert_not_before"],
  160. pillar={"tmp_dir": RUNTIME_VARS.TMP},
  161. )
  162. key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
  163. RUNTIME_VARS.TMP
  164. )
  165. assert key in ret
  166. assert "changes" in ret[key]
  167. assert "Certificate" in ret[key]["changes"]
  168. assert "New" in ret[key]["changes"]["Certificate"]
  169. assert "Not Before" in ret[key]["changes"]["Certificate"]["New"]
  170. assert "Not After" in ret[key]["changes"]["Certificate"]["New"]
  171. not_before = ret[key]["changes"]["Certificate"]["New"]["Not Before"]
  172. assert not_before == "2019-05-05 00:00:00"
  173. def test_cert_issue_not_after(self):
  174. ret = self.run_function(
  175. "state.apply", ["test_cert_not_after"], pillar={"tmp_dir": RUNTIME_VARS.TMP}
  176. )
  177. key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
  178. RUNTIME_VARS.TMP
  179. )
  180. assert key in ret
  181. assert "changes" in ret[key]
  182. assert "Certificate" in ret[key]["changes"]
  183. assert "New" in ret[key]["changes"]["Certificate"]
  184. assert "Not Before" in ret[key]["changes"]["Certificate"]["New"]
  185. assert "Not After" in ret[key]["changes"]["Certificate"]["New"]
  186. not_after = ret[key]["changes"]["Certificate"]["New"]["Not After"]
  187. assert not_after == "2020-05-05 14:30:00"
  188. @with_tempfile(suffix=".crt", create=False)
  189. @with_tempfile(suffix=".key", create=False)
  190. def test_issue_41858(self, keyfile, crtfile):
  191. ret_key = "x509_|-test_crt_|-{0}_|-certificate_managed".format(crtfile)
  192. signing_policy = "no_such_policy"
  193. ret = self.run_function(
  194. "state.apply",
  195. ["issue-41858.gen_cert"],
  196. pillar={
  197. "keyfile": keyfile,
  198. "crtfile": crtfile,
  199. "tmp_dir": RUNTIME_VARS.TMP,
  200. },
  201. )
  202. self.assertTrue(ret[ret_key]["result"])
  203. cert_sum = self.file_checksum(crtfile)
  204. ret = self.run_function(
  205. "state.apply",
  206. ["issue-41858.check"],
  207. pillar={
  208. "keyfile": keyfile,
  209. "crtfile": crtfile,
  210. "signing_policy": signing_policy,
  211. },
  212. )
  213. self.assertFalse(ret[ret_key]["result"])
  214. # self.assertSaltCommentRegexpMatches(ret[ret_key], "Signing policy {0} does not exist".format(signing_policy))
  215. self.assertEqual(self.file_checksum(crtfile), cert_sum)
  216. @with_tempfile(suffix=".crt", create=False)
  217. @with_tempfile(suffix=".key", create=False)
  218. def test_compound_match_minion_have_correct_grain_value(self, keyfile, crtfile):
  219. ret_key = "x509_|-test_crt_|-{0}_|-certificate_managed".format(crtfile)
  220. signing_policy = "compound_match"
  221. ret = self.run_function(
  222. "state.apply",
  223. ["x509_compound_match.gen_ca"],
  224. pillar={"tmp_dir": RUNTIME_VARS.TMP},
  225. )
  226. # sub_minion have grain set and CA is on other minion
  227. # CA minion have same grain with incorrect value
  228. ret = self.run_function(
  229. "state.apply",
  230. ["x509_compound_match.check"],
  231. minion_tgt="sub_minion",
  232. pillar={
  233. "keyfile": keyfile,
  234. "crtfile": crtfile,
  235. "signing_policy": signing_policy,
  236. },
  237. )
  238. self.assertTrue(ret[ret_key]["result"])
  239. @with_tempfile(suffix=".crt", create=False)
  240. @with_tempfile(suffix=".key", create=False)
  241. def test_compound_match_ca_have_correct_grain_value(self, keyfile, crtfile):
  242. self.run_function(
  243. "grains.set", ["x509_test_grain", "correct_value"], minion_tgt="minion"
  244. )
  245. self.run_function(
  246. "grains.set",
  247. ["x509_test_grain", "not_correct_value"],
  248. minion_tgt="sub_minion",
  249. )
  250. ret_key = "x509_|-test_crt_|-{0}_|-certificate_managed".format(crtfile)
  251. signing_policy = "compound_match"
  252. self.run_function(
  253. "state.apply",
  254. ["x509_compound_match.gen_ca"],
  255. pillar={"tmp_dir": RUNTIME_VARS.TMP},
  256. )
  257. ret = self.run_function(
  258. "state.apply",
  259. ["x509_compound_match.check"],
  260. minion_tgt="sub_minion",
  261. pillar={
  262. "keyfile": keyfile,
  263. "crtfile": crtfile,
  264. "signing_policy": signing_policy,
  265. },
  266. )
  267. self.assertFalse(ret[ret_key]["result"])
  268. @with_tempfile(suffix=".crt", create=False)
  269. @with_tempfile(suffix=".key", create=False)
  270. def test_self_signed_cert(self, keyfile, crtfile):
  271. """
  272. Self-signed certificate, no CA.
  273. Run the state twice to confirm the cert is only created once
  274. and its contents don't change.
  275. """
  276. first_run = self.run_function(
  277. "state.apply",
  278. ["x509.self_signed"],
  279. pillar={"keyfile": keyfile, "crtfile": crtfile},
  280. )
  281. key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile)
  282. self.assertIn("New", first_run[key]["changes"]["Certificate"])
  283. self.assertEqual(
  284. "Certificate is valid and up to date",
  285. first_run[key]["changes"]["Status"]["New"],
  286. )
  287. self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
  288. with salt.utils.files.fopen(crtfile, "r") as first_cert:
  289. cert_contents = first_cert.read()
  290. second_run = self.run_function(
  291. "state.apply",
  292. ["x509.self_signed"],
  293. pillar={"keyfile": keyfile, "crtfile": crtfile},
  294. )
  295. self.assertEqual({}, second_run[key]["changes"])
  296. with salt.utils.files.fopen(crtfile, "r") as second_cert:
  297. self.assertEqual(
  298. cert_contents,
  299. second_cert.read(),
  300. "Certificate contents should not have changed.",
  301. )
  302. @with_tempfile(suffix=".crt", create=False)
  303. @with_tempfile(suffix=".key", create=False)
  304. def test_old_self_signed_cert_is_recreated(self, keyfile, crtfile):
  305. """
  306. Self-signed certificate, no CA.
  307. First create a cert that expires in 30 days, then recreate
  308. the cert because the second state run requires days_remaining
  309. to be at least 90.
  310. """
  311. first_run = self.run_function(
  312. "state.apply",
  313. ["x509.self_signed_expiry"],
  314. pillar={
  315. "keyfile": keyfile,
  316. "crtfile": crtfile,
  317. "days_valid": 30,
  318. "days_remaining": 10,
  319. },
  320. )
  321. key = "x509_|-self_signed_cert_|-{0}_|-certificate_managed".format(crtfile)
  322. self.assertEqual(
  323. "Certificate is valid and up to date",
  324. first_run[key]["changes"]["Status"]["New"],
  325. )
  326. expiry = datetime.datetime.strptime(
  327. first_run[key]["changes"]["Certificate"]["New"]["Not After"],
  328. "%Y-%m-%d %H:%M:%S",
  329. )
  330. self.assertEqual(29, (expiry - datetime.datetime.now()).days)
  331. self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
  332. with salt.utils.files.fopen(crtfile, "r") as first_cert:
  333. cert_contents = first_cert.read()
  334. second_run = self.run_function(
  335. "state.apply",
  336. ["x509.self_signed_expiry"],
  337. pillar={
  338. "keyfile": keyfile,
  339. "crtfile": crtfile,
  340. "days_valid": 180,
  341. "days_remaining": 90,
  342. },
  343. )
  344. self.assertEqual(
  345. "Certificate needs renewal: 29 days remaining but it needs to be at least 90",
  346. second_run[key]["changes"]["Status"]["Old"],
  347. )
  348. expiry = datetime.datetime.strptime(
  349. second_run[key]["changes"]["Certificate"]["New"]["Not After"],
  350. "%Y-%m-%d %H:%M:%S",
  351. )
  352. self.assertEqual(179, (expiry - datetime.datetime.now()).days)
  353. with salt.utils.files.fopen(crtfile, "r") as second_cert:
  354. self.assertNotEqual(
  355. cert_contents,
  356. second_cert.read(),
  357. "Certificate contents should have changed.",
  358. )
  359. @with_tempfile(suffix=".crt", create=False)
  360. @with_tempfile(suffix=".key", create=False)
  361. def test_mismatched_self_signed_cert_is_recreated(self, keyfile, crtfile):
  362. """
  363. Self-signed certificate, no CA.
  364. First create a cert, then run the state again with a different
  365. subjectAltName. The cert should be recreated.
  366. Finally, run once more with the same subjectAltName as the
  367. second run. Nothing should change.
  368. """
  369. first_run = self.run_function(
  370. "state.apply",
  371. ["x509.self_signed_different_properties"],
  372. pillar={
  373. "keyfile": keyfile,
  374. "crtfile": crtfile,
  375. "subjectAltName": "DNS:alt.service.local",
  376. },
  377. )
  378. key = "x509_|-self_signed_cert_|-{0}_|-certificate_managed".format(crtfile)
  379. self.assertEqual(
  380. "Certificate is valid and up to date",
  381. first_run[key]["changes"]["Status"]["New"],
  382. )
  383. sans = first_run[key]["changes"]["Certificate"]["New"]["X509v3 Extensions"][
  384. "subjectAltName"
  385. ]
  386. self.assertEqual("DNS:alt.service.local", sans)
  387. self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
  388. with salt.utils.files.fopen(crtfile, "r") as first_cert:
  389. first_cert_contents = first_cert.read()
  390. second_run_pillar = {
  391. "keyfile": keyfile,
  392. "crtfile": crtfile,
  393. "subjectAltName": "DNS:alt1.service.local, DNS:alt2.service.local",
  394. }
  395. second_run = self.run_function(
  396. "state.apply",
  397. ["x509.self_signed_different_properties"],
  398. pillar=second_run_pillar,
  399. )
  400. self.assertEqual(
  401. "Certificate properties are different: X509v3 Extensions",
  402. second_run[key]["changes"]["Status"]["Old"],
  403. )
  404. sans = second_run[key]["changes"]["Certificate"]["New"]["X509v3 Extensions"][
  405. "subjectAltName"
  406. ]
  407. self.assertEqual("DNS:alt1.service.local, DNS:alt2.service.local", sans)
  408. with salt.utils.files.fopen(crtfile, "r") as second_cert:
  409. second_cert_contents = second_cert.read()
  410. self.assertNotEqual(
  411. first_cert_contents,
  412. second_cert_contents,
  413. "Certificate contents should have changed.",
  414. )
  415. third_run = self.run_function(
  416. "state.apply",
  417. ["x509.self_signed_different_properties"],
  418. pillar=second_run_pillar,
  419. )
  420. self.assertEqual({}, third_run[key]["changes"])
  421. with salt.utils.files.fopen(crtfile, "r") as third_cert:
  422. self.assertEqual(
  423. second_cert_contents,
  424. third_cert.read(),
  425. "Certificate contents should not have changed.",
  426. )
  427. @with_tempfile(suffix=".crt", create=False)
  428. @with_tempfile(suffix=".key", create=False)
  429. def test_certificate_managed_with_managed_private_key_does_not_error(
  430. self, keyfile, crtfile
  431. ):
  432. """
  433. Test using the deprecated managed_private_key arg in certificate_managed does not throw an error.
  434. TODO: Remove this test in Aluminium when the arg is removed.
  435. """
  436. self.run_state("x509.private_key_managed", name=keyfile, bits=4096)
  437. ret = self.run_state(
  438. "x509.certificate_managed",
  439. name=crtfile,
  440. CN="localhost",
  441. signing_private_key=keyfile,
  442. managed_private_key={"name": keyfile, "bits": 4096},
  443. )
  444. key = "x509_|-{0}_|-{0}_|-certificate_managed".format(crtfile)
  445. self.assertEqual(True, ret[key]["result"])