1
0

test_file.py 187 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. Tests for the file state
  4. """
  5. from __future__ import absolute_import, print_function, unicode_literals
  6. import errno
  7. import filecmp
  8. import logging
  9. import os
  10. import re
  11. import shutil
  12. import stat
  13. import sys
  14. import tempfile
  15. import textwrap
  16. import pytest
  17. import salt.serializers.configparser
  18. import salt.serializers.plist
  19. import salt.utils.data
  20. import salt.utils.files
  21. import salt.utils.json
  22. import salt.utils.path
  23. import salt.utils.platform
  24. import salt.utils.stringutils
  25. from salt.ext import six
  26. from salt.ext.six.moves import range
  27. from salt.utils.versions import LooseVersion as _LooseVersion
  28. from tests.support.case import ModuleCase
  29. from tests.support.helpers import (
  30. Webserver,
  31. dedent,
  32. destructiveTest,
  33. skip_if_not_root,
  34. with_system_user_and_group,
  35. with_tempdir,
  36. with_tempfile,
  37. )
  38. from tests.support.mixins import SaltReturnAssertsMixin
  39. from tests.support.runtests import RUNTIME_VARS
  40. from tests.support.unit import skipIf
  41. log = logging.getLogger(__name__)
  42. HAS_PWD = True
  43. try:
  44. import pwd
  45. except ImportError:
  46. HAS_PWD = False
  47. HAS_GRP = True
  48. try:
  49. import grp
  50. except ImportError:
  51. HAS_GRP = False
  52. IS_WINDOWS = salt.utils.platform.is_windows()
  53. BINARY_FILE = b"GIF89a\x01\x00\x01\x00\x80\x00\x00\x05\x04\x04\x00\x00\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x02D\x01\x00;"
  54. TEST_SYSTEM_USER = "test_system_user"
  55. TEST_SYSTEM_GROUP = "test_system_group"
  56. def _test_managed_file_mode_keep_helper(testcase, local=False):
  57. """
  58. DRY helper function to run the same test with a local or remote path
  59. """
  60. name = os.path.join(RUNTIME_VARS.TMP, "scene33")
  61. grail_fs_path = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
  62. grail = "salt://grail/scene33" if not local else grail_fs_path
  63. # Get the current mode so that we can put the file back the way we
  64. # found it when we're done.
  65. grail_fs_mode = int(testcase.run_function("file.get_mode", [grail_fs_path]), 8)
  66. initial_mode = 0o770
  67. new_mode_1 = 0o600
  68. new_mode_2 = 0o644
  69. # Set the initial mode, so we can be assured that when we set the mode
  70. # to "keep", we're actually changing the permissions of the file to the
  71. # new mode.
  72. ret = testcase.run_state(
  73. "file.managed", name=name, mode=oct(initial_mode), source=grail,
  74. )
  75. if IS_WINDOWS:
  76. testcase.assertSaltFalseReturn(ret)
  77. return
  78. testcase.assertSaltTrueReturn(ret)
  79. try:
  80. # Update the mode on the fileserver (pass 1)
  81. os.chmod(grail_fs_path, new_mode_1)
  82. ret = testcase.run_state("file.managed", name=name, mode="keep", source=grail,)
  83. testcase.assertSaltTrueReturn(ret)
  84. managed_mode = stat.S_IMODE(os.stat(name).st_mode)
  85. testcase.assertEqual(oct(managed_mode), oct(new_mode_1))
  86. # Update the mode on the fileserver (pass 2)
  87. # This assures us that if the file in file_roots was originally set
  88. # to the same mode as new_mode_1, we definitely get an updated mode
  89. # this time.
  90. os.chmod(grail_fs_path, new_mode_2)
  91. ret = testcase.run_state("file.managed", name=name, mode="keep", source=grail,)
  92. testcase.assertSaltTrueReturn(ret)
  93. managed_mode = stat.S_IMODE(os.stat(name).st_mode)
  94. testcase.assertEqual(oct(managed_mode), oct(new_mode_2))
  95. finally:
  96. # Set the mode of the file in the file_roots back to what it
  97. # originally was.
  98. os.chmod(grail_fs_path, grail_fs_mode)
  99. @pytest.mark.windows_whitelisted
  100. class FileTest(ModuleCase, SaltReturnAssertsMixin):
  101. """
  102. Validate the file state
  103. """
  104. def _delete_file(self, path):
  105. try:
  106. os.remove(path)
  107. except OSError as exc:
  108. if exc.errno != errno.ENOENT:
  109. log.error("Failed to remove %s: %s", path, exc)
  110. def tearDown(self):
  111. """
  112. remove files created in previous tests
  113. """
  114. user = "salt"
  115. if user in str(self.run_function("user.list_users")):
  116. self.run_function("user.delete", [user])
  117. def test_symlink(self):
  118. """
  119. file.symlink
  120. """
  121. name = os.path.join(RUNTIME_VARS.TMP, "symlink")
  122. tgt = os.path.join(RUNTIME_VARS.TMP, "target")
  123. # Windows must have a source directory to link to
  124. if IS_WINDOWS and not os.path.isdir(tgt):
  125. os.mkdir(tgt)
  126. # Windows cannot create a symlink if it already exists
  127. if IS_WINDOWS and self.run_function("file.is_link", [name]):
  128. self.run_function("file.remove", [name])
  129. ret = self.run_state("file.symlink", name=name, target=tgt)
  130. self.assertSaltTrueReturn(ret)
  131. def test_test_symlink(self):
  132. """
  133. file.symlink test interface
  134. """
  135. name = os.path.join(RUNTIME_VARS.TMP, "symlink2")
  136. tgt = os.path.join(RUNTIME_VARS.TMP, "target")
  137. ret = self.run_state("file.symlink", test=True, name=name, target=tgt)
  138. self.assertSaltNoneReturn(ret)
  139. def test_absent_file(self):
  140. """
  141. file.absent
  142. """
  143. name = os.path.join(RUNTIME_VARS.TMP, "file_to_kill")
  144. with salt.utils.files.fopen(name, "w+") as fp_:
  145. fp_.write("killme")
  146. ret = self.run_state("file.absent", name=name)
  147. self.assertSaltTrueReturn(ret)
  148. self.assertFalse(os.path.isfile(name))
  149. def test_absent_dir(self):
  150. """
  151. file.absent
  152. """
  153. name = os.path.join(RUNTIME_VARS.TMP, "dir_to_kill")
  154. if not os.path.isdir(name):
  155. # left behind... Don't fail because of this!
  156. os.makedirs(name)
  157. ret = self.run_state("file.absent", name=name)
  158. self.assertSaltTrueReturn(ret)
  159. self.assertFalse(os.path.isdir(name))
  160. def test_absent_link(self):
  161. """
  162. file.absent
  163. """
  164. name = os.path.join(RUNTIME_VARS.TMP, "link_to_kill")
  165. tgt = "{0}.tgt".format(name)
  166. # Windows must have a source directory to link to
  167. if IS_WINDOWS and not os.path.isdir(tgt):
  168. os.mkdir(tgt)
  169. if not self.run_function("file.is_link", [name]):
  170. self.run_function("file.symlink", [tgt, name])
  171. ret = self.run_state("file.absent", name=name)
  172. try:
  173. self.assertSaltTrueReturn(ret)
  174. self.assertFalse(self.run_function("file.is_link", [name]))
  175. finally:
  176. if self.run_function("file.is_link", [name]):
  177. self.run_function("file.remove", [name])
  178. @with_tempfile()
  179. def test_test_absent(self, name):
  180. """
  181. file.absent test interface
  182. """
  183. with salt.utils.files.fopen(name, "w+") as fp_:
  184. fp_.write("killme")
  185. ret = self.run_state("file.absent", test=True, name=name)
  186. self.assertSaltNoneReturn(ret)
  187. self.assertTrue(os.path.isfile(name))
  188. def test_managed(self):
  189. """
  190. file.managed
  191. """
  192. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  193. ret = self.run_state("file.managed", name=name, source="salt://grail/scene33")
  194. src = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
  195. with salt.utils.files.fopen(src, "r") as fp_:
  196. master_data = fp_.read()
  197. with salt.utils.files.fopen(name, "r") as fp_:
  198. minion_data = fp_.read()
  199. self.assertEqual(master_data, minion_data)
  200. self.assertSaltTrueReturn(ret)
  201. def test_managed_file_mode(self):
  202. """
  203. file.managed, correct file permissions
  204. """
  205. desired_mode = 504 # 0770 octal
  206. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  207. ret = self.run_state(
  208. "file.managed", name=name, mode="0770", source="salt://grail/scene33"
  209. )
  210. if IS_WINDOWS:
  211. expected = "The 'mode' option is not supported on Windows"
  212. self.assertEqual(ret[list(ret)[0]]["comment"], expected)
  213. self.assertSaltFalseReturn(ret)
  214. return
  215. resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
  216. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  217. self.assertSaltTrueReturn(ret)
  218. @skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
  219. def test_managed_file_mode_keep(self):
  220. """
  221. Test using "mode: keep" in a file.managed state
  222. """
  223. _test_managed_file_mode_keep_helper(self, local=False)
  224. @skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
  225. def test_managed_file_mode_keep_local_source(self):
  226. """
  227. Test using "mode: keep" in a file.managed state, with a local file path
  228. as the source.
  229. """
  230. _test_managed_file_mode_keep_helper(self, local=True)
  231. def test_managed_file_mode_file_exists_replace(self):
  232. """
  233. file.managed, existing file with replace=True, change permissions
  234. """
  235. initial_mode = 504 # 0770 octal
  236. desired_mode = 384 # 0600 octal
  237. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  238. ret = self.run_state(
  239. "file.managed",
  240. name=name,
  241. mode=oct(initial_mode),
  242. source="salt://grail/scene33",
  243. )
  244. if IS_WINDOWS:
  245. expected = "The 'mode' option is not supported on Windows"
  246. self.assertEqual(ret[list(ret)[0]]["comment"], expected)
  247. self.assertSaltFalseReturn(ret)
  248. return
  249. resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
  250. self.assertEqual(oct(initial_mode), oct(resulting_mode))
  251. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  252. ret = self.run_state(
  253. "file.managed",
  254. name=name,
  255. replace=True,
  256. mode=oct(desired_mode),
  257. source="salt://grail/scene33",
  258. )
  259. resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
  260. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  261. self.assertSaltTrueReturn(ret)
  262. def test_managed_file_mode_file_exists_noreplace(self):
  263. """
  264. file.managed, existing file with replace=False, change permissions
  265. """
  266. initial_mode = 504 # 0770 octal
  267. desired_mode = 384 # 0600 octal
  268. name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
  269. ret = self.run_state(
  270. "file.managed",
  271. name=name,
  272. replace=True,
  273. mode=oct(initial_mode),
  274. source="salt://grail/scene33",
  275. )
  276. if IS_WINDOWS:
  277. expected = "The 'mode' option is not supported on Windows"
  278. self.assertEqual(ret[list(ret)[0]]["comment"], expected)
  279. self.assertSaltFalseReturn(ret)
  280. return
  281. ret = self.run_state(
  282. "file.managed",
  283. name=name,
  284. replace=False,
  285. mode=oct(desired_mode),
  286. source="salt://grail/scene33",
  287. )
  288. resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
  289. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  290. self.assertSaltTrueReturn(ret)
  291. def test_managed_file_with_grains_data(self):
  292. """
  293. Test to ensure we can render grains data into a managed
  294. file.
  295. """
  296. grain_path = os.path.join(RUNTIME_VARS.TMP, "file-grain-test")
  297. state_file = "file-grainget"
  298. self.run_function("state.sls", [state_file], pillar={"grain_path": grain_path})
  299. self.assertTrue(os.path.exists(grain_path))
  300. with salt.utils.files.fopen(grain_path, "r") as fp_:
  301. file_contents = fp_.readlines()
  302. if IS_WINDOWS:
  303. match = "^minion\r\n"
  304. else:
  305. match = "^minion\n"
  306. self.assertTrue(re.match(match, file_contents[0]))
  307. def test_managed_file_with_pillar_sls(self):
  308. """
  309. Test to ensure pillar data in sls file
  310. is rendered properly and file is created.
  311. """
  312. file_pillar = os.path.join(RUNTIME_VARS.TMP, "filepillar-python")
  313. self.addCleanup(self._delete_file, file_pillar)
  314. state_name = "file-pillarget"
  315. log.warning("File Path: %s", file_pillar)
  316. ret = self.run_function("state.sls", [state_name])
  317. self.assertSaltTrueReturn(ret)
  318. # Check to make sure the file was created
  319. check_file = self.run_function("file.file_exists", [file_pillar])
  320. self.assertTrue(check_file)
  321. def test_managed_file_with_pillardefault_sls(self):
  322. """
  323. Test to ensure when pillar data is not available
  324. in sls file with pillar.get it uses the default
  325. value.
  326. """
  327. file_pillar_def = os.path.join(RUNTIME_VARS.TMP, "filepillar-defaultvalue")
  328. self.addCleanup(self._delete_file, file_pillar_def)
  329. state_name = "file-pillardefaultget"
  330. log.warning("File Path: %s", file_pillar_def)
  331. ret = self.run_function("state.sls", [state_name])
  332. self.assertSaltTrueReturn(ret)
  333. # Check to make sure the file was created
  334. check_file = self.run_function("file.file_exists", [file_pillar_def])
  335. self.assertTrue(check_file)
  336. @skip_if_not_root
  337. def test_managed_dir_mode(self):
  338. """
  339. Tests to ensure that file.managed creates directories with the
  340. permissions requested with the dir_mode argument
  341. """
  342. desired_mode = 511 # 0777 in octal
  343. name = os.path.join(RUNTIME_VARS.TMP, "a", "managed_dir_mode_test_file")
  344. desired_owner = "nobody"
  345. ret = self.run_state(
  346. "file.managed",
  347. name=name,
  348. source="salt://grail/scene33",
  349. mode=600,
  350. makedirs=True,
  351. user=desired_owner,
  352. dir_mode=oct(desired_mode), # 0777
  353. )
  354. if IS_WINDOWS:
  355. expected = "The 'mode' option is not supported on Windows"
  356. self.assertEqual(ret[list(ret)[0]]["comment"], expected)
  357. self.assertSaltFalseReturn(ret)
  358. return
  359. resulting_mode = stat.S_IMODE(
  360. os.stat(os.path.join(RUNTIME_VARS.TMP, "a")).st_mode
  361. )
  362. resulting_owner = pwd.getpwuid(
  363. os.stat(os.path.join(RUNTIME_VARS.TMP, "a")).st_uid
  364. ).pw_name
  365. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  366. self.assertSaltTrueReturn(ret)
  367. self.assertEqual(desired_owner, resulting_owner)
  368. def test_test_managed(self):
  369. """
  370. file.managed test interface
  371. """
  372. name = os.path.join(RUNTIME_VARS.TMP, "grail_not_not_scene33")
  373. ret = self.run_state(
  374. "file.managed", test=True, name=name, source="salt://grail/scene33"
  375. )
  376. self.assertSaltNoneReturn(ret)
  377. self.assertFalse(os.path.isfile(name))
  378. def test_managed_show_changes_false(self):
  379. """
  380. file.managed test interface
  381. """
  382. name = os.path.join(RUNTIME_VARS.TMP, "grail_not_scene33")
  383. with salt.utils.files.fopen(name, "wb") as fp_:
  384. fp_.write(b"test_managed_show_changes_false\n")
  385. ret = self.run_state(
  386. "file.managed", name=name, source="salt://grail/scene33", show_changes=False
  387. )
  388. changes = next(six.itervalues(ret))["changes"]
  389. self.assertEqual("<show_changes=False>", changes["diff"])
  390. def test_managed_show_changes_true(self):
  391. """
  392. file.managed test interface
  393. """
  394. name = os.path.join(RUNTIME_VARS.TMP, "grail_not_scene33")
  395. with salt.utils.files.fopen(name, "wb") as fp_:
  396. fp_.write(b"test_managed_show_changes_false\n")
  397. ret = self.run_state("file.managed", name=name, source="salt://grail/scene33",)
  398. changes = next(six.itervalues(ret))["changes"]
  399. self.assertIn("diff", changes)
  400. @skipIf(IS_WINDOWS, "Don't know how to fix for Windows")
  401. def test_managed_escaped_file_path(self):
  402. """
  403. file.managed test that 'salt://|' protects unusual characters in file path
  404. """
  405. funny_file = salt.utils.files.mkstemp(
  406. prefix="?f!le? n@=3&", suffix=".file type"
  407. )
  408. funny_file_name = os.path.split(funny_file)[1]
  409. funny_url = "salt://|" + funny_file_name
  410. funny_url_path = os.path.join(RUNTIME_VARS.BASE_FILES, funny_file_name)
  411. state_name = "funny_file"
  412. state_file_name = state_name + ".sls"
  413. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_file_name)
  414. state_key = "file_|-{0}_|-{0}_|-managed".format(funny_file)
  415. self.addCleanup(os.remove, state_file)
  416. self.addCleanup(os.remove, funny_file)
  417. self.addCleanup(os.remove, funny_url_path)
  418. with salt.utils.files.fopen(funny_url_path, "w"):
  419. pass
  420. with salt.utils.files.fopen(state_file, "w") as fp_:
  421. fp_.write(
  422. textwrap.dedent(
  423. """\
  424. {0}:
  425. file.managed:
  426. - source: {1}
  427. - makedirs: True
  428. """.format(
  429. funny_file, funny_url
  430. )
  431. )
  432. )
  433. ret = self.run_function("state.sls", [state_name])
  434. self.assertTrue(ret[state_key]["result"])
  435. def test_managed_contents(self):
  436. """
  437. test file.managed with contents that is a boolean, string, integer,
  438. float, list, and dictionary
  439. """
  440. state_name = "file-FileTest-test_managed_contents"
  441. state_filename = state_name + ".sls"
  442. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  443. managed_files = {}
  444. state_keys = {}
  445. for typ in ("bool", "str", "int", "float", "list", "dict"):
  446. managed_files[typ] = salt.utils.files.mkstemp()
  447. state_keys[typ] = "file_|-{0} file_|-{1}_|-managed".format(
  448. typ, managed_files[typ]
  449. )
  450. try:
  451. with salt.utils.files.fopen(state_file, "w") as fd_:
  452. fd_.write(
  453. textwrap.dedent(
  454. """\
  455. bool file:
  456. file.managed:
  457. - name: {bool}
  458. - contents: True
  459. str file:
  460. file.managed:
  461. - name: {str}
  462. - contents: Salt was here.
  463. int file:
  464. file.managed:
  465. - name: {int}
  466. - contents: 340282366920938463463374607431768211456
  467. float file:
  468. file.managed:
  469. - name: {float}
  470. - contents: 1.7518e-45 # gravitational coupling constant
  471. list file:
  472. file.managed:
  473. - name: {list}
  474. - contents: [1, 1, 2, 3, 5, 8, 13]
  475. dict file:
  476. file.managed:
  477. - name: {dict}
  478. - contents:
  479. C: charge
  480. P: parity
  481. T: time
  482. """.format(
  483. **managed_files
  484. )
  485. )
  486. )
  487. ret = self.run_function("state.sls", [state_name])
  488. self.assertSaltTrueReturn(ret)
  489. for typ in state_keys:
  490. self.assertTrue(ret[state_keys[typ]]["result"])
  491. self.assertIn("diff", ret[state_keys[typ]]["changes"])
  492. finally:
  493. if os.path.exists(state_file):
  494. os.remove(state_file)
  495. for typ in managed_files:
  496. if os.path.exists(managed_files[typ]):
  497. os.remove(managed_files[typ])
  498. def test_onchanges_any_recursive_error_issues_50811(self):
  499. """
  500. test that onchanges_any does not causes a recursive error
  501. """
  502. state_name = "onchanges_any_recursive_error"
  503. state_filename = state_name + ".sls"
  504. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  505. try:
  506. with salt.utils.files.fopen(state_file, "w") as fd_:
  507. fd_.write(
  508. textwrap.dedent(
  509. """\
  510. command-test:
  511. cmd.run:
  512. - name: ls
  513. - onchanges_any:
  514. - file: /tmp/an-unfollowed-file
  515. """
  516. )
  517. )
  518. ret = self.run_function("state.sls", [state_name])
  519. self.assertSaltFalseReturn(ret)
  520. finally:
  521. if os.path.exists(state_file):
  522. os.remove(state_file)
  523. def test_prerequired_issues_55775(self):
  524. """
  525. Test that __prereqired__ is filter from file.replace
  526. if __prereqired__ is not filter from file.replace an error will be raised
  527. """
  528. state_name = "Test_Issues_55775"
  529. state_filename = state_name + ".sls"
  530. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  531. test_file = os.path.join(RUNTIME_VARS.BASE_FILES, "Issues_55775.txt")
  532. try:
  533. with salt.utils.files.fopen(state_file, "w") as fd_:
  534. fd_.write(
  535. textwrap.dedent(
  536. """\
  537. /tmp/bug.txt:
  538. file.managed:
  539. - name: {0}
  540. - contents:
  541. - foo
  542. file.replace:
  543. file.replace:
  544. - name: {0}
  545. - pattern: 'foo'
  546. - repl: 'bar'
  547. - prereq:
  548. - test no changes
  549. - test changes
  550. test no changes:
  551. test.succeed_without_changes:
  552. - name: no changes
  553. test changes:
  554. test.succeed_with_changes:
  555. - name: changes
  556. - require:
  557. - test: test no changes
  558. """.format(
  559. test_file
  560. )
  561. )
  562. )
  563. ret = self.run_function("state.sls", [state_name])
  564. self.assertSaltTrueReturn(ret)
  565. finally:
  566. for file in (state_file, test_file):
  567. if os.path.exists(file):
  568. os.remove(file)
  569. def test_managed_contents_with_contents_newline(self):
  570. """
  571. test file.managed with contents by using the default content_newline
  572. flag.
  573. """
  574. contents = "test_managed_contents_with_newline_one"
  575. name = os.path.join(RUNTIME_VARS.TMP, "foo")
  576. # Create a file named foo with contents as above but with a \n at EOF
  577. self.run_state(
  578. "file.managed", name=name, contents=contents, contents_newline=True
  579. )
  580. with salt.utils.files.fopen(name, "r") as fp_:
  581. last_line = fp_.read()
  582. self.assertEqual((contents + os.linesep), last_line)
  583. def test_managed_contents_with_contents_newline_false(self):
  584. """
  585. test file.managed with contents by using the non default content_newline
  586. flag.
  587. """
  588. contents = "test_managed_contents_with_newline_one"
  589. name = os.path.join(RUNTIME_VARS.TMP, "bar")
  590. # Create a file named foo with contents as above but with a \n at EOF
  591. self.run_state(
  592. "file.managed", name=name, contents=contents, contents_newline=False
  593. )
  594. with salt.utils.files.fopen(name, "r") as fp_:
  595. last_line = fp_.read()
  596. self.assertEqual(contents, last_line)
  597. def test_managed_multiline_contents_with_contents_newline(self):
  598. """
  599. test file.managed with contents by using the non default content_newline
  600. flag.
  601. """
  602. contents = "this is a cookie{}this is another cookie".format(os.linesep)
  603. name = os.path.join(RUNTIME_VARS.TMP, "bar")
  604. # Create a file named foo with contents as above but with a \n at EOF
  605. self.run_state(
  606. "file.managed", name=name, contents=contents, contents_newline=True
  607. )
  608. with salt.utils.files.fopen(name, "r") as fp_:
  609. last_line = fp_.read()
  610. self.assertEqual((contents + os.linesep), last_line)
  611. def test_managed_multiline_contents_with_contents_newline_false(self):
  612. """
  613. test file.managed with contents by using the non default content_newline
  614. flag.
  615. """
  616. contents = "this is a cookie{}this is another cookie".format(os.linesep)
  617. name = os.path.join(RUNTIME_VARS.TMP, "bar")
  618. # Create a file named foo with contents as above but with a \n at EOF
  619. self.run_state(
  620. "file.managed", name=name, contents=contents, contents_newline=False
  621. )
  622. with salt.utils.files.fopen(name, "r") as fp_:
  623. last_line = fp_.read()
  624. self.assertEqual(contents, last_line)
  625. @skip_if_not_root
  626. @skipIf(IS_WINDOWS, 'Windows does not support "mode" kwarg. Skipping.')
  627. @skipIf(not salt.utils.path.which("visudo"), "sudo is missing")
  628. def test_managed_check_cmd(self):
  629. """
  630. Test file.managed passing a basic check_cmd kwarg. See Issue #38111.
  631. """
  632. r_group = "root"
  633. if salt.utils.platform.is_darwin():
  634. r_group = "wheel"
  635. try:
  636. ret = self.run_state(
  637. "file.managed",
  638. name="/tmp/sudoers",
  639. user="root",
  640. group=r_group,
  641. mode=440,
  642. check_cmd="visudo -c -s -f",
  643. )
  644. self.assertSaltTrueReturn(ret)
  645. self.assertInSaltComment("Empty file", ret)
  646. self.assertEqual(
  647. ret["file_|-/tmp/sudoers_|-/tmp/sudoers_|-managed"]["changes"],
  648. {"new": "file /tmp/sudoers created", "mode": "0440"},
  649. )
  650. finally:
  651. # Clean Up File
  652. if os.path.exists("/tmp/sudoers"):
  653. os.remove("/tmp/sudoers")
  654. def test_managed_local_source_with_source_hash(self):
  655. """
  656. Make sure that we enforce the source_hash even with local files
  657. """
  658. name = os.path.join(RUNTIME_VARS.TMP, "local_source_with_source_hash")
  659. local_path = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
  660. actual_hash = "567fd840bf1548edc35c48eb66cdd78bfdfcccff"
  661. if IS_WINDOWS:
  662. # CRLF vs LF causes a different hash on windows
  663. actual_hash = "f658a0ec121d9c17088795afcc6ff3c43cb9842a"
  664. # Reverse the actual hash
  665. bad_hash = actual_hash[::-1]
  666. def remove_file():
  667. try:
  668. os.remove(name)
  669. except OSError as exc:
  670. if exc.errno != errno.ENOENT:
  671. raise
  672. def do_test(clean=False):
  673. for proto in ("file://", ""):
  674. source = proto + local_path
  675. log.debug("Trying source %s", source)
  676. try:
  677. ret = self.run_state(
  678. "file.managed",
  679. name=name,
  680. source=source,
  681. source_hash="sha1={0}".format(bad_hash),
  682. )
  683. self.assertSaltFalseReturn(ret)
  684. ret = ret[next(iter(ret))]
  685. # Shouldn't be any changes
  686. self.assertFalse(ret["changes"])
  687. # Check that we identified a hash mismatch
  688. self.assertIn("does not match actual checksum", ret["comment"])
  689. ret = self.run_state(
  690. "file.managed",
  691. name=name,
  692. source=source,
  693. source_hash="sha1={0}".format(actual_hash),
  694. )
  695. self.assertSaltTrueReturn(ret)
  696. finally:
  697. if clean:
  698. remove_file()
  699. remove_file()
  700. log.debug("Trying with nonexistant destination file")
  701. do_test()
  702. log.debug("Trying with destination file already present")
  703. with salt.utils.files.fopen(name, "w"):
  704. pass
  705. try:
  706. do_test(clean=False)
  707. finally:
  708. remove_file()
  709. def test_managed_local_source_does_not_exist(self):
  710. """
  711. Make sure that we exit gracefully when a local source doesn't exist
  712. """
  713. name = os.path.join(RUNTIME_VARS.TMP, "local_source_does_not_exist")
  714. local_path = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene99")
  715. for proto in ("file://", ""):
  716. source = proto + local_path
  717. log.debug("Trying source %s", source)
  718. ret = self.run_state("file.managed", name=name, source=source)
  719. self.assertSaltFalseReturn(ret)
  720. ret = ret[next(iter(ret))]
  721. # Shouldn't be any changes
  722. self.assertFalse(ret["changes"])
  723. # Check that we identified a hash mismatch
  724. self.assertIn("does not exist", ret["comment"])
  725. def test_managed_unicode_jinja_with_tojson_filter(self):
  726. """
  727. Using {{ varname }} with a list or dictionary which contains unicode
  728. types on Python 2 will result in Jinja rendering the "u" prefix on each
  729. string. This tests that using the "tojson" jinja filter will dump them
  730. to a format which can be successfully loaded by our YAML loader.
  731. The two lines that should end up being rendered are meant to test two
  732. issues that would trip up PyYAML if the "tojson" filter were not used:
  733. 1. A unicode string type would be loaded as a unicode literal with the
  734. leading "u" as well as the quotes, rather than simply being loaded
  735. as the proper unicode type which matches the content of the string
  736. literal. In other words, u'foo' would be loaded literally as
  737. u"u'foo'". This test includes actual non-ascii unicode in one of the
  738. strings to confirm that this also handles these international
  739. characters properly.
  740. 2. Any unicode string type (such as a URL) which contains a colon would
  741. cause a ScannerError in PyYAML, as it would be assumed to delimit a
  742. mapping node.
  743. Dumping the data structure to JSON using the "tojson" jinja filter
  744. should produce an inline data structure which is valid YAML and will be
  745. loaded properly by our YAML loader.
  746. """
  747. test_file = os.path.join(RUNTIME_VARS.TMP, "test-tojson.txt")
  748. ret = self.run_function(
  749. "state.apply", mods="tojson", pillar={"tojson-file": test_file}
  750. )
  751. ret = ret[next(iter(ret))]
  752. assert ret["result"], ret
  753. with salt.utils.files.fopen(test_file, mode="rb") as fp_:
  754. managed = salt.utils.stringutils.to_unicode(fp_.read())
  755. expected = dedent(
  756. """\
  757. Die Webseite ist https://saltstack.com.
  758. Der Zucker ist süß.
  759. """
  760. )
  761. assert managed == expected, "{0!r} != {1!r}".format(managed, expected)
  762. def test_managed_source_hash_indifferent_case(self):
  763. """
  764. Test passing a source_hash as an uppercase hash.
  765. This is a regression test for Issue #38914 and Issue #48230 (test=true use).
  766. """
  767. name = os.path.join(RUNTIME_VARS.TMP, "source_hash_indifferent_case")
  768. state_name = "file_|-{0}_|" "-{0}_|-managed".format(name)
  769. local_path = os.path.join(RUNTIME_VARS.BASE_FILES, "hello_world.txt")
  770. actual_hash = "c98c24b677eff44860afea6f493bbaec5bb1c4cbb209c6fc2bbb47f66ff2ad31"
  771. if IS_WINDOWS:
  772. # CRLF vs LF causes a differnt hash on windows
  773. actual_hash = (
  774. "92b772380a3f8e27a93e57e6deeca6c01da07f5aadce78bb2fbb20de10a66925"
  775. )
  776. uppercase_hash = actual_hash.upper()
  777. try:
  778. # Lay down tmp file to test against
  779. self.run_state(
  780. "file.managed", name=name, source=local_path, source_hash=actual_hash
  781. )
  782. # Test uppercase source_hash: should return True with no changes
  783. ret = self.run_state(
  784. "file.managed", name=name, source=local_path, source_hash=uppercase_hash
  785. )
  786. assert ret[state_name]["result"] is True
  787. assert ret[state_name]["changes"] == {}
  788. # Test uppercase source_hash using test=true
  789. # Should return True with no changes
  790. ret = self.run_state(
  791. "file.managed",
  792. name=name,
  793. source=local_path,
  794. source_hash=uppercase_hash,
  795. test=True,
  796. )
  797. assert ret[state_name]["result"] is True
  798. assert ret[state_name]["changes"] == {}
  799. finally:
  800. # Clean Up File
  801. if os.path.exists(name):
  802. os.remove(name)
  803. @with_tempfile(create=False)
  804. def test_managed_latin1_diff(self, name):
  805. """
  806. Tests that latin-1 file contents are represented properly in the diff
  807. """
  808. # Lay down the initial file
  809. ret = self.run_state(
  810. "file.managed", name=name, source="salt://issue-48777/old.html"
  811. )
  812. ret = ret[next(iter(ret))]
  813. assert ret["result"] is True, ret
  814. # Replace it with the new file and check the diff
  815. ret = self.run_state(
  816. "file.managed", name=name, source="salt://issue-48777/new.html"
  817. )
  818. ret = ret[next(iter(ret))]
  819. assert ret["result"] is True, ret
  820. diff_lines = ret["changes"]["diff"].split(os.linesep)
  821. assert "+räksmörgås" in diff_lines, diff_lines
  822. @with_tempfile()
  823. def test_managed_keep_source_false_salt(self, name):
  824. """
  825. This test ensures that we properly clean the cached file if keep_source
  826. is set to False, for source files using a salt:// URL
  827. """
  828. source = "salt://grail/scene33"
  829. saltenv = "base"
  830. # Run the state
  831. ret = self.run_state(
  832. "file.managed", name=name, source=source, saltenv=saltenv, keep_source=False
  833. )
  834. ret = ret[next(iter(ret))]
  835. assert ret["result"] is True
  836. # Now make sure that the file is not cached
  837. result = self.run_function("cp.is_cached", [source, saltenv])
  838. assert result == "", "File is still cached at {0}".format(result)
  839. @with_tempfile(create=False)
  840. @with_tempfile(create=False)
  841. def test_file_managed_onchanges(self, file1, file2):
  842. """
  843. Test file.managed state with onchanges
  844. """
  845. pillar = {
  846. "file1": file1,
  847. "file2": file2,
  848. "source": "salt://testfile",
  849. "req": "onchanges",
  850. }
  851. # Lay down the file used in the below SLS to ensure that when it is
  852. # run, there are no changes.
  853. self.run_state("file.managed", name=pillar["file2"], source=pillar["source"])
  854. ret = self.repack_state_returns(
  855. self.run_function(
  856. "state.apply", mods="onchanges_prereq", pillar=pillar, test=True,
  857. )
  858. )
  859. # The file states should both exit with None
  860. assert ret["one"]["result"] is None, ret["one"]["result"]
  861. assert ret["three"]["result"] is True, ret["three"]["result"]
  862. # The first file state should have changes, since a new file was
  863. # created. The other one should not, since we already created that file
  864. # before applying the SLS file.
  865. assert ret["one"]["changes"]
  866. assert not ret["three"]["changes"], ret["three"]["changes"]
  867. # The state watching 'one' should have been run due to changes
  868. assert ret["two"]["comment"] == "Success!", ret["two"]["comment"]
  869. # The state watching 'three' should not have been run
  870. assert (
  871. ret["four"]["comment"]
  872. == "State was not run because none of the onchanges reqs changed"
  873. ), ret["four"]["comment"]
  874. @with_tempfile(create=False)
  875. @with_tempfile(create=False)
  876. def test_file_managed_prereq(self, file1, file2):
  877. """
  878. Test file.managed state with prereq
  879. """
  880. pillar = {
  881. "file1": file1,
  882. "file2": file2,
  883. "source": "salt://testfile",
  884. "req": "prereq",
  885. }
  886. # Lay down the file used in the below SLS to ensure that when it is
  887. # run, there are no changes.
  888. self.run_state("file.managed", name=pillar["file2"], source=pillar["source"])
  889. ret = self.repack_state_returns(
  890. self.run_function(
  891. "state.apply", mods="onchanges_prereq", pillar=pillar, test=True,
  892. )
  893. )
  894. # The file states should both exit with None
  895. assert ret["one"]["result"] is None, ret["one"]["result"]
  896. assert ret["three"]["result"] is True, ret["three"]["result"]
  897. # The first file state should have changes, since a new file was
  898. # created. The other one should not, since we already created that file
  899. # before applying the SLS file.
  900. assert ret["one"]["changes"]
  901. assert not ret["three"]["changes"], ret["three"]["changes"]
  902. # The state watching 'one' should have been run due to changes
  903. assert ret["two"]["comment"] == "Success!", ret["two"]["comment"]
  904. # The state watching 'three' should not have been run
  905. assert ret["four"]["comment"] == "No changes detected", ret["four"]["comment"]
  906. def test_directory(self):
  907. """
  908. file.directory
  909. """
  910. name = os.path.join(RUNTIME_VARS.TMP, "a_new_dir")
  911. ret = self.run_state("file.directory", name=name)
  912. self.assertSaltTrueReturn(ret)
  913. self.assertTrue(os.path.isdir(name))
  914. def test_directory_symlink_dry_run(self):
  915. """
  916. Ensure that symlinks are followed when file.directory is run with
  917. test=True
  918. """
  919. try:
  920. tmp_dir = os.path.join(RUNTIME_VARS.TMP, "pgdata")
  921. sym_dir = os.path.join(RUNTIME_VARS.TMP, "pg_data")
  922. os.mkdir(tmp_dir, 0o700)
  923. self.run_function("file.symlink", [tmp_dir, sym_dir])
  924. if IS_WINDOWS:
  925. ret = self.run_state(
  926. "file.directory",
  927. test=True,
  928. name=sym_dir,
  929. follow_symlinks=True,
  930. win_owner="Administrators",
  931. )
  932. else:
  933. ret = self.run_state(
  934. "file.directory",
  935. test=True,
  936. name=sym_dir,
  937. follow_symlinks=True,
  938. mode=700,
  939. )
  940. self.assertSaltTrueReturn(ret)
  941. finally:
  942. if os.path.isdir(tmp_dir):
  943. self.run_function("file.remove", [tmp_dir])
  944. if os.path.islink(sym_dir):
  945. self.run_function("file.remove", [sym_dir])
  946. @skip_if_not_root
  947. @skipIf(IS_WINDOWS, "Mode not available in Windows")
  948. def test_directory_max_depth(self):
  949. """
  950. file.directory
  951. Test the max_depth option by iteratively increasing the depth and
  952. checking that no changes deeper than max_depth have been attempted
  953. """
  954. def _get_oct_mode(name):
  955. """
  956. Return a string octal representation of the permissions for name
  957. """
  958. return salt.utils.files.normalize_mode(oct(os.stat(name).st_mode & 0o777))
  959. top = os.path.join(RUNTIME_VARS.TMP, "top_dir")
  960. sub = os.path.join(top, "sub_dir")
  961. subsub = os.path.join(sub, "sub_sub_dir")
  962. dirs = [top, sub, subsub]
  963. initial_mode = "0111"
  964. changed_mode = "0555"
  965. initial_modes = {
  966. 0: {sub: "0755", subsub: "0111"},
  967. 1: {sub: "0111", subsub: "0111"},
  968. 2: {sub: "0111", subsub: "0111"},
  969. }
  970. if not os.path.isdir(subsub):
  971. os.makedirs(subsub, int(initial_mode, 8))
  972. try:
  973. for depth in range(0, 3):
  974. ret = self.run_state(
  975. "file.directory",
  976. name=top,
  977. max_depth=depth,
  978. dir_mode=changed_mode,
  979. recurse=["mode"],
  980. )
  981. self.assertSaltTrueReturn(ret)
  982. for changed_dir in dirs[0 : depth + 1]:
  983. self.assertEqual(changed_mode, _get_oct_mode(changed_dir))
  984. for untouched_dir in dirs[depth + 1 :]:
  985. # Beginning in Python 3.7, os.makedirs no longer sets
  986. # the mode of intermediate directories to the mode that
  987. # is passed.
  988. if sys.version_info >= (3, 7):
  989. _mode = initial_modes[depth][untouched_dir]
  990. self.assertEqual(_mode, _get_oct_mode(untouched_dir))
  991. else:
  992. self.assertEqual(initial_mode, _get_oct_mode(untouched_dir))
  993. finally:
  994. shutil.rmtree(top)
  995. def test_test_directory(self):
  996. """
  997. file.directory
  998. """
  999. name = os.path.join(RUNTIME_VARS.TMP, "a_not_dir")
  1000. ret = self.run_state("file.directory", test=True, name=name)
  1001. self.assertSaltNoneReturn(ret)
  1002. self.assertFalse(os.path.isdir(name))
  1003. @with_tempdir()
  1004. def test_directory_clean(self, base_dir):
  1005. """
  1006. file.directory with clean=True
  1007. """
  1008. name = os.path.join(base_dir, "directory_clean_dir")
  1009. os.mkdir(name)
  1010. strayfile = os.path.join(name, "strayfile")
  1011. with salt.utils.files.fopen(strayfile, "w"):
  1012. pass
  1013. straydir = os.path.join(name, "straydir")
  1014. if not os.path.isdir(straydir):
  1015. os.makedirs(straydir)
  1016. with salt.utils.files.fopen(os.path.join(straydir, "strayfile2"), "w"):
  1017. pass
  1018. ret = self.run_state("file.directory", name=name, clean=True)
  1019. self.assertSaltTrueReturn(ret)
  1020. self.assertFalse(os.path.exists(strayfile))
  1021. self.assertFalse(os.path.exists(straydir))
  1022. self.assertTrue(os.path.isdir(name))
  1023. def test_directory_is_idempotent(self):
  1024. """
  1025. Ensure the file.directory state produces no changes when rerun.
  1026. """
  1027. name = os.path.join(RUNTIME_VARS.TMP, "a_dir_twice")
  1028. if IS_WINDOWS:
  1029. username = os.environ.get("USERNAME", "Administrators")
  1030. domain = os.environ.get("USERDOMAIN", "")
  1031. fullname = "{0}\\{1}".format(domain, username)
  1032. ret = self.run_state("file.directory", name=name, win_owner=fullname)
  1033. else:
  1034. ret = self.run_state("file.directory", name=name)
  1035. self.assertSaltTrueReturn(ret)
  1036. if IS_WINDOWS:
  1037. ret = self.run_state("file.directory", name=name, win_owner=username)
  1038. else:
  1039. ret = self.run_state("file.directory", name=name)
  1040. self.assertSaltTrueReturn(ret)
  1041. self.assertSaltStateChangesEqual(ret, {})
  1042. @with_tempdir()
  1043. def test_directory_clean_exclude(self, base_dir):
  1044. """
  1045. file.directory with clean=True and exclude_pat set
  1046. """
  1047. name = os.path.join(base_dir, "directory_clean_dir")
  1048. if not os.path.isdir(name):
  1049. os.makedirs(name)
  1050. strayfile = os.path.join(name, "strayfile")
  1051. with salt.utils.files.fopen(strayfile, "w"):
  1052. pass
  1053. straydir = os.path.join(name, "straydir")
  1054. if not os.path.isdir(straydir):
  1055. os.makedirs(straydir)
  1056. strayfile2 = os.path.join(straydir, "strayfile2")
  1057. with salt.utils.files.fopen(strayfile2, "w"):
  1058. pass
  1059. keepfile = os.path.join(straydir, "keepfile")
  1060. with salt.utils.files.fopen(keepfile, "w"):
  1061. pass
  1062. exclude_pat = "E@^straydir(|/keepfile)$"
  1063. if IS_WINDOWS:
  1064. exclude_pat = "E@^straydir(|\\\\keepfile)$"
  1065. ret = self.run_state(
  1066. "file.directory", name=name, clean=True, exclude_pat=exclude_pat
  1067. )
  1068. self.assertSaltTrueReturn(ret)
  1069. self.assertFalse(os.path.exists(strayfile))
  1070. self.assertFalse(os.path.exists(strayfile2))
  1071. self.assertTrue(os.path.exists(keepfile))
  1072. @skipIf(IS_WINDOWS, "Skip on windows")
  1073. @with_tempdir()
  1074. def test_test_directory_clean_exclude(self, base_dir):
  1075. """
  1076. file.directory with test=True, clean=True and exclude_pat set
  1077. Skipped on windows because clean and exclude_pat not supported by
  1078. salt.sates.file._check_directory_win
  1079. """
  1080. name = os.path.join(base_dir, "directory_clean_dir")
  1081. os.mkdir(name)
  1082. strayfile = os.path.join(name, "strayfile")
  1083. with salt.utils.files.fopen(strayfile, "w"):
  1084. pass
  1085. straydir = os.path.join(name, "straydir")
  1086. if not os.path.isdir(straydir):
  1087. os.makedirs(straydir)
  1088. strayfile2 = os.path.join(straydir, "strayfile2")
  1089. with salt.utils.files.fopen(strayfile2, "w"):
  1090. pass
  1091. keepfile = os.path.join(straydir, "keepfile")
  1092. with salt.utils.files.fopen(keepfile, "w"):
  1093. pass
  1094. exclude_pat = "E@^straydir(|/keepfile)$"
  1095. if IS_WINDOWS:
  1096. exclude_pat = "E@^straydir(|\\\\keepfile)$"
  1097. ret = self.run_state(
  1098. "file.directory", test=True, name=name, clean=True, exclude_pat=exclude_pat
  1099. )
  1100. comment = next(six.itervalues(ret))["comment"]
  1101. self.assertSaltNoneReturn(ret)
  1102. self.assertTrue(os.path.exists(strayfile))
  1103. self.assertTrue(os.path.exists(strayfile2))
  1104. self.assertTrue(os.path.exists(keepfile))
  1105. self.assertIn(strayfile, comment)
  1106. self.assertIn(strayfile2, comment)
  1107. self.assertNotIn(keepfile, comment)
  1108. @with_tempdir()
  1109. def test_directory_clean_require_in(self, name):
  1110. """
  1111. file.directory test with clean=True and require_in file
  1112. """
  1113. state_name = "file-FileTest-test_directory_clean_require_in"
  1114. state_filename = state_name + ".sls"
  1115. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  1116. wrong_file = os.path.join(name, "wrong")
  1117. with salt.utils.files.fopen(wrong_file, "w") as fp:
  1118. fp.write("foo")
  1119. good_file = os.path.join(name, "bar")
  1120. with salt.utils.files.fopen(state_file, "w") as fp:
  1121. self.addCleanup(lambda: os.remove(state_file))
  1122. fp.write(
  1123. textwrap.dedent(
  1124. """\
  1125. some_dir:
  1126. file.directory:
  1127. - name: {name}
  1128. - clean: true
  1129. {good_file}:
  1130. file.managed:
  1131. - require_in:
  1132. - file: some_dir
  1133. """.format(
  1134. name=name, good_file=good_file
  1135. )
  1136. )
  1137. )
  1138. ret = self.run_function("state.sls", [state_name])
  1139. self.assertTrue(os.path.exists(good_file))
  1140. self.assertFalse(os.path.exists(wrong_file))
  1141. @with_tempdir()
  1142. def test_directory_clean_require_in_with_id(self, name):
  1143. """
  1144. file.directory test with clean=True and require_in file with an ID
  1145. different from the file name
  1146. """
  1147. state_name = "file-FileTest-test_directory_clean_require_in_with_id"
  1148. state_filename = state_name + ".sls"
  1149. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  1150. wrong_file = os.path.join(name, "wrong")
  1151. with salt.utils.files.fopen(wrong_file, "w") as fp:
  1152. fp.write("foo")
  1153. good_file = os.path.join(name, "bar")
  1154. with salt.utils.files.fopen(state_file, "w") as fp:
  1155. self.addCleanup(lambda: os.remove(state_file))
  1156. fp.write(
  1157. textwrap.dedent(
  1158. """\
  1159. some_dir:
  1160. file.directory:
  1161. - name: {name}
  1162. - clean: true
  1163. some_file:
  1164. file.managed:
  1165. - name: {good_file}
  1166. - require_in:
  1167. - file: some_dir
  1168. """.format(
  1169. name=name, good_file=good_file
  1170. )
  1171. )
  1172. )
  1173. ret = self.run_function("state.sls", [state_name])
  1174. self.assertTrue(os.path.exists(good_file))
  1175. self.assertFalse(os.path.exists(wrong_file))
  1176. @skipIf(
  1177. salt.utils.platform.is_darwin(),
  1178. "WAR ROOM TEMPORARY SKIP, Test is flaky on macosx",
  1179. )
  1180. @with_tempdir()
  1181. def test_directory_clean_require_with_name(self, name):
  1182. """
  1183. file.directory test with clean=True and require with a file state
  1184. relatively to the state's name, not its ID.
  1185. """
  1186. state_name = "file-FileTest-test_directory_clean_require_in_with_id"
  1187. state_filename = state_name + ".sls"
  1188. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  1189. wrong_file = os.path.join(name, "wrong")
  1190. with salt.utils.files.fopen(wrong_file, "w") as fp:
  1191. fp.write("foo")
  1192. good_file = os.path.join(name, "bar")
  1193. with salt.utils.files.fopen(state_file, "w") as fp:
  1194. self.addCleanup(lambda: os.remove(state_file))
  1195. fp.write(
  1196. textwrap.dedent(
  1197. """\
  1198. some_dir:
  1199. file.directory:
  1200. - name: {name}
  1201. - clean: true
  1202. - require:
  1203. # This requirement refers to the name of the following
  1204. # state, not its ID.
  1205. - file: {good_file}
  1206. some_file:
  1207. file.managed:
  1208. - name: {good_file}
  1209. """.format(
  1210. name=name, good_file=good_file
  1211. )
  1212. )
  1213. )
  1214. ret = self.run_function("state.sls", [state_name])
  1215. self.assertTrue(os.path.exists(good_file))
  1216. self.assertFalse(os.path.exists(wrong_file))
  1217. def test_directory_broken_symlink(self):
  1218. """
  1219. Ensure that file.directory works even if a directory
  1220. contains broken symbolic link
  1221. """
  1222. try:
  1223. tmp_dir = os.path.join(RUNTIME_VARS.TMP, "foo")
  1224. null_file = "{0}/null".format(tmp_dir)
  1225. broken_link = "{0}/broken".format(tmp_dir)
  1226. os.mkdir(tmp_dir, 0o700)
  1227. self.run_function("file.symlink", [null_file, broken_link])
  1228. if IS_WINDOWS:
  1229. ret = self.run_state(
  1230. "file.directory",
  1231. name=tmp_dir,
  1232. recurse=["mode"],
  1233. follow_symlinks=True,
  1234. win_owner="Administrators",
  1235. )
  1236. else:
  1237. ret = self.run_state(
  1238. "file.directory",
  1239. name=tmp_dir,
  1240. recurse=["mode"],
  1241. file_mode=644,
  1242. dir_mode=755,
  1243. )
  1244. self.assertSaltTrueReturn(ret)
  1245. finally:
  1246. if os.path.isdir(tmp_dir):
  1247. self.run_function("file.remove", [tmp_dir])
  1248. @with_tempdir(create=False)
  1249. def test_recurse(self, name):
  1250. """
  1251. file.recurse
  1252. """
  1253. ret = self.run_state("file.recurse", name=name, source="salt://grail")
  1254. self.assertSaltTrueReturn(ret)
  1255. self.assertTrue(os.path.isfile(os.path.join(name, "36", "scene")))
  1256. @with_tempdir(create=False)
  1257. @with_tempdir(create=False)
  1258. def test_recurse_specific_env(self, dir1, dir2):
  1259. """
  1260. file.recurse passing __env__
  1261. """
  1262. ret = self.run_state(
  1263. "file.recurse", name=dir1, source="salt://holy", __env__="prod"
  1264. )
  1265. self.assertSaltTrueReturn(ret)
  1266. self.assertTrue(os.path.isfile(os.path.join(dir1, "32", "scene")))
  1267. ret = self.run_state(
  1268. "file.recurse", name=dir2, source="salt://holy", saltenv="prod"
  1269. )
  1270. self.assertSaltTrueReturn(ret)
  1271. self.assertTrue(os.path.isfile(os.path.join(dir2, "32", "scene")))
  1272. @with_tempdir(create=False)
  1273. @with_tempdir(create=False)
  1274. def test_recurse_specific_env_in_url(self, dir1, dir2):
  1275. """
  1276. file.recurse passing __env__
  1277. """
  1278. ret = self.run_state(
  1279. "file.recurse", name=dir1, source="salt://holy?saltenv=prod"
  1280. )
  1281. self.assertSaltTrueReturn(ret)
  1282. self.assertTrue(os.path.isfile(os.path.join(dir1, "32", "scene")))
  1283. ret = self.run_state(
  1284. "file.recurse", name=dir2, source="salt://holy?saltenv=prod"
  1285. )
  1286. self.assertSaltTrueReturn(ret)
  1287. self.assertTrue(os.path.isfile(os.path.join(dir2, "32", "scene")))
  1288. @with_tempdir(create=False)
  1289. def test_test_recurse(self, name):
  1290. """
  1291. file.recurse test interface
  1292. """
  1293. ret = self.run_state(
  1294. "file.recurse", test=True, name=name, source="salt://grail",
  1295. )
  1296. self.assertSaltNoneReturn(ret)
  1297. self.assertFalse(os.path.isfile(os.path.join(name, "36", "scene")))
  1298. self.assertFalse(os.path.exists(name))
  1299. @with_tempdir(create=False)
  1300. @with_tempdir(create=False)
  1301. def test_test_recurse_specific_env(self, dir1, dir2):
  1302. """
  1303. file.recurse test interface
  1304. """
  1305. ret = self.run_state(
  1306. "file.recurse", test=True, name=dir1, source="salt://holy", __env__="prod"
  1307. )
  1308. self.assertSaltNoneReturn(ret)
  1309. self.assertFalse(os.path.isfile(os.path.join(dir1, "32", "scene")))
  1310. self.assertFalse(os.path.exists(dir1))
  1311. ret = self.run_state(
  1312. "file.recurse", test=True, name=dir2, source="salt://holy", saltenv="prod"
  1313. )
  1314. self.assertSaltNoneReturn(ret)
  1315. self.assertFalse(os.path.isfile(os.path.join(dir2, "32", "scene")))
  1316. self.assertFalse(os.path.exists(dir2))
  1317. @with_tempdir(create=False)
  1318. def test_recurse_template(self, name):
  1319. """
  1320. file.recurse with jinja template enabled
  1321. """
  1322. _ts = "TEMPLATE TEST STRING"
  1323. ret = self.run_state(
  1324. "file.recurse",
  1325. name=name,
  1326. source="salt://grail",
  1327. template="jinja",
  1328. defaults={"spam": _ts},
  1329. )
  1330. self.assertSaltTrueReturn(ret)
  1331. with salt.utils.files.fopen(os.path.join(name, "scene33"), "r") as fp_:
  1332. contents = fp_.read()
  1333. self.assertIn(_ts, contents)
  1334. @with_tempdir()
  1335. def test_recurse_clean(self, name):
  1336. """
  1337. file.recurse with clean=True
  1338. """
  1339. strayfile = os.path.join(name, "strayfile")
  1340. with salt.utils.files.fopen(strayfile, "w"):
  1341. pass
  1342. # Corner cases: replacing file with a directory and vice versa
  1343. with salt.utils.files.fopen(os.path.join(name, "36"), "w"):
  1344. pass
  1345. os.makedirs(os.path.join(name, "scene33"))
  1346. ret = self.run_state(
  1347. "file.recurse", name=name, source="salt://grail", clean=True
  1348. )
  1349. self.assertSaltTrueReturn(ret)
  1350. self.assertFalse(os.path.exists(strayfile))
  1351. self.assertTrue(os.path.isfile(os.path.join(name, "36", "scene")))
  1352. self.assertTrue(os.path.isfile(os.path.join(name, "scene33")))
  1353. @with_tempdir()
  1354. def test_recurse_clean_specific_env(self, name):
  1355. """
  1356. file.recurse with clean=True and __env__=prod
  1357. """
  1358. strayfile = os.path.join(name, "strayfile")
  1359. with salt.utils.files.fopen(strayfile, "w"):
  1360. pass
  1361. # Corner cases: replacing file with a directory and vice versa
  1362. with salt.utils.files.fopen(os.path.join(name, "32"), "w"):
  1363. pass
  1364. os.makedirs(os.path.join(name, "scene34"))
  1365. ret = self.run_state(
  1366. "file.recurse", name=name, source="salt://holy", clean=True, __env__="prod"
  1367. )
  1368. self.assertSaltTrueReturn(ret)
  1369. self.assertFalse(os.path.exists(strayfile))
  1370. self.assertTrue(os.path.isfile(os.path.join(name, "32", "scene")))
  1371. self.assertTrue(os.path.isfile(os.path.join(name, "scene34")))
  1372. @skipIf(IS_WINDOWS, "Skip on windows")
  1373. @with_tempdir()
  1374. def test_recurse_issue_34945(self, base_dir):
  1375. """
  1376. This tests the case where the source dir for the file.recurse state
  1377. does not contain any files (only subdirectories), and the dir_mode is
  1378. being managed. For a long time, this corner case resulted in the top
  1379. level of the destination directory being created with the wrong initial
  1380. permissions, a problem that would be corrected later on in the
  1381. file.recurse state via running state.directory. However, the
  1382. file.directory state only gets called when there are files to be
  1383. managed in that directory, and when the source directory contains only
  1384. subdirectories, the incorrectly-set initial perms would not be
  1385. repaired.
  1386. This was fixed in https://github.com/saltstack/salt/pull/35309
  1387. Skipped on windows because dir_mode is not supported.
  1388. """
  1389. dir_mode = "2775"
  1390. issue_dir = "issue-34945"
  1391. name = os.path.join(base_dir, issue_dir)
  1392. ret = self.run_state(
  1393. "file.recurse", name=name, source="salt://" + issue_dir, dir_mode=dir_mode
  1394. )
  1395. self.assertSaltTrueReturn(ret)
  1396. actual_dir_mode = oct(stat.S_IMODE(os.stat(name).st_mode))[-4:]
  1397. self.assertEqual(dir_mode, actual_dir_mode)
  1398. @with_tempdir(create=False)
  1399. def test_recurse_issue_40578(self, name):
  1400. """
  1401. This ensures that the state doesn't raise an exception when it
  1402. encounters a file with a unicode filename in the process of invoking
  1403. file.source_list.
  1404. """
  1405. ret = self.run_state("file.recurse", name=name, source="salt://соль")
  1406. self.assertSaltTrueReturn(ret)
  1407. if six.PY2 and IS_WINDOWS:
  1408. # Providing unicode to os.listdir so that we avoid having listdir
  1409. # try to decode the filenames using the systemencoding on windows
  1410. # python 2.
  1411. files = os.listdir(name.decode("utf-8"))
  1412. else:
  1413. files = salt.utils.data.decode(os.listdir(name), normalize=True)
  1414. self.assertEqual(
  1415. sorted(files), sorted(["foo.txt", "спам.txt", "яйца.txt"]),
  1416. )
  1417. @with_tempfile()
  1418. def test_replace(self, name):
  1419. """
  1420. file.replace
  1421. """
  1422. with salt.utils.files.fopen(name, "w+") as fp_:
  1423. fp_.write("change_me")
  1424. ret = self.run_state(
  1425. "file.replace", name=name, pattern="change", repl="salt", backup=False
  1426. )
  1427. with salt.utils.files.fopen(name, "r") as fp_:
  1428. self.assertIn("salt", fp_.read())
  1429. self.assertSaltTrueReturn(ret)
  1430. @with_tempdir()
  1431. def test_replace_issue_18612(self, base_dir):
  1432. """
  1433. Test the (mis-)behaviour of file.replace as described in #18612:
  1434. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1435. an infinitely growing file as 'file.replace' didn't check beforehand
  1436. whether the changes had already been done to the file
  1437. # Case description:
  1438. The tested file contains one commented line
  1439. The commented line should be uncommented in the end, nothing else should change
  1440. """
  1441. test_name = "test_replace_issue_18612"
  1442. path_test = os.path.join(base_dir, test_name)
  1443. with salt.utils.files.fopen(path_test, "w+") as fp_test_:
  1444. fp_test_.write("# en_US.UTF-8")
  1445. ret = []
  1446. for x in range(0, 3):
  1447. ret.append(
  1448. self.run_state(
  1449. "file.replace",
  1450. name=path_test,
  1451. pattern="^# en_US.UTF-8$",
  1452. repl="en_US.UTF-8",
  1453. append_if_not_found=True,
  1454. )
  1455. )
  1456. # ensure, the number of lines didn't change, even after invoking 'file.replace' 3 times
  1457. with salt.utils.files.fopen(path_test, "r") as fp_test_:
  1458. self.assertTrue((sum(1 for _ in fp_test_) == 1))
  1459. # ensure, the replacement succeeded
  1460. with salt.utils.files.fopen(path_test, "r") as fp_test_:
  1461. self.assertTrue(fp_test_.read().startswith("en_US.UTF-8"))
  1462. # ensure, all runs of 'file.replace' reported success
  1463. for item in ret:
  1464. self.assertSaltTrueReturn(item)
  1465. @with_tempdir()
  1466. def test_replace_issue_18612_prepend(self, base_dir):
  1467. """
  1468. Test the (mis-)behaviour of file.replace as described in #18612:
  1469. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1470. an infinitely growing file as 'file.replace' didn't check beforehand
  1471. whether the changes had already been done to the file
  1472. # Case description:
  1473. The tested multifile contains multiple lines not matching the pattern or replacement in any way
  1474. The replacement pattern should be prepended to the file
  1475. """
  1476. test_name = "test_replace_issue_18612_prepend"
  1477. path_in = os.path.join(
  1478. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1479. )
  1480. path_out = os.path.join(
  1481. RUNTIME_VARS.FILES, "file.replace", "{0}.out".format(test_name)
  1482. )
  1483. path_test = os.path.join(base_dir, test_name)
  1484. # create test file based on initial template
  1485. shutil.copyfile(path_in, path_test)
  1486. ret = []
  1487. for x in range(0, 3):
  1488. ret.append(
  1489. self.run_state(
  1490. "file.replace",
  1491. name=path_test,
  1492. pattern="^# en_US.UTF-8$",
  1493. repl="en_US.UTF-8",
  1494. prepend_if_not_found=True,
  1495. )
  1496. )
  1497. # ensure, the resulting file contains the expected lines
  1498. self.assertTrue(filecmp.cmp(path_test, path_out))
  1499. # ensure the initial file was properly backed up
  1500. self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
  1501. # ensure, all runs of 'file.replace' reported success
  1502. for item in ret:
  1503. self.assertSaltTrueReturn(item)
  1504. @with_tempdir()
  1505. def test_replace_issue_18612_append(self, base_dir):
  1506. """
  1507. Test the (mis-)behaviour of file.replace as described in #18612:
  1508. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1509. an infinitely growing file as 'file.replace' didn't check beforehand
  1510. whether the changes had already been done to the file
  1511. # Case description:
  1512. The tested multifile contains multiple lines not matching the pattern or replacement in any way
  1513. The replacement pattern should be appended to the file
  1514. """
  1515. test_name = "test_replace_issue_18612_append"
  1516. path_in = os.path.join(
  1517. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1518. )
  1519. path_out = os.path.join(
  1520. RUNTIME_VARS.FILES, "file.replace", "{0}.out".format(test_name)
  1521. )
  1522. path_test = os.path.join(base_dir, test_name)
  1523. # create test file based on initial template
  1524. shutil.copyfile(path_in, path_test)
  1525. ret = []
  1526. for x in range(0, 3):
  1527. ret.append(
  1528. self.run_state(
  1529. "file.replace",
  1530. name=path_test,
  1531. pattern="^# en_US.UTF-8$",
  1532. repl="en_US.UTF-8",
  1533. append_if_not_found=True,
  1534. )
  1535. )
  1536. # ensure, the resulting file contains the expected lines
  1537. self.assertTrue(filecmp.cmp(path_test, path_out))
  1538. # ensure the initial file was properly backed up
  1539. self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
  1540. # ensure, all runs of 'file.replace' reported success
  1541. for item in ret:
  1542. self.assertSaltTrueReturn(item)
  1543. @with_tempdir()
  1544. def test_replace_issue_18612_append_not_found_content(self, base_dir):
  1545. """
  1546. Test the (mis-)behaviour of file.replace as described in #18612:
  1547. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1548. an infinitely growing file as 'file.replace' didn't check beforehand
  1549. whether the changes had already been done to the file
  1550. # Case description:
  1551. The tested multifile contains multiple lines not matching the pattern or replacement in any way
  1552. The 'not_found_content' value should be appended to the file
  1553. """
  1554. test_name = "test_replace_issue_18612_append_not_found_content"
  1555. path_in = os.path.join(
  1556. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1557. )
  1558. path_out = os.path.join(
  1559. RUNTIME_VARS.FILES, "file.replace", "{0}.out".format(test_name)
  1560. )
  1561. path_test = os.path.join(base_dir, test_name)
  1562. # create test file based on initial template
  1563. shutil.copyfile(path_in, path_test)
  1564. ret = []
  1565. for x in range(0, 3):
  1566. ret.append(
  1567. self.run_state(
  1568. "file.replace",
  1569. name=path_test,
  1570. pattern="^# en_US.UTF-8$",
  1571. repl="en_US.UTF-8",
  1572. append_if_not_found=True,
  1573. not_found_content="THIS LINE WASN'T FOUND! SO WE'RE APPENDING IT HERE!",
  1574. )
  1575. )
  1576. # ensure, the resulting file contains the expected lines
  1577. self.assertTrue(filecmp.cmp(path_test, path_out))
  1578. # ensure the initial file was properly backed up
  1579. self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
  1580. # ensure, all runs of 'file.replace' reported success
  1581. for item in ret:
  1582. self.assertSaltTrueReturn(item)
  1583. @with_tempdir()
  1584. def test_replace_issue_18612_change_mid_line_with_comment(self, base_dir):
  1585. """
  1586. Test the (mis-)behaviour of file.replace as described in #18612:
  1587. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1588. an infinitely growing file as 'file.replace' didn't check beforehand
  1589. whether the changes had already been done to the file
  1590. # Case description:
  1591. The tested file contains 5 key=value pairs
  1592. The commented key=value pair #foo=bar should be changed to foo=salt
  1593. The comment char (#) in front of foo=bar should be removed
  1594. """
  1595. test_name = "test_replace_issue_18612_change_mid_line_with_comment"
  1596. path_in = os.path.join(
  1597. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1598. )
  1599. path_out = os.path.join(
  1600. RUNTIME_VARS.FILES, "file.replace", "{0}.out".format(test_name)
  1601. )
  1602. path_test = os.path.join(base_dir, test_name)
  1603. # create test file based on initial template
  1604. shutil.copyfile(path_in, path_test)
  1605. ret = []
  1606. for x in range(0, 3):
  1607. ret.append(
  1608. self.run_state(
  1609. "file.replace",
  1610. name=path_test,
  1611. pattern="^#foo=bar($|(?=\r\n))",
  1612. repl="foo=salt",
  1613. append_if_not_found=True,
  1614. )
  1615. )
  1616. # ensure, the resulting file contains the expected lines
  1617. self.assertTrue(filecmp.cmp(path_test, path_out))
  1618. # ensure the initial file was properly backed up
  1619. self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
  1620. # ensure, all 'file.replace' runs reported success
  1621. for item in ret:
  1622. self.assertSaltTrueReturn(item)
  1623. @with_tempdir()
  1624. def test_replace_issue_18841_no_changes(self, base_dir):
  1625. """
  1626. Test the (mis-)behaviour of file.replace as described in #18841:
  1627. Using file.replace in a way which shouldn't modify the file at all
  1628. results in changed mtime of the original file and a backup file being created.
  1629. # Case description
  1630. The tested file contains multiple lines
  1631. The tested file contains a line already matching the replacement (no change needed)
  1632. The tested file's content shouldn't change at all
  1633. The tested file's mtime shouldn't change at all
  1634. No backup file should be created
  1635. """
  1636. test_name = "test_replace_issue_18841_no_changes"
  1637. path_in = os.path.join(
  1638. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1639. )
  1640. path_test = os.path.join(base_dir, test_name)
  1641. # create test file based on initial template
  1642. shutil.copyfile(path_in, path_test)
  1643. # get (m|a)time of file
  1644. fstats_orig = os.stat(path_test)
  1645. # define how far we predate the file
  1646. age = 5 * 24 * 60 * 60
  1647. # set (m|a)time of file 5 days into the past
  1648. os.utime(path_test, (fstats_orig.st_mtime - age, fstats_orig.st_atime - age))
  1649. ret = self.run_state(
  1650. "file.replace",
  1651. name=path_test,
  1652. pattern="^hello world$",
  1653. repl="goodbye world",
  1654. show_changes=True,
  1655. flags=["IGNORECASE"],
  1656. backup=False,
  1657. )
  1658. # get (m|a)time of file
  1659. fstats_post = os.stat(path_test)
  1660. # ensure, the file content didn't change
  1661. self.assertTrue(filecmp.cmp(path_in, path_test))
  1662. # ensure no backup file was created
  1663. self.assertFalse(os.path.exists(path_test + ".bak"))
  1664. # ensure the file's mtime didn't change
  1665. self.assertTrue(fstats_post.st_mtime, fstats_orig.st_mtime - age)
  1666. # ensure, all 'file.replace' runs reported success
  1667. self.assertSaltTrueReturn(ret)
  1668. def test_serialize(self):
  1669. """
  1670. Test to ensure that file.serialize returns a data structure that's
  1671. both serialized and formatted properly
  1672. """
  1673. path_test = os.path.join(RUNTIME_VARS.TMP, "test_serialize")
  1674. ret = self.run_state(
  1675. "file.serialize",
  1676. name=path_test,
  1677. dataset={
  1678. "name": "naive",
  1679. "description": "A basic test",
  1680. "a_list": ["first_element", "second_element"],
  1681. "finally": "the last item",
  1682. },
  1683. formatter="json",
  1684. )
  1685. with salt.utils.files.fopen(path_test, "rb") as fp_:
  1686. serialized_file = salt.utils.stringutils.to_unicode(fp_.read())
  1687. # The JSON serializer uses LF even on OSes where os.sep is CRLF.
  1688. expected_file = "\n".join(
  1689. [
  1690. "{",
  1691. ' "a_list": [',
  1692. ' "first_element",',
  1693. ' "second_element"',
  1694. " ],",
  1695. ' "description": "A basic test",',
  1696. ' "finally": "the last item",',
  1697. ' "name": "naive"',
  1698. "}",
  1699. "",
  1700. ]
  1701. )
  1702. self.assertEqual(serialized_file, expected_file)
  1703. @with_tempfile(create=False)
  1704. def test_serializer_deserializer_opts(self, name):
  1705. """
  1706. Test the serializer_opts and deserializer_opts options
  1707. """
  1708. data1 = {"foo": {"bar": "%(x)s"}}
  1709. data2 = {"foo": {"abc": 123}}
  1710. merged = {"foo": {"y": "not_used", "x": "baz", "abc": 123, "bar": "baz"}}
  1711. ret = self.run_state(
  1712. "file.serialize",
  1713. name=name,
  1714. dataset=data1,
  1715. formatter="configparser",
  1716. deserializer_opts=[{"defaults": {"y": "not_used"}}],
  1717. )
  1718. ret = ret[next(iter(ret))]
  1719. assert ret["result"], ret
  1720. # We should have warned about deserializer_opts being used when
  1721. # merge_if_exists was not set to True.
  1722. assert "warnings" in ret
  1723. # Run with merge_if_exists, as well as serializer and deserializer opts
  1724. # deserializer opts will be used for string interpolation of the %(x)s
  1725. # that was written to the file with data1 (i.e. bar should become baz)
  1726. ret = self.run_state(
  1727. "file.serialize",
  1728. name=name,
  1729. dataset=data2,
  1730. formatter="configparser",
  1731. merge_if_exists=True,
  1732. serializer_opts=[{"defaults": {"y": "not_used"}}],
  1733. deserializer_opts=[{"defaults": {"x": "baz"}}],
  1734. )
  1735. ret = ret[next(iter(ret))]
  1736. assert ret["result"], ret
  1737. with salt.utils.files.fopen(name) as fp_:
  1738. serialized_data = salt.serializers.configparser.deserialize(fp_)
  1739. # If this test fails, this debug logging will help tell us how the
  1740. # serialized data differs from what was serialized.
  1741. log.debug("serialized_data = %r", serialized_data)
  1742. log.debug("merged = %r", merged)
  1743. # serializing with a default of 'y' will add y = not_used into foo
  1744. assert serialized_data["foo"]["y"] == merged["foo"]["y"]
  1745. # deserializing with default of x = baz will perform interpolation on %(x)s
  1746. # and bar will then = baz
  1747. assert serialized_data["foo"]["bar"] == merged["foo"]["bar"]
  1748. @with_tempfile(create=False)
  1749. def test_serializer_plist_binary_file_open(self, name):
  1750. """
  1751. Test the serialization and deserialization of plists which should include
  1752. the "rb" file open arguments change specifically for this formatter to handle
  1753. binary plists.
  1754. """
  1755. data1 = {"foo": {"bar": "%(x)s"}}
  1756. data2 = {"foo": {"abc": 123}}
  1757. merged = {"foo": {"abc": 123, "bar": "%(x)s"}}
  1758. ret = self.run_state(
  1759. "file.serialize",
  1760. name=name,
  1761. dataset=data1,
  1762. formatter="plist",
  1763. serializer_opts=[{"fmt": "FMT_BINARY"}],
  1764. )
  1765. ret = ret[next(iter(ret))]
  1766. assert ret["result"], ret
  1767. # Run with merge_if_exists so we test the deserializer.
  1768. ret = self.run_state(
  1769. "file.serialize",
  1770. name=name,
  1771. dataset=data2,
  1772. formatter="plist",
  1773. merge_if_exists=True,
  1774. serializer_opts=[{"fmt": "FMT_BINARY"}],
  1775. )
  1776. ret = ret[next(iter(ret))]
  1777. assert ret["result"], ret
  1778. with salt.utils.files.fopen(name, "rb") as fp_:
  1779. serialized_data = salt.serializers.plist.deserialize(fp_)
  1780. # make sure our serialized data matches what we expect
  1781. assert serialized_data["foo"] == merged["foo"]
  1782. @with_tempfile(create=False)
  1783. def test_serializer_plist_file_open(self, name):
  1784. """
  1785. Test the serialization and deserialization of non binary plists with
  1786. the new line concatenation.
  1787. """
  1788. data1 = {"foo": {"bar": "%(x)s"}}
  1789. data2 = {"foo": {"abc": 123}}
  1790. merged = {"foo": {"abc": 123, "bar": "%(x)s"}}
  1791. ret = self.run_state(
  1792. "file.serialize", name=name, dataset=data1, formatter="plist",
  1793. )
  1794. ret = ret[next(iter(ret))]
  1795. assert ret["result"], ret
  1796. # Run with merge_if_exists so we test the deserializer.
  1797. ret = self.run_state(
  1798. "file.serialize",
  1799. name=name,
  1800. dataset=data2,
  1801. formatter="plist",
  1802. merge_if_exists=True,
  1803. )
  1804. ret = ret[next(iter(ret))]
  1805. assert ret["result"], ret
  1806. with salt.utils.files.fopen(name, "rb") as fp_:
  1807. serialized_data = salt.serializers.plist.deserialize(fp_)
  1808. # make sure our serialized data matches what we expect
  1809. assert serialized_data["foo"] == merged["foo"]
  1810. @with_tempdir()
  1811. def test_replace_issue_18841_omit_backup(self, base_dir):
  1812. """
  1813. Test the (mis-)behaviour of file.replace as described in #18841:
  1814. Using file.replace in a way which shouldn't modify the file at all
  1815. results in changed mtime of the original file and a backup file being created.
  1816. # Case description
  1817. The tested file contains multiple lines
  1818. The tested file contains a line already matching the replacement (no change needed)
  1819. The tested file's content shouldn't change at all
  1820. The tested file's mtime shouldn't change at all
  1821. No backup file should be created, although backup=False isn't explicitly defined
  1822. """
  1823. test_name = "test_replace_issue_18841_omit_backup"
  1824. path_in = os.path.join(
  1825. RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
  1826. )
  1827. path_test = os.path.join(base_dir, test_name)
  1828. # create test file based on initial template
  1829. shutil.copyfile(path_in, path_test)
  1830. # get (m|a)time of file
  1831. fstats_orig = os.stat(path_test)
  1832. # define how far we predate the file
  1833. age = 5 * 24 * 60 * 60
  1834. # set (m|a)time of file 5 days into the past
  1835. os.utime(path_test, (fstats_orig.st_mtime - age, fstats_orig.st_atime - age))
  1836. ret = self.run_state(
  1837. "file.replace",
  1838. name=path_test,
  1839. pattern="^hello world$",
  1840. repl="goodbye world",
  1841. show_changes=True,
  1842. flags=["IGNORECASE"],
  1843. )
  1844. # get (m|a)time of file
  1845. fstats_post = os.stat(path_test)
  1846. # ensure, the file content didn't change
  1847. self.assertTrue(filecmp.cmp(path_in, path_test))
  1848. # ensure no backup file was created
  1849. self.assertFalse(os.path.exists(path_test + ".bak"))
  1850. # ensure the file's mtime didn't change
  1851. self.assertTrue(fstats_post.st_mtime, fstats_orig.st_mtime - age)
  1852. # ensure, all 'file.replace' runs reported success
  1853. self.assertSaltTrueReturn(ret)
  1854. @with_tempfile()
  1855. def test_comment(self, name):
  1856. """
  1857. file.comment
  1858. """
  1859. # write a line to file
  1860. with salt.utils.files.fopen(name, "w+") as fp_:
  1861. fp_.write("comment_me")
  1862. # Look for changes with test=True: return should be "None" at the first run
  1863. ret = self.run_state("file.comment", test=True, name=name, regex="^comment")
  1864. self.assertSaltNoneReturn(ret)
  1865. # comment once
  1866. ret = self.run_state("file.comment", name=name, regex="^comment")
  1867. # result is positive
  1868. self.assertSaltTrueReturn(ret)
  1869. # line is commented
  1870. with salt.utils.files.fopen(name, "r") as fp_:
  1871. self.assertTrue(fp_.read().startswith("#comment"))
  1872. # comment twice
  1873. ret = self.run_state("file.comment", name=name, regex="^comment")
  1874. # result is still positive
  1875. self.assertSaltTrueReturn(ret)
  1876. # line is still commented
  1877. with salt.utils.files.fopen(name, "r") as fp_:
  1878. self.assertTrue(fp_.read().startswith("#comment"))
  1879. # Test previously commented file returns "True" now and not "None" with test=True
  1880. ret = self.run_state("file.comment", test=True, name=name, regex="^comment")
  1881. self.assertSaltTrueReturn(ret)
  1882. @with_tempfile()
  1883. def test_test_comment(self, name):
  1884. """
  1885. file.comment test interface
  1886. """
  1887. with salt.utils.files.fopen(name, "w+") as fp_:
  1888. fp_.write("comment_me")
  1889. ret = self.run_state("file.comment", test=True, name=name, regex=".*comment.*",)
  1890. with salt.utils.files.fopen(name, "r") as fp_:
  1891. self.assertNotIn("#comment", fp_.read())
  1892. self.assertSaltNoneReturn(ret)
  1893. @with_tempfile()
  1894. def test_uncomment(self, name):
  1895. """
  1896. file.uncomment
  1897. """
  1898. with salt.utils.files.fopen(name, "w+") as fp_:
  1899. fp_.write("#comment_me")
  1900. ret = self.run_state("file.uncomment", name=name, regex="^comment")
  1901. with salt.utils.files.fopen(name, "r") as fp_:
  1902. self.assertNotIn("#comment", fp_.read())
  1903. self.assertSaltTrueReturn(ret)
  1904. @with_tempfile()
  1905. def test_test_uncomment(self, name):
  1906. """
  1907. file.comment test interface
  1908. """
  1909. with salt.utils.files.fopen(name, "w+") as fp_:
  1910. fp_.write("#comment_me")
  1911. ret = self.run_state("file.uncomment", test=True, name=name, regex="^comment.*")
  1912. with salt.utils.files.fopen(name, "r") as fp_:
  1913. self.assertIn("#comment", fp_.read())
  1914. self.assertSaltNoneReturn(ret)
  1915. @with_tempfile()
  1916. def test_append(self, name):
  1917. """
  1918. file.append
  1919. """
  1920. with salt.utils.files.fopen(name, "w+") as fp_:
  1921. fp_.write("#salty!")
  1922. ret = self.run_state("file.append", name=name, text="cheese")
  1923. with salt.utils.files.fopen(name, "r") as fp_:
  1924. self.assertIn("cheese", fp_.read())
  1925. self.assertSaltTrueReturn(ret)
  1926. @with_tempfile()
  1927. def test_test_append(self, name):
  1928. """
  1929. file.append test interface
  1930. """
  1931. with salt.utils.files.fopen(name, "w+") as fp_:
  1932. fp_.write("#salty!")
  1933. ret = self.run_state("file.append", test=True, name=name, text="cheese")
  1934. with salt.utils.files.fopen(name, "r") as fp_:
  1935. self.assertNotIn("cheese", fp_.read())
  1936. self.assertSaltNoneReturn(ret)
  1937. @with_tempdir()
  1938. def test_append_issue_1864_makedirs(self, base_dir):
  1939. """
  1940. file.append but create directories if needed as an option, and create
  1941. the file if it doesn't exist
  1942. """
  1943. fname = "append_issue_1864_makedirs"
  1944. name = os.path.join(base_dir, fname)
  1945. # Non existing file get's touched
  1946. ret = self.run_state("file.append", name=name, text="cheese", makedirs=True)
  1947. self.assertSaltTrueReturn(ret)
  1948. # Nested directory and file get's touched
  1949. name = os.path.join(base_dir, "issue_1864", fname)
  1950. ret = self.run_state("file.append", name=name, text="cheese", makedirs=True)
  1951. self.assertSaltTrueReturn(ret)
  1952. # Parent directory exists but file does not and makedirs is False
  1953. name = os.path.join(base_dir, "issue_1864", fname + "2")
  1954. ret = self.run_state("file.append", name=name, text="cheese")
  1955. self.assertSaltTrueReturn(ret)
  1956. self.assertTrue(os.path.isfile(name))
  1957. @with_tempdir()
  1958. def test_prepend_issue_27401_makedirs(self, base_dir):
  1959. """
  1960. file.prepend but create directories if needed as an option, and create
  1961. the file if it doesn't exist
  1962. """
  1963. fname = "prepend_issue_27401"
  1964. name = os.path.join(base_dir, fname)
  1965. # Non existing file get's touched
  1966. ret = self.run_state("file.prepend", name=name, text="cheese", makedirs=True)
  1967. self.assertSaltTrueReturn(ret)
  1968. # Nested directory and file get's touched
  1969. name = os.path.join(base_dir, "issue_27401", fname)
  1970. ret = self.run_state("file.prepend", name=name, text="cheese", makedirs=True)
  1971. self.assertSaltTrueReturn(ret)
  1972. # Parent directory exists but file does not and makedirs is False
  1973. name = os.path.join(base_dir, "issue_27401", fname + "2")
  1974. ret = self.run_state("file.prepend", name=name, text="cheese")
  1975. self.assertSaltTrueReturn(ret)
  1976. self.assertTrue(os.path.isfile(name))
  1977. @with_tempfile()
  1978. def test_touch(self, name):
  1979. """
  1980. file.touch
  1981. """
  1982. ret = self.run_state("file.touch", name=name)
  1983. self.assertTrue(os.path.isfile(name))
  1984. self.assertSaltTrueReturn(ret)
  1985. @with_tempfile(create=False)
  1986. def test_test_touch(self, name):
  1987. """
  1988. file.touch test interface
  1989. """
  1990. ret = self.run_state("file.touch", test=True, name=name)
  1991. self.assertFalse(os.path.isfile(name))
  1992. self.assertSaltNoneReturn(ret)
  1993. @with_tempdir()
  1994. def test_touch_directory(self, base_dir):
  1995. """
  1996. file.touch a directory
  1997. """
  1998. name = os.path.join(base_dir, "touch_test_dir")
  1999. os.mkdir(name)
  2000. ret = self.run_state("file.touch", name=name)
  2001. self.assertSaltTrueReturn(ret)
  2002. self.assertTrue(os.path.isdir(name))
  2003. @with_tempdir()
  2004. def test_issue_2227_file_append(self, base_dir):
  2005. """
  2006. Text to append includes a percent symbol
  2007. """
  2008. # let's make use of existing state to create a file with contents to
  2009. # test against
  2010. tmp_file_append = os.path.join(base_dir, "test.append")
  2011. self.run_state("file.touch", name=tmp_file_append)
  2012. self.run_state(
  2013. "file.append", name=tmp_file_append, source="salt://testappend/firstif"
  2014. )
  2015. self.run_state(
  2016. "file.append", name=tmp_file_append, source="salt://testappend/secondif"
  2017. )
  2018. # Now our real test
  2019. try:
  2020. ret = self.run_state(
  2021. "file.append", name=tmp_file_append, text="HISTTIMEFORMAT='%F %T '"
  2022. )
  2023. self.assertSaltTrueReturn(ret)
  2024. with salt.utils.files.fopen(tmp_file_append, "r") as fp_:
  2025. contents_pre = fp_.read()
  2026. # It should not append text again
  2027. ret = self.run_state(
  2028. "file.append", name=tmp_file_append, text="HISTTIMEFORMAT='%F %T '"
  2029. )
  2030. self.assertSaltTrueReturn(ret)
  2031. with salt.utils.files.fopen(tmp_file_append, "r") as fp_:
  2032. contents_post = fp_.read()
  2033. self.assertEqual(contents_pre, contents_post)
  2034. except AssertionError:
  2035. if os.path.exists(tmp_file_append):
  2036. shutil.copy(tmp_file_append, tmp_file_append + ".bak")
  2037. raise
  2038. @with_tempdir()
  2039. def test_issue_2401_file_comment(self, base_dir):
  2040. # Get a path to the temporary file
  2041. tmp_file = os.path.join(base_dir, "issue-2041-comment.txt")
  2042. # Write some data to it
  2043. with salt.utils.files.fopen(tmp_file, "w") as fp_:
  2044. fp_.write("hello\nworld\n")
  2045. # create the sls template
  2046. template_lines = [
  2047. "{0}:".format(tmp_file),
  2048. " file.comment:",
  2049. " - regex: ^world",
  2050. ]
  2051. template = "\n".join(template_lines)
  2052. try:
  2053. ret = self.run_function("state.template_str", [template], timeout=120)
  2054. self.assertSaltTrueReturn(ret)
  2055. self.assertNotInSaltComment("Pattern already commented", ret)
  2056. self.assertInSaltComment("Commented lines successfully", ret)
  2057. # This next time, it is already commented.
  2058. ret = self.run_function("state.template_str", [template], timeout=120)
  2059. self.assertSaltTrueReturn(ret)
  2060. self.assertInSaltComment("Pattern already commented", ret)
  2061. except AssertionError:
  2062. shutil.copy(tmp_file, tmp_file + ".bak")
  2063. raise
  2064. @with_tempdir()
  2065. def test_issue_2379_file_append(self, base_dir):
  2066. # Get a path to the temporary file
  2067. tmp_file = os.path.join(base_dir, "issue-2379-file-append.txt")
  2068. # Write some data to it
  2069. with salt.utils.files.fopen(tmp_file, "w") as fp_:
  2070. fp_.write(
  2071. "hello\nworld\n" # Some junk
  2072. "#PermitRootLogin yes\n" # Commented text
  2073. "# PermitRootLogin yes\n" # Commented text with space
  2074. )
  2075. # create the sls template
  2076. template_lines = [
  2077. "{0}:".format(tmp_file),
  2078. " file.append:",
  2079. " - text: PermitRootLogin yes",
  2080. ]
  2081. template = "\n".join(template_lines)
  2082. try:
  2083. ret = self.run_function("state.template_str", [template])
  2084. self.assertSaltTrueReturn(ret)
  2085. self.assertInSaltComment("Appended 1 lines", ret)
  2086. except AssertionError:
  2087. shutil.copy(tmp_file, tmp_file + ".bak")
  2088. raise
  2089. @skipIf(IS_WINDOWS, "Mode not available in Windows")
  2090. @with_tempdir(create=False)
  2091. @with_tempdir(create=False)
  2092. def test_issue_2726_mode_kwarg(self, dir1, dir2):
  2093. # Let's test for the wrong usage approach
  2094. bad_mode_kwarg_testfile = os.path.join(dir1, "bad_mode_kwarg", "testfile")
  2095. bad_template = [
  2096. "{0}:".format(bad_mode_kwarg_testfile),
  2097. " file.recurse:",
  2098. " - source: salt://testfile",
  2099. " - mode: 644",
  2100. ]
  2101. ret = self.run_function("state.template_str", [os.linesep.join(bad_template)])
  2102. self.assertSaltFalseReturn(ret)
  2103. self.assertInSaltComment(
  2104. "'mode' is not allowed in 'file.recurse'. Please use "
  2105. "'file_mode' and 'dir_mode'.",
  2106. ret,
  2107. )
  2108. self.assertNotInSaltComment(
  2109. "TypeError: managed() got multiple values for keyword " "argument 'mode'",
  2110. ret,
  2111. )
  2112. # Now, the correct usage approach
  2113. good_mode_kwargs_testfile = os.path.join(dir2, "good_mode_kwargs", "testappend")
  2114. good_template = [
  2115. "{0}:".format(good_mode_kwargs_testfile),
  2116. " file.recurse:",
  2117. " - source: salt://testappend",
  2118. " - dir_mode: 744",
  2119. " - file_mode: 644",
  2120. ]
  2121. ret = self.run_function("state.template_str", [os.linesep.join(good_template)])
  2122. self.assertSaltTrueReturn(ret)
  2123. @with_tempdir()
  2124. def test_issue_8343_accumulated_require_in(self, base_dir):
  2125. template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-8343.sls")
  2126. testcase_filedest = os.path.join(base_dir, "issue-8343.txt")
  2127. if os.path.exists(template_path):
  2128. os.remove(template_path)
  2129. if os.path.exists(testcase_filedest):
  2130. os.remove(testcase_filedest)
  2131. sls_template = [
  2132. "{0}:",
  2133. " file.managed:",
  2134. " - contents: |",
  2135. " #",
  2136. "",
  2137. "prepend-foo-accumulator-from-pillar:",
  2138. " file.accumulated:",
  2139. " - require_in:",
  2140. " - file: prepend-foo-management",
  2141. " - filename: {0}",
  2142. " - text: |",
  2143. " foo",
  2144. "",
  2145. "append-foo-accumulator-from-pillar:",
  2146. " file.accumulated:",
  2147. " - require_in:",
  2148. " - file: append-foo-management",
  2149. " - filename: {0}",
  2150. " - text: |",
  2151. " bar",
  2152. "",
  2153. "prepend-foo-management:",
  2154. " file.blockreplace:",
  2155. " - name: {0}",
  2156. ' - marker_start: "#-- start salt managed zonestart -- PLEASE, DO NOT EDIT"',
  2157. ' - marker_end: "#-- end salt managed zonestart --"',
  2158. " - content: ''",
  2159. " - prepend_if_not_found: True",
  2160. " - backup: '.bak'",
  2161. " - show_changes: True",
  2162. "",
  2163. "append-foo-management:",
  2164. " file.blockreplace:",
  2165. " - name: {0}",
  2166. ' - marker_start: "#-- start salt managed zoneend -- PLEASE, DO NOT EDIT"',
  2167. ' - marker_end: "#-- end salt managed zoneend --"',
  2168. " - content: ''",
  2169. " - append_if_not_found: True",
  2170. " - backup: '.bak2'",
  2171. " - show_changes: True",
  2172. "",
  2173. ]
  2174. with salt.utils.files.fopen(template_path, "w") as fp_:
  2175. fp_.write(os.linesep.join(sls_template).format(testcase_filedest))
  2176. ret = self.run_function("state.sls", mods="issue-8343")
  2177. for name, step in six.iteritems(ret):
  2178. self.assertSaltTrueReturn({name: step})
  2179. with salt.utils.files.fopen(testcase_filedest) as fp_:
  2180. contents = fp_.read().split(os.linesep)
  2181. expected = [
  2182. "#-- start salt managed zonestart -- PLEASE, DO NOT EDIT",
  2183. "foo",
  2184. "#-- end salt managed zonestart --",
  2185. "#",
  2186. "#-- start salt managed zoneend -- PLEASE, DO NOT EDIT",
  2187. "bar",
  2188. "#-- end salt managed zoneend --",
  2189. "",
  2190. ]
  2191. self.assertEqual(
  2192. [salt.utils.stringutils.to_str(line) for line in expected], contents
  2193. )
  2194. @with_tempdir()
  2195. @skipIf(
  2196. salt.utils.platform.is_darwin() and six.PY2, "This test hangs on OS X on Py2"
  2197. )
  2198. def test_issue_11003_immutable_lazy_proxy_sum(self, base_dir):
  2199. # causes the Import-Module ServerManager error on Windows
  2200. template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-11003.sls")
  2201. testcase_filedest = os.path.join(base_dir, "issue-11003.txt")
  2202. sls_template = [
  2203. "a{0}:",
  2204. " file.absent:",
  2205. " - name: {0}",
  2206. "",
  2207. "{0}:",
  2208. " file.managed:",
  2209. " - contents: |",
  2210. " #",
  2211. "",
  2212. "test-acc1:",
  2213. " file.accumulated:",
  2214. " - require_in:",
  2215. " - file: final",
  2216. " - filename: {0}",
  2217. " - text: |",
  2218. " bar",
  2219. "",
  2220. "test-acc2:",
  2221. " file.accumulated:",
  2222. " - watch_in:",
  2223. " - file: final",
  2224. " - filename: {0}",
  2225. " - text: |",
  2226. " baz",
  2227. "",
  2228. "final:",
  2229. " file.blockreplace:",
  2230. " - name: {0}",
  2231. ' - marker_start: "#-- start managed zone PLEASE, DO NOT EDIT"',
  2232. ' - marker_end: "#-- end managed zone"',
  2233. " - content: ''",
  2234. " - append_if_not_found: True",
  2235. " - show_changes: True",
  2236. ]
  2237. with salt.utils.files.fopen(template_path, "w") as fp_:
  2238. fp_.write(os.linesep.join(sls_template).format(testcase_filedest))
  2239. ret = self.run_function("state.sls", mods="issue-11003", timeout=600)
  2240. for name, step in six.iteritems(ret):
  2241. self.assertSaltTrueReturn({name: step})
  2242. with salt.utils.files.fopen(testcase_filedest) as fp_:
  2243. contents = fp_.read().split(os.linesep)
  2244. begin = contents.index("#-- start managed zone PLEASE, DO NOT EDIT") + 1
  2245. end = contents.index("#-- end managed zone")
  2246. block_contents = contents[begin:end]
  2247. for item in ("", "bar", "baz"):
  2248. block_contents.remove(item)
  2249. self.assertEqual(block_contents, [])
  2250. @with_tempdir()
  2251. def test_issue_8947_utf8_sls(self, base_dir):
  2252. """
  2253. Test some file operation with utf-8 characters on the sls
  2254. This is more generic than just a file test. Feel free to move
  2255. """
  2256. self.maxDiff = None
  2257. korean_1 = "한국어 시험"
  2258. korean_2 = "첫 번째 행"
  2259. korean_3 = "마지막 행"
  2260. test_file = os.path.join(base_dir, "{0}.txt".format(korean_1))
  2261. test_file_encoded = test_file
  2262. template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-8947.sls")
  2263. # create the sls template
  2264. template = textwrap.dedent(
  2265. """\
  2266. some-utf8-file-create:
  2267. file.managed:
  2268. - name: {test_file}
  2269. - contents: {korean_1}
  2270. - makedirs: True
  2271. - replace: True
  2272. - show_diff: True
  2273. some-utf8-file-create2:
  2274. file.managed:
  2275. - name: {test_file}
  2276. - contents: |
  2277. {korean_2}
  2278. {korean_1}
  2279. {korean_3}
  2280. - replace: True
  2281. - show_diff: True
  2282. """.format(
  2283. **locals()
  2284. )
  2285. )
  2286. if not salt.utils.platform.is_windows():
  2287. template += textwrap.dedent(
  2288. """\
  2289. some-utf8-file-content-test:
  2290. cmd.run:
  2291. - name: 'cat "{test_file}"'
  2292. - require:
  2293. - file: some-utf8-file-create2
  2294. """.format(
  2295. **locals()
  2296. )
  2297. )
  2298. # Save template file
  2299. with salt.utils.files.fopen(template_path, "wb") as fp_:
  2300. fp_.write(salt.utils.stringutils.to_bytes(template))
  2301. try:
  2302. result = self.run_function("state.sls", mods="issue-8947")
  2303. if not isinstance(result, dict):
  2304. raise AssertionError(
  2305. (
  2306. "Something went really wrong while testing this sls:" " {0}"
  2307. ).format(repr(result))
  2308. )
  2309. # difflib produces different output on python 2.6 than on >=2.7
  2310. if sys.version_info < (2, 7):
  2311. diff = "--- \n+++ \n@@ -1,1 +1,3 @@\n"
  2312. else:
  2313. diff = "--- \n+++ \n@@ -1 +1,3 @@\n"
  2314. diff += ("+첫 번째 행{0}" " 한국어 시험{0}" "+마지막 행{0}").format(os.linesep)
  2315. ret = {x.split("_|-")[1]: y for x, y in six.iteritems(result)}
  2316. # Confirm initial creation of file
  2317. self.assertEqual(
  2318. ret["some-utf8-file-create"]["comment"],
  2319. "File {0} updated".format(test_file_encoded),
  2320. )
  2321. self.assertEqual(
  2322. ret["some-utf8-file-create"]["changes"], {"diff": "New file"}
  2323. )
  2324. # Confirm file was modified and that the diff was as expected
  2325. self.assertEqual(
  2326. ret["some-utf8-file-create2"]["comment"],
  2327. "File {0} updated".format(test_file_encoded),
  2328. )
  2329. self.assertEqual(ret["some-utf8-file-create2"]["changes"], {"diff": diff})
  2330. if salt.utils.platform.is_windows():
  2331. import subprocess
  2332. import win32api
  2333. p = subprocess.Popen(
  2334. salt.utils.stringutils.to_str(
  2335. "type {}".format(win32api.GetShortPathName(test_file))
  2336. ),
  2337. shell=True,
  2338. stdout=subprocess.PIPE,
  2339. stderr=subprocess.PIPE,
  2340. )
  2341. p.poll()
  2342. out = p.stdout.read()
  2343. self.assertEqual(
  2344. out.decode("utf-8"),
  2345. os.linesep.join((korean_2, korean_1, korean_3)) + os.linesep,
  2346. )
  2347. else:
  2348. self.assertEqual(
  2349. ret["some-utf8-file-content-test"]["comment"],
  2350. 'Command "cat "{0}"" run'.format(test_file_encoded),
  2351. )
  2352. self.assertEqual(
  2353. ret["some-utf8-file-content-test"]["changes"]["stdout"],
  2354. "\n".join((korean_2, korean_1, korean_3)),
  2355. )
  2356. finally:
  2357. try:
  2358. os.remove(template_path)
  2359. except OSError:
  2360. pass
  2361. @skip_if_not_root
  2362. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2363. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2364. @with_system_user_and_group(
  2365. TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
  2366. )
  2367. @with_tempdir()
  2368. def test_issue_12209_follow_symlinks(self, tempdir, user, group):
  2369. """
  2370. Ensure that symlinks are properly chowned when recursing (following
  2371. symlinks)
  2372. """
  2373. # Make the directories for this test
  2374. onedir = os.path.join(tempdir, "one")
  2375. twodir = os.path.join(tempdir, "two")
  2376. os.mkdir(onedir)
  2377. os.symlink(onedir, twodir)
  2378. # Run the state
  2379. ret = self.run_state(
  2380. "file.directory",
  2381. name=tempdir,
  2382. follow_symlinks=True,
  2383. user=user,
  2384. group=group,
  2385. recurse=["user", "group"],
  2386. )
  2387. self.assertSaltTrueReturn(ret)
  2388. # Double-check, in case state mis-reported a True result. Since we are
  2389. # following symlinks, we expect twodir to still be owned by root, but
  2390. # onedir should be owned by the 'issue12209' user.
  2391. onestats = os.stat(onedir)
  2392. twostats = os.lstat(twodir)
  2393. self.assertEqual(pwd.getpwuid(onestats.st_uid).pw_name, user)
  2394. self.assertEqual(pwd.getpwuid(twostats.st_uid).pw_name, "root")
  2395. self.assertEqual(grp.getgrgid(onestats.st_gid).gr_name, group)
  2396. if salt.utils.path.which("id"):
  2397. root_group = self.run_function("user.primary_group", ["root"])
  2398. self.assertEqual(grp.getgrgid(twostats.st_gid).gr_name, root_group)
  2399. @skip_if_not_root
  2400. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2401. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2402. @with_system_user_and_group(
  2403. TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
  2404. )
  2405. @with_tempdir()
  2406. def test_issue_12209_no_follow_symlinks(self, tempdir, user, group):
  2407. """
  2408. Ensure that symlinks are properly chowned when recursing (not following
  2409. symlinks)
  2410. """
  2411. # Make the directories for this test
  2412. onedir = os.path.join(tempdir, "one")
  2413. twodir = os.path.join(tempdir, "two")
  2414. os.mkdir(onedir)
  2415. os.symlink(onedir, twodir)
  2416. # Run the state
  2417. ret = self.run_state(
  2418. "file.directory",
  2419. name=tempdir,
  2420. follow_symlinks=False,
  2421. user=user,
  2422. group=group,
  2423. recurse=["user", "group"],
  2424. )
  2425. self.assertSaltTrueReturn(ret)
  2426. # Double-check, in case state mis-reported a True result. Since we
  2427. # are not following symlinks, we expect twodir to now be owned by
  2428. # the 'issue12209' user, just link onedir.
  2429. onestats = os.stat(onedir)
  2430. twostats = os.lstat(twodir)
  2431. self.assertEqual(pwd.getpwuid(onestats.st_uid).pw_name, user)
  2432. self.assertEqual(pwd.getpwuid(twostats.st_uid).pw_name, user)
  2433. self.assertEqual(grp.getgrgid(onestats.st_gid).gr_name, group)
  2434. self.assertEqual(grp.getgrgid(twostats.st_gid).gr_name, group)
  2435. @with_tempfile(create=False)
  2436. @with_tempfile()
  2437. def test_template_local_file(self, source, dest):
  2438. """
  2439. Test a file.managed state with a local file as the source. Test both
  2440. with the file:// protocol designation prepended, and without it.
  2441. """
  2442. with salt.utils.files.fopen(source, "w") as fp_:
  2443. fp_.write("{{ foo }}\n")
  2444. for prefix in ("file://", ""):
  2445. ret = self.run_state(
  2446. "file.managed",
  2447. name=dest,
  2448. source=prefix + source,
  2449. template="jinja",
  2450. context={"foo": "Hello world!"},
  2451. )
  2452. self.assertSaltTrueReturn(ret)
  2453. @with_tempfile()
  2454. def test_template_local_file_noclobber(self, source):
  2455. """
  2456. Test the case where a source file is in the minion's local filesystem,
  2457. and the source path is the same as the destination path.
  2458. """
  2459. with salt.utils.files.fopen(source, "w") as fp_:
  2460. fp_.write("{{ foo }}\n")
  2461. ret = self.run_state(
  2462. "file.managed",
  2463. name=source,
  2464. source=source,
  2465. template="jinja",
  2466. context={"foo": "Hello world!"},
  2467. )
  2468. self.assertSaltFalseReturn(ret)
  2469. self.assertIn(
  2470. ("Source file cannot be the same as destination"),
  2471. ret[next(iter(ret))]["comment"],
  2472. )
  2473. @with_tempfile(create=False)
  2474. @with_tempfile(create=False)
  2475. def test_issue_25250_force_copy_deletes(self, source, dest):
  2476. """
  2477. ensure force option in copy state does not delete target file
  2478. """
  2479. shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, "hosts"), source)
  2480. shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, "file/base/cheese"), dest)
  2481. self.run_state("file.copy", name=dest, source=source, force=True)
  2482. self.assertTrue(os.path.exists(dest))
  2483. self.assertTrue(filecmp.cmp(source, dest))
  2484. os.remove(source)
  2485. os.remove(dest)
  2486. @destructiveTest
  2487. @skip_if_not_root
  2488. @skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
  2489. @with_tempfile()
  2490. def test_file_copy_make_dirs(self, source):
  2491. """
  2492. ensure make_dirs creates correct user perms
  2493. """
  2494. shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, "hosts"), source)
  2495. dest = os.path.join(RUNTIME_VARS.TMP, "dir1", "dir2", "copied_file.txt")
  2496. user = "salt"
  2497. mode = "0644"
  2498. ret = self.run_function("user.add", [user])
  2499. self.assertTrue(ret, "Failed to add user. Are you running as sudo?")
  2500. ret = self.run_state(
  2501. "file.copy", name=dest, source=source, user=user, makedirs=True, mode=mode
  2502. )
  2503. self.assertSaltTrueReturn(ret)
  2504. file_checks = [
  2505. dest,
  2506. os.path.join(RUNTIME_VARS.TMP, "dir1"),
  2507. os.path.join(RUNTIME_VARS.TMP, "dir1", "dir2"),
  2508. ]
  2509. for check in file_checks:
  2510. user_check = self.run_function("file.get_user", [check])
  2511. mode_check = self.run_function("file.get_mode", [check])
  2512. self.assertEqual(user_check, user)
  2513. self.assertEqual(salt.utils.files.normalize_mode(mode_check), mode)
  2514. def test_contents_pillar_with_pillar_list(self):
  2515. """
  2516. This tests for any regressions for this issue:
  2517. https://github.com/saltstack/salt/issues/30934
  2518. """
  2519. state_file = "file_contents_pillar"
  2520. ret = self.run_function("state.sls", mods=state_file)
  2521. self.assertSaltTrueReturn(ret)
  2522. @skip_if_not_root
  2523. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2524. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2525. @with_system_user_and_group(
  2526. TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
  2527. )
  2528. def test_owner_after_setuid(self, user, group):
  2529. """
  2530. Test to check file user/group after setting setuid or setgid.
  2531. Because Python os.chown() does reset the setuid/setgid to 0.
  2532. https://github.com/saltstack/salt/pull/45257
  2533. """
  2534. # Desired configuration.
  2535. desired = {
  2536. "file": os.path.join(RUNTIME_VARS.TMP, "file_with_setuid"),
  2537. "user": user,
  2538. "group": group,
  2539. "mode": "4750",
  2540. }
  2541. # Run the state.
  2542. ret = self.run_state(
  2543. "file.managed",
  2544. name=desired["file"],
  2545. user=desired["user"],
  2546. group=desired["group"],
  2547. mode=desired["mode"],
  2548. )
  2549. # Check result.
  2550. file_stat = os.stat(desired["file"])
  2551. result = {
  2552. "user": pwd.getpwuid(file_stat.st_uid).pw_name,
  2553. "group": grp.getgrgid(file_stat.st_gid).gr_name,
  2554. "mode": oct(stat.S_IMODE(file_stat.st_mode)),
  2555. }
  2556. self.assertSaltTrueReturn(ret)
  2557. self.assertEqual(desired["user"], result["user"])
  2558. self.assertEqual(desired["group"], result["group"])
  2559. self.assertEqual(desired["mode"], result["mode"].lstrip("0Oo"))
  2560. def test_binary_contents(self):
  2561. """
  2562. This tests to ensure that binary contents do not cause a traceback.
  2563. """
  2564. name = os.path.join(RUNTIME_VARS.TMP, "1px.gif")
  2565. try:
  2566. ret = self.run_state("file.managed", name=name, contents=BINARY_FILE)
  2567. self.assertSaltTrueReturn(ret)
  2568. finally:
  2569. try:
  2570. os.remove(name)
  2571. except OSError:
  2572. pass
  2573. def test_binary_contents_twice(self):
  2574. """
  2575. This test ensures that after a binary file is created, salt can confirm
  2576. that the file is in the correct state.
  2577. """
  2578. # Create a binary file
  2579. name = os.path.join(RUNTIME_VARS.TMP, "1px.gif")
  2580. # First run state ensures file is created
  2581. ret = self.run_state("file.managed", name=name, contents=BINARY_FILE)
  2582. self.assertSaltTrueReturn(ret)
  2583. # Second run of state ensures file is in correct state
  2584. ret = self.run_state("file.managed", name=name, contents=BINARY_FILE)
  2585. self.assertSaltTrueReturn(ret)
  2586. try:
  2587. os.remove(name)
  2588. except OSError:
  2589. pass
  2590. @skip_if_not_root
  2591. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2592. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2593. @with_system_user_and_group(
  2594. TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
  2595. )
  2596. @with_tempdir()
  2597. def test_issue_48336_file_managed_mode_setuid(self, tempdir, user, group):
  2598. """
  2599. Ensure that mode is correct with changing of ownership and group
  2600. symlinks)
  2601. """
  2602. tempfile = os.path.join(tempdir, "temp_file_issue_48336")
  2603. # Run the state
  2604. ret = self.run_state(
  2605. "file.managed", name=tempfile, user=user, group=group, mode="4750",
  2606. )
  2607. self.assertSaltTrueReturn(ret)
  2608. # Check that the owner and group are correct, and
  2609. # the mode is what we expect
  2610. temp_file_stats = os.stat(tempfile)
  2611. # Normalize the mode
  2612. temp_file_mode = str(oct(stat.S_IMODE(temp_file_stats.st_mode)))
  2613. temp_file_mode = salt.utils.files.normalize_mode(temp_file_mode)
  2614. self.assertEqual(temp_file_mode, "4750")
  2615. self.assertEqual(pwd.getpwuid(temp_file_stats.st_uid).pw_name, user)
  2616. self.assertEqual(grp.getgrgid(temp_file_stats.st_gid).gr_name, group)
  2617. @with_tempdir()
  2618. def test_issue_48557(self, tempdir):
  2619. tempfile = os.path.join(tempdir, "temp_file_issue_48557")
  2620. with salt.utils.files.fopen(tempfile, "wb") as fp:
  2621. fp.write(os.linesep.join(["test1", "test2", "test3", ""]).encode("utf-8"))
  2622. ret = self.run_state(
  2623. "file.line", name=tempfile, after="test2", mode="insert", content="test4"
  2624. )
  2625. self.assertSaltTrueReturn(ret)
  2626. with salt.utils.files.fopen(tempfile, "rb") as fp:
  2627. content = fp.read()
  2628. self.assertEqual(
  2629. content,
  2630. os.linesep.join(["test1", "test2", "test4", "test3", ""]).encode("utf-8"),
  2631. )
  2632. @with_tempfile()
  2633. def test_issue_50221(self, name):
  2634. expected = "abc{0}{0}{0}".format(os.linesep)
  2635. ret = self.run_function("pillar.get", ["issue-50221"])
  2636. assert ret == expected
  2637. ret = self.run_function("state.apply", ["issue-50221"], pillar={"name": name},)
  2638. self.assertSaltTrueReturn(ret)
  2639. with salt.utils.files.fopen(name, "r") as fp:
  2640. contents = fp.read()
  2641. assert contents == expected
  2642. def test_managed_file_issue_51208(self):
  2643. """
  2644. Test to ensure we can handle a file with escaped double-quotes
  2645. """
  2646. name = os.path.join(RUNTIME_VARS.TMP, "issue_51208.txt")
  2647. ret = self.run_state(
  2648. "file.managed", name=name, source="salt://issue-51208/vimrc.stub"
  2649. )
  2650. src = os.path.join(RUNTIME_VARS.BASE_FILES, "issue-51208", "vimrc.stub")
  2651. with salt.utils.files.fopen(src, "r") as fp_:
  2652. master_data = fp_.read()
  2653. with salt.utils.files.fopen(name, "r") as fp_:
  2654. minion_data = fp_.read()
  2655. self.assertEqual(master_data, minion_data)
  2656. self.assertSaltTrueReturn(ret)
  2657. @with_tempfile()
  2658. def test_keyvalue(self, name):
  2659. """
  2660. file.keyvalue
  2661. """
  2662. content = dedent(
  2663. """\
  2664. # This is the sshd server system-wide configuration file. See
  2665. # sshd_config(5) for more information.
  2666. # The strategy used for options in the default sshd_config shipped with
  2667. # OpenSSH is to specify options with their default value where
  2668. # possible, but leave them commented. Uncommented options override the
  2669. # default value.
  2670. #Port 22
  2671. #AddressFamily any
  2672. #ListenAddress 0.0.0.0
  2673. #ListenAddress ::
  2674. #HostKey /etc/ssh/ssh_host_rsa_key
  2675. #HostKey /etc/ssh/ssh_host_ecdsa_key
  2676. #HostKey /etc/ssh/ssh_host_ed25519_key
  2677. # Ciphers and keying
  2678. #RekeyLimit default none
  2679. # Logging
  2680. #SyslogFacility AUTH
  2681. #LogLevel INFO
  2682. # Authentication:
  2683. #LoginGraceTime 2m
  2684. #PermitRootLogin prohibit-password
  2685. #StrictModes yes
  2686. #MaxAuthTries 6
  2687. #MaxSessions 10
  2688. """
  2689. )
  2690. with salt.utils.files.fopen(name, "w+") as fp_:
  2691. fp_.write(content)
  2692. ret = self.run_state(
  2693. "file.keyvalue",
  2694. name=name,
  2695. key="permitrootlogin",
  2696. value="no",
  2697. separator=" ",
  2698. uncomment=" #",
  2699. key_ignore_case=True,
  2700. )
  2701. with salt.utils.files.fopen(name, "r") as fp_:
  2702. file_contents = fp_.read()
  2703. self.assertNotIn("#PermitRootLogin", file_contents)
  2704. self.assertNotIn("prohibit-password", file_contents)
  2705. self.assertIn("PermitRootLogin no", file_contents)
  2706. self.assertSaltTrueReturn(ret)
  2707. @pytest.mark.windows_whitelisted
  2708. class BlockreplaceTest(ModuleCase, SaltReturnAssertsMixin):
  2709. marker_start = "# start"
  2710. marker_end = "# end"
  2711. content = dedent(
  2712. """\
  2713. Line 1 of block
  2714. Line 2 of block
  2715. """
  2716. )
  2717. without_block = dedent(
  2718. """\
  2719. Hello world!
  2720. # comment here
  2721. """
  2722. )
  2723. with_non_matching_block = dedent(
  2724. """\
  2725. Hello world!
  2726. # start
  2727. No match here
  2728. # end
  2729. # comment here
  2730. """
  2731. )
  2732. with_non_matching_block_and_marker_end_not_after_newline = dedent(
  2733. """\
  2734. Hello world!
  2735. # start
  2736. No match here# end
  2737. # comment here
  2738. """
  2739. )
  2740. with_matching_block = dedent(
  2741. """\
  2742. Hello world!
  2743. # start
  2744. Line 1 of block
  2745. Line 2 of block
  2746. # end
  2747. # comment here
  2748. """
  2749. )
  2750. with_matching_block_and_extra_newline = dedent(
  2751. """\
  2752. Hello world!
  2753. # start
  2754. Line 1 of block
  2755. Line 2 of block
  2756. # end
  2757. # comment here
  2758. """
  2759. )
  2760. with_matching_block_and_marker_end_not_after_newline = dedent(
  2761. """\
  2762. Hello world!
  2763. # start
  2764. Line 1 of block
  2765. Line 2 of block# end
  2766. # comment here
  2767. """
  2768. )
  2769. content_explicit_posix_newlines = "Line 1 of block\n" "Line 2 of block\n"
  2770. content_explicit_windows_newlines = "Line 1 of block\r\n" "Line 2 of block\r\n"
  2771. without_block_explicit_posix_newlines = "Hello world!\n\n" "# comment here\n"
  2772. without_block_explicit_windows_newlines = (
  2773. "Hello world!\r\n\r\n" "# comment here\r\n"
  2774. )
  2775. with_block_prepended_explicit_posix_newlines = (
  2776. "# start\n"
  2777. "Line 1 of block\n"
  2778. "Line 2 of block\n"
  2779. "# end\n"
  2780. "Hello world!\n\n"
  2781. "# comment here\n"
  2782. )
  2783. with_block_prepended_explicit_windows_newlines = (
  2784. "# start\r\n"
  2785. "Line 1 of block\r\n"
  2786. "Line 2 of block\r\n"
  2787. "# end\r\n"
  2788. "Hello world!\r\n\r\n"
  2789. "# comment here\r\n"
  2790. )
  2791. with_block_appended_explicit_posix_newlines = (
  2792. "Hello world!\n\n"
  2793. "# comment here\n"
  2794. "# start\n"
  2795. "Line 1 of block\n"
  2796. "Line 2 of block\n"
  2797. "# end\n"
  2798. )
  2799. with_block_appended_explicit_windows_newlines = (
  2800. "Hello world!\r\n\r\n"
  2801. "# comment here\r\n"
  2802. "# start\r\n"
  2803. "Line 1 of block\r\n"
  2804. "Line 2 of block\r\n"
  2805. "# end\r\n"
  2806. )
  2807. @staticmethod
  2808. def _write(dest, content):
  2809. with salt.utils.files.fopen(dest, "wb") as fp_:
  2810. fp_.write(salt.utils.stringutils.to_bytes(content))
  2811. @staticmethod
  2812. def _read(src):
  2813. with salt.utils.files.fopen(src, "rb") as fp_:
  2814. return salt.utils.stringutils.to_unicode(fp_.read())
  2815. @with_tempfile()
  2816. def test_prepend(self, name):
  2817. """
  2818. Test blockreplace when prepend_if_not_found=True and block doesn't
  2819. exist in file.
  2820. """
  2821. expected = (
  2822. self.marker_start
  2823. + os.linesep
  2824. + self.content
  2825. + self.marker_end
  2826. + os.linesep
  2827. + self.without_block
  2828. )
  2829. # Pass 1: content ends in newline
  2830. self._write(name, self.without_block)
  2831. ret = self.run_state(
  2832. "file.blockreplace",
  2833. name=name,
  2834. content=self.content,
  2835. marker_start=self.marker_start,
  2836. marker_end=self.marker_end,
  2837. prepend_if_not_found=True,
  2838. )
  2839. self.assertSaltTrueReturn(ret)
  2840. self.assertTrue(ret[next(iter(ret))]["changes"])
  2841. self.assertEqual(self._read(name), expected)
  2842. # Pass 1a: Re-run state, no changes should be made
  2843. ret = self.run_state(
  2844. "file.blockreplace",
  2845. name=name,
  2846. content=self.content,
  2847. marker_start=self.marker_start,
  2848. marker_end=self.marker_end,
  2849. prepend_if_not_found=True,
  2850. )
  2851. self.assertSaltTrueReturn(ret)
  2852. self.assertFalse(ret[next(iter(ret))]["changes"])
  2853. self.assertEqual(self._read(name), expected)
  2854. # Pass 2: content does not end in newline
  2855. self._write(name, self.without_block)
  2856. ret = self.run_state(
  2857. "file.blockreplace",
  2858. name=name,
  2859. content=self.content.rstrip("\r\n"),
  2860. marker_start=self.marker_start,
  2861. marker_end=self.marker_end,
  2862. prepend_if_not_found=True,
  2863. )
  2864. self.assertSaltTrueReturn(ret)
  2865. self.assertTrue(ret[next(iter(ret))]["changes"])
  2866. self.assertEqual(self._read(name), expected)
  2867. # Pass 2a: Re-run state, no changes should be made
  2868. ret = self.run_state(
  2869. "file.blockreplace",
  2870. name=name,
  2871. content=self.content.rstrip("\r\n"),
  2872. marker_start=self.marker_start,
  2873. marker_end=self.marker_end,
  2874. prepend_if_not_found=True,
  2875. )
  2876. self.assertSaltTrueReturn(ret)
  2877. self.assertFalse(ret[next(iter(ret))]["changes"])
  2878. self.assertEqual(self._read(name), expected)
  2879. @with_tempfile()
  2880. def test_prepend_append_newline(self, name):
  2881. """
  2882. Test blockreplace when prepend_if_not_found=True and block doesn't
  2883. exist in file. Test with append_newline explicitly set to True.
  2884. """
  2885. # Pass 1: content ends in newline
  2886. expected = (
  2887. self.marker_start
  2888. + os.linesep
  2889. + self.content
  2890. + os.linesep
  2891. + self.marker_end
  2892. + os.linesep
  2893. + self.without_block
  2894. )
  2895. self._write(name, self.without_block)
  2896. ret = self.run_state(
  2897. "file.blockreplace",
  2898. name=name,
  2899. content=self.content,
  2900. marker_start=self.marker_start,
  2901. marker_end=self.marker_end,
  2902. prepend_if_not_found=True,
  2903. append_newline=True,
  2904. )
  2905. self.assertSaltTrueReturn(ret)
  2906. self.assertTrue(ret[next(iter(ret))]["changes"])
  2907. self.assertEqual(self._read(name), expected)
  2908. # Pass 1a: Re-run state, no changes should be made
  2909. ret = self.run_state(
  2910. "file.blockreplace",
  2911. name=name,
  2912. content=self.content,
  2913. marker_start=self.marker_start,
  2914. marker_end=self.marker_end,
  2915. prepend_if_not_found=True,
  2916. append_newline=True,
  2917. )
  2918. self.assertSaltTrueReturn(ret)
  2919. self.assertFalse(ret[next(iter(ret))]["changes"])
  2920. self.assertEqual(self._read(name), expected)
  2921. # Pass 2: content does not end in newline
  2922. expected = (
  2923. self.marker_start
  2924. + os.linesep
  2925. + self.content
  2926. + self.marker_end
  2927. + os.linesep
  2928. + self.without_block
  2929. )
  2930. self._write(name, self.without_block)
  2931. ret = self.run_state(
  2932. "file.blockreplace",
  2933. name=name,
  2934. content=self.content.rstrip("\r\n"),
  2935. marker_start=self.marker_start,
  2936. marker_end=self.marker_end,
  2937. prepend_if_not_found=True,
  2938. append_newline=True,
  2939. )
  2940. self.assertSaltTrueReturn(ret)
  2941. self.assertTrue(ret[next(iter(ret))]["changes"])
  2942. self.assertEqual(self._read(name), expected)
  2943. # Pass 2a: Re-run state, no changes should be made
  2944. ret = self.run_state(
  2945. "file.blockreplace",
  2946. name=name,
  2947. content=self.content.rstrip("\r\n"),
  2948. marker_start=self.marker_start,
  2949. marker_end=self.marker_end,
  2950. prepend_if_not_found=True,
  2951. append_newline=True,
  2952. )
  2953. self.assertSaltTrueReturn(ret)
  2954. self.assertFalse(ret[next(iter(ret))]["changes"])
  2955. self.assertEqual(self._read(name), expected)
  2956. @with_tempfile()
  2957. def test_prepend_no_append_newline(self, name):
  2958. """
  2959. Test blockreplace when prepend_if_not_found=True and block doesn't
  2960. exist in file. Test with append_newline explicitly set to False.
  2961. """
  2962. # Pass 1: content ends in newline
  2963. expected = (
  2964. self.marker_start
  2965. + os.linesep
  2966. + self.content
  2967. + self.marker_end
  2968. + os.linesep
  2969. + self.without_block
  2970. )
  2971. self._write(name, self.without_block)
  2972. ret = self.run_state(
  2973. "file.blockreplace",
  2974. name=name,
  2975. content=self.content,
  2976. marker_start=self.marker_start,
  2977. marker_end=self.marker_end,
  2978. prepend_if_not_found=True,
  2979. append_newline=False,
  2980. )
  2981. self.assertSaltTrueReturn(ret)
  2982. self.assertTrue(ret[next(iter(ret))]["changes"])
  2983. self.assertEqual(self._read(name), expected)
  2984. # Pass 1a: Re-run state, no changes should be made
  2985. ret = self.run_state(
  2986. "file.blockreplace",
  2987. name=name,
  2988. content=self.content,
  2989. marker_start=self.marker_start,
  2990. marker_end=self.marker_end,
  2991. prepend_if_not_found=True,
  2992. append_newline=False,
  2993. )
  2994. self.assertSaltTrueReturn(ret)
  2995. self.assertFalse(ret[next(iter(ret))]["changes"])
  2996. self.assertEqual(self._read(name), expected)
  2997. # Pass 2: content does not end in newline
  2998. expected = (
  2999. self.marker_start
  3000. + os.linesep
  3001. + self.content.rstrip("\r\n")
  3002. + self.marker_end
  3003. + os.linesep
  3004. + self.without_block
  3005. )
  3006. self._write(name, self.without_block)
  3007. ret = self.run_state(
  3008. "file.blockreplace",
  3009. name=name,
  3010. content=self.content.rstrip("\r\n"),
  3011. marker_start=self.marker_start,
  3012. marker_end=self.marker_end,
  3013. prepend_if_not_found=True,
  3014. append_newline=False,
  3015. )
  3016. self.assertSaltTrueReturn(ret)
  3017. self.assertTrue(ret[next(iter(ret))]["changes"])
  3018. self.assertEqual(self._read(name), expected)
  3019. # Pass 2a: Re-run state, no changes should be made
  3020. ret = self.run_state(
  3021. "file.blockreplace",
  3022. name=name,
  3023. content=self.content.rstrip("\r\n"),
  3024. marker_start=self.marker_start,
  3025. marker_end=self.marker_end,
  3026. prepend_if_not_found=True,
  3027. append_newline=False,
  3028. )
  3029. self.assertSaltTrueReturn(ret)
  3030. self.assertFalse(ret[next(iter(ret))]["changes"])
  3031. self.assertEqual(self._read(name), expected)
  3032. @with_tempfile()
  3033. def test_append(self, name):
  3034. """
  3035. Test blockreplace when append_if_not_found=True and block doesn't
  3036. exist in file.
  3037. """
  3038. expected = (
  3039. self.without_block
  3040. + self.marker_start
  3041. + os.linesep
  3042. + self.content
  3043. + self.marker_end
  3044. + os.linesep
  3045. )
  3046. # Pass 1: content ends in newline
  3047. self._write(name, self.without_block)
  3048. ret = self.run_state(
  3049. "file.blockreplace",
  3050. name=name,
  3051. content=self.content,
  3052. marker_start=self.marker_start,
  3053. marker_end=self.marker_end,
  3054. append_if_not_found=True,
  3055. )
  3056. self.assertSaltTrueReturn(ret)
  3057. self.assertTrue(ret[next(iter(ret))]["changes"])
  3058. self.assertEqual(self._read(name), expected)
  3059. # Pass 1a: Re-run state, no changes should be made
  3060. ret = self.run_state(
  3061. "file.blockreplace",
  3062. name=name,
  3063. content=self.content,
  3064. marker_start=self.marker_start,
  3065. marker_end=self.marker_end,
  3066. append_if_not_found=True,
  3067. )
  3068. self.assertSaltTrueReturn(ret)
  3069. self.assertFalse(ret[next(iter(ret))]["changes"])
  3070. self.assertEqual(self._read(name), expected)
  3071. # Pass 2: content does not end in newline
  3072. self._write(name, self.without_block)
  3073. ret = self.run_state(
  3074. "file.blockreplace",
  3075. name=name,
  3076. content=self.content.rstrip("\r\n"),
  3077. marker_start=self.marker_start,
  3078. marker_end=self.marker_end,
  3079. append_if_not_found=True,
  3080. )
  3081. self.assertSaltTrueReturn(ret)
  3082. self.assertTrue(ret[next(iter(ret))]["changes"])
  3083. self.assertEqual(self._read(name), expected)
  3084. # Pass 2a: Re-run state, no changes should be made
  3085. ret = self.run_state(
  3086. "file.blockreplace",
  3087. name=name,
  3088. content=self.content.rstrip("\r\n"),
  3089. marker_start=self.marker_start,
  3090. marker_end=self.marker_end,
  3091. append_if_not_found=True,
  3092. )
  3093. self.assertSaltTrueReturn(ret)
  3094. self.assertFalse(ret[next(iter(ret))]["changes"])
  3095. self.assertEqual(self._read(name), expected)
  3096. @with_tempfile()
  3097. def test_append_append_newline(self, name):
  3098. """
  3099. Test blockreplace when append_if_not_found=True and block doesn't
  3100. exist in file. Test with append_newline explicitly set to True.
  3101. """
  3102. # Pass 1: content ends in newline
  3103. expected = (
  3104. self.without_block
  3105. + self.marker_start
  3106. + os.linesep
  3107. + self.content
  3108. + os.linesep
  3109. + self.marker_end
  3110. + os.linesep
  3111. )
  3112. self._write(name, self.without_block)
  3113. ret = self.run_state(
  3114. "file.blockreplace",
  3115. name=name,
  3116. content=self.content,
  3117. marker_start=self.marker_start,
  3118. marker_end=self.marker_end,
  3119. append_if_not_found=True,
  3120. append_newline=True,
  3121. )
  3122. self.assertSaltTrueReturn(ret)
  3123. self.assertTrue(ret[next(iter(ret))]["changes"])
  3124. self.assertEqual(self._read(name), expected)
  3125. # Pass 1a: Re-run state, no changes should be made
  3126. ret = self.run_state(
  3127. "file.blockreplace",
  3128. name=name,
  3129. content=self.content,
  3130. marker_start=self.marker_start,
  3131. marker_end=self.marker_end,
  3132. append_if_not_found=True,
  3133. append_newline=True,
  3134. )
  3135. self.assertSaltTrueReturn(ret)
  3136. self.assertFalse(ret[next(iter(ret))]["changes"])
  3137. self.assertEqual(self._read(name), expected)
  3138. # Pass 2: content does not end in newline
  3139. expected = (
  3140. self.without_block
  3141. + self.marker_start
  3142. + os.linesep
  3143. + self.content
  3144. + self.marker_end
  3145. + os.linesep
  3146. )
  3147. self._write(name, self.without_block)
  3148. ret = self.run_state(
  3149. "file.blockreplace",
  3150. name=name,
  3151. content=self.content.rstrip("\r\n"),
  3152. marker_start=self.marker_start,
  3153. marker_end=self.marker_end,
  3154. append_if_not_found=True,
  3155. append_newline=True,
  3156. )
  3157. self.assertSaltTrueReturn(ret)
  3158. self.assertTrue(ret[next(iter(ret))]["changes"])
  3159. self.assertEqual(self._read(name), expected)
  3160. # Pass 2a: Re-run state, no changes should be made
  3161. ret = self.run_state(
  3162. "file.blockreplace",
  3163. name=name,
  3164. content=self.content.rstrip("\r\n"),
  3165. marker_start=self.marker_start,
  3166. marker_end=self.marker_end,
  3167. append_if_not_found=True,
  3168. append_newline=True,
  3169. )
  3170. self.assertSaltTrueReturn(ret)
  3171. self.assertFalse(ret[next(iter(ret))]["changes"])
  3172. self.assertEqual(self._read(name), expected)
  3173. @with_tempfile()
  3174. def test_append_no_append_newline(self, name):
  3175. """
  3176. Test blockreplace when append_if_not_found=True and block doesn't
  3177. exist in file. Test with append_newline explicitly set to False.
  3178. """
  3179. # Pass 1: content ends in newline
  3180. expected = (
  3181. self.without_block
  3182. + self.marker_start
  3183. + os.linesep
  3184. + self.content
  3185. + self.marker_end
  3186. + os.linesep
  3187. )
  3188. self._write(name, self.without_block)
  3189. ret = self.run_state(
  3190. "file.blockreplace",
  3191. name=name,
  3192. content=self.content,
  3193. marker_start=self.marker_start,
  3194. marker_end=self.marker_end,
  3195. append_if_not_found=True,
  3196. append_newline=False,
  3197. )
  3198. self.assertSaltTrueReturn(ret)
  3199. self.assertTrue(ret[next(iter(ret))]["changes"])
  3200. self.assertEqual(self._read(name), expected)
  3201. # Pass 1a: Re-run state, no changes should be made
  3202. ret = self.run_state(
  3203. "file.blockreplace",
  3204. name=name,
  3205. content=self.content,
  3206. marker_start=self.marker_start,
  3207. marker_end=self.marker_end,
  3208. append_if_not_found=True,
  3209. append_newline=False,
  3210. )
  3211. self.assertSaltTrueReturn(ret)
  3212. self.assertFalse(ret[next(iter(ret))]["changes"])
  3213. self.assertEqual(self._read(name), expected)
  3214. # Pass 2: content does not end in newline
  3215. expected = (
  3216. self.without_block
  3217. + self.marker_start
  3218. + os.linesep
  3219. + self.content.rstrip("\r\n")
  3220. + self.marker_end
  3221. + os.linesep
  3222. )
  3223. self._write(name, self.without_block)
  3224. ret = self.run_state(
  3225. "file.blockreplace",
  3226. name=name,
  3227. content=self.content.rstrip("\r\n"),
  3228. marker_start=self.marker_start,
  3229. marker_end=self.marker_end,
  3230. append_if_not_found=True,
  3231. append_newline=False,
  3232. )
  3233. self.assertSaltTrueReturn(ret)
  3234. self.assertTrue(ret[next(iter(ret))]["changes"])
  3235. self.assertEqual(self._read(name), expected)
  3236. # Pass 2a: Re-run state, no changes should be made
  3237. ret = self.run_state(
  3238. "file.blockreplace",
  3239. name=name,
  3240. content=self.content.rstrip("\r\n"),
  3241. marker_start=self.marker_start,
  3242. marker_end=self.marker_end,
  3243. append_if_not_found=True,
  3244. append_newline=False,
  3245. )
  3246. self.assertSaltTrueReturn(ret)
  3247. self.assertFalse(ret[next(iter(ret))]["changes"])
  3248. self.assertEqual(self._read(name), expected)
  3249. @with_tempfile()
  3250. def test_prepend_auto_line_separator(self, name):
  3251. """
  3252. This tests the line separator auto-detection when prepending the block
  3253. """
  3254. # POSIX newlines to Windows newlines
  3255. self._write(name, self.without_block_explicit_windows_newlines)
  3256. ret = self.run_state(
  3257. "file.blockreplace",
  3258. name=name,
  3259. content=self.content_explicit_posix_newlines,
  3260. marker_start=self.marker_start,
  3261. marker_end=self.marker_end,
  3262. prepend_if_not_found=True,
  3263. )
  3264. self.assertSaltTrueReturn(ret)
  3265. self.assertTrue(ret[next(iter(ret))]["changes"])
  3266. self.assertEqual(
  3267. self._read(name), self.with_block_prepended_explicit_windows_newlines
  3268. )
  3269. # Re-run state, no changes should be made
  3270. ret = self.run_state(
  3271. "file.blockreplace",
  3272. name=name,
  3273. content=self.content_explicit_posix_newlines,
  3274. marker_start=self.marker_start,
  3275. marker_end=self.marker_end,
  3276. prepend_if_not_found=True,
  3277. )
  3278. self.assertSaltTrueReturn(ret)
  3279. self.assertFalse(ret[next(iter(ret))]["changes"])
  3280. self.assertEqual(
  3281. self._read(name), self.with_block_prepended_explicit_windows_newlines
  3282. )
  3283. # Windows newlines to POSIX newlines
  3284. self._write(name, self.without_block_explicit_posix_newlines)
  3285. ret = self.run_state(
  3286. "file.blockreplace",
  3287. name=name,
  3288. content=self.content_explicit_windows_newlines,
  3289. marker_start=self.marker_start,
  3290. marker_end=self.marker_end,
  3291. prepend_if_not_found=True,
  3292. )
  3293. self.assertSaltTrueReturn(ret)
  3294. self.assertTrue(ret[next(iter(ret))]["changes"])
  3295. self.assertEqual(
  3296. self._read(name), self.with_block_prepended_explicit_posix_newlines
  3297. )
  3298. # Re-run state, no changes should be made
  3299. ret = self.run_state(
  3300. "file.blockreplace",
  3301. name=name,
  3302. content=self.content_explicit_windows_newlines,
  3303. marker_start=self.marker_start,
  3304. marker_end=self.marker_end,
  3305. prepend_if_not_found=True,
  3306. )
  3307. self.assertSaltTrueReturn(ret)
  3308. self.assertFalse(ret[next(iter(ret))]["changes"])
  3309. self.assertEqual(
  3310. self._read(name), self.with_block_prepended_explicit_posix_newlines
  3311. )
  3312. @with_tempfile()
  3313. def test_append_auto_line_separator(self, name):
  3314. """
  3315. This tests the line separator auto-detection when appending the block
  3316. """
  3317. # POSIX newlines to Windows newlines
  3318. self._write(name, self.without_block_explicit_windows_newlines)
  3319. ret = self.run_state(
  3320. "file.blockreplace",
  3321. name=name,
  3322. content=self.content_explicit_posix_newlines,
  3323. marker_start=self.marker_start,
  3324. marker_end=self.marker_end,
  3325. append_if_not_found=True,
  3326. )
  3327. self.assertSaltTrueReturn(ret)
  3328. self.assertTrue(ret[next(iter(ret))]["changes"])
  3329. self.assertEqual(
  3330. self._read(name), self.with_block_appended_explicit_windows_newlines
  3331. )
  3332. # Re-run state, no changes should be made
  3333. ret = self.run_state(
  3334. "file.blockreplace",
  3335. name=name,
  3336. content=self.content_explicit_posix_newlines,
  3337. marker_start=self.marker_start,
  3338. marker_end=self.marker_end,
  3339. append_if_not_found=True,
  3340. )
  3341. self.assertSaltTrueReturn(ret)
  3342. self.assertFalse(ret[next(iter(ret))]["changes"])
  3343. self.assertEqual(
  3344. self._read(name), self.with_block_appended_explicit_windows_newlines
  3345. )
  3346. # Windows newlines to POSIX newlines
  3347. self._write(name, self.without_block_explicit_posix_newlines)
  3348. ret = self.run_state(
  3349. "file.blockreplace",
  3350. name=name,
  3351. content=self.content_explicit_windows_newlines,
  3352. marker_start=self.marker_start,
  3353. marker_end=self.marker_end,
  3354. append_if_not_found=True,
  3355. )
  3356. self.assertSaltTrueReturn(ret)
  3357. self.assertTrue(ret[next(iter(ret))]["changes"])
  3358. self.assertEqual(
  3359. self._read(name), self.with_block_appended_explicit_posix_newlines
  3360. )
  3361. # Re-run state, no changes should be made
  3362. ret = self.run_state(
  3363. "file.blockreplace",
  3364. name=name,
  3365. content=self.content_explicit_windows_newlines,
  3366. marker_start=self.marker_start,
  3367. marker_end=self.marker_end,
  3368. append_if_not_found=True,
  3369. )
  3370. self.assertSaltTrueReturn(ret)
  3371. self.assertFalse(ret[next(iter(ret))]["changes"])
  3372. self.assertEqual(
  3373. self._read(name), self.with_block_appended_explicit_posix_newlines
  3374. )
  3375. @with_tempfile()
  3376. def test_non_matching_block(self, name):
  3377. """
  3378. Test blockreplace when block exists but its contents are not a
  3379. match.
  3380. """
  3381. # Pass 1: content ends in newline
  3382. self._write(name, self.with_non_matching_block)
  3383. ret = self.run_state(
  3384. "file.blockreplace",
  3385. name=name,
  3386. content=self.content,
  3387. marker_start=self.marker_start,
  3388. marker_end=self.marker_end,
  3389. )
  3390. self.assertSaltTrueReturn(ret)
  3391. self.assertTrue(ret[next(iter(ret))]["changes"])
  3392. self.assertEqual(self._read(name), self.with_matching_block)
  3393. # Pass 1a: Re-run state, no changes should be made
  3394. ret = self.run_state(
  3395. "file.blockreplace",
  3396. name=name,
  3397. content=self.content,
  3398. marker_start=self.marker_start,
  3399. marker_end=self.marker_end,
  3400. )
  3401. self.assertSaltTrueReturn(ret)
  3402. self.assertFalse(ret[next(iter(ret))]["changes"])
  3403. self.assertEqual(self._read(name), self.with_matching_block)
  3404. # Pass 2: content does not end in newline
  3405. self._write(name, self.with_non_matching_block)
  3406. ret = self.run_state(
  3407. "file.blockreplace",
  3408. name=name,
  3409. content=self.content.rstrip("\r\n"),
  3410. marker_start=self.marker_start,
  3411. marker_end=self.marker_end,
  3412. )
  3413. self.assertSaltTrueReturn(ret)
  3414. self.assertTrue(ret[next(iter(ret))]["changes"])
  3415. self.assertEqual(self._read(name), self.with_matching_block)
  3416. # Pass 2a: Re-run state, no changes should be made
  3417. ret = self.run_state(
  3418. "file.blockreplace",
  3419. name=name,
  3420. content=self.content.rstrip("\r\n"),
  3421. marker_start=self.marker_start,
  3422. marker_end=self.marker_end,
  3423. )
  3424. self.assertSaltTrueReturn(ret)
  3425. self.assertFalse(ret[next(iter(ret))]["changes"])
  3426. self.assertEqual(self._read(name), self.with_matching_block)
  3427. @with_tempfile()
  3428. def test_non_matching_block_append_newline(self, name):
  3429. """
  3430. Test blockreplace when block exists but its contents are not a
  3431. match. Test with append_newline explicitly set to True.
  3432. """
  3433. # Pass 1: content ends in newline
  3434. self._write(name, self.with_non_matching_block)
  3435. ret = self.run_state(
  3436. "file.blockreplace",
  3437. name=name,
  3438. content=self.content,
  3439. marker_start=self.marker_start,
  3440. marker_end=self.marker_end,
  3441. append_newline=True,
  3442. )
  3443. self.assertSaltTrueReturn(ret)
  3444. self.assertTrue(ret[next(iter(ret))]["changes"])
  3445. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3446. # Pass 1a: Re-run state, no changes should be made
  3447. ret = self.run_state(
  3448. "file.blockreplace",
  3449. name=name,
  3450. content=self.content,
  3451. marker_start=self.marker_start,
  3452. marker_end=self.marker_end,
  3453. append_newline=True,
  3454. )
  3455. self.assertSaltTrueReturn(ret)
  3456. self.assertFalse(ret[next(iter(ret))]["changes"])
  3457. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3458. # Pass 2: content does not end in newline
  3459. self._write(name, self.with_non_matching_block)
  3460. ret = self.run_state(
  3461. "file.blockreplace",
  3462. name=name,
  3463. content=self.content.rstrip("\r\n"),
  3464. marker_start=self.marker_start,
  3465. marker_end=self.marker_end,
  3466. append_newline=True,
  3467. )
  3468. self.assertSaltTrueReturn(ret)
  3469. self.assertTrue(ret[next(iter(ret))]["changes"])
  3470. self.assertEqual(self._read(name), self.with_matching_block)
  3471. # Pass 2a: Re-run state, no changes should be made
  3472. ret = self.run_state(
  3473. "file.blockreplace",
  3474. name=name,
  3475. content=self.content.rstrip("\r\n"),
  3476. marker_start=self.marker_start,
  3477. marker_end=self.marker_end,
  3478. append_newline=True,
  3479. )
  3480. self.assertSaltTrueReturn(ret)
  3481. self.assertFalse(ret[next(iter(ret))]["changes"])
  3482. self.assertEqual(self._read(name), self.with_matching_block)
  3483. @with_tempfile()
  3484. def test_non_matching_block_no_append_newline(self, name):
  3485. """
  3486. Test blockreplace when block exists but its contents are not a
  3487. match. Test with append_newline explicitly set to False.
  3488. """
  3489. # Pass 1: content ends in newline
  3490. self._write(name, self.with_non_matching_block)
  3491. ret = self.run_state(
  3492. "file.blockreplace",
  3493. name=name,
  3494. content=self.content,
  3495. marker_start=self.marker_start,
  3496. marker_end=self.marker_end,
  3497. append_newline=False,
  3498. )
  3499. self.assertSaltTrueReturn(ret)
  3500. self.assertTrue(ret[next(iter(ret))]["changes"])
  3501. self.assertEqual(self._read(name), self.with_matching_block)
  3502. # Pass 1a: Re-run state, no changes should be made
  3503. ret = self.run_state(
  3504. "file.blockreplace",
  3505. name=name,
  3506. content=self.content,
  3507. marker_start=self.marker_start,
  3508. marker_end=self.marker_end,
  3509. append_newline=False,
  3510. )
  3511. self.assertSaltTrueReturn(ret)
  3512. self.assertFalse(ret[next(iter(ret))]["changes"])
  3513. self.assertEqual(self._read(name), self.with_matching_block)
  3514. # Pass 2: content does not end in newline
  3515. self._write(name, self.with_non_matching_block)
  3516. ret = self.run_state(
  3517. "file.blockreplace",
  3518. name=name,
  3519. content=self.content.rstrip("\r\n"),
  3520. marker_start=self.marker_start,
  3521. marker_end=self.marker_end,
  3522. append_newline=False,
  3523. )
  3524. self.assertSaltTrueReturn(ret)
  3525. self.assertTrue(ret[next(iter(ret))]["changes"])
  3526. self.assertEqual(
  3527. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3528. )
  3529. # Pass 2a: Re-run state, no changes should be made
  3530. ret = self.run_state(
  3531. "file.blockreplace",
  3532. name=name,
  3533. content=self.content.rstrip("\r\n"),
  3534. marker_start=self.marker_start,
  3535. marker_end=self.marker_end,
  3536. append_newline=False,
  3537. )
  3538. self.assertSaltTrueReturn(ret)
  3539. self.assertFalse(ret[next(iter(ret))]["changes"])
  3540. self.assertEqual(
  3541. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3542. )
  3543. @with_tempfile()
  3544. def test_non_matching_block_and_marker_not_after_newline(self, name):
  3545. """
  3546. Test blockreplace when block exists but its contents are not a
  3547. match, and the marker_end is not directly preceded by a newline.
  3548. """
  3549. # Pass 1: content ends in newline
  3550. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3551. ret = self.run_state(
  3552. "file.blockreplace",
  3553. name=name,
  3554. content=self.content,
  3555. marker_start=self.marker_start,
  3556. marker_end=self.marker_end,
  3557. )
  3558. self.assertSaltTrueReturn(ret)
  3559. self.assertTrue(ret[next(iter(ret))]["changes"])
  3560. self.assertEqual(self._read(name), self.with_matching_block)
  3561. # Pass 1a: Re-run state, no changes should be made
  3562. ret = self.run_state(
  3563. "file.blockreplace",
  3564. name=name,
  3565. content=self.content,
  3566. marker_start=self.marker_start,
  3567. marker_end=self.marker_end,
  3568. )
  3569. self.assertSaltTrueReturn(ret)
  3570. self.assertFalse(ret[next(iter(ret))]["changes"])
  3571. self.assertEqual(self._read(name), self.with_matching_block)
  3572. # Pass 2: content does not end in newline
  3573. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3574. ret = self.run_state(
  3575. "file.blockreplace",
  3576. name=name,
  3577. content=self.content.rstrip("\r\n"),
  3578. marker_start=self.marker_start,
  3579. marker_end=self.marker_end,
  3580. )
  3581. self.assertSaltTrueReturn(ret)
  3582. self.assertTrue(ret[next(iter(ret))]["changes"])
  3583. self.assertEqual(self._read(name), self.with_matching_block)
  3584. # Pass 2a: Re-run state, no changes should be made
  3585. ret = self.run_state(
  3586. "file.blockreplace",
  3587. name=name,
  3588. content=self.content.rstrip("\r\n"),
  3589. marker_start=self.marker_start,
  3590. marker_end=self.marker_end,
  3591. )
  3592. self.assertSaltTrueReturn(ret)
  3593. self.assertFalse(ret[next(iter(ret))]["changes"])
  3594. self.assertEqual(self._read(name), self.with_matching_block)
  3595. @with_tempfile()
  3596. def test_non_matching_block_and_marker_not_after_newline_append_newline(self, name):
  3597. """
  3598. Test blockreplace when block exists but its contents are not a match,
  3599. and the marker_end is not directly preceded by a newline. Test with
  3600. append_newline explicitly set to True.
  3601. """
  3602. # Pass 1: content ends in newline
  3603. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3604. ret = self.run_state(
  3605. "file.blockreplace",
  3606. name=name,
  3607. content=self.content,
  3608. marker_start=self.marker_start,
  3609. marker_end=self.marker_end,
  3610. append_newline=True,
  3611. )
  3612. self.assertSaltTrueReturn(ret)
  3613. self.assertTrue(ret[next(iter(ret))]["changes"])
  3614. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3615. # Pass 1a: Re-run state, no changes should be made
  3616. ret = self.run_state(
  3617. "file.blockreplace",
  3618. name=name,
  3619. content=self.content,
  3620. marker_start=self.marker_start,
  3621. marker_end=self.marker_end,
  3622. append_newline=True,
  3623. )
  3624. self.assertSaltTrueReturn(ret)
  3625. self.assertFalse(ret[next(iter(ret))]["changes"])
  3626. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3627. # Pass 2: content does not end in newline
  3628. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3629. ret = self.run_state(
  3630. "file.blockreplace",
  3631. name=name,
  3632. content=self.content.rstrip("\r\n"),
  3633. marker_start=self.marker_start,
  3634. marker_end=self.marker_end,
  3635. append_newline=True,
  3636. )
  3637. self.assertSaltTrueReturn(ret)
  3638. self.assertTrue(ret[next(iter(ret))]["changes"])
  3639. self.assertEqual(self._read(name), self.with_matching_block)
  3640. # Pass 2a: Re-run state, no changes should be made
  3641. ret = self.run_state(
  3642. "file.blockreplace",
  3643. name=name,
  3644. content=self.content.rstrip("\r\n"),
  3645. marker_start=self.marker_start,
  3646. marker_end=self.marker_end,
  3647. append_newline=True,
  3648. )
  3649. self.assertSaltTrueReturn(ret)
  3650. self.assertFalse(ret[next(iter(ret))]["changes"])
  3651. self.assertEqual(self._read(name), self.with_matching_block)
  3652. @with_tempfile()
  3653. def test_non_matching_block_and_marker_not_after_newline_no_append_newline(
  3654. self, name
  3655. ):
  3656. """
  3657. Test blockreplace when block exists but its contents are not a match,
  3658. and the marker_end is not directly preceded by a newline. Test with
  3659. append_newline explicitly set to False.
  3660. """
  3661. # Pass 1: content ends in newline
  3662. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3663. ret = self.run_state(
  3664. "file.blockreplace",
  3665. name=name,
  3666. content=self.content,
  3667. marker_start=self.marker_start,
  3668. marker_end=self.marker_end,
  3669. append_newline=False,
  3670. )
  3671. self.assertSaltTrueReturn(ret)
  3672. self.assertTrue(ret[next(iter(ret))]["changes"])
  3673. self.assertEqual(self._read(name), self.with_matching_block)
  3674. # Pass 1a: Re-run state, no changes should be made
  3675. ret = self.run_state(
  3676. "file.blockreplace",
  3677. name=name,
  3678. content=self.content,
  3679. marker_start=self.marker_start,
  3680. marker_end=self.marker_end,
  3681. append_newline=False,
  3682. )
  3683. self.assertSaltTrueReturn(ret)
  3684. self.assertFalse(ret[next(iter(ret))]["changes"])
  3685. self.assertEqual(self._read(name), self.with_matching_block)
  3686. # Pass 2: content does not end in newline
  3687. self._write(name, self.with_non_matching_block_and_marker_end_not_after_newline)
  3688. ret = self.run_state(
  3689. "file.blockreplace",
  3690. name=name,
  3691. content=self.content.rstrip("\r\n"),
  3692. marker_start=self.marker_start,
  3693. marker_end=self.marker_end,
  3694. append_newline=False,
  3695. )
  3696. self.assertSaltTrueReturn(ret)
  3697. self.assertTrue(ret[next(iter(ret))]["changes"])
  3698. self.assertEqual(
  3699. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3700. )
  3701. # Pass 2a: Re-run state, no changes should be made
  3702. ret = self.run_state(
  3703. "file.blockreplace",
  3704. name=name,
  3705. content=self.content.rstrip("\r\n"),
  3706. marker_start=self.marker_start,
  3707. marker_end=self.marker_end,
  3708. append_newline=False,
  3709. )
  3710. self.assertSaltTrueReturn(ret)
  3711. self.assertFalse(ret[next(iter(ret))]["changes"])
  3712. self.assertEqual(
  3713. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3714. )
  3715. @with_tempfile()
  3716. def test_matching_block(self, name):
  3717. """
  3718. Test blockreplace when block exists and its contents are a match. No
  3719. changes should be made.
  3720. """
  3721. # Pass 1: content ends in newline
  3722. self._write(name, self.with_matching_block)
  3723. ret = self.run_state(
  3724. "file.blockreplace",
  3725. name=name,
  3726. content=self.content,
  3727. marker_start=self.marker_start,
  3728. marker_end=self.marker_end,
  3729. )
  3730. self.assertSaltTrueReturn(ret)
  3731. self.assertFalse(ret[next(iter(ret))]["changes"])
  3732. self.assertEqual(self._read(name), self.with_matching_block)
  3733. # Pass 1a: Re-run state, no changes should be made
  3734. ret = self.run_state(
  3735. "file.blockreplace",
  3736. name=name,
  3737. content=self.content,
  3738. marker_start=self.marker_start,
  3739. marker_end=self.marker_end,
  3740. )
  3741. self.assertSaltTrueReturn(ret)
  3742. self.assertFalse(ret[next(iter(ret))]["changes"])
  3743. self.assertEqual(self._read(name), self.with_matching_block)
  3744. # Pass 2: content does not end in newline
  3745. self._write(name, self.with_matching_block)
  3746. ret = self.run_state(
  3747. "file.blockreplace",
  3748. name=name,
  3749. content=self.content.rstrip("\r\n"),
  3750. marker_start=self.marker_start,
  3751. marker_end=self.marker_end,
  3752. )
  3753. self.assertSaltTrueReturn(ret)
  3754. self.assertFalse(ret[next(iter(ret))]["changes"])
  3755. self.assertEqual(self._read(name), self.with_matching_block)
  3756. # Pass 2a: Re-run state, no changes should be made
  3757. ret = self.run_state(
  3758. "file.blockreplace",
  3759. name=name,
  3760. content=self.content.rstrip("\r\n"),
  3761. marker_start=self.marker_start,
  3762. marker_end=self.marker_end,
  3763. )
  3764. self.assertSaltTrueReturn(ret)
  3765. self.assertFalse(ret[next(iter(ret))]["changes"])
  3766. self.assertEqual(self._read(name), self.with_matching_block)
  3767. @with_tempfile()
  3768. def test_matching_block_append_newline(self, name):
  3769. """
  3770. Test blockreplace when block exists and its contents are a match. Test
  3771. with append_newline explicitly set to True. This will result in an
  3772. extra newline when the content ends in a newline, and will not when the
  3773. content does not end in a newline.
  3774. """
  3775. # Pass 1: content ends in newline
  3776. self._write(name, self.with_matching_block)
  3777. ret = self.run_state(
  3778. "file.blockreplace",
  3779. name=name,
  3780. content=self.content,
  3781. marker_start=self.marker_start,
  3782. marker_end=self.marker_end,
  3783. append_newline=True,
  3784. )
  3785. self.assertSaltTrueReturn(ret)
  3786. self.assertTrue(ret[next(iter(ret))]["changes"])
  3787. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3788. # Pass 1a: Re-run state, no changes should be made
  3789. ret = self.run_state(
  3790. "file.blockreplace",
  3791. name=name,
  3792. content=self.content,
  3793. marker_start=self.marker_start,
  3794. marker_end=self.marker_end,
  3795. append_newline=True,
  3796. )
  3797. self.assertSaltTrueReturn(ret)
  3798. self.assertFalse(ret[next(iter(ret))]["changes"])
  3799. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3800. # Pass 2: content does not end in newline
  3801. self._write(name, self.with_matching_block)
  3802. ret = self.run_state(
  3803. "file.blockreplace",
  3804. name=name,
  3805. content=self.content.rstrip("\r\n"),
  3806. marker_start=self.marker_start,
  3807. marker_end=self.marker_end,
  3808. append_newline=True,
  3809. )
  3810. self.assertSaltTrueReturn(ret)
  3811. self.assertFalse(ret[next(iter(ret))]["changes"])
  3812. self.assertEqual(self._read(name), self.with_matching_block)
  3813. # Pass 2a: Re-run state, no changes should be made
  3814. ret = self.run_state(
  3815. "file.blockreplace",
  3816. name=name,
  3817. content=self.content.rstrip("\r\n"),
  3818. marker_start=self.marker_start,
  3819. marker_end=self.marker_end,
  3820. append_newline=True,
  3821. )
  3822. self.assertSaltTrueReturn(ret)
  3823. self.assertFalse(ret[next(iter(ret))]["changes"])
  3824. self.assertEqual(self._read(name), self.with_matching_block)
  3825. @with_tempfile()
  3826. def test_matching_block_no_append_newline(self, name):
  3827. """
  3828. Test blockreplace when block exists and its contents are a match. Test
  3829. with append_newline explicitly set to False. This will result in the
  3830. marker_end not being directly preceded by a newline when the content
  3831. does not end in a newline.
  3832. """
  3833. # Pass 1: content ends in newline
  3834. self._write(name, self.with_matching_block)
  3835. ret = self.run_state(
  3836. "file.blockreplace",
  3837. name=name,
  3838. content=self.content,
  3839. marker_start=self.marker_start,
  3840. marker_end=self.marker_end,
  3841. append_newline=False,
  3842. )
  3843. self.assertSaltTrueReturn(ret)
  3844. self.assertFalse(ret[next(iter(ret))]["changes"])
  3845. self.assertEqual(self._read(name), self.with_matching_block)
  3846. # Pass 1a: Re-run state, no changes should be made
  3847. ret = self.run_state(
  3848. "file.blockreplace",
  3849. name=name,
  3850. content=self.content,
  3851. marker_start=self.marker_start,
  3852. marker_end=self.marker_end,
  3853. append_newline=False,
  3854. )
  3855. self.assertSaltTrueReturn(ret)
  3856. self.assertFalse(ret[next(iter(ret))]["changes"])
  3857. self.assertEqual(self._read(name), self.with_matching_block)
  3858. # Pass 2: content does not end in newline
  3859. self._write(name, self.with_matching_block)
  3860. ret = self.run_state(
  3861. "file.blockreplace",
  3862. name=name,
  3863. content=self.content.rstrip("\r\n"),
  3864. marker_start=self.marker_start,
  3865. marker_end=self.marker_end,
  3866. append_newline=False,
  3867. )
  3868. self.assertSaltTrueReturn(ret)
  3869. self.assertTrue(ret[next(iter(ret))]["changes"])
  3870. self.assertEqual(
  3871. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3872. )
  3873. # Pass 2a: Re-run state, no changes should be made
  3874. ret = self.run_state(
  3875. "file.blockreplace",
  3876. name=name,
  3877. content=self.content.rstrip("\r\n"),
  3878. marker_start=self.marker_start,
  3879. marker_end=self.marker_end,
  3880. append_newline=False,
  3881. )
  3882. self.assertSaltTrueReturn(ret)
  3883. self.assertFalse(ret[next(iter(ret))]["changes"])
  3884. self.assertEqual(
  3885. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  3886. )
  3887. @with_tempfile()
  3888. def test_matching_block_and_marker_not_after_newline(self, name):
  3889. """
  3890. Test blockreplace when block exists and its contents are a match, but
  3891. the marker_end is not directly preceded by a newline.
  3892. """
  3893. # Pass 1: content ends in newline
  3894. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3895. ret = self.run_state(
  3896. "file.blockreplace",
  3897. name=name,
  3898. content=self.content,
  3899. marker_start=self.marker_start,
  3900. marker_end=self.marker_end,
  3901. )
  3902. self.assertSaltTrueReturn(ret)
  3903. self.assertTrue(ret[next(iter(ret))]["changes"])
  3904. self.assertEqual(self._read(name), self.with_matching_block)
  3905. # Pass 1a: Re-run state, no changes should be made
  3906. ret = self.run_state(
  3907. "file.blockreplace",
  3908. name=name,
  3909. content=self.content,
  3910. marker_start=self.marker_start,
  3911. marker_end=self.marker_end,
  3912. )
  3913. self.assertSaltTrueReturn(ret)
  3914. self.assertFalse(ret[next(iter(ret))]["changes"])
  3915. self.assertEqual(self._read(name), self.with_matching_block)
  3916. # Pass 2: content does not end in newline
  3917. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3918. ret = self.run_state(
  3919. "file.blockreplace",
  3920. name=name,
  3921. content=self.content.rstrip("\r\n"),
  3922. marker_start=self.marker_start,
  3923. marker_end=self.marker_end,
  3924. )
  3925. self.assertSaltTrueReturn(ret)
  3926. self.assertTrue(ret[next(iter(ret))]["changes"])
  3927. self.assertEqual(self._read(name), self.with_matching_block)
  3928. # Pass 2a: Re-run state, no changes should be made
  3929. ret = self.run_state(
  3930. "file.blockreplace",
  3931. name=name,
  3932. content=self.content.rstrip("\r\n"),
  3933. marker_start=self.marker_start,
  3934. marker_end=self.marker_end,
  3935. )
  3936. self.assertSaltTrueReturn(ret)
  3937. self.assertFalse(ret[next(iter(ret))]["changes"])
  3938. self.assertEqual(self._read(name), self.with_matching_block)
  3939. @with_tempfile()
  3940. def test_matching_block_and_marker_not_after_newline_append_newline(self, name):
  3941. """
  3942. Test blockreplace when block exists and its contents are a match, but
  3943. the marker_end is not directly preceded by a newline. Test with
  3944. append_newline explicitly set to True. This will result in an extra
  3945. newline when the content ends in a newline, and will not when the
  3946. content does not end in a newline.
  3947. """
  3948. # Pass 1: content ends in newline
  3949. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3950. ret = self.run_state(
  3951. "file.blockreplace",
  3952. name=name,
  3953. content=self.content,
  3954. marker_start=self.marker_start,
  3955. marker_end=self.marker_end,
  3956. append_newline=True,
  3957. )
  3958. self.assertSaltTrueReturn(ret)
  3959. self.assertTrue(ret[next(iter(ret))]["changes"])
  3960. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3961. # Pass 1a: Re-run state, no changes should be made
  3962. ret = self.run_state(
  3963. "file.blockreplace",
  3964. name=name,
  3965. content=self.content,
  3966. marker_start=self.marker_start,
  3967. marker_end=self.marker_end,
  3968. append_newline=True,
  3969. )
  3970. self.assertSaltTrueReturn(ret)
  3971. self.assertFalse(ret[next(iter(ret))]["changes"])
  3972. self.assertEqual(self._read(name), self.with_matching_block_and_extra_newline)
  3973. # Pass 2: content does not end in newline
  3974. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  3975. ret = self.run_state(
  3976. "file.blockreplace",
  3977. name=name,
  3978. content=self.content.rstrip("\r\n"),
  3979. marker_start=self.marker_start,
  3980. marker_end=self.marker_end,
  3981. append_newline=True,
  3982. )
  3983. self.assertSaltTrueReturn(ret)
  3984. self.assertTrue(ret[next(iter(ret))]["changes"])
  3985. self.assertEqual(self._read(name), self.with_matching_block)
  3986. # Pass 2a: Re-run state, no changes should be made
  3987. ret = self.run_state(
  3988. "file.blockreplace",
  3989. name=name,
  3990. content=self.content.rstrip("\r\n"),
  3991. marker_start=self.marker_start,
  3992. marker_end=self.marker_end,
  3993. append_newline=True,
  3994. )
  3995. self.assertSaltTrueReturn(ret)
  3996. self.assertFalse(ret[next(iter(ret))]["changes"])
  3997. self.assertEqual(self._read(name), self.with_matching_block)
  3998. @with_tempfile()
  3999. def test_matching_block_and_marker_not_after_newline_no_append_newline(self, name):
  4000. """
  4001. Test blockreplace when block exists and its contents are a match, but
  4002. the marker_end is not directly preceded by a newline. Test with
  4003. append_newline explicitly set to False.
  4004. """
  4005. # Pass 1: content ends in newline
  4006. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  4007. ret = self.run_state(
  4008. "file.blockreplace",
  4009. name=name,
  4010. content=self.content,
  4011. marker_start=self.marker_start,
  4012. marker_end=self.marker_end,
  4013. append_newline=False,
  4014. )
  4015. self.assertSaltTrueReturn(ret)
  4016. self.assertTrue(ret[next(iter(ret))]["changes"])
  4017. self.assertEqual(self._read(name), self.with_matching_block)
  4018. # Pass 1a: Re-run state, no changes should be made
  4019. ret = self.run_state(
  4020. "file.blockreplace",
  4021. name=name,
  4022. content=self.content,
  4023. marker_start=self.marker_start,
  4024. marker_end=self.marker_end,
  4025. append_newline=False,
  4026. )
  4027. self.assertSaltTrueReturn(ret)
  4028. self.assertFalse(ret[next(iter(ret))]["changes"])
  4029. self.assertEqual(self._read(name), self.with_matching_block)
  4030. # Pass 2: content does not end in newline
  4031. self._write(name, self.with_matching_block_and_marker_end_not_after_newline)
  4032. ret = self.run_state(
  4033. "file.blockreplace",
  4034. name=name,
  4035. content=self.content.rstrip("\r\n"),
  4036. marker_start=self.marker_start,
  4037. marker_end=self.marker_end,
  4038. append_newline=False,
  4039. )
  4040. self.assertSaltTrueReturn(ret)
  4041. self.assertFalse(ret[next(iter(ret))]["changes"])
  4042. self.assertEqual(
  4043. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  4044. )
  4045. # Pass 2a: Re-run state, no changes should be made
  4046. ret = self.run_state(
  4047. "file.blockreplace",
  4048. name=name,
  4049. content=self.content.rstrip("\r\n"),
  4050. marker_start=self.marker_start,
  4051. marker_end=self.marker_end,
  4052. append_newline=False,
  4053. )
  4054. self.assertSaltTrueReturn(ret)
  4055. self.assertFalse(ret[next(iter(ret))]["changes"])
  4056. self.assertEqual(
  4057. self._read(name), self.with_matching_block_and_marker_end_not_after_newline
  4058. )
  4059. @with_tempfile()
  4060. def test_issue_49043(self, name):
  4061. ret = self.run_function("state.sls", mods="issue-49043", pillar={"name": name},)
  4062. log.error("ret = %s", repr(ret))
  4063. diff = "--- \n+++ \n@@ -0,0 +1,3 @@\n"
  4064. diff += dedent(
  4065. """\
  4066. +#-- start managed zone --
  4067. +äöü
  4068. +#-- end managed zone --
  4069. """
  4070. )
  4071. job = "file_|-somefile-blockreplace_|-{}_|-blockreplace".format(name)
  4072. self.assertEqual(ret[job]["changes"]["diff"], diff)
  4073. @pytest.mark.windows_whitelisted
  4074. class RemoteFileTest(ModuleCase, SaltReturnAssertsMixin):
  4075. """
  4076. Uses a local tornado webserver to test http(s) file.managed states with and
  4077. without skip_verify
  4078. """
  4079. @classmethod
  4080. def setUpClass(cls):
  4081. cls.webserver = Webserver()
  4082. cls.webserver.start()
  4083. cls.source = cls.webserver.url("grail/scene33")
  4084. if IS_WINDOWS:
  4085. # CRLF vs LF causes a different hash on windows
  4086. cls.source_hash = "21438b3d5fd2c0028bcab92f7824dc69"
  4087. else:
  4088. cls.source_hash = "d2feb3beb323c79fc7a0f44f1408b4a3"
  4089. @classmethod
  4090. def tearDownClass(cls):
  4091. cls.webserver.stop()
  4092. @with_tempfile(create=False)
  4093. def setUp(self, name): # pylint: disable=arguments-differ
  4094. self.name = name
  4095. def tearDown(self):
  4096. try:
  4097. os.remove(self.name)
  4098. except OSError as exc:
  4099. if exc.errno != errno.ENOENT:
  4100. six.reraise(*sys.exc_info())
  4101. def run_state(self, *args, **kwargs): # pylint: disable=arguments-differ
  4102. ret = super(RemoteFileTest, self).run_state(*args, **kwargs)
  4103. log.debug("ret = %s", ret)
  4104. return ret
  4105. def test_file_managed_http_source_no_hash(self):
  4106. """
  4107. Test a remote file with no hash
  4108. """
  4109. ret = self.run_state(
  4110. "file.managed", name=self.name, source=self.source, skip_verify=False
  4111. )
  4112. # This should fail because no hash was provided
  4113. self.assertSaltFalseReturn(ret)
  4114. def test_file_managed_http_source(self):
  4115. """
  4116. Test a remote file with no hash
  4117. """
  4118. ret = self.run_state(
  4119. "file.managed",
  4120. name=self.name,
  4121. source=self.source,
  4122. source_hash=self.source_hash,
  4123. skip_verify=False,
  4124. )
  4125. self.assertSaltTrueReturn(ret)
  4126. def test_file_managed_http_source_skip_verify(self):
  4127. """
  4128. Test a remote file using skip_verify
  4129. """
  4130. ret = self.run_state(
  4131. "file.managed", name=self.name, source=self.source, skip_verify=True
  4132. )
  4133. self.assertSaltTrueReturn(ret)
  4134. def test_file_managed_keep_source_false_http(self):
  4135. """
  4136. This test ensures that we properly clean the cached file if keep_source
  4137. is set to False, for source files using an http:// URL
  4138. """
  4139. # Run the state
  4140. ret = self.run_state(
  4141. "file.managed",
  4142. name=self.name,
  4143. source=self.source,
  4144. source_hash=self.source_hash,
  4145. keep_source=False,
  4146. )
  4147. ret = ret[next(iter(ret))]
  4148. assert ret["result"] is True
  4149. # Now make sure that the file is not cached
  4150. result = self.run_function("cp.is_cached", [self.source])
  4151. assert result == "", "File is still cached at {0}".format(result)
  4152. @skipIf(not salt.utils.path.which("patch"), "patch is not installed")
  4153. @pytest.mark.windows_whitelisted
  4154. class PatchTest(ModuleCase, SaltReturnAssertsMixin):
  4155. def _check_patch_version(self, min_version):
  4156. """
  4157. patch version check
  4158. """
  4159. version = self.run_function("cmd.run", ["patch --version"]).splitlines()[0]
  4160. version = version.split()[1]
  4161. if _LooseVersion(version) < _LooseVersion(min_version):
  4162. self.skipTest(
  4163. "Minimum patch version required: {0}. "
  4164. "Patch version installed: {1}".format(min_version, version)
  4165. )
  4166. @classmethod
  4167. def setUpClass(cls):
  4168. cls.webserver = Webserver()
  4169. cls.webserver.start()
  4170. cls.numbers_patch_name = "numbers.patch"
  4171. cls.math_patch_name = "math.patch"
  4172. cls.all_patch_name = "all.patch"
  4173. cls.numbers_patch_template_name = cls.numbers_patch_name + ".jinja"
  4174. cls.math_patch_template_name = cls.math_patch_name + ".jinja"
  4175. cls.all_patch_template_name = cls.all_patch_name + ".jinja"
  4176. cls.numbers_patch_path = "patches/" + cls.numbers_patch_name
  4177. cls.math_patch_path = "patches/" + cls.math_patch_name
  4178. cls.all_patch_path = "patches/" + cls.all_patch_name
  4179. cls.numbers_patch_template_path = "patches/" + cls.numbers_patch_template_name
  4180. cls.math_patch_template_path = "patches/" + cls.math_patch_template_name
  4181. cls.all_patch_template_path = "patches/" + cls.all_patch_template_name
  4182. cls.numbers_patch = "salt://" + cls.numbers_patch_path
  4183. cls.math_patch = "salt://" + cls.math_patch_path
  4184. cls.all_patch = "salt://" + cls.all_patch_path
  4185. cls.numbers_patch_template = "salt://" + cls.numbers_patch_template_path
  4186. cls.math_patch_template = "salt://" + cls.math_patch_template_path
  4187. cls.all_patch_template = "salt://" + cls.all_patch_template_path
  4188. cls.numbers_patch_http = cls.webserver.url(cls.numbers_patch_path)
  4189. cls.math_patch_http = cls.webserver.url(cls.math_patch_path)
  4190. cls.all_patch_http = cls.webserver.url(cls.all_patch_path)
  4191. cls.numbers_patch_template_http = cls.webserver.url(
  4192. cls.numbers_patch_template_path
  4193. )
  4194. cls.math_patch_template_http = cls.webserver.url(cls.math_patch_template_path)
  4195. cls.all_patch_template_http = cls.webserver.url(cls.all_patch_template_path)
  4196. patches_dir = os.path.join(RUNTIME_VARS.FILES, "file", "base", "patches")
  4197. cls.numbers_patch_hash = salt.utils.hashutils.get_hash(
  4198. os.path.join(patches_dir, cls.numbers_patch_name)
  4199. )
  4200. cls.math_patch_hash = salt.utils.hashutils.get_hash(
  4201. os.path.join(patches_dir, cls.math_patch_name)
  4202. )
  4203. cls.all_patch_hash = salt.utils.hashutils.get_hash(
  4204. os.path.join(patches_dir, cls.all_patch_name)
  4205. )
  4206. cls.numbers_patch_template_hash = salt.utils.hashutils.get_hash(
  4207. os.path.join(patches_dir, cls.numbers_patch_template_name)
  4208. )
  4209. cls.math_patch_template_hash = salt.utils.hashutils.get_hash(
  4210. os.path.join(patches_dir, cls.math_patch_template_name)
  4211. )
  4212. cls.all_patch_template_hash = salt.utils.hashutils.get_hash(
  4213. os.path.join(patches_dir, cls.all_patch_template_name)
  4214. )
  4215. cls.context = {"two": "two", "ten": 10}
  4216. @classmethod
  4217. def tearDownClass(cls):
  4218. cls.webserver.stop()
  4219. def setUp(self):
  4220. """
  4221. Create a new unpatched set of files
  4222. """
  4223. self.base_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  4224. os.makedirs(os.path.join(self.base_dir, "foo", "bar"))
  4225. self.numbers_file = os.path.join(self.base_dir, "foo", "numbers.txt")
  4226. self.math_file = os.path.join(self.base_dir, "foo", "bar", "math.txt")
  4227. with salt.utils.files.fopen(self.numbers_file, "w") as fp_:
  4228. fp_.write(
  4229. textwrap.dedent(
  4230. """\
  4231. one
  4232. two
  4233. three
  4234. 1
  4235. 2
  4236. 3
  4237. """
  4238. )
  4239. )
  4240. with salt.utils.files.fopen(self.math_file, "w") as fp_:
  4241. fp_.write(
  4242. textwrap.dedent(
  4243. """\
  4244. Five plus five is ten
  4245. Four squared is sixteen
  4246. """
  4247. )
  4248. )
  4249. self.addCleanup(shutil.rmtree, self.base_dir, ignore_errors=True)
  4250. def test_patch_single_file(self):
  4251. """
  4252. Test file.patch using a patch applied to a single file
  4253. """
  4254. ret = self.run_state(
  4255. "file.patch", name=self.numbers_file, source=self.numbers_patch,
  4256. )
  4257. self.assertSaltTrueReturn(ret)
  4258. ret = ret[next(iter(ret))]
  4259. self.assertEqual(ret["comment"], "Patch successfully applied")
  4260. # Re-run the state, should succeed and there should be a message about
  4261. # a partially-applied hunk.
  4262. ret = self.run_state(
  4263. "file.patch", name=self.numbers_file, source=self.numbers_patch,
  4264. )
  4265. self.assertSaltTrueReturn(ret)
  4266. ret = ret[next(iter(ret))]
  4267. self.assertEqual(ret["comment"], "Patch was already applied")
  4268. self.assertEqual(ret["changes"], {})
  4269. def test_patch_directory(self):
  4270. """
  4271. Test file.patch using a patch applied to a directory, with changes
  4272. spanning multiple files.
  4273. """
  4274. self._check_patch_version("2.6")
  4275. ret = self.run_state(
  4276. "file.patch", name=self.base_dir, source=self.all_patch, strip=1,
  4277. )
  4278. self.assertSaltTrueReturn(ret)
  4279. ret = ret[next(iter(ret))]
  4280. self.assertEqual(ret["comment"], "Patch successfully applied")
  4281. # Re-run the state, should succeed and there should be a message about
  4282. # a partially-applied hunk.
  4283. ret = self.run_state(
  4284. "file.patch", name=self.base_dir, source=self.all_patch, strip=1,
  4285. )
  4286. self.assertSaltTrueReturn(ret)
  4287. ret = ret[next(iter(ret))]
  4288. self.assertEqual(ret["comment"], "Patch was already applied")
  4289. self.assertEqual(ret["changes"], {})
  4290. def test_patch_strip_parsing(self):
  4291. """
  4292. Test that we successfuly parse -p/--strip when included in the options
  4293. """
  4294. self._check_patch_version("2.6")
  4295. # Run the state using -p1
  4296. ret = self.run_state(
  4297. "file.patch", name=self.base_dir, source=self.all_patch, options="-p1",
  4298. )
  4299. self.assertSaltTrueReturn(ret)
  4300. ret = ret[next(iter(ret))]
  4301. self.assertEqual(ret["comment"], "Patch successfully applied")
  4302. # Re-run the state using --strip=1
  4303. ret = self.run_state(
  4304. "file.patch",
  4305. name=self.base_dir,
  4306. source=self.all_patch,
  4307. options="--strip=1",
  4308. )
  4309. self.assertSaltTrueReturn(ret)
  4310. ret = ret[next(iter(ret))]
  4311. self.assertEqual(ret["comment"], "Patch was already applied")
  4312. self.assertEqual(ret["changes"], {})
  4313. # Re-run the state using --strip 1
  4314. ret = self.run_state(
  4315. "file.patch",
  4316. name=self.base_dir,
  4317. source=self.all_patch,
  4318. options="--strip 1",
  4319. )
  4320. self.assertSaltTrueReturn(ret)
  4321. ret = ret[next(iter(ret))]
  4322. self.assertEqual(ret["comment"], "Patch was already applied")
  4323. self.assertEqual(ret["changes"], {})
  4324. def test_patch_saltenv(self):
  4325. """
  4326. Test that we attempt to download the patch from a non-base saltenv
  4327. """
  4328. # This state will fail because we don't have a patch file in that
  4329. # environment, but that is OK, we just want to test that we're looking
  4330. # in an environment other than base.
  4331. ret = self.run_state(
  4332. "file.patch", name=self.math_file, source=self.math_patch, saltenv="prod",
  4333. )
  4334. self.assertSaltFalseReturn(ret)
  4335. ret = ret[next(iter(ret))]
  4336. self.assertEqual(
  4337. ret["comment"],
  4338. "Source file {0} not found in saltenv 'prod'".format(self.math_patch),
  4339. )
  4340. def test_patch_single_file_failure(self):
  4341. """
  4342. Test file.patch using a patch applied to a single file. This tests a
  4343. failed patch.
  4344. """
  4345. # Empty the file to ensure that the patch doesn't apply cleanly
  4346. with salt.utils.files.fopen(self.numbers_file, "w"):
  4347. pass
  4348. ret = self.run_state(
  4349. "file.patch", name=self.numbers_file, source=self.numbers_patch,
  4350. )
  4351. self.assertSaltFalseReturn(ret)
  4352. ret = ret[next(iter(ret))]
  4353. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4354. # Test the reject_file option and ensure that the rejects are written
  4355. # to the path specified.
  4356. reject_file = salt.utils.files.mkstemp()
  4357. ret = self.run_state(
  4358. "file.patch",
  4359. name=self.numbers_file,
  4360. source=self.numbers_patch,
  4361. reject_file=reject_file,
  4362. strip=1,
  4363. )
  4364. self.assertSaltFalseReturn(ret)
  4365. ret = ret[next(iter(ret))]
  4366. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4367. self.assertRegex(
  4368. ret["comment"], "saving rejects to (file )?{0}".format(reject_file)
  4369. )
  4370. def test_patch_directory_failure(self):
  4371. """
  4372. Test file.patch using a patch applied to a directory, with changes
  4373. spanning multiple files.
  4374. """
  4375. # Empty the file to ensure that the patch doesn't apply
  4376. with salt.utils.files.fopen(self.math_file, "w"):
  4377. pass
  4378. ret = self.run_state(
  4379. "file.patch", name=self.base_dir, source=self.all_patch, strip=1,
  4380. )
  4381. self.assertSaltFalseReturn(ret)
  4382. ret = ret[next(iter(ret))]
  4383. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4384. # Test the reject_file option and ensure that the rejects are written
  4385. # to the path specified.
  4386. reject_file = salt.utils.files.mkstemp()
  4387. ret = self.run_state(
  4388. "file.patch",
  4389. name=self.base_dir,
  4390. source=self.all_patch,
  4391. reject_file=reject_file,
  4392. strip=1,
  4393. )
  4394. self.assertSaltFalseReturn(ret)
  4395. ret = ret[next(iter(ret))]
  4396. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4397. self.assertRegex(
  4398. ret["comment"], "saving rejects to (file )?{0}".format(reject_file)
  4399. )
  4400. def test_patch_single_file_remote_source(self):
  4401. """
  4402. Test file.patch using a patch applied to a single file, with the patch
  4403. coming from a remote source.
  4404. """
  4405. # Try without a source_hash and without skip_verify=True, this should
  4406. # fail with a message about the source_hash
  4407. ret = self.run_state(
  4408. "file.patch", name=self.math_file, source=self.math_patch_http,
  4409. )
  4410. self.assertSaltFalseReturn(ret)
  4411. ret = ret[next(iter(ret))]
  4412. self.assertIn("Unable to verify upstream hash", ret["comment"])
  4413. # Re-run the state with a source hash, it should now succeed
  4414. ret = self.run_state(
  4415. "file.patch",
  4416. name=self.math_file,
  4417. source=self.math_patch_http,
  4418. source_hash=self.math_patch_hash,
  4419. )
  4420. self.assertSaltTrueReturn(ret)
  4421. ret = ret[next(iter(ret))]
  4422. self.assertEqual(ret["comment"], "Patch successfully applied")
  4423. # Re-run again, this time with no hash and skip_verify=True to test
  4424. # skipping hash verification
  4425. ret = self.run_state(
  4426. "file.patch",
  4427. name=self.math_file,
  4428. source=self.math_patch_http,
  4429. skip_verify=True,
  4430. )
  4431. self.assertSaltTrueReturn(ret)
  4432. ret = ret[next(iter(ret))]
  4433. self.assertEqual(ret["comment"], "Patch was already applied")
  4434. self.assertEqual(ret["changes"], {})
  4435. def test_patch_directory_remote_source(self):
  4436. """
  4437. Test file.patch using a patch applied to a directory, with changes
  4438. spanning multiple files, and the patch file coming from a remote
  4439. source.
  4440. """
  4441. self._check_patch_version("2.6")
  4442. # Try without a source_hash and without skip_verify=True, this should
  4443. # fail with a message about the source_hash
  4444. ret = self.run_state(
  4445. "file.patch", name=self.base_dir, source=self.all_patch_http, strip=1,
  4446. )
  4447. self.assertSaltFalseReturn(ret)
  4448. ret = ret[next(iter(ret))]
  4449. self.assertIn("Unable to verify upstream hash", ret["comment"])
  4450. # Re-run the state with a source hash, it should now succeed
  4451. ret = self.run_state(
  4452. "file.patch",
  4453. name=self.base_dir,
  4454. source=self.all_patch_http,
  4455. source_hash=self.all_patch_hash,
  4456. strip=1,
  4457. )
  4458. self.assertSaltTrueReturn(ret)
  4459. ret = ret[next(iter(ret))]
  4460. self.assertEqual(ret["comment"], "Patch successfully applied")
  4461. # Re-run again, this time with no hash and skip_verify=True to test
  4462. # skipping hash verification
  4463. ret = self.run_state(
  4464. "file.patch",
  4465. name=self.base_dir,
  4466. source=self.all_patch_http,
  4467. strip=1,
  4468. skip_verify=True,
  4469. )
  4470. self.assertSaltTrueReturn(ret)
  4471. ret = ret[next(iter(ret))]
  4472. self.assertEqual(ret["comment"], "Patch was already applied")
  4473. self.assertEqual(ret["changes"], {})
  4474. def test_patch_single_file_template(self):
  4475. """
  4476. Test file.patch using a patch applied to a single file, with jinja
  4477. templating applied to the patch file.
  4478. """
  4479. ret = self.run_state(
  4480. "file.patch",
  4481. name=self.numbers_file,
  4482. source=self.numbers_patch_template,
  4483. template="jinja",
  4484. context=self.context,
  4485. )
  4486. self.assertSaltTrueReturn(ret)
  4487. ret = ret[next(iter(ret))]
  4488. self.assertEqual(ret["comment"], "Patch successfully applied")
  4489. # Re-run the state, should succeed and there should be a message about
  4490. # a partially-applied hunk.
  4491. ret = self.run_state(
  4492. "file.patch",
  4493. name=self.numbers_file,
  4494. source=self.numbers_patch_template,
  4495. template="jinja",
  4496. context=self.context,
  4497. )
  4498. self.assertSaltTrueReturn(ret)
  4499. ret = ret[next(iter(ret))]
  4500. self.assertEqual(ret["comment"], "Patch was already applied")
  4501. self.assertEqual(ret["changes"], {})
  4502. def test_patch_directory_template(self):
  4503. """
  4504. Test file.patch using a patch applied to a directory, with changes
  4505. spanning multiple files, and with jinja templating applied to the patch
  4506. file.
  4507. """
  4508. self._check_patch_version("2.6")
  4509. ret = self.run_state(
  4510. "file.patch",
  4511. name=self.base_dir,
  4512. source=self.all_patch_template,
  4513. template="jinja",
  4514. context=self.context,
  4515. strip=1,
  4516. )
  4517. self.assertSaltTrueReturn(ret)
  4518. ret = ret[next(iter(ret))]
  4519. self.assertEqual(ret["comment"], "Patch successfully applied")
  4520. # Re-run the state, should succeed and there should be a message about
  4521. # a partially-applied hunk.
  4522. ret = self.run_state(
  4523. "file.patch",
  4524. name=self.base_dir,
  4525. source=self.all_patch_template,
  4526. template="jinja",
  4527. context=self.context,
  4528. strip=1,
  4529. )
  4530. self.assertSaltTrueReturn(ret)
  4531. ret = ret[next(iter(ret))]
  4532. self.assertEqual(ret["comment"], "Patch was already applied")
  4533. self.assertEqual(ret["changes"], {})
  4534. def test_patch_single_file_remote_source_template(self):
  4535. """
  4536. Test file.patch using a patch applied to a single file, with the patch
  4537. coming from a remote source.
  4538. """
  4539. # Try without a source_hash and without skip_verify=True, this should
  4540. # fail with a message about the source_hash
  4541. ret = self.run_state(
  4542. "file.patch",
  4543. name=self.math_file,
  4544. source=self.math_patch_template_http,
  4545. template="jinja",
  4546. context=self.context,
  4547. )
  4548. self.assertSaltFalseReturn(ret)
  4549. ret = ret[next(iter(ret))]
  4550. self.assertIn("Unable to verify upstream hash", ret["comment"])
  4551. # Re-run the state with a source hash, it should now succeed
  4552. ret = self.run_state(
  4553. "file.patch",
  4554. name=self.math_file,
  4555. source=self.math_patch_template_http,
  4556. source_hash=self.math_patch_template_hash,
  4557. template="jinja",
  4558. context=self.context,
  4559. )
  4560. self.assertSaltTrueReturn(ret)
  4561. ret = ret[next(iter(ret))]
  4562. self.assertEqual(ret["comment"], "Patch successfully applied")
  4563. # Re-run again, this time with no hash and skip_verify=True to test
  4564. # skipping hash verification
  4565. ret = self.run_state(
  4566. "file.patch",
  4567. name=self.math_file,
  4568. source=self.math_patch_template_http,
  4569. template="jinja",
  4570. context=self.context,
  4571. skip_verify=True,
  4572. )
  4573. self.assertSaltTrueReturn(ret)
  4574. ret = ret[next(iter(ret))]
  4575. self.assertEqual(ret["comment"], "Patch was already applied")
  4576. self.assertEqual(ret["changes"], {})
  4577. def test_patch_directory_remote_source_template(self):
  4578. """
  4579. Test file.patch using a patch applied to a directory, with changes
  4580. spanning multiple files, and the patch file coming from a remote
  4581. source.
  4582. """
  4583. self._check_patch_version("2.6")
  4584. # Try without a source_hash and without skip_verify=True, this should
  4585. # fail with a message about the source_hash
  4586. ret = self.run_state(
  4587. "file.patch",
  4588. name=self.base_dir,
  4589. source=self.all_patch_template_http,
  4590. template="jinja",
  4591. context=self.context,
  4592. strip=1,
  4593. )
  4594. self.assertSaltFalseReturn(ret)
  4595. ret = ret[next(iter(ret))]
  4596. self.assertIn("Unable to verify upstream hash", ret["comment"])
  4597. # Re-run the state with a source hash, it should now succeed
  4598. ret = self.run_state(
  4599. "file.patch",
  4600. name=self.base_dir,
  4601. source=self.all_patch_template_http,
  4602. source_hash=self.all_patch_template_hash,
  4603. template="jinja",
  4604. context=self.context,
  4605. strip=1,
  4606. )
  4607. self.assertSaltTrueReturn(ret)
  4608. ret = ret[next(iter(ret))]
  4609. self.assertEqual(ret["comment"], "Patch successfully applied")
  4610. # Re-run again, this time with no hash and skip_verify=True to test
  4611. # skipping hash verification
  4612. ret = self.run_state(
  4613. "file.patch",
  4614. name=self.base_dir,
  4615. source=self.all_patch_template_http,
  4616. template="jinja",
  4617. context=self.context,
  4618. strip=1,
  4619. skip_verify=True,
  4620. )
  4621. self.assertSaltTrueReturn(ret)
  4622. ret = ret[next(iter(ret))]
  4623. self.assertEqual(ret["comment"], "Patch was already applied")
  4624. self.assertEqual(ret["changes"], {})
  4625. def test_patch_test_mode(self):
  4626. """
  4627. Test file.patch using test=True
  4628. """
  4629. # Try without a source_hash and without skip_verify=True, this should
  4630. # fail with a message about the source_hash
  4631. ret = self.run_state(
  4632. "file.patch", name=self.numbers_file, source=self.numbers_patch, test=True,
  4633. )
  4634. self.assertSaltNoneReturn(ret)
  4635. ret = ret[next(iter(ret))]
  4636. self.assertEqual(ret["comment"], "The patch would be applied")
  4637. self.assertTrue(ret["changes"])
  4638. # Apply the patch for real. We'll then be able to test below that we
  4639. # exit with a True rather than a None result if test=True is used on an
  4640. # already-applied patch.
  4641. ret = self.run_state(
  4642. "file.patch", name=self.numbers_file, source=self.numbers_patch,
  4643. )
  4644. self.assertSaltTrueReturn(ret)
  4645. ret = ret[next(iter(ret))]
  4646. self.assertEqual(ret["comment"], "Patch successfully applied")
  4647. self.assertTrue(ret["changes"])
  4648. # Run again with test=True. Since the pre-check happens before we do
  4649. # the __opts__['test'] check, we should exit with a True result just
  4650. # the same as if we try to run this state on an already-patched file
  4651. # *without* test=True.
  4652. ret = self.run_state(
  4653. "file.patch", name=self.numbers_file, source=self.numbers_patch, test=True,
  4654. )
  4655. self.assertSaltTrueReturn(ret)
  4656. ret = ret[next(iter(ret))]
  4657. self.assertEqual(ret["comment"], "Patch was already applied")
  4658. self.assertEqual(ret["changes"], {})
  4659. # Empty the file to ensure that the patch doesn't apply cleanly
  4660. with salt.utils.files.fopen(self.numbers_file, "w"):
  4661. pass
  4662. # Run again with test=True. Similar to the above run, we are testing
  4663. # that we return before we reach the __opts__['test'] check. In this
  4664. # case we should return a False result because we should already know
  4665. # by this point that the patch will not apply cleanly.
  4666. ret = self.run_state(
  4667. "file.patch", name=self.numbers_file, source=self.numbers_patch, test=True,
  4668. )
  4669. self.assertSaltFalseReturn(ret)
  4670. ret = ret[next(iter(ret))]
  4671. self.assertIn("Patch would not apply cleanly", ret["comment"])
  4672. self.assertEqual(ret["changes"], {})
  4673. WIN_TEST_FILE = "c:/testfile"
  4674. @destructiveTest
  4675. @skipIf(not IS_WINDOWS, "windows test only")
  4676. @pytest.mark.windows_whitelisted
  4677. class WinFileTest(ModuleCase):
  4678. """
  4679. Test for the file state on Windows
  4680. """
  4681. def setUp(self):
  4682. self.run_state(
  4683. "file.managed", name=WIN_TEST_FILE, makedirs=True, contents="Only a test"
  4684. )
  4685. def tearDown(self):
  4686. self.run_state("file.absent", name=WIN_TEST_FILE)
  4687. def test_file_managed(self):
  4688. """
  4689. Test file.managed on Windows
  4690. """
  4691. self.assertTrue(self.run_state("file.exists", name=WIN_TEST_FILE))
  4692. def test_file_copy(self):
  4693. """
  4694. Test file.copy on Windows
  4695. """
  4696. ret = self.run_state(
  4697. "file.copy", name="c:/testfile_copy", makedirs=True, source=WIN_TEST_FILE
  4698. )
  4699. self.assertTrue(ret)
  4700. def test_file_comment(self):
  4701. """
  4702. Test file.comment on Windows
  4703. """
  4704. self.run_state("file.comment", name=WIN_TEST_FILE, regex="^Only")
  4705. with salt.utils.files.fopen(WIN_TEST_FILE, "r") as fp_:
  4706. self.assertTrue(fp_.read().startswith("#Only"))
  4707. def test_file_replace(self):
  4708. """
  4709. Test file.replace on Windows
  4710. """
  4711. self.run_state(
  4712. "file.replace", name=WIN_TEST_FILE, pattern="test", repl="testing"
  4713. )
  4714. with salt.utils.files.fopen(WIN_TEST_FILE, "r") as fp_:
  4715. self.assertIn("testing", fp_.read())
  4716. def test_file_absent(self):
  4717. """
  4718. Test file.absent on Windows
  4719. """
  4720. ret = self.run_state("file.absent", name=WIN_TEST_FILE)
  4721. self.assertTrue(ret)