test_x509.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, unicode_literals
  3. import os
  4. import logging
  5. import salt.utils.files
  6. from salt.ext import six
  7. import textwrap
  8. from tests.support.helpers import with_tempfile
  9. from tests.support.paths import BASE_FILES, TMP, TMP_PILLAR_TREE
  10. from tests.support.case import ModuleCase
  11. from tests.support.unit import skipIf
  12. from tests.support.mixins import SaltReturnAssertsMixin
  13. try:
  14. import M2Crypto # pylint: disable=W0611
  15. HAS_M2CRYPTO = True
  16. except ImportError:
  17. HAS_M2CRYPTO = False
  18. log = logging.getLogger(__name__)
  19. @skipIf(not HAS_M2CRYPTO, 'Skip when no M2Crypto found')
  20. class x509Test(ModuleCase, SaltReturnAssertsMixin):
  21. @classmethod
  22. def setUpClass(cls):
  23. cert_path = os.path.join(BASE_FILES, 'x509_test.crt')
  24. with salt.utils.files.fopen(cert_path) as fp:
  25. cls.x509_cert_text = fp.read()
  26. def setUp(self):
  27. with salt.utils.files.fopen(os.path.join(TMP_PILLAR_TREE, 'signing_policies.sls'), 'w') as fp:
  28. fp.write(textwrap.dedent('''\
  29. x509_signing_policies:
  30. ca_policy:
  31. - minions: '*'
  32. - signing_private_key: {0}/pki/ca.key
  33. - signing_cert: {0}/pki/ca.crt
  34. - O: Test Company
  35. - basicConstraints: "CA:false"
  36. - keyUsage: "critical digitalSignature, keyEncipherment"
  37. - extendedKeyUsage: "critical serverAuth, clientAuth"
  38. - subjectKeyIdentifier: hash
  39. - authorityKeyIdentifier: keyid
  40. - days_valid: 730
  41. - copypath: {0}/pki
  42. '''.format(TMP)))
  43. with salt.utils.files.fopen(os.path.join(TMP_PILLAR_TREE, 'top.sls'), 'w') as fp:
  44. fp.write(textwrap.dedent('''\
  45. base:
  46. '*':
  47. - signing_policies
  48. '''))
  49. self.run_function('saltutil.refresh_pillar')
  50. def tearDown(self):
  51. os.remove(os.path.join(TMP_PILLAR_TREE, 'signing_policies.sls'))
  52. os.remove(os.path.join(TMP_PILLAR_TREE, 'top.sls'))
  53. certs_path = os.path.join(TMP, 'pki')
  54. if os.path.exists(certs_path):
  55. salt.utils.files.rm_rf(certs_path)
  56. self.run_function('saltutil.refresh_pillar')
  57. def run_function(self, *args, **kwargs):
  58. ret = super(x509Test, self).run_function(*args, **kwargs)
  59. log.debug('ret = %s', ret)
  60. return ret
  61. @with_tempfile(suffix='.pem', create=False)
  62. def test_issue_49027(self, pemfile):
  63. ret = self.run_state(
  64. 'x509.pem_managed',
  65. name=pemfile,
  66. text=self.x509_cert_text)
  67. assert isinstance(ret, dict), ret
  68. ret = ret[next(iter(ret))]
  69. assert ret.get('result') is True, ret
  70. with salt.utils.files.fopen(pemfile) as fp:
  71. result = fp.readlines()
  72. self.assertEqual(self.x509_cert_text.splitlines(True), result)
  73. @with_tempfile(suffix='.crt', create=False)
  74. @with_tempfile(suffix='.key', create=False)
  75. def test_issue_49008(self, keyfile, crtfile):
  76. ret = self.run_function(
  77. 'state.apply',
  78. ['issue-49008'],
  79. pillar={'keyfile': keyfile, 'crtfile': crtfile})
  80. assert isinstance(ret, dict), ret
  81. for state_result in six.itervalues(ret):
  82. assert state_result['result'] is True, state_result
  83. assert os.path.exists(keyfile)
  84. assert os.path.exists(crtfile)
  85. def test_cert_signing(self):
  86. ret = self.run_function('state.apply', ['test_cert'], pillar={'tmp_dir': TMP})
  87. key = 'x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed'.format(TMP)
  88. assert key in ret
  89. assert 'changes' in ret[key]
  90. assert 'Certificate' in ret[key]['changes']
  91. assert 'New' in ret[key]['changes']['Certificate']