1
0

test_process.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662
  1. # -*- coding: utf-8 -*-
  2. # Import python libs
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. import io
  5. import os
  6. import sys
  7. import threading
  8. import time
  9. import signal
  10. import multiprocessing
  11. import functools
  12. import datetime
  13. import warnings
  14. # Import Salt Testing libs
  15. from tests.support.unit import TestCase, skipIf
  16. from tests.support.mock import (
  17. patch,
  18. )
  19. # Import salt libs
  20. import salt.utils.platform
  21. import salt.utils.process
  22. from salt.utils.versions import warn_until_date
  23. # Import 3rd-party libs
  24. from salt.ext import six
  25. from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
  26. import psutil
  27. def die(func):
  28. '''
  29. Add proc title
  30. '''
  31. @functools.wraps(func)
  32. def wrapper(self):
  33. # Strip off the "test_" from the function name
  34. name = func.__name__[5:]
  35. def _die():
  36. salt.utils.process.appendproctitle('test_{0}'.format(name))
  37. attrname = 'die_' + name
  38. setattr(self, attrname, _die)
  39. self.addCleanup(delattr, self, attrname)
  40. return wrapper
  41. def incr(func):
  42. '''
  43. Increment counter
  44. '''
  45. @functools.wraps(func)
  46. def wrapper(self):
  47. # Strip off the "test_" from the function name
  48. name = func.__name__[5:]
  49. def _incr(counter, num):
  50. salt.utils.process.appendproctitle('test_{0}'.format(name))
  51. for _ in range(0, num):
  52. counter.value += 1
  53. attrname = 'incr_' + name
  54. setattr(self, attrname, _incr)
  55. self.addCleanup(delattr, self, attrname)
  56. return wrapper
  57. def spin(func):
  58. '''
  59. Spin indefinitely
  60. '''
  61. @functools.wraps(func)
  62. def wrapper(self):
  63. # Strip off the "test_" from the function name
  64. name = func.__name__[5:]
  65. def _spin():
  66. salt.utils.process.appendproctitle('test_{0}'.format(name))
  67. while True:
  68. time.sleep(1)
  69. attrname = 'spin_' + name
  70. setattr(self, attrname, _spin)
  71. self.addCleanup(delattr, self, attrname)
  72. return wrapper
  73. class TestProcessManager(TestCase):
  74. @spin
  75. def test_basic(self):
  76. '''
  77. Make sure that the process is alive 2s later
  78. '''
  79. process_manager = salt.utils.process.ProcessManager()
  80. process_manager.add_process(self.spin_basic)
  81. initial_pid = next(six.iterkeys(process_manager._process_map))
  82. time.sleep(2)
  83. process_manager.check_children()
  84. try:
  85. assert initial_pid == next(six.iterkeys(process_manager._process_map))
  86. finally:
  87. process_manager.stop_restarting()
  88. process_manager.kill_children()
  89. time.sleep(0.5)
  90. # Are there child processes still running?
  91. if process_manager._process_map.keys():
  92. process_manager.send_signal_to_processes(signal.SIGKILL)
  93. process_manager.stop_restarting()
  94. process_manager.kill_children()
  95. @spin
  96. def test_kill(self):
  97. process_manager = salt.utils.process.ProcessManager()
  98. process_manager.add_process(self.spin_kill)
  99. initial_pid = next(six.iterkeys(process_manager._process_map))
  100. # kill the child
  101. if salt.utils.platform.is_windows():
  102. os.kill(initial_pid, signal.SIGTERM)
  103. else:
  104. os.kill(initial_pid, signal.SIGKILL)
  105. # give the OS time to give the signal...
  106. time.sleep(0.1)
  107. process_manager.check_children()
  108. try:
  109. assert initial_pid != next(six.iterkeys(process_manager._process_map))
  110. finally:
  111. process_manager.stop_restarting()
  112. process_manager.kill_children()
  113. time.sleep(0.5)
  114. # Are there child processes still running?
  115. if process_manager._process_map.keys():
  116. process_manager.send_signal_to_processes(signal.SIGKILL)
  117. process_manager.stop_restarting()
  118. process_manager.kill_children()
  119. @die
  120. def test_restarting(self):
  121. '''
  122. Make sure that the process is alive 2s later
  123. '''
  124. process_manager = salt.utils.process.ProcessManager()
  125. process_manager.add_process(self.die_restarting)
  126. initial_pid = next(six.iterkeys(process_manager._process_map))
  127. time.sleep(2)
  128. process_manager.check_children()
  129. try:
  130. assert initial_pid != next(six.iterkeys(process_manager._process_map))
  131. finally:
  132. process_manager.stop_restarting()
  133. process_manager.kill_children()
  134. time.sleep(0.5)
  135. # Are there child processes still running?
  136. if process_manager._process_map.keys():
  137. process_manager.send_signal_to_processes(signal.SIGKILL)
  138. process_manager.stop_restarting()
  139. process_manager.kill_children()
  140. @skipIf(sys.version_info < (2, 7), 'Needs > Py 2.7 due to bug in stdlib')
  141. @incr
  142. def test_counter(self):
  143. counter = multiprocessing.Value('i', 0)
  144. process_manager = salt.utils.process.ProcessManager()
  145. process_manager.add_process(self.incr_counter, args=(counter, 2))
  146. time.sleep(1)
  147. process_manager.check_children()
  148. time.sleep(1)
  149. # we should have had 2 processes go at it
  150. try:
  151. assert counter.value == 4
  152. finally:
  153. process_manager.stop_restarting()
  154. process_manager.kill_children()
  155. time.sleep(0.5)
  156. # Are there child processes still running?
  157. if process_manager._process_map.keys():
  158. process_manager.send_signal_to_processes(signal.SIGKILL)
  159. process_manager.stop_restarting()
  160. process_manager.kill_children()
  161. class TestThreadPool(TestCase):
  162. def test_basic(self):
  163. '''
  164. Make sure the threadpool can do things
  165. '''
  166. def incr_counter(counter):
  167. counter.value += 1
  168. counter = multiprocessing.Value('i', 0)
  169. pool = salt.utils.process.ThreadPool()
  170. sent = pool.fire_async(incr_counter, args=(counter,))
  171. self.assertTrue(sent)
  172. time.sleep(1) # Sleep to let the threads do things
  173. self.assertEqual(counter.value, 1)
  174. self.assertEqual(pool._job_queue.qsize(), 0)
  175. def test_full_queue(self):
  176. '''
  177. Make sure that a full threadpool acts as we expect
  178. '''
  179. def incr_counter(counter):
  180. counter.value += 1
  181. counter = multiprocessing.Value('i', 0)
  182. # Create a pool with no workers and 1 queue size
  183. pool = salt.utils.process.ThreadPool(0, 1)
  184. # make sure we can put the one item in
  185. sent = pool.fire_async(incr_counter, args=(counter,))
  186. self.assertTrue(sent)
  187. # make sure we can't put more in
  188. sent = pool.fire_async(incr_counter, args=(counter,))
  189. self.assertFalse(sent)
  190. time.sleep(1) # Sleep to let the threads do things
  191. # make sure no one updated the counter
  192. self.assertEqual(counter.value, 0)
  193. # make sure the queue is still full
  194. self.assertEqual(pool._job_queue.qsize(), 1)
  195. class TestProcess(TestCase):
  196. def test_daemonize_if(self):
  197. # pylint: disable=assignment-from-none
  198. with patch('sys.argv', ['salt-call']):
  199. ret = salt.utils.process.daemonize_if({})
  200. self.assertEqual(None, ret)
  201. ret = salt.utils.process.daemonize_if({'multiprocessing': False})
  202. self.assertEqual(None, ret)
  203. with patch('sys.platform', 'win'):
  204. ret = salt.utils.process.daemonize_if({})
  205. self.assertEqual(None, ret)
  206. with patch('salt.utils.process.daemonize'), \
  207. patch('sys.platform', 'linux2'):
  208. salt.utils.process.daemonize_if({})
  209. self.assertTrue(salt.utils.process.daemonize.called)
  210. # pylint: enable=assignment-from-none
  211. class TestProcessCallbacks(TestCase):
  212. @staticmethod
  213. def process_target(evt):
  214. evt.set()
  215. def test_callbacks(self):
  216. 'Validate Process call after fork and finalize methods'
  217. teardown_to_mock = 'salt.log.setup.shutdown_multiprocessing_logging'
  218. log_to_mock = 'salt.utils.process.Process._setup_process_logging'
  219. with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
  220. evt = multiprocessing.Event()
  221. proc = salt.utils.process.Process(target=self.process_target, args=(evt,))
  222. proc.run()
  223. assert evt.is_set()
  224. mb.assert_called()
  225. ma.assert_called()
  226. def test_callbacks_called_when_run_overriden(self):
  227. 'Validate Process sub classes call after fork and finalize methods when run is overridden'
  228. class MyProcess(salt.utils.process.Process):
  229. def __init__(self):
  230. super(MyProcess, self).__init__()
  231. self.evt = multiprocessing.Event()
  232. def run(self):
  233. self.evt.set()
  234. teardown_to_mock = 'salt.log.setup.shutdown_multiprocessing_logging'
  235. log_to_mock = 'salt.utils.process.Process._setup_process_logging'
  236. with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
  237. proc = MyProcess()
  238. proc.run()
  239. assert proc.evt.is_set()
  240. ma.assert_called()
  241. mb.assert_called()
  242. class TestSignalHandlingProcess(TestCase):
  243. @classmethod
  244. def Process(cls, pid):
  245. raise psutil.NoSuchProcess(pid)
  246. @classmethod
  247. def target(cls):
  248. os.kill(os.getpid(), signal.SIGTERM)
  249. @classmethod
  250. def children(cls, *args, **kwargs):
  251. raise psutil.NoSuchProcess(1)
  252. def test_process_does_not_exist(self):
  253. try:
  254. with patch('psutil.Process', self.Process):
  255. proc = salt.utils.process.SignalHandlingProcess(target=self.target)
  256. proc.start()
  257. except psutil.NoSuchProcess:
  258. assert False, "psutil.NoSuchProcess raised"
  259. def test_process_children_do_not_exist(self):
  260. try:
  261. with patch('psutil.Process.children', self.children):
  262. proc = salt.utils.process.SignalHandlingProcess(target=self.target)
  263. proc.start()
  264. except psutil.NoSuchProcess:
  265. assert False, "psutil.NoSuchProcess raised"
  266. @staticmethod
  267. def run_forever_sub_target(evt):
  268. 'Used by run_forever_target to create a sub-process'
  269. while not evt.is_set():
  270. time.sleep(1)
  271. @staticmethod
  272. def run_forever_target(sub_target, evt):
  273. 'A target that will run forever or until an event is set'
  274. p = multiprocessing.Process(target=sub_target, args=(evt,))
  275. p.start()
  276. p.join()
  277. @staticmethod
  278. def kill_target_sub_proc():
  279. pid = os.fork()
  280. if pid == 0:
  281. return
  282. pid = os.fork()
  283. if pid == 0:
  284. return
  285. time.sleep(.1)
  286. try:
  287. os.kill(os.getpid(), signal.SIGINT)
  288. except KeyboardInterrupt:
  289. pass
  290. @skipIf(sys.platform.startswith('win'), 'No os.fork on Windows')
  291. def test_signal_processing_regression_test(self):
  292. evt = multiprocessing.Event()
  293. sh_proc = salt.utils.process.SignalHandlingProcess(
  294. target=self.run_forever_target,
  295. args=(self.run_forever_sub_target, evt)
  296. )
  297. sh_proc.start()
  298. proc = multiprocessing.Process(target=self.kill_target_sub_proc)
  299. proc.start()
  300. proc.join()
  301. # When the bug exists, the kill_target_sub_proc signal will kill both
  302. # processes. sh_proc will be alive if the bug is fixed
  303. try:
  304. assert sh_proc.is_alive()
  305. finally:
  306. evt.set()
  307. sh_proc.join()
  308. @staticmethod
  309. def no_op_target():
  310. pass
  311. @staticmethod
  312. def pid_setting_target(sub_target, val, evt):
  313. val.value = os.getpid()
  314. p = multiprocessing.Process(target=sub_target, args=(evt,))
  315. p.start()
  316. p.join()
  317. @skipIf(sys.platform.startswith('win'), 'Required signals not supported on windows')
  318. def test_signal_processing_handle_signals_called(self):
  319. 'Validate SignalHandlingProcess handles signals'
  320. # Gloobal event to stop all processes we're creating
  321. evt = multiprocessing.Event()
  322. # Create a process to test signal handler
  323. val = multiprocessing.Value('i', 0)
  324. proc = salt.utils.process.SignalHandlingProcess(
  325. target=self.pid_setting_target,
  326. args=(self.run_forever_sub_target, val, evt),
  327. )
  328. proc.start()
  329. # Create a second process that should not respond to SIGINT or SIGTERM
  330. proc2 = multiprocessing.Process(
  331. target=self.run_forever_target,
  332. args=(self.run_forever_sub_target, evt),
  333. )
  334. proc2.start()
  335. # Wait for the sub process to set it's pid
  336. while not val.value:
  337. time.sleep(.3)
  338. assert not proc.signal_handled()
  339. # Send a signal that should get handled by the subprocess
  340. os.kill(val.value, signal.SIGTERM)
  341. # wait up to 10 seconds for signal handler:
  342. start = time.time()
  343. while time.time() - start < 10:
  344. if proc.signal_handled():
  345. break
  346. time.sleep(.3)
  347. try:
  348. # Allow some time for the signal handler to do it's thing
  349. assert proc.signal_handled()
  350. # Reap the signaled process
  351. proc.join(1)
  352. assert proc2.is_alive()
  353. finally:
  354. evt.set()
  355. proc2.join(30)
  356. proc.join(30)
  357. class TestSignalHandlingProcessCallbacks(TestCase):
  358. @staticmethod
  359. def process_target(evt):
  360. evt.set()
  361. def test_callbacks(self):
  362. 'Validate SignalHandlingProcess call after fork and finalize methods'
  363. teardown_to_mock = 'salt.log.setup.shutdown_multiprocessing_logging'
  364. log_to_mock = 'salt.utils.process.Process._setup_process_logging'
  365. sig_to_mock = 'salt.utils.process.SignalHandlingProcess._setup_signals'
  366. # Mock _setup_signals so we do not register one for this process.
  367. evt = multiprocessing.Event()
  368. with patch(sig_to_mock):
  369. with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
  370. sh_proc = salt.utils.process.SignalHandlingProcess(
  371. target=self.process_target,
  372. args=(evt,)
  373. )
  374. sh_proc.run()
  375. assert evt.is_set()
  376. ma.assert_called()
  377. mb.assert_called()
  378. def test_callbacks_called_when_run_overriden(self):
  379. 'Validate SignalHandlingProcess sub classes call after fork and finalize methods when run is overridden'
  380. class MyProcess(salt.utils.process.SignalHandlingProcess):
  381. def __init__(self):
  382. super(MyProcess, self).__init__()
  383. self.evt = multiprocessing.Event()
  384. def run(self):
  385. self.evt.set()
  386. teardown_to_mock = 'salt.log.setup.shutdown_multiprocessing_logging'
  387. log_to_mock = 'salt.utils.process.Process._setup_process_logging'
  388. sig_to_mock = 'salt.utils.process.SignalHandlingProcess._setup_signals'
  389. # Mock _setup_signals so we do not register one for this process.
  390. with patch(sig_to_mock):
  391. with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
  392. sh_proc = MyProcess()
  393. sh_proc.run()
  394. assert sh_proc.evt.is_set()
  395. ma.assert_called()
  396. mb.assert_called()
  397. class TestDup2(TestCase):
  398. def test_dup2_no_fileno(self):
  399. 'The dup2 method does not fail on streams without fileno support'
  400. f1 = io.StringIO("some initial text data")
  401. f2 = io.StringIO("some initial other text data")
  402. with self.assertRaises(io.UnsupportedOperation):
  403. f1.fileno()
  404. with patch('os.dup2') as dup_mock:
  405. try:
  406. salt.utils.process.dup2(f1, f2)
  407. except io.UnsupportedOperation:
  408. assert False, 'io.UnsupportedOperation was raised'
  409. assert not dup_mock.called
  410. def null_target():
  411. pass
  412. def event_target(event):
  413. while True:
  414. if event.wait(5):
  415. break
  416. class TestProcessList(TestCase):
  417. @staticmethod
  418. def wait_for_proc(proc, timeout=10):
  419. start = time.time()
  420. while proc.is_alive():
  421. if time.time() - start > timeout:
  422. raise Exception("Process did not finishe before timeout")
  423. time.sleep(.3)
  424. def test_process_list_process(self):
  425. plist = salt.utils.process.SubprocessList()
  426. proc = multiprocessing.Process(target=null_target)
  427. proc.start()
  428. plist.add(proc)
  429. assert proc in plist.processes
  430. self.wait_for_proc(proc)
  431. assert not proc.is_alive()
  432. plist.cleanup()
  433. assert proc not in plist.processes
  434. def test_process_list_thread(self):
  435. plist = salt.utils.process.SubprocessList()
  436. thread = threading.Thread(target=null_target)
  437. thread.start()
  438. plist.add(thread)
  439. assert thread in plist.processes
  440. self.wait_for_proc(thread)
  441. assert not thread.is_alive()
  442. plist.cleanup()
  443. assert thread not in plist.processes
  444. def test_process_list_cleanup(self):
  445. plist = salt.utils.process.SubprocessList()
  446. event = multiprocessing.Event()
  447. proc = multiprocessing.Process(target=event_target, args=[event])
  448. proc.start()
  449. plist.add(proc)
  450. assert proc in plist.processes
  451. plist.cleanup()
  452. event.set()
  453. assert proc in plist.processes
  454. self.wait_for_proc(proc)
  455. assert not proc.is_alive()
  456. plist.cleanup()
  457. assert proc not in plist.processes
  458. class TestDeprecatedClassNames(TestCase):
  459. @staticmethod
  460. def process_target():
  461. pass
  462. @staticmethod
  463. def patched_warn_until_date(current_date):
  464. def _patched_warn_until_date(date,
  465. message,
  466. category=DeprecationWarning,
  467. stacklevel=None,
  468. _current_date=current_date,
  469. _dont_call_warnings=False):
  470. # Because we add another function in between, the stacklevel
  471. # set in salt.utils.process, 3, needs to now be 4
  472. stacklevel = 4
  473. return warn_until_date(date,
  474. message,
  475. category=category,
  476. stacklevel=stacklevel,
  477. _current_date=_current_date,
  478. _dont_call_warnings=_dont_call_warnings)
  479. return _patched_warn_until_date
  480. def test_multiprocessing_process_warning(self):
  481. # We *always* want *all* warnings thrown on this module
  482. warnings.filterwarnings('always', '', DeprecationWarning, __name__)
  483. fake_utcnow = datetime.date(2021, 1, 1)
  484. proc = None
  485. try:
  486. with patch('salt.utils.versions.warn_until_date', self.patched_warn_until_date(fake_utcnow)):
  487. # Test warning
  488. with warnings.catch_warnings(record=True) as recorded_warnings:
  489. proc = salt.utils.process.MultiprocessingProcess(target=self.process_target)
  490. self.assertEqual(
  491. 'Please stop using \'salt.utils.process.MultiprocessingProcess\' '
  492. 'and instead use \'salt.utils.process.Process\'. '
  493. '\'salt.utils.process.MultiprocessingProcess\' will go away '
  494. 'after 2022-01-01.',
  495. six.text_type(recorded_warnings[0].message)
  496. )
  497. finally:
  498. if proc is not None:
  499. del proc
  500. def test_multiprocessing_process_runtime_error(self):
  501. fake_utcnow = datetime.date(2022, 1, 1)
  502. proc = None
  503. try:
  504. with patch('salt.utils.versions.warn_until_date', self.patched_warn_until_date(fake_utcnow)):
  505. with self.assertRaisesRegex(
  506. RuntimeError,
  507. r"Please stop using 'salt.utils.process.MultiprocessingProcess' "
  508. r"and instead use 'salt.utils.process.Process'. "
  509. r"'salt.utils.process.MultiprocessingProcess' will go away "
  510. r'after 2022-01-01. '
  511. r'This warning\(now exception\) triggered on '
  512. r"filename '(.*)test_process.py', line number ([\d]+), is "
  513. r'supposed to be shown until ([\d-]+). Today is ([\d-]+). '
  514. r'Please remove the warning.'):
  515. proc = salt.utils.process.MultiprocessingProcess(target=self.process_target)
  516. finally:
  517. if proc is not None:
  518. del proc
  519. def test_signal_handling_multiprocessing_process_warning(self):
  520. # We *always* want *all* warnings thrown on this module
  521. warnings.filterwarnings('always', '', DeprecationWarning, __name__)
  522. fake_utcnow = datetime.date(2021, 1, 1)
  523. proc = None
  524. try:
  525. with patch('salt.utils.versions.warn_until_date', self.patched_warn_until_date(fake_utcnow)):
  526. # Test warning
  527. with warnings.catch_warnings(record=True) as recorded_warnings:
  528. proc = salt.utils.process.SignalHandlingMultiprocessingProcess(target=self.process_target)
  529. self.assertEqual(
  530. 'Please stop using \'salt.utils.process.SignalHandlingMultiprocessingProcess\' '
  531. 'and instead use \'salt.utils.process.SignalHandlingProcess\'. '
  532. '\'salt.utils.process.SignalHandlingMultiprocessingProcess\' will go away '
  533. 'after 2022-01-01.',
  534. six.text_type(recorded_warnings[0].message)
  535. )
  536. finally:
  537. if proc is not None:
  538. del proc
  539. def test_signal_handling_multiprocessing_process_runtime_error(self):
  540. fake_utcnow = datetime.date(2022, 1, 1)
  541. proc = None
  542. try:
  543. with patch('salt.utils.versions.warn_until_date', self.patched_warn_until_date(fake_utcnow)):
  544. with self.assertRaisesRegex(
  545. RuntimeError,
  546. r"Please stop using 'salt.utils.process.SignalHandlingMultiprocessingProcess' "
  547. r"and instead use 'salt.utils.process.SignalHandlingProcess'. "
  548. r"'salt.utils.process.SignalHandlingMultiprocessingProcess' will go away "
  549. r'after 2022-01-01. '
  550. r'This warning\(now exception\) triggered on '
  551. r"filename '(.*)test_process.py', line number ([\d]+), is "
  552. r'supposed to be shown until ([\d-]+). Today is ([\d-]+). '
  553. r'Please remove the warning.'):
  554. proc = salt.utils.process.SignalHandlingMultiprocessingProcess(target=self.process_target)
  555. finally:
  556. if proc is not None:
  557. del proc