123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- # -*- coding: utf-8 -*-
- """
- integration.reactor.reactor
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Test Salt's reactor system
- """
- from __future__ import absolute_import
- import signal
- import pytest
- import salt.utils.event
- import salt.utils.reactor
- from tests.support.case import ShellCase
- from tests.support.mixins import SaltMinionEventAssertsMixin
- from tests.support.unit import skipIf
- class TimeoutException(Exception):
- pass
- @pytest.mark.windows_whitelisted
- class ReactorTest(SaltMinionEventAssertsMixin, ShellCase):
- """
- Test Salt's reactor system
- """
- def setUp(self):
- self.timeout = 30
- def get_event(self, class_type="master"):
- return salt.utils.event.get_event(
- class_type,
- sock_dir=self.master_opts["sock_dir"],
- transport=self.master_opts["transport"],
- keep_loop=True,
- opts=self.master_opts,
- )
- def fire_event(self, tag, data):
- event = self.get_event()
- event.fire_event(tag, data)
- def alarm_handler(self, signal, frame):
- raise TimeoutException("Timeout of {0} seconds reached".format(self.timeout))
- def test_ping_reaction(self):
- """
- Fire an event on the master and ensure
- that it pings the minion
- """
- # Create event bus connection
- e = salt.utils.event.get_event(
- "minion", sock_dir=self.minion_opts["sock_dir"], opts=self.minion_opts
- )
- e.fire_event({"a": "b"}, "/test_event")
- self.assertMinionEventReceived({"a": "b"}, timeout=30)
- @skipIf(salt.utils.platform.is_windows(), "no sigalarm on windows")
- def test_reactor_reaction(self):
- """
- Fire an event on the master and ensure
- The reactor event responds
- """
- signal.signal(signal.SIGALRM, self.alarm_handler)
- signal.alarm(self.timeout)
- master_event = self.get_event()
- master_event.fire_event({"id": "minion"}, "salt/test/reactor")
- try:
- while True:
- event = master_event.get_event(full=True)
- if event is None:
- continue
- if event.get("tag") == "test_reaction":
- self.assertTrue(event["data"]["test_reaction"])
- break
- finally:
- signal.alarm(0)
- @skipIf(salt.utils.platform.is_windows(), "no sigalarm on windows")
- def test_reactor_is_leader(self):
- """
- when leader is set to false reactor should timeout/not do anything
- """
- # by default reactor should be leader
- ret = self.run_run_plus("reactor.is_leader")
- self.assertTrue(ret["return"])
- # make reactor not leader
- self.run_run_plus("reactor.set_leader", False)
- ret = self.run_run_plus("reactor.is_leader")
- self.assertFalse(ret["return"])
- signal.signal(signal.SIGALRM, self.alarm_handler)
- signal.alarm(self.timeout)
- try:
- master_event = self.get_event()
- self.fire_event({"id": "minion"}, "salt/test/reactor")
- while True:
- event = master_event.get_event(full=True)
- if event is None:
- continue
- if event.get("tag") == "test_reaction":
- # if we reach this point, the test is a failure
- self.assertTrue(False) # pylint: disable=redundant-unittest-assert
- break
- except TimeoutException as exc:
- self.assertTrue("Timeout" in str(exc))
- finally:
- signal.alarm(0)
- # make reactor leader again
- self.run_run_plus("reactor.set_leader", True)
- ret = self.run_run_plus("reactor.is_leader")
- self.assertTrue(ret["return"])
- # trigger a reaction
- signal.alarm(self.timeout)
- try:
- master_event = self.get_event()
- self.fire_event({"id": "minion"}, "salt/test/reactor")
- while True:
- event = master_event.get_event(full=True)
- if event is None:
- continue
- if event.get("tag") == "test_reaction":
- self.assertTrue(event["data"]["test_reaction"])
- break
- finally:
- signal.alarm(0)
|