test_aws_kms.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. # -*- coding: utf-8 -*-
  2. """
  3. Unit tests for AWS KMS Decryption Renderer.
  4. """
  5. # pylint: disable=protected-access
  6. # Import Python Libs
  7. from __future__ import absolute_import, print_function, unicode_literals
  8. # Import Salt libs
  9. import salt.exceptions
  10. import salt.renderers.aws_kms as aws_kms
  11. # Import Salt Testing libs
  12. from tests.support.mixins import LoaderModuleMockMixin
  13. from tests.support.mock import MagicMock, patch
  14. from tests.support.unit import TestCase, skipIf
  15. try:
  16. import botocore.exceptions
  17. import botocore.session
  18. import botocore.stub
  19. NO_BOTOCORE = False
  20. except ImportError:
  21. NO_BOTOCORE = True
  22. try:
  23. import cryptography.fernet as fernet
  24. NO_FERNET = False
  25. except ImportError:
  26. NO_FERNET = True
  27. PLAINTEXT_SECRET = "Use more salt."
  28. ENCRYPTED_DATA_KEY = "encrypted-data-key"
  29. PLAINTEXT_DATA_KEY = b"plaintext-data-key"
  30. BASE64_DATA_KEY = b"cGxhaW50ZXh0LWRhdGEta2V5"
  31. AWS_PROFILE = "test-profile"
  32. REGION_NAME = "us-test-1"
  33. @skipIf(NO_BOTOCORE, "Unable to import botocore libraries")
  34. class AWSKMSTestCase(TestCase, LoaderModuleMockMixin):
  35. """
  36. unit test AWS KMS renderer
  37. """
  38. def setup_loader_modules(self):
  39. return {aws_kms: {}}
  40. def test__cfg_data_key(self):
  41. """
  42. _cfg_data_key returns the aws_kms:data_key from configuration.
  43. """
  44. config = {"aws_kms": {"data_key": ENCRYPTED_DATA_KEY}}
  45. with patch.dict(
  46. aws_kms.__salt__, {"config.get": config.get}
  47. ): # pylint: disable=no-member
  48. self.assertEqual(
  49. aws_kms._cfg_data_key(),
  50. ENCRYPTED_DATA_KEY,
  51. "_cfg_data_key did not return the data key configured in __salt__.",
  52. )
  53. with patch.dict(aws_kms.__opts__, config): # pylint: disable=no-member
  54. self.assertEqual(
  55. aws_kms._cfg_data_key(),
  56. ENCRYPTED_DATA_KEY,
  57. "_cfg_data_key did not return the data key configured in __opts__.",
  58. )
  59. def test__cfg_data_key_no_key(self):
  60. """
  61. When no aws_kms:data_key is configured,
  62. calling _cfg_data_key should raise a SaltConfigurationError
  63. """
  64. self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._cfg_data_key)
  65. def test__session_profile(self): # pylint: disable=no-self-use
  66. """
  67. _session instantiates boto3.Session with the configured profile_name
  68. """
  69. with patch.object(aws_kms, "_cfg", lambda k: AWS_PROFILE):
  70. with patch("boto3.Session") as session:
  71. aws_kms._session()
  72. session.assert_called_with(profile_name=AWS_PROFILE)
  73. def test__session_noprofile(self):
  74. """
  75. _session raises a SaltConfigurationError
  76. when boto3 raises botocore.exceptions.ProfileNotFound.
  77. """
  78. with patch("boto3.Session") as session:
  79. session.side_effect = botocore.exceptions.ProfileNotFound(
  80. profile=AWS_PROFILE
  81. )
  82. self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._session)
  83. def test__session_noregion(self):
  84. """
  85. _session raises a SaltConfigurationError
  86. when boto3 raises botocore.exceptions.NoRegionError
  87. """
  88. with patch("boto3.Session") as session:
  89. session.side_effect = botocore.exceptions.NoRegionError
  90. self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._session)
  91. def test__kms(self): # pylint: disable=no-self-use
  92. """
  93. _kms calls boto3.Session.client with 'kms' as its only argument.
  94. """
  95. with patch("boto3.Session.client") as client:
  96. aws_kms._kms()
  97. client.assert_called_with("kms")
  98. def test__kms_noregion(self):
  99. """
  100. _kms raises a SaltConfigurationError
  101. when boto3 raises a NoRegionError.
  102. """
  103. with patch("boto3.Session") as session:
  104. session.side_effect = botocore.exceptions.NoRegionError
  105. self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._kms)
  106. def test__api_decrypt(self): # pylint: disable=no-self-use
  107. """
  108. _api_decrypt_response calls kms.decrypt with the
  109. configured data key as the CiphertextBlob kwarg.
  110. """
  111. kms_client = MagicMock()
  112. with patch.object(aws_kms, "_kms") as kms_getter:
  113. kms_getter.return_value = kms_client
  114. with patch.object(aws_kms, "_cfg_data_key", lambda: ENCRYPTED_DATA_KEY):
  115. aws_kms._api_decrypt()
  116. kms_client.decrypt.assert_called_with(
  117. CiphertextBlob=ENCRYPTED_DATA_KEY
  118. ) # pylint: disable=no-member
  119. def test__api_decrypt_badkey(self):
  120. """
  121. _api_decrypt_response raises SaltConfigurationError
  122. when kms.decrypt raises a botocore.exceptions.ClientError
  123. with an error_code of 'InvalidCiphertextException'.
  124. """
  125. kms_client = MagicMock()
  126. kms_client.decrypt.side_effect = botocore.exceptions.ClientError( # pylint: disable=no-member
  127. error_response={"Error": {"Code": "InvalidCiphertextException"}},
  128. operation_name="Decrypt",
  129. )
  130. with patch.object(aws_kms, "_kms") as kms_getter:
  131. kms_getter.return_value = kms_client
  132. with patch.object(aws_kms, "_cfg_data_key", lambda: ENCRYPTED_DATA_KEY):
  133. self.assertRaises(
  134. salt.exceptions.SaltConfigurationError, aws_kms._api_decrypt
  135. )
  136. def test__plaintext_data_key(self):
  137. """
  138. _plaintext_data_key returns the 'Plaintext' value from the response.
  139. It caches the response and only calls _api_decrypt exactly once.
  140. """
  141. with patch.object(
  142. aws_kms,
  143. "_api_decrypt",
  144. return_value={"KeyId": "key-id", "Plaintext": PLAINTEXT_DATA_KEY},
  145. ) as api_decrypt:
  146. self.assertEqual(aws_kms._plaintext_data_key(), PLAINTEXT_DATA_KEY)
  147. aws_kms._plaintext_data_key()
  148. api_decrypt.assert_called_once()
  149. def test__base64_plaintext_data_key(self):
  150. """
  151. _base64_plaintext_data_key returns the urlsafe base64 encoded plain text data key.
  152. """
  153. with patch.object(
  154. aws_kms, "_plaintext_data_key", return_value=PLAINTEXT_DATA_KEY
  155. ):
  156. self.assertEqual(aws_kms._base64_plaintext_data_key(), BASE64_DATA_KEY)
  157. @skipIf(NO_FERNET, "Failed to import cryptography.fernet")
  158. def test__decrypt_ciphertext(self):
  159. """
  160. test _decrypt_ciphertext
  161. """
  162. test_key = fernet.Fernet.generate_key()
  163. crypted = fernet.Fernet(test_key).encrypt(PLAINTEXT_SECRET.encode())
  164. with patch.object(aws_kms, "_base64_plaintext_data_key", return_value=test_key):
  165. self.assertEqual(aws_kms._decrypt_ciphertext(crypted), PLAINTEXT_SECRET)
  166. @skipIf(NO_FERNET, "Failed to import cryptography.fernet")
  167. def test__decrypt_object(self):
  168. """
  169. Test _decrypt_object
  170. """
  171. test_key = fernet.Fernet.generate_key()
  172. crypted = fernet.Fernet(test_key).encrypt(PLAINTEXT_SECRET.encode())
  173. secret_map = {"secret": PLAINTEXT_SECRET}
  174. crypted_map = {"secret": crypted}
  175. secret_list = [PLAINTEXT_SECRET]
  176. crypted_list = [crypted]
  177. with patch.object(aws_kms, "_base64_plaintext_data_key", return_value=test_key):
  178. self.assertEqual(
  179. aws_kms._decrypt_object(PLAINTEXT_SECRET), PLAINTEXT_SECRET
  180. )
  181. self.assertEqual(aws_kms._decrypt_object(crypted), PLAINTEXT_SECRET)
  182. self.assertEqual(aws_kms._decrypt_object(crypted_map), secret_map)
  183. self.assertEqual(aws_kms._decrypt_object(crypted_list), secret_list)
  184. self.assertEqual(aws_kms._decrypt_object(None), None)
  185. @skipIf(NO_FERNET, "Failed to import cryptography.fernet")
  186. def test_render(self):
  187. """
  188. Test that we can decrypt some data.
  189. """
  190. test_key = fernet.Fernet.generate_key()
  191. crypted = fernet.Fernet(test_key).encrypt(PLAINTEXT_SECRET.encode())
  192. with patch.object(aws_kms, "_base64_plaintext_data_key", return_value=test_key):
  193. self.assertEqual(aws_kms.render(crypted), PLAINTEXT_SECRET)