1
0

test_event.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. tests.unit.utils.event_test
  5. ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  6. '''
  7. # Import python libs
  8. from __future__ import absolute_import, unicode_literals, print_function
  9. import os
  10. import hashlib
  11. import time
  12. import shutil
  13. import warnings
  14. # Import Salt Testing libs
  15. from tests.support.unit import expectedFailure, skipIf, TestCase
  16. from tests.support.runtests import RUNTIME_VARS
  17. from tests.support.events import eventpublisher_process, eventsender_process
  18. # Import salt libs
  19. import salt.config
  20. import salt.utils.event
  21. import salt.utils.stringutils
  22. # Import 3rd-+arty libs
  23. from salt.ext.tornado.testing import AsyncTestCase
  24. import salt.ext.tornado.ioloop
  25. import zmq
  26. import zmq.eventloop.ioloop
  27. # support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
  28. if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
  29. zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
  30. from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
  31. from tests.support.processes import terminate_process
  32. NO_LONG_IPC = False
  33. if getattr(zmq, 'IPC_PATH_MAX_LEN', 103) <= 103:
  34. NO_LONG_IPC = True
  35. @skipIf(NO_LONG_IPC, "This system does not support long IPC paths. Skipping event tests!")
  36. class TestSaltEvent(TestCase):
  37. def setUp(self):
  38. self.sock_dir = os.path.join(RUNTIME_VARS.TMP, 'test-socks')
  39. if not os.path.exists(self.sock_dir):
  40. os.makedirs(self.sock_dir)
  41. self.addCleanup(shutil.rmtree, self.sock_dir, ignore_errors=True)
  42. def assertGotEvent(self, evt, data, msg=None):
  43. self.assertIsNotNone(evt, msg)
  44. for key in data:
  45. self.assertIn(key, evt, '{0}: Key {1} missing'.format(msg, key))
  46. assertMsg = '{0}: Key {1} value mismatch, {2} != {3}'
  47. assertMsg = assertMsg.format(msg, key, data[key], evt[key])
  48. self.assertEqual(data[key], evt[key], assertMsg)
  49. def test_master_event(self):
  50. me = salt.utils.event.MasterEvent(self.sock_dir, listen=False)
  51. self.assertEqual(
  52. me.puburi, '{0}'.format(
  53. os.path.join(self.sock_dir, 'master_event_pub.ipc')
  54. )
  55. )
  56. self.assertEqual(
  57. me.pulluri,
  58. '{0}'.format(
  59. os.path.join(self.sock_dir, 'master_event_pull.ipc')
  60. )
  61. )
  62. def test_minion_event(self):
  63. opts = dict(id='foo', sock_dir=self.sock_dir)
  64. id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes(opts['id'])).hexdigest()[:10]
  65. me = salt.utils.event.MinionEvent(opts, listen=False)
  66. self.assertEqual(
  67. me.puburi,
  68. '{0}'.format(
  69. os.path.join(
  70. self.sock_dir, 'minion_event_{0}_pub.ipc'.format(id_hash)
  71. )
  72. )
  73. )
  74. self.assertEqual(
  75. me.pulluri,
  76. '{0}'.format(
  77. os.path.join(
  78. self.sock_dir, 'minion_event_{0}_pull.ipc'.format(id_hash)
  79. )
  80. )
  81. )
  82. def test_minion_event_tcp_ipc_mode(self):
  83. opts = dict(id='foo', ipc_mode='tcp')
  84. me = salt.utils.event.MinionEvent(opts, listen=False)
  85. self.assertEqual(me.puburi, 4510)
  86. self.assertEqual(me.pulluri, 4511)
  87. def test_minion_event_no_id(self):
  88. me = salt.utils.event.MinionEvent(dict(sock_dir=self.sock_dir), listen=False)
  89. id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes('')).hexdigest()[:10]
  90. self.assertEqual(
  91. me.puburi,
  92. '{0}'.format(
  93. os.path.join(
  94. self.sock_dir, 'minion_event_{0}_pub.ipc'.format(id_hash)
  95. )
  96. )
  97. )
  98. self.assertEqual(
  99. me.pulluri,
  100. '{0}'.format(
  101. os.path.join(
  102. self.sock_dir, 'minion_event_{0}_pull.ipc'.format(id_hash)
  103. )
  104. )
  105. )
  106. def test_event_single(self):
  107. '''Test a single event is received'''
  108. with eventpublisher_process(self.sock_dir):
  109. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  110. me.fire_event({'data': 'foo1'}, 'evt1')
  111. evt1 = me.get_event(tag='evt1')
  112. self.assertGotEvent(evt1, {'data': 'foo1'})
  113. def test_event_single_no_block(self):
  114. '''Test a single event is received, no block'''
  115. with eventpublisher_process(self.sock_dir):
  116. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  117. start = time.time()
  118. finish = start + 5
  119. evt1 = me.get_event(wait=0, tag='evt1', no_block=True)
  120. # We should get None and way before the 5 seconds wait since it's
  121. # non-blocking, otherwise it would wait for an event which we
  122. # didn't even send
  123. self.assertIsNone(evt1, None)
  124. self.assertLess(start, finish)
  125. me.fire_event({'data': 'foo1'}, 'evt1')
  126. evt1 = me.get_event(wait=0, tag='evt1')
  127. self.assertGotEvent(evt1, {'data': 'foo1'})
  128. def test_event_single_wait_0_no_block_False(self):
  129. '''Test a single event is received with wait=0 and no_block=False and doesn't spin the while loop'''
  130. with eventpublisher_process(self.sock_dir):
  131. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  132. me.fire_event({'data': 'foo1'}, 'evt1')
  133. # This is too fast and will be None but assures we're not blocking
  134. evt1 = me.get_event(wait=0, tag='evt1', no_block=False)
  135. self.assertGotEvent(evt1, {'data': 'foo1'})
  136. def test_event_timeout(self):
  137. '''Test no event is received if the timeout is reached'''
  138. with eventpublisher_process(self.sock_dir):
  139. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  140. me.fire_event({'data': 'foo1'}, 'evt1')
  141. evt1 = me.get_event(tag='evt1')
  142. self.assertGotEvent(evt1, {'data': 'foo1'})
  143. evt2 = me.get_event(tag='evt1')
  144. self.assertIsNone(evt2)
  145. def test_event_no_timeout(self):
  146. '''Test no wait timeout, we should block forever, until we get one '''
  147. with eventpublisher_process(self.sock_dir):
  148. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  149. with eventsender_process({'data': 'foo2'}, 'evt2', self.sock_dir, 5):
  150. evt = me.get_event(tag='evt2', wait=0, no_block=False)
  151. self.assertGotEvent(evt, {'data': 'foo2'})
  152. def test_event_matching(self):
  153. '''Test a startswith match'''
  154. with eventpublisher_process(self.sock_dir):
  155. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  156. me.fire_event({'data': 'foo1'}, 'evt1')
  157. evt1 = me.get_event(tag='ev')
  158. self.assertGotEvent(evt1, {'data': 'foo1'})
  159. def test_event_matching_regex(self):
  160. '''Test a regex match'''
  161. with eventpublisher_process(self.sock_dir):
  162. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  163. me.fire_event({'data': 'foo1'}, 'evt1')
  164. evt1 = me.get_event(tag='^ev', match_type='regex')
  165. self.assertGotEvent(evt1, {'data': 'foo1'})
  166. def test_event_matching_all(self):
  167. '''Test an all match'''
  168. with eventpublisher_process(self.sock_dir):
  169. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  170. me.fire_event({'data': 'foo1'}, 'evt1')
  171. evt1 = me.get_event(tag='')
  172. self.assertGotEvent(evt1, {'data': 'foo1'})
  173. def test_event_matching_all_when_tag_is_None(self):
  174. '''Test event matching all when not passing a tag'''
  175. with eventpublisher_process(self.sock_dir):
  176. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  177. me.fire_event({'data': 'foo1'}, 'evt1')
  178. evt1 = me.get_event()
  179. self.assertGotEvent(evt1, {'data': 'foo1'})
  180. def test_event_not_subscribed(self):
  181. '''Test get_event drops non-subscribed events'''
  182. with eventpublisher_process(self.sock_dir):
  183. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  184. me.fire_event({'data': 'foo1'}, 'evt1')
  185. me.fire_event({'data': 'foo2'}, 'evt2')
  186. evt2 = me.get_event(tag='evt2')
  187. evt1 = me.get_event(tag='evt1')
  188. self.assertGotEvent(evt2, {'data': 'foo2'})
  189. self.assertIsNone(evt1)
  190. def test_event_subscription_cache(self):
  191. '''Test subscriptions cache a message until requested'''
  192. with eventpublisher_process(self.sock_dir):
  193. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  194. me.subscribe('evt1')
  195. me.fire_event({'data': 'foo1'}, 'evt1')
  196. me.fire_event({'data': 'foo2'}, 'evt2')
  197. evt2 = me.get_event(tag='evt2')
  198. evt1 = me.get_event(tag='evt1')
  199. self.assertGotEvent(evt2, {'data': 'foo2'})
  200. self.assertGotEvent(evt1, {'data': 'foo1'})
  201. def test_event_subscriptions_cache_regex(self):
  202. '''Test regex subscriptions cache a message until requested'''
  203. with eventpublisher_process(self.sock_dir):
  204. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  205. me.subscribe('e..1$', 'regex')
  206. me.fire_event({'data': 'foo1'}, 'evt1')
  207. me.fire_event({'data': 'foo2'}, 'evt2')
  208. evt2 = me.get_event(tag='evt2')
  209. evt1 = me.get_event(tag='evt1')
  210. self.assertGotEvent(evt2, {'data': 'foo2'})
  211. self.assertGotEvent(evt1, {'data': 'foo1'})
  212. def test_event_multiple_clients(self):
  213. '''Test event is received by multiple clients'''
  214. with eventpublisher_process(self.sock_dir):
  215. me1 = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  216. me2 = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  217. # We need to sleep here to avoid a race condition wherein
  218. # the second socket may not be connected by the time the first socket
  219. # sends the event.
  220. time.sleep(0.5)
  221. me1.fire_event({'data': 'foo1'}, 'evt1')
  222. evt1 = me1.get_event(tag='evt1')
  223. self.assertGotEvent(evt1, {'data': 'foo1'})
  224. evt2 = me2.get_event(tag='evt1')
  225. self.assertGotEvent(evt2, {'data': 'foo1'})
  226. @expectedFailure
  227. def test_event_nested_sub_all(self):
  228. '''Test nested event subscriptions do not drop events, get event for all tags'''
  229. # Show why not to call get_event(tag='')
  230. with eventpublisher_process(self.sock_dir):
  231. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  232. me.fire_event({'data': 'foo1'}, 'evt1')
  233. me.fire_event({'data': 'foo2'}, 'evt2')
  234. evt2 = me.get_event(tag='')
  235. evt1 = me.get_event(tag='')
  236. self.assertGotEvent(evt2, {'data': 'foo2'})
  237. self.assertGotEvent(evt1, {'data': 'foo1'})
  238. def test_event_many(self):
  239. '''Test a large number of events, one at a time'''
  240. with eventpublisher_process(self.sock_dir):
  241. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  242. for i in range(500):
  243. me.fire_event({'data': '{0}'.format(i)}, 'testevents')
  244. evt = me.get_event(tag='testevents')
  245. self.assertGotEvent(evt, {'data': '{0}'.format(i)}, 'Event {0}'.format(i))
  246. def test_event_many_backlog(self):
  247. '''Test a large number of events, send all then recv all'''
  248. with eventpublisher_process(self.sock_dir):
  249. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  250. # Must not exceed zmq HWM
  251. for i in range(500):
  252. me.fire_event({'data': '{0}'.format(i)}, 'testevents')
  253. for i in range(500):
  254. evt = me.get_event(tag='testevents')
  255. self.assertGotEvent(evt, {'data': '{0}'.format(i)}, 'Event {0}'.format(i))
  256. # Test the fire_master function. As it wraps the underlying fire_event,
  257. # we don't need to perform extensive testing.
  258. def test_send_master_event(self):
  259. '''Tests that sending an event through fire_master generates expected event'''
  260. with eventpublisher_process(self.sock_dir):
  261. me = salt.utils.event.MasterEvent(self.sock_dir, listen=True)
  262. data = {'data': 'foo1'}
  263. me.fire_master(data, 'test_master')
  264. evt = me.get_event(tag='fire_master')
  265. self.assertGotEvent(evt, {'data': data, 'tag': 'test_master', 'events': None, 'pretag': None})
  266. class TestAsyncEventPublisher(AsyncTestCase):
  267. def get_new_ioloop(self):
  268. return salt.ext.tornado.ioloop.IOLoop()
  269. def setUp(self):
  270. super(TestAsyncEventPublisher, self).setUp()
  271. self.sock_dir = os.path.join(RUNTIME_VARS.TMP, 'test-socks')
  272. if not os.path.exists(self.sock_dir):
  273. os.makedirs(self.sock_dir)
  274. self.addCleanup(shutil.rmtree, self.sock_dir, ignore_errors=True)
  275. self.opts = {'sock_dir': self.sock_dir}
  276. self.publisher = salt.utils.event.AsyncEventPublisher(
  277. self.opts,
  278. self.io_loop,
  279. )
  280. self.event = salt.utils.event.get_event('minion', opts=self.opts, io_loop=self.io_loop)
  281. self.event.subscribe('')
  282. self.event.set_event_handler(self._handle_publish)
  283. def _handle_publish(self, raw):
  284. self.tag, self.data = salt.utils.event.SaltEvent.unpack(raw)
  285. self.stop()
  286. def test_event_subscription(self):
  287. '''Test a single event is received'''
  288. me = salt.utils.event.MinionEvent(self.opts, listen=True)
  289. me.fire_event({'data': 'foo1'}, 'evt1')
  290. self.wait()
  291. evt1 = me.get_event(tag='evt1')
  292. self.assertEqual(self.tag, 'evt1')
  293. self.data.pop('_stamp') # drop the stamp
  294. self.assertEqual(self.data, {'data': 'foo1'})
  295. class TestEventReturn(TestCase):
  296. def test_event_return(self):
  297. # Once salt is py3 only, the warnings part of this test no longer applies
  298. evt = None
  299. try:
  300. with warnings.catch_warnings(record=True) as w:
  301. # Cause all warnings to always be triggered.
  302. warnings.simplefilter("always")
  303. evt = None
  304. try:
  305. evt = salt.utils.event.EventReturn(salt.config.DEFAULT_MASTER_OPTS.copy())
  306. evt.start()
  307. except TypeError as exc:
  308. if 'object' in str(exc):
  309. self.fail('\'{}\' TypeError should have not been raised'.format(exc))
  310. for warning in w:
  311. if warning.category is DeprecationWarning:
  312. assert 'object() takes no parameters' not in warning.message
  313. finally:
  314. if evt is not None:
  315. terminate_process(evt.pid, kill_children=True)