123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407 |
- # -*- coding: utf-8 -*-
- """
- Test the verification routines
- """
- # Import Python libs
- from __future__ import absolute_import, print_function, unicode_literals
- import ctypes
- import getpass
- import os
- import shutil
- import socket
- import stat
- import sys
- import tempfile
- # Import salt libs
- import salt.utils.files
- import salt.utils.platform
- # Import 3rd-party libs
- from salt.ext import six
- from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
- from salt.utils.verify import (
- check_max_open_files,
- check_user,
- clean_path,
- log,
- valid_id,
- verify_env,
- verify_log,
- verify_log_files,
- verify_logs_filter,
- verify_socket,
- zmq_version,
- )
- from tests.support.helpers import TstSuiteLoggingHandler, requires_network
- from tests.support.mock import MagicMock, patch
- # Import Salt Testing libs
- # Import Salt Testing libs
- from tests.support.runtests import RUNTIME_VARS
- from tests.support.unit import TestCase, skipIf
- # Import third party libs
- if sys.platform.startswith("win"):
- import win32file
- else:
- import resource
- # Import third party libs
- if sys.platform.startswith("win"):
- import win32file
- else:
- import resource
- class TestVerify(TestCase):
- """
- Verify module tests
- """
- def test_valid_id_exception_handler(self):
- """
- Ensure we just return False if we pass in invalid or undefined paths.
- Refs #8259
- """
- opts = {"pki_dir": "/tmp/whatever"}
- self.assertFalse(valid_id(opts, None))
- def test_valid_id_pathsep(self):
- """
- Path separators in id should make it invalid
- """
- opts = {"pki_dir": "/tmp/whatever"}
- # We have to test both path separators because os.path.normpath will
- # convert forward slashes to backslashes on Windows.
- for pathsep in ("/", "\\"):
- self.assertFalse(valid_id(opts, pathsep.join(("..", "foobar"))))
- def test_zmq_verify(self):
- self.assertTrue(zmq_version())
- def test_zmq_verify_insufficient(self):
- import zmq
- with patch.object(zmq, "__version__", "2.1.0"):
- self.assertFalse(zmq_version())
- def test_user(self):
- self.assertTrue(check_user(getpass.getuser()))
- def test_no_user(self):
- # Catch sys.stderr here since no logging is configured and
- # check_user WILL write to sys.stderr
- class FakeWriter(object):
- def __init__(self):
- self.output = ""
- def write(self, data):
- self.output += data
- stderr = sys.stderr
- writer = FakeWriter()
- sys.stderr = writer
- # Now run the test
- if sys.platform.startswith("win"):
- self.assertTrue(check_user("nouser"))
- else:
- self.assertFalse(check_user("nouser"))
- # Restore sys.stderr
- sys.stderr = stderr
- if writer.output != 'CRITICAL: User not found: "nouser"\n':
- # If there's a different error catch, write it to sys.stderr
- sys.stderr.write(writer.output)
- @skipIf(salt.utils.platform.is_windows(), "No verify_env Windows")
- def test_verify_env(self):
- root_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
- var_dir = os.path.join(root_dir, "var", "log", "salt")
- key_dir = os.path.join(root_dir, "key_dir")
- verify_env([var_dir], getpass.getuser(), root_dir=root_dir)
- self.assertTrue(os.path.exists(var_dir))
- dir_stat = os.stat(var_dir)
- self.assertEqual(dir_stat.st_uid, os.getuid())
- self.assertEqual(dir_stat.st_mode & stat.S_IRWXU, stat.S_IRWXU)
- self.assertEqual(dir_stat.st_mode & stat.S_IRWXG, 40)
- self.assertEqual(dir_stat.st_mode & stat.S_IRWXO, 5)
- @requires_network(only_local_network=True)
- def test_verify_socket(self):
- self.assertTrue(verify_socket("", 18000, 18001))
- if socket.has_ipv6:
- # Only run if Python is built with IPv6 support; otherwise
- # this will just fail.
- try:
- self.assertTrue(verify_socket("::", 18000, 18001))
- except socket.error as serr:
- # Python has IPv6 enabled, but the system cannot create
- # IPv6 sockets (otherwise the test would return a bool)
- # - skip the test
- #
- # FIXME - possibly emit a message that the system does
- # not support IPv6.
- pass
- def test_max_open_files(self):
- with TstSuiteLoggingHandler() as handler:
- logmsg_dbg = "DEBUG:This salt-master instance has accepted {0} minion keys."
- logmsg_chk = (
- "{0}:The number of accepted minion keys({1}) should be lower "
- "than 1/4 of the max open files soft setting({2}). According "
- "to the system's hard limit, there's still a margin of {3} "
- "to raise the salt's max_open_files setting. Please consider "
- "raising this value."
- )
- logmsg_crash = (
- "{0}:The number of accepted minion keys({1}) should be lower "
- "than 1/4 of the max open files soft setting({2}). "
- "salt-master will crash pretty soon! According to the "
- "system's hard limit, there's still a margin of {3} to "
- "raise the salt's max_open_files setting. Please consider "
- "raising this value."
- )
- if sys.platform.startswith("win"):
- logmsg_crash = (
- "{0}:The number of accepted minion keys({1}) should be lower "
- "than 1/4 of the max open files soft setting({2}). "
- "salt-master will crash pretty soon! Please consider "
- "raising this value."
- )
- if sys.platform.startswith("win"):
- # Check the Windows API for more detail on this
- # http://msdn.microsoft.com/en-us/library/xt874334(v=vs.71).aspx
- # and the python binding http://timgolden.me.uk/pywin32-docs/win32file.html
- mof_s = mof_h = win32file._getmaxstdio()
- else:
- mof_s, mof_h = resource.getrlimit(resource.RLIMIT_NOFILE)
- tempdir = tempfile.mkdtemp(prefix="fake-keys")
- keys_dir = os.path.join(tempdir, "minions")
- os.makedirs(keys_dir)
- mof_test = 256
- if sys.platform.startswith("win"):
- win32file._setmaxstdio(mof_test)
- else:
- resource.setrlimit(resource.RLIMIT_NOFILE, (mof_test, mof_h))
- try:
- prev = 0
- for newmax, level in (
- (24, None),
- (66, "INFO"),
- (127, "WARNING"),
- (196, "CRITICAL"),
- ):
- for n in range(prev, newmax):
- kpath = os.path.join(keys_dir, six.text_type(n))
- with salt.utils.files.fopen(kpath, "w") as fp_:
- fp_.write(
- str(n)
- ) # future lint: disable=blacklisted-function
- opts = {"max_open_files": newmax, "pki_dir": tempdir}
- check_max_open_files(opts)
- if level is None:
- # No log message is triggered, only the DEBUG one which
- # tells us how many minion keys were accepted.
- self.assertEqual([logmsg_dbg.format(newmax)], handler.messages)
- else:
- self.assertIn(logmsg_dbg.format(newmax), handler.messages)
- self.assertIn(
- logmsg_chk.format(
- level,
- newmax,
- mof_test,
- mof_test - newmax
- if sys.platform.startswith("win")
- else mof_h - newmax,
- ),
- handler.messages,
- )
- handler.clear()
- prev = newmax
- newmax = mof_test
- for n in range(prev, newmax):
- kpath = os.path.join(keys_dir, six.text_type(n))
- with salt.utils.files.fopen(kpath, "w") as fp_:
- fp_.write(str(n)) # future lint: disable=blacklisted-function
- opts = {"max_open_files": newmax, "pki_dir": tempdir}
- check_max_open_files(opts)
- self.assertIn(logmsg_dbg.format(newmax), handler.messages)
- self.assertIn(
- logmsg_crash.format(
- "CRITICAL",
- newmax,
- mof_test,
- mof_test - newmax
- if sys.platform.startswith("win")
- else mof_h - newmax,
- ),
- handler.messages,
- )
- handler.clear()
- except IOError as err:
- if err.errno == 24:
- # Too many open files
- self.skipTest("We've hit the max open files setting")
- raise
- finally:
- if sys.platform.startswith("win"):
- win32file._setmaxstdio(mof_h)
- else:
- resource.setrlimit(resource.RLIMIT_NOFILE, (mof_s, mof_h))
- shutil.rmtree(tempdir)
- def test_verify_log(self):
- """
- Test that verify_log works as expected
- """
- message = (
- "Insecure logging configuration detected! Sensitive data may be logged."
- )
- mock_cheese = MagicMock()
- with patch.object(log, "warning", mock_cheese):
- verify_log({"log_level": "cheeseshop"})
- mock_cheese.assert_called_once_with(message)
- mock_trace = MagicMock()
- with patch.object(log, "warning", mock_trace):
- verify_log({"log_level": "trace"})
- mock_trace.assert_called_once_with(message)
- mock_none = MagicMock()
- with patch.object(log, "warning", mock_none):
- verify_log({})
- mock_none.assert_called_once_with(message)
- mock_info = MagicMock()
- with patch.object(log, "warning", mock_info):
- verify_log({"log_level": "info"})
- self.assertTrue(mock_info.call_count == 0)
- class TestVerifyLog(TestCase):
- def setUp(self):
- self.tmpdir = tempfile.mkdtemp()
- def tearDown(self):
- shutil.rmtree(self.tmpdir)
- def test_verify_logs_filter(self):
- filtered = verify_logs_filter(
- ["udp://foo", "tcp://bar", "/tmp/foo", "file://tmp/bar"]
- )
- assert filtered == ["/tmp/foo"], filtered
- @skipIf(salt.utils.platform.is_windows(), "Not applicable on Windows")
- def test_verify_log_files_udp_scheme(self):
- verify_log_files(["udp://foo"], getpass.getuser())
- self.assertFalse(os.path.isdir(os.path.join(os.getcwd(), "udp:")))
- @skipIf(salt.utils.platform.is_windows(), "Not applicable on Windows")
- def test_verify_log_files_tcp_scheme(self):
- verify_log_files(["udp://foo"], getpass.getuser())
- self.assertFalse(os.path.isdir(os.path.join(os.getcwd(), "tcp:")))
- @skipIf(salt.utils.platform.is_windows(), "Not applicable on Windows")
- def test_verify_log_files_file_scheme(self):
- verify_log_files(["file://{}"], getpass.getuser())
- self.assertFalse(os.path.isdir(os.path.join(os.getcwd(), "file:")))
- @skipIf(salt.utils.platform.is_windows(), "Not applicable on Windows")
- def test_verify_log_files(self):
- path = os.path.join(self.tmpdir, "foo", "bar.log")
- self.assertFalse(os.path.exists(path))
- verify_log_files([path], getpass.getuser())
- self.assertTrue(os.path.exists(path))
- class TestCleanPath(TestCase):
- """
- salt.utils.clean_path works as expected
- """
- def setUp(self):
- self.tmpdir = tempfile.mkdtemp()
- def tearDown(self):
- shutil.rmtree(self.tmpdir)
- def test_clean_path_valid(self):
- path_a = os.path.join(self.tmpdir, "foo")
- path_b = os.path.join(self.tmpdir, "foo", "bar")
- assert clean_path(path_a, path_b) == path_b
- def test_clean_path_invalid(self):
- path_a = os.path.join(self.tmpdir, "foo")
- path_b = os.path.join(self.tmpdir, "baz", "bar")
- assert clean_path(path_a, path_b) == ""
- __CSL = None
- def symlink(source, link_name):
- """
- symlink(source, link_name) Creates a symbolic link pointing to source named
- link_name
- """
- global __CSL
- if __CSL is None:
- csl = ctypes.windll.kernel32.CreateSymbolicLinkW
- csl.argtypes = (ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint32)
- csl.restype = ctypes.c_ubyte
- __CSL = csl
- flags = 0
- if source is not None and os.path.isdir(source):
- flags = 1
- if __CSL(link_name, source, flags) == 0:
- raise ctypes.WinError()
- @skipIf(six.PY2 and salt.utils.platform.is_windows(), "Skipped on windows py2")
- class TestCleanPathLink(TestCase):
- """
- Ensure salt.utils.clean_path works with symlinked directories and files
- """
- def setUp(self):
- self.tmpdir = tempfile.mkdtemp()
- self.to_path = os.path.join(self.tmpdir, "linkto")
- self.from_path = os.path.join(self.tmpdir, "linkfrom")
- if six.PY2 or salt.utils.platform.is_windows():
- kwargs = {}
- else:
- kwargs = {"target_is_directory": True}
- if salt.utils.platform.is_windows():
- symlink(self.to_path, self.from_path, **kwargs)
- else:
- os.symlink(self.to_path, self.from_path, **kwargs)
- def tearDown(self):
- shutil.rmtree(self.tmpdir)
- def test_clean_path_symlinked_src(self):
- test_path = os.path.join(self.from_path, "test")
- expect_path = os.path.join(self.to_path, "test")
- ret = clean_path(self.from_path, test_path)
- assert ret == expect_path, "{} is not {}".format(ret, expect_path)
- def test_clean_path_symlinked_tgt(self):
- test_path = os.path.join(self.to_path, "test")
- expect_path = os.path.join(self.to_path, "test")
- ret = clean_path(self.from_path, test_path)
- assert ret == expect_path, "{} is not {}".format(ret, expect_path)
|