123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- """
- tests.support.pytest.helpers
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- PyTest helpers functions
- """
- import logging
- import os
- import pathlib
- import shutil
- import tempfile
- import textwrap
- import types
- import warnings
- from contextlib import contextmanager
- import attr
- import pytest
- import salt.utils.platform
- import salt.utils.pycrypto
- from saltfactories.utils import random_string
- from tests.support.pytest.loader import LoaderModuleMock
- from tests.support.runtests import RUNTIME_VARS
- from tests.support.sminion import create_sminion
- log = logging.getLogger(__name__)
- if not RUNTIME_VARS.PYTEST_SESSION:
- # XXX: Remove this try/except once we fully switch to pytest
- class FakePyTestHelpersNamespace:
- __slots__ = ()
- def register(self, func):
- return func
- # Patch pytest so it all works under runtests.py
- pytest.helpers = FakePyTestHelpersNamespace()
- @pytest.helpers.register
- @contextmanager
- def temp_directory(name=None):
- """
- This helper creates a temporary directory. It should be used as a context manager
- which returns the temporary directory path, and, once out of context, deletes it.
- Can be directly imported and used, or, it can be used as a pytest helper function if
- ``pytest-helpers-namespace`` is installed.
- .. code-block:: python
- import os
- import pytest
- def test_blah():
- with pytest.helpers.temp_directory() as tpath:
- print(tpath)
- assert os.path.exists(tpath)
- assert not os.path.exists(tpath)
- """
- try:
- if name is not None:
- directory_path = os.path.join(RUNTIME_VARS.TMP, name)
- else:
- directory_path = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
- if not os.path.isdir(directory_path):
- os.makedirs(directory_path)
- yield directory_path
- finally:
- shutil.rmtree(directory_path, ignore_errors=True)
- @pytest.helpers.register
- @contextmanager
- def temp_file(name=None, contents=None, directory=None, strip_first_newline=True):
- """
- This helper creates a temporary file. It should be used as a context manager
- which returns the temporary file path, and, once out of context, deletes it.
- Can be directly imported and used, or, it can be used as a pytest helper function if
- ``pytest-helpers-namespace`` is installed.
- .. code-block:: python
- import os
- import pytest
- def test_blah():
- with pytest.helpers.temp_file("blah.txt") as tpath:
- print(tpath)
- assert os.path.exists(tpath)
- assert not os.path.exists(tpath)
- Args:
- name(str):
- The temporary file name
- contents(str):
- The contents of the temporary file
- directory(str):
- The directory where to create the temporary file. If ``None``, then ``RUNTIME_VARS.TMP``
- will be used.
- strip_first_newline(bool):
- Wether to strip the initial first new line char or not.
- """
- try:
- if directory is None:
- directory = RUNTIME_VARS.TMP
- if not isinstance(directory, pathlib.Path):
- directory = pathlib.Path(str(directory))
- if name is not None:
- file_path = directory / name
- else:
- handle, file_path = tempfile.mkstemp(dir=str(directory))
- os.close(handle)
- file_path = pathlib.Path(file_path)
- file_directory = file_path.parent
- if not file_directory.is_dir():
- file_directory.mkdir(parents=True)
- if contents is not None:
- if contents:
- if contents.startswith("\n") and strip_first_newline:
- contents = contents[1:]
- file_contents = textwrap.dedent(contents)
- else:
- file_contents = contents
- file_path.write_text(file_contents)
- log_contents = "{0} Contents of {1}\n{2}\n{3} Contents of {1}".format(
- ">" * 6, file_path, file_contents, "<" * 6
- )
- log.debug("Created temp file: %s\n%s", file_path, log_contents)
- else:
- log.debug("Touched temp file: %s", file_path)
- yield file_path
- finally:
- if file_path.exists():
- file_path.unlink()
- log.debug("Deleted temp file: %s", file_path)
- @pytest.helpers.register
- def temp_state_file(name, contents, saltenv="base", strip_first_newline=True):
- """
- This helper creates a temporary state file. It should be used as a context manager
- which returns the temporary state file path, and, once out of context, deletes it.
- Can be directly imported and used, or, it can be used as a pytest helper function if
- ``pytest-helpers-namespace`` is installed.
- .. code-block:: python
- import os
- import pytest
- def test_blah():
- with pytest.helpers.temp_state_file("blah.sls") as tpath:
- print(tpath)
- assert os.path.exists(tpath)
- assert not os.path.exists(tpath)
- Depending on the saltenv, it will be created under ``RUNTIME_VARS.TMP_STATE_TREE`` or
- ``RUNTIME_VARS.TMP_PRODENV_STATE_TREE``.
- Args:
- name(str):
- The temporary state file name
- contents(str):
- The contents of the temporary file
- saltenv(str):
- The salt env to use. Either ``base`` or ``prod``
- strip_first_newline(bool):
- Wether to strip the initial first new line char or not.
- """
- if saltenv == "base":
- directory = RUNTIME_VARS.TMP_BASEENV_STATE_TREE
- elif saltenv == "prod":
- directory = RUNTIME_VARS.TMP_PRODENV_STATE_TREE
- else:
- raise RuntimeError(
- '"saltenv" can only be "base" or "prod", not "{}"'.format(saltenv)
- )
- return temp_file(
- name, contents, directory=directory, strip_first_newline=strip_first_newline
- )
- @pytest.helpers.register
- def temp_pillar_file(name, contents, saltenv="base", strip_first_newline=True):
- """
- This helper creates a temporary pillar file. It should be used as a context manager
- which returns the temporary pillar file path, and, once out of context, deletes it.
- Can be directly imported and used, or, it can be used as a pytest helper function if
- ``pytest-helpers-namespace`` is installed.
- .. code-block:: python
- import os
- import pytest
- def test_blah():
- with pytest.helpers.temp_pillar_file("blah.sls") as tpath:
- print(tpath)
- assert os.path.exists(tpath)
- assert not os.path.exists(tpath)
- Depending on the saltenv, it will be created under ``RUNTIME_VARS.TMP_PILLAR_TREE`` or
- ``RUNTIME_VARS.TMP_PRODENV_PILLAR_TREE``.
- Args:
- name(str):
- The temporary state file name
- contents(str):
- The contents of the temporary file
- saltenv(str):
- The salt env to use. Either ``base`` or ``prod``
- strip_first_newline(bool):
- Wether to strip the initial first new line char or not.
- """
- if saltenv == "base":
- directory = RUNTIME_VARS.TMP_BASEENV_PILLAR_TREE
- elif saltenv == "prod":
- directory = RUNTIME_VARS.TMP_PRODENV_PILLAR_TREE
- else:
- raise RuntimeError(
- '"saltenv" can only be "base" or "prod", not "{}"'.format(saltenv)
- )
- return temp_file(
- name, contents, directory=directory, strip_first_newline=strip_first_newline
- )
- @pytest.helpers.register
- def loader_mock(*args, **kwargs):
- if len(args) > 1:
- loader_modules = args[1]
- warnings.warn(
- "'request' is not longer an accepted argument to 'loader_mock()'. Please stop passing it.",
- category=DeprecationWarning,
- )
- else:
- loader_modules = args[0]
- return LoaderModuleMock(loader_modules, **kwargs)
- @pytest.helpers.register
- def salt_loader_module_functions(module):
- if not isinstance(module, types.ModuleType):
- raise RuntimeError(
- "The passed 'module' argument must be an imported "
- "imported module, not {}".format(type(module))
- )
- funcs = {}
- func_alias = getattr(module, "__func_alias__", {})
- virtualname = getattr(module, "__virtualname__")
- for name in dir(module):
- if name.startswith("_"):
- continue
- func = getattr(module, name)
- if getattr(func, "__module__", None) != module.__name__:
- # Not eve defined on the module being processed, carry on
- continue
- if not isinstance(func, types.FunctionType):
- # Not a function? carry on
- continue
- funcname = func_alias.get(func.__name__) or func.__name__
- funcs["{}.{}".format(virtualname, funcname)] = func
- return funcs
- @pytest.helpers.register
- def remove_stale_minion_key(master, minion_id):
- key_path = os.path.join(master.config["pki_dir"], "minions", minion_id)
- if os.path.exists(key_path):
- os.unlink(key_path)
- else:
- log.debug("The minion(id=%r) key was not found at %s", minion_id, key_path)
- @attr.s(kw_only=True, slots=True)
- class TestAccount:
- sminion = attr.ib(default=None, repr=False)
- username = attr.ib(default=None)
- password = attr.ib(default=None)
- hashed_password = attr.ib(default=None, repr=False)
- groups = attr.ib(default=None)
- def __attrs_post_init__(self):
- if self.sminion is None:
- self.sminion = create_sminion()
- if self.username is None:
- self.username = random_string("account-", uppercase=False)
- if self.password is None:
- self.password = self.username
- if self.hashed_password is None:
- self.hashed_password = salt.utils.pycrypto.gen_hash(password=self.password)
- def __enter__(self):
- log.debug("Creating system account: %s", self)
- ret = self.sminion.functions.user.add(self.username)
- assert ret
- ret = self.sminion.functions.shadow.set_password(
- self.username,
- self.password if salt.utils.platform.is_darwin() else self.hashed_password,
- )
- assert ret
- assert self.username in self.sminion.functions.user.list_users()
- log.debug("Created system account: %s", self)
- # Run tests
- return self
- def __exit__(self, *args):
- self.sminion.functions.user.delete(self.username, remove=True, force=True)
- log.debug("Deleted system account: %s", self.username)
- @pytest.helpers.register
- @contextmanager
- def create_account(username=None, password=None, hashed_password=None, sminion=None):
- with TestAccount(
- sminion=sminion,
- username=username,
- password=password,
- hashed_password=hashed_password,
- ) as account:
- yield account
- # Only allow star importing the functions defined in this module
- __all__ = [
- name
- for (name, func) in locals().items()
- if getattr(func, "__module__", None) == __name__
- ]
|