test_files.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. # -*- coding: utf-8 -*-
  2. '''
  3. Unit Tests for functions located in salt/utils/files.py
  4. '''
  5. # Import python libs
  6. from __future__ import absolute_import, unicode_literals, print_function
  7. import copy
  8. import os
  9. # Import Salt libs
  10. import salt.utils.files
  11. from salt.ext import six
  12. # Import Salt Testing libs
  13. from tests.support.helpers import with_tempdir
  14. from tests.support.unit import TestCase, skipIf
  15. from tests.support.mock import (
  16. patch,
  17. )
  18. class FilesTestCase(TestCase):
  19. '''
  20. Test case for files util.
  21. '''
  22. def test_safe_rm(self):
  23. with patch('os.remove') as os_remove_mock:
  24. salt.utils.files.safe_rm('dummy_tgt')
  25. self.assertTrue(os_remove_mock.called)
  26. @skipIf(os.path.exists('/tmp/no_way_this_is_a_file_nope.sh'), 'Test file exists! Skipping safe_rm_exceptions test!')
  27. def test_safe_rm_exceptions(self):
  28. error = False
  29. try:
  30. salt.utils.files.safe_rm('/tmp/no_way_this_is_a_file_nope.sh')
  31. except (IOError, OSError):
  32. error = True
  33. self.assertFalse(error, 'salt.utils.files.safe_rm raised exception when it should not have')
  34. @with_tempdir()
  35. def test_safe_walk_symlink_recursion(self, tmp):
  36. if os.stat(tmp).st_ino == 0:
  37. self.skipTest('inodes not supported in {0}'.format(tmp))
  38. os.mkdir(os.path.join(tmp, 'fax'))
  39. os.makedirs(os.path.join(tmp, 'foo', 'bar'))
  40. os.symlink(os.path.join('..', '..'), os.path.join(tmp, 'foo', 'bar', 'baz'))
  41. os.symlink('foo', os.path.join(tmp, 'root'))
  42. expected = [
  43. (os.path.join(tmp, 'root'), ['bar'], []),
  44. (os.path.join(tmp, 'root', 'bar'), ['baz'], []),
  45. (os.path.join(tmp, 'root', 'bar', 'baz'), ['fax', 'foo', 'root'], []),
  46. (os.path.join(tmp, 'root', 'bar', 'baz', 'fax'), [], []),
  47. ]
  48. paths = []
  49. for root, dirs, names in salt.utils.files.safe_walk(os.path.join(tmp, 'root')):
  50. paths.append((root, sorted(dirs), names))
  51. if paths != expected:
  52. raise AssertionError(
  53. '\n'.join(
  54. ['got:'] + [repr(p) for p in paths] +
  55. ['', 'expected:'] + [repr(p) for p in expected]
  56. )
  57. )
  58. @skipIf(not six.PY3, 'This test only applies to Python 3')
  59. def test_fopen_with_disallowed_fds(self):
  60. '''
  61. This is safe to have as a unit test since we aren't going to actually
  62. try to read or write. We want to ensure that we are raising a
  63. TypeError. Python 3's open() builtin will treat the booleans as file
  64. descriptor numbers and try to open stdin/stdout. We also want to test
  65. fd 2 which is stderr.
  66. '''
  67. for invalid_fn in (False, True, 0, 1, 2):
  68. try:
  69. with salt.utils.files.fopen(invalid_fn):
  70. pass
  71. except TypeError:
  72. # This is expected. We aren't using an assertRaises here
  73. # because we want to ensure that if we did somehow open the
  74. # filehandle, that it doesn't remain open.
  75. pass
  76. else:
  77. # We probably won't even get this far if we actually opened
  78. # stdin/stdout as a file descriptor. It is likely to cause the
  79. # integration suite to die since, news flash, closing
  80. # stdin/stdout/stderr is usually not a wise thing to do in the
  81. # middle of a program's execution.
  82. self.fail(
  83. 'fopen() should have been prevented from opening a file '
  84. 'using {0} as the filename'.format(invalid_fn)
  85. )
  86. def _create_temp_structure(self, temp_directory, structure):
  87. for folder, files in six.iteritems(structure):
  88. current_directory = os.path.join(temp_directory, folder)
  89. os.makedirs(current_directory)
  90. for name, content in six.iteritems(files):
  91. path = os.path.join(temp_directory, folder, name)
  92. with salt.utils.files.fopen(path, 'w+') as fh:
  93. fh.write(content)
  94. def _validate_folder_structure_and_contents(self, target_directory,
  95. desired_structure):
  96. for folder, files in six.iteritems(desired_structure):
  97. for name, content in six.iteritems(files):
  98. path = os.path.join(target_directory, folder, name)
  99. with salt.utils.files.fopen(path) as fh:
  100. assert fh.read().strip() == content
  101. @with_tempdir()
  102. @with_tempdir()
  103. def test_recursive_copy(self, src, dest):
  104. src_structure = {
  105. 'foo': {
  106. 'foofile.txt': 'fooSTRUCTURE'
  107. },
  108. 'bar': {
  109. 'barfile.txt': 'barSTRUCTURE'
  110. }
  111. }
  112. dest_structure = {
  113. 'foo': {
  114. 'foo.txt': 'fooTARGET_STRUCTURE'
  115. },
  116. 'baz': {
  117. 'baz.txt': 'bazTARGET_STRUCTURE'
  118. }
  119. }
  120. # Create the file structures in both src and dest dirs
  121. self._create_temp_structure(src, src_structure)
  122. self._create_temp_structure(dest, dest_structure)
  123. # Perform the recursive copy
  124. salt.utils.files.recursive_copy(src, dest)
  125. # Confirm results match expected results
  126. desired_structure = copy.copy(dest_structure)
  127. desired_structure.update(src_structure)
  128. self._validate_folder_structure_and_contents(
  129. dest,
  130. desired_structure)