123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700 |
- # -*- coding: utf-8 -*-
- # Import python libs
- from __future__ import absolute_import, print_function, unicode_literals
- import os
- import sys
- import uuid
- import hashlib
- import logging
- import psutil
- import shutil
- import signal
- import tempfile
- import textwrap
- import time
- # Import Salt Testing libs
- from tests.support.case import ModuleCase
- from tests.support.helpers import (
- get_unused_localhost_port,
- with_tempfile)
- from tests.support.unit import skipIf
- from tests.support.runtests import RUNTIME_VARS
- # Import 3rd party libs
- import pytest
- import salt.ext.six as six
- # Import salt libs
- import salt.utils.files
- import salt.utils.path
- import salt.utils.platform
- import salt.utils.stringutils
- log = logging.getLogger(__name__)
- SSL3_SUPPORT = sys.version_info >= (2, 7, 9)
- @pytest.mark.windows_whitelisted
- class CPModuleTest(ModuleCase):
- '''
- Validate the cp module
- '''
- def run_function(self, *args, **kwargs):
- '''
- Ensure that results are decoded
- TODO: maybe move this behavior to ModuleCase itself?
- '''
- return salt.utils.data.decode(
- super(CPModuleTest, self).run_function(*args, **kwargs)
- )
- @with_tempfile()
- def test_get_file(self, tgt):
- '''
- cp.get_file
- '''
- self.run_function(
- 'cp.get_file',
- [
- 'salt://grail/scene33',
- tgt,
- ])
- with salt.utils.files.fopen(tgt, 'r') as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn('KNIGHT: They\'re nervous, sire.', data)
- self.assertNotIn('bacon', data)
- def test_get_file_to_dir(self):
- '''
- cp.get_file
- '''
- tgt = os.path.join(RUNTIME_VARS.TMP, '')
- self.run_function(
- 'cp.get_file',
- [
- 'salt://grail/scene33',
- tgt,
- ])
- with salt.utils.files.fopen(tgt + 'scene33', 'r') as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn('KNIGHT: They\'re nervous, sire.', data)
- self.assertNotIn('bacon', data)
- @with_tempfile()
- @skipIf(salt.utils.platform.is_windows() and six.PY3, 'This test hangs on Windows on Py3')
- def test_get_file_templated_paths(self, tgt):
- '''
- cp.get_file
- '''
- self.run_function(
- 'cp.get_file',
- [
- 'salt://{{grains.test_grain}}',
- tgt.replace('cheese', '{{grains.test_grain}}')
- ],
- template='jinja'
- )
- with salt.utils.files.fopen(tgt, 'r') as cheese:
- data = salt.utils.stringutils.to_unicode(cheese.read())
- self.assertIn('Gromit', data)
- self.assertNotIn('bacon', data)
- @with_tempfile()
- def test_get_file_gzipped(self, tgt):
- '''
- cp.get_file
- '''
- src = os.path.join(RUNTIME_VARS.FILES, 'file', 'base', 'file.big')
- with salt.utils.files.fopen(src, 'rb') as fp_:
- hash_str = hashlib.md5(fp_.read()).hexdigest()
- self.run_function(
- 'cp.get_file',
- [
- 'salt://file.big',
- tgt,
- ],
- gzip=5
- )
- with salt.utils.files.fopen(tgt, 'rb') as scene:
- data = scene.read()
- self.assertEqual(hash_str, hashlib.md5(data).hexdigest())
- data = salt.utils.stringutils.to_unicode(data)
- self.assertIn('KNIGHT: They\'re nervous, sire.', data)
- self.assertNotIn('bacon', data)
- def test_get_file_makedirs(self):
- '''
- cp.get_file
- '''
- tgt = os.path.join(RUNTIME_VARS.TMP, 'make', 'dirs', 'scene33')
- self.run_function(
- 'cp.get_file',
- [
- 'salt://grail/scene33',
- tgt,
- ],
- makedirs=True
- )
- self.addCleanup(shutil.rmtree, os.path.join(RUNTIME_VARS.TMP, 'make'), ignore_errors=True)
- with salt.utils.files.fopen(tgt, 'r') as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn('KNIGHT: They\'re nervous, sire.', data)
- self.assertNotIn('bacon', data)
- @with_tempfile()
- def test_get_template(self, tgt):
- '''
- cp.get_template
- '''
- self.run_function(
- 'cp.get_template',
- ['salt://grail/scene33', tgt],
- spam='bacon')
- with salt.utils.files.fopen(tgt, 'r') as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn('bacon', data)
- self.assertNotIn('spam', data)
- def test_get_dir(self):
- '''
- cp.get_dir
- '''
- tgt = os.path.join(RUNTIME_VARS.TMP, 'many')
- self.run_function(
- 'cp.get_dir',
- [
- 'salt://grail',
- tgt
- ])
- self.assertIn('grail', os.listdir(tgt))
- self.assertIn('36', os.listdir(os.path.join(tgt, 'grail')))
- self.assertIn('empty', os.listdir(os.path.join(tgt, 'grail')))
- self.assertIn('scene', os.listdir(os.path.join(tgt, 'grail', '36')))
- def test_get_dir_templated_paths(self):
- '''
- cp.get_dir
- '''
- tgt = os.path.join(RUNTIME_VARS.TMP, 'many')
- self.run_function(
- 'cp.get_dir',
- [
- 'salt://{{grains.script}}',
- tgt.replace('many', '{{grains.alot}}')
- ]
- )
- self.assertIn('grail', os.listdir(tgt))
- self.assertIn('36', os.listdir(os.path.join(tgt, 'grail')))
- self.assertIn('empty', os.listdir(os.path.join(tgt, 'grail')))
- self.assertIn('scene', os.listdir(os.path.join(tgt, 'grail', '36')))
- # cp.get_url tests
- @with_tempfile()
- def test_get_url(self, tgt):
- '''
- cp.get_url with salt:// source given
- '''
- self.run_function(
- 'cp.get_url',
- [
- 'salt://grail/scene33',
- tgt,
- ])
- with salt.utils.files.fopen(tgt, 'r') as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn('KNIGHT: They\'re nervous, sire.', data)
- self.assertNotIn('bacon', data)
- def test_get_url_makedirs(self):
- '''
- cp.get_url
- '''
- tgt = os.path.join(RUNTIME_VARS.TMP, 'make', 'dirs', 'scene33')
- self.run_function(
- 'cp.get_url',
- [
- 'salt://grail/scene33',
- tgt,
- ],
- makedirs=True
- )
- self.addCleanup(shutil.rmtree, os.path.join(RUNTIME_VARS.TMP, 'make'), ignore_errors=True)
- with salt.utils.files.fopen(tgt, 'r') as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn('KNIGHT: They\'re nervous, sire.', data)
- self.assertNotIn('bacon', data)
- def test_get_url_dest_empty(self):
- '''
- cp.get_url with salt:// source given and destination omitted.
- '''
- ret = self.run_function(
- 'cp.get_url',
- [
- 'salt://grail/scene33',
- ])
- with salt.utils.files.fopen(ret, 'r') as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn('KNIGHT: They\'re nervous, sire.', data)
- self.assertNotIn('bacon', data)
- def test_get_url_no_dest(self):
- '''
- cp.get_url with salt:// source given and destination set as None
- '''
- tgt = None
- ret = self.run_function(
- 'cp.get_url',
- [
- 'salt://grail/scene33',
- tgt,
- ])
- self.assertIn('KNIGHT: They\'re nervous, sire.', ret)
- def test_get_url_nonexistent_source(self):
- '''
- cp.get_url with nonexistent salt:// source given
- '''
- tgt = None
- ret = self.run_function(
- 'cp.get_url',
- [
- 'salt://grail/nonexistent_scene',
- tgt,
- ])
- self.assertEqual(ret, False)
- def test_get_url_to_dir(self):
- '''
- cp.get_url with salt:// source
- '''
- tgt = os.path.join(RUNTIME_VARS.TMP, '')
- self.run_function(
- 'cp.get_url',
- [
- 'salt://grail/scene33',
- tgt,
- ])
- with salt.utils.files.fopen(tgt + 'scene33', 'r') as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn('KNIGHT: They\'re nervous, sire.', data)
- self.assertNotIn('bacon', data)
- @skipIf(not SSL3_SUPPORT, 'Requires python with SSL3 support')
- @skipIf(salt.utils.platform.is_darwin() and six.PY2, 'This test hangs on OS X on Py2')
- @with_tempfile()
- def test_get_url_https(self, tgt):
- '''
- cp.get_url with https:// source given
- '''
- self.run_function(
- 'cp.get_url',
- [
- 'https://repo.saltstack.com/index.html',
- tgt,
- ])
- with salt.utils.files.fopen(tgt, 'r') as instructions:
- data = salt.utils.stringutils.to_unicode(instructions.read())
- self.assertIn('Bootstrap', data)
- self.assertIn('Debian', data)
- self.assertIn('Windows', data)
- self.assertNotIn('AYBABTU', data)
- @skipIf(not SSL3_SUPPORT, 'Requires python with SSL3 support')
- @skipIf(salt.utils.platform.is_darwin() and six.PY2, 'This test hangs on OS X on Py2')
- def test_get_url_https_dest_empty(self):
- '''
- cp.get_url with https:// source given and destination omitted.
- '''
- ret = self.run_function(
- 'cp.get_url',
- [
- 'https://repo.saltstack.com/index.html',
- ])
- with salt.utils.files.fopen(ret, 'r') as instructions:
- data = salt.utils.stringutils.to_unicode(instructions.read())
- self.assertIn('Bootstrap', data)
- self.assertIn('Debian', data)
- self.assertIn('Windows', data)
- self.assertNotIn('AYBABTU', data)
- @skipIf(not SSL3_SUPPORT, 'Requires python with SSL3 support')
- @skipIf(salt.utils.platform.is_darwin() and six.PY2, 'This test hangs on OS X on Py2')
- def test_get_url_https_no_dest(self):
- '''
- cp.get_url with https:// source given and destination set as None
- '''
- timeout = 500
- start = time.time()
- sleep = 5
- tgt = None
- while time.time() - start <= timeout:
- ret = self.run_function(
- 'cp.get_url',
- [
- 'https://repo.saltstack.com/index.html',
- tgt,
- ])
- if ret.find('HTTP 599') == -1:
- break
- time.sleep(sleep)
- if ret.find('HTTP 599') != -1:
- raise Exception(
- 'https://repo.saltstack.com/index.html returned 599 error'
- )
- self.assertIn('Bootstrap', ret)
- self.assertIn('Debian', ret)
- self.assertIn('Windows', ret)
- self.assertNotIn('AYBABTU', ret)
- def test_get_url_file(self):
- '''
- cp.get_url with file:// source given
- '''
- tgt = ''
- src = os.path.join('file://', RUNTIME_VARS.FILES, 'file', 'base', 'file.big')
- ret = self.run_function(
- 'cp.get_url',
- [
- src,
- tgt,
- ])
- with salt.utils.files.fopen(ret, 'r') as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn('KNIGHT: They\'re nervous, sire.', data)
- self.assertNotIn('bacon', data)
- def test_get_url_file_no_dest(self):
- '''
- cp.get_url with file:// source given and destination set as None
- '''
- tgt = None
- src = os.path.join('file://', RUNTIME_VARS.FILES, 'file', 'base', 'file.big')
- ret = self.run_function(
- 'cp.get_url',
- [
- src,
- tgt,
- ])
- self.assertIn('KNIGHT: They\'re nervous, sire.', ret)
- self.assertNotIn('bacon', ret)
- # cp.get_file_str tests
- def test_get_file_str_salt(self):
- '''
- cp.get_file_str with salt:// source given
- '''
- src = 'salt://grail/scene33'
- ret = self.run_function(
- 'cp.get_file_str',
- [
- src,
- ])
- self.assertIn('KNIGHT: They\'re nervous, sire.', ret)
- def test_get_file_str_nonexistent_source(self):
- '''
- cp.get_file_str with nonexistent salt:// source given
- '''
- src = 'salt://grail/nonexistent_scene'
- ret = self.run_function(
- 'cp.get_file_str',
- [
- src,
- ])
- self.assertEqual(ret, False)
- @skipIf(not SSL3_SUPPORT, 'Requires python with SSL3 support')
- @skipIf(salt.utils.platform.is_darwin() and six.PY2, 'This test hangs on OS X on Py2')
- def test_get_file_str_https(self):
- '''
- cp.get_file_str with https:// source given
- '''
- src = 'https://repo.saltstack.com/index.html'
- ret = self.run_function(
- 'cp.get_file_str',
- [
- src,
- ])
- self.assertIn('Bootstrap', ret)
- self.assertIn('Debian', ret)
- self.assertIn('Windows', ret)
- self.assertNotIn('AYBABTU', ret)
- def test_get_file_str_local(self):
- '''
- cp.get_file_str with file:// source given
- '''
- src = os.path.join('file://', RUNTIME_VARS.FILES, 'file', 'base', 'file.big')
- ret = self.run_function(
- 'cp.get_file_str',
- [
- src,
- ])
- self.assertIn('KNIGHT: They\'re nervous, sire.', ret)
- self.assertNotIn('bacon', ret)
- # caching tests
- def test_cache_file(self):
- '''
- cp.cache_file
- '''
- ret = self.run_function(
- 'cp.cache_file',
- [
- 'salt://grail/scene33',
- ])
- with salt.utils.files.fopen(ret, 'r') as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn('KNIGHT: They\'re nervous, sire.', data)
- self.assertNotIn('bacon', data)
- def test_cache_files(self):
- '''
- cp.cache_files
- '''
- ret = self.run_function(
- 'cp.cache_files',
- [
- ['salt://grail/scene33', 'salt://grail/36/scene'],
- ])
- for path in ret:
- with salt.utils.files.fopen(path, 'r') as scene:
- data = salt.utils.stringutils.to_unicode(scene.read())
- self.assertIn('ARTHUR:', data)
- self.assertNotIn('bacon', data)
- @with_tempfile()
- def test_cache_master(self, tgt):
- '''
- cp.cache_master
- '''
- ret = self.run_function(
- 'cp.cache_master',
- [tgt],
- )
- for path in ret:
- self.assertTrue(os.path.exists(path))
- def test_cache_local_file(self):
- '''
- cp.cache_local_file
- '''
- src = os.path.join(RUNTIME_VARS.TMP, 'random')
- with salt.utils.files.fopen(src, 'w+') as fn_:
- fn_.write(salt.utils.stringutils.to_str('foo'))
- ret = self.run_function(
- 'cp.cache_local_file',
- [src])
- with salt.utils.files.fopen(ret, 'r') as cp_:
- self.assertEqual(
- salt.utils.stringutils.to_unicode(cp_.read()),
- 'foo'
- )
- @skipIf(not salt.utils.path.which('nginx'), 'nginx not installed')
- @pytest.mark.skip_if_not_root
- def test_cache_remote_file(self):
- '''
- cp.cache_file
- '''
- nginx_port = get_unused_localhost_port()
- url_prefix = 'http://localhost:{0}/'.format(nginx_port)
- temp_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
- self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True)
- nginx_root_dir = os.path.join(temp_dir, 'root')
- nginx_conf_dir = os.path.join(temp_dir, 'conf')
- nginx_conf = os.path.join(nginx_conf_dir, 'nginx.conf')
- nginx_pidfile = os.path.join(nginx_conf_dir, 'nginx.pid')
- file_contents = 'Hello world!'
- for dirname in (nginx_root_dir, nginx_conf_dir):
- os.makedirs(dirname)
- # Write the temp file
- with salt.utils.files.fopen(os.path.join(nginx_root_dir, 'actual_file'), 'w') as fp_:
- fp_.write(salt.utils.stringutils.to_str(file_contents))
- # Write the nginx config
- with salt.utils.files.fopen(nginx_conf, 'w') as fp_:
- fp_.write(textwrap.dedent(salt.utils.stringutils.to_str(
- '''\
- user root;
- worker_processes 1;
- error_log {nginx_conf_dir}/server_error.log;
- pid {nginx_pidfile};
- events {{
- worker_connections 1024;
- }}
- http {{
- include /etc/nginx/mime.types;
- default_type application/octet-stream;
- access_log {nginx_conf_dir}/access.log;
- error_log {nginx_conf_dir}/error.log;
- server {{
- listen {nginx_port} default_server;
- server_name cachefile.local;
- root {nginx_root_dir};
- location ~ ^/301$ {{
- return 301 /actual_file;
- }}
- location ~ ^/302$ {{
- return 302 /actual_file;
- }}
- }}
- }}'''.format(**locals())
- )))
- self.run_function(
- 'cmd.run',
- [['nginx', '-c', nginx_conf]],
- python_shell=False
- )
- with salt.utils.files.fopen(nginx_pidfile) as fp_:
- nginx_pid = int(fp_.read().strip())
- nginx_proc = psutil.Process(pid=nginx_pid)
- self.addCleanup(nginx_proc.send_signal, signal.SIGQUIT)
- for code in ('', '301', '302'):
- url = url_prefix + (code or 'actual_file')
- log.debug('attempting to cache %s', url)
- ret = self.run_function('cp.cache_file', [url])
- self.assertTrue(ret)
- with salt.utils.files.fopen(ret) as fp_:
- cached_contents = salt.utils.stringutils.to_unicode(fp_.read())
- self.assertEqual(cached_contents, file_contents)
- def test_list_states(self):
- '''
- cp.list_states
- '''
- ret = self.run_function(
- 'cp.list_states',
- )
- self.assertIn('core', ret)
- self.assertIn('top', ret)
- def test_list_minion(self):
- '''
- cp.list_minion
- '''
- self.run_function(
- 'cp.cache_file',
- [
- 'salt://grail/scene33',
- ])
- ret = self.run_function('cp.list_minion')
- found = False
- search = 'grail/scene33'
- if salt.utils.platform.is_windows():
- search = r'grail\scene33'
- for path in ret:
- if search in path:
- found = True
- break
- self.assertTrue(found)
- def test_is_cached(self):
- '''
- cp.is_cached
- '''
- self.run_function(
- 'cp.cache_file',
- [
- 'salt://grail/scene33',
- ])
- ret1 = self.run_function(
- 'cp.is_cached',
- [
- 'salt://grail/scene33',
- ])
- self.assertTrue(ret1)
- ret2 = self.run_function(
- 'cp.is_cached',
- [
- 'salt://fasldkgj/poicxzbn',
- ])
- self.assertFalse(ret2)
- def test_hash_file(self):
- '''
- cp.hash_file
- '''
- sha256_hash = self.run_function(
- 'cp.hash_file',
- [
- 'salt://grail/scene33',
- ])
- path = self.run_function(
- 'cp.cache_file',
- [
- 'salt://grail/scene33',
- ])
- with salt.utils.files.fopen(path, 'rb') as fn_:
- data = fn_.read()
- self.assertEqual(
- sha256_hash['hsum'], hashlib.sha256(data).hexdigest())
- @with_tempfile()
- def test_get_file_from_env_predefined(self, tgt):
- '''
- cp.get_file
- '''
- tgt = os.path.join(RUNTIME_VARS.TMP, 'cheese')
- try:
- self.run_function('cp.get_file', ['salt://cheese', tgt])
- with salt.utils.files.fopen(tgt, 'r') as cheese:
- data = salt.utils.stringutils.to_unicode(cheese.read())
- self.assertIn('Gromit', data)
- self.assertNotIn('Comte', data)
- finally:
- os.unlink(tgt)
- @with_tempfile()
- def test_get_file_from_env_in_url(self, tgt):
- tgt = os.path.join(RUNTIME_VARS.TMP, 'cheese')
- try:
- self.run_function('cp.get_file', ['salt://cheese?saltenv=prod', tgt])
- with salt.utils.files.fopen(tgt, 'r') as cheese:
- data = salt.utils.stringutils.to_unicode(cheese.read())
- self.assertIn('Gromit', data)
- self.assertIn('Comte', data)
- finally:
- os.unlink(tgt)
- def test_push(self):
- log_to_xfer = os.path.join(RUNTIME_VARS.TMP, uuid.uuid4().hex)
- open(log_to_xfer, 'w').close() # pylint: disable=resource-leakage
- try:
- self.run_function('cp.push', [log_to_xfer])
- tgt_cache_file = os.path.join(
- RUNTIME_VARS.TMP,
- 'master-minion-root',
- 'cache',
- 'minions',
- 'minion',
- 'files',
- RUNTIME_VARS.TMP,
- log_to_xfer)
- self.assertTrue(os.path.isfile(tgt_cache_file), 'File was not cached on the master')
- finally:
- os.unlink(tgt_cache_file)
- def test_envs(self):
- self.assertEqual(sorted(self.run_function('cp.envs')), sorted(['base', 'prod']))
|