# test_gpg.py 11 KB


import os
from subprocess import PIPE
from textwrap import dedent

import salt.renderers.gpg as gpg
from salt.exceptions import SaltRenderError
from tests.support.mixins import (
    AdaptedConfigurationTestCaseMixin,
    LoaderModuleMockMixin,
)
from tests.support.mock import MagicMock, Mock, call, patch
from tests.support.unit import TestCase
  12. class GPGTestCase(TestCase, LoaderModuleMockMixin, AdaptedConfigurationTestCaseMixin):
  13. """
  14. unit test GPG renderer
  15. """
  16. def setup_loader_modules(self):
  17. return {gpg: {}}
  18. def test__get_gpg_exec(self):
  19. """
  20. test _get_gpg_exec
  21. """
  22. gpg_exec = "/bin/gpg"
  23. with patch("salt.utils.path.which", MagicMock(return_value=gpg_exec)):
  24. self.assertEqual(gpg._get_gpg_exec(), gpg_exec)
  25. with patch("salt.utils.path.which", MagicMock(return_value=False)):
  26. self.assertRaises(SaltRenderError, gpg._get_gpg_exec)
  27. def test__decrypt_ciphertext(self):
  28. """
  29. test _decrypt_ciphertext
  30. """
  31. key_dir = "/etc/salt/gpgkeys"
  32. secret = "Use more salt."
  33. crypted = "-----BEGIN PGP MESSAGE-----!@#$%^&*()_+-----END PGP MESSAGE-----"
  34. multisecret = "password is {0} and salt is {0}".format(secret)
  35. multicrypted = "password is {0} and salt is {0}".format(crypted)
  36. class GPGDecrypt:
  37. def communicate(self, *args, **kwargs):
  38. return [secret, None]
  39. class GPGNotDecrypt:
  40. def communicate(self, *args, **kwargs):
  41. return [None, "decrypt error"]
  42. with patch(
  43. "salt.renderers.gpg._get_key_dir", MagicMock(return_value=key_dir)
  44. ), patch("salt.utils.path.which", MagicMock()):
  45. with patch(
  46. "salt.renderers.gpg.Popen", MagicMock(return_value=GPGDecrypt())
  47. ):
  48. self.assertEqual(gpg._decrypt_ciphertexts(crypted), secret)
  49. self.assertEqual(gpg._decrypt_ciphertexts(multicrypted), multisecret)
  50. with patch(
  51. "salt.renderers.gpg.Popen", MagicMock(return_value=GPGNotDecrypt())
  52. ):
  53. self.assertEqual(gpg._decrypt_ciphertexts(crypted), crypted)
  54. self.assertEqual(gpg._decrypt_ciphertexts(multicrypted), multicrypted)
  55. def test__decrypt_object(self):
  56. """
  57. test _decrypt_object
  58. """
  59. secret = "Use more salt."
  60. crypted = "-----BEGIN PGP MESSAGE-----!@#$%^&*()_+-----END PGP MESSAGE-----"
  61. secret_map = {"secret": secret}
  62. crypted_map = {"secret": crypted}
  63. secret_list = [secret]
  64. crypted_list = [crypted]
  65. with patch(
  66. "salt.renderers.gpg._decrypt_ciphertext", MagicMock(return_value=secret)
  67. ):
  68. self.assertEqual(gpg._decrypt_object(secret), secret)
  69. self.assertEqual(gpg._decrypt_object(crypted), secret)
  70. self.assertEqual(gpg._decrypt_object(crypted_map), secret_map)
  71. self.assertEqual(gpg._decrypt_object(crypted_list), secret_list)
  72. self.assertEqual(gpg._decrypt_object(None), None)
  73. def test_render(self):
  74. """
  75. test render
  76. """
  77. key_dir = "/etc/salt/gpgkeys"
  78. secret = "Use more salt."
  79. crypted = "-----BEGIN PGP MESSAGE-----!@#$%^&*()_+"
  80. with patch("salt.renderers.gpg._get_gpg_exec", MagicMock(return_value=True)):
  81. with patch(
  82. "salt.renderers.gpg._get_key_dir", MagicMock(return_value=key_dir)
  83. ):
  84. with patch(
  85. "salt.renderers.gpg._decrypt_object", MagicMock(return_value=secret)
  86. ):
  87. self.assertEqual(gpg.render(crypted), secret)
  88. def test_render_bytes(self):
  89. """
  90. test rendering bytes
  91. """
  92. key_dir = "/etc/salt/gpgkeys"
  93. binfo = b"User more salt."
  94. with patch("salt.renderers.gpg._get_gpg_exec", MagicMock(return_value=True)):
  95. with patch(
  96. "salt.renderers.gpg._get_key_dir", MagicMock(return_value=key_dir)
  97. ):
  98. self.assertEqual(gpg.render(binfo), binfo.decode())
  99. def test_multi_render(self):
  100. key_dir = "/etc/salt/gpgkeys"
  101. secret = "Use more salt."
  102. expected = "\n".join([secret] * 3)
  103. crypted = dedent(
  104. """\
  105. -----BEGIN PGP MESSAGE-----
  106. !@#$%^&*()_+
  107. -----END PGP MESSAGE-----
  108. -----BEGIN PGP MESSAGE-----
  109. !@#$%^&*()_+
  110. -----END PGP MESSAGE-----
  111. -----BEGIN PGP MESSAGE-----
  112. !@#$%^&*()_+
  113. -----END PGP MESSAGE-----
  114. """
  115. )
  116. with patch("salt.renderers.gpg._get_gpg_exec", MagicMock(return_value=True)):
  117. with patch(
  118. "salt.renderers.gpg._get_key_dir", MagicMock(return_value=key_dir)
  119. ):
  120. with patch(
  121. "salt.renderers.gpg._decrypt_ciphertext",
  122. MagicMock(return_value=secret),
  123. ):
  124. self.assertEqual(gpg.render(crypted), expected)
  125. def test_render_with_binary_data_should_return_binary_data(self):
  126. key_dir = "/etc/salt/gpgkeys"
  127. secret = b"Use\x8b more\x8b salt."
  128. expected = b"\n".join([secret] * 3)
  129. crypted = dedent(
  130. """\
  131. -----BEGIN PGP MESSAGE-----
  132. !@#$%^&*()_+
  133. -----END PGP MESSAGE-----
  134. -----BEGIN PGP MESSAGE-----
  135. !@#$%^&*()_+
  136. -----END PGP MESSAGE-----
  137. -----BEGIN PGP MESSAGE-----
  138. !@#$%^&*()_+
  139. -----END PGP MESSAGE-----
  140. """
  141. )
  142. with patch("salt.renderers.gpg._get_gpg_exec", MagicMock(return_value=True)):
  143. with patch(
  144. "salt.renderers.gpg._get_key_dir", MagicMock(return_value=key_dir)
  145. ):
  146. with patch(
  147. "salt.renderers.gpg._decrypt_ciphertext",
  148. MagicMock(return_value=secret),
  149. ):
  150. self.assertEqual(gpg.render(crypted, encoding="utf-8"), expected)
  151. def test_render_with_translate_newlines_should_translate_newlines(self):
  152. key_dir = "/etc/salt/gpgkeys"
  153. secret = b"Use\x8b more\x8b salt."
  154. expected = b"\n\n".join([secret] * 3)
  155. crypted = dedent(
  156. """\
  157. -----BEGIN PGP MESSAGE-----
  158. !@#$%^&*()_+
  159. -----END PGP MESSAGE-----\\n
  160. -----BEGIN PGP MESSAGE-----
  161. !@#$%^&*()_+
  162. -----END PGP MESSAGE-----\\n
  163. -----BEGIN PGP MESSAGE-----
  164. !@#$%^&*()_+
  165. -----END PGP MESSAGE-----
  166. """
  167. )
  168. with patch("salt.renderers.gpg._get_gpg_exec", MagicMock(return_value=True)):
  169. with patch(
  170. "salt.renderers.gpg._get_key_dir", MagicMock(return_value=key_dir)
  171. ):
  172. with patch(
  173. "salt.renderers.gpg._decrypt_ciphertext",
  174. MagicMock(return_value=secret),
  175. ):
  176. self.assertEqual(
  177. gpg.render(crypted, translate_newlines=True, encoding="utf-8"),
  178. expected,
  179. )
  180. def test_render_without_cache(self):
  181. key_dir = "/etc/salt/gpgkeys"
  182. secret = "Use more salt."
  183. expected = "\n".join([secret] * 3)
  184. crypted = dedent(
  185. """\
  186. -----BEGIN PGP MESSAGE-----
  187. !@#$%^&*()_+
  188. -----END PGP MESSAGE-----
  189. -----BEGIN PGP MESSAGE-----
  190. !@#$%^&*()_+
  191. -----END PGP MESSAGE-----
  192. -----BEGIN PGP MESSAGE-----
  193. !@#$%^&*()_+
  194. -----END PGP MESSAGE-----
  195. """
  196. )
  197. with patch("salt.renderers.gpg.Popen") as popen_mock:
  198. popen_mock.return_value = Mock(
  199. communicate=lambda *args, **kwargs: (secret, None),
  200. )
  201. with patch(
  202. "salt.renderers.gpg._get_gpg_exec",
  203. MagicMock(return_value="/usr/bin/gpg"),
  204. ):
  205. with patch(
  206. "salt.renderers.gpg._get_key_dir", MagicMock(return_value=key_dir)
  207. ):
  208. self.assertEqual(gpg.render(crypted), expected)
  209. gpg_call = call(
  210. [
  211. "/usr/bin/gpg",
  212. "--homedir",
  213. "/etc/salt/gpgkeys",
  214. "--status-fd",
  215. "2",
  216. "--no-tty",
  217. "-d",
  218. ],
  219. shell=False,
  220. stderr=PIPE,
  221. stdin=PIPE,
  222. stdout=PIPE,
  223. )
  224. popen_mock.assert_has_calls([gpg_call] * 3)
  225. def test_render_with_cache(self):
  226. key_dir = "/etc/salt/gpgkeys"
  227. secret = "Use more salt."
  228. expected = "\n".join([secret] * 3)
  229. crypted = dedent(
  230. """\
  231. -----BEGIN PGP MESSAGE-----
  232. !@#$%^&*()_+
  233. -----END PGP MESSAGE-----
  234. -----BEGIN PGP MESSAGE-----
  235. !@#$%^&*()_+
  236. -----END PGP MESSAGE-----
  237. -----BEGIN PGP MESSAGE-----
  238. !@#$%^&*()_+
  239. -----END PGP MESSAGE-----
  240. """
  241. )
  242. minion_opts = self.get_temp_config("minion", gpg_cache=True)
  243. with patch.dict(gpg.__opts__, minion_opts):
  244. with patch("salt.renderers.gpg.Popen") as popen_mock:
  245. popen_mock.return_value = Mock(
  246. communicate=lambda *args, **kwargs: (secret, None),
  247. )
  248. with patch(
  249. "salt.renderers.gpg._get_gpg_exec",
  250. MagicMock(return_value="/usr/bin/gpg"),
  251. ):
  252. with patch(
  253. "salt.renderers.gpg._get_key_dir",
  254. MagicMock(return_value=key_dir),
  255. ):
  256. with patch(
  257. "salt.utils.atomicfile.atomic_open", MagicMock(),
  258. ) as atomic_open_mock:
  259. self.assertEqual(gpg.render(crypted), expected)
  260. gpg_call = call(
  261. [
  262. "/usr/bin/gpg",
  263. "--homedir",
  264. "/etc/salt/gpgkeys",
  265. "--status-fd",
  266. "2",
  267. "--no-tty",
  268. "-d",
  269. ],
  270. shell=False,
  271. stderr=PIPE,
  272. stdin=PIPE,
  273. stdout=PIPE,
  274. )
  275. popen_mock.assert_has_calls([gpg_call] * 1)
  276. atomic_open_mock.assert_has_calls(
  277. [
  278. call(
  279. os.path.join(
  280. minion_opts["cachedir"], "gpg_cache"
  281. ),
  282. "wb+",
  283. )
  284. ]
  285. )