test_reactor.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. """
  2. integration.reactor.reactor
  3. ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. Test Salt's reactor system
  5. """
  6. import logging
  7. import signal
  8. import pytest
  9. import salt.utils.event
  10. import salt.utils.reactor
  11. from tests.support.case import ShellCase
  12. from tests.support.mixins import SaltMinionEventAssertsMixin
  13. from tests.support.unit import skipIf
# Module-level logger named after this module.
log = logging.getLogger(__name__)
  15. class TimeoutException(Exception):
  16. pass
  17. @pytest.mark.windows_whitelisted
  18. class ReactorTest(SaltMinionEventAssertsMixin, ShellCase):
  19. """
  20. Test Salt's reactor system
  21. """
  22. def setUp(self):
  23. self.timeout = 30
  24. def get_event(self, class_type="master"):
  25. return salt.utils.event.get_event(
  26. class_type,
  27. sock_dir=self.master_opts["sock_dir"],
  28. transport=self.master_opts["transport"],
  29. keep_loop=True,
  30. opts=self.master_opts,
  31. )
  32. def fire_event(self, tag, data):
  33. event = self.get_event()
  34. event.fire_event(tag, data)
  35. def alarm_handler(self, signal, frame):
  36. raise TimeoutException("Timeout of {} seconds reached".format(self.timeout))
  37. def test_ping_reaction(self):
  38. """
  39. Fire an event on the master and ensure
  40. that it pings the minion
  41. """
  42. # Create event bus connection
  43. e = salt.utils.event.get_event(
  44. "minion", sock_dir=self.minion_opts["sock_dir"], opts=self.minion_opts
  45. )
  46. e.fire_event({"a": "b"}, "/test_event")
  47. self.assertMinionEventReceived({"a": "b"}, timeout=30)
  48. @skipIf(salt.utils.platform.is_windows(), "no sigalarm on windows")
  49. def test_reactor_reaction(self):
  50. """
  51. Fire an event on the master and ensure
  52. The reactor event responds
  53. """
  54. signal.signal(signal.SIGALRM, self.alarm_handler)
  55. signal.alarm(self.timeout)
  56. master_event = self.get_event()
  57. master_event.fire_event({"id": "minion"}, "salt/test/reactor")
  58. try:
  59. while True:
  60. event = master_event.get_event(full=True)
  61. if event is None:
  62. continue
  63. if event.get("tag") == "test_reaction":
  64. self.assertTrue(event["data"]["test_reaction"])
  65. break
  66. finally:
  67. signal.alarm(0)
  68. @skipIf(salt.utils.platform.is_windows(), "no sigalarm on windows")
  69. def test_reactor_is_leader(self):
  70. """
  71. If reactor system is unavailable, an exception is thrown.
  72. When leader is true (the default), the reacion event should return.
  73. When leader is set to false reactor should timeout/not do anything.
  74. """
  75. ret = self.run_run_plus("reactor.is_leader")
  76. self.assertIn("CommandExecutionError", ret["return"])
  77. self.run_run_plus("reactor.set_leader", False)
  78. self.assertIn("CommandExecutionError", ret["return"])
  79. ret = self.run_run_plus("reactor.is_leader")
  80. self.assertIn("CommandExecutionError", ret["return"])
  81. # by default reactor should be leader
  82. signal.signal(signal.SIGALRM, self.alarm_handler)
  83. signal.alarm(self.timeout)
  84. # make reactor not the leader
  85. # ensure reactor engine is available
  86. opts_overrides = {
  87. "engines": [
  88. {
  89. "reactor": {
  90. "refresh_interval": 60,
  91. "worker_threads": 10,
  92. "worker_hwm": 10000,
  93. }
  94. }
  95. ]
  96. }
  97. self.run_run_plus("reactor.set_leader", False, opts_overrides=opts_overrides)
  98. ret = self.run_run_plus("reactor.is_leader", opts_overrides=opts_overrides)
  99. self.assertFalse(ret["return"])
  100. try:
  101. master_event = self.get_event()
  102. self.fire_event({"id": "minion"}, "salt/test/reactor")
  103. while True:
  104. event = master_event.get_event(full=True)
  105. if event is None:
  106. continue
  107. if event.get("tag") == "test_reaction":
  108. # if we reach this point, the test is a failure
  109. self.assertTrue(True) # pylint: disable=redundant-unittest-assert
  110. break
  111. except TimeoutException as exc:
  112. self.assertTrue("Timeout" in str(exc))
  113. finally:
  114. signal.alarm(0)
  115. # make reactor the leader again
  116. # ensure reactor engine is available
  117. opts_overrides = {
  118. "engines": [
  119. {
  120. "reactor": {
  121. "refresh_interval": 60,
  122. "worker_threads": 10,
  123. "worker_hwm": 10000,
  124. }
  125. }
  126. ]
  127. }
  128. self.run_run_plus("reactor.set_leader", True, opts_overrides=opts_overrides)
  129. ret = self.run_run_plus("reactor.is_leader", opts_overrides=opts_overrides)
  130. self.assertTrue(ret["return"])
  131. # trigger a reaction
  132. signal.alarm(self.timeout)
  133. try:
  134. master_event = self.get_event()
  135. self.fire_event({"id": "minion"}, "salt/test/reactor")
  136. while True:
  137. event = master_event.get_event(full=True)
  138. if event is None:
  139. continue
  140. if event.get("tag") == "test_reaction":
  141. self.assertTrue(event["data"]["test_reaction"])
  142. break
  143. finally:
  144. signal.alarm(0)