1
0

test_aws_kms.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. # -*- coding: utf-8 -*-
  2. '''
  3. Unit tests for AWS KMS Decryption Renderer.
  4. '''
  5. # pylint: disable=protected-access
  6. # Import Python Libs
  7. from __future__ import absolute_import, print_function, unicode_literals
  8. # Import Salt Testing libs
  9. from tests.support.mixins import LoaderModuleMockMixin
  10. from tests.support.unit import skipIf, TestCase
  11. from tests.support.mock import (
  12. NO_MOCK,
  13. NO_MOCK_REASON,
  14. MagicMock,
  15. patch
  16. )
  17. # Import Salt libs
  18. import salt.exceptions
  19. import salt.renderers.aws_kms as aws_kms
  20. try:
  21. import botocore.exceptions
  22. import botocore.session
  23. import botocore.stub
  24. NO_BOTOCORE = False
  25. except ImportError:
  26. NO_BOTOCORE = True
  27. try:
  28. import cryptography.fernet as fernet
  29. NO_FERNET = False
  30. except ImportError:
  31. NO_FERNET = True
  32. PLAINTEXT_SECRET = 'Use more salt.'
  33. ENCRYPTED_DATA_KEY = 'encrypted-data-key'
  34. PLAINTEXT_DATA_KEY = b'plaintext-data-key'
  35. BASE64_DATA_KEY = b'cGxhaW50ZXh0LWRhdGEta2V5'
  36. AWS_PROFILE = 'test-profile'
  37. REGION_NAME = 'us-test-1'
  38. @skipIf(NO_MOCK, NO_MOCK_REASON)
  39. @skipIf(NO_BOTOCORE, 'Unable to import botocore libraries')
  40. class AWSKMSTestCase(TestCase, LoaderModuleMockMixin):
  41. '''
  42. unit test AWS KMS renderer
  43. '''
  44. def setup_loader_modules(self):
  45. return {aws_kms: {}}
  46. def test__cfg_data_key(self):
  47. '''
  48. _cfg_data_key returns the aws_kms:data_key from configuration.
  49. '''
  50. config = {'aws_kms': {'data_key': ENCRYPTED_DATA_KEY}}
  51. with patch.dict(aws_kms.__salt__, {'config.get': config.get}): # pylint: disable=no-member
  52. self.assertEqual(aws_kms._cfg_data_key(), ENCRYPTED_DATA_KEY,
  53. '_cfg_data_key did not return the data key configured in __salt__.')
  54. with patch.dict(aws_kms.__opts__, config): # pylint: disable=no-member
  55. self.assertEqual(aws_kms._cfg_data_key(), ENCRYPTED_DATA_KEY,
  56. '_cfg_data_key did not return the data key configured in __opts__.')
  57. def test__cfg_data_key_no_key(self):
  58. '''
  59. When no aws_kms:data_key is configured,
  60. calling _cfg_data_key should raise a SaltConfigurationError
  61. '''
  62. self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._cfg_data_key)
  63. def test__session_profile(self): # pylint: disable=no-self-use
  64. '''
  65. _session instantiates boto3.Session with the configured profile_name
  66. '''
  67. with patch.object(aws_kms, '_cfg', lambda k: AWS_PROFILE):
  68. with patch('boto3.Session') as session:
  69. aws_kms._session()
  70. session.assert_called_with(profile_name=AWS_PROFILE)
  71. def test__session_noprofile(self):
  72. '''
  73. _session raises a SaltConfigurationError
  74. when boto3 raises botocore.exceptions.ProfileNotFound.
  75. '''
  76. with patch('boto3.Session') as session:
  77. session.side_effect = botocore.exceptions.ProfileNotFound(profile=AWS_PROFILE)
  78. self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._session)
  79. def test__session_noregion(self):
  80. '''
  81. _session raises a SaltConfigurationError
  82. when boto3 raises botocore.exceptions.NoRegionError
  83. '''
  84. with patch('boto3.Session') as session:
  85. session.side_effect = botocore.exceptions.NoRegionError
  86. self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._session)
  87. def test__kms(self): # pylint: disable=no-self-use
  88. '''
  89. _kms calls boto3.Session.client with 'kms' as its only argument.
  90. '''
  91. with patch('boto3.Session.client') as client:
  92. aws_kms._kms()
  93. client.assert_called_with('kms')
  94. def test__kms_noregion(self):
  95. '''
  96. _kms raises a SaltConfigurationError
  97. when boto3 raises a NoRegionError.
  98. '''
  99. with patch('boto3.Session') as session:
  100. session.side_effect = botocore.exceptions.NoRegionError
  101. self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._kms)
  102. def test__api_decrypt(self): # pylint: disable=no-self-use
  103. '''
  104. _api_decrypt_response calls kms.decrypt with the
  105. configured data key as the CiphertextBlob kwarg.
  106. '''
  107. kms_client = MagicMock()
  108. with patch.object(aws_kms, '_kms') as kms_getter:
  109. kms_getter.return_value = kms_client
  110. with patch.object(aws_kms, '_cfg_data_key', lambda: ENCRYPTED_DATA_KEY):
  111. aws_kms._api_decrypt()
  112. kms_client.decrypt.assert_called_with(CiphertextBlob=ENCRYPTED_DATA_KEY) # pylint: disable=no-member
  113. def test__api_decrypt_badkey(self):
  114. '''
  115. _api_decrypt_response raises SaltConfigurationError
  116. when kms.decrypt raises a botocore.exceptions.ClientError
  117. with an error_code of 'InvalidCiphertextException'.
  118. '''
  119. kms_client = MagicMock()
  120. kms_client.decrypt.side_effect = botocore.exceptions.ClientError( # pylint: disable=no-member
  121. error_response={'Error': {'Code': 'InvalidCiphertextException'}},
  122. operation_name='Decrypt',
  123. )
  124. with patch.object(aws_kms, '_kms') as kms_getter:
  125. kms_getter.return_value = kms_client
  126. with patch.object(aws_kms, '_cfg_data_key', lambda: ENCRYPTED_DATA_KEY):
  127. self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._api_decrypt)
  128. def test__plaintext_data_key(self):
  129. '''
  130. _plaintext_data_key returns the 'Plaintext' value from the response.
  131. It caches the response and only calls _api_decrypt exactly once.
  132. '''
  133. with patch.object(aws_kms, '_api_decrypt', return_value={'KeyId': 'key-id', 'Plaintext': PLAINTEXT_DATA_KEY}) as api_decrypt:
  134. self.assertEqual(aws_kms._plaintext_data_key(), PLAINTEXT_DATA_KEY)
  135. aws_kms._plaintext_data_key()
  136. api_decrypt.assert_called_once()
  137. def test__base64_plaintext_data_key(self):
  138. '''
  139. _base64_plaintext_data_key returns the urlsafe base64 encoded plain text data key.
  140. '''
  141. with patch.object(aws_kms, '_plaintext_data_key', return_value=PLAINTEXT_DATA_KEY):
  142. self.assertEqual(aws_kms._base64_plaintext_data_key(), BASE64_DATA_KEY)
  143. @skipIf(NO_FERNET, 'Failed to import cryptography.fernet')
  144. def test__decrypt_ciphertext(self):
  145. '''
  146. test _decrypt_ciphertext
  147. '''
  148. test_key = fernet.Fernet.generate_key()
  149. crypted = fernet.Fernet(test_key).encrypt(PLAINTEXT_SECRET.encode())
  150. with patch.object(aws_kms, '_base64_plaintext_data_key', return_value=test_key):
  151. self.assertEqual(aws_kms._decrypt_ciphertext(crypted), PLAINTEXT_SECRET)
  152. @skipIf(NO_FERNET, 'Failed to import cryptography.fernet')
  153. def test__decrypt_object(self):
  154. '''
  155. Test _decrypt_object
  156. '''
  157. test_key = fernet.Fernet.generate_key()
  158. crypted = fernet.Fernet(test_key).encrypt(PLAINTEXT_SECRET.encode())
  159. secret_map = {'secret': PLAINTEXT_SECRET}
  160. crypted_map = {'secret': crypted}
  161. secret_list = [PLAINTEXT_SECRET]
  162. crypted_list = [crypted]
  163. with patch.object(aws_kms, '_base64_plaintext_data_key', return_value=test_key):
  164. self.assertEqual(aws_kms._decrypt_object(PLAINTEXT_SECRET), PLAINTEXT_SECRET)
  165. self.assertEqual(aws_kms._decrypt_object(crypted), PLAINTEXT_SECRET)
  166. self.assertEqual(aws_kms._decrypt_object(crypted_map), secret_map)
  167. self.assertEqual(aws_kms._decrypt_object(crypted_list), secret_list)
  168. self.assertEqual(aws_kms._decrypt_object(None), None)
  169. @skipIf(NO_FERNET, 'Failed to import cryptography.fernet')
  170. def test_render(self):
  171. '''
  172. Test that we can decrypt some data.
  173. '''
  174. test_key = fernet.Fernet.generate_key()
  175. crypted = fernet.Fernet(test_key).encrypt(PLAINTEXT_SECRET.encode())
  176. with patch.object(aws_kms, '_base64_plaintext_data_key', return_value=test_key):
  177. self.assertEqual(aws_kms.render(crypted), PLAINTEXT_SECRET)