test_file.py 187 KB


  1. # -*- coding: utf-8 -*-
  2. '''
  3. Tests for the file state
  4. '''
  5. # Import Python libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import errno
  8. import logging
  9. import os
  10. import re
  11. import sys
  12. import shutil
  13. import stat
  14. import tempfile
  15. import textwrap
  16. import filecmp
  17. log = logging.getLogger(__name__)
  18. # Import Salt Testing libs
  19. from tests.support.runtests import RUNTIME_VARS
  20. from tests.support.case import ModuleCase
  21. from tests.support.unit import skipIf
  22. from tests.support.helpers import (
  23. with_system_user_and_group,
  24. with_tempdir,
  25. with_tempfile,
  26. Webserver,
  27. dedent,
  28. )
  29. from tests.support.mixins import SaltReturnAssertsMixin
  30. # Import Salt libs
  31. import salt.utils.data
  32. import salt.utils.files
  33. import salt.utils.json
  34. import salt.utils.path
  35. import salt.utils.platform
  36. import salt.utils.stringutils
  37. import salt.serializers.configparser
  38. from salt.utils.versions import LooseVersion as _LooseVersion
  39. HAS_PWD = True
  40. try:
  41. import pwd
  42. except ImportError:
  43. HAS_PWD = False
  44. HAS_GRP = True
  45. try:
  46. import grp
  47. except ImportError:
  48. HAS_GRP = False
  49. # Import 3rd-party libs
  50. import pytest
  51. from salt.ext import six
  52. from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
  53. IS_WINDOWS = salt.utils.platform.is_windows()
  54. BINARY_FILE = b'GIF89a\x01\x00\x01\x00\x80\x00\x00\x05\x04\x04\x00\x00\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x02D\x01\x00;'
  55. TEST_SYSTEM_USER = 'test_system_user'
  56. TEST_SYSTEM_GROUP = 'test_system_group'
  57. def _test_managed_file_mode_keep_helper(testcase, local=False):
  58. '''
  59. DRY helper function to run the same test with a local or remote path
  60. '''
  61. name = os.path.join(RUNTIME_VARS.TMP, 'scene33')
  62. grail_fs_path = os.path.join(RUNTIME_VARS.BASE_FILES, 'grail', 'scene33')
  63. grail = 'salt://grail/scene33' if not local else grail_fs_path
  64. # Get the current mode so that we can put the file back the way we
  65. # found it when we're done.
  66. grail_fs_mode = int(testcase.run_function('file.get_mode', [grail_fs_path]), 8)
  67. initial_mode = 0o770
  68. new_mode_1 = 0o600
  69. new_mode_2 = 0o644
  70. # Set the initial mode, so we can be assured that when we set the mode
  71. # to "keep", we're actually changing the permissions of the file to the
  72. # new mode.
  73. ret = testcase.run_state(
  74. 'file.managed',
  75. name=name,
  76. mode=oct(initial_mode),
  77. source=grail,
  78. )
  79. if IS_WINDOWS:
  80. testcase.assertSaltFalseReturn(ret)
  81. return
  82. testcase.assertSaltTrueReturn(ret)
  83. try:
  84. # Update the mode on the fileserver (pass 1)
  85. os.chmod(grail_fs_path, new_mode_1)
  86. ret = testcase.run_state(
  87. 'file.managed',
  88. name=name,
  89. mode='keep',
  90. source=grail,
  91. )
  92. testcase.assertSaltTrueReturn(ret)
  93. managed_mode = stat.S_IMODE(os.stat(name).st_mode)
  94. testcase.assertEqual(oct(managed_mode), oct(new_mode_1))
  95. # Update the mode on the fileserver (pass 2)
  96. # This assures us that if the file in file_roots was originally set
  97. # to the same mode as new_mode_1, we definitely get an updated mode
  98. # this time.
  99. os.chmod(grail_fs_path, new_mode_2)
  100. ret = testcase.run_state(
  101. 'file.managed',
  102. name=name,
  103. mode='keep',
  104. source=grail,
  105. )
  106. testcase.assertSaltTrueReturn(ret)
  107. managed_mode = stat.S_IMODE(os.stat(name).st_mode)
  108. testcase.assertEqual(oct(managed_mode), oct(new_mode_2))
  109. except Exception:
  110. raise
  111. finally:
  112. # Set the mode of the file in the file_roots back to what it
  113. # originally was.
  114. os.chmod(grail_fs_path, grail_fs_mode)
  115. @pytest.mark.windows_whitelisted
  116. class FileTest(ModuleCase, SaltReturnAssertsMixin):
  117. '''
  118. Validate the file state
  119. '''
  120. def _delete_file(self, path):
  121. try:
  122. os.remove(path)
  123. except OSError as exc:
  124. if exc.errno != errno.ENOENT:
  125. log.error('Failed to remove %s: %s', path, exc)
  126. def tearDown(self):
  127. '''
  128. remove files created in previous tests
  129. '''
  130. user = 'salt'
  131. if user in str(self.run_function('user.list_users')):
  132. self.run_function('user.delete', [user])
  133. def test_symlink(self):
  134. '''
  135. file.symlink
  136. '''
  137. name = os.path.join(RUNTIME_VARS.TMP, 'symlink')
  138. tgt = os.path.join(RUNTIME_VARS.TMP, 'target')
  139. # Windows must have a source directory to link to
  140. if IS_WINDOWS and not os.path.isdir(tgt):
  141. os.mkdir(tgt)
  142. # Windows cannot create a symlink if it already exists
  143. if IS_WINDOWS and self.run_function('file.is_link', [name]):
  144. self.run_function('file.remove', [name])
  145. ret = self.run_state('file.symlink', name=name, target=tgt)
  146. self.assertSaltTrueReturn(ret)
  147. def test_test_symlink(self):
  148. '''
  149. file.symlink test interface
  150. '''
  151. name = os.path.join(RUNTIME_VARS.TMP, 'symlink2')
  152. tgt = os.path.join(RUNTIME_VARS.TMP, 'target')
  153. ret = self.run_state('file.symlink', test=True, name=name, target=tgt)
  154. self.assertSaltNoneReturn(ret)
  155. def test_absent_file(self):
  156. '''
  157. file.absent
  158. '''
  159. name = os.path.join(RUNTIME_VARS.TMP, 'file_to_kill')
  160. with salt.utils.files.fopen(name, 'w+') as fp_:
  161. fp_.write('killme')
  162. ret = self.run_state('file.absent', name=name)
  163. self.assertSaltTrueReturn(ret)
  164. self.assertFalse(os.path.isfile(name))
  165. def test_absent_dir(self):
  166. '''
  167. file.absent
  168. '''
  169. name = os.path.join(RUNTIME_VARS.TMP, 'dir_to_kill')
  170. if not os.path.isdir(name):
  171. # left behind... Don't fail because of this!
  172. os.makedirs(name)
  173. ret = self.run_state('file.absent', name=name)
  174. self.assertSaltTrueReturn(ret)
  175. self.assertFalse(os.path.isdir(name))
  176. def test_absent_link(self):
  177. '''
  178. file.absent
  179. '''
  180. name = os.path.join(RUNTIME_VARS.TMP, 'link_to_kill')
  181. tgt = '{0}.tgt'.format(name)
  182. # Windows must have a source directory to link to
  183. if IS_WINDOWS and not os.path.isdir(tgt):
  184. os.mkdir(tgt)
  185. if not self.run_function('file.is_link', [name]):
  186. self.run_function('file.symlink', [tgt, name])
  187. ret = self.run_state('file.absent', name=name)
  188. try:
  189. self.assertSaltTrueReturn(ret)
  190. self.assertFalse(self.run_function('file.is_link', [name]))
  191. finally:
  192. if self.run_function('file.is_link', [name]):
  193. self.run_function('file.remove', [name])
  194. @with_tempfile()
  195. def test_test_absent(self, name):
  196. '''
  197. file.absent test interface
  198. '''
  199. with salt.utils.files.fopen(name, 'w+') as fp_:
  200. fp_.write('killme')
  201. ret = self.run_state('file.absent', test=True, name=name)
  202. self.assertSaltNoneReturn(ret)
  203. self.assertTrue(os.path.isfile(name))
  204. def test_managed(self):
  205. '''
  206. file.managed
  207. '''
  208. name = os.path.join(RUNTIME_VARS.TMP, 'grail_scene33')
  209. ret = self.run_state(
  210. 'file.managed', name=name, source='salt://grail/scene33'
  211. )
  212. src = os.path.join(RUNTIME_VARS.BASE_FILES, 'grail', 'scene33')
  213. with salt.utils.files.fopen(src, 'r') as fp_:
  214. master_data = fp_.read()
  215. with salt.utils.files.fopen(name, 'r') as fp_:
  216. minion_data = fp_.read()
  217. self.assertEqual(master_data, minion_data)
  218. self.assertSaltTrueReturn(ret)
  219. def test_managed_file_mode(self):
  220. '''
  221. file.managed, correct file permissions
  222. '''
  223. desired_mode = 504 # 0770 octal
  224. name = os.path.join(RUNTIME_VARS.TMP, 'grail_scene33')
  225. ret = self.run_state(
  226. 'file.managed', name=name, mode='0770', source='salt://grail/scene33'
  227. )
  228. if IS_WINDOWS:
  229. expected = 'The \'mode\' option is not supported on Windows'
  230. self.assertEqual(ret[list(ret)[0]]['comment'], expected)
  231. self.assertSaltFalseReturn(ret)
  232. return
  233. resulting_mode = stat.S_IMODE(
  234. os.stat(name).st_mode
  235. )
  236. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  237. self.assertSaltTrueReturn(ret)
  238. @skipIf(IS_WINDOWS, 'Windows does not report any file modes. Skipping.')
  239. def test_managed_file_mode_keep(self):
  240. '''
  241. Test using "mode: keep" in a file.managed state
  242. '''
  243. _test_managed_file_mode_keep_helper(self, local=False)
  244. @skipIf(IS_WINDOWS, 'Windows does not report any file modes. Skipping.')
  245. def test_managed_file_mode_keep_local_source(self):
  246. '''
  247. Test using "mode: keep" in a file.managed state, with a local file path
  248. as the source.
  249. '''
  250. _test_managed_file_mode_keep_helper(self, local=True)
  251. def test_managed_file_mode_file_exists_replace(self):
  252. '''
  253. file.managed, existing file with replace=True, change permissions
  254. '''
  255. initial_mode = 504 # 0770 octal
  256. desired_mode = 384 # 0600 octal
  257. name = os.path.join(RUNTIME_VARS.TMP, 'grail_scene33')
  258. ret = self.run_state(
  259. 'file.managed', name=name, mode=oct(initial_mode), source='salt://grail/scene33'
  260. )
  261. if IS_WINDOWS:
  262. expected = 'The \'mode\' option is not supported on Windows'
  263. self.assertEqual(ret[list(ret)[0]]['comment'], expected)
  264. self.assertSaltFalseReturn(ret)
  265. return
  266. resulting_mode = stat.S_IMODE(
  267. os.stat(name).st_mode
  268. )
  269. self.assertEqual(oct(initial_mode), oct(resulting_mode))
  270. name = os.path.join(RUNTIME_VARS.TMP, 'grail_scene33')
  271. ret = self.run_state(
  272. 'file.managed', name=name, replace=True, mode=oct(desired_mode), source='salt://grail/scene33'
  273. )
  274. resulting_mode = stat.S_IMODE(
  275. os.stat(name).st_mode
  276. )
  277. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  278. self.assertSaltTrueReturn(ret)
  279. def test_managed_file_mode_file_exists_noreplace(self):
  280. '''
  281. file.managed, existing file with replace=False, change permissions
  282. '''
  283. initial_mode = 504 # 0770 octal
  284. desired_mode = 384 # 0600 octal
  285. name = os.path.join(RUNTIME_VARS.TMP, 'grail_scene33')
  286. ret = self.run_state(
  287. 'file.managed', name=name, replace=True, mode=oct(initial_mode), source='salt://grail/scene33'
  288. )
  289. if IS_WINDOWS:
  290. expected = 'The \'mode\' option is not supported on Windows'
  291. self.assertEqual(ret[list(ret)[0]]['comment'], expected)
  292. self.assertSaltFalseReturn(ret)
  293. return
  294. ret = self.run_state(
  295. 'file.managed', name=name, replace=False, mode=oct(desired_mode), source='salt://grail/scene33'
  296. )
  297. resulting_mode = stat.S_IMODE(
  298. os.stat(name).st_mode
  299. )
  300. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  301. self.assertSaltTrueReturn(ret)
  302. def test_managed_file_with_grains_data(self):
  303. '''
  304. Test to ensure we can render grains data into a managed
  305. file.
  306. '''
  307. grain_path = os.path.join(RUNTIME_VARS.TMP, 'file-grain-test')
  308. state_file = 'file-grainget'
  309. self.run_function('state.sls', [state_file], pillar={'grain_path': grain_path})
  310. self.assertTrue(os.path.exists(grain_path))
  311. with salt.utils.files.fopen(grain_path, 'r') as fp_:
  312. file_contents = fp_.readlines()
  313. if IS_WINDOWS:
  314. match = '^minion\r\n'
  315. else:
  316. match = '^minion\n'
  317. self.assertTrue(re.match(match, file_contents[0]))
  318. def test_managed_file_with_pillar_sls(self):
  319. '''
  320. Test to ensure pillar data in sls file
  321. is rendered properly and file is created.
  322. '''
  323. file_pillar = os.path.join(RUNTIME_VARS.TMP, 'filepillar-python')
  324. self.addCleanup(self._delete_file, file_pillar)
  325. state_name = 'file-pillarget'
  326. log.warning('File Path: %s', file_pillar)
  327. ret = self.run_function('state.sls', [state_name])
  328. self.assertSaltTrueReturn(ret)
  329. # Check to make sure the file was created
  330. check_file = self.run_function('file.file_exists', [file_pillar])
  331. self.assertTrue(check_file)
  332. def test_managed_file_with_pillardefault_sls(self):
  333. '''
  334. Test to ensure when pillar data is not available
  335. in sls file with pillar.get it uses the default
  336. value.
  337. '''
  338. file_pillar_def = os.path.join(RUNTIME_VARS.TMP, 'filepillar-defaultvalue')
  339. self.addCleanup(self._delete_file, file_pillar_def)
  340. state_name = 'file-pillardefaultget'
  341. log.warning('File Path: %s', file_pillar_def)
  342. ret = self.run_function('state.sls', [state_name])
  343. self.assertSaltTrueReturn(ret)
  344. # Check to make sure the file was created
  345. check_file = self.run_function('file.file_exists', [file_pillar_def])
  346. self.assertTrue(check_file)
  347. @pytest.mark.skip_if_not_root
  348. def test_managed_dir_mode(self):
  349. '''
  350. Tests to ensure that file.managed creates directories with the
  351. permissions requested with the dir_mode argument
  352. '''
  353. desired_mode = 511 # 0777 in octal
  354. name = os.path.join(RUNTIME_VARS.TMP, 'a', 'managed_dir_mode_test_file')
  355. desired_owner = 'nobody'
  356. ret = self.run_state(
  357. 'file.managed',
  358. name=name,
  359. source='salt://grail/scene33',
  360. mode=600,
  361. makedirs=True,
  362. user=desired_owner,
  363. dir_mode=oct(desired_mode) # 0777
  364. )
  365. if IS_WINDOWS:
  366. expected = 'The \'mode\' option is not supported on Windows'
  367. self.assertEqual(ret[list(ret)[0]]['comment'], expected)
  368. self.assertSaltFalseReturn(ret)
  369. return
  370. resulting_mode = stat.S_IMODE(
  371. os.stat(os.path.join(RUNTIME_VARS.TMP, 'a')).st_mode
  372. )
  373. resulting_owner = pwd.getpwuid(os.stat(os.path.join(RUNTIME_VARS.TMP, 'a')).st_uid).pw_name
  374. self.assertEqual(oct(desired_mode), oct(resulting_mode))
  375. self.assertSaltTrueReturn(ret)
  376. self.assertEqual(desired_owner, resulting_owner)
  377. def test_test_managed(self):
  378. '''
  379. file.managed test interface
  380. '''
  381. name = os.path.join(RUNTIME_VARS.TMP, 'grail_not_not_scene33')
  382. ret = self.run_state(
  383. 'file.managed', test=True, name=name, source='salt://grail/scene33'
  384. )
  385. self.assertSaltNoneReturn(ret)
  386. self.assertFalse(os.path.isfile(name))
  387. def test_managed_show_changes_false(self):
  388. '''
  389. file.managed test interface
  390. '''
  391. name = os.path.join(RUNTIME_VARS.TMP, 'grail_not_scene33')
  392. with salt.utils.files.fopen(name, 'wb') as fp_:
  393. fp_.write(b'test_managed_show_changes_false\n')
  394. ret = self.run_state(
  395. 'file.managed', name=name, source='salt://grail/scene33',
  396. show_changes=False
  397. )
  398. changes = next(six.itervalues(ret))['changes']
  399. self.assertEqual('<show_changes=False>', changes['diff'])
  400. def test_managed_show_changes_true(self):
  401. '''
  402. file.managed test interface
  403. '''
  404. name = os.path.join(RUNTIME_VARS.TMP, 'grail_not_scene33')
  405. with salt.utils.files.fopen(name, 'wb') as fp_:
  406. fp_.write(b'test_managed_show_changes_false\n')
  407. ret = self.run_state(
  408. 'file.managed', name=name, source='salt://grail/scene33',
  409. )
  410. changes = next(six.itervalues(ret))['changes']
  411. self.assertIn('diff', changes)
  412. @skipIf(IS_WINDOWS, 'Don\'t know how to fix for Windows')
  413. def test_managed_escaped_file_path(self):
  414. '''
  415. file.managed test that 'salt://|' protects unusual characters in file path
  416. '''
  417. funny_file = salt.utils.files.mkstemp(prefix='?f!le? n@=3&', suffix='.file type')
  418. funny_file_name = os.path.split(funny_file)[1]
  419. funny_url = 'salt://|' + funny_file_name
  420. funny_url_path = os.path.join(RUNTIME_VARS.BASE_FILES, funny_file_name)
  421. state_name = 'funny_file'
  422. state_file_name = state_name + '.sls'
  423. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_file_name)
  424. state_key = 'file_|-{0}_|-{0}_|-managed'.format(funny_file)
  425. self.addCleanup(os.remove, state_file)
  426. self.addCleanup(os.remove, funny_file)
  427. self.addCleanup(os.remove, funny_url_path)
  428. with salt.utils.files.fopen(funny_url_path, 'w'):
  429. pass
  430. with salt.utils.files.fopen(state_file, 'w') as fp_:
  431. fp_.write(textwrap.dedent('''\
  432. {0}:
  433. file.managed:
  434. - source: {1}
  435. - makedirs: True
  436. '''.format(funny_file, funny_url)))
  437. ret = self.run_function('state.sls', [state_name])
  438. self.assertTrue(ret[state_key]['result'])
  439. def test_managed_contents(self):
  440. '''
  441. test file.managed with contents that is a boolean, string, integer,
  442. float, list, and dictionary
  443. '''
  444. state_name = 'file-FileTest-test_managed_contents'
  445. state_filename = state_name + '.sls'
  446. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  447. managed_files = {}
  448. state_keys = {}
  449. for typ in ('bool', 'str', 'int', 'float', 'list', 'dict'):
  450. managed_files[typ] = salt.utils.files.mkstemp()
  451. state_keys[typ] = 'file_|-{0} file_|-{1}_|-managed'.format(typ, managed_files[typ])
  452. try:
  453. with salt.utils.files.fopen(state_file, 'w') as fd_:
  454. fd_.write(textwrap.dedent('''\
  455. bool file:
  456. file.managed:
  457. - name: {bool}
  458. - contents: True
  459. str file:
  460. file.managed:
  461. - name: {str}
  462. - contents: Salt was here.
  463. int file:
  464. file.managed:
  465. - name: {int}
  466. - contents: 340282366920938463463374607431768211456
  467. float file:
  468. file.managed:
  469. - name: {float}
  470. - contents: 1.7518e-45 # gravitational coupling constant
  471. list file:
  472. file.managed:
  473. - name: {list}
  474. - contents: [1, 1, 2, 3, 5, 8, 13]
  475. dict file:
  476. file.managed:
  477. - name: {dict}
  478. - contents:
  479. C: charge
  480. P: parity
  481. T: time
  482. '''.format(**managed_files)))
  483. ret = self.run_function('state.sls', [state_name])
  484. self.assertSaltTrueReturn(ret)
  485. for typ in state_keys:
  486. self.assertTrue(ret[state_keys[typ]]['result'])
  487. self.assertIn('diff', ret[state_keys[typ]]['changes'])
  488. finally:
  489. if os.path.exists(state_file):
  490. os.remove(state_file)
  491. for typ in managed_files:
  492. if os.path.exists(managed_files[typ]):
  493. os.remove(managed_files[typ])
  494. def test_managed_contents_with_contents_newline(self):
  495. '''
  496. test file.managed with contents by using the default content_newline
  497. flag.
  498. '''
  499. contents = 'test_managed_contents_with_newline_one'
  500. name = os.path.join(RUNTIME_VARS.TMP, 'foo')
  501. # Create a file named foo with contents as above but with a \n at EOF
  502. self.run_state('file.managed', name=name, contents=contents,
  503. contents_newline=True)
  504. with salt.utils.files.fopen(name, 'r') as fp_:
  505. last_line = fp_.read()
  506. self.assertEqual((contents + os.linesep), last_line)
  507. def test_managed_contents_with_contents_newline_false(self):
  508. '''
  509. test file.managed with contents by using the non default content_newline
  510. flag.
  511. '''
  512. contents = 'test_managed_contents_with_newline_one'
  513. name = os.path.join(RUNTIME_VARS.TMP, 'bar')
  514. # Create a file named foo with contents as above but with a \n at EOF
  515. self.run_state('file.managed', name=name, contents=contents,
  516. contents_newline=False)
  517. with salt.utils.files.fopen(name, 'r') as fp_:
  518. last_line = fp_.read()
  519. self.assertEqual(contents, last_line)
  520. def test_managed_multiline_contents_with_contents_newline(self):
  521. '''
  522. test file.managed with contents by using the non default content_newline
  523. flag.
  524. '''
  525. contents = ('this is a cookie{}this is another cookie'.
  526. format(os.linesep))
  527. name = os.path.join(RUNTIME_VARS.TMP, 'bar')
  528. # Create a file named foo with contents as above but with a \n at EOF
  529. self.run_state('file.managed', name=name, contents=contents,
  530. contents_newline=True)
  531. with salt.utils.files.fopen(name, 'r') as fp_:
  532. last_line = fp_.read()
  533. self.assertEqual((contents + os.linesep), last_line)
  534. def test_managed_multiline_contents_with_contents_newline_false(self):
  535. '''
  536. test file.managed with contents by using the non default content_newline
  537. flag.
  538. '''
  539. contents = ('this is a cookie{}this is another cookie'.
  540. format(os.linesep))
  541. name = os.path.join(RUNTIME_VARS.TMP, 'bar')
  542. # Create a file named foo with contents as above but with a \n at EOF
  543. self.run_state('file.managed', name=name, contents=contents,
  544. contents_newline=False)
  545. with salt.utils.files.fopen(name, 'r') as fp_:
  546. last_line = fp_.read()
  547. self.assertEqual(contents, last_line)
  548. @pytest.mark.skip_if_not_root
  549. @skipIf(IS_WINDOWS, 'Windows does not support "mode" kwarg. Skipping.')
  550. @skipIf(not salt.utils.path.which('visudo'), 'sudo is missing')
  551. def test_managed_check_cmd(self):
  552. '''
  553. Test file.managed passing a basic check_cmd kwarg. See Issue #38111.
  554. '''
  555. r_group = 'root'
  556. if salt.utils.platform.is_darwin():
  557. r_group = 'wheel'
  558. try:
  559. ret = self.run_state(
  560. 'file.managed',
  561. name='/tmp/sudoers',
  562. user='root',
  563. group=r_group,
  564. mode=440,
  565. check_cmd='visudo -c -s -f'
  566. )
  567. self.assertSaltTrueReturn(ret)
  568. self.assertInSaltComment('Empty file', ret)
  569. self.assertEqual(ret['file_|-/tmp/sudoers_|-/tmp/sudoers_|-managed']['changes'],
  570. {'new': 'file /tmp/sudoers created', 'mode': '0440'})
  571. finally:
  572. # Clean Up File
  573. if os.path.exists('/tmp/sudoers'):
  574. os.remove('/tmp/sudoers')
  575. def test_managed_local_source_with_source_hash(self):
  576. '''
  577. Make sure that we enforce the source_hash even with local files
  578. '''
  579. name = os.path.join(RUNTIME_VARS.TMP, 'local_source_with_source_hash')
  580. local_path = os.path.join(RUNTIME_VARS.BASE_FILES, 'grail', 'scene33')
  581. actual_hash = '567fd840bf1548edc35c48eb66cdd78bfdfcccff'
  582. if IS_WINDOWS:
  583. # CRLF vs LF causes a different hash on windows
  584. actual_hash = 'f658a0ec121d9c17088795afcc6ff3c43cb9842a'
  585. # Reverse the actual hash
  586. bad_hash = actual_hash[::-1]
  587. def remove_file():
  588. try:
  589. os.remove(name)
  590. except OSError as exc:
  591. if exc.errno != errno.ENOENT:
  592. raise
  593. def do_test(clean=False):
  594. for proto in ('file://', ''):
  595. source = proto + local_path
  596. log.debug('Trying source %s', source)
  597. try:
  598. ret = self.run_state(
  599. 'file.managed',
  600. name=name,
  601. source=source,
  602. source_hash='sha1={0}'.format(bad_hash))
  603. self.assertSaltFalseReturn(ret)
  604. ret = ret[next(iter(ret))]
  605. # Shouldn't be any changes
  606. self.assertFalse(ret['changes'])
  607. # Check that we identified a hash mismatch
  608. self.assertIn(
  609. 'does not match actual checksum', ret['comment'])
  610. ret = self.run_state(
  611. 'file.managed',
  612. name=name,
  613. source=source,
  614. source_hash='sha1={0}'.format(actual_hash))
  615. self.assertSaltTrueReturn(ret)
  616. finally:
  617. if clean:
  618. remove_file()
  619. remove_file()
  620. log.debug('Trying with nonexistant destination file')
  621. do_test()
  622. log.debug('Trying with destination file already present')
  623. with salt.utils.files.fopen(name, 'w'):
  624. pass
  625. try:
  626. do_test(clean=False)
  627. finally:
  628. remove_file()
  629. def test_managed_local_source_does_not_exist(self):
  630. '''
  631. Make sure that we exit gracefully when a local source doesn't exist
  632. '''
  633. name = os.path.join(RUNTIME_VARS.TMP, 'local_source_does_not_exist')
  634. local_path = os.path.join(RUNTIME_VARS.BASE_FILES, 'grail', 'scene99')
  635. for proto in ('file://', ''):
  636. source = proto + local_path
  637. log.debug('Trying source %s', source)
  638. ret = self.run_state(
  639. 'file.managed',
  640. name=name,
  641. source=source)
  642. self.assertSaltFalseReturn(ret)
  643. ret = ret[next(iter(ret))]
  644. # Shouldn't be any changes
  645. self.assertFalse(ret['changes'])
  646. # Check that we identified a hash mismatch
  647. self.assertIn(
  648. 'does not exist', ret['comment'])
  649. def test_managed_unicode_jinja_with_tojson_filter(self):
  650. '''
  651. Using {{ varname }} with a list or dictionary which contains unicode
  652. types on Python 2 will result in Jinja rendering the "u" prefix on each
  653. string. This tests that using the "tojson" jinja filter will dump them
  654. to a format which can be successfully loaded by our YAML loader.
  655. The two lines that should end up being rendered are meant to test two
  656. issues that would trip up PyYAML if the "tojson" filter were not used:
  657. 1. A unicode string type would be loaded as a unicode literal with the
  658. leading "u" as well as the quotes, rather than simply being loaded
  659. as the proper unicode type which matches the content of the string
  660. literal. In other words, u'foo' would be loaded literally as
  661. u"u'foo'". This test includes actual non-ascii unicode in one of the
  662. strings to confirm that this also handles these international
  663. characters properly.
  664. 2. Any unicode string type (such as a URL) which contains a colon would
  665. cause a ScannerError in PyYAML, as it would be assumed to delimit a
  666. mapping node.
  667. Dumping the data structure to JSON using the "tojson" jinja filter
  668. should produce an inline data structure which is valid YAML and will be
  669. loaded properly by our YAML loader.
  670. '''
  671. test_file = os.path.join(RUNTIME_VARS.TMP, 'test-tojson.txt')
  672. ret = self.run_function(
  673. 'state.apply',
  674. mods='tojson',
  675. pillar={'tojson-file': test_file})
  676. ret = ret[next(iter(ret))]
  677. assert ret['result'], ret
  678. with salt.utils.files.fopen(test_file, mode='rb') as fp_:
  679. managed = salt.utils.stringutils.to_unicode(fp_.read())
  680. expected = dedent('''\
  681. Die Webseite ist https://saltstack.com.
  682. Der Zucker ist süß.
  683. ''')
  684. assert managed == expected, '{0!r} != {1!r}'.format(managed, expected) # pylint: disable=repr-flag-used-in-string
  685. def test_managed_source_hash_indifferent_case(self):
  686. '''
  687. Test passing a source_hash as an uppercase hash.
  688. This is a regression test for Issue #38914 and Issue #48230 (test=true use).
  689. '''
  690. name = os.path.join(RUNTIME_VARS.TMP, 'source_hash_indifferent_case')
  691. state_name = 'file_|-{0}_|' \
  692. '-{0}_|-managed'.format(name)
  693. local_path = os.path.join(RUNTIME_VARS.BASE_FILES, 'hello_world.txt')
  694. actual_hash = 'c98c24b677eff44860afea6f493bbaec5bb1c4cbb209c6fc2bbb47f66ff2ad31'
  695. if IS_WINDOWS:
  696. # CRLF vs LF causes a differnt hash on windows
  697. actual_hash = '92b772380a3f8e27a93e57e6deeca6c01da07f5aadce78bb2fbb20de10a66925'
  698. uppercase_hash = actual_hash.upper()
  699. try:
  700. # Lay down tmp file to test against
  701. self.run_state(
  702. 'file.managed',
  703. name=name,
  704. source=local_path,
  705. source_hash=actual_hash
  706. )
  707. # Test uppercase source_hash: should return True with no changes
  708. ret = self.run_state(
  709. 'file.managed',
  710. name=name,
  711. source=local_path,
  712. source_hash=uppercase_hash
  713. )
  714. assert ret[state_name]['result'] is True
  715. assert ret[state_name]['changes'] == {}
  716. # Test uppercase source_hash using test=true
  717. # Should return True with no changes
  718. ret = self.run_state(
  719. 'file.managed',
  720. name=name,
  721. source=local_path,
  722. source_hash=uppercase_hash,
  723. test=True
  724. )
  725. assert ret[state_name]['result'] is True
  726. assert ret[state_name]['changes'] == {}
  727. finally:
  728. # Clean Up File
  729. if os.path.exists(name):
  730. os.remove(name)
  731. @with_tempfile(create=False)
  732. def test_managed_latin1_diff(self, name):
  733. '''
  734. Tests that latin-1 file contents are represented properly in the diff
  735. '''
  736. # Lay down the initial file
  737. ret = self.run_state(
  738. 'file.managed',
  739. name=name,
  740. source='salt://issue-48777/old.html')
  741. ret = ret[next(iter(ret))]
  742. assert ret['result'] is True, ret
  743. # Replace it with the new file and check the diff
  744. ret = self.run_state(
  745. 'file.managed',
  746. name=name,
  747. source='salt://issue-48777/new.html')
  748. ret = ret[next(iter(ret))]
  749. assert ret['result'] is True, ret
  750. diff_lines = ret['changes']['diff'].split(os.linesep)
  751. assert '+räksmörgås' in diff_lines, diff_lines
  752. @with_tempfile()
  753. def test_managed_keep_source_false_salt(self, name):
  754. '''
  755. This test ensures that we properly clean the cached file if keep_source
  756. is set to False, for source files using a salt:// URL
  757. '''
  758. source = 'salt://grail/scene33'
  759. saltenv = 'base'
  760. # Run the state
  761. ret = self.run_state(
  762. 'file.managed',
  763. name=name,
  764. source=source,
  765. saltenv=saltenv,
  766. keep_source=False)
  767. ret = ret[next(iter(ret))]
  768. assert ret['result'] is True
  769. # Now make sure that the file is not cached
  770. result = self.run_function('cp.is_cached', [source, saltenv])
  771. assert result == '', 'File is still cached at {0}'.format(result)
  772. @with_tempfile(create=False)
  773. @with_tempfile(create=False)
  774. def test_file_managed_onchanges(self, file1, file2):
  775. '''
  776. Test file.managed state with onchanges
  777. '''
  778. pillar = {'file1': file1,
  779. 'file2': file2,
  780. 'source': 'salt://testfile',
  781. 'req': 'onchanges'}
  782. # Lay down the file used in the below SLS to ensure that when it is
  783. # run, there are no changes.
  784. self.run_state(
  785. 'file.managed',
  786. name=pillar['file2'],
  787. source=pillar['source'])
  788. ret = self.repack_state_returns(
  789. self.run_function(
  790. 'state.apply',
  791. mods='onchanges_prereq',
  792. pillar=pillar,
  793. test=True,
  794. )
  795. )
  796. # The file states should both exit with None
  797. assert ret['one']['result'] is None, ret['one']['result']
  798. assert ret['three']['result'] is True, ret['three']['result']
  799. # The first file state should have changes, since a new file was
  800. # created. The other one should not, since we already created that file
  801. # before applying the SLS file.
  802. assert ret['one']['changes']
  803. assert not ret['three']['changes'], ret['three']['changes']
  804. # The state watching 'one' should have been run due to changes
  805. assert ret['two']['comment'] == 'Success!', ret['two']['comment']
  806. # The state watching 'three' should not have been run
  807. assert ret['four']['comment'] == \
  808. 'State was not run because none of the onchanges reqs changed', \
  809. ret['four']['comment']
  810. @with_tempfile(create=False)
  811. @with_tempfile(create=False)
  812. def test_file_managed_prereq(self, file1, file2):
  813. '''
  814. Test file.managed state with prereq
  815. '''
  816. pillar = {'file1': file1,
  817. 'file2': file2,
  818. 'source': 'salt://testfile',
  819. 'req': 'prereq'}
  820. # Lay down the file used in the below SLS to ensure that when it is
  821. # run, there are no changes.
  822. self.run_state(
  823. 'file.managed',
  824. name=pillar['file2'],
  825. source=pillar['source'])
  826. ret = self.repack_state_returns(
  827. self.run_function(
  828. 'state.apply',
  829. mods='onchanges_prereq',
  830. pillar=pillar,
  831. test=True,
  832. )
  833. )
  834. # The file states should both exit with None
  835. assert ret['one']['result'] is None, ret['one']['result']
  836. assert ret['three']['result'] is True, ret['three']['result']
  837. # The first file state should have changes, since a new file was
  838. # created. The other one should not, since we already created that file
  839. # before applying the SLS file.
  840. assert ret['one']['changes']
  841. assert not ret['three']['changes'], ret['three']['changes']
  842. # The state watching 'one' should have been run due to changes
  843. assert ret['two']['comment'] == 'Success!', ret['two']['comment']
  844. # The state watching 'three' should not have been run
  845. assert ret['four']['comment'] == 'No changes detected', \
  846. ret['four']['comment']
  847. def test_directory(self):
  848. '''
  849. file.directory
  850. '''
  851. name = os.path.join(RUNTIME_VARS.TMP, 'a_new_dir')
  852. ret = self.run_state('file.directory', name=name)
  853. self.assertSaltTrueReturn(ret)
  854. self.assertTrue(os.path.isdir(name))
  855. def test_directory_symlink_dry_run(self):
  856. '''
  857. Ensure that symlinks are followed when file.directory is run with
  858. test=True
  859. '''
  860. try:
  861. tmp_dir = os.path.join(RUNTIME_VARS.TMP, 'pgdata')
  862. sym_dir = os.path.join(RUNTIME_VARS.TMP, 'pg_data')
  863. os.mkdir(tmp_dir, 0o700)
  864. self.run_function('file.symlink', [tmp_dir, sym_dir])
  865. if IS_WINDOWS:
  866. ret = self.run_state(
  867. 'file.directory', test=True, name=sym_dir,
  868. follow_symlinks=True, win_owner='Administrators')
  869. else:
  870. ret = self.run_state(
  871. 'file.directory', test=True, name=sym_dir,
  872. follow_symlinks=True, mode=700)
  873. self.assertSaltTrueReturn(ret)
  874. finally:
  875. if os.path.isdir(tmp_dir):
  876. self.run_function('file.remove', [tmp_dir])
  877. if os.path.islink(sym_dir):
  878. self.run_function('file.remove', [sym_dir])
  879. @pytest.mark.skip_if_not_root
  880. @skipIf(IS_WINDOWS, 'Mode not available in Windows')
  881. def test_directory_max_depth(self):
  882. '''
  883. file.directory
  884. Test the max_depth option by iteratively increasing the depth and
  885. checking that no changes deeper than max_depth have been attempted
  886. '''
  887. def _get_oct_mode(name):
  888. '''
  889. Return a string octal representation of the permissions for name
  890. '''
  891. return salt.utils.files.normalize_mode(oct(os.stat(name).st_mode & 0o777))
  892. top = os.path.join(RUNTIME_VARS.TMP, 'top_dir')
  893. sub = os.path.join(top, 'sub_dir')
  894. subsub = os.path.join(sub, 'sub_sub_dir')
  895. dirs = [top, sub, subsub]
  896. initial_mode = '0111'
  897. changed_mode = '0555'
  898. initial_modes = {0: {sub: '0755',
  899. subsub: '0111'},
  900. 1: {sub: '0111',
  901. subsub: '0111'},
  902. 2: {sub: '0111',
  903. subsub: '0111'}}
  904. if not os.path.isdir(subsub):
  905. os.makedirs(subsub, int(initial_mode, 8))
  906. try:
  907. for depth in range(0, 3):
  908. ret = self.run_state('file.directory',
  909. name=top,
  910. max_depth=depth,
  911. dir_mode=changed_mode,
  912. recurse=['mode'])
  913. self.assertSaltTrueReturn(ret)
  914. for changed_dir in dirs[0:depth+1]:
  915. self.assertEqual(changed_mode,
  916. _get_oct_mode(changed_dir))
  917. for untouched_dir in dirs[depth+1:]:
  918. # Beginning in Python 3.7, os.makedirs no longer sets
  919. # the mode of intermediate directories to the mode that
  920. # is passed.
  921. if sys.version_info >= (3, 7):
  922. _mode = initial_modes[depth][untouched_dir]
  923. self.assertEqual(_mode,
  924. _get_oct_mode(untouched_dir))
  925. else:
  926. self.assertEqual(initial_mode,
  927. _get_oct_mode(untouched_dir))
  928. finally:
  929. shutil.rmtree(top)
  930. def test_test_directory(self):
  931. '''
  932. file.directory
  933. '''
  934. name = os.path.join(RUNTIME_VARS.TMP, 'a_not_dir')
  935. ret = self.run_state('file.directory', test=True, name=name)
  936. self.assertSaltNoneReturn(ret)
  937. self.assertFalse(os.path.isdir(name))
  938. @with_tempdir()
  939. def test_directory_clean(self, base_dir):
  940. '''
  941. file.directory with clean=True
  942. '''
  943. name = os.path.join(base_dir, 'directory_clean_dir')
  944. os.mkdir(name)
  945. strayfile = os.path.join(name, 'strayfile')
  946. with salt.utils.files.fopen(strayfile, 'w'):
  947. pass
  948. straydir = os.path.join(name, 'straydir')
  949. if not os.path.isdir(straydir):
  950. os.makedirs(straydir)
  951. with salt.utils.files.fopen(os.path.join(straydir, 'strayfile2'), 'w'):
  952. pass
  953. ret = self.run_state('file.directory', name=name, clean=True)
  954. self.assertSaltTrueReturn(ret)
  955. self.assertFalse(os.path.exists(strayfile))
  956. self.assertFalse(os.path.exists(straydir))
  957. self.assertTrue(os.path.isdir(name))
  958. def test_directory_is_idempotent(self):
  959. '''
  960. Ensure the file.directory state produces no changes when rerun.
  961. '''
  962. name = os.path.join(RUNTIME_VARS.TMP, 'a_dir_twice')
  963. if IS_WINDOWS:
  964. username = os.environ.get('USERNAME', 'Administrators')
  965. domain = os.environ.get('USERDOMAIN', '')
  966. fullname = '{0}\\{1}'.format(domain, username)
  967. ret = self.run_state('file.directory', name=name, win_owner=fullname)
  968. else:
  969. ret = self.run_state('file.directory', name=name)
  970. self.assertSaltTrueReturn(ret)
  971. if IS_WINDOWS:
  972. ret = self.run_state('file.directory', name=name, win_owner=username)
  973. else:
  974. ret = self.run_state('file.directory', name=name)
  975. self.assertSaltTrueReturn(ret)
  976. self.assertSaltStateChangesEqual(ret, {})
  977. @with_tempdir()
  978. def test_directory_clean_exclude(self, base_dir):
  979. '''
  980. file.directory with clean=True and exclude_pat set
  981. '''
  982. name = os.path.join(base_dir, 'directory_clean_dir')
  983. if not os.path.isdir(name):
  984. os.makedirs(name)
  985. strayfile = os.path.join(name, 'strayfile')
  986. with salt.utils.files.fopen(strayfile, 'w'):
  987. pass
  988. straydir = os.path.join(name, 'straydir')
  989. if not os.path.isdir(straydir):
  990. os.makedirs(straydir)
  991. strayfile2 = os.path.join(straydir, 'strayfile2')
  992. with salt.utils.files.fopen(strayfile2, 'w'):
  993. pass
  994. keepfile = os.path.join(straydir, 'keepfile')
  995. with salt.utils.files.fopen(keepfile, 'w'):
  996. pass
  997. exclude_pat = 'E@^straydir(|/keepfile)$'
  998. if IS_WINDOWS:
  999. exclude_pat = 'E@^straydir(|\\\\keepfile)$'
  1000. ret = self.run_state('file.directory',
  1001. name=name,
  1002. clean=True,
  1003. exclude_pat=exclude_pat)
  1004. self.assertSaltTrueReturn(ret)
  1005. self.assertFalse(os.path.exists(strayfile))
  1006. self.assertFalse(os.path.exists(strayfile2))
  1007. self.assertTrue(os.path.exists(keepfile))
  1008. @skipIf(IS_WINDOWS, 'Skip on windows')
  1009. @with_tempdir()
  1010. def test_test_directory_clean_exclude(self, base_dir):
  1011. '''
  1012. file.directory with test=True, clean=True and exclude_pat set
  1013. Skipped on windows because clean and exclude_pat not supported by
  1014. salt.sates.file._check_directory_win
  1015. '''
  1016. name = os.path.join(base_dir, 'directory_clean_dir')
  1017. os.mkdir(name)
  1018. strayfile = os.path.join(name, 'strayfile')
  1019. with salt.utils.files.fopen(strayfile, 'w'):
  1020. pass
  1021. straydir = os.path.join(name, 'straydir')
  1022. if not os.path.isdir(straydir):
  1023. os.makedirs(straydir)
  1024. strayfile2 = os.path.join(straydir, 'strayfile2')
  1025. with salt.utils.files.fopen(strayfile2, 'w'):
  1026. pass
  1027. keepfile = os.path.join(straydir, 'keepfile')
  1028. with salt.utils.files.fopen(keepfile, 'w'):
  1029. pass
  1030. exclude_pat = 'E@^straydir(|/keepfile)$'
  1031. if IS_WINDOWS:
  1032. exclude_pat = 'E@^straydir(|\\\\keepfile)$'
  1033. ret = self.run_state('file.directory',
  1034. test=True,
  1035. name=name,
  1036. clean=True,
  1037. exclude_pat=exclude_pat)
  1038. comment = next(six.itervalues(ret))['comment']
  1039. self.assertSaltNoneReturn(ret)
  1040. self.assertTrue(os.path.exists(strayfile))
  1041. self.assertTrue(os.path.exists(strayfile2))
  1042. self.assertTrue(os.path.exists(keepfile))
  1043. self.assertIn(strayfile, comment)
  1044. self.assertIn(strayfile2, comment)
  1045. self.assertNotIn(keepfile, comment)
  1046. @with_tempdir()
  1047. def test_directory_clean_require_in(self, name):
  1048. '''
  1049. file.directory test with clean=True and require_in file
  1050. '''
  1051. state_name = 'file-FileTest-test_directory_clean_require_in'
  1052. state_filename = state_name + '.sls'
  1053. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  1054. wrong_file = os.path.join(name, "wrong")
  1055. with salt.utils.files.fopen(wrong_file, "w") as fp:
  1056. fp.write("foo")
  1057. good_file = os.path.join(name, "bar")
  1058. with salt.utils.files.fopen(state_file, 'w') as fp:
  1059. self.addCleanup(lambda: os.remove(state_file))
  1060. fp.write(textwrap.dedent('''\
  1061. some_dir:
  1062. file.directory:
  1063. - name: {name}
  1064. - clean: true
  1065. {good_file}:
  1066. file.managed:
  1067. - require_in:
  1068. - file: some_dir
  1069. '''.format(name=name, good_file=good_file)))
  1070. ret = self.run_function('state.sls', [state_name])
  1071. self.assertTrue(os.path.exists(good_file))
  1072. self.assertFalse(os.path.exists(wrong_file))
  1073. @with_tempdir()
  1074. def test_directory_clean_require_in_with_id(self, name):
  1075. '''
  1076. file.directory test with clean=True and require_in file with an ID
  1077. different from the file name
  1078. '''
  1079. state_name = 'file-FileTest-test_directory_clean_require_in_with_id'
  1080. state_filename = state_name + '.sls'
  1081. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  1082. wrong_file = os.path.join(name, "wrong")
  1083. with salt.utils.files.fopen(wrong_file, "w") as fp:
  1084. fp.write("foo")
  1085. good_file = os.path.join(name, "bar")
  1086. with salt.utils.files.fopen(state_file, 'w') as fp:
  1087. self.addCleanup(lambda: os.remove(state_file))
  1088. fp.write(textwrap.dedent('''\
  1089. some_dir:
  1090. file.directory:
  1091. - name: {name}
  1092. - clean: true
  1093. some_file:
  1094. file.managed:
  1095. - name: {good_file}
  1096. - require_in:
  1097. - file: some_dir
  1098. '''.format(name=name, good_file=good_file)))
  1099. ret = self.run_function('state.sls', [state_name])
  1100. self.assertTrue(os.path.exists(good_file))
  1101. self.assertFalse(os.path.exists(wrong_file))
  1102. @skipIf(salt.utils.platform.is_darwin(), 'WAR ROOM TEMPORARY SKIP, Test is flaky on macosx')
  1103. @with_tempdir()
  1104. def test_directory_clean_require_with_name(self, name):
  1105. '''
  1106. file.directory test with clean=True and require with a file state
  1107. relatively to the state's name, not its ID.
  1108. '''
  1109. state_name = 'file-FileTest-test_directory_clean_require_in_with_id'
  1110. state_filename = state_name + '.sls'
  1111. state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)
  1112. wrong_file = os.path.join(name, "wrong")
  1113. with salt.utils.files.fopen(wrong_file, "w") as fp:
  1114. fp.write("foo")
  1115. good_file = os.path.join(name, "bar")
  1116. with salt.utils.files.fopen(state_file, 'w') as fp:
  1117. self.addCleanup(lambda: os.remove(state_file))
  1118. fp.write(textwrap.dedent('''\
  1119. some_dir:
  1120. file.directory:
  1121. - name: {name}
  1122. - clean: true
  1123. - require:
  1124. # This requirement refers to the name of the following
  1125. # state, not its ID.
  1126. - file: {good_file}
  1127. some_file:
  1128. file.managed:
  1129. - name: {good_file}
  1130. '''.format(name=name, good_file=good_file)))
  1131. ret = self.run_function('state.sls', [state_name])
  1132. self.assertTrue(os.path.exists(good_file))
  1133. self.assertFalse(os.path.exists(wrong_file))
  1134. def test_directory_broken_symlink(self):
  1135. '''
  1136. Ensure that file.directory works even if a directory
  1137. contains broken symbolic link
  1138. '''
  1139. try:
  1140. tmp_dir = os.path.join(RUNTIME_VARS.TMP, 'foo')
  1141. null_file = '{0}/null'.format(tmp_dir)
  1142. broken_link = '{0}/broken'.format(tmp_dir)
  1143. os.mkdir(tmp_dir, 0o700)
  1144. self.run_function('file.symlink', [null_file, broken_link])
  1145. if IS_WINDOWS:
  1146. ret = self.run_state(
  1147. 'file.directory', name=tmp_dir, recurse=['mode'],
  1148. follow_symlinks=True, win_owner='Administrators')
  1149. else:
  1150. ret = self.run_state(
  1151. 'file.directory', name=tmp_dir, recurse=['mode'],
  1152. file_mode=644, dir_mode=755)
  1153. self.assertSaltTrueReturn(ret)
  1154. finally:
  1155. if os.path.isdir(tmp_dir):
  1156. self.run_function('file.remove', [tmp_dir])
  1157. @with_tempdir(create=False)
  1158. def test_recurse(self, name):
  1159. '''
  1160. file.recurse
  1161. '''
  1162. ret = self.run_state('file.recurse', name=name, source='salt://grail')
  1163. self.assertSaltTrueReturn(ret)
  1164. self.assertTrue(os.path.isfile(os.path.join(name, '36', 'scene')))
  1165. @with_tempdir(create=False)
  1166. @with_tempdir(create=False)
  1167. def test_recurse_specific_env(self, dir1, dir2):
  1168. '''
  1169. file.recurse passing __env__
  1170. '''
  1171. ret = self.run_state('file.recurse',
  1172. name=dir1,
  1173. source='salt://holy',
  1174. __env__='prod')
  1175. self.assertSaltTrueReturn(ret)
  1176. self.assertTrue(os.path.isfile(os.path.join(dir1, '32', 'scene')))
  1177. ret = self.run_state('file.recurse',
  1178. name=dir2,
  1179. source='salt://holy',
  1180. saltenv='prod')
  1181. self.assertSaltTrueReturn(ret)
  1182. self.assertTrue(os.path.isfile(os.path.join(dir2, '32', 'scene')))
  1183. @with_tempdir(create=False)
  1184. @with_tempdir(create=False)
  1185. def test_recurse_specific_env_in_url(self, dir1, dir2):
  1186. '''
  1187. file.recurse passing __env__
  1188. '''
  1189. ret = self.run_state('file.recurse',
  1190. name=dir1,
  1191. source='salt://holy?saltenv=prod')
  1192. self.assertSaltTrueReturn(ret)
  1193. self.assertTrue(os.path.isfile(os.path.join(dir1, '32', 'scene')))
  1194. ret = self.run_state('file.recurse',
  1195. name=dir2,
  1196. source='salt://holy?saltenv=prod')
  1197. self.assertSaltTrueReturn(ret)
  1198. self.assertTrue(os.path.isfile(os.path.join(dir2, '32', 'scene')))
  1199. @with_tempdir(create=False)
  1200. def test_test_recurse(self, name):
  1201. '''
  1202. file.recurse test interface
  1203. '''
  1204. ret = self.run_state(
  1205. 'file.recurse', test=True, name=name, source='salt://grail',
  1206. )
  1207. self.assertSaltNoneReturn(ret)
  1208. self.assertFalse(os.path.isfile(os.path.join(name, '36', 'scene')))
  1209. self.assertFalse(os.path.exists(name))
  1210. @with_tempdir(create=False)
  1211. @with_tempdir(create=False)
  1212. def test_test_recurse_specific_env(self, dir1, dir2):
  1213. '''
  1214. file.recurse test interface
  1215. '''
  1216. ret = self.run_state('file.recurse',
  1217. test=True,
  1218. name=dir1,
  1219. source='salt://holy',
  1220. __env__='prod'
  1221. )
  1222. self.assertSaltNoneReturn(ret)
  1223. self.assertFalse(os.path.isfile(os.path.join(dir1, '32', 'scene')))
  1224. self.assertFalse(os.path.exists(dir1))
  1225. ret = self.run_state('file.recurse',
  1226. test=True,
  1227. name=dir2,
  1228. source='salt://holy',
  1229. saltenv='prod'
  1230. )
  1231. self.assertSaltNoneReturn(ret)
  1232. self.assertFalse(os.path.isfile(os.path.join(dir2, '32', 'scene')))
  1233. self.assertFalse(os.path.exists(dir2))
  1234. @with_tempdir(create=False)
  1235. def test_recurse_template(self, name):
  1236. '''
  1237. file.recurse with jinja template enabled
  1238. '''
  1239. _ts = 'TEMPLATE TEST STRING'
  1240. ret = self.run_state(
  1241. 'file.recurse', name=name, source='salt://grail',
  1242. template='jinja', defaults={'spam': _ts})
  1243. self.assertSaltTrueReturn(ret)
  1244. with salt.utils.files.fopen(os.path.join(name, 'scene33'), 'r') as fp_:
  1245. contents = fp_.read()
  1246. self.assertIn(_ts, contents)
  1247. @with_tempdir()
  1248. def test_recurse_clean(self, name):
  1249. '''
  1250. file.recurse with clean=True
  1251. '''
  1252. strayfile = os.path.join(name, 'strayfile')
  1253. with salt.utils.files.fopen(strayfile, 'w'):
  1254. pass
  1255. # Corner cases: replacing file with a directory and vice versa
  1256. with salt.utils.files.fopen(os.path.join(name, '36'), 'w'):
  1257. pass
  1258. os.makedirs(os.path.join(name, 'scene33'))
  1259. ret = self.run_state(
  1260. 'file.recurse', name=name, source='salt://grail', clean=True)
  1261. self.assertSaltTrueReturn(ret)
  1262. self.assertFalse(os.path.exists(strayfile))
  1263. self.assertTrue(os.path.isfile(os.path.join(name, '36', 'scene')))
  1264. self.assertTrue(os.path.isfile(os.path.join(name, 'scene33')))
  1265. @with_tempdir()
  1266. def test_recurse_clean_specific_env(self, name):
  1267. '''
  1268. file.recurse with clean=True and __env__=prod
  1269. '''
  1270. strayfile = os.path.join(name, 'strayfile')
  1271. with salt.utils.files.fopen(strayfile, 'w'):
  1272. pass
  1273. # Corner cases: replacing file with a directory and vice versa
  1274. with salt.utils.files.fopen(os.path.join(name, '32'), 'w'):
  1275. pass
  1276. os.makedirs(os.path.join(name, 'scene34'))
  1277. ret = self.run_state('file.recurse',
  1278. name=name,
  1279. source='salt://holy',
  1280. clean=True,
  1281. __env__='prod')
  1282. self.assertSaltTrueReturn(ret)
  1283. self.assertFalse(os.path.exists(strayfile))
  1284. self.assertTrue(os.path.isfile(os.path.join(name, '32', 'scene')))
  1285. self.assertTrue(os.path.isfile(os.path.join(name, 'scene34')))
  1286. @skipIf(IS_WINDOWS, 'Skip on windows')
  1287. @with_tempdir()
  1288. def test_recurse_issue_34945(self, base_dir):
  1289. '''
  1290. This tests the case where the source dir for the file.recurse state
  1291. does not contain any files (only subdirectories), and the dir_mode is
  1292. being managed. For a long time, this corner case resulted in the top
  1293. level of the destination directory being created with the wrong initial
  1294. permissions, a problem that would be corrected later on in the
  1295. file.recurse state via running state.directory. However, the
  1296. file.directory state only gets called when there are files to be
  1297. managed in that directory, and when the source directory contains only
  1298. subdirectories, the incorrectly-set initial perms would not be
  1299. repaired.
  1300. This was fixed in https://github.com/saltstack/salt/pull/35309
  1301. Skipped on windows because dir_mode is not supported.
  1302. '''
  1303. dir_mode = '2775'
  1304. issue_dir = 'issue-34945'
  1305. name = os.path.join(base_dir, issue_dir)
  1306. ret = self.run_state('file.recurse',
  1307. name=name,
  1308. source='salt://' + issue_dir,
  1309. dir_mode=dir_mode)
  1310. self.assertSaltTrueReturn(ret)
  1311. actual_dir_mode = oct(stat.S_IMODE(os.stat(name).st_mode))[-4:]
  1312. self.assertEqual(dir_mode, actual_dir_mode)
  1313. @with_tempdir(create=False)
  1314. def test_recurse_issue_40578(self, name):
  1315. '''
  1316. This ensures that the state doesn't raise an exception when it
  1317. encounters a file with a unicode filename in the process of invoking
  1318. file.source_list.
  1319. '''
  1320. ret = self.run_state('file.recurse',
  1321. name=name,
  1322. source='salt://соль')
  1323. self.assertSaltTrueReturn(ret)
  1324. if six.PY2 and IS_WINDOWS:
  1325. # Providing unicode to os.listdir so that we avoid having listdir
  1326. # try to decode the filenames using the systemencoding on windows
  1327. # python 2.
  1328. files = os.listdir(name.decode('utf-8'))
  1329. else:
  1330. files = salt.utils.data.decode(os.listdir(name), normalize=True)
  1331. self.assertEqual(
  1332. sorted(files),
  1333. sorted(['foo.txt', 'спам.txt', 'яйца.txt']),
  1334. )
  1335. @with_tempfile()
  1336. def test_replace(self, name):
  1337. '''
  1338. file.replace
  1339. '''
  1340. with salt.utils.files.fopen(name, 'w+') as fp_:
  1341. fp_.write('change_me')
  1342. ret = self.run_state('file.replace',
  1343. name=name, pattern='change', repl='salt', backup=False)
  1344. with salt.utils.files.fopen(name, 'r') as fp_:
  1345. self.assertIn('salt', fp_.read())
  1346. self.assertSaltTrueReturn(ret)
  1347. @with_tempdir()
  1348. def test_replace_issue_18612(self, base_dir):
  1349. '''
  1350. Test the (mis-)behaviour of file.replace as described in #18612:
  1351. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1352. an infinitely growing file as 'file.replace' didn't check beforehand
  1353. whether the changes had already been done to the file
  1354. # Case description:
  1355. The tested file contains one commented line
  1356. The commented line should be uncommented in the end, nothing else should change
  1357. '''
  1358. test_name = 'test_replace_issue_18612'
  1359. path_test = os.path.join(base_dir, test_name)
  1360. with salt.utils.files.fopen(path_test, 'w+') as fp_test_:
  1361. fp_test_.write('# en_US.UTF-8')
  1362. ret = []
  1363. for x in range(0, 3):
  1364. ret.append(self.run_state('file.replace',
  1365. name=path_test, pattern='^# en_US.UTF-8$', repl='en_US.UTF-8', append_if_not_found=True))
  1366. # ensure, the number of lines didn't change, even after invoking 'file.replace' 3 times
  1367. with salt.utils.files.fopen(path_test, 'r') as fp_test_:
  1368. self.assertTrue((sum(1 for _ in fp_test_) == 1))
  1369. # ensure, the replacement succeeded
  1370. with salt.utils.files.fopen(path_test, 'r') as fp_test_:
  1371. self.assertTrue(fp_test_.read().startswith('en_US.UTF-8'))
  1372. # ensure, all runs of 'file.replace' reported success
  1373. for item in ret:
  1374. self.assertSaltTrueReturn(item)
  1375. @with_tempdir()
  1376. def test_replace_issue_18612_prepend(self, base_dir):
  1377. '''
  1378. Test the (mis-)behaviour of file.replace as described in #18612:
  1379. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1380. an infinitely growing file as 'file.replace' didn't check beforehand
  1381. whether the changes had already been done to the file
  1382. # Case description:
  1383. The tested multifile contains multiple lines not matching the pattern or replacement in any way
  1384. The replacement pattern should be prepended to the file
  1385. '''
  1386. test_name = 'test_replace_issue_18612_prepend'
  1387. path_in = os.path.join(
  1388. RUNTIME_VARS.FILES, 'file.replace', '{0}.in'.format(test_name)
  1389. )
  1390. path_out = os.path.join(
  1391. RUNTIME_VARS.FILES, 'file.replace', '{0}.out'.format(test_name)
  1392. )
  1393. path_test = os.path.join(base_dir, test_name)
  1394. # create test file based on initial template
  1395. shutil.copyfile(path_in, path_test)
  1396. ret = []
  1397. for x in range(0, 3):
  1398. ret.append(self.run_state('file.replace',
  1399. name=path_test, pattern='^# en_US.UTF-8$', repl='en_US.UTF-8', prepend_if_not_found=True))
  1400. # ensure, the resulting file contains the expected lines
  1401. self.assertTrue(filecmp.cmp(path_test, path_out))
  1402. # ensure the initial file was properly backed up
  1403. self.assertTrue(filecmp.cmp(path_test + '.bak', path_in))
  1404. # ensure, all runs of 'file.replace' reported success
  1405. for item in ret:
  1406. self.assertSaltTrueReturn(item)
  1407. @with_tempdir()
  1408. def test_replace_issue_18612_append(self, base_dir):
  1409. '''
  1410. Test the (mis-)behaviour of file.replace as described in #18612:
  1411. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1412. an infinitely growing file as 'file.replace' didn't check beforehand
  1413. whether the changes had already been done to the file
  1414. # Case description:
  1415. The tested multifile contains multiple lines not matching the pattern or replacement in any way
  1416. The replacement pattern should be appended to the file
  1417. '''
  1418. test_name = 'test_replace_issue_18612_append'
  1419. path_in = os.path.join(
  1420. RUNTIME_VARS.FILES, 'file.replace', '{0}.in'.format(test_name)
  1421. )
  1422. path_out = os.path.join(
  1423. RUNTIME_VARS.FILES, 'file.replace', '{0}.out'.format(test_name)
  1424. )
  1425. path_test = os.path.join(base_dir, test_name)
  1426. # create test file based on initial template
  1427. shutil.copyfile(path_in, path_test)
  1428. ret = []
  1429. for x in range(0, 3):
  1430. ret.append(self.run_state('file.replace',
  1431. name=path_test, pattern='^# en_US.UTF-8$', repl='en_US.UTF-8', append_if_not_found=True))
  1432. # ensure, the resulting file contains the expected lines
  1433. self.assertTrue(filecmp.cmp(path_test, path_out))
  1434. # ensure the initial file was properly backed up
  1435. self.assertTrue(filecmp.cmp(path_test + '.bak', path_in))
  1436. # ensure, all runs of 'file.replace' reported success
  1437. for item in ret:
  1438. self.assertSaltTrueReturn(item)
  1439. @with_tempdir()
  1440. def test_replace_issue_18612_append_not_found_content(self, base_dir):
  1441. '''
  1442. Test the (mis-)behaviour of file.replace as described in #18612:
  1443. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1444. an infinitely growing file as 'file.replace' didn't check beforehand
  1445. whether the changes had already been done to the file
  1446. # Case description:
  1447. The tested multifile contains multiple lines not matching the pattern or replacement in any way
  1448. The 'not_found_content' value should be appended to the file
  1449. '''
  1450. test_name = 'test_replace_issue_18612_append_not_found_content'
  1451. path_in = os.path.join(
  1452. RUNTIME_VARS.FILES, 'file.replace', '{0}.in'.format(test_name)
  1453. )
  1454. path_out = os.path.join(
  1455. RUNTIME_VARS.FILES, 'file.replace', '{0}.out'.format(test_name)
  1456. )
  1457. path_test = os.path.join(base_dir, test_name)
  1458. # create test file based on initial template
  1459. shutil.copyfile(path_in, path_test)
  1460. ret = []
  1461. for x in range(0, 3):
  1462. ret.append(
  1463. self.run_state('file.replace',
  1464. name=path_test,
  1465. pattern='^# en_US.UTF-8$',
  1466. repl='en_US.UTF-8',
  1467. append_if_not_found=True,
  1468. not_found_content='THIS LINE WASN\'T FOUND! SO WE\'RE APPENDING IT HERE!'
  1469. ))
  1470. # ensure, the resulting file contains the expected lines
  1471. self.assertTrue(filecmp.cmp(path_test, path_out))
  1472. # ensure the initial file was properly backed up
  1473. self.assertTrue(filecmp.cmp(path_test + '.bak', path_in))
  1474. # ensure, all runs of 'file.replace' reported success
  1475. for item in ret:
  1476. self.assertSaltTrueReturn(item)
  1477. @with_tempdir()
  1478. def test_replace_issue_18612_change_mid_line_with_comment(self, base_dir):
  1479. '''
  1480. Test the (mis-)behaviour of file.replace as described in #18612:
  1481. Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
  1482. an infinitely growing file as 'file.replace' didn't check beforehand
  1483. whether the changes had already been done to the file
  1484. # Case description:
  1485. The tested file contains 5 key=value pairs
  1486. The commented key=value pair #foo=bar should be changed to foo=salt
  1487. The comment char (#) in front of foo=bar should be removed
  1488. '''
  1489. test_name = 'test_replace_issue_18612_change_mid_line_with_comment'
  1490. path_in = os.path.join(
  1491. RUNTIME_VARS.FILES, 'file.replace', '{0}.in'.format(test_name)
  1492. )
  1493. path_out = os.path.join(
  1494. RUNTIME_VARS.FILES, 'file.replace', '{0}.out'.format(test_name)
  1495. )
  1496. path_test = os.path.join(base_dir, test_name)
  1497. # create test file based on initial template
  1498. shutil.copyfile(path_in, path_test)
  1499. ret = []
  1500. for x in range(0, 3):
  1501. ret.append(self.run_state('file.replace',
  1502. name=path_test, pattern='^#foo=bar($|(?=\r\n))', repl='foo=salt', append_if_not_found=True))
  1503. # ensure, the resulting file contains the expected lines
  1504. self.assertTrue(filecmp.cmp(path_test, path_out))
  1505. # ensure the initial file was properly backed up
  1506. self.assertTrue(filecmp.cmp(path_test + '.bak', path_in))
  1507. # ensure, all 'file.replace' runs reported success
  1508. for item in ret:
  1509. self.assertSaltTrueReturn(item)
  1510. @with_tempdir()
  1511. def test_replace_issue_18841_no_changes(self, base_dir):
  1512. '''
  1513. Test the (mis-)behaviour of file.replace as described in #18841:
  1514. Using file.replace in a way which shouldn't modify the file at all
  1515. results in changed mtime of the original file and a backup file being created.
  1516. # Case description
  1517. The tested file contains multiple lines
  1518. The tested file contains a line already matching the replacement (no change needed)
  1519. The tested file's content shouldn't change at all
  1520. The tested file's mtime shouldn't change at all
  1521. No backup file should be created
  1522. '''
  1523. test_name = 'test_replace_issue_18841_no_changes'
  1524. path_in = os.path.join(
  1525. RUNTIME_VARS.FILES, 'file.replace', '{0}.in'.format(test_name)
  1526. )
  1527. path_test = os.path.join(base_dir, test_name)
  1528. # create test file based on initial template
  1529. shutil.copyfile(path_in, path_test)
  1530. # get (m|a)time of file
  1531. fstats_orig = os.stat(path_test)
  1532. # define how far we predate the file
  1533. age = 5*24*60*60
  1534. # set (m|a)time of file 5 days into the past
  1535. os.utime(path_test, (fstats_orig.st_mtime-age, fstats_orig.st_atime-age))
  1536. ret = self.run_state('file.replace',
  1537. name=path_test,
  1538. pattern='^hello world$',
  1539. repl='goodbye world',
  1540. show_changes=True,
  1541. flags=['IGNORECASE'],
  1542. backup=False
  1543. )
  1544. # get (m|a)time of file
  1545. fstats_post = os.stat(path_test)
  1546. # ensure, the file content didn't change
  1547. self.assertTrue(filecmp.cmp(path_in, path_test))
  1548. # ensure no backup file was created
  1549. self.assertFalse(os.path.exists(path_test + '.bak'))
  1550. # ensure the file's mtime didn't change
  1551. self.assertTrue(fstats_post.st_mtime, fstats_orig.st_mtime-age)
  1552. # ensure, all 'file.replace' runs reported success
  1553. self.assertSaltTrueReturn(ret)
  1554. def test_serialize(self):
  1555. '''
  1556. Test to ensure that file.serialize returns a data structure that's
  1557. both serialized and formatted properly
  1558. '''
  1559. path_test = os.path.join(RUNTIME_VARS.TMP, 'test_serialize')
  1560. ret = self.run_state('file.serialize',
  1561. name=path_test,
  1562. dataset={'name': 'naive',
  1563. 'description': 'A basic test',
  1564. 'a_list': ['first_element', 'second_element'],
  1565. 'finally': 'the last item'},
  1566. formatter='json')
  1567. with salt.utils.files.fopen(path_test, 'rb') as fp_:
  1568. serialized_file = salt.utils.stringutils.to_unicode(fp_.read())
  1569. # The JSON serializer uses LF even on OSes where os.sep is CRLF.
  1570. expected_file = '\n'.join([
  1571. '{',
  1572. ' "a_list": [',
  1573. ' "first_element",',
  1574. ' "second_element"',
  1575. ' ],',
  1576. ' "description": "A basic test",',
  1577. ' "finally": "the last item",',
  1578. ' "name": "naive"',
  1579. '}',
  1580. '',
  1581. ])
  1582. self.assertEqual(serialized_file, expected_file)
  1583. @with_tempfile(create=False)
  1584. def test_serializer_deserializer_opts(self, name):
  1585. '''
  1586. Test the serializer_opts and deserializer_opts options
  1587. '''
  1588. data1 = {'foo': {'bar': '%(x)s'}}
  1589. data2 = {'foo': {'abc': 123}}
  1590. merged = {'foo': {'y': 'not_used', 'x': 'baz', 'abc': 123, 'bar': u'baz'}}
  1591. ret = self.run_state(
  1592. 'file.serialize',
  1593. name=name,
  1594. dataset=data1,
  1595. formatter='configparser',
  1596. deserializer_opts=[{'defaults': {'y': 'not_used'}}])
  1597. ret = ret[next(iter(ret))]
  1598. assert ret['result'], ret
  1599. # We should have warned about deserializer_opts being used when
  1600. # merge_if_exists was not set to True.
  1601. assert 'warnings' in ret
  1602. # Run with merge_if_exists, as well as serializer and deserializer opts
  1603. # deserializer opts will be used for string interpolation of the %(x)s
  1604. # that was written to the file with data1 (i.e. bar should become baz)
  1605. ret = self.run_state(
  1606. 'file.serialize',
  1607. name=name,
  1608. dataset=data2,
  1609. formatter='configparser',
  1610. merge_if_exists=True,
  1611. serializer_opts=[{'defaults': {'y': 'not_used'}}],
  1612. deserializer_opts=[{'defaults': {'x': 'baz'}}])
  1613. ret = ret[next(iter(ret))]
  1614. assert ret['result'], ret
  1615. with salt.utils.files.fopen(name) as fp_:
  1616. serialized_data = salt.serializers.configparser.deserialize(fp_)
  1617. # If this test fails, this debug logging will help tell us how the
  1618. # serialized data differs from what was serialized.
  1619. log.debug('serialized_data = %r', serialized_data)
  1620. log.debug('merged = %r', merged)
  1621. # serializing with a default of 'y' will add y = not_used into foo
  1622. assert serialized_data['foo']['y'] == merged['foo']['y']
  1623. # deserializing with default of x = baz will perform interpolation on %(x)s
  1624. # and bar will then = baz
  1625. assert serialized_data['foo']['bar'] == merged['foo']['bar']
  1626. @with_tempdir()
  1627. def test_replace_issue_18841_omit_backup(self, base_dir):
  1628. '''
  1629. Test the (mis-)behaviour of file.replace as described in #18841:
  1630. Using file.replace in a way which shouldn't modify the file at all
  1631. results in changed mtime of the original file and a backup file being created.
  1632. # Case description
  1633. The tested file contains multiple lines
  1634. The tested file contains a line already matching the replacement (no change needed)
  1635. The tested file's content shouldn't change at all
  1636. The tested file's mtime shouldn't change at all
  1637. No backup file should be created, although backup=False isn't explicitly defined
  1638. '''
  1639. test_name = 'test_replace_issue_18841_omit_backup'
  1640. path_in = os.path.join(
  1641. RUNTIME_VARS.FILES, 'file.replace', '{0}.in'.format(test_name)
  1642. )
  1643. path_test = os.path.join(base_dir, test_name)
  1644. # create test file based on initial template
  1645. shutil.copyfile(path_in, path_test)
  1646. # get (m|a)time of file
  1647. fstats_orig = os.stat(path_test)
  1648. # define how far we predate the file
  1649. age = 5*24*60*60
  1650. # set (m|a)time of file 5 days into the past
  1651. os.utime(path_test, (fstats_orig.st_mtime-age, fstats_orig.st_atime-age))
  1652. ret = self.run_state('file.replace',
  1653. name=path_test,
  1654. pattern='^hello world$',
  1655. repl='goodbye world',
  1656. show_changes=True,
  1657. flags=['IGNORECASE']
  1658. )
  1659. # get (m|a)time of file
  1660. fstats_post = os.stat(path_test)
  1661. # ensure, the file content didn't change
  1662. self.assertTrue(filecmp.cmp(path_in, path_test))
  1663. # ensure no backup file was created
  1664. self.assertFalse(os.path.exists(path_test + '.bak'))
  1665. # ensure the file's mtime didn't change
  1666. self.assertTrue(fstats_post.st_mtime, fstats_orig.st_mtime-age)
  1667. # ensure, all 'file.replace' runs reported success
  1668. self.assertSaltTrueReturn(ret)
  1669. @with_tempfile()
  1670. def test_comment(self, name):
  1671. '''
  1672. file.comment
  1673. '''
  1674. # write a line to file
  1675. with salt.utils.files.fopen(name, 'w+') as fp_:
  1676. fp_.write('comment_me')
  1677. # Look for changes with test=True: return should be "None" at the first run
  1678. ret = self.run_state('file.comment', test=True, name=name, regex='^comment')
  1679. self.assertSaltNoneReturn(ret)
  1680. # comment once
  1681. ret = self.run_state('file.comment', name=name, regex='^comment')
  1682. # result is positive
  1683. self.assertSaltTrueReturn(ret)
  1684. # line is commented
  1685. with salt.utils.files.fopen(name, 'r') as fp_:
  1686. self.assertTrue(fp_.read().startswith('#comment'))
  1687. # comment twice
  1688. ret = self.run_state('file.comment', name=name, regex='^comment')
  1689. # result is still positive
  1690. self.assertSaltTrueReturn(ret)
  1691. # line is still commented
  1692. with salt.utils.files.fopen(name, 'r') as fp_:
  1693. self.assertTrue(fp_.read().startswith('#comment'))
  1694. # Test previously commented file returns "True" now and not "None" with test=True
  1695. ret = self.run_state('file.comment', test=True, name=name, regex='^comment')
  1696. self.assertSaltTrueReturn(ret)
  1697. @with_tempfile()
  1698. def test_test_comment(self, name):
  1699. '''
  1700. file.comment test interface
  1701. '''
  1702. with salt.utils.files.fopen(name, 'w+') as fp_:
  1703. fp_.write('comment_me')
  1704. ret = self.run_state(
  1705. 'file.comment', test=True, name=name, regex='.*comment.*',
  1706. )
  1707. with salt.utils.files.fopen(name, 'r') as fp_:
  1708. self.assertNotIn('#comment', fp_.read())
  1709. self.assertSaltNoneReturn(ret)
  1710. @with_tempfile()
  1711. def test_uncomment(self, name):
  1712. '''
  1713. file.uncomment
  1714. '''
  1715. with salt.utils.files.fopen(name, 'w+') as fp_:
  1716. fp_.write('#comment_me')
  1717. ret = self.run_state('file.uncomment', name=name, regex='^comment')
  1718. with salt.utils.files.fopen(name, 'r') as fp_:
  1719. self.assertNotIn('#comment', fp_.read())
  1720. self.assertSaltTrueReturn(ret)
  1721. @with_tempfile()
  1722. def test_test_uncomment(self, name):
  1723. '''
  1724. file.comment test interface
  1725. '''
  1726. with salt.utils.files.fopen(name, 'w+') as fp_:
  1727. fp_.write('#comment_me')
  1728. ret = self.run_state(
  1729. 'file.uncomment', test=True, name=name, regex='^comment.*'
  1730. )
  1731. with salt.utils.files.fopen(name, 'r') as fp_:
  1732. self.assertIn('#comment', fp_.read())
  1733. self.assertSaltNoneReturn(ret)
  1734. @with_tempfile()
  1735. def test_append(self, name):
  1736. '''
  1737. file.append
  1738. '''
  1739. with salt.utils.files.fopen(name, 'w+') as fp_:
  1740. fp_.write('#salty!')
  1741. ret = self.run_state('file.append', name=name, text='cheese')
  1742. with salt.utils.files.fopen(name, 'r') as fp_:
  1743. self.assertIn('cheese', fp_.read())
  1744. self.assertSaltTrueReturn(ret)
  1745. @with_tempfile()
  1746. def test_test_append(self, name):
  1747. '''
  1748. file.append test interface
  1749. '''
  1750. with salt.utils.files.fopen(name, 'w+') as fp_:
  1751. fp_.write('#salty!')
  1752. ret = self.run_state(
  1753. 'file.append', test=True, name=name, text='cheese'
  1754. )
  1755. with salt.utils.files.fopen(name, 'r') as fp_:
  1756. self.assertNotIn('cheese', fp_.read())
  1757. self.assertSaltNoneReturn(ret)
  1758. @with_tempdir()
  1759. def test_append_issue_1864_makedirs(self, base_dir):
  1760. '''
  1761. file.append but create directories if needed as an option, and create
  1762. the file if it doesn't exist
  1763. '''
  1764. fname = 'append_issue_1864_makedirs'
  1765. name = os.path.join(base_dir, fname)
  1766. # Non existing file get's touched
  1767. ret = self.run_state(
  1768. 'file.append', name=name, text='cheese', makedirs=True
  1769. )
  1770. self.assertSaltTrueReturn(ret)
  1771. # Nested directory and file get's touched
  1772. name = os.path.join(base_dir, 'issue_1864', fname)
  1773. ret = self.run_state(
  1774. 'file.append', name=name, text='cheese', makedirs=True
  1775. )
  1776. self.assertSaltTrueReturn(ret)
  1777. # Parent directory exists but file does not and makedirs is False
  1778. name = os.path.join(base_dir, 'issue_1864', fname + '2')
  1779. ret = self.run_state(
  1780. 'file.append', name=name, text='cheese'
  1781. )
  1782. self.assertSaltTrueReturn(ret)
  1783. self.assertTrue(os.path.isfile(name))
  1784. @with_tempdir()
  1785. def test_prepend_issue_27401_makedirs(self, base_dir):
  1786. '''
  1787. file.prepend but create directories if needed as an option, and create
  1788. the file if it doesn't exist
  1789. '''
  1790. fname = 'prepend_issue_27401'
  1791. name = os.path.join(base_dir, fname)
  1792. # Non existing file get's touched
  1793. ret = self.run_state(
  1794. 'file.prepend', name=name, text='cheese', makedirs=True
  1795. )
  1796. self.assertSaltTrueReturn(ret)
  1797. # Nested directory and file get's touched
  1798. name = os.path.join(base_dir, 'issue_27401', fname)
  1799. ret = self.run_state(
  1800. 'file.prepend', name=name, text='cheese', makedirs=True
  1801. )
  1802. self.assertSaltTrueReturn(ret)
  1803. # Parent directory exists but file does not and makedirs is False
  1804. name = os.path.join(base_dir, 'issue_27401', fname + '2')
  1805. ret = self.run_state(
  1806. 'file.prepend', name=name, text='cheese'
  1807. )
  1808. self.assertSaltTrueReturn(ret)
  1809. self.assertTrue(os.path.isfile(name))
  1810. @with_tempfile()
  1811. def test_touch(self, name):
  1812. '''
  1813. file.touch
  1814. '''
  1815. ret = self.run_state('file.touch', name=name)
  1816. self.assertTrue(os.path.isfile(name))
  1817. self.assertSaltTrueReturn(ret)
  1818. @with_tempfile(create=False)
  1819. def test_test_touch(self, name):
  1820. '''
  1821. file.touch test interface
  1822. '''
  1823. ret = self.run_state('file.touch', test=True, name=name)
  1824. self.assertFalse(os.path.isfile(name))
  1825. self.assertSaltNoneReturn(ret)
  1826. @with_tempdir()
  1827. def test_touch_directory(self, base_dir):
  1828. '''
  1829. file.touch a directory
  1830. '''
  1831. name = os.path.join(base_dir, 'touch_test_dir')
  1832. os.mkdir(name)
  1833. ret = self.run_state('file.touch', name=name)
  1834. self.assertSaltTrueReturn(ret)
  1835. self.assertTrue(os.path.isdir(name))
  1836. @with_tempdir()
  1837. def test_issue_2227_file_append(self, base_dir):
  1838. '''
  1839. Text to append includes a percent symbol
  1840. '''
  1841. # let's make use of existing state to create a file with contents to
  1842. # test against
  1843. tmp_file_append = os.path.join(base_dir, 'test.append')
  1844. self.run_state('file.touch', name=tmp_file_append)
  1845. self.run_state(
  1846. 'file.append',
  1847. name=tmp_file_append,
  1848. source='salt://testappend/firstif')
  1849. self.run_state(
  1850. 'file.append',
  1851. name=tmp_file_append,
  1852. source='salt://testappend/secondif')
  1853. # Now our real test
  1854. try:
  1855. ret = self.run_state(
  1856. 'file.append',
  1857. name=tmp_file_append,
  1858. text="HISTTIMEFORMAT='%F %T '")
  1859. self.assertSaltTrueReturn(ret)
  1860. with salt.utils.files.fopen(tmp_file_append, 'r') as fp_:
  1861. contents_pre = fp_.read()
  1862. # It should not append text again
  1863. ret = self.run_state(
  1864. 'file.append',
  1865. name=tmp_file_append,
  1866. text="HISTTIMEFORMAT='%F %T '")
  1867. self.assertSaltTrueReturn(ret)
  1868. with salt.utils.files.fopen(tmp_file_append, 'r') as fp_:
  1869. contents_post = fp_.read()
  1870. self.assertEqual(contents_pre, contents_post)
  1871. except AssertionError:
  1872. if os.path.exists(tmp_file_append):
  1873. shutil.copy(tmp_file_append, tmp_file_append + '.bak')
  1874. raise
  1875. @with_tempdir()
  1876. def test_issue_2401_file_comment(self, base_dir):
  1877. # Get a path to the temporary file
  1878. tmp_file = os.path.join(base_dir, 'issue-2041-comment.txt')
  1879. # Write some data to it
  1880. with salt.utils.files.fopen(tmp_file, 'w') as fp_:
  1881. fp_.write('hello\nworld\n')
  1882. # create the sls template
  1883. template_lines = [
  1884. '{0}:'.format(tmp_file),
  1885. ' file.comment:',
  1886. ' - regex: ^world'
  1887. ]
  1888. template = '\n'.join(template_lines)
  1889. try:
  1890. ret = self.run_function(
  1891. 'state.template_str', [template], timeout=120
  1892. )
  1893. self.assertSaltTrueReturn(ret)
  1894. self.assertNotInSaltComment('Pattern already commented', ret)
  1895. self.assertInSaltComment('Commented lines successfully', ret)
  1896. # This next time, it is already commented.
  1897. ret = self.run_function(
  1898. 'state.template_str', [template], timeout=120
  1899. )
  1900. self.assertSaltTrueReturn(ret)
  1901. self.assertInSaltComment('Pattern already commented', ret)
  1902. except AssertionError:
  1903. shutil.copy(tmp_file, tmp_file + '.bak')
  1904. raise
  1905. @with_tempdir()
  1906. def test_issue_2379_file_append(self, base_dir):
  1907. # Get a path to the temporary file
  1908. tmp_file = os.path.join(base_dir, 'issue-2379-file-append.txt')
  1909. # Write some data to it
  1910. with salt.utils.files.fopen(tmp_file, 'w') as fp_:
  1911. fp_.write(
  1912. 'hello\nworld\n' # Some junk
  1913. '#PermitRootLogin yes\n' # Commented text
  1914. '# PermitRootLogin yes\n' # Commented text with space
  1915. )
  1916. # create the sls template
  1917. template_lines = [
  1918. '{0}:'.format(tmp_file),
  1919. ' file.append:',
  1920. ' - text: PermitRootLogin yes'
  1921. ]
  1922. template = '\n'.join(template_lines)
  1923. try:
  1924. ret = self.run_function('state.template_str', [template])
  1925. self.assertSaltTrueReturn(ret)
  1926. self.assertInSaltComment('Appended 1 lines', ret)
  1927. except AssertionError:
  1928. shutil.copy(tmp_file, tmp_file + '.bak')
  1929. raise
  1930. @skipIf(IS_WINDOWS, 'Mode not available in Windows')
  1931. @with_tempdir(create=False)
  1932. @with_tempdir(create=False)
  1933. def test_issue_2726_mode_kwarg(self, dir1, dir2):
  1934. # Let's test for the wrong usage approach
  1935. bad_mode_kwarg_testfile = os.path.join(
  1936. dir1, 'bad_mode_kwarg', 'testfile'
  1937. )
  1938. bad_template = [
  1939. '{0}:'.format(bad_mode_kwarg_testfile),
  1940. ' file.recurse:',
  1941. ' - source: salt://testfile',
  1942. ' - mode: 644'
  1943. ]
  1944. ret = self.run_function(
  1945. 'state.template_str', [os.linesep.join(bad_template)]
  1946. )
  1947. self.assertSaltFalseReturn(ret)
  1948. self.assertInSaltComment(
  1949. '\'mode\' is not allowed in \'file.recurse\'. Please use '
  1950. '\'file_mode\' and \'dir_mode\'.',
  1951. ret
  1952. )
  1953. self.assertNotInSaltComment(
  1954. 'TypeError: managed() got multiple values for keyword '
  1955. 'argument \'mode\'',
  1956. ret
  1957. )
  1958. # Now, the correct usage approach
  1959. good_mode_kwargs_testfile = os.path.join(
  1960. dir2, 'good_mode_kwargs', 'testappend'
  1961. )
  1962. good_template = [
  1963. '{0}:'.format(good_mode_kwargs_testfile),
  1964. ' file.recurse:',
  1965. ' - source: salt://testappend',
  1966. ' - dir_mode: 744',
  1967. ' - file_mode: 644',
  1968. ]
  1969. ret = self.run_function(
  1970. 'state.template_str', [os.linesep.join(good_template)]
  1971. )
  1972. self.assertSaltTrueReturn(ret)
  1973. @with_tempdir()
  1974. def test_issue_8343_accumulated_require_in(self, base_dir):
  1975. template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, 'issue-8343.sls')
  1976. testcase_filedest = os.path.join(base_dir, 'issue-8343.txt')
  1977. if os.path.exists(template_path):
  1978. os.remove(template_path)
  1979. if os.path.exists(testcase_filedest):
  1980. os.remove(testcase_filedest)
  1981. sls_template = [
  1982. '{0}:',
  1983. ' file.managed:',
  1984. ' - contents: |',
  1985. ' #',
  1986. '',
  1987. 'prepend-foo-accumulator-from-pillar:',
  1988. ' file.accumulated:',
  1989. ' - require_in:',
  1990. ' - file: prepend-foo-management',
  1991. ' - filename: {0}',
  1992. ' - text: |',
  1993. ' foo',
  1994. '',
  1995. 'append-foo-accumulator-from-pillar:',
  1996. ' file.accumulated:',
  1997. ' - require_in:',
  1998. ' - file: append-foo-management',
  1999. ' - filename: {0}',
  2000. ' - text: |',
  2001. ' bar',
  2002. '',
  2003. 'prepend-foo-management:',
  2004. ' file.blockreplace:',
  2005. ' - name: {0}',
  2006. ' - marker_start: "#-- start salt managed zonestart -- PLEASE, DO NOT EDIT"',
  2007. ' - marker_end: "#-- end salt managed zonestart --"',
  2008. " - content: ''",
  2009. ' - prepend_if_not_found: True',
  2010. " - backup: '.bak'",
  2011. ' - show_changes: True',
  2012. '',
  2013. 'append-foo-management:',
  2014. ' file.blockreplace:',
  2015. ' - name: {0}',
  2016. ' - marker_start: "#-- start salt managed zoneend -- PLEASE, DO NOT EDIT"',
  2017. ' - marker_end: "#-- end salt managed zoneend --"',
  2018. " - content: ''",
  2019. ' - append_if_not_found: True',
  2020. " - backup: '.bak2'",
  2021. ' - show_changes: True',
  2022. '']
  2023. with salt.utils.files.fopen(template_path, 'w') as fp_:
  2024. fp_.write(
  2025. os.linesep.join(sls_template).format(testcase_filedest))
  2026. ret = self.run_function('state.sls', mods='issue-8343')
  2027. for name, step in six.iteritems(ret):
  2028. self.assertSaltTrueReturn({name: step})
  2029. with salt.utils.files.fopen(testcase_filedest) as fp_:
  2030. contents = fp_.read().split(os.linesep)
  2031. expected = [
  2032. '#-- start salt managed zonestart -- PLEASE, DO NOT EDIT',
  2033. 'foo',
  2034. '#-- end salt managed zonestart --',
  2035. '#',
  2036. '#-- start salt managed zoneend -- PLEASE, DO NOT EDIT',
  2037. 'bar',
  2038. '#-- end salt managed zoneend --',
  2039. '']
  2040. self.assertEqual([salt.utils.stringutils.to_str(line) for line in expected], contents)
  2041. @with_tempdir()
  2042. @skipIf(salt.utils.platform.is_darwin() and six.PY2, 'This test hangs on OS X on Py2')
  2043. def test_issue_11003_immutable_lazy_proxy_sum(self, base_dir):
  2044. # causes the Import-Module ServerManager error on Windows
  2045. template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, 'issue-11003.sls')
  2046. testcase_filedest = os.path.join(base_dir, 'issue-11003.txt')
  2047. sls_template = [
  2048. 'a{0}:',
  2049. ' file.absent:',
  2050. ' - name: {0}',
  2051. '',
  2052. '{0}:',
  2053. ' file.managed:',
  2054. ' - contents: |',
  2055. ' #',
  2056. '',
  2057. 'test-acc1:',
  2058. ' file.accumulated:',
  2059. ' - require_in:',
  2060. ' - file: final',
  2061. ' - filename: {0}',
  2062. ' - text: |',
  2063. ' bar',
  2064. '',
  2065. 'test-acc2:',
  2066. ' file.accumulated:',
  2067. ' - watch_in:',
  2068. ' - file: final',
  2069. ' - filename: {0}',
  2070. ' - text: |',
  2071. ' baz',
  2072. '',
  2073. 'final:',
  2074. ' file.blockreplace:',
  2075. ' - name: {0}',
  2076. ' - marker_start: "#-- start managed zone PLEASE, DO NOT EDIT"',
  2077. ' - marker_end: "#-- end managed zone"',
  2078. ' - content: \'\'',
  2079. ' - append_if_not_found: True',
  2080. ' - show_changes: True'
  2081. ]
  2082. with salt.utils.files.fopen(template_path, 'w') as fp_:
  2083. fp_.write(os.linesep.join(sls_template).format(testcase_filedest))
  2084. ret = self.run_function('state.sls', mods='issue-11003', timeout=600)
  2085. for name, step in six.iteritems(ret):
  2086. self.assertSaltTrueReturn({name: step})
  2087. with salt.utils.files.fopen(testcase_filedest) as fp_:
  2088. contents = fp_.read().split(os.linesep)
  2089. begin = contents.index(
  2090. '#-- start managed zone PLEASE, DO NOT EDIT') + 1
  2091. end = contents.index('#-- end managed zone')
  2092. block_contents = contents[begin:end]
  2093. for item in ('', 'bar', 'baz'):
  2094. block_contents.remove(item)
  2095. self.assertEqual(block_contents, [])
  2096. @with_tempdir()
  2097. def test_issue_8947_utf8_sls(self, base_dir):
  2098. '''
  2099. Test some file operation with utf-8 characters on the sls
  2100. This is more generic than just a file test. Feel free to move
  2101. '''
  2102. self.maxDiff = None
  2103. korean_1 = '한국어 시험'
  2104. korean_2 = '첫 번째 행'
  2105. korean_3 = '마지막 행'
  2106. test_file = os.path.join(base_dir, '{0}.txt'.format(korean_1))
  2107. test_file_encoded = test_file
  2108. template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, 'issue-8947.sls')
  2109. # create the sls template
  2110. template = textwrap.dedent('''\
  2111. some-utf8-file-create:
  2112. file.managed:
  2113. - name: {test_file}
  2114. - contents: {korean_1}
  2115. - makedirs: True
  2116. - replace: True
  2117. - show_diff: True
  2118. some-utf8-file-create2:
  2119. file.managed:
  2120. - name: {test_file}
  2121. - contents: |
  2122. {korean_2}
  2123. {korean_1}
  2124. {korean_3}
  2125. - replace: True
  2126. - show_diff: True
  2127. '''.format(**locals()))
  2128. if not salt.utils.platform.is_windows():
  2129. template += textwrap.dedent('''\
  2130. some-utf8-file-content-test:
  2131. cmd.run:
  2132. - name: 'cat "{test_file}"'
  2133. - require:
  2134. - file: some-utf8-file-create2
  2135. '''.format(**locals()))
  2136. # Save template file
  2137. with salt.utils.files.fopen(template_path, 'wb') as fp_:
  2138. fp_.write(salt.utils.stringutils.to_bytes(template))
  2139. try:
  2140. result = self.run_function('state.sls', mods='issue-8947')
  2141. if not isinstance(result, dict):
  2142. raise AssertionError(
  2143. ('Something went really wrong while testing this sls:'
  2144. ' {0}').format(repr(result))
  2145. )
  2146. # difflib produces different output on python 2.6 than on >=2.7
  2147. if sys.version_info < (2, 7):
  2148. diff = '--- \n+++ \n@@ -1,1 +1,3 @@\n'
  2149. else:
  2150. diff = '--- \n+++ \n@@ -1 +1,3 @@\n'
  2151. diff += (
  2152. '+첫 번째 행{0}'
  2153. ' 한국어 시험{0}'
  2154. '+마지막 행{0}'
  2155. ).format(os.linesep)
  2156. ret = {x.split('_|-')[1]: y for x, y in six.iteritems(result)}
  2157. # Confirm initial creation of file
  2158. self.assertEqual(
  2159. ret['some-utf8-file-create']['comment'],
  2160. 'File {0} updated'.format(test_file_encoded)
  2161. )
  2162. self.assertEqual(
  2163. ret['some-utf8-file-create']['changes'],
  2164. {'diff': 'New file'}
  2165. )
  2166. # Confirm file was modified and that the diff was as expected
  2167. self.assertEqual(
  2168. ret['some-utf8-file-create2']['comment'],
  2169. 'File {0} updated'.format(test_file_encoded)
  2170. )
  2171. self.assertEqual(
  2172. ret['some-utf8-file-create2']['changes'],
  2173. {'diff': diff}
  2174. )
  2175. if salt.utils.platform.is_windows():
  2176. import subprocess
  2177. import win32api
  2178. p = subprocess.Popen(
  2179. salt.utils.stringutils.to_str(
  2180. 'type {}'.format(win32api.GetShortPathName(test_file))),
  2181. shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  2182. p.poll()
  2183. out = p.stdout.read()
  2184. self.assertEqual(
  2185. out.decode('utf-8'),
  2186. os.linesep.join((korean_2, korean_1, korean_3)) + os.linesep
  2187. )
  2188. else:
  2189. self.assertEqual(
  2190. ret['some-utf8-file-content-test']['comment'],
  2191. 'Command "cat "{0}"" run'.format(
  2192. test_file_encoded
  2193. )
  2194. )
  2195. self.assertEqual(
  2196. ret['some-utf8-file-content-test']['changes']['stdout'],
  2197. '\n'.join((korean_2, korean_1, korean_3))
  2198. )
  2199. finally:
  2200. try:
  2201. os.remove(template_path)
  2202. except OSError:
  2203. pass
  2204. @pytest.mark.skip_if_not_root
  2205. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2206. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2207. @with_system_user_and_group(TEST_SYSTEM_USER, TEST_SYSTEM_GROUP,
  2208. on_existing='delete', delete=True)
  2209. @with_tempdir()
  2210. def test_issue_12209_follow_symlinks(self, tempdir, user, group):
  2211. '''
  2212. Ensure that symlinks are properly chowned when recursing (following
  2213. symlinks)
  2214. '''
  2215. # Make the directories for this test
  2216. onedir = os.path.join(tempdir, 'one')
  2217. twodir = os.path.join(tempdir, 'two')
  2218. os.mkdir(onedir)
  2219. os.symlink(onedir, twodir)
  2220. # Run the state
  2221. ret = self.run_state(
  2222. 'file.directory', name=tempdir, follow_symlinks=True,
  2223. user=user, group=group, recurse=['user', 'group']
  2224. )
  2225. self.assertSaltTrueReturn(ret)
  2226. # Double-check, in case state mis-reported a True result. Since we are
  2227. # following symlinks, we expect twodir to still be owned by root, but
  2228. # onedir should be owned by the 'issue12209' user.
  2229. onestats = os.stat(onedir)
  2230. twostats = os.lstat(twodir)
  2231. self.assertEqual(pwd.getpwuid(onestats.st_uid).pw_name, user)
  2232. self.assertEqual(pwd.getpwuid(twostats.st_uid).pw_name, 'root')
  2233. self.assertEqual(grp.getgrgid(onestats.st_gid).gr_name, group)
  2234. if salt.utils.path.which('id'):
  2235. root_group = self.run_function('user.primary_group', ['root'])
  2236. self.assertEqual(grp.getgrgid(twostats.st_gid).gr_name, root_group)
  2237. @pytest.mark.skip_if_not_root
  2238. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2239. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2240. @with_system_user_and_group(TEST_SYSTEM_USER, TEST_SYSTEM_GROUP,
  2241. on_existing='delete', delete=True)
  2242. @with_tempdir()
  2243. def test_issue_12209_no_follow_symlinks(self, tempdir, user, group):
  2244. '''
  2245. Ensure that symlinks are properly chowned when recursing (not following
  2246. symlinks)
  2247. '''
  2248. # Make the directories for this test
  2249. onedir = os.path.join(tempdir, 'one')
  2250. twodir = os.path.join(tempdir, 'two')
  2251. os.mkdir(onedir)
  2252. os.symlink(onedir, twodir)
  2253. # Run the state
  2254. ret = self.run_state(
  2255. 'file.directory', name=tempdir, follow_symlinks=False,
  2256. user=user, group=group, recurse=['user', 'group']
  2257. )
  2258. self.assertSaltTrueReturn(ret)
  2259. # Double-check, in case state mis-reported a True result. Since we
  2260. # are not following symlinks, we expect twodir to now be owned by
  2261. # the 'issue12209' user, just link onedir.
  2262. onestats = os.stat(onedir)
  2263. twostats = os.lstat(twodir)
  2264. self.assertEqual(pwd.getpwuid(onestats.st_uid).pw_name, user)
  2265. self.assertEqual(pwd.getpwuid(twostats.st_uid).pw_name, user)
  2266. self.assertEqual(grp.getgrgid(onestats.st_gid).gr_name, group)
  2267. self.assertEqual(grp.getgrgid(twostats.st_gid).gr_name, group)
  2268. @with_tempfile(create=False)
  2269. @with_tempfile()
  2270. def test_template_local_file(self, source, dest):
  2271. '''
  2272. Test a file.managed state with a local file as the source. Test both
  2273. with the file:// protocol designation prepended, and without it.
  2274. '''
  2275. with salt.utils.files.fopen(source, 'w') as fp_:
  2276. fp_.write('{{ foo }}\n')
  2277. for prefix in ('file://', ''):
  2278. ret = self.run_state(
  2279. 'file.managed',
  2280. name=dest,
  2281. source=prefix + source,
  2282. template='jinja',
  2283. context={'foo': 'Hello world!'}
  2284. )
  2285. self.assertSaltTrueReturn(ret)
  2286. @with_tempfile()
  2287. def test_template_local_file_noclobber(self, source):
  2288. '''
  2289. Test the case where a source file is in the minion's local filesystem,
  2290. and the source path is the same as the destination path.
  2291. '''
  2292. with salt.utils.files.fopen(source, 'w') as fp_:
  2293. fp_.write('{{ foo }}\n')
  2294. ret = self.run_state(
  2295. 'file.managed',
  2296. name=source,
  2297. source=source,
  2298. template='jinja',
  2299. context={'foo': 'Hello world!'}
  2300. )
  2301. self.assertSaltFalseReturn(ret)
  2302. self.assertIn(
  2303. ('Source file cannot be the same as destination'),
  2304. ret[next(iter(ret))]['comment'],
  2305. )
  2306. @with_tempfile(create=False)
  2307. @with_tempfile(create=False)
  2308. def test_issue_25250_force_copy_deletes(self, source, dest):
  2309. '''
  2310. ensure force option in copy state does not delete target file
  2311. '''
  2312. shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, 'hosts'), source)
  2313. shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, 'file/base/cheese'), dest)
  2314. self.run_state('file.copy', name=dest, source=source, force=True)
  2315. self.assertTrue(os.path.exists(dest))
  2316. self.assertTrue(filecmp.cmp(source, dest))
  2317. os.remove(source)
  2318. os.remove(dest)
  2319. @skipIf(IS_WINDOWS, 'Windows does not report any file modes. Skipping.')
  2320. @pytest.mark.destructive_test
  2321. @with_tempfile()
  2322. def test_file_copy_make_dirs(self, source):
  2323. '''
  2324. ensure make_dirs creates correct user perms
  2325. '''
  2326. shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, 'hosts'), source)
  2327. dest = os.path.join(RUNTIME_VARS.TMP, 'dir1', 'dir2', 'copied_file.txt')
  2328. user = 'salt'
  2329. mode = '0644'
  2330. ret = self.run_function('user.add', [user])
  2331. self.assertTrue(ret, 'Failed to add user. Are you running as sudo?')
  2332. ret = self.run_state('file.copy', name=dest, source=source, user=user,
  2333. makedirs=True, mode=mode)
  2334. self.assertSaltTrueReturn(ret)
  2335. file_checks = [dest, os.path.join(RUNTIME_VARS.TMP, 'dir1'), os.path.join(RUNTIME_VARS.TMP, 'dir1', 'dir2')]
  2336. for check in file_checks:
  2337. user_check = self.run_function('file.get_user', [check])
  2338. mode_check = self.run_function('file.get_mode', [check])
  2339. self.assertEqual(user_check, user)
  2340. self.assertEqual(salt.utils.files.normalize_mode(mode_check), mode)
  2341. def test_contents_pillar_with_pillar_list(self):
  2342. '''
  2343. This tests for any regressions for this issue:
  2344. https://github.com/saltstack/salt/issues/30934
  2345. '''
  2346. state_file = 'file_contents_pillar'
  2347. ret = self.run_function('state.sls', mods=state_file)
  2348. self.assertSaltTrueReturn(ret)
  2349. @pytest.mark.skip_if_not_root
  2350. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2351. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2352. @with_system_user_and_group(TEST_SYSTEM_USER, TEST_SYSTEM_GROUP,
  2353. on_existing='delete', delete=True)
  2354. def test_owner_after_setuid(self, user, group):
  2355. '''
  2356. Test to check file user/group after setting setuid or setgid.
  2357. Because Python os.chown() does reset the setuid/setgid to 0.
  2358. https://github.com/saltstack/salt/pull/45257
  2359. '''
  2360. # Desired configuration.
  2361. desired = {
  2362. 'file': os.path.join(RUNTIME_VARS.TMP, 'file_with_setuid'),
  2363. 'user': user,
  2364. 'group': group,
  2365. 'mode': '4750'
  2366. }
  2367. # Run the state.
  2368. ret = self.run_state(
  2369. 'file.managed', name=desired['file'],
  2370. user=desired['user'], group=desired['group'], mode=desired['mode']
  2371. )
  2372. # Check result.
  2373. file_stat = os.stat(desired['file'])
  2374. result = {
  2375. 'user': pwd.getpwuid(file_stat.st_uid).pw_name,
  2376. 'group': grp.getgrgid(file_stat.st_gid).gr_name,
  2377. 'mode': oct(stat.S_IMODE(file_stat.st_mode))
  2378. }
  2379. self.assertSaltTrueReturn(ret)
  2380. self.assertEqual(desired['user'], result['user'])
  2381. self.assertEqual(desired['group'], result['group'])
  2382. self.assertEqual(desired['mode'], result['mode'].lstrip('0Oo'))
  2383. def test_binary_contents(self):
  2384. '''
  2385. This tests to ensure that binary contents do not cause a traceback.
  2386. '''
  2387. name = os.path.join(RUNTIME_VARS.TMP, '1px.gif')
  2388. try:
  2389. ret = self.run_state(
  2390. 'file.managed',
  2391. name=name,
  2392. contents=BINARY_FILE)
  2393. self.assertSaltTrueReturn(ret)
  2394. finally:
  2395. try:
  2396. os.remove(name)
  2397. except OSError:
  2398. pass
  2399. def test_binary_contents_twice(self):
  2400. '''
  2401. This test ensures that after a binary file is created, salt can confirm
  2402. that the file is in the correct state.
  2403. '''
  2404. # Create a binary file
  2405. name = os.path.join(RUNTIME_VARS.TMP, '1px.gif')
  2406. # First run state ensures file is created
  2407. ret = self.run_state('file.managed', name=name, contents=BINARY_FILE)
  2408. self.assertSaltTrueReturn(ret)
  2409. # Second run of state ensures file is in correct state
  2410. ret = self.run_state('file.managed', name=name, contents=BINARY_FILE)
  2411. self.assertSaltTrueReturn(ret)
  2412. try:
  2413. os.remove(name)
  2414. except OSError:
  2415. pass
  2416. @pytest.mark.skip_if_not_root
  2417. @skipIf(not HAS_PWD, "pwd not available. Skipping test")
  2418. @skipIf(not HAS_GRP, "grp not available. Skipping test")
  2419. @with_system_user_and_group(TEST_SYSTEM_USER, TEST_SYSTEM_GROUP,
  2420. on_existing='delete', delete=True)
  2421. @with_tempdir()
  2422. def test_issue_48336_file_managed_mode_setuid(self, tempdir, user, group):
  2423. '''
  2424. Ensure that mode is correct with changing of ownership and group
  2425. symlinks)
  2426. '''
  2427. tempfile = os.path.join(tempdir, 'temp_file_issue_48336')
  2428. # Run the state
  2429. ret = self.run_state(
  2430. 'file.managed', name=tempfile,
  2431. user=user, group=group, mode='4750',
  2432. )
  2433. self.assertSaltTrueReturn(ret)
  2434. # Check that the owner and group are correct, and
  2435. # the mode is what we expect
  2436. temp_file_stats = os.stat(tempfile)
  2437. # Normalize the mode
  2438. temp_file_mode = six.text_type(oct(stat.S_IMODE(temp_file_stats.st_mode)))
  2439. temp_file_mode = salt.utils.files.normalize_mode(temp_file_mode)
  2440. self.assertEqual(temp_file_mode, '4750')
  2441. self.assertEqual(pwd.getpwuid(temp_file_stats.st_uid).pw_name, user)
  2442. self.assertEqual(grp.getgrgid(temp_file_stats.st_gid).gr_name, group)
  2443. @with_tempdir()
  2444. def test_issue_48557(self, tempdir):
  2445. tempfile = os.path.join(tempdir, 'temp_file_issue_48557')
  2446. with salt.utils.files.fopen(tempfile, 'wb') as fp:
  2447. fp.write(os.linesep.join([
  2448. 'test1',
  2449. 'test2',
  2450. 'test3',
  2451. '',
  2452. ]).encode('utf-8'))
  2453. ret = self.run_state('file.line',
  2454. name=tempfile,
  2455. after='test2',
  2456. mode='insert',
  2457. content='test4')
  2458. self.assertSaltTrueReturn(ret)
  2459. with salt.utils.files.fopen(tempfile, 'rb') as fp:
  2460. content = fp.read()
  2461. self.assertEqual(content, os.linesep.join([
  2462. 'test1',
  2463. 'test2',
  2464. 'test4',
  2465. 'test3',
  2466. '',
  2467. ]).encode('utf-8'))
  2468. @with_tempfile()
  2469. def test_issue_50221(self, name):
  2470. expected = 'abc{0}{0}{0}'.format(os.linesep)
  2471. ret = self.run_function(
  2472. 'pillar.get',
  2473. ['issue-50221']
  2474. )
  2475. assert ret == expected
  2476. ret = self.run_function(
  2477. 'state.apply',
  2478. ['issue-50221'],
  2479. pillar={
  2480. 'name': name
  2481. },
  2482. )
  2483. self.assertSaltTrueReturn(ret)
  2484. with salt.utils.files.fopen(name, 'r') as fp:
  2485. contents = fp.read()
  2486. assert contents == expected
  2487. def test_managed_file_issue_51208(self):
  2488. '''
  2489. Test to ensure we can handle a file with escaped double-quotes
  2490. '''
  2491. name = os.path.join(RUNTIME_VARS.TMP, 'issue_51208.txt')
  2492. ret = self.run_state(
  2493. 'file.managed', name=name, source='salt://issue-51208/vimrc.stub'
  2494. )
  2495. src = os.path.join(RUNTIME_VARS.BASE_FILES, 'issue-51208', 'vimrc.stub')
  2496. with salt.utils.files.fopen(src, 'r') as fp_:
  2497. master_data = fp_.read()
  2498. with salt.utils.files.fopen(name, 'r') as fp_:
  2499. minion_data = fp_.read()
  2500. self.assertEqual(master_data, minion_data)
  2501. self.assertSaltTrueReturn(ret)
  2502. @pytest.mark.windows_whitelisted
  2503. class BlockreplaceTest(ModuleCase, SaltReturnAssertsMixin):
  2504. marker_start = '# start'
  2505. marker_end = '# end'
  2506. content = dedent(six.text_type('''\
  2507. Line 1 of block
  2508. Line 2 of block
  2509. '''))
  2510. without_block = dedent(six.text_type('''\
  2511. Hello world!
  2512. # comment here
  2513. '''))
  2514. with_non_matching_block = dedent(six.text_type('''\
  2515. Hello world!
  2516. # start
  2517. No match here
  2518. # end
  2519. # comment here
  2520. '''))
  2521. with_non_matching_block_and_marker_end_not_after_newline = dedent(six.text_type('''\
  2522. Hello world!
  2523. # start
  2524. No match here# end
  2525. # comment here
  2526. '''))
  2527. with_matching_block = dedent(six.text_type('''\
  2528. Hello world!
  2529. # start
  2530. Line 1 of block
  2531. Line 2 of block
  2532. # end
  2533. # comment here
  2534. '''))
  2535. with_matching_block_and_extra_newline = dedent(six.text_type('''\
  2536. Hello world!
  2537. # start
  2538. Line 1 of block
  2539. Line 2 of block
  2540. # end
  2541. # comment here
  2542. '''))
  2543. with_matching_block_and_marker_end_not_after_newline = dedent(six.text_type('''\
  2544. Hello world!
  2545. # start
  2546. Line 1 of block
  2547. Line 2 of block# end
  2548. # comment here
  2549. '''))
  2550. content_explicit_posix_newlines = ('Line 1 of block\n'
  2551. 'Line 2 of block\n')
  2552. content_explicit_windows_newlines = ('Line 1 of block\r\n'
  2553. 'Line 2 of block\r\n')
  2554. without_block_explicit_posix_newlines = ('Hello world!\n\n'
  2555. '# comment here\n')
  2556. without_block_explicit_windows_newlines = ('Hello world!\r\n\r\n'
  2557. '# comment here\r\n')
  2558. with_block_prepended_explicit_posix_newlines = ('# start\n'
  2559. 'Line 1 of block\n'
  2560. 'Line 2 of block\n'
  2561. '# end\n'
  2562. 'Hello world!\n\n'
  2563. '# comment here\n')
  2564. with_block_prepended_explicit_windows_newlines = ('# start\r\n'
  2565. 'Line 1 of block\r\n'
  2566. 'Line 2 of block\r\n'
  2567. '# end\r\n'
  2568. 'Hello world!\r\n\r\n'
  2569. '# comment here\r\n')
  2570. with_block_appended_explicit_posix_newlines = ('Hello world!\n\n'
  2571. '# comment here\n'
  2572. '# start\n'
  2573. 'Line 1 of block\n'
  2574. 'Line 2 of block\n'
  2575. '# end\n')
  2576. with_block_appended_explicit_windows_newlines = ('Hello world!\r\n\r\n'
  2577. '# comment here\r\n'
  2578. '# start\r\n'
  2579. 'Line 1 of block\r\n'
  2580. 'Line 2 of block\r\n'
  2581. '# end\r\n')
  2582. @staticmethod
  2583. def _write(dest, content):
  2584. with salt.utils.files.fopen(dest, 'wb') as fp_:
  2585. fp_.write(salt.utils.stringutils.to_bytes(content))
  2586. @staticmethod
  2587. def _read(src):
  2588. with salt.utils.files.fopen(src, 'rb') as fp_:
  2589. return salt.utils.stringutils.to_unicode(fp_.read())
  2590. @with_tempfile()
  2591. def test_prepend(self, name):
  2592. '''
  2593. Test blockreplace when prepend_if_not_found=True and block doesn't
  2594. exist in file.
  2595. '''
  2596. expected = self.marker_start + os.linesep + self.content + \
  2597. self.marker_end + os.linesep + self.without_block
  2598. # Pass 1: content ends in newline
  2599. self._write(name, self.without_block)
  2600. ret = self.run_state('file.blockreplace',
  2601. name=name,
  2602. content=self.content,
  2603. marker_start=self.marker_start,
  2604. marker_end=self.marker_end,
  2605. prepend_if_not_found=True)
  2606. self.assertSaltTrueReturn(ret)
  2607. self.assertTrue(ret[next(iter(ret))]['changes'])
  2608. self.assertEqual(self._read(name), expected)
  2609. # Pass 1a: Re-run state, no changes should be made
  2610. ret = self.run_state('file.blockreplace',
  2611. name=name,
  2612. content=self.content,
  2613. marker_start=self.marker_start,
  2614. marker_end=self.marker_end,
  2615. prepend_if_not_found=True)
  2616. self.assertSaltTrueReturn(ret)
  2617. self.assertFalse(ret[next(iter(ret))]['changes'])
  2618. self.assertEqual(self._read(name), expected)
  2619. # Pass 2: content does not end in newline
  2620. self._write(name, self.without_block)
  2621. ret = self.run_state('file.blockreplace',
  2622. name=name,
  2623. content=self.content.rstrip('\r\n'),
  2624. marker_start=self.marker_start,
  2625. marker_end=self.marker_end,
  2626. prepend_if_not_found=True)
  2627. self.assertSaltTrueReturn(ret)
  2628. self.assertTrue(ret[next(iter(ret))]['changes'])
  2629. self.assertEqual(self._read(name), expected)
  2630. # Pass 2a: Re-run state, no changes should be made
  2631. ret = self.run_state('file.blockreplace',
  2632. name=name,
  2633. content=self.content.rstrip('\r\n'),
  2634. marker_start=self.marker_start,
  2635. marker_end=self.marker_end,
  2636. prepend_if_not_found=True)
  2637. self.assertSaltTrueReturn(ret)
  2638. self.assertFalse(ret[next(iter(ret))]['changes'])
  2639. self.assertEqual(self._read(name), expected)
  2640. @with_tempfile()
  2641. def test_prepend_append_newline(self, name):
  2642. '''
  2643. Test blockreplace when prepend_if_not_found=True and block doesn't
  2644. exist in file. Test with append_newline explicitly set to True.
  2645. '''
  2646. # Pass 1: content ends in newline
  2647. expected = self.marker_start + os.linesep + self.content + \
  2648. os.linesep + self.marker_end + os.linesep + self.without_block
  2649. self._write(name, self.without_block)
  2650. ret = self.run_state('file.blockreplace',
  2651. name=name,
  2652. content=self.content,
  2653. marker_start=self.marker_start,
  2654. marker_end=self.marker_end,
  2655. prepend_if_not_found=True,
  2656. append_newline=True)
  2657. self.assertSaltTrueReturn(ret)
  2658. self.assertTrue(ret[next(iter(ret))]['changes'])
  2659. self.assertEqual(self._read(name), expected)
  2660. # Pass 1a: Re-run state, no changes should be made
  2661. ret = self.run_state('file.blockreplace',
  2662. name=name,
  2663. content=self.content,
  2664. marker_start=self.marker_start,
  2665. marker_end=self.marker_end,
  2666. prepend_if_not_found=True,
  2667. append_newline=True)
  2668. self.assertSaltTrueReturn(ret)
  2669. self.assertFalse(ret[next(iter(ret))]['changes'])
  2670. self.assertEqual(self._read(name), expected)
  2671. # Pass 2: content does not end in newline
  2672. expected = self.marker_start + os.linesep + self.content + \
  2673. self.marker_end + os.linesep + self.without_block
  2674. self._write(name, self.without_block)
  2675. ret = self.run_state('file.blockreplace',
  2676. name=name,
  2677. content=self.content.rstrip('\r\n'),
  2678. marker_start=self.marker_start,
  2679. marker_end=self.marker_end,
  2680. prepend_if_not_found=True,
  2681. append_newline=True)
  2682. self.assertSaltTrueReturn(ret)
  2683. self.assertTrue(ret[next(iter(ret))]['changes'])
  2684. self.assertEqual(self._read(name), expected)
  2685. # Pass 2a: Re-run state, no changes should be made
  2686. ret = self.run_state('file.blockreplace',
  2687. name=name,
  2688. content=self.content.rstrip('\r\n'),
  2689. marker_start=self.marker_start,
  2690. marker_end=self.marker_end,
  2691. prepend_if_not_found=True,
  2692. append_newline=True)
  2693. self.assertSaltTrueReturn(ret)
  2694. self.assertFalse(ret[next(iter(ret))]['changes'])
  2695. self.assertEqual(self._read(name), expected)
  2696. @with_tempfile()
  2697. def test_prepend_no_append_newline(self, name):
  2698. '''
  2699. Test blockreplace when prepend_if_not_found=True and block doesn't
  2700. exist in file. Test with append_newline explicitly set to False.
  2701. '''
  2702. # Pass 1: content ends in newline
  2703. expected = self.marker_start + os.linesep + self.content + \
  2704. self.marker_end + os.linesep + self.without_block
  2705. self._write(name, self.without_block)
  2706. ret = self.run_state('file.blockreplace',
  2707. name=name,
  2708. content=self.content,
  2709. marker_start=self.marker_start,
  2710. marker_end=self.marker_end,
  2711. prepend_if_not_found=True,
  2712. append_newline=False)
  2713. self.assertSaltTrueReturn(ret)
  2714. self.assertTrue(ret[next(iter(ret))]['changes'])
  2715. self.assertEqual(self._read(name), expected)
  2716. # Pass 1a: Re-run state, no changes should be made
  2717. ret = self.run_state('file.blockreplace',
  2718. name=name,
  2719. content=self.content,
  2720. marker_start=self.marker_start,
  2721. marker_end=self.marker_end,
  2722. prepend_if_not_found=True,
  2723. append_newline=False)
  2724. self.assertSaltTrueReturn(ret)
  2725. self.assertFalse(ret[next(iter(ret))]['changes'])
  2726. self.assertEqual(self._read(name), expected)
  2727. # Pass 2: content does not end in newline
  2728. expected = self.marker_start + os.linesep + \
  2729. self.content.rstrip('\r\n') + self.marker_end + os.linesep + \
  2730. self.without_block
  2731. self._write(name, self.without_block)
  2732. ret = self.run_state('file.blockreplace',
  2733. name=name,
  2734. content=self.content.rstrip('\r\n'),
  2735. marker_start=self.marker_start,
  2736. marker_end=self.marker_end,
  2737. prepend_if_not_found=True,
  2738. append_newline=False)
  2739. self.assertSaltTrueReturn(ret)
  2740. self.assertTrue(ret[next(iter(ret))]['changes'])
  2741. self.assertEqual(self._read(name), expected)
  2742. # Pass 2a: Re-run state, no changes should be made
  2743. ret = self.run_state('file.blockreplace',
  2744. name=name,
  2745. content=self.content.rstrip('\r\n'),
  2746. marker_start=self.marker_start,
  2747. marker_end=self.marker_end,
  2748. prepend_if_not_found=True,
  2749. append_newline=False)
  2750. self.assertSaltTrueReturn(ret)
  2751. self.assertFalse(ret[next(iter(ret))]['changes'])
  2752. self.assertEqual(self._read(name), expected)
  2753. @with_tempfile()
  2754. def test_append(self, name):
  2755. '''
  2756. Test blockreplace when append_if_not_found=True and block doesn't
  2757. exist in file.
  2758. '''
  2759. expected = self.without_block + self.marker_start + os.linesep + \
  2760. self.content + self.marker_end + os.linesep
  2761. # Pass 1: content ends in newline
  2762. self._write(name, self.without_block)
  2763. ret = self.run_state('file.blockreplace',
  2764. name=name,
  2765. content=self.content,
  2766. marker_start=self.marker_start,
  2767. marker_end=self.marker_end,
  2768. append_if_not_found=True)
  2769. self.assertSaltTrueReturn(ret)
  2770. self.assertTrue(ret[next(iter(ret))]['changes'])
  2771. self.assertEqual(self._read(name), expected)
  2772. # Pass 1a: Re-run state, no changes should be made
  2773. ret = self.run_state('file.blockreplace',
  2774. name=name,
  2775. content=self.content,
  2776. marker_start=self.marker_start,
  2777. marker_end=self.marker_end,
  2778. append_if_not_found=True)
  2779. self.assertSaltTrueReturn(ret)
  2780. self.assertFalse(ret[next(iter(ret))]['changes'])
  2781. self.assertEqual(self._read(name), expected)
  2782. # Pass 2: content does not end in newline
  2783. self._write(name, self.without_block)
  2784. ret = self.run_state('file.blockreplace',
  2785. name=name,
  2786. content=self.content.rstrip('\r\n'),
  2787. marker_start=self.marker_start,
  2788. marker_end=self.marker_end,
  2789. append_if_not_found=True)
  2790. self.assertSaltTrueReturn(ret)
  2791. self.assertTrue(ret[next(iter(ret))]['changes'])
  2792. self.assertEqual(self._read(name), expected)
  2793. # Pass 2a: Re-run state, no changes should be made
  2794. ret = self.run_state('file.blockreplace',
  2795. name=name,
  2796. content=self.content.rstrip('\r\n'),
  2797. marker_start=self.marker_start,
  2798. marker_end=self.marker_end,
  2799. append_if_not_found=True)
  2800. self.assertSaltTrueReturn(ret)
  2801. self.assertFalse(ret[next(iter(ret))]['changes'])
  2802. self.assertEqual(self._read(name), expected)
  2803. @with_tempfile()
  2804. def test_append_append_newline(self, name):
  2805. '''
  2806. Test blockreplace when append_if_not_found=True and block doesn't
  2807. exist in file. Test with append_newline explicitly set to True.
  2808. '''
  2809. # Pass 1: content ends in newline
  2810. expected = self.without_block + self.marker_start + os.linesep + \
  2811. self.content + os.linesep + self.marker_end + os.linesep
  2812. self._write(name, self.without_block)
  2813. ret = self.run_state('file.blockreplace',
  2814. name=name,
  2815. content=self.content,
  2816. marker_start=self.marker_start,
  2817. marker_end=self.marker_end,
  2818. append_if_not_found=True,
  2819. append_newline=True)
  2820. self.assertSaltTrueReturn(ret)
  2821. self.assertTrue(ret[next(iter(ret))]['changes'])
  2822. self.assertEqual(self._read(name), expected)
  2823. # Pass 1a: Re-run state, no changes should be made
  2824. ret = self.run_state('file.blockreplace',
  2825. name=name,
  2826. content=self.content,
  2827. marker_start=self.marker_start,
  2828. marker_end=self.marker_end,
  2829. append_if_not_found=True,
  2830. append_newline=True)
  2831. self.assertSaltTrueReturn(ret)
  2832. self.assertFalse(ret[next(iter(ret))]['changes'])
  2833. self.assertEqual(self._read(name), expected)
  2834. # Pass 2: content does not end in newline
  2835. expected = self.without_block + self.marker_start + os.linesep + \
  2836. self.content + self.marker_end + os.linesep
  2837. self._write(name, self.without_block)
  2838. ret = self.run_state('file.blockreplace',
  2839. name=name,
  2840. content=self.content.rstrip('\r\n'),
  2841. marker_start=self.marker_start,
  2842. marker_end=self.marker_end,
  2843. append_if_not_found=True,
  2844. append_newline=True)
  2845. self.assertSaltTrueReturn(ret)
  2846. self.assertTrue(ret[next(iter(ret))]['changes'])
  2847. self.assertEqual(self._read(name), expected)
  2848. # Pass 2a: Re-run state, no changes should be made
  2849. ret = self.run_state('file.blockreplace',
  2850. name=name,
  2851. content=self.content.rstrip('\r\n'),
  2852. marker_start=self.marker_start,
  2853. marker_end=self.marker_end,
  2854. append_if_not_found=True,
  2855. append_newline=True)
  2856. self.assertSaltTrueReturn(ret)
  2857. self.assertFalse(ret[next(iter(ret))]['changes'])
  2858. self.assertEqual(self._read(name), expected)
  2859. @with_tempfile()
  2860. def test_append_no_append_newline(self, name):
  2861. '''
  2862. Test blockreplace when append_if_not_found=True and block doesn't
  2863. exist in file. Test with append_newline explicitly set to False.
  2864. '''
  2865. # Pass 1: content ends in newline
  2866. expected = self.without_block + self.marker_start + os.linesep + \
  2867. self.content + self.marker_end + os.linesep
  2868. self._write(name, self.without_block)
  2869. ret = self.run_state('file.blockreplace',
  2870. name=name,
  2871. content=self.content,
  2872. marker_start=self.marker_start,
  2873. marker_end=self.marker_end,
  2874. append_if_not_found=True,
  2875. append_newline=False)
  2876. self.assertSaltTrueReturn(ret)
  2877. self.assertTrue(ret[next(iter(ret))]['changes'])
  2878. self.assertEqual(self._read(name), expected)
  2879. # Pass 1a: Re-run state, no changes should be made
  2880. ret = self.run_state('file.blockreplace',
  2881. name=name,
  2882. content=self.content,
  2883. marker_start=self.marker_start,
  2884. marker_end=self.marker_end,
  2885. append_if_not_found=True,
  2886. append_newline=False)
  2887. self.assertSaltTrueReturn(ret)
  2888. self.assertFalse(ret[next(iter(ret))]['changes'])
  2889. self.assertEqual(self._read(name), expected)
  2890. # Pass 2: content does not end in newline
  2891. expected = self.without_block + self.marker_start + os.linesep + \
  2892. self.content.rstrip('\r\n') + self.marker_end + os.linesep
  2893. self._write(name, self.without_block)
  2894. ret = self.run_state('file.blockreplace',
  2895. name=name,
  2896. content=self.content.rstrip('\r\n'),
  2897. marker_start=self.marker_start,
  2898. marker_end=self.marker_end,
  2899. append_if_not_found=True,
  2900. append_newline=False)
  2901. self.assertSaltTrueReturn(ret)
  2902. self.assertTrue(ret[next(iter(ret))]['changes'])
  2903. self.assertEqual(self._read(name), expected)
  2904. # Pass 2a: Re-run state, no changes should be made
  2905. ret = self.run_state('file.blockreplace',
  2906. name=name,
  2907. content=self.content.rstrip('\r\n'),
  2908. marker_start=self.marker_start,
  2909. marker_end=self.marker_end,
  2910. append_if_not_found=True,
  2911. append_newline=False)
  2912. self.assertSaltTrueReturn(ret)
  2913. self.assertFalse(ret[next(iter(ret))]['changes'])
  2914. self.assertEqual(self._read(name), expected)
  2915. @with_tempfile()
  2916. def test_prepend_auto_line_separator(self, name):
  2917. '''
  2918. This tests the line separator auto-detection when prepending the block
  2919. '''
  2920. # POSIX newlines to Windows newlines
  2921. self._write(name, self.without_block_explicit_windows_newlines)
  2922. ret = self.run_state('file.blockreplace',
  2923. name=name,
  2924. content=self.content_explicit_posix_newlines,
  2925. marker_start=self.marker_start,
  2926. marker_end=self.marker_end,
  2927. prepend_if_not_found=True)
  2928. self.assertSaltTrueReturn(ret)
  2929. self.assertTrue(ret[next(iter(ret))]['changes'])
  2930. self.assertEqual(
  2931. self._read(name),
  2932. self.with_block_prepended_explicit_windows_newlines)
  2933. # Re-run state, no changes should be made
  2934. ret = self.run_state('file.blockreplace',
  2935. name=name,
  2936. content=self.content_explicit_posix_newlines,
  2937. marker_start=self.marker_start,
  2938. marker_end=self.marker_end,
  2939. prepend_if_not_found=True)
  2940. self.assertSaltTrueReturn(ret)
  2941. self.assertFalse(ret[next(iter(ret))]['changes'])
  2942. self.assertEqual(
  2943. self._read(name),
  2944. self.with_block_prepended_explicit_windows_newlines)
  2945. # Windows newlines to POSIX newlines
  2946. self._write(name, self.without_block_explicit_posix_newlines)
  2947. ret = self.run_state('file.blockreplace',
  2948. name=name,
  2949. content=self.content_explicit_windows_newlines,
  2950. marker_start=self.marker_start,
  2951. marker_end=self.marker_end,
  2952. prepend_if_not_found=True)
  2953. self.assertSaltTrueReturn(ret)
  2954. self.assertTrue(ret[next(iter(ret))]['changes'])
  2955. self.assertEqual(
  2956. self._read(name),
  2957. self.with_block_prepended_explicit_posix_newlines)
  2958. # Re-run state, no changes should be made
  2959. ret = self.run_state('file.blockreplace',
  2960. name=name,
  2961. content=self.content_explicit_windows_newlines,
  2962. marker_start=self.marker_start,
  2963. marker_end=self.marker_end,
  2964. prepend_if_not_found=True)
  2965. self.assertSaltTrueReturn(ret)
  2966. self.assertFalse(ret[next(iter(ret))]['changes'])
  2967. self.assertEqual(
  2968. self._read(name),
  2969. self.with_block_prepended_explicit_posix_newlines)
  2970. @with_tempfile()
  2971. def test_append_auto_line_separator(self, name):
  2972. '''
  2973. This tests the line separator auto-detection when appending the block
  2974. '''
  2975. # POSIX newlines to Windows newlines
  2976. self._write(name, self.without_block_explicit_windows_newlines)
  2977. ret = self.run_state('file.blockreplace',
  2978. name=name,
  2979. content=self.content_explicit_posix_newlines,
  2980. marker_start=self.marker_start,
  2981. marker_end=self.marker_end,
  2982. append_if_not_found=True)
  2983. self.assertSaltTrueReturn(ret)
  2984. self.assertTrue(ret[next(iter(ret))]['changes'])
  2985. self.assertEqual(
  2986. self._read(name),
  2987. self.with_block_appended_explicit_windows_newlines)
  2988. # Re-run state, no changes should be made
  2989. ret = self.run_state('file.blockreplace',
  2990. name=name,
  2991. content=self.content_explicit_posix_newlines,
  2992. marker_start=self.marker_start,
  2993. marker_end=self.marker_end,
  2994. append_if_not_found=True)
  2995. self.assertSaltTrueReturn(ret)
  2996. self.assertFalse(ret[next(iter(ret))]['changes'])
  2997. self.assertEqual(
  2998. self._read(name),
  2999. self.with_block_appended_explicit_windows_newlines)
  3000. # Windows newlines to POSIX newlines
  3001. self._write(name, self.without_block_explicit_posix_newlines)
  3002. ret = self.run_state('file.blockreplace',
  3003. name=name,
  3004. content=self.content_explicit_windows_newlines,
  3005. marker_start=self.marker_start,
  3006. marker_end=self.marker_end,
  3007. append_if_not_found=True)
  3008. self.assertSaltTrueReturn(ret)
  3009. self.assertTrue(ret[next(iter(ret))]['changes'])
  3010. self.assertEqual(
  3011. self._read(name),
  3012. self.with_block_appended_explicit_posix_newlines)
  3013. # Re-run state, no changes should be made
  3014. ret = self.run_state('file.blockreplace',
  3015. name=name,
  3016. content=self.content_explicit_windows_newlines,
  3017. marker_start=self.marker_start,
  3018. marker_end=self.marker_end,
  3019. append_if_not_found=True)
  3020. self.assertSaltTrueReturn(ret)
  3021. self.assertFalse(ret[next(iter(ret))]['changes'])
  3022. self.assertEqual(
  3023. self._read(name),
  3024. self.with_block_appended_explicit_posix_newlines)
  3025. @with_tempfile()
  3026. def test_non_matching_block(self, name):
  3027. '''
  3028. Test blockreplace when block exists but its contents are not a
  3029. match.
  3030. '''
  3031. # Pass 1: content ends in newline
  3032. self._write(name, self.with_non_matching_block)
  3033. ret = self.run_state('file.blockreplace',
  3034. name=name,
  3035. content=self.content,
  3036. marker_start=self.marker_start,
  3037. marker_end=self.marker_end)
  3038. self.assertSaltTrueReturn(ret)
  3039. self.assertTrue(ret[next(iter(ret))]['changes'])
  3040. self.assertEqual(self._read(name), self.with_matching_block)
  3041. # Pass 1a: Re-run state, no changes should be made
  3042. ret = self.run_state('file.blockreplace',
  3043. name=name,
  3044. content=self.content,
  3045. marker_start=self.marker_start,
  3046. marker_end=self.marker_end)
  3047. self.assertSaltTrueReturn(ret)
  3048. self.assertFalse(ret[next(iter(ret))]['changes'])
  3049. self.assertEqual(self._read(name), self.with_matching_block)
  3050. # Pass 2: content does not end in newline
  3051. self._write(name, self.with_non_matching_block)
  3052. ret = self.run_state('file.blockreplace',
  3053. name=name,
  3054. content=self.content.rstrip('\r\n'),
  3055. marker_start=self.marker_start,
  3056. marker_end=self.marker_end)
  3057. self.assertSaltTrueReturn(ret)
  3058. self.assertTrue(ret[next(iter(ret))]['changes'])
  3059. self.assertEqual(self._read(name), self.with_matching_block)
  3060. # Pass 2a: Re-run state, no changes should be made
  3061. ret = self.run_state('file.blockreplace',
  3062. name=name,
  3063. content=self.content.rstrip('\r\n'),
  3064. marker_start=self.marker_start,
  3065. marker_end=self.marker_end)
  3066. self.assertSaltTrueReturn(ret)
  3067. self.assertFalse(ret[next(iter(ret))]['changes'])
  3068. self.assertEqual(self._read(name), self.with_matching_block)
  3069. @with_tempfile()
  3070. def test_non_matching_block_append_newline(self, name):
  3071. '''
  3072. Test blockreplace when block exists but its contents are not a
  3073. match. Test with append_newline explicitly set to True.
  3074. '''
  3075. # Pass 1: content ends in newline
  3076. self._write(name, self.with_non_matching_block)
  3077. ret = self.run_state('file.blockreplace',
  3078. name=name,
  3079. content=self.content,
  3080. marker_start=self.marker_start,
  3081. marker_end=self.marker_end,
  3082. append_newline=True)
  3083. self.assertSaltTrueReturn(ret)
  3084. self.assertTrue(ret[next(iter(ret))]['changes'])
  3085. self.assertEqual(
  3086. self._read(name),
  3087. self.with_matching_block_and_extra_newline)
  3088. # Pass 1a: Re-run state, no changes should be made
  3089. ret = self.run_state('file.blockreplace',
  3090. name=name,
  3091. content=self.content,
  3092. marker_start=self.marker_start,
  3093. marker_end=self.marker_end,
  3094. append_newline=True)
  3095. self.assertSaltTrueReturn(ret)
  3096. self.assertFalse(ret[next(iter(ret))]['changes'])
  3097. self.assertEqual(
  3098. self._read(name),
  3099. self.with_matching_block_and_extra_newline)
  3100. # Pass 2: content does not end in newline
  3101. self._write(name, self.with_non_matching_block)
  3102. ret = self.run_state('file.blockreplace',
  3103. name=name,
  3104. content=self.content.rstrip('\r\n'),
  3105. marker_start=self.marker_start,
  3106. marker_end=self.marker_end,
  3107. append_newline=True)
  3108. self.assertSaltTrueReturn(ret)
  3109. self.assertTrue(ret[next(iter(ret))]['changes'])
  3110. self.assertEqual(self._read(name), self.with_matching_block)
  3111. # Pass 2a: Re-run state, no changes should be made
  3112. ret = self.run_state('file.blockreplace',
  3113. name=name,
  3114. content=self.content.rstrip('\r\n'),
  3115. marker_start=self.marker_start,
  3116. marker_end=self.marker_end,
  3117. append_newline=True)
  3118. self.assertSaltTrueReturn(ret)
  3119. self.assertFalse(ret[next(iter(ret))]['changes'])
  3120. self.assertEqual(self._read(name), self.with_matching_block)
  3121. @with_tempfile()
  3122. def test_non_matching_block_no_append_newline(self, name):
  3123. '''
  3124. Test blockreplace when block exists but its contents are not a
  3125. match. Test with append_newline explicitly set to False.
  3126. '''
  3127. # Pass 1: content ends in newline
  3128. self._write(name, self.with_non_matching_block)
  3129. ret = self.run_state('file.blockreplace',
  3130. name=name,
  3131. content=self.content,
  3132. marker_start=self.marker_start,
  3133. marker_end=self.marker_end,
  3134. append_newline=False)
  3135. self.assertSaltTrueReturn(ret)
  3136. self.assertTrue(ret[next(iter(ret))]['changes'])
  3137. self.assertEqual(self._read(name), self.with_matching_block)
  3138. # Pass 1a: Re-run state, no changes should be made
  3139. ret = self.run_state('file.blockreplace',
  3140. name=name,
  3141. content=self.content,
  3142. marker_start=self.marker_start,
  3143. marker_end=self.marker_end,
  3144. append_newline=False)
  3145. self.assertSaltTrueReturn(ret)
  3146. self.assertFalse(ret[next(iter(ret))]['changes'])
  3147. self.assertEqual(self._read(name), self.with_matching_block)
  3148. # Pass 2: content does not end in newline
  3149. self._write(name, self.with_non_matching_block)
  3150. ret = self.run_state('file.blockreplace',
  3151. name=name,
  3152. content=self.content.rstrip('\r\n'),
  3153. marker_start=self.marker_start,
  3154. marker_end=self.marker_end,
  3155. append_newline=False)
  3156. self.assertSaltTrueReturn(ret)
  3157. self.assertTrue(ret[next(iter(ret))]['changes'])
  3158. self.assertEqual(
  3159. self._read(name),
  3160. self.with_matching_block_and_marker_end_not_after_newline)
  3161. # Pass 2a: Re-run state, no changes should be made
  3162. ret = self.run_state('file.blockreplace',
  3163. name=name,
  3164. content=self.content.rstrip('\r\n'),
  3165. marker_start=self.marker_start,
  3166. marker_end=self.marker_end,
  3167. append_newline=False)
  3168. self.assertSaltTrueReturn(ret)
  3169. self.assertFalse(ret[next(iter(ret))]['changes'])
  3170. self.assertEqual(
  3171. self._read(name),
  3172. self.with_matching_block_and_marker_end_not_after_newline)
  3173. @with_tempfile()
  3174. def test_non_matching_block_and_marker_not_after_newline(self, name):
  3175. '''
  3176. Test blockreplace when block exists but its contents are not a
  3177. match, and the marker_end is not directly preceded by a newline.
  3178. '''
  3179. # Pass 1: content ends in newline
  3180. self._write(
  3181. name,
  3182. self.with_non_matching_block_and_marker_end_not_after_newline)
  3183. ret = self.run_state('file.blockreplace',
  3184. name=name,
  3185. content=self.content,
  3186. marker_start=self.marker_start,
  3187. marker_end=self.marker_end)
  3188. self.assertSaltTrueReturn(ret)
  3189. self.assertTrue(ret[next(iter(ret))]['changes'])
  3190. self.assertEqual(self._read(name), self.with_matching_block)
  3191. # Pass 1a: Re-run state, no changes should be made
  3192. ret = self.run_state('file.blockreplace',
  3193. name=name,
  3194. content=self.content,
  3195. marker_start=self.marker_start,
  3196. marker_end=self.marker_end)
  3197. self.assertSaltTrueReturn(ret)
  3198. self.assertFalse(ret[next(iter(ret))]['changes'])
  3199. self.assertEqual(self._read(name), self.with_matching_block)
  3200. # Pass 2: content does not end in newline
  3201. self._write(
  3202. name,
  3203. self.with_non_matching_block_and_marker_end_not_after_newline)
  3204. ret = self.run_state('file.blockreplace',
  3205. name=name,
  3206. content=self.content.rstrip('\r\n'),
  3207. marker_start=self.marker_start,
  3208. marker_end=self.marker_end)
  3209. self.assertSaltTrueReturn(ret)
  3210. self.assertTrue(ret[next(iter(ret))]['changes'])
  3211. self.assertEqual(self._read(name), self.with_matching_block)
  3212. # Pass 2a: Re-run state, no changes should be made
  3213. ret = self.run_state('file.blockreplace',
  3214. name=name,
  3215. content=self.content.rstrip('\r\n'),
  3216. marker_start=self.marker_start,
  3217. marker_end=self.marker_end)
  3218. self.assertSaltTrueReturn(ret)
  3219. self.assertFalse(ret[next(iter(ret))]['changes'])
  3220. self.assertEqual(self._read(name), self.with_matching_block)
  3221. @with_tempfile()
  3222. def test_non_matching_block_and_marker_not_after_newline_append_newline(self, name):
  3223. '''
  3224. Test blockreplace when block exists but its contents are not a match,
  3225. and the marker_end is not directly preceded by a newline. Test with
  3226. append_newline explicitly set to True.
  3227. '''
  3228. # Pass 1: content ends in newline
  3229. self._write(
  3230. name,
  3231. self.with_non_matching_block_and_marker_end_not_after_newline)
  3232. ret = self.run_state('file.blockreplace',
  3233. name=name,
  3234. content=self.content,
  3235. marker_start=self.marker_start,
  3236. marker_end=self.marker_end,
  3237. append_newline=True)
  3238. self.assertSaltTrueReturn(ret)
  3239. self.assertTrue(ret[next(iter(ret))]['changes'])
  3240. self.assertEqual(
  3241. self._read(name),
  3242. self.with_matching_block_and_extra_newline)
  3243. # Pass 1a: Re-run state, no changes should be made
  3244. ret = self.run_state('file.blockreplace',
  3245. name=name,
  3246. content=self.content,
  3247. marker_start=self.marker_start,
  3248. marker_end=self.marker_end,
  3249. append_newline=True)
  3250. self.assertSaltTrueReturn(ret)
  3251. self.assertFalse(ret[next(iter(ret))]['changes'])
  3252. self.assertEqual(
  3253. self._read(name),
  3254. self.with_matching_block_and_extra_newline)
  3255. # Pass 2: content does not end in newline
  3256. self._write(
  3257. name,
  3258. self.with_non_matching_block_and_marker_end_not_after_newline)
  3259. ret = self.run_state('file.blockreplace',
  3260. name=name,
  3261. content=self.content.rstrip('\r\n'),
  3262. marker_start=self.marker_start,
  3263. marker_end=self.marker_end,
  3264. append_newline=True)
  3265. self.assertSaltTrueReturn(ret)
  3266. self.assertTrue(ret[next(iter(ret))]['changes'])
  3267. self.assertEqual(self._read(name), self.with_matching_block)
  3268. # Pass 2a: Re-run state, no changes should be made
  3269. ret = self.run_state('file.blockreplace',
  3270. name=name,
  3271. content=self.content.rstrip('\r\n'),
  3272. marker_start=self.marker_start,
  3273. marker_end=self.marker_end,
  3274. append_newline=True)
  3275. self.assertSaltTrueReturn(ret)
  3276. self.assertFalse(ret[next(iter(ret))]['changes'])
  3277. self.assertEqual(self._read(name), self.with_matching_block)
  3278. @with_tempfile()
  3279. def test_non_matching_block_and_marker_not_after_newline_no_append_newline(self, name):
  3280. '''
  3281. Test blockreplace when block exists but its contents are not a match,
  3282. and the marker_end is not directly preceded by a newline. Test with
  3283. append_newline explicitly set to False.
  3284. '''
  3285. # Pass 1: content ends in newline
  3286. self._write(
  3287. name,
  3288. self.with_non_matching_block_and_marker_end_not_after_newline)
  3289. ret = self.run_state('file.blockreplace',
  3290. name=name,
  3291. content=self.content,
  3292. marker_start=self.marker_start,
  3293. marker_end=self.marker_end,
  3294. append_newline=False)
  3295. self.assertSaltTrueReturn(ret)
  3296. self.assertTrue(ret[next(iter(ret))]['changes'])
  3297. self.assertEqual(self._read(name), self.with_matching_block)
  3298. # Pass 1a: Re-run state, no changes should be made
  3299. ret = self.run_state('file.blockreplace',
  3300. name=name,
  3301. content=self.content,
  3302. marker_start=self.marker_start,
  3303. marker_end=self.marker_end,
  3304. append_newline=False)
  3305. self.assertSaltTrueReturn(ret)
  3306. self.assertFalse(ret[next(iter(ret))]['changes'])
  3307. self.assertEqual(self._read(name), self.with_matching_block)
  3308. # Pass 2: content does not end in newline
  3309. self._write(
  3310. name,
  3311. self.with_non_matching_block_and_marker_end_not_after_newline)
  3312. ret = self.run_state('file.blockreplace',
  3313. name=name,
  3314. content=self.content.rstrip('\r\n'),
  3315. marker_start=self.marker_start,
  3316. marker_end=self.marker_end,
  3317. append_newline=False)
  3318. self.assertSaltTrueReturn(ret)
  3319. self.assertTrue(ret[next(iter(ret))]['changes'])
  3320. self.assertEqual(
  3321. self._read(name),
  3322. self.with_matching_block_and_marker_end_not_after_newline)
  3323. # Pass 2a: Re-run state, no changes should be made
  3324. ret = self.run_state('file.blockreplace',
  3325. name=name,
  3326. content=self.content.rstrip('\r\n'),
  3327. marker_start=self.marker_start,
  3328. marker_end=self.marker_end,
  3329. append_newline=False)
  3330. self.assertSaltTrueReturn(ret)
  3331. self.assertFalse(ret[next(iter(ret))]['changes'])
  3332. self.assertEqual(
  3333. self._read(name),
  3334. self.with_matching_block_and_marker_end_not_after_newline)
  3335. @with_tempfile()
  3336. def test_matching_block(self, name):
  3337. '''
  3338. Test blockreplace when block exists and its contents are a match. No
  3339. changes should be made.
  3340. '''
  3341. # Pass 1: content ends in newline
  3342. self._write(name, self.with_matching_block)
  3343. ret = self.run_state('file.blockreplace',
  3344. name=name,
  3345. content=self.content,
  3346. marker_start=self.marker_start,
  3347. marker_end=self.marker_end)
  3348. self.assertSaltTrueReturn(ret)
  3349. self.assertFalse(ret[next(iter(ret))]['changes'])
  3350. self.assertEqual(self._read(name), self.with_matching_block)
  3351. # Pass 1a: Re-run state, no changes should be made
  3352. ret = self.run_state('file.blockreplace',
  3353. name=name,
  3354. content=self.content,
  3355. marker_start=self.marker_start,
  3356. marker_end=self.marker_end)
  3357. self.assertSaltTrueReturn(ret)
  3358. self.assertFalse(ret[next(iter(ret))]['changes'])
  3359. self.assertEqual(self._read(name), self.with_matching_block)
  3360. # Pass 2: content does not end in newline
  3361. self._write(name, self.with_matching_block)
  3362. ret = self.run_state('file.blockreplace',
  3363. name=name,
  3364. content=self.content.rstrip('\r\n'),
  3365. marker_start=self.marker_start,
  3366. marker_end=self.marker_end)
  3367. self.assertSaltTrueReturn(ret)
  3368. self.assertFalse(ret[next(iter(ret))]['changes'])
  3369. self.assertEqual(self._read(name), self.with_matching_block)
  3370. # Pass 2a: Re-run state, no changes should be made
  3371. ret = self.run_state('file.blockreplace',
  3372. name=name,
  3373. content=self.content.rstrip('\r\n'),
  3374. marker_start=self.marker_start,
  3375. marker_end=self.marker_end)
  3376. self.assertSaltTrueReturn(ret)
  3377. self.assertFalse(ret[next(iter(ret))]['changes'])
  3378. self.assertEqual(self._read(name), self.with_matching_block)
  3379. @with_tempfile()
  3380. def test_matching_block_append_newline(self, name):
  3381. '''
  3382. Test blockreplace when block exists and its contents are a match. Test
  3383. with append_newline explicitly set to True. This will result in an
  3384. extra newline when the content ends in a newline, and will not when the
  3385. content does not end in a newline.
  3386. '''
  3387. # Pass 1: content ends in newline
  3388. self._write(name, self.with_matching_block)
  3389. ret = self.run_state('file.blockreplace',
  3390. name=name,
  3391. content=self.content,
  3392. marker_start=self.marker_start,
  3393. marker_end=self.marker_end,
  3394. append_newline=True)
  3395. self.assertSaltTrueReturn(ret)
  3396. self.assertTrue(ret[next(iter(ret))]['changes'])
  3397. self.assertEqual(
  3398. self._read(name),
  3399. self.with_matching_block_and_extra_newline)
  3400. # Pass 1a: Re-run state, no changes should be made
  3401. ret = self.run_state('file.blockreplace',
  3402. name=name,
  3403. content=self.content,
  3404. marker_start=self.marker_start,
  3405. marker_end=self.marker_end,
  3406. append_newline=True)
  3407. self.assertSaltTrueReturn(ret)
  3408. self.assertFalse(ret[next(iter(ret))]['changes'])
  3409. self.assertEqual(
  3410. self._read(name),
  3411. self.with_matching_block_and_extra_newline)
  3412. # Pass 2: content does not end in newline
  3413. self._write(name, self.with_matching_block)
  3414. ret = self.run_state('file.blockreplace',
  3415. name=name,
  3416. content=self.content.rstrip('\r\n'),
  3417. marker_start=self.marker_start,
  3418. marker_end=self.marker_end,
  3419. append_newline=True)
  3420. self.assertSaltTrueReturn(ret)
  3421. self.assertFalse(ret[next(iter(ret))]['changes'])
  3422. self.assertEqual(self._read(name), self.with_matching_block)
  3423. # Pass 2a: Re-run state, no changes should be made
  3424. ret = self.run_state('file.blockreplace',
  3425. name=name,
  3426. content=self.content.rstrip('\r\n'),
  3427. marker_start=self.marker_start,
  3428. marker_end=self.marker_end,
  3429. append_newline=True)
  3430. self.assertSaltTrueReturn(ret)
  3431. self.assertFalse(ret[next(iter(ret))]['changes'])
  3432. self.assertEqual(self._read(name), self.with_matching_block)
  3433. @with_tempfile()
  3434. def test_matching_block_no_append_newline(self, name):
  3435. '''
  3436. Test blockreplace when block exists and its contents are a match. Test
  3437. with append_newline explicitly set to False. This will result in the
  3438. marker_end not being directly preceded by a newline when the content
  3439. does not end in a newline.
  3440. '''
  3441. # Pass 1: content ends in newline
  3442. self._write(name, self.with_matching_block)
  3443. ret = self.run_state('file.blockreplace',
  3444. name=name,
  3445. content=self.content,
  3446. marker_start=self.marker_start,
  3447. marker_end=self.marker_end,
  3448. append_newline=False)
  3449. self.assertSaltTrueReturn(ret)
  3450. self.assertFalse(ret[next(iter(ret))]['changes'])
  3451. self.assertEqual(self._read(name), self.with_matching_block)
  3452. # Pass 1a: Re-run state, no changes should be made
  3453. ret = self.run_state('file.blockreplace',
  3454. name=name,
  3455. content=self.content,
  3456. marker_start=self.marker_start,
  3457. marker_end=self.marker_end,
  3458. append_newline=False)
  3459. self.assertSaltTrueReturn(ret)
  3460. self.assertFalse(ret[next(iter(ret))]['changes'])
  3461. self.assertEqual(self._read(name), self.with_matching_block)
  3462. # Pass 2: content does not end in newline
  3463. self._write(name, self.with_matching_block)
  3464. ret = self.run_state('file.blockreplace',
  3465. name=name,
  3466. content=self.content.rstrip('\r\n'),
  3467. marker_start=self.marker_start,
  3468. marker_end=self.marker_end,
  3469. append_newline=False)
  3470. self.assertSaltTrueReturn(ret)
  3471. self.assertTrue(ret[next(iter(ret))]['changes'])
  3472. self.assertEqual(
  3473. self._read(name),
  3474. self.with_matching_block_and_marker_end_not_after_newline)
  3475. # Pass 2a: Re-run state, no changes should be made
  3476. ret = self.run_state('file.blockreplace',
  3477. name=name,
  3478. content=self.content.rstrip('\r\n'),
  3479. marker_start=self.marker_start,
  3480. marker_end=self.marker_end,
  3481. append_newline=False)
  3482. self.assertSaltTrueReturn(ret)
  3483. self.assertFalse(ret[next(iter(ret))]['changes'])
  3484. self.assertEqual(
  3485. self._read(name),
  3486. self.with_matching_block_and_marker_end_not_after_newline)
  3487. @with_tempfile()
  3488. def test_matching_block_and_marker_not_after_newline(self, name):
  3489. '''
  3490. Test blockreplace when block exists and its contents are a match, but
  3491. the marker_end is not directly preceded by a newline.
  3492. '''
  3493. # Pass 1: content ends in newline
  3494. self._write(
  3495. name,
  3496. self.with_matching_block_and_marker_end_not_after_newline)
  3497. ret = self.run_state('file.blockreplace',
  3498. name=name,
  3499. content=self.content,
  3500. marker_start=self.marker_start,
  3501. marker_end=self.marker_end)
  3502. self.assertSaltTrueReturn(ret)
  3503. self.assertTrue(ret[next(iter(ret))]['changes'])
  3504. self.assertEqual(self._read(name), self.with_matching_block)
  3505. # Pass 1a: Re-run state, no changes should be made
  3506. ret = self.run_state('file.blockreplace',
  3507. name=name,
  3508. content=self.content,
  3509. marker_start=self.marker_start,
  3510. marker_end=self.marker_end)
  3511. self.assertSaltTrueReturn(ret)
  3512. self.assertFalse(ret[next(iter(ret))]['changes'])
  3513. self.assertEqual(self._read(name), self.with_matching_block)
  3514. # Pass 2: content does not end in newline
  3515. self._write(
  3516. name,
  3517. self.with_matching_block_and_marker_end_not_after_newline)
  3518. ret = self.run_state('file.blockreplace',
  3519. name=name,
  3520. content=self.content.rstrip('\r\n'),
  3521. marker_start=self.marker_start,
  3522. marker_end=self.marker_end)
  3523. self.assertSaltTrueReturn(ret)
  3524. self.assertTrue(ret[next(iter(ret))]['changes'])
  3525. self.assertEqual(self._read(name), self.with_matching_block)
  3526. # Pass 2a: Re-run state, no changes should be made
  3527. ret = self.run_state('file.blockreplace',
  3528. name=name,
  3529. content=self.content.rstrip('\r\n'),
  3530. marker_start=self.marker_start,
  3531. marker_end=self.marker_end)
  3532. self.assertSaltTrueReturn(ret)
  3533. self.assertFalse(ret[next(iter(ret))]['changes'])
  3534. self.assertEqual(self._read(name), self.with_matching_block)
  3535. @with_tempfile()
  3536. def test_matching_block_and_marker_not_after_newline_append_newline(self, name):
  3537. '''
  3538. Test blockreplace when block exists and its contents are a match, but
  3539. the marker_end is not directly preceded by a newline. Test with
  3540. append_newline explicitly set to True. This will result in an extra
  3541. newline when the content ends in a newline, and will not when the
  3542. content does not end in a newline.
  3543. '''
  3544. # Pass 1: content ends in newline
  3545. self._write(
  3546. name,
  3547. self.with_matching_block_and_marker_end_not_after_newline)
  3548. ret = self.run_state('file.blockreplace',
  3549. name=name,
  3550. content=self.content,
  3551. marker_start=self.marker_start,
  3552. marker_end=self.marker_end,
  3553. append_newline=True)
  3554. self.assertSaltTrueReturn(ret)
  3555. self.assertTrue(ret[next(iter(ret))]['changes'])
  3556. self.assertEqual(
  3557. self._read(name),
  3558. self.with_matching_block_and_extra_newline)
  3559. # Pass 1a: Re-run state, no changes should be made
  3560. ret = self.run_state('file.blockreplace',
  3561. name=name,
  3562. content=self.content,
  3563. marker_start=self.marker_start,
  3564. marker_end=self.marker_end,
  3565. append_newline=True)
  3566. self.assertSaltTrueReturn(ret)
  3567. self.assertFalse(ret[next(iter(ret))]['changes'])
  3568. self.assertEqual(
  3569. self._read(name),
  3570. self.with_matching_block_and_extra_newline)
  3571. # Pass 2: content does not end in newline
  3572. self._write(
  3573. name,
  3574. self.with_matching_block_and_marker_end_not_after_newline)
  3575. ret = self.run_state('file.blockreplace',
  3576. name=name,
  3577. content=self.content.rstrip('\r\n'),
  3578. marker_start=self.marker_start,
  3579. marker_end=self.marker_end,
  3580. append_newline=True)
  3581. self.assertSaltTrueReturn(ret)
  3582. self.assertTrue(ret[next(iter(ret))]['changes'])
  3583. self.assertEqual(self._read(name), self.with_matching_block)
  3584. # Pass 2a: Re-run state, no changes should be made
  3585. ret = self.run_state('file.blockreplace',
  3586. name=name,
  3587. content=self.content.rstrip('\r\n'),
  3588. marker_start=self.marker_start,
  3589. marker_end=self.marker_end,
  3590. append_newline=True)
  3591. self.assertSaltTrueReturn(ret)
  3592. self.assertFalse(ret[next(iter(ret))]['changes'])
  3593. self.assertEqual(self._read(name), self.with_matching_block)
  3594. @with_tempfile()
  3595. def test_matching_block_and_marker_not_after_newline_no_append_newline(self, name):
  3596. '''
  3597. Test blockreplace when block exists and its contents are a match, but
  3598. the marker_end is not directly preceded by a newline. Test with
  3599. append_newline explicitly set to False.
  3600. '''
  3601. # Pass 1: content ends in newline
  3602. self._write(
  3603. name,
  3604. self.with_matching_block_and_marker_end_not_after_newline)
  3605. ret = self.run_state('file.blockreplace',
  3606. name=name,
  3607. content=self.content,
  3608. marker_start=self.marker_start,
  3609. marker_end=self.marker_end,
  3610. append_newline=False)
  3611. self.assertSaltTrueReturn(ret)
  3612. self.assertTrue(ret[next(iter(ret))]['changes'])
  3613. self.assertEqual(self._read(name), self.with_matching_block)
  3614. # Pass 1a: Re-run state, no changes should be made
  3615. ret = self.run_state('file.blockreplace',
  3616. name=name,
  3617. content=self.content,
  3618. marker_start=self.marker_start,
  3619. marker_end=self.marker_end,
  3620. append_newline=False)
  3621. self.assertSaltTrueReturn(ret)
  3622. self.assertFalse(ret[next(iter(ret))]['changes'])
  3623. self.assertEqual(self._read(name), self.with_matching_block)
  3624. # Pass 2: content does not end in newline
  3625. self._write(
  3626. name,
  3627. self.with_matching_block_and_marker_end_not_after_newline)
  3628. ret = self.run_state('file.blockreplace',
  3629. name=name,
  3630. content=self.content.rstrip('\r\n'),
  3631. marker_start=self.marker_start,
  3632. marker_end=self.marker_end,
  3633. append_newline=False)
  3634. self.assertSaltTrueReturn(ret)
  3635. self.assertFalse(ret[next(iter(ret))]['changes'])
  3636. self.assertEqual(
  3637. self._read(name),
  3638. self.with_matching_block_and_marker_end_not_after_newline)
  3639. # Pass 2a: Re-run state, no changes should be made
  3640. ret = self.run_state('file.blockreplace',
  3641. name=name,
  3642. content=self.content.rstrip('\r\n'),
  3643. marker_start=self.marker_start,
  3644. marker_end=self.marker_end,
  3645. append_newline=False)
  3646. self.assertSaltTrueReturn(ret)
  3647. self.assertFalse(ret[next(iter(ret))]['changes'])
  3648. self.assertEqual(
  3649. self._read(name),
  3650. self.with_matching_block_and_marker_end_not_after_newline)
  3651. @with_tempfile()
  3652. def test_issue_49043(self, name):
  3653. ret = self.run_function(
  3654. 'state.sls',
  3655. mods='issue-49043',
  3656. pillar={'name': name},
  3657. )
  3658. log.error("ret = %s", repr(ret))
  3659. diff = '--- \n+++ \n@@ -0,0 +1,3 @@\n'
  3660. diff += dedent('''\
  3661. +#-- start managed zone --
  3662. +äöü
  3663. +#-- end managed zone --
  3664. ''')
  3665. job = 'file_|-somefile-blockreplace_|-{}_|-blockreplace'.format(name)
  3666. self.assertEqual(
  3667. ret[job]['changes']['diff'],
  3668. diff)
  3669. @pytest.mark.windows_whitelisted
  3670. class RemoteFileTest(ModuleCase, SaltReturnAssertsMixin):
  3671. '''
  3672. Uses a local tornado webserver to test http(s) file.managed states with and
  3673. without skip_verify
  3674. '''
  3675. @classmethod
  3676. def setUpClass(cls):
  3677. cls.webserver = Webserver()
  3678. cls.webserver.start()
  3679. cls.source = cls.webserver.url('grail/scene33')
  3680. if IS_WINDOWS:
  3681. # CRLF vs LF causes a different hash on windows
  3682. cls.source_hash = '21438b3d5fd2c0028bcab92f7824dc69'
  3683. else:
  3684. cls.source_hash = 'd2feb3beb323c79fc7a0f44f1408b4a3'
  3685. @classmethod
  3686. def tearDownClass(cls):
  3687. cls.webserver.stop()
  3688. @with_tempfile(create=False)
  3689. def setUp(self, name): # pylint: disable=arguments-differ
  3690. self.name = name
  3691. def tearDown(self):
  3692. try:
  3693. os.remove(self.name)
  3694. except OSError as exc:
  3695. if exc.errno != errno.ENOENT:
  3696. six.reraise(*sys.exc_info())
  3697. def run_state(self, *args, **kwargs):
  3698. ret = super(RemoteFileTest, self).run_state(*args, **kwargs)
  3699. log.debug('ret = %s', ret)
  3700. return ret
  3701. def test_file_managed_http_source_no_hash(self):
  3702. '''
  3703. Test a remote file with no hash
  3704. '''
  3705. ret = self.run_state('file.managed',
  3706. name=self.name,
  3707. source=self.source,
  3708. skip_verify=False)
  3709. # This should fail because no hash was provided
  3710. self.assertSaltFalseReturn(ret)
  3711. def test_file_managed_http_source(self):
  3712. '''
  3713. Test a remote file with no hash
  3714. '''
  3715. ret = self.run_state('file.managed',
  3716. name=self.name,
  3717. source=self.source,
  3718. source_hash=self.source_hash,
  3719. skip_verify=False)
  3720. self.assertSaltTrueReturn(ret)
  3721. def test_file_managed_http_source_skip_verify(self):
  3722. '''
  3723. Test a remote file using skip_verify
  3724. '''
  3725. ret = self.run_state('file.managed',
  3726. name=self.name,
  3727. source=self.source,
  3728. skip_verify=True)
  3729. self.assertSaltTrueReturn(ret)
  3730. def test_file_managed_keep_source_false_http(self):
  3731. '''
  3732. This test ensures that we properly clean the cached file if keep_source
  3733. is set to False, for source files using an http:// URL
  3734. '''
  3735. # Run the state
  3736. ret = self.run_state('file.managed',
  3737. name=self.name,
  3738. source=self.source,
  3739. source_hash=self.source_hash,
  3740. keep_source=False)
  3741. ret = ret[next(iter(ret))]
  3742. assert ret['result'] is True
  3743. # Now make sure that the file is not cached
  3744. result = self.run_function('cp.is_cached', [self.source])
  3745. assert result == '', 'File is still cached at {0}'.format(result)
  3746. @skipIf(not salt.utils.path.which('patch'), 'patch is not installed')
  3747. @pytest.mark.windows_whitelisted
  3748. class PatchTest(ModuleCase, SaltReturnAssertsMixin):
  3749. def _check_patch_version(self, min_version):
  3750. '''
  3751. patch version check
  3752. '''
  3753. version = self.run_function('cmd.run', ['patch --version']).splitlines()[0]
  3754. version = version.split()[1]
  3755. if _LooseVersion(version) < _LooseVersion(min_version):
  3756. self.skipTest('Minimum patch version required: {0}. '
  3757. 'Patch version installed: {1}'.format(min_version, version))
  3758. @classmethod
  3759. def setUpClass(cls):
  3760. cls.webserver = Webserver()
  3761. cls.webserver.start()
  3762. cls.numbers_patch_name = 'numbers.patch'
  3763. cls.math_patch_name = 'math.patch'
  3764. cls.all_patch_name = 'all.patch'
  3765. cls.numbers_patch_template_name = cls.numbers_patch_name + '.jinja'
  3766. cls.math_patch_template_name = cls.math_patch_name + '.jinja'
  3767. cls.all_patch_template_name = cls.all_patch_name + '.jinja'
  3768. cls.numbers_patch_path = 'patches/' + cls.numbers_patch_name
  3769. cls.math_patch_path = 'patches/' + cls.math_patch_name
  3770. cls.all_patch_path = 'patches/' + cls.all_patch_name
  3771. cls.numbers_patch_template_path = \
  3772. 'patches/' + cls.numbers_patch_template_name
  3773. cls.math_patch_template_path = \
  3774. 'patches/' + cls.math_patch_template_name
  3775. cls.all_patch_template_path = \
  3776. 'patches/' + cls.all_patch_template_name
  3777. cls.numbers_patch = 'salt://' + cls.numbers_patch_path
  3778. cls.math_patch = 'salt://' + cls.math_patch_path
  3779. cls.all_patch = 'salt://' + cls.all_patch_path
  3780. cls.numbers_patch_template = 'salt://' + cls.numbers_patch_template_path
  3781. cls.math_patch_template = 'salt://' + cls.math_patch_template_path
  3782. cls.all_patch_template = 'salt://' + cls.all_patch_template_path
  3783. cls.numbers_patch_http = cls.webserver.url(cls.numbers_patch_path)
  3784. cls.math_patch_http = cls.webserver.url(cls.math_patch_path)
  3785. cls.all_patch_http = cls.webserver.url(cls.all_patch_path)
  3786. cls.numbers_patch_template_http = \
  3787. cls.webserver.url(cls.numbers_patch_template_path)
  3788. cls.math_patch_template_http = \
  3789. cls.webserver.url(cls.math_patch_template_path)
  3790. cls.all_patch_template_http = \
  3791. cls.webserver.url(cls.all_patch_template_path)
  3792. patches_dir = os.path.join(RUNTIME_VARS.FILES, 'file', 'base', 'patches')
  3793. cls.numbers_patch_hash = salt.utils.hashutils.get_hash(
  3794. os.path.join(patches_dir, cls.numbers_patch_name)
  3795. )
  3796. cls.math_patch_hash = salt.utils.hashutils.get_hash(
  3797. os.path.join(patches_dir, cls.math_patch_name)
  3798. )
  3799. cls.all_patch_hash = salt.utils.hashutils.get_hash(
  3800. os.path.join(patches_dir, cls.all_patch_name)
  3801. )
  3802. cls.numbers_patch_template_hash = salt.utils.hashutils.get_hash(
  3803. os.path.join(patches_dir, cls.numbers_patch_template_name)
  3804. )
  3805. cls.math_patch_template_hash = salt.utils.hashutils.get_hash(
  3806. os.path.join(patches_dir, cls.math_patch_template_name)
  3807. )
  3808. cls.all_patch_template_hash = salt.utils.hashutils.get_hash(
  3809. os.path.join(patches_dir, cls.all_patch_template_name)
  3810. )
  3811. cls.context = {'two': 'two', 'ten': 10}
  3812. @classmethod
  3813. def tearDownClass(cls):
  3814. cls.webserver.stop()
  3815. def setUp(self):
  3816. '''
  3817. Create a new unpatched set of files
  3818. '''
  3819. self.base_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  3820. os.makedirs(os.path.join(self.base_dir, 'foo', 'bar'))
  3821. self.numbers_file = os.path.join(self.base_dir, 'foo', 'numbers.txt')
  3822. self.math_file = os.path.join(self.base_dir, 'foo', 'bar', 'math.txt')
  3823. with salt.utils.files.fopen(self.numbers_file, 'w') as fp_:
  3824. fp_.write(textwrap.dedent('''\
  3825. one
  3826. two
  3827. three
  3828. 1
  3829. 2
  3830. 3
  3831. '''))
  3832. with salt.utils.files.fopen(self.math_file, 'w') as fp_:
  3833. fp_.write(textwrap.dedent('''\
  3834. Five plus five is ten
  3835. Four squared is sixteen
  3836. '''))
  3837. self.addCleanup(shutil.rmtree, self.base_dir, ignore_errors=True)
  3838. def test_patch_single_file(self):
  3839. '''
  3840. Test file.patch using a patch applied to a single file
  3841. '''
  3842. ret = self.run_state(
  3843. 'file.patch',
  3844. name=self.numbers_file,
  3845. source=self.numbers_patch,
  3846. )
  3847. self.assertSaltTrueReturn(ret)
  3848. ret = ret[next(iter(ret))]
  3849. self.assertEqual(ret['comment'], 'Patch successfully applied')
  3850. # Re-run the state, should succeed and there should be a message about
  3851. # a partially-applied hunk.
  3852. ret = self.run_state(
  3853. 'file.patch',
  3854. name=self.numbers_file,
  3855. source=self.numbers_patch,
  3856. )
  3857. self.assertSaltTrueReturn(ret)
  3858. ret = ret[next(iter(ret))]
  3859. self.assertEqual(ret['comment'], 'Patch was already applied')
  3860. self.assertEqual(ret['changes'], {})
  3861. def test_patch_directory(self):
  3862. '''
  3863. Test file.patch using a patch applied to a directory, with changes
  3864. spanning multiple files.
  3865. '''
  3866. self._check_patch_version('2.6')
  3867. ret = self.run_state(
  3868. 'file.patch',
  3869. name=self.base_dir,
  3870. source=self.all_patch,
  3871. strip=1,
  3872. )
  3873. self.assertSaltTrueReturn(ret)
  3874. ret = ret[next(iter(ret))]
  3875. self.assertEqual(ret['comment'], 'Patch successfully applied')
  3876. # Re-run the state, should succeed and there should be a message about
  3877. # a partially-applied hunk.
  3878. ret = self.run_state(
  3879. 'file.patch',
  3880. name=self.base_dir,
  3881. source=self.all_patch,
  3882. strip=1,
  3883. )
  3884. self.assertSaltTrueReturn(ret)
  3885. ret = ret[next(iter(ret))]
  3886. self.assertEqual(ret['comment'], 'Patch was already applied')
  3887. self.assertEqual(ret['changes'], {})
  3888. def test_patch_strip_parsing(self):
  3889. '''
  3890. Test that we successfuly parse -p/--strip when included in the options
  3891. '''
  3892. self._check_patch_version('2.6')
  3893. # Run the state using -p1
  3894. ret = self.run_state(
  3895. 'file.patch',
  3896. name=self.base_dir,
  3897. source=self.all_patch,
  3898. options='-p1',
  3899. )
  3900. self.assertSaltTrueReturn(ret)
  3901. ret = ret[next(iter(ret))]
  3902. self.assertEqual(ret['comment'], 'Patch successfully applied')
  3903. # Re-run the state using --strip=1
  3904. ret = self.run_state(
  3905. 'file.patch',
  3906. name=self.base_dir,
  3907. source=self.all_patch,
  3908. options='--strip=1',
  3909. )
  3910. self.assertSaltTrueReturn(ret)
  3911. ret = ret[next(iter(ret))]
  3912. self.assertEqual(ret['comment'], 'Patch was already applied')
  3913. self.assertEqual(ret['changes'], {})
  3914. # Re-run the state using --strip 1
  3915. ret = self.run_state(
  3916. 'file.patch',
  3917. name=self.base_dir,
  3918. source=self.all_patch,
  3919. options='--strip 1',
  3920. )
  3921. self.assertSaltTrueReturn(ret)
  3922. ret = ret[next(iter(ret))]
  3923. self.assertEqual(ret['comment'], 'Patch was already applied')
  3924. self.assertEqual(ret['changes'], {})
  3925. def test_patch_saltenv(self):
  3926. '''
  3927. Test that we attempt to download the patch from a non-base saltenv
  3928. '''
  3929. # This state will fail because we don't have a patch file in that
  3930. # environment, but that is OK, we just want to test that we're looking
  3931. # in an environment other than base.
  3932. ret = self.run_state(
  3933. 'file.patch',
  3934. name=self.math_file,
  3935. source=self.math_patch,
  3936. saltenv='prod',
  3937. )
  3938. self.assertSaltFalseReturn(ret)
  3939. ret = ret[next(iter(ret))]
  3940. self.assertEqual(
  3941. ret['comment'],
  3942. "Source file {0} not found in saltenv 'prod'".format(self.math_patch)
  3943. )
  3944. def test_patch_single_file_failure(self):
  3945. '''
  3946. Test file.patch using a patch applied to a single file. This tests a
  3947. failed patch.
  3948. '''
  3949. # Empty the file to ensure that the patch doesn't apply cleanly
  3950. with salt.utils.files.fopen(self.numbers_file, 'w'):
  3951. pass
  3952. ret = self.run_state(
  3953. 'file.patch',
  3954. name=self.numbers_file,
  3955. source=self.numbers_patch,
  3956. )
  3957. self.assertSaltFalseReturn(ret)
  3958. ret = ret[next(iter(ret))]
  3959. self.assertIn('Patch would not apply cleanly', ret['comment'])
  3960. # Test the reject_file option and ensure that the rejects are written
  3961. # to the path specified.
  3962. reject_file = salt.utils.files.mkstemp()
  3963. ret = self.run_state(
  3964. 'file.patch',
  3965. name=self.numbers_file,
  3966. source=self.numbers_patch,
  3967. reject_file=reject_file,
  3968. strip=1,
  3969. )
  3970. self.assertSaltFalseReturn(ret)
  3971. ret = ret[next(iter(ret))]
  3972. self.assertIn('Patch would not apply cleanly', ret['comment'])
  3973. self.assertIn(
  3974. 'saving rejects to file {0}'.format(reject_file),
  3975. ret['comment']
  3976. )
  3977. def test_patch_directory_failure(self):
  3978. '''
  3979. Test file.patch using a patch applied to a directory, with changes
  3980. spanning multiple files.
  3981. '''
  3982. # Empty the file to ensure that the patch doesn't apply
  3983. with salt.utils.files.fopen(self.math_file, 'w'):
  3984. pass
  3985. ret = self.run_state(
  3986. 'file.patch',
  3987. name=self.base_dir,
  3988. source=self.all_patch,
  3989. strip=1,
  3990. )
  3991. self.assertSaltFalseReturn(ret)
  3992. ret = ret[next(iter(ret))]
  3993. self.assertIn('Patch would not apply cleanly', ret['comment'])
  3994. # Test the reject_file option and ensure that the rejects are written
  3995. # to the path specified.
  3996. reject_file = salt.utils.files.mkstemp()
  3997. ret = self.run_state(
  3998. 'file.patch',
  3999. name=self.base_dir,
  4000. source=self.all_patch,
  4001. reject_file=reject_file,
  4002. strip=1,
  4003. )
  4004. self.assertSaltFalseReturn(ret)
  4005. ret = ret[next(iter(ret))]
  4006. self.assertIn('Patch would not apply cleanly', ret['comment'])
  4007. self.assertIn(
  4008. 'saving rejects to file {0}'.format(reject_file),
  4009. ret['comment']
  4010. )
  4011. def test_patch_single_file_remote_source(self):
  4012. '''
  4013. Test file.patch using a patch applied to a single file, with the patch
  4014. coming from a remote source.
  4015. '''
  4016. # Try without a source_hash and without skip_verify=True, this should
  4017. # fail with a message about the source_hash
  4018. ret = self.run_state(
  4019. 'file.patch',
  4020. name=self.math_file,
  4021. source=self.math_patch_http,
  4022. )
  4023. self.assertSaltFalseReturn(ret)
  4024. ret = ret[next(iter(ret))]
  4025. self.assertIn('Unable to verify upstream hash', ret['comment'])
  4026. # Re-run the state with a source hash, it should now succeed
  4027. ret = self.run_state(
  4028. 'file.patch',
  4029. name=self.math_file,
  4030. source=self.math_patch_http,
  4031. source_hash=self.math_patch_hash,
  4032. )
  4033. self.assertSaltTrueReturn(ret)
  4034. ret = ret[next(iter(ret))]
  4035. self.assertEqual(ret['comment'], 'Patch successfully applied')
  4036. # Re-run again, this time with no hash and skip_verify=True to test
  4037. # skipping hash verification
  4038. ret = self.run_state(
  4039. 'file.patch',
  4040. name=self.math_file,
  4041. source=self.math_patch_http,
  4042. skip_verify=True,
  4043. )
  4044. self.assertSaltTrueReturn(ret)
  4045. ret = ret[next(iter(ret))]
  4046. self.assertEqual(ret['comment'], 'Patch was already applied')
  4047. self.assertEqual(ret['changes'], {})
  4048. def test_patch_directory_remote_source(self):
  4049. '''
  4050. Test file.patch using a patch applied to a directory, with changes
  4051. spanning multiple files, and the patch file coming from a remote
  4052. source.
  4053. '''
  4054. self._check_patch_version('2.6')
  4055. # Try without a source_hash and without skip_verify=True, this should
  4056. # fail with a message about the source_hash
  4057. ret = self.run_state(
  4058. 'file.patch',
  4059. name=self.base_dir,
  4060. source=self.all_patch_http,
  4061. strip=1,
  4062. )
  4063. self.assertSaltFalseReturn(ret)
  4064. ret = ret[next(iter(ret))]
  4065. self.assertIn('Unable to verify upstream hash', ret['comment'])
  4066. # Re-run the state with a source hash, it should now succeed
  4067. ret = self.run_state(
  4068. 'file.patch',
  4069. name=self.base_dir,
  4070. source=self.all_patch_http,
  4071. source_hash=self.all_patch_hash,
  4072. strip=1,
  4073. )
  4074. self.assertSaltTrueReturn(ret)
  4075. ret = ret[next(iter(ret))]
  4076. self.assertEqual(ret['comment'], 'Patch successfully applied')
  4077. # Re-run again, this time with no hash and skip_verify=True to test
  4078. # skipping hash verification
  4079. ret = self.run_state(
  4080. 'file.patch',
  4081. name=self.base_dir,
  4082. source=self.all_patch_http,
  4083. strip=1,
  4084. skip_verify=True,
  4085. )
  4086. self.assertSaltTrueReturn(ret)
  4087. ret = ret[next(iter(ret))]
  4088. self.assertEqual(ret['comment'], 'Patch was already applied')
  4089. self.assertEqual(ret['changes'], {})
  4090. def test_patch_single_file_template(self):
  4091. '''
  4092. Test file.patch using a patch applied to a single file, with jinja
  4093. templating applied to the patch file.
  4094. '''
  4095. ret = self.run_state(
  4096. 'file.patch',
  4097. name=self.numbers_file,
  4098. source=self.numbers_patch_template,
  4099. template='jinja',
  4100. context=self.context,
  4101. )
  4102. self.assertSaltTrueReturn(ret)
  4103. ret = ret[next(iter(ret))]
  4104. self.assertEqual(ret['comment'], 'Patch successfully applied')
  4105. # Re-run the state, should succeed and there should be a message about
  4106. # a partially-applied hunk.
  4107. ret = self.run_state(
  4108. 'file.patch',
  4109. name=self.numbers_file,
  4110. source=self.numbers_patch_template,
  4111. template='jinja',
  4112. context=self.context,
  4113. )
  4114. self.assertSaltTrueReturn(ret)
  4115. ret = ret[next(iter(ret))]
  4116. self.assertEqual(ret['comment'], 'Patch was already applied')
  4117. self.assertEqual(ret['changes'], {})
  4118. def test_patch_directory_template(self):
  4119. '''
  4120. Test file.patch using a patch applied to a directory, with changes
  4121. spanning multiple files, and with jinja templating applied to the patch
  4122. file.
  4123. '''
  4124. self._check_patch_version('2.6')
  4125. ret = self.run_state(
  4126. 'file.patch',
  4127. name=self.base_dir,
  4128. source=self.all_patch_template,
  4129. template='jinja',
  4130. context=self.context,
  4131. strip=1,
  4132. )
  4133. self.assertSaltTrueReturn(ret)
  4134. ret = ret[next(iter(ret))]
  4135. self.assertEqual(ret['comment'], 'Patch successfully applied')
  4136. # Re-run the state, should succeed and there should be a message about
  4137. # a partially-applied hunk.
  4138. ret = self.run_state(
  4139. 'file.patch',
  4140. name=self.base_dir,
  4141. source=self.all_patch_template,
  4142. template='jinja',
  4143. context=self.context,
  4144. strip=1,
  4145. )
  4146. self.assertSaltTrueReturn(ret)
  4147. ret = ret[next(iter(ret))]
  4148. self.assertEqual(ret['comment'], 'Patch was already applied')
  4149. self.assertEqual(ret['changes'], {})
  4150. def test_patch_single_file_remote_source_template(self):
  4151. '''
  4152. Test file.patch using a patch applied to a single file, with the patch
  4153. coming from a remote source.
  4154. '''
  4155. # Try without a source_hash and without skip_verify=True, this should
  4156. # fail with a message about the source_hash
  4157. ret = self.run_state(
  4158. 'file.patch',
  4159. name=self.math_file,
  4160. source=self.math_patch_template_http,
  4161. template='jinja',
  4162. context=self.context,
  4163. )
  4164. self.assertSaltFalseReturn(ret)
  4165. ret = ret[next(iter(ret))]
  4166. self.assertIn('Unable to verify upstream hash', ret['comment'])
  4167. # Re-run the state with a source hash, it should now succeed
  4168. ret = self.run_state(
  4169. 'file.patch',
  4170. name=self.math_file,
  4171. source=self.math_patch_template_http,
  4172. source_hash=self.math_patch_template_hash,
  4173. template='jinja',
  4174. context=self.context,
  4175. )
  4176. self.assertSaltTrueReturn(ret)
  4177. ret = ret[next(iter(ret))]
  4178. self.assertEqual(ret['comment'], 'Patch successfully applied')
  4179. # Re-run again, this time with no hash and skip_verify=True to test
  4180. # skipping hash verification
  4181. ret = self.run_state(
  4182. 'file.patch',
  4183. name=self.math_file,
  4184. source=self.math_patch_template_http,
  4185. template='jinja',
  4186. context=self.context,
  4187. skip_verify=True,
  4188. )
  4189. self.assertSaltTrueReturn(ret)
  4190. ret = ret[next(iter(ret))]
  4191. self.assertEqual(ret['comment'], 'Patch was already applied')
  4192. self.assertEqual(ret['changes'], {})
  4193. def test_patch_directory_remote_source_template(self):
  4194. '''
  4195. Test file.patch using a patch applied to a directory, with changes
  4196. spanning multiple files, and the patch file coming from a remote
  4197. source.
  4198. '''
  4199. self._check_patch_version('2.6')
  4200. # Try without a source_hash and without skip_verify=True, this should
  4201. # fail with a message about the source_hash
  4202. ret = self.run_state(
  4203. 'file.patch',
  4204. name=self.base_dir,
  4205. source=self.all_patch_template_http,
  4206. template='jinja',
  4207. context=self.context,
  4208. strip=1,
  4209. )
  4210. self.assertSaltFalseReturn(ret)
  4211. ret = ret[next(iter(ret))]
  4212. self.assertIn('Unable to verify upstream hash', ret['comment'])
  4213. # Re-run the state with a source hash, it should now succeed
  4214. ret = self.run_state(
  4215. 'file.patch',
  4216. name=self.base_dir,
  4217. source=self.all_patch_template_http,
  4218. source_hash=self.all_patch_template_hash,
  4219. template='jinja',
  4220. context=self.context,
  4221. strip=1,
  4222. )
  4223. self.assertSaltTrueReturn(ret)
  4224. ret = ret[next(iter(ret))]
  4225. self.assertEqual(ret['comment'], 'Patch successfully applied')
  4226. # Re-run again, this time with no hash and skip_verify=True to test
  4227. # skipping hash verification
  4228. ret = self.run_state(
  4229. 'file.patch',
  4230. name=self.base_dir,
  4231. source=self.all_patch_template_http,
  4232. template='jinja',
  4233. context=self.context,
  4234. strip=1,
  4235. skip_verify=True,
  4236. )
  4237. self.assertSaltTrueReturn(ret)
  4238. ret = ret[next(iter(ret))]
  4239. self.assertEqual(ret['comment'], 'Patch was already applied')
  4240. self.assertEqual(ret['changes'], {})
  4241. def test_patch_test_mode(self):
  4242. '''
  4243. Test file.patch using test=True
  4244. '''
  4245. # Try without a source_hash and without skip_verify=True, this should
  4246. # fail with a message about the source_hash
  4247. ret = self.run_state(
  4248. 'file.patch',
  4249. name=self.numbers_file,
  4250. source=self.numbers_patch,
  4251. test=True,
  4252. )
  4253. self.assertSaltNoneReturn(ret)
  4254. ret = ret[next(iter(ret))]
  4255. self.assertEqual(ret['comment'], 'The patch would be applied')
  4256. self.assertTrue(ret['changes'])
  4257. # Apply the patch for real. We'll then be able to test below that we
  4258. # exit with a True rather than a None result if test=True is used on an
  4259. # already-applied patch.
  4260. ret = self.run_state(
  4261. 'file.patch',
  4262. name=self.numbers_file,
  4263. source=self.numbers_patch,
  4264. )
  4265. self.assertSaltTrueReturn(ret)
  4266. ret = ret[next(iter(ret))]
  4267. self.assertEqual(ret['comment'], 'Patch successfully applied')
  4268. self.assertTrue(ret['changes'])
  4269. # Run again with test=True. Since the pre-check happens before we do
  4270. # the __opts__['test'] check, we should exit with a True result just
  4271. # the same as if we try to run this state on an already-patched file
  4272. # *without* test=True.
  4273. ret = self.run_state(
  4274. 'file.patch',
  4275. name=self.numbers_file,
  4276. source=self.numbers_patch,
  4277. test=True,
  4278. )
  4279. self.assertSaltTrueReturn(ret)
  4280. ret = ret[next(iter(ret))]
  4281. self.assertEqual(ret['comment'], 'Patch was already applied')
  4282. self.assertEqual(ret['changes'], {})
  4283. # Empty the file to ensure that the patch doesn't apply cleanly
  4284. with salt.utils.files.fopen(self.numbers_file, 'w'):
  4285. pass
  4286. # Run again with test=True. Similar to the above run, we are testing
  4287. # that we return before we reach the __opts__['test'] check. In this
  4288. # case we should return a False result because we should already know
  4289. # by this point that the patch will not apply cleanly.
  4290. ret = self.run_state(
  4291. 'file.patch',
  4292. name=self.numbers_file,
  4293. source=self.numbers_patch,
  4294. test=True,
  4295. )
  4296. self.assertSaltFalseReturn(ret)
  4297. ret = ret[next(iter(ret))]
  4298. self.assertIn('Patch would not apply cleanly', ret['comment'])
  4299. self.assertEqual(ret['changes'], {})
  4300. WIN_TEST_FILE = 'c:/testfile'
  4301. @pytest.mark.destructive_test
  4302. @skipIf(not IS_WINDOWS, 'windows test only')
  4303. @pytest.mark.windows_whitelisted
  4304. class WinFileTest(ModuleCase):
  4305. '''
  4306. Test for the file state on Windows
  4307. '''
  4308. def setUp(self):
  4309. self.run_state('file.managed', name=WIN_TEST_FILE, makedirs=True, contents='Only a test')
  4310. def tearDown(self):
  4311. self.run_state('file.absent', name=WIN_TEST_FILE)
  4312. def test_file_managed(self):
  4313. '''
  4314. Test file.managed on Windows
  4315. '''
  4316. self.assertTrue(self.run_state('file.exists', name=WIN_TEST_FILE))
  4317. def test_file_copy(self):
  4318. '''
  4319. Test file.copy on Windows
  4320. '''
  4321. ret = self.run_state('file.copy', name='c:/testfile_copy', makedirs=True, source=WIN_TEST_FILE)
  4322. self.assertTrue(ret)
  4323. def test_file_comment(self):
  4324. '''
  4325. Test file.comment on Windows
  4326. '''
  4327. self.run_state('file.comment', name=WIN_TEST_FILE, regex='^Only')
  4328. with salt.utils.files.fopen(WIN_TEST_FILE, 'r') as fp_:
  4329. self.assertTrue(fp_.read().startswith('#Only'))
  4330. def test_file_replace(self):
  4331. '''
  4332. Test file.replace on Windows
  4333. '''
  4334. self.run_state('file.replace', name=WIN_TEST_FILE, pattern='test', repl='testing')
  4335. with salt.utils.files.fopen(WIN_TEST_FILE, 'r') as fp_:
  4336. self.assertIn('testing', fp_.read())
  4337. def test_file_absent(self):
  4338. '''
  4339. Test file.absent on Windows
  4340. '''
  4341. ret = self.run_state('file.absent', name=WIN_TEST_FILE)
  4342. self.assertTrue(ret)