12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021 |
- # -*- coding: utf-8 -*-
- '''
- Tests for git execution module
- NOTE: These tests may modify the global git config, and have been marked as
- destructive as a result. If no values are set for user.name or user.email in
- the user's global .gitconfig, then these tests will set one.
- '''
- # Import Python Libs
- from __future__ import absolute_import, print_function, unicode_literals
- from contextlib import closing
- import errno
- import logging
- import os
- import re
- import shutil
- import subprocess
- import tarfile
- import tempfile
- # Import Salt Testing libs
- from tests.support.runtests import RUNTIME_VARS
- from tests.support.case import ModuleCase
- from tests.support.unit import skipIf
- # Import salt libs
- import salt.utils.data
- import salt.utils.files
- import salt.utils.platform
- from salt.utils.versions import LooseVersion
- # Import 3rd-party libs
- import pytest
- from salt.ext import six
- log = logging.getLogger(__name__)
def _git_version():
    '''
    Detect the installed git version.

    Runs ``git --version`` and parses its output.

    :return: A ``LooseVersion`` built from the version token, or ``False``
        when git cannot be executed or produces no usable output.
    '''
    try:
        proc = subprocess.Popen(
            ['git', '--version'],
            shell=False,
            # close_fds is not supported together with redirected pipes on
            # Windows, so only enable it elsewhere.
            close_fds=not salt.utils.platform.is_windows(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        output = proc.communicate()[0]
    except OSError:
        return False
    if not output:
        log.debug('Git not installed')
        return False
    if six.PY3:
        output = output.decode(__salt_system_encoding__)
    tokens = output.strip().split()
    if not tokens:
        log.debug('Unable to parse output of \'git --version\'')
        return False
    # The version is normally the last token ("git version 2.17.1"), but some
    # builds append a vendor suffix ("git version 2.39.3 (Apple Git-146)").
    # Prefer the first token that starts with a digit and only fall back to
    # the last token when none does.
    git_version = next(
        (token for token in tokens if token[:1].isdigit()),
        tokens[-1]
    )
    log.debug('Detected git version: %s', git_version)
    return LooseVersion(git_version)
def _worktrees_supported():
    '''
    Check if the git version is 2.5.0 or later

    :return: ``True`` when the detected git version is at least 2.5.0,
        ``False`` when it is older, when git is not installed, or when the
        detected version cannot be compared.
    '''
    version = _git_version()
    # _git_version() returns False when git is missing. Handle that explicitly
    # instead of relying on the comparison below raising a specific exception.
    if not version:
        return False
    try:
        return version >= LooseVersion('2.5.0')
    except (AttributeError, TypeError):
        # Unparseable/incomparable version components
        return False
def _makedirs(path):
    '''
    Create ``path`` (including any missing parent directories).

    An ``OSError`` whose errno is ``EEXIST`` is silently ignored; any other
    ``OSError`` is re-raised to the caller.
    '''
    try:
        os.makedirs(path)
    except OSError as err:
        if err.errno == errno.EEXIST:
            # Already there, nothing to do
            return
        raise
@pytest.mark.skip_if_binaries_missing('git')
@pytest.mark.windows_whitelisted
class GitModuleTest(ModuleCase):
    '''
    Integration tests for the git execution module.

    Every test runs against a throwaway repository built in setUp(). That
    repository has two branches (``master`` and ``iamanewbranch``), one
    annotated tag, and the same set of files in the repo root and in the
    ``qux`` subdirectory.
    '''

    def setUp(self):
        '''
        Build the test repository using the git CLI directly, so that the
        fixture does not depend on the module under test.
        '''
        super(GitModuleTest, self).setUp()
        self.orig_cwd = os.getcwd()
        self.addCleanup(os.chdir, self.orig_cwd)
        self.repo = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        self.addCleanup(shutil.rmtree, self.repo, ignore_errors=True)
        self.files = ('foo', 'bar', 'baz', 'питон')
        self.dirs = ('', 'qux')
        self.branches = ('master', 'iamanewbranch')
        self.tags = ('git_testing',)
        for dirname in self.dirs:
            dir_path = os.path.join(self.repo, dirname)
            _makedirs(dir_path)
            for filename in self.files:
                with salt.utils.files.fopen(os.path.join(dir_path, filename), 'wb') as fp_:
                    fp_.write(
                        'This is a test file named {0}.'.format(filename).encode('utf-8')
                    )
        # Navigate to the root of the repo to init, stage, and commit
        os.chdir(self.repo)
        # Initialize a new git repository
        subprocess.check_call(['git', 'init', '--quiet', self.repo])
        # Pin the name of the initial (still unborn) branch. The tests below
        # hard-code 'master', but the branch created by 'git init' depends on
        # the user's init.defaultBranch setting.
        subprocess.check_call(
            ['git', 'symbolic-ref', 'HEAD', 'refs/heads/' + self.branches[0]]
        )
        # Set user.name and user.email config attributes if not present
        for key, value in (('user.name', 'Jenkins'),
                           ('user.email', 'qa@saltstack.com')):
            # Check if key is missing. communicate() is used rather than
            # wait() so that the pipes are drained and closed.
            keycheck = subprocess.Popen(
                ['git', 'config', '--get', '--global', key],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
            keycheck.communicate()
            if keycheck.returncode != 0:
                # Set the key if it is not present
                subprocess.check_call(
                    ['git', 'config', '--global', key, value])
        subprocess.check_call(['git', 'add', '.'])
        subprocess.check_call(
            ['git', 'commit', '--quiet', '--message', 'Initial commit']
        )
        # Add a tag
        subprocess.check_call(
            ['git', 'tag', '-a', self.tags[0], '-m', 'Add tag']
        )
        # Checkout a second branch
        subprocess.check_call(
            ['git', 'checkout', '--quiet', '-b', self.branches[1]]
        )
        # Add a line to the file
        with salt.utils.files.fopen(self.files[0], 'a') as fp_:
            fp_.write(salt.utils.stringutils.to_str('Added a line\n'))
        # Commit the updated file
        subprocess.check_call(
            ['git', 'commit', '--quiet',
             '--message', 'Added a line to ' + self.files[0], self.files[0]]
        )
        # Switch back to master
        subprocess.check_call(['git', 'checkout', '--quiet', 'master'])
        # Go back to original cwd
        os.chdir(self.orig_cwd)

    def run_function(self, *args, **kwargs):
        '''
        Ensure that results are decoded

        TODO: maybe move this behavior to ModuleCase itself?
        '''
        return salt.utils.data.decode(
            super(GitModuleTest, self).run_function(*args, **kwargs)
        )

    def tearDown(self):
        '''
        Drop the per-test attributes created in setUp()
        '''
        for key in ('orig_cwd', 'repo', 'files', 'dirs', 'branches', 'tags'):
            delattr(self, key)
        super(GitModuleTest, self).tearDown()

    def test_add_dir(self):
        '''
        Test git.add with a directory
        '''
        newdir = 'quux'
        # Create a new directory full of files inside the repo
        newdir_path = os.path.join(self.repo, newdir)
        _makedirs(newdir_path)
        files = [os.path.join(newdir_path, x) for x in self.files]
        files_relpath = [os.path.join(newdir, x) for x in self.files]
        for path in files:
            with salt.utils.files.fopen(path, 'wb') as fp_:
                fp_.write(
                    'This is a test file with relative path {0}.\n'.format(path).encode('utf-8')
                )
        ret = self.run_function('git.add', [self.repo, newdir])
        res = '\n'.join(sorted(['add \'{0}\''.format(x) for x in files_relpath]))
        if salt.utils.platform.is_windows():
            # git reports unix-style paths, even on Windows
            res = res.replace('\\', '/')
        self.assertEqual(ret, res)

    def test_add_file(self):
        '''
        Test git.add with a file
        '''
        filename = 'quux'
        file_path = os.path.join(self.repo, filename)
        with salt.utils.files.fopen(file_path, 'w') as fp_:
            fp_.write(salt.utils.stringutils.to_str(
                'This is a test file named {0}.\n'.format(filename)
            ))
        ret = self.run_function('git.add', [self.repo, filename])
        self.assertEqual(ret, 'add \'{0}\''.format(filename))

    def test_archive(self):
        '''
        Test git.archive
        '''
        tar_archive = os.path.join(RUNTIME_VARS.TMP, 'test_archive.tar.gz')
        try:
            self.assertTrue(
                self.run_function(
                    'git.archive',
                    [self.repo, tar_archive],
                    prefix='foo/'
                )
            )
            self.assertTrue(tarfile.is_tarfile(tar_archive))
            with closing(tarfile.open(tar_archive, 'r')) as tar_obj:
                self.assertEqual(
                    sorted(salt.utils.data.decode(tar_obj.getnames())),
                    sorted([
                        'foo', 'foo/bar', 'foo/baz', 'foo/foo', 'foo/питон',
                        'foo/qux', 'foo/qux/bar', 'foo/qux/baz', 'foo/qux/foo',
                        'foo/qux/питон'
                    ])
                )
        finally:
            try:
                os.unlink(tar_archive)
            except OSError:
                # Archive was never created
                pass

    def test_archive_subdir(self):
        '''
        Test git.archive on a subdir, giving only a partial copy of the repo in
        the resulting archive
        '''
        tar_archive = os.path.join(RUNTIME_VARS.TMP, 'test_archive.tar.gz')
        try:
            self.assertTrue(
                self.run_function(
                    'git.archive',
                    [os.path.join(self.repo, 'qux'), tar_archive],
                    prefix='foo/'
                )
            )
            self.assertTrue(tarfile.is_tarfile(tar_archive))
            with closing(tarfile.open(tar_archive, 'r')) as tar_obj:
                self.assertEqual(
                    sorted(salt.utils.data.decode(tar_obj.getnames())),
                    sorted(['foo', 'foo/bar', 'foo/baz', 'foo/foo', 'foo/питон'])
                )
        finally:
            try:
                os.unlink(tar_archive)
            except OSError:
                # Archive was never created
                pass

    def test_branch(self):
        '''
        Test creating, renaming, and deleting a branch using git.branch
        '''
        renamed_branch = 'ihavebeenrenamed'
        self.assertTrue(
            self.run_function('git.branch', [self.repo, self.branches[1]])
        )
        self.assertTrue(
            self.run_function(
                'git.branch',
                [self.repo, renamed_branch],
                opts='-m ' + self.branches[1]
            )
        )
        self.assertTrue(
            self.run_function(
                'git.branch',
                [self.repo, renamed_branch],
                opts='-D'
            )
        )

    def test_checkout(self):
        '''
        Test checking out a new branch and then checking out master again
        '''
        new_branch = 'iamanothernewbranch'
        self.assertEqual(
            self.run_function(
                'git.checkout',
                [self.repo, 'HEAD'],
                opts='-b ' + new_branch
            ),
            'Switched to a new branch \'' + new_branch + '\''
        )
        self.assertIn(
            'Switched to branch \'master\'',
            self.run_function('git.checkout', [self.repo, 'master'])
        )

    def test_checkout_no_rev(self):
        '''
        Test git.checkout without a rev, both with -b in opts and without
        '''
        new_branch = 'iamanothernewbranch'
        self.assertEqual(
            self.run_function(
                'git.checkout', [self.repo], rev=None, opts='-b ' + new_branch
            ),
            'Switched to a new branch \'' + new_branch + '\''
        )
        self.assertIn(
            '\'rev\' argument is required unless -b or -B in opts',
            self.run_function('git.checkout', [self.repo])
        )

    def test_clone(self):
        '''
        Test cloning an existing repo
        '''
        clone_parent_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        # Registered as a cleanup so the directory is removed even when the
        # assertion below fails.
        self.addCleanup(shutil.rmtree, clone_parent_dir, ignore_errors=True)
        self.assertTrue(
            self.run_function('git.clone', [clone_parent_dir, self.repo])
        )

    def test_clone_with_alternate_name(self):
        '''
        Test cloning an existing repo with an alternate name for the repo dir
        '''
        clone_parent_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        # Registered as a cleanup so the directory is removed even when the
        # assertion below fails.
        self.addCleanup(shutil.rmtree, clone_parent_dir, ignore_errors=True)
        clone_name = os.path.basename(self.repo)
        self.assertTrue(
            self.run_function(
                'git.clone',
                [clone_parent_dir, self.repo],
                name=clone_name
            )
        )

    def test_commit(self):
        '''
        Test git.commit two ways:
            1) First using git.add, then git.commit
            2) Using git.commit with the 'filename' argument to skip staging
        '''
        filename = 'foo'
        commit_re_prefix = r'^\[master [0-9a-f]+\] '
        # Add a line
        with salt.utils.files.fopen(os.path.join(self.repo, filename), 'a') as fp_:
            fp_.write('Added a line\n')
        # Stage the file
        self.run_function('git.add', [self.repo, filename])
        # Commit the staged file
        commit_msg = 'Add a line to ' + filename
        ret = self.run_function('git.commit', [self.repo, commit_msg])
        # Make sure the expected line is in the output
        self.assertIsNotNone(re.search(commit_re_prefix + commit_msg, ret))
        # Add another line
        with salt.utils.files.fopen(os.path.join(self.repo, filename), 'a') as fp_:
            fp_.write('Added another line\n')
        # Commit the second change without staging
        commit_msg = 'Add another line to ' + filename
        ret = self.run_function(
            'git.commit',
            [self.repo, commit_msg],
            filename=filename
        )
        self.assertIsNotNone(re.search(commit_re_prefix + commit_msg, ret))

    def test_config(self):
        '''
        Test setting, getting, and unsetting config values

        WARNING: This test will modify and completely remove a config section
        'foo', both in the repo created in setUp() and in the user's global
        .gitconfig.
        '''
        def _clear_config():
            '''
            Remove the 'foo' section from the test repo's config and from the
            global config, ignoring the error raised when it does not exist.
            '''
            cmds = (
                ['git', 'config', '--remove-section', 'foo'],
                ['git', 'config', '--global', '--remove-section', 'foo']
            )
            with salt.utils.files.fopen(os.devnull, 'w') as devnull:
                for cmd in cmds:
                    try:
                        # Run inside the test repo so that the local removal
                        # targets its config and not whichever repository the
                        # test runner's working directory happens to be in.
                        subprocess.check_call(
                            cmd, stderr=devnull, cwd=self.repo
                        )
                    except subprocess.CalledProcessError:
                        # Section not present, nothing to remove
                        pass

        cfg_local = {
            'foo.single': ['foo'],
            'foo.multi': ['foo', 'bar', 'baz']
        }
        cfg_global = {
            'foo.single': ['abc'],
            'foo.multi': ['abc', 'def', 'ghi']
        }

        _clear_config()
        try:
            log.debug(
                'Try to specify both single and multivar (should raise error)'
            )
            self.assertIn(
                'Only one of \'value\' and \'multivar\' is permitted',
                self.run_function(
                    'git.config_set',
                    ['foo.single'],
                    value=cfg_local['foo.single'][0],
                    multivar=cfg_local['foo.multi'],
                    cwd=self.repo
                )
            )
            log.debug(
                'Try to set single local value without cwd (should raise '
                'error)'
            )
            self.assertIn(
                '\'cwd\' argument required unless global=True',
                self.run_function(
                    'git.config_set',
                    ['foo.single'],
                    value=cfg_local['foo.single'][0],
                )
            )
            log.debug('Set single local value')
            self.assertEqual(
                self.run_function(
                    'git.config_set',
                    ['foo.single'],
                    value=cfg_local['foo.single'][0],
                    cwd=self.repo
                ),
                cfg_local['foo.single']
            )
            log.debug('Set single global value')
            self.assertEqual(
                self.run_function(
                    'git.config_set',
                    ['foo.single'],
                    value=cfg_global['foo.single'][0],
                    **{'global': True}
                ),
                cfg_global['foo.single']
            )
            log.debug('Set local multivar')
            self.assertEqual(
                self.run_function(
                    'git.config_set',
                    ['foo.multi'],
                    multivar=cfg_local['foo.multi'],
                    cwd=self.repo
                ),
                cfg_local['foo.multi']
            )
            log.debug('Set global multivar')
            self.assertEqual(
                self.run_function(
                    'git.config_set',
                    ['foo.multi'],
                    multivar=cfg_global['foo.multi'],
                    **{'global': True}
                ),
                cfg_global['foo.multi']
            )
            log.debug('Get single local value')
            self.assertEqual(
                self.run_function(
                    'git.config_get',
                    ['foo.single'],
                    cwd=self.repo
                ),
                cfg_local['foo.single'][0]
            )
            log.debug('Get single value from local multivar')
            self.assertEqual(
                self.run_function(
                    'git.config_get',
                    ['foo.multi'],
                    cwd=self.repo
                ),
                cfg_local['foo.multi'][-1]
            )
            log.debug('Get all values from multivar (includes globals)')
            self.assertEqual(
                self.run_function(
                    'git.config_get',
                    ['foo.multi'],
                    cwd=self.repo,
                    **{'all': True}
                ),
                cfg_local['foo.multi']
            )
            log.debug('Get single global value')
            self.assertEqual(
                self.run_function(
                    'git.config_get',
                    ['foo.single'],
                    **{'global': True}
                ),
                cfg_global['foo.single'][0]
            )
            log.debug('Get single value from global multivar')
            self.assertEqual(
                self.run_function(
                    'git.config_get',
                    ['foo.multi'],
                    **{'global': True}
                ),
                cfg_global['foo.multi'][-1]
            )
            log.debug('Get all values from global multivar')
            self.assertEqual(
                self.run_function(
                    'git.config_get',
                    ['foo.multi'],
                    **{'all': True, 'global': True}
                ),
                cfg_global['foo.multi']
            )
            log.debug('Get all local keys/values using regex')
            self.assertEqual(
                self.run_function(
                    'git.config_get_regexp',
                    ['foo.(single|multi)'],
                    cwd=self.repo
                ),
                cfg_local
            )
            log.debug('Get all global keys/values using regex')
            self.assertEqual(
                self.run_function(
                    'git.config_get_regexp',
                    ['foo.(single|multi)'],
                    cwd=self.repo,
                    **{'global': True}
                ),
                cfg_global
            )
            log.debug('Get just the local foo.multi values containing \'a\'')
            self.assertEqual(
                self.run_function(
                    'git.config_get_regexp',
                    ['foo.multi'],
                    value_regex='a',
                    cwd=self.repo
                ),
                {'foo.multi': [x for x in cfg_local['foo.multi'] if 'a' in x]}
            )
            log.debug('Get just the global foo.multi values containing \'a\'')
            self.assertEqual(
                self.run_function(
                    'git.config_get_regexp',
                    ['foo.multi'],
                    value_regex='a',
                    cwd=self.repo,
                    **{'global': True}
                ),
                {'foo.multi': [x for x in cfg_global['foo.multi'] if 'a' in x]}
            )
            # TODO: More robust unset testing, try to trigger all the
            # exceptions raised.
            log.debug('Unset a single local value')
            self.assertTrue(
                self.run_function(
                    'git.config_unset',
                    ['foo.single'],
                    cwd=self.repo,
                )
            )
            log.debug('Unset an entire local multivar')
            self.assertTrue(
                self.run_function(
                    'git.config_unset',
                    ['foo.multi'],
                    cwd=self.repo,
                    **{'all': True}
                )
            )
            log.debug('Unset a single global value')
            self.assertTrue(
                self.run_function(
                    'git.config_unset',
                    ['foo.single'],
                    **{'global': True}
                )
            )
            log.debug('Unset an entire global multivar')
            self.assertTrue(
                self.run_function(
                    'git.config_unset',
                    ['foo.multi'],
                    **{'all': True, 'global': True}
                )
            )
        finally:
            # Always leave the local and global configs clean, pass or fail
            _clear_config()

    def test_current_branch(self):
        '''
        Test git.current_branch
        '''
        self.assertEqual(
            self.run_function('git.current_branch', [self.repo]),
            'master'
        )

    def test_describe(self):
        '''
        Test git.describe
        '''
        self.assertEqual(
            self.run_function('git.describe', [self.repo]),
            self.tags[0]
        )

    # Test for git.fetch would be unreliable on Jenkins, skipping for now
    # The test should go into test_remotes when ready

    def test_init(self):
        '''
        Use git.init to init a new repo
        '''
        new_repo = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        # Registered as a cleanup so the directory is removed even when an
        # assertion below fails.
        self.addCleanup(shutil.rmtree, new_repo, ignore_errors=True)

        # `tempfile.mkdtemp` gets the path to the Temp directory using
        # environment variables. As a result, folder names longer than 8
        # characters are shortened. For example "C:\Users\Administrators"
        # becomes "C:\Users\Admini~1". However, the "git.init" function returns
        # the full, unshortened name of the folder. Therefore you can't compare
        # the path returned by `tempfile.mkdtemp` and the results of `git.init`
        # exactly.
        if salt.utils.platform.is_windows():
            new_repo = new_repo.replace('\\', '/')

            # Get the name of the temp directory
            tmp_dir = os.path.basename(new_repo)

            # Get git output
            git_ret = self.run_function('git.init', [new_repo]).lower()

            self.assertIn(
                'Initialized empty Git repository in'.lower(), git_ret)
            self.assertIn(tmp_dir, git_ret)
        else:
            self.assertEqual(
                self.run_function('git.init', [new_repo]).lower(),
                'Initialized empty Git repository in {0}/.git/'.format(new_repo).lower()
            )

    def test_list_branches(self):
        '''
        Test git.list_branches
        '''
        self.assertEqual(
            self.run_function('git.list_branches', [self.repo]),
            sorted(self.branches)
        )

    def test_list_tags(self):
        '''
        Test git.list_tags
        '''
        self.assertEqual(
            self.run_function('git.list_tags', [self.repo]),
            sorted(self.tags)
        )

    # Test for git.ls_remote will need to wait for now, while I think of how to
    # properly mock it.

    def test_merge(self):
        '''
        Test git.merge

        # TODO: Test more than just a fast-forward merge
        '''
        # Merge the second branch into the current branch
        ret = self.run_function(
            'git.merge',
            [self.repo],
            rev=self.branches[1]
        )
        # Merge should be a fast-forward
        self.assertIn('Fast-forward', ret.splitlines())

    def test_merge_base_and_tree(self):
        '''
        Test git.merge_base, git.merge_tree and git.revision

        TODO: Test all of the arguments
        '''
        # Get the SHA1 of current HEAD
        head_rev = self.run_function('git.revision', [self.repo], rev='HEAD')
        # Make sure revision is a 40-char string
        self.assertEqual(len(head_rev), 40)
        # Get the second branch's SHA1
        second_rev = self.run_function(
            'git.revision',
            [self.repo],
            rev=self.branches[1],
            timeout=120
        )
        # Make sure revision is a 40-char string
        self.assertEqual(len(second_rev), 40)
        # self.branches[1] should be just one commit ahead, so the merge base
        # for master and self.branches[1] should be the same as the current
        # HEAD.
        self.assertEqual(
            self.run_function(
                'git.merge_base',
                [self.repo],
                refs=','.join((head_rev, second_rev))
            ),
            head_rev
        )
        # The second branch changed a single file in a single place, so the
        # merge-tree output should contain exactly one diff hunk header.
        ret = self.run_function(
            'git.merge_tree',
            [self.repo, head_rev, second_rev]
        ).splitlines()
        self.assertEqual(len([x for x in ret if x.startswith('@@')]), 1)

    # Test for git.pull would be unreliable on Jenkins, skipping for now
    # Test for git.push would be unreliable on Jenkins, skipping for now

    def test_rebase(self):
        '''
        Test git.rebase
        '''
        # Make a change to a different file than the one modified in setUp
        file_path = os.path.join(self.repo, self.files[1])
        with salt.utils.files.fopen(file_path, 'a') as fp_:
            fp_.write('Added a line\n')
        # Commit the change
        self.assertNotIn(
            'ERROR',
            self.run_function(
                'git.commit',
                [self.repo, 'Added a line to ' + self.files[1]],
                filename=self.files[1]
            )
        )
        # Switch to the second branch
        self.assertNotIn(
            'ERROR',
            self.run_function(
                'git.checkout',
                [self.repo],
                rev=self.branches[1]
            )
        )
        # Perform the rebase. The commit should show a comment about
        # self.files[0] being modified, as that is the file that was modified
        # in the second branch in the setUp function
        # NOTE(review): this exact text depends on the rebase output of the
        # installed git; confirm it against the git versions under test.
        self.assertEqual(
            self.run_function('git.rebase', [self.repo]),
            'First, rewinding head to replay your work on top of it...\n'
            'Applying: Added a line to ' + self.files[0]
        )

    # Test for git.remote_get is in test_remotes
    # Test for git.remote_set is in test_remotes

    def test_remotes(self):
        '''
        Test setting a remote (git.remote_set), and getting a remote
        (git.remote_get and git.remotes)

        TODO: Properly mock fetching a remote (git.fetch), and build out more
        robust testing that confirms that the https auth bits work.
        '''
        remotes = {
            'first': {'fetch': '/dev/null', 'push': '/dev/null'},
            'second': {'fetch': '/dev/null', 'push': '/dev/stdout'}
        }
        self.assertEqual(
            self.run_function(
                'git.remote_set',
                [self.repo, remotes['first']['fetch']],
                remote='first'
            ),
            remotes['first']
        )
        self.assertEqual(
            self.run_function(
                'git.remote_set',
                [self.repo, remotes['second']['fetch']],
                remote='second',
                push_url=remotes['second']['push']
            ),
            remotes['second']
        )
        self.assertEqual(
            self.run_function('git.remotes', [self.repo]),
            remotes
        )

    def test_reset(self):
        '''
        Test git.reset

        TODO: Test more than just a hard reset
        '''
        # Switch to the second branch
        self.assertNotIn(
            'ERROR',
            self.run_function(
                'git.checkout',
                [self.repo],
                rev=self.branches[1]
            )
        )
        # Back up one commit. We should now be at the same revision as master
        self.run_function(
            'git.reset',
            [self.repo],
            opts='--hard HEAD~1'
        )
        # Get the SHA1 of current HEAD (remember, we're on the second branch)
        head_rev = self.run_function('git.revision', [self.repo], rev='HEAD')
        # Make sure revision is a 40-char string
        self.assertEqual(len(head_rev), 40)
        # Get the master branch's SHA1
        master_rev = self.run_function(
            'git.revision',
            [self.repo],
            rev='master'
        )
        # Make sure revision is a 40-char string
        self.assertEqual(len(master_rev), 40)
        # The two revisions should be the same
        self.assertEqual(head_rev, master_rev)

    def test_rev_parse(self):
        '''
        Test git.rev_parse
        '''
        # Using --abbrev-ref on HEAD will give us the current branch
        self.assertEqual(
            self.run_function(
                'git.rev_parse', [self.repo, 'HEAD'], opts='--abbrev-ref'
            ),
            'master'
        )

    # Test for git.revision happens in test_merge_base

    def test_rm(self):
        '''
        Test git.rm
        '''
        single_file = self.files[0]
        entire_dir = self.dirs[1]
        # Remove a single file
        self.assertEqual(
            self.run_function('git.rm', [self.repo, single_file]),
            'rm \'' + single_file + '\''
        )
        # Remove an entire dir
        expected = '\n'.join(
            sorted(['rm \'' + os.path.join(entire_dir, x) + '\''
                    for x in self.files])
        )
        if salt.utils.platform.is_windows():
            # git reports unix-style paths, even on Windows
            expected = expected.replace('\\', '/')
        self.assertEqual(
            self.run_function('git.rm', [self.repo, entire_dir], opts='-r'),
            expected
        )

    def test_stash(self):
        '''
        Test git.stash

        # TODO: test more stash actions
        '''
        file_path = os.path.join(self.repo, self.files[0])
        with salt.utils.files.fopen(file_path, 'a') as fp_:
            fp_.write('Temp change to be stashed')
        self.assertNotIn('ERROR', self.run_function('git.stash', [self.repo]))
        # List stashes
        ret = self.run_function('git.stash', [self.repo], action='list')
        self.assertNotIn('ERROR', ret)
        self.assertEqual(len(ret.splitlines()), 1)
        # Apply the stash
        self.assertNotIn(
            'ERROR',
            self.run_function(
                'git.stash',
                [self.repo],
                action='apply',
                opts='stash@{0}'
            )
        )
        # Drop the stash
        self.assertNotIn(
            'ERROR',
            self.run_function(
                'git.stash',
                [self.repo],
                action='drop',
                opts='stash@{0}'
            )
        )

    def test_status(self):
        '''
        Test git.status
        '''
        changes = {
            'modified': ['foo'],
            'new': ['thisisdefinitelyanewfile'],
            'deleted': ['bar'],
            'untracked': ['thisisalsoanewfile']
        }
        for filename in changes['modified']:
            with salt.utils.files.fopen(os.path.join(self.repo, filename), 'a') as fp_:
                fp_.write('Added a line\n')
        for filename in changes['new']:
            with salt.utils.files.fopen(os.path.join(self.repo, filename), 'w') as fp_:
                fp_.write(salt.utils.stringutils.to_str(
                    'This is a new file named {0}.'.format(filename)
                ))
            # Stage the new file so it shows up as a 'new' file
            self.assertNotIn(
                'ERROR',
                self.run_function(
                    'git.add',
                    [self.repo, filename]
                )
            )
        for filename in changes['deleted']:
            self.run_function('git.rm', [self.repo, filename])
        for filename in changes['untracked']:
            with salt.utils.files.fopen(os.path.join(self.repo, filename), 'w') as fp_:
                fp_.write(salt.utils.stringutils.to_str(
                    'This is a new file named {0}.'.format(filename)
                ))
        self.assertEqual(
            self.run_function('git.status', [self.repo]),
            changes
        )

    # TODO: Add git.submodule test

    def test_symbolic_ref(self):
        '''
        Test git.symbolic_ref
        '''
        self.assertEqual(
            self.run_function(
                'git.symbolic_ref',
                [self.repo, 'HEAD'],
                opts='--quiet'
            ),
            'refs/heads/master'
        )

    @skipIf(not _worktrees_supported(),
            'Git 2.5 or newer required for worktree support')
    def test_worktree_add_rm(self):
        '''
        This tests git.worktree_add, git.is_worktree, git.worktree_rm, and
        git.worktree_prune. Tests for 'git worktree list' are covered in
        tests.unit.modules.git_test.
        '''
        # We don't need to enclose this comparison in a try/except, since the
        # decorator would skip this test if git is not installed and we'd never
        # get here in the first place.
        if _git_version() >= LooseVersion('2.6.0'):
            worktree_add_prefix = 'Preparing '
        else:
            worktree_add_prefix = 'Enter '

        worktree_path = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        self.addCleanup(shutil.rmtree, worktree_path, ignore_errors=True)
        worktree_basename = os.path.basename(worktree_path)
        worktree_path2 = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        self.addCleanup(shutil.rmtree, worktree_path2, ignore_errors=True)
        worktree_basename2 = os.path.basename(worktree_path2)

        # Even though this is Windows, git commands return a unix style path
        if salt.utils.platform.is_windows():
            worktree_path = worktree_path.replace('\\', '/')
            worktree_path2 = worktree_path2.replace('\\', '/')

        # Add the worktrees
        ret = self.run_function(
            'git.worktree_add', [self.repo, worktree_path],
        )
        self.assertIn(worktree_add_prefix, ret)
        self.assertIn(worktree_basename, ret)
        ret = self.run_function(
            'git.worktree_add', [self.repo, worktree_path2]
        )
        self.assertIn(worktree_add_prefix, ret)
        self.assertIn(worktree_basename2, ret)
        # Check if this new path is a worktree
        self.assertTrue(self.run_function('git.is_worktree', [worktree_path]))
        # Check if the main repo is a worktree
        self.assertFalse(self.run_function('git.is_worktree', [self.repo]))
        # Check if a non-repo directory is a worktree
        empty_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
        self.addCleanup(shutil.rmtree, empty_dir, ignore_errors=True)
        self.assertFalse(self.run_function('git.is_worktree', [empty_dir]))
        # Remove the first worktree
        self.assertTrue(self.run_function('git.worktree_rm', [worktree_path]))
        # Prune the worktrees
        prune_message = (
            'Removing worktrees/{0}: gitdir file points to non-existent '
            'location'.format(worktree_basename)
        )
        # Test dry run output. It should match the same output we get when we
        # actually prune the worktrees.
        result = self.run_function('git.worktree_prune',
                                   [self.repo],
                                   dry_run=True)
        self.assertEqual(result, prune_message)
        # Test pruning for real, and make sure the output is the same
        self.assertEqual(
            self.run_function('git.worktree_prune', [self.repo]),
            prune_message
        )
|