test_event.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. # -*- coding: utf-8 -*-
  2. """
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. tests.unit.utils.event_test
  5. ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  6. """
  7. from __future__ import absolute_import, print_function, unicode_literals
  8. import hashlib
  9. import os
  10. import shutil
  11. import time
  12. import warnings
  13. import salt.config
  14. import salt.ext.tornado.ioloop
  15. import salt.utils.event
  16. import salt.utils.stringutils
  17. import zmq
  18. import zmq.eventloop.ioloop
  19. from salt.ext.six.moves import range
  20. from salt.ext.tornado.testing import AsyncTestCase
  21. from saltfactories.utils.processes.helpers import terminate_process
  22. from tests.support.events import eventpublisher_process, eventsender_process
  23. from tests.support.helpers import slowTest
  24. from tests.support.runtests import RUNTIME_VARS
  25. from tests.support.unit import TestCase, expectedFailure, skipIf
  26. # support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
  27. if not hasattr(zmq.eventloop.ioloop, "ZMQIOLoop"):
  28. zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
  29. NO_LONG_IPC = False
  30. if getattr(zmq, "IPC_PATH_MAX_LEN", 103) <= 103:
  31. NO_LONG_IPC = True
  32. @skipIf(
  33. NO_LONG_IPC, "This system does not support long IPC paths. Skipping event tests!"
  34. )
  35. class TestSaltEvent(TestCase):
  36. def setUp(self):
  37. self.sock_dir = os.path.join(RUNTIME_VARS.TMP, "test-socks")
  38. if not os.path.exists(self.sock_dir):
  39. os.makedirs(self.sock_dir)
  40. self.addCleanup(shutil.rmtree, self.sock_dir, ignore_errors=True)
  41. def assertGotEvent(self, evt, data, msg=None):
  42. self.assertIsNotNone(evt, msg)
  43. for key in data:
  44. self.assertIn(key, evt, "{0}: Key {1} missing".format(msg, key))
  45. assertMsg = "{0}: Key {1} value mismatch, {2} != {3}"
  46. assertMsg = assertMsg.format(msg, key, data[key], evt[key])
  47. self.assertEqual(data[key], evt[key], assertMsg)
  48. def test_master_event(self):
  49. me = salt.utils.event.MasterEvent(self.sock_dir, listen=False)
  50. self.assertEqual(
  51. me.puburi, "{0}".format(os.path.join(self.sock_dir, "master_event_pub.ipc"))
  52. )
  53. self.assertEqual(
  54. me.pulluri,
  55. "{0}".format(os.path.join(self.sock_dir, "master_event_pull.ipc")),
  56. )
  57. def test_minion_event(self):
  58. opts = dict(id="foo", sock_dir=self.sock_dir)
  59. id_hash = hashlib.sha256(
  60. salt.utils.stringutils.to_bytes(opts["id"])
  61. ).hexdigest()[:10]
  62. me = salt.utils.event.MinionEvent(opts, listen=False)
  63. self.assertEqual(
  64. me.puburi,
  65. "{0}".format(
  66. os.path.join(self.sock_dir, "minion_event_{0}_pub.ipc".format(id_hash))
  67. ),
  68. )
  69. self.assertEqual(
  70. me.pulluri,
  71. "{0}".format(
  72. os.path.join(self.sock_dir, "minion_event_{0}_pull.ipc".format(id_hash))
  73. ),
  74. )
  75. def test_minion_event_tcp_ipc_mode(self):
  76. opts = dict(id="foo", ipc_mode="tcp")
  77. me = salt.utils.event.MinionEvent(opts, listen=False)
  78. self.assertEqual(me.puburi, 4510)
  79. self.assertEqual(me.pulluri, 4511)
  80. def test_minion_event_no_id(self):
  81. me = salt.utils.event.MinionEvent(dict(sock_dir=self.sock_dir), listen=False)
  82. id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes("")).hexdigest()[:10]
  83. self.assertEqual(
  84. me.puburi,
  85. "{0}".format(
  86. os.path.join(self.sock_dir, "minion_event_{0}_pub.ipc".format(id_hash))
  87. ),
  88. )
  89. self.assertEqual(
  90. me.pulluri,
  91. "{0}".format(
  92. os.path.join(self.sock_dir, "minion_event_{0}_pull.ipc".format(id_hash))
  93. ),
  94. )
  95. @slowTest
  96. def test_event_single(self):
  97. """Test a single event is received"""
  98. with eventpublisher_process(self.sock_dir):
  99. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  100. me.fire_event({"data": "foo1"}, "evt1")
  101. evt1 = me.get_event(tag="evt1")
  102. self.assertGotEvent(evt1, {"data": "foo1"})
  103. @slowTest
  104. def test_event_single_no_block(self):
  105. """Test a single event is received, no block"""
  106. with eventpublisher_process(self.sock_dir):
  107. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  108. start = time.time()
  109. finish = start + 5
  110. evt1 = me.get_event(wait=0, tag="evt1", no_block=True)
  111. # We should get None and way before the 5 seconds wait since it's
  112. # non-blocking, otherwise it would wait for an event which we
  113. # didn't even send
  114. self.assertIsNone(evt1, None)
  115. self.assertLess(start, finish)
  116. me.fire_event({"data": "foo1"}, "evt1")
  117. evt1 = me.get_event(wait=0, tag="evt1")
  118. self.assertGotEvent(evt1, {"data": "foo1"})
  119. @slowTest
  120. def test_event_single_wait_0_no_block_False(self):
  121. """Test a single event is received with wait=0 and no_block=False and doesn't spin the while loop"""
  122. with eventpublisher_process(self.sock_dir):
  123. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  124. me.fire_event({"data": "foo1"}, "evt1")
  125. # This is too fast and will be None but assures we're not blocking
  126. evt1 = me.get_event(wait=0, tag="evt1", no_block=False)
  127. self.assertGotEvent(evt1, {"data": "foo1"})
  128. @slowTest
  129. def test_event_timeout(self):
  130. """Test no event is received if the timeout is reached"""
  131. with eventpublisher_process(self.sock_dir):
  132. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  133. me.fire_event({"data": "foo1"}, "evt1")
  134. evt1 = me.get_event(tag="evt1")
  135. self.assertGotEvent(evt1, {"data": "foo1"})
  136. evt2 = me.get_event(tag="evt1")
  137. self.assertIsNone(evt2)
  138. @slowTest
  139. def test_event_no_timeout(self):
  140. """Test no wait timeout, we should block forever, until we get one """
  141. with eventpublisher_process(self.sock_dir):
  142. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  143. with eventsender_process({"data": "foo2"}, "evt2", self.sock_dir, 5):
  144. evt = me.get_event(tag="evt2", wait=0, no_block=False)
  145. self.assertGotEvent(evt, {"data": "foo2"})
  146. @slowTest
  147. def test_event_matching(self):
  148. """Test a startswith match"""
  149. with eventpublisher_process(self.sock_dir):
  150. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  151. me.fire_event({"data": "foo1"}, "evt1")
  152. evt1 = me.get_event(tag="ev")
  153. self.assertGotEvent(evt1, {"data": "foo1"})
  154. @slowTest
  155. def test_event_matching_regex(self):
  156. """Test a regex match"""
  157. with eventpublisher_process(self.sock_dir):
  158. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  159. me.fire_event({"data": "foo1"}, "evt1")
  160. evt1 = me.get_event(tag="^ev", match_type="regex")
  161. self.assertGotEvent(evt1, {"data": "foo1"})
  162. @slowTest
  163. def test_event_matching_all(self):
  164. """Test an all match"""
  165. with eventpublisher_process(self.sock_dir):
  166. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  167. me.fire_event({"data": "foo1"}, "evt1")
  168. evt1 = me.get_event(tag="")
  169. self.assertGotEvent(evt1, {"data": "foo1"})
  170. @slowTest
  171. def test_event_matching_all_when_tag_is_None(self):
  172. """Test event matching all when not passing a tag"""
  173. with eventpublisher_process(self.sock_dir):
  174. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  175. me.fire_event({"data": "foo1"}, "evt1")
  176. evt1 = me.get_event()
  177. self.assertGotEvent(evt1, {"data": "foo1"})
  178. @slowTest
  179. def test_event_not_subscribed(self):
  180. """Test get_event drops non-subscribed events"""
  181. with eventpublisher_process(self.sock_dir):
  182. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  183. me.fire_event({"data": "foo1"}, "evt1")
  184. me.fire_event({"data": "foo2"}, "evt2")
  185. evt2 = me.get_event(tag="evt2")
  186. evt1 = me.get_event(tag="evt1")
  187. self.assertGotEvent(evt2, {"data": "foo2"})
  188. self.assertIsNone(evt1)
  189. @slowTest
  190. def test_event_subscription_cache(self):
  191. """Test subscriptions cache a message until requested"""
  192. with eventpublisher_process(self.sock_dir):
  193. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  194. me.subscribe("evt1")
  195. me.fire_event({"data": "foo1"}, "evt1")
  196. me.fire_event({"data": "foo2"}, "evt2")
  197. evt2 = me.get_event(tag="evt2")
  198. evt1 = me.get_event(tag="evt1")
  199. self.assertGotEvent(evt2, {"data": "foo2"})
  200. self.assertGotEvent(evt1, {"data": "foo1"})
  201. @slowTest
  202. def test_event_subscriptions_cache_regex(self):
  203. """Test regex subscriptions cache a message until requested"""
  204. with eventpublisher_process(self.sock_dir):
  205. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  206. me.subscribe("e..1$", "regex")
  207. me.fire_event({"data": "foo1"}, "evt1")
  208. me.fire_event({"data": "foo2"}, "evt2")
  209. evt2 = me.get_event(tag="evt2")
  210. evt1 = me.get_event(tag="evt1")
  211. self.assertGotEvent(evt2, {"data": "foo2"})
  212. self.assertGotEvent(evt1, {"data": "foo1"})
  213. @slowTest
  214. def test_event_multiple_clients(self):
  215. """Test event is received by multiple clients"""
  216. with eventpublisher_process(self.sock_dir):
  217. me1 = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  218. me2 = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  219. # We need to sleep here to avoid a race condition wherein
  220. # the second socket may not be connected by the time the first socket
  221. # sends the event.
  222. time.sleep(0.5)
  223. me1.fire_event({"data": "foo1"}, "evt1")
  224. evt1 = me1.get_event(tag="evt1")
  225. self.assertGotEvent(evt1, {"data": "foo1"})
  226. evt2 = me2.get_event(tag="evt1")
  227. self.assertGotEvent(evt2, {"data": "foo1"})
  228. @expectedFailure
  229. def test_event_nested_sub_all(self):
  230. """Test nested event subscriptions do not drop events, get event for all tags"""
  231. # Show why not to call get_event(tag='')
  232. with eventpublisher_process(self.sock_dir):
  233. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  234. me.fire_event({"data": "foo1"}, "evt1")
  235. me.fire_event({"data": "foo2"}, "evt2")
  236. evt2 = me.get_event(tag="")
  237. evt1 = me.get_event(tag="")
  238. self.assertGotEvent(evt2, {"data": "foo2"})
  239. self.assertGotEvent(evt1, {"data": "foo1"})
  240. @slowTest
  241. def test_event_many(self):
  242. """Test a large number of events, one at a time"""
  243. with eventpublisher_process(self.sock_dir):
  244. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  245. for i in range(500):
  246. me.fire_event({"data": "{0}".format(i)}, "testevents")
  247. evt = me.get_event(tag="testevents")
  248. self.assertGotEvent(
  249. evt, {"data": "{0}".format(i)}, "Event {0}".format(i)
  250. )
  251. @slowTest
  252. def test_event_many_backlog(self):
  253. """Test a large number of events, send all then recv all"""
  254. with eventpublisher_process(self.sock_dir):
  255. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  256. # Must not exceed zmq HWM
  257. for i in range(500):
  258. me.fire_event({"data": "{0}".format(i)}, "testevents")
  259. for i in range(500):
  260. evt = me.get_event(tag="testevents")
  261. self.assertGotEvent(
  262. evt, {"data": "{0}".format(i)}, "Event {0}".format(i)
  263. )
  264. # Test the fire_master function. As it wraps the underlying fire_event,
  265. # we don't need to perform extensive testing.
  266. @slowTest
  267. def test_send_master_event(self):
  268. """Tests that sending an event through fire_master generates expected event"""
  269. with eventpublisher_process(self.sock_dir):
  270. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  271. data = {"data": "foo1"}
  272. me.fire_master(data, "test_master")
  273. evt = me.get_event(tag="fire_master")
  274. self.assertGotEvent(
  275. evt,
  276. {"data": data, "tag": "test_master", "events": None, "pretag": None},
  277. )
  278. class TestAsyncEventPublisher(AsyncTestCase):
  279. def get_new_ioloop(self):
  280. return salt.ext.tornado.ioloop.IOLoop()
  281. def setUp(self):
  282. super(TestAsyncEventPublisher, self).setUp()
  283. self.sock_dir = os.path.join(RUNTIME_VARS.TMP, "test-socks")
  284. if not os.path.exists(self.sock_dir):
  285. os.makedirs(self.sock_dir)
  286. self.addCleanup(shutil.rmtree, self.sock_dir, ignore_errors=True)
  287. self.opts = {"sock_dir": self.sock_dir}
  288. self.publisher = salt.utils.event.AsyncEventPublisher(self.opts, self.io_loop,)
  289. self.event = salt.utils.event.get_event(
  290. "minion", opts=self.opts, io_loop=self.io_loop
  291. )
  292. self.event.subscribe("")
  293. self.event.set_event_handler(self._handle_publish)
  294. def _handle_publish(self, raw):
  295. self.tag, self.data = salt.utils.event.SaltEvent.unpack(raw)
  296. self.stop()
  297. def test_event_subscription(self):
  298. """Test a single event is received"""
  299. me = salt.utils.event.MinionEvent(self.opts, listen=True)
  300. me.fire_event({"data": "foo1"}, "evt1")
  301. self.wait()
  302. evt1 = me.get_event(tag="evt1")
  303. self.assertEqual(self.tag, "evt1")
  304. self.data.pop("_stamp") # drop the stamp
  305. self.assertEqual(self.data, {"data": "foo1"})
  306. class TestEventReturn(TestCase):
  307. @slowTest
  308. def test_event_return(self):
  309. # Once salt is py3 only, the warnings part of this test no longer applies
  310. evt = None
  311. try:
  312. with warnings.catch_warnings(record=True) as w:
  313. # Cause all warnings to always be triggered.
  314. warnings.simplefilter("always")
  315. evt = None
  316. try:
  317. evt = salt.utils.event.EventReturn(
  318. salt.config.DEFAULT_MASTER_OPTS.copy()
  319. )
  320. evt.start()
  321. except TypeError as exc:
  322. if "object" in str(exc):
  323. self.fail(
  324. "'{}' TypeError should have not been raised".format(exc)
  325. )
  326. for warning in w:
  327. if warning.category is DeprecationWarning:
  328. assert "object() takes no parameters" not in warning.message
  329. finally:
  330. if evt is not None:
  331. terminate_process(evt.pid, kill_children=True)