events.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. # -*- coding: utf-8 -*-
  2. """
  3. tests.support.events
  4. ~~~~~~~~~~~~~~~~~~~~
  5. """
  6. # Import Python libs
  7. from __future__ import absolute_import, unicode_literals
  8. import multiprocessing
  9. import os
  10. import time
  11. from contextlib import contextmanager
  12. # Import Salt libs
  13. import salt.utils.event
  14. from salt.utils.process import clean_proc
  15. @contextmanager
  16. def eventpublisher_process(sock_dir):
  17. proc = salt.utils.event.EventPublisher({"sock_dir": sock_dir})
  18. proc.start()
  19. try:
  20. if os.environ.get("TRAVIS_PYTHON_VERSION", None) is not None:
  21. # Travis is slow
  22. time.sleep(10)
  23. else:
  24. time.sleep(2)
  25. yield
  26. finally:
  27. clean_proc(proc)
  28. class EventSender(multiprocessing.Process):
  29. def __init__(self, data, tag, wait, sock_dir):
  30. super(EventSender, self).__init__()
  31. self.data = data
  32. self.tag = tag
  33. self.wait = wait
  34. self.sock_dir = sock_dir
  35. def run(self):
  36. me = salt.utils.event.MasterEvent(self.sock_dir, listen=False)
  37. time.sleep(self.wait)
  38. me.fire_event(self.data, self.tag)
  39. # Wait a few seconds before tearing down the zmq context
  40. if os.environ.get("TRAVIS_PYTHON_VERSION", None) is not None:
  41. # Travis is slow
  42. time.sleep(10)
  43. else:
  44. time.sleep(2)
  45. @contextmanager
  46. def eventsender_process(data, tag, sock_dir, wait=0):
  47. proc = EventSender(data, tag, wait, sock_dir=sock_dir)
  48. proc.start()
  49. try:
  50. yield
  51. finally:
  52. clean_proc(proc)