123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- # -*- coding: utf-8 -*-
- from __future__ import absolute_import, print_function, unicode_literals
- import logging
- import os
- import shutil
- import tempfile
- import time
- import salt.master
- import salt.transport.client
- import salt.utils.files
- import salt.utils.platform
- import salt.utils.user
- from tests.support.case import TestCase
- from tests.support.mixins import AdaptedConfigurationTestCaseMixin
- from tests.support.runtests import RUNTIME_VARS
- log = logging.getLogger(__name__)
- class ConfigMixin:
- @classmethod
- def setUpClass(cls):
- cls.master_config = AdaptedConfigurationTestCaseMixin.get_config("master")
- cls.minion_config = AdaptedConfigurationTestCaseMixin.get_temp_config(
- "minion",
- id="root",
- transport=cls.master_config["transport"],
- auth_tries=1,
- auth_timeout=5,
- master_ip="127.0.0.1",
- master_port=cls.master_config["ret_port"],
- master_uri="tcp://127.0.0.1:{}".format(cls.master_config["ret_port"]),
- )
- if not salt.utils.platform.is_windows():
- user = cls.master_config["user"]
- else:
- user = salt.utils.user.get_specific_user().replace("\\", "_")
- if user.startswith("sudo_"):
- user = user.split("sudo_")[-1]
- cls.user = user
- cls.keyfile = ".{}_key".format(cls.user)
- cls.keypath = os.path.join(cls.master_config["cachedir"], cls.keyfile)
- with salt.utils.files.fopen(cls.keypath) as keyfd:
- cls.key = keyfd.read()
- @classmethod
- def tearDownClass(cls):
- del cls.master_config
- del cls.minion_config
- del cls.key
- del cls.keyfile
- del cls.keypath
- class ClearFuncsAuthTestCase(ConfigMixin, TestCase):
- def test_auth_info_not_allowed(self):
- assert hasattr(salt.master.ClearFuncs, "_prep_auth_info")
- clear_channel = salt.transport.client.ReqChannel.factory(
- self.minion_config, crypt="clear"
- )
- msg = {"cmd": "_prep_auth_info"}
- rets = clear_channel.send(msg, timeout=15)
- ret_key = None
- for ret in rets:
- try:
- ret_key = ret[self.user]
- break
- except (TypeError, KeyError):
- pass
- assert ret_key != self.key, "Able to retrieve user key"
- class ClearFuncsPubTestCase(ConfigMixin, TestCase):
- def setUp(self):
- tempdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
- self.addCleanup(shutil.rmtree, tempdir, ignore_errors=True)
- self.tmpfile = os.path.join(tempdir, "evil_file")
- def tearDown(self):
- self.tmpfile = None
- def test_pub_not_allowed(self):
- assert hasattr(salt.master.ClearFuncs, "_send_pub")
- assert not os.path.exists(self.tmpfile)
- clear_channel = salt.transport.client.ReqChannel.factory(
- self.minion_config, crypt="clear"
- )
- jid = "202003100000000001"
- msg = {
- "cmd": "_send_pub",
- "fun": "file.write",
- "jid": jid,
- "arg": [self.tmpfile, "evil contents"],
- "kwargs": {"show_jid": False, "show_timeout": False},
- "ret": "",
- "tgt": "minion",
- "tgt_type": "glob",
- "user": "root",
- }
- eventbus = salt.utils.event.get_event(
- "master",
- sock_dir=self.master_config["sock_dir"],
- transport=self.master_config["transport"],
- opts=self.master_config,
- )
- ret = clear_channel.send(msg, timeout=15)
- if salt.utils.platform.is_windows():
- time.sleep(30)
- timeout = 30
- else:
- timeout = 5
- ret_evt = None
- start = time.time()
- while time.time() - start <= timeout:
- raw = eventbus.get_event(timeout, auto_reconnect=True)
- if raw and "jid" in raw and raw["jid"] == jid:
- ret_evt = raw
- break
- assert not os.path.exists(self.tmpfile), "Evil file created"
- class ClearFuncsConfigTest(ConfigMixin, TestCase):
- def setUp(self):
- self.evil_file_path = os.path.join(
- os.path.dirname(self.master_config["conf_file"]), "evil.conf"
- )
- def tearDown(self):
- try:
- os.remove(self.evil_file_path)
- except OSError:
- pass
- self.evil_file_path = None
- def test_clearfuncs_config(self):
- clear_channel = salt.transport.client.ReqChannel.factory(
- self.minion_config, crypt="clear"
- )
- msg = {
- "key": self.key,
- "cmd": "wheel",
- "fun": "config.update_config",
- "file_name": "../evil",
- "yaml_contents": "win",
- }
- ret = clear_channel.send(msg, timeout=5)
- assert not os.path.exists(
- self.evil_file_path
- ), "Wrote file via directory traversal"
- assert ret["data"]["return"] == "Invalid path"
- class ClearFuncsFileRoots(ConfigMixin, TestCase):
- def setUp(self):
- self.file_roots_dir = self.master_config["file_roots"]["base"][0]
- self.target_dir = os.path.dirname(self.file_roots_dir)
- def tearDown(self):
- try:
- os.remove(os.path.join(self.target_dir, "pwn.txt"))
- except OSError:
- pass
- def test_fileroots_write(self):
- clear_channel = salt.transport.client.ReqChannel.factory(
- self.minion_config, crypt="clear"
- )
- msg = {
- "key": self.key,
- "cmd": "wheel",
- "fun": "file_roots.write",
- "data": "win",
- "path": os.path.join("..", "pwn.txt"),
- "saltenv": "base",
- }
- ret = clear_channel.send(msg, timeout=5)
- assert not os.path.exists(
- os.path.join(self.target_dir, "pwn.txt")
- ), "Wrote file via directory traversal"
- def test_fileroots_read(self):
- readpath = os.path.relpath(self.keypath, self.file_roots_dir)
- relative_key_path = os.path.join(self.file_roots_dir, readpath)
- log.debug("Master root_dir: %s", self.master_config["root_dir"])
- log.debug("File Root: %s", self.file_roots_dir)
- log.debug("Key Path: %s", self.keypath)
- log.debug("Read Path: %s", readpath)
- log.debug("Relative Key Path: %s", relative_key_path)
- log.debug("Absolute Read Path: %s", os.path.abspath(relative_key_path))
- # If this asserion fails the test may need to be re-written
- assert os.path.abspath(relative_key_path) == self.keypath
- clear_channel = salt.transport.client.ReqChannel.factory(
- self.minion_config, crypt="clear"
- )
- msg = {
- "key": self.key,
- "cmd": "wheel",
- "fun": "file_roots.read",
- "path": readpath,
- "saltenv": "base",
- }
- ret = clear_channel.send(msg, timeout=5)
- try:
- # When vulnerable this assertion will fail.
- assert (
- list(ret["data"]["return"][0].items())[0][1] != self.key
- ), "Read file via directory traversal"
- except IndexError:
- pass
- # If the vulnerability is fixed, no data will be returned.
- assert ret["data"]["return"] == []
- class ClearFuncsTokenTest(ConfigMixin, TestCase):
- def test_token(self):
- tokensdir = os.path.join(self.master_config["cachedir"], "tokens")
- assert os.path.exists(tokensdir), tokensdir
- clear_channel = salt.transport.client.ReqChannel.factory(
- self.minion_config, crypt="clear"
- )
- msg = {
- "arg": [],
- "cmd": "get_token",
- "token": os.path.join("..", "minions", "minion", "data.p"),
- }
- ret = clear_channel.send(msg, timeout=5)
- assert "pillar" not in ret, "Read minion data via directory traversal"
|