1
0

test_vault.py 9.1 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. Integration tests for the vault modules
  4. """
  5. from __future__ import absolute_import, print_function, unicode_literals
  6. import inspect
  7. import logging
  8. import time
  9. import salt.utils.path
  10. from tests.support.case import ModuleCase, ShellCase
  11. from tests.support.helpers import destructiveTest, flaky, slowTest
  12. from tests.support.runtests import RUNTIME_VARS
  13. from tests.support.unit import skipIf
  14. log = logging.getLogger(__name__)
  15. @skipIf(not salt.utils.path.which("dockerd"), "Docker not installed")
  16. @skipIf(not salt.utils.path.which("vault"), "Vault not installed")
  17. class VaultTestCase(ModuleCase, ShellCase):
  18. """
  19. Test vault module
  20. """
  21. count = 0
  22. def setUp(self):
  23. """
  24. SetUp vault container
  25. """
  26. vault_binary = salt.utils.path.which("vault")
  27. if VaultTestCase.count == 0:
  28. config = '{"backend": {"file": {"path": "/vault/file"}}, "default_lease_ttl": "168h", "max_lease_ttl": "720h"}'
  29. self.run_state("docker_image.present", name="vault", tag="0.9.6")
  30. self.run_state(
  31. "docker_container.running",
  32. name="vault",
  33. image="vault:0.9.6",
  34. port_bindings="8200:8200",
  35. environment={
  36. "VAULT_DEV_ROOT_TOKEN_ID": "testsecret",
  37. "VAULT_LOCAL_CONFIG": config,
  38. },
  39. cap_add="IPC_LOCK",
  40. )
  41. time.sleep(5)
  42. ret = self.run_function(
  43. "cmd.retcode",
  44. cmd="{} login token=testsecret".format(vault_binary),
  45. env={"VAULT_ADDR": "http://127.0.0.1:8200"},
  46. )
  47. login_attempts = 1
  48. # If the login failed, container might have stopped
  49. # attempt again, maximum of three times before
  50. # skipping.
  51. while ret != 0:
  52. self.run_state(
  53. "docker_container.running",
  54. name="vault",
  55. image="vault:0.9.6",
  56. port_bindings="8200:8200",
  57. environment={
  58. "VAULT_DEV_ROOT_TOKEN_ID": "testsecret",
  59. "VAULT_LOCAL_CONFIG": config,
  60. },
  61. cap_add="IPC_LOCK",
  62. )
  63. time.sleep(5)
  64. ret = self.run_function(
  65. "cmd.retcode",
  66. cmd="{} login token=testsecret".format(vault_binary),
  67. env={"VAULT_ADDR": "http://127.0.0.1:8200"},
  68. )
  69. login_attempts += 1
  70. if login_attempts >= 3:
  71. self.skipTest("unable to login to vault")
  72. ret = self.run_function(
  73. "cmd.retcode",
  74. cmd="{} policy write testpolicy {}/vault.hcl".format(
  75. vault_binary, RUNTIME_VARS.FILES
  76. ),
  77. env={"VAULT_ADDR": "http://127.0.0.1:8200"},
  78. )
  79. if ret != 0:
  80. self.skipTest("unable to assign policy to vault")
  81. VaultTestCase.count += 1
  82. def tearDown(self):
  83. """
  84. TearDown vault container
  85. """
  86. def count_tests(funcobj):
  87. return (
  88. inspect.ismethod(funcobj)
  89. or inspect.isfunction(funcobj)
  90. and funcobj.__name__.startswith("test_")
  91. )
  92. numtests = len(inspect.getmembers(VaultTestCase, predicate=count_tests))
  93. if VaultTestCase.count >= numtests:
  94. self.run_state("docker_container.stopped", name="vault")
  95. self.run_state("docker_container.absent", name="vault")
  96. self.run_state("docker_image.absent", name="vault", force=True)
  97. @flaky
  98. @slowTest
  99. def test_sdb(self):
  100. set_output = self.run_function(
  101. "sdb.set", uri="sdb://sdbvault/secret/test/test_sdb/foo", value="bar"
  102. )
  103. self.assertEqual(set_output, True)
  104. get_output = self.run_function(
  105. "sdb.get", arg=["sdb://sdbvault/secret/test/test_sdb/foo"]
  106. )
  107. self.assertEqual(get_output, "bar")
  108. @flaky
  109. @slowTest
  110. def test_sdb_runner(self):
  111. set_output = self.run_run(
  112. "sdb.set sdb://sdbvault/secret/test/test_sdb_runner/foo bar"
  113. )
  114. self.assertEqual(set_output, ["True"])
  115. get_output = self.run_run(
  116. "sdb.get sdb://sdbvault/secret/test/test_sdb_runner/foo"
  117. )
  118. self.assertEqual(get_output, ["bar"])
  119. @flaky
  120. @slowTest
  121. def test_config(self):
  122. set_output = self.run_function(
  123. "sdb.set", uri="sdb://sdbvault/secret/test/test_pillar_sdb/foo", value="bar"
  124. )
  125. self.assertEqual(set_output, True)
  126. get_output = self.run_function("config.get", arg=["test_vault_pillar_sdb"])
  127. self.assertEqual(get_output, "bar")
  128. @destructiveTest
  129. @skipIf(not salt.utils.path.which("dockerd"), "Docker not installed")
  130. @skipIf(not salt.utils.path.which("vault"), "Vault not installed")
  131. class VaultTestCaseCurrent(ModuleCase, ShellCase):
  132. """
  133. Test vault module
  134. """
  135. count = 0
  136. def setUp(self):
  137. """
  138. SetUp vault container
  139. """
  140. if self.count == 0:
  141. config = '{"backend": {"file": {"path": "/vault/file"}}, "default_lease_ttl": "168h", "max_lease_ttl": "720h"}'
  142. self.run_state("docker_image.present", name="vault", tag="1.3.1")
  143. self.run_state(
  144. "docker_container.running",
  145. name="vault",
  146. image="vault:1.3.1",
  147. port_bindings="8200:8200",
  148. environment={
  149. "VAULT_DEV_ROOT_TOKEN_ID": "testsecret",
  150. "VAULT_LOCAL_CONFIG": config,
  151. },
  152. cap_add="IPC_LOCK",
  153. )
  154. time.sleep(5)
  155. ret = self.run_function(
  156. "cmd.retcode",
  157. cmd="/usr/local/bin/vault login token=testsecret",
  158. env={"VAULT_ADDR": "http://127.0.0.1:8200"},
  159. )
  160. login_attempts = 1
  161. # If the login failed, container might have stopped
  162. # attempt again, maximum of three times before
  163. # skipping.
  164. while ret != 0:
  165. self.run_state(
  166. "docker_container.running",
  167. name="vault",
  168. image="vault:1.3.1",
  169. port_bindings="8200:8200",
  170. environment={
  171. "VAULT_DEV_ROOT_TOKEN_ID": "testsecret",
  172. "VAULT_LOCAL_CONFIG": config,
  173. },
  174. cap_add="IPC_LOCK",
  175. )
  176. time.sleep(5)
  177. ret = self.run_function(
  178. "cmd.retcode",
  179. cmd="/usr/local/bin/vault login token=testsecret",
  180. env={"VAULT_ADDR": "http://127.0.0.1:8200"},
  181. )
  182. login_attempts += 1
  183. if login_attempts >= 3:
  184. self.skipTest("unable to login to vault")
  185. ret = self.run_function(
  186. "cmd.retcode",
  187. cmd="/usr/local/bin/vault policy write testpolicy {0}/vault.hcl".format(
  188. RUNTIME_VARS.FILES
  189. ),
  190. env={"VAULT_ADDR": "http://127.0.0.1:8200"},
  191. )
  192. if ret != 0:
  193. self.skipTest("unable to assign policy to vault")
  194. self.count += 1
  195. def tearDown(self):
  196. """
  197. TearDown vault container
  198. """
  199. def count_tests(funcobj):
  200. return (
  201. inspect.ismethod(funcobj)
  202. or inspect.isfunction(funcobj)
  203. and funcobj.__name__.startswith("test_")
  204. )
  205. numtests = len(inspect.getmembers(VaultTestCaseCurrent, predicate=count_tests))
  206. if self.count >= numtests:
  207. self.run_state("docker_container.stopped", name="vault")
  208. self.run_state("docker_container.absent", name="vault")
  209. self.run_state("docker_image.absent", name="vault", force=True)
  210. @flaky
  211. @slowTest
  212. def test_sdb_kv2(self):
  213. set_output = self.run_function(
  214. "sdb.set", uri="sdb://sdbvault/secret/test/test_sdb/foo", value="bar"
  215. )
  216. self.assertEqual(set_output, True)
  217. get_output = self.run_function(
  218. "sdb.get", arg=["sdb://sdbvault/secret/test/test_sdb/foo"]
  219. )
  220. self.assertEqual(get_output, "bar")
  221. @flaky
  222. @slowTest
  223. def test_sdb_runner_kv2(self):
  224. set_output = self.run_run(
  225. "sdb.set sdb://sdbvault/secret/test/test_sdb_runner/foo bar"
  226. )
  227. self.assertEqual(set_output, ["True"])
  228. get_output = self.run_run(
  229. "sdb.get sdb://sdbvault/secret/test/test_sdb_runner/foo"
  230. )
  231. self.assertEqual(get_output, ["bar"])
  232. @flaky
  233. @slowTest
  234. def test_config_kv2(self):
  235. set_output = self.run_function(
  236. "sdb.set", uri="sdb://sdbvault/secret/test/test_pillar_sdb/foo", value="bar"
  237. )
  238. self.assertEqual(set_output, True)
  239. get_output = self.run_function("config.get", arg=["test_vault_pillar_sdb"])
  240. self.assertEqual(get_output, "bar")