test_beacons.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Justin Anderson <janderson@saltstack.com>
  4. '''
  5. # Python Libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import os
  8. # Salt Libs
  9. from salt.exceptions import CommandExecutionError
  10. # Salttesting libs
  11. from tests.support.runtests import RUNTIME_VARS
  12. from tests.support.case import ModuleCase
  13. from tests.support.unit import skipIf
  14. import pytest
  15. @pytest.mark.windows_whitelisted
  16. class BeaconsAddDeleteTest(ModuleCase):
  17. '''
  18. Tests the add and delete functions
  19. '''
  20. def setUp(self):
  21. self.minion_conf_d_dir = os.path.join(
  22. RUNTIME_VARS.TMP_CONF_DIR,
  23. os.path.dirname(self.minion_opts['default_include']))
  24. if not os.path.isdir(self.minion_conf_d_dir):
  25. os.makedirs(self.minion_conf_d_dir)
  26. self.beacons_config_file_path = os.path.join(self.minion_conf_d_dir, 'beacons.conf')
  27. def tearDown(self):
  28. if os.path.isfile(self.beacons_config_file_path):
  29. os.unlink(self.beacons_config_file_path)
  30. # Reset beacons
  31. self.run_function('beacons.reset', f_timeout=300)
  32. def test_add_and_delete(self):
  33. '''
  34. Test adding and deleting a beacon
  35. '''
  36. _add = self.run_function(
  37. 'beacons.add',
  38. ['ps', [{'processes': {'apache2': 'stopped'}}]],
  39. f_timeout=300
  40. )
  41. self.assertTrue(_add['result'])
  42. # save added beacon
  43. _save = self.run_function('beacons.save', f_timeout=300)
  44. self.assertTrue(_save['result'])
  45. # delete the beacon
  46. _delete = self.run_function('beacons.delete', ['ps'], f_timeout=300)
  47. self.assertTrue(_delete['result'])
  48. # save the results
  49. self.run_function('beacons.save', f_timeout=300)
  50. @pytest.mark.windows_whitelisted
  51. class BeaconsTest(ModuleCase):
  52. '''
  53. Tests the beacons execution module
  54. '''
  55. beacons_config_file_path = minion_conf_d_dir = None
  56. @classmethod
  57. def tearDownClass(cls):
  58. if cls.beacons_config_file_path and os.path.isfile(cls.beacons_config_file_path):
  59. os.unlink(cls.beacons_config_file_path)
  60. def setUp(self):
  61. if self.minion_conf_d_dir is None:
  62. self.minion_conf_d_dir = os.path.join(
  63. RUNTIME_VARS.TMP_CONF_DIR,
  64. os.path.dirname(self.minion_opts['default_include']))
  65. if not os.path.isdir(self.minion_conf_d_dir):
  66. os.makedirs(self.minion_conf_d_dir)
  67. self.__class__.beacons_config_file_path = os.path.join(self.minion_conf_d_dir, 'beacons.conf')
  68. try:
  69. # Add beacon to disable
  70. self.run_function('beacons.add',
  71. ['ps', [{'processes': {'apache2': 'stopped'}}]],
  72. f_timeout=300)
  73. self.run_function('beacons.save', f_timeout=300)
  74. except CommandExecutionError:
  75. self.skipTest('Unable to add beacon')
  76. def tearDown(self):
  77. # delete added beacon
  78. self.run_function('beacons.delete', ['ps'], f_timeout=300)
  79. self.run_function('beacons.save', f_timeout=300)
  80. # Reset beacons
  81. self.run_function('beacons.reset', f_timeout=300)
  82. def test_disable(self):
  83. '''
  84. Test disabling beacons
  85. '''
  86. # assert beacon exists
  87. _list = self.run_function('beacons.list',
  88. return_yaml=False,
  89. f_timeout=300)
  90. self.assertIn('ps', _list)
  91. ret = self.run_function('beacons.disable', f_timeout=300)
  92. self.assertTrue(ret['result'])
  93. # assert beacons are disabled
  94. _list = self.run_function('beacons.list',
  95. return_yaml=False,
  96. f_timeout=300)
  97. self.assertFalse(_list['enabled'])
  98. # disable added beacon
  99. ret = self.run_function('beacons.disable_beacon', ['ps'], f_timeout=300)
  100. self.assertTrue(ret['result'])
  101. # assert beacon ps is disabled
  102. _list = self.run_function('beacons.list',
  103. return_yaml=False,
  104. f_timeout=300)
  105. for bdict in _list['ps']:
  106. if 'enabled' in bdict:
  107. self.assertFalse(bdict['enabled'])
  108. break
  109. def test_enable(self):
  110. '''
  111. Test enabling beacons
  112. '''
  113. # assert beacon exists
  114. _list = self.run_function('beacons.list',
  115. return_yaml=False,
  116. f_timeout=300)
  117. self.assertIn('ps', _list)
  118. # enable beacons on minion
  119. ret = self.run_function('beacons.enable', f_timeout=300)
  120. self.assertTrue(ret['result'])
  121. # assert beacons are enabled
  122. _list = self.run_function('beacons.list',
  123. return_yaml=False,
  124. f_timeout=300)
  125. self.assertTrue(_list['enabled'])
  126. @skipIf(True, 'Skip until https://github.com/saltstack/salt/issues/31516 '
  127. 'problems are resolved.')
  128. def test_enabled_beacons(self):
  129. '''
  130. Test enabled specific beacon
  131. '''
  132. # enable added beacon
  133. ret = self.run_function('beacons.enable_beacon', ['ps'], f_timeout=300)
  134. self.assertTrue(ret['result'])
  135. # assert beacon ps is enabled
  136. _list = self.run_function('beacons.list',
  137. return_yaml=False,
  138. f_timeout=300)
  139. self.assertTrue(_list['ps']['enabled'])
  140. def test_list(self):
  141. '''
  142. Test listing the beacons
  143. '''
  144. # list beacons
  145. ret = self.run_function('beacons.list',
  146. return_yaml=False,
  147. f_timeout=300)
  148. if 'enabled' in ret:
  149. self.assertEqual(ret, {'ps': [{'processes': {'apache2': 'stopped'}}], 'enabled': True})
  150. else:
  151. self.assertEqual(ret, {'ps': [{'processes': {'apache2': 'stopped'}}]})
  152. def test_list_available(self):
  153. '''
  154. Test listing the beacons
  155. '''
  156. # list beacons
  157. ret = self.run_function('beacons.list_available',
  158. return_yaml=False,
  159. f_timeout=300)
  160. self.assertTrue(ret)