1
0

events.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. """
  2. tests.support.events
  3. ~~~~~~~~~~~~~~~~~~~~
  4. """
  5. import multiprocessing
  6. import os
  7. import time
  8. from contextlib import contextmanager
  9. import salt.utils.event
  10. from salt.utils.process import clean_proc
  11. @contextmanager
  12. def eventpublisher_process(sock_dir):
  13. proc = salt.utils.event.EventPublisher({"sock_dir": sock_dir})
  14. proc.start()
  15. try:
  16. if os.environ.get("TRAVIS_PYTHON_VERSION", None) is not None:
  17. # Travis is slow
  18. time.sleep(10)
  19. else:
  20. time.sleep(8)
  21. yield
  22. finally:
  23. clean_proc(proc)
  24. class EventSender(multiprocessing.Process):
  25. def __init__(self, data, tag, wait, sock_dir):
  26. super().__init__()
  27. self.data = data
  28. self.tag = tag
  29. self.wait = wait
  30. self.sock_dir = sock_dir
  31. def run(self):
  32. me = salt.utils.event.MasterEvent(self.sock_dir, listen=False)
  33. time.sleep(self.wait)
  34. me.fire_event(self.data, self.tag)
  35. # Wait a few seconds before tearing down the zmq context
  36. if os.environ.get("TRAVIS_PYTHON_VERSION", None) is not None:
  37. # Travis is slow
  38. time.sleep(10)
  39. else:
  40. time.sleep(2)
  41. @contextmanager
  42. def eventsender_process(data, tag, sock_dir, wait=0):
  43. proc = EventSender(data, tag, wait, sock_dir=sock_dir)
  44. proc.start()
  45. try:
  46. yield
  47. finally:
  48. clean_proc(proc)