test_event.py 14 KB


  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. tests.unit.utils.event_test
  5. ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  6. '''
  7. # Import python libs
  8. from __future__ import absolute_import, unicode_literals, print_function
  9. import os
  10. import hashlib
  11. import time
  12. from tornado.testing import AsyncTestCase
  13. import zmq
  14. import zmq.eventloop.ioloop
# Support pyzmq 13.0.x, which does not expose ``ZMQIOLoop``: alias it to the
# plain ``IOLoop`` so the rest of this module can always refer to
# ``zmq.eventloop.ioloop.ZMQIOLoop``.
# TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
    zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
  18. from contextlib import contextmanager
  19. from multiprocessing import Process
  20. # Import Salt Testing libs
  21. from tests.support.unit import expectedFailure, skipIf, TestCase
  22. # Import salt libs
  23. import salt.utils.event
  24. import salt.utils.stringutils
  25. import tests.integration as integration
  26. from salt.utils.process import clean_proc
# Import 3rd-party libs
  28. from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
  29. SOCK_DIR = os.path.join(integration.TMP, 'test-socks')
  30. NO_LONG_IPC = False
  31. if getattr(zmq, 'IPC_PATH_MAX_LEN', 103) <= 103:
  32. NO_LONG_IPC = True
  33. @contextmanager
  34. def eventpublisher_process():
  35. proc = salt.utils.event.EventPublisher({'sock_dir': SOCK_DIR})
  36. proc.start()
  37. try:
  38. if os.environ.get('TRAVIS_PYTHON_VERSION', None) is not None:
  39. # Travis is slow
  40. time.sleep(10)
  41. else:
  42. time.sleep(2)
  43. yield
  44. finally:
  45. clean_proc(proc)
  46. class EventSender(Process):
  47. def __init__(self, data, tag, wait):
  48. super(EventSender, self).__init__()
  49. self.data = data
  50. self.tag = tag
  51. self.wait = wait
  52. def run(self):
  53. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=False)
  54. time.sleep(self.wait)
  55. me.fire_event(self.data, self.tag)
  56. # Wait a few seconds before tearing down the zmq context
  57. if os.environ.get('TRAVIS_PYTHON_VERSION', None) is not None:
  58. # Travis is slow
  59. time.sleep(10)
  60. else:
  61. time.sleep(2)
  62. @contextmanager
  63. def eventsender_process(data, tag, wait=0):
  64. proc = EventSender(data, tag, wait)
  65. proc.start()
  66. try:
  67. yield
  68. finally:
  69. clean_proc(proc)
  70. @skipIf(NO_LONG_IPC, "This system does not support long IPC paths. Skipping event tests!")
  71. class TestSaltEvent(TestCase):
  72. def setUp(self):
  73. if not os.path.exists(SOCK_DIR):
  74. os.makedirs(SOCK_DIR)
  75. def assertGotEvent(self, evt, data, msg=None):
  76. self.assertIsNotNone(evt, msg)
  77. for key in data:
  78. self.assertIn(key, evt, '{0}: Key {1} missing'.format(msg, key))
  79. assertMsg = '{0}: Key {1} value mismatch, {2} != {3}'
  80. assertMsg = assertMsg.format(msg, key, data[key], evt[key])
  81. self.assertEqual(data[key], evt[key], assertMsg)
  82. def test_master_event(self):
  83. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=False)
  84. self.assertEqual(
  85. me.puburi, '{0}'.format(
  86. os.path.join(SOCK_DIR, 'master_event_pub.ipc')
  87. )
  88. )
  89. self.assertEqual(
  90. me.pulluri,
  91. '{0}'.format(
  92. os.path.join(SOCK_DIR, 'master_event_pull.ipc')
  93. )
  94. )
  95. def test_minion_event(self):
  96. opts = dict(id='foo', sock_dir=SOCK_DIR)
  97. id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes(opts['id'])).hexdigest()[:10]
  98. me = salt.utils.event.MinionEvent(opts, listen=False)
  99. self.assertEqual(
  100. me.puburi,
  101. '{0}'.format(
  102. os.path.join(
  103. SOCK_DIR, 'minion_event_{0}_pub.ipc'.format(id_hash)
  104. )
  105. )
  106. )
  107. self.assertEqual(
  108. me.pulluri,
  109. '{0}'.format(
  110. os.path.join(
  111. SOCK_DIR, 'minion_event_{0}_pull.ipc'.format(id_hash)
  112. )
  113. )
  114. )
  115. def test_minion_event_tcp_ipc_mode(self):
  116. opts = dict(id='foo', ipc_mode='tcp')
  117. me = salt.utils.event.MinionEvent(opts, listen=False)
  118. self.assertEqual(me.puburi, 4510)
  119. self.assertEqual(me.pulluri, 4511)
  120. def test_minion_event_no_id(self):
  121. me = salt.utils.event.MinionEvent(dict(sock_dir=SOCK_DIR), listen=False)
  122. id_hash = hashlib.sha256(salt.utils.stringutils.to_bytes('')).hexdigest()[:10]
  123. self.assertEqual(
  124. me.puburi,
  125. '{0}'.format(
  126. os.path.join(
  127. SOCK_DIR, 'minion_event_{0}_pub.ipc'.format(id_hash)
  128. )
  129. )
  130. )
  131. self.assertEqual(
  132. me.pulluri,
  133. '{0}'.format(
  134. os.path.join(
  135. SOCK_DIR, 'minion_event_{0}_pull.ipc'.format(id_hash)
  136. )
  137. )
  138. )
  139. def test_event_single(self):
  140. '''Test a single event is received'''
  141. with eventpublisher_process():
  142. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  143. me.fire_event({'data': 'foo1'}, 'evt1')
  144. evt1 = me.get_event(tag='evt1')
  145. self.assertGotEvent(evt1, {'data': 'foo1'})
  146. def test_event_single_no_block(self):
  147. '''Test a single event is received, no block'''
  148. with eventpublisher_process():
  149. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  150. start = time.time()
  151. finish = start + 5
  152. evt1 = me.get_event(wait=0, tag='evt1', no_block=True)
  153. # We should get None and way before the 5 seconds wait since it's
  154. # non-blocking, otherwise it would wait for an event which we
  155. # didn't even send
  156. self.assertIsNone(evt1, None)
  157. self.assertLess(start, finish)
  158. me.fire_event({'data': 'foo1'}, 'evt1')
  159. evt1 = me.get_event(wait=0, tag='evt1')
  160. self.assertGotEvent(evt1, {'data': 'foo1'})
  161. def test_event_single_wait_0_no_block_False(self):
  162. '''Test a single event is received with wait=0 and no_block=False and doesn't spin the while loop'''
  163. with eventpublisher_process():
  164. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  165. me.fire_event({'data': 'foo1'}, 'evt1')
  166. # This is too fast and will be None but assures we're not blocking
  167. evt1 = me.get_event(wait=0, tag='evt1', no_block=False)
  168. self.assertGotEvent(evt1, {'data': 'foo1'})
  169. def test_event_timeout(self):
  170. '''Test no event is received if the timeout is reached'''
  171. with eventpublisher_process():
  172. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  173. me.fire_event({'data': 'foo1'}, 'evt1')
  174. evt1 = me.get_event(tag='evt1')
  175. self.assertGotEvent(evt1, {'data': 'foo1'})
  176. evt2 = me.get_event(tag='evt1')
  177. self.assertIsNone(evt2)
  178. def test_event_no_timeout(self):
  179. '''Test no wait timeout, we should block forever, until we get one '''
  180. with eventpublisher_process():
  181. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  182. with eventsender_process({'data': 'foo2'}, 'evt2', 5):
  183. evt = me.get_event(tag='evt2', wait=0, no_block=False)
  184. self.assertGotEvent(evt, {'data': 'foo2'})
  185. def test_event_matching(self):
  186. '''Test a startswith match'''
  187. with eventpublisher_process():
  188. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  189. me.fire_event({'data': 'foo1'}, 'evt1')
  190. evt1 = me.get_event(tag='ev')
  191. self.assertGotEvent(evt1, {'data': 'foo1'})
  192. def test_event_matching_regex(self):
  193. '''Test a regex match'''
  194. with eventpublisher_process():
  195. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  196. me.fire_event({'data': 'foo1'}, 'evt1')
  197. evt1 = me.get_event(tag='^ev', match_type='regex')
  198. self.assertGotEvent(evt1, {'data': 'foo1'})
  199. def test_event_matching_all(self):
  200. '''Test an all match'''
  201. with eventpublisher_process():
  202. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  203. me.fire_event({'data': 'foo1'}, 'evt1')
  204. evt1 = me.get_event(tag='')
  205. self.assertGotEvent(evt1, {'data': 'foo1'})
  206. def test_event_matching_all_when_tag_is_None(self):
  207. '''Test event matching all when not passing a tag'''
  208. with eventpublisher_process():
  209. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  210. me.fire_event({'data': 'foo1'}, 'evt1')
  211. evt1 = me.get_event()
  212. self.assertGotEvent(evt1, {'data': 'foo1'})
  213. def test_event_not_subscribed(self):
  214. '''Test get_event drops non-subscribed events'''
  215. with eventpublisher_process():
  216. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  217. me.fire_event({'data': 'foo1'}, 'evt1')
  218. me.fire_event({'data': 'foo2'}, 'evt2')
  219. evt2 = me.get_event(tag='evt2')
  220. evt1 = me.get_event(tag='evt1')
  221. self.assertGotEvent(evt2, {'data': 'foo2'})
  222. self.assertIsNone(evt1)
  223. def test_event_subscription_cache(self):
  224. '''Test subscriptions cache a message until requested'''
  225. with eventpublisher_process():
  226. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  227. me.subscribe('evt1')
  228. me.fire_event({'data': 'foo1'}, 'evt1')
  229. me.fire_event({'data': 'foo2'}, 'evt2')
  230. evt2 = me.get_event(tag='evt2')
  231. evt1 = me.get_event(tag='evt1')
  232. self.assertGotEvent(evt2, {'data': 'foo2'})
  233. self.assertGotEvent(evt1, {'data': 'foo1'})
  234. def test_event_subscriptions_cache_regex(self):
  235. '''Test regex subscriptions cache a message until requested'''
  236. with eventpublisher_process():
  237. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  238. me.subscribe('e..1$', 'regex')
  239. me.fire_event({'data': 'foo1'}, 'evt1')
  240. me.fire_event({'data': 'foo2'}, 'evt2')
  241. evt2 = me.get_event(tag='evt2')
  242. evt1 = me.get_event(tag='evt1')
  243. self.assertGotEvent(evt2, {'data': 'foo2'})
  244. self.assertGotEvent(evt1, {'data': 'foo1'})
  245. def test_event_multiple_clients(self):
  246. '''Test event is received by multiple clients'''
  247. with eventpublisher_process():
  248. me1 = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  249. me2 = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  250. # We need to sleep here to avoid a race condition wherein
  251. # the second socket may not be connected by the time the first socket
  252. # sends the event.
  253. time.sleep(0.5)
  254. me1.fire_event({'data': 'foo1'}, 'evt1')
  255. evt1 = me1.get_event(tag='evt1')
  256. self.assertGotEvent(evt1, {'data': 'foo1'})
  257. evt2 = me2.get_event(tag='evt1')
  258. self.assertGotEvent(evt2, {'data': 'foo1'})
  259. @expectedFailure
  260. def test_event_nested_sub_all(self):
  261. '''Test nested event subscriptions do not drop events, get event for all tags'''
  262. # Show why not to call get_event(tag='')
  263. with eventpublisher_process():
  264. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  265. me.fire_event({'data': 'foo1'}, 'evt1')
  266. me.fire_event({'data': 'foo2'}, 'evt2')
  267. evt2 = me.get_event(tag='')
  268. evt1 = me.get_event(tag='')
  269. self.assertGotEvent(evt2, {'data': 'foo2'})
  270. self.assertGotEvent(evt1, {'data': 'foo1'})
  271. def test_event_many(self):
  272. '''Test a large number of events, one at a time'''
  273. with eventpublisher_process():
  274. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  275. for i in range(500):
  276. me.fire_event({'data': '{0}'.format(i)}, 'testevents')
  277. evt = me.get_event(tag='testevents')
  278. self.assertGotEvent(evt, {'data': '{0}'.format(i)}, 'Event {0}'.format(i))
  279. def test_event_many_backlog(self):
  280. '''Test a large number of events, send all then recv all'''
  281. with eventpublisher_process():
  282. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  283. # Must not exceed zmq HWM
  284. for i in range(500):
  285. me.fire_event({'data': '{0}'.format(i)}, 'testevents')
  286. for i in range(500):
  287. evt = me.get_event(tag='testevents')
  288. self.assertGotEvent(evt, {'data': '{0}'.format(i)}, 'Event {0}'.format(i))
  289. # Test the fire_master function. As it wraps the underlying fire_event,
  290. # we don't need to perform extensive testing.
  291. def test_send_master_event(self):
  292. '''Tests that sending an event through fire_master generates expected event'''
  293. with eventpublisher_process():
  294. me = salt.utils.event.MasterEvent(SOCK_DIR, listen=True)
  295. data = {'data': 'foo1'}
  296. me.fire_master(data, 'test_master')
  297. evt = me.get_event(tag='fire_master')
  298. self.assertGotEvent(evt, {'data': data, 'tag': 'test_master', 'events': None, 'pretag': None})
  299. class TestAsyncEventPublisher(AsyncTestCase):
  300. def get_new_ioloop(self):
  301. return zmq.eventloop.ioloop.ZMQIOLoop()
  302. def setUp(self):
  303. super(TestAsyncEventPublisher, self).setUp()
  304. self.opts = {'sock_dir': SOCK_DIR}
  305. self.publisher = salt.utils.event.AsyncEventPublisher(
  306. self.opts,
  307. self.io_loop,
  308. )
  309. self.event = salt.utils.event.get_event('minion', opts=self.opts, io_loop=self.io_loop)
  310. self.event.subscribe('')
  311. self.event.set_event_handler(self._handle_publish)
  312. def _handle_publish(self, raw):
  313. self.tag, self.data = salt.utils.event.SaltEvent.unpack(raw)
  314. self.stop()
  315. def test_event_subscription(self):
  316. '''Test a single event is received'''
  317. me = salt.utils.event.MinionEvent(self.opts, listen=True)
  318. me.fire_event({'data': 'foo1'}, 'evt1')
  319. self.wait()
  320. evt1 = me.get_event(tag='evt1')
  321. self.assertEqual(self.tag, 'evt1')
  322. self.data.pop('_stamp') # drop the stamp
  323. self.assertEqual(self.data, {'data': 'foo1'})