helpers.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
"""
tests.support.pytest.helpers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

PyTest helper functions
"""
  6. import logging
  7. import os
  8. import pathlib
  9. import shutil
  10. import tempfile
  11. import textwrap
  12. import types
  13. import warnings
  14. from contextlib import contextmanager
  15. import attr
  16. import pytest
  17. import salt.utils.platform
  18. import salt.utils.pycrypto
  19. from saltfactories.utils import random_string
  20. from tests.support.pytest.loader import LoaderModuleMock
  21. from tests.support.runtests import RUNTIME_VARS
  22. from tests.support.sminion import create_sminion
  23. log = logging.getLogger(__name__)
  24. if not RUNTIME_VARS.PYTEST_SESSION:
  25. # XXX: Remove this try/except once we fully switch to pytest
  26. class FakePyTestHelpersNamespace:
  27. __slots__ = ()
  28. def register(self, func):
  29. return func
  30. # Patch pytest so it all works under runtests.py
  31. pytest.helpers = FakePyTestHelpersNamespace()
  32. @pytest.helpers.register
  33. @contextmanager
  34. def temp_directory(name=None):
  35. """
  36. This helper creates a temporary directory. It should be used as a context manager
  37. which returns the temporary directory path, and, once out of context, deletes it.
  38. Can be directly imported and used, or, it can be used as a pytest helper function if
  39. ``pytest-helpers-namespace`` is installed.
  40. .. code-block:: python
  41. import os
  42. import pytest
  43. def test_blah():
  44. with pytest.helpers.temp_directory() as tpath:
  45. print(tpath)
  46. assert os.path.exists(tpath)
  47. assert not os.path.exists(tpath)
  48. """
  49. try:
  50. if name is not None:
  51. directory_path = os.path.join(RUNTIME_VARS.TMP, name)
  52. else:
  53. directory_path = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  54. if not os.path.isdir(directory_path):
  55. os.makedirs(directory_path)
  56. yield directory_path
  57. finally:
  58. shutil.rmtree(directory_path, ignore_errors=True)
  59. @pytest.helpers.register
  60. @contextmanager
  61. def temp_file(name=None, contents=None, directory=None, strip_first_newline=True):
  62. """
  63. This helper creates a temporary file. It should be used as a context manager
  64. which returns the temporary file path, and, once out of context, deletes it.
  65. Can be directly imported and used, or, it can be used as a pytest helper function if
  66. ``pytest-helpers-namespace`` is installed.
  67. .. code-block:: python
  68. import os
  69. import pytest
  70. def test_blah():
  71. with pytest.helpers.temp_file("blah.txt") as tpath:
  72. print(tpath)
  73. assert os.path.exists(tpath)
  74. assert not os.path.exists(tpath)
  75. Args:
  76. name(str):
  77. The temporary file name
  78. contents(str):
  79. The contents of the temporary file
  80. directory(str):
  81. The directory where to create the temporary file. If ``None``, then ``RUNTIME_VARS.TMP``
  82. will be used.
  83. strip_first_newline(bool):
  84. Wether to strip the initial first new line char or not.
  85. """
  86. try:
  87. if directory is None:
  88. directory = RUNTIME_VARS.TMP
  89. if not isinstance(directory, pathlib.Path):
  90. directory = pathlib.Path(str(directory))
  91. if name is not None:
  92. file_path = directory / name
  93. else:
  94. handle, file_path = tempfile.mkstemp(dir=str(directory))
  95. os.close(handle)
  96. file_path = pathlib.Path(file_path)
  97. file_directory = file_path.parent
  98. if not file_directory.is_dir():
  99. file_directory.mkdir(parents=True)
  100. if contents is not None:
  101. if contents:
  102. if contents.startswith("\n") and strip_first_newline:
  103. contents = contents[1:]
  104. file_contents = textwrap.dedent(contents)
  105. else:
  106. file_contents = contents
  107. file_path.write_text(file_contents)
  108. log_contents = "{0} Contents of {1}\n{2}\n{3} Contents of {1}".format(
  109. ">" * 6, file_path, file_contents, "<" * 6
  110. )
  111. log.debug("Created temp file: %s\n%s", file_path, log_contents)
  112. else:
  113. log.debug("Touched temp file: %s", file_path)
  114. yield file_path
  115. finally:
  116. if file_path.exists():
  117. file_path.unlink()
  118. log.debug("Deleted temp file: %s", file_path)
  119. @pytest.helpers.register
  120. def temp_state_file(name, contents, saltenv="base", strip_first_newline=True):
  121. """
  122. This helper creates a temporary state file. It should be used as a context manager
  123. which returns the temporary state file path, and, once out of context, deletes it.
  124. Can be directly imported and used, or, it can be used as a pytest helper function if
  125. ``pytest-helpers-namespace`` is installed.
  126. .. code-block:: python
  127. import os
  128. import pytest
  129. def test_blah():
  130. with pytest.helpers.temp_state_file("blah.sls") as tpath:
  131. print(tpath)
  132. assert os.path.exists(tpath)
  133. assert not os.path.exists(tpath)
  134. Depending on the saltenv, it will be created under ``RUNTIME_VARS.TMP_STATE_TREE`` or
  135. ``RUNTIME_VARS.TMP_PRODENV_STATE_TREE``.
  136. Args:
  137. name(str):
  138. The temporary state file name
  139. contents(str):
  140. The contents of the temporary file
  141. saltenv(str):
  142. The salt env to use. Either ``base`` or ``prod``
  143. strip_first_newline(bool):
  144. Wether to strip the initial first new line char or not.
  145. """
  146. if saltenv == "base":
  147. directory = RUNTIME_VARS.TMP_BASEENV_STATE_TREE
  148. elif saltenv == "prod":
  149. directory = RUNTIME_VARS.TMP_PRODENV_STATE_TREE
  150. else:
  151. raise RuntimeError(
  152. '"saltenv" can only be "base" or "prod", not "{}"'.format(saltenv)
  153. )
  154. return temp_file(
  155. name, contents, directory=directory, strip_first_newline=strip_first_newline
  156. )
  157. @pytest.helpers.register
  158. def temp_pillar_file(name, contents, saltenv="base", strip_first_newline=True):
  159. """
  160. This helper creates a temporary pillar file. It should be used as a context manager
  161. which returns the temporary pillar file path, and, once out of context, deletes it.
  162. Can be directly imported and used, or, it can be used as a pytest helper function if
  163. ``pytest-helpers-namespace`` is installed.
  164. .. code-block:: python
  165. import os
  166. import pytest
  167. def test_blah():
  168. with pytest.helpers.temp_pillar_file("blah.sls") as tpath:
  169. print(tpath)
  170. assert os.path.exists(tpath)
  171. assert not os.path.exists(tpath)
  172. Depending on the saltenv, it will be created under ``RUNTIME_VARS.TMP_PILLAR_TREE`` or
  173. ``RUNTIME_VARS.TMP_PRODENV_PILLAR_TREE``.
  174. Args:
  175. name(str):
  176. The temporary state file name
  177. contents(str):
  178. The contents of the temporary file
  179. saltenv(str):
  180. The salt env to use. Either ``base`` or ``prod``
  181. strip_first_newline(bool):
  182. Wether to strip the initial first new line char or not.
  183. """
  184. if saltenv == "base":
  185. directory = RUNTIME_VARS.TMP_BASEENV_PILLAR_TREE
  186. elif saltenv == "prod":
  187. directory = RUNTIME_VARS.TMP_PRODENV_PILLAR_TREE
  188. else:
  189. raise RuntimeError(
  190. '"saltenv" can only be "base" or "prod", not "{}"'.format(saltenv)
  191. )
  192. return temp_file(
  193. name, contents, directory=directory, strip_first_newline=strip_first_newline
  194. )
  195. @pytest.helpers.register
  196. def loader_mock(*args, **kwargs):
  197. if len(args) > 1:
  198. loader_modules = args[1]
  199. warnings.warn(
  200. "'request' is not longer an accepted argument to 'loader_mock()'. Please stop passing it.",
  201. category=DeprecationWarning,
  202. )
  203. else:
  204. loader_modules = args[0]
  205. return LoaderModuleMock(loader_modules, **kwargs)
  206. @pytest.helpers.register
  207. def salt_loader_module_functions(module):
  208. if not isinstance(module, types.ModuleType):
  209. raise RuntimeError(
  210. "The passed 'module' argument must be an imported "
  211. "imported module, not {}".format(type(module))
  212. )
  213. funcs = {}
  214. func_alias = getattr(module, "__func_alias__", {})
  215. virtualname = getattr(module, "__virtualname__")
  216. for name in dir(module):
  217. if name.startswith("_"):
  218. continue
  219. func = getattr(module, name)
  220. if getattr(func, "__module__", None) != module.__name__:
  221. # Not eve defined on the module being processed, carry on
  222. continue
  223. if not isinstance(func, types.FunctionType):
  224. # Not a function? carry on
  225. continue
  226. funcname = func_alias.get(func.__name__) or func.__name__
  227. funcs["{}.{}".format(virtualname, funcname)] = func
  228. return funcs
  229. @pytest.helpers.register
  230. def remove_stale_minion_key(master, minion_id):
  231. key_path = os.path.join(master.config["pki_dir"], "minions", minion_id)
  232. if os.path.exists(key_path):
  233. os.unlink(key_path)
  234. else:
  235. log.debug("The minion(id=%r) key was not found at %s", minion_id, key_path)
  236. @attr.s(kw_only=True, slots=True)
  237. class TestAccount:
  238. sminion = attr.ib(default=None, repr=False)
  239. username = attr.ib(default=None)
  240. password = attr.ib(default=None)
  241. hashed_password = attr.ib(default=None, repr=False)
  242. groups = attr.ib(default=None)
  243. def __attrs_post_init__(self):
  244. if self.sminion is None:
  245. self.sminion = create_sminion()
  246. if self.username is None:
  247. self.username = random_string("account-", uppercase=False)
  248. if self.password is None:
  249. self.password = self.username
  250. if self.hashed_password is None:
  251. self.hashed_password = salt.utils.pycrypto.gen_hash(password=self.password)
  252. def __enter__(self):
  253. log.debug("Creating system account: %s", self)
  254. ret = self.sminion.functions.user.add(self.username)
  255. assert ret
  256. ret = self.sminion.functions.shadow.set_password(
  257. self.username,
  258. self.password if salt.utils.platform.is_darwin() else self.hashed_password,
  259. )
  260. assert ret
  261. assert self.username in self.sminion.functions.user.list_users()
  262. log.debug("Created system account: %s", self)
  263. # Run tests
  264. return self
  265. def __exit__(self, *args):
  266. self.sminion.functions.user.delete(self.username, remove=True, force=True)
  267. log.debug("Deleted system account: %s", self.username)
  268. @pytest.helpers.register
  269. @contextmanager
  270. def create_account(username=None, password=None, hashed_password=None, sminion=None):
  271. with TestAccount(
  272. sminion=sminion,
  273. username=username,
  274. password=password,
  275. hashed_password=hashed_password,
  276. ) as account:
  277. yield account
  278. # Only allow star importing the functions defined in this module
  279. __all__ = [
  280. name
  281. for (name, func) in locals().items()
  282. if getattr(func, "__module__", None) == __name__
  283. ]