test_vault.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. # -*- coding: utf-8 -*-
  2. """
  3. Integration tests for the vault execution module
  4. """
  5. from __future__ import absolute_import, print_function, unicode_literals
  6. import inspect
  7. import logging
  8. import time
  9. import pytest
  10. import salt.utils.path
  11. from tests.support.case import ModuleCase
  12. from tests.support.runtests import RUNTIME_VARS
  13. from tests.support.unit import skipIf
  14. log = logging.getLogger(__name__)
  15. @skipIf(not salt.utils.path.which("dockerd"), "Docker not installed")
  16. @skipIf(not salt.utils.path.which("vault"), "Vault not installed")
  17. @pytest.mark.destructive_test
  18. class VaultTestCase(ModuleCase):
  19. """
  20. Test vault module
  21. """
  22. count = 0
  23. def setUp(self):
  24. """
  25. SetUp vault container
  26. """
  27. if self.count == 0:
  28. config = '{"backend": {"file": {"path": "/vault/file"}}, "default_lease_ttl": "168h", "max_lease_ttl": "720h", "disable_mlock": true}'
  29. self.run_state("docker_image.present", name="vault", tag="0.9.6")
  30. self.run_state(
  31. "docker_container.running",
  32. name="vault",
  33. image="vault:0.9.6",
  34. port_bindings="8200:8200",
  35. environment={
  36. "VAULT_DEV_ROOT_TOKEN_ID": "testsecret",
  37. "VAULT_LOCAL_CONFIG": config,
  38. },
  39. )
  40. time.sleep(5)
  41. ret = self.run_function(
  42. "cmd.retcode",
  43. cmd="/usr/local/bin/vault login token=testsecret",
  44. env={"VAULT_ADDR": "http://127.0.0.1:8200"},
  45. )
  46. if ret != 0:
  47. self.skipTest("unable to login to vault")
  48. ret = self.run_function(
  49. "cmd.retcode",
  50. cmd="/usr/local/bin/vault policy write testpolicy {0}/vault.hcl".format(
  51. RUNTIME_VARS.FILES
  52. ),
  53. env={"VAULT_ADDR": "http://127.0.0.1:8200"},
  54. )
  55. if ret != 0:
  56. self.skipTest("unable to assign policy to vault")
  57. self.count += 1
  58. def tearDown(self):
  59. """
  60. TearDown vault container
  61. """
  62. def count_tests(funcobj):
  63. return inspect.ismethod(funcobj) and funcobj.__name__.startswith("test_")
  64. numtests = len(inspect.getmembers(VaultTestCase, predicate=count_tests))
  65. if self.count >= numtests:
  66. self.run_state("docker_container.stopped", name="vault")
  67. self.run_state("docker_container.absent", name="vault")
  68. self.run_state("docker_image.absent", name="vault", force=True)
  69. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  70. def test_write_read_secret(self):
  71. write_return = self.run_function(
  72. "vault.write_secret", path="secret/my/secret", user="foo", password="bar"
  73. )
  74. self.assertEqual(write_return, True)
  75. assert self.run_function("vault.read_secret", arg=["secret/my/secret"]) == {
  76. "password": "bar",
  77. "user": "foo",
  78. }
  79. assert (
  80. self.run_function("vault.read_secret", arg=["secret/my/secret", "user"])
  81. == "foo"
  82. )
  83. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  84. def test_write_raw_read_secret(self):
  85. assert (
  86. self.run_function(
  87. "vault.write_raw",
  88. path="secret/my/secret2",
  89. raw={"user2": "foo2", "password2": "bar2"},
  90. )
  91. is True
  92. )
  93. assert self.run_function("vault.read_secret", arg=["secret/my/secret2"]) == {
  94. "password2": "bar2",
  95. "user2": "foo2",
  96. }
  97. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  98. def test_delete_secret(self):
  99. assert (
  100. self.run_function(
  101. "vault.write_secret",
  102. path="secret/my/secret",
  103. user="foo",
  104. password="bar",
  105. )
  106. is True
  107. )
  108. assert (
  109. self.run_function("vault.delete_secret", arg=["secret/my/secret"]) is True
  110. )
  111. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  112. def test_list_secrets(self):
  113. assert (
  114. self.run_function(
  115. "vault.write_secret",
  116. path="secret/my/secret",
  117. user="foo",
  118. password="bar",
  119. )
  120. is True
  121. )
  122. assert self.run_function("vault.list_secrets", arg=["secret/my/"]) == {
  123. "keys": ["secret"]
  124. }
  125. @pytest.mark.destructive_test
  126. @skipIf(not salt.utils.path.which("dockerd"), "Docker not installed")
  127. @skipIf(not salt.utils.path.which("vault"), "Vault not installed")
  128. class VaultTestCaseCurrent(ModuleCase):
  129. """
  130. Test vault module against current vault
  131. """
  132. count = 0
  133. def setUp(self):
  134. """
  135. SetUp vault container
  136. """
  137. if self.count == 0:
  138. config = '{"backend": {"file": {"path": "/vault/file"}}, "default_lease_ttl": "168h", "max_lease_ttl": "720h", "disable_mlock": true}'
  139. self.run_state("docker_image.present", name="vault", tag="1.3.1")
  140. self.run_state(
  141. "docker_container.running",
  142. name="vault",
  143. image="vault:1.3.1",
  144. port_bindings="8200:8200",
  145. environment={
  146. "VAULT_DEV_ROOT_TOKEN_ID": "testsecret",
  147. "VAULT_LOCAL_CONFIG": config,
  148. },
  149. )
  150. time.sleep(5)
  151. ret = self.run_function(
  152. "cmd.retcode",
  153. cmd="/usr/local/bin/vault login token=testsecret",
  154. env={"VAULT_ADDR": "http://127.0.0.1:8200"},
  155. )
  156. if ret != 0:
  157. self.skipTest("unable to login to vault")
  158. ret = self.run_function(
  159. "cmd.retcode",
  160. cmd="/usr/local/bin/vault policy write testpolicy {0}/vault.hcl".format(
  161. RUNTIME_VARS.FILES
  162. ),
  163. env={"VAULT_ADDR": "http://127.0.0.1:8200"},
  164. )
  165. if ret != 0:
  166. self.skipTest("unable to assign policy to vault")
  167. self.count += 1
  168. def tearDown(self):
  169. """
  170. TearDown vault container
  171. """
  172. def count_tests(funcobj):
  173. return inspect.ismethod(funcobj) and funcobj.__name__.startswith("test_")
  174. numtests = len(inspect.getmembers(VaultTestCaseCurrent, predicate=count_tests))
  175. if self.count >= numtests:
  176. self.run_state("docker_container.stopped", name="vault")
  177. self.run_state("docker_container.absent", name="vault")
  178. self.run_state("docker_image.absent", name="vault", force=True)
  179. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  180. def test_write_read_secret_kv2(self):
  181. write_return = self.run_function(
  182. "vault.write_secret", path="secret/my/secret", user="foo", password="bar"
  183. )
  184. # write_secret output:
  185. # {'created_time': '2020-01-12T23:09:34.571294241Z', 'destroyed': False,
  186. # 'version': 1, 'deletion_time': ''}
  187. expected_write = {"destroyed": False, "deletion_time": ""}
  188. self.assertDictContainsSubset(expected_write, write_return)
  189. read_return = self.run_function(
  190. "vault.read_secret", arg=["secret/my/secret"], metadata=True
  191. )
  192. # read_secret output:
  193. # {'data': {'password': 'bar', 'user': 'foo'},
  194. # 'metadata': {'created_time': '2020-01-12T23:07:18.829326918Z', 'destroyed': False,
  195. # 'version': 1, 'deletion_time': ''}}
  196. expected_read = {"data": {"password": "bar", "user": "foo"}}
  197. self.assertDictContainsSubset(expected_read, read_return)
  198. expected_read = {"password": "bar", "user": "foo"}
  199. read_return = self.run_function("vault.read_secret", arg=["secret/my/secret"])
  200. self.assertDictContainsSubset(expected_read, read_return)
  201. read_return = self.run_function(
  202. "vault.read_secret", arg=["secret/my/secret", "user"]
  203. )
  204. self.assertEqual(read_return, "foo")
  205. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  206. def test_list_secrets_kv2(self):
  207. write_return = self.run_function(
  208. "vault.write_secret", path="secret/my/secret", user="foo", password="bar"
  209. )
  210. expected_write = {"destroyed": False, "deletion_time": ""}
  211. self.assertDictContainsSubset(expected_write, write_return)
  212. list_return = self.run_function("vault.list_secrets", arg=["secret/my/"])
  213. self.assertIn("secret", list_return["keys"])
  214. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  215. def test_write_raw_read_secret_kv2(self):
  216. write_return = self.run_function(
  217. "vault.write_raw",
  218. path="secret/my/secret2",
  219. raw={"user2": "foo2", "password2": "bar2"},
  220. )
  221. expected_write = {"destroyed": False, "deletion_time": ""}
  222. self.assertDictContainsSubset(expected_write, write_return)
  223. read_return = self.run_function(
  224. "vault.read_secret", arg=["secret/my/secret2"], metadata=True
  225. )
  226. expected_read = {"data": {"password2": "bar2", "user2": "foo2"}}
  227. self.assertDictContainsSubset(expected_read, read_return)
  228. read_return = self.run_function("vault.read_secret", arg=["secret/my/secret2"])
  229. expected_read = {"password2": "bar2", "user2": "foo2"}
  230. self.assertDictContainsSubset(expected_read, read_return)
  231. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  232. def test_delete_secret_kv2(self):
  233. write_return = self.run_function(
  234. "vault.write_secret",
  235. path="secret/my/secret3",
  236. user3="foo3",
  237. password3="bar3",
  238. )
  239. expected_write = {"destroyed": False, "deletion_time": ""}
  240. self.assertDictContainsSubset(expected_write, write_return)
  241. delete_return = self.run_function(
  242. "vault.delete_secret", arg=["secret/my/secret3"]
  243. )
  244. self.assertEqual(delete_return, True)
  245. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  246. def test_destroy_secret_kv2(self):
  247. write_return = self.run_function(
  248. "vault.write_secret",
  249. path="secret/my/secret4",
  250. user3="foo4",
  251. password4="bar4",
  252. )
  253. expected_write = {"destroyed": False, "deletion_time": ""}
  254. self.assertDictContainsSubset(expected_write, write_return)
  255. destroy_return = self.run_function(
  256. "vault.destroy_secret", arg=["secret/my/secret4", "1"]
  257. )
  258. self.assertEqual(destroy_return, True)
  259. # self.assertIsNone(self.run_function('vault.read_secret', arg=['secret/my/secret4']))
  260. # list_return = self.run_function('vault.list_secrets', arg=['secret/my/'])
  261. # self.assertNotIn('secret4', list_return['keys'])