test_file.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. import getpass
  2. import os
  3. import shutil
  4. import sys
  5. import pytest
  6. import salt.utils.files
  7. import salt.utils.platform
  8. from tests.support.case import ModuleCase
  9. from tests.support.helpers import requires_system_grains
  10. from tests.support.runtests import RUNTIME_VARS
  11. from tests.support.unit import skipIf
  12. # Posix only
  13. try:
  14. import grp
  15. import pwd
  16. except ImportError:
  17. pass
  18. # Windows only
  19. try:
  20. import win32file
  21. except ImportError:
  22. pass
  23. def symlink(source, link_name):
  24. """
  25. Handle symlinks on Windows with Python < 3.2
  26. """
  27. if salt.utils.platform.is_windows():
  28. win32file.CreateSymbolicLink(link_name, source)
  29. else:
  30. os.symlink(source, link_name)
  31. @pytest.mark.windows_whitelisted
  32. class FileModuleTest(ModuleCase):
  33. """
  34. Validate the file module
  35. """
  36. def setUp(self):
  37. self.myfile = os.path.join(RUNTIME_VARS.TMP, "myfile")
  38. with salt.utils.files.fopen(self.myfile, "w+") as fp:
  39. fp.write(salt.utils.stringutils.to_str("Hello" + os.linesep))
  40. self.mydir = os.path.join(RUNTIME_VARS.TMP, "mydir/isawesome")
  41. if not os.path.isdir(self.mydir):
  42. # left behind... Don't fail because of this!
  43. os.makedirs(self.mydir)
  44. self.mysymlink = os.path.join(RUNTIME_VARS.TMP, "mysymlink")
  45. if os.path.islink(self.mysymlink) or os.path.isfile(self.mysymlink):
  46. os.remove(self.mysymlink)
  47. symlink(self.myfile, self.mysymlink)
  48. self.mybadsymlink = os.path.join(RUNTIME_VARS.TMP, "mybadsymlink")
  49. if os.path.islink(self.mybadsymlink) or os.path.isfile(self.mybadsymlink):
  50. os.remove(self.mybadsymlink)
  51. symlink("/nonexistentpath", self.mybadsymlink)
  52. super().setUp()
  53. def tearDown(self):
  54. if os.path.isfile(self.myfile):
  55. os.remove(self.myfile)
  56. if os.path.islink(self.mysymlink) or os.path.isfile(self.mysymlink):
  57. os.remove(self.mysymlink)
  58. if os.path.islink(self.mybadsymlink) or os.path.isfile(self.mybadsymlink):
  59. os.remove(self.mybadsymlink)
  60. shutil.rmtree(self.mydir, ignore_errors=True)
  61. super().tearDown()
  62. @skipIf(salt.utils.platform.is_windows(), "No security context on Windows")
  63. @requires_system_grains
  64. def test_get_selinux_context(self, grains):
  65. if grains.get("selinux", {}).get("enabled", False):
  66. NEW_CONTEXT = "system_u:object_r:system_conf_t:s0"
  67. self.run_function(
  68. "file.set_selinux_context", arg=[self.myfile, *(NEW_CONTEXT.split(":"))]
  69. )
  70. ret_file = self.run_function("file.get_selinux_context", arg=[self.myfile])
  71. self.assertEqual(ret_file, NEW_CONTEXT)
  72. # Issue #56557. Ensure that the context of the directory
  73. # containing one file is the context of the directory itself, and
  74. # not the context of the first file in the directory.
  75. self.run_function(
  76. "file.set_selinux_context", arg=[self.mydir, *(NEW_CONTEXT.split(":"))]
  77. )
  78. ret_dir = self.run_function("file.get_selinux_context", arg=[self.mydir])
  79. self.assertEqual(ret_dir, NEW_CONTEXT)
  80. ret_updir = self.run_function(
  81. "file.get_selinux_context",
  82. arg=[os.path.abspath(os.path.join(self.mydir, ".."))],
  83. )
  84. self.assertNotEqual(ret_updir, NEW_CONTEXT)
  85. else:
  86. ret_file = self.run_function("file.get_selinux_context", arg=[self.myfile])
  87. self.assertIn("No selinux context information is available", ret_file)
  88. @skipIf(salt.utils.platform.is_windows(), "No security context on Windows")
  89. @requires_system_grains
  90. def test_set_selinux_context(self, grains):
  91. if not grains.get("selinux", {}).get("enabled", False):
  92. self.skipTest("selinux not available")
  93. FILE_CONTEXT = "system_u:object_r:system_conf_t:s0"
  94. ret_file = self.run_function(
  95. "file.set_selinux_context", arg=[self.myfile, *(FILE_CONTEXT.split(":"))]
  96. )
  97. self.assertEqual(ret_file, FILE_CONTEXT)
  98. DIR_CONTEXT = "system_u:object_r:user_home_t:s0"
  99. ret_dir = self.run_function(
  100. "file.set_selinux_context", arg=[self.mydir, *(DIR_CONTEXT.split(":"))]
  101. )
  102. self.assertEqual(ret_dir, DIR_CONTEXT)
  103. @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  104. def test_chown(self):
  105. user = getpass.getuser()
  106. if sys.platform == "darwin":
  107. group = "staff"
  108. elif sys.platform.startswith(("linux", "freebsd", "openbsd")):
  109. group = grp.getgrgid(pwd.getpwuid(os.getuid()).pw_gid).gr_name
  110. ret = self.run_function("file.chown", arg=[self.myfile, user, group])
  111. self.assertIsNone(ret)
  112. fstat = os.stat(self.myfile)
  113. self.assertEqual(fstat.st_uid, os.getuid())
  114. self.assertEqual(fstat.st_gid, grp.getgrnam(group).gr_gid)
  115. @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  116. def test_chown_no_user(self):
  117. user = "notanyuseriknow"
  118. group = grp.getgrgid(pwd.getpwuid(os.getuid()).pw_gid).gr_name
  119. ret = self.run_function("file.chown", arg=[self.myfile, user, group])
  120. self.assertIn("not exist", ret)
  121. @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  122. def test_chown_no_user_no_group(self):
  123. user = "notanyuseriknow"
  124. group = "notanygroupyoushoulduse"
  125. ret = self.run_function("file.chown", arg=[self.myfile, user, group])
  126. self.assertIn("Group does not exist", ret)
  127. self.assertIn("User does not exist", ret)
  128. @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  129. def test_chown_no_path(self):
  130. user = getpass.getuser()
  131. if sys.platform == "darwin":
  132. group = "staff"
  133. elif sys.platform.startswith(("linux", "freebsd", "openbsd")):
  134. group = grp.getgrgid(pwd.getpwuid(os.getuid()).pw_gid).gr_name
  135. ret = self.run_function("file.chown", arg=["/tmp/nosuchfile", user, group])
  136. self.assertIn("File not found", ret)
  137. @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  138. def test_chown_noop(self):
  139. user = ""
  140. group = ""
  141. ret = self.run_function("file.chown", arg=[self.myfile, user, group])
  142. self.assertIsNone(ret)
  143. fstat = os.stat(self.myfile)
  144. self.assertEqual(fstat.st_uid, os.getuid())
  145. self.assertEqual(fstat.st_gid, os.getgid())
  146. @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  147. def test_chgrp(self):
  148. if sys.platform == "darwin":
  149. group = "everyone"
  150. elif sys.platform.startswith(("linux", "freebsd", "openbsd")):
  151. group = grp.getgrgid(pwd.getpwuid(os.getuid()).pw_gid).gr_name
  152. ret = self.run_function("file.chgrp", arg=[self.myfile, group])
  153. self.assertIsNone(ret)
  154. fstat = os.stat(self.myfile)
  155. self.assertEqual(fstat.st_gid, grp.getgrnam(group).gr_gid)
  156. @skipIf(salt.utils.platform.is_windows(), "No chgrp on Windows")
  157. def test_chgrp_failure(self):
  158. group = "thisgroupdoesntexist"
  159. ret = self.run_function("file.chgrp", arg=[self.myfile, group])
  160. self.assertIn("not exist", ret)
  161. def test_patch(self):
  162. if not self.run_function("cmd.has_exec", ["patch"]):
  163. self.skipTest("patch is not installed")
  164. src_patch = os.path.join(RUNTIME_VARS.FILES, "file", "base", "hello.patch")
  165. src_file = os.path.join(RUNTIME_VARS.TMP, "src.txt")
  166. with salt.utils.files.fopen(src_file, "w+") as fp:
  167. fp.write(salt.utils.stringutils.to_str("Hello\n"))
  168. # dry-run should not modify src_file
  169. ret = self.minion_run("file.patch", src_file, src_patch, dry_run=True)
  170. assert ret["retcode"] == 0, repr(ret)
  171. with salt.utils.files.fopen(src_file) as fp:
  172. self.assertEqual(salt.utils.stringutils.to_unicode(fp.read()), "Hello\n")
  173. ret = self.minion_run("file.patch", src_file, src_patch)
  174. assert ret["retcode"] == 0, repr(ret)
  175. with salt.utils.files.fopen(src_file) as fp:
  176. self.assertEqual(
  177. salt.utils.stringutils.to_unicode(fp.read()), "Hello world\n"
  178. )
  179. def test_remove_file(self):
  180. ret = self.run_function("file.remove", arg=[self.myfile])
  181. self.assertTrue(ret)
  182. def test_remove_dir(self):
  183. ret = self.run_function("file.remove", arg=[self.mydir])
  184. self.assertTrue(ret)
  185. def test_remove_symlink(self):
  186. ret = self.run_function("file.remove", arg=[self.mysymlink])
  187. self.assertTrue(ret)
  188. def test_remove_broken_symlink(self):
  189. ret = self.run_function("file.remove", arg=[self.mybadsymlink])
  190. self.assertTrue(ret)
  191. def test_cannot_remove(self):
  192. ret = self.run_function("file.remove", arg=["tty"])
  193. self.assertEqual(
  194. "ERROR executing 'file.remove': File path must be absolute: tty", ret
  195. )
  196. def test_source_list_for_single_file_returns_unchanged(self):
  197. ret = self.run_function(
  198. "file.source_list", ["salt://http/httpd.conf", "filehash", "base"]
  199. )
  200. self.assertEqual(list(ret), ["salt://http/httpd.conf", "filehash"])
  201. def test_source_list_for_single_local_file_slash_returns_unchanged(self):
  202. ret = self.run_function("file.source_list", [self.myfile, "filehash", "base"])
  203. self.assertEqual(list(ret), [self.myfile, "filehash"])
  204. def test_source_list_for_single_local_file_proto_returns_unchanged(self):
  205. ret = self.run_function(
  206. "file.source_list", ["file://" + self.myfile, "filehash", "base"]
  207. )
  208. self.assertEqual(list(ret), ["file://" + self.myfile, "filehash"])
  209. def test_source_list_for_multiple_files_with_missing_files(self):
  210. file_list = [
  211. "salt://does/not/exist",
  212. "file://" + self.myfile,
  213. "http://localhost//does/not/exist",
  214. "salt://http/httpd.conf",
  215. ]
  216. ret = self.run_function("file.source_list", [file_list, "filehash", "base"])
  217. self.assertEqual(list(ret), ["file://" + self.myfile, "filehash"])
  218. def test_source_list_for_multiple_files_dict_with_missing_files(self):
  219. file_list = [
  220. {"salt://does/not/exist": "filehash"},
  221. {"file://" + self.myfile: "filehash"},
  222. {"http://localhost//does/not/exist": "filehash"},
  223. {"salt://http/httpd.conf": "filehash"},
  224. ]
  225. ret = self.run_function("file.source_list", [file_list, "", "base"])
  226. self.assertEqual(list(ret), ["file://" + self.myfile, "filehash"])
  227. def test_file_line_changes_format(self):
  228. """
  229. Test file.line changes output formatting.
  230. Issue #41474
  231. """
  232. ret = self.minion_run(
  233. "file.line", self.myfile, "Goodbye", mode="insert", after="Hello"
  234. )
  235. self.assertIn("Hello" + os.linesep + "+Goodbye", ret)
  236. def test_file_line_changes_entire_line(self):
  237. """
  238. Test file.line entire line matching
  239. Issue #49855
  240. """
  241. ret = self.minion_run(
  242. "file.line", self.myfile, "Goodbye", mode="insert", after="Hello"
  243. )
  244. assert "Hello" + os.linesep + "+Goodbye" in ret
  245. ret = self.minion_run(
  246. "file.line", self.myfile, "Goodbye 1", mode="insert", after="Hello"
  247. )
  248. assert (
  249. "Hello" + os.linesep + "+Goodbye 1" + os.linesep + " Goodbye" + os.linesep
  250. in ret
  251. )
  252. with salt.utils.files.fopen(self.myfile, "r") as fh_:
  253. content = fh_.read()
  254. assert (
  255. "Hello" + os.linesep + "Goodbye 1" + os.linesep + "Goodbye" + os.linesep
  256. == content
  257. )
  258. def test_file_line_content(self):
  259. self.minion_run(
  260. "file.line", self.myfile, "Goodbye", mode="insert", after="Hello"
  261. )
  262. with salt.utils.files.fopen(self.myfile, "r") as fp:
  263. content = fp.read()
  264. self.assertEqual(content, "Hello" + os.linesep + "Goodbye" + os.linesep)
  265. def test_file_line_duplicate_insert_after(self):
  266. """
  267. Test file.line duplicates line.
  268. Issue #50254
  269. """
  270. with salt.utils.files.fopen(self.myfile, "a") as fp:
  271. fp.write(salt.utils.stringutils.to_str("Goodbye" + os.linesep))
  272. self.minion_run(
  273. "file.line", self.myfile, "Goodbye", mode="insert", after="Hello"
  274. )
  275. with salt.utils.files.fopen(self.myfile, "r") as fp:
  276. content = fp.read()
  277. self.assertEqual(content, "Hello" + os.linesep + "Goodbye" + os.linesep)
  278. def test_file_line_duplicate_insert_before(self):
  279. """
  280. Test file.line duplicates line.
  281. Issue #50254
  282. """
  283. with salt.utils.files.fopen(self.myfile, "a") as fp:
  284. fp.write(salt.utils.stringutils.to_str("Goodbye" + os.linesep))
  285. self.minion_run(
  286. "file.line", self.myfile, "Hello", mode="insert", before="Goodbye"
  287. )
  288. with salt.utils.files.fopen(self.myfile, "r") as fp:
  289. content = fp.read()
  290. self.assertEqual(content, "Hello" + os.linesep + "Goodbye" + os.linesep)
  291. def test_file_line_duplicate_ensure_after(self):
  292. """
  293. Test file.line duplicates line.
  294. Issue #50254
  295. """
  296. with salt.utils.files.fopen(self.myfile, "a") as fp:
  297. fp.write(salt.utils.stringutils.to_str("Goodbye" + os.linesep))
  298. self.minion_run(
  299. "file.line", self.myfile, "Goodbye", mode="ensure", after="Hello"
  300. )
  301. with salt.utils.files.fopen(self.myfile, "r") as fp:
  302. content = fp.read()
  303. self.assertEqual(content, "Hello" + os.linesep + "Goodbye" + os.linesep)
  304. def test_file_line_duplicate_ensure_before(self):
  305. """
  306. Test file.line duplicates line.
  307. Issue #50254
  308. """
  309. with salt.utils.files.fopen(self.myfile, "a") as fp:
  310. fp.write(salt.utils.stringutils.to_str("Goodbye" + os.linesep))
  311. self.minion_run(
  312. "file.line", self.myfile, "Hello", mode="ensure", before="Goodbye"
  313. )
  314. with salt.utils.files.fopen(self.myfile, "r") as fp:
  315. content = fp.read()
  316. self.assertEqual(content, "Hello" + os.linesep + "Goodbye" + os.linesep)
  317. def test_file_read_bytes(self):
  318. """
  319. Test that ``file.read`` reads and returns ``bytes`` data
  320. """
  321. # Write some random bytes
  322. data = b"n\x1a\xf7S@tBI\xa9J"
  323. with salt.utils.files.fopen(self.myfile, "wb") as fp:
  324. fp.write(data)
  325. ret = self.minion_run("file.read", self.myfile, binary=True)
  326. self.assertEqual(type(ret), bytes)
  327. self.assertEqual(ret, data)
  328. def test_file_read_str(self):
  329. """
  330. Test that ``file.read`` reads and returns ``str`` data
  331. """
  332. # Write some str data
  333. data = "printable characters"
  334. with salt.utils.files.fopen(self.myfile, "w") as fp:
  335. fp.write(data)
  336. ret = self.minion_run("file.read", self.myfile)
  337. self.assertEqual(type(ret), str)
  338. self.assertEqual(ret, data)