|
- # -*- coding: utf-8 -*-
- from __future__ import absolute_import, print_function, unicode_literals
- import hashlib
- import logging
- import os
- import shutil
- import signal
- import tempfile
- import textwrap
- import time
- import uuid
- import psutil
- import pytest
- import salt.ext.six as six
- import salt.utils.files
- import salt.utils.path
- import salt.utils.platform
- import salt.utils.stringutils
- from saltfactories.utils.ports import get_unused_localhost_port
- from tests.support.case import ModuleCase
- from tests.support.helpers import skip_if_not_root, slowTest, with_tempfile
- from tests.support.runtests import RUNTIME_VARS
- from tests.support.unit import skipIf
- log = logging.getLogger(__name__)
- @pytest.mark.windows_whitelisted
- class CPModuleTest(ModuleCase):
- """
- Validate the cp module
- """
- def run_function(self, *args, **kwargs): # pylint: disable=arguments-differ
- """
- Ensure that results are decoded
- TODO: maybe move this behavior to ModuleCase itself?
- """
- return salt.utils.data.decode(
- super(CPModuleTest, self).run_function(*args, **kwargs)
- )
- @with_tempfile()
- @slowTest
- def test_get_file(self, tgt):
- """
- cp.get_file
- """
- self.run_function("cp.get_file", ["salt://grail/scene33", tgt])
- with salt.utils.files.fopen(tgt, "r") as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn("KNIGHT: They're nervous, sire.", data)
- self.assertNotIn("bacon", data)
- @slowTest
- def test_get_file_to_dir(self):
- """
- cp.get_file
- """
- tgt = os.path.join(RUNTIME_VARS.TMP, "")
- self.run_function("cp.get_file", ["salt://grail/scene33", tgt])
- with salt.utils.files.fopen(tgt + "scene33", "r") as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn("KNIGHT: They're nervous, sire.", data)
- self.assertNotIn("bacon", data)
- @with_tempfile()
- @skipIf(
- salt.utils.platform.is_windows() and six.PY3,
- "This test hangs on Windows on Py3",
- )
- def test_get_file_templated_paths(self, tgt):
- """
- cp.get_file
- """
- self.run_function(
- "cp.get_file",
- [
- "salt://{{grains.test_grain}}",
- tgt.replace("cheese", "{{grains.test_grain}}"),
- ],
- template="jinja",
- )
- with salt.utils.files.fopen(tgt, "r") as cheese:
- data = salt.utils.stringutils.to_unicode(cheese.read())
- self.assertIn("Gromit", data)
- self.assertNotIn("bacon", data)
- @with_tempfile()
- @slowTest
- def test_get_file_gzipped(self, tgt):
- """
- cp.get_file
- """
- src = os.path.join(RUNTIME_VARS.FILES, "file", "base", "file.big")
- with salt.utils.files.fopen(src, "rb") as fp_:
- hash_str = hashlib.md5(fp_.read()).hexdigest()
- self.run_function("cp.get_file", ["salt://file.big", tgt], gzip=5)
- with salt.utils.files.fopen(tgt, "rb") as scene:
- data = scene.read()
- self.assertEqual(hash_str, hashlib.md5(data).hexdigest())
- data = salt.utils.stringutils.to_unicode(data)
- self.assertIn("KNIGHT: They're nervous, sire.", data)
- self.assertNotIn("bacon", data)
- @slowTest
- def test_get_file_makedirs(self):
- """
- cp.get_file
- """
- tgt = os.path.join(RUNTIME_VARS.TMP, "make", "dirs", "scene33")
- self.run_function("cp.get_file", ["salt://grail/scene33", tgt], makedirs=True)
- self.addCleanup(
- shutil.rmtree, os.path.join(RUNTIME_VARS.TMP, "make"), ignore_errors=True
- )
- with salt.utils.files.fopen(tgt, "r") as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn("KNIGHT: They're nervous, sire.", data)
- self.assertNotIn("bacon", data)
- @with_tempfile()
- @slowTest
- def test_get_template(self, tgt):
- """
- cp.get_template
- """
- self.run_function(
- "cp.get_template", ["salt://grail/scene33", tgt], spam="bacon"
- )
- with salt.utils.files.fopen(tgt, "r") as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn("bacon", data)
- self.assertNotIn("spam", data)
- @slowTest
- def test_get_dir(self):
- """
- cp.get_dir
- """
- tgt = os.path.join(RUNTIME_VARS.TMP, "many")
- self.run_function("cp.get_dir", ["salt://grail", tgt])
- self.assertIn("grail", os.listdir(tgt))
- self.assertIn("36", os.listdir(os.path.join(tgt, "grail")))
- self.assertIn("empty", os.listdir(os.path.join(tgt, "grail")))
- self.assertIn("scene", os.listdir(os.path.join(tgt, "grail", "36")))
- @slowTest
- def test_get_dir_templated_paths(self):
- """
- cp.get_dir
- """
- tgt = os.path.join(RUNTIME_VARS.TMP, "many")
- self.run_function(
- "cp.get_dir",
- ["salt://{{grains.script}}", tgt.replace("many", "{{grains.alot}}")],
- )
- self.assertIn("grail", os.listdir(tgt))
- self.assertIn("36", os.listdir(os.path.join(tgt, "grail")))
- self.assertIn("empty", os.listdir(os.path.join(tgt, "grail")))
- self.assertIn("scene", os.listdir(os.path.join(tgt, "grail", "36")))
- # cp.get_url tests
- @with_tempfile()
- @slowTest
- def test_get_url(self, tgt):
- """
- cp.get_url with salt:// source given
- """
- self.run_function("cp.get_url", ["salt://grail/scene33", tgt])
- with salt.utils.files.fopen(tgt, "r") as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn("KNIGHT: They're nervous, sire.", data)
- self.assertNotIn("bacon", data)
- @slowTest
- def test_get_url_makedirs(self):
- """
- cp.get_url
- """
- tgt = os.path.join(RUNTIME_VARS.TMP, "make", "dirs", "scene33")
- self.run_function("cp.get_url", ["salt://grail/scene33", tgt], makedirs=True)
- self.addCleanup(
- shutil.rmtree, os.path.join(RUNTIME_VARS.TMP, "make"), ignore_errors=True
- )
- with salt.utils.files.fopen(tgt, "r") as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn("KNIGHT: They're nervous, sire.", data)
- self.assertNotIn("bacon", data)
- @slowTest
- def test_get_url_dest_empty(self):
- """
- cp.get_url with salt:// source given and destination omitted.
- """
- ret = self.run_function("cp.get_url", ["salt://grail/scene33"])
- with salt.utils.files.fopen(ret, "r") as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn("KNIGHT: They're nervous, sire.", data)
- self.assertNotIn("bacon", data)
- @slowTest
- def test_get_url_no_dest(self):
- """
- cp.get_url with salt:// source given and destination set as None
- """
- tgt = None
- ret = self.run_function("cp.get_url", ["salt://grail/scene33", tgt])
- self.assertIn("KNIGHT: They're nervous, sire.", ret)
- @slowTest
- def test_get_url_nonexistent_source(self):
- """
- cp.get_url with nonexistent salt:// source given
- """
- tgt = None
- ret = self.run_function("cp.get_url", ["salt://grail/nonexistent_scene", tgt])
- self.assertEqual(ret, False)
- @slowTest
- def test_get_url_to_dir(self):
- """
- cp.get_url with salt:// source
- """
- tgt = os.path.join(RUNTIME_VARS.TMP, "")
- self.run_function("cp.get_url", ["salt://grail/scene33", tgt])
- with salt.utils.files.fopen(tgt + "scene33", "r") as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn("KNIGHT: They're nervous, sire.", data)
- self.assertNotIn("bacon", data)
- @skipIf(
- salt.utils.platform.is_darwin() and six.PY2, "This test hangs on OS X on Py2"
- )
- @with_tempfile()
- @slowTest
- def test_get_url_https(self, tgt):
- """
- cp.get_url with https:// source given
- """
- self.run_function("cp.get_url", ["https://repo.saltstack.com/index.html", tgt])
- with salt.utils.files.fopen(tgt, "r") as instructions:
- data = salt.utils.stringutils.to_unicode(instructions.read())
- self.assertIn("Bootstrap", data)
- self.assertIn("Debian", data)
- self.assertIn("Windows", data)
- self.assertNotIn("AYBABTU", data)
- @skipIf(
- salt.utils.platform.is_darwin() and six.PY2, "This test hangs on OS X on Py2"
- )
- @slowTest
- def test_get_url_https_dest_empty(self):
- """
- cp.get_url with https:// source given and destination omitted.
- """
- ret = self.run_function("cp.get_url", ["https://repo.saltstack.com/index.html"])
- with salt.utils.files.fopen(ret, "r") as instructions:
- data = salt.utils.stringutils.to_unicode(instructions.read())
- self.assertIn("Bootstrap", data)
- self.assertIn("Debian", data)
- self.assertIn("Windows", data)
- self.assertNotIn("AYBABTU", data)
- @skipIf(
- salt.utils.platform.is_darwin() and six.PY2, "This test hangs on OS X on Py2"
- )
- @slowTest
- def test_get_url_https_no_dest(self):
- """
- cp.get_url with https:// source given and destination set as None
- """
- timeout = 500
- start = time.time()
- sleep = 5
- tgt = None
- while time.time() - start <= timeout:
- ret = self.run_function(
- "cp.get_url", ["https://repo.saltstack.com/index.html", tgt]
- )
- if ret.find("HTTP 599") == -1:
- break
- time.sleep(sleep)
- if ret.find("HTTP 599") != -1:
- raise Exception("https://repo.saltstack.com/index.html returned 599 error")
- self.assertIn("Bootstrap", ret)
- self.assertIn("Debian", ret)
- self.assertIn("Windows", ret)
- self.assertNotIn("AYBABTU", ret)
- @slowTest
- def test_get_url_file(self):
- """
- cp.get_url with file:// source given
- """
- tgt = ""
- src = os.path.join("file://", RUNTIME_VARS.FILES, "file", "base", "file.big")
- ret = self.run_function("cp.get_url", [src, tgt])
- with salt.utils.files.fopen(ret, "r") as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn("KNIGHT: They're nervous, sire.", data)
- self.assertNotIn("bacon", data)
- @slowTest
- def test_get_url_file_no_dest(self):
- """
- cp.get_url with file:// source given and destination set as None
- """
- tgt = None
- src = os.path.join("file://", RUNTIME_VARS.FILES, "file", "base", "file.big")
- ret = self.run_function("cp.get_url", [src, tgt])
- self.assertIn("KNIGHT: They're nervous, sire.", ret)
- self.assertNotIn("bacon", ret)
- @with_tempfile()
- @slowTest
- def test_get_url_ftp(self, tgt):
- """
- cp.get_url with https:// source given
- """
- self.run_function(
- "cp.get_url",
- [
- "ftp://ftp.freebsd.org/pub/FreeBSD/releases/amd64/amd64/12.0-RELEASE/MANIFEST",
- tgt,
- ],
- )
- with salt.utils.files.fopen(tgt, "r") as instructions:
- data = salt.utils.stringutils.to_unicode(instructions.read())
- self.assertIn("Base system", data)
- # cp.get_file_str tests
- @slowTest
- def test_get_file_str_salt(self):
- """
- cp.get_file_str with salt:// source given
- """
- src = "salt://grail/scene33"
- ret = self.run_function("cp.get_file_str", [src])
- self.assertIn("KNIGHT: They're nervous, sire.", ret)
- @slowTest
- def test_get_file_str_nonexistent_source(self):
- """
- cp.get_file_str with nonexistent salt:// source given
- """
- src = "salt://grail/nonexistent_scene"
- ret = self.run_function("cp.get_file_str", [src])
- self.assertEqual(ret, False)
- @skipIf(
- salt.utils.platform.is_darwin() and six.PY2, "This test hangs on OS X on Py2"
- )
- @slowTest
- def test_get_file_str_https(self):
- """
- cp.get_file_str with https:// source given
- """
- src = "https://repo.saltstack.com/index.html"
- ret = self.run_function("cp.get_file_str", [src])
- self.assertIn("Bootstrap", ret)
- self.assertIn("Debian", ret)
- self.assertIn("Windows", ret)
- self.assertNotIn("AYBABTU", ret)
- @slowTest
- def test_get_file_str_local(self):
- """
- cp.get_file_str with file:// source given
- """
- src = os.path.join("file://", RUNTIME_VARS.FILES, "file", "base", "file.big")
- ret = self.run_function("cp.get_file_str", [src])
- self.assertIn("KNIGHT: They're nervous, sire.", ret)
- self.assertNotIn("bacon", ret)
- # caching tests
- @slowTest
- def test_cache_file(self):
- """
- cp.cache_file
- """
- ret = self.run_function("cp.cache_file", ["salt://grail/scene33"])
- with salt.utils.files.fopen(ret, "r") as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn("KNIGHT: They're nervous, sire.", data)
- self.assertNotIn("bacon", data)
- @slowTest
- def test_cache_files(self):
- """
- cp.cache_files
- """
- ret = self.run_function(
- "cp.cache_files", [["salt://grail/scene33", "salt://grail/36/scene"]]
- )
- for path in ret:
- with salt.utils.files.fopen(path, "r") as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn("ARTHUR:", data)
- self.assertNotIn("bacon", data)
- @with_tempfile()
- @slowTest
- def test_cache_master(self, tgt):
- """
- cp.cache_master
- """
- ret = self.run_function("cp.cache_master", [tgt],)
- for path in ret:
- self.assertTrue(os.path.exists(path))
- @slowTest
- def test_cache_local_file(self):
- """
- cp.cache_local_file
- """
- src = os.path.join(RUNTIME_VARS.TMP, "random")
- with salt.utils.files.fopen(src, "w+") as fn_:
- fn_.write(salt.utils.stringutils.to_str("foo"))
- ret = self.run_function("cp.cache_local_file", [src])
- with salt.utils.files.fopen(ret, "r") as cp_:
- self.assertEqual(salt.utils.stringutils.to_unicode(cp_.read()), "foo")
- @skipIf(not salt.utils.path.which("nginx"), "nginx not installed")
- @skip_if_not_root
- @slowTest
- def test_cache_remote_file(self):
- """
- cp.cache_file
- """
- nginx_port = get_unused_localhost_port()
- url_prefix = "http://localhost:{0}/".format(nginx_port)
- temp_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
- self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True)
- nginx_root_dir = os.path.join(temp_dir, "root")
- nginx_conf_dir = os.path.join(temp_dir, "conf")
- nginx_conf = os.path.join(nginx_conf_dir, "nginx.conf")
- nginx_pidfile = os.path.join(nginx_conf_dir, "nginx.pid")
- file_contents = "Hello world!"
- for dirname in (nginx_root_dir, nginx_conf_dir):
- os.makedirs(dirname)
- # Write the temp file
- with salt.utils.files.fopen(
- os.path.join(nginx_root_dir, "actual_file"), "w"
- ) as fp_:
- fp_.write(salt.utils.stringutils.to_str(file_contents))
- # Write the nginx config
- with salt.utils.files.fopen(nginx_conf, "w") as fp_:
- fp_.write(
- textwrap.dedent(
- salt.utils.stringutils.to_str(
- """\
- user root;
- worker_processes 1;
- error_log {nginx_conf_dir}/server_error.log;
- pid {nginx_pidfile};
- events {{
- worker_connections 1024;
- }}
- http {{
- include /etc/nginx/mime.types;
- default_type application/octet-stream;
- access_log {nginx_conf_dir}/access.log;
- error_log {nginx_conf_dir}/error.log;
- server {{
- listen {nginx_port} default_server;
- server_name cachefile.local;
- root {nginx_root_dir};
- location ~ ^/301$ {{
- return 301 /actual_file;
- }}
- location ~ ^/302$ {{
- return 302 /actual_file;
- }}
- }}
- }}""".format(
- **locals()
- )
- )
- )
- )
- self.run_function("cmd.run", [["nginx", "-c", nginx_conf]], python_shell=False)
- with salt.utils.files.fopen(nginx_pidfile) as fp_:
- nginx_pid = int(fp_.read().strip())
- nginx_proc = psutil.Process(pid=nginx_pid)
- self.addCleanup(nginx_proc.send_signal, signal.SIGQUIT)
- for code in ("", "301", "302"):
- url = url_prefix + (code or "actual_file")
- log.debug("attempting to cache %s", url)
- ret = self.run_function("cp.cache_file", [url])
- self.assertTrue(ret)
- with salt.utils.files.fopen(ret) as fp_:
- cached_contents = salt.utils.stringutils.to_unicode(fp_.read())
- self.assertEqual(cached_contents, file_contents)
- @slowTest
- def test_list_states(self):
- """
- cp.list_states
- """
- ret = self.run_function("cp.list_states",)
- self.assertIn("core", ret)
- self.assertIn("top", ret)
- @slowTest
- def test_list_minion(self):
- """
- cp.list_minion
- """
- self.run_function("cp.cache_file", ["salt://grail/scene33"])
- ret = self.run_function("cp.list_minion")
- found = False
- search = "grail/scene33"
- if salt.utils.platform.is_windows():
- search = r"grail\scene33"
- for path in ret:
- if search in path:
- found = True
- break
- self.assertTrue(found)
- @slowTest
- def test_is_cached(self):
- """
- cp.is_cached
- """
- self.run_function("cp.cache_file", ["salt://grail/scene33"])
- ret1 = self.run_function("cp.is_cached", ["salt://grail/scene33"])
- self.assertTrue(ret1)
- ret2 = self.run_function("cp.is_cached", ["salt://fasldkgj/poicxzbn"])
- self.assertFalse(ret2)
- @slowTest
- def test_hash_file(self):
- """
- cp.hash_file
- """
- sha256_hash = self.run_function("cp.hash_file", ["salt://grail/scene33"])
- path = self.run_function("cp.cache_file", ["salt://grail/scene33"])
- with salt.utils.files.fopen(path, "rb") as fn_:
- data = fn_.read()
- self.assertEqual(sha256_hash["hsum"], hashlib.sha256(data).hexdigest())
- @with_tempfile()
- @slowTest
- def test_get_file_from_env_predefined(self, tgt):
- """
- cp.get_file
- """
- tgt = os.path.join(RUNTIME_VARS.TMP, "cheese")
- try:
- self.run_function("cp.get_file", ["salt://cheese", tgt])
- with salt.utils.files.fopen(tgt, "r") as cheese:
- data = salt.utils.stringutils.to_unicode(cheese.read())
- self.assertIn("Gromit", data)
- self.assertNotIn("Comte", data)
- finally:
- os.unlink(tgt)
- @with_tempfile()
- @slowTest
- def test_get_file_from_env_in_url(self, tgt):
- tgt = os.path.join(RUNTIME_VARS.TMP, "cheese")
- try:
- self.run_function("cp.get_file", ["salt://cheese?saltenv=prod", tgt])
- with salt.utils.files.fopen(tgt, "r") as cheese:
- data = salt.utils.stringutils.to_unicode(cheese.read())
- self.assertIn("Gromit", data)
- self.assertIn("Comte", data)
- finally:
- os.unlink(tgt)
- @slowTest
- def test_push(self):
- log_to_xfer = os.path.join(RUNTIME_VARS.TMP, uuid.uuid4().hex)
- open(log_to_xfer, "w").close() # pylint: disable=resource-leakage
- try:
- self.run_function("cp.push", [log_to_xfer])
- tgt_cache_file = os.path.join(
- RUNTIME_VARS.TMP,
- "master-minion-root",
- "cache",
- "minions",
- "minion",
- "files",
- RUNTIME_VARS.TMP,
- log_to_xfer,
- )
- self.assertTrue(
- os.path.isfile(tgt_cache_file), "File was not cached on the master"
- )
- finally:
- os.unlink(tgt_cache_file)
- @slowTest
- def test_envs(self):
- self.assertEqual(sorted(self.run_function("cp.envs")), sorted(["base", "prod"]))
|