test_file.py 14 KB


  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, print_function, unicode_literals
  3. import getpass
  4. import os
  5. import shutil
  6. import sys
  7. import pytest
  8. import salt.utils.files
  9. import salt.utils.platform
  10. from tests.support.case import ModuleCase
  11. from tests.support.helpers import requires_system_grains
  12. from tests.support.runtests import RUNTIME_VARS
  13. from tests.support.unit import skipIf
  14. # Posix only
  15. try:
  16. import grp
  17. import pwd
  18. except ImportError:
  19. pass
  20. # Windows only
  21. try:
  22. import win32file
  23. except ImportError:
  24. pass
  25. def symlink(source, link_name):
  26. """
  27. Handle symlinks on Windows with Python < 3.2
  28. """
  29. if salt.utils.platform.is_windows():
  30. win32file.CreateSymbolicLink(link_name, source)
  31. else:
  32. os.symlink(source, link_name)
  33. @pytest.mark.windows_whitelisted
  34. class FileModuleTest(ModuleCase):
  35. """
  36. Validate the file module
  37. """
  38. def setUp(self):
  39. self.myfile = os.path.join(RUNTIME_VARS.TMP, "myfile")
  40. with salt.utils.files.fopen(self.myfile, "w+") as fp:
  41. fp.write(salt.utils.stringutils.to_str("Hello" + os.linesep))
  42. self.mydir = os.path.join(RUNTIME_VARS.TMP, "mydir/isawesome")
  43. if not os.path.isdir(self.mydir):
  44. # left behind... Don't fail because of this!
  45. os.makedirs(self.mydir)
  46. self.mysymlink = os.path.join(RUNTIME_VARS.TMP, "mysymlink")
  47. if os.path.islink(self.mysymlink) or os.path.isfile(self.mysymlink):
  48. os.remove(self.mysymlink)
  49. symlink(self.myfile, self.mysymlink)
  50. self.mybadsymlink = os.path.join(RUNTIME_VARS.TMP, "mybadsymlink")
  51. if os.path.islink(self.mybadsymlink) or os.path.isfile(self.mybadsymlink):
  52. os.remove(self.mybadsymlink)
  53. symlink("/nonexistentpath", self.mybadsymlink)
  54. super(FileModuleTest, self).setUp()
  55. def tearDown(self):
  56. if os.path.isfile(self.myfile):
  57. os.remove(self.myfile)
  58. if os.path.islink(self.mysymlink) or os.path.isfile(self.mysymlink):
  59. os.remove(self.mysymlink)
  60. if os.path.islink(self.mybadsymlink) or os.path.isfile(self.mybadsymlink):
  61. os.remove(self.mybadsymlink)
  62. shutil.rmtree(self.mydir, ignore_errors=True)
  63. super(FileModuleTest, self).tearDown()
  64. @skipIf(salt.utils.platform.is_windows(), "No security context on Windows")
  65. @requires_system_grains
  66. def test_get_selinux_context(self, grains):
  67. if grains.get("selinux", {}).get("enabled", False):
  68. NEW_CONTEXT = "system_u:object_r:system_conf_t:s0"
  69. self.run_function(
  70. "file.set_selinux_context", arg=[self.myfile, *(NEW_CONTEXT.split(":"))]
  71. )
  72. ret_file = self.run_function("file.get_selinux_context", arg=[self.myfile])
  73. self.assertEqual(ret_file, NEW_CONTEXT)
  74. # Issue #56557. Ensure that the context of the directory
  75. # containing one file is the context of the directory itself, and
  76. # not the context of the first file in the directory.
  77. self.run_function(
  78. "file.set_selinux_context", arg=[self.mydir, *(NEW_CONTEXT.split(":"))]
  79. )
  80. ret_dir = self.run_function("file.get_selinux_context", arg=[self.mydir])
  81. self.assertEqual(ret_dir, NEW_CONTEXT)
  82. ret_updir = self.run_function(
  83. "file.get_selinux_context",
  84. arg=[os.path.abspath(os.path.join(self.mydir, ".."))],
  85. )
  86. self.assertNotEqual(ret_updir, NEW_CONTEXT)
  87. else:
  88. ret_file = self.run_function("file.get_selinux_context", arg=[self.myfile])
  89. self.assertIn("No selinux context information is available", ret_file)
  90. @skipIf(salt.utils.platform.is_windows(), "No security context on Windows")
  91. @requires_system_grains
  92. def test_set_selinux_context(self, grains):
  93. if not grains.get("selinux", {}).get("enabled", False):
  94. self.skipTest("selinux not available")
  95. FILE_CONTEXT = "system_u:object_r:system_conf_t:s0"
  96. ret_file = self.run_function(
  97. "file.set_selinux_context", arg=[self.myfile, *(FILE_CONTEXT.split(":"))]
  98. )
  99. self.assertEqual(ret_file, FILE_CONTEXT)
  100. DIR_CONTEXT = "system_u:object_r:user_home_t:s0"
  101. ret_dir = self.run_function(
  102. "file.set_selinux_context", arg=[self.mydir, *(DIR_CONTEXT.split(":"))]
  103. )
  104. self.assertEqual(ret_dir, DIR_CONTEXT)
  105. @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  106. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  107. def test_chown(self):
  108. user = getpass.getuser()
  109. if sys.platform == "darwin":
  110. group = "staff"
  111. elif sys.platform.startswith(("linux", "freebsd", "openbsd")):
  112. group = grp.getgrgid(pwd.getpwuid(os.getuid()).pw_gid).gr_name
  113. ret = self.run_function("file.chown", arg=[self.myfile, user, group])
  114. self.assertIsNone(ret)
  115. fstat = os.stat(self.myfile)
  116. self.assertEqual(fstat.st_uid, os.getuid())
  117. self.assertEqual(fstat.st_gid, grp.getgrnam(group).gr_gid)
  118. @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  119. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  120. def test_chown_no_user(self):
  121. user = "notanyuseriknow"
  122. group = grp.getgrgid(pwd.getpwuid(os.getuid()).pw_gid).gr_name
  123. ret = self.run_function("file.chown", arg=[self.myfile, user, group])
  124. self.assertIn("not exist", ret)
  125. @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  126. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  127. def test_chown_no_user_no_group(self):
  128. user = "notanyuseriknow"
  129. group = "notanygroupyoushoulduse"
  130. ret = self.run_function("file.chown", arg=[self.myfile, user, group])
  131. self.assertIn("Group does not exist", ret)
  132. self.assertIn("User does not exist", ret)
  133. @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  134. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  135. def test_chown_no_path(self):
  136. user = getpass.getuser()
  137. if sys.platform == "darwin":
  138. group = "staff"
  139. elif sys.platform.startswith(("linux", "freebsd", "openbsd")):
  140. group = grp.getgrgid(pwd.getpwuid(os.getuid()).pw_gid).gr_name
  141. ret = self.run_function("file.chown", arg=["/tmp/nosuchfile", user, group])
  142. self.assertIn("File not found", ret)
  143. @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  144. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  145. def test_chown_noop(self):
  146. user = ""
  147. group = ""
  148. ret = self.run_function("file.chown", arg=[self.myfile, user, group])
  149. self.assertIsNone(ret)
  150. fstat = os.stat(self.myfile)
  151. self.assertEqual(fstat.st_uid, os.getuid())
  152. self.assertEqual(fstat.st_gid, os.getgid())
  153. @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  154. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  155. def test_chgrp(self):
  156. if sys.platform == "darwin":
  157. group = "everyone"
  158. elif sys.platform.startswith(("linux", "freebsd", "openbsd")):
  159. group = grp.getgrgid(pwd.getpwuid(os.getuid()).pw_gid).gr_name
  160. ret = self.run_function("file.chgrp", arg=[self.myfile, group])
  161. self.assertIsNone(ret)
  162. fstat = os.stat(self.myfile)
  163. self.assertEqual(fstat.st_gid, grp.getgrnam(group).gr_gid)
  164. @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  165. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  166. def test_chgrp_failure(self):
  167. group = "thisgroupdoesntexist"
  168. ret = self.run_function("file.chgrp", arg=[self.myfile, group])
  169. self.assertIn("not exist", ret)
  170. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  171. def test_patch(self):
  172. if not self.run_function("cmd.has_exec", ["patch"]):
  173. self.skipTest("patch is not installed")
  174. src_patch = os.path.join(RUNTIME_VARS.FILES, "file", "base", "hello.patch")
  175. src_file = os.path.join(RUNTIME_VARS.TMP, "src.txt")
  176. with salt.utils.files.fopen(src_file, "w+") as fp:
  177. fp.write(salt.utils.stringutils.to_str("Hello\n"))
  178. # dry-run should not modify src_file
  179. ret = self.minion_run("file.patch", src_file, src_patch, dry_run=True)
  180. assert ret["retcode"] == 0, repr(ret)
  181. with salt.utils.files.fopen(src_file) as fp:
  182. self.assertEqual(salt.utils.stringutils.to_unicode(fp.read()), "Hello\n")
  183. ret = self.minion_run("file.patch", src_file, src_patch)
  184. assert ret["retcode"] == 0, repr(ret)
  185. with salt.utils.files.fopen(src_file) as fp:
  186. self.assertEqual(
  187. salt.utils.stringutils.to_unicode(fp.read()), "Hello world\n"
  188. )
  189. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  190. def test_remove_file(self):
  191. ret = self.run_function("file.remove", arg=[self.myfile])
  192. self.assertTrue(ret)
  193. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  194. def test_remove_dir(self):
  195. ret = self.run_function("file.remove", arg=[self.mydir])
  196. self.assertTrue(ret)
  197. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  198. def test_remove_symlink(self):
  199. ret = self.run_function("file.remove", arg=[self.mysymlink])
  200. self.assertTrue(ret)
  201. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  202. def test_remove_broken_symlink(self):
  203. ret = self.run_function("file.remove", arg=[self.mybadsymlink])
  204. self.assertTrue(ret)
  205. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  206. def test_cannot_remove(self):
  207. ret = self.run_function("file.remove", arg=["tty"])
  208. self.assertEqual(
  209. "ERROR executing 'file.remove': File path must be absolute: tty", ret
  210. )
  211. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  212. def test_source_list_for_single_file_returns_unchanged(self):
  213. ret = self.run_function(
  214. "file.source_list", ["salt://http/httpd.conf", "filehash", "base"]
  215. )
  216. self.assertEqual(list(ret), ["salt://http/httpd.conf", "filehash"])
  217. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  218. def test_source_list_for_single_local_file_slash_returns_unchanged(self):
  219. ret = self.run_function("file.source_list", [self.myfile, "filehash", "base"])
  220. self.assertEqual(list(ret), [self.myfile, "filehash"])
  221. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  222. def test_source_list_for_single_local_file_proto_returns_unchanged(self):
  223. ret = self.run_function(
  224. "file.source_list", ["file://" + self.myfile, "filehash", "base"]
  225. )
  226. self.assertEqual(list(ret), ["file://" + self.myfile, "filehash"])
  227. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  228. def test_file_line_changes_format(self):
  229. """
  230. Test file.line changes output formatting.
  231. Issue #41474
  232. """
  233. ret = self.minion_run(
  234. "file.line", self.myfile, "Goodbye", mode="insert", after="Hello"
  235. )
  236. self.assertIn("Hello" + os.linesep + "+Goodbye", ret)
  237. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  238. def test_file_line_content(self):
  239. self.minion_run(
  240. "file.line", self.myfile, "Goodbye", mode="insert", after="Hello"
  241. )
  242. with salt.utils.files.fopen(self.myfile, "r") as fp:
  243. content = fp.read()
  244. self.assertEqual(content, "Hello" + os.linesep + "Goodbye" + os.linesep)
  245. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  246. def test_file_line_duplicate_insert_after(self):
  247. """
  248. Test file.line duplicates line.
  249. Issue #50254
  250. """
  251. with salt.utils.files.fopen(self.myfile, "a") as fp:
  252. fp.write(salt.utils.stringutils.to_str("Goodbye" + os.linesep))
  253. self.minion_run(
  254. "file.line", self.myfile, "Goodbye", mode="insert", after="Hello"
  255. )
  256. with salt.utils.files.fopen(self.myfile, "r") as fp:
  257. content = fp.read()
  258. self.assertEqual(content, "Hello" + os.linesep + "Goodbye" + os.linesep)
  259. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  260. def test_file_line_duplicate_insert_before(self):
  261. """
  262. Test file.line duplicates line.
  263. Issue #50254
  264. """
  265. with salt.utils.files.fopen(self.myfile, "a") as fp:
  266. fp.write(salt.utils.stringutils.to_str("Goodbye" + os.linesep))
  267. self.minion_run(
  268. "file.line", self.myfile, "Hello", mode="insert", before="Goodbye"
  269. )
  270. with salt.utils.files.fopen(self.myfile, "r") as fp:
  271. content = fp.read()
  272. self.assertEqual(content, "Hello" + os.linesep + "Goodbye" + os.linesep)
  273. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  274. def test_file_line_duplicate_ensure_after(self):
  275. """
  276. Test file.line duplicates line.
  277. Issue #50254
  278. """
  279. with salt.utils.files.fopen(self.myfile, "a") as fp:
  280. fp.write(salt.utils.stringutils.to_str("Goodbye" + os.linesep))
  281. self.minion_run(
  282. "file.line", self.myfile, "Goodbye", mode="ensure", after="Hello"
  283. )
  284. with salt.utils.files.fopen(self.myfile, "r") as fp:
  285. content = fp.read()
  286. self.assertEqual(content, "Hello" + os.linesep + "Goodbye" + os.linesep)
  287. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  288. def test_file_line_duplicate_ensure_before(self):
  289. """
  290. Test file.line duplicates line.
  291. Issue #50254
  292. """
  293. with salt.utils.files.fopen(self.myfile, "a") as fp:
  294. fp.write(salt.utils.stringutils.to_str("Goodbye" + os.linesep))
  295. self.minion_run(
  296. "file.line", self.myfile, "Hello", mode="ensure", before="Goodbye"
  297. )
  298. with salt.utils.files.fopen(self.myfile, "r") as fp:
  299. content = fp.read()
  300. self.assertEqual(content, "Hello" + os.linesep + "Goodbye" + os.linesep)