# test_aws_kms.py (7.9 KB)
  1. # -*- coding: utf-8 -*-
  2. '''
  3. Unit tests for AWS KMS Decryption Renderer.
  4. '''
  5. # pylint: disable=protected-access
  6. # Import Python Libs
  7. from __future__ import absolute_import, print_function, unicode_literals
  8. # Import Salt Testing libs
  9. from tests.support.mixins import LoaderModuleMockMixin
  10. from tests.support.unit import skipIf, TestCase
  11. from tests.support.mock import (
  12. MagicMock,
  13. patch
  14. )
  15. # Import Salt libs
  16. import salt.exceptions
  17. import salt.renderers.aws_kms as aws_kms
  18. try:
  19. import botocore.exceptions
  20. import botocore.session
  21. import botocore.stub
  22. NO_BOTOCORE = False
  23. except ImportError:
  24. NO_BOTOCORE = True
  25. try:
  26. import cryptography.fernet as fernet
  27. NO_FERNET = False
  28. except ImportError:
  29. NO_FERNET = True
  30. PLAINTEXT_SECRET = 'Use more salt.'
  31. ENCRYPTED_DATA_KEY = 'encrypted-data-key'
  32. PLAINTEXT_DATA_KEY = b'plaintext-data-key'
  33. BASE64_DATA_KEY = b'cGxhaW50ZXh0LWRhdGEta2V5'
  34. AWS_PROFILE = 'test-profile'
  35. REGION_NAME = 'us-test-1'
  36. @skipIf(NO_BOTOCORE, 'Unable to import botocore libraries')
  37. class AWSKMSTestCase(TestCase, LoaderModuleMockMixin):
  38. '''
  39. unit test AWS KMS renderer
  40. '''
  41. def setup_loader_modules(self):
  42. return {aws_kms: {}}
  43. def test__cfg_data_key(self):
  44. '''
  45. _cfg_data_key returns the aws_kms:data_key from configuration.
  46. '''
  47. config = {'aws_kms': {'data_key': ENCRYPTED_DATA_KEY}}
  48. with patch.dict(aws_kms.__salt__, {'config.get': config.get}): # pylint: disable=no-member
  49. self.assertEqual(aws_kms._cfg_data_key(), ENCRYPTED_DATA_KEY,
  50. '_cfg_data_key did not return the data key configured in __salt__.')
  51. with patch.dict(aws_kms.__opts__, config): # pylint: disable=no-member
  52. self.assertEqual(aws_kms._cfg_data_key(), ENCRYPTED_DATA_KEY,
  53. '_cfg_data_key did not return the data key configured in __opts__.')
  54. def test__cfg_data_key_no_key(self):
  55. '''
  56. When no aws_kms:data_key is configured,
  57. calling _cfg_data_key should raise a SaltConfigurationError
  58. '''
  59. self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._cfg_data_key)
  60. def test__session_profile(self): # pylint: disable=no-self-use
  61. '''
  62. _session instantiates boto3.Session with the configured profile_name
  63. '''
  64. with patch.object(aws_kms, '_cfg', lambda k: AWS_PROFILE):
  65. with patch('boto3.Session') as session:
  66. aws_kms._session()
  67. session.assert_called_with(profile_name=AWS_PROFILE)
  68. def test__session_noprofile(self):
  69. '''
  70. _session raises a SaltConfigurationError
  71. when boto3 raises botocore.exceptions.ProfileNotFound.
  72. '''
  73. with patch('boto3.Session') as session:
  74. session.side_effect = botocore.exceptions.ProfileNotFound(profile=AWS_PROFILE)
  75. self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._session)
  76. def test__session_noregion(self):
  77. '''
  78. _session raises a SaltConfigurationError
  79. when boto3 raises botocore.exceptions.NoRegionError
  80. '''
  81. with patch('boto3.Session') as session:
  82. session.side_effect = botocore.exceptions.NoRegionError
  83. self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._session)
  84. def test__kms(self): # pylint: disable=no-self-use
  85. '''
  86. _kms calls boto3.Session.client with 'kms' as its only argument.
  87. '''
  88. with patch('boto3.Session.client') as client:
  89. aws_kms._kms()
  90. client.assert_called_with('kms')
  91. def test__kms_noregion(self):
  92. '''
  93. _kms raises a SaltConfigurationError
  94. when boto3 raises a NoRegionError.
  95. '''
  96. with patch('boto3.Session') as session:
  97. session.side_effect = botocore.exceptions.NoRegionError
  98. self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._kms)
  99. def test__api_decrypt(self): # pylint: disable=no-self-use
  100. '''
  101. _api_decrypt_response calls kms.decrypt with the
  102. configured data key as the CiphertextBlob kwarg.
  103. '''
  104. kms_client = MagicMock()
  105. with patch.object(aws_kms, '_kms') as kms_getter:
  106. kms_getter.return_value = kms_client
  107. with patch.object(aws_kms, '_cfg_data_key', lambda: ENCRYPTED_DATA_KEY):
  108. aws_kms._api_decrypt()
  109. kms_client.decrypt.assert_called_with(CiphertextBlob=ENCRYPTED_DATA_KEY) # pylint: disable=no-member
  110. def test__api_decrypt_badkey(self):
  111. '''
  112. _api_decrypt_response raises SaltConfigurationError
  113. when kms.decrypt raises a botocore.exceptions.ClientError
  114. with an error_code of 'InvalidCiphertextException'.
  115. '''
  116. kms_client = MagicMock()
  117. kms_client.decrypt.side_effect = botocore.exceptions.ClientError( # pylint: disable=no-member
  118. error_response={'Error': {'Code': 'InvalidCiphertextException'}},
  119. operation_name='Decrypt',
  120. )
  121. with patch.object(aws_kms, '_kms') as kms_getter:
  122. kms_getter.return_value = kms_client
  123. with patch.object(aws_kms, '_cfg_data_key', lambda: ENCRYPTED_DATA_KEY):
  124. self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._api_decrypt)
  125. def test__plaintext_data_key(self):
  126. '''
  127. _plaintext_data_key returns the 'Plaintext' value from the response.
  128. It caches the response and only calls _api_decrypt exactly once.
  129. '''
  130. with patch.object(aws_kms, '_api_decrypt', return_value={'KeyId': 'key-id', 'Plaintext': PLAINTEXT_DATA_KEY}) as api_decrypt:
  131. self.assertEqual(aws_kms._plaintext_data_key(), PLAINTEXT_DATA_KEY)
  132. aws_kms._plaintext_data_key()
  133. api_decrypt.assert_called_once()
  134. def test__base64_plaintext_data_key(self):
  135. '''
  136. _base64_plaintext_data_key returns the urlsafe base64 encoded plain text data key.
  137. '''
  138. with patch.object(aws_kms, '_plaintext_data_key', return_value=PLAINTEXT_DATA_KEY):
  139. self.assertEqual(aws_kms._base64_plaintext_data_key(), BASE64_DATA_KEY)
  140. @skipIf(NO_FERNET, 'Failed to import cryptography.fernet')
  141. def test__decrypt_ciphertext(self):
  142. '''
  143. test _decrypt_ciphertext
  144. '''
  145. test_key = fernet.Fernet.generate_key()
  146. crypted = fernet.Fernet(test_key).encrypt(PLAINTEXT_SECRET.encode())
  147. with patch.object(aws_kms, '_base64_plaintext_data_key', return_value=test_key):
  148. self.assertEqual(aws_kms._decrypt_ciphertext(crypted), PLAINTEXT_SECRET)
  149. @skipIf(NO_FERNET, 'Failed to import cryptography.fernet')
  150. def test__decrypt_object(self):
  151. '''
  152. Test _decrypt_object
  153. '''
  154. test_key = fernet.Fernet.generate_key()
  155. crypted = fernet.Fernet(test_key).encrypt(PLAINTEXT_SECRET.encode())
  156. secret_map = {'secret': PLAINTEXT_SECRET}
  157. crypted_map = {'secret': crypted}
  158. secret_list = [PLAINTEXT_SECRET]
  159. crypted_list = [crypted]
  160. with patch.object(aws_kms, '_base64_plaintext_data_key', return_value=test_key):
  161. self.assertEqual(aws_kms._decrypt_object(PLAINTEXT_SECRET), PLAINTEXT_SECRET)
  162. self.assertEqual(aws_kms._decrypt_object(crypted), PLAINTEXT_SECRET)
  163. self.assertEqual(aws_kms._decrypt_object(crypted_map), secret_map)
  164. self.assertEqual(aws_kms._decrypt_object(crypted_list), secret_list)
  165. self.assertEqual(aws_kms._decrypt_object(None), None)
  166. @skipIf(NO_FERNET, 'Failed to import cryptography.fernet')
  167. def test_render(self):
  168. '''
  169. Test that we can decrypt some data.
  170. '''
  171. test_key = fernet.Fernet.generate_key()
  172. crypted = fernet.Fernet(test_key).encrypt(PLAINTEXT_SECRET.encode())
  173. with patch.object(aws_kms, '_base64_plaintext_data_key', return_value=test_key):
  174. self.assertEqual(aws_kms.render(crypted), PLAINTEXT_SECRET)