12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945 |
- # -*- coding: utf-8 -*-
- """
- Tests for git execution module
- NOTE: These tests may modify the global git config, and have been marked as
- destructive as a result. If no values are set for user.name or user.email in
- the user's global .gitconfig, then these tests will set one.
- """
- from __future__ import absolute_import, print_function, unicode_literals
- import errno
- import logging
- import os
- import re
- import shutil
- import subprocess
- import tarfile
- import tempfile
- from contextlib import closing
- import pytest
- import salt.utils.data
- import salt.utils.files
- import salt.utils.platform
- from salt.ext import six
- from salt.utils.versions import LooseVersion
- from tests.support.case import ModuleCase
- from tests.support.runtests import RUNTIME_VARS
- from tests.support.unit import skipIf
- log = logging.getLogger(__name__)
def _git_version():
    """
    Return the installed git version as a LooseVersion.

    Runs ``git --version`` and parses its output. Returns ``False`` when the
    git binary cannot be executed or produces no usable output.
    """
    try:
        git_version = subprocess.Popen(
            ["git", "--version"],
            shell=False,
            close_fds=not salt.utils.platform.is_windows(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        ).communicate()[0]
    except OSError:
        return False
    if not git_version:
        log.debug("Git not installed")
        return False
    if six.PY3:
        git_version = git_version.decode(__salt_system_encoding__)
    tokens = git_version.strip().split()
    if not tokens:
        # Whitespace-only output: nothing to parse
        log.debug("Git not installed")
        return False
    # Output looks like "git version X.Y.Z", optionally followed by a vendor
    # suffix such as "(Apple Git-143)". Taking the last token would pick up
    # the suffix, so prefer the token right after "git version" and only fall
    # back to the last token for unexpected formats.
    if len(tokens) >= 3 and tokens[0] == "git" and tokens[1] == "version":
        git_version = tokens[2]
    else:
        git_version = tokens[-1]
    log.debug("Detected git version: %s", git_version)
    return LooseVersion(git_version)
def _worktrees_supported():
    """
    Check if the git version is 2.5.0 or later

    Returns ``False`` when the git version cannot be determined.
    """
    detected = _git_version()
    if not detected:
        # _git_version() returns False when git is missing; handle that here
        # instead of depending on which exception a bool/LooseVersion
        # comparison happens to raise.
        return False
    try:
        return detected >= LooseVersion("2.5.0")
    except AttributeError:
        return False
def _makedirs(path):
    """
    Create ``path`` (including missing parents), tolerating the case where it
    already exists. Any other OSError is propagated.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        if err.errno == errno.EEXIST:
            # Already there, nothing to do
            return
        raise
@pytest.mark.windows_whitelisted
@pytest.mark.skip_if_binaries_missing("git")
class GitModuleTest(ModuleCase):
    """
    Integration tests for the git execution module.

    Each test runs against a scratch repository created in setUp(). Temporary
    directories are registered with addCleanup() so they are removed even when
    an assertion fails.
    """

    def setUp(self):
        """
        Build the scratch repository used by the tests: the same set of files
        in the repo root and in a ``qux`` subdirectory, an initial commit on
        master, an annotated tag, and a second branch carrying one extra
        commit that modifies the first file.
        """
        super(GitModuleTest, self).setUp()
        self.orig_cwd = os.getcwd()
        self.addCleanup(os.chdir, self.orig_cwd)
        self.repo = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        self.addCleanup(shutil.rmtree, self.repo, ignore_errors=True)
        self.files = ("foo", "bar", "baz", "питон")
        self.dirs = ("", "qux")
        self.branches = ("master", "iamanewbranch")
        self.tags = ("git_testing",)
        for dirname in self.dirs:
            dir_path = os.path.join(self.repo, dirname)
            _makedirs(dir_path)
            for filename in self.files:
                with salt.utils.files.fopen(
                    os.path.join(dir_path, filename), "wb"
                ) as fp_:
                    fp_.write(
                        "This is a test file named {0}.".format(filename).encode(
                            "utf-8"
                        )
                    )
        # Navigate to the root of the repo to init, stage, and commit
        os.chdir(self.repo)
        # Initialize a new git repository
        subprocess.check_call(["git", "init", "--quiet", self.repo])
        # Set user.name and user.email config attributes if not present
        for key, value in (
            ("user.name", "Jenkins"),
            ("user.email", "qa@saltstack.com"),
        ):
            # Check if key is missing
            keycheck = subprocess.Popen(
                ["git", "config", "--get", "--global", key],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            # communicate() drains and closes both pipes; a bare wait() would
            # leave them open
            keycheck.communicate()
            if keycheck.returncode != 0:
                # Set the key if it is not present
                subprocess.check_call(["git", "config", "--global", key, value])
        subprocess.check_call(["git", "add", "."])
        subprocess.check_call(
            ["git", "commit", "--quiet", "--message", "Initial commit"]
        )
        # Add a tag
        subprocess.check_call(["git", "tag", "-a", self.tags[0], "-m", "Add tag"])
        # Checkout a second branch
        subprocess.check_call(["git", "checkout", "--quiet", "-b", self.branches[1]])
        # Add a line to the file
        with salt.utils.files.fopen(self.files[0], "a") as fp_:
            fp_.write(salt.utils.stringutils.to_str("Added a line\n"))
        # Commit the updated file
        subprocess.check_call(
            [
                "git",
                "commit",
                "--quiet",
                "--message",
                "Added a line to " + self.files[0],
                self.files[0],
            ]
        )
        # Switch back to master
        subprocess.check_call(["git", "checkout", "--quiet", "master"])
        # Go back to original cwd
        os.chdir(self.orig_cwd)

    def run_function(self, *args, **kwargs):  # pylint: disable=arguments-differ
        """
        Ensure that results are decoded
        TODO: maybe move this behavior to ModuleCase itself?
        """
        return salt.utils.data.decode(
            super(GitModuleTest, self).run_function(*args, **kwargs)
        )

    def tearDown(self):
        """
        Drop the per-test attributes set in setUp(). The directories themselves
        are removed by the cleanups registered there.
        """
        for key in ("orig_cwd", "repo", "files", "dirs", "branches", "tags"):
            delattr(self, key)
        super(GitModuleTest, self).tearDown()

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_add_dir(self):
        """
        Test git.add with a directory
        """
        newdir = "quux"
        newdir_path = os.path.join(self.repo, newdir)
        _makedirs(newdir_path)
        files = [os.path.join(newdir_path, x) for x in self.files]
        files_relpath = [os.path.join(newdir, x) for x in self.files]
        for path in files:
            with salt.utils.files.fopen(path, "wb") as fp_:
                fp_.write(
                    "This is a test file with relative path {0}.\n".format(path).encode(
                        "utf-8"
                    )
                )
        ret = self.run_function("git.add", [self.repo, newdir])
        res = "\n".join(sorted(["add '{0}'".format(x) for x in files_relpath]))
        # git reports forward-slash paths even on Windows
        if salt.utils.platform.is_windows():
            res = res.replace("\\", "/")
        self.assertEqual(ret, res)

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_add_file(self):
        """
        Test git.add with a file
        """
        filename = "quux"
        file_path = os.path.join(self.repo, filename)
        with salt.utils.files.fopen(file_path, "w") as fp_:
            fp_.write(
                salt.utils.stringutils.to_str(
                    "This is a test file named {0}.\n".format(filename)
                )
            )
        ret = self.run_function("git.add", [self.repo, filename])
        self.assertEqual(ret, "add '{0}'".format(filename))

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_archive(self):
        """
        Test git.archive
        """
        tar_archive = os.path.join(RUNTIME_VARS.TMP, "test_archive.tar.gz")
        try:
            self.assertTrue(
                self.run_function(
                    "git.archive", [self.repo, tar_archive], prefix="foo/"
                )
            )
            self.assertTrue(tarfile.is_tarfile(tar_archive))
            with closing(tarfile.open(tar_archive, "r")) as tar_obj:
                self.assertEqual(
                    sorted(salt.utils.data.decode(tar_obj.getnames())),
                    sorted(
                        [
                            "foo",
                            "foo/bar",
                            "foo/baz",
                            "foo/foo",
                            "foo/питон",
                            "foo/qux",
                            "foo/qux/bar",
                            "foo/qux/baz",
                            "foo/qux/foo",
                            "foo/qux/питон",
                        ]
                    ),
                )
        finally:
            try:
                os.unlink(tar_archive)
            except OSError:
                pass

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_archive_subdir(self):
        """
        Test git.archive on a subdir, giving only a partial copy of the repo in
        the resulting archive
        """
        tar_archive = os.path.join(RUNTIME_VARS.TMP, "test_archive.tar.gz")
        try:
            self.assertTrue(
                self.run_function(
                    "git.archive",
                    [os.path.join(self.repo, "qux"), tar_archive],
                    prefix="foo/",
                )
            )
            self.assertTrue(tarfile.is_tarfile(tar_archive))
            with closing(tarfile.open(tar_archive, "r")) as tar_obj:
                self.assertEqual(
                    sorted(salt.utils.data.decode(tar_obj.getnames())),
                    sorted(["foo", "foo/bar", "foo/baz", "foo/foo", "foo/питон"]),
                )
        finally:
            try:
                os.unlink(tar_archive)
            except OSError:
                pass

    @pytest.mark.slow_test(seconds=60)  # Test takes >30 and <=60 seconds
    def test_branch(self):
        """
        Test creating, renaming, and deleting a branch using git.branch
        """
        renamed_branch = "ihavebeenrenamed"
        self.assertTrue(self.run_function("git.branch", [self.repo, self.branches[1]]))
        self.assertTrue(
            self.run_function(
                "git.branch", [self.repo, renamed_branch], opts="-m " + self.branches[1]
            )
        )
        self.assertTrue(
            self.run_function("git.branch", [self.repo, renamed_branch], opts="-D")
        )

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_checkout(self):
        """
        Test checking out a new branch and then checking out master again
        """
        new_branch = "iamanothernewbranch"
        self.assertEqual(
            self.run_function(
                "git.checkout", [self.repo, "HEAD"], opts="-b " + new_branch
            ),
            "Switched to a new branch '" + new_branch + "'",
        )
        self.assertIn(
            "Switched to branch 'master'",
            self.run_function("git.checkout", [self.repo, "master"]),
        )

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_checkout_no_rev(self):
        """
        Test git.checkout without a rev, both with -b in opts and without
        """
        new_branch = "iamanothernewbranch"
        self.assertEqual(
            self.run_function(
                "git.checkout", [self.repo], rev=None, opts="-b " + new_branch
            ),
            "Switched to a new branch '" + new_branch + "'",
        )
        self.assertIn(
            "'rev' argument is required unless -b or -B in opts",
            self.run_function("git.checkout", [self.repo]),
        )

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_clone(self):
        """
        Test cloning an existing repo
        """
        clone_parent_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        # Registered up front so the directory is removed even if the
        # assertion below fails
        self.addCleanup(shutil.rmtree, clone_parent_dir, ignore_errors=True)
        self.assertTrue(self.run_function("git.clone", [clone_parent_dir, self.repo]))

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_clone_with_alternate_name(self):
        """
        Test cloning an existing repo with an alternate name for the repo dir
        """
        clone_parent_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        # Registered up front so the directory is removed even if the
        # assertion below fails
        self.addCleanup(shutil.rmtree, clone_parent_dir, ignore_errors=True)
        clone_name = os.path.basename(self.repo)
        self.assertTrue(
            self.run_function(
                "git.clone", [clone_parent_dir, self.repo], name=clone_name
            )
        )

    @pytest.mark.slow_test(seconds=60)  # Test takes >30 and <=60 seconds
    def test_commit(self):
        """
        Test git.commit two ways:
            1) First using git.add, then git.commit
            2) Using git.commit with the 'filename' argument to skip staging
        """
        filename = "foo"
        commit_re_prefix = r"^\[master [0-9a-f]+\] "
        # Add a line
        with salt.utils.files.fopen(os.path.join(self.repo, filename), "a") as fp_:
            fp_.write("Added a line\n")
        # Stage the file
        self.run_function("git.add", [self.repo, filename])
        # Commit the staged file
        commit_msg = "Add a line to " + filename
        ret = self.run_function("git.commit", [self.repo, commit_msg])
        # Make sure the expected line is in the output
        self.assertTrue(bool(re.search(commit_re_prefix + commit_msg, ret)))
        # Add another line
        with salt.utils.files.fopen(os.path.join(self.repo, filename), "a") as fp_:
            fp_.write("Added another line\n")
        # Commit the second file without staging
        commit_msg = "Add another line to " + filename
        ret = self.run_function(
            "git.commit", [self.repo, commit_msg], filename=filename
        )
        self.assertTrue(bool(re.search(commit_re_prefix + commit_msg, ret)))

    @pytest.mark.slow_test(seconds=240)  # Test takes >120 and <=240 seconds
    def test_config(self):
        """
        Test setting, getting, and unsetting config values

        WARNING: This test will modify and completely remove a config section
        'foo', both in the repo created in setUp() and in the user's global
        .gitconfig.
        """

        def _clear_config():
            """
            Best-effort removal of the 'foo' section from the scratch repo's
            config and from the global config.
            """
            cmds = (
                ["git", "config", "--remove-section", "foo"],
                ["git", "config", "--global", "--remove-section", "foo"],
            )
            with salt.utils.files.fopen(os.devnull, "w") as devnull:
                for cmd in cmds:
                    try:
                        # Run inside the scratch repo: setUp() changes back to
                        # the original cwd, so without cwd= the local command
                        # would act on whatever repo the test runner was
                        # started from.
                        subprocess.check_call(cmd, cwd=self.repo, stderr=devnull)
                    except subprocess.CalledProcessError:
                        # The section not existing is expected and fine
                        pass

        cfg_local = {"foo.single": ["foo"], "foo.multi": ["foo", "bar", "baz"]}
        cfg_global = {"foo.single": ["abc"], "foo.multi": ["abc", "def", "ghi"]}
        _clear_config()
        try:
            log.debug("Try to specify both single and multivar (should raise error)")
            self.assertIn(
                "Only one of 'value' and 'multivar' is permitted",
                self.run_function(
                    "git.config_set",
                    ["foo.single"],
                    value=cfg_local["foo.single"][0],
                    multivar=cfg_local["foo.multi"],
                    cwd=self.repo,
                ),
            )
            log.debug("Try to set single local value without cwd (should raise error)")
            self.assertIn(
                "'cwd' argument required unless global=True",
                self.run_function(
                    "git.config_set", ["foo.single"], value=cfg_local["foo.single"][0]
                ),
            )
            log.debug("Set single local value")
            self.assertEqual(
                self.run_function(
                    "git.config_set",
                    ["foo.single"],
                    value=cfg_local["foo.single"][0],
                    cwd=self.repo,
                ),
                cfg_local["foo.single"],
            )
            log.debug("Set single global value")
            self.assertEqual(
                self.run_function(
                    "git.config_set",
                    ["foo.single"],
                    value=cfg_global["foo.single"][0],
                    **{"global": True}
                ),
                cfg_global["foo.single"],
            )
            log.debug("Set local multivar")
            self.assertEqual(
                self.run_function(
                    "git.config_set",
                    ["foo.multi"],
                    multivar=cfg_local["foo.multi"],
                    cwd=self.repo,
                ),
                cfg_local["foo.multi"],
            )
            log.debug("Set global multivar")
            self.assertEqual(
                self.run_function(
                    "git.config_set",
                    ["foo.multi"],
                    multivar=cfg_global["foo.multi"],
                    **{"global": True}
                ),
                cfg_global["foo.multi"],
            )
            log.debug("Get single local value")
            self.assertEqual(
                self.run_function("git.config_get", ["foo.single"], cwd=self.repo),
                cfg_local["foo.single"][0],
            )
            log.debug("Get single value from local multivar")
            self.assertEqual(
                self.run_function("git.config_get", ["foo.multi"], cwd=self.repo),
                cfg_local["foo.multi"][-1],
            )
            log.debug("Get all values from multivar (includes globals)")
            self.assertEqual(
                self.run_function(
                    "git.config_get", ["foo.multi"], cwd=self.repo, **{"all": True}
                ),
                cfg_local["foo.multi"],
            )
            log.debug("Get single global value")
            self.assertEqual(
                self.run_function("git.config_get", ["foo.single"], **{"global": True}),
                cfg_global["foo.single"][0],
            )
            log.debug("Get single value from global multivar")
            self.assertEqual(
                self.run_function("git.config_get", ["foo.multi"], **{"global": True}),
                cfg_global["foo.multi"][-1],
            )
            log.debug("Get all values from global multivar")
            self.assertEqual(
                self.run_function(
                    "git.config_get", ["foo.multi"], **{"all": True, "global": True}
                ),
                cfg_global["foo.multi"],
            )
            log.debug("Get all local keys/values using regex")
            self.assertEqual(
                self.run_function(
                    "git.config_get_regexp", ["foo.(single|multi)"], cwd=self.repo
                ),
                cfg_local,
            )
            log.debug("Get all global keys/values using regex")
            self.assertEqual(
                self.run_function(
                    "git.config_get_regexp",
                    ["foo.(single|multi)"],
                    cwd=self.repo,
                    **{"global": True}
                ),
                cfg_global,
            )
            log.debug("Get just the local foo.multi values containing 'a'")
            self.assertEqual(
                self.run_function(
                    "git.config_get_regexp",
                    ["foo.multi"],
                    value_regex="a",
                    cwd=self.repo,
                ),
                {"foo.multi": [x for x in cfg_local["foo.multi"] if "a" in x]},
            )
            log.debug("Get just the global foo.multi values containing 'a'")
            self.assertEqual(
                self.run_function(
                    "git.config_get_regexp",
                    ["foo.multi"],
                    value_regex="a",
                    cwd=self.repo,
                    **{"global": True}
                ),
                {"foo.multi": [x for x in cfg_global["foo.multi"] if "a" in x]},
            )
            # TODO: More robust unset testing, try to trigger all the
            # exceptions raised.
            log.debug("Unset a single local value")
            self.assertTrue(
                self.run_function("git.config_unset", ["foo.single"], cwd=self.repo)
            )
            log.debug("Unset an entire local multivar")
            self.assertTrue(
                self.run_function(
                    "git.config_unset", ["foo.multi"], cwd=self.repo, **{"all": True}
                )
            )
            log.debug("Unset a single global value")
            self.assertTrue(
                self.run_function(
                    "git.config_unset", ["foo.single"], **{"global": True}
                )
            )
            log.debug("Unset an entire global multivar")
            self.assertTrue(
                self.run_function(
                    "git.config_unset", ["foo.multi"], **{"all": True, "global": True}
                )
            )
        finally:
            _clear_config()

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_current_branch(self):
        """
        Test git.current_branch
        """
        self.assertEqual(self.run_function("git.current_branch", [self.repo]), "master")

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_describe(self):
        """
        Test git.describe
        """
        self.assertEqual(self.run_function("git.describe", [self.repo]), self.tags[0])

    # Test for git.fetch would be unreliable on Jenkins, skipping for now
    # The test should go into test_remotes when ready

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_init(self):
        """
        Use git.init to init a new repo
        """
        new_repo = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        # Registered up front (with the native path) so the directory is
        # removed even if an assertion below fails
        self.addCleanup(shutil.rmtree, new_repo, ignore_errors=True)
        # `tempfile.mkdtemp` gets the path to the Temp directory using
        # environment variables. As a result, folder names longer than 8
        # characters are shortened. For example "C:\Users\Administrators"
        # becomes "C:\Users\Admini~1". However, the "git.init" function returns
        # the full, unshortened name of the folder. Therefore you can't compare
        # the path returned by `tempfile.mkdtemp` and the results of `git.init`
        # exactly.
        if salt.utils.platform.is_windows():
            new_repo = new_repo.replace("\\", "/")
            # Get the name of the temp directory
            tmp_dir = os.path.basename(new_repo)
            # Get git output
            git_ret = self.run_function("git.init", [new_repo]).lower()
            self.assertIn("Initialized empty Git repository in".lower(), git_ret)
            self.assertIn(tmp_dir, git_ret)
        else:
            self.assertEqual(
                self.run_function("git.init", [new_repo]).lower(),
                "Initialized empty Git repository in {0}/.git/".format(
                    new_repo
                ).lower(),
            )

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_list_branches(self):
        """
        Test git.list_branches
        """
        self.assertEqual(
            self.run_function("git.list_branches", [self.repo]), sorted(self.branches)
        )

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_list_tags(self):
        """
        Test git.list_tags
        """
        self.assertEqual(
            self.run_function("git.list_tags", [self.repo]), sorted(self.tags)
        )

    # Test for git.ls_remote will need to wait for now, while I think of how to
    # properly mock it.

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_merge(self):
        """
        Test git.merge

        # TODO: Test more than just a fast-forward merge
        """
        # Merge the second branch into the current branch
        ret = self.run_function("git.merge", [self.repo], rev=self.branches[1])
        # Merge should be a fast-forward
        self.assertIn("Fast-forward", ret.splitlines())

    @pytest.mark.slow_test(seconds=60)  # Test takes >30 and <=60 seconds
    def test_merge_base_and_tree(self):
        """
        Test git.merge_base, git.merge_tree and git.revision

        TODO: Test all of the arguments
        """
        # Get the SHA1 of current HEAD
        head_rev = self.run_function("git.revision", [self.repo], rev="HEAD")
        # Make sure revision is a 40-char string
        self.assertEqual(len(head_rev), 40)
        # Get the second branch's SHA1
        second_rev = self.run_function(
            "git.revision", [self.repo], rev=self.branches[1], timeout=120
        )
        # Make sure revision is a 40-char string
        self.assertEqual(len(second_rev), 40)
        # self.branches[1] should be just one commit ahead, so the merge base
        # for master and self.branches[1] should be the same as the current
        # HEAD.
        self.assertEqual(
            self.run_function(
                "git.merge_base", [self.repo], refs=",".join((head_rev, second_rev))
            ),
            head_rev,
        )
        # The merge-tree output is expected to contain exactly one hunk header
        # (presumably the single line added on the second branch in setUp).
        ret = self.run_function(
            "git.merge_tree", [self.repo, head_rev, second_rev]
        ).splitlines()
        self.assertEqual(len([x for x in ret if x.startswith("@@")]), 1)

    # Test for git.pull would be unreliable on Jenkins, skipping for now
    # Test for git.push would be unreliable on Jenkins, skipping for now

    @pytest.mark.slow_test(seconds=60)  # Test takes >30 and <=60 seconds
    def test_rebase(self):
        """
        Test git.rebase
        """
        # Make a change to a different file than the one modified in setUp
        file_path = os.path.join(self.repo, self.files[1])
        with salt.utils.files.fopen(file_path, "a") as fp_:
            fp_.write("Added a line\n")
        # Commit the change
        self.assertNotIn(
            "ERROR",
            self.run_function(
                "git.commit",
                [self.repo, "Added a line to " + self.files[1]],
                filename=self.files[1],
            ),
        )
        # Switch to the second branch
        self.assertNotIn(
            "ERROR",
            self.run_function("git.checkout", [self.repo], rev=self.branches[1]),
        )
        # Perform the rebase. The commit should show a comment about
        # self.files[0] being modified, as that is the file that was modified
        # in the second branch in the setUp function
        self.assertEqual(
            self.run_function("git.rebase", [self.repo]),
            "First, rewinding head to replay your work on top of it...\n"
            "Applying: Added a line to " + self.files[0],
        )

    # Test for git.remote_get is in test_remotes
    # Test for git.remote_set is in test_remotes

    @pytest.mark.slow_test(seconds=60)  # Test takes >30 and <=60 seconds
    def test_remotes(self):
        """
        Test setting a remote (git.remote_set), and getting a remote
        (git.remote_get and git.remotes)

        TODO: Properly mock fetching a remote (git.fetch), and build out more
        robust testing that confirms that the https auth bits work.
        """
        remotes = {
            "first": {"fetch": "/dev/null", "push": "/dev/null"},
            "second": {"fetch": "/dev/null", "push": "/dev/stdout"},
        }
        self.assertEqual(
            self.run_function(
                "git.remote_set", [self.repo, remotes["first"]["fetch"]], remote="first"
            ),
            remotes["first"],
        )
        self.assertEqual(
            self.run_function(
                "git.remote_set",
                [self.repo, remotes["second"]["fetch"]],
                remote="second",
                push_url=remotes["second"]["push"],
            ),
            remotes["second"],
        )
        self.assertEqual(self.run_function("git.remotes", [self.repo]), remotes)

    @pytest.mark.slow_test(seconds=60)  # Test takes >30 and <=60 seconds
    def test_reset(self):
        """
        Test git.reset

        TODO: Test more than just a hard reset
        """
        # Switch to the second branch
        self.assertNotIn(
            "ERROR",
            self.run_function("git.checkout", [self.repo], rev=self.branches[1]),
        )
        # Back up one commit. We should now be at the same revision as master
        self.run_function("git.reset", [self.repo], opts="--hard HEAD~1")
        # Get the SHA1 of current HEAD (remember, we're on the second branch)
        head_rev = self.run_function("git.revision", [self.repo], rev="HEAD")
        # Make sure revision is a 40-char string
        self.assertEqual(len(head_rev), 40)
        # Get the master branch's SHA1
        master_rev = self.run_function("git.revision", [self.repo], rev="master")
        # Make sure revision is a 40-char string
        self.assertEqual(len(master_rev), 40)
        # The two revisions should be the same
        self.assertEqual(head_rev, master_rev)

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_rev_parse(self):
        """
        Test git.rev_parse
        """
        # Using --abbrev-ref on HEAD will give us the current branch
        self.assertEqual(
            self.run_function(
                "git.rev_parse", [self.repo, "HEAD"], opts="--abbrev-ref"
            ),
            "master",
        )

    # Test for git.revision happens in test_merge_base

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_rm(self):
        """
        Test git.rm
        """
        single_file = self.files[0]
        entire_dir = self.dirs[1]
        # Remove a single file
        self.assertEqual(
            self.run_function("git.rm", [self.repo, single_file]),
            "rm '" + single_file + "'",
        )
        # Remove an entire dir
        expected = "\n".join(
            sorted(["rm '" + os.path.join(entire_dir, x) + "'" for x in self.files])
        )
        # git reports forward-slash paths even on Windows
        if salt.utils.platform.is_windows():
            expected = expected.replace("\\", "/")
        self.assertEqual(
            self.run_function("git.rm", [self.repo, entire_dir], opts="-r"), expected
        )

    @pytest.mark.slow_test(seconds=60)  # Test takes >30 and <=60 seconds
    def test_stash(self):
        """
        Test git.stash

        # TODO: test more stash actions
        """
        file_path = os.path.join(self.repo, self.files[0])
        with salt.utils.files.fopen(file_path, "a") as fp_:
            fp_.write("Temp change to be stashed")
        self.assertNotIn("ERROR", self.run_function("git.stash", [self.repo]))
        # List stashes
        ret = self.run_function("git.stash", [self.repo], action="list")
        self.assertNotIn("ERROR", ret)
        self.assertEqual(len(ret.splitlines()), 1)
        # Apply the stash
        self.assertNotIn(
            "ERROR",
            self.run_function(
                "git.stash", [self.repo], action="apply", opts="stash@{0}"
            ),
        )
        # Drop the stash
        self.assertNotIn(
            "ERROR",
            self.run_function(
                "git.stash", [self.repo], action="drop", opts="stash@{0}"
            ),
        )

    @pytest.mark.slow_test(seconds=60)  # Test takes >30 and <=60 seconds
    def test_status(self):
        """
        Test git.status
        """
        changes = {
            "modified": ["foo"],
            "new": ["thisisdefinitelyanewfile"],
            "deleted": ["bar"],
            "untracked": ["thisisalsoanewfile"],
        }
        for filename in changes["modified"]:
            with salt.utils.files.fopen(os.path.join(self.repo, filename), "a") as fp_:
                fp_.write("Added a line\n")
        for filename in changes["new"]:
            with salt.utils.files.fopen(os.path.join(self.repo, filename), "w") as fp_:
                fp_.write(
                    salt.utils.stringutils.to_str(
                        "This is a new file named {0}.".format(filename)
                    )
                )
            # Stage the new file so it shows up as a 'new' file
            self.assertNotIn(
                "ERROR", self.run_function("git.add", [self.repo, filename])
            )
        for filename in changes["deleted"]:
            self.run_function("git.rm", [self.repo, filename])
        for filename in changes["untracked"]:
            with salt.utils.files.fopen(os.path.join(self.repo, filename), "w") as fp_:
                fp_.write(
                    salt.utils.stringutils.to_str(
                        "This is a new file named {0}.".format(filename)
                    )
                )
        self.assertEqual(self.run_function("git.status", [self.repo]), changes)

    # TODO: Add git.submodule test

    @pytest.mark.slow_test(seconds=30)  # Test takes >10 and <=30 seconds
    def test_symbolic_ref(self):
        """
        Test git.symbolic_ref
        """
        self.assertEqual(
            self.run_function("git.symbolic_ref", [self.repo, "HEAD"], opts="--quiet"),
            "refs/heads/master",
        )

    @skipIf(
        not _worktrees_supported(), "Git 2.5 or newer required for worktree support"
    )
    @pytest.mark.slow_test(seconds=120)  # Test takes >60 and <=120 seconds
    def test_worktree_add_rm(self):
        """
        This tests git.worktree_add, git.is_worktree, git.worktree_rm, and
        git.worktree_prune. Tests for 'git worktree list' are covered in
        tests.unit.modules.git_test.
        """
        # We don't need to enclose this comparison in a try/except, since the
        # decorator would skip this test if git is not installed and we'd never
        # get here in the first place.
        if _git_version() >= LooseVersion("2.6.0"):
            worktree_add_prefix = "Preparing "
        else:
            worktree_add_prefix = "Enter "
        worktree_path = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        self.addCleanup(shutil.rmtree, worktree_path, ignore_errors=True)
        worktree_basename = os.path.basename(worktree_path)
        worktree_path2 = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        # The second worktree is never removed by the test itself
        self.addCleanup(shutil.rmtree, worktree_path2, ignore_errors=True)
        worktree_basename2 = os.path.basename(worktree_path2)
        # Even though this is Windows, git commands return a unix style path
        if salt.utils.platform.is_windows():
            worktree_path = worktree_path.replace("\\", "/")
            worktree_path2 = worktree_path2.replace("\\", "/")
        # Add the worktrees
        ret = self.run_function("git.worktree_add", [self.repo, worktree_path])
        self.assertIn(worktree_add_prefix, ret)
        self.assertIn(worktree_basename, ret)
        ret = self.run_function("git.worktree_add", [self.repo, worktree_path2])
        self.assertIn(worktree_add_prefix, ret)
        self.assertIn(worktree_basename2, ret)
        # Check if this new path is a worktree
        self.assertTrue(self.run_function("git.is_worktree", [worktree_path]))
        # Check if the main repo is a worktree
        self.assertFalse(self.run_function("git.is_worktree", [self.repo]))
        # Check if a non-repo directory is a worktree
        empty_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        self.addCleanup(shutil.rmtree, empty_dir, ignore_errors=True)
        self.assertFalse(self.run_function("git.is_worktree", [empty_dir]))
        # Remove the first worktree
        self.assertTrue(self.run_function("git.worktree_rm", [worktree_path]))
        # Prune the worktrees
        prune_message = (
            "Removing worktrees/{0}: gitdir file points to non-existent "
            "location".format(worktree_basename)
        )
        # Test dry run output. It should match the same output we get when we
        # actually prune the worktrees.
        result = self.run_function("git.worktree_prune", [self.repo], dry_run=True)
        self.assertEqual(result, prune_message)
        # Test pruning for real, and make sure the output is the same
        self.assertEqual(
            self.run_function("git.worktree_prune", [self.repo]), prune_message
        )
|