test_boto_cloudtrail.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. # -*- coding: utf-8 -*-
  2. # Import Python libs
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. import random
  5. import string
  6. import logging
  7. # Import Salt Testing libs
  8. from tests.support.mixins import LoaderModuleMockMixin
  9. from tests.support.unit import skipIf, TestCase
  10. from tests.support.mock import (
  11. MagicMock,
  12. patch
  13. )
  14. # Import Salt libs
  15. import salt.config
  16. import salt.loader
  17. import salt.modules.boto_cloudtrail as boto_cloudtrail
  18. from salt.utils.versions import LooseVersion
  19. from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
  20. # Import 3rd-party libs
  21. # pylint: disable=import-error,no-name-in-module,unused-import
  22. try:
  23. import boto
  24. import boto3
  25. from botocore.exceptions import ClientError
  26. HAS_BOTO = True
  27. except ImportError:
  28. HAS_BOTO = False
  29. # pylint: enable=import-error,no-name-in-module,unused-import
# Minimum boto3 release these tests will run against; _has_required_boto()
# compares the installed boto3 version to this value.
# NOTE(review): the remark below talks about boto (not boto3) and
# connect_to_region(), while the constant gates boto3 -- confirm whether it
# still applies to this module.
# the boto_cloudtrail module relies on the connect_to_region() method
# which was added in boto 2.8.0
# https://github.com/boto/boto/commit/33ac26b416fbb48a60602542b4ce15dcc7029f12
required_boto3_version = '1.2.1'
# Module-level logger named after this test module.
log = logging.getLogger(__name__)
  35. def _has_required_boto():
  36. '''
  37. Returns True/False boolean depending on if Boto is installed and correct
  38. version.
  39. '''
  40. if not HAS_BOTO:
  41. return False
  42. elif LooseVersion(boto3.__version__) < LooseVersion(required_boto3_version):
  43. return False
  44. else:
  45. return True
  46. if _has_required_boto():
  47. region = 'us-east-1'
  48. access_key = 'GKTADJGHEIQSXMKKRBJ08H'
  49. secret_key = 'askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs'
  50. conn_parameters = {'region': region, 'key': access_key, 'keyid': secret_key, 'profile': {}}
  51. error_message = 'An error occurred (101) when calling the {0} operation: Test-defined error'
  52. not_found_error = ClientError({
  53. 'Error': {
  54. 'Code': 'TrailNotFoundException',
  55. 'Message': "Test-defined error"
  56. }
  57. }, 'msg')
  58. error_content = {
  59. 'Error': {
  60. 'Code': 101,
  61. 'Message': "Test-defined error"
  62. }
  63. }
  64. trail_ret = dict(Name='testtrail',
  65. IncludeGlobalServiceEvents=True,
  66. KmsKeyId=None,
  67. LogFileValidationEnabled=False,
  68. S3BucketName='auditinfo',
  69. TrailARN='arn:aws:cloudtrail:us-east-1:214351231622:trail/testtrail')
  70. status_ret = dict(IsLogging=False,
  71. LatestCloudWatchLogsDeliveryError=None,
  72. LatestCloudWatchLogsDeliveryTime=None,
  73. LatestDeliveryError=None,
  74. LatestDeliveryTime=None,
  75. LatestDigestDeliveryError=None,
  76. LatestDigestDeliveryTime=None,
  77. LatestNotificationError=None,
  78. LatestNotificationTime=None,
  79. StartLoggingTime=None,
  80. StopLoggingTime=None)
  81. @skipIf(HAS_BOTO is False, 'The boto module must be installed.')
  82. @skipIf(_has_required_boto() is False, 'The boto3 module must be greater than'
  83. ' or equal to version {0}'
  84. .format(required_boto3_version))
  85. class BotoCloudTrailTestCaseBase(TestCase, LoaderModuleMockMixin):
  86. conn = None
  87. def setup_loader_modules(self):
  88. self.opts = opts = salt.config.DEFAULT_MINION_OPTS
  89. utils = salt.loader.utils(
  90. opts,
  91. whitelist=['boto3', 'args', 'systemd', 'path', 'platform'],
  92. context={})
  93. return {
  94. boto_cloudtrail: {
  95. '__utils__': utils,
  96. }
  97. }
  98. def setUp(self):
  99. super(BotoCloudTrailTestCaseBase, self).setUp()
  100. boto_cloudtrail.__init__(self.opts)
  101. del self.opts
  102. # Set up MagicMock to replace the boto3 session
  103. # connections keep getting cached from prior tests, can't find the
  104. # correct context object to clear it. So randomize the cache key, to prevent any
  105. # cache hits
  106. conn_parameters['key'] = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(50))
  107. self.patcher = patch('boto3.session.Session')
  108. self.addCleanup(self.patcher.stop)
  109. self.addCleanup(delattr, self, 'patcher')
  110. mock_session = self.patcher.start()
  111. session_instance = mock_session.return_value
  112. self.conn = MagicMock()
  113. self.addCleanup(delattr, self, 'conn')
  114. session_instance.client.return_value = self.conn
  115. class BotoCloudTrailTestCaseMixin(object):
  116. pass
  117. class BotoCloudTrailTestCase(BotoCloudTrailTestCaseBase, BotoCloudTrailTestCaseMixin):
  118. '''
  119. TestCase for salt.modules.boto_cloudtrail module
  120. '''
  121. def test_that_when_checking_if_a_trail_exists_and_a_trail_exists_the_trail_exists_method_returns_true(self):
  122. '''
  123. Tests checking cloudtrail trail existence when the cloudtrail trail already exists
  124. '''
  125. self.conn.get_trail_status.return_value = trail_ret
  126. result = boto_cloudtrail.exists(Name=trail_ret['Name'], **conn_parameters)
  127. self.assertTrue(result['exists'])
  128. def test_that_when_checking_if_a_trail_exists_and_a_trail_does_not_exist_the_trail_exists_method_returns_false(self):
  129. '''
  130. Tests checking cloudtrail trail existence when the cloudtrail trail does not exist
  131. '''
  132. self.conn.get_trail_status.side_effect = not_found_error
  133. result = boto_cloudtrail.exists(Name='mytrail', **conn_parameters)
  134. self.assertFalse(result['exists'])
  135. def test_that_when_checking_if_a_trail_exists_and_boto3_returns_an_error_the_trail_exists_method_returns_error(self):
  136. '''
  137. Tests checking cloudtrail trail existence when boto returns an error
  138. '''
  139. self.conn.get_trail_status.side_effect = ClientError(error_content, 'get_trail_status')
  140. result = boto_cloudtrail.exists(Name='mytrail', **conn_parameters)
  141. self.assertEqual(result.get('error', {}).get('message'), error_message.format('get_trail_status'))
  142. def test_that_when_creating_a_trail_succeeds_the_create_trail_method_returns_true(self):
  143. '''
  144. tests True trail created.
  145. '''
  146. self.conn.create_trail.return_value = trail_ret
  147. result = boto_cloudtrail.create(Name=trail_ret['Name'],
  148. S3BucketName=trail_ret['S3BucketName'],
  149. **conn_parameters)
  150. self.assertTrue(result['created'])
  151. def test_that_when_creating_a_trail_fails_the_create_trail_method_returns_error(self):
  152. '''
  153. tests False trail not created.
  154. '''
  155. self.conn.create_trail.side_effect = ClientError(error_content, 'create_trail')
  156. result = boto_cloudtrail.create(Name=trail_ret['Name'],
  157. S3BucketName=trail_ret['S3BucketName'],
  158. **conn_parameters)
  159. self.assertEqual(result.get('error', {}).get('message'), error_message.format('create_trail'))
  160. def test_that_when_deleting_a_trail_succeeds_the_delete_trail_method_returns_true(self):
  161. '''
  162. tests True trail deleted.
  163. '''
  164. result = boto_cloudtrail.delete(Name='testtrail',
  165. **conn_parameters)
  166. self.assertTrue(result['deleted'])
  167. def test_that_when_deleting_a_trail_fails_the_delete_trail_method_returns_false(self):
  168. '''
  169. tests False trail not deleted.
  170. '''
  171. self.conn.delete_trail.side_effect = ClientError(error_content, 'delete_trail')
  172. result = boto_cloudtrail.delete(Name='testtrail',
  173. **conn_parameters)
  174. self.assertFalse(result['deleted'])
  175. def test_that_when_describing_trail_it_returns_the_dict_of_properties_returns_true(self):
  176. '''
  177. Tests describing parameters if trail exists
  178. '''
  179. self.conn.describe_trails.return_value = {'trailList': [trail_ret]}
  180. result = boto_cloudtrail.describe(Name=trail_ret['Name'], **conn_parameters)
  181. self.assertTrue(result['trail'])
  182. def test_that_when_describing_trail_it_returns_the_dict_of_properties_returns_false(self):
  183. '''
  184. Tests describing parameters if trail does not exist
  185. '''
  186. self.conn.describe_trails.side_effect = not_found_error
  187. result = boto_cloudtrail.describe(Name='testtrail', **conn_parameters)
  188. self.assertFalse(result['trail'])
  189. def test_that_when_describing_trail_on_client_error_it_returns_error(self):
  190. '''
  191. Tests describing parameters failure
  192. '''
  193. self.conn.describe_trails.side_effect = ClientError(error_content, 'get_trail')
  194. result = boto_cloudtrail.describe(Name='testtrail', **conn_parameters)
  195. self.assertTrue('error' in result)
  196. def test_that_when_getting_status_it_returns_the_dict_of_properties_returns_true(self):
  197. '''
  198. Tests getting status if trail exists
  199. '''
  200. self.conn.get_trail_status.return_value = status_ret
  201. result = boto_cloudtrail.status(Name=trail_ret['Name'], **conn_parameters)
  202. self.assertTrue(result['trail'])
  203. def test_that_when_getting_status_it_returns_the_dict_of_properties_returns_false(self):
  204. '''
  205. Tests getting status if trail does not exist
  206. '''
  207. self.conn.get_trail_status.side_effect = not_found_error
  208. result = boto_cloudtrail.status(Name='testtrail', **conn_parameters)
  209. self.assertFalse(result['trail'])
  210. def test_that_when_getting_status_on_client_error_it_returns_error(self):
  211. '''
  212. Tests getting status failure
  213. '''
  214. self.conn.get_trail_status.side_effect = ClientError(error_content, 'get_trail_status')
  215. result = boto_cloudtrail.status(Name='testtrail', **conn_parameters)
  216. self.assertTrue('error' in result)
  217. def test_that_when_listing_trails_succeeds_the_list_trails_method_returns_true(self):
  218. '''
  219. tests True trails listed.
  220. '''
  221. self.conn.describe_trails.return_value = {'trailList': [trail_ret]}
  222. result = boto_cloudtrail.list(**conn_parameters)
  223. self.assertTrue(result['trails'])
  224. def test_that_when_listing_trail_fails_the_list_trail_method_returns_false(self):
  225. '''
  226. tests False no trail listed.
  227. '''
  228. self.conn.describe_trails.return_value = {'trailList': []}
  229. result = boto_cloudtrail.list(**conn_parameters)
  230. self.assertFalse(result['trails'])
  231. def test_that_when_listing_trail_fails_the_list_trail_method_returns_error(self):
  232. '''
  233. tests False trail error.
  234. '''
  235. self.conn.describe_trails.side_effect = ClientError(error_content, 'list_trails')
  236. result = boto_cloudtrail.list(**conn_parameters)
  237. self.assertEqual(result.get('error', {}).get('message'), error_message.format('list_trails'))
  238. def test_that_when_updating_a_trail_succeeds_the_update_trail_method_returns_true(self):
  239. '''
  240. tests True trail updated.
  241. '''
  242. self.conn.update_trail.return_value = trail_ret
  243. result = boto_cloudtrail.update(Name=trail_ret['Name'],
  244. S3BucketName=trail_ret['S3BucketName'],
  245. **conn_parameters)
  246. self.assertTrue(result['updated'])
  247. def test_that_when_updating_a_trail_fails_the_update_trail_method_returns_error(self):
  248. '''
  249. tests False trail not updated.
  250. '''
  251. self.conn.update_trail.side_effect = ClientError(error_content, 'update_trail')
  252. result = boto_cloudtrail.update(Name=trail_ret['Name'],
  253. S3BucketName=trail_ret['S3BucketName'],
  254. **conn_parameters)
  255. self.assertEqual(result.get('error', {}).get('message'), error_message.format('update_trail'))
  256. def test_that_when_starting_logging_succeeds_the_start_logging_method_returns_true(self):
  257. '''
  258. tests True logging started.
  259. '''
  260. result = boto_cloudtrail.start_logging(Name=trail_ret['Name'], **conn_parameters)
  261. self.assertTrue(result['started'])
  262. def test_that_when_start_logging_fails_the_start_logging_method_returns_false(self):
  263. '''
  264. tests False logging not started.
  265. '''
  266. self.conn.describe_trails.return_value = {'trailList': []}
  267. self.conn.start_logging.side_effect = ClientError(error_content, 'start_logging')
  268. result = boto_cloudtrail.start_logging(Name=trail_ret['Name'], **conn_parameters)
  269. self.assertFalse(result['started'])
  270. def test_that_when_stopping_logging_succeeds_the_stop_logging_method_returns_true(self):
  271. '''
  272. tests True logging stopped.
  273. '''
  274. result = boto_cloudtrail.stop_logging(Name=trail_ret['Name'], **conn_parameters)
  275. self.assertTrue(result['stopped'])
  276. def test_that_when_stop_logging_fails_the_stop_logging_method_returns_false(self):
  277. '''
  278. tests False logging not stopped.
  279. '''
  280. self.conn.describe_trails.return_value = {'trailList': []}
  281. self.conn.stop_logging.side_effect = ClientError(error_content, 'stop_logging')
  282. result = boto_cloudtrail.stop_logging(Name=trail_ret['Name'], **conn_parameters)
  283. self.assertFalse(result['stopped'])
  284. def test_that_when_adding_tags_succeeds_the_add_tags_method_returns_true(self):
  285. '''
  286. tests True tags added.
  287. '''
  288. with patch.dict(boto_cloudtrail.__salt__, {'boto_iam.get_account_id': MagicMock(return_value='1234')}):
  289. result = boto_cloudtrail.add_tags(Name=trail_ret['Name'], a='b', **conn_parameters)
  290. self.assertTrue(result['tagged'])
  291. def test_that_when_adding_tags_fails_the_add_tags_method_returns_false(self):
  292. '''
  293. tests False tags not added.
  294. '''
  295. self.conn.add_tags.side_effect = ClientError(error_content, 'add_tags')
  296. with patch.dict(boto_cloudtrail.__salt__, {'boto_iam.get_account_id': MagicMock(return_value='1234')}):
  297. result = boto_cloudtrail.add_tags(Name=trail_ret['Name'], a='b', **conn_parameters)
  298. self.assertFalse(result['tagged'])
  299. def test_that_when_removing_tags_succeeds_the_remove_tags_method_returns_true(self):
  300. '''
  301. tests True tags removed.
  302. '''
  303. with patch.dict(boto_cloudtrail.__salt__, {'boto_iam.get_account_id': MagicMock(return_value='1234')}):
  304. result = boto_cloudtrail.remove_tags(Name=trail_ret['Name'], a='b', **conn_parameters)
  305. self.assertTrue(result['tagged'])
  306. def test_that_when_removing_tags_fails_the_remove_tags_method_returns_false(self):
  307. '''
  308. tests False tags not removed.
  309. '''
  310. self.conn.remove_tags.side_effect = ClientError(error_content, 'remove_tags')
  311. with patch.dict(boto_cloudtrail.__salt__, {'boto_iam.get_account_id': MagicMock(return_value='1234')}):
  312. result = boto_cloudtrail.remove_tags(Name=trail_ret['Name'], a='b', **conn_parameters)
  313. self.assertFalse(result['tagged'])
  314. def test_that_when_listing_tags_succeeds_the_list_tags_method_returns_true(self):
  315. '''
  316. tests True tags listed.
  317. '''
  318. with patch.dict(boto_cloudtrail.__salt__, {'boto_iam.get_account_id': MagicMock(return_value='1234')}):
  319. result = boto_cloudtrail.list_tags(Name=trail_ret['Name'], **conn_parameters)
  320. self.assertEqual(result['tags'], {})
  321. def test_that_when_listing_tags_fails_the_list_tags_method_returns_false(self):
  322. '''
  323. tests False tags not listed.
  324. '''
  325. self.conn.list_tags.side_effect = ClientError(error_content, 'list_tags')
  326. with patch.dict(boto_cloudtrail.__salt__, {'boto_iam.get_account_id': MagicMock(return_value='1234')}):
  327. result = boto_cloudtrail.list_tags(Name=trail_ret['Name'], **conn_parameters)
  328. self.assertTrue(result['error'])