test_file.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, print_function, unicode_literals
  3. import getpass
  4. import os
  5. import shutil
  6. import sys
  7. import pytest
  8. import salt.utils.files
  9. import salt.utils.platform
  10. from tests.support.case import ModuleCase
  11. from tests.support.helpers import requires_system_grains
  12. from tests.support.runtests import RUNTIME_VARS
  13. from tests.support.unit import skipIf
  14. # Posix only
  15. try:
  16. import grp
  17. import pwd
  18. except ImportError:
  19. pass
  20. # Windows only
  21. try:
  22. import win32file
  23. except ImportError:
  24. pass
  def symlink(source, link_name):
      """
      Create a symbolic link named ``link_name`` that points at ``source``.

      On Windows the link is made with ``win32file.CreateSymbolicLink``
      (originally added for Windows with Python < 3.2); on every other
      platform ``os.symlink`` is used.
      """
      if not salt.utils.platform.is_windows():
          os.symlink(source, link_name)
          return
      # Note the argument order: the win32 API takes the link name first.
      win32file.CreateSymbolicLink(link_name, source)
  33. @pytest.mark.windows_whitelisted
  34. class FileModuleTest(ModuleCase):
  35. """
  36. Validate the file module
  37. """
  def setUp(self):
      """
      Create the fixtures used by the tests under ``RUNTIME_VARS.TMP``:
      a one-line text file, a nested directory, a symlink to the file and
      a symlink whose target does not exist.
      """

      def _fresh_link(target, link_path):
          # Clear anything already sitting at link_path, then (re)create it.
          if os.path.islink(link_path) or os.path.isfile(link_path):
              os.remove(link_path)
          symlink(target, link_path)

      self.myfile = os.path.join(RUNTIME_VARS.TMP, "myfile")
      with salt.utils.files.fopen(self.myfile, "w+") as handle:
          handle.write(salt.utils.stringutils.to_str("Hello" + os.linesep))

      self.mydir = os.path.join(RUNTIME_VARS.TMP, "mydir/isawesome")
      # The directory may have been left behind by an earlier run; only
      # create it when missing so that a leftover does not fail the test.
      if not os.path.isdir(self.mydir):
          os.makedirs(self.mydir)

      self.mysymlink = os.path.join(RUNTIME_VARS.TMP, "mysymlink")
      _fresh_link(self.myfile, self.mysymlink)

      self.mybadsymlink = os.path.join(RUNTIME_VARS.TMP, "mybadsymlink")
      _fresh_link("/nonexistentpath", self.mybadsymlink)

      super(FileModuleTest, self).setUp()
  def tearDown(self):
      """
      Remove the file, both symlinks and the directory created in ``setUp``.
      """
      if os.path.isfile(self.myfile):
          os.remove(self.myfile)

      for link_path in (self.mysymlink, self.mybadsymlink):
          # islink() also catches the broken link, which isfile() would miss.
          if os.path.islink(link_path) or os.path.isfile(link_path):
              os.remove(link_path)

      shutil.rmtree(self.mydir, ignore_errors=True)
      super(FileModuleTest, self).tearDown()
  @skipIf(salt.utils.platform.is_windows(), "No security context on Windows")
  @requires_system_grains
  def test_get_selinux_context(self, grains):
      """
      Check ``file.get_selinux_context`` for a file and a directory.

      With SELinux enabled, the context that was just set must be read
      back; otherwise the "no context information" message is expected.
      """
      selinux_enabled = grains.get("selinux", {}).get("enabled", False)
      if not selinux_enabled:
          reported = self.run_function(
              "file.get_selinux_context", arg=[self.myfile]
          )
          self.assertIn("No selinux context information is available", reported)
          return

      NEW_CONTEXT = "system_u:object_r:system_conf_t:s0"
      context_fields = NEW_CONTEXT.split(":")

      self.run_function(
          "file.set_selinux_context", arg=[self.myfile] + context_fields
      )
      file_context = self.run_function(
          "file.get_selinux_context", arg=[self.myfile]
      )
      self.assertEqual(file_context, NEW_CONTEXT)

      # Issue #56557. Ensure that the context of the directory
      # containing one file is the context of the directory itself, and
      # not the context of the first file in the directory.
      self.run_function(
          "file.set_selinux_context", arg=[self.mydir] + context_fields
      )
      dir_context = self.run_function(
          "file.get_selinux_context", arg=[self.mydir]
      )
      self.assertEqual(dir_context, NEW_CONTEXT)

      parent_dir = os.path.abspath(os.path.join(self.mydir, ".."))
      parent_context = self.run_function(
          "file.get_selinux_context", arg=[parent_dir]
      )
      self.assertNotEqual(parent_context, NEW_CONTEXT)
  @skipIf(salt.utils.platform.is_windows(), "No security context on Windows")
  @requires_system_grains
  def test_set_selinux_context(self, grains):
      """
      Check that ``file.set_selinux_context`` returns the context it was
      asked to set, for both a file and a directory.

      Skipped when the grains do not report SELinux as enabled.
      """
      selinux_enabled = grains.get("selinux", {}).get("enabled", False)
      if not selinux_enabled:
          self.skipTest("selinux not available")

      FILE_CONTEXT = "system_u:object_r:system_conf_t:s0"
      file_result = self.run_function(
          "file.set_selinux_context",
          arg=[self.myfile] + FILE_CONTEXT.split(":"),
      )
      self.assertEqual(file_result, FILE_CONTEXT)

      DIR_CONTEXT = "system_u:object_r:user_home_t:s0"
      dir_result = self.run_function(
          "file.set_selinux_context",
          arg=[self.mydir] + DIR_CONTEXT.split(":"),
      )
      self.assertEqual(dir_result, DIR_CONTEXT)
  @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  def test_chown(self):
      """
      Chown the fixture file to the current user and a known group, then
      verify that ``file.chown`` returned ``None`` and that the ownership
      on disk matches.
      """
      user = getpass.getuser()
      if sys.platform == "darwin":
          group = "staff"
      else:
          # Any other POSIX platform: use the current user's primary group.
          # A platform whitelist here left ``group`` unbound (and the test
          # crashing with UnboundLocalError) on systems it did not list.
          group = grp.getgrgid(pwd.getpwuid(os.getuid()).pw_gid).gr_name
      ret = self.run_function("file.chown", arg=[self.myfile, user, group])
      self.assertIsNone(ret)
      fstat = os.stat(self.myfile)
      self.assertEqual(fstat.st_uid, os.getuid())
      self.assertEqual(fstat.st_gid, grp.getgrnam(group).gr_gid)
  @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  def test_chown_no_user(self):
      """
      Chown to an unknown user (with a valid group) and expect a
      "not exist" message in the result.
      """
      primary_gid = pwd.getpwuid(os.getuid()).pw_gid
      valid_group = grp.getgrgid(primary_gid).gr_name
      result = self.run_function(
          "file.chown", arg=[self.myfile, "notanyuseriknow", valid_group]
      )
      self.assertIn("not exist", result)
  @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  def test_chown_no_user_no_group(self):
      """
      Chown to an unknown user and an unknown group and expect both
      problems to be reported in the result.
      """
      result = self.run_function(
          "file.chown",
          arg=[self.myfile, "notanyuseriknow", "notanygroupyoushoulduse"],
      )
      self.assertIn("Group does not exist", result)
      self.assertIn("User does not exist", result)
  @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  def test_chown_no_path(self):
      """
      Chown a path that does not exist, using a valid user and group, and
      expect a "File not found" message in the result.
      """
      user = getpass.getuser()
      if sys.platform == "darwin":
          group = "staff"
      else:
          # Any other POSIX platform: use the current user's primary group.
          # A platform whitelist here left ``group`` unbound (and the test
          # crashing with UnboundLocalError) on systems it did not list.
          group = grp.getgrgid(pwd.getpwuid(os.getuid()).pw_gid).gr_name
      ret = self.run_function("file.chown", arg=["/tmp/nosuchfile", user, group])
      self.assertIn("File not found", ret)
  @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  def test_chown_noop(self):
      """
      Call ``file.chown`` with an empty user and an empty group; expect
      ``None`` back and the file still owned by the current uid and gid.
      """
      result = self.run_function("file.chown", arg=[self.myfile, "", ""])
      self.assertIsNone(result)

      file_stat = os.stat(self.myfile)
      self.assertEqual(file_stat.st_uid, os.getuid())
      self.assertEqual(file_stat.st_gid, os.getgid())
  @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  def test_chgrp(self):
      """
      Change the group of the fixture file, then verify that
      ``file.chgrp`` returned ``None`` and that the gid on disk matches.
      """
      if sys.platform == "darwin":
          group = "everyone"
      else:
          # Any other POSIX platform: use the current user's primary group.
          # A platform whitelist here left ``group`` unbound (and the test
          # crashing with UnboundLocalError) on systems it did not list.
          group = grp.getgrgid(pwd.getpwuid(os.getuid()).pw_gid).gr_name
      ret = self.run_function("file.chgrp", arg=[self.myfile, group])
      self.assertIsNone(ret)
      fstat = os.stat(self.myfile)
      self.assertEqual(fstat.st_gid, grp.getgrnam(group).gr_gid)
  @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  def test_chgrp_failure(self):
      """
      Chgrp to an unknown group and expect a "not exist" message back.
      """
      result = self.run_function(
          "file.chgrp", arg=[self.myfile, "thisgroupdoesntexist"]
      )
      self.assertIn("not exist", result)
  def test_patch(self):
      """
      Run ``file.patch`` with ``hello.patch`` against a scratch file: a
      dry run must leave the file untouched, a real run must turn
      "Hello" into "Hello world".

      Skipped when the ``patch`` executable is not available.
      """
      if not self.run_function("cmd.has_exec", ["patch"]):
          self.skipTest("patch is not installed")

      patch_path = os.path.join(RUNTIME_VARS.FILES, "file", "base", "hello.patch")
      target_path = os.path.join(RUNTIME_VARS.TMP, "src.txt")

      def _target_text():
          # Current content of the scratch file, decoded to unicode.
          with salt.utils.files.fopen(target_path) as handle:
              return salt.utils.stringutils.to_unicode(handle.read())

      with salt.utils.files.fopen(target_path, "w+") as handle:
          handle.write(salt.utils.stringutils.to_str("Hello\n"))

      # A dry run must succeed without modifying the scratch file.
      dry_result = self.minion_run(
          "file.patch", target_path, patch_path, dry_run=True
      )
      assert dry_result["retcode"] == 0, repr(dry_result)
      self.assertEqual(_target_text(), "Hello\n")

      # The real run applies the patch.
      real_result = self.minion_run("file.patch", target_path, patch_path)
      assert real_result["retcode"] == 0, repr(real_result)
      self.assertEqual(_target_text(), "Hello world\n")
  def test_remove_file(self):
      """
      ``file.remove`` on the fixture file must return a truthy value.
      """
      self.assertTrue(self.run_function("file.remove", arg=[self.myfile]))
  def test_remove_dir(self):
      """
      ``file.remove`` on the fixture directory must return a truthy value.
      """
      self.assertTrue(self.run_function("file.remove", arg=[self.mydir]))
  def test_remove_symlink(self):
      """
      ``file.remove`` on the valid symlink must return a truthy value.
      """
      self.assertTrue(self.run_function("file.remove", arg=[self.mysymlink]))
  def test_remove_broken_symlink(self):
      """
      ``file.remove`` on the dangling symlink must return a truthy value.
      """
      self.assertTrue(self.run_function("file.remove", arg=[self.mybadsymlink]))
  def test_cannot_remove(self):
      """
      ``file.remove`` with a relative path must return the
      "File path must be absolute" error string.
      """
      expected_error = (
          "ERROR executing 'file.remove': File path must be absolute: tty"
      )
      result = self.run_function("file.remove", arg=["tty"])
      self.assertEqual(expected_error, result)
  def test_source_list_for_single_file_returns_unchanged(self):
      """
      ``file.source_list`` given a single ``salt://`` source must hand
      back that source and hash unchanged.
      """
      source = "salt://http/httpd.conf"
      source_hash = "filehash"
      result = self.run_function("file.source_list", [source, source_hash, "base"])
      self.assertEqual(list(result), [source, source_hash])
  def test_source_list_for_single_local_file_slash_returns_unchanged(self):
      """
      ``file.source_list`` given a single local path must hand back that
      path and hash unchanged.
      """
      source_hash = "filehash"
      result = self.run_function(
          "file.source_list", [self.myfile, source_hash, "base"]
      )
      self.assertEqual(list(result), [self.myfile, source_hash])
  def test_source_list_for_single_local_file_proto_returns_unchanged(self):
      """
      ``file.source_list`` given a single ``file://`` source must hand
      back that source and hash unchanged.
      """
      source = "file://" + self.myfile
      source_hash = "filehash"
      result = self.run_function("file.source_list", [source, source_hash, "base"])
      self.assertEqual(list(result), [source, source_hash])
  def test_file_line_changes_format(self):
      """
      Test the formatting of the changes output by file.line.
      Issue #41474
      """
      changes = self.minion_run(
          "file.line", self.myfile, "Goodbye", mode="insert", after="Hello"
      )
      expected_fragment = os.linesep.join(["Hello", "+Goodbye"])
      self.assertIn(expected_fragment, changes)
  def test_file_line_changes_entire_line(self):
      """
      Test that file.line matches on the entire line: inserting
      "Goodbye 1" after "Goodbye" was inserted must add a new line.
      Issue #49855
      """
      eol = os.linesep

      first_changes = self.minion_run(
          "file.line", self.myfile, "Goodbye", mode="insert", after="Hello"
      )
      assert eol.join(["Hello", "+Goodbye"]) in first_changes

      second_changes = self.minion_run(
          "file.line", self.myfile, "Goodbye 1", mode="insert", after="Hello"
      )
      # The trailing empty item yields the closing line separator.
      expected_diff = eol.join(["Hello", "+Goodbye 1", " Goodbye", ""])
      assert expected_diff in second_changes

      with salt.utils.files.fopen(self.myfile, "r") as handle:
          content = handle.read()
      expected_content = eol.join(["Hello", "Goodbye 1", "Goodbye", ""])
      assert expected_content == content
  def test_file_line_content(self):
      """
      Insert "Goodbye" after "Hello" with file.line and check the
      resulting file content.
      """
      self.minion_run(
          "file.line", self.myfile, "Goodbye", mode="insert", after="Hello"
      )
      with salt.utils.files.fopen(self.myfile, "r") as handle:
          actual = handle.read()
      expected = "".join(["Hello", os.linesep, "Goodbye", os.linesep])
      self.assertEqual(actual, expected)
  def test_file_line_duplicate_insert_after(self):
      """
      Test that file.line (mode=insert, after) does not duplicate a line
      that is already present.
      Issue #50254
      """
      eol = os.linesep
      # Pre-populate the line that file.line is about to be asked to insert.
      with salt.utils.files.fopen(self.myfile, "a") as handle:
          handle.write(salt.utils.stringutils.to_str("Goodbye" + eol))

      self.minion_run(
          "file.line", self.myfile, "Goodbye", mode="insert", after="Hello"
      )

      with salt.utils.files.fopen(self.myfile, "r") as handle:
          actual = handle.read()
      self.assertEqual(actual, "".join(["Hello", eol, "Goodbye", eol]))
  def test_file_line_duplicate_insert_before(self):
      """
      Test that file.line (mode=insert, before) does not duplicate a line
      that is already present.
      Issue #50254
      """
      eol = os.linesep
      # Append the line that serves as the "before" anchor.
      with salt.utils.files.fopen(self.myfile, "a") as handle:
          handle.write(salt.utils.stringutils.to_str("Goodbye" + eol))

      self.minion_run(
          "file.line", self.myfile, "Hello", mode="insert", before="Goodbye"
      )

      with salt.utils.files.fopen(self.myfile, "r") as handle:
          actual = handle.read()
      self.assertEqual(actual, "".join(["Hello", eol, "Goodbye", eol]))
  def test_file_line_duplicate_ensure_after(self):
      """
      Test that file.line (mode=ensure, after) does not duplicate a line
      that is already present.
      Issue #50254
      """
      eol = os.linesep
      # Pre-populate the line that file.line is about to be asked to ensure.
      with salt.utils.files.fopen(self.myfile, "a") as handle:
          handle.write(salt.utils.stringutils.to_str("Goodbye" + eol))

      self.minion_run(
          "file.line", self.myfile, "Goodbye", mode="ensure", after="Hello"
      )

      with salt.utils.files.fopen(self.myfile, "r") as handle:
          actual = handle.read()
      self.assertEqual(actual, "".join(["Hello", eol, "Goodbye", eol]))
  def test_file_line_duplicate_ensure_before(self):
      """
      Test that file.line (mode=ensure, before) does not duplicate a line
      that is already present.
      Issue #50254
      """
      eol = os.linesep
      # Append the line that serves as the "before" anchor.
      with salt.utils.files.fopen(self.myfile, "a") as handle:
          handle.write(salt.utils.stringutils.to_str("Goodbye" + eol))

      self.minion_run(
          "file.line", self.myfile, "Hello", mode="ensure", before="Goodbye"
      )

      with salt.utils.files.fopen(self.myfile, "r") as handle:
          actual = handle.read()
      self.assertEqual(actual, "".join(["Hello", eol, "Goodbye", eol]))