123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315 |
import os
from subprocess import PIPE
from textwrap import dedent

import salt.renderers.gpg as gpg
from salt.exceptions import SaltRenderError
from tests.support.mixins import (
    AdaptedConfigurationTestCaseMixin,
    LoaderModuleMockMixin,
)
from tests.support.mock import MagicMock, Mock, call, patch
from tests.support.unit import TestCase
class GPGTestCase(TestCase, LoaderModuleMockMixin, AdaptedConfigurationTestCaseMixin):
    """
    Unit tests for the GPG renderer (``salt.renderers.gpg``).

    Each test patches the collaborators it needs (gpg executable lookup, key
    directory lookup, ``Popen`` or the decrypt helpers) before calling into
    the renderer.
    """

    # Fixture values shared by several tests.
    KEY_DIR = "/etc/salt/gpgkeys"
    GPG_EXEC = "/usr/bin/gpg"
    SECRET = "Use more salt."
    BINARY_SECRET = b"Use\x8b more\x8b salt."

    # Three identical armored messages, one after the other.
    CRYPTED_X3 = dedent(
        """\
        -----BEGIN PGP MESSAGE-----
        !@#$%^&*()_+
        -----END PGP MESSAGE-----
        -----BEGIN PGP MESSAGE-----
        !@#$%^&*()_+
        -----END PGP MESSAGE-----
        -----BEGIN PGP MESSAGE-----
        !@#$%^&*()_+
        -----END PGP MESSAGE-----
        """
    )

    # Same three messages, but the first two are followed by a literal
    # backslash + "n" (two characters) ahead of the real line break.
    CRYPTED_X3_ESCAPED_NEWLINES = dedent(
        """\
        -----BEGIN PGP MESSAGE-----
        !@#$%^&*()_+
        -----END PGP MESSAGE-----\\n
        -----BEGIN PGP MESSAGE-----
        !@#$%^&*()_+
        -----END PGP MESSAGE-----\\n
        -----BEGIN PGP MESSAGE-----
        !@#$%^&*()_+
        -----END PGP MESSAGE-----
        """
    )

    def setup_loader_modules(self):
        """
        Register the gpg renderer module with an empty set of overrides.
        """
        return {gpg: {}}

    # -- private helpers ----------------------------------------------------

    def _patch_gpg_exec(self, value):
        """
        Return a patcher making ``gpg._get_gpg_exec`` return ``value``.
        """
        return patch("salt.renderers.gpg._get_gpg_exec", MagicMock(return_value=value))

    def _patch_key_dir(self):
        """
        Return a patcher making ``gpg._get_key_dir`` return ``KEY_DIR``.
        """
        return patch(
            "salt.renderers.gpg._get_key_dir", MagicMock(return_value=self.KEY_DIR)
        )

    def _patch_decrypt_ciphertext(self, value):
        """
        Return a patcher making ``gpg._decrypt_ciphertext`` return ``value``.
        """
        return patch(
            "salt.renderers.gpg._decrypt_ciphertext", MagicMock(return_value=value)
        )

    def _fake_gpg_process(self):
        """
        Return a stand-in process whose ``communicate`` yields ``(SECRET, None)``.
        """
        return Mock(communicate=lambda *args, **kwargs: (self.SECRET, None))

    @classmethod
    def _gpg_decrypt_call(cls):
        """
        Build the ``Popen`` call expected for a single decryption.
        """
        return call(
            [
                cls.GPG_EXEC,
                "--homedir",
                cls.KEY_DIR,
                "--status-fd",
                "2",
                "--no-tty",
                "-d",
            ],
            shell=False,
            stderr=PIPE,
            stdin=PIPE,
            stdout=PIPE,
        )

    # -- tests ----------------------------------------------------------------

    def test__get_gpg_exec(self):
        """
        test _get_gpg_exec: the path reported by ``salt.utils.path.which`` is
        returned, and a falsy lookup raises ``SaltRenderError``
        """
        located = "/bin/gpg"
        with patch("salt.utils.path.which", MagicMock(return_value=located)):
            self.assertEqual(gpg._get_gpg_exec(), located)

        with patch("salt.utils.path.which", MagicMock(return_value=False)):
            with self.assertRaises(SaltRenderError):
                gpg._get_gpg_exec()

    def test__decrypt_ciphertext(self):
        """
        test _decrypt_ciphertexts with ``Popen`` mocked: armored messages are
        replaced by the process output, and the input comes back unchanged
        when the process returns no data
        """
        secret = self.SECRET
        crypted = "-----BEGIN PGP MESSAGE-----!@#$%^&*()_+-----END PGP MESSAGE-----"
        template = "password is {0} and salt is {0}"
        multisecret = template.format(secret)
        multicrypted = template.format(crypted)

        class GPGDecrypt:
            def communicate(self, *args, **kwargs):
                return [secret, None]

        class GPGNotDecrypt:
            def communicate(self, *args, **kwargs):
                return [None, "decrypt error"]

        # (fake process, expected result for one message, for two messages)
        scenarios = (
            (GPGDecrypt(), secret, multisecret),
            (GPGNotDecrypt(), crypted, multicrypted),
        )
        with self._patch_key_dir(), patch("salt.utils.path.which", MagicMock()):
            for process, want_single, want_multi in scenarios:
                with patch(
                    "salt.renderers.gpg.Popen", MagicMock(return_value=process)
                ):
                    self.assertEqual(gpg._decrypt_ciphertexts(crypted), want_single)
                    self.assertEqual(
                        gpg._decrypt_ciphertexts(multicrypted), want_multi
                    )

    def test__decrypt_object(self):
        """
        test _decrypt_object on a plain string, an armored string, a dict, a
        list and ``None`` (with ``_decrypt_ciphertext`` mocked)
        """
        secret = self.SECRET
        crypted = "-----BEGIN PGP MESSAGE-----!@#$%^&*()_+-----END PGP MESSAGE-----"

        with self._patch_decrypt_ciphertext(secret):
            self.assertEqual(gpg._decrypt_object(secret), secret)
            self.assertEqual(gpg._decrypt_object(crypted), secret)
            self.assertEqual(
                gpg._decrypt_object({"secret": crypted}), {"secret": secret}
            )
            self.assertEqual(gpg._decrypt_object([crypted]), [secret])
            self.assertEqual(gpg._decrypt_object(None), None)

    def test_render(self):
        """
        test render: the value produced by ``_decrypt_object`` is returned
        """
        crypted = "-----BEGIN PGP MESSAGE-----!@#$%^&*()_+"
        with self._patch_gpg_exec(True), self._patch_key_dir(), patch(
            "salt.renderers.gpg._decrypt_object", MagicMock(return_value=self.SECRET)
        ):
            self.assertEqual(gpg.render(crypted), self.SECRET)

    def test_render_bytes(self):
        """
        test rendering bytes: input without an armored message comes back as
        the decoded string
        """
        binfo = b"User more salt."
        with self._patch_gpg_exec(True), self._patch_key_dir():
            self.assertEqual(gpg.render(binfo), binfo.decode())

    def test_multi_render(self):
        """
        test render with three armored messages in one document
        """
        expected = "\n".join([self.SECRET] * 3)
        with self._patch_gpg_exec(
            True
        ), self._patch_key_dir(), self._patch_decrypt_ciphertext(self.SECRET):
            self.assertEqual(gpg.render(self.CRYPTED_X3), expected)

    def test_render_with_binary_data_should_return_binary_data(self):
        """
        test render when the decrypted payload is non-UTF-8 bytes
        """
        expected = b"\n".join([self.BINARY_SECRET] * 3)
        with self._patch_gpg_exec(
            True
        ), self._patch_key_dir(), self._patch_decrypt_ciphertext(self.BINARY_SECRET):
            self.assertEqual(gpg.render(self.CRYPTED_X3, encoding="utf-8"), expected)

    def test_render_with_translate_newlines_should_translate_newlines(self):
        """
        test render with ``translate_newlines=True`` on input that contains
        literal backslash-n sequences
        """
        expected = b"\n\n".join([self.BINARY_SECRET] * 3)
        with self._patch_gpg_exec(
            True
        ), self._patch_key_dir(), self._patch_decrypt_ciphertext(self.BINARY_SECRET):
            self.assertEqual(
                gpg.render(
                    self.CRYPTED_X3_ESCAPED_NEWLINES,
                    translate_newlines=True,
                    encoding="utf-8",
                ),
                expected,
            )

    def test_render_without_cache(self):
        """
        test render without the gpg cache: every armored message results in
        its own ``Popen`` call
        """
        expected = "\n".join([self.SECRET] * 3)
        with patch("salt.renderers.gpg.Popen") as popen_mock:
            popen_mock.return_value = self._fake_gpg_process()
            with self._patch_gpg_exec(self.GPG_EXEC), self._patch_key_dir():
                self.assertEqual(gpg.render(self.CRYPTED_X3), expected)
                popen_mock.assert_has_calls([self._gpg_decrypt_call()] * 3)

    def test_render_with_cache(self):
        """
        test render with ``gpg_cache`` enabled: three identical armored
        messages must trigger a single ``Popen`` call and a cache write
        """
        expected = "\n".join([self.SECRET] * 3)
        minion_opts = self.get_temp_config("minion", gpg_cache=True)
        cache_path = os.path.join(minion_opts["cachedir"], "gpg_cache")

        with patch.dict(gpg.__opts__, minion_opts), patch(
            "salt.renderers.gpg.Popen"
        ) as popen_mock:
            popen_mock.return_value = self._fake_gpg_process()
            with self._patch_gpg_exec(self.GPG_EXEC), self._patch_key_dir(), patch(
                "salt.utils.atomicfile.atomic_open", MagicMock()
            ) as atomic_open_mock:
                self.assertEqual(gpg.render(self.CRYPTED_X3), expected)
                popen_mock.assert_has_calls([self._gpg_decrypt_call()])
                # assert_has_calls tolerates extra calls, so on its own it
                # would still pass if the cache never kicked in. Pin the
                # count to prove the repeated ciphertext was served from it.
                self.assertEqual(popen_mock.call_count, 1)
                atomic_open_mock.assert_has_calls([call(cache_path, "wb+")])
|