123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396 |
- # -*- coding: utf-8 -*-
- # Import Python libs
- from __future__ import absolute_import, print_function, unicode_literals
- import random
- import string
- import logging
- # Import Salt Testing libs
- from tests.support.mixins import LoaderModuleMockMixin
- from tests.support.unit import skipIf, TestCase
- from tests.support.mock import (
- MagicMock,
- patch
- )
- # Import Salt libs
- import salt.config
- import salt.loader
- import salt.modules.boto_cloudtrail as boto_cloudtrail
- from salt.utils.versions import LooseVersion
- from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
- # Import 3rd-party libs
- # pylint: disable=import-error,no-name-in-module,unused-import
- try:
- import boto
- import boto3
- from botocore.exceptions import ClientError
- HAS_BOTO = True
- except ImportError:
- HAS_BOTO = False
- # pylint: enable=import-error,no-name-in-module,unused-import
- # the boto_cloudtrail module relies on the connect_to_region() method
- # which was added in boto 2.8.0
- # https://github.com/boto/boto/commit/33ac26b416fbb48a60602542b4ce15dcc7029f12
- required_boto3_version = '1.2.1'
- log = logging.getLogger(__name__)
- def _has_required_boto():
- '''
- Returns True/False boolean depending on if Boto is installed and correct
- version.
- '''
- if not HAS_BOTO:
- return False
- elif LooseVersion(boto3.__version__) < LooseVersion(required_boto3_version):
- return False
- else:
- return True
- if _has_required_boto():
- region = 'us-east-1'
- access_key = 'GKTADJGHEIQSXMKKRBJ08H'
- secret_key = 'askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs'
- conn_parameters = {'region': region, 'key': access_key, 'keyid': secret_key, 'profile': {}}
- error_message = 'An error occurred (101) when calling the {0} operation: Test-defined error'
- not_found_error = ClientError({
- 'Error': {
- 'Code': 'TrailNotFoundException',
- 'Message': "Test-defined error"
- }
- }, 'msg')
- error_content = {
- 'Error': {
- 'Code': 101,
- 'Message': "Test-defined error"
- }
- }
- trail_ret = dict(Name='testtrail',
- IncludeGlobalServiceEvents=True,
- KmsKeyId=None,
- LogFileValidationEnabled=False,
- S3BucketName='auditinfo',
- TrailARN='arn:aws:cloudtrail:us-east-1:214351231622:trail/testtrail')
- status_ret = dict(IsLogging=False,
- LatestCloudWatchLogsDeliveryError=None,
- LatestCloudWatchLogsDeliveryTime=None,
- LatestDeliveryError=None,
- LatestDeliveryTime=None,
- LatestDigestDeliveryError=None,
- LatestDigestDeliveryTime=None,
- LatestNotificationError=None,
- LatestNotificationTime=None,
- StartLoggingTime=None,
- StopLoggingTime=None)
- @skipIf(HAS_BOTO is False, 'The boto module must be installed.')
- @skipIf(_has_required_boto() is False, 'The boto3 module must be greater than'
- ' or equal to version {0}'
- .format(required_boto3_version))
- class BotoCloudTrailTestCaseBase(TestCase, LoaderModuleMockMixin):
- conn = None
- def setup_loader_modules(self):
- self.opts = opts = salt.config.DEFAULT_MINION_OPTS
- utils = salt.loader.utils(
- opts,
- whitelist=['boto3', 'args', 'systemd', 'path', 'platform'],
- context={})
- return {
- boto_cloudtrail: {
- '__utils__': utils,
- }
- }
- def setUp(self):
- super(BotoCloudTrailTestCaseBase, self).setUp()
- boto_cloudtrail.__init__(self.opts)
- del self.opts
- # Set up MagicMock to replace the boto3 session
- # connections keep getting cached from prior tests, can't find the
- # correct context object to clear it. So randomize the cache key, to prevent any
- # cache hits
- conn_parameters['key'] = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(50))
- self.patcher = patch('boto3.session.Session')
- self.addCleanup(self.patcher.stop)
- self.addCleanup(delattr, self, 'patcher')
- mock_session = self.patcher.start()
- session_instance = mock_session.return_value
- self.conn = MagicMock()
- self.addCleanup(delattr, self, 'conn')
- session_instance.client.return_value = self.conn
- class BotoCloudTrailTestCaseMixin(object):
- pass
- class BotoCloudTrailTestCase(BotoCloudTrailTestCaseBase, BotoCloudTrailTestCaseMixin):
- '''
- TestCase for salt.modules.boto_cloudtrail module
- '''
- def test_that_when_checking_if_a_trail_exists_and_a_trail_exists_the_trail_exists_method_returns_true(self):
- '''
- Tests checking cloudtrail trail existence when the cloudtrail trail already exists
- '''
- self.conn.get_trail_status.return_value = trail_ret
- result = boto_cloudtrail.exists(Name=trail_ret['Name'], **conn_parameters)
- self.assertTrue(result['exists'])
- def test_that_when_checking_if_a_trail_exists_and_a_trail_does_not_exist_the_trail_exists_method_returns_false(self):
- '''
- Tests checking cloudtrail trail existence when the cloudtrail trail does not exist
- '''
- self.conn.get_trail_status.side_effect = not_found_error
- result = boto_cloudtrail.exists(Name='mytrail', **conn_parameters)
- self.assertFalse(result['exists'])
- def test_that_when_checking_if_a_trail_exists_and_boto3_returns_an_error_the_trail_exists_method_returns_error(self):
- '''
- Tests checking cloudtrail trail existence when boto returns an error
- '''
- self.conn.get_trail_status.side_effect = ClientError(error_content, 'get_trail_status')
- result = boto_cloudtrail.exists(Name='mytrail', **conn_parameters)
- self.assertEqual(result.get('error', {}).get('message'), error_message.format('get_trail_status'))
- def test_that_when_creating_a_trail_succeeds_the_create_trail_method_returns_true(self):
- '''
- tests True trail created.
- '''
- self.conn.create_trail.return_value = trail_ret
- result = boto_cloudtrail.create(Name=trail_ret['Name'],
- S3BucketName=trail_ret['S3BucketName'],
- **conn_parameters)
- self.assertTrue(result['created'])
- def test_that_when_creating_a_trail_fails_the_create_trail_method_returns_error(self):
- '''
- tests False trail not created.
- '''
- self.conn.create_trail.side_effect = ClientError(error_content, 'create_trail')
- result = boto_cloudtrail.create(Name=trail_ret['Name'],
- S3BucketName=trail_ret['S3BucketName'],
- **conn_parameters)
- self.assertEqual(result.get('error', {}).get('message'), error_message.format('create_trail'))
- def test_that_when_deleting_a_trail_succeeds_the_delete_trail_method_returns_true(self):
- '''
- tests True trail deleted.
- '''
- result = boto_cloudtrail.delete(Name='testtrail',
- **conn_parameters)
- self.assertTrue(result['deleted'])
- def test_that_when_deleting_a_trail_fails_the_delete_trail_method_returns_false(self):
- '''
- tests False trail not deleted.
- '''
- self.conn.delete_trail.side_effect = ClientError(error_content, 'delete_trail')
- result = boto_cloudtrail.delete(Name='testtrail',
- **conn_parameters)
- self.assertFalse(result['deleted'])
- def test_that_when_describing_trail_it_returns_the_dict_of_properties_returns_true(self):
- '''
- Tests describing parameters if trail exists
- '''
- self.conn.describe_trails.return_value = {'trailList': [trail_ret]}
- result = boto_cloudtrail.describe(Name=trail_ret['Name'], **conn_parameters)
- self.assertTrue(result['trail'])
- def test_that_when_describing_trail_it_returns_the_dict_of_properties_returns_false(self):
- '''
- Tests describing parameters if trail does not exist
- '''
- self.conn.describe_trails.side_effect = not_found_error
- result = boto_cloudtrail.describe(Name='testtrail', **conn_parameters)
- self.assertFalse(result['trail'])
- def test_that_when_describing_trail_on_client_error_it_returns_error(self):
- '''
- Tests describing parameters failure
- '''
- self.conn.describe_trails.side_effect = ClientError(error_content, 'get_trail')
- result = boto_cloudtrail.describe(Name='testtrail', **conn_parameters)
- self.assertTrue('error' in result)
- def test_that_when_getting_status_it_returns_the_dict_of_properties_returns_true(self):
- '''
- Tests getting status if trail exists
- '''
- self.conn.get_trail_status.return_value = status_ret
- result = boto_cloudtrail.status(Name=trail_ret['Name'], **conn_parameters)
- self.assertTrue(result['trail'])
- def test_that_when_getting_status_it_returns_the_dict_of_properties_returns_false(self):
- '''
- Tests getting status if trail does not exist
- '''
- self.conn.get_trail_status.side_effect = not_found_error
- result = boto_cloudtrail.status(Name='testtrail', **conn_parameters)
- self.assertFalse(result['trail'])
- def test_that_when_getting_status_on_client_error_it_returns_error(self):
- '''
- Tests getting status failure
- '''
- self.conn.get_trail_status.side_effect = ClientError(error_content, 'get_trail_status')
- result = boto_cloudtrail.status(Name='testtrail', **conn_parameters)
- self.assertTrue('error' in result)
- def test_that_when_listing_trails_succeeds_the_list_trails_method_returns_true(self):
- '''
- tests True trails listed.
- '''
- self.conn.describe_trails.return_value = {'trailList': [trail_ret]}
- result = boto_cloudtrail.list(**conn_parameters)
- self.assertTrue(result['trails'])
- def test_that_when_listing_trail_fails_the_list_trail_method_returns_false(self):
- '''
- tests False no trail listed.
- '''
- self.conn.describe_trails.return_value = {'trailList': []}
- result = boto_cloudtrail.list(**conn_parameters)
- self.assertFalse(result['trails'])
- def test_that_when_listing_trail_fails_the_list_trail_method_returns_error(self):
- '''
- tests False trail error.
- '''
- self.conn.describe_trails.side_effect = ClientError(error_content, 'list_trails')
- result = boto_cloudtrail.list(**conn_parameters)
- self.assertEqual(result.get('error', {}).get('message'), error_message.format('list_trails'))
- def test_that_when_updating_a_trail_succeeds_the_update_trail_method_returns_true(self):
- '''
- tests True trail updated.
- '''
- self.conn.update_trail.return_value = trail_ret
- result = boto_cloudtrail.update(Name=trail_ret['Name'],
- S3BucketName=trail_ret['S3BucketName'],
- **conn_parameters)
- self.assertTrue(result['updated'])
- def test_that_when_updating_a_trail_fails_the_update_trail_method_returns_error(self):
- '''
- tests False trail not updated.
- '''
- self.conn.update_trail.side_effect = ClientError(error_content, 'update_trail')
- result = boto_cloudtrail.update(Name=trail_ret['Name'],
- S3BucketName=trail_ret['S3BucketName'],
- **conn_parameters)
- self.assertEqual(result.get('error', {}).get('message'), error_message.format('update_trail'))
- def test_that_when_starting_logging_succeeds_the_start_logging_method_returns_true(self):
- '''
- tests True logging started.
- '''
- result = boto_cloudtrail.start_logging(Name=trail_ret['Name'], **conn_parameters)
- self.assertTrue(result['started'])
- def test_that_when_start_logging_fails_the_start_logging_method_returns_false(self):
- '''
- tests False logging not started.
- '''
- self.conn.describe_trails.return_value = {'trailList': []}
- self.conn.start_logging.side_effect = ClientError(error_content, 'start_logging')
- result = boto_cloudtrail.start_logging(Name=trail_ret['Name'], **conn_parameters)
- self.assertFalse(result['started'])
- def test_that_when_stopping_logging_succeeds_the_stop_logging_method_returns_true(self):
- '''
- tests True logging stopped.
- '''
- result = boto_cloudtrail.stop_logging(Name=trail_ret['Name'], **conn_parameters)
- self.assertTrue(result['stopped'])
- def test_that_when_stop_logging_fails_the_stop_logging_method_returns_false(self):
- '''
- tests False logging not stopped.
- '''
- self.conn.describe_trails.return_value = {'trailList': []}
- self.conn.stop_logging.side_effect = ClientError(error_content, 'stop_logging')
- result = boto_cloudtrail.stop_logging(Name=trail_ret['Name'], **conn_parameters)
- self.assertFalse(result['stopped'])
- def test_that_when_adding_tags_succeeds_the_add_tags_method_returns_true(self):
- '''
- tests True tags added.
- '''
- with patch.dict(boto_cloudtrail.__salt__, {'boto_iam.get_account_id': MagicMock(return_value='1234')}):
- result = boto_cloudtrail.add_tags(Name=trail_ret['Name'], a='b', **conn_parameters)
- self.assertTrue(result['tagged'])
- def test_that_when_adding_tags_fails_the_add_tags_method_returns_false(self):
- '''
- tests False tags not added.
- '''
- self.conn.add_tags.side_effect = ClientError(error_content, 'add_tags')
- with patch.dict(boto_cloudtrail.__salt__, {'boto_iam.get_account_id': MagicMock(return_value='1234')}):
- result = boto_cloudtrail.add_tags(Name=trail_ret['Name'], a='b', **conn_parameters)
- self.assertFalse(result['tagged'])
- def test_that_when_removing_tags_succeeds_the_remove_tags_method_returns_true(self):
- '''
- tests True tags removed.
- '''
- with patch.dict(boto_cloudtrail.__salt__, {'boto_iam.get_account_id': MagicMock(return_value='1234')}):
- result = boto_cloudtrail.remove_tags(Name=trail_ret['Name'], a='b', **conn_parameters)
- self.assertTrue(result['tagged'])
- def test_that_when_removing_tags_fails_the_remove_tags_method_returns_false(self):
- '''
- tests False tags not removed.
- '''
- self.conn.remove_tags.side_effect = ClientError(error_content, 'remove_tags')
- with patch.dict(boto_cloudtrail.__salt__, {'boto_iam.get_account_id': MagicMock(return_value='1234')}):
- result = boto_cloudtrail.remove_tags(Name=trail_ret['Name'], a='b', **conn_parameters)
- self.assertFalse(result['tagged'])
- def test_that_when_listing_tags_succeeds_the_list_tags_method_returns_true(self):
- '''
- tests True tags listed.
- '''
- with patch.dict(boto_cloudtrail.__salt__, {'boto_iam.get_account_id': MagicMock(return_value='1234')}):
- result = boto_cloudtrail.list_tags(Name=trail_ret['Name'], **conn_parameters)
- self.assertEqual(result['tags'], {})
- def test_that_when_listing_tags_fails_the_list_tags_method_returns_false(self):
- '''
- tests False tags not listed.
- '''
- self.conn.list_tags.side_effect = ClientError(error_content, 'list_tags')
- with patch.dict(boto_cloudtrail.__salt__, {'boto_iam.get_account_id': MagicMock(return_value='1234')}):
- result = boto_cloudtrail.list_tags(Name=trail_ret['Name'], **conn_parameters)
- self.assertTrue(result['error'])
|