test_files.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. # -*- coding: utf-8 -*-
  2. '''
  3. Unit Tests for functions located in salt/utils/files.py
  4. '''
  5. # Import python libs
  6. from __future__ import absolute_import, unicode_literals, print_function
  7. import copy
  8. import os
  9. # Import Salt libs
  10. import salt.utils.files
  11. from salt.ext import six
  12. # Import Salt Testing libs
  13. from tests.support.helpers import with_tempdir
  14. from tests.support.unit import TestCase, skipIf
  15. from tests.support.mock import (
  16. patch,
  17. NO_MOCK,
  18. NO_MOCK_REASON
  19. )
  20. class FilesTestCase(TestCase):
  21. '''
  22. Test case for files util.
  23. '''
  24. @skipIf(NO_MOCK, NO_MOCK_REASON)
  25. def test_safe_rm(self):
  26. with patch('os.remove') as os_remove_mock:
  27. salt.utils.files.safe_rm('dummy_tgt')
  28. self.assertTrue(os_remove_mock.called)
  29. @skipIf(os.path.exists('/tmp/no_way_this_is_a_file_nope.sh'), 'Test file exists! Skipping safe_rm_exceptions test!')
  30. def test_safe_rm_exceptions(self):
  31. error = False
  32. try:
  33. salt.utils.files.safe_rm('/tmp/no_way_this_is_a_file_nope.sh')
  34. except (IOError, OSError):
  35. error = True
  36. self.assertFalse(error, 'salt.utils.files.safe_rm raised exception when it should not have')
  37. @with_tempdir()
  38. def test_safe_walk_symlink_recursion(self, tmp):
  39. if os.stat(tmp).st_ino == 0:
  40. self.skipTest('inodes not supported in {0}'.format(tmp))
  41. os.mkdir(os.path.join(tmp, 'fax'))
  42. os.makedirs(os.path.join(tmp, 'foo', 'bar'))
  43. os.symlink(os.path.join('..', '..'), os.path.join(tmp, 'foo', 'bar', 'baz'))
  44. os.symlink('foo', os.path.join(tmp, 'root'))
  45. expected = [
  46. (os.path.join(tmp, 'root'), ['bar'], []),
  47. (os.path.join(tmp, 'root', 'bar'), ['baz'], []),
  48. (os.path.join(tmp, 'root', 'bar', 'baz'), ['fax', 'foo', 'root'], []),
  49. (os.path.join(tmp, 'root', 'bar', 'baz', 'fax'), [], []),
  50. ]
  51. paths = []
  52. for root, dirs, names in salt.utils.files.safe_walk(os.path.join(tmp, 'root')):
  53. paths.append((root, sorted(dirs), names))
  54. if paths != expected:
  55. raise AssertionError(
  56. '\n'.join(
  57. ['got:'] + [repr(p) for p in paths] +
  58. ['', 'expected:'] + [repr(p) for p in expected]
  59. )
  60. )
  61. @skipIf(not six.PY3, 'This test only applies to Python 3')
  62. def test_fopen_with_disallowed_fds(self):
  63. '''
  64. This is safe to have as a unit test since we aren't going to actually
  65. try to read or write. We want to ensure that we are raising a
  66. TypeError. Python 3's open() builtin will treat the booleans as file
  67. descriptor numbers and try to open stdin/stdout. We also want to test
  68. fd 2 which is stderr.
  69. '''
  70. for invalid_fn in (False, True, 0, 1, 2):
  71. try:
  72. with salt.utils.files.fopen(invalid_fn):
  73. pass
  74. except TypeError:
  75. # This is expected. We aren't using an assertRaises here
  76. # because we want to ensure that if we did somehow open the
  77. # filehandle, that it doesn't remain open.
  78. pass
  79. else:
  80. # We probably won't even get this far if we actually opened
  81. # stdin/stdout as a file descriptor. It is likely to cause the
  82. # integration suite to die since, news flash, closing
  83. # stdin/stdout/stderr is usually not a wise thing to do in the
  84. # middle of a program's execution.
  85. self.fail(
  86. 'fopen() should have been prevented from opening a file '
  87. 'using {0} as the filename'.format(invalid_fn)
  88. )
  89. def _create_temp_structure(self, temp_directory, structure):
  90. for folder, files in six.iteritems(structure):
  91. current_directory = os.path.join(temp_directory, folder)
  92. os.makedirs(current_directory)
  93. for name, content in six.iteritems(files):
  94. path = os.path.join(temp_directory, folder, name)
  95. with salt.utils.files.fopen(path, 'w+') as fh:
  96. fh.write(content)
  97. def _validate_folder_structure_and_contents(self, target_directory,
  98. desired_structure):
  99. for folder, files in six.iteritems(desired_structure):
  100. for name, content in six.iteritems(files):
  101. path = os.path.join(target_directory, folder, name)
  102. with salt.utils.files.fopen(path) as fh:
  103. assert fh.read().strip() == content
  104. @with_tempdir()
  105. @with_tempdir()
  106. def test_recursive_copy(self, src, dest):
  107. src_structure = {
  108. 'foo': {
  109. 'foofile.txt': 'fooSTRUCTURE'
  110. },
  111. 'bar': {
  112. 'barfile.txt': 'barSTRUCTURE'
  113. }
  114. }
  115. dest_structure = {
  116. 'foo': {
  117. 'foo.txt': 'fooTARGET_STRUCTURE'
  118. },
  119. 'baz': {
  120. 'baz.txt': 'bazTARGET_STRUCTURE'
  121. }
  122. }
  123. # Create the file structures in both src and dest dirs
  124. self._create_temp_structure(src, src_structure)
  125. self._create_temp_structure(dest, dest_structure)
  126. # Perform the recursive copy
  127. salt.utils.files.recursive_copy(src, dest)
  128. # Confirm results match expected results
  129. desired_structure = copy.copy(dest_structure)
  130. desired_structure.update(src_structure)
  131. self._validate_folder_structure_and_contents(
  132. dest,
  133. desired_structure)