123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288 |
- """
- tests.support.pytest.helpers
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- PyTest helpers functions
- """
- import logging
- import os
- import shutil
- import tempfile
- import textwrap
- import types
- import warnings
- from contextlib import contextmanager
- import pytest
- import salt.utils.files
- from tests.support.pytest.loader import LoaderModuleMock
- from tests.support.runtests import RUNTIME_VARS
- log = logging.getLogger(__name__)
- if not RUNTIME_VARS.PYTEST_SESSION:
- # XXX: Remove this try/except once we fully switch to pytest
- class FakePyTestHelpersNamespace:
- __slots__ = ()
- def register(self, func):
- return func
- # Patch pytest so it all works under runtests.py
- pytest.helpers = FakePyTestHelpersNamespace()
- @pytest.helpers.register
- @contextmanager
- def temp_directory(name=None):
- """
- This helper creates a temporary directory. It should be used as a context manager
- which returns the temporary directory path, and, once out of context, deletes it.
- Can be directly imported and used, or, it can be used as a pytest helper function if
- ``pytest-helpers-namespace`` is installed.
- .. code-block:: python
- import os
- import pytest
- def test_blah():
- with pytest.helpers.temp_directory() as tpath:
- print(tpath)
- assert os.path.exists(tpath)
- assert not os.path.exists(tpath)
- """
- try:
- if name is not None:
- directory_path = os.path.join(RUNTIME_VARS.TMP, name)
- else:
- directory_path = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
- if not os.path.isdir(directory_path):
- os.makedirs(directory_path)
- yield directory_path
- finally:
- shutil.rmtree(directory_path, ignore_errors=True)
- @pytest.helpers.register
- @contextmanager
- def temp_file(name=None, contents=None, directory=None, strip_first_newline=True):
- """
- This helper creates a temporary file. It should be used as a context manager
- which returns the temporary file path, and, once out of context, deletes it.
- Can be directly imported and used, or, it can be used as a pytest helper function if
- ``pytest-helpers-namespace`` is installed.
- .. code-block:: python
- import os
- import pytest
- def test_blah():
- with pytest.helpers.temp_file("blah.txt") as tpath:
- print(tpath)
- assert os.path.exists(tpath)
- assert not os.path.exists(tpath)
- Args:
- name(str):
- The temporary file name
- contents(str):
- The contents of the temporary file
- directory(str):
- The directory where to create the temporary file. If ``None``, then ``RUNTIME_VARS.TMP``
- will be used.
- strip_first_newline(bool):
- Wether to strip the initial first new line char or not.
- """
- try:
- if directory is None:
- directory = RUNTIME_VARS.TMP
- if name is not None:
- file_path = os.path.join(directory, name)
- else:
- handle, file_path = tempfile.mkstemp(dir=directory)
- os.close(handle)
- file_directory = os.path.dirname(file_path)
- if not os.path.isdir(file_directory):
- os.makedirs(file_directory)
- if contents is not None:
- if contents:
- if contents.startswith("\n") and strip_first_newline:
- contents = contents[1:]
- file_contents = textwrap.dedent(contents)
- else:
- file_contents = contents
- with salt.utils.files.fopen(file_path, "w") as wfh:
- wfh.write(file_contents)
- yield file_path
- finally:
- try:
- os.unlink(file_path)
- except OSError:
- # Already deleted
- pass
- @pytest.helpers.register
- def temp_state_file(name, contents, saltenv="base", strip_first_newline=True):
- """
- This helper creates a temporary state file. It should be used as a context manager
- which returns the temporary state file path, and, once out of context, deletes it.
- Can be directly imported and used, or, it can be used as a pytest helper function if
- ``pytest-helpers-namespace`` is installed.
- .. code-block:: python
- import os
- import pytest
- def test_blah():
- with pytest.helpers.temp_state_file("blah.sls") as tpath:
- print(tpath)
- assert os.path.exists(tpath)
- assert not os.path.exists(tpath)
- Depending on the saltenv, it will be created under ``RUNTIME_VARS.TMP_STATE_TREE`` or
- ``RUNTIME_VARS.TMP_PRODENV_STATE_TREE``.
- Args:
- name(str):
- The temporary state file name
- contents(str):
- The contents of the temporary file
- saltenv(str):
- The salt env to use. Either ``base`` or ``prod``
- strip_first_newline(bool):
- Wether to strip the initial first new line char or not.
- """
- if saltenv == "base":
- directory = RUNTIME_VARS.TMP_BASEENV_STATE_TREE
- elif saltenv == "prod":
- directory = RUNTIME_VARS.TMP_PRODENV_STATE_TREE
- else:
- raise RuntimeError(
- '"saltenv" can only be "base" or "prod", not "{}"'.format(saltenv)
- )
- return temp_file(
- name, contents, directory=directory, strip_first_newline=strip_first_newline
- )
- @pytest.helpers.register
- def temp_pillar_file(name, contents, saltenv="base", strip_first_newline=True):
- """
- This helper creates a temporary pillar file. It should be used as a context manager
- which returns the temporary pillar file path, and, once out of context, deletes it.
- Can be directly imported and used, or, it can be used as a pytest helper function if
- ``pytest-helpers-namespace`` is installed.
- .. code-block:: python
- import os
- import pytest
- def test_blah():
- with pytest.helpers.temp_pillar_file("blah.sls") as tpath:
- print(tpath)
- assert os.path.exists(tpath)
- assert not os.path.exists(tpath)
- Depending on the saltenv, it will be created under ``RUNTIME_VARS.TMP_PILLAR_TREE`` or
- ``RUNTIME_VARS.TMP_PRODENV_PILLAR_TREE``.
- Args:
- name(str):
- The temporary state file name
- contents(str):
- The contents of the temporary file
- saltenv(str):
- The salt env to use. Either ``base`` or ``prod``
- strip_first_newline(bool):
- Wether to strip the initial first new line char or not.
- """
- if saltenv == "base":
- directory = RUNTIME_VARS.TMP_BASEENV_PILLAR_TREE
- elif saltenv == "prod":
- directory = RUNTIME_VARS.TMP_PRODENV_PILLAR_TREE
- else:
- raise RuntimeError(
- '"saltenv" can only be "base" or "prod", not "{}"'.format(saltenv)
- )
- return temp_file(
- name, contents, directory=directory, strip_first_newline=strip_first_newline
- )
- @pytest.helpers.register
- def loader_mock(*args, **kwargs):
- if len(args) > 1:
- loader_modules = args[1]
- warnings.warn(
- "'request' is not longer an accepted argument to 'loader_mock()'. Please stop passing it.",
- category=DeprecationWarning,
- )
- else:
- loader_modules = args[0]
- return LoaderModuleMock(loader_modules, **kwargs)
- @pytest.helpers.register
- def salt_loader_module_functions(module):
- if not isinstance(module, types.ModuleType):
- raise RuntimeError(
- "The passed 'module' argument must be an imported "
- "imported module, not {}".format(type(module))
- )
- funcs = {}
- func_alias = getattr(module, "__func_alias__", {})
- virtualname = getattr(module, "__virtualname__")
- for name in dir(module):
- if name.startswith("_"):
- continue
- func = getattr(module, name)
- if getattr(func, "__module__", None) != module.__name__:
- # Not eve defined on the module being processed, carry on
- continue
- if not isinstance(func, types.FunctionType):
- # Not a function? carry on
- continue
- funcname = func_alias.get(func.__name__) or func.__name__
- funcs["{}.{}".format(virtualname, funcname)] = func
- return funcs
- @pytest.helpers.register
- def remove_stale_minion_key(master, minion_id):
- key_path = os.path.join(master.config["pki_dir"], "minions", minion_id)
- if os.path.exists(key_path):
- os.unlink(key_path)
- else:
- log.debug("The minion(id=%r) key was not found at %s", minion_id, key_path)
- # Only allow star importing the functions defined in this module
- __all__ = [
- name
- for (name, func) in locals().items()
- if getattr(func, "__module__", None) == __name__
- ]
|