events.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. # -*- coding: utf-8 -*-
  2. '''
  3. tests.support.events
  4. ~~~~~~~~~~~~~~~~~~~~
  5. '''
  6. # Import Python libs
  7. from __future__ import absolute_import, unicode_literals
  8. import os
  9. import time
  10. import multiprocessing
  11. from contextlib import contextmanager
  12. # Import Salt libs
  13. import salt.utils.event
  14. from salt.utils.process import clean_proc
  15. @contextmanager
  16. def eventpublisher_process(sock_dir):
  17. proc = salt.utils.event.EventPublisher({'sock_dir': sock_dir})
  18. proc.start()
  19. try:
  20. if os.environ.get('TRAVIS_PYTHON_VERSION', None) is not None:
  21. # Travis is slow
  22. time.sleep(10)
  23. else:
  24. time.sleep(2)
  25. yield
  26. finally:
  27. clean_proc(proc)
  28. class EventSender(multiprocessing.Process):
  29. def __init__(self, data, tag, wait, sock_dir):
  30. super(EventSender, self).__init__()
  31. self.data = data
  32. self.tag = tag
  33. self.wait = wait
  34. self.sock_dir = sock_dir
  35. def run(self):
  36. me = salt.utils.event.MasterEvent(self.sock_dir, listen=False)
  37. time.sleep(self.wait)
  38. me.fire_event(self.data, self.tag)
  39. # Wait a few seconds before tearing down the zmq context
  40. if os.environ.get('TRAVIS_PYTHON_VERSION', None) is not None:
  41. # Travis is slow
  42. time.sleep(10)
  43. else:
  44. time.sleep(2)
  45. @contextmanager
  46. def eventsender_process(data, tag, sock_dir, wait=0):
  47. proc = EventSender(data, tag, wait, sock_dir=sock_dir)
  48. proc.start()
  49. try:
  50. yield
  51. finally:
  52. clean_proc(proc)