1
0

test_process.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477
  1. # -*- coding: utf-8 -*-
  2. # Import python libs
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. import io
  5. import os
  6. import sys
  7. import threading
  8. import time
  9. import signal
  10. import multiprocessing
  11. import functools
  12. # Import Salt Testing libs
  13. from tests.support.unit import TestCase, skipIf
  14. from tests.support.mock import (
  15. patch,
  16. NO_MOCK,
  17. NO_MOCK_REASON
  18. )
  19. # Import salt libs
  20. import salt.utils.platform
  21. import salt.utils.process
  22. # Import 3rd-party libs
  23. from salt.ext import six
  24. from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
  25. import psutil
  26. def die(func):
  27. '''
  28. Add proc title
  29. '''
  30. @functools.wraps(func)
  31. def wrapper(self):
  32. # Strip off the "test_" from the function name
  33. name = func.__name__[5:]
  34. def _die():
  35. salt.utils.process.appendproctitle('test_{0}'.format(name))
  36. setattr(self, 'die_' + name, _die)
  37. return wrapper
  38. def incr(func):
  39. '''
  40. Increment counter
  41. '''
  42. @functools.wraps(func)
  43. def wrapper(self):
  44. # Strip off the "test_" from the function name
  45. name = func.__name__[5:]
  46. def _incr(counter, num):
  47. salt.utils.process.appendproctitle('test_{0}'.format(name))
  48. for _ in range(0, num):
  49. counter.value += 1
  50. setattr(self, 'incr_' + name, _incr)
  51. return wrapper
  52. def spin(func):
  53. '''
  54. Spin indefinitely
  55. '''
  56. @functools.wraps(func)
  57. def wrapper(self):
  58. # Strip off the "test_" from the function name
  59. name = func.__name__[5:]
  60. def _spin():
  61. salt.utils.process.appendproctitle('test_{0}'.format(name))
  62. while True:
  63. time.sleep(1)
  64. setattr(self, 'spin_' + name, _spin)
  65. return wrapper
  66. class TestProcessManager(TestCase):
  67. @spin
  68. def test_basic(self):
  69. '''
  70. Make sure that the process is alive 2s later
  71. '''
  72. process_manager = salt.utils.process.ProcessManager()
  73. process_manager.add_process(self.spin_basic)
  74. initial_pid = next(six.iterkeys(process_manager._process_map))
  75. time.sleep(2)
  76. process_manager.check_children()
  77. try:
  78. assert initial_pid == next(six.iterkeys(process_manager._process_map))
  79. finally:
  80. process_manager.stop_restarting()
  81. process_manager.kill_children()
  82. time.sleep(0.5)
  83. # Are there child processes still running?
  84. if process_manager._process_map.keys():
  85. process_manager.send_signal_to_processes(signal.SIGKILL)
  86. process_manager.stop_restarting()
  87. process_manager.kill_children()
  88. @spin
  89. def test_kill(self):
  90. process_manager = salt.utils.process.ProcessManager()
  91. process_manager.add_process(self.spin_kill)
  92. initial_pid = next(six.iterkeys(process_manager._process_map))
  93. # kill the child
  94. if salt.utils.platform.is_windows():
  95. os.kill(initial_pid, signal.SIGTERM)
  96. else:
  97. os.kill(initial_pid, signal.SIGKILL)
  98. # give the OS time to give the signal...
  99. time.sleep(0.1)
  100. process_manager.check_children()
  101. try:
  102. assert initial_pid != next(six.iterkeys(process_manager._process_map))
  103. finally:
  104. process_manager.stop_restarting()
  105. process_manager.kill_children()
  106. time.sleep(0.5)
  107. # Are there child processes still running?
  108. if process_manager._process_map.keys():
  109. process_manager.send_signal_to_processes(signal.SIGKILL)
  110. process_manager.stop_restarting()
  111. process_manager.kill_children()
  112. @die
  113. def test_restarting(self):
  114. '''
  115. Make sure that the process is alive 2s later
  116. '''
  117. process_manager = salt.utils.process.ProcessManager()
  118. process_manager.add_process(self.die_restarting)
  119. initial_pid = next(six.iterkeys(process_manager._process_map))
  120. time.sleep(2)
  121. process_manager.check_children()
  122. try:
  123. assert initial_pid != next(six.iterkeys(process_manager._process_map))
  124. finally:
  125. process_manager.stop_restarting()
  126. process_manager.kill_children()
  127. time.sleep(0.5)
  128. # Are there child processes still running?
  129. if process_manager._process_map.keys():
  130. process_manager.send_signal_to_processes(signal.SIGKILL)
  131. process_manager.stop_restarting()
  132. process_manager.kill_children()
  133. @skipIf(sys.version_info < (2, 7), 'Needs > Py 2.7 due to bug in stdlib')
  134. @incr
  135. def test_counter(self):
  136. counter = multiprocessing.Value('i', 0)
  137. process_manager = salt.utils.process.ProcessManager()
  138. process_manager.add_process(self.incr_counter, args=(counter, 2))
  139. time.sleep(1)
  140. process_manager.check_children()
  141. time.sleep(1)
  142. # we should have had 2 processes go at it
  143. try:
  144. assert counter.value == 4
  145. finally:
  146. process_manager.stop_restarting()
  147. process_manager.kill_children()
  148. time.sleep(0.5)
  149. # Are there child processes still running?
  150. if process_manager._process_map.keys():
  151. process_manager.send_signal_to_processes(signal.SIGKILL)
  152. process_manager.stop_restarting()
  153. process_manager.kill_children()
  154. class TestThreadPool(TestCase):
  155. def test_basic(self):
  156. '''
  157. Make sure the threadpool can do things
  158. '''
  159. def incr_counter(counter):
  160. counter.value += 1
  161. counter = multiprocessing.Value('i', 0)
  162. pool = salt.utils.process.ThreadPool()
  163. sent = pool.fire_async(incr_counter, args=(counter,))
  164. self.assertTrue(sent)
  165. time.sleep(1) # Sleep to let the threads do things
  166. self.assertEqual(counter.value, 1)
  167. self.assertEqual(pool._job_queue.qsize(), 0)
  168. def test_full_queue(self):
  169. '''
  170. Make sure that a full threadpool acts as we expect
  171. '''
  172. def incr_counter(counter):
  173. counter.value += 1
  174. counter = multiprocessing.Value('i', 0)
  175. # Create a pool with no workers and 1 queue size
  176. pool = salt.utils.process.ThreadPool(0, 1)
  177. # make sure we can put the one item in
  178. sent = pool.fire_async(incr_counter, args=(counter,))
  179. self.assertTrue(sent)
  180. # make sure we can't put more in
  181. sent = pool.fire_async(incr_counter, args=(counter,))
  182. self.assertFalse(sent)
  183. time.sleep(1) # Sleep to let the threads do things
  184. # make sure no one updated the counter
  185. self.assertEqual(counter.value, 0)
  186. # make sure the queue is still full
  187. self.assertEqual(pool._job_queue.qsize(), 1)
  188. class TestProcess(TestCase):
  189. @skipIf(NO_MOCK, NO_MOCK_REASON)
  190. def test_daemonize_if(self):
  191. # pylint: disable=assignment-from-none
  192. with patch('sys.argv', ['salt-call']):
  193. ret = salt.utils.process.daemonize_if({})
  194. self.assertEqual(None, ret)
  195. ret = salt.utils.process.daemonize_if({'multiprocessing': False})
  196. self.assertEqual(None, ret)
  197. with patch('sys.platform', 'win'):
  198. ret = salt.utils.process.daemonize_if({})
  199. self.assertEqual(None, ret)
  200. with patch('salt.utils.process.daemonize'), \
  201. patch('sys.platform', 'linux2'):
  202. salt.utils.process.daemonize_if({})
  203. self.assertTrue(salt.utils.process.daemonize.called)
  204. # pylint: enable=assignment-from-none
  205. class TestSignalHandlingMultiprocessingProcess(TestCase):
  206. @classmethod
  207. def Process(cls, pid):
  208. raise psutil.NoSuchProcess(pid)
  209. @classmethod
  210. def target(cls):
  211. os.kill(os.getpid(), signal.SIGTERM)
  212. @classmethod
  213. def children(cls, *args, **kwargs):
  214. raise psutil.NoSuchProcess(1)
  215. @skipIf(NO_MOCK, NO_MOCK_REASON)
  216. def test_process_does_not_exist(self):
  217. try:
  218. with patch('psutil.Process', self.Process):
  219. proc = salt.utils.process.SignalHandlingMultiprocessingProcess(target=self.target)
  220. proc.start()
  221. except psutil.NoSuchProcess:
  222. assert False, "psutil.NoSuchProcess raised"
  223. @skipIf(NO_MOCK, NO_MOCK_REASON)
  224. def test_process_children_do_not_exist(self):
  225. try:
  226. with patch('psutil.Process.children', self.children):
  227. proc = salt.utils.process.SignalHandlingMultiprocessingProcess(target=self.target)
  228. proc.start()
  229. except psutil.NoSuchProcess:
  230. assert False, "psutil.NoSuchProcess raised"
  231. @staticmethod
  232. def run_forever_sub_target(evt):
  233. 'Used by run_forever_target to create a sub-process'
  234. while not evt.is_set():
  235. time.sleep(1)
  236. @staticmethod
  237. def run_forever_target(sub_target, evt):
  238. 'A target that will run forever or until an event is set'
  239. p = multiprocessing.Process(target=sub_target, args=(evt,))
  240. p.start()
  241. p.join()
  242. @staticmethod
  243. def kill_target_sub_proc():
  244. pid = os.fork()
  245. if pid == 0:
  246. return
  247. pid = os.fork()
  248. if pid == 0:
  249. return
  250. time.sleep(.1)
  251. try:
  252. os.kill(os.getpid(), signal.SIGINT)
  253. except KeyboardInterrupt:
  254. pass
  255. @skipIf(sys.platform.startswith('win'), 'No os.fork on Windows')
  256. def test_signal_processing_regression_test(self):
  257. evt = multiprocessing.Event()
  258. sh_proc = salt.utils.process.SignalHandlingMultiprocessingProcess(
  259. target=self.run_forever_target,
  260. args=(self.run_forever_sub_target, evt)
  261. )
  262. sh_proc.start()
  263. proc = multiprocessing.Process(target=self.kill_target_sub_proc)
  264. proc.start()
  265. proc.join()
  266. # When the bug exists, the kill_target_sub_proc signal will kill both
  267. # processes. sh_proc will be alive if the bug is fixed
  268. try:
  269. assert sh_proc.is_alive()
  270. finally:
  271. evt.set()
  272. sh_proc.join()
  273. @staticmethod
  274. def no_op_target():
  275. pass
  276. @skipIf(NO_MOCK, NO_MOCK_REASON)
  277. def test_signal_processing_test_after_fork_called(self):
  278. 'Validate MultiprocessingProcess and sub classes call after fork methods'
  279. evt = multiprocessing.Event()
  280. sig_to_mock = 'salt.utils.process.SignalHandlingMultiprocessingProcess._setup_signals'
  281. log_to_mock = 'salt.utils.process.MultiprocessingProcess._setup_process_logging'
  282. with patch(sig_to_mock) as ma, patch(log_to_mock) as mb:
  283. self.sh_proc = salt.utils.process.SignalHandlingMultiprocessingProcess(target=self.no_op_target)
  284. self.sh_proc.run()
  285. ma.assert_called()
  286. mb.assert_called()
  287. @skipIf(NO_MOCK, NO_MOCK_REASON)
  288. def test_signal_processing_test_final_methods_called(self):
  289. 'Validate MultiprocessingProcess and sub classes call finalize methods'
  290. evt = multiprocessing.Event()
  291. teardown_to_mock = 'salt.log.setup.shutdown_multiprocessing_logging'
  292. log_to_mock = 'salt.utils.process.MultiprocessingProcess._setup_process_logging'
  293. sig_to_mock = 'salt.utils.process.SignalHandlingMultiprocessingProcess._setup_signals'
  294. # Mock _setup_signals so we do not register one for this process.
  295. with patch(sig_to_mock):
  296. with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
  297. self.sh_proc = salt.utils.process.SignalHandlingMultiprocessingProcess(target=self.no_op_target)
  298. self.sh_proc.run()
  299. ma.assert_called()
  300. mb.assert_called()
  301. @staticmethod
  302. def pid_setting_target(sub_target, val, evt):
  303. val.value = os.getpid()
  304. p = multiprocessing.Process(target=sub_target, args=(evt,))
  305. p.start()
  306. p.join()
  307. @skipIf(sys.platform.startswith('win'), 'Required signals not supported on windows')
  308. def test_signal_processing_handle_signals_called(self):
  309. 'Validate SignalHandlingMultiprocessingProcess handles signals'
  310. # Gloobal event to stop all processes we're creating
  311. evt = multiprocessing.Event()
  312. # Create a process to test signal handler
  313. val = multiprocessing.Value('i', 0)
  314. proc = salt.utils.process.SignalHandlingMultiprocessingProcess(
  315. target=self.pid_setting_target,
  316. args=(self.run_forever_sub_target, val, evt),
  317. )
  318. proc.start()
  319. # Create a second process that should not respond to SIGINT or SIGTERM
  320. proc2 = multiprocessing.Process(
  321. target=self.run_forever_target,
  322. args=(self.run_forever_sub_target, evt),
  323. )
  324. proc2.start()
  325. # Wait for the sub process to set it's pid
  326. while not val.value:
  327. time.sleep(.3)
  328. assert not proc.signal_handled()
  329. # Send a signal that should get handled by the subprocess
  330. os.kill(val.value, signal.SIGTERM)
  331. # wait up to 10 seconds for signal handler:
  332. start = time.time()
  333. while time.time() - start < 10:
  334. if proc.signal_handled():
  335. break
  336. time.sleep(.3)
  337. try:
  338. # Allow some time for the signal handler to do it's thing
  339. assert proc.signal_handled()
  340. # Reap the signaled process
  341. proc.join(1)
  342. assert proc2.is_alive()
  343. finally:
  344. evt.set()
  345. proc2.join(30)
  346. proc.join(30)
  347. class TestDup2(TestCase):
  348. def test_dup2_no_fileno(self):
  349. 'The dup2 method does not fail on streams without fileno support'
  350. f1 = io.StringIO("some initial text data")
  351. f2 = io.StringIO("some initial other text data")
  352. with self.assertRaises(io.UnsupportedOperation):
  353. f1.fileno()
  354. with patch('os.dup2') as dup_mock:
  355. try:
  356. salt.utils.process.dup2(f1, f2)
  357. except io.UnsupportedOperation:
  358. assert False, 'io.UnsupportedOperation was raised'
  359. assert not dup_mock.called
  360. def null_target():
  361. pass
  362. def event_target(event):
  363. while True:
  364. if event.wait(5):
  365. break
  366. class TestProcessList(TestCase):
  367. @staticmethod
  368. def wait_for_proc(proc, timeout=10):
  369. start = time.time()
  370. while proc.is_alive():
  371. if time.time() - start > timeout:
  372. raise Exception("Process did not finishe before timeout")
  373. time.sleep(.3)
  374. def test_process_list_process(self):
  375. plist = salt.utils.process.SubprocessList()
  376. proc = multiprocessing.Process(target=null_target)
  377. proc.start()
  378. plist.add(proc)
  379. assert proc in plist.processes
  380. self.wait_for_proc(proc)
  381. assert not proc.is_alive()
  382. plist.cleanup()
  383. assert proc not in plist.processes
  384. def test_process_list_thread(self):
  385. plist = salt.utils.process.SubprocessList()
  386. thread = threading.Thread(target=null_target)
  387. thread.start()
  388. plist.add(thread)
  389. assert thread in plist.processes
  390. self.wait_for_proc(thread)
  391. assert not thread.is_alive()
  392. plist.cleanup()
  393. assert thread not in plist.processes
  394. def test_process_list_cleanup(self):
  395. plist = salt.utils.process.SubprocessList()
  396. event = multiprocessing.Event()
  397. proc = multiprocessing.Process(target=event_target, args=[event])
  398. proc.start()
  399. plist.add(proc)
  400. assert proc in plist.processes
  401. plist.cleanup()
  402. event.set()
  403. assert proc in plist.processes
  404. self.wait_for_proc(proc)
  405. assert not proc.is_alive()
  406. plist.cleanup()
  407. assert proc not in plist.processes