test_venafiapi.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. # -*- coding: utf-8 -*-
  2. """
Tests for the venafi runner, executed through the salt-run command
  4. """
  5. from __future__ import absolute_import
  6. import functools
  7. import random
  8. import string
  9. import tempfile
  10. from cryptography import x509
  11. from cryptography.hazmat.backends import default_backend
  12. from cryptography.hazmat.primitives import serialization
  13. from cryptography.x509.oid import NameOID
  14. from salt.ext.six import text_type
  15. from salt.ext.six.moves import range
  16. from tests.support.case import ShellCase
  17. from tests.support.helpers import slowTest
  18. def _random_name(prefix=""):
  19. ret = prefix
  20. for _ in range(8):
  21. ret += random.choice(string.ascii_lowercase)
  22. return ret
  23. def with_random_name(func):
  24. """
  25. generate a randomized name for a container
  26. """
  27. @functools.wraps(func)
  28. def wrapper(self, *args, **kwargs):
  29. name = _random_name(prefix="salt_")
  30. return func(self, _random_name(prefix="salt-test-"), *args, **kwargs)
  31. return wrapper
  32. class VenafiTest(ShellCase):
  33. """
  34. Test the venafi runner
  35. """
  36. @with_random_name
  37. @slowTest
  38. def test_request(self, name):
  39. cn = "{0}.example.com".format(name)
  40. # Provide python27 compatibility
  41. if not isinstance(cn, text_type):
  42. cn = cn.decode()
  43. ret = self.run_run_plus(
  44. fun="venafi.request",
  45. minion_id=cn,
  46. dns_name=cn,
  47. key_password="secretPassword",
  48. zone="fake",
  49. )
  50. cert_output = ret["return"][0]
  51. assert cert_output is not None, "venafi_certificate not found in `output_value`"
  52. cert = x509.load_pem_x509_certificate(cert_output.encode(), default_backend())
  53. assert isinstance(cert, x509.Certificate)
  54. assert cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME) == [
  55. x509.NameAttribute(NameOID.COMMON_NAME, cn)
  56. ]
  57. pkey_output = ret["return"][1]
  58. assert pkey_output is not None, "venafi_private key not found in output_value"
  59. pkey = serialization.load_pem_private_key(
  60. pkey_output.encode(), password=b"secretPassword", backend=default_backend()
  61. )
  62. pkey_public_key_pem = pkey.public_key().public_bytes(
  63. encoding=serialization.Encoding.PEM,
  64. format=serialization.PublicFormat.SubjectPublicKeyInfo,
  65. )
  66. cert_public_key_pem = cert.public_key().public_bytes(
  67. encoding=serialization.Encoding.PEM,
  68. format=serialization.PublicFormat.SubjectPublicKeyInfo,
  69. )
  70. assert pkey_public_key_pem == cert_public_key_pem
  71. @with_random_name
  72. @slowTest
  73. def test_sign(self, name):
  74. csr_pem = """-----BEGIN CERTIFICATE REQUEST-----
  75. MIIFbDCCA1QCAQAwgbQxCzAJBgNVBAYTAlVTMQ0wCwYDVQQIDARVdGFoMRIwEAYD
  76. VQQHDAlTYWx0IExha2UxFDASBgNVBAoMC1ZlbmFmaSBJbmMuMRQwEgYDVQQLDAtJ
  77. bnRlZ3JhdGlvbjEnMCUGCSqGSIb3DQEJARYYZW1haWxAdmVuYWZpLmV4YW1wbGUu
  78. Y29tMS0wKwYDVQQDDCR0ZXN0LWNzci0zMjMxMzEzMS52ZW5hZmkuZXhhbXBsZS5j
  79. b20wggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQC4T0bdjq+mF+DABhF+
  80. XWCwOXXUWbPNWa72VVhxoelbyTS0iIeZEe64AvNGykytFdOuT/F9pdkZa+Io07R1
  81. ZMp6Ak8dp2Wjt4c5rayVZus6ZK+0ZwBRJO7if/cqhEpxy8Wz1RMfVLf2AE1u/xZS
  82. QSYY0BTRWGmPqrFJrIGbnyQfvmGVPk3cA0RfdrwYJZXtZ2/4QNrbNCoSoSmqTHzt
  83. NAtZhvT2dPU9U48Prx4b2460x+ck3xA1OdJNXV7n5u53QbxOIcjdGT0lJ62ml70G
  84. 5gvEHmdPcg+t5cw/Sm5cfDSUEDtNEXvD4oJXfP98ty6f1cYsZpcrgxRwk9RfGain
  85. hvoweXhZP3NWnU5nRdn2nOfExv+xMeQOyB/rYv98zqzK6LvwKhwI5UB1l/n9KTpg
  86. jgaNCP4x/KAsrPecbHK91oiqGSbPn4wtTYOmPkDxSzATN317u7fE20iqvVAUy/O+
  87. 7SCNNKEDPX2NP9LLz0IPK0roQxLiwd2CVyN6kEXuzs/3psptkNRMSlhyeAZdfrOE
  88. CNOp46Pam9f9HGBqzXxxoIlfzLqHHL584kgFlBm7qmivVrgp6zdLPDa+UayXEl2N
  89. O17SnGS8nkOTmfg3cez7lzX/LPLO9X/Y1xKYqx5hoGZhh754K8mzDWCVCYThWgou
  90. yBOYY8uNXiX6ldqzQUHpbxxQgwIDAQABoHIwcAYJKoZIhvcNAQkOMWMwYTBfBgNV
  91. HREEWDBWgilhbHQxLXRlc3QtY3NyLTMyMzEzMTMxLnZlbmFmaS5leGFtcGxlLmNv
  92. bYIpYWx0Mi10ZXN0LWNzci0zMjMxMzEzMS52ZW5hZmkuZXhhbXBsZS5jb20wDQYJ
  93. KoZIhvcNAQELBQADggIBAJd87BIdeh0WWoyQ4IX+ENpNqmm/sLmdfmUB/hj9NpBL
  94. qbr2UTWaSr1jadoZ+mrDxtm1Z0YJDTTIrEWxkBOW5wQ039lYZNe2tfDXSJZwJn7u
  95. 2keaXtWQ2SdduK1wOPDO9Hra6WnH7aEq5D1AyoghvPsZwTqZkNynt/A1BZW5C/ha
  96. J9/mwgWfL4qXBGBOhLwKN5GUo3erUkJIdH0TlMqI906D/c/YAuJ86SRdQtBYci6X
  97. bJ7C+OnoiV6USn1HtQE6dfOMeS8voJuixpSIvHZ/Aim6kSAN1Za1f6FQAkyqbF+o
  98. oKTJHDS1CPWikCeLdpPUcOCDIbsiISTsMZkEvIkzZ7dKBIlIugauxw3vaEpk47jN
  99. Wq09r639RbSv/Qs8D6uY66m1IpL4zHm4lTAknrjM/BqihPxc8YiN76ssajvQ4SFT
  100. DHPrDweEVe4KL1ENw8nv4wdkIFKwJTDarV5ZygbETzIhfa2JSBZFTdN+Wmd2Mh5h
  101. OTu+vuHrJF2TO8g1G48EB/KWGt+yvVUpWAanRMwldnFX80NcUlM7GzNn6IXTeE+j
  102. BttIbvAAVJPG8rVCP8u3DdOf+vgm5macj9oLoVP8RBYo/z0E3e+H50nXv3uS6JhN
  103. xlAKgaU6i03jOm5+sww5L2YVMi1eeBN+kx7o94ogpRemC/EUidvl1PUJ6+e7an9V
  104. -----END CERTIFICATE REQUEST-----
  105. """
  106. with tempfile.NamedTemporaryFile("w+") as f:
  107. f.write(csr_pem)
  108. f.flush()
  109. csr_path = f.name
  110. cn = "test-csr-32313131.venafi.example.com"
  111. # Provide python27 compatibility
  112. if not isinstance(cn, text_type):
  113. cn = cn.decode()
  114. ret = self.run_run_plus(
  115. fun="venafi.request", minion_id=cn, csr_path=csr_path, zone="fake"
  116. )
  117. cert_output = ret["return"][0]
  118. assert (
  119. cert_output is not None
  120. ), "venafi_certificate not found in `output_value`"
  121. cert = x509.load_pem_x509_certificate(
  122. cert_output.encode(), default_backend()
  123. )
  124. assert isinstance(cert, x509.Certificate)
  125. assert cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME) == [
  126. x509.NameAttribute(NameOID.COMMON_NAME, cn)
  127. ]