test_x509.py 15 KB


  1. # -*- coding: utf-8 -*-
  2. #
  3. # Author: Bo Maryniuk <bo@suse.de>
  4. #
  5. # Copyright 2018 SUSE LLC
  6. # Licensed under the Apache License, Version 2.0 (the "License");
  7. # you may not use this file except in compliance with the License.
  8. # You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. # Import Salt Testing Libs
  18. from __future__ import absolute_import, print_function, unicode_literals
  19. import os
  20. import tempfile
  21. try:
  22. import pytest
  23. except ImportError as import_error:
  24. pytest = None
  25. from tests.support.mixins import LoaderModuleMockMixin
  26. from tests.support.unit import TestCase, skipIf
  27. from tests.support.mock import (
  28. patch,
  29. MagicMock,
  30. )
  31. from salt.modules import x509
  32. import salt.utils.stringutils
  33. import salt.utils.files
  34. try:
  35. import M2Crypto # pylint: disable=unused-import
  36. HAS_M2CRYPTO = True
  37. except ImportError:
  38. HAS_M2CRYPTO = False
  39. @skipIf(not bool(pytest), False)
  40. class X509TestCase(TestCase, LoaderModuleMockMixin):
  41. def setup_loader_modules(self):
  42. return {x509: {}}
  43. @patch('salt.modules.x509.log', MagicMock())
  44. def test_private_func__parse_subject(self):
  45. '''
  46. Test private function _parse_subject(subject) it handles a missing fields
  47. :return:
  48. '''
  49. class FakeSubject(object):
  50. '''
  51. Class for faking x509'th subject.
  52. '''
  53. def __init__(self):
  54. self.nid = {'Darth Vader': 1}
  55. def __getattr__(self, item):
  56. if item != 'nid':
  57. raise TypeError('A star wars satellite accidentally blew up the WAN.')
  58. subj = FakeSubject()
  59. x509._parse_subject(subj)
  60. assert x509.log.trace.call_args[0][0] == "Missing attribute '%s'. Error: %s"
  61. assert x509.log.trace.call_args[0][1] == list(subj.nid.keys())[0]
  62. assert isinstance(x509.log.trace.call_args[0][2], TypeError)
  63. @skipIf(not HAS_M2CRYPTO, 'Skipping, M2Crypto is unavailble')
  64. def test_get_pem_entry(self):
  65. '''
  66. Test private function _parse_subject(subject) it handles a missing fields
  67. :return:
  68. '''
  69. ca_key = b'''-----BEGIN RSA PRIVATE KEY-----
  70. MIICWwIBAAKBgQCjdjbgL4kQ8Lu73xeRRM1q3C3K3ptfCLpyfw38LRnymxaoJ6ls
  71. pNSx2dU1uJ89YKFlYLo1QcEk4rJ2fdIjarV0kuNCY3rC8jYUp9BpAU5Z6p9HKeT1
  72. 2rTPH81JyjbQDR5PyfCyzYOQtpwpB4zIUUK/Go7tTm409xGKbbUFugJNgQIDAQAB
  73. AoGAF24we34U1ZrMLifSRv5nu3OIFNZHyx2DLDpOFOGaII5edwgIXwxZeIzS5Ppr
  74. yO568/8jcdLVDqZ4EkgCwRTgoXRq3a1GLHGFmBdDNvWjSTTMLoozuM0t2zjRmIsH
  75. hUd7tnai9Lf1Bp5HlBEhBU2gZWk+SXqLvxXe74/+BDAj7gECQQDRw1OPsrgTvs3R
  76. 3MNwX6W8+iBYMTGjn6f/6rvEzUs/k6rwJluV7n8ISNUIAxoPy5g5vEYK6Ln/Ttc7
  77. u0K1KNlRAkEAx34qcxjuswavL3biNGE+8LpDJnJx1jaNWoH+ObuzYCCVMusdT2gy
  78. kKuq9ytTDgXd2qwZpIDNmscvReFy10glMQJAXebMz3U4Bk7SIHJtYy7OKQzn0dMj
  79. 35WnRV81c2Jbnzhhu2PQeAvt/i1sgEuzLQL9QEtSJ6wLJ4mJvImV0TdaIQJAAYyk
  80. TcKK0A8kOy0kMp3yvDHmJZ1L7wr7bBGIZPBlQ0Ddh8i1sJExm1gJ+uN2QKyg/XrK
  81. tDFf52zWnCdVGgDwcQJALW/WcbSEK+JVV6KDJYpwCzWpKIKpBI0F6fdCr1G7Xcwj
  82. c9bcgp7D7xD+TxWWNj4CSXEccJgGr91StV+gFg4ARQ==
  83. -----END RSA PRIVATE KEY-----
  84. '''
  85. ret = x509.get_pem_entry(ca_key)
  86. self.assertEqual(ret, ca_key)
  87. @skipIf(not HAS_M2CRYPTO, 'Skipping, M2Crypto is unavailble')
  88. def test_get_private_key_size(self):
  89. '''
  90. Test private function _parse_subject(subject) it handles a missing fields
  91. :return:
  92. '''
  93. ca_key = '''
  94. -----BEGIN RSA PRIVATE KEY-----
  95. MIICWwIBAAKBgQCjdjbgL4kQ8Lu73xeRRM1q3C3K3ptfCLpyfw38LRnymxaoJ6ls
  96. pNSx2dU1uJ89YKFlYLo1QcEk4rJ2fdIjarV0kuNCY3rC8jYUp9BpAU5Z6p9HKeT1
  97. 2rTPH81JyjbQDR5PyfCyzYOQtpwpB4zIUUK/Go7tTm409xGKbbUFugJNgQIDAQAB
  98. AoGAF24we34U1ZrMLifSRv5nu3OIFNZHyx2DLDpOFOGaII5edwgIXwxZeIzS5Ppr
  99. yO568/8jcdLVDqZ4EkgCwRTgoXRq3a1GLHGFmBdDNvWjSTTMLoozuM0t2zjRmIsH
  100. hUd7tnai9Lf1Bp5HlBEhBU2gZWk+SXqLvxXe74/+BDAj7gECQQDRw1OPsrgTvs3R
  101. 3MNwX6W8+iBYMTGjn6f/6rvEzUs/k6rwJluV7n8ISNUIAxoPy5g5vEYK6Ln/Ttc7
  102. u0K1KNlRAkEAx34qcxjuswavL3biNGE+8LpDJnJx1jaNWoH+ObuzYCCVMusdT2gy
  103. kKuq9ytTDgXd2qwZpIDNmscvReFy10glMQJAXebMz3U4Bk7SIHJtYy7OKQzn0dMj
  104. 35WnRV81c2Jbnzhhu2PQeAvt/i1sgEuzLQL9QEtSJ6wLJ4mJvImV0TdaIQJAAYyk
  105. TcKK0A8kOy0kMp3yvDHmJZ1L7wr7bBGIZPBlQ0Ddh8i1sJExm1gJ+uN2QKyg/XrK
  106. tDFf52zWnCdVGgDwcQJALW/WcbSEK+JVV6KDJYpwCzWpKIKpBI0F6fdCr1G7Xcwj
  107. c9bcgp7D7xD+TxWWNj4CSXEccJgGr91StV+gFg4ARQ==
  108. -----END RSA PRIVATE KEY-----
  109. '''
  110. ret = x509.get_private_key_size(ca_key)
  111. self.assertEqual(ret, 1024)
  112. @skipIf(not HAS_M2CRYPTO, 'Skipping, M2Crypto is unavailble')
  113. def test_create_key(self):
  114. '''
  115. Test that x509.create_key returns a private key
  116. :return:
  117. '''
  118. ret = x509.create_private_key(text=True,
  119. passphrase='super_secret_passphrase')
  120. self.assertIn('BEGIN RSA PRIVATE KEY', ret)
  121. @skipIf(not HAS_M2CRYPTO, 'Skipping, M2Crypto is unavailble')
  122. def test_create_certificate(self):
  123. '''
  124. Test private function _parse_subject(subject) it handles a missing fields
  125. :return:
  126. '''
  127. ca_key = '''
  128. -----BEGIN RSA PRIVATE KEY-----
  129. MIICWwIBAAKBgQCjdjbgL4kQ8Lu73xeRRM1q3C3K3ptfCLpyfw38LRnymxaoJ6ls
  130. pNSx2dU1uJ89YKFlYLo1QcEk4rJ2fdIjarV0kuNCY3rC8jYUp9BpAU5Z6p9HKeT1
  131. 2rTPH81JyjbQDR5PyfCyzYOQtpwpB4zIUUK/Go7tTm409xGKbbUFugJNgQIDAQAB
  132. AoGAF24we34U1ZrMLifSRv5nu3OIFNZHyx2DLDpOFOGaII5edwgIXwxZeIzS5Ppr
  133. yO568/8jcdLVDqZ4EkgCwRTgoXRq3a1GLHGFmBdDNvWjSTTMLoozuM0t2zjRmIsH
  134. hUd7tnai9Lf1Bp5HlBEhBU2gZWk+SXqLvxXe74/+BDAj7gECQQDRw1OPsrgTvs3R
  135. 3MNwX6W8+iBYMTGjn6f/6rvEzUs/k6rwJluV7n8ISNUIAxoPy5g5vEYK6Ln/Ttc7
  136. u0K1KNlRAkEAx34qcxjuswavL3biNGE+8LpDJnJx1jaNWoH+ObuzYCCVMusdT2gy
  137. kKuq9ytTDgXd2qwZpIDNmscvReFy10glMQJAXebMz3U4Bk7SIHJtYy7OKQzn0dMj
  138. 35WnRV81c2Jbnzhhu2PQeAvt/i1sgEuzLQL9QEtSJ6wLJ4mJvImV0TdaIQJAAYyk
  139. TcKK0A8kOy0kMp3yvDHmJZ1L7wr7bBGIZPBlQ0Ddh8i1sJExm1gJ+uN2QKyg/XrK
  140. tDFf52zWnCdVGgDwcQJALW/WcbSEK+JVV6KDJYpwCzWpKIKpBI0F6fdCr1G7Xcwj
  141. c9bcgp7D7xD+TxWWNj4CSXEccJgGr91StV+gFg4ARQ==
  142. -----END RSA PRIVATE KEY-----
  143. '''
  144. ret = x509.create_certificate(text=True,
  145. signing_private_key=ca_key,
  146. CN='Redacted Root CA',
  147. O='Redacted',
  148. C='BE',
  149. ST='Antwerp',
  150. L='Local Town',
  151. Email='certadm@example.org',
  152. basicConstraints="critical CA:true",
  153. keyUsage="critical cRLSign, keyCertSign",
  154. subjectKeyIdentifier='hash',
  155. authorityKeyIdentifier='keyid,issuer:always',
  156. days_valid=3650,
  157. days_remaining=0)
  158. self.assertIn('BEGIN CERTIFICATE', ret)
  159. @skipIf(not HAS_M2CRYPTO, 'Skipping, M2Crypto is unavailble')
  160. def test_create_crl(self):
  161. ca_key = '''
  162. -----BEGIN RSA PRIVATE KEY-----
  163. MIICWwIBAAKBgQCjdjbgL4kQ8Lu73xeRRM1q3C3K3ptfCLpyfw38LRnymxaoJ6ls
  164. pNSx2dU1uJ89YKFlYLo1QcEk4rJ2fdIjarV0kuNCY3rC8jYUp9BpAU5Z6p9HKeT1
  165. 2rTPH81JyjbQDR5PyfCyzYOQtpwpB4zIUUK/Go7tTm409xGKbbUFugJNgQIDAQAB
  166. AoGAF24we34U1ZrMLifSRv5nu3OIFNZHyx2DLDpOFOGaII5edwgIXwxZeIzS5Ppr
  167. yO568/8jcdLVDqZ4EkgCwRTgoXRq3a1GLHGFmBdDNvWjSTTMLoozuM0t2zjRmIsH
  168. hUd7tnai9Lf1Bp5HlBEhBU2gZWk+SXqLvxXe74/+BDAj7gECQQDRw1OPsrgTvs3R
  169. 3MNwX6W8+iBYMTGjn6f/6rvEzUs/k6rwJluV7n8ISNUIAxoPy5g5vEYK6Ln/Ttc7
  170. u0K1KNlRAkEAx34qcxjuswavL3biNGE+8LpDJnJx1jaNWoH+ObuzYCCVMusdT2gy
  171. kKuq9ytTDgXd2qwZpIDNmscvReFy10glMQJAXebMz3U4Bk7SIHJtYy7OKQzn0dMj
  172. 35WnRV81c2Jbnzhhu2PQeAvt/i1sgEuzLQL9QEtSJ6wLJ4mJvImV0TdaIQJAAYyk
  173. TcKK0A8kOy0kMp3yvDHmJZ1L7wr7bBGIZPBlQ0Ddh8i1sJExm1gJ+uN2QKyg/XrK
  174. tDFf52zWnCdVGgDwcQJALW/WcbSEK+JVV6KDJYpwCzWpKIKpBI0F6fdCr1G7Xcwj
  175. c9bcgp7D7xD+TxWWNj4CSXEccJgGr91StV+gFg4ARQ==
  176. -----END RSA PRIVATE KEY-----
  177. '''
  178. ca_cert = x509.create_certificate(text=True,
  179. signing_private_key=ca_key,
  180. CN='Redacted Root CA',
  181. O='Redacted',
  182. C='BE',
  183. ST='Antwerp',
  184. L='Local Town',
  185. Email='certadm@example.org',
  186. basicConstraints="critical CA:true",
  187. keyUsage="critical cRLSign, keyCertSign",
  188. subjectKeyIdentifier='hash',
  189. authorityKeyIdentifier='keyid,issuer:always',
  190. days_valid=3650,
  191. days_remaining=0)
  192. with tempfile.NamedTemporaryFile('w+', delete=False) as ca_key_file:
  193. ca_key_file.write(ca_key)
  194. ca_key_file.flush()
  195. with tempfile.NamedTemporaryFile('w+', delete=False) as ca_cert_file:
  196. ca_cert_file.write(salt.utils.stringutils.to_str(ca_cert))
  197. ca_cert_file.flush()
  198. with tempfile.NamedTemporaryFile('w+', delete=False) as ca_crl_file:
  199. x509.create_crl(path=ca_crl_file.name,
  200. text=False,
  201. signing_private_key=ca_key_file.name,
  202. signing_private_key_passphrase=None,
  203. signing_cert=ca_cert_file.name,
  204. revoked=None,
  205. include_expired=False,
  206. days_valid=100,
  207. digest='sha512')
  208. with salt.utils.files.fopen(ca_crl_file.name, 'r') as crl_file:
  209. crl = crl_file.read()
  210. os.remove(ca_key_file.name)
  211. os.remove(ca_cert_file.name)
  212. os.remove(ca_crl_file.name)
  213. # Ensure that a CRL was actually created
  214. self.assertIn('BEGIN X509 CRL', crl)
  215. @skipIf(not HAS_M2CRYPTO, 'Skipping, M2Crypto is unavailble')
  216. def test_revoke_certificate_with_crl(self):
  217. ca_key = '''
  218. -----BEGIN RSA PRIVATE KEY-----
  219. MIICWwIBAAKBgQCjdjbgL4kQ8Lu73xeRRM1q3C3K3ptfCLpyfw38LRnymxaoJ6ls
  220. pNSx2dU1uJ89YKFlYLo1QcEk4rJ2fdIjarV0kuNCY3rC8jYUp9BpAU5Z6p9HKeT1
  221. 2rTPH81JyjbQDR5PyfCyzYOQtpwpB4zIUUK/Go7tTm409xGKbbUFugJNgQIDAQAB
  222. AoGAF24we34U1ZrMLifSRv5nu3OIFNZHyx2DLDpOFOGaII5edwgIXwxZeIzS5Ppr
  223. yO568/8jcdLVDqZ4EkgCwRTgoXRq3a1GLHGFmBdDNvWjSTTMLoozuM0t2zjRmIsH
  224. hUd7tnai9Lf1Bp5HlBEhBU2gZWk+SXqLvxXe74/+BDAj7gECQQDRw1OPsrgTvs3R
  225. 3MNwX6W8+iBYMTGjn6f/6rvEzUs/k6rwJluV7n8ISNUIAxoPy5g5vEYK6Ln/Ttc7
  226. u0K1KNlRAkEAx34qcxjuswavL3biNGE+8LpDJnJx1jaNWoH+ObuzYCCVMusdT2gy
  227. kKuq9ytTDgXd2qwZpIDNmscvReFy10glMQJAXebMz3U4Bk7SIHJtYy7OKQzn0dMj
  228. 35WnRV81c2Jbnzhhu2PQeAvt/i1sgEuzLQL9QEtSJ6wLJ4mJvImV0TdaIQJAAYyk
  229. TcKK0A8kOy0kMp3yvDHmJZ1L7wr7bBGIZPBlQ0Ddh8i1sJExm1gJ+uN2QKyg/XrK
  230. tDFf52zWnCdVGgDwcQJALW/WcbSEK+JVV6KDJYpwCzWpKIKpBI0F6fdCr1G7Xcwj
  231. c9bcgp7D7xD+TxWWNj4CSXEccJgGr91StV+gFg4ARQ==
  232. -----END RSA PRIVATE KEY-----
  233. '''
  234. # Issue the CA certificate (self-signed)
  235. ca_cert = x509.create_certificate(text=True,
  236. signing_private_key=ca_key,
  237. CN='Redacted Root CA',
  238. O='Redacted',
  239. C='BE',
  240. ST='Antwerp',
  241. L='Local Town',
  242. Email='certadm@example.org',
  243. basicConstraints="critical CA:true",
  244. keyUsage="critical cRLSign, keyCertSign",
  245. subjectKeyIdentifier='hash',
  246. authorityKeyIdentifier='keyid,issuer:always',
  247. days_valid=3650,
  248. days_remaining=0)
  249. # Sign a client certificate with the CA
  250. server_cert = x509.create_certificate(text=True,
  251. signing_private_key=ca_key,
  252. signing_cert=ca_cert,
  253. CN='Redacted Normal Certificate',
  254. O='Redacted',
  255. C='BE',
  256. ST='Antwerp',
  257. L='Local Town',
  258. Email='certadm@example.org',
  259. basicConstraints="critical CA:false",
  260. keyUsage="critical keyEncipherment",
  261. subjectKeyIdentifier='hash',
  262. authorityKeyIdentifier='keyid,issuer:always',
  263. days_valid=365,
  264. days_remaining=0)
  265. # Save CA cert + key and server cert to disk as PEM files
  266. with tempfile.NamedTemporaryFile('w+', delete=False) as ca_key_file:
  267. ca_key_file.write(ca_key)
  268. ca_key_file.flush()
  269. with tempfile.NamedTemporaryFile('w+', delete=False) as ca_cert_file:
  270. ca_cert_file.write(salt.utils.stringutils.to_str(ca_cert))
  271. ca_cert_file.flush()
  272. with tempfile.NamedTemporaryFile('w+', delete=False) as server_cert_file:
  273. server_cert_file.write(salt.utils.stringutils.to_str(server_cert))
  274. server_cert_file.flush()
  275. # Revoke server CRL
  276. revoked = [
  277. {
  278. 'certificate': server_cert_file.name,
  279. 'revocation_date': '2015-03-01 00:00:00'
  280. }
  281. ]
  282. with tempfile.NamedTemporaryFile('w+', delete=False) as ca_crl_file:
  283. x509.create_crl(path=ca_crl_file.name,
  284. text=False,
  285. signing_private_key=ca_key_file.name,
  286. signing_private_key_passphrase=None,
  287. signing_cert=ca_cert_file.name,
  288. revoked=revoked,
  289. include_expired=False,
  290. days_valid=100,
  291. digest='sha512')
  292. # Retrieve serial number from server certificate
  293. server_cert_details = x509.read_certificate(server_cert_file.name)
  294. serial_number = server_cert_details['Serial Number'].replace(':', '')
  295. serial_number = salt.utils.stringutils.to_str(serial_number)
  296. # Retrieve CRL as text
  297. crl = M2Crypto.X509.load_crl(ca_crl_file.name).as_text()
  298. # Cleanup
  299. os.remove(ca_key_file.name)
  300. os.remove(ca_cert_file.name)
  301. os.remove(ca_crl_file.name)
  302. os.remove(server_cert_file.name)
  303. # Ensure that the correct server cert serial is amongst
  304. # the revoked certificates
  305. self.assertIn(serial_number, crl)