test_event.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. tests.integration.modules.event
  5. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  6. '''
  7. # Import python libs
  8. from __future__ import absolute_import
  9. import time
  10. import threading
  11. # Import Salt Testing libs
  12. from tests.support.case import ModuleCase
  13. # Import salt libs
  14. import salt.utils.event as event
  15. # Import 3rd-party libs
  16. import pytest
  17. from salt.ext.six.moves.queue import Queue, Empty # pylint: disable=import-error,no-name-in-module
  18. @pytest.mark.windows_whitelisted
  19. class EventModuleTest(ModuleCase):
  20. def __test_event_fire_master(self):
  21. events = Queue()
  22. def get_event(events):
  23. me = event.MasterEvent(self.master_opts['sock_dir'], listen=True)
  24. events.put_nowait(
  25. me.get_event(wait=10, tag='salttest', full=False)
  26. )
  27. threading.Thread(target=get_event, args=(events,)).start()
  28. time.sleep(1) # Allow multiprocessing.Process to start
  29. ret = self.run_function(
  30. 'event.fire_master',
  31. ['event.fire_master: just test it!!!!', 'salttest']
  32. )
  33. self.assertTrue(ret)
  34. eventfired = events.get(block=True, timeout=10)
  35. self.assertIsNotNone(eventfired)
  36. self.assertIn(
  37. 'event.fire_master: just test it!!!!', eventfired['data']
  38. )
  39. ret = self.run_function(
  40. 'event.fire_master',
  41. ['event.fire_master: just test it!!!!', 'salttest-miss']
  42. )
  43. self.assertTrue(ret)
  44. with self.assertRaises(Empty):
  45. eventfired = events.get(block=True, timeout=10)
  46. def __test_event_fire(self):
  47. events = Queue()
  48. def get_event(events):
  49. me = event.MinionEvent(self.minion_opts, listen=True)
  50. events.put_nowait(
  51. me.get_event(wait=10, tag='salttest', full=False)
  52. )
  53. threading.Thread(target=get_event, args=(events,)).start()
  54. time.sleep(1) # Allow multiprocessing.Process to start
  55. ret = self.run_function(
  56. 'event.fire', ['event.fire: just test it!!!!', 'salttest']
  57. )
  58. self.assertTrue(ret)
  59. eventfired = events.get(block=True, timeout=10)
  60. self.assertIsNotNone(eventfired)
  61. self.assertIn('event.fire: just test it!!!!', eventfired)
  62. ret = self.run_function(
  63. 'event.fire', ['event.fire: just test it!!!!', 'salttest-miss']
  64. )
  65. self.assertTrue(ret)
  66. with self.assertRaises(Empty):
  67. eventfired = events.get(block=True, timeout=10)
  68. def __test_event_fire_ipc_mode_tcp(self):
  69. events = Queue()
  70. def get_event(events):
  71. me = event.MinionEvent(self.sub_minion_opts, listen=True)
  72. events.put_nowait(
  73. me.get_event(wait=10, tag='salttest', full=False)
  74. )
  75. threading.Thread(target=get_event, args=(events,)).start()
  76. time.sleep(1) # Allow multiprocessing.Process to start
  77. ret = self.run_function(
  78. 'event.fire', ['event.fire: just test it!!!!', 'salttest'],
  79. minion_tgt='sub_minion'
  80. )
  81. self.assertTrue(ret)
  82. eventfired = events.get(block=True, timeout=10)
  83. self.assertIsNotNone(eventfired)
  84. self.assertIn('event.fire: just test it!!!!', eventfired)
  85. ret = self.run_function(
  86. 'event.fire', ['event.fire: just test it!!!!', 'salttest-miss'],
  87. minion_tgt='sub_minion'
  88. )
  89. self.assertTrue(ret)
  90. with self.assertRaises(Empty):
  91. eventfired = events.get(block=True, timeout=10)