1
0

test_gitfs.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. # -*- coding: utf-8 -*-
  2. """
  3. These only test the provider selection and verification logic, they do not init
  4. any remotes.
  5. """
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import os
  8. import shutil
  9. from time import time
  10. import salt.fileserver.gitfs
  11. import salt.utils.files
  12. import salt.utils.gitfs
  13. import salt.utils.platform
  14. import tests.support.paths
  15. from salt.exceptions import FileserverConfigError
  16. from tests.support.mock import MagicMock, patch
  17. from tests.support.unit import TestCase, skipIf
  18. try:
  19. HAS_PYGIT2 = (
  20. salt.utils.gitfs.PYGIT2_VERSION >= salt.utils.gitfs.PYGIT2_MINVER
  21. and salt.utils.gitfs.LIBGIT2_VERSION >= salt.utils.gitfs.LIBGIT2_MINVER
  22. )
  23. except AttributeError:
  24. HAS_PYGIT2 = False
  25. if HAS_PYGIT2:
  26. import pygit2
  27. class TestGitFSProvider(TestCase):
  28. def setUp(self):
  29. self.opts = {"cachedir": "/tmp/gitfs-test-cache"}
  30. def tearDown(self):
  31. self.opts = None
  32. def test_provider_case_insensitive(self):
  33. """
  34. Ensure that both lowercase and non-lowercase values are supported
  35. """
  36. provider = "GitPython"
  37. for role_name, role_class in (
  38. ("gitfs", salt.utils.gitfs.GitFS),
  39. ("git_pillar", salt.utils.gitfs.GitPillar),
  40. ("winrepo", salt.utils.gitfs.WinRepo),
  41. ):
  42. key = "{0}_provider".format(role_name)
  43. with patch.object(
  44. role_class, "verify_gitpython", MagicMock(return_value=True)
  45. ):
  46. with patch.object(
  47. role_class, "verify_pygit2", MagicMock(return_value=False)
  48. ):
  49. args = [self.opts, {}]
  50. kwargs = {"init_remotes": False}
  51. if role_name == "winrepo":
  52. kwargs["cache_root"] = "/tmp/winrepo-dir"
  53. with patch.dict(self.opts, {key: provider}):
  54. # Try to create an instance with uppercase letters in
  55. # provider name. If it fails then a
  56. # FileserverConfigError will be raised, so no assert is
  57. # necessary.
  58. role_class(*args, **kwargs)
  59. # Now try to instantiate an instance with all lowercase
  60. # letters. Again, no need for an assert here.
  61. role_class(*args, **kwargs)
  62. def test_valid_provider(self):
  63. """
  64. Ensure that an invalid provider is not accepted, raising a
  65. FileserverConfigError.
  66. """
  67. def _get_mock(verify, provider):
  68. """
  69. Return a MagicMock with the desired return value
  70. """
  71. return MagicMock(return_value=verify.endswith(provider))
  72. for role_name, role_class in (
  73. ("gitfs", salt.utils.gitfs.GitFS),
  74. ("git_pillar", salt.utils.gitfs.GitPillar),
  75. ("winrepo", salt.utils.gitfs.WinRepo),
  76. ):
  77. key = "{0}_provider".format(role_name)
  78. for provider in salt.utils.gitfs.GIT_PROVIDERS:
  79. verify = "verify_gitpython"
  80. mock1 = _get_mock(verify, provider)
  81. with patch.object(role_class, verify, mock1):
  82. verify = "verify_pygit2"
  83. mock2 = _get_mock(verify, provider)
  84. with patch.object(role_class, verify, mock2):
  85. args = [self.opts, {}]
  86. kwargs = {"init_remotes": False}
  87. if role_name == "winrepo":
  88. kwargs["cache_root"] = "/tmp/winrepo-dir"
  89. with patch.dict(self.opts, {key: provider}):
  90. role_class(*args, **kwargs)
  91. with patch.dict(self.opts, {key: "foo"}):
  92. # Set the provider name to a known invalid provider
  93. # and make sure it raises an exception.
  94. self.assertRaises(
  95. FileserverConfigError, role_class, *args, **kwargs
  96. )
  97. @skipIf(not HAS_PYGIT2, "This host lacks proper pygit2 support")
  98. @skipIf(
  99. salt.utils.platform.is_windows(),
  100. "Skip Pygit2 on windows, due to pygit2 access error on windows",
  101. )
  102. class TestPygit2(TestCase):
  103. def _prepare_remote_repository(self, path):
  104. shutil.rmtree(path, ignore_errors=True)
  105. filecontent = "This is an empty README file"
  106. filename = "README"
  107. signature = pygit2.Signature(
  108. "Dummy Commiter", "dummy@dummy.com", int(time()), 0
  109. )
  110. repository = pygit2.init_repository(path, False)
  111. builder = repository.TreeBuilder()
  112. tree = builder.write()
  113. commit = repository.create_commit(
  114. "HEAD", signature, signature, "Create master branch", tree, []
  115. )
  116. repository.create_reference("refs/tags/simple_tag", commit)
  117. with salt.utils.files.fopen(
  118. os.path.join(repository.workdir, filename), "w"
  119. ) as file:
  120. file.write(filecontent)
  121. blob = repository.create_blob_fromworkdir(filename)
  122. builder = repository.TreeBuilder()
  123. builder.insert(filename, blob, pygit2.GIT_FILEMODE_BLOB)
  124. tree = builder.write()
  125. repository.index.read()
  126. repository.index.add(filename)
  127. repository.index.write()
  128. commit = repository.create_commit(
  129. "HEAD",
  130. signature,
  131. signature,
  132. "Added a README",
  133. tree,
  134. [repository.head.target],
  135. )
  136. repository.create_tag(
  137. "annotated_tag", commit, pygit2.GIT_OBJ_COMMIT, signature, "some message"
  138. )
  139. def _prepare_cache_repository(self, remote, cache):
  140. opts = {
  141. "cachedir": cache,
  142. "__role": "minion",
  143. "gitfs_disable_saltenv_mapping": False,
  144. "gitfs_base": "master",
  145. "gitfs_insecure_auth": False,
  146. "gitfs_mountpoint": "",
  147. "gitfs_passphrase": "",
  148. "gitfs_password": "",
  149. "gitfs_privkey": "",
  150. "gitfs_provider": "pygit2",
  151. "gitfs_pubkey": "",
  152. "gitfs_ref_types": ["branch", "tag", "sha"],
  153. "gitfs_refspecs": [
  154. "+refs/heads/*:refs/remotes/origin/*",
  155. "+refs/tags/*:refs/tags/*",
  156. ],
  157. "gitfs_root": "",
  158. "gitfs_saltenv_blacklist": [],
  159. "gitfs_saltenv_whitelist": [],
  160. "gitfs_ssl_verify": True,
  161. "gitfs_update_interval": 3,
  162. "gitfs_user": "",
  163. "verified_gitfs_provider": "pygit2",
  164. }
  165. per_remote_defaults = {
  166. "base": "master",
  167. "disable_saltenv_mapping": False,
  168. "insecure_auth": False,
  169. "ref_types": ["branch", "tag", "sha"],
  170. "passphrase": "",
  171. "mountpoint": "",
  172. "password": "",
  173. "privkey": "",
  174. "pubkey": "",
  175. "refspecs": [
  176. "+refs/heads/*:refs/remotes/origin/*",
  177. "+refs/tags/*:refs/tags/*",
  178. ],
  179. "root": "",
  180. "saltenv_blacklist": [],
  181. "saltenv_whitelist": [],
  182. "ssl_verify": True,
  183. "update_interval": 60,
  184. "user": "",
  185. }
  186. per_remote_only = ("all_saltenvs", "name", "saltenv")
  187. override_params = tuple(per_remote_defaults.keys())
  188. cache_root = os.path.join(cache, "gitfs")
  189. role = "gitfs"
  190. shutil.rmtree(cache_root, ignore_errors=True)
  191. provider = salt.utils.gitfs.Pygit2(
  192. opts,
  193. remote,
  194. per_remote_defaults,
  195. per_remote_only,
  196. override_params,
  197. cache_root,
  198. role,
  199. )
  200. return provider
  201. def test_checkout(self):
  202. remote = os.path.join(tests.support.paths.TMP, "pygit2-repo")
  203. cache = os.path.join(tests.support.paths.TMP, "pygit2-repo-cache")
  204. self._prepare_remote_repository(remote)
  205. provider = self._prepare_cache_repository(remote, cache)
  206. provider.remotecallbacks = None
  207. provider.credentials = None
  208. provider.init_remote()
  209. provider.fetch()
  210. provider.branch = "master"
  211. self.assertIn(provider.cachedir, provider.checkout())
  212. provider.branch = "simple_tag"
  213. self.assertIn(provider.cachedir, provider.checkout())
  214. provider.branch = "annotated_tag"
  215. self.assertIn(provider.cachedir, provider.checkout())
  216. provider.branch = "does_not_exist"
  217. self.assertIsNone(provider.checkout())