test_process.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, print_function, unicode_literals
  3. import datetime
  4. import functools
  5. import io
  6. import multiprocessing
  7. import os
  8. import signal
  9. import sys
  10. import threading
  11. import time
  12. import warnings
  13. import salt.utils.platform
  14. import salt.utils.process
  15. from salt.ext import six
  16. from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
  17. from salt.utils.versions import warn_until_date
  18. from tests.support.helpers import slowTest
  19. from tests.support.mock import patch
  20. from tests.support.unit import TestCase, skipIf
  21. HAS_PSUTIL = False
  22. try:
  23. import psutil
  24. HAS_PSUTIL = True
  25. except ImportError:
  26. pass
  27. def die(func):
  28. """
  29. Add proc title
  30. """
  31. @functools.wraps(func)
  32. def wrapper(self):
  33. # Strip off the "test_" from the function name
  34. name = func.__name__[5:]
  35. def _die():
  36. salt.utils.process.appendproctitle("test_{0}".format(name))
  37. attrname = "die_" + name
  38. setattr(self, attrname, _die)
  39. self.addCleanup(delattr, self, attrname)
  40. return wrapper
  41. def incr(func):
  42. """
  43. Increment counter
  44. """
  45. @functools.wraps(func)
  46. def wrapper(self):
  47. # Strip off the "test_" from the function name
  48. name = func.__name__[5:]
  49. def _incr(counter, num):
  50. salt.utils.process.appendproctitle("test_{0}".format(name))
  51. for _ in range(0, num):
  52. counter.value += 1
  53. attrname = "incr_" + name
  54. setattr(self, attrname, _incr)
  55. self.addCleanup(delattr, self, attrname)
  56. return wrapper
  57. def spin(func):
  58. """
  59. Spin indefinitely
  60. """
  61. @functools.wraps(func)
  62. def wrapper(self):
  63. # Strip off the "test_" from the function name
  64. name = func.__name__[5:]
  65. def _spin():
  66. salt.utils.process.appendproctitle("test_{0}".format(name))
  67. while True:
  68. time.sleep(1)
  69. attrname = "spin_" + name
  70. setattr(self, attrname, _spin)
  71. self.addCleanup(delattr, self, attrname)
  72. return wrapper
  73. class TestProcessManager(TestCase):
  74. @spin
  75. @slowTest
  76. def test_basic(self):
  77. """
  78. Make sure that the process is alive 2s later
  79. """
  80. process_manager = salt.utils.process.ProcessManager()
  81. process_manager.add_process(self.spin_basic)
  82. initial_pid = next(six.iterkeys(process_manager._process_map))
  83. time.sleep(2)
  84. process_manager.check_children()
  85. try:
  86. assert initial_pid == next(six.iterkeys(process_manager._process_map))
  87. finally:
  88. process_manager.stop_restarting()
  89. process_manager.kill_children()
  90. time.sleep(0.5)
  91. # Are there child processes still running?
  92. if process_manager._process_map.keys():
  93. process_manager.send_signal_to_processes(signal.SIGKILL)
  94. process_manager.stop_restarting()
  95. process_manager.kill_children()
  96. @spin
  97. def test_kill(self):
  98. process_manager = salt.utils.process.ProcessManager()
  99. process_manager.add_process(self.spin_kill)
  100. initial_pid = next(six.iterkeys(process_manager._process_map))
  101. # kill the child
  102. if salt.utils.platform.is_windows():
  103. os.kill(initial_pid, signal.SIGTERM)
  104. else:
  105. os.kill(initial_pid, signal.SIGKILL)
  106. # give the OS time to give the signal...
  107. time.sleep(0.1)
  108. process_manager.check_children()
  109. try:
  110. assert initial_pid != next(six.iterkeys(process_manager._process_map))
  111. finally:
  112. process_manager.stop_restarting()
  113. process_manager.kill_children()
  114. time.sleep(0.5)
  115. # Are there child processes still running?
  116. if process_manager._process_map.keys():
  117. process_manager.send_signal_to_processes(signal.SIGKILL)
  118. process_manager.stop_restarting()
  119. process_manager.kill_children()
  120. @die
  121. def test_restarting(self):
  122. """
  123. Make sure that the process is alive 2s later
  124. """
  125. process_manager = salt.utils.process.ProcessManager()
  126. process_manager.add_process(self.die_restarting)
  127. initial_pid = next(six.iterkeys(process_manager._process_map))
  128. time.sleep(2)
  129. process_manager.check_children()
  130. try:
  131. assert initial_pid != next(six.iterkeys(process_manager._process_map))
  132. finally:
  133. process_manager.stop_restarting()
  134. process_manager.kill_children()
  135. time.sleep(0.5)
  136. # Are there child processes still running?
  137. if process_manager._process_map.keys():
  138. process_manager.send_signal_to_processes(signal.SIGKILL)
  139. process_manager.stop_restarting()
  140. process_manager.kill_children()
  141. @skipIf(sys.version_info < (2, 7), "Needs > Py 2.7 due to bug in stdlib")
  142. @incr
  143. def test_counter(self):
  144. counter = multiprocessing.Value("i", 0)
  145. process_manager = salt.utils.process.ProcessManager()
  146. process_manager.add_process(self.incr_counter, args=(counter, 2))
  147. time.sleep(1)
  148. process_manager.check_children()
  149. time.sleep(1)
  150. # we should have had 2 processes go at it
  151. try:
  152. assert counter.value == 4
  153. finally:
  154. process_manager.stop_restarting()
  155. process_manager.kill_children()
  156. time.sleep(0.5)
  157. # Are there child processes still running?
  158. if process_manager._process_map.keys():
  159. process_manager.send_signal_to_processes(signal.SIGKILL)
  160. process_manager.stop_restarting()
  161. process_manager.kill_children()
  162. class TestThreadPool(TestCase):
  163. @slowTest
  164. def test_basic(self):
  165. """
  166. Make sure the threadpool can do things
  167. """
  168. def incr_counter(counter):
  169. counter.value += 1
  170. counter = multiprocessing.Value("i", 0)
  171. pool = salt.utils.process.ThreadPool()
  172. sent = pool.fire_async(incr_counter, args=(counter,))
  173. self.assertTrue(sent)
  174. time.sleep(1) # Sleep to let the threads do things
  175. self.assertEqual(counter.value, 1)
  176. self.assertEqual(pool._job_queue.qsize(), 0)
  177. @slowTest
  178. def test_full_queue(self):
  179. """
  180. Make sure that a full threadpool acts as we expect
  181. """
  182. def incr_counter(counter):
  183. counter.value += 1
  184. counter = multiprocessing.Value("i", 0)
  185. # Create a pool with no workers and 1 queue size
  186. pool = salt.utils.process.ThreadPool(0, 1)
  187. # make sure we can put the one item in
  188. sent = pool.fire_async(incr_counter, args=(counter,))
  189. self.assertTrue(sent)
  190. # make sure we can't put more in
  191. sent = pool.fire_async(incr_counter, args=(counter,))
  192. self.assertFalse(sent)
  193. time.sleep(1) # Sleep to let the threads do things
  194. # make sure no one updated the counter
  195. self.assertEqual(counter.value, 0)
  196. # make sure the queue is still full
  197. self.assertEqual(pool._job_queue.qsize(), 1)
  198. class TestProcess(TestCase):
  199. def test_daemonize_if(self):
  200. # pylint: disable=assignment-from-none
  201. with patch("sys.argv", ["salt-call"]):
  202. ret = salt.utils.process.daemonize_if({})
  203. self.assertEqual(None, ret)
  204. ret = salt.utils.process.daemonize_if({"multiprocessing": False})
  205. self.assertEqual(None, ret)
  206. with patch("sys.platform", "win"):
  207. ret = salt.utils.process.daemonize_if({})
  208. self.assertEqual(None, ret)
  209. with patch("salt.utils.process.daemonize"), patch("sys.platform", "linux2"):
  210. salt.utils.process.daemonize_if({})
  211. self.assertTrue(salt.utils.process.daemonize.called)
  212. # pylint: enable=assignment-from-none
  213. class TestProcessCallbacks(TestCase):
  214. @staticmethod
  215. def process_target(evt):
  216. evt.set()
  217. def test_callbacks(self):
  218. "Validate Process call after fork and finalize methods"
  219. teardown_to_mock = "salt.log.setup.shutdown_multiprocessing_logging"
  220. log_to_mock = "salt.utils.process.Process._setup_process_logging"
  221. with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
  222. evt = multiprocessing.Event()
  223. proc = salt.utils.process.Process(target=self.process_target, args=(evt,))
  224. proc.run()
  225. assert evt.is_set()
  226. mb.assert_called()
  227. ma.assert_called()
  228. def test_callbacks_called_when_run_overridden(self):
  229. "Validate Process sub classes call after fork and finalize methods when run is overridden"
  230. class MyProcess(salt.utils.process.Process):
  231. def __init__(self):
  232. super(MyProcess, self).__init__()
  233. self.evt = multiprocessing.Event()
  234. def run(self):
  235. self.evt.set()
  236. teardown_to_mock = "salt.log.setup.shutdown_multiprocessing_logging"
  237. log_to_mock = "salt.utils.process.Process._setup_process_logging"
  238. with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
  239. proc = MyProcess()
  240. proc.run()
  241. assert proc.evt.is_set()
  242. ma.assert_called()
  243. mb.assert_called()
  244. class TestSignalHandlingProcess(TestCase):
  245. @classmethod
  246. def Process(cls, pid):
  247. raise psutil.NoSuchProcess(pid)
  248. @classmethod
  249. def target(cls):
  250. os.kill(os.getpid(), signal.SIGTERM)
  251. @classmethod
  252. def children(cls, *args, **kwargs):
  253. raise psutil.NoSuchProcess(1)
  254. def test_process_does_not_exist(self):
  255. try:
  256. with patch("psutil.Process", self.Process):
  257. proc = salt.utils.process.SignalHandlingProcess(target=self.target)
  258. proc.start()
  259. except psutil.NoSuchProcess:
  260. assert False, "psutil.NoSuchProcess raised"
  261. def test_process_children_do_not_exist(self):
  262. try:
  263. with patch("psutil.Process.children", self.children):
  264. proc = salt.utils.process.SignalHandlingProcess(target=self.target)
  265. proc.start()
  266. except psutil.NoSuchProcess:
  267. assert False, "psutil.NoSuchProcess raised"
  268. @staticmethod
  269. def run_forever_sub_target(evt):
  270. "Used by run_forever_target to create a sub-process"
  271. while not evt.is_set():
  272. time.sleep(1)
  273. @staticmethod
  274. def run_forever_target(sub_target, evt):
  275. "A target that will run forever or until an event is set"
  276. p = multiprocessing.Process(target=sub_target, args=(evt,))
  277. p.start()
  278. p.join()
  279. @staticmethod
  280. def kill_target_sub_proc():
  281. pid = os.fork()
  282. if pid == 0:
  283. return
  284. pid = os.fork()
  285. if pid == 0:
  286. return
  287. time.sleep(0.1)
  288. try:
  289. os.kill(os.getpid(), signal.SIGINT)
  290. except KeyboardInterrupt:
  291. pass
  292. @skipIf(sys.platform.startswith("win"), "No os.fork on Windows")
  293. @slowTest
  294. def test_signal_processing_regression_test(self):
  295. evt = multiprocessing.Event()
  296. sh_proc = salt.utils.process.SignalHandlingProcess(
  297. target=self.run_forever_target, args=(self.run_forever_sub_target, evt)
  298. )
  299. sh_proc.start()
  300. proc = multiprocessing.Process(target=self.kill_target_sub_proc)
  301. proc.start()
  302. proc.join()
  303. # When the bug exists, the kill_target_sub_proc signal will kill both
  304. # processes. sh_proc will be alive if the bug is fixed
  305. try:
  306. assert sh_proc.is_alive()
  307. finally:
  308. evt.set()
  309. sh_proc.join()
  310. @staticmethod
  311. def no_op_target():
  312. pass
  313. @staticmethod
  314. def pid_setting_target(sub_target, val, evt):
  315. val.value = os.getpid()
  316. p = multiprocessing.Process(target=sub_target, args=(evt,))
  317. p.start()
  318. p.join()
  319. @skipIf(sys.platform.startswith("win"), "Required signals not supported on windows")
  320. @slowTest
  321. def test_signal_processing_handle_signals_called(self):
  322. "Validate SignalHandlingProcess handles signals"
  323. # Gloobal event to stop all processes we're creating
  324. evt = multiprocessing.Event()
  325. # Create a process to test signal handler
  326. val = multiprocessing.Value("i", 0)
  327. proc = salt.utils.process.SignalHandlingProcess(
  328. target=self.pid_setting_target,
  329. args=(self.run_forever_sub_target, val, evt),
  330. )
  331. proc.start()
  332. # Create a second process that should not respond to SIGINT or SIGTERM
  333. proc2 = multiprocessing.Process(
  334. target=self.run_forever_target, args=(self.run_forever_sub_target, evt),
  335. )
  336. proc2.start()
  337. # Wait for the sub process to set its pid
  338. while not val.value:
  339. time.sleep(0.3)
  340. assert not proc.signal_handled()
  341. # Send a signal that should get handled by the subprocess
  342. os.kill(val.value, signal.SIGTERM)
  343. # wait up to 10 seconds for signal handler:
  344. start = time.time()
  345. while time.time() - start < 10:
  346. if proc.signal_handled():
  347. break
  348. time.sleep(0.3)
  349. try:
  350. # Allow some time for the signal handler to do its thing
  351. assert proc.signal_handled()
  352. # Reap the signaled process
  353. proc.join(1)
  354. assert proc2.is_alive()
  355. finally:
  356. evt.set()
  357. proc2.join(30)
  358. proc.join(30)
  359. class TestSignalHandlingProcessCallbacks(TestCase):
  360. @staticmethod
  361. def process_target(evt):
  362. evt.set()
  363. def test_callbacks(self):
  364. "Validate SignalHandlingProcess call after fork and finalize methods"
  365. teardown_to_mock = "salt.log.setup.shutdown_multiprocessing_logging"
  366. log_to_mock = "salt.utils.process.Process._setup_process_logging"
  367. sig_to_mock = "salt.utils.process.SignalHandlingProcess._setup_signals"
  368. # Mock _setup_signals so we do not register one for this process.
  369. evt = multiprocessing.Event()
  370. with patch(sig_to_mock):
  371. with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
  372. sh_proc = salt.utils.process.SignalHandlingProcess(
  373. target=self.process_target, args=(evt,)
  374. )
  375. sh_proc.run()
  376. assert evt.is_set()
  377. ma.assert_called()
  378. mb.assert_called()
  379. def test_callbacks_called_when_run_overridden(self):
  380. "Validate SignalHandlingProcess sub classes call after fork and finalize methods when run is overridden"
  381. class MyProcess(salt.utils.process.SignalHandlingProcess):
  382. def __init__(self):
  383. super(MyProcess, self).__init__()
  384. self.evt = multiprocessing.Event()
  385. def run(self):
  386. self.evt.set()
  387. teardown_to_mock = "salt.log.setup.shutdown_multiprocessing_logging"
  388. log_to_mock = "salt.utils.process.Process._setup_process_logging"
  389. sig_to_mock = "salt.utils.process.SignalHandlingProcess._setup_signals"
  390. # Mock _setup_signals so we do not register one for this process.
  391. with patch(sig_to_mock):
  392. with patch(teardown_to_mock) as ma, patch(log_to_mock) as mb:
  393. sh_proc = MyProcess()
  394. sh_proc.run()
  395. assert sh_proc.evt.is_set()
  396. ma.assert_called()
  397. mb.assert_called()
  398. class TestDup2(TestCase):
  399. def test_dup2_no_fileno(self):
  400. "The dup2 method does not fail on streams without fileno support"
  401. f1 = io.StringIO("some initial text data")
  402. f2 = io.StringIO("some initial other text data")
  403. with self.assertRaises(io.UnsupportedOperation):
  404. f1.fileno()
  405. with patch("os.dup2") as dup_mock:
  406. try:
  407. salt.utils.process.dup2(f1, f2)
  408. except io.UnsupportedOperation:
  409. assert False, "io.UnsupportedOperation was raised"
  410. assert not dup_mock.called
  411. def null_target():
  412. pass
  413. def event_target(event):
  414. while True:
  415. if event.wait(5):
  416. break
  417. class TestProcessList(TestCase):
  418. @staticmethod
  419. def wait_for_proc(proc, timeout=10):
  420. start = time.time()
  421. while proc.is_alive():
  422. if time.time() - start > timeout:
  423. raise Exception("Process did not finishe before timeout")
  424. time.sleep(0.3)
  425. @slowTest
  426. def test_process_list_process(self):
  427. plist = salt.utils.process.SubprocessList()
  428. proc = multiprocessing.Process(target=null_target)
  429. proc.start()
  430. plist.add(proc)
  431. assert proc in plist.processes
  432. self.wait_for_proc(proc)
  433. assert not proc.is_alive()
  434. plist.cleanup()
  435. assert proc not in plist.processes
  436. def test_process_list_thread(self):
  437. plist = salt.utils.process.SubprocessList()
  438. thread = threading.Thread(target=null_target)
  439. thread.start()
  440. plist.add(thread)
  441. assert thread in plist.processes
  442. self.wait_for_proc(thread)
  443. assert not thread.is_alive()
  444. plist.cleanup()
  445. assert thread not in plist.processes
  446. @slowTest
  447. def test_process_list_cleanup(self):
  448. plist = salt.utils.process.SubprocessList()
  449. event = multiprocessing.Event()
  450. proc = multiprocessing.Process(target=event_target, args=[event])
  451. proc.start()
  452. plist.add(proc)
  453. assert proc in plist.processes
  454. plist.cleanup()
  455. event.set()
  456. assert proc in plist.processes
  457. self.wait_for_proc(proc)
  458. assert not proc.is_alive()
  459. plist.cleanup()
  460. assert proc not in plist.processes
  461. class TestDeprecatedClassNames(TestCase):
  462. @staticmethod
  463. def process_target():
  464. pass
  465. @staticmethod
  466. def patched_warn_until_date(current_date):
  467. def _patched_warn_until_date(
  468. date,
  469. message,
  470. category=DeprecationWarning,
  471. stacklevel=None,
  472. _current_date=current_date,
  473. _dont_call_warnings=False,
  474. ):
  475. # Because we add another function in between, the stacklevel
  476. # set in salt.utils.process, 3, needs to now be 4
  477. stacklevel = 4
  478. return warn_until_date(
  479. date,
  480. message,
  481. category=category,
  482. stacklevel=stacklevel,
  483. _current_date=_current_date,
  484. _dont_call_warnings=_dont_call_warnings,
  485. )
  486. return _patched_warn_until_date
  487. def test_multiprocessing_process_warning(self):
  488. # We *always* want *all* warnings thrown on this module
  489. warnings.filterwarnings("always", "", DeprecationWarning, __name__)
  490. fake_utcnow = datetime.date(2021, 1, 1)
  491. proc = None
  492. try:
  493. with patch(
  494. "salt.utils.versions.warn_until_date",
  495. self.patched_warn_until_date(fake_utcnow),
  496. ):
  497. # Test warning
  498. with warnings.catch_warnings(record=True) as recorded_warnings:
  499. proc = salt.utils.process.MultiprocessingProcess(
  500. target=self.process_target
  501. )
  502. self.assertEqual(
  503. "Please stop using 'salt.utils.process.MultiprocessingProcess' "
  504. "and instead use 'salt.utils.process.Process'. "
  505. "'salt.utils.process.MultiprocessingProcess' will go away "
  506. "after 2022-01-01.",
  507. six.text_type(recorded_warnings[0].message),
  508. )
  509. finally:
  510. if proc is not None:
  511. del proc
  512. def test_multiprocessing_process_runtime_error(self):
  513. fake_utcnow = datetime.date(2022, 1, 1)
  514. proc = None
  515. try:
  516. with patch(
  517. "salt.utils.versions.warn_until_date",
  518. self.patched_warn_until_date(fake_utcnow),
  519. ):
  520. with self.assertRaisesRegex(
  521. RuntimeError,
  522. r"Please stop using 'salt.utils.process.MultiprocessingProcess' "
  523. r"and instead use 'salt.utils.process.Process'. "
  524. r"'salt.utils.process.MultiprocessingProcess' will go away "
  525. r"after 2022-01-01. "
  526. r"This warning\(now exception\) triggered on "
  527. r"filename '(.*)test_process.py', line number ([\d]+), is "
  528. r"supposed to be shown until ([\d-]+). Today is ([\d-]+). "
  529. r"Please remove the warning.",
  530. ):
  531. proc = salt.utils.process.MultiprocessingProcess(
  532. target=self.process_target
  533. )
  534. finally:
  535. if proc is not None:
  536. del proc
  537. def test_signal_handling_multiprocessing_process_warning(self):
  538. # We *always* want *all* warnings thrown on this module
  539. warnings.filterwarnings("always", "", DeprecationWarning, __name__)
  540. fake_utcnow = datetime.date(2021, 1, 1)
  541. proc = None
  542. try:
  543. with patch(
  544. "salt.utils.versions.warn_until_date",
  545. self.patched_warn_until_date(fake_utcnow),
  546. ):
  547. # Test warning
  548. with warnings.catch_warnings(record=True) as recorded_warnings:
  549. proc = salt.utils.process.SignalHandlingMultiprocessingProcess(
  550. target=self.process_target
  551. )
  552. self.assertEqual(
  553. "Please stop using 'salt.utils.process.SignalHandlingMultiprocessingProcess' "
  554. "and instead use 'salt.utils.process.SignalHandlingProcess'. "
  555. "'salt.utils.process.SignalHandlingMultiprocessingProcess' will go away "
  556. "after 2022-01-01.",
  557. six.text_type(recorded_warnings[0].message),
  558. )
  559. finally:
  560. if proc is not None:
  561. del proc
  562. def test_signal_handling_multiprocessing_process_runtime_error(self):
  563. fake_utcnow = datetime.date(2022, 1, 1)
  564. proc = None
  565. try:
  566. with patch(
  567. "salt.utils.versions.warn_until_date",
  568. self.patched_warn_until_date(fake_utcnow),
  569. ):
  570. with self.assertRaisesRegex(
  571. RuntimeError,
  572. r"Please stop using 'salt.utils.process.SignalHandlingMultiprocessingProcess' "
  573. r"and instead use 'salt.utils.process.SignalHandlingProcess'. "
  574. r"'salt.utils.process.SignalHandlingMultiprocessingProcess' will go away "
  575. r"after 2022-01-01. "
  576. r"This warning\(now exception\) triggered on "
  577. r"filename '(.*)test_process.py', line number ([\d]+), is "
  578. r"supposed to be shown until ([\d-]+). Today is ([\d-]+). "
  579. r"Please remove the warning.",
  580. ):
  581. proc = salt.utils.process.SignalHandlingMultiprocessingProcess(
  582. target=self.process_target
  583. )
  584. finally:
  585. if proc is not None:
  586. del proc
  587. class CMORProcessHelper:
  588. def __init__(self, file_name):
  589. self._lock = threading.Lock()
  590. self._running = True
  591. self._queue = multiprocessing.Queue()
  592. self._ret_queue = multiprocessing.Queue()
  593. self._process = multiprocessing.Process(
  594. target=self.test_process,
  595. args=(file_name, self._queue, self._ret_queue),
  596. daemon=True,
  597. )
  598. self._process.start()
  599. def __enter__(self):
  600. return self
  601. def __exit__(self, exc_type, exc_val, exc_tb):
  602. self.stop()
  603. def claim(self):
  604. try:
  605. self._lock.acquire()
  606. if self._running:
  607. self._queue.put("claim")
  608. return self._ret_queue.get(timeout=10)
  609. finally:
  610. self._lock.release()
  611. def stop(self):
  612. try:
  613. self._lock.acquire()
  614. if self._running:
  615. self._running = False
  616. self._queue.put("stop")
  617. self._process.join(timeout=10)
  618. self._queue.close()
  619. self._ret_queue.close()
  620. finally:
  621. self._lock.release()
  622. @property
  623. def pid(self):
  624. return self._process.pid
  625. @staticmethod
  626. def test_process(file_name, queue, ret_queue):
  627. while True:
  628. action = queue.get()
  629. if action == "claim":
  630. ret_queue.put(
  631. salt.utils.process.claim_mantle_of_responsibility(file_name)
  632. )
  633. elif action == "stop":
  634. return
  635. @skipIf(not HAS_PSUTIL, "Missing psutil")
  636. class TestGetProcessInfo(TestCase):
  637. def test_this_process(self):
  638. this_process_info = salt.utils.process.get_process_info()
  639. self.assertEqual(
  640. this_process_info, salt.utils.process.get_process_info(os.getpid())
  641. )
  642. self.assertIsNotNone(this_process_info)
  643. for key in ("pid", "name", "start_time"):
  644. self.assertIn(key, this_process_info)
  645. raw_process_info = psutil.Process(os.getpid())
  646. self.assertEqual(this_process_info["pid"], os.getpid())
  647. self.assertEqual(this_process_info["name"], raw_process_info.name())
  648. self.assertEqual(
  649. this_process_info["start_time"], raw_process_info.create_time()
  650. )
  651. def test_random_processes(self):
  652. for _ in range(3):
  653. with CMORProcessHelper("CMOR_TEST_FILE") as p1:
  654. pid = p1.pid
  655. self.assertIsInstance(salt.utils.process.get_process_info(pid), dict)
  656. self.assertIsNone(salt.utils.process.get_process_info(pid))
  657. class TestClaimMantleOfResponsibility(TestCase):
  658. @skipIf(HAS_PSUTIL, "Has psutil")
  659. def test_simple_claim_no_psutil(self):
  660. salt.utils.process.claim_mantle_of_responsibility("CMOR_TEST_FILE")
  661. @skipIf(not HAS_PSUTIL, "Missing psutil")
  662. def test_simple_claim(self):
  663. try:
  664. for _ in range(5):
  665. self.assertTrue(
  666. salt.utils.process.claim_mantle_of_responsibility("CMOR_TEST_FILE")
  667. )
  668. finally:
  669. os.remove("CMOR_TEST_FILE")
  670. @skipIf(not HAS_PSUTIL, "Missing psutil")
  671. def test_multiple_processes(self):
  672. try:
  673. with CMORProcessHelper("CMOR_TEST_FILE") as p1:
  674. self.assertTrue(p1.claim())
  675. self.assertFalse(
  676. salt.utils.process.claim_mantle_of_responsibility("CMOR_TEST_FILE")
  677. )
  678. with CMORProcessHelper("CMOR_TEST_FILE") as p2:
  679. for _ in range(3):
  680. self.assertFalse(p2.claim())
  681. self.assertTrue(p1.claim())
  682. with CMORProcessHelper("CMOR_TEST_FILE") as p1:
  683. self.assertTrue(p1.claim())
  684. self.assertFalse(
  685. salt.utils.process.claim_mantle_of_responsibility("CMOR_TEST_FILE")
  686. )
  687. self.assertTrue(
  688. salt.utils.process.claim_mantle_of_responsibility("CMOR_TEST_FILE")
  689. )
  690. finally:
  691. os.remove("CMOR_TEST_FILE")
  692. class TestCheckMantleOfResponsibility(TestCase):
  693. @skipIf(HAS_PSUTIL, "Has psutil")
  694. def test_simple_claim_no_psutil(self):
  695. try:
  696. self.assertIsNone(
  697. salt.utils.process.check_mantle_of_responsibility("CMOR_TEST_FILE")
  698. )
  699. finally:
  700. os.remove("CMOR_TEST_FILE")
  701. @skipIf(not HAS_PSUTIL, "Missing psutil")
  702. def test_simple_claim(self):
  703. try:
  704. self.assertIsNone(
  705. salt.utils.process.check_mantle_of_responsibility("CMOR_TEST_FILE")
  706. )
  707. salt.utils.process.claim_mantle_of_responsibility("CMOR_TEST_FILE")
  708. pid = salt.utils.process.get_process_info()["pid"]
  709. self.assertEqual(
  710. pid, salt.utils.process.check_mantle_of_responsibility("CMOR_TEST_FILE")
  711. )
  712. finally:
  713. os.remove("CMOR_TEST_FILE")
  714. @skipIf(not HAS_PSUTIL, "Missing psutil")
  715. def test_multiple_processes(self):
  716. try:
  717. self.assertIsNone(
  718. salt.utils.process.check_mantle_of_responsibility("CMOR_TEST_FILE")
  719. )
  720. with CMORProcessHelper("CMOR_TEST_FILE") as p1:
  721. self.assertTrue(p1.claim())
  722. random_pid = salt.utils.process.check_mantle_of_responsibility(
  723. "CMOR_TEST_FILE"
  724. )
  725. self.assertIsInstance(random_pid, int)
  726. with CMORProcessHelper("CMOR_TEST_FILE") as p2:
  727. for _ in range(3):
  728. self.assertFalse(p2.claim())
  729. self.assertEqual(
  730. random_pid,
  731. salt.utils.process.check_mantle_of_responsibility(
  732. "CMOR_TEST_FILE"
  733. ),
  734. )
  735. self.assertIsNone(
  736. salt.utils.process.check_mantle_of_responsibility("CMOR_TEST_FILE")
  737. )
  738. salt.utils.process.claim_mantle_of_responsibility("CMOR_TEST_FILE")
  739. pid = salt.utils.process.get_process_info()["pid"]
  740. self.assertEqual(
  741. pid, salt.utils.process.check_mantle_of_responsibility("CMOR_TEST_FILE")
  742. )
  743. finally:
  744. os.remove("CMOR_TEST_FILE")