123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- """
- integration.reactor.reactor
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Test Salt's reactor system
- """
- import logging
- import signal
- import pytest
- import salt.utils.event
- import salt.utils.reactor
- from tests.support.case import ShellCase
- from tests.support.mixins import SaltMinionEventAssertsMixin
- from tests.support.unit import skipIf
- log = logging.getLogger(__name__)
- class TimeoutException(Exception):
- pass
- @pytest.mark.windows_whitelisted
- class ReactorTest(SaltMinionEventAssertsMixin, ShellCase):
- """
- Test Salt's reactor system
- """
- def setUp(self):
- self.timeout = 30
- def get_event(self, class_type="master"):
- return salt.utils.event.get_event(
- class_type,
- sock_dir=self.master_opts["sock_dir"],
- transport=self.master_opts["transport"],
- keep_loop=True,
- opts=self.master_opts,
- )
- def fire_event(self, tag, data):
- event = self.get_event()
- event.fire_event(tag, data)
- def alarm_handler(self, signal, frame):
- raise TimeoutException("Timeout of {} seconds reached".format(self.timeout))
- def test_ping_reaction(self):
- """
- Fire an event on the master and ensure
- that it pings the minion
- """
- # Create event bus connection
- e = salt.utils.event.get_event(
- "minion", sock_dir=self.minion_opts["sock_dir"], opts=self.minion_opts
- )
- e.fire_event({"a": "b"}, "/test_event")
- self.assertMinionEventReceived({"a": "b"}, timeout=30)
- @skipIf(salt.utils.platform.is_windows(), "no sigalarm on windows")
- def test_reactor_reaction(self):
- """
- Fire an event on the master and ensure
- The reactor event responds
- """
- signal.signal(signal.SIGALRM, self.alarm_handler)
- signal.alarm(self.timeout)
- master_event = self.get_event()
- master_event.fire_event({"id": "minion"}, "salt/test/reactor")
- try:
- while True:
- event = master_event.get_event(full=True)
- if event is None:
- continue
- if event.get("tag") == "test_reaction":
- self.assertTrue(event["data"]["test_reaction"])
- break
- finally:
- signal.alarm(0)
- @skipIf(salt.utils.platform.is_windows(), "no sigalarm on windows")
- def test_reactor_is_leader(self):
- """
- If reactor system is unavailable, an exception is thrown.
- When leader is true (the default), the reacion event should return.
- When leader is set to false reactor should timeout/not do anything.
- """
- ret = self.run_run_plus("reactor.is_leader")
- self.assertIn("CommandExecutionError", ret["return"])
- self.run_run_plus("reactor.set_leader", False)
- self.assertIn("CommandExecutionError", ret["return"])
- ret = self.run_run_plus("reactor.is_leader")
- self.assertIn("CommandExecutionError", ret["return"])
- # by default reactor should be leader
- signal.signal(signal.SIGALRM, self.alarm_handler)
- signal.alarm(self.timeout)
- # make reactor not the leader
- # ensure reactor engine is available
- opts_overrides = {
- "engines": [
- {
- "reactor": {
- "refresh_interval": 60,
- "worker_threads": 10,
- "worker_hwm": 10000,
- }
- }
- ]
- }
- self.run_run_plus("reactor.set_leader", False, opts_overrides=opts_overrides)
- ret = self.run_run_plus("reactor.is_leader", opts_overrides=opts_overrides)
- self.assertFalse(ret["return"])
- try:
- master_event = self.get_event()
- self.fire_event({"id": "minion"}, "salt/test/reactor")
- while True:
- event = master_event.get_event(full=True)
- if event is None:
- continue
- if event.get("tag") == "test_reaction":
- # if we reach this point, the test is a failure
- self.assertTrue(True) # pylint: disable=redundant-unittest-assert
- break
- except TimeoutException as exc:
- self.assertTrue("Timeout" in str(exc))
- finally:
- signal.alarm(0)
- # make reactor the leader again
- # ensure reactor engine is available
- opts_overrides = {
- "engines": [
- {
- "reactor": {
- "refresh_interval": 60,
- "worker_threads": 10,
- "worker_hwm": 10000,
- }
- }
- ]
- }
- self.run_run_plus("reactor.set_leader", True, opts_overrides=opts_overrides)
- ret = self.run_run_plus("reactor.is_leader", opts_overrides=opts_overrides)
- self.assertTrue(ret["return"])
- # trigger a reaction
- signal.alarm(self.timeout)
- try:
- master_event = self.get_event()
- self.fire_event({"id": "minion"}, "salt/test/reactor")
- while True:
- event = master_event.get_event(full=True)
- if event is None:
- continue
- if event.get("tag") == "test_reaction":
- self.assertTrue(event["data"]["test_reaction"])
- break
- finally:
- signal.alarm(0)
|