test_event.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. tests.unit.utils.event_test
  5. ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  6. '''
  7. # Import python libs
  8. from __future__ import absolute_import, unicode_literals, print_function
  9. import os
  10. import hashlib
  11. import time
  12. import shutil
  13. import warnings
  14. # Import Salt Testing libs
  15. from tests.support.unit import expectedFailure, skipIf, TestCase
  16. from tests.support.runtests import RUNTIME_VARS
  17. from tests.support.events import eventpublisher_process, eventsender_process
  18. # Import salt libs
  19. import salt.config
  20. import salt.utils.event
  21. import salt.utils.stringutils
  22. # Import 3rd-+arty libs
  23. from tornado.testing import AsyncTestCase
  24. import zmq
  25. import zmq.eventloop.ioloop
  26. # support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
  27. if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
  28. zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
  29. from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
  30. from tests.support.processes import terminate_process
  31. NO_LONG_IPC = False
  32. if getattr(zmq, 'IPC_PATH_MAX_LEN', 103) <= 103:
  33. NO_LONG_IPC = True
  34. @skipIf(NO_LONG_IPC, "This system does not support long IPC paths. Skipping event tests!")
  35. class TestSaltEvent(TestCase):
  36. def setUp(self):
  37. self.sock_dir = os.path.join(RUNTIME_VARS.TMP, 'test-socks')
  38. if not os.path.exists(self.sock_dir):
  39. os.makedirs(self.sock_dir)
  40. self.addCleanup(shutil.rmtree, self.sock_dir, ignore_errors=True)
  41. def assertGotEvent(self, evt, data, msg=None):
  42. self.assertIsNotNone(evt, msg)
  43. for key in data:
  44. self.assertIn(key, evt, '{0}: Key {1} missing'.format(msg, key))
  45. assertMsg = '{0}: Key {1} value mismatch, {2} != {3}'
  46. assertMsg = assertMsg.format(msg, key, data[key], evt[key])
  47. self.assertEqual(data[key], evt[key], assertMsg)
  48. def test_master_event(self):
  49. me = salt.utils.event.MasterEvent(self.sock_dir, listen=False)
  50. self.assertEqual(
  51. me.puburi, '{0}'.format(
  52. os.path.join(self.sock_dir, 'master_event_pub.ipc')
  53. )
  54. )
  55. self.assertEqual(
  56. me.pulluri,
  57. '{0}'.format(
  58. os.path.join(self.sock_dir, 'master_event_pull.ipc')
  59. )
  60. )
  61. def test_minion_event(self):
  62. opts = dict(id='foo', sock_dir=self.sock_dir)
  63. id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes(opts['id'])).hexdigest()[:10]
  64. me = salt.utils.event.MinionEvent(opts, listen=False)
  65. self.assertEqual(
  66. me.puburi,
  67. '{0}'.format(
  68. os.path.join(
  69. self.sock_dir, 'minion_event_{0}_pub.ipc'.format(id_hash)
  70. )
  71. )
  72. )
  73. self.assertEqual(
  74. me.pulluri,
  75. '{0}'.format(
  76. os.path.join(
  77. self.sock_dir, 'minion_event_{0}_pull.ipc'.format(id_hash)
  78. )
  79. )
  80. )
  81. def test_minion_event_tcp_ipc_mode(self):
  82. opts = dict(id='foo', ipc_mode='tcp')
  83. me = salt.utils.event.MinionEvent(opts, listen=False)
  84. self.assertEqual(me.puburi, 4510)
  85. self.assertEqual(me.pulluri, 4511)
  86. def test_minion_event_no_id(self):
  87. me = salt.utils.event.MinionEvent(dict(sock_dir=self.sock_dir), listen=False)
  88. id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes('')).hexdigest()[:10]
  89. self.assertEqual(
  90. me.puburi,
  91. '{0}'.format(
  92. os.path.join(
  93. self.sock_dir, 'minion_event_{0}_pub.ipc'.format(id_hash)
  94. )
  95. )
  96. )
  97. self.assertEqual(
  98. me.pulluri,
  99. '{0}'.format(
  100. os.path.join(
  101. self.sock_dir, 'minion_event_{0}_pull.ipc'.format(id_hash)
  102. )
  103. )
  104. )
  105. def test_event_single(self):
  106. '''Test a single event is received'''
  107. with eventpublisher_process(self.sock_dir):
  108. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  109. me.fire_event({'data': 'foo1'}, 'evt1')
  110. evt1 = me.get_event(tag='evt1')
  111. self.assertGotEvent(evt1, {'data': 'foo1'})
  112. def test_event_single_no_block(self):
  113. '''Test a single event is received, no block'''
  114. with eventpublisher_process(self.sock_dir):
  115. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  116. start = time.time()
  117. finish = start + 5
  118. evt1 = me.get_event(wait=0, tag='evt1', no_block=True)
  119. # We should get None and way before the 5 seconds wait since it's
  120. # non-blocking, otherwise it would wait for an event which we
  121. # didn't even send
  122. self.assertIsNone(evt1, None)
  123. self.assertLess(start, finish)
  124. me.fire_event({'data': 'foo1'}, 'evt1')
  125. evt1 = me.get_event(wait=0, tag='evt1')
  126. self.assertGotEvent(evt1, {'data': 'foo1'})
  127. def test_event_single_wait_0_no_block_False(self):
  128. '''Test a single event is received with wait=0 and no_block=False and doesn't spin the while loop'''
  129. with eventpublisher_process(self.sock_dir):
  130. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  131. me.fire_event({'data': 'foo1'}, 'evt1')
  132. # This is too fast and will be None but assures we're not blocking
  133. evt1 = me.get_event(wait=0, tag='evt1', no_block=False)
  134. self.assertGotEvent(evt1, {'data': 'foo1'})
  135. def test_event_timeout(self):
  136. '''Test no event is received if the timeout is reached'''
  137. with eventpublisher_process(self.sock_dir):
  138. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  139. me.fire_event({'data': 'foo1'}, 'evt1')
  140. evt1 = me.get_event(tag='evt1')
  141. self.assertGotEvent(evt1, {'data': 'foo1'})
  142. evt2 = me.get_event(tag='evt1')
  143. self.assertIsNone(evt2)
  144. def test_event_no_timeout(self):
  145. '''Test no wait timeout, we should block forever, until we get one '''
  146. with eventpublisher_process(self.sock_dir):
  147. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  148. with eventsender_process({'data': 'foo2'}, 'evt2', self.sock_dir, 5):
  149. evt = me.get_event(tag='evt2', wait=0, no_block=False)
  150. self.assertGotEvent(evt, {'data': 'foo2'})
  151. def test_event_matching(self):
  152. '''Test a startswith match'''
  153. with eventpublisher_process(self.sock_dir):
  154. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  155. me.fire_event({'data': 'foo1'}, 'evt1')
  156. evt1 = me.get_event(tag='ev')
  157. self.assertGotEvent(evt1, {'data': 'foo1'})
  158. def test_event_matching_regex(self):
  159. '''Test a regex match'''
  160. with eventpublisher_process(self.sock_dir):
  161. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  162. me.fire_event({'data': 'foo1'}, 'evt1')
  163. evt1 = me.get_event(tag='^ev', match_type='regex')
  164. self.assertGotEvent(evt1, {'data': 'foo1'})
  165. def test_event_matching_all(self):
  166. '''Test an all match'''
  167. with eventpublisher_process(self.sock_dir):
  168. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  169. me.fire_event({'data': 'foo1'}, 'evt1')
  170. evt1 = me.get_event(tag='')
  171. self.assertGotEvent(evt1, {'data': 'foo1'})
  172. def test_event_matching_all_when_tag_is_None(self):
  173. '''Test event matching all when not passing a tag'''
  174. with eventpublisher_process(self.sock_dir):
  175. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  176. me.fire_event({'data': 'foo1'}, 'evt1')
  177. evt1 = me.get_event()
  178. self.assertGotEvent(evt1, {'data': 'foo1'})
  179. def test_event_not_subscribed(self):
  180. '''Test get_event drops non-subscribed events'''
  181. with eventpublisher_process(self.sock_dir):
  182. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  183. me.fire_event({'data': 'foo1'}, 'evt1')
  184. me.fire_event({'data': 'foo2'}, 'evt2')
  185. evt2 = me.get_event(tag='evt2')
  186. evt1 = me.get_event(tag='evt1')
  187. self.assertGotEvent(evt2, {'data': 'foo2'})
  188. self.assertIsNone(evt1)
  189. def test_event_subscription_cache(self):
  190. '''Test subscriptions cache a message until requested'''
  191. with eventpublisher_process(self.sock_dir):
  192. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  193. me.subscribe('evt1')
  194. me.fire_event({'data': 'foo1'}, 'evt1')
  195. me.fire_event({'data': 'foo2'}, 'evt2')
  196. evt2 = me.get_event(tag='evt2')
  197. evt1 = me.get_event(tag='evt1')
  198. self.assertGotEvent(evt2, {'data': 'foo2'})
  199. self.assertGotEvent(evt1, {'data': 'foo1'})
  200. def test_event_subscriptions_cache_regex(self):
  201. '''Test regex subscriptions cache a message until requested'''
  202. with eventpublisher_process(self.sock_dir):
  203. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  204. me.subscribe('e..1$', 'regex')
  205. me.fire_event({'data': 'foo1'}, 'evt1')
  206. me.fire_event({'data': 'foo2'}, 'evt2')
  207. evt2 = me.get_event(tag='evt2')
  208. evt1 = me.get_event(tag='evt1')
  209. self.assertGotEvent(evt2, {'data': 'foo2'})
  210. self.assertGotEvent(evt1, {'data': 'foo1'})
  211. def test_event_multiple_clients(self):
  212. '''Test event is received by multiple clients'''
  213. with eventpublisher_process(self.sock_dir):
  214. me1 = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  215. me2 = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  216. # We need to sleep here to avoid a race condition wherein
  217. # the second socket may not be connected by the time the first socket
  218. # sends the event.
  219. time.sleep(0.5)
  220. me1.fire_event({'data': 'foo1'}, 'evt1')
  221. evt1 = me1.get_event(tag='evt1')
  222. self.assertGotEvent(evt1, {'data': 'foo1'})
  223. evt2 = me2.get_event(tag='evt1')
  224. self.assertGotEvent(evt2, {'data': 'foo1'})
  225. @expectedFailure
  226. def test_event_nested_sub_all(self):
  227. '''Test nested event subscriptions do not drop events, get event for all tags'''
  228. # Show why not to call get_event(tag='')
  229. with eventpublisher_process(self.sock_dir):
  230. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  231. me.fire_event({'data': 'foo1'}, 'evt1')
  232. me.fire_event({'data': 'foo2'}, 'evt2')
  233. evt2 = me.get_event(tag='')
  234. evt1 = me.get_event(tag='')
  235. self.assertGotEvent(evt2, {'data': 'foo2'})
  236. self.assertGotEvent(evt1, {'data': 'foo1'})
  237. def test_event_many(self):
  238. '''Test a large number of events, one at a time'''
  239. with eventpublisher_process(self.sock_dir):
  240. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  241. for i in range(500):
  242. me.fire_event({'data': '{0}'.format(i)}, 'testevents')
  243. evt = me.get_event(tag='testevents')
  244. self.assertGotEvent(evt, {'data': '{0}'.format(i)}, 'Event {0}'.format(i))
  245. def test_event_many_backlog(self):
  246. '''Test a large number of events, send all then recv all'''
  247. with eventpublisher_process(self.sock_dir):
  248. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  249. # Must not exceed zmq HWM
  250. for i in range(500):
  251. me.fire_event({'data': '{0}'.format(i)}, 'testevents')
  252. for i in range(500):
  253. evt = me.get_event(tag='testevents')
  254. self.assertGotEvent(evt, {'data': '{0}'.format(i)}, 'Event {0}'.format(i))
  255. # Test the fire_master function. As it wraps the underlying fire_event,
  256. # we don't need to perform extensive testing.
  257. def test_send_master_event(self):
  258. '''Tests that sending an event through fire_master generates expected event'''
  259. with eventpublisher_process(self.sock_dir):
  260. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  261. data = {'data': 'foo1'}
  262. me.fire_master(data, 'test_master')
  263. evt = me.get_event(tag='fire_master')
  264. self.assertGotEvent(evt, {'data': data, 'tag': 'test_master', 'events': None, 'pretag': None})
  265. class TestAsyncEventPublisher(AsyncTestCase):
  266. def get_new_ioloop(self):
  267. return zmq.eventloop.ioloop.ZMQIOLoop()
  268. def setUp(self):
  269. super(TestAsyncEventPublisher, self).setUp()
  270. self.sock_dir = os.path.join(RUNTIME_VARS.TMP, 'test-socks')
  271. if not os.path.exists(self.sock_dir):
  272. os.makedirs(self.sock_dir)
  273. self.addCleanup(shutil.rmtree, self.sock_dir, ignore_errors=True)
  274. self.opts = {'sock_dir': self.sock_dir}
  275. self.publisher = salt.utils.event.AsyncEventPublisher(
  276. self.opts,
  277. self.io_loop,
  278. )
  279. self.event = salt.utils.event.get_event('minion', opts=self.opts, io_loop=self.io_loop)
  280. self.event.subscribe('')
  281. self.event.set_event_handler(self._handle_publish)
  282. def _handle_publish(self, raw):
  283. self.tag, self.data = salt.utils.event.SaltEvent.unpack(raw)
  284. self.stop()
  285. def test_event_subscription(self):
  286. '''Test a single event is received'''
  287. me = salt.utils.event.MinionEvent(self.opts, listen=True)
  288. me.fire_event({'data': 'foo1'}, 'evt1')
  289. self.wait()
  290. evt1 = me.get_event(tag='evt1')
  291. self.assertEqual(self.tag, 'evt1')
  292. self.data.pop('_stamp') # drop the stamp
  293. self.assertEqual(self.data, {'data': 'foo1'})
  294. class TestEventReturn(TestCase):
  295. def test_event_return(self):
  296. # Once salt is py3 only, the warnings part of this test no longer applies
  297. evt = None
  298. try:
  299. with warnings.catch_warnings(record=True) as w:
  300. # Cause all warnings to always be triggered.
  301. warnings.simplefilter("always")
  302. evt = None
  303. try:
  304. evt = salt.utils.event.EventReturn(salt.config.DEFAULT_MASTER_OPTS.copy())
  305. evt.start()
  306. except TypeError as exc:
  307. if 'object' in str(exc):
  308. self.fail('\'{}\' TypeError should have not been raised'.format(exc))
  309. for warning in w:
  310. if warning.category is DeprecationWarning:
  311. assert 'object() takes no parameters' not in warning.message
  312. finally:
  313. if evt is not None:
  314. terminate_process(evt.pid, kill_children=True)