1
0

test_event.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
# -*- coding: utf-8 -*-
'''
:codeauthor: Pedro Algarvio (pedro@algarvio.me)

tests.integration.modules.event
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
'''
  7. # Import python libs
  8. from __future__ import absolute_import
  9. import time
  10. import threading
  11. # Import Salt Testing libs
  12. from tests.support.case import ModuleCase
  13. # Import salt libs
  14. import salt.utils.event as event
  15. # Import 3rd-party libs
  16. from salt.ext.six.moves.queue import Queue, Empty # pylint: disable=import-error,no-name-in-module
  17. class EventModuleTest(ModuleCase):
  18. def __test_event_fire_master(self):
  19. events = Queue()
  20. def get_event(events):
  21. me = event.MasterEvent(self.master_opts['sock_dir'], listen=True)
  22. events.put_nowait(
  23. me.get_event(wait=10, tag='salttest', full=False)
  24. )
  25. threading.Thread(target=get_event, args=(events,)).start()
  26. time.sleep(1) # Allow multiprocessing.Process to start
  27. ret = self.run_function(
  28. 'event.fire_master',
  29. ['event.fire_master: just test it!!!!', 'salttest']
  30. )
  31. self.assertTrue(ret)
  32. eventfired = events.get(block=True, timeout=10)
  33. self.assertIsNotNone(eventfired)
  34. self.assertIn(
  35. 'event.fire_master: just test it!!!!', eventfired['data']
  36. )
  37. ret = self.run_function(
  38. 'event.fire_master',
  39. ['event.fire_master: just test it!!!!', 'salttest-miss']
  40. )
  41. self.assertTrue(ret)
  42. with self.assertRaises(Empty):
  43. eventfired = events.get(block=True, timeout=10)
  44. def __test_event_fire(self):
  45. events = Queue()
  46. def get_event(events):
  47. me = event.MinionEvent(self.minion_opts, listen=True)
  48. events.put_nowait(
  49. me.get_event(wait=10, tag='salttest', full=False)
  50. )
  51. threading.Thread(target=get_event, args=(events,)).start()
  52. time.sleep(1) # Allow multiprocessing.Process to start
  53. ret = self.run_function(
  54. 'event.fire', ['event.fire: just test it!!!!', 'salttest']
  55. )
  56. self.assertTrue(ret)
  57. eventfired = events.get(block=True, timeout=10)
  58. self.assertIsNotNone(eventfired)
  59. self.assertIn('event.fire: just test it!!!!', eventfired)
  60. ret = self.run_function(
  61. 'event.fire', ['event.fire: just test it!!!!', 'salttest-miss']
  62. )
  63. self.assertTrue(ret)
  64. with self.assertRaises(Empty):
  65. eventfired = events.get(block=True, timeout=10)
  66. def __test_event_fire_ipc_mode_tcp(self):
  67. events = Queue()
  68. def get_event(events):
  69. me = event.MinionEvent(self.sub_minion_opts, listen=True)
  70. events.put_nowait(
  71. me.get_event(wait=10, tag='salttest', full=False)
  72. )
  73. threading.Thread(target=get_event, args=(events,)).start()
  74. time.sleep(1) # Allow multiprocessing.Process to start
  75. ret = self.run_function(
  76. 'event.fire', ['event.fire: just test it!!!!', 'salttest'],
  77. minion_tgt='sub_minion'
  78. )
  79. self.assertTrue(ret)
  80. eventfired = events.get(block=True, timeout=10)
  81. self.assertIsNotNone(eventfired)
  82. self.assertIn('event.fire: just test it!!!!', eventfired)
  83. ret = self.run_function(
  84. 'event.fire', ['event.fire: just test it!!!!', 'salttest-miss'],
  85. minion_tgt='sub_minion'
  86. )
  87. self.assertTrue(ret)
  88. with self.assertRaises(Empty):
  89. eventfired = events.get(block=True, timeout=10)