1
0

test_process.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. # -*- coding: utf-8 -*-
  2. # Import python libs
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. import os
  5. import sys
  6. import time
  7. import signal
  8. import multiprocessing
  9. import functools
  10. # Import Salt Testing libs
  11. from tests.support.unit import TestCase, skipIf
  12. from tests.support.mock import (
  13. patch,
  14. NO_MOCK,
  15. NO_MOCK_REASON
  16. )
  17. # Import salt libs
  18. import salt.utils.platform
  19. import salt.utils.process
  20. # Import 3rd-party libs
  21. from salt.ext import six
  22. from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
  23. def die(func):
  24. '''
  25. Add proc title
  26. '''
  27. @functools.wraps(func)
  28. def wrapper(self):
  29. # Strip off the "test_" from the function name
  30. name = func.__name__[5:]
  31. def _die():
  32. salt.utils.process.appendproctitle('test_{0}'.format(name))
  33. setattr(self, 'die_' + name, _die)
  34. return wrapper
  35. def incr(func):
  36. '''
  37. Increment counter
  38. '''
  39. @functools.wraps(func)
  40. def wrapper(self):
  41. # Strip off the "test_" from the function name
  42. name = func.__name__[5:]
  43. def _incr(counter, num):
  44. salt.utils.process.appendproctitle('test_{0}'.format(name))
  45. for _ in range(0, num):
  46. counter.value += 1
  47. setattr(self, 'incr_' + name, _incr)
  48. return wrapper
  49. def spin(func):
  50. '''
  51. Spin indefinitely
  52. '''
  53. @functools.wraps(func)
  54. def wrapper(self):
  55. # Strip off the "test_" from the function name
  56. name = func.__name__[5:]
  57. def _spin():
  58. salt.utils.process.appendproctitle('test_{0}'.format(name))
  59. while True:
  60. time.sleep(1)
  61. setattr(self, 'spin_' + name, _spin)
  62. return wrapper
  63. class TestProcessManager(TestCase):
  64. @spin
  65. def test_basic(self):
  66. '''
  67. Make sure that the process is alive 2s later
  68. '''
  69. process_manager = salt.utils.process.ProcessManager()
  70. process_manager.add_process(self.spin_basic)
  71. initial_pid = next(six.iterkeys(process_manager._process_map))
  72. time.sleep(2)
  73. process_manager.check_children()
  74. try:
  75. assert initial_pid == next(six.iterkeys(process_manager._process_map))
  76. finally:
  77. process_manager.stop_restarting()
  78. process_manager.kill_children()
  79. time.sleep(0.5)
  80. # Are there child processes still running?
  81. if process_manager._process_map.keys():
  82. process_manager.send_signal_to_processes(signal.SIGKILL)
  83. process_manager.stop_restarting()
  84. process_manager.kill_children()
  85. @spin
  86. def test_kill(self):
  87. process_manager = salt.utils.process.ProcessManager()
  88. process_manager.add_process(self.spin_kill)
  89. initial_pid = next(six.iterkeys(process_manager._process_map))
  90. # kill the child
  91. if salt.utils.platform.is_windows():
  92. os.kill(initial_pid, signal.SIGTERM)
  93. else:
  94. os.kill(initial_pid, signal.SIGKILL)
  95. # give the OS time to give the signal...
  96. time.sleep(0.1)
  97. process_manager.check_children()
  98. try:
  99. assert initial_pid != next(six.iterkeys(process_manager._process_map))
  100. finally:
  101. process_manager.stop_restarting()
  102. process_manager.kill_children()
  103. time.sleep(0.5)
  104. # Are there child processes still running?
  105. if process_manager._process_map.keys():
  106. process_manager.send_signal_to_processes(signal.SIGKILL)
  107. process_manager.stop_restarting()
  108. process_manager.kill_children()
  109. @die
  110. def test_restarting(self):
  111. '''
  112. Make sure that the process is alive 2s later
  113. '''
  114. process_manager = salt.utils.process.ProcessManager()
  115. process_manager.add_process(self.die_restarting)
  116. initial_pid = next(six.iterkeys(process_manager._process_map))
  117. time.sleep(2)
  118. process_manager.check_children()
  119. try:
  120. assert initial_pid != next(six.iterkeys(process_manager._process_map))
  121. finally:
  122. process_manager.stop_restarting()
  123. process_manager.kill_children()
  124. time.sleep(0.5)
  125. # Are there child processes still running?
  126. if process_manager._process_map.keys():
  127. process_manager.send_signal_to_processes(signal.SIGKILL)
  128. process_manager.stop_restarting()
  129. process_manager.kill_children()
  130. @skipIf(sys.version_info < (2, 7), 'Needs > Py 2.7 due to bug in stdlib')
  131. @incr
  132. def test_counter(self):
  133. counter = multiprocessing.Value('i', 0)
  134. process_manager = salt.utils.process.ProcessManager()
  135. process_manager.add_process(self.incr_counter, args=(counter, 2))
  136. time.sleep(1)
  137. process_manager.check_children()
  138. time.sleep(1)
  139. # we should have had 2 processes go at it
  140. try:
  141. assert counter.value == 4
  142. finally:
  143. process_manager.stop_restarting()
  144. process_manager.kill_children()
  145. time.sleep(0.5)
  146. # Are there child processes still running?
  147. if process_manager._process_map.keys():
  148. process_manager.send_signal_to_processes(signal.SIGKILL)
  149. process_manager.stop_restarting()
  150. process_manager.kill_children()
  151. class TestThreadPool(TestCase):
  152. def test_basic(self):
  153. '''
  154. Make sure the threadpool can do things
  155. '''
  156. def incr_counter(counter):
  157. counter.value += 1
  158. counter = multiprocessing.Value('i', 0)
  159. pool = salt.utils.process.ThreadPool()
  160. sent = pool.fire_async(incr_counter, args=(counter,))
  161. self.assertTrue(sent)
  162. time.sleep(1) # Sleep to let the threads do things
  163. self.assertEqual(counter.value, 1)
  164. self.assertEqual(pool._job_queue.qsize(), 0)
  165. def test_full_queue(self):
  166. '''
  167. Make sure that a full threadpool acts as we expect
  168. '''
  169. def incr_counter(counter):
  170. counter.value += 1
  171. counter = multiprocessing.Value('i', 0)
  172. # Create a pool with no workers and 1 queue size
  173. pool = salt.utils.process.ThreadPool(0, 1)
  174. # make sure we can put the one item in
  175. sent = pool.fire_async(incr_counter, args=(counter,))
  176. self.assertTrue(sent)
  177. # make sure we can't put more in
  178. sent = pool.fire_async(incr_counter, args=(counter,))
  179. self.assertFalse(sent)
  180. time.sleep(1) # Sleep to let the threads do things
  181. # make sure no one updated the counter
  182. self.assertEqual(counter.value, 0)
  183. # make sure the queue is still full
  184. self.assertEqual(pool._job_queue.qsize(), 1)
  185. class TestProcess(TestCase):
  186. @skipIf(NO_MOCK, NO_MOCK_REASON)
  187. def test_daemonize_if(self):
  188. # pylint: disable=assignment-from-none
  189. with patch('sys.argv', ['salt-call']):
  190. ret = salt.utils.process.daemonize_if({})
  191. self.assertEqual(None, ret)
  192. ret = salt.utils.process.daemonize_if({'multiprocessing': False})
  193. self.assertEqual(None, ret)
  194. with patch('sys.platform', 'win'):
  195. ret = salt.utils.process.daemonize_if({})
  196. self.assertEqual(None, ret)
  197. with patch('salt.utils.process.daemonize'), \
  198. patch('sys.platform', 'linux2'):
  199. salt.utils.process.daemonize_if({})
  200. self.assertTrue(salt.utils.process.daemonize.called)
  201. # pylint: enable=assignment-from-none