test_process.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, print_function, unicode_literals
  3. import datetime
  4. import functools
  5. import io
  6. import multiprocessing
  7. import os
  8. import signal
  9. import sys
  10. import threading
  11. import time
  12. import warnings
  13. import psutil
  14. import salt.utils.platform
  15. import salt.utils.process
  16. from salt.ext import six
  17. from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
  18. from salt.utils.versions import warn_until_date
  19. from tests.support.helpers import slowTest
  20. from tests.support.mock import patch
  21. from tests.support.unit import TestCase, skipIf
  22. def die(func):
  23. """
  24. Add proc title
  25. """
  26. @functools.wraps(func)
  27. def wrapper(self):
  28. # Strip off the "test_" from the function name
  29. name = func.__name__[5:]
  30. def _die():
  31. salt.utils.process.appendproctitle("test_{0}".format(name))
  32. attrname = "die_" + name
  33. setattr(self, attrname, _die)
  34. self.addCleanup(delattr, self, attrname)
  35. return wrapper
  36. def incr(func):
  37. """
  38. Increment counter
  39. """
  40. @functools.wraps(func)
  41. def wrapper(self):
  42. # Strip off the "test_" from the function name
  43. name = func.__name__[5:]
  44. def _incr(counter, num):
  45. salt.utils.process.appendproctitle("test_{0}".format(name))
  46. for _ in range(0, num):
  47. counter.value += 1
  48. attrname = "incr_" + name
  49. setattr(self, attrname, _incr)
  50. self.addCleanup(delattr, self, attrname)
  51. return wrapper
  52. def spin(func):
  53. """
  54. Spin indefinitely
  55. """
  56. @functools.wraps(func)
  57. def wrapper(self):
  58. # Strip off the "test_" from the function name
  59. name = func.__name__[5:]
  60. def _spin():
  61. salt.utils.process.appendproctitle("test_{0}".format(name))
  62. while True:
  63. time.sleep(1)
  64. attrname = "spin_" + name
  65. setattr(self, attrname, _spin)
  66. self.addCleanup(delattr, self, attrname)
  67. return wrapper
  68. class TestProcessManager(TestCase):
  69. @spin
  70. @slowTest
  71. def test_basic(self):
  72. """
  73. Make sure that the process is alive 2s later
  74. """
  75. process_manager = salt.utils.process.ProcessManager()
  76. process_manager.add_process(self.spin_basic)
  77. initial_pid = next(six.iterkeys(process_manager._process_map))
  78. time.sleep(2)
  79. process_manager.check_children()
  80. try:
  81. assert initial_pid == next(six.iterkeys(process_manager._process_map))
  82. finally:
  83. process_manager.stop_restarting()
  84. process_manager.kill_children()
  85. time.sleep(0.5)
  86. # Are there child processes still running?
  87. if process_manager._process_map.keys():
  88. process_manager.send_signal_to_processes(signal.SIGKILL)
  89. process_manager.stop_restarting()
  90. process_manager.kill_children()
  91. @spin
  92. def test_kill(self):
  93. process_manager = salt.utils.process.ProcessManager()
  94. process_manager.add_process(self.spin_kill)
  95. initial_pid = next(six.iterkeys(process_manager._process_map))
  96. # kill the child
  97. if salt.utils.platform.is_windows():
  98. os.kill(initial_pid, signal.SIGTERM)
  99. else:
  100. os.kill(initial_pid, signal.SIGKILL)
  101. # give the OS time to give the signal...
  102. time.sleep(0.1)
  103. process_manager.check_children()
  104. try:
  105. assert initial_pid != next(six.iterkeys(process_manager._process_map))
  106. finally:
  107. process_manager.stop_restarting()
  108. process_manager.kill_children()
  109. time.sleep(0.5)
  110. # Are there child processes still running?
  111. if process_manager._process_map.keys():
  112. process_manager.send_signal_to_processes(signal.SIGKILL)
  113. process_manager.stop_restarting()
  114. process_manager.kill_children()
  115. @die
  116. def test_restarting(self):
  117. """
  118. Make sure that the process is alive 2s later
  119. """
  120. process_manager = salt.utils.process.ProcessManager()
  121. process_manager.add_process(self.die_restarting)
  122. initial_pid = next(six.iterkeys(process_manager._process_map))
  123. time.sleep(2)
  124. process_manager.check_children()
  125. try:
  126. assert initial_pid != next(six.iterkeys(process_manager._process_map))
  127. finally:
  128. process_manager.stop_restarting()
  129. process_manager.kill_children()
  130. time.sleep(0.5)
  131. # Are there child processes still running?
  132. if process_manager._process_map.keys():
  133. process_manager.send_signal_to_processes(signal.SIGKILL)
  134. process_manager.stop_restarting()
  135. process_manager.kill_children()
  136. @skipIf(sys.version_info < (2, 7), "Needs > Py 2.7 due to bug in stdlib")
  137. @incr
  138. def test_counter(self):
  139. counter = multiprocessing.Value("i", 0)
  140. process_manager = salt.utils.process.ProcessManager()
  141. process_manager.add_process(self.incr_counter, args=(counter, 2))
  142. time.sleep(1)
  143. process_manager.check_children()
  144. time.sleep(1)
  145. # we should have had 2 processes go at it
  146. try:
  147. assert counter.value == 4
  148. finally:
  149. process_manager.stop_restarting()
  150. process_manager.kill_children()
  151. time.sleep(0.5)
  152. # Are there child processes still running?
  153. if process_manager._process_map.keys():
  154. process_manager.send_signal_to_processes(signal.SIGKILL)
  155. process_manager.stop_restarting()
  156. process_manager.kill_children()
  157. class TestThreadPool(TestCase):
  158. @slowTest
  159. def test_basic(self):
  160. """
  161. Make sure the threadpool can do things
  162. """
  163. def incr_counter(counter):
  164. counter.value += 1
  165. counter = multiprocessing.Value("i", 0)
  166. pool = salt.utils.process.ThreadPool()
  167. sent = pool.fire_async(incr_counter, args=(counter,))
  168. self.assertTrue(sent)
  169. time.sleep(1) # Sleep to let the threads do things
  170. self.assertEqual(counter.value, 1)
  171. self.assertEqual(pool._job_queue.qsize(), 0)
  172. @slowTest
  173. def test_full_queue(self):
  174. """
  175. Make sure that a full threadpool acts as we expect
  176. """
  177. def incr_counter(counter):
  178. counter.value += 1
  179. counter = multiprocessing.Value("i", 0)
  180. # Create a pool with no workers and 1 queue size
  181. pool = salt.utils.process.ThreadPool(0, 1)
  182. # make sure we can put the one item in
  183. sent = pool.fire_async(incr_counter, args=(counter,))
  184. self.assertTrue(sent)
  185. # make sure we can't put more in
  186. sent = pool.fire_async(incr_counter, args=(counter,))
  187. self.assertFalse(sent)
  188. time.sleep(1) # Sleep to let the threads do things
  189. # make sure no one updated the counter
  190. self.assertEqual(counter.value, 0)
  191. # make sure the queue is still full
  192. self.assertEqual(pool._job_queue.qsize(), 1)
  193. class TestProcess(TestCase):
  194. def test_daemonize_if(self):
  195. # pylint: disable=assignment-from-none
  196. with patch("sys.argv", ["salt-call"]):
  197. ret = salt.utils.process.daemonize_if({})
  198. self.assertEqual(None, ret)
  199. ret = salt.utils.process.daemonize_if({"multiprocessing": False})
  200. self.assertEqual(None, ret)
  201. with patch("sys.platform", "win"):
  202. ret = salt.utils.process.daemonize_if({})
  203. self.assertEqual(None, ret)
  204. with patch("salt.utils.process.daemonize"), patch("sys.platform", "linux2"):
  205. salt.utils.process.daemonize_if({})
  206. self.assertTrue(salt.utils.process.daemonize.called)
  207. # pylint: enable=assignment-from-none
  208. class TestProcessCallbacks(TestCase):
  209. @staticmethod
  210. def process_target(evt):
  211. evt.set()
  212. def test_callbacks(self):
  213. "Validate Process call after fork and finalize methods"
  214. teardown_to_mock = "salt.log.setup.shutdown_multiprocessing_logging"
  215. log_to_mock = "salt.utils.process.Process._setup_process_logging"
  216. with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
  217. evt = multiprocessing.Event()
  218. proc = salt.utils.process.Process(target=self.process_target, args=(evt,))
  219. proc.run()
  220. assert evt.is_set()
  221. mb.assert_called()
  222. ma.assert_called()
  223. def test_callbacks_called_when_run_overridden(self):
  224. "Validate Process sub classes call after fork and finalize methods when run is overridden"
  225. class MyProcess(salt.utils.process.Process):
  226. def __init__(self):
  227. super(MyProcess, self).__init__()
  228. self.evt = multiprocessing.Event()
  229. def run(self):
  230. self.evt.set()
  231. teardown_to_mock = "salt.log.setup.shutdown_multiprocessing_logging"
  232. log_to_mock = "salt.utils.process.Process._setup_process_logging"
  233. with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
  234. proc = MyProcess()
  235. proc.run()
  236. assert proc.evt.is_set()
  237. ma.assert_called()
  238. mb.assert_called()
  239. class TestSignalHandlingProcess(TestCase):
  240. @classmethod
  241. def Process(cls, pid):
  242. raise psutil.NoSuchProcess(pid)
  243. @classmethod
  244. def target(cls):
  245. os.kill(os.getpid(), signal.SIGTERM)
  246. @classmethod
  247. def children(cls, *args, **kwargs):
  248. raise psutil.NoSuchProcess(1)
  249. def test_process_does_not_exist(self):
  250. try:
  251. with patch("psutil.Process", self.Process):
  252. proc = salt.utils.process.SignalHandlingProcess(target=self.target)
  253. proc.start()
  254. except psutil.NoSuchProcess:
  255. assert False, "psutil.NoSuchProcess raised"
  256. def test_process_children_do_not_exist(self):
  257. try:
  258. with patch("psutil.Process.children", self.children):
  259. proc = salt.utils.process.SignalHandlingProcess(target=self.target)
  260. proc.start()
  261. except psutil.NoSuchProcess:
  262. assert False, "psutil.NoSuchProcess raised"
  263. @staticmethod
  264. def run_forever_sub_target(evt):
  265. "Used by run_forever_target to create a sub-process"
  266. while not evt.is_set():
  267. time.sleep(1)
  268. @staticmethod
  269. def run_forever_target(sub_target, evt):
  270. "A target that will run forever or until an event is set"
  271. p = multiprocessing.Process(target=sub_target, args=(evt,))
  272. p.start()
  273. p.join()
  274. @staticmethod
  275. def kill_target_sub_proc():
  276. pid = os.fork()
  277. if pid == 0:
  278. return
  279. pid = os.fork()
  280. if pid == 0:
  281. return
  282. time.sleep(0.1)
  283. try:
  284. os.kill(os.getpid(), signal.SIGINT)
  285. except KeyboardInterrupt:
  286. pass
  287. @skipIf(sys.platform.startswith("win"), "No os.fork on Windows")
  288. @slowTest
  289. def test_signal_processing_regression_test(self):
  290. evt = multiprocessing.Event()
  291. sh_proc = salt.utils.process.SignalHandlingProcess(
  292. target=self.run_forever_target, args=(self.run_forever_sub_target, evt)
  293. )
  294. sh_proc.start()
  295. proc = multiprocessing.Process(target=self.kill_target_sub_proc)
  296. proc.start()
  297. proc.join()
  298. # When the bug exists, the kill_target_sub_proc signal will kill both
  299. # processes. sh_proc will be alive if the bug is fixed
  300. try:
  301. assert sh_proc.is_alive()
  302. finally:
  303. evt.set()
  304. sh_proc.join()
  305. @staticmethod
  306. def no_op_target():
  307. pass
  308. @staticmethod
  309. def pid_setting_target(sub_target, val, evt):
  310. val.value = os.getpid()
  311. p = multiprocessing.Process(target=sub_target, args=(evt,))
  312. p.start()
  313. p.join()
  314. @skipIf(sys.platform.startswith("win"), "Required signals not supported on windows")
  315. @slowTest
  316. def test_signal_processing_handle_signals_called(self):
  317. "Validate SignalHandlingProcess handles signals"
  318. # Gloobal event to stop all processes we're creating
  319. evt = multiprocessing.Event()
  320. # Create a process to test signal handler
  321. val = multiprocessing.Value("i", 0)
  322. proc = salt.utils.process.SignalHandlingProcess(
  323. target=self.pid_setting_target,
  324. args=(self.run_forever_sub_target, val, evt),
  325. )
  326. proc.start()
  327. # Create a second process that should not respond to SIGINT or SIGTERM
  328. proc2 = multiprocessing.Process(
  329. target=self.run_forever_target, args=(self.run_forever_sub_target, evt),
  330. )
  331. proc2.start()
  332. # Wait for the sub process to set its pid
  333. while not val.value:
  334. time.sleep(0.3)
  335. assert not proc.signal_handled()
  336. # Send a signal that should get handled by the subprocess
  337. os.kill(val.value, signal.SIGTERM)
  338. # wait up to 10 seconds for signal handler:
  339. start = time.time()
  340. while time.time() - start < 10:
  341. if proc.signal_handled():
  342. break
  343. time.sleep(0.3)
  344. try:
  345. # Allow some time for the signal handler to do its thing
  346. assert proc.signal_handled()
  347. # Reap the signaled process
  348. proc.join(1)
  349. assert proc2.is_alive()
  350. finally:
  351. evt.set()
  352. proc2.join(30)
  353. proc.join(30)
  354. class TestSignalHandlingProcessCallbacks(TestCase):
  355. @staticmethod
  356. def process_target(evt):
  357. evt.set()
  358. def test_callbacks(self):
  359. "Validate SignalHandlingProcess call after fork and finalize methods"
  360. teardown_to_mock = "salt.log.setup.shutdown_multiprocessing_logging"
  361. log_to_mock = "salt.utils.process.Process._setup_process_logging"
  362. sig_to_mock = "salt.utils.process.SignalHandlingProcess._setup_signals"
  363. # Mock _setup_signals so we do not register one for this process.
  364. evt = multiprocessing.Event()
  365. with patch(sig_to_mock):
  366. with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
  367. sh_proc = salt.utils.process.SignalHandlingProcess(
  368. target=self.process_target, args=(evt,)
  369. )
  370. sh_proc.run()
  371. assert evt.is_set()
  372. ma.assert_called()
  373. mb.assert_called()
  374. def test_callbacks_called_when_run_overridden(self):
  375. "Validate SignalHandlingProcess sub classes call after fork and finalize methods when run is overridden"
  376. class MyProcess(salt.utils.process.SignalHandlingProcess):
  377. def __init__(self):
  378. super(MyProcess, self).__init__()
  379. self.evt = multiprocessing.Event()
  380. def run(self):
  381. self.evt.set()
  382. teardown_to_mock = "salt.log.setup.shutdown_multiprocessing_logging"
  383. log_to_mock = "salt.utils.process.Process._setup_process_logging"
  384. sig_to_mock = "salt.utils.process.SignalHandlingProcess._setup_signals"
  385. # Mock _setup_signals so we do not register one for this process.
  386. with patch(sig_to_mock):
  387. with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
  388. sh_proc = MyProcess()
  389. sh_proc.run()
  390. assert sh_proc.evt.is_set()
  391. ma.assert_called()
  392. mb.assert_called()
  393. class TestDup2(TestCase):
  394. def test_dup2_no_fileno(self):
  395. "The dup2 method does not fail on streams without fileno support"
  396. f1 = io.StringIO("some initial text data")
  397. f2 = io.StringIO("some initial other text data")
  398. with self.assertRaises(io.UnsupportedOperation):
  399. f1.fileno()
  400. with patch("os.dup2") as dup_mock:
  401. try:
  402. salt.utils.process.dup2(f1, f2)
  403. except io.UnsupportedOperation:
  404. assert False, "io.UnsupportedOperation was raised"
  405. assert not dup_mock.called
  406. def null_target():
  407. pass
  408. def event_target(event):
  409. while True:
  410. if event.wait(5):
  411. break
  412. class TestProcessList(TestCase):
  413. @staticmethod
  414. def wait_for_proc(proc, timeout=10):
  415. start = time.time()
  416. while proc.is_alive():
  417. if time.time() - start > timeout:
  418. raise Exception("Process did not finishe before timeout")
  419. time.sleep(0.3)
  420. @slowTest
  421. def test_process_list_process(self):
  422. plist = salt.utils.process.SubprocessList()
  423. proc = multiprocessing.Process(target=null_target)
  424. proc.start()
  425. plist.add(proc)
  426. assert proc in plist.processes
  427. self.wait_for_proc(proc)
  428. assert not proc.is_alive()
  429. plist.cleanup()
  430. assert proc not in plist.processes
  431. def test_process_list_thread(self):
  432. plist = salt.utils.process.SubprocessList()
  433. thread = threading.Thread(target=null_target)
  434. thread.start()
  435. plist.add(thread)
  436. assert thread in plist.processes
  437. self.wait_for_proc(thread)
  438. assert not thread.is_alive()
  439. plist.cleanup()
  440. assert thread not in plist.processes
  441. @slowTest
  442. def test_process_list_cleanup(self):
  443. plist = salt.utils.process.SubprocessList()
  444. event = multiprocessing.Event()
  445. proc = multiprocessing.Process(target=event_target, args=[event])
  446. proc.start()
  447. plist.add(proc)
  448. assert proc in plist.processes
  449. plist.cleanup()
  450. event.set()
  451. assert proc in plist.processes
  452. self.wait_for_proc(proc)
  453. assert not proc.is_alive()
  454. plist.cleanup()
  455. assert proc not in plist.processes
  456. class TestDeprecatedClassNames(TestCase):
  457. @staticmethod
  458. def process_target():
  459. pass
  460. @staticmethod
  461. def patched_warn_until_date(current_date):
  462. def _patched_warn_until_date(
  463. date,
  464. message,
  465. category=DeprecationWarning,
  466. stacklevel=None,
  467. _current_date=current_date,
  468. _dont_call_warnings=False,
  469. ):
  470. # Because we add another function in between, the stacklevel
  471. # set in salt.utils.process, 3, needs to now be 4
  472. stacklevel = 4
  473. return warn_until_date(
  474. date,
  475. message,
  476. category=category,
  477. stacklevel=stacklevel,
  478. _current_date=_current_date,
  479. _dont_call_warnings=_dont_call_warnings,
  480. )
  481. return _patched_warn_until_date
  482. def test_multiprocessing_process_warning(self):
  483. # We *always* want *all* warnings thrown on this module
  484. warnings.filterwarnings("always", "", DeprecationWarning, __name__)
  485. fake_utcnow = datetime.date(2021, 1, 1)
  486. proc = None
  487. try:
  488. with patch(
  489. "salt.utils.versions.warn_until_date",
  490. self.patched_warn_until_date(fake_utcnow),
  491. ):
  492. # Test warning
  493. with warnings.catch_warnings(record=True) as recorded_warnings:
  494. proc = salt.utils.process.MultiprocessingProcess(
  495. target=self.process_target
  496. )
  497. self.assertEqual(
  498. "Please stop using 'salt.utils.process.MultiprocessingProcess' "
  499. "and instead use 'salt.utils.process.Process'. "
  500. "'salt.utils.process.MultiprocessingProcess' will go away "
  501. "after 2022-01-01.",
  502. six.text_type(recorded_warnings[0].message),
  503. )
  504. finally:
  505. if proc is not None:
  506. del proc
  507. def test_multiprocessing_process_runtime_error(self):
  508. fake_utcnow = datetime.date(2022, 1, 1)
  509. proc = None
  510. try:
  511. with patch(
  512. "salt.utils.versions.warn_until_date",
  513. self.patched_warn_until_date(fake_utcnow),
  514. ):
  515. with self.assertRaisesRegex(
  516. RuntimeError,
  517. r"Please stop using 'salt.utils.process.MultiprocessingProcess' "
  518. r"and instead use 'salt.utils.process.Process'. "
  519. r"'salt.utils.process.MultiprocessingProcess' will go away "
  520. r"after 2022-01-01. "
  521. r"This warning\(now exception\) triggered on "
  522. r"filename '(.*)test_process.py', line number ([\d]+), is "
  523. r"supposed to be shown until ([\d-]+). Today is ([\d-]+). "
  524. r"Please remove the warning.",
  525. ):
  526. proc = salt.utils.process.MultiprocessingProcess(
  527. target=self.process_target
  528. )
  529. finally:
  530. if proc is not None:
  531. del proc
  532. def test_signal_handling_multiprocessing_process_warning(self):
  533. # We *always* want *all* warnings thrown on this module
  534. warnings.filterwarnings("always", "", DeprecationWarning, __name__)
  535. fake_utcnow = datetime.date(2021, 1, 1)
  536. proc = None
  537. try:
  538. with patch(
  539. "salt.utils.versions.warn_until_date",
  540. self.patched_warn_until_date(fake_utcnow),
  541. ):
  542. # Test warning
  543. with warnings.catch_warnings(record=True) as recorded_warnings:
  544. proc = salt.utils.process.SignalHandlingMultiprocessingProcess(
  545. target=self.process_target
  546. )
  547. self.assertEqual(
  548. "Please stop using 'salt.utils.process.SignalHandlingMultiprocessingProcess' "
  549. "and instead use 'salt.utils.process.SignalHandlingProcess'. "
  550. "'salt.utils.process.SignalHandlingMultiprocessingProcess' will go away "
  551. "after 2022-01-01.",
  552. six.text_type(recorded_warnings[0].message),
  553. )
  554. finally:
  555. if proc is not None:
  556. del proc
  557. def test_signal_handling_multiprocessing_process_runtime_error(self):
  558. fake_utcnow = datetime.date(2022, 1, 1)
  559. proc = None
  560. try:
  561. with patch(
  562. "salt.utils.versions.warn_until_date",
  563. self.patched_warn_until_date(fake_utcnow),
  564. ):
  565. with self.assertRaisesRegex(
  566. RuntimeError,
  567. r"Please stop using 'salt.utils.process.SignalHandlingMultiprocessingProcess' "
  568. r"and instead use 'salt.utils.process.SignalHandlingProcess'. "
  569. r"'salt.utils.process.SignalHandlingMultiprocessingProcess' will go away "
  570. r"after 2022-01-01. "
  571. r"This warning\(now exception\) triggered on "
  572. r"filename '(.*)test_process.py', line number ([\d]+), is "
  573. r"supposed to be shown until ([\d-]+). Today is ([\d-]+). "
  574. r"Please remove the warning.",
  575. ):
  576. proc = salt.utils.process.SignalHandlingMultiprocessingProcess(
  577. target=self.process_target
  578. )
  579. finally:
  580. if proc is not None:
  581. del proc