123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- # -*- coding: utf-8 -*-
- '''
- Unit tests for AWS KMS Decryption Renderer.
- '''
- # pylint: disable=protected-access
- # Import Python Libs
- from __future__ import absolute_import, print_function, unicode_literals
- # Import Salt Testing libs
- from tests.support.mixins import LoaderModuleMockMixin
- from tests.support.unit import skipIf, TestCase
- from tests.support.mock import (
- MagicMock,
- patch
- )
- # Import Salt libs
- import salt.exceptions
- import salt.renderers.aws_kms as aws_kms
- try:
- import botocore.exceptions
- import botocore.session
- import botocore.stub
- NO_BOTOCORE = False
- except ImportError:
- NO_BOTOCORE = True
- try:
- import cryptography.fernet as fernet
- NO_FERNET = False
- except ImportError:
- NO_FERNET = True
- PLAINTEXT_SECRET = 'Use more salt.'
- ENCRYPTED_DATA_KEY = 'encrypted-data-key'
- PLAINTEXT_DATA_KEY = b'plaintext-data-key'
- BASE64_DATA_KEY = b'cGxhaW50ZXh0LWRhdGEta2V5'
- AWS_PROFILE = 'test-profile'
- REGION_NAME = 'us-test-1'
- @skipIf(NO_BOTOCORE, 'Unable to import botocore libraries')
- class AWSKMSTestCase(TestCase, LoaderModuleMockMixin):
- '''
- unit test AWS KMS renderer
- '''
- def setup_loader_modules(self):
- return {aws_kms: {}}
- def test__cfg_data_key(self):
- '''
- _cfg_data_key returns the aws_kms:data_key from configuration.
- '''
- config = {'aws_kms': {'data_key': ENCRYPTED_DATA_KEY}}
- with patch.dict(aws_kms.__salt__, {'config.get': config.get}): # pylint: disable=no-member
- self.assertEqual(aws_kms._cfg_data_key(), ENCRYPTED_DATA_KEY,
- '_cfg_data_key did not return the data key configured in __salt__.')
- with patch.dict(aws_kms.__opts__, config): # pylint: disable=no-member
- self.assertEqual(aws_kms._cfg_data_key(), ENCRYPTED_DATA_KEY,
- '_cfg_data_key did not return the data key configured in __opts__.')
- def test__cfg_data_key_no_key(self):
- '''
- When no aws_kms:data_key is configured,
- calling _cfg_data_key should raise a SaltConfigurationError
- '''
- self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._cfg_data_key)
- def test__session_profile(self): # pylint: disable=no-self-use
- '''
- _session instantiates boto3.Session with the configured profile_name
- '''
- with patch.object(aws_kms, '_cfg', lambda k: AWS_PROFILE):
- with patch('boto3.Session') as session:
- aws_kms._session()
- session.assert_called_with(profile_name=AWS_PROFILE)
- def test__session_noprofile(self):
- '''
- _session raises a SaltConfigurationError
- when boto3 raises botocore.exceptions.ProfileNotFound.
- '''
- with patch('boto3.Session') as session:
- session.side_effect = botocore.exceptions.ProfileNotFound(profile=AWS_PROFILE)
- self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._session)
- def test__session_noregion(self):
- '''
- _session raises a SaltConfigurationError
- when boto3 raises botocore.exceptions.NoRegionError
- '''
- with patch('boto3.Session') as session:
- session.side_effect = botocore.exceptions.NoRegionError
- self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._session)
- def test__kms(self): # pylint: disable=no-self-use
- '''
- _kms calls boto3.Session.client with 'kms' as its only argument.
- '''
- with patch('boto3.Session.client') as client:
- aws_kms._kms()
- client.assert_called_with('kms')
- def test__kms_noregion(self):
- '''
- _kms raises a SaltConfigurationError
- when boto3 raises a NoRegionError.
- '''
- with patch('boto3.Session') as session:
- session.side_effect = botocore.exceptions.NoRegionError
- self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._kms)
- def test__api_decrypt(self): # pylint: disable=no-self-use
- '''
- _api_decrypt_response calls kms.decrypt with the
- configured data key as the CiphertextBlob kwarg.
- '''
- kms_client = MagicMock()
- with patch.object(aws_kms, '_kms') as kms_getter:
- kms_getter.return_value = kms_client
- with patch.object(aws_kms, '_cfg_data_key', lambda: ENCRYPTED_DATA_KEY):
- aws_kms._api_decrypt()
- kms_client.decrypt.assert_called_with(CiphertextBlob=ENCRYPTED_DATA_KEY) # pylint: disable=no-member
- def test__api_decrypt_badkey(self):
- '''
- _api_decrypt_response raises SaltConfigurationError
- when kms.decrypt raises a botocore.exceptions.ClientError
- with an error_code of 'InvalidCiphertextException'.
- '''
- kms_client = MagicMock()
- kms_client.decrypt.side_effect = botocore.exceptions.ClientError( # pylint: disable=no-member
- error_response={'Error': {'Code': 'InvalidCiphertextException'}},
- operation_name='Decrypt',
- )
- with patch.object(aws_kms, '_kms') as kms_getter:
- kms_getter.return_value = kms_client
- with patch.object(aws_kms, '_cfg_data_key', lambda: ENCRYPTED_DATA_KEY):
- self.assertRaises(salt.exceptions.SaltConfigurationError, aws_kms._api_decrypt)
- def test__plaintext_data_key(self):
- '''
- _plaintext_data_key returns the 'Plaintext' value from the response.
- It caches the response and only calls _api_decrypt exactly once.
- '''
- with patch.object(aws_kms, '_api_decrypt', return_value={'KeyId': 'key-id', 'Plaintext': PLAINTEXT_DATA_KEY}) as api_decrypt:
- self.assertEqual(aws_kms._plaintext_data_key(), PLAINTEXT_DATA_KEY)
- aws_kms._plaintext_data_key()
- api_decrypt.assert_called_once()
- def test__base64_plaintext_data_key(self):
- '''
- _base64_plaintext_data_key returns the urlsafe base64 encoded plain text data key.
- '''
- with patch.object(aws_kms, '_plaintext_data_key', return_value=PLAINTEXT_DATA_KEY):
- self.assertEqual(aws_kms._base64_plaintext_data_key(), BASE64_DATA_KEY)
- @skipIf(NO_FERNET, 'Failed to import cryptography.fernet')
- def test__decrypt_ciphertext(self):
- '''
- test _decrypt_ciphertext
- '''
- test_key = fernet.Fernet.generate_key()
- crypted = fernet.Fernet(test_key).encrypt(PLAINTEXT_SECRET.encode())
- with patch.object(aws_kms, '_base64_plaintext_data_key', return_value=test_key):
- self.assertEqual(aws_kms._decrypt_ciphertext(crypted), PLAINTEXT_SECRET)
- @skipIf(NO_FERNET, 'Failed to import cryptography.fernet')
- def test__decrypt_object(self):
- '''
- Test _decrypt_object
- '''
- test_key = fernet.Fernet.generate_key()
- crypted = fernet.Fernet(test_key).encrypt(PLAINTEXT_SECRET.encode())
- secret_map = {'secret': PLAINTEXT_SECRET}
- crypted_map = {'secret': crypted}
- secret_list = [PLAINTEXT_SECRET]
- crypted_list = [crypted]
- with patch.object(aws_kms, '_base64_plaintext_data_key', return_value=test_key):
- self.assertEqual(aws_kms._decrypt_object(PLAINTEXT_SECRET), PLAINTEXT_SECRET)
- self.assertEqual(aws_kms._decrypt_object(crypted), PLAINTEXT_SECRET)
- self.assertEqual(aws_kms._decrypt_object(crypted_map), secret_map)
- self.assertEqual(aws_kms._decrypt_object(crypted_list), secret_list)
- self.assertEqual(aws_kms._decrypt_object(None), None)
- @skipIf(NO_FERNET, 'Failed to import cryptography.fernet')
- def test_render(self):
- '''
- Test that we can decrypt some data.
- '''
- test_key = fernet.Fernet.generate_key()
- crypted = fernet.Fernet(test_key).encrypt(PLAINTEXT_SECRET.encode())
- with patch.object(aws_kms, '_base64_plaintext_data_key', return_value=test_key):
- self.assertEqual(aws_kms.render(crypted), PLAINTEXT_SECRET)
|