test_event.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. # -*- coding: utf-8 -*-
  2. """
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. tests.integration.modules.event
  5. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  6. """
  7. from __future__ import absolute_import
  8. import threading
  9. import time
  10. import pytest
  11. import salt.utils.event as event
  12. from salt.ext.six.moves.queue import Empty, Queue
  13. from tests.support.case import ModuleCase
  14. @pytest.mark.windows_whitelisted
  15. @pytest.mark.usefixtures("salt_sub_minion")
  16. class EventModuleTest(ModuleCase):
  17. def __test_event_fire_master(self):
  18. events = Queue()
  19. def get_event(events):
  20. me = event.MasterEvent(self.master_opts["sock_dir"], listen=True)
  21. events.put_nowait(me.get_event(wait=10, tag="salttest", full=False))
  22. threading.Thread(target=get_event, args=(events,)).start()
  23. time.sleep(1) # Allow multiprocessing.Process to start
  24. ret = self.run_function(
  25. "event.fire_master", ["event.fire_master: just test it!!!!", "salttest"]
  26. )
  27. self.assertTrue(ret)
  28. eventfired = events.get(block=True, timeout=10)
  29. self.assertIsNotNone(eventfired)
  30. self.assertIn("event.fire_master: just test it!!!!", eventfired["data"])
  31. ret = self.run_function(
  32. "event.fire_master",
  33. ["event.fire_master: just test it!!!!", "salttest-miss"],
  34. )
  35. self.assertTrue(ret)
  36. with self.assertRaises(Empty):
  37. eventfired = events.get(block=True, timeout=10)
  38. def __test_event_fire(self):
  39. events = Queue()
  40. def get_event(events):
  41. me = event.MinionEvent(self.minion_opts, listen=True)
  42. events.put_nowait(me.get_event(wait=10, tag="salttest", full=False))
  43. threading.Thread(target=get_event, args=(events,)).start()
  44. time.sleep(1) # Allow multiprocessing.Process to start
  45. ret = self.run_function(
  46. "event.fire", ["event.fire: just test it!!!!", "salttest"]
  47. )
  48. self.assertTrue(ret)
  49. eventfired = events.get(block=True, timeout=10)
  50. self.assertIsNotNone(eventfired)
  51. self.assertIn("event.fire: just test it!!!!", eventfired)
  52. ret = self.run_function(
  53. "event.fire", ["event.fire: just test it!!!!", "salttest-miss"]
  54. )
  55. self.assertTrue(ret)
  56. with self.assertRaises(Empty):
  57. eventfired = events.get(block=True, timeout=10)
  58. def __test_event_fire_ipc_mode_tcp(self):
  59. events = Queue()
  60. def get_event(events):
  61. me = event.MinionEvent(self.sub_minion_opts, listen=True)
  62. events.put_nowait(me.get_event(wait=10, tag="salttest", full=False))
  63. threading.Thread(target=get_event, args=(events,)).start()
  64. time.sleep(1) # Allow multiprocessing.Process to start
  65. ret = self.run_function(
  66. "event.fire",
  67. ["event.fire: just test it!!!!", "salttest"],
  68. minion_tgt="sub_minion",
  69. )
  70. self.assertTrue(ret)
  71. eventfired = events.get(block=True, timeout=10)
  72. self.assertIsNotNone(eventfired)
  73. self.assertIn("event.fire: just test it!!!!", eventfired)
  74. ret = self.run_function(
  75. "event.fire",
  76. ["event.fire: just test it!!!!", "salttest-miss"],
  77. minion_tgt="sub_minion",
  78. )
  79. self.assertTrue(ret)
  80. with self.assertRaises(Empty):
  81. eventfired = events.get(block=True, timeout=10)