saltfactories_compat.py 74 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030
  1. """
  2. tests.support.saltfactories_virt
  3. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. This module only exists to help with some tests while the Salt code base
  5. is not migrated to a newer salt-factories package
  6. """
  7. # pylint: disable=resource-leakage
  8. import atexit
  9. import json
  10. import logging
  11. import os
  12. import pathlib
  13. import pprint
  14. import shutil
  15. import socket
  16. import subprocess
  17. import sys
  18. import threading
  19. import time
  20. import uuid
  21. from datetime import datetime
  22. import attr # pylint: disable=3rd-party-module-not-gated
  23. import msgpack
  24. import psutil # pylint: disable=3rd-party-module-not-gated
  25. import pytest
  26. import salt.config
  27. import salt.utils.dictupdate
  28. import salt.utils.files
  29. import salt.utils.path
  30. import salt.utils.user
  31. import salt.utils.verify
  32. import salt.utils.yaml
  33. import zmq
  34. from salt.utils.immutabletypes import freeze
  35. from saltfactories import CODE_ROOT_DIR
  36. from saltfactories.exceptions import ProcessNotStarted as FactoryNotStarted
  37. from saltfactories.exceptions import ProcessTimeout as FactoryTimeout
  38. from saltfactories.utils import cli_scripts, ports, random_string
  39. from saltfactories.utils.processes.bases import Popen, ProcessResult, ShellResult
  40. from saltfactories.utils.processes.helpers import (
  41. terminate_process,
  42. terminate_process_list,
  43. )
  44. from tests.support.runtests import RUNTIME_VARS
  45. try:
  46. import docker
  47. from docker.errors import APIError
  48. HAS_DOCKER = True
  49. except ImportError: # pragma: no cover
  50. HAS_DOCKER = False
  51. class APIError(Exception):
  52. pass
  53. try:
  54. from requests.exceptions import ConnectionError as RequestsConnectionError
  55. HAS_REQUESTS = True
  56. except ImportError: # pragma: no cover
  57. HAS_REQUESTS = False
  58. class RequestsConnectionError(ConnectionError):
  59. pass
  60. try:
  61. import pywintypes
  62. PyWinTypesError = pywintypes.error
  63. except ImportError:
  64. class PyWinTypesError(Exception):
  65. pass
  66. try:
  67. from saltfactories.exceptions import ( # pylint: disable=no-name-in-module
  68. FactoryNotStarted,
  69. )
  70. raise RuntimeError("s0undt3ch, it's time to cleanup this spaghetti code!")
  71. except ImportError:
  72. pass
  73. log = logging.getLogger(__name__)
  74. @attr.s(kw_only=True)
  75. class Factory:
  76. """
  77. Base factory class
  78. Args:
  79. display_name(str):
  80. Human readable name for the factory
  81. environ(dict):
  82. A dictionary of `key`, `value` pairs to add to the environment.
  83. cwd (str):
  84. The path to the current working directory
  85. """
  86. display_name = attr.ib(default=None)
  87. cwd = attr.ib(default=None)
  88. environ = attr.ib(repr=False, default=None)
  89. def __attrs_post_init__(self):
  90. if self.environ is None:
  91. self.environ = os.environ.copy()
  92. if self.cwd is None:
  93. self.cwd = os.getcwd()
  94. def get_display_name(self):
  95. """
  96. Returns a human readable name for the factory
  97. """
  98. if self.display_name:
  99. return "{}({})".format(self.__class__.__name__, self.display_name)
  100. return self.__class__.__name__
  101. @attr.s(kw_only=True)
  102. class SubprocessFactoryBase(Factory):
  103. """
  104. Base CLI script/binary class
  105. Args:
  106. cli_script_name(str):
  107. This is the string containing the name of the binary to call on the subprocess, either the
  108. full path to it, or the basename. In case of the basename, the directory containing the
  109. basename must be in your ``$PATH`` variable.
  110. base_script_args(list or tuple):
  111. An list or tuple iterable of the base arguments to use when building the command line to
  112. launch the process
  113. slow_stop(bool):
  114. Whether to terminate the processes by sending a :py:attr:`SIGTERM` signal or by calling
  115. :py:meth:`~subprocess.Popen.terminate` on the sub-process.
  116. When code coverage is enabled, one will want `slow_stop` set to `True` so that coverage data
  117. can be written down to disk.
  118. """
  119. cli_script_name = attr.ib()
  120. base_script_args = attr.ib(default=None)
  121. slow_stop = attr.ib(default=True)
  122. _terminal = attr.ib(repr=False, init=False, default=None)
  123. _terminal_result = attr.ib(repr=False, init=False, default=None)
  124. _terminal_timeout = attr.ib(repr=False, init=False, default=None)
  125. _children = attr.ib(repr=False, init=False, default=attr.Factory(list))
  126. def __attrs_post_init__(self):
  127. super().__attrs_post_init__()
  128. if self.base_script_args is None:
  129. self.base_script_args = []
  130. def get_display_name(self):
  131. """
  132. Returns a human readable name for the factory
  133. """
  134. return self.display_name or self.cli_script_name
  135. def get_script_path(self):
  136. """
  137. Returns the path to the script to run
  138. """
  139. if os.path.isabs(self.cli_script_name):
  140. script_path = self.cli_script_name
  141. else:
  142. script_path = salt.utils.path.which(self.cli_script_name)
  143. if not script_path or not os.path.exists(script_path):
  144. pytest.fail("The CLI script {!r} does not exist".format(script_path))
  145. return script_path
  146. def get_base_script_args(self):
  147. """
  148. Returns any additional arguments to pass to the CLI script
  149. """
  150. return list(self.base_script_args)
  151. def get_script_args(self): # pylint: disable=no-self-use
  152. """
  153. Returns any additional arguments to pass to the CLI script
  154. """
  155. return []
  156. def build_cmdline(self, *args):
  157. """
  158. Construct a list of arguments to use when starting the subprocess
  159. Args:
  160. args:
  161. Additional arguments to use when starting the subprocess
  162. """
  163. return (
  164. [self.get_script_path()]
  165. + self.get_base_script_args()
  166. + self.get_script_args()
  167. + list(args)
  168. )
  169. def init_terminal(self, cmdline, **kwargs):
  170. """
  171. Instantiate a terminal with the passed cmdline and kwargs and return it.
  172. Additionally, it sets a reference to it in self._terminal and also collects
  173. an initial listing of child processes which will be used when terminating the
  174. terminal
  175. """
  176. self._terminal = Popen(cmdline, **kwargs)
  177. # Reset the previous _terminal_result if set
  178. self._terminal_result = None
  179. # A little sleep to allow the subprocess to start
  180. time.sleep(0.125)
  181. try:
  182. for child in psutil.Process(self._terminal.pid).children(recursive=True):
  183. if child not in self._children:
  184. self._children.append(child)
  185. except psutil.NoSuchProcess:
  186. # The terminal process is gone
  187. pass
  188. atexit.register(self.terminate)
  189. return self._terminal
  190. def is_running(self):
  191. """
  192. Returns true if the sub-process is alive
  193. """
  194. if not self._terminal:
  195. return False
  196. return self._terminal.poll() is None
  197. def terminate(self):
  198. """
  199. Terminate the started daemon
  200. """
  201. if self._terminal is None:
  202. return self._terminal_result
  203. atexit.unregister(self.terminate)
  204. log.info("Stopping %s", self)
  205. # Collect any child processes information before terminating the process
  206. try:
  207. for child in psutil.Process(self._terminal.pid).children(recursive=True):
  208. if child not in self._children:
  209. self._children.append(child)
  210. except psutil.NoSuchProcess:
  211. # The terminal process is gone
  212. pass
  213. # poll the terminal before trying to terminate it, running or not, so that
  214. # the right returncode is set on the popen object
  215. self._terminal.poll()
  216. # Lets log and kill any child processes left behind
  217. terminate_process(
  218. pid=self._terminal.pid,
  219. kill_children=True,
  220. children=self._children,
  221. slow_stop=self.slow_stop,
  222. )
  223. stdout, stderr = self._terminal.communicate()
  224. try:
  225. log_message = "Terminated {}.".format(self)
  226. if stdout or stderr:
  227. log_message += " Process Output:"
  228. if stdout:
  229. log_message += "\n>>>>> STDOUT >>>>>\n{}\n<<<<< STDOUT <<<<<".format(
  230. stdout.strip()
  231. )
  232. if stderr:
  233. log_message += "\n>>>>> STDERR >>>>>\n{}\n<<<<< STDERR <<<<<".format(
  234. stderr.strip()
  235. )
  236. log_message += "\n"
  237. log.info(log_message)
  238. self._terminal_result = ProcessResult(
  239. self._terminal.returncode, stdout, stderr, cmdline=self._terminal.args
  240. )
  241. return self._terminal_result
  242. finally:
  243. self._terminal = None
  244. self._terminal_timeout = None
  245. self._children = []
  246. @property
  247. def pid(self):
  248. if not self._terminal:
  249. return
  250. return self._terminal.pid
  251. def _run(self, *args, **kwargs):
  252. """
  253. Run the given command synchronously
  254. """
  255. cmdline = self.build_cmdline(*args, **kwargs)
  256. log.info("%s is running %r in CWD: %s ...", self, cmdline, self.cwd)
  257. terminal = self.init_terminal(cmdline, cwd=self.cwd, env=self.environ)
  258. try:
  259. self._children.extend(psutil.Process(self.pid).children(recursive=True))
  260. except psutil.NoSuchProcess:
  261. # Process already died?!
  262. pass
  263. return terminal
  264. @attr.s(kw_only=True)
  265. class ProcessFactory(SubprocessFactoryBase):
  266. """
  267. Base process factory
  268. Args:
  269. default_timeout(int):
  270. The maximum amount of seconds that a script should run
  271. """
  272. default_timeout = attr.ib()
  273. _terminal_timeout_set_explicitly = attr.ib(repr=False, init=False, default=False)
  274. @default_timeout.default
  275. def _set_default_timeout(self):
  276. if not sys.platform.startswith(("win", "darwin")):
  277. return 30
  278. # Windows and macOS are just slower.
  279. return 120
  280. def run(self, *args, _timeout=None, **kwargs):
  281. """
  282. Run the given command synchronously
  283. """
  284. start_time = time.time()
  285. # Build the cmdline to pass to the terminal
  286. # We set the _terminal_timeout attribute while calling build_cmdline in case it needs
  287. # access to that information to build the command line
  288. self._terminal_timeout = _timeout or self.default_timeout
  289. self._terminal_timeout_set_explicitly = _timeout is not None
  290. timeout_expire = time.time() + self._terminal_timeout
  291. running = self._run(*args, **kwargs)
  292. timmed_out = False
  293. while True:
  294. if timeout_expire < time.time():
  295. timmed_out = True
  296. break
  297. if self._terminal.poll() is not None:
  298. break
  299. time.sleep(0.25)
  300. result = self.terminate()
  301. if timmed_out:
  302. raise FactoryTimeout(
  303. "{} Failed to run: {}; Error: Timed out after {:.2f} seconds!".format(
  304. self, result.cmdline, time.time() - start_time
  305. ),
  306. stdout=result.stdout,
  307. stderr=result.stderr,
  308. cmdline=result.cmdline,
  309. exitcode=result.exitcode,
  310. )
  311. cmdline = result.cmdline
  312. exitcode = result.exitcode
  313. stdout, stderr, json_out = self.process_output(
  314. result.stdout, result.stderr, cmdline=cmdline
  315. )
  316. log.info(
  317. "%s completed %r in CWD: %s after %.2f seconds",
  318. self,
  319. cmdline,
  320. self.cwd,
  321. time.time() - start_time,
  322. )
  323. return ShellResult(exitcode, stdout, stderr, json=json_out, cmdline=cmdline)
  324. def process_output(self, stdout, stderr, cmdline=None):
  325. if stdout:
  326. try:
  327. json_out = json.loads(stdout)
  328. except ValueError:
  329. log.debug(
  330. "%s failed to load JSON from the following output:\n%r",
  331. self,
  332. stdout,
  333. )
  334. json_out = None
  335. else:
  336. json_out = None
  337. return stdout, stderr, json_out
  338. @attr.s(kw_only=True)
  339. class DaemonFactory(SubprocessFactoryBase):
  340. """
  341. Base daemon factory
  342. """
  343. check_ports = attr.ib(default=None)
  344. factories_manager = attr.ib(repr=False, hash=False, default=None)
  345. start_timeout = attr.ib(repr=False)
  346. max_start_attempts = attr.ib(repr=False, default=3)
  347. before_start_callbacks = attr.ib(repr=False, hash=False, default=attr.Factory(list))
  348. before_terminate_callbacks = attr.ib(
  349. repr=False, hash=False, default=attr.Factory(list)
  350. )
  351. after_start_callbacks = attr.ib(repr=False, hash=False, default=attr.Factory(list))
  352. after_terminate_callbacks = attr.ib(
  353. repr=False, hash=False, default=attr.Factory(list)
  354. )
  355. extra_cli_arguments_after_first_start_failure = attr.ib(
  356. hash=False, default=attr.Factory(list)
  357. )
  358. listen_ports = attr.ib(
  359. init=False, repr=False, hash=False, default=attr.Factory(list)
  360. )
  361. def __attrs_post_init__(self):
  362. super().__attrs_post_init__()
  363. if self.check_ports and not isinstance(self.check_ports, (list, tuple)):
  364. self.check_ports = [self.check_ports]
  365. if self.check_ports:
  366. self.listen_ports.extend(self.check_ports)
  367. self.register_after_start_callback(self._add_factory_to_stats_processes)
  368. self.register_after_terminate_callback(
  369. self._terminate_processes_matching_listen_ports
  370. )
  371. self.register_after_terminate_callback(
  372. self._remove_factory_from_stats_processes
  373. )
  374. def register_before_start_callback(self, callback, *args, **kwargs):
  375. self.before_start_callbacks.append((callback, args, kwargs))
  376. def register_before_terminate_callback(self, callback, *args, **kwargs):
  377. self.before_terminate_callbacks.append((callback, args, kwargs))
  378. def register_after_start_callback(self, callback, *args, **kwargs):
  379. self.after_start_callbacks.append((callback, args, kwargs))
  380. def register_after_terminate_callback(self, callback, *args, **kwargs):
  381. self.after_terminate_callbacks.append((callback, args, kwargs))
  382. def get_check_ports(self):
  383. """
  384. Return a list of ports to check against to ensure the daemon is running
  385. """
  386. return self.check_ports or []
  387. def _format_callback(self, callback, args, kwargs):
  388. callback_str = "{}(".format(callback.__name__)
  389. if args:
  390. callback_str += ", ".join([repr(arg) for arg in args])
  391. if kwargs:
  392. callback_str += ", ".join(
  393. ["{}={!r}".format(k, v) for (k, v) in kwargs.items()]
  394. )
  395. callback_str += ")"
  396. return callback_str
  397. def start(self, *extra_cli_arguments, max_start_attempts=None, start_timeout=None):
  398. """
  399. Start the daemon
  400. """
  401. if self.is_running():
  402. log.warning("%s is already running.", self)
  403. return True
  404. process_running = False
  405. start_time = time.time()
  406. start_attempts = max_start_attempts or self.max_start_attempts
  407. current_attempt = 0
  408. run_arguments = list(extra_cli_arguments)
  409. while True:
  410. if process_running:
  411. break
  412. current_attempt += 1
  413. if current_attempt > start_attempts:
  414. break
  415. log.info(
  416. "Starting %s. Attempt: %d of %d", self, current_attempt, start_attempts
  417. )
  418. for callback, args, kwargs in self.before_start_callbacks:
  419. try:
  420. callback(*args, **kwargs)
  421. except Exception as exc: # pylint: disable=broad-except
  422. log.info(
  423. "Exception raised when running %s: %s",
  424. self._format_callback(callback, args, kwargs),
  425. exc,
  426. exc_info=True,
  427. )
  428. current_start_time = time.time()
  429. start_running_timeout = current_start_time + (
  430. start_timeout or self.start_timeout
  431. )
  432. if (
  433. current_attempt > 1
  434. and self.extra_cli_arguments_after_first_start_failure
  435. ):
  436. run_arguments = list(extra_cli_arguments) + list(
  437. self.extra_cli_arguments_after_first_start_failure
  438. )
  439. self._run(*run_arguments)
  440. if not self.is_running():
  441. # A little breathe time to allow the process to start if not started already
  442. time.sleep(0.5)
  443. while time.time() <= start_running_timeout:
  444. if not self.is_running():
  445. log.warning("%s is no longer running", self)
  446. self.terminate()
  447. break
  448. try:
  449. if (
  450. self.run_start_checks(current_start_time, start_running_timeout)
  451. is False
  452. ):
  453. time.sleep(1)
  454. continue
  455. except FactoryNotStarted:
  456. self.terminate()
  457. break
  458. log.info(
  459. "The %s factory is running after %d attempts. Took %1.2f seconds",
  460. self,
  461. current_attempt,
  462. time.time() - start_time,
  463. )
  464. process_running = True
  465. break
  466. else:
  467. # The factory failed to confirm it's running status
  468. self.terminate()
  469. if process_running:
  470. for callback, args, kwargs in self.after_start_callbacks:
  471. try:
  472. callback(*args, **kwargs)
  473. except Exception as exc: # pylint: disable=broad-except
  474. log.info(
  475. "Exception raised when running %s: %s",
  476. self._format_callback(callback, args, kwargs),
  477. exc,
  478. exc_info=True,
  479. )
  480. return process_running
  481. result = self.terminate()
  482. raise FactoryNotStarted(
  483. "The {} factory has failed to confirm running status after {} attempts, which "
  484. "took {:.2f} seconds".format(
  485. self, current_attempt - 1, time.time() - start_time,
  486. ),
  487. stdout=result.stdout,
  488. stderr=result.stderr,
  489. exitcode=result.exitcode,
  490. )
  491. def started(
  492. self, *extra_cli_arguments, max_start_attempts=None, start_timeout=None
  493. ):
  494. """
  495. Start the daemon and return it's instance so it can be used as a context manager
  496. """
  497. self.start(
  498. *extra_cli_arguments,
  499. max_start_attempts=max_start_attempts,
  500. start_timeout=start_timeout
  501. )
  502. return self
  503. def terminate(self):
  504. if self._terminal_result is not None:
  505. # This factory has already been terminated
  506. return self._terminal_result
  507. for callback, args, kwargs in self.before_terminate_callbacks:
  508. try:
  509. callback(*args, **kwargs)
  510. except Exception as exc: # pylint: disable=broad-except
  511. log.info(
  512. "Exception raised when running %s: %s",
  513. self._format_callback(callback, args, kwargs),
  514. exc,
  515. exc_info=True,
  516. )
  517. try:
  518. return super().terminate()
  519. finally:
  520. for callback, args, kwargs in self.after_terminate_callbacks:
  521. try:
  522. callback(*args, **kwargs)
  523. except Exception as exc: # pylint: disable=broad-except
  524. log.info(
  525. "Exception raised when running %s: %s",
  526. self._format_callback(callback, args, kwargs),
  527. exc,
  528. exc_info=True,
  529. )
  530. def run_start_checks(self, started_at, timeout_at):
  531. check_ports = set(self.get_check_ports())
  532. if not check_ports:
  533. log.debug("No ports to check connection to for %s", self)
  534. return True
  535. checks_start_time = time.time()
  536. while time.time() <= timeout_at:
  537. if not self.is_running():
  538. raise FactoryNotStarted("{} is no longer running".format(self))
  539. if not check_ports:
  540. break
  541. check_ports -= ports.get_connectable_ports(check_ports)
  542. if check_ports:
  543. time.sleep(0.5)
  544. else:
  545. log.error(
  546. "Failed to check ports after %1.2f seconds for %s",
  547. time.time() - checks_start_time,
  548. self,
  549. )
  550. return False
  551. log.debug(
  552. "Successfuly connected to all ports(%s) for %s",
  553. set(self.get_check_ports()),
  554. self,
  555. )
  556. return True
  557. def _add_factory_to_stats_processes(self):
  558. if (
  559. self.factories_manager
  560. and self.factories_manager.stats_processes is not None
  561. ):
  562. display_name = self.get_display_name()
  563. self.factories_manager.stats_processes[display_name] = psutil.Process(
  564. self.pid
  565. )
  566. def _remove_factory_from_stats_processes(self):
  567. if (
  568. self.factories_manager
  569. and self.factories_manager.stats_processes is not None
  570. ):
  571. display_name = self.get_display_name()
  572. self.factories_manager.stats_processes.pop(display_name, None)
  573. def _terminate_processes_matching_listen_ports(self):
  574. if not self.listen_ports:
  575. return
  576. # If any processes were not terminated and are listening on the ports
  577. # we have set on listen_ports, terminate those processes.
  578. found_processes = []
  579. for process in psutil.process_iter(["connections"]):
  580. try:
  581. for connection in process.connections():
  582. if connection.status != psutil.CONN_LISTEN:
  583. # We only care about listening services
  584. continue
  585. if connection.laddr.port in self.check_ports:
  586. found_processes.append(process)
  587. # We already found one connection, no need to check the others
  588. break
  589. except psutil.AccessDenied:
  590. # We've been denied access to this process connections. Carry on.
  591. continue
  592. if found_processes:
  593. log.debug(
  594. "The following processes were found listening on ports %s: %s",
  595. ", ".join([str(port) for port in self.listen_ports]),
  596. found_processes,
  597. )
  598. terminate_process_list(found_processes, kill=True, slow_stop=False)
  599. else:
  600. log.debug(
  601. "No astray processes were found listening on ports: %s",
  602. ", ".join([str(port) for port in self.listen_ports]),
  603. )
  604. def __enter__(self):
  605. if not self.is_running():
  606. raise RuntimeError(
  607. "Factory not yet started. Perhaps you're after something like:\n\n"
  608. "with {}.started() as factory:\n"
  609. " yield factory".format(self.__class__.__name__)
  610. )
  611. return self
  612. def __exit__(self, *exc):
  613. return self.terminate()
  614. @attr.s(kw_only=True)
  615. class SaltFactory:
  616. """
  617. Base factory for salt cli's and daemon's
  618. Args:
  619. config(dict):
  620. The Salt config dictionary
  621. python_executable(str):
  622. The path to the python executable to use
  623. """
  624. id = attr.ib(default=None, init=False)
  625. config = attr.ib(repr=False)
  626. config_dir = attr.ib(init=False, default=None)
  627. config_file = attr.ib(init=False, default=None)
  628. python_executable = attr.ib(default=None)
  629. display_name = attr.ib(init=False, default=None)
  630. def __attrs_post_init__(self):
  631. if self.python_executable is None:
  632. self.python_executable = sys.executable
  633. # We really do not want buffered output
  634. self.environ.setdefault("PYTHONUNBUFFERED", "1")
  635. # Don't write .pyc files or create them in __pycache__ directories
  636. self.environ.setdefault("PYTHONDONTWRITEBYTECODE", "1")
  637. self.config_file = self.config["conf_file"]
  638. self.config_dir = os.path.dirname(self.config_file)
  639. self.id = self.config["id"]
  640. self.config = freeze(self.config)
  641. def get_display_name(self):
  642. """
  643. Returns a human readable name for the factory
  644. """
  645. if self.display_name is None:
  646. self.display_name = "{}(id={!r})".format(self.__class__.__name__, self.id)
  647. return super().get_display_name()
  648. @attr.s(kw_only=True)
  649. class SaltCliFactory(SaltFactory, ProcessFactory):
  650. """
  651. Base factory for salt cli's
  652. Args:
  653. hard_crash(bool):
  654. Pass ``--hard-crash`` to Salt's CLI's
  655. """
  656. hard_crash = attr.ib(repr=False, default=False)
  657. # Override the following to default to non-mandatory and to None
  658. display_name = attr.ib(init=False, default=None)
  659. _minion_tgt = attr.ib(repr=False, init=False, default=None)
  660. __cli_timeout_supported__ = attr.ib(repr=False, init=False, default=False)
  661. __cli_log_level_supported__ = attr.ib(repr=False, init=False, default=True)
  662. __cli_output_supported__ = attr.ib(repr=False, init=False, default=True)
  663. # Override the following to default to non-mandatory and to None
  664. display_name = attr.ib(init=False, default=None)
  665. def __attrs_post_init__(self):
  666. ProcessFactory.__attrs_post_init__(self)
  667. SaltFactory.__attrs_post_init__(self)
  668. def get_script_args(self):
  669. """
  670. Returns any additional arguments to pass to the CLI script
  671. """
  672. if not self.hard_crash:
  673. return super().get_script_args()
  674. return ["--hard-crash"]
  675. def get_minion_tgt(self, minion_tgt=None):
  676. return minion_tgt
  677. def build_cmdline(
  678. self, *args, minion_tgt=None, **kwargs
  679. ): # pylint: disable=arguments-differ
  680. """
  681. Construct a list of arguments to use when starting the subprocess
  682. Args:
  683. args:
  684. Additional arguments to use when starting the subprocess
  685. kwargs:
  686. Keyword arguments will be converted into ``key=value`` pairs to be consumed by the salt CLI's
  687. minion_tgt(str):
  688. The minion ID to target
  689. """
  690. log.debug(
  691. "Building cmdline. Minion target: %s; Input args: %s; Input kwargs: %s;",
  692. minion_tgt,
  693. args,
  694. kwargs,
  695. )
  696. minion_tgt = self._minion_tgt = self.get_minion_tgt(minion_tgt=minion_tgt)
  697. cmdline = []
  698. args = list(args)
  699. # Handle the config directory flag
  700. for arg in args:
  701. if arg.startswith("--config-dir="):
  702. break
  703. if arg in ("-c", "--config-dir"):
  704. break
  705. else:
  706. cmdline.append("--config-dir={}".format(self.config_dir))
  707. # Handle the timeout CLI flag, if supported
  708. if self.__cli_timeout_supported__:
  709. salt_cli_timeout_next = False
  710. for arg in args:
  711. if arg.startswith("--timeout="):
  712. # Let's actually change the _terminal_timeout value which is used to
  713. # calculate when the run() method should actually timeout
  714. if self._terminal_timeout_set_explicitly is False:
  715. salt_cli_timeout = arg.split("--timeout=")[-1]
  716. try:
  717. self._terminal_timeout = int(salt_cli_timeout) + 5
  718. except ValueError:
  719. # Not a number? Let salt do it's error handling
  720. pass
  721. break
  722. if salt_cli_timeout_next:
  723. if self._terminal_timeout_set_explicitly is False:
  724. try:
  725. self._terminal_timeout = int(arg) + 5
  726. except ValueError:
  727. # Not a number? Let salt do it's error handling
  728. pass
  729. break
  730. if arg == "-t" or arg.startswith("--timeout"):
  731. salt_cli_timeout_next = True
  732. continue
  733. else:
  734. salt_cli_timeout = self._terminal_timeout
  735. if salt_cli_timeout and self._terminal_timeout_set_explicitly is False:
  736. # Shave off a few seconds so that the salt command times out before the terminal does
  737. salt_cli_timeout -= 5
  738. if salt_cli_timeout:
  739. # If it's still a positive number, add it to the salt command CLI flags
  740. cmdline.append("--timeout={}".format(salt_cli_timeout))
  741. # Handle the output flag
  742. if self.__cli_output_supported__:
  743. for arg in args:
  744. if arg in ("--out", "--output"):
  745. break
  746. if arg.startswith(("--out=", "--output=")):
  747. break
  748. else:
  749. # No output was passed, the default output is JSON
  750. cmdline.append("--out=json")
  751. if self.__cli_log_level_supported__:
  752. # Handle the logging flag
  753. for arg in args:
  754. if arg in ("-l", "--log-level"):
  755. break
  756. if arg.startswith("--log-level="):
  757. break
  758. else:
  759. # Default to being quiet on console output
  760. cmdline.append("--log-level=quiet")
  761. if minion_tgt:
  762. cmdline.append(minion_tgt)
  763. # Add the remaining args
  764. cmdline.extend(args)
  765. # Keyword arguments get passed as KEY=VALUE pairs to the CLI
  766. for key in kwargs:
  767. value = kwargs[key]
  768. if not isinstance(value, str):
  769. value = json.dumps(value)
  770. cmdline.append("{}={}".format(key, value))
  771. cmdline = super().build_cmdline(*cmdline)
  772. if self.python_executable:
  773. if cmdline[0] != self.python_executable:
  774. cmdline.insert(0, self.python_executable)
  775. log.debug("Built cmdline: %s", cmdline)
  776. return cmdline
  777. def process_output(self, stdout, stderr, cmdline=None):
  778. stdout, stderr, json_out = super().process_output(
  779. stdout, stderr, cmdline=cmdline
  780. )
  781. if (
  782. self.__cli_output_supported__
  783. and json_out
  784. and isinstance(json_out, str)
  785. and "--out=json" in cmdline
  786. ):
  787. # Sometimes the parsed JSON is just a string, for example:
  788. # OUTPUT: '"The salt master could not be contacted. Is master running?"\n'
  789. # LOADED JSON: 'The salt master could not be contacted. Is master running?'
  790. #
  791. # In this case, we assign the loaded JSON to stdout and reset json_out
  792. stdout = json_out
  793. json_out = None
  794. if self.__cli_output_supported__ and json_out and self._minion_tgt:
  795. try:
  796. json_out = json_out[self._minion_tgt]
  797. except KeyError:
  798. pass
  799. return stdout, stderr, json_out
  800. @attr.s(kw_only=True)
  801. class SaltDaemonFactory(SaltFactory, DaemonFactory):
  802. """
  803. Base factory for salt daemon's
  804. """
  805. display_name = attr.ib(init=False, default=None)
  806. event_listener = attr.ib(repr=False, default=None)
  807. started_at = attr.ib(repr=False, default=None)
  808. def __attrs_post_init__(self):
  809. DaemonFactory.__attrs_post_init__(self)
  810. SaltFactory.__attrs_post_init__(self)
  811. for arg in self.extra_cli_arguments_after_first_start_failure:
  812. if arg in ("-l", "--log-level"):
  813. break
  814. if arg.startswith("--log-level="):
  815. break
  816. else:
  817. self.extra_cli_arguments_after_first_start_failure.append(
  818. "--log-level=debug"
  819. )
  820. @classmethod
  821. def configure(
  822. cls,
  823. factories_manager,
  824. daemon_id,
  825. root_dir=None,
  826. config_defaults=None,
  827. config_overrides=None,
  828. **configure_kwargs
  829. ):
  830. return cls._configure(
  831. factories_manager,
  832. daemon_id,
  833. root_dir=root_dir,
  834. config_defaults=config_defaults,
  835. config_overrides=config_overrides,
  836. **configure_kwargs
  837. )
  838. @classmethod
  839. def _configure(
  840. cls,
  841. factories_manager,
  842. daemon_id,
  843. root_dir=None,
  844. config_defaults=None,
  845. config_overrides=None,
  846. ):
  847. raise NotImplementedError
  848. @classmethod
  849. def verify_config(cls, config):
  850. salt.utils.verify.verify_env(
  851. cls._get_verify_config_entries(config),
  852. salt.utils.user.get_user(),
  853. pki_dir=config.get("pki_dir") or "",
  854. root_dir=config["root_dir"],
  855. )
  856. @classmethod
  857. def _get_verify_config_entries(cls, config):
  858. raise NotImplementedError
  859. @classmethod
  860. def write_config(cls, config):
  861. config_file = config.pop("conf_file")
  862. log.debug(
  863. "Writing to configuration file %s. Configuration:\n%s",
  864. config_file,
  865. pprint.pformat(config),
  866. )
  867. # Write down the computed configuration into the config file
  868. with salt.utils.files.fopen(config_file, "w") as wfh:
  869. salt.utils.yaml.safe_dump(config, wfh, default_flow_style=False)
  870. loaded_config = cls.load_config(config_file, config)
  871. cls.verify_config(loaded_config)
  872. return loaded_config
  873. @classmethod
  874. def load_config(cls, config_file, config):
  875. """
  876. Should return the configuration as the daemon would have loaded after
  877. parsing the CLI
  878. """
  879. raise NotImplementedError
  880. def get_check_events(self):
  881. """
  882. Return a list of tuples in the form of `(master_id, event_tag)` check against to ensure the daemon is running
  883. """
  884. raise NotImplementedError
  885. def run_start_checks(self, started_at, timeout_at):
  886. if not super().run_start_checks(started_at, timeout_at):
  887. return False
  888. if not self.event_listener:
  889. log.debug(
  890. "The 'event_listener' attribute is not set. Not checking events..."
  891. )
  892. return True
  893. check_events = set(self.get_check_events())
  894. if not check_events:
  895. log.debug("No events to listen to for %s", self)
  896. return True
  897. checks_start_time = time.time()
  898. while time.time() <= timeout_at:
  899. if not self.is_running():
  900. raise FactoryNotStarted("{} is no longer running".format(self))
  901. if not check_events:
  902. break
  903. check_events -= self.event_listener.get_events(
  904. check_events, after_time=started_at
  905. )
  906. if check_events:
  907. time.sleep(0.5)
  908. else:
  909. log.error(
  910. "Failed to check events after %1.2f seconds for %s",
  911. time.time() - checks_start_time,
  912. self,
  913. )
  914. return False
  915. log.debug(
  916. "Successfuly checked for all events(%s) for %s",
  917. set(self.get_check_events()),
  918. self,
  919. )
  920. return True
  921. def build_cmdline(self, *args):
  922. _args = []
  923. # Handle the config directory flag
  924. for arg in args:
  925. if arg.startswith("--config-dir="):
  926. break
  927. if arg in ("-c", "--config-dir"):
  928. break
  929. else:
  930. _args.append("--config-dir={}".format(self.config_dir))
  931. # Handle the logging flag
  932. for arg in args:
  933. if arg in ("-l", "--log-level"):
  934. break
  935. if arg.startswith("--log-level="):
  936. break
  937. else:
  938. # Default to being quiet on console output
  939. _args.append("--log-level=quiet")
  940. cmdline = super().build_cmdline(*(_args + list(args)))
  941. if self.python_executable:
  942. if cmdline[0] != self.python_executable:
  943. cmdline.insert(0, self.python_executable)
  944. return cmdline
  945. @attr.s(kw_only=True, slots=True)
  946. class SaltCallCliFactory(SaltCliFactory):
  947. """
  948. salt-call CLI factory
  949. """
  950. __cli_timeout_supported__ = attr.ib(repr=False, init=False, default=True)
  951. def get_minion_tgt(self, minion_tgt=None):
  952. return None
  953. def process_output(self, stdout, stderr, cmdline=None):
  954. # Under salt-call, the minion target is always "local"
  955. self._minion_tgt = "local"
  956. return super().process_output(stdout, stderr, cmdline=cmdline)
  957. @attr.s(kw_only=True, slots=True)
  958. class SaltMinionFactory(SaltDaemonFactory):
  959. @classmethod
  960. def default_config(
  961. cls,
  962. root_dir,
  963. minion_id,
  964. config_defaults=None,
  965. config_overrides=None,
  966. master=None,
  967. ):
  968. if config_defaults is None:
  969. config_defaults = {}
  970. master_id = master_port = None
  971. if master is not None:
  972. master_id = master.id
  973. master_port = master.config["ret_port"]
  974. # Match transport if not set
  975. config_defaults.setdefault("transport", master.config["transport"])
  976. conf_dir = root_dir / "conf"
  977. conf_dir.mkdir(parents=True, exist_ok=True)
  978. conf_file = str(conf_dir / "minion")
  979. _config_defaults = {
  980. "id": minion_id,
  981. "conf_file": conf_file,
  982. "root_dir": str(root_dir),
  983. "interface": "127.0.0.1",
  984. "master": "127.0.0.1",
  985. "master_port": master_port or ports.get_unused_localhost_port(),
  986. "tcp_pub_port": ports.get_unused_localhost_port(),
  987. "tcp_pull_port": ports.get_unused_localhost_port(),
  988. "pidfile": "run/minion.pid",
  989. "pki_dir": "pki",
  990. "cachedir": "cache",
  991. "sock_dir": "run/minion",
  992. "log_file": "logs/minion.log",
  993. "log_level_logfile": "debug",
  994. "loop_interval": 0.05,
  995. "log_fmt_console": "%(asctime)s,%(msecs)03.0f [%(name)-17s:%(lineno)-4d][%(levelname)-8s][%(processName)18s(%(process)d)] %(message)s",
  996. "log_fmt_logfile": "[%(asctime)s,%(msecs)03.0f][%(name)-17s:%(lineno)-4d][%(levelname)-8s][%(processName)18s(%(process)d)] %(message)s",
  997. "pytest-minion": {
  998. "master-id": master_id,
  999. "log": {"prefix": "{}(id={!r})".format(cls.__name__, minion_id)},
  1000. },
  1001. "acceptance_wait_time": 0.5,
  1002. "acceptance_wait_time_max": 5,
  1003. }
  1004. # Merge in the initial default options with the internal _config_defaults
  1005. salt.utils.dictupdate.update(
  1006. config_defaults, _config_defaults, merge_lists=True
  1007. )
  1008. if config_overrides:
  1009. # Merge in the default options with the minion_config_overrides
  1010. salt.utils.dictupdate.update(
  1011. config_defaults, config_overrides, merge_lists=True
  1012. )
  1013. return config_defaults
  1014. @classmethod
  1015. def _configure( # pylint: disable=arguments-differ
  1016. cls,
  1017. factories_manager,
  1018. daemon_id,
  1019. root_dir=None,
  1020. config_defaults=None,
  1021. config_overrides=None,
  1022. master=None,
  1023. ):
  1024. return cls.default_config(
  1025. root_dir,
  1026. daemon_id,
  1027. config_defaults=config_defaults,
  1028. config_overrides=config_overrides,
  1029. master=master,
  1030. )
  1031. @classmethod
  1032. def _get_verify_config_entries(cls, config):
  1033. # verify env to make sure all required directories are created and have the
  1034. # right permissions
  1035. pki_dir = pathlib.Path(config["pki_dir"])
  1036. return [
  1037. str(pki_dir / "minions"),
  1038. str(pki_dir / "minions_pre"),
  1039. str(pki_dir / "minions_rejected"),
  1040. str(pki_dir / "accepted"),
  1041. str(pki_dir / "rejected"),
  1042. str(pki_dir / "pending"),
  1043. str(pathlib.Path(config["log_file"]).parent),
  1044. str(pathlib.Path(config["cachedir"]) / "proc"),
  1045. # config['extension_modules'],
  1046. config["sock_dir"],
  1047. ]
  1048. @classmethod
  1049. def load_config(cls, config_file, config):
  1050. return salt.config.minion_config(
  1051. config_file, minion_id=config["id"], cache_minion_id=True
  1052. )
  1053. def get_script_args(self):
  1054. args = super().get_script_args()
  1055. if sys.platform.startswith("win") is False:
  1056. args.append("--disable-keepalive")
  1057. return args
  1058. def get_check_events(self):
  1059. """
  1060. Return a list of tuples in the form of `(master_id, event_tag)` check against to ensure the daemon is running
  1061. """
  1062. pytest_config = self.config["pytest-{}".format(self.config["__role"])]
  1063. if not pytest_config.get("master-id"):
  1064. log.warning(
  1065. "Will not be able to check for start events for %s since it's missing the 'master-id' key "
  1066. "in the 'pytest-%s' dictionary, or it's value is None.",
  1067. self,
  1068. self.config["__role"],
  1069. )
  1070. else:
  1071. yield pytest_config["master-id"], "salt/{role}/{id}/start".format(
  1072. role=self.config["__role"], id=self.id
  1073. )
  1074. def get_salt_call_cli(
  1075. self, factory_class=SaltCallCliFactory, **factory_class_kwargs
  1076. ):
  1077. """
  1078. Return a `salt-call` CLI process for this minion instance
  1079. """
  1080. script_path = cli_scripts.generate_script(
  1081. self.factories_manager.scripts_dir,
  1082. "salt-call",
  1083. code_dir=self.factories_manager.code_dir,
  1084. inject_coverage=self.factories_manager.inject_coverage,
  1085. inject_sitecustomize=self.factories_manager.inject_sitecustomize,
  1086. )
  1087. return factory_class(
  1088. cli_script_name=script_path,
  1089. config=self.config.copy(),
  1090. **factory_class_kwargs
  1091. )
  1092. @attr.s(kw_only=True)
  1093. class ContainerFactory(Factory):
  1094. image = attr.ib()
  1095. name = attr.ib(default=None)
  1096. check_ports = attr.ib(default=None)
  1097. docker_client = attr.ib(repr=False, default=None)
  1098. container_run_kwargs = attr.ib(repr=False, default=attr.Factory(dict))
  1099. container = attr.ib(init=False, default=None, repr=False)
  1100. start_timeout = attr.ib(repr=False, default=30)
  1101. max_start_attempts = attr.ib(repr=False, default=3)
  1102. before_start_callbacks = attr.ib(repr=False, hash=False, default=attr.Factory(list))
  1103. before_terminate_callbacks = attr.ib(
  1104. repr=False, hash=False, default=attr.Factory(list)
  1105. )
  1106. after_start_callbacks = attr.ib(repr=False, hash=False, default=attr.Factory(list))
  1107. after_terminate_callbacks = attr.ib(
  1108. repr=False, hash=False, default=attr.Factory(list)
  1109. )
  1110. _terminate_result = attr.ib(repr=False, hash=False, init=False, default=None)
  1111. def __attrs_post_init__(self):
  1112. super().__attrs_post_init__()
  1113. if self.name is None:
  1114. self.name = random_string("factories-")
  1115. if self.docker_client is None:
  1116. if not HAS_DOCKER:
  1117. raise RuntimeError("The docker python library was not found installed")
  1118. if not HAS_REQUESTS:
  1119. raise RuntimeError(
  1120. "The requests python library was not found installed"
  1121. )
  1122. self.docker_client = docker.from_env()
  1123. def _format_callback(self, callback, args, kwargs):
  1124. callback_str = "{}(".format(callback.__name__)
  1125. if args:
  1126. callback_str += ", ".join(args)
  1127. if kwargs:
  1128. callback_str += ", ".join(
  1129. ["{}={!r}".format(k, v) for (k, v) in kwargs.items()]
  1130. )
  1131. callback_str += ")"
  1132. return callback_str
  1133. def register_before_start_callback(self, callback, *args, **kwargs):
  1134. self.before_start_callbacks.append((callback, args, kwargs))
  1135. def register_before_terminate_callback(self, callback, *args, **kwargs):
  1136. self.before_terminate_callbacks.append((callback, args, kwargs))
  1137. def register_after_start_callback(self, callback, *args, **kwargs):
  1138. self.after_start_callbacks.append((callback, args, kwargs))
  1139. def register_after_terminate_callback(self, callback, *args, **kwargs):
  1140. self.after_terminate_callbacks.append((callback, args, kwargs))
  1141. def start(self, *command, max_start_attempts=None, start_timeout=None):
  1142. if self.is_running():
  1143. log.warning("%s is already running.", self)
  1144. return True
  1145. connectable = ContainerFactory.client_connectable(self.docker_client)
  1146. if connectable is not True:
  1147. self.terminate()
  1148. raise RuntimeError(connectable)
  1149. self._terminate_result = None
  1150. atexit.register(self.terminate)
  1151. factory_started = False
  1152. for callback, args, kwargs in self.before_start_callbacks:
  1153. try:
  1154. callback(*args, **kwargs)
  1155. except Exception as exc: # pylint: disable=broad-except
  1156. log.info(
  1157. "Exception raised when running %s: %s",
  1158. self._format_callback(callback, args, kwargs),
  1159. exc,
  1160. exc_info=True,
  1161. )
  1162. start_time = time.time()
  1163. start_attempts = max_start_attempts or self.max_start_attempts
  1164. current_attempt = 0
  1165. while current_attempt <= start_attempts:
  1166. current_attempt += 1
  1167. if factory_started:
  1168. break
  1169. log.info(
  1170. "Starting %s. Attempt: %d of %d", self, current_attempt, start_attempts
  1171. )
  1172. current_start_time = time.time()
  1173. start_running_timeout = current_start_time + (
  1174. start_timeout or self.start_timeout
  1175. )
  1176. # Start the container
  1177. self.container = self.docker_client.containers.run(
  1178. self.image,
  1179. name=self.name,
  1180. detach=True,
  1181. stdin_open=True,
  1182. command=list(command) or None,
  1183. **self.container_run_kwargs
  1184. )
  1185. while time.time() <= start_running_timeout:
  1186. # Don't know why, but if self.container wasn't previously in a running
  1187. # state, and now it is, we have to re-set the self.container attribute
  1188. # so that it gives valid status information
  1189. self.container = self.docker_client.containers.get(self.name)
  1190. if self.container.status != "running":
  1191. time.sleep(0.25)
  1192. continue
  1193. self.container = self.docker_client.containers.get(self.name)
  1194. logs = self.container.logs(stdout=True, stderr=True, stream=False)
  1195. if isinstance(logs, bytes):
  1196. stdout = logs.decode()
  1197. stderr = None
  1198. else:
  1199. stdout = logs[0].decode()
  1200. stderr = logs[1].decode()
  1201. log.warning("Running Container Logs:\n%s\n%s", stdout, stderr)
  1202. # If we reached this far it means that we got the running status above, and
  1203. # now that the container has started, run start checks
  1204. try:
  1205. if (
  1206. self.run_container_start_checks(
  1207. current_start_time, start_running_timeout
  1208. )
  1209. is False
  1210. ):
  1211. time.sleep(0.5)
  1212. continue
  1213. except FactoryNotStarted:
  1214. self.terminate()
  1215. break
  1216. log.info(
  1217. "The %s factory is running after %d attempts. Took %1.2f seconds",
  1218. self,
  1219. current_attempt,
  1220. time.time() - start_time,
  1221. )
  1222. factory_started = True
  1223. break
  1224. else:
  1225. # We reached start_running_timeout, re-try
  1226. try:
  1227. self.container.remove(force=True)
  1228. self.container.wait()
  1229. except docker.errors.NotFound:
  1230. pass
  1231. self.container = None
  1232. else:
  1233. # The factory failed to confirm it's running status
  1234. self.terminate()
  1235. if factory_started:
  1236. for callback, args, kwargs in self.after_start_callbacks:
  1237. try:
  1238. callback(*args, **kwargs)
  1239. except Exception as exc: # pylint: disable=broad-except
  1240. log.info(
  1241. "Exception raised when running %s: %s",
  1242. self._format_callback(callback, args, kwargs),
  1243. exc,
  1244. exc_info=True,
  1245. )
  1246. # TODO: Add containers to the processes stats?!
  1247. # if self.factories_manager and self.factories_manager.stats_processes is not None:
  1248. # self.factories_manager.stats_processes[self.get_display_name()] = psutil.Process(
  1249. # self.pid
  1250. # )
  1251. return factory_started
  1252. result = self.terminate()
  1253. raise FactoryNotStarted(
  1254. "The {} factory has failed to confirm running status after {} attempts, which "
  1255. "took {:.2f} seconds({:.2f} seconds each)".format(
  1256. self,
  1257. current_attempt - 1,
  1258. time.time() - start_time,
  1259. start_timeout or self.start_timeout,
  1260. ),
  1261. stdout=result.stdout,
  1262. stderr=result.stderr,
  1263. exitcode=result.exitcode,
  1264. )
  1265. def started(self, *command, max_start_attempts=None, start_timeout=None):
  1266. """
  1267. Start the container and return it's instance so it can be used as a context manager
  1268. """
  1269. self.start(
  1270. *command, max_start_attempts=max_start_attempts, start_timeout=start_timeout
  1271. )
  1272. return self
  1273. def terminate(self):
  1274. if self._terminate_result is not None:
  1275. # The factory is already terminated
  1276. return self._terminate_result
  1277. atexit.unregister(self.terminate)
  1278. for callback, args, kwargs in self.before_terminate_callbacks:
  1279. try:
  1280. callback(*args, **kwargs)
  1281. except Exception as exc: # pylint: disable=broad-except
  1282. log.info(
  1283. "Exception raised when running %s: %s",
  1284. self._format_callback(callback, args, kwargs),
  1285. exc,
  1286. exc_info=True,
  1287. )
  1288. stdout = stderr = None
  1289. try:
  1290. if self.container is not None:
  1291. container = self.docker_client.containers.get(self.name)
  1292. logs = container.logs(stdout=True, stderr=True, stream=False)
  1293. if isinstance(logs, bytes):
  1294. stdout = logs.decode()
  1295. else:
  1296. stdout = logs[0].decode()
  1297. stderr = logs[1].decode()
  1298. log.warning("Stopped Container Logs:\n%s\n%s", stdout, stderr)
  1299. if container.status == "running":
  1300. container.remove(force=True)
  1301. container.wait()
  1302. self.container = None
  1303. except docker.errors.NotFound:
  1304. pass
  1305. finally:
  1306. for callback, args, kwargs in self.after_terminate_callbacks:
  1307. try:
  1308. callback(*args, **kwargs)
  1309. except Exception as exc: # pylint: disable=broad-except
  1310. log.info(
  1311. "Exception raised when running %s: %s",
  1312. self._format_callback(callback, args, kwargs),
  1313. exc,
  1314. exc_info=True,
  1315. )
  1316. self._terminate_result = ProcessResult(exitcode=0, stdout=stdout, stderr=stderr)
  1317. return self._terminate_result
  1318. def get_check_ports(self):
  1319. """
  1320. Return a list of ports to check against to ensure the daemon is running
  1321. """
  1322. return self.check_ports or []
  1323. def is_running(self):
  1324. if self.container is None:
  1325. log.warning("self.container is None")
  1326. return False
  1327. self.container = self.docker_client.containers.get(self.name)
  1328. return self.container.status == "running"
  1329. def run(self, *cmd, **kwargs):
  1330. if len(cmd) == 1:
  1331. cmd = cmd[0]
  1332. log.info("%s is running %r ...", self, cmd)
  1333. # We force dmux to True so that we always get back both stdout and stderr
  1334. container = self.docker_client.containers.get(self.name)
  1335. ret = container.exec_run(cmd, demux=True, **kwargs)
  1336. exitcode = ret.exit_code
  1337. stdout = stderr = None
  1338. if ret.output:
  1339. stdout, stderr = ret.output
  1340. if stdout is not None:
  1341. stdout = stdout.decode()
  1342. if stderr is not None:
  1343. stderr = stderr.decode()
  1344. return ProcessResult(
  1345. exitcode=exitcode, stdout=stdout, stderr=stderr, cmdline=cmd
  1346. )
  1347. @staticmethod
  1348. def client_connectable(docker_client):
  1349. try:
  1350. if not docker_client.ping():
  1351. return "The docker client failed to get a ping response from the docker daemon"
  1352. return True
  1353. except (APIError, RequestsConnectionError, PyWinTypesError) as exc:
  1354. return "The docker client failed to ping the docker server: {}".format(exc)
  1355. def run_container_start_checks(self, started_at, timeout_at):
  1356. checks_start_time = time.time()
  1357. while time.time() <= timeout_at:
  1358. if not self.is_running():
  1359. raise FactoryNotStarted("{} is no longer running".format(self))
  1360. if self._container_start_checks():
  1361. break
  1362. else:
  1363. log.error(
  1364. "Failed to run container start checks after %1.2f seconds",
  1365. time.time() - checks_start_time,
  1366. )
  1367. return False
  1368. check_ports = set(self.get_check_ports())
  1369. if not check_ports:
  1370. return True
  1371. while time.time() <= timeout_at:
  1372. if not self.is_running():
  1373. raise FactoryNotStarted("{} is no longer running".format(self))
  1374. if not check_ports:
  1375. break
  1376. check_ports -= ports.get_connectable_ports(check_ports)
  1377. if check_ports:
  1378. time.sleep(0.5)
  1379. else:
  1380. log.error(
  1381. "Failed to check ports after %1.2f seconds",
  1382. time.time() - checks_start_time,
  1383. )
  1384. return False
  1385. return True
  1386. def _container_start_checks(self):
  1387. return True
  1388. def __enter__(self):
  1389. if not self.is_running():
  1390. raise RuntimeError(
  1391. "Factory not yet started. Perhaps you're after something like:\n\n"
  1392. "with {}.started() as factory:\n"
  1393. " yield factory".format(self.__class__.__name__)
  1394. )
  1395. return self
  1396. def __exit__(self, *exc):
  1397. return self.terminate()
  1398. @attr.s(kw_only=True)
  1399. class SaltDaemonContainerFactory(SaltDaemonFactory, ContainerFactory):
  1400. def __attrs_post_init__(self):
  1401. self.daemon_started = self.daemon_starting = False
  1402. if self.python_executable is None:
  1403. # Default to whatever is the default python in the container
  1404. self.python_executable = "python"
  1405. SaltDaemonFactory.__attrs_post_init__(self)
  1406. ContainerFactory.__attrs_post_init__(self)
  1407. # There are some volumes which NEED to exist on the container so
  1408. # that configs are in the right place and also our custom salt
  1409. # plugins along with the custom scripts to start the daemons.
  1410. root_dir = os.path.dirname(self.config["root_dir"])
  1411. config_dir = str(self.config_dir)
  1412. scripts_dir = str(self.factories_manager.scripts_dir)
  1413. volumes = {
  1414. root_dir: {"bind": root_dir, "mode": "z"},
  1415. scripts_dir: {"bind": scripts_dir, "mode": "z"},
  1416. config_dir: {"bind": self.config_dir, "mode": "z"},
  1417. str(CODE_ROOT_DIR): {"bind": str(CODE_ROOT_DIR), "mode": "z"},
  1418. }
  1419. if "volumes" not in self.container_run_kwargs:
  1420. self.container_run_kwargs["volumes"] = {}
  1421. self.container_run_kwargs["volumes"].update(volumes)
  1422. self.container_run_kwargs.setdefault("hostname", self.name)
  1423. self.container_run_kwargs.setdefault("auto_remove", True)
  1424. def build_cmdline(self, *args):
  1425. return ["docker", "exec", "-i", self.name] + super().build_cmdline(*args)
  1426. def start(self, *extra_cli_arguments, max_start_attempts=None, start_timeout=None):
  1427. # Start the container
  1428. ContainerFactory.start(
  1429. self, max_start_attempts=max_start_attempts, start_timeout=start_timeout
  1430. )
  1431. self.daemon_starting = True
  1432. # Now that the container is up, let's start the daemon
  1433. self.daemon_started = SaltDaemonFactory.start(
  1434. self,
  1435. *extra_cli_arguments,
  1436. max_start_attempts=max_start_attempts,
  1437. start_timeout=start_timeout
  1438. )
  1439. return self.daemon_started
  1440. def terminate(self):
  1441. self.daemon_started = self.daemon_starting = False
  1442. ret = SaltDaemonFactory.terminate(self)
  1443. ContainerFactory.terminate(self)
  1444. return ret
  1445. def is_running(self):
  1446. running = ContainerFactory.is_running(self)
  1447. if running is False:
  1448. return running
  1449. if self.daemon_starting or self.daemon_started:
  1450. return SaltDaemonFactory.is_running(self)
  1451. return running
  1452. def get_check_events(self):
  1453. """
  1454. Return a list of tuples in the form of `(master_id, event_tag)` check against to ensure the daemon is running
  1455. """
  1456. raise NotImplementedError
  1457. @attr.s(kw_only=True, slots=True)
  1458. class SaltMinionContainerFactory(SaltDaemonContainerFactory, SaltMinionFactory):
  1459. """
  1460. Salt minion daemon implementation running in a docker container
  1461. """
  1462. def get_check_events(self):
  1463. """
  1464. Return a list of tuples in the form of `(master_id, event_tag)` check against to ensure the daemon is running
  1465. """
  1466. return SaltMinionFactory.get_check_events(self)
  1467. def run_start_checks(self, started_at, timeout_at):
  1468. return SaltMinionFactory.run_start_checks(self, started_at, timeout_at)
  1469. @attr.s(kw_only=True, slots=True)
  1470. class SshdDaemonFactory(DaemonFactory):
  1471. config_dir = attr.ib()
  1472. listen_address = attr.ib(default=None)
  1473. listen_port = attr.ib(default=None)
  1474. authorized_keys = attr.ib(default=None)
  1475. sshd_config_dict = attr.ib(default=None, repr=False)
  1476. client_key = attr.ib(default=None, init=False, repr=False)
  1477. sshd_config = attr.ib(default=None, init=False)
  1478. def __attrs_post_init__(self):
  1479. if self.authorized_keys is None:
  1480. self.authorized_keys = []
  1481. if self.sshd_config_dict is None:
  1482. self.sshd_config_dict = {}
  1483. if self.listen_address is None:
  1484. self.listen_address = "127.0.0.1"
  1485. if self.listen_port is None:
  1486. self.listen_port = ports.get_unused_localhost_port()
  1487. self.check_ports = [self.listen_port]
  1488. if isinstance(self.config_dir, str):
  1489. self.config_dir = pathlib.Path(self.config_dir)
  1490. elif not isinstance(self.config_dir, pathlib.Path):
  1491. # A py local path?
  1492. self.config_dir = pathlib.Path(self.config_dir.strpath)
  1493. self.config_dir.chmod(0o0700)
  1494. authorized_keys_file = self.config_dir / "authorized_keys"
  1495. # Let's generate the client key
  1496. self.client_key = self._generate_client_ecdsa_key()
  1497. with open("{}.pub".format(self.client_key)) as rfh:
  1498. pubkey = rfh.read().strip()
  1499. log.debug("SSH client pub key: %r", pubkey)
  1500. self.authorized_keys.append(pubkey)
  1501. # Write the authorized pub keys to file
  1502. with open(str(authorized_keys_file), "w") as wfh:
  1503. wfh.write("\n".join(self.authorized_keys))
  1504. authorized_keys_file.chmod(0o0600)
  1505. with open(str(authorized_keys_file)) as rfh:
  1506. log.debug("AuthorizedKeysFile contents:\n%s", rfh.read())
  1507. _default_config = {
  1508. "ListenAddress": self.listen_address,
  1509. "PermitRootLogin": "no",
  1510. "ChallengeResponseAuthentication": "no",
  1511. "PasswordAuthentication": "no",
  1512. "PubkeyAuthentication": "yes",
  1513. "PrintMotd": "no",
  1514. "PidFile": self.config_dir / "sshd.pid",
  1515. "AuthorizedKeysFile": authorized_keys_file,
  1516. }
  1517. if self.sshd_config_dict:
  1518. _default_config.update(self.sshd_config_dict)
  1519. self.sshd_config = _default_config
  1520. self._write_config()
  1521. super().__attrs_post_init__()
  1522. def get_base_script_args(self):
  1523. """
  1524. Returns any additional arguments to pass to the CLI script
  1525. """
  1526. return [
  1527. "-D",
  1528. "-e",
  1529. "-f",
  1530. str(self.config_dir / "sshd_config"),
  1531. "-p",
  1532. str(self.listen_port),
  1533. ]
  1534. def _write_config(self):
  1535. sshd_config_file = self.config_dir / "sshd_config"
  1536. if not sshd_config_file.exists():
  1537. # Let's write a default config file
  1538. config_lines = []
  1539. for key, value in self.sshd_config.items():
  1540. if isinstance(value, list):
  1541. for item in value:
  1542. config_lines.append("{} {}\n".format(key, item))
  1543. continue
  1544. config_lines.append("{} {}\n".format(key, value))
  1545. # Let's generate the host keys
  1546. self._generate_server_dsa_key()
  1547. self._generate_server_ecdsa_key()
  1548. self._generate_server_ed25519_key()
  1549. for host_key in pathlib.Path(self.config_dir).glob("ssh_host_*_key"):
  1550. config_lines.append("HostKey {}\n".format(host_key))
  1551. with open(str(sshd_config_file), "w") as wfh:
  1552. wfh.write("".join(sorted(config_lines)))
  1553. sshd_config_file.chmod(0o0600)
  1554. with open(str(sshd_config_file)) as wfh:
  1555. log.debug(
  1556. "Wrote to configuration file %s. Configuration:\n%s",
  1557. sshd_config_file,
  1558. wfh.read(),
  1559. )
  1560. def _generate_client_ecdsa_key(self):
  1561. key_filename = "client_key"
  1562. key_path_prv = self.config_dir / key_filename
  1563. key_path_pub = self.config_dir / "{}.pub".format(key_filename)
  1564. if key_path_prv.exists() and key_path_pub.exists():
  1565. return key_path_prv
  1566. self._ssh_keygen(key_filename, "ecdsa", "521")
  1567. for key_path in (key_path_prv, key_path_pub):
  1568. key_path.chmod(0o0400)
  1569. return key_path_prv
  1570. def _generate_server_dsa_key(self):
  1571. key_filename = "ssh_host_dsa_key"
  1572. key_path_prv = self.config_dir / key_filename
  1573. key_path_pub = self.config_dir / "{}.pub".format(key_filename)
  1574. if key_path_prv.exists() and key_path_pub.exists():
  1575. return key_path_prv
  1576. self._ssh_keygen(key_filename, "dsa", "1024")
  1577. for key_path in (key_path_prv, key_path_pub):
  1578. key_path.chmod(0o0400)
  1579. return key_path_prv
  1580. def _generate_server_ecdsa_key(self):
  1581. key_filename = "ssh_host_ecdsa_key"
  1582. key_path_prv = self.config_dir / key_filename
  1583. key_path_pub = self.config_dir / "{}.pub".format(key_filename)
  1584. if key_path_prv.exists() and key_path_pub.exists():
  1585. return key_path_prv
  1586. self._ssh_keygen(key_filename, "ecdsa", "521")
  1587. for key_path in (key_path_prv, key_path_pub):
  1588. key_path.chmod(0o0400)
  1589. return key_path_prv
  1590. def _generate_server_ed25519_key(self):
  1591. key_filename = "ssh_host_ed25519_key"
  1592. key_path_prv = self.config_dir / key_filename
  1593. key_path_pub = self.config_dir / "{}.pub".format(key_filename)
  1594. if key_path_prv.exists() and key_path_pub.exists():
  1595. return key_path_prv
  1596. self._ssh_keygen(key_filename, "ed25519", "521")
  1597. for key_path in (key_path_prv, key_path_pub):
  1598. key_path.chmod(0o0400)
  1599. return key_path_prv
  1600. def _ssh_keygen(self, key_filename, key_type, bits, comment=None):
  1601. try:
  1602. ssh_keygen = self._ssh_keygen_path
  1603. except AttributeError:
  1604. ssh_keygen = self._ssh_keygen_path = shutil.which("ssh-keygen")
  1605. if comment is None:
  1606. comment = "{user}@{host}-{date}".format(
  1607. user=salt.utils.user.get_user(),
  1608. host=socket.gethostname(),
  1609. date=datetime.utcnow().strftime("%Y-%m-%d"),
  1610. )
  1611. cmdline = [
  1612. ssh_keygen,
  1613. "-t",
  1614. key_type,
  1615. "-b",
  1616. bits,
  1617. "-C",
  1618. comment,
  1619. "-f",
  1620. key_filename,
  1621. "-P",
  1622. "",
  1623. ]
  1624. try:
  1625. subprocess.run(
  1626. cmdline,
  1627. cwd=str(self.config_dir),
  1628. check=True,
  1629. universal_newlines=True,
  1630. stdout=subprocess.PIPE,
  1631. stderr=subprocess.PIPE,
  1632. )
  1633. except subprocess.CalledProcessError as exc:
  1634. raise FactoryNotStarted(
  1635. "Failed to generate ssh key.",
  1636. cmdline=exc.args,
  1637. stdout=exc.stdout,
  1638. stderr=exc.stderr,
  1639. exitcode=exc.returncode,
  1640. )
  1641. @attr.s(kw_only=True, slots=True)
  1642. class SaltVirtMinionContainerFactory(SaltMinionContainerFactory):
  1643. host_uuid = attr.ib(default=attr.Factory(uuid.uuid4))
  1644. ssh_port = attr.ib(
  1645. default=attr.Factory(ports.get_unused_localhost_port), repr=False
  1646. )
  1647. sshd_port = attr.ib(default=attr.Factory(ports.get_unused_localhost_port))
  1648. libvirt_tcp_port = attr.ib(
  1649. default=attr.Factory(ports.get_unused_localhost_port), repr=False
  1650. )
  1651. libvirt_tls_port = attr.ib(
  1652. default=attr.Factory(ports.get_unused_localhost_port), repr=False
  1653. )
  1654. uri = attr.ib(init=False)
  1655. ssh_uri = attr.ib(init=False)
  1656. tcp_uri = attr.ib(init=False)
  1657. tls_uri = attr.ib(init=False)
  1658. def __attrs_post_init__(self):
  1659. self.uri = "localhost:{}".format(self.sshd_port)
  1660. self.ssh_uri = "qemu+ssh://{}/system".format(self.uri)
  1661. self.tcp_uri = "qemu+tcp://localhost:{}/system".format(self.libvirt_tcp_port)
  1662. self.tls_uri = "qemu+tls://localhost:{}/system".format(self.libvirt_tls_port)
  1663. if self.check_ports is None:
  1664. self.check_ports = []
  1665. self.check_ports.extend(
  1666. [self.sshd_port, self.libvirt_tcp_port, self.libvirt_tls_port]
  1667. )
  1668. if "environment" not in self.container_run_kwargs:
  1669. self.container_run_kwargs["environment"] = {}
  1670. self.container_run_kwargs["environment"].update(
  1671. {
  1672. "SSH_PORT": str(self.ssh_port),
  1673. "SSHD_PORT": str(self.sshd_port),
  1674. "LIBVIRT_TCP_PORT": str(self.libvirt_tcp_port),
  1675. "LIBVIRT_TLS_PORT": str(self.libvirt_tls_port),
  1676. "NO_START_MINION": "1",
  1677. "HOST_UUID": self.host_uuid,
  1678. }
  1679. )
  1680. if "ports" not in self.container_run_kwargs:
  1681. self.container_run_kwargs["ports"] = {}
  1682. self.container_run_kwargs["ports"].update(
  1683. {
  1684. "{}/tcp".format(self.ssh_port): self.ssh_port,
  1685. "{}/tcp".format(self.sshd_port): self.sshd_port,
  1686. "{}/tcp".format(self.libvirt_tcp_port): self.libvirt_tcp_port,
  1687. "{}/tcp".format(self.libvirt_tls_port): self.libvirt_tls_port,
  1688. }
  1689. )
  1690. if "volumes" not in self.container_run_kwargs:
  1691. self.container_run_kwargs["volumes"] = {}
  1692. self.container_run_kwargs["volumes"].update(
  1693. {
  1694. RUNTIME_VARS.CODE_DIR: {"bind": "/salt", "mode": "z"},
  1695. RUNTIME_VARS.CODE_DIR: {"bind": RUNTIME_VARS.CODE_DIR, "mode": "z"},
  1696. }
  1697. )
  1698. self.container_run_kwargs["working_dir"] = RUNTIME_VARS.CODE_DIR
  1699. self.container_run_kwargs["network_mode"] = "host"
  1700. self.container_run_kwargs["cap_add"] = ["ALL"]
  1701. self.container_run_kwargs["privileged"] = True
  1702. super().__attrs_post_init__()
  1703. self.python_executable = "python3"
  1704. def _container_start_checks(self):
  1705. # Once we're able to ls the salt-minion script it means the container
  1706. # has salt installed
  1707. ret = self.run("ls", "-lah", self.get_script_path())
  1708. if ret.exitcode == 0:
  1709. return True
  1710. time.sleep(1)
  1711. return False
  1712. @attr.s(kw_only=True, slots=True, hash=True)
  1713. class LogServer:
  1714. log_host = attr.ib(default="0.0.0.0")
  1715. log_port = attr.ib(default=attr.Factory(ports.get_unused_localhost_port))
  1716. log_level = attr.ib()
  1717. running_event = attr.ib(init=False, repr=False, hash=False)
  1718. process_queue_thread = attr.ib(init=False, repr=False, hash=False)
  1719. def start(self):
  1720. log.info("Starting log server at %s:%d", self.log_host, self.log_port)
  1721. self.running_event = threading.Event()
  1722. self.process_queue_thread = threading.Thread(target=self.process_logs)
  1723. self.process_queue_thread.start()
  1724. # Wait for the thread to start
  1725. if self.running_event.wait(5) is not True:
  1726. self.running_event.clear()
  1727. raise RuntimeError("Failed to start the log server")
  1728. log.info("Log Server Started")
  1729. def stop(self):
  1730. log.info("Stopping the logging server")
  1731. address = "tcp://{}:{}".format(self.log_host, self.log_port)
  1732. log.debug("Stopping the multiprocessing logging queue listener at %s", address)
  1733. context = zmq.Context()
  1734. sender = context.socket(zmq.PUSH)
  1735. sender.connect(address)
  1736. try:
  1737. sender.send(msgpack.dumps(None))
  1738. log.info("Sent sentinel to trigger log server shutdown")
  1739. finally:
  1740. sender.close(1000)
  1741. context.term()
  1742. # Clear the running even, the log process thread know it should stop
  1743. self.running_event.clear()
  1744. log.info("Joining the logging server process thread")
  1745. self.process_queue_thread.join(7)
  1746. if not self.process_queue_thread.is_alive():
  1747. log.debug("Stopped the log server")
  1748. else:
  1749. log.warning(
  1750. "The logging server thread is still running. Waiting a little longer..."
  1751. )
  1752. self.process_queue_thread.join(5)
  1753. if not self.process_queue_thread.is_alive():
  1754. log.debug("Stopped the log server")
  1755. else:
  1756. log.warning("The logging server thread is still running...")
  1757. def process_logs(self):
  1758. address = "tcp://{}:{}".format(self.log_host, self.log_port)
  1759. context = zmq.Context()
  1760. puller = context.socket(zmq.PULL)
  1761. exit_timeout_seconds = 5
  1762. exit_timeout = None
  1763. try:
  1764. puller.bind(address)
  1765. except zmq.ZMQError as exc:
  1766. log.exception("Unable to bind to puller at %s", address)
  1767. return
  1768. try:
  1769. self.running_event.set()
  1770. while True:
  1771. if not self.running_event.is_set():
  1772. if exit_timeout is None:
  1773. log.debug(
  1774. "Waiting %d seconds to process any remaning log messages "
  1775. "before exiting...",
  1776. exit_timeout_seconds,
  1777. )
  1778. exit_timeout = time.time() + exit_timeout_seconds
  1779. if time.time() >= exit_timeout:
  1780. log.debug(
  1781. "Unable to process remaining log messages in time. "
  1782. "Exiting anyway."
  1783. )
  1784. break
  1785. try:
  1786. try:
  1787. msg = puller.recv(flags=zmq.NOBLOCK)
  1788. except zmq.ZMQError as exc:
  1789. if exc.errno != zmq.EAGAIN:
  1790. raise
  1791. time.sleep(0.25)
  1792. continue
  1793. if msgpack.version >= (0, 5, 2):
  1794. record_dict = msgpack.loads(msg, raw=False)
  1795. else:
  1796. record_dict = msgpack.loads(msg, encoding="utf-8")
  1797. if record_dict is None:
  1798. # A sentinel to stop processing the queue
  1799. log.info("Received the sentinel to shutdown the log server")
  1800. break
  1801. try:
  1802. record_dict["message"]
  1803. except KeyError:
  1804. # This log record was msgpack dumped from Py2
  1805. for key, value in record_dict.copy().items():
  1806. skip_update = True
  1807. if isinstance(value, bytes):
  1808. value = value.decode("utf-8")
  1809. skip_update = False
  1810. if isinstance(key, bytes):
  1811. key = key.decode("utf-8")
  1812. skip_update = False
  1813. if skip_update is False:
  1814. record_dict[key] = value
  1815. # Just log everything, filtering will happen on the main process
  1816. # logging handlers
  1817. record = logging.makeLogRecord(record_dict)
  1818. logger = logging.getLogger(record.name)
  1819. logger.handle(record)
  1820. except (EOFError, KeyboardInterrupt, SystemExit) as exc:
  1821. break
  1822. except Exception as exc: # pylint: disable=broad-except
  1823. log.warning(
  1824. "An exception occurred in the log server processing queue thread: %s",
  1825. exc,
  1826. exc_info=True,
  1827. )
  1828. finally:
  1829. puller.close(1)
  1830. context.term()