test_event.py 14 KB


  1. """
  2. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  3. tests.unit.utils.event_test
  4. ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  5. """
  6. import hashlib
  7. import os
  8. import shutil
  9. import time
  10. import salt.config
  11. import salt.ext.tornado.ioloop
  12. import salt.utils.event
  13. import salt.utils.stringutils
  14. import zmq
  15. import zmq.eventloop.ioloop
  16. from salt.ext.six.moves import range
  17. from salt.ext.tornado.testing import AsyncTestCase
  18. from saltfactories.utils.processes import terminate_process
  19. from tests.support.events import eventpublisher_process, eventsender_process
  20. from tests.support.helpers import slowTest
  21. from tests.support.runtests import RUNTIME_VARS
  22. from tests.support.unit import TestCase, expectedFailure, skipIf
  # Compatibility shim for pyzmq 13.0.x, where the ioloop module has no
  # ``ZMQIOLoop`` attribute; alias it to ``IOLoop`` in that case.
  # TODO: remove once we force people to 14.0.x
  if not hasattr(zmq.eventloop.ioloop, "ZMQIOLoop"):
      zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop

  # True when zmq reports (or, lacking the attribute, is assumed to have) an
  # IPC path limit of 103 characters or fewer; used to skip the event tests.
  NO_LONG_IPC = getattr(zmq, "IPC_PATH_MAX_LEN", 103) <= 103
  @skipIf(
      NO_LONG_IPC, "This system does not support long IPC paths. Skipping event tests!"
  )
  class TestSaltEvent(TestCase):
      """
      Exercise the synchronous event helpers in ``salt.utils.event``.

      Every test works against a scratch socket directory created under
      ``RUNTIME_VARS.TMP``. Tests that actually exchange events start a
      publisher with ``eventpublisher_process`` for the duration of the test.
      """

      def setUp(self):
          """Create the scratch socket directory and schedule its removal."""
          self.sock_dir = os.path.join(RUNTIME_VARS.TMP, "test-socks")
          # exist_ok avoids the check-then-create race of probing for the
          # directory before making it.
          os.makedirs(self.sock_dir, exist_ok=True)
          self.addCleanup(shutil.rmtree, self.sock_dir, ignore_errors=True)

      def assertGotEvent(self, evt, data, msg=None):
          """
          Assert that ``evt`` is not ``None`` and carries every item of ``data``.

          :param evt: the event payload under test (as returned by ``get_event``)
          :param data: mapping of keys to the values ``evt`` must contain
          :param msg: optional prefix used in the failure messages
          """
          self.assertIsNotNone(evt, msg)
          for key, expected in data.items():
              self.assertIn(key, evt, "{}: Key {} missing".format(msg, key))
              mismatch = "{0}: Key {1} value mismatch, {2} != {3}".format(
                  msg, key, expected, evt[key]
              )
              self.assertEqual(expected, evt[key], mismatch)

      def _minion_ipc_path(self, minion_id, kind):
          """
          Return the minion IPC socket path the tests expect for ``minion_id``.

          The file name embeds the first 10 hex characters of the SHA-256 of
          the minion id; ``kind`` is the ``"pub"`` or ``"pull"`` suffix.
          """
          id_hash = hashlib.sha256(
              salt.utils.stringutils.to_bytes(minion_id)
          ).hexdigest()[:10]
          return os.path.join(
              self.sock_dir, "minion_event_{}_{}.ipc".format(id_hash, kind)
          )

      def test_master_event(self):
          """Test the master event pub/pull URIs live in the socket directory"""
          me = salt.utils.event.MasterEvent(self.sock_dir, listen=False)
          self.assertEqual(
              me.puburi, os.path.join(self.sock_dir, "master_event_pub.ipc")
          )
          self.assertEqual(
              me.pulluri, os.path.join(self.sock_dir, "master_event_pull.ipc")
          )

      def test_minion_event(self):
          """Test the minion event URIs embed the hash of the minion id"""
          opts = dict(id="foo", sock_dir=self.sock_dir)
          me = salt.utils.event.MinionEvent(opts, listen=False)
          self.assertEqual(me.puburi, self._minion_ipc_path(opts["id"], "pub"))
          self.assertEqual(me.pulluri, self._minion_ipc_path(opts["id"], "pull"))

      def test_minion_event_tcp_ipc_mode(self):
          """Test the URIs are the TCP port numbers when ipc_mode is tcp"""
          opts = dict(id="foo", ipc_mode="tcp")
          me = salt.utils.event.MinionEvent(opts, listen=False)
          self.assertEqual(me.puburi, 4510)
          self.assertEqual(me.pulluri, 4511)

      def test_minion_event_no_id(self):
          """Test the minion event URIs hash an empty id when none is given"""
          me = salt.utils.event.MinionEvent(dict(sock_dir=self.sock_dir), listen=False)
          self.assertEqual(me.puburi, self._minion_ipc_path("", "pub"))
          self.assertEqual(me.pulluri, self._minion_ipc_path("", "pull"))

      @slowTest
      def test_event_single(self):
          """Test a single event is received"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              me.fire_event({"data": "foo1"}, "evt1")
              evt1 = me.get_event(tag="evt1")
              self.assertGotEvent(evt1, {"data": "foo1"})

      @slowTest
      def test_event_single_no_block(self):
          """Test a single event is received, no block"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              start = time.time()
              finish = start + 5
              evt1 = me.get_event(wait=0, tag="evt1", no_block=True)
              # We should get None and way before the 5 seconds wait since it's
              # non-blocking, otherwise it would wait for an event which we
              # didn't even send
              self.assertIsNone(evt1)
              # Compare the *current* time against the deadline; comparing
              # ``start`` with ``start + 5`` could never fail.
              self.assertLess(time.time(), finish)

              me.fire_event({"data": "foo1"}, "evt1")
              evt1 = me.get_event(wait=0, tag="evt1")
              self.assertGotEvent(evt1, {"data": "foo1"})

      @slowTest
      def test_event_single_wait_0_no_block_False(self):
          """Test a single event is received with wait=0 and no_block=False and doesn't spin the while loop"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              me.fire_event({"data": "foo1"}, "evt1")
              # With wait=0 and no_block=False the call is expected to wait for
              # the event instead of returning None straight away.
              evt1 = me.get_event(wait=0, tag="evt1", no_block=False)
              self.assertGotEvent(evt1, {"data": "foo1"})

      @slowTest
      def test_event_timeout(self):
          """Test no event is received if the timeout is reached"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              me.fire_event({"data": "foo1"}, "evt1")
              evt1 = me.get_event(tag="evt1")
              self.assertGotEvent(evt1, {"data": "foo1"})
              # Only one event was fired, so the second read must come back empty.
              evt2 = me.get_event(tag="evt1")
              self.assertIsNone(evt2)

      @slowTest
      def test_event_no_timeout(self):
          """Test no wait timeout, we should block forever, until we get one"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              with eventsender_process({"data": "foo2"}, "evt2", self.sock_dir, 5):
                  evt = me.get_event(tag="evt2", wait=0, no_block=False)
              self.assertGotEvent(evt, {"data": "foo2"})

      @slowTest
      def test_event_matching(self):
          """Test a startswith match"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              me.fire_event({"data": "foo1"}, "evt1")
              evt1 = me.get_event(tag="ev")
              self.assertGotEvent(evt1, {"data": "foo1"})

      @slowTest
      def test_event_matching_regex(self):
          """Test a regex match"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              me.fire_event({"data": "foo1"}, "evt1")
              evt1 = me.get_event(tag="^ev", match_type="regex")
              self.assertGotEvent(evt1, {"data": "foo1"})

      @slowTest
      def test_event_matching_all(self):
          """Test an all match"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              me.fire_event({"data": "foo1"}, "evt1")
              evt1 = me.get_event(tag="")
              self.assertGotEvent(evt1, {"data": "foo1"})

      @slowTest
      def test_event_matching_all_when_tag_is_None(self):
          """Test event matching all when not passing a tag"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              me.fire_event({"data": "foo1"}, "evt1")
              evt1 = me.get_event()
              self.assertGotEvent(evt1, {"data": "foo1"})

      @slowTest
      def test_event_not_subscribed(self):
          """Test get_event drops non-subscribed events"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              me.fire_event({"data": "foo1"}, "evt1")
              me.fire_event({"data": "foo2"}, "evt2")
              # Ask for the second event first; the first one is expected to be
              # gone by the time it is requested.
              evt2 = me.get_event(tag="evt2")
              evt1 = me.get_event(tag="evt1")
              self.assertGotEvent(evt2, {"data": "foo2"})
              self.assertIsNone(evt1)

      @slowTest
      def test_event_subscription_cache(self):
          """Test subscriptions cache a message until requested"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              me.subscribe("evt1")
              me.fire_event({"data": "foo1"}, "evt1")
              me.fire_event({"data": "foo2"}, "evt2")
              evt2 = me.get_event(tag="evt2")
              evt1 = me.get_event(tag="evt1")
              self.assertGotEvent(evt2, {"data": "foo2"})
              self.assertGotEvent(evt1, {"data": "foo1"})

      @slowTest
      def test_event_subscriptions_cache_regex(self):
          """Test regex subscriptions cache a message until requested"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              me.subscribe("e..1$", "regex")
              me.fire_event({"data": "foo1"}, "evt1")
              me.fire_event({"data": "foo2"}, "evt2")
              evt2 = me.get_event(tag="evt2")
              evt1 = me.get_event(tag="evt1")
              self.assertGotEvent(evt2, {"data": "foo2"})
              self.assertGotEvent(evt1, {"data": "foo1"})

      @slowTest
      def test_event_multiple_clients(self):
          """Test event is received by multiple clients"""
          with eventpublisher_process(self.sock_dir):
              me1 = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              me2 = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              # We need to sleep here to avoid a race condition wherein
              # the second socket may not be connected by the time the first socket
              # sends the event.
              time.sleep(0.5)
              me1.fire_event({"data": "foo1"}, "evt1")
              evt1 = me1.get_event(tag="evt1")
              self.assertGotEvent(evt1, {"data": "foo1"})
              evt2 = me2.get_event(tag="evt1")
              self.assertGotEvent(evt2, {"data": "foo1"})

      @expectedFailure
      def test_event_nested_sub_all(self):
          """Test nested event subscriptions do not drop events, get event for all tags"""
          # Show why not to call get_event(tag='')
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              me.fire_event({"data": "foo1"}, "evt1")
              me.fire_event({"data": "foo2"}, "evt2")
              evt2 = me.get_event(tag="")
              evt1 = me.get_event(tag="")
              self.assertGotEvent(evt2, {"data": "foo2"})
              self.assertGotEvent(evt1, {"data": "foo1"})

      @slowTest
      def test_event_many(self):
          """Test a large number of events, one at a time"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              for i in range(500):
                  payload = {"data": "{}".format(i)}
                  me.fire_event(payload, "testevents")
                  evt = me.get_event(tag="testevents")
                  self.assertGotEvent(evt, payload, "Event {}".format(i))

      @slowTest
      def test_event_many_backlog(self):
          """Test a large number of events, send all then recv all"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              # Must not exceed zmq HWM
              for i in range(500):
                  me.fire_event({"data": "{}".format(i)}, "testevents")
              # Events must come back in the order they were fired.
              for i in range(500):
                  evt = me.get_event(tag="testevents")
                  self.assertGotEvent(
                      evt, {"data": "{}".format(i)}, "Event {}".format(i)
                  )

      # Test the fire_master function. As it wraps the underlying fire_event,
      # we don't need to perform extensive testing.
      @slowTest
      def test_send_master_event(self):
          """Tests that sending an event through fire_master generates expected event"""
          with eventpublisher_process(self.sock_dir):
              me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
              data = {"data": "foo1"}
              me.fire_master(data, "test_master")
              evt = me.get_event(tag="fire_master")
              self.assertGotEvent(
                  evt,
                  {"data": data, "tag": "test_master", "events": None, "pretag": None},
              )
  class TestAsyncEventPublisher(AsyncTestCase):
      """
      Tests that drive ``salt.utils.event.AsyncEventPublisher`` on the test
      case's own tornado IO loop.
      """

      def get_new_ioloop(self):
          """Return a fresh salt-vendored tornado IOLoop for each test."""
          return salt.ext.tornado.ioloop.IOLoop()

      def setUp(self):
          """
          Create the socket directory, start the async publisher and attach a
          minion event listener whose handler is ``_handle_publish``.
          """
          super().setUp()
          self.sock_dir = os.path.join(RUNTIME_VARS.TMP, "test-socks")
          # exist_ok avoids the check-then-create race of probing for the
          # directory before making it.
          os.makedirs(self.sock_dir, exist_ok=True)
          self.addCleanup(shutil.rmtree, self.sock_dir, ignore_errors=True)
          self.opts = {"sock_dir": self.sock_dir}
          self.publisher = salt.utils.event.AsyncEventPublisher(self.opts, self.io_loop)
          self.event = salt.utils.event.get_event(
              "minion", opts=self.opts, io_loop=self.io_loop
          )
          # An empty tag subscribes the listener to every event.
          self.event.subscribe("")
          self.event.set_event_handler(self._handle_publish)

      def _handle_publish(self, raw):
          """Unpack the raw event into ``self.tag``/``self.data`` and stop waiting."""
          self.tag, self.data = salt.utils.event.SaltEvent.unpack(raw)
          self.stop()

      def test_event_subscription(self):
          """Test a single event is received"""
          me = salt.utils.event.MinionEvent(self.opts, listen=True)
          me.fire_event({"data": "foo1"}, "evt1")
          # Blocks until _handle_publish calls self.stop().
          self.wait()
          # The call is kept from the original test, but its return value is
          # not inspected; the assertions below use what the handler recorded.
          me.get_event(tag="evt1")
          self.assertEqual(self.tag, "evt1")
          self.data.pop("_stamp")  # drop the stamp
          self.assertEqual(self.data, {"data": "foo1"})

      def test_event_unsubscribe_remove_error(self):
          """Test unsubscribing repeatedly, or from an unknown tag, does not raise"""
          me = salt.utils.event.MinionEvent(self.opts, listen=True)
          tag = "evt1"
          me.fire_event({"data": "foo1"}, tag)

          # Make sure no remove error is raised when tag is not found
          for _ in range(2):
              me.unsubscribe(tag)

          me.unsubscribe("tag_does_not_exist")
  class TestEventReturn(TestCase):
      """Smoke test for starting ``salt.utils.event.EventReturn``."""

      @slowTest
      def test_event_return(self):
          """
          Start an EventReturn built from a copy of the default master opts and
          fail if doing so raises a TypeError whose message mentions "object".
          """
          event_return = None
          master_opts = salt.config.DEFAULT_MASTER_OPTS.copy()
          try:
              event_return = salt.utils.event.EventReturn(master_opts)
              event_return.start()
          except TypeError as err:
              # NOTE(review): a TypeError whose message lacks "object" is
              # swallowed here and the test still passes; confirm that this
              # is intended.
              if "object" in str(err):
                  self.fail("'{}' TypeError should have not been raised".format(err))
          finally:
              # Only clean up when the constructor succeeded.
              if event_return is not None:
                  terminate_process(event_return.pid, kill_children=True)