test_file.py 182 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. Tests for the file state
  4. """
  5. from __future__ import absolute_import, print_function, unicode_literals
  6. import errno
  7. import filecmp
  8. import logging
  9. import os
  10. import re
  11. import shutil
  12. import stat
  13. import sys
  14. import tempfile
  15. import textwrap
  16. import pytest
  17. import salt.serializers.configparser
  18. import salt.utils.data
  19. import salt.utils.files
  20. import salt.utils.json
  21. import salt.utils.path
  22. import salt.utils.platform
  23. import salt.utils.stringutils
  24. from salt.ext import six
  25. from salt.ext.six.moves import range
  26. from salt.utils.versions import LooseVersion as _LooseVersion
  27. from tests.support.case import ModuleCase
  28. from tests.support.helpers import (
  29. Webserver,
  30. dedent,
  31. destructiveTest,
  32. skip_if_not_root,
  33. with_system_user_and_group,
  34. with_tempdir,
  35. with_tempfile,
  36. )
  37. from tests.support.mixins import SaltReturnAssertsMixin
  38. from tests.support.runtests import RUNTIME_VARS
  39. from tests.support.unit import skipIf
  40. log = logging.getLogger(__name__)
  41. HAS_PWD = True
  42. try:
  43. import pwd
  44. except ImportError:
  45. HAS_PWD = False
  46. HAS_GRP = True
  47. try:
  48. import grp
  49. except ImportError:
  50. HAS_GRP = False
  51. IS_WINDOWS = salt.utils.platform.is_windows()
  52. BINARY_FILE = b"GIF89a\x01\x00\x01\x00\x80\x00\x00\x05\x04\x04\x00\x00\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x02D\x01\x00;"
  53. TEST_SYSTEM_USER = "test_system_user"
  54. TEST_SYSTEM_GROUP = "test_system_group"
  55. def _test_managed_file_mode_keep_helper(testcase, local=False):
  56. """
  57. DRY helper function to run the same test with a local or remote path
  58. """
  59. name = os.path.join(RUNTIME_VARS.TMP, "scene33")
  60. grail_fs_path = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
  61. grail = "salt://grail/scene33" if not local else grail_fs_path
  62. # Get the current mode so that we can put the file back the way we
  63. # found it when we're done.
  64. grail_fs_mode = int(testcase.run_function("file.get_mode", [grail_fs_path]), 8)
  65. initial_mode = 0o770
  66. new_mode_1 = 0o600
  67. new_mode_2 = 0o644
  68. # Set the initial mode, so we can be assured that when we set the mode
  69. # to "keep", we're actually changing the permissions of the file to the
  70. # new mode.
  71. ret = testcase.run_state(
  72. "file.managed", name=name, mode=oct(initial_mode), source=grail,
  73. )
  74. if IS_WINDOWS:
  75. testcase.assertSaltFalseReturn(ret)
  76. return
  77. testcase.assertSaltTrueReturn(ret)
  78. try:
  79. # Update the mode on the fileserver (pass 1)
  80. os.chmod(grail_fs_path, new_mode_1)
  81. ret = testcase.run_state("file.managed", name=name, mode="keep", source=grail,)
  82. testcase.assertSaltTrueReturn(ret)
  83. managed_mode = stat.S_IMODE(os.stat(name).st_mode)
  84. testcase.assertEqual(oct(managed_mode), oct(new_mode_1))
  85. # Update the mode on the fileserver (pass 2)
  86. # This assures us that if the file in file_roots was originally set
  87. # to the same mode as new_mode_1, we definitely get an updated mode
  88. # this time.
  89. os.chmod(grail_fs_path, new_mode_2)
  90. ret = testcase.run_state("file.managed", name=name, mode="keep", source=grail,)
  91. testcase.assertSaltTrueReturn(ret)
  92. managed_mode = stat.S_IMODE(os.stat(name).st_mode)
  93. testcase.assertEqual(oct(managed_mode), oct(new_mode_2))
  94. finally:
  95. # Set the mode of the file in the file_roots back to what it
  96. # originally was.
  97. os.chmod(grail_fs_path, grail_fs_mode)
  98. @pytest.mark.windows_whitelisted
  99. class FileTest(ModuleCase, SaltReturnAssertsMixin):
  100. """
  101. Validate the file state
  102. """
  103. def _delete_file(self, path):
  104. try:
  105. os.remove(path)
  106. except OSError as exc:
  107. if exc.errno != errno.ENOENT:
  108. log.error("Failed to remove %s: %s", path, exc)
  109. def tearDown(self):
  110. """
  111. remove files created in previous tests
  112. """
  113. user = "salt"
  114. if user in str(self.run_function("user.list_users")):
  115. self.run_function("user.delete", [user])
  116. def test_symlink(self):
  117. """
  118. file.symlink
  119. """
  120. name = os.path.join(RUNTIME_VARS.TMP, "symlink")
  121. tgt = os.path.join(RUNTIME_VARS.TMP, "target")
  122. # Windows must have a source directory to link to
  123. if IS_WINDOWS and not os.path.isdir(tgt):
  124. os.mkdir(tgt)
  125. # Windows cannot create a symlink if it already exists
  126. if IS_WINDOWS and self.run_function("file.is_link", [name]):
  127. self.run_function("file.remove", [name])
  128. ret = self.run_state("file.symlink", name=name, target=tgt)
  129. self.assertSaltTrueReturn(ret)
  130. def test_test_symlink(self):
  131. """
  132. file.symlink test interface
  133. """
  134. name = os.path.join(RUNTIME_VARS.TMP, "symlink2")
  135. tgt = os.path.join(RUNTIME_VARS.TMP, "target")
  136. ret = self.run_state("file.symlink", test=True, name=name, target=tgt)
  137. self.assertSaltNoneReturn(ret)
  138. def test_absent_file(self):
  139. """
  140. file.absent
  141. """
  142. name = os.path.join(RUNTIME_VARS.TMP, "file_to_kill")
  143. with salt.utils.files.fopen(name, "w+") as fp_:
  144. fp_.write("killme")
  145. ret = self.run_state("file.absent", name=name)
  146. self.assertSaltTrueReturn(ret)
  147. self.assertFalse(os.path.isfile(name))
  148. def test_absent_dir(self):
  149. """
  150. file.absent
  151. """
  152. name = os.path.join(RUNTIME_VARS.TMP, "dir_to_kill")
  153. if not os.path.isdir(name):
  154. # left behind... Don't fail because of this!
  155. os.makedirs(name)
  156. ret = self.run_state("file.absent", name=name)
  157. self.assertSaltTrueReturn(ret)
  158. self.assertFalse(os.path.isdir(name))
  159. def test_absent_link(self):
  160. """
  161. file.absent
  162. """
  163. name = os.path.join(RUNTIME_VARS.TMP, "link_to_kill")
  164. tgt = "{0}.tgt".format(name)
  165. # Windows must have a source directory to link to
  166. if IS_WINDOWS and not os.path.isdir(tgt):
  167. os.mkdir(tgt)
  168. if not self.run_function("file.is_link", [name]):
  169. self.run_function("file.symlink", [tgt, name])
  170. ret = self.run_state("file.absent", name=name)
  171. try:
  172. self.assertSaltTrueReturn(ret)
  173. self.assertFalse(self.run_function("file.is_link", [name]))
  174. finally:
  175. if self.run_function("file.is_link", [name]):
  176. self.run_function("file.remove", [name])
  177. @with_tempfile()
  178. def test_test_absent(self, name):
  179. """
  180. file.absent test interface
  181. """
  182. with salt.utils.files.fopen(name, "w+") as fp_:
  183. fp_.write("killme")
  184. ret = self.run_state("file.absent", test=True, name=name)
  185. self.assertSaltNoneReturn(ret)
  186. self.assertTrue(os.path.isfile(name))
  187. def test_managed(self):
  188. """
  189. file.managed
  190. """
  191. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  192. ret = self.run_state("file.managed", name=name, source="salt://grail/scene33")
  193. src = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
  194. with salt.utils.files.fopen(src, "r") as fp_:
  195. master_data = fp_.read()
  196. with salt.utils.files.fopen(name, "r") as fp_:
  197. minion_data = fp_.read()
  198. self.assertEqual(master_data, minion_data)
  199. self.assertSaltTrueReturn(ret)
  200. def test_managed_file_mode(self):
  201. """
  202. file.managed, correct file permissions
  203. """
  204. desired_mode = 504 # 0770 octal
  205. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  206. ret = self.run_state(
  207. "file.managed", name=name, mode="0770", source="salt://grail/scene33"
  208. )
  209. if IS_WINDOWS:
  210. expected = "The 'mode' option is not supported on Windows"
  211. self.assertEqual(ret[list(ret)[0]]["comment"], expected)
  212. self.assertSaltFalseReturn(ret)
  213. return
  214. resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
  215. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  216. self.assertSaltTrueReturn(ret)
  217. @skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
  218. def test_managed_file_mode_keep(self):
  219. """
  220. Test using "mode: keep" in a file.managed state
  221. """
  222. _test_managed_file_mode_keep_helper(self, local=False)
  223. @skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
  224. def test_managed_file_mode_keep_local_source(self):
  225. """
  226. Test using "mode: keep" in a file.managed state, with a local file path
  227. as the source.
  228. """
  229. _test_managed_file_mode_keep_helper(self, local=True)
  230. def test_managed_file_mode_file_exists_replace(self):
  231. """
  232. file.managed, existing file with replace=True, change permissions
  233. """
  234. initial_mode = 504 # 0770 octal
  235. desired_mode = 384 # 0600 octal
  236. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  237. ret = self.run_state(
  238. "file.managed",
  239. name=name,
  240. mode=oct(initial_mode),
  241. source="salt://grail/scene33",
  242. )
  243. if IS_WINDOWS:
  244. expected = "The 'mode' option is not supported on Windows"
  245. self.assertEqual(ret[list(ret)[0]]["comment"], expected)
  246. self.assertSaltFalseReturn(ret)
  247. return
  248. resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
  249. self.assertEqual(oct(initial_mode), oct(resulting_mode))
  250. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  251. ret = self.run_state(
  252. "file.managed",
  253. name=name,
  254. replace=True,
  255. mode=oct(desired_mode),
  256. source="salt://grail/scene33",
  257. )
  258. resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
  259. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  260. self.assertSaltTrueReturn(ret)
  261. def test_managed_file_mode_file_exists_noreplace(self):
  262. """
  263. file.managed, existing file with replace=False, change permissions
  264. """
  265. initial_mode = 504 # 0770 octal
  266. desired_mode = 384 # 0600 octal
  267. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  268. ret = self.run_state(
  269. "file.managed",
  270. name=name,
  271. replace=True,
  272. mode=oct(initial_mode),
  273. source="salt://grail/scene33",
  274. )
  275. if IS_WINDOWS:
  276. expected = "The 'mode' option is not supported on Windows"
  277. self.assertEqual(ret[list(ret)[0]]["comment"], expected)
  278. self.assertSaltFalseReturn(ret)
  279. return
  280. ret = self.run_state(
  281. "file.managed",
  282. name=name,
  283. replace=False,
  284. mode=oct(desired_mode),
  285. source="salt://grail/scene33",
  286. )
  287. resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
  288. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  289. self.assertSaltTrueReturn(ret)
  290. def test_managed_file_with_grains_data(self):
  291. """
  292. Test to ensure we can render grains data into a managed
  293. file.
  294. """
  295. grain_path = os.path.join(RUNTIME_VARS.TMP, "file-grain-test")
  296. state_file = "file-grainget"
  297. self.run_function("state.sls", [state_file], pillar={"grain_path": grain_path})
  298. self.assertTrue(os.path.exists(grain_path))
  299. with salt.utils.files.fopen(grain_path, "r") as fp_:
  300. file_contents = fp_.readlines()
  301. if IS_WINDOWS:
  302. match = "^minion\r\n"
  303. else:
  304. match = "^minion\n"
  305. self.assertTrue(re.match(match, file_contents[0]))
  306. def test_managed_file_with_pillar_sls(self):
  307. """
  308. Test to ensure pillar data in sls file
  309. is rendered properly and file is created.
  310. """
  311. file_pillar = os.path.join(RUNTIME_VARS.TMP, "filepillar-python")
  312. self.addCleanup(self._delete_file, file_pillar)
  313. state_name = "file-pillarget"
  314. log.warning("File Path: %s", file_pillar)
  315. ret = self.run_function("state.sls", [state_name])
  316. self.assertSaltTrueReturn(ret)
  317. # Check to make sure the file was created
  318. check_file = self.run_function("file.file_exists", [file_pillar])
  319. self.assertTrue(check_file)
  320. def test_managed_file_with_pillardefault_sls(self):
  321. """
  322. Test to ensure when pillar data is not available
  323. in sls file with pillar.get it uses the default
  324. value.
  325. """
  326. file_pillar_def = os.path.join(RUNTIME_VARS.TMP, "filepillar-defaultvalue")
  327. self.addCleanup(self._delete_file, file_pillar_def)
  328. state_name = "file-pillardefaultget"
  329. log.warning("File Path: %s", file_pillar_def)
  330. ret = self.run_function("state.sls", [state_name])
  331. self.assertSaltTrueReturn(ret)
  332. # Check to make sure the file was created
  333. check_file = self.run_function("file.file_exists", [file_pillar_def])
  334. self.assertTrue(check_file)
  335. @skip_if_not_root
  336. def test_managed_dir_mode(self):
  337. """
  338. Tests to ensure that file.managed creates directories with the
  339. permissions requested with the dir_mode argument
  340. """
  341. desired_mode = 511 # 0777 in octal
  342. name = os.path.join(RUNTIME_VARS.TMP, "a", "managed_dir_mode_test_file")
  343. desired_owner = "nobody"
  344. ret = self.run_state(
  345. "file.managed",
  346. name=name,
  347. source="salt://grail/scene33",
  348. mode=600,
  349. makedirs=True,
  350. user=desired_owner,
  351. dir_mode=oct(desired_mode), # 0777
  352. )
  353. if IS_WINDOWS:
  354. expected = "The 'mode' option is not supported on Windows"
  355. self.assertEqual(ret[list(ret)[0]]["comment"], expected)
  356. self.assertSaltFalseReturn(ret)
  357. return
  358. resulting_mode = stat.S_IMODE(
  359. os.stat(os.path.join(RUNTIME_VARS.TMP, "a")).st_mode
  360. )
  361. resulting_owner = pwd.getpwuid(
  362. os.stat(os.path.join(RUNTIME_VARS.TMP, "a")).st_uid
  363. ).pw_name
  364. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  365. self.assertSaltTrueReturn(ret)
  366. self.assertEqual(desired_owner, resulting_owner)
  367. def test_test_managed(self):
  368. """
  369. file.managed test interface
  370. """
  371. name = os.path.join(RUNTIME_VARS.TMP, "grail_not_not_scene33")
  372. ret = self.run_state(
  373. "file.managed", test=True, name=name, source="salt://grail/scene33"
  374. )
  375. self.assertSaltNoneReturn(ret)
  376. self.assertFalse(os.path.isfile(name))
  377. def test_managed_show_changes_false(self):
  378. """
  379. file.managed test interface
  380. """
  381. name = os.path.join(RUNTIME_VARS.TMP, "grail_not_scene33")
  382. with salt.utils.files.fopen(name, "wb") as fp_:
  383. fp_.write(b"test_managed_show_changes_false\n")
  384. ret = self.run_state(
  385. "file.managed", name=name, source="salt://grail/scene33", show_changes=False
  386. )
  387. changes = next(six.itervalues(ret))["changes"]
  388. self.assertEqual("<show_changes=False>", changes["diff"])
  389. def test_managed_show_changes_true(self):
  390. """
  391. file.managed test interface
  392. """
  393. name = os.path.join(RUNTIME_VARS.TMP, "grail_not_scene33")
  394. with salt.utils.files.fopen(name, "wb") as fp_:
  395. fp_.write(b"test_managed_show_changes_false\n")
  396. ret = self.run_state("file.managed", name=name, source="salt://grail/scene33",)
  397. changes = next(six.itervalues(ret))["changes"]
  398. self.assertIn("diff", changes)
  399. @skipIf(IS_WINDOWS, "Don't know how to fix for Windows")
  400. def test_managed_escaped_file_path(self):
  401. """
  402. file.managed test that 'salt://|' protects unusual characters in file path
  403. """
  404. funny_file = salt.utils.files.mkstemp(
  405. prefix="?f!le? n@=3&", suffix=".file type"
  406. )
  407. funny_file_name = os.path.split(funny_file)[1]
  408. funny_url = "salt://|" + funny_file_name
  409. funny_url_path = os.path.join(RUNTIME_VARS.BASE_FILES, funny_file_name)
  410. state_name = "funny_file"
  411. state_file_name = state_name + ".sls"
  412. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_file_name)
  413. state_key = "file_|-{0}_|-{0}_|-managed".format(funny_file)
  414. self.addCleanup(os.remove, state_file)
  415. self.addCleanup(os.remove, funny_file)
  416. self.addCleanup(os.remove, funny_url_path)
  417. with salt.utils.files.fopen(funny_url_path, "w"):
  418. pass
  419. with salt.utils.files.fopen(state_file, "w") as fp_:
  420. fp_.write(
  421. textwrap.dedent(
  422. """\
  423. {0}:
  424. file.managed:
  425. - source: {1}
  426. - makedirs: True
  427. """.format(
  428. funny_file, funny_url
  429. )
  430. )
  431. )
  432. ret = self.run_function("state.sls", [state_name])
  433. self.assertTrue(ret[state_key]["result"])
  434. def test_managed_contents(self):
  435. """
  436. test file.managed with contents that is a boolean, string, integer,
  437. float, list, and dictionary
  438. """
  439. state_name = "file-FileTest-test_managed_contents"
  440. state_filename = state_name + ".sls"
  441. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  442. managed_files = {}
  443. state_keys = {}
  444. for typ in ("bool", "str", "int", "float", "list", "dict"):
  445. managed_files[typ] = salt.utils.files.mkstemp()
  446. state_keys[typ] = "file_|-{0} file_|-{1}_|-managed".format(
  447. typ, managed_files[typ]
  448. )
  449. try:
  450. with salt.utils.files.fopen(state_file, "w") as fd_:
  451. fd_.write(
  452. textwrap.dedent(
  453. """\
  454. bool file:
  455. file.managed:
  456. - name: {bool}
  457. - contents: True
  458. str file:
  459. file.managed:
  460. - name: {str}
  461. - contents: Salt was here.
  462. int file:
  463. file.managed:
  464. - name: {int}
  465. - contents: 340282366920938463463374607431768211456
  466. float file:
  467. file.managed:
  468. - name: {float}
  469. - contents: 1.7518e-45 # gravitational coupling constant
  470. list file:
  471. file.managed:
  472. - name: {list}
  473. - contents: [1, 1, 2, 3, 5, 8, 13]
  474. dict file:
  475. file.managed:
  476. - name: {dict}
  477. - contents:
  478. C: charge
  479. P: parity
  480. T: time
  481. """.format(
  482. **managed_files
  483. )
  484. )
  485. )
  486. ret = self.run_function("state.sls", [state_name])
  487. self.assertSaltTrueReturn(ret)
  488. for typ in state_keys:
  489. self.assertTrue(ret[state_keys[typ]]["result"])
  490. self.assertIn("diff", ret[state_keys[typ]]["changes"])
  491. finally:
  492. if os.path.exists(state_file):
  493. os.remove(state_file)
  494. for typ in managed_files:
  495. if os.path.exists(managed_files[typ]):
  496. os.remove(managed_files[typ])
  497. def test_managed_contents_with_contents_newline(self):
  498. """
  499. test file.managed with contents by using the default content_newline
  500. flag.
  501. """
  502. contents = "test_managed_contents_with_newline_one"
  503. name = os.path.join(RUNTIME_VARS.TMP, "foo")
  504. # Create a file named foo with contents as above but with a \n at EOF
  505. self.run_state(
  506. "file.managed", name=name, contents=contents, contents_newline=True
  507. )
  508. with salt.utils.files.fopen(name, "r") as fp_:
  509. last_line = fp_.read()
  510. self.assertEqual((contents + os.linesep), last_line)
  511. def test_managed_contents_with_contents_newline_false(self):
  512. """
  513. test file.managed with contents by using the non default content_newline
  514. flag.
  515. """
  516. contents = "test_managed_contents_with_newline_one"
  517. name = os.path.join(RUNTIME_VARS.TMP, "bar")
  518. # Create a file named foo with contents as above but with a \n at EOF
  519. self.run_state(
  520. "file.managed", name=name, contents=contents, contents_newline=False
  521. )
  522. with salt.utils.files.fopen(name, "r") as fp_:
  523. last_line = fp_.read()
  524. self.assertEqual(contents, last_line)
  525. def test_managed_multiline_contents_with_contents_newline(self):
  526. """
  527. test file.managed with contents by using the non default content_newline
  528. flag.
  529. """
  530. contents = "this is a cookie{}this is another cookie".format(os.linesep)
  531. name = os.path.join(RUNTIME_VARS.TMP, "bar")
  532. # Create a file named foo with contents as above but with a \n at EOF
  533. self.run_state(
  534. "file.managed", name=name, contents=contents, contents_newline=True
  535. )
  536. with salt.utils.files.fopen(name, "r") as fp_:
  537. last_line = fp_.read()
  538. self.assertEqual((contents + os.linesep), last_line)
  539. def test_managed_multiline_contents_with_contents_newline_false(self):
  540. """
  541. test file.managed with contents by using the non default content_newline
  542. flag.
  543. """
  544. contents = "this is a cookie{}this is another cookie".format(os.linesep)
  545. name = os.path.join(RUNTIME_VARS.TMP, "bar")
  546. # Create a file named foo with contents as above but with a \n at EOF
  547. self.run_state(
  548. "file.managed", name=name, contents=contents, contents_newline=False
  549. )
  550. with salt.utils.files.fopen(name, "r") as fp_:
  551. last_line = fp_.read()
  552. self.assertEqual(contents, last_line)
  553. @skip_if_not_root
  554. @skipIf(IS_WINDOWS, 'Windows does not support "mode" kwarg. Skipping.')
  555. @skipIf(not salt.utils.path.which("visudo"), "sudo is missing")
  556. def test_managed_check_cmd(self):
  557. """
  558. Test file.managed passing a basic check_cmd kwarg. See Issue #38111.
  559. """
  560. r_group = "root"
  561. if salt.utils.platform.is_darwin():
  562. r_group = "wheel"
  563. try:
  564. ret = self.run_state(
  565. "file.managed",
  566. name="/tmp/sudoers",
  567. user="root",
  568. group=r_group,
  569. mode=440,
  570. check_cmd="visudo -c -s -f",
  571. )
  572. self.assertSaltTrueReturn(ret)
  573. self.assertInSaltComment("Empty file", ret)
  574. self.assertEqual(
  575. ret["file_|-/tmp/sudoers_|-/tmp/sudoers_|-managed"]["changes"],
  576. {"new": "file /tmp/sudoers created", "mode": "0440"},
  577. )
  578. finally:
  579. # Clean Up File
  580. if os.path.exists("/tmp/sudoers"):
  581. os.remove("/tmp/sudoers")
  582. def test_managed_local_source_with_source_hash(self):
  583. """
  584. Make sure that we enforce the source_hash even with local files
  585. """
  586. name = os.path.join(RUNTIME_VARS.TMP, "local_source_with_source_hash")
  587. local_path = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
  588. actual_hash = "567fd840bf1548edc35c48eb66cdd78bfdfcccff"
  589. if IS_WINDOWS:
  590. # CRLF vs LF causes a different hash on windows
  591. actual_hash = "f658a0ec121d9c17088795afcc6ff3c43cb9842a"
  592. # Reverse the actual hash
  593. bad_hash = actual_hash[::-1]
  594. def remove_file():
  595. try:
  596. os.remove(name)
  597. except OSError as exc:
  598. if exc.errno != errno.ENOENT:
  599. raise
  600. def do_test(clean=False):
  601. for proto in ("file://", ""):
  602. source = proto + local_path
  603. log.debug("Trying source %s", source)
  604. try:
  605. ret = self.run_state(
  606. "file.managed",
  607. name=name,
  608. source=source,
  609. source_hash="sha1={0}".format(bad_hash),
  610. )
  611. self.assertSaltFalseReturn(ret)
  612. ret = ret[next(iter(ret))]
  613. # Shouldn't be any changes
  614. self.assertFalse(ret["changes"])
  615. # Check that we identified a hash mismatch
  616. self.assertIn("does not match actual checksum", ret["comment"])
  617. ret = self.run_state(
  618. "file.managed",
  619. name=name,
  620. source=source,
  621. source_hash="sha1={0}".format(actual_hash),
  622. )
  623. self.assertSaltTrueReturn(ret)
  624. finally:
  625. if clean:
  626. remove_file()
  627. remove_file()
  628. log.debug("Trying with nonexistant destination file")
  629. do_test()
  630. log.debug("Trying with destination file already present")
  631. with salt.utils.files.fopen(name, "w"):
  632. pass
  633. try:
  634. do_test(clean=False)
  635. finally:
  636. remove_file()
  637. def test_managed_local_source_does_not_exist(self):
  638. """
  639. Make sure that we exit gracefully when a local source doesn't exist
  640. """
  641. name = os.path.join(RUNTIME_VARS.TMP, "local_source_does_not_exist")
  642. local_path = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene99")
  643. for proto in ("file://", ""):
  644. source = proto + local_path
  645. log.debug("Trying source %s", source)
  646. ret = self.run_state("file.managed", name=name, source=source)
  647. self.assertSaltFalseReturn(ret)
  648. ret = ret[next(iter(ret))]
  649. # Shouldn't be any changes
  650. self.assertFalse(ret["changes"])
  651. # Check that we identified a hash mismatch
  652. self.assertIn("does not exist", ret["comment"])
  653. def test_managed_unicode_jinja_with_tojson_filter(self):
  654. """
  655. Using {{ varname }} with a list or dictionary which contains unicode
  656. types on Python 2 will result in Jinja rendering the "u" prefix on each
  657. string. This tests that using the "tojson" jinja filter will dump them
  658. to a format which can be successfully loaded by our YAML loader.
  659. The two lines that should end up being rendered are meant to test two
  660. issues that would trip up PyYAML if the "tojson" filter were not used:
  661. 1. A unicode string type would be loaded as a unicode literal with the
  662. leading "u" as well as the quotes, rather than simply being loaded
  663. as the proper unicode type which matches the content of the string
  664. literal. In other words, u'foo' would be loaded literally as
  665. u"u'foo'". This test includes actual non-ascii unicode in one of the
  666. strings to confirm that this also handles these international
  667. characters properly.
  668. 2. Any unicode string type (such as a URL) which contains a colon would
  669. cause a ScannerError in PyYAML, as it would be assumed to delimit a
  670. mapping node.
  671. Dumping the data structure to JSON using the "tojson" jinja filter
  672. should produce an inline data structure which is valid YAML and will be
  673. loaded properly by our YAML loader.
  674. """
  675. test_file = os.path.join(RUNTIME_VARS.TMP, "test-tojson.txt")
  676. ret = self.run_function(
  677. "state.apply", mods="tojson", pillar={"tojson-file": test_file}
  678. )
  679. ret = ret[next(iter(ret))]
  680. assert ret["result"], ret
  681. with salt.utils.files.fopen(test_file, mode="rb") as fp_:
  682. managed = salt.utils.stringutils.to_unicode(fp_.read())
  683. expected = dedent(
  684. """\
  685. Die Webseite ist https://saltstack.com.
  686. Der Zucker ist süß.
  687. """
  688. )
  689. assert managed == expected, "{0!r} != {1!r}".format(managed, expected)
  690. def test_managed_source_hash_indifferent_case(self):
  691. """
  692. Test passing a source_hash as an uppercase hash.
  693. This is a regression test for Issue #38914 and Issue #48230 (test=true use).
  694. """
  695. name = os.path.join(RUNTIME_VARS.TMP, "source_hash_indifferent_case")
  696. state_name = "file_|-{0}_|" "-{0}_|-managed".format(name)
  697. local_path = os.path.join(RUNTIME_VARS.BASE_FILES, "hello_world.txt")
  698. actual_hash = "c98c24b677eff44860afea6f493bbaec5bb1c4cbb209c6fc2bbb47f66ff2ad31"
  699. if IS_WINDOWS:
  700. # CRLF vs LF causes a differnt hash on windows
  701. actual_hash = (
  702. "92b772380a3f8e27a93e57e6deeca6c01da07f5aadce78bb2fbb20de10a66925"
  703. )
  704. uppercase_hash = actual_hash.upper()
  705. try:
  706. # Lay down tmp file to test against
  707. self.run_state(
  708. "file.managed", name=name, source=local_path, source_hash=actual_hash
  709. )
  710. # Test uppercase source_hash: should return True with no changes
  711. ret = self.run_state(
  712. "file.managed", name=name, source=local_path, source_hash=uppercase_hash
  713. )
  714. assert ret[state_name]["result"] is True
  715. assert ret[state_name]["changes"] == {}
  716. # Test uppercase source_hash using test=true
  717. # Should return True with no changes
  718. ret = self.run_state(
  719. "file.managed",
  720. name=name,
  721. source=local_path,
  722. source_hash=uppercase_hash,
  723. test=True,
  724. )
  725. assert ret[state_name]["result"] is True
  726. assert ret[state_name]["changes"] == {}
  727. finally:
  728. # Clean Up File
  729. if os.path.exists(name):
  730. os.remove(name)
  731. @with_tempfile(create=False)
  732. def test_managed_latin1_diff(self, name):
  733. """
  734. Tests that latin-1 file contents are represented properly in the diff
  735. """
  736. # Lay down the initial file
  737. ret = self.run_state(
  738. "file.managed", name=name, source="salt://issue-48777/old.html"
  739. )
  740. ret = ret[next(iter(ret))]
  741. assert ret["result"] is True, ret
  742. # Replace it with the new file and check the diff
  743. ret = self.run_state(
  744. "file.managed", name=name, source="salt://issue-48777/new.html"
  745. )
  746. ret = ret[next(iter(ret))]
  747. assert ret["result"] is True, ret
  748. diff_lines = ret["changes"]["diff"].split(os.linesep)
  749. assert "+räksmörgås" in diff_lines, diff_lines
  750. @with_tempfile()
  751. def test_managed_keep_source_false_salt(self, name):
  752. """
  753. This test ensures that we properly clean the cached file if keep_source
  754. is set to False, for source files using a salt:// URL
  755. """
  756. source = "salt://grail/scene33"
  757. saltenv = "base"
  758. # Run the state
  759. ret = self.run_state(
  760. "file.managed", name=name, source=source, saltenv=saltenv, keep_source=False
  761. )
  762. ret = ret[next(iter(ret))]
  763. assert ret["result"] is True
  764. # Now make sure that the file is not cached
  765. result = self.run_function("cp.is_cached", [source, saltenv])
  766. assert result == "", "File is still cached at {0}".format(result)
  767. @with_tempfile(create=False)
  768. @with_tempfile(create=False)
  769. def test_file_managed_onchanges(self, file1, file2):
  770. """
  771. Test file.managed state with onchanges
  772. """
  773. pillar = {
  774. "file1": file1,
  775. "file2": file2,
  776. "source": "salt://testfile",
  777. "req": "onchanges",
  778. }
  779. # Lay down the file used in the below SLS to ensure that when it is
  780. # run, there are no changes.
  781. self.run_state("file.managed", name=pillar["file2"], source=pillar["source"])
  782. ret = self.repack_state_returns(
  783. self.run_function(
  784. "state.apply", mods="onchanges_prereq", pillar=pillar, test=True,
  785. )
  786. )
  787. # The file states should both exit with None
  788. assert ret["one"]["result"] is None, ret["one"]["result"]
  789. assert ret["three"]["result"] is True, ret["three"]["result"]
  790. # The first file state should have changes, since a new file was
  791. # created. The other one should not, since we already created that file
  792. # before applying the SLS file.
  793. assert ret["one"]["changes"]
  794. assert not ret["three"]["changes"], ret["three"]["changes"]
  795. # The state watching 'one' should have been run due to changes
  796. assert ret["two"]["comment"] == "Success!", ret["two"]["comment"]
  797. # The state watching 'three' should not have been run
  798. assert (
  799. ret["four"]["comment"]
  800. == "State was not run because none of the onchanges reqs changed"
  801. ), ret["four"]["comment"]
  802. @with_tempfile(create=False)
  803. @with_tempfile(create=False)
  804. def test_file_managed_prereq(self, file1, file2):
  805. """
  806. Test file.managed state with prereq
  807. """
  808. pillar = {
  809. "file1": file1,
  810. "file2": file2,
  811. "source": "salt://testfile",
  812. "req": "prereq",
  813. }
  814. # Lay down the file used in the below SLS to ensure that when it is
  815. # run, there are no changes.
  816. self.run_state("file.managed", name=pillar["file2"], source=pillar["source"])
  817. ret = self.repack_state_returns(
  818. self.run_function(
  819. "state.apply", mods="onchanges_prereq", pillar=pillar, test=True,
  820. )
  821. )
  822. # The file states should both exit with None
  823. assert ret["one"]["result"] is None, ret["one"]["result"]
  824. assert ret["three"]["result"] is True, ret["three"]["result"]
  825. # The first file state should have changes, since a new file was
  826. # created. The other one should not, since we already created that file
  827. # before applying the SLS file.
  828. assert ret["one"]["changes"]
  829. assert not ret["three"]["changes"], ret["three"]["changes"]
  830. # The state watching 'one' should have been run due to changes
  831. assert ret["two"]["comment"] == "Success!", ret["two"]["comment"]
  832. # The state watching 'three' should not have been run
  833. assert ret["four"]["comment"] == "No changes detected", ret["four"]["comment"]
  834. def test_directory(self):
  835. """
  836. file.directory
  837. """
  838. name = os.path.join(RUNTIME_VARS.TMP, "a_new_dir")
  839. ret = self.run_state("file.directory", name=name)
  840. self.assertSaltTrueReturn(ret)
  841. self.assertTrue(os.path.isdir(name))
  842. def test_directory_symlink_dry_run(self):
  843. """
  844. Ensure that symlinks are followed when file.directory is run with
  845. test=True
  846. """
  847. try:
  848. tmp_dir = os.path.join(RUNTIME_VARS.TMP, "pgdata")
  849. sym_dir = os.path.join(RUNTIME_VARS.TMP, "pg_data")
  850. os.mkdir(tmp_dir, 0o700)
  851. self.run_function("file.symlink", [tmp_dir, sym_dir])
  852. if IS_WINDOWS:
  853. ret = self.run_state(
  854. "file.directory",
  855. test=True,
  856. name=sym_dir,
  857. follow_symlinks=True,
  858. win_owner="Administrators",
  859. )
  860. else:
  861. ret = self.run_state(
  862. "file.directory",
  863. test=True,
  864. name=sym_dir,
  865. follow_symlinks=True,
  866. mode=700,
  867. )
  868. self.assertSaltTrueReturn(ret)
  869. finally:
  870. if os.path.isdir(tmp_dir):
  871. self.run_function("file.remove", [tmp_dir])
  872. if os.path.islink(sym_dir):
  873. self.run_function("file.remove", [sym_dir])
  874. @skip_if_not_root
  875. @skipIf(IS_WINDOWS, "Mode not available in Windows")
  876. def test_directory_max_depth(self):
  877. """
  878. file.directory
  879. Test the max_depth option by iteratively increasing the depth and
  880. checking that no changes deeper than max_depth have been attempted
  881. """
  882. def _get_oct_mode(name):
  883. """
  884. Return a string octal representation of the permissions for name
  885. """
  886. return salt.utils.files.normalize_mode(oct(os.stat(name).st_mode & 0o777))
  887. top = os.path.join(RUNTIME_VARS.TMP, "top_dir")
  888. sub = os.path.join(top, "sub_dir")
  889. subsub = os.path.join(sub, "sub_sub_dir")
  890. dirs = [top, sub, subsub]
  891. initial_mode = "0111"
  892. changed_mode = "0555"
  893. initial_modes = {
  894. 0: {sub: "0755", subsub: "0111"},
  895. 1: {sub: "0111", subsub: "0111"},
  896. 2: {sub: "0111", subsub: "0111"},
  897. }
  898. if not os.path.isdir(subsub):
  899. os.makedirs(subsub, int(initial_mode, 8))
  900. try:
  901. for depth in range(0, 3):
  902. ret = self.run_state(
  903. "file.directory",
  904. name=top,
  905. max_depth=depth,
  906. dir_mode=changed_mode,
  907. recurse=["mode"],
  908. )
  909. self.assertSaltTrueReturn(ret)
  910. for changed_dir in dirs[0 : depth + 1]:
  911. self.assertEqual(changed_mode, _get_oct_mode(changed_dir))
  912. for untouched_dir in dirs[depth + 1 :]:
  913. # Beginning in Python 3.7, os.makedirs no longer sets
  914. # the mode of intermediate directories to the mode that
  915. # is passed.
  916. if sys.version_info >= (3, 7):
  917. _mode = initial_modes[depth][untouched_dir]
  918. self.assertEqual(_mode, _get_oct_mode(untouched_dir))
  919. else:
  920. self.assertEqual(initial_mode, _get_oct_mode(untouched_dir))
  921. finally:
  922. shutil.rmtree(top)
  923. def test_test_directory(self):
  924. """
  925. file.directory
  926. """
  927. name = os.path.join(RUNTIME_VARS.TMP, "a_not_dir")
  928. ret = self.run_state("file.directory", test=True, name=name)
  929. self.assertSaltNoneReturn(ret)
  930. self.assertFalse(os.path.isdir(name))
  931. @with_tempdir()
  932. def test_directory_clean(self, base_dir):
  933. """
  934. file.directory with clean=True
  935. """
  936. name = os.path.join(base_dir, "directory_clean_dir")
  937. os.mkdir(name)
  938. strayfile = os.path.join(name, "strayfile")
  939. with salt.utils.files.fopen(strayfile, "w"):
  940. pass
  941. straydir = os.path.join(name, "straydir")
  942. if not os.path.isdir(straydir):
  943. os.makedirs(straydir)
  944. with salt.utils.files.fopen(os.path.join(straydir, "strayfile2"), "w"):
  945. pass
  946. ret = self.run_state("file.directory", name=name, clean=True)
  947. self.assertSaltTrueReturn(ret)
  948. self.assertFalse(os.path.exists(strayfile))
  949. self.assertFalse(os.path.exists(straydir))
  950. self.assertTrue(os.path.isdir(name))
  951. def test_directory_is_idempotent(self):
  952. """
  953. Ensure the file.directory state produces no changes when rerun.
  954. """
  955. name = os.path.join(RUNTIME_VARS.TMP, "a_dir_twice")
  956. if IS_WINDOWS:
  957. username = os.environ.get("USERNAME", "Administrators")
  958. domain = os.environ.get("USERDOMAIN", "")
  959. fullname = "{0}\\{1}".format(domain, username)
  960. ret = self.run_state("file.directory", name=name, win_owner=fullname)
  961. else:
  962. ret = self.run_state("file.directory", name=name)
  963. self.assertSaltTrueReturn(ret)
  964. if IS_WINDOWS:
  965. ret = self.run_state("file.directory", name=name, win_owner=username)
  966. else:
  967. ret = self.run_state("file.directory", name=name)
  968. self.assertSaltTrueReturn(ret)
  969. self.assertSaltStateChangesEqual(ret, {})
  970. @with_tempdir()
  971. def test_directory_clean_exclude(self, base_dir):
  972. """
  973. file.directory with clean=True and exclude_pat set
  974. """
  975. name = os.path.join(base_dir, "directory_clean_dir")
  976. if not os.path.isdir(name):
  977. os.makedirs(name)
  978. strayfile = os.path.join(name, "strayfile")
  979. with salt.utils.files.fopen(strayfile, "w"):
  980. pass
  981. straydir = os.path.join(name, "straydir")
  982. if not os.path.isdir(straydir):
  983. os.makedirs(straydir)
  984. strayfile2 = os.path.join(straydir, "strayfile2")
  985. with salt.utils.files.fopen(strayfile2, "w"):
  986. pass
  987. keepfile = os.path.join(straydir, "keepfile")
  988. with salt.utils.files.fopen(keepfile, "w"):
  989. pass
  990. exclude_pat = "E@^straydir(|/keepfile)$"
  991. if IS_WINDOWS:
  992. exclude_pat = "E@^straydir(|\\\\keepfile)$"
  993. ret = self.run_state(
  994. "file.directory", name=name, clean=True, exclude_pat=exclude_pat
  995. )
  996. self.assertSaltTrueReturn(ret)
  997. self.assertFalse(os.path.exists(strayfile))
  998. self.assertFalse(os.path.exists(strayfile2))
  999. self.assertTrue(os.path.exists(keepfile))
  1000. @skipIf(IS_WINDOWS, "Skip on windows")
  1001. @with_tempdir()
  1002. def test_test_directory_clean_exclude(self, base_dir):
  1003. """
  1004. file.directory with test=True, clean=True and exclude_pat set
  1005. Skipped on windows because clean and exclude_pat not supported by
  1006. salt.sates.file._check_directory_win
  1007. """
  1008. name = os.path.join(base_dir, "directory_clean_dir")
  1009. os.mkdir(name)
  1010. strayfile = os.path.join(name, "strayfile")
  1011. with salt.utils.files.fopen(strayfile, "w"):
  1012. pass
  1013. straydir = os.path.join(name, "straydir")
  1014. if not os.path.isdir(straydir):
  1015. os.makedirs(straydir)
  1016. strayfile2 = os.path.join(straydir, "strayfile2")
  1017. with salt.utils.files.fopen(strayfile2, "w"):
  1018. pass
  1019. keepfile = os.path.join(straydir, "keepfile")
  1020. with salt.utils.files.fopen(keepfile, "w"):
  1021. pass
  1022. exclude_pat = "E@^straydir(|/keepfile)$"
  1023. if IS_WINDOWS:
  1024. exclude_pat = "E@^straydir(|\\\\keepfile)$"
  1025. ret = self.run_state(
  1026. "file.directory", test=True, name=name, clean=True, exclude_pat=exclude_pat
  1027. )
  1028. comment = next(six.itervalues(ret))["comment"]
  1029. self.assertSaltNoneReturn(ret)
  1030. self.assertTrue(os.path.exists(strayfile))
  1031. self.assertTrue(os.path.exists(strayfile2))
  1032. self.assertTrue(os.path.exists(keepfile))
  1033. self.assertIn(strayfile, comment)
  1034. self.assertIn(strayfile2, comment)
  1035. self.assertNotIn(keepfile, comment)
  1036. @with_tempdir()
  1037. def test_directory_clean_require_in(self, name):
  1038. """
  1039. file.directory test with clean=True and require_in file
  1040. """
  1041. state_name = "file-FileTest-test_directory_clean_require_in"
  1042. state_filename = state_name + ".sls"
  1043. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  1044. wrong_file = os.path.join(name, "wrong")
  1045. with salt.utils.files.fopen(wrong_file, "w") as fp:
  1046. fp.write("foo")
  1047. good_file = os.path.join(name, "bar")
  1048. with salt.utils.files.fopen(state_file, "w") as fp:
  1049. self.addCleanup(lambda: os.remove(state_file))
  1050. fp.write(
  1051. textwrap.dedent(
  1052. """\
  1053. some_dir:
  1054. file.directory:
  1055. - name: {name}
  1056. - clean: true
  1057. {good_file}:
  1058. file.managed:
  1059. - require_in:
  1060. - file: some_dir
  1061. """.format(
  1062. name=name, good_file=good_file
  1063. )
  1064. )
  1065. )
  1066. ret = self.run_function("state.sls", [state_name])
  1067. self.assertTrue(os.path.exists(good_file))
  1068. self.assertFalse(os.path.exists(wrong_file))
  1069. @with_tempdir()
  1070. def test_directory_clean_require_in_with_id(self, name):
  1071. """
  1072. file.directory test with clean=True and require_in file with an ID
  1073. different from the file name
  1074. """
  1075. state_name = "file-FileTest-test_directory_clean_require_in_with_id"
  1076. state_filename = state_name + ".sls"
  1077. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  1078. wrong_file = os.path.join(name, "wrong")
  1079. with salt.utils.files.fopen(wrong_file, "w") as fp:
  1080. fp.write("foo")
  1081. good_file = os.path.join(name, "bar")
  1082. with salt.utils.files.fopen(state_file, "w") as fp:
  1083. self.addCleanup(lambda: os.remove(state_file))
  1084. fp.write(
  1085. textwrap.dedent(
  1086. """\
  1087. some_dir:
  1088. file.directory:
  1089. - name: {name}
  1090. - clean: true
  1091. some_file:
  1092. file.managed:
  1093. - name: {good_file}
  1094. - require_in:
  1095. - file: some_dir
  1096. """.format(
  1097. name=name, good_file=good_file
  1098. )
  1099. )
  1100. )
  1101. ret = self.run_function("state.sls", [state_name])
  1102. self.assertTrue(os.path.exists(good_file))
  1103. self.assertFalse(os.path.exists(wrong_file))
  1104. @skipIf(
  1105. salt.utils.platform.is_darwin(),
  1106. "WAR ROOM TEMPORARY SKIP, Test is flaky on macosx",
  1107. )
  1108. @with_tempdir()
  1109. def test_directory_clean_require_with_name(self, name):
  1110. """
  1111. file.directory test with clean=True and require with a file state
  1112. relatively to the state's name, not its ID.
  1113. """
  1114. state_name = "file-FileTest-test_directory_clean_require_in_with_id"
  1115. state_filename = state_name + ".sls"
  1116. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  1117. wrong_file = os.path.join(name, "wrong")
  1118. with salt.utils.files.fopen(wrong_file, "w") as fp:
  1119. fp.write("foo")
  1120. good_file = os.path.join(name, "bar")
  1121. with salt.utils.files.fopen(state_file, "w") as fp:
  1122. self.addCleanup(lambda: os.remove(state_file))
  1123. fp.write(
  1124. textwrap.dedent(
  1125. """\
  1126. some_dir:
  1127. file.directory:
  1128. - name: {name}
  1129. - clean: true
  1130. - require:
  1131. # This requirement refers to the name of the following
  1132. # state, not its ID.
  1133. - file: {good_file}
  1134. some_file:
  1135. file.managed:
  1136. - name: {good_file}
  1137. """.format(
  1138. name=name, good_file=good_file
  1139. )
  1140. )
  1141. )
  1142. ret = self.run_function("state.sls", [state_name])
  1143. self.assertTrue(os.path.exists(good_file))
  1144. self.assertFalse(os.path.exists(wrong_file))
  1145. def test_directory_broken_symlink(self):
  1146. """
  1147. Ensure that file.directory works even if a directory
  1148. contains broken symbolic link
  1149. """
  1150. try:
  1151. tmp_dir = os.path.join(RUNTIME_VARS.TMP, "foo")
  1152. null_file = "{0}/null".format(tmp_dir)
  1153. broken_link = "{0}/broken".format(tmp_dir)
  1154. os.mkdir(tmp_dir, 0o700)
  1155. self.run_function("file.symlink", [null_file, broken_link])
  1156. if IS_WINDOWS:
  1157. ret = self.run_state(
  1158. "file.directory",
  1159. name=tmp_dir,
  1160. recurse=["mode"],
  1161. follow_symlinks=True,
  1162. win_owner="Administrators",
  1163. )
  1164. else:
  1165. ret = self.run_state(
  1166. "file.directory",
  1167. name=tmp_dir,
  1168. recurse=["mode"],
  1169. file_mode=644,
  1170. dir_mode=755,
  1171. )
  1172. self.assertSaltTrueReturn(ret)
  1173. finally:
  1174. if os.path.isdir(tmp_dir):
  1175. self.run_function("file.remove", [tmp_dir])
  1176. @with_tempdir(create=False)
  1177. def test_recurse(self, name):
  1178. """
  1179. file.recurse
  1180. """
  1181. ret = self.run_state("file.recurse", name=name, source="salt://grail")
  1182. self.assertSaltTrueReturn(ret)
  1183. self.assertTrue(os.path.isfile(os.path.join(name, "36", "scene")))
  1184. @with_tempdir(create=False)
  1185. @with_tempdir(create=False)
  1186. def test_recurse_specific_env(self, dir1, dir2):
  1187. """
  1188. file.recurse passing __env__
  1189. """
  1190. ret = self.run_state(
  1191. "file.recurse", name=dir1, source="salt://holy", __env__="prod"
  1192. )
  1193. self.assertSaltTrueReturn(ret)
  1194. self.assertTrue(os.path.isfile(os.path.join(dir1, "32", "scene")))
  1195. ret = self.run_state(
  1196. "file.recurse", name=dir2, source="salt://holy", saltenv="prod"
  1197. )
  1198. self.assertSaltTrueReturn(ret)
  1199. self.assertTrue(os.path.isfile(os.path.join(dir2, "32", "scene")))
  1200. @with_tempdir(create=False)
  1201. @with_tempdir(create=False)
  1202. def test_recurse_specific_env_in_url(self, dir1, dir2):
  1203. """
  1204. file.recurse passing __env__
  1205. """
  1206. ret = self.run_state(
  1207. "file.recurse", name=dir1, source="salt://holy?saltenv=prod"
  1208. )
  1209. self.assertSaltTrueReturn(ret)
  1210. self.assertTrue(os.path.isfile(os.path.join(dir1, "32", "scene")))
  1211. ret = self.run_state(
  1212. "file.recurse", name=dir2, source="salt://holy?saltenv=prod"
  1213. )
  1214. self.assertSaltTrueReturn(ret)
  1215. self.assertTrue(os.path.isfile(os.path.join(dir2, "32", "scene")))
  1216. @with_tempdir(create=False)
  1217. def test_test_recurse(self, name):
  1218. """
  1219. file.recurse test interface
  1220. """
  1221. ret = self.run_state(
  1222. "file.recurse", test=True, name=name, source="salt://grail",
  1223. )
  1224. self.assertSaltNoneReturn(ret)
  1225. self.assertFalse(os.path.isfile(os.path.join(name, "36", "scene")))
  1226. self.assertFalse(os.path.exists(name))
  1227. @with_tempdir(create=False)
  1228. @with_tempdir(create=False)
  1229. def test_test_recurse_specific_env(self, dir1, dir2):
  1230. """
  1231. file.recurse test interface
  1232. """
  1233. ret = self.run_state(
  1234. "file.recurse", test=True, name=dir1, source="salt://holy", __env__="prod"
  1235. )
  1236. self.assertSaltNoneReturn(ret)
  1237. self.assertFalse(os.path.isfile(os.path.join(dir1, "32", "scene")))
  1238. self.assertFalse(os.path.exists(dir1))
  1239. ret = self.run_state(
  1240. "file.recurse", test=True, name=dir2, source="salt://holy", saltenv="prod"
  1241. )
  1242. self.assertSaltNoneReturn(ret)
  1243. self.assertFalse(os.path.isfile(os.path.join(dir2, "32", "scene")))
  1244. self.assertFalse(os.path.exists(dir2))
  1245. @with_tempdir(create=False)
  1246. def test_recurse_template(self, name):
  1247. """
  1248. file.recurse with jinja template enabled
  1249. """
  1250. _ts = "TEMPLATE TEST STRING"
  1251. ret = self.run_state(
  1252. "file.recurse",
  1253. name=name,
  1254. source="salt://grail",
  1255. template="jinja",
  1256. defaults={"spam": _ts},
  1257. )
  1258. self.assertSaltTrueReturn(ret)
  1259. with salt.utils.files.fopen(os.path.join(name, "scene33"), "r") as fp_:
  1260. contents = fp_.read()
  1261. self.assertIn(_ts, contents)
  1262. @with_tempdir()
  1263. def test_recurse_clean(self, name):
  1264. """
  1265. file.recurse with clean=True
  1266. """
  1267. strayfile = os.path.join(name, "strayfile")
  1268. with salt.utils.files.fopen(strayfile, "w"):
  1269. pass
  1270. # Corner cases: replacing file with a directory and vice versa
  1271. with salt.utils.files.fopen(os.path.join(name, "36"), "w"):
  1272. pass
  1273. os.makedirs(os.path.join(name, "scene33"))
  1274. ret = self.run_state(
  1275. "file.recurse", name=name, source="salt://grail", clean=True
  1276. )
  1277. self.assertSaltTrueReturn(ret)
  1278. self.assertFalse(os.path.exists(strayfile))
  1279. self.assertTrue(os.path.isfile(os.path.join(name, "36", "scene")))
  1280. self.assertTrue(os.path.isfile(os.path.join(name, "scene33")))
  1281. @with_tempdir()
  1282. def test_recurse_clean_specific_env(self, name):
  1283. """
  1284. file.recurse with clean=True and __env__=prod
  1285. """
  1286. strayfile = os.path.join(name, "strayfile")
  1287. with salt.utils.files.fopen(strayfile, "w"):
  1288. pass
  1289. # Corner cases: replacing file with a directory and vice versa
  1290. with salt.utils.files.fopen(os.path.join(name, "32"), "w"):
  1291. pass
  1292. os.makedirs(os.path.join(name, "scene34"))
  1293. ret = self.run_state(
  1294. "file.recurse", name=name, source="salt://holy", clean=True, __env__="prod"
  1295. )
  1296. self.assertSaltTrueReturn(ret)
  1297. self.assertFalse(os.path.exists(strayfile))
  1298. self.assertTrue(os.path.isfile(os.path.join(name, "32", "scene")))
  1299. self.assertTrue(os.path.isfile(os.path.join(name, "scene34")))
  1300. @skipIf(IS_WINDOWS, "Skip on windows")
  1301. @with_tempdir()
  1302. def test_recurse_issue_34945(self, base_dir):
  1303. """
  1304. This tests the case where the source dir for the file.recurse state
  1305. does not contain any files (only subdirectories), and the dir_mode is
  1306. being managed. For a long time, this corner case resulted in the top
  1307. level of the destination directory being created with the wrong initial
  1308. permissions, a problem that would be corrected later on in the
  1309. file.recurse state via running state.directory. However, the
  1310. file.directory state only gets called when there are files to be
  1311. managed in that directory, and when the source directory contains only
  1312. subdirectories, the incorrectly-set initial perms would not be
  1313. repaired.
  1314. This was fixed in https://github.com/saltstack/salt/pull/35309
  1315. Skipped on windows because dir_mode is not supported.
  1316. """
  1317. dir_mode = "2775"
  1318. issue_dir = "issue-34945"
  1319. name = os.path.join(base_dir, issue_dir)
  1320. ret = self.run_state(
  1321. "file.recurse", name=name, source="salt://" + issue_dir, dir_mode=dir_mode
  1322. )
  1323. self.assertSaltTrueReturn(ret)
  1324. actual_dir_mode = oct(stat.S_IMODE(os.stat(name).st_mode))[-4:]
  1325. self.assertEqual(dir_mode, actual_dir_mode)
  1326. @with_tempdir(create=False)
  1327. def test_recurse_issue_40578(self, name):
  1328. """
  1329. This ensures that the state doesn't raise an exception when it
  1330. encounters a file with a unicode filename in the process of invoking
  1331. file.source_list.
  1332. """
  1333. ret = self.run_state("file.recurse", name=name, source="salt://соль")
  1334. self.assertSaltTrueReturn(ret)
  1335. if six.PY2 and IS_WINDOWS:
  1336. # Providing unicode to os.listdir so that we avoid having listdir
  1337. # try to decode the filenames using the systemencoding on windows
  1338. # python 2.
  1339. files = os.listdir(name.decode("utf-8"))
  1340. else:
  1341. files = salt.utils.data.decode(os.listdir(name), normalize=True)
  1342. self.assertEqual(
  1343. sorted(files), sorted(["foo.txt", "спам.txt", "яйца.txt"]),
  1344. )
  1345. @with_tempfile()
  1346. def test_replace(self, name):
  1347. """
  1348. file.replace
  1349. """
  1350. with salt.utils.files.fopen(name, "w+") as fp_:
  1351. fp_.write("change_me")
  1352. ret = self.run_state(
  1353. "file.replace", name=name, pattern="change", repl="salt", backup=False
  1354. )
  1355. with salt.utils.files.fopen(name, "r") as fp_:
  1356. self.assertIn("salt", fp_.read())
  1357. self.assertSaltTrueReturn(ret)
  1358. @with_tempdir()
  1359. def test_replace_issue_18612(self, base_dir):
  1360. """
  1361. Test the (mis-)behaviour of file.replace as described in #18612:
  1362. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1363. an infinitely growing file as 'file.replace' didn't check beforehand
  1364. whether the changes had already been done to the file
  1365. # Case description:
  1366. The tested file contains one commented line
  1367. The commented line should be uncommented in the end, nothing else should change
  1368. """
  1369. test_name = "test_replace_issue_18612"
  1370. path_test = os.path.join(base_dir, test_name)
  1371. with salt.utils.files.fopen(path_test, "w+") as fp_test_:
  1372. fp_test_.write("# en_US.UTF-8")
  1373. ret = []
  1374. for x in range(0, 3):
  1375. ret.append(
  1376. self.run_state(
  1377. "file.replace",
  1378. name=path_test,
  1379. pattern="^# en_US.UTF-8$",
  1380. repl="en_US.UTF-8",
  1381. append_if_not_found=True,
  1382. )
  1383. )
  1384. # ensure, the number of lines didn't change, even after invoking 'file.replace' 3 times
  1385. with salt.utils.files.fopen(path_test, "r") as fp_test_:
  1386. self.assertTrue((sum(1 for _ in fp_test_) == 1))
  1387. # ensure, the replacement succeeded
  1388. with salt.utils.files.fopen(path_test, "r") as fp_test_:
  1389. self.assertTrue(fp_test_.read().startswith("en_US.UTF-8"))
  1390. # ensure, all runs of 'file.replace' reported success
  1391. for item in ret:
  1392. self.assertSaltTrueReturn(item)
  1393. @with_tempdir()
  1394. def test_replace_issue_18612_prepend(self, base_dir):
  1395. """
  1396. Test the (mis-)behaviour of file.replace as described in #18612:
  1397. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1398. an infinitely growing file as 'file.replace' didn't check beforehand
  1399. whether the changes had already been done to the file
  1400. # Case description:
  1401. The tested multifile contains multiple lines not matching the pattern or replacement in any way
  1402. The replacement pattern should be prepended to the file
  1403. """
  1404. test_name = "test_replace_issue_18612_prepend"
  1405. path_in = os.path.join(
  1406. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1407. )
  1408. path_out = os.path.join(
  1409. RUNTIME_VARS.FILES, "file.replace", "{0}.out".format(test_name)
  1410. )
  1411. path_test = os.path.join(base_dir, test_name)
  1412. # create test file based on initial template
  1413. shutil.copyfile(path_in, path_test)
  1414. ret = []
  1415. for x in range(0, 3):
  1416. ret.append(
  1417. self.run_state(
  1418. "file.replace",
  1419. name=path_test,
  1420. pattern="^# en_US.UTF-8$",
  1421. repl="en_US.UTF-8",
  1422. prepend_if_not_found=True,
  1423. )
  1424. )
  1425. # ensure, the resulting file contains the expected lines
  1426. self.assertTrue(filecmp.cmp(path_test, path_out))
  1427. # ensure the initial file was properly backed up
  1428. self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
  1429. # ensure, all runs of 'file.replace' reported success
  1430. for item in ret:
  1431. self.assertSaltTrueReturn(item)
  1432. @with_tempdir()
  1433. def test_replace_issue_18612_append(self, base_dir):
  1434. """
  1435. Test the (mis-)behaviour of file.replace as described in #18612:
  1436. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1437. an infinitely growing file as 'file.replace' didn't check beforehand
  1438. whether the changes had already been done to the file
  1439. # Case description:
  1440. The tested multifile contains multiple lines not matching the pattern or replacement in any way
  1441. The replacement pattern should be appended to the file
  1442. """
  1443. test_name = "test_replace_issue_18612_append"
  1444. path_in = os.path.join(
  1445. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1446. )
  1447. path_out = os.path.join(
  1448. RUNTIME_VARS.FILES, "file.replace", "{0}.out".format(test_name)
  1449. )
  1450. path_test = os.path.join(base_dir, test_name)
  1451. # create test file based on initial template
  1452. shutil.copyfile(path_in, path_test)
  1453. ret = []
  1454. for x in range(0, 3):
  1455. ret.append(
  1456. self.run_state(
  1457. "file.replace",
  1458. name=path_test,
  1459. pattern="^# en_US.UTF-8$",
  1460. repl="en_US.UTF-8",
  1461. append_if_not_found=True,
  1462. )
  1463. )
  1464. # ensure, the resulting file contains the expected lines
  1465. self.assertTrue(filecmp.cmp(path_test, path_out))
  1466. # ensure the initial file was properly backed up
  1467. self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
  1468. # ensure, all runs of 'file.replace' reported success
  1469. for item in ret:
  1470. self.assertSaltTrueReturn(item)
  1471. @with_tempdir()
  1472. def test_replace_issue_18612_append_not_found_content(self, base_dir):
  1473. """
  1474. Test the (mis-)behaviour of file.replace as described in #18612:
  1475. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1476. an infinitely growing file as 'file.replace' didn't check beforehand
  1477. whether the changes had already been done to the file
  1478. # Case description:
  1479. The tested multifile contains multiple lines not matching the pattern or replacement in any way
  1480. The 'not_found_content' value should be appended to the file
  1481. """
  1482. test_name = "test_replace_issue_18612_append_not_found_content"
  1483. path_in = os.path.join(
  1484. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1485. )
  1486. path_out = os.path.join(
  1487. RUNTIME_VARS.FILES, "file.replace", "{0}.out".format(test_name)
  1488. )
  1489. path_test = os.path.join(base_dir, test_name)
  1490. # create test file based on initial template
  1491. shutil.copyfile(path_in, path_test)
  1492. ret = []
  1493. for x in range(0, 3):
  1494. ret.append(
  1495. self.run_state(
  1496. "file.replace",
  1497. name=path_test,
  1498. pattern="^# en_US.UTF-8$",
  1499. repl="en_US.UTF-8",
  1500. append_if_not_found=True,
  1501. not_found_content="THIS LINE WASN'T FOUND! SO WE'RE APPENDING IT HERE!",
  1502. )
  1503. )
  1504. # ensure, the resulting file contains the expected lines
  1505. self.assertTrue(filecmp.cmp(path_test, path_out))
  1506. # ensure the initial file was properly backed up
  1507. self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
  1508. # ensure, all runs of 'file.replace' reported success
  1509. for item in ret:
  1510. self.assertSaltTrueReturn(item)
  1511. @with_tempdir()
  1512. def test_replace_issue_18612_change_mid_line_with_comment(self, base_dir):
  1513. """
  1514. Test the (mis-)behaviour of file.replace as described in #18612:
  1515. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1516. an infinitely growing file as 'file.replace' didn't check beforehand
  1517. whether the changes had already been done to the file
  1518. # Case description:
  1519. The tested file contains 5 key=value pairs
  1520. The commented key=value pair #foo=bar should be changed to foo=salt
  1521. The comment char (#) in front of foo=bar should be removed
  1522. """
  1523. test_name = "test_replace_issue_18612_change_mid_line_with_comment"
  1524. path_in = os.path.join(
  1525. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1526. )
  1527. path_out = os.path.join(
  1528. RUNTIME_VARS.FILES, "file.replace", "{0}.out".format(test_name)
  1529. )
  1530. path_test = os.path.join(base_dir, test_name)
  1531. # create test file based on initial template
  1532. shutil.copyfile(path_in, path_test)
  1533. ret = []
  1534. for x in range(0, 3):
  1535. ret.append(
  1536. self.run_state(
  1537. "file.replace",
  1538. name=path_test,
  1539. pattern="^#foo=bar($|(?=\r\n))",
  1540. repl="foo=salt",
  1541. append_if_not_found=True,
  1542. )
  1543. )
  1544. # ensure, the resulting file contains the expected lines
  1545. self.assertTrue(filecmp.cmp(path_test, path_out))
  1546. # ensure the initial file was properly backed up
  1547. self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
  1548. # ensure, all 'file.replace' runs reported success
  1549. for item in ret:
  1550. self.assertSaltTrueReturn(item)
  1551. @with_tempdir()
  1552. def test_replace_issue_18841_no_changes(self, base_dir):
  1553. """
  1554. Test the (mis-)behaviour of file.replace as described in #18841:
  1555. Using file.replace in a way which shouldn't modify the file at all
  1556. results in changed mtime of the original file and a backup file being created.
  1557. # Case description
  1558. The tested file contains multiple lines
  1559. The tested file contains a line already matching the replacement (no change needed)
  1560. The tested file's content shouldn't change at all
  1561. The tested file's mtime shouldn't change at all
  1562. No backup file should be created
  1563. """
  1564. test_name = "test_replace_issue_18841_no_changes"
  1565. path_in = os.path.join(
  1566. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1567. )
  1568. path_test = os.path.join(base_dir, test_name)
  1569. # create test file based on initial template
  1570. shutil.copyfile(path_in, path_test)
  1571. # get (m|a)time of file
  1572. fstats_orig = os.stat(path_test)
  1573. # define how far we predate the file
  1574. age = 5 * 24 * 60 * 60
  1575. # set (m|a)time of file 5 days into the past
  1576. os.utime(path_test, (fstats_orig.st_mtime - age, fstats_orig.st_atime - age))
  1577. ret = self.run_state(
  1578. "file.replace",
  1579. name=path_test,
  1580. pattern="^hello world$",
  1581. repl="goodbye world",
  1582. show_changes=True,
  1583. flags=["IGNORECASE"],
  1584. backup=False,
  1585. )
  1586. # get (m|a)time of file
  1587. fstats_post = os.stat(path_test)
  1588. # ensure, the file content didn't change
  1589. self.assertTrue(filecmp.cmp(path_in, path_test))
  1590. # ensure no backup file was created
  1591. self.assertFalse(os.path.exists(path_test + ".bak"))
  1592. # ensure the file's mtime didn't change
  1593. self.assertTrue(fstats_post.st_mtime, fstats_orig.st_mtime - age)
  1594. # ensure, all 'file.replace' runs reported success
  1595. self.assertSaltTrueReturn(ret)
  1596. def test_serialize(self):
  1597. """
  1598. Test to ensure that file.serialize returns a data structure that's
  1599. both serialized and formatted properly
  1600. """
  1601. path_test = os.path.join(RUNTIME_VARS.TMP, "test_serialize")
  1602. ret = self.run_state(
  1603. "file.serialize",
  1604. name=path_test,
  1605. dataset={
  1606. "name": "naive",
  1607. "description": "A basic test",
  1608. "a_list": ["first_element", "second_element"],
  1609. "finally": "the last item",
  1610. },
  1611. formatter="json",
  1612. )
  1613. with salt.utils.files.fopen(path_test, "rb") as fp_:
  1614. serialized_file = salt.utils.stringutils.to_unicode(fp_.read())
  1615. # The JSON serializer uses LF even on OSes where os.sep is CRLF.
  1616. expected_file = "\n".join(
  1617. [
  1618. "{",
  1619. ' "a_list": [',
  1620. ' "first_element",',
  1621. ' "second_element"',
  1622. " ],",
  1623. ' "description": "A basic test",',
  1624. ' "finally": "the last item",',
  1625. ' "name": "naive"',
  1626. "}",
  1627. "",
  1628. ]
  1629. )
  1630. self.assertEqual(serialized_file, expected_file)
  1631. @with_tempfile(create=False)
  1632. def test_serializer_deserializer_opts(self, name):
  1633. """
  1634. Test the serializer_opts and deserializer_opts options
  1635. """
  1636. data1 = {"foo": {"bar": "%(x)s"}}
  1637. data2 = {"foo": {"abc": 123}}
  1638. merged = {"foo": {"y": "not_used", "x": "baz", "abc": 123, "bar": "baz"}}
  1639. ret = self.run_state(
  1640. "file.serialize",
  1641. name=name,
  1642. dataset=data1,
  1643. formatter="configparser",
  1644. deserializer_opts=[{"defaults": {"y": "not_used"}}],
  1645. )
  1646. ret = ret[next(iter(ret))]
  1647. assert ret["result"], ret
  1648. # We should have warned about deserializer_opts being used when
  1649. # merge_if_exists was not set to True.
  1650. assert "warnings" in ret
  1651. # Run with merge_if_exists, as well as serializer and deserializer opts
  1652. # deserializer opts will be used for string interpolation of the %(x)s
  1653. # that was written to the file with data1 (i.e. bar should become baz)
  1654. ret = self.run_state(
  1655. "file.serialize",
  1656. name=name,
  1657. dataset=data2,
  1658. formatter="configparser",
  1659. merge_if_exists=True,
  1660. serializer_opts=[{"defaults": {"y": "not_used"}}],
  1661. deserializer_opts=[{"defaults": {"x": "baz"}}],
  1662. )
  1663. ret = ret[next(iter(ret))]
  1664. assert ret["result"], ret
  1665. with salt.utils.files.fopen(name) as fp_:
  1666. serialized_data = salt.serializers.configparser.deserialize(fp_)
  1667. # If this test fails, this debug logging will help tell us how the
  1668. # serialized data differs from what was serialized.
  1669. log.debug("serialized_data = %r", serialized_data)
  1670. log.debug("merged = %r", merged)
  1671. # serializing with a default of 'y' will add y = not_used into foo
  1672. assert serialized_data["foo"]["y"] == merged["foo"]["y"]
  1673. # deserializing with default of x = baz will perform interpolation on %(x)s
  1674. # and bar will then = baz
  1675. assert serialized_data["foo"]["bar"] == merged["foo"]["bar"]
  1676. @with_tempdir()
  1677. def test_replace_issue_18841_omit_backup(self, base_dir):
  1678. """
  1679. Test the (mis-)behaviour of file.replace as described in #18841:
  1680. Using file.replace in a way which shouldn't modify the file at all
  1681. results in changed mtime of the original file and a backup file being created.
  1682. # Case description
  1683. The tested file contains multiple lines
  1684. The tested file contains a line already matching the replacement (no change needed)
  1685. The tested file's content shouldn't change at all
  1686. The tested file's mtime shouldn't change at all
  1687. No backup file should be created, although backup=False isn't explicitly defined
  1688. """
  1689. test_name = "test_replace_issue_18841_omit_backup"
  1690. path_in = os.path.join(
  1691. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1692. )
  1693. path_test = os.path.join(base_dir, test_name)
  1694. # create test file based on initial template
  1695. shutil.copyfile(path_in, path_test)
  1696. # get (m|a)time of file
  1697. fstats_orig = os.stat(path_test)
  1698. # define how far we predate the file
  1699. age = 5 * 24 * 60 * 60
  1700. # set (m|a)time of file 5 days into the past
  1701. os.utime(path_test, (fstats_orig.st_mtime - age, fstats_orig.st_atime - age))
  1702. ret = self.run_state(
  1703. "file.replace",
  1704. name=path_test,
  1705. pattern="^hello world$",
  1706. repl="goodbye world",
  1707. show_changes=True,
  1708. flags=["IGNORECASE"],
  1709. )
  1710. # get (m|a)time of file
  1711. fstats_post = os.stat(path_test)
  1712. # ensure, the file content didn't change
  1713. self.assertTrue(filecmp.cmp(path_in, path_test))
  1714. # ensure no backup file was created
  1715. self.assertFalse(os.path.exists(path_test + ".bak"))
  1716. # ensure the file's mtime didn't change
  1717. self.assertTrue(fstats_post.st_mtime, fstats_orig.st_mtime - age)
  1718. # ensure, all 'file.replace' runs reported success
  1719. self.assertSaltTrueReturn(ret)
  1720. @with_tempfile()
  1721. def test_comment(self, name):
  1722. """
  1723. file.comment
  1724. """
  1725. # write a line to file
  1726. with salt.utils.files.fopen(name, "w+") as fp_:
  1727. fp_.write("comment_me")
  1728. # Look for changes with test=True: return should be "None" at the first run
  1729. ret = self.run_state("file.comment", test=True, name=name, regex="^comment")
  1730. self.assertSaltNoneReturn(ret)
  1731. # comment once
  1732. ret = self.run_state("file.comment", name=name, regex="^comment")
  1733. # result is positive
  1734. self.assertSaltTrueReturn(ret)
  1735. # line is commented
  1736. with salt.utils.files.fopen(name, "r") as fp_:
  1737. self.assertTrue(fp_.read().startswith("#comment"))
  1738. # comment twice
  1739. ret = self.run_state("file.comment", name=name, regex="^comment")
  1740. # result is still positive
  1741. self.assertSaltTrueReturn(ret)
  1742. # line is still commented
  1743. with salt.utils.files.fopen(name, "r") as fp_:
  1744. self.assertTrue(fp_.read().startswith("#comment"))
  1745. # Test previously commented file returns "True" now and not "None" with test=True
  1746. ret = self.run_state("file.comment", test=True, name=name, regex="^comment")
  1747. self.assertSaltTrueReturn(ret)
  1748. @with_tempfile()
  1749. def test_test_comment(self, name):
  1750. """
  1751. file.comment test interface
  1752. """
  1753. with salt.utils.files.fopen(name, "w+") as fp_:
  1754. fp_.write("comment_me")
  1755. ret = self.run_state("file.comment", test=True, name=name, regex=".*comment.*",)
  1756. with salt.utils.files.fopen(name, "r") as fp_:
  1757. self.assertNotIn("#comment", fp_.read())
  1758. self.assertSaltNoneReturn(ret)
  1759. @with_tempfile()
  1760. def test_uncomment(self, name):
  1761. """
  1762. file.uncomment
  1763. """
  1764. with salt.utils.files.fopen(name, "w+") as fp_:
  1765. fp_.write("#comment_me")
  1766. ret = self.run_state("file.uncomment", name=name, regex="^comment")
  1767. with salt.utils.files.fopen(name, "r") as fp_:
  1768. self.assertNotIn("#comment", fp_.read())
  1769. self.assertSaltTrueReturn(ret)
  1770. @with_tempfile()
  1771. def test_test_uncomment(self, name):
  1772. """
  1773. file.comment test interface
  1774. """
  1775. with salt.utils.files.fopen(name, "w+") as fp_:
  1776. fp_.write("#comment_me")
  1777. ret = self.run_state("file.uncomment", test=True, name=name, regex="^comment.*")
  1778. with salt.utils.files.fopen(name, "r") as fp_:
  1779. self.assertIn("#comment", fp_.read())
  1780. self.assertSaltNoneReturn(ret)
  1781. @with_tempfile()
  1782. def test_append(self, name):
  1783. """
  1784. file.append
  1785. """
  1786. with salt.utils.files.fopen(name, "w+") as fp_:
  1787. fp_.write("#salty!")
  1788. ret = self.run_state("file.append", name=name, text="cheese")
  1789. with salt.utils.files.fopen(name, "r") as fp_:
  1790. self.assertIn("cheese", fp_.read())
  1791. self.assertSaltTrueReturn(ret)
  1792. @with_tempfile()
  1793. def test_test_append(self, name):
  1794. """
  1795. file.append test interface
  1796. """
  1797. with salt.utils.files.fopen(name, "w+") as fp_:
  1798. fp_.write("#salty!")
  1799. ret = self.run_state("file.append", test=True, name=name, text="cheese")
  1800. with salt.utils.files.fopen(name, "r") as fp_:
  1801. self.assertNotIn("cheese", fp_.read())
  1802. self.assertSaltNoneReturn(ret)
  1803. @with_tempdir()
  1804. def test_append_issue_1864_makedirs(self, base_dir):
  1805. """
  1806. file.append but create directories if needed as an option, and create
  1807. the file if it doesn't exist
  1808. """
  1809. fname = "append_issue_1864_makedirs"
  1810. name = os.path.join(base_dir, fname)
  1811. # Non existing file get's touched
  1812. ret = self.run_state("file.append", name=name, text="cheese", makedirs=True)
  1813. self.assertSaltTrueReturn(ret)
  1814. # Nested directory and file get's touched
  1815. name = os.path.join(base_dir, "issue_1864", fname)
  1816. ret = self.run_state("file.append", name=name, text="cheese", makedirs=True)
  1817. self.assertSaltTrueReturn(ret)
  1818. # Parent directory exists but file does not and makedirs is False
  1819. name = os.path.join(base_dir, "issue_1864", fname + "2")
  1820. ret = self.run_state("file.append", name=name, text="cheese")
  1821. self.assertSaltTrueReturn(ret)
  1822. self.assertTrue(os.path.isfile(name))
  1823. @with_tempdir()
  1824. def test_prepend_issue_27401_makedirs(self, base_dir):
  1825. """
  1826. file.prepend but create directories if needed as an option, and create
  1827. the file if it doesn't exist
  1828. """
  1829. fname = "prepend_issue_27401"
  1830. name = os.path.join(base_dir, fname)
  1831. # Non existing file get's touched
  1832. ret = self.run_state("file.prepend", name=name, text="cheese", makedirs=True)
  1833. self.assertSaltTrueReturn(ret)
  1834. # Nested directory and file get's touched
  1835. name = os.path.join(base_dir, "issue_27401", fname)
  1836. ret = self.run_state("file.prepend", name=name, text="cheese", makedirs=True)
  1837. self.assertSaltTrueReturn(ret)
  1838. # Parent directory exists but file does not and makedirs is False
  1839. name = os.path.join(base_dir, "issue_27401", fname + "2")
  1840. ret = self.run_state("file.prepend", name=name, text="cheese")
  1841. self.assertSaltTrueReturn(ret)
  1842. self.assertTrue(os.path.isfile(name))
  1843. @with_tempfile()
  1844. def test_touch(self, name):
  1845. """
  1846. file.touch
  1847. """
  1848. ret = self.run_state("file.touch", name=name)
  1849. self.assertTrue(os.path.isfile(name))
  1850. self.assertSaltTrueReturn(ret)
  1851. @with_tempfile(create=False)
  1852. def test_test_touch(self, name):
  1853. """
  1854. file.touch test interface
  1855. """
  1856. ret = self.run_state("file.touch", test=True, name=name)
  1857. self.assertFalse(os.path.isfile(name))
  1858. self.assertSaltNoneReturn(ret)
  1859. @with_tempdir()
  1860. def test_touch_directory(self, base_dir):
  1861. """
  1862. file.touch a directory
  1863. """
  1864. name = os.path.join(base_dir, "touch_test_dir")
  1865. os.mkdir(name)
  1866. ret = self.run_state("file.touch", name=name)
  1867. self.assertSaltTrueReturn(ret)
  1868. self.assertTrue(os.path.isdir(name))
  1869. @with_tempdir()
  1870. def test_issue_2227_file_append(self, base_dir):
  1871. """
  1872. Text to append includes a percent symbol
  1873. """
  1874. # let's make use of existing state to create a file with contents to
  1875. # test against
  1876. tmp_file_append = os.path.join(base_dir, "test.append")
  1877. self.run_state("file.touch", name=tmp_file_append)
  1878. self.run_state(
  1879. "file.append", name=tmp_file_append, source="salt://testappend/firstif"
  1880. )
  1881. self.run_state(
  1882. "file.append", name=tmp_file_append, source="salt://testappend/secondif"
  1883. )
  1884. # Now our real test
  1885. try:
  1886. ret = self.run_state(
  1887. "file.append", name=tmp_file_append, text="HISTTIMEFORMAT='%F %T '"
  1888. )
  1889. self.assertSaltTrueReturn(ret)
  1890. with salt.utils.files.fopen(tmp_file_append, "r") as fp_:
  1891. contents_pre = fp_.read()
  1892. # It should not append text again
  1893. ret = self.run_state(
  1894. "file.append", name=tmp_file_append, text="HISTTIMEFORMAT='%F %T '"
  1895. )
  1896. self.assertSaltTrueReturn(ret)
  1897. with salt.utils.files.fopen(tmp_file_append, "r") as fp_:
  1898. contents_post = fp_.read()
  1899. self.assertEqual(contents_pre, contents_post)
  1900. except AssertionError:
  1901. if os.path.exists(tmp_file_append):
  1902. shutil.copy(tmp_file_append, tmp_file_append + ".bak")
  1903. raise
  1904. @with_tempdir()
  1905. def test_issue_2401_file_comment(self, base_dir):
  1906. # Get a path to the temporary file
  1907. tmp_file = os.path.join(base_dir, "issue-2041-comment.txt")
  1908. # Write some data to it
  1909. with salt.utils.files.fopen(tmp_file, "w") as fp_:
  1910. fp_.write("hello\nworld\n")
  1911. # create the sls template
  1912. template_lines = [
  1913. "{0}:".format(tmp_file),
  1914. " file.comment:",
  1915. " - regex: ^world",
  1916. ]
  1917. template = "\n".join(template_lines)
  1918. try:
  1919. ret = self.run_function("state.template_str", [template], timeout=120)
  1920. self.assertSaltTrueReturn(ret)
  1921. self.assertNotInSaltComment("Pattern already commented", ret)
  1922. self.assertInSaltComment("Commented lines successfully", ret)
  1923. # This next time, it is already commented.
  1924. ret = self.run_function("state.template_str", [template], timeout=120)
  1925. self.assertSaltTrueReturn(ret)
  1926. self.assertInSaltComment("Pattern already commented", ret)
  1927. except AssertionError:
  1928. shutil.copy(tmp_file, tmp_file + ".bak")
  1929. raise
  1930. @with_tempdir()
  1931. def test_issue_2379_file_append(self, base_dir):
  1932. # Get a path to the temporary file
  1933. tmp_file = os.path.join(base_dir, "issue-2379-file-append.txt")
  1934. # Write some data to it
  1935. with salt.utils.files.fopen(tmp_file, "w") as fp_:
  1936. fp_.write(
  1937. "hello\nworld\n" # Some junk
  1938. "#PermitRootLogin yes\n" # Commented text
  1939. "# PermitRootLogin yes\n" # Commented text with space
  1940. )
  1941. # create the sls template
  1942. template_lines = [
  1943. "{0}:".format(tmp_file),
  1944. " file.append:",
  1945. " - text: PermitRootLogin yes",
  1946. ]
  1947. template = "\n".join(template_lines)
  1948. try:
  1949. ret = self.run_function("state.template_str", [template])
  1950. self.assertSaltTrueReturn(ret)
  1951. self.assertInSaltComment("Appended 1 lines", ret)
  1952. except AssertionError:
  1953. shutil.copy(tmp_file, tmp_file + ".bak")
  1954. raise
  1955. @skipIf(IS_WINDOWS, "Mode not available in Windows")
  1956. @with_tempdir(create=False)
  1957. @with_tempdir(create=False)
  1958. def test_issue_2726_mode_kwarg(self, dir1, dir2):
  1959. # Let's test for the wrong usage approach
  1960. bad_mode_kwarg_testfile = os.path.join(dir1, "bad_mode_kwarg", "testfile")
  1961. bad_template = [
  1962. "{0}:".format(bad_mode_kwarg_testfile),
  1963. " file.recurse:",
  1964. " - source: salt://testfile",
  1965. " - mode: 644",
  1966. ]
  1967. ret = self.run_function("state.template_str", [os.linesep.join(bad_template)])
  1968. self.assertSaltFalseReturn(ret)
  1969. self.assertInSaltComment(
  1970. "'mode' is not allowed in 'file.recurse'. Please use "
  1971. "'file_mode' and 'dir_mode'.",
  1972. ret,
  1973. )
  1974. self.assertNotInSaltComment(
  1975. "TypeError: managed() got multiple values for keyword " "argument 'mode'",
  1976. ret,
  1977. )
  1978. # Now, the correct usage approach
  1979. good_mode_kwargs_testfile = os.path.join(dir2, "good_mode_kwargs", "testappend")
  1980. good_template = [
  1981. "{0}:".format(good_mode_kwargs_testfile),
  1982. " file.recurse:",
  1983. " - source: salt://testappend",
  1984. " - dir_mode: 744",
  1985. " - file_mode: 644",
  1986. ]
  1987. ret = self.run_function("state.template_str", [os.linesep.join(good_template)])
  1988. self.assertSaltTrueReturn(ret)
  1989. @with_tempdir()
  1990. def test_issue_8343_accumulated_require_in(self, base_dir):
  1991. template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-8343.sls")
  1992. testcase_filedest = os.path.join(base_dir, "issue-8343.txt")
  1993. if os.path.exists(template_path):
  1994. os.remove(template_path)
  1995. if os.path.exists(testcase_filedest):
  1996. os.remove(testcase_filedest)
  1997. sls_template = [
  1998. "{0}:",
  1999. " file.managed:",
  2000. " - contents: |",
  2001. " #",
  2002. "",
  2003. "prepend-foo-accumulator-from-pillar:",
  2004. " file.accumulated:",
  2005. " - require_in:",
  2006. " - file: prepend-foo-management",
  2007. " - filename: {0}",
  2008. " - text: |",
  2009. " foo",
  2010. "",
  2011. "append-foo-accumulator-from-pillar:",
  2012. " file.accumulated:",
  2013. " - require_in:",
  2014. " - file: append-foo-management",
  2015. " - filename: {0}",
  2016. " - text: |",
  2017. " bar",
  2018. "",
  2019. "prepend-foo-management:",
  2020. " file.blockreplace:",
  2021. " - name: {0}",
  2022. ' - marker_start: "#-- start salt managed zonestart -- PLEASE, DO NOT EDIT"',
  2023. ' - marker_end: "#-- end salt managed zonestart --"',
  2024. " - content: ''",
  2025. " - prepend_if_not_found: True",
  2026. " - backup: '.bak'",
  2027. " - show_changes: True",
  2028. "",
  2029. "append-foo-management:",
  2030. " file.blockreplace:",
  2031. " - name: {0}",
  2032. ' - marker_start: "#-- start salt managed zoneend -- PLEASE, DO NOT EDIT"',
  2033. ' - marker_end: "#-- end salt managed zoneend --"',
  2034. " - content: ''",
  2035. " - append_if_not_found: True",
  2036. " - backup: '.bak2'",
  2037. " - show_changes: True",
  2038. "",
  2039. ]
  2040. with salt.utils.files.fopen(template_path, "w") as fp_:
  2041. fp_.write(os.linesep.join(sls_template).format(testcase_filedest))
  2042. ret = self.run_function("state.sls", mods="issue-8343")
  2043. for name, step in six.iteritems(ret):
  2044. self.assertSaltTrueReturn({name: step})
  2045. with salt.utils.files.fopen(testcase_filedest) as fp_:
  2046. contents = fp_.read().split(os.linesep)
  2047. expected = [
  2048. "#-- start salt managed zonestart -- PLEASE, DO NOT EDIT",
  2049. "foo",
  2050. "#-- end salt managed zonestart --",
  2051. "#",
  2052. "#-- start salt managed zoneend -- PLEASE, DO NOT EDIT",
  2053. "bar",
  2054. "#-- end salt managed zoneend --",
  2055. "",
  2056. ]
  2057. self.assertEqual(
  2058. [salt.utils.stringutils.to_str(line) for line in expected], contents
  2059. )
  2060. @with_tempdir()
  2061. @skipIf(
  2062. salt.utils.platform.is_darwin() and six.PY2, "This test hangs on OS X on Py2"
  2063. )
  2064. def test_issue_11003_immutable_lazy_proxy_sum(self, base_dir):
  2065. # causes the Import-Module ServerManager error on Windows
  2066. template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-11003.sls")
  2067. testcase_filedest = os.path.join(base_dir, "issue-11003.txt")
  2068. sls_template = [
  2069. "a{0}:",
  2070. " file.absent:",
  2071. " - name: {0}",
  2072. "",
  2073. "{0}:",
  2074. " file.managed:",
  2075. " - contents: |",
  2076. " #",
  2077. "",
  2078. "test-acc1:",
  2079. " file.accumulated:",
  2080. " - require_in:",
  2081. " - file: final",
  2082. " - filename: {0}",
  2083. " - text: |",
  2084. " bar",
  2085. "",
  2086. "test-acc2:",
  2087. " file.accumulated:",
  2088. " - watch_in:",
  2089. " - file: final",
  2090. " - filename: {0}",
  2091. " - text: |",
  2092. " baz",
  2093. "",
  2094. "final:",
  2095. " file.blockreplace:",
  2096. " - name: {0}",
  2097. ' - marker_start: "#-- start managed zone PLEASE, DO NOT EDIT"',
  2098. ' - marker_end: "#-- end managed zone"',
  2099. " - content: ''",
  2100. " - append_if_not_found: True",
  2101. " - show_changes: True",
  2102. ]
  2103. with salt.utils.files.fopen(template_path, "w") as fp_:
  2104. fp_.write(os.linesep.join(sls_template).format(testcase_filedest))
  2105. ret = self.run_function("state.sls", mods="issue-11003", timeout=600)
  2106. for name, step in six.iteritems(ret):
  2107. self.assertSaltTrueReturn({name: step})
  2108. with salt.utils.files.fopen(testcase_filedest) as fp_:
  2109. contents = fp_.read().split(os.linesep)
  2110. begin = contents.index("#-- start managed zone PLEASE, DO NOT EDIT") + 1
  2111. end = contents.index("#-- end managed zone")
  2112. block_contents = contents[begin:end]
  2113. for item in ("", "bar", "baz"):
  2114. block_contents.remove(item)
  2115. self.assertEqual(block_contents, [])
  2116. @with_tempdir()
  2117. def test_issue_8947_utf8_sls(self, base_dir):
  2118. """
  2119. Test some file operation with utf-8 characters on the sls
  2120. This is more generic than just a file test. Feel free to move
  2121. """
  2122. self.maxDiff = None
  2123. korean_1 = "한국어 시험"
  2124. korean_2 = "첫 번째 행"
  2125. korean_3 = "마지막 행"
  2126. test_file = os.path.join(base_dir, "{0}.txt".format(korean_1))
  2127. test_file_encoded = test_file
  2128. template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-8947.sls")
  2129. # create the sls template
  2130. template = textwrap.dedent(
  2131. """\
  2132. some-utf8-file-create:
  2133. file.managed:
  2134. - name: {test_file}
  2135. - contents: {korean_1}
  2136. - makedirs: True
  2137. - replace: True
  2138. - show_diff: True
  2139. some-utf8-file-create2:
  2140. file.managed:
  2141. - name: {test_file}
  2142. - contents: |
  2143. {korean_2}
  2144. {korean_1}
  2145. {korean_3}
  2146. - replace: True
  2147. - show_diff: True
  2148. """.format(
  2149. **locals()
  2150. )
  2151. )
  2152. if not salt.utils.platform.is_windows():
  2153. template += textwrap.dedent(
  2154. """\
  2155. some-utf8-file-content-test:
  2156. cmd.run:
  2157. - name: 'cat "{test_file}"'
  2158. - require:
  2159. - file: some-utf8-file-create2
  2160. """.format(
  2161. **locals()
  2162. )
  2163. )
  2164. # Save template file
  2165. with salt.utils.files.fopen(template_path, "wb") as fp_:
  2166. fp_.write(salt.utils.stringutils.to_bytes(template))
  2167. try:
  2168. result = self.run_function("state.sls", mods="issue-8947")
  2169. if not isinstance(result, dict):
  2170. raise AssertionError(
  2171. (
  2172. "Something went really wrong while testing this sls:" " {0}"
  2173. ).format(repr(result))
  2174. )
  2175. # difflib produces different output on python 2.6 than on >=2.7
  2176. if sys.version_info < (2, 7):
  2177. diff = "--- \n+++ \n@@ -1,1 +1,3 @@\n"
  2178. else:
  2179. diff = "--- \n+++ \n@@ -1 +1,3 @@\n"
  2180. diff += ("+첫 번째 행{0}" " 한국어 시험{0}" "+마지막 행{0}").format(os.linesep)
  2181. ret = {x.split("_|-")[1]: y for x, y in six.iteritems(result)}
  2182. # Confirm initial creation of file
  2183. self.assertEqual(
  2184. ret["some-utf8-file-create"]["comment"],
  2185. "File {0} updated".format(test_file_encoded),
  2186. )
  2187. self.assertEqual(
  2188. ret["some-utf8-file-create"]["changes"], {"diff": "New file"}
  2189. )
  2190. # Confirm file was modified and that the diff was as expected
  2191. self.assertEqual(
  2192. ret["some-utf8-file-create2"]["comment"],
  2193. "File {0} updated".format(test_file_encoded),
  2194. )
  2195. self.assertEqual(ret["some-utf8-file-create2"]["changes"], {"diff": diff})
  2196. if salt.utils.platform.is_windows():
  2197. import subprocess
  2198. import win32api
  2199. p = subprocess.Popen(
  2200. salt.utils.stringutils.to_str(
  2201. "type {}".format(win32api.GetShortPathName(test_file))
  2202. ),
  2203. shell=True,
  2204. stdout=subprocess.PIPE,
  2205. stderr=subprocess.PIPE,
  2206. )
  2207. p.poll()
  2208. out = p.stdout.read()
  2209. self.assertEqual(
  2210. out.decode("utf-8"),
  2211. os.linesep.join((korean_2, korean_1, korean_3)) + os.linesep,
  2212. )
  2213. else:
  2214. self.assertEqual(
  2215. ret["some-utf8-file-content-test"]["comment"],
  2216. 'Command "cat "{0}"" run'.format(test_file_encoded),
  2217. )
  2218. self.assertEqual(
  2219. ret["some-utf8-file-content-test"]["changes"]["stdout"],
  2220. "\n".join((korean_2, korean_1, korean_3)),
  2221. )
  2222. finally:
  2223. try:
  2224. os.remove(template_path)
  2225. except OSError:
  2226. pass
  2227. @skip_if_not_root
  2228. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2229. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2230. @with_system_user_and_group(
  2231. TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
  2232. )
  2233. @with_tempdir()
  2234. def test_issue_12209_follow_symlinks(self, tempdir, user, group):
  2235. """
  2236. Ensure that symlinks are properly chowned when recursing (following
  2237. symlinks)
  2238. """
  2239. # Make the directories for this test
  2240. onedir = os.path.join(tempdir, "one")
  2241. twodir = os.path.join(tempdir, "two")
  2242. os.mkdir(onedir)
  2243. os.symlink(onedir, twodir)
  2244. # Run the state
  2245. ret = self.run_state(
  2246. "file.directory",
  2247. name=tempdir,
  2248. follow_symlinks=True,
  2249. user=user,
  2250. group=group,
  2251. recurse=["user", "group"],
  2252. )
  2253. self.assertSaltTrueReturn(ret)
  2254. # Double-check, in case state mis-reported a True result. Since we are
  2255. # following symlinks, we expect twodir to still be owned by root, but
  2256. # onedir should be owned by the 'issue12209' user.
  2257. onestats = os.stat(onedir)
  2258. twostats = os.lstat(twodir)
  2259. self.assertEqual(pwd.getpwuid(onestats.st_uid).pw_name, user)
  2260. self.assertEqual(pwd.getpwuid(twostats.st_uid).pw_name, "root")
  2261. self.assertEqual(grp.getgrgid(onestats.st_gid).gr_name, group)
  2262. if salt.utils.path.which("id"):
  2263. root_group = self.run_function("user.primary_group", ["root"])
  2264. self.assertEqual(grp.getgrgid(twostats.st_gid).gr_name, root_group)
  2265. @skip_if_not_root
  2266. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2267. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2268. @with_system_user_and_group(
  2269. TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
  2270. )
  2271. @with_tempdir()
  2272. def test_issue_12209_no_follow_symlinks(self, tempdir, user, group):
  2273. """
  2274. Ensure that symlinks are properly chowned when recursing (not following
  2275. symlinks)
  2276. """
  2277. # Make the directories for this test
  2278. onedir = os.path.join(tempdir, "one")
  2279. twodir = os.path.join(tempdir, "two")
  2280. os.mkdir(onedir)
  2281. os.symlink(onedir, twodir)
  2282. # Run the state
  2283. ret = self.run_state(
  2284. "file.directory",
  2285. name=tempdir,
  2286. follow_symlinks=False,
  2287. user=user,
  2288. group=group,
  2289. recurse=["user", "group"],
  2290. )
  2291. self.assertSaltTrueReturn(ret)
  2292. # Double-check, in case state mis-reported a True result. Since we
  2293. # are not following symlinks, we expect twodir to now be owned by
  2294. # the 'issue12209' user, just link onedir.
  2295. onestats = os.stat(onedir)
  2296. twostats = os.lstat(twodir)
  2297. self.assertEqual(pwd.getpwuid(onestats.st_uid).pw_name, user)
  2298. self.assertEqual(pwd.getpwuid(twostats.st_uid).pw_name, user)
  2299. self.assertEqual(grp.getgrgid(onestats.st_gid).gr_name, group)
  2300. self.assertEqual(grp.getgrgid(twostats.st_gid).gr_name, group)
  2301. @with_tempfile(create=False)
  2302. @with_tempfile()
  2303. def test_template_local_file(self, source, dest):
  2304. """
  2305. Test a file.managed state with a local file as the source. Test both
  2306. with the file:// protocol designation prepended, and without it.
  2307. """
  2308. with salt.utils.files.fopen(source, "w") as fp_:
  2309. fp_.write("{{ foo }}\n")
  2310. for prefix in ("file://", ""):
  2311. ret = self.run_state(
  2312. "file.managed",
  2313. name=dest,
  2314. source=prefix + source,
  2315. template="jinja",
  2316. context={"foo": "Hello world!"},
  2317. )
  2318. self.assertSaltTrueReturn(ret)
  2319. @with_tempfile()
  2320. def test_template_local_file_noclobber(self, source):
  2321. """
  2322. Test the case where a source file is in the minion's local filesystem,
  2323. and the source path is the same as the destination path.
  2324. """
  2325. with salt.utils.files.fopen(source, "w") as fp_:
  2326. fp_.write("{{ foo }}\n")
  2327. ret = self.run_state(
  2328. "file.managed",
  2329. name=source,
  2330. source=source,
  2331. template="jinja",
  2332. context={"foo": "Hello world!"},
  2333. )
  2334. self.assertSaltFalseReturn(ret)
  2335. self.assertIn(
  2336. ("Source file cannot be the same as destination"),
  2337. ret[next(iter(ret))]["comment"],
  2338. )
  2339. @with_tempfile(create=False)
  2340. @with_tempfile(create=False)
  2341. def test_issue_25250_force_copy_deletes(self, source, dest):
  2342. """
  2343. ensure force option in copy state does not delete target file
  2344. """
  2345. shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, "hosts"), source)
  2346. shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, "file/base/cheese"), dest)
  2347. self.run_state("file.copy", name=dest, source=source, force=True)
  2348. self.assertTrue(os.path.exists(dest))
  2349. self.assertTrue(filecmp.cmp(source, dest))
  2350. os.remove(source)
  2351. os.remove(dest)
  2352. @destructiveTest
  2353. @skip_if_not_root
  2354. @skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
  2355. @with_tempfile()
  2356. def test_file_copy_make_dirs(self, source):
  2357. """
  2358. ensure make_dirs creates correct user perms
  2359. """
  2360. shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, "hosts"), source)
  2361. dest = os.path.join(RUNTIME_VARS.TMP, "dir1", "dir2", "copied_file.txt")
  2362. user = "salt"
  2363. mode = "0644"
  2364. ret = self.run_function("user.add", [user])
  2365. self.assertTrue(ret, "Failed to add user. Are you running as sudo?")
  2366. ret = self.run_state(
  2367. "file.copy", name=dest, source=source, user=user, makedirs=True, mode=mode
  2368. )
  2369. self.assertSaltTrueReturn(ret)
  2370. file_checks = [
  2371. dest,
  2372. os.path.join(RUNTIME_VARS.TMP, "dir1"),
  2373. os.path.join(RUNTIME_VARS.TMP, "dir1", "dir2"),
  2374. ]
  2375. for check in file_checks:
  2376. user_check = self.run_function("file.get_user", [check])
  2377. mode_check = self.run_function("file.get_mode", [check])
  2378. self.assertEqual(user_check, user)
  2379. self.assertEqual(salt.utils.files.normalize_mode(mode_check), mode)
  2380. def test_contents_pillar_with_pillar_list(self):
  2381. """
  2382. This tests for any regressions for this issue:
  2383. https://github.com/saltstack/salt/issues/30934
  2384. """
  2385. state_file = "file_contents_pillar"
  2386. ret = self.run_function("state.sls", mods=state_file)
  2387. self.assertSaltTrueReturn(ret)
  2388. @skip_if_not_root
  2389. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2390. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2391. @with_system_user_and_group(
  2392. TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
  2393. )
  2394. def test_owner_after_setuid(self, user, group):
  2395. """
  2396. Test to check file user/group after setting setuid or setgid.
  2397. Because Python os.chown() does reset the setuid/setgid to 0.
  2398. https://github.com/saltstack/salt/pull/45257
  2399. """
  2400. # Desired configuration.
  2401. desired = {
  2402. "file": os.path.join(RUNTIME_VARS.TMP, "file_with_setuid"),
  2403. "user": user,
  2404. "group": group,
  2405. "mode": "4750",
  2406. }
  2407. # Run the state.
  2408. ret = self.run_state(
  2409. "file.managed",
  2410. name=desired["file"],
  2411. user=desired["user"],
  2412. group=desired["group"],
  2413. mode=desired["mode"],
  2414. )
  2415. # Check result.
  2416. file_stat = os.stat(desired["file"])
  2417. result = {
  2418. "user": pwd.getpwuid(file_stat.st_uid).pw_name,
  2419. "group": grp.getgrgid(file_stat.st_gid).gr_name,
  2420. "mode": oct(stat.S_IMODE(file_stat.st_mode)),
  2421. }
  2422. self.assertSaltTrueReturn(ret)
  2423. self.assertEqual(desired["user"], result["user"])
  2424. self.assertEqual(desired["group"], result["group"])
  2425. self.assertEqual(desired["mode"], result["mode"].lstrip("0Oo"))
  2426. def test_binary_contents(self):
  2427. """
  2428. This tests to ensure that binary contents do not cause a traceback.
  2429. """
  2430. name = os.path.join(RUNTIME_VARS.TMP, "1px.gif")
  2431. try:
  2432. ret = self.run_state("file.managed", name=name, contents=BINARY_FILE)
  2433. self.assertSaltTrueReturn(ret)
  2434. finally:
  2435. try:
  2436. os.remove(name)
  2437. except OSError:
  2438. pass
  2439. def test_binary_contents_twice(self):
  2440. """
  2441. This test ensures that after a binary file is created, salt can confirm
  2442. that the file is in the correct state.
  2443. """
  2444. # Create a binary file
  2445. name = os.path.join(RUNTIME_VARS.TMP, "1px.gif")
  2446. # First run state ensures file is created
  2447. ret = self.run_state("file.managed", name=name, contents=BINARY_FILE)
  2448. self.assertSaltTrueReturn(ret)
  2449. # Second run of state ensures file is in correct state
  2450. ret = self.run_state("file.managed", name=name, contents=BINARY_FILE)
  2451. self.assertSaltTrueReturn(ret)
  2452. try:
  2453. os.remove(name)
  2454. except OSError:
  2455. pass
  2456. @skip_if_not_root
  2457. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2458. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2459. @with_system_user_and_group(
  2460. TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
  2461. )
  2462. @with_tempdir()
  2463. def test_issue_48336_file_managed_mode_setuid(self, tempdir, user, group):
  2464. """
  2465. Ensure that mode is correct with changing of ownership and group
  2466. symlinks)
  2467. """
  2468. tempfile = os.path.join(tempdir, "temp_file_issue_48336")
  2469. # Run the state
  2470. ret = self.run_state(
  2471. "file.managed", name=tempfile, user=user, group=group, mode="4750",
  2472. )
  2473. self.assertSaltTrueReturn(ret)
  2474. # Check that the owner and group are correct, and
  2475. # the mode is what we expect
  2476. temp_file_stats = os.stat(tempfile)
  2477. # Normalize the mode
  2478. temp_file_mode = str(oct(stat.S_IMODE(temp_file_stats.st_mode)))
  2479. temp_file_mode = salt.utils.files.normalize_mode(temp_file_mode)
  2480. self.assertEqual(temp_file_mode, "4750")
  2481. self.assertEqual(pwd.getpwuid(temp_file_stats.st_uid).pw_name, user)
  2482. self.assertEqual(grp.getgrgid(temp_file_stats.st_gid).gr_name, group)
  2483. @with_tempdir()
  2484. def test_issue_48557(self, tempdir):
  2485. tempfile = os.path.join(tempdir, "temp_file_issue_48557")
  2486. with salt.utils.files.fopen(tempfile, "wb") as fp:
  2487. fp.write(os.linesep.join(["test1", "test2", "test3", ""]).encode("utf-8"))
  2488. ret = self.run_state(
  2489. "file.line", name=tempfile, after="test2", mode="insert", content="test4"
  2490. )
  2491. self.assertSaltTrueReturn(ret)
  2492. with salt.utils.files.fopen(tempfile, "rb") as fp:
  2493. content = fp.read()
  2494. self.assertEqual(
  2495. content,
  2496. os.linesep.join(["test1", "test2", "test4", "test3", ""]).encode("utf-8"),
  2497. )
  2498. @with_tempfile()
  2499. def test_issue_50221(self, name):
  2500. expected = "abc{0}{0}{0}".format(os.linesep)
  2501. ret = self.run_function("pillar.get", ["issue-50221"])
  2502. assert ret == expected
  2503. ret = self.run_function("state.apply", ["issue-50221"], pillar={"name": name},)
  2504. self.assertSaltTrueReturn(ret)
  2505. with salt.utils.files.fopen(name, "r") as fp:
  2506. contents = fp.read()
  2507. assert contents == expected
  2508. def test_managed_file_issue_51208(self):
  2509. """
  2510. Test to ensure we can handle a file with escaped double-quotes
  2511. """
  2512. name = os.path.join(RUNTIME_VARS.TMP, "issue_51208.txt")
  2513. ret = self.run_state(
  2514. "file.managed", name=name, source="salt://issue-51208/vimrc.stub"
  2515. )
  2516. src = os.path.join(RUNTIME_VARS.BASE_FILES, "issue-51208", "vimrc.stub")
  2517. with salt.utils.files.fopen(src, "r") as fp_:
  2518. master_data = fp_.read()
  2519. with salt.utils.files.fopen(name, "r") as fp_:
  2520. minion_data = fp_.read()
  2521. self.assertEqual(master_data, minion_data)
  2522. self.assertSaltTrueReturn(ret)
  2523. @with_tempfile()
  2524. def test_keyvalue(self, name):
  2525. """
  2526. file.keyvalue
  2527. """
  2528. content = dedent(
  2529. """\
  2530. # This is the sshd server system-wide configuration file. See
  2531. # sshd_config(5) for more information.
  2532. # The strategy used for options in the default sshd_config shipped with
  2533. # OpenSSH is to specify options with their default value where
  2534. # possible, but leave them commented. Uncommented options override the
  2535. # default value.
  2536. #Port 22
  2537. #AddressFamily any
  2538. #ListenAddress 0.0.0.0
  2539. #ListenAddress ::
  2540. #HostKey /etc/ssh/ssh_host_rsa_key
  2541. #HostKey /etc/ssh/ssh_host_ecdsa_key
  2542. #HostKey /etc/ssh/ssh_host_ed25519_key
  2543. # Ciphers and keying
  2544. #RekeyLimit default none
  2545. # Logging
  2546. #SyslogFacility AUTH
  2547. #LogLevel INFO
  2548. # Authentication:
  2549. #LoginGraceTime 2m
  2550. #PermitRootLogin prohibit-password
  2551. #StrictModes yes
  2552. #MaxAuthTries 6
  2553. #MaxSessions 10
  2554. """
  2555. )
  2556. with salt.utils.files.fopen(name, "w+") as fp_:
  2557. fp_.write(content)
  2558. ret = self.run_state(
  2559. "file.keyvalue",
  2560. name=name,
  2561. key="permitrootlogin",
  2562. value="no",
  2563. separator=" ",
  2564. uncomment=" #",
  2565. key_ignore_case=True,
  2566. )
  2567. with salt.utils.files.fopen(name, "r") as fp_:
  2568. file_contents = fp_.read()
  2569. self.assertNotIn("#PermitRootLogin", file_contents)
  2570. self.assertNotIn("prohibit-password", file_contents)
  2571. self.assertIn("PermitRootLogin no", file_contents)
  2572. self.assertSaltTrueReturn(ret)
  2573. @pytest.mark.windows_whitelisted
  2574. class BlockreplaceTest(ModuleCase, SaltReturnAssertsMixin):
  2575. marker_start = "# start"
  2576. marker_end = "# end"
  2577. content = dedent(
  2578. """\
  2579. Line 1 of block
  2580. Line 2 of block
  2581. """
  2582. )
  2583. without_block = dedent(
  2584. """\
  2585. Hello world!
  2586. # comment here
  2587. """
  2588. )
  2589. with_non_matching_block = dedent(
  2590. """\
  2591. Hello world!
  2592. # start
  2593. No match here
  2594. # end
  2595. # comment here
  2596. """
  2597. )
  2598. with_non_matching_block_and_marker_end_not_after_newline = dedent(
  2599. """\
  2600. Hello world!
  2601. # start
  2602. No match here# end
  2603. # comment here
  2604. """
  2605. )
  2606. with_matching_block = dedent(
  2607. """\
  2608. Hello world!
  2609. # start
  2610. Line 1 of block
  2611. Line 2 of block
  2612. # end
  2613. # comment here
  2614. """
  2615. )
  2616. with_matching_block_and_extra_newline = dedent(
  2617. """\
  2618. Hello world!
  2619. # start
  2620. Line 1 of block
  2621. Line 2 of block
  2622. # end
  2623. # comment here
  2624. """
  2625. )
  2626. with_matching_block_and_marker_end_not_after_newline = dedent(
  2627. """\
  2628. Hello world!
  2629. # start
  2630. Line 1 of block
  2631. Line 2 of block# end
  2632. # comment here
  2633. """
  2634. )
  2635. content_explicit_posix_newlines = "Line 1 of block\n" "Line 2 of block\n"
  2636. content_explicit_windows_newlines = "Line 1 of block\r\n" "Line 2 of block\r\n"
  2637. without_block_explicit_posix_newlines = "Hello world!\n\n" "# comment here\n"
  2638. without_block_explicit_windows_newlines = (
  2639. "Hello world!\r\n\r\n" "# comment here\r\n"
  2640. )
  2641. with_block_prepended_explicit_posix_newlines = (
  2642. "# start\n"
  2643. "Line 1 of block\n"
  2644. "Line 2 of block\n"
  2645. "# end\n"
  2646. "Hello world!\n\n"
  2647. "# comment here\n"
  2648. )
  2649. with_block_prepended_explicit_windows_newlines = (
  2650. "# start\r\n"
  2651. "Line 1 of block\r\n"
  2652. "Line 2 of block\r\n"
  2653. "# end\r\n"
  2654. "Hello world!\r\n\r\n"
  2655. "# comment here\r\n"
  2656. )
  2657. with_block_appended_explicit_posix_newlines = (
  2658. "Hello world!\n\n"
  2659. "# comment here\n"
  2660. "# start\n"
  2661. "Line 1 of block\n"
  2662. "Line 2 of block\n"
  2663. "# end\n"
  2664. )
  2665. with_block_appended_explicit_windows_newlines = (
  2666. "Hello world!\r\n\r\n"
  2667. "# comment here\r\n"
  2668. "# start\r\n"
  2669. "Line 1 of block\r\n"
  2670. "Line 2 of block\r\n"
  2671. "# end\r\n"
  2672. )
  2673. @staticmethod
  2674. def _write(dest, content):
  2675. with salt.utils.files.fopen(dest, "wb") as fp_:
  2676. fp_.write(salt.utils.stringutils.to_bytes(content))
  2677. @staticmethod
  2678. def _read(src):
  2679. with salt.utils.files.fopen(src, "rb") as fp_:
  2680. return salt.utils.stringutils.to_unicode(fp_.read())
  2681. @with_tempfile()
  2682. def test_prepend(self, name):
  2683. """
  2684. Test blockreplace when prepend_if_not_found=True and block doesn't
  2685. exist in file.
  2686. """
  2687. expected = (
  2688. self.marker_start
  2689. + os.linesep
  2690. + self.content
  2691. + self.marker_end
  2692. + os.linesep
  2693. + self.without_block
  2694. )
  2695. # Pass 1: content ends in newline
  2696. self._write(name, self.without_block)
  2697. ret = self.run_state(
  2698. "file.blockreplace",
  2699. name=name,
  2700. content=self.content,
  2701. marker_start=self.marker_start,
  2702. marker_end=self.marker_end,
  2703. prepend_if_not_found=True,
  2704. )
  2705. self.assertSaltTrueReturn(ret)
  2706. self.assertTrue(ret[next(iter(ret))]["changes"])
  2707. self.assertEqual(self._read(name), expected)
  2708. # Pass 1a: Re-run state, no changes should be made
  2709. ret = self.run_state(
  2710. "file.blockreplace",
  2711. name=name,
  2712. content=self.content,
  2713. marker_start=self.marker_start,
  2714. marker_end=self.marker_end,
  2715. prepend_if_not_found=True,
  2716. )
  2717. self.assertSaltTrueReturn(ret)
  2718. self.assertFalse(ret[next(iter(ret))]["changes"])
  2719. self.assertEqual(self._read(name), expected)
  2720. # Pass 2: content does not end in newline
  2721. self._write(name, self.without_block)
  2722. ret = self.run_state(
  2723. "file.blockreplace",
  2724. name=name,
  2725. content=self.content.rstrip("\r\n"),
  2726. marker_start=self.marker_start,
  2727. marker_end=self.marker_end,
  2728. prepend_if_not_found=True,
  2729. )
  2730. self.assertSaltTrueReturn(ret)
  2731. self.assertTrue(ret[next(iter(ret))]["changes"])
  2732. self.assertEqual(self._read(name), expected)
  2733. # Pass 2a: Re-run state, no changes should be made
  2734. ret = self.run_state(
  2735. "file.blockreplace",
  2736. name=name,
  2737. content=self.content.rstrip("\r\n"),
  2738. marker_start=self.marker_start,
  2739. marker_end=self.marker_end,
  2740. prepend_if_not_found=True,
  2741. )
  2742. self.assertSaltTrueReturn(ret)
  2743. self.assertFalse(ret[next(iter(ret))]["changes"])
  2744. self.assertEqual(self._read(name), expected)
  2745. @with_tempfile()
  2746. def test_prepend_append_newline(self, name):
  2747. """
  2748. Test blockreplace when prepend_if_not_found=True and block doesn't
  2749. exist in file. Test with append_newline explicitly set to True.
  2750. """
  2751. # Pass 1: content ends in newline
  2752. expected = (
  2753. self.marker_start
  2754. + os.linesep
  2755. + self.content
  2756. + os.linesep
  2757. + self.marker_end
  2758. + os.linesep
  2759. + self.without_block
  2760. )
  2761. self._write(name, self.without_block)
  2762. ret = self.run_state(
  2763. "file.blockreplace",
  2764. name=name,
  2765. content=self.content,
  2766. marker_start=self.marker_start,
  2767. marker_end=self.marker_end,
  2768. prepend_if_not_found=True,
  2769. append_newline=True,
  2770. )
  2771. self.assertSaltTrueReturn(ret)
  2772. self.assertTrue(ret[next(iter(ret))]["changes"])
  2773. self.assertEqual(self._read(name), expected)
  2774. # Pass 1a: Re-run state, no changes should be made
  2775. ret = self.run_state(
  2776. "file.blockreplace",
  2777. name=name,
  2778. content=self.content,
  2779. marker_start=self.marker_start,
  2780. marker_end=self.marker_end,
  2781. prepend_if_not_found=True,
  2782. append_newline=True,
  2783. )
  2784. self.assertSaltTrueReturn(ret)
  2785. self.assertFalse(ret[next(iter(ret))]["changes"])
  2786. self.assertEqual(self._read(name), expected)
  2787. # Pass 2: content does not end in newline
  2788. expected = (
  2789. self.marker_start
  2790. + os.linesep
  2791. + self.content
  2792. + self.marker_end
  2793. + os.linesep
  2794. + self.without_block
  2795. )
  2796. self._write(name, self.without_block)
  2797. ret = self.run_state(
  2798. "file.blockreplace",
  2799. name=name,
  2800. content=self.content.rstrip("\r\n"),
  2801. marker_start=self.marker_start,
  2802. marker_end=self.marker_end,
  2803. prepend_if_not_found=True,
  2804. append_newline=True,
  2805. )
  2806. self.assertSaltTrueReturn(ret)
  2807. self.assertTrue(ret[next(iter(ret))]["changes"])
  2808. self.assertEqual(self._read(name), expected)
  2809. # Pass 2a: Re-run state, no changes should be made
  2810. ret = self.run_state(
  2811. "file.blockreplace",
  2812. name=name,
  2813. content=self.content.rstrip("\r\n"),
  2814. marker_start=self.marker_start,
  2815. marker_end=self.marker_end,
  2816. prepend_if_not_found=True,
  2817. append_newline=True,
  2818. )
  2819. self.assertSaltTrueReturn(ret)
  2820. self.assertFalse(ret[next(iter(ret))]["changes"])
  2821. self.assertEqual(self._read(name), expected)
  2822. @with_tempfile()
  2823. def test_prepend_no_append_newline(self, name):
  2824. """
  2825. Test blockreplace when prepend_if_not_found=True and block doesn't
  2826. exist in file. Test with append_newline explicitly set to False.
  2827. """
  2828. # Pass 1: content ends in newline
  2829. expected = (
  2830. self.marker_start
  2831. + os.linesep
  2832. + self.content
  2833. + self.marker_end
  2834. + os.linesep
  2835. + self.without_block
  2836. )
  2837. self._write(name, self.without_block)
  2838. ret = self.run_state(
  2839. "file.blockreplace",
  2840. name=name,
  2841. content=self.content,
  2842. marker_start=self.marker_start,
  2843. marker_end=self.marker_end,
  2844. prepend_if_not_found=True,
  2845. append_newline=False,
  2846. )
  2847. self.assertSaltTrueReturn(ret)
  2848. self.assertTrue(ret[next(iter(ret))]["changes"])
  2849. self.assertEqual(self._read(name), expected)
  2850. # Pass 1a: Re-run state, no changes should be made
  2851. ret = self.run_state(
  2852. "file.blockreplace",
  2853. name=name,
  2854. content=self.content,
  2855. marker_start=self.marker_start,
  2856. marker_end=self.marker_end,
  2857. prepend_if_not_found=True,
  2858. append_newline=False,
  2859. )
  2860. self.assertSaltTrueReturn(ret)
  2861. self.assertFalse(ret[next(iter(ret))]["changes"])
  2862. self.assertEqual(self._read(name), expected)
  2863. # Pass 2: content does not end in newline
  2864. expected = (
  2865. self.marker_start
  2866. + os.linesep
  2867. + self.content.rstrip("\r\n")
  2868. + self.marker_end
  2869. + os.linesep
  2870. + self.without_block
  2871. )
  2872. self._write(name, self.without_block)
  2873. ret = self.run_state(
  2874. "file.blockreplace",
  2875. name=name,
  2876. content=self.content.rstrip("\r\n"),
  2877. marker_start=self.marker_start,
  2878. marker_end=self.marker_end,
  2879. prepend_if_not_found=True,
  2880. append_newline=False,
  2881. )
  2882. self.assertSaltTrueReturn(ret)
  2883. self.assertTrue(ret[next(iter(ret))]["changes"])
  2884. self.assertEqual(self._read(name), expected)
  2885. # Pass 2a: Re-run state, no changes should be made
  2886. ret = self.run_state(
  2887. "file.blockreplace",
  2888. name=name,
  2889. content=self.content.rstrip("\r\n"),
  2890. marker_start=self.marker_start,
  2891. marker_end=self.marker_end,
  2892. prepend_if_not_found=True,
  2893. append_newline=False,
  2894. )
  2895. self.assertSaltTrueReturn(ret)
  2896. self.assertFalse(ret[next(iter(ret))]["changes"])
  2897. self.assertEqual(self._read(name), expected)
  2898. @with_tempfile()
  2899. def test_append(self, name):
  2900. """
  2901. Test blockreplace when append_if_not_found=True and block doesn't
  2902. exist in file.
  2903. """
  2904. expected = (
  2905. self.without_block
  2906. + self.marker_start
  2907. + os.linesep
  2908. + self.content
  2909. + self.marker_end
  2910. + os.linesep
  2911. )
  2912. # Pass 1: content ends in newline
  2913. self._write(name, self.without_block)
  2914. ret = self.run_state(
  2915. "file.blockreplace",
  2916. name=name,
  2917. content=self.content,
  2918. marker_start=self.marker_start,
  2919. marker_end=self.marker_end,
  2920. append_if_not_found=True,
  2921. )
  2922. self.assertSaltTrueReturn(ret)
  2923. self.assertTrue(ret[next(iter(ret))]["changes"])
  2924. self.assertEqual(self._read(name), expected)
  2925. # Pass 1a: Re-run state, no changes should be made
  2926. ret = self.run_state(
  2927. "file.blockreplace",
  2928. name=name,
  2929. content=self.content,
  2930. marker_start=self.marker_start,
  2931. marker_end=self.marker_end,
  2932. append_if_not_found=True,
  2933. )
  2934. self.assertSaltTrueReturn(ret)
  2935. self.assertFalse(ret[next(iter(ret))]["changes"])
  2936. self.assertEqual(self._read(name), expected)
  2937. # Pass 2: content does not end in newline
  2938. self._write(name, self.without_block)
  2939. ret = self.run_state(
  2940. "file.blockreplace",
  2941. name=name,
  2942. content=self.content.rstrip("\r\n"),
  2943. marker_start=self.marker_start,
  2944. marker_end=self.marker_end,
  2945. append_if_not_found=True,
  2946. )
  2947. self.assertSaltTrueReturn(ret)
  2948. self.assertTrue(ret[next(iter(ret))]["changes"])
  2949. self.assertEqual(self._read(name), expected)
  2950. # Pass 2a: Re-run state, no changes should be made
  2951. ret = self.run_state(
  2952. "file.blockreplace",
  2953. name=name,
  2954. content=self.content.rstrip("\r\n"),
  2955. marker_start=self.marker_start,
  2956. marker_end=self.marker_end,
  2957. append_if_not_found=True,
  2958. )
  2959. self.assertSaltTrueReturn(ret)
  2960. self.assertFalse(ret[next(iter(ret))]["changes"])
  2961. self.assertEqual(self._read(name), expected)
  2962. @with_tempfile()
  2963. def test_append_append_newline(self, name):
  2964. """
  2965. Test blockreplace when append_if_not_found=True and block doesn't
  2966. exist in file. Test with append_newline explicitly set to True.
  2967. """
  2968. # Pass 1: content ends in newline
  2969. expected = (
  2970. self.without_block
  2971. + self.marker_start
  2972. + os.linesep
  2973. + self.content
  2974. + os.linesep
  2975. + self.marker_end
  2976. + os.linesep
  2977. )
  2978. self._write(name, self.without_block)
  2979. ret = self.run_state(
  2980. "file.blockreplace",
  2981. name=name,
  2982. content=self.content,
  2983. marker_start=self.marker_start,
  2984. marker_end=self.marker_end,
  2985. append_if_not_found=True,
  2986. append_newline=True,
  2987. )
  2988. self.assertSaltTrueReturn(ret)
  2989. self.assertTrue(ret[next(iter(ret))]["changes"])
  2990. self.assertEqual(self._read(name), expected)
  2991. # Pass 1a: Re-run state, no changes should be made
  2992. ret = self.run_state(
  2993. "file.blockreplace",
  2994. name=name,
  2995. content=self.content,
  2996. marker_start=self.marker_start,
  2997. marker_end=self.marker_end,
  2998. append_if_not_found=True,
  2999. append_newline=True,
  3000. )
  3001. self.assertSaltTrueReturn(ret)
  3002. self.assertFalse(ret[next(iter(ret))]["changes"])
  3003. self.assertEqual(self._read(name), expected)
  3004. # Pass 2: content does not end in newline
  3005. expected = (
  3006. self.without_block
  3007. + self.marker_start
  3008. + os.linesep
  3009. + self.content
  3010. + self.marker_end
  3011. + os.linesep
  3012. )
  3013. self._write(name, self.without_block)
  3014. ret = self.run_state(
  3015. "file.blockreplace",
  3016. name=name,
  3017. content=self.content.rstrip("\r\n"),
  3018. marker_start=self.marker_start,
  3019. marker_end=self.marker_end,
  3020. append_if_not_found=True,
  3021. append_newline=True,
  3022. )
  3023. self.assertSaltTrueReturn(ret)
  3024. self.assertTrue(ret[next(iter(ret))]["changes"])
  3025. self.assertEqual(self._read(name), expected)
  3026. # Pass 2a: Re-run state, no changes should be made
  3027. ret = self.run_state(
  3028. "file.blockreplace",
  3029. name=name,
  3030. content=self.content.rstrip("\r\n"),
  3031. marker_start=self.marker_start,
  3032. marker_end=self.marker_end,
  3033. append_if_not_found=True,
  3034. append_newline=True,
  3035. )
  3036. self.assertSaltTrueReturn(ret)
  3037. self.assertFalse(ret[next(iter(ret))]["changes"])
  3038. self.assertEqual(self._read(name), expected)
  3039. @with_tempfile()
  3040. def test_append_no_append_newline(self, name):
  3041. """
  3042. Test blockreplace when append_if_not_found=True and block doesn't
  3043. exist in file. Test with append_newline explicitly set to False.
  3044. """
  3045. # Pass 1: content ends in newline
  3046. expected = (
  3047. self.without_block
  3048. + self.marker_start
  3049. + os.linesep
  3050. + self.content
  3051. + self.marker_end
  3052. + os.linesep
  3053. )
  3054. self._write(name, self.without_block)
  3055. ret = self.run_state(
  3056. "file.blockreplace",
  3057. name=name,
  3058. content=self.content,
  3059. marker_start=self.marker_start,
  3060. marker_end=self.marker_end,
  3061. append_if_not_found=True,
  3062. append_newline=False,
  3063. )
  3064. self.assertSaltTrueReturn(ret)
  3065. self.assertTrue(ret[next(iter(ret))]["changes"])
  3066. self.assertEqual(self._read(name), expected)
  3067. # Pass 1a: Re-run state, no changes should be made
  3068. ret = self.run_state(
  3069. "file.blockreplace",
  3070. name=name,
  3071. content=self.content,
  3072. marker_start=self.marker_start,
  3073. marker_end=self.marker_end,
  3074. append_if_not_found=True,
  3075. append_newline=False,
  3076. )
  3077. self.assertSaltTrueReturn(ret)
  3078. self.assertFalse(ret[next(iter(ret))]["changes"])
  3079. self.assertEqual(self._read(name), expected)
  3080. # Pass 2: content does not end in newline
  3081. expected = (
  3082. self.without_block
  3083. + self.marker_start
  3084. + os.linesep
  3085. + self.content.rstrip("\r\n")
  3086. + self.marker_end
  3087. + os.linesep
  3088. )
  3089. self._write(name, self.without_block)
  3090. ret = self.run_state(
  3091. "file.blockreplace",
  3092. name=name,
  3093. content=self.content.rstrip("\r\n"),
  3094. marker_start=self.marker_start,
  3095. marker_end=self.marker_end,
  3096. append_if_not_found=True,
  3097. append_newline=False,
  3098. )
  3099. self.assertSaltTrueReturn(ret)
  3100. self.assertTrue(ret[next(iter(ret))]["changes"])
  3101. self.assertEqual(self._read(name), expected)
  3102. # Pass 2a: Re-run state, no changes should be made
  3103. ret = self.run_state(
  3104. "file.blockreplace",
  3105. name=name,
  3106. content=self.content.rstrip("\r\n"),
  3107. marker_start=self.marker_start,
  3108. marker_end=self.marker_end,
  3109. append_if_not_found=True,
  3110. append_newline=False,
  3111. )
  3112. self.assertSaltTrueReturn(ret)
  3113. self.assertFalse(ret[next(iter(ret))]["changes"])
  3114. self.assertEqual(self._read(name), expected)
  3115. @with_tempfile()
  3116. def test_prepend_auto_line_separator(self, name):
  3117. """
  3118. This tests the line separator auto-detection when prepending the block
  3119. """
  3120. # POSIX newlines to Windows newlines
  3121. self._write(name, self.without_block_explicit_windows_newlines)
  3122. ret = self.run_state(
  3123. "file.blockreplace",
  3124. name=name,
  3125. content=self.content_explicit_posix_newlines,
  3126. marker_start=self.marker_start,
  3127. marker_end=self.marker_end,
  3128. prepend_if_not_found=True,
  3129. )
  3130. self.assertSaltTrueReturn(ret)
  3131. self.assertTrue(ret[next(iter(ret))]["changes"])
  3132. self.assertEqual(
  3133. self._read(name), self.with_block_prepended_explicit_windows_newlines
  3134. )
  3135. # Re-run state, no changes should be made
  3136. ret = self.run_state(
  3137. "file.blockreplace",
  3138. name=name,
  3139. content=self.content_explicit_posix_newlines,
  3140. marker_start=self.marker_start,
  3141. marker_end=self.marker_end,
  3142. prepend_if_not_found=True,
  3143. )
  3144. self.assertSaltTrueReturn(ret)
  3145. self.assertFalse(ret[next(iter(ret))]["changes"])
  3146. self.assertEqual(
  3147. self._read(name), self.with_block_prepended_explicit_windows_newlines
  3148. )
  3149. # Windows newlines to POSIX newlines
  3150. self._write(name, self.without_block_explicit_posix_newlines)
  3151. ret = self.run_state(
  3152. "file.blockreplace",
  3153. name=name,
  3154. content=self.content_explicit_windows_newlines,
  3155. marker_start=self.marker_start,
  3156. marker_end=self.marker_end,
  3157. prepend_if_not_found=True,
  3158. )
  3159. self.assertSaltTrueReturn(ret)
  3160. self.assertTrue(ret[next(iter(ret))]["changes"])
  3161. self.assertEqual(
  3162. self._read(name), self.with_block_prepended_explicit_posix_newlines
  3163. )
  3164. # Re-run state, no changes should be made
  3165. ret = self.run_state(
  3166. "file.blockreplace",
  3167. name=name,
  3168. content=self.content_explicit_windows_newlines,
  3169. marker_start=self.marker_start,
  3170. marker_end=self.marker_end,
  3171. prepend_if_not_found=True,
  3172. )
  3173. self.assertSaltTrueReturn(ret)
  3174. self.assertFalse(ret[next(iter(ret))]["changes"])
  3175. self.assertEqual(
  3176. self._read(name), self.with_block_prepended_explicit_posix_newlines
  3177. )
  3178. @with_tempfile()
  3179. def test_append_auto_line_separator(self, name):
  3180. """
  3181. This tests the line separator auto-detection when appending the block
  3182. """
  3183. # POSIX newlines to Windows newlines
  3184. self._write(name, self.without_block_explicit_windows_newlines)
  3185. ret = self.run_state(
  3186. "file.blockreplace",
  3187. name=name,
  3188. content=self.content_explicit_posix_newlines,
  3189. marker_start=self.marker_start,
  3190. marker_end=self.marker_end,
  3191. append_if_not_found=True,
  3192. )
  3193. self.assertSaltTrueReturn(ret)
  3194. self.assertTrue(ret[next(iter(ret))]["changes"])
  3195. self.assertEqual(
  3196. self._read(name), self.with_block_appended_explicit_windows_newlines
  3197. )
  3198. # Re-run state, no changes should be made
  3199. ret = self.run_state(
  3200. "file.blockreplace",
  3201. name=name,
  3202. content=self.content_explicit_posix_newlines,
  3203. marker_start=self.marker_start,
  3204. marker_end=self.marker_end,
  3205. append_if_not_found=True,
  3206. )
  3207. self.assertSaltTrueReturn(ret)
  3208. self.assertFalse(ret[next(iter(ret))]["changes"])
  3209. self.assertEqual(
  3210. self._read(name), self.with_block_appended_explicit_windows_newlines
  3211. )
  3212. # Windows newlines to POSIX newlines
  3213. self._write(name, self.without_block_explicit_posix_newlines)
  3214. ret = self.run_state(
  3215. "file.blockreplace",
  3216. name=name,
  3217. content=self.content_explicit_windows_newlines,
  3218. marker_start=self.marker_start,
  3219. marker_end=self.marker_end,
  3220. append_if_not_found=True,
  3221. )
  3222. self.assertSaltTrueReturn(ret)
  3223. self.assertTrue(ret[next(iter(ret))]["changes"])
  3224. self.assertEqual(
  3225. self._read(name), self.with_block_appended_explicit_posix_newlines
  3226. )
  3227. # Re-run state, no changes should be made
  3228. ret = self.run_state(
  3229. "file.blockreplace",
  3230. name=name,
  3231. content=self.content_explicit_windows_newlines,
  3232. marker_start=self.marker_start,
  3233. marker_end=self.marker_end,
  3234. append_if_not_found=True,
  3235. )
  3236. self.assertSaltTrueReturn(ret)
  3237. self.assertFalse(ret[next(iter(ret))]["changes"])
  3238. self.assertEqual(
  3239. self._read(name), self.with_block_appended_explicit_posix_newlines
  3240. )
  3241. @with_tempfile()
  3242. def test_non_matching_block(self, name):
  3243. """
  3244. Test blockreplace when block exists but its contents are not a
  3245. match.
  3246. """
  3247. # Pass 1: content ends in newline
  3248. self._write(name, self.with_non_matching_block)
  3249. ret = self.run_state(
  3250. "file.blockreplace",
  3251. name=name,
  3252. content=self.content,
  3253. marker_start=self.marker_start,
  3254. marker_end=self.marker_end,
  3255. )
  3256. self.assertSaltTrueReturn(ret)
  3257. self.assertTrue(ret[next(iter(ret))]["changes"])
  3258. self.assertEqual(self._read(name), self.with_matching_block)
  3259. # Pass 1a: Re-run state, no changes should be made
  3260. ret = self.run_state(
  3261. "file.blockreplace",
  3262. name=name,
  3263. content=self.content,
  3264. marker_start=self.marker_start,
  3265. marker_end=self.marker_end,
  3266. )
  3267. self.assertSaltTrueReturn(ret)
  3268. self.assertFalse(ret[next(iter(ret))]["changes"])
  3269. self.assertEqual(self._read(name), self.with_matching_block)
  3270. # Pass 2: content does not end in newline
  3271. self._write(name, self.with_non_matching_block)
  3272. ret = self.run_state(
  3273. "file.blockreplace",
  3274. name=name,
  3275. content=self.content.rstrip("\r\n"),
  3276. marker_start=self.marker_start,
  3277. marker_end=self.marker_end,
  3278. )
  3279. self.assertSaltTrueReturn(ret)
  3280. self.assertTrue(ret[next(iter(ret))]["changes"])
  3281. self.assertEqual(self._read(name), self.with_matching_block)
  3282. # Pass 2a: Re-run state, no changes should be made
  3283. ret = self.run_state(
  3284. "file.blockreplace",
  3285. name=name,
  3286. content=self.content.rstrip("\r\n"),
  3287. marker_start=self.marker_start,
  3288. marker_end=self.marker_end,
  3289. )
  3290. self.assertSaltTrueReturn(ret)
  3291. self.assertFalse(ret[next(iter(ret))]["changes"])
  3292. self.assertEqual(self._read(name), self.with_matching_block)
  3293. @with_tempfile()
  3294. def test_non_matching_block_append_newline(self, name):
  3295. """
  3296. Test blockreplace when block exists but its contents are not a
  3297. match. Test with append_newline explicitly set to True.
  3298. """
  3299. # Pass 1: content ends in newline
  3300. self._write(name, self.with_non_matching_block)
  3301. ret = self.run_state(
  3302. "file.blockreplace",
  3303. name=name,
  3304. content=self.content,
  3305. marker_start=self.marker_start,
  3306. marker_end=self.marker_end,
  3307. append_newline=True,
  3308. )
  3309. self.assertSaltTrueReturn(ret)
  3310. self.assertTrue(ret[next(iter(ret))]["changes"])
  3311. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3312. # Pass 1a: Re-run state, no changes should be made
  3313. ret = self.run_state(
  3314. "file.blockreplace",
  3315. name=name,
  3316. content=self.content,
  3317. marker_start=self.marker_start,
  3318. marker_end=self.marker_end,
  3319. append_newline=True,
  3320. )
  3321. self.assertSaltTrueReturn(ret)
  3322. self.assertFalse(ret[next(iter(ret))]["changes"])
  3323. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3324. # Pass 2: content does not end in newline
  3325. self._write(name, self.with_non_matching_block)
  3326. ret = self.run_state(
  3327. "file.blockreplace",
  3328. name=name,
  3329. content=self.content.rstrip("\r\n"),
  3330. marker_start=self.marker_start,
  3331. marker_end=self.marker_end,
  3332. append_newline=True,
  3333. )
  3334. self.assertSaltTrueReturn(ret)
  3335. self.assertTrue(ret[next(iter(ret))]["changes"])
  3336. self.assertEqual(self._read(name), self.with_matching_block)
  3337. # Pass 2a: Re-run state, no changes should be made
  3338. ret = self.run_state(
  3339. "file.blockreplace",
  3340. name=name,
  3341. content=self.content.rstrip("\r\n"),
  3342. marker_start=self.marker_start,
  3343. marker_end=self.marker_end,
  3344. append_newline=True,
  3345. )
  3346. self.assertSaltTrueReturn(ret)
  3347. self.assertFalse(ret[next(iter(ret))]["changes"])
  3348. self.assertEqual(self._read(name), self.with_matching_block)
  3349. @with_tempfile()
  3350. def test_non_matching_block_no_append_newline(self, name):
  3351. """
  3352. Test blockreplace when block exists but its contents are not a
  3353. match. Test with append_newline explicitly set to False.
  3354. """
  3355. # Pass 1: content ends in newline
  3356. self._write(name, self.with_non_matching_block)
  3357. ret = self.run_state(
  3358. "file.blockreplace",
  3359. name=name,
  3360. content=self.content,
  3361. marker_start=self.marker_start,
  3362. marker_end=self.marker_end,
  3363. append_newline=False,
  3364. )
  3365. self.assertSaltTrueReturn(ret)
  3366. self.assertTrue(ret[next(iter(ret))]["changes"])
  3367. self.assertEqual(self._read(name), self.with_matching_block)
  3368. # Pass 1a: Re-run state, no changes should be made
  3369. ret = self.run_state(
  3370. "file.blockreplace",
  3371. name=name,
  3372. content=self.content,
  3373. marker_start=self.marker_start,
  3374. marker_end=self.marker_end,
  3375. append_newline=False,
  3376. )
  3377. self.assertSaltTrueReturn(ret)
  3378. self.assertFalse(ret[next(iter(ret))]["changes"])
  3379. self.assertEqual(self._read(name), self.with_matching_block)
  3380. # Pass 2: content does not end in newline
  3381. self._write(name, self.with_non_matching_block)
  3382. ret = self.run_state(
  3383. "file.blockreplace",
  3384. name=name,
  3385. content=self.content.rstrip("\r\n"),
  3386. marker_start=self.marker_start,
  3387. marker_end=self.marker_end,
  3388. append_newline=False,
  3389. )
  3390. self.assertSaltTrueReturn(ret)
  3391. self.assertTrue(ret[next(iter(ret))]["changes"])
  3392. self.assertEqual(
  3393. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3394. )
  3395. # Pass 2a: Re-run state, no changes should be made
  3396. ret = self.run_state(
  3397. "file.blockreplace",
  3398. name=name,
  3399. content=self.content.rstrip("\r\n"),
  3400. marker_start=self.marker_start,
  3401. marker_end=self.marker_end,
  3402. append_newline=False,
  3403. )
  3404. self.assertSaltTrueReturn(ret)
  3405. self.assertFalse(ret[next(iter(ret))]["changes"])
  3406. self.assertEqual(
  3407. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3408. )
  3409. @with_tempfile()
  3410. def test_non_matching_block_and_marker_not_after_newline(self, name):
  3411. """
  3412. Test blockreplace when block exists but its contents are not a
  3413. match, and the marker_end is not directly preceded by a newline.
  3414. """
  3415. # Pass 1: content ends in newline
  3416. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3417. ret = self.run_state(
  3418. "file.blockreplace",
  3419. name=name,
  3420. content=self.content,
  3421. marker_start=self.marker_start,
  3422. marker_end=self.marker_end,
  3423. )
  3424. self.assertSaltTrueReturn(ret)
  3425. self.assertTrue(ret[next(iter(ret))]["changes"])
  3426. self.assertEqual(self._read(name), self.with_matching_block)
  3427. # Pass 1a: Re-run state, no changes should be made
  3428. ret = self.run_state(
  3429. "file.blockreplace",
  3430. name=name,
  3431. content=self.content,
  3432. marker_start=self.marker_start,
  3433. marker_end=self.marker_end,
  3434. )
  3435. self.assertSaltTrueReturn(ret)
  3436. self.assertFalse(ret[next(iter(ret))]["changes"])
  3437. self.assertEqual(self._read(name), self.with_matching_block)
  3438. # Pass 2: content does not end in newline
  3439. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3440. ret = self.run_state(
  3441. "file.blockreplace",
  3442. name=name,
  3443. content=self.content.rstrip("\r\n"),
  3444. marker_start=self.marker_start,
  3445. marker_end=self.marker_end,
  3446. )
  3447. self.assertSaltTrueReturn(ret)
  3448. self.assertTrue(ret[next(iter(ret))]["changes"])
  3449. self.assertEqual(self._read(name), self.with_matching_block)
  3450. # Pass 2a: Re-run state, no changes should be made
  3451. ret = self.run_state(
  3452. "file.blockreplace",
  3453. name=name,
  3454. content=self.content.rstrip("\r\n"),
  3455. marker_start=self.marker_start,
  3456. marker_end=self.marker_end,
  3457. )
  3458. self.assertSaltTrueReturn(ret)
  3459. self.assertFalse(ret[next(iter(ret))]["changes"])
  3460. self.assertEqual(self._read(name), self.with_matching_block)
  3461. @with_tempfile()
  3462. def test_non_matching_block_and_marker_not_after_newline_append_newline(self, name):
  3463. """
  3464. Test blockreplace when block exists but its contents are not a match,
  3465. and the marker_end is not directly preceded by a newline. Test with
  3466. append_newline explicitly set to True.
  3467. """
  3468. # Pass 1: content ends in newline
  3469. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3470. ret = self.run_state(
  3471. "file.blockreplace",
  3472. name=name,
  3473. content=self.content,
  3474. marker_start=self.marker_start,
  3475. marker_end=self.marker_end,
  3476. append_newline=True,
  3477. )
  3478. self.assertSaltTrueReturn(ret)
  3479. self.assertTrue(ret[next(iter(ret))]["changes"])
  3480. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3481. # Pass 1a: Re-run state, no changes should be made
  3482. ret = self.run_state(
  3483. "file.blockreplace",
  3484. name=name,
  3485. content=self.content,
  3486. marker_start=self.marker_start,
  3487. marker_end=self.marker_end,
  3488. append_newline=True,
  3489. )
  3490. self.assertSaltTrueReturn(ret)
  3491. self.assertFalse(ret[next(iter(ret))]["changes"])
  3492. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3493. # Pass 2: content does not end in newline
  3494. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3495. ret = self.run_state(
  3496. "file.blockreplace",
  3497. name=name,
  3498. content=self.content.rstrip("\r\n"),
  3499. marker_start=self.marker_start,
  3500. marker_end=self.marker_end,
  3501. append_newline=True,
  3502. )
  3503. self.assertSaltTrueReturn(ret)
  3504. self.assertTrue(ret[next(iter(ret))]["changes"])
  3505. self.assertEqual(self._read(name), self.with_matching_block)
  3506. # Pass 2a: Re-run state, no changes should be made
  3507. ret = self.run_state(
  3508. "file.blockreplace",
  3509. name=name,
  3510. content=self.content.rstrip("\r\n"),
  3511. marker_start=self.marker_start,
  3512. marker_end=self.marker_end,
  3513. append_newline=True,
  3514. )
  3515. self.assertSaltTrueReturn(ret)
  3516. self.assertFalse(ret[next(iter(ret))]["changes"])
  3517. self.assertEqual(self._read(name), self.with_matching_block)
  3518. @with_tempfile()
  3519. def test_non_matching_block_and_marker_not_after_newline_no_append_newline(
  3520. self, name
  3521. ):
  3522. """
  3523. Test blockreplace when block exists but its contents are not a match,
  3524. and the marker_end is not directly preceded by a newline. Test with
  3525. append_newline explicitly set to False.
  3526. """
  3527. # Pass 1: content ends in newline
  3528. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3529. ret = self.run_state(
  3530. "file.blockreplace",
  3531. name=name,
  3532. content=self.content,
  3533. marker_start=self.marker_start,
  3534. marker_end=self.marker_end,
  3535. append_newline=False,
  3536. )
  3537. self.assertSaltTrueReturn(ret)
  3538. self.assertTrue(ret[next(iter(ret))]["changes"])
  3539. self.assertEqual(self._read(name), self.with_matching_block)
  3540. # Pass 1a: Re-run state, no changes should be made
  3541. ret = self.run_state(
  3542. "file.blockreplace",
  3543. name=name,
  3544. content=self.content,
  3545. marker_start=self.marker_start,
  3546. marker_end=self.marker_end,
  3547. append_newline=False,
  3548. )
  3549. self.assertSaltTrueReturn(ret)
  3550. self.assertFalse(ret[next(iter(ret))]["changes"])
  3551. self.assertEqual(self._read(name), self.with_matching_block)
  3552. # Pass 2: content does not end in newline
  3553. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3554. ret = self.run_state(
  3555. "file.blockreplace",
  3556. name=name,
  3557. content=self.content.rstrip("\r\n"),
  3558. marker_start=self.marker_start,
  3559. marker_end=self.marker_end,
  3560. append_newline=False,
  3561. )
  3562. self.assertSaltTrueReturn(ret)
  3563. self.assertTrue(ret[next(iter(ret))]["changes"])
  3564. self.assertEqual(
  3565. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3566. )
  3567. # Pass 2a: Re-run state, no changes should be made
  3568. ret = self.run_state(
  3569. "file.blockreplace",
  3570. name=name,
  3571. content=self.content.rstrip("\r\n"),
  3572. marker_start=self.marker_start,
  3573. marker_end=self.marker_end,
  3574. append_newline=False,
  3575. )
  3576. self.assertSaltTrueReturn(ret)
  3577. self.assertFalse(ret[next(iter(ret))]["changes"])
  3578. self.assertEqual(
  3579. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3580. )
  3581. @with_tempfile()
  3582. def test_matching_block(self, name):
  3583. """
  3584. Test blockreplace when block exists and its contents are a match. No
  3585. changes should be made.
  3586. """
  3587. # Pass 1: content ends in newline
  3588. self._write(name, self.with_matching_block)
  3589. ret = self.run_state(
  3590. "file.blockreplace",
  3591. name=name,
  3592. content=self.content,
  3593. marker_start=self.marker_start,
  3594. marker_end=self.marker_end,
  3595. )
  3596. self.assertSaltTrueReturn(ret)
  3597. self.assertFalse(ret[next(iter(ret))]["changes"])
  3598. self.assertEqual(self._read(name), self.with_matching_block)
  3599. # Pass 1a: Re-run state, no changes should be made
  3600. ret = self.run_state(
  3601. "file.blockreplace",
  3602. name=name,
  3603. content=self.content,
  3604. marker_start=self.marker_start,
  3605. marker_end=self.marker_end,
  3606. )
  3607. self.assertSaltTrueReturn(ret)
  3608. self.assertFalse(ret[next(iter(ret))]["changes"])
  3609. self.assertEqual(self._read(name), self.with_matching_block)
  3610. # Pass 2: content does not end in newline
  3611. self._write(name, self.with_matching_block)
  3612. ret = self.run_state(
  3613. "file.blockreplace",
  3614. name=name,
  3615. content=self.content.rstrip("\r\n"),
  3616. marker_start=self.marker_start,
  3617. marker_end=self.marker_end,
  3618. )
  3619. self.assertSaltTrueReturn(ret)
  3620. self.assertFalse(ret[next(iter(ret))]["changes"])
  3621. self.assertEqual(self._read(name), self.with_matching_block)
  3622. # Pass 2a: Re-run state, no changes should be made
  3623. ret = self.run_state(
  3624. "file.blockreplace",
  3625. name=name,
  3626. content=self.content.rstrip("\r\n"),
  3627. marker_start=self.marker_start,
  3628. marker_end=self.marker_end,
  3629. )
  3630. self.assertSaltTrueReturn(ret)
  3631. self.assertFalse(ret[next(iter(ret))]["changes"])
  3632. self.assertEqual(self._read(name), self.with_matching_block)
  3633. @with_tempfile()
  3634. def test_matching_block_append_newline(self, name):
  3635. """
  3636. Test blockreplace when block exists and its contents are a match. Test
  3637. with append_newline explicitly set to True. This will result in an
  3638. extra newline when the content ends in a newline, and will not when the
  3639. content does not end in a newline.
  3640. """
  3641. # Pass 1: content ends in newline
  3642. self._write(name, self.with_matching_block)
  3643. ret = self.run_state(
  3644. "file.blockreplace",
  3645. name=name,
  3646. content=self.content,
  3647. marker_start=self.marker_start,
  3648. marker_end=self.marker_end,
  3649. append_newline=True,
  3650. )
  3651. self.assertSaltTrueReturn(ret)
  3652. self.assertTrue(ret[next(iter(ret))]["changes"])
  3653. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3654. # Pass 1a: Re-run state, no changes should be made
  3655. ret = self.run_state(
  3656. "file.blockreplace",
  3657. name=name,
  3658. content=self.content,
  3659. marker_start=self.marker_start,
  3660. marker_end=self.marker_end,
  3661. append_newline=True,
  3662. )
  3663. self.assertSaltTrueReturn(ret)
  3664. self.assertFalse(ret[next(iter(ret))]["changes"])
  3665. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3666. # Pass 2: content does not end in newline
  3667. self._write(name, self.with_matching_block)
  3668. ret = self.run_state(
  3669. "file.blockreplace",
  3670. name=name,
  3671. content=self.content.rstrip("\r\n"),
  3672. marker_start=self.marker_start,
  3673. marker_end=self.marker_end,
  3674. append_newline=True,
  3675. )
  3676. self.assertSaltTrueReturn(ret)
  3677. self.assertFalse(ret[next(iter(ret))]["changes"])
  3678. self.assertEqual(self._read(name), self.with_matching_block)
  3679. # Pass 2a: Re-run state, no changes should be made
  3680. ret = self.run_state(
  3681. "file.blockreplace",
  3682. name=name,
  3683. content=self.content.rstrip("\r\n"),
  3684. marker_start=self.marker_start,
  3685. marker_end=self.marker_end,
  3686. append_newline=True,
  3687. )
  3688. self.assertSaltTrueReturn(ret)
  3689. self.assertFalse(ret[next(iter(ret))]["changes"])
  3690. self.assertEqual(self._read(name), self.with_matching_block)
  3691. @with_tempfile()
  3692. def test_matching_block_no_append_newline(self, name):
  3693. """
  3694. Test blockreplace when block exists and its contents are a match. Test
  3695. with append_newline explicitly set to False. This will result in the
  3696. marker_end not being directly preceded by a newline when the content
  3697. does not end in a newline.
  3698. """
  3699. # Pass 1: content ends in newline
  3700. self._write(name, self.with_matching_block)
  3701. ret = self.run_state(
  3702. "file.blockreplace",
  3703. name=name,
  3704. content=self.content,
  3705. marker_start=self.marker_start,
  3706. marker_end=self.marker_end,
  3707. append_newline=False,
  3708. )
  3709. self.assertSaltTrueReturn(ret)
  3710. self.assertFalse(ret[next(iter(ret))]["changes"])
  3711. self.assertEqual(self._read(name), self.with_matching_block)
  3712. # Pass 1a: Re-run state, no changes should be made
  3713. ret = self.run_state(
  3714. "file.blockreplace",
  3715. name=name,
  3716. content=self.content,
  3717. marker_start=self.marker_start,
  3718. marker_end=self.marker_end,
  3719. append_newline=False,
  3720. )
  3721. self.assertSaltTrueReturn(ret)
  3722. self.assertFalse(ret[next(iter(ret))]["changes"])
  3723. self.assertEqual(self._read(name), self.with_matching_block)
  3724. # Pass 2: content does not end in newline
  3725. self._write(name, self.with_matching_block)
  3726. ret = self.run_state(
  3727. "file.blockreplace",
  3728. name=name,
  3729. content=self.content.rstrip("\r\n"),
  3730. marker_start=self.marker_start,
  3731. marker_end=self.marker_end,
  3732. append_newline=False,
  3733. )
  3734. self.assertSaltTrueReturn(ret)
  3735. self.assertTrue(ret[next(iter(ret))]["changes"])
  3736. self.assertEqual(
  3737. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3738. )
  3739. # Pass 2a: Re-run state, no changes should be made
  3740. ret = self.run_state(
  3741. "file.blockreplace",
  3742. name=name,
  3743. content=self.content.rstrip("\r\n"),
  3744. marker_start=self.marker_start,
  3745. marker_end=self.marker_end,
  3746. append_newline=False,
  3747. )
  3748. self.assertSaltTrueReturn(ret)
  3749. self.assertFalse(ret[next(iter(ret))]["changes"])
  3750. self.assertEqual(
  3751. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3752. )
  3753. @with_tempfile()
  3754. def test_matching_block_and_marker_not_after_newline(self, name):
  3755. """
  3756. Test blockreplace when block exists and its contents are a match, but
  3757. the marker_end is not directly preceded by a newline.
  3758. """
  3759. # Pass 1: content ends in newline
  3760. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3761. ret = self.run_state(
  3762. "file.blockreplace",
  3763. name=name,
  3764. content=self.content,
  3765. marker_start=self.marker_start,
  3766. marker_end=self.marker_end,
  3767. )
  3768. self.assertSaltTrueReturn(ret)
  3769. self.assertTrue(ret[next(iter(ret))]["changes"])
  3770. self.assertEqual(self._read(name), self.with_matching_block)
  3771. # Pass 1a: Re-run state, no changes should be made
  3772. ret = self.run_state(
  3773. "file.blockreplace",
  3774. name=name,
  3775. content=self.content,
  3776. marker_start=self.marker_start,
  3777. marker_end=self.marker_end,
  3778. )
  3779. self.assertSaltTrueReturn(ret)
  3780. self.assertFalse(ret[next(iter(ret))]["changes"])
  3781. self.assertEqual(self._read(name), self.with_matching_block)
  3782. # Pass 2: content does not end in newline
  3783. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3784. ret = self.run_state(
  3785. "file.blockreplace",
  3786. name=name,
  3787. content=self.content.rstrip("\r\n"),
  3788. marker_start=self.marker_start,
  3789. marker_end=self.marker_end,
  3790. )
  3791. self.assertSaltTrueReturn(ret)
  3792. self.assertTrue(ret[next(iter(ret))]["changes"])
  3793. self.assertEqual(self._read(name), self.with_matching_block)
  3794. # Pass 2a: Re-run state, no changes should be made
  3795. ret = self.run_state(
  3796. "file.blockreplace",
  3797. name=name,
  3798. content=self.content.rstrip("\r\n"),
  3799. marker_start=self.marker_start,
  3800. marker_end=self.marker_end,
  3801. )
  3802. self.assertSaltTrueReturn(ret)
  3803. self.assertFalse(ret[next(iter(ret))]["changes"])
  3804. self.assertEqual(self._read(name), self.with_matching_block)
  3805. @with_tempfile()
  3806. def test_matching_block_and_marker_not_after_newline_append_newline(self, name):
  3807. """
  3808. Test blockreplace when block exists and its contents are a match, but
  3809. the marker_end is not directly preceded by a newline. Test with
  3810. append_newline explicitly set to True. This will result in an extra
  3811. newline when the content ends in a newline, and will not when the
  3812. content does not end in a newline.
  3813. """
  3814. # Pass 1: content ends in newline
  3815. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3816. ret = self.run_state(
  3817. "file.blockreplace",
  3818. name=name,
  3819. content=self.content,
  3820. marker_start=self.marker_start,
  3821. marker_end=self.marker_end,
  3822. append_newline=True,
  3823. )
  3824. self.assertSaltTrueReturn(ret)
  3825. self.assertTrue(ret[next(iter(ret))]["changes"])
  3826. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3827. # Pass 1a: Re-run state, no changes should be made
  3828. ret = self.run_state(
  3829. "file.blockreplace",
  3830. name=name,
  3831. content=self.content,
  3832. marker_start=self.marker_start,
  3833. marker_end=self.marker_end,
  3834. append_newline=True,
  3835. )
  3836. self.assertSaltTrueReturn(ret)
  3837. self.assertFalse(ret[next(iter(ret))]["changes"])
  3838. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3839. # Pass 2: content does not end in newline
  3840. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3841. ret = self.run_state(
  3842. "file.blockreplace",
  3843. name=name,
  3844. content=self.content.rstrip("\r\n"),
  3845. marker_start=self.marker_start,
  3846. marker_end=self.marker_end,
  3847. append_newline=True,
  3848. )
  3849. self.assertSaltTrueReturn(ret)
  3850. self.assertTrue(ret[next(iter(ret))]["changes"])
  3851. self.assertEqual(self._read(name), self.with_matching_block)
  3852. # Pass 2a: Re-run state, no changes should be made
  3853. ret = self.run_state(
  3854. "file.blockreplace",
  3855. name=name,
  3856. content=self.content.rstrip("\r\n"),
  3857. marker_start=self.marker_start,
  3858. marker_end=self.marker_end,
  3859. append_newline=True,
  3860. )
  3861. self.assertSaltTrueReturn(ret)
  3862. self.assertFalse(ret[next(iter(ret))]["changes"])
  3863. self.assertEqual(self._read(name), self.with_matching_block)
  3864. @with_tempfile()
  3865. def test_matching_block_and_marker_not_after_newline_no_append_newline(self, name):
  3866. """
  3867. Test blockreplace when block exists and its contents are a match, but
  3868. the marker_end is not directly preceded by a newline. Test with
  3869. append_newline explicitly set to False.
  3870. """
  3871. # Pass 1: content ends in newline
  3872. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3873. ret = self.run_state(
  3874. "file.blockreplace",
  3875. name=name,
  3876. content=self.content,
  3877. marker_start=self.marker_start,
  3878. marker_end=self.marker_end,
  3879. append_newline=False,
  3880. )
  3881. self.assertSaltTrueReturn(ret)
  3882. self.assertTrue(ret[next(iter(ret))]["changes"])
  3883. self.assertEqual(self._read(name), self.with_matching_block)
  3884. # Pass 1a: Re-run state, no changes should be made
  3885. ret = self.run_state(
  3886. "file.blockreplace",
  3887. name=name,
  3888. content=self.content,
  3889. marker_start=self.marker_start,
  3890. marker_end=self.marker_end,
  3891. append_newline=False,
  3892. )
  3893. self.assertSaltTrueReturn(ret)
  3894. self.assertFalse(ret[next(iter(ret))]["changes"])
  3895. self.assertEqual(self._read(name), self.with_matching_block)
  3896. # Pass 2: content does not end in newline
  3897. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3898. ret = self.run_state(
  3899. "file.blockreplace",
  3900. name=name,
  3901. content=self.content.rstrip("\r\n"),
  3902. marker_start=self.marker_start,
  3903. marker_end=self.marker_end,
  3904. append_newline=False,
  3905. )
  3906. self.assertSaltTrueReturn(ret)
  3907. self.assertFalse(ret[next(iter(ret))]["changes"])
  3908. self.assertEqual(
  3909. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3910. )
  3911. # Pass 2a: Re-run state, no changes should be made
  3912. ret = self.run_state(
  3913. "file.blockreplace",
  3914. name=name,
  3915. content=self.content.rstrip("\r\n"),
  3916. marker_start=self.marker_start,
  3917. marker_end=self.marker_end,
  3918. append_newline=False,
  3919. )
  3920. self.assertSaltTrueReturn(ret)
  3921. self.assertFalse(ret[next(iter(ret))]["changes"])
  3922. self.assertEqual(
  3923. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3924. )
  3925. @with_tempfile()
  3926. def test_issue_49043(self, name):
  3927. ret = self.run_function("state.sls", mods="issue-49043", pillar={"name": name},)
  3928. log.error("ret = %s", repr(ret))
  3929. diff = "--- \n+++ \n@@ -0,0 +1,3 @@\n"
  3930. diff += dedent(
  3931. """\
  3932. +#-- start managed zone --
  3933. +äöü
  3934. +#-- end managed zone --
  3935. """
  3936. )
  3937. job = "file_|-somefile-blockreplace_|-{}_|-blockreplace".format(name)
  3938. self.assertEqual(ret[job]["changes"]["diff"], diff)
  3939. @pytest.mark.windows_whitelisted
  3940. class RemoteFileTest(ModuleCase, SaltReturnAssertsMixin):
  3941. """
  3942. Uses a local tornado webserver to test http(s) file.managed states with and
  3943. without skip_verify
  3944. """
  3945. @classmethod
  3946. def setUpClass(cls):
  3947. cls.webserver = Webserver()
  3948. cls.webserver.start()
  3949. cls.source = cls.webserver.url("grail/scene33")
  3950. if IS_WINDOWS:
  3951. # CRLF vs LF causes a different hash on windows
  3952. cls.source_hash = "21438b3d5fd2c0028bcab92f7824dc69"
  3953. else:
  3954. cls.source_hash = "d2feb3beb323c79fc7a0f44f1408b4a3"
  3955. @classmethod
  3956. def tearDownClass(cls):
  3957. cls.webserver.stop()
  3958. @with_tempfile(create=False)
  3959. def setUp(self, name): # pylint: disable=arguments-differ
  3960. self.name = name
  3961. def tearDown(self):
  3962. try:
  3963. os.remove(self.name)
  3964. except OSError as exc:
  3965. if exc.errno != errno.ENOENT:
  3966. six.reraise(*sys.exc_info())
  3967. def run_state(self, *args, **kwargs): # pylint: disable=arguments-differ
  3968. ret = super(RemoteFileTest, self).run_state(*args, **kwargs)
  3969. log.debug("ret = %s", ret)
  3970. return ret
  3971. def test_file_managed_http_source_no_hash(self):
  3972. """
  3973. Test a remote file with no hash
  3974. """
  3975. ret = self.run_state(
  3976. "file.managed", name=self.name, source=self.source, skip_verify=False
  3977. )
  3978. # This should fail because no hash was provided
  3979. self.assertSaltFalseReturn(ret)
  3980. def test_file_managed_http_source(self):
  3981. """
  3982. Test a remote file with no hash
  3983. """
  3984. ret = self.run_state(
  3985. "file.managed",
  3986. name=self.name,
  3987. source=self.source,
  3988. source_hash=self.source_hash,
  3989. skip_verify=False,
  3990. )
  3991. self.assertSaltTrueReturn(ret)
  3992. def test_file_managed_http_source_skip_verify(self):
  3993. """
  3994. Test a remote file using skip_verify
  3995. """
  3996. ret = self.run_state(
  3997. "file.managed", name=self.name, source=self.source, skip_verify=True
  3998. )
  3999. self.assertSaltTrueReturn(ret)
  4000. def test_file_managed_keep_source_false_http(self):
  4001. """
  4002. This test ensures that we properly clean the cached file if keep_source
  4003. is set to False, for source files using an http:// URL
  4004. """
  4005. # Run the state
  4006. ret = self.run_state(
  4007. "file.managed",
  4008. name=self.name,
  4009. source=self.source,
  4010. source_hash=self.source_hash,
  4011. keep_source=False,
  4012. )
  4013. ret = ret[next(iter(ret))]
  4014. assert ret["result"] is True
  4015. # Now make sure that the file is not cached
  4016. result = self.run_function("cp.is_cached", [self.source])
  4017. assert result == "", "File is still cached at {0}".format(result)
  4018. @skipIf(not salt.utils.path.which("patch"), "patch is not installed")
  4019. @pytest.mark.windows_whitelisted
  4020. class PatchTest(ModuleCase, SaltReturnAssertsMixin):
  4021. def _check_patch_version(self, min_version):
  4022. """
  4023. patch version check
  4024. """
  4025. version = self.run_function("cmd.run", ["patch --version"]).splitlines()[0]
  4026. version = version.split()[1]
  4027. if _LooseVersion(version) < _LooseVersion(min_version):
  4028. self.skipTest(
  4029. "Minimum patch version required: {0}. "
  4030. "Patch version installed: {1}".format(min_version, version)
  4031. )
  4032. @classmethod
  4033. def setUpClass(cls):
  4034. cls.webserver = Webserver()
  4035. cls.webserver.start()
  4036. cls.numbers_patch_name = "numbers.patch"
  4037. cls.math_patch_name = "math.patch"
  4038. cls.all_patch_name = "all.patch"
  4039. cls.numbers_patch_template_name = cls.numbers_patch_name + ".jinja"
  4040. cls.math_patch_template_name = cls.math_patch_name + ".jinja"
  4041. cls.all_patch_template_name = cls.all_patch_name + ".jinja"
  4042. cls.numbers_patch_path = "patches/" + cls.numbers_patch_name
  4043. cls.math_patch_path = "patches/" + cls.math_patch_name
  4044. cls.all_patch_path = "patches/" + cls.all_patch_name
  4045. cls.numbers_patch_template_path = "patches/" + cls.numbers_patch_template_name
  4046. cls.math_patch_template_path = "patches/" + cls.math_patch_template_name
  4047. cls.all_patch_template_path = "patches/" + cls.all_patch_template_name
  4048. cls.numbers_patch = "salt://" + cls.numbers_patch_path
  4049. cls.math_patch = "salt://" + cls.math_patch_path
  4050. cls.all_patch = "salt://" + cls.all_patch_path
  4051. cls.numbers_patch_template = "salt://" + cls.numbers_patch_template_path
  4052. cls.math_patch_template = "salt://" + cls.math_patch_template_path
  4053. cls.all_patch_template = "salt://" + cls.all_patch_template_path
  4054. cls.numbers_patch_http = cls.webserver.url(cls.numbers_patch_path)
  4055. cls.math_patch_http = cls.webserver.url(cls.math_patch_path)
  4056. cls.all_patch_http = cls.webserver.url(cls.all_patch_path)
  4057. cls.numbers_patch_template_http = cls.webserver.url(
  4058. cls.numbers_patch_template_path
  4059. )
  4060. cls.math_patch_template_http = cls.webserver.url(cls.math_patch_template_path)
  4061. cls.all_patch_template_http = cls.webserver.url(cls.all_patch_template_path)
  4062. patches_dir = os.path.join(RUNTIME_VARS.FILES, "file", "base", "patches")
  4063. cls.numbers_patch_hash = salt.utils.hashutils.get_hash(
  4064. os.path.join(patches_dir, cls.numbers_patch_name)
  4065. )
  4066. cls.math_patch_hash = salt.utils.hashutils.get_hash(
  4067. os.path.join(patches_dir, cls.math_patch_name)
  4068. )
  4069. cls.all_patch_hash = salt.utils.hashutils.get_hash(
  4070. os.path.join(patches_dir, cls.all_patch_name)
  4071. )
  4072. cls.numbers_patch_template_hash = salt.utils.hashutils.get_hash(
  4073. os.path.join(patches_dir, cls.numbers_patch_template_name)
  4074. )
  4075. cls.math_patch_template_hash = salt.utils.hashutils.get_hash(
  4076. os.path.join(patches_dir, cls.math_patch_template_name)
  4077. )
  4078. cls.all_patch_template_hash = salt.utils.hashutils.get_hash(
  4079. os.path.join(patches_dir, cls.all_patch_template_name)
  4080. )
  4081. cls.context = {"two": "two", "ten": 10}
  4082. @classmethod
  4083. def tearDownClass(cls):
  4084. cls.webserver.stop()
  4085. def setUp(self):
  4086. """
  4087. Create a new unpatched set of files
  4088. """
  4089. self.base_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  4090. os.makedirs(os.path.join(self.base_dir, "foo", "bar"))
  4091. self.numbers_file = os.path.join(self.base_dir, "foo", "numbers.txt")
  4092. self.math_file = os.path.join(self.base_dir, "foo", "bar", "math.txt")
  4093. with salt.utils.files.fopen(self.numbers_file, "w") as fp_:
  4094. fp_.write(
  4095. textwrap.dedent(
  4096. """\
  4097. one
  4098. two
  4099. three
  4100. 1
  4101. 2
  4102. 3
  4103. """
  4104. )
  4105. )
  4106. with salt.utils.files.fopen(self.math_file, "w") as fp_:
  4107. fp_.write(
  4108. textwrap.dedent(
  4109. """\
  4110. Five plus five is ten
  4111. Four squared is sixteen
  4112. """
  4113. )
  4114. )
  4115. self.addCleanup(shutil.rmtree, self.base_dir, ignore_errors=True)
  4116. def test_patch_single_file(self):
  4117. """
  4118. Test file.patch using a patch applied to a single file
  4119. """
  4120. ret = self.run_state(
  4121. "file.patch", name=self.numbers_file, source=self.numbers_patch,
  4122. )
  4123. self.assertSaltTrueReturn(ret)
  4124. ret = ret[next(iter(ret))]
  4125. self.assertEqual(ret["comment"], "Patch successfully applied")
  4126. # Re-run the state, should succeed and there should be a message about
  4127. # a partially-applied hunk.
  4128. ret = self.run_state(
  4129. "file.patch", name=self.numbers_file, source=self.numbers_patch,
  4130. )
  4131. self.assertSaltTrueReturn(ret)
  4132. ret = ret[next(iter(ret))]
  4133. self.assertEqual(ret["comment"], "Patch was already applied")
  4134. self.assertEqual(ret["changes"], {})
  4135. def test_patch_directory(self):
  4136. """
  4137. Test file.patch using a patch applied to a directory, with changes
  4138. spanning multiple files.
  4139. """
  4140. self._check_patch_version("2.6")
  4141. ret = self.run_state(
  4142. "file.patch", name=self.base_dir, source=self.all_patch, strip=1,
  4143. )
  4144. self.assertSaltTrueReturn(ret)
  4145. ret = ret[next(iter(ret))]
  4146. self.assertEqual(ret["comment"], "Patch successfully applied")
  4147. # Re-run the state, should succeed and there should be a message about
  4148. # a partially-applied hunk.
  4149. ret = self.run_state(
  4150. "file.patch", name=self.base_dir, source=self.all_patch, strip=1,
  4151. )
  4152. self.assertSaltTrueReturn(ret)
  4153. ret = ret[next(iter(ret))]
  4154. self.assertEqual(ret["comment"], "Patch was already applied")
  4155. self.assertEqual(ret["changes"], {})
  4156. def test_patch_strip_parsing(self):
  4157. """
  4158. Test that we successfuly parse -p/--strip when included in the options
  4159. """
  4160. self._check_patch_version("2.6")
  4161. # Run the state using -p1
  4162. ret = self.run_state(
  4163. "file.patch", name=self.base_dir, source=self.all_patch, options="-p1",
  4164. )
  4165. self.assertSaltTrueReturn(ret)
  4166. ret = ret[next(iter(ret))]
  4167. self.assertEqual(ret["comment"], "Patch successfully applied")
  4168. # Re-run the state using --strip=1
  4169. ret = self.run_state(
  4170. "file.patch",
  4171. name=self.base_dir,
  4172. source=self.all_patch,
  4173. options="--strip=1",
  4174. )
  4175. self.assertSaltTrueReturn(ret)
  4176. ret = ret[next(iter(ret))]
  4177. self.assertEqual(ret["comment"], "Patch was already applied")
  4178. self.assertEqual(ret["changes"], {})
  4179. # Re-run the state using --strip 1
  4180. ret = self.run_state(
  4181. "file.patch",
  4182. name=self.base_dir,
  4183. source=self.all_patch,
  4184. options="--strip 1",
  4185. )
  4186. self.assertSaltTrueReturn(ret)
  4187. ret = ret[next(iter(ret))]
  4188. self.assertEqual(ret["comment"], "Patch was already applied")
  4189. self.assertEqual(ret["changes"], {})
  4190. def test_patch_saltenv(self):
  4191. """
  4192. Test that we attempt to download the patch from a non-base saltenv
  4193. """
  4194. # This state will fail because we don't have a patch file in that
  4195. # environment, but that is OK, we just want to test that we're looking
  4196. # in an environment other than base.
  4197. ret = self.run_state(
  4198. "file.patch", name=self.math_file, source=self.math_patch, saltenv="prod",
  4199. )
  4200. self.assertSaltFalseReturn(ret)
  4201. ret = ret[next(iter(ret))]
  4202. self.assertEqual(
  4203. ret["comment"],
  4204. "Source file {0} not found in saltenv 'prod'".format(self.math_patch),
  4205. )
  4206. def test_patch_single_file_failure(self):
  4207. """
  4208. Test file.patch using a patch applied to a single file. This tests a
  4209. failed patch.
  4210. """
  4211. # Empty the file to ensure that the patch doesn't apply cleanly
  4212. with salt.utils.files.fopen(self.numbers_file, "w"):
  4213. pass
  4214. ret = self.run_state(
  4215. "file.patch", name=self.numbers_file, source=self.numbers_patch,
  4216. )
  4217. self.assertSaltFalseReturn(ret)
  4218. ret = ret[next(iter(ret))]
  4219. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4220. # Test the reject_file option and ensure that the rejects are written
  4221. # to the path specified.
  4222. reject_file = salt.utils.files.mkstemp()
  4223. ret = self.run_state(
  4224. "file.patch",
  4225. name=self.numbers_file,
  4226. source=self.numbers_patch,
  4227. reject_file=reject_file,
  4228. strip=1,
  4229. )
  4230. self.assertSaltFalseReturn(ret)
  4231. ret = ret[next(iter(ret))]
  4232. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4233. self.assertRegex(
  4234. ret["comment"], "saving rejects to (file )?{0}".format(reject_file)
  4235. )
  4236. def test_patch_directory_failure(self):
  4237. """
  4238. Test file.patch using a patch applied to a directory, with changes
  4239. spanning multiple files.
  4240. """
  4241. # Empty the file to ensure that the patch doesn't apply
  4242. with salt.utils.files.fopen(self.math_file, "w"):
  4243. pass
  4244. ret = self.run_state(
  4245. "file.patch", name=self.base_dir, source=self.all_patch, strip=1,
  4246. )
  4247. self.assertSaltFalseReturn(ret)
  4248. ret = ret[next(iter(ret))]
  4249. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4250. # Test the reject_file option and ensure that the rejects are written
  4251. # to the path specified.
  4252. reject_file = salt.utils.files.mkstemp()
  4253. ret = self.run_state(
  4254. "file.patch",
  4255. name=self.base_dir,
  4256. source=self.all_patch,
  4257. reject_file=reject_file,
  4258. strip=1,
  4259. )
  4260. self.assertSaltFalseReturn(ret)
  4261. ret = ret[next(iter(ret))]
  4262. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4263. self.assertRegex(
  4264. ret["comment"], "saving rejects to (file )?{0}".format(reject_file)
  4265. )
  4266. def test_patch_single_file_remote_source(self):
  4267. """
  4268. Test file.patch using a patch applied to a single file, with the patch
  4269. coming from a remote source.
  4270. """
  4271. # Try without a source_hash and without skip_verify=True, this should
  4272. # fail with a message about the source_hash
  4273. ret = self.run_state(
  4274. "file.patch", name=self.math_file, source=self.math_patch_http,
  4275. )
  4276. self.assertSaltFalseReturn(ret)
  4277. ret = ret[next(iter(ret))]
  4278. self.assertIn("Unable to verify upstream hash", ret["comment"])
  4279. # Re-run the state with a source hash, it should now succeed
  4280. ret = self.run_state(
  4281. "file.patch",
  4282. name=self.math_file,
  4283. source=self.math_patch_http,
  4284. source_hash=self.math_patch_hash,
  4285. )
  4286. self.assertSaltTrueReturn(ret)
  4287. ret = ret[next(iter(ret))]
  4288. self.assertEqual(ret["comment"], "Patch successfully applied")
  4289. # Re-run again, this time with no hash and skip_verify=True to test
  4290. # skipping hash verification
  4291. ret = self.run_state(
  4292. "file.patch",
  4293. name=self.math_file,
  4294. source=self.math_patch_http,
  4295. skip_verify=True,
  4296. )
  4297. self.assertSaltTrueReturn(ret)
  4298. ret = ret[next(iter(ret))]
  4299. self.assertEqual(ret["comment"], "Patch was already applied")
  4300. self.assertEqual(ret["changes"], {})
  4301. def test_patch_directory_remote_source(self):
  4302. """
  4303. Test file.patch using a patch applied to a directory, with changes
  4304. spanning multiple files, and the patch file coming from a remote
  4305. source.
  4306. """
  4307. self._check_patch_version("2.6")
  4308. # Try without a source_hash and without skip_verify=True, this should
  4309. # fail with a message about the source_hash
  4310. ret = self.run_state(
  4311. "file.patch", name=self.base_dir, source=self.all_patch_http, strip=1,
  4312. )
  4313. self.assertSaltFalseReturn(ret)
  4314. ret = ret[next(iter(ret))]
  4315. self.assertIn("Unable to verify upstream hash", ret["comment"])
  4316. # Re-run the state with a source hash, it should now succeed
  4317. ret = self.run_state(
  4318. "file.patch",
  4319. name=self.base_dir,
  4320. source=self.all_patch_http,
  4321. source_hash=self.all_patch_hash,
  4322. strip=1,
  4323. )
  4324. self.assertSaltTrueReturn(ret)
  4325. ret = ret[next(iter(ret))]
  4326. self.assertEqual(ret["comment"], "Patch successfully applied")
  4327. # Re-run again, this time with no hash and skip_verify=True to test
  4328. # skipping hash verification
  4329. ret = self.run_state(
  4330. "file.patch",
  4331. name=self.base_dir,
  4332. source=self.all_patch_http,
  4333. strip=1,
  4334. skip_verify=True,
  4335. )
  4336. self.assertSaltTrueReturn(ret)
  4337. ret = ret[next(iter(ret))]
  4338. self.assertEqual(ret["comment"], "Patch was already applied")
  4339. self.assertEqual(ret["changes"], {})
  4340. def test_patch_single_file_template(self):
  4341. """
  4342. Test file.patch using a patch applied to a single file, with jinja
  4343. templating applied to the patch file.
  4344. """
  4345. ret = self.run_state(
  4346. "file.patch",
  4347. name=self.numbers_file,
  4348. source=self.numbers_patch_template,
  4349. template="jinja",
  4350. context=self.context,
  4351. )
  4352. self.assertSaltTrueReturn(ret)
  4353. ret = ret[next(iter(ret))]
  4354. self.assertEqual(ret["comment"], "Patch successfully applied")
  4355. # Re-run the state, should succeed and there should be a message about
  4356. # a partially-applied hunk.
  4357. ret = self.run_state(
  4358. "file.patch",
  4359. name=self.numbers_file,
  4360. source=self.numbers_patch_template,
  4361. template="jinja",
  4362. context=self.context,
  4363. )
  4364. self.assertSaltTrueReturn(ret)
  4365. ret = ret[next(iter(ret))]
  4366. self.assertEqual(ret["comment"], "Patch was already applied")
  4367. self.assertEqual(ret["changes"], {})
  4368. def test_patch_directory_template(self):
  4369. """
  4370. Test file.patch using a patch applied to a directory, with changes
  4371. spanning multiple files, and with jinja templating applied to the patch
  4372. file.
  4373. """
  4374. self._check_patch_version("2.6")
  4375. ret = self.run_state(
  4376. "file.patch",
  4377. name=self.base_dir,
  4378. source=self.all_patch_template,
  4379. template="jinja",
  4380. context=self.context,
  4381. strip=1,
  4382. )
  4383. self.assertSaltTrueReturn(ret)
  4384. ret = ret[next(iter(ret))]
  4385. self.assertEqual(ret["comment"], "Patch successfully applied")
  4386. # Re-run the state, should succeed and there should be a message about
  4387. # a partially-applied hunk.
  4388. ret = self.run_state(
  4389. "file.patch",
  4390. name=self.base_dir,
  4391. source=self.all_patch_template,
  4392. template="jinja",
  4393. context=self.context,
  4394. strip=1,
  4395. )
  4396. self.assertSaltTrueReturn(ret)
  4397. ret = ret[next(iter(ret))]
  4398. self.assertEqual(ret["comment"], "Patch was already applied")
  4399. self.assertEqual(ret["changes"], {})
  4400. def test_patch_single_file_remote_source_template(self):
  4401. """
  4402. Test file.patch using a patch applied to a single file, with the patch
  4403. coming from a remote source.
  4404. """
  4405. # Try without a source_hash and without skip_verify=True, this should
  4406. # fail with a message about the source_hash
  4407. ret = self.run_state(
  4408. "file.patch",
  4409. name=self.math_file,
  4410. source=self.math_patch_template_http,
  4411. template="jinja",
  4412. context=self.context,
  4413. )
  4414. self.assertSaltFalseReturn(ret)
  4415. ret = ret[next(iter(ret))]
  4416. self.assertIn("Unable to verify upstream hash", ret["comment"])
  4417. # Re-run the state with a source hash, it should now succeed
  4418. ret = self.run_state(
  4419. "file.patch",
  4420. name=self.math_file,
  4421. source=self.math_patch_template_http,
  4422. source_hash=self.math_patch_template_hash,
  4423. template="jinja",
  4424. context=self.context,
  4425. )
  4426. self.assertSaltTrueReturn(ret)
  4427. ret = ret[next(iter(ret))]
  4428. self.assertEqual(ret["comment"], "Patch successfully applied")
  4429. # Re-run again, this time with no hash and skip_verify=True to test
  4430. # skipping hash verification
  4431. ret = self.run_state(
  4432. "file.patch",
  4433. name=self.math_file,
  4434. source=self.math_patch_template_http,
  4435. template="jinja",
  4436. context=self.context,
  4437. skip_verify=True,
  4438. )
  4439. self.assertSaltTrueReturn(ret)
  4440. ret = ret[next(iter(ret))]
  4441. self.assertEqual(ret["comment"], "Patch was already applied")
  4442. self.assertEqual(ret["changes"], {})
  4443. def test_patch_directory_remote_source_template(self):
  4444. """
  4445. Test file.patch using a patch applied to a directory, with changes
  4446. spanning multiple files, and the patch file coming from a remote
  4447. source.
  4448. """
  4449. self._check_patch_version("2.6")
  4450. # Try without a source_hash and without skip_verify=True, this should
  4451. # fail with a message about the source_hash
  4452. ret = self.run_state(
  4453. "file.patch",
  4454. name=self.base_dir,
  4455. source=self.all_patch_template_http,
  4456. template="jinja",
  4457. context=self.context,
  4458. strip=1,
  4459. )
  4460. self.assertSaltFalseReturn(ret)
  4461. ret = ret[next(iter(ret))]
  4462. self.assertIn("Unable to verify upstream hash", ret["comment"])
  4463. # Re-run the state with a source hash, it should now succeed
  4464. ret = self.run_state(
  4465. "file.patch",
  4466. name=self.base_dir,
  4467. source=self.all_patch_template_http,
  4468. source_hash=self.all_patch_template_hash,
  4469. template="jinja",
  4470. context=self.context,
  4471. strip=1,
  4472. )
  4473. self.assertSaltTrueReturn(ret)
  4474. ret = ret[next(iter(ret))]
  4475. self.assertEqual(ret["comment"], "Patch successfully applied")
  4476. # Re-run again, this time with no hash and skip_verify=True to test
  4477. # skipping hash verification
  4478. ret = self.run_state(
  4479. "file.patch",
  4480. name=self.base_dir,
  4481. source=self.all_patch_template_http,
  4482. template="jinja",
  4483. context=self.context,
  4484. strip=1,
  4485. skip_verify=True,
  4486. )
  4487. self.assertSaltTrueReturn(ret)
  4488. ret = ret[next(iter(ret))]
  4489. self.assertEqual(ret["comment"], "Patch was already applied")
  4490. self.assertEqual(ret["changes"], {})
  4491. def test_patch_test_mode(self):
  4492. """
  4493. Test file.patch using test=True
  4494. """
  4495. # Try without a source_hash and without skip_verify=True, this should
  4496. # fail with a message about the source_hash
  4497. ret = self.run_state(
  4498. "file.patch", name=self.numbers_file, source=self.numbers_patch, test=True,
  4499. )
  4500. self.assertSaltNoneReturn(ret)
  4501. ret = ret[next(iter(ret))]
  4502. self.assertEqual(ret["comment"], "The patch would be applied")
  4503. self.assertTrue(ret["changes"])
  4504. # Apply the patch for real. We'll then be able to test below that we
  4505. # exit with a True rather than a None result if test=True is used on an
  4506. # already-applied patch.
  4507. ret = self.run_state(
  4508. "file.patch", name=self.numbers_file, source=self.numbers_patch,
  4509. )
  4510. self.assertSaltTrueReturn(ret)
  4511. ret = ret[next(iter(ret))]
  4512. self.assertEqual(ret["comment"], "Patch successfully applied")
  4513. self.assertTrue(ret["changes"])
  4514. # Run again with test=True. Since the pre-check happens before we do
  4515. # the __opts__['test'] check, we should exit with a True result just
  4516. # the same as if we try to run this state on an already-patched file
  4517. # *without* test=True.
  4518. ret = self.run_state(
  4519. "file.patch", name=self.numbers_file, source=self.numbers_patch, test=True,
  4520. )
  4521. self.assertSaltTrueReturn(ret)
  4522. ret = ret[next(iter(ret))]
  4523. self.assertEqual(ret["comment"], "Patch was already applied")
  4524. self.assertEqual(ret["changes"], {})
  4525. # Empty the file to ensure that the patch doesn't apply cleanly
  4526. with salt.utils.files.fopen(self.numbers_file, "w"):
  4527. pass
  4528. # Run again with test=True. Similar to the above run, we are testing
  4529. # that we return before we reach the __opts__['test'] check. In this
  4530. # case we should return a False result because we should already know
  4531. # by this point that the patch will not apply cleanly.
  4532. ret = self.run_state(
  4533. "file.patch", name=self.numbers_file, source=self.numbers_patch, test=True,
  4534. )
  4535. self.assertSaltFalseReturn(ret)
  4536. ret = ret[next(iter(ret))]
  4537. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4538. self.assertEqual(ret["changes"], {})
  4539. WIN_TEST_FILE = "c:/testfile"
  4540. @destructiveTest
  4541. @skipIf(not IS_WINDOWS, "windows test only")
  4542. @pytest.mark.windows_whitelisted
  4543. class WinFileTest(ModuleCase):
  4544. """
  4545. Test for the file state on Windows
  4546. """
  4547. def setUp(self):
  4548. self.run_state(
  4549. "file.managed", name=WIN_TEST_FILE, makedirs=True, contents="Only a test"
  4550. )
  4551. def tearDown(self):
  4552. self.run_state("file.absent", name=WIN_TEST_FILE)
  4553. def test_file_managed(self):
  4554. """
  4555. Test file.managed on Windows
  4556. """
  4557. self.assertTrue(self.run_state("file.exists", name=WIN_TEST_FILE))
  4558. def test_file_copy(self):
  4559. """
  4560. Test file.copy on Windows
  4561. """
  4562. ret = self.run_state(
  4563. "file.copy", name="c:/testfile_copy", makedirs=True, source=WIN_TEST_FILE
  4564. )
  4565. self.assertTrue(ret)
  4566. def test_file_comment(self):
  4567. """
  4568. Test file.comment on Windows
  4569. """
  4570. self.run_state("file.comment", name=WIN_TEST_FILE, regex="^Only")
  4571. with salt.utils.files.fopen(WIN_TEST_FILE, "r") as fp_:
  4572. self.assertTrue(fp_.read().startswith("#Only"))
  4573. def test_file_replace(self):
  4574. """
  4575. Test file.replace on Windows
  4576. """
  4577. self.run_state(
  4578. "file.replace", name=WIN_TEST_FILE, pattern="test", repl="testing"
  4579. )
  4580. with salt.utils.files.fopen(WIN_TEST_FILE, "r") as fp_:
  4581. self.assertIn("testing", fp_.read())
  4582. def test_file_absent(self):
  4583. """
  4584. Test file.absent on Windows
  4585. """
  4586. ret = self.run_state("file.absent", name=WIN_TEST_FILE)
  4587. self.assertTrue(ret)