123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360 |
- """
- :codeauthor: Pedro Algarvio (pedro@algarvio.me)
- tests.unit.utils.event_test
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~
- """
- import hashlib
- import os
- import shutil
- import time
- import salt.config
- import salt.ext.tornado.ioloop
- import salt.utils.event
- import salt.utils.stringutils
- import zmq
- import zmq.eventloop.ioloop
- from salt.ext.six.moves import range
- from salt.ext.tornado.testing import AsyncTestCase
- from saltfactories.utils.processes import terminate_process
- from tests.support.events import eventpublisher_process, eventsender_process
- from tests.support.helpers import slowTest
- from tests.support.runtests import RUNTIME_VARS
- from tests.support.unit import TestCase, expectedFailure, skipIf
# Compatibility shim for pyzmq 13.0.x: alias ``ZMQIOLoop`` to ``IOLoop`` when
# the former is not provided.
# TODO: remove once pyzmq 14.0.x is the minimum supported version.
if not hasattr(zmq.eventloop.ioloop, "ZMQIOLoop"):
    zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop

# True when zmq's IPC path length limit (taken as 103 when zmq does not expose
# ``IPC_PATH_MAX_LEN``) is 103 or less; used to skip the IPC based event tests.
NO_LONG_IPC = getattr(zmq, "IPC_PATH_MAX_LEN", 103) <= 103
@skipIf(
    NO_LONG_IPC, "This system does not support long IPC paths. Skipping event tests!"
)
class TestSaltEvent(TestCase):
    """
    Tests for the ``salt.utils.event`` master/minion event objects.

    Covers the socket URIs computed by ``MasterEvent``/``MinionEvent`` and
    firing/receiving events through an event publisher process.
    """

    def setUp(self):
        """Create the socket directory and schedule its removal after the test."""
        self.sock_dir = os.path.join(RUNTIME_VARS.TMP, "test-socks")
        if not os.path.exists(self.sock_dir):
            os.makedirs(self.sock_dir)
        self.addCleanup(shutil.rmtree, self.sock_dir, ignore_errors=True)

    def assertGotEvent(self, evt, data, msg=None):
        """
        Assert that ``evt`` is not ``None`` and contains every item of ``data``.

        :param evt: The received event; must support ``in`` and item lookup.
        :param data: Mapping of keys to the values expected in ``evt``.
        :param msg: Optional context, prefixed to the failure messages.
        """
        self.assertIsNotNone(evt, msg)
        # Only add a prefix when a context message was supplied; otherwise the
        # failure messages would start with a literal "None: ".
        prefix = "{}: ".format(msg) if msg is not None else ""
        for key, expected in data.items():
            self.assertIn(key, evt, "{}Key {} missing".format(prefix, key))
            self.assertEqual(
                expected,
                evt[key],
                "{}Key {} value mismatch, {} != {}".format(
                    prefix, key, expected, evt[key]
                ),
            )

    def _assert_ipc_uris(self, event, stem):
        """Assert ``event`` uses ``<sock_dir>/<stem>_pub.ipc`` and ``_pull.ipc``."""
        self.assertEqual(event.puburi, os.path.join(self.sock_dir, stem + "_pub.ipc"))
        self.assertEqual(
            event.pulluri, os.path.join(self.sock_dir, stem + "_pull.ipc")
        )

    @staticmethod
    def _minion_id_hash(minion_id):
        """Return the first 10 hex digits of the SHA-256 of ``minion_id``."""
        return hashlib.sha256(
            salt.utils.stringutils.to_bytes(minion_id)
        ).hexdigest()[:10]

    def test_master_event(self):
        """Test the pub/pull URIs of a master event"""
        me = salt.utils.event.MasterEvent(self.sock_dir, listen=False)
        self._assert_ipc_uris(me, "master_event")

    def test_minion_event(self):
        """Test the pub/pull URIs of a minion event embed the hashed minion id"""
        opts = dict(id="foo", sock_dir=self.sock_dir)
        id_hash = self._minion_id_hash(opts["id"])
        me = salt.utils.event.MinionEvent(opts, listen=False)
        self._assert_ipc_uris(me, "minion_event_{}".format(id_hash))

    def test_minion_event_tcp_ipc_mode(self):
        """Test the pub/pull URIs of a minion event when ipc_mode is tcp"""
        opts = dict(id="foo", ipc_mode="tcp")
        me = salt.utils.event.MinionEvent(opts, listen=False)
        self.assertEqual(me.puburi, 4510)
        self.assertEqual(me.pulluri, 4511)

    def test_minion_event_no_id(self):
        """Test the pub/pull URIs of a minion event when opts carries no id"""
        me = salt.utils.event.MinionEvent(dict(sock_dir=self.sock_dir), listen=False)
        # With no id in opts, the expected hash is the one of the empty string.
        id_hash = self._minion_id_hash("")
        self._assert_ipc_uris(me, "minion_event_{}".format(id_hash))

    @slowTest
    def test_event_single(self):
        """Test a single event is received"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            me.fire_event({"data": "foo1"}, "evt1")
            evt1 = me.get_event(tag="evt1")
            self.assertGotEvent(evt1, {"data": "foo1"})

    @slowTest
    def test_event_single_no_block(self):
        """Test a single event is received, no block"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            deadline = time.monotonic() + 5
            evt1 = me.get_event(wait=0, tag="evt1", no_block=True)
            # We should get None and way before the 5 seconds wait since it's
            # non-blocking, otherwise it would wait for an event which we
            # didn't even send
            self.assertIsNone(evt1)
            # Compare the *current* time against the deadline; comparing the
            # start time with start + 5 would always pass.
            self.assertLess(time.monotonic(), deadline)
            me.fire_event({"data": "foo1"}, "evt1")
            evt1 = me.get_event(wait=0, tag="evt1")
            self.assertGotEvent(evt1, {"data": "foo1"})

    @slowTest
    def test_event_single_wait_0_no_block_False(self):
        """Test a single event is received with wait=0 and no_block=False and doesn't spin the while loop"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            me.fire_event({"data": "foo1"}, "evt1")
            # Same arguments as in test_event_no_timeout: the call is expected
            # to wait for the event, so the result must not be None.
            evt1 = me.get_event(wait=0, tag="evt1", no_block=False)
            self.assertGotEvent(evt1, {"data": "foo1"})

    @slowTest
    def test_event_timeout(self):
        """Test no event is received if the timeout is reached"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            me.fire_event({"data": "foo1"}, "evt1")
            evt1 = me.get_event(tag="evt1")
            self.assertGotEvent(evt1, {"data": "foo1"})
            # Only one event was fired, so the second request must come back empty.
            evt2 = me.get_event(tag="evt1")
            self.assertIsNone(evt2)

    @slowTest
    def test_event_no_timeout(self):
        """Test no wait timeout, we should block forever, until we get one"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            with eventsender_process({"data": "foo2"}, "evt2", self.sock_dir, 5):
                evt = me.get_event(tag="evt2", wait=0, no_block=False)
            self.assertGotEvent(evt, {"data": "foo2"})

    @slowTest
    def test_event_matching(self):
        """Test a startswith match"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            me.fire_event({"data": "foo1"}, "evt1")
            evt1 = me.get_event(tag="ev")
            self.assertGotEvent(evt1, {"data": "foo1"})

    @slowTest
    def test_event_matching_regex(self):
        """Test a regex match"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            me.fire_event({"data": "foo1"}, "evt1")
            evt1 = me.get_event(tag="^ev", match_type="regex")
            self.assertGotEvent(evt1, {"data": "foo1"})

    @slowTest
    def test_event_matching_all(self):
        """Test an all match"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            me.fire_event({"data": "foo1"}, "evt1")
            evt1 = me.get_event(tag="")
            self.assertGotEvent(evt1, {"data": "foo1"})

    @slowTest
    def test_event_matching_all_when_tag_is_None(self):
        """Test event matching all when not passing a tag"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            me.fire_event({"data": "foo1"}, "evt1")
            evt1 = me.get_event()
            self.assertGotEvent(evt1, {"data": "foo1"})

    @slowTest
    def test_event_not_subscribed(self):
        """Test get_event drops non-subscribed events"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            me.fire_event({"data": "foo1"}, "evt1")
            me.fire_event({"data": "foo2"}, "evt2")
            evt2 = me.get_event(tag="evt2")
            evt1 = me.get_event(tag="evt1")
            self.assertGotEvent(evt2, {"data": "foo2"})
            self.assertIsNone(evt1)

    @slowTest
    def test_event_subscription_cache(self):
        """Test subscriptions cache a message until requested"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            me.subscribe("evt1")
            me.fire_event({"data": "foo1"}, "evt1")
            me.fire_event({"data": "foo2"}, "evt2")
            evt2 = me.get_event(tag="evt2")
            evt1 = me.get_event(tag="evt1")
            self.assertGotEvent(evt2, {"data": "foo2"})
            self.assertGotEvent(evt1, {"data": "foo1"})

    @slowTest
    def test_event_subscriptions_cache_regex(self):
        """Test regex subscriptions cache a message until requested"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            me.subscribe("e..1$", "regex")
            me.fire_event({"data": "foo1"}, "evt1")
            me.fire_event({"data": "foo2"}, "evt2")
            evt2 = me.get_event(tag="evt2")
            evt1 = me.get_event(tag="evt1")
            self.assertGotEvent(evt2, {"data": "foo2"})
            self.assertGotEvent(evt1, {"data": "foo1"})

    @slowTest
    def test_event_multiple_clients(self):
        """Test event is received by multiple clients"""
        with eventpublisher_process(self.sock_dir):
            me1 = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            me2 = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            # We need to sleep here to avoid a race condition wherein
            # the second socket may not be connected by the time the first socket
            # sends the event.
            time.sleep(0.5)
            me1.fire_event({"data": "foo1"}, "evt1")
            evt1 = me1.get_event(tag="evt1")
            self.assertGotEvent(evt1, {"data": "foo1"})
            evt2 = me2.get_event(tag="evt1")
            self.assertGotEvent(evt2, {"data": "foo1"})

    @expectedFailure
    def test_event_nested_sub_all(self):
        """Test nested event subscriptions do not drop events, get event for all tags"""
        # Show why not to call get_event(tag='')
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            me.fire_event({"data": "foo1"}, "evt1")
            me.fire_event({"data": "foo2"}, "evt2")
            evt2 = me.get_event(tag="")
            evt1 = me.get_event(tag="")
            self.assertGotEvent(evt2, {"data": "foo2"})
            self.assertGotEvent(evt1, {"data": "foo1"})

    @slowTest
    def test_event_many(self):
        """Test a large number of events, one at a time"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            for i in range(500):
                me.fire_event({"data": "{}".format(i)}, "testevents")
                evt = me.get_event(tag="testevents")
                self.assertGotEvent(evt, {"data": "{}".format(i)}, "Event {}".format(i))

    @slowTest
    def test_event_many_backlog(self):
        """Test a large number of events, send all then recv all"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            # Must not exceed zmq HWM
            for i in range(500):
                me.fire_event({"data": "{}".format(i)}, "testevents")
            for i in range(500):
                evt = me.get_event(tag="testevents")
                self.assertGotEvent(evt, {"data": "{}".format(i)}, "Event {}".format(i))

    # Test the fire_master function. As it wraps the underlying fire_event,
    # we don't need to perform extensive testing.
    @slowTest
    def test_send_master_event(self):
        """Tests that sending an event through fire_master generates expected event"""
        with eventpublisher_process(self.sock_dir):
            me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
            data = {"data": "foo1"}
            me.fire_master(data, "test_master")
            evt = me.get_event(tag="fire_master")
            self.assertGotEvent(
                evt,
                {"data": data, "tag": "test_master", "events": None, "pretag": None},
            )
class TestAsyncEventPublisher(AsyncTestCase):
    """
    Tests built around an ``AsyncEventPublisher`` and a minion event listener
    that both run on the test case's IO loop.
    """

    def get_new_ioloop(self):
        """Return a new tornado ``IOLoop`` instance for this test case."""
        return salt.ext.tornado.ioloop.IOLoop()

    def setUp(self):
        """
        Create the socket directory, the publisher and a listener subscribed
        to every tag, whose handler is ``_handle_publish``.
        """
        super().setUp()
        sock_dir = os.path.join(RUNTIME_VARS.TMP, "test-socks")
        self.sock_dir = sock_dir
        if not os.path.exists(sock_dir):
            os.makedirs(sock_dir)
        self.addCleanup(shutil.rmtree, sock_dir, ignore_errors=True)
        self.opts = {"sock_dir": sock_dir}
        self.publisher = salt.utils.event.AsyncEventPublisher(self.opts, self.io_loop)
        self.event = listener = salt.utils.event.get_event(
            "minion", opts=self.opts, io_loop=self.io_loop
        )
        listener.subscribe("")
        listener.set_event_handler(self._handle_publish)

    def _handle_publish(self, raw):
        """Unpack ``raw`` into ``self.tag``/``self.data`` and stop the IO loop wait."""
        unpacked_tag, unpacked_data = salt.utils.event.SaltEvent.unpack(raw)
        self.tag = unpacked_tag
        self.data = unpacked_data
        self.stop()

    def test_event_subscription(self):
        """Test a single event is received"""
        minion_event = salt.utils.event.MinionEvent(self.opts, listen=True)
        minion_event.fire_event({"data": "foo1"}, "evt1")
        # Returns once _handle_publish has called self.stop()
        self.wait()
        # The event is also requested from the firing side; its value is not checked.
        minion_event.get_event(tag="evt1")
        self.assertEqual(self.tag, "evt1")
        # drop the stamp
        self.data.pop("_stamp")
        self.assertEqual(self.data, {"data": "foo1"})

    def test_event_unsubscribe_remove_error(self):
        """Test unsubscribing from an absent tag does not raise"""
        minion_event = salt.utils.event.MinionEvent(self.opts, listen=True)
        event_tag = "evt1"
        minion_event.fire_event({"data": "foo1"}, event_tag)
        # Make sure no remove error is raised when tag is not found
        for _attempt in range(2):
            minion_event.unsubscribe(event_tag)
        minion_event.unsubscribe("tag_does_not_exist")
class TestEventReturn(TestCase):
    """Tests for ``salt.utils.event.EventReturn``."""

    @slowTest
    def test_event_return(self):
        """
        Test that creating and starting an ``EventReturn`` does not raise a
        ``TypeError`` whose message mentions "object".
        """
        event_return = None
        try:
            event_return = salt.utils.event.EventReturn(
                salt.config.DEFAULT_MASTER_OPTS.copy()
            )
            event_return.start()
        except TypeError as exc:
            # Only TypeErrors mentioning "object" fail the test; any other
            # TypeError is ignored here.
            error_text = str(exc)
            if "object" in error_text:
                self.fail("'{}' TypeError should have not been raised".format(exc))
        finally:
            # Terminate whatever got created, even if start() failed.
            if event_return is not None:
                terminate_process(event_return.pid, kill_children=True)
|