test_event.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. tests.unit.utils.event_test
  5. ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  6. '''
  7. # Import python libs
  8. from __future__ import absolute_import, unicode_literals, print_function
  9. import os
  10. import hashlib
  11. import time
  12. import warnings
  13. from tornado.testing import AsyncTestCase
  14. import zmq
  15. import zmq.eventloop.ioloop
  16. # support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
  17. if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
  18. zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
  19. from contextlib import contextmanager
  20. from multiprocessing import Process
  21. # Import Salt Testing libs
  22. from tests.support.unit import expectedFailure, skipIf, TestCase
  23. # Import salt libs
  24. import salt.config
  25. import salt.utils.event
  26. import salt.utils.stringutils
  27. import tests.integration as integration
  28. from salt.utils.process import clean_proc
  29. # Import 3rd-party libs
  30. from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
  31. from tests.support.processes import terminate_process
  32. SOCK_DIR = os.path.join(integration.TMP, 'test-socks')
  33. NO_LONG_IPC = False
  34. if getattr(zmq, 'IPC_PATH_MAX_LEN', 103) <= 103:
  35. NO_LONG_IPC = True
  36. @contextmanager
  37. def eventpublisher_process():
  38. proc = salt.utils.event.EventPublisher({'sock_dir': SOCK_DIR})
  39. proc.start()
  40. try:
  41. if os.environ.get('TRAVIS_PYTHON_VERSION', None) is not None:
  42. # Travis is slow
  43. time.sleep(10)
  44. else:
  45. time.sleep(2)
  46. yield
  47. finally:
  48. clean_proc(proc)
  49. class EventSender(Process):
  50. def __init__(self, data, tag, wait):
  51. super(EventSender, self).__init__()
  52. self.data = data
  53. self.tag = tag
  54. self.wait = wait
  55. def run(self):
  56. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=False)
  57. time.sleep(self.wait)
  58. me.fire_event(self.data, self.tag)
  59. # Wait a few seconds before tearing down the zmq context
  60. if os.environ.get('TRAVIS_PYTHON_VERSION', None) is not None:
  61. # Travis is slow
  62. time.sleep(10)
  63. else:
  64. time.sleep(2)
  65. @contextmanager
  66. def eventsender_process(data, tag, wait=0):
  67. proc = EventSender(data, tag, wait)
  68. proc.start()
  69. try:
  70. yield
  71. finally:
  72. clean_proc(proc)
  73. @skipIf(NO_LONG_IPC, "This system does not support long IPC paths. Skipping event tests!")
  74. class TestSaltEvent(TestCase):
  75. def setUp(self):
  76. if not os.path.exists(SOCK_DIR):
  77. os.makedirs(SOCK_DIR)
  78. def assertGotEvent(self, evt, data, msg=None):
  79. self.assertIsNotNone(evt, msg)
  80. for key in data:
  81. self.assertIn(key, evt, '{0}: Key {1} missing'.format(msg, key))
  82. assertMsg = '{0}: Key {1} value mismatch, {2} != {3}'
  83. assertMsg = assertMsg.format(msg, key, data[key], evt[key])
  84. self.assertEqual(data[key], evt[key], assertMsg)
  85. def test_master_event(self):
  86. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=False)
  87. self.assertEqual(
  88. me.puburi, '{0}'.format(
  89. os.path.join(SOCK_DIR, 'master_event_pub.ipc')
  90. )
  91. )
  92. self.assertEqual(
  93. me.pulluri,
  94. '{0}'.format(
  95. os.path.join(SOCK_DIR, 'master_event_pull.ipc')
  96. )
  97. )
  98. def test_minion_event(self):
  99. opts = dict(id='foo', sock_dir=SOCK_DIR)
  100. id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes(opts['id'])).hexdigest()[:10]
  101. me = salt.utils.event.MinionEvent(opts, listen=False)
  102. self.assertEqual(
  103. me.puburi,
  104. '{0}'.format(
  105. os.path.join(
  106. SOCK_DIR, 'minion_event_{0}_pub.ipc'.format(id_hash)
  107. )
  108. )
  109. )
  110. self.assertEqual(
  111. me.pulluri,
  112. '{0}'.format(
  113. os.path.join(
  114. SOCK_DIR, 'minion_event_{0}_pull.ipc'.format(id_hash)
  115. )
  116. )
  117. )
  118. def test_minion_event_tcp_ipc_mode(self):
  119. opts = dict(id='foo', ipc_mode='tcp')
  120. me = salt.utils.event.MinionEvent(opts, listen=False)
  121. self.assertEqual(me.puburi, 4510)
  122. self.assertEqual(me.pulluri, 4511)
  123. def test_minion_event_no_id(self):
  124. me = salt.utils.event.MinionEvent(dict(sock_dir=SOCK_DIR), listen=False)
  125. id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes('')).hexdigest()[:10]
  126. self.assertEqual(
  127. me.puburi,
  128. '{0}'.format(
  129. os.path.join(
  130. SOCK_DIR, 'minion_event_{0}_pub.ipc'.format(id_hash)
  131. )
  132. )
  133. )
  134. self.assertEqual(
  135. me.pulluri,
  136. '{0}'.format(
  137. os.path.join(
  138. SOCK_DIR, 'minion_event_{0}_pull.ipc'.format(id_hash)
  139. )
  140. )
  141. )
  142. def test_event_single(self):
  143. '''Test a single event is received'''
  144. with eventpublisher_process():
  145. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  146. me.fire_event({'data': 'foo1'}, 'evt1')
  147. evt1 = me.get_event(tag='evt1')
  148. self.assertGotEvent(evt1, {'data': 'foo1'})
  149. def test_event_single_no_block(self):
  150. '''Test a single event is received, no block'''
  151. with eventpublisher_process():
  152. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  153. start = time.time()
  154. finish = start + 5
  155. evt1 = me.get_event(wait=0, tag='evt1', no_block=True)
  156. # We should get None and way before the 5 seconds wait since it's
  157. # non-blocking, otherwise it would wait for an event which we
  158. # didn't even send
  159. self.assertIsNone(evt1, None)
  160. self.assertLess(start, finish)
  161. me.fire_event({'data': 'foo1'}, 'evt1')
  162. evt1 = me.get_event(wait=0, tag='evt1')
  163. self.assertGotEvent(evt1, {'data': 'foo1'})
  164. def test_event_single_wait_0_no_block_False(self):
  165. '''Test a single event is received with wait=0 and no_block=False and doesn't spin the while loop'''
  166. with eventpublisher_process():
  167. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  168. me.fire_event({'data': 'foo1'}, 'evt1')
  169. # This is too fast and will be None but assures we're not blocking
  170. evt1 = me.get_event(wait=0, tag='evt1', no_block=False)
  171. self.assertGotEvent(evt1, {'data': 'foo1'})
  172. def test_event_timeout(self):
  173. '''Test no event is received if the timeout is reached'''
  174. with eventpublisher_process():
  175. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  176. me.fire_event({'data': 'foo1'}, 'evt1')
  177. evt1 = me.get_event(tag='evt1')
  178. self.assertGotEvent(evt1, {'data': 'foo1'})
  179. evt2 = me.get_event(tag='evt1')
  180. self.assertIsNone(evt2)
  181. def test_event_no_timeout(self):
  182. '''Test no wait timeout, we should block forever, until we get one '''
  183. with eventpublisher_process():
  184. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  185. with eventsender_process({'data': 'foo2'}, 'evt2', 5):
  186. evt = me.get_event(tag='evt2', wait=0, no_block=False)
  187. self.assertGotEvent(evt, {'data': 'foo2'})
  188. def test_event_matching(self):
  189. '''Test a startswith match'''
  190. with eventpublisher_process():
  191. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  192. me.fire_event({'data': 'foo1'}, 'evt1')
  193. evt1 = me.get_event(tag='ev')
  194. self.assertGotEvent(evt1, {'data': 'foo1'})
  195. def test_event_matching_regex(self):
  196. '''Test a regex match'''
  197. with eventpublisher_process():
  198. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  199. me.fire_event({'data': 'foo1'}, 'evt1')
  200. evt1 = me.get_event(tag='^ev', match_type='regex')
  201. self.assertGotEvent(evt1, {'data': 'foo1'})
  202. def test_event_matching_all(self):
  203. '''Test an all match'''
  204. with eventpublisher_process():
  205. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  206. me.fire_event({'data': 'foo1'}, 'evt1')
  207. evt1 = me.get_event(tag='')
  208. self.assertGotEvent(evt1, {'data': 'foo1'})
  209. def test_event_matching_all_when_tag_is_None(self):
  210. '''Test event matching all when not passing a tag'''
  211. with eventpublisher_process():
  212. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  213. me.fire_event({'data': 'foo1'}, 'evt1')
  214. evt1 = me.get_event()
  215. self.assertGotEvent(evt1, {'data': 'foo1'})
  216. def test_event_not_subscribed(self):
  217. '''Test get_event drops non-subscribed events'''
  218. with eventpublisher_process():
  219. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  220. me.fire_event({'data': 'foo1'}, 'evt1')
  221. me.fire_event({'data': 'foo2'}, 'evt2')
  222. evt2 = me.get_event(tag='evt2')
  223. evt1 = me.get_event(tag='evt1')
  224. self.assertGotEvent(evt2, {'data': 'foo2'})
  225. self.assertIsNone(evt1)
  226. def test_event_subscription_cache(self):
  227. '''Test subscriptions cache a message until requested'''
  228. with eventpublisher_process():
  229. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  230. me.subscribe('evt1')
  231. me.fire_event({'data': 'foo1'}, 'evt1')
  232. me.fire_event({'data': 'foo2'}, 'evt2')
  233. evt2 = me.get_event(tag='evt2')
  234. evt1 = me.get_event(tag='evt1')
  235. self.assertGotEvent(evt2, {'data': 'foo2'})
  236. self.assertGotEvent(evt1, {'data': 'foo1'})
  237. def test_event_subscriptions_cache_regex(self):
  238. '''Test regex subscriptions cache a message until requested'''
  239. with eventpublisher_process():
  240. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  241. me.subscribe('e..1$', 'regex')
  242. me.fire_event({'data': 'foo1'}, 'evt1')
  243. me.fire_event({'data': 'foo2'}, 'evt2')
  244. evt2 = me.get_event(tag='evt2')
  245. evt1 = me.get_event(tag='evt1')
  246. self.assertGotEvent(evt2, {'data': 'foo2'})
  247. self.assertGotEvent(evt1, {'data': 'foo1'})
  248. def test_event_multiple_clients(self):
  249. '''Test event is received by multiple clients'''
  250. with eventpublisher_process():
  251. me1 = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  252. me2 = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  253. # We need to sleep here to avoid a race condition wherein
  254. # the second socket may not be connected by the time the first socket
  255. # sends the event.
  256. time.sleep(0.5)
  257. me1.fire_event({'data': 'foo1'}, 'evt1')
  258. evt1 = me1.get_event(tag='evt1')
  259. self.assertGotEvent(evt1, {'data': 'foo1'})
  260. evt2 = me2.get_event(tag='evt1')
  261. self.assertGotEvent(evt2, {'data': 'foo1'})
  262. @expectedFailure
  263. def test_event_nested_sub_all(self):
  264. '''Test nested event subscriptions do not drop events, get event for all tags'''
  265. # Show why not to call get_event(tag='')
  266. with eventpublisher_process():
  267. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  268. me.fire_event({'data': 'foo1'}, 'evt1')
  269. me.fire_event({'data': 'foo2'}, 'evt2')
  270. evt2 = me.get_event(tag='')
  271. evt1 = me.get_event(tag='')
  272. self.assertGotEvent(evt2, {'data': 'foo2'})
  273. self.assertGotEvent(evt1, {'data': 'foo1'})
  274. def test_event_many(self):
  275. '''Test a large number of events, one at a time'''
  276. with eventpublisher_process():
  277. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  278. for i in range(500):
  279. me.fire_event({'data': '{0}'.format(i)}, 'testevents')
  280. evt = me.get_event(tag='testevents')
  281. self.assertGotEvent(evt, {'data': '{0}'.format(i)}, 'Event {0}'.format(i))
  282. def test_event_many_backlog(self):
  283. '''Test a large number of events, send all then recv all'''
  284. with eventpublisher_process():
  285. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  286. # Must not exceed zmq HWM
  287. for i in range(500):
  288. me.fire_event({'data': '{0}'.format(i)}, 'testevents')
  289. for i in range(500):
  290. evt = me.get_event(tag='testevents')
  291. self.assertGotEvent(evt, {'data': '{0}'.format(i)}, 'Event {0}'.format(i))
  292. # Test the fire_master function. As it wraps the underlying fire_event,
  293. # we don't need to perform extensive testing.
  294. def test_send_master_event(self):
  295. '''Tests that sending an event through fire_master generates expected event'''
  296. with eventpublisher_process():
  297. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  298. data = {'data': 'foo1'}
  299. me.fire_master(data, 'test_master')
  300. evt = me.get_event(tag='fire_master')
  301. self.assertGotEvent(evt, {'data': data, 'tag': 'test_master', 'events': None, 'pretag': None})
  302. class TestAsyncEventPublisher(AsyncTestCase):
  303. def get_new_ioloop(self):
  304. return zmq.eventloop.ioloop.ZMQIOLoop()
  305. def setUp(self):
  306. super(TestAsyncEventPublisher, self).setUp()
  307. self.opts = {'sock_dir': SOCK_DIR}
  308. self.publisher = salt.utils.event.AsyncEventPublisher(
  309. self.opts,
  310. self.io_loop,
  311. )
  312. self.event = salt.utils.event.get_event('minion', opts=self.opts, io_loop=self.io_loop)
  313. self.event.subscribe('')
  314. self.event.set_event_handler(self._handle_publish)
  315. def _handle_publish(self, raw):
  316. self.tag, self.data = salt.utils.event.SaltEvent.unpack(raw)
  317. self.stop()
  318. def test_event_subscription(self):
  319. '''Test a single event is received'''
  320. me = salt.utils.event.MinionEvent(self.opts, listen=True)
  321. me.fire_event({'data': 'foo1'}, 'evt1')
  322. self.wait()
  323. evt1 = me.get_event(tag='evt1')
  324. self.assertEqual(self.tag, 'evt1')
  325. self.data.pop('_stamp') # drop the stamp
  326. self.assertEqual(self.data, {'data': 'foo1'})
  327. class TestEventReturn(TestCase):
  328. def test_event_return(self):
  329. # Once salt is py3 only, the warnings part of this test no longer applies
  330. evt = None
  331. try:
  332. with warnings.catch_warnings(record=True) as w:
  333. # Cause all warnings to always be triggered.
  334. warnings.simplefilter("always")
  335. evt = None
  336. try:
  337. evt = salt.utils.event.EventReturn(salt.config.DEFAULT_MASTER_OPTS.copy())
  338. evt.start()
  339. except TypeError as exc:
  340. if 'object' in str(exc):
  341. self.fail('\'{}\' TypeError should have not been raised'.format(exc))
  342. for warning in w:
  343. if warning.category is DeprecationWarning:
  344. assert 'object() takes no parameters' not in warning.message
  345. finally:
  346. if evt is not None:
  347. terminate_process(evt.pid, kill_children=True)