1
0

test_archive.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. # -*- coding: utf-8 -*-
  2. '''
  3. Tests for the archive state
  4. '''
  5. # Import python libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import os
  8. import shutil
  9. import textwrap
  10. # Import Salt Testing libs
  11. from tests.support.runtests import RUNTIME_VARS
  12. from tests.support.case import ModuleCase
  13. from tests.support.unit import skipIf
  14. # Import salt libs
  15. import salt.utils.files
  16. import salt.utils.path
  17. import salt.utils.platform
  18. import salt.utils.stringutils
  19. # Import 3rd party libs
  20. import pytest
  21. from salt.ext import six
  22. try:
  23. import zipfile # pylint: disable=W0611
  24. HAS_ZIPFILE = True
  25. except ImportError:
  26. HAS_ZIPFILE = False
  27. @pytest.mark.destructive_test
  28. @pytest.mark.windows_whitelisted
  29. class ArchiveTest(ModuleCase):
  30. '''
  31. Validate the archive module
  32. '''
  33. # Base path used for test artifacts
  34. base_path = os.path.join(RUNTIME_VARS.TMP, 'modules', 'archive')
  35. def _set_artifact_paths(self, arch_fmt):
  36. '''
  37. Define the paths for the source, archive, and destination files
  38. :param str arch_fmt: The archive format used in the test
  39. '''
  40. self.src = os.path.join(self.base_path, '{0}_src_dir'.format(arch_fmt))
  41. self.src_file = os.path.join(self.src, 'file')
  42. self.arch = os.path.join(self.base_path, 'archive.{0}'.format(arch_fmt))
  43. self.dst = os.path.join(self.base_path, '{0}_dst_dir'.format(arch_fmt))
  44. def _set_up(self, arch_fmt, unicode_filename=False):
  45. '''
  46. Create source file tree and destination directory
  47. :param str arch_fmt: The archive format used in the test
  48. '''
  49. self._set_artifact_paths(arch_fmt)
  50. # Remove the artifacts if any present
  51. if any([os.path.exists(f) for f in (self.src, self.arch, self.dst)]):
  52. self._tear_down()
  53. self._set_artifact_paths(arch_fmt)
  54. # Create source
  55. os.makedirs(self.src)
  56. if unicode_filename:
  57. filename = 'file®'
  58. else:
  59. filename = 'file'
  60. with salt.utils.files.fopen(os.path.join(self.src, filename), 'wb') as theorem:
  61. if six.PY3 and salt.utils.platform.is_windows():
  62. encoding = 'utf-8'
  63. else:
  64. encoding = None
  65. theorem.write(salt.utils.stringutils.to_bytes(textwrap.dedent('''\
  66. Compression theorem of computational complexity theory:
  67. Given a Gödel numbering $φ$ of the computable functions and a
  68. Blum complexity measure $Φ$ where a complexity class for a
  69. boundary function $f$ is defined as
  70. $\\mathrm C(f) := \\{φ_i ∈ \\mathbb R^{(1)} | (∀^∞ x) Φ_i(x) ≤ f(x)\\}$.
  71. Then there exists a total computable function $f$ so that for
  72. all $i$
  73. $\\mathrm{Dom}(φ_i) = \\mathrm{Dom}(φ_{f(i)})$
  74. and
  75. $\\mathrm C(φ_i) ⊊ \\mathrm{C}(φ_{f(i)})$.
  76. '''), encoding=encoding))
  77. # Create destination
  78. os.makedirs(self.dst)
  79. def _tear_down(self):
  80. '''
  81. Remove source file tree, archive, and destination file tree
  82. '''
  83. for f in (self.src, self.arch, self.dst):
  84. if os.path.exists(f):
  85. if os.path.isdir(f):
  86. shutil.rmtree(f, ignore_errors=True)
  87. else:
  88. os.remove(f)
  89. del self.dst
  90. del self.src
  91. del self.arch
  92. del self.src_file
  93. def _assert_artifacts_in_ret(self, ret, file_only=False, unix_sep=False):
  94. '''
  95. Assert that the artifact source files are printed in the source command
  96. output
  97. '''
  98. def normdir(path):
  99. normdir = os.path.normcase(os.path.abspath(path))
  100. if salt.utils.platform.is_windows():
  101. # Remove the drive portion of path
  102. if len(normdir) >= 2 and normdir[1] == ':':
  103. normdir = normdir.split(':', 1)[1]
  104. normdir = normdir.lstrip(os.path.sep)
  105. # Unzipped paths might have unix line endings
  106. if unix_sep:
  107. normdir = normdir.replace(os.path.sep, '/')
  108. return normdir
  109. # Try to find source directory and file in output lines
  110. dir_in_ret = None
  111. file_in_ret = None
  112. for line in ret:
  113. if normdir(self.src) in os.path.normcase(line) \
  114. and not normdir(self.src_file) in os.path.normcase(line):
  115. dir_in_ret = True
  116. if normdir(self.src_file) in os.path.normcase(line):
  117. file_in_ret = True
  118. # Assert number of lines, reporting of source directory and file
  119. self.assertTrue(len(ret) >= 1 if file_only else 2)
  120. if not file_only:
  121. self.assertTrue(dir_in_ret)
  122. self.assertTrue(file_in_ret)
  123. @skipIf(not salt.utils.path.which('tar'), 'Cannot find tar executable')
  124. def test_tar_pack(self):
  125. '''
  126. Validate using the tar function to create archives
  127. '''
  128. self._set_up(arch_fmt='tar')
  129. # Test create archive
  130. ret = self.run_function('archive.tar', ['-cvf', self.arch], sources=self.src)
  131. self.assertTrue(isinstance(ret, list), six.text_type(ret))
  132. self._assert_artifacts_in_ret(ret)
  133. self._tear_down()
  134. @skipIf(not salt.utils.path.which('tar'), 'Cannot find tar executable')
  135. def test_tar_unpack(self):
  136. '''
  137. Validate using the tar function to extract archives
  138. '''
  139. self._set_up(arch_fmt='tar')
  140. self.run_function('archive.tar', ['-cvf', self.arch], sources=self.src)
  141. # Test extract archive
  142. ret = self.run_function('archive.tar', ['-xvf', self.arch], dest=self.dst)
  143. self.assertTrue(isinstance(ret, list), six.text_type(ret))
  144. self._assert_artifacts_in_ret(ret)
  145. self._tear_down()
  146. @skipIf(not salt.utils.path.which('tar'), 'Cannot find tar executable')
  147. def test_tar_pack_unicode(self):
  148. '''
  149. Validate using the tar function to create archives
  150. '''
  151. self._set_up(arch_fmt='tar', unicode_filename=True)
  152. # Test create archive
  153. ret = self.run_function('archive.tar', ['-cvf', self.arch], sources=self.src)
  154. self.assertTrue(isinstance(ret, list), six.text_type(ret))
  155. self._assert_artifacts_in_ret(ret)
  156. self._tear_down()
  157. @skipIf(not salt.utils.path.which('tar'), 'Cannot find tar executable')
  158. def test_tar_unpack_unicode(self):
  159. '''
  160. Validate using the tar function to extract archives
  161. '''
  162. self._set_up(arch_fmt='tar', unicode_filename=True)
  163. self.run_function('archive.tar', ['-cvf', self.arch], sources=self.src)
  164. # Test extract archive
  165. ret = self.run_function('archive.tar', ['-xvf', self.arch], dest=self.dst)
  166. self.assertTrue(isinstance(ret, list), six.text_type(ret))
  167. self._assert_artifacts_in_ret(ret)
  168. self._tear_down()
  169. @skipIf(not salt.utils.path.which('tar'), 'Cannot find tar executable')
  170. def test_tar_list_unicode(self):
  171. '''
  172. Validate using the tar function to extract archives
  173. '''
  174. self._set_up(arch_fmt='tar', unicode_filename=True)
  175. self.run_function('archive.tar', ['-cvf', self.arch], sources=self.src)
  176. # Test list archive
  177. ret = self.run_function('archive.list', name=self.arch)
  178. self.assertTrue(isinstance(ret, list), six.text_type(ret))
  179. self._assert_artifacts_in_ret(ret)
  180. self._tear_down()
  181. @skipIf(not salt.utils.path.which('gzip'), 'Cannot find gzip executable')
  182. def test_gzip(self):
  183. '''
  184. Validate using the gzip function
  185. '''
  186. self._set_up(arch_fmt='gz')
  187. # Test create archive
  188. ret = self.run_function('archive.gzip', [self.src_file], options='-v')
  189. self.assertTrue(isinstance(ret, list), six.text_type(ret))
  190. self._assert_artifacts_in_ret(ret, file_only=True)
  191. self._tear_down()
  192. @skipIf(not salt.utils.path.which('gzip'), 'Cannot find gzip executable')
  193. @skipIf(not salt.utils.path.which('gunzip'), 'Cannot find gunzip executable')
  194. def test_gunzip(self):
  195. '''
  196. Validate using the gunzip function
  197. '''
  198. self._set_up(arch_fmt='gz')
  199. self.run_function('archive.gzip', [self.src_file], options='-v')
  200. # Test extract archive
  201. ret = self.run_function('archive.gunzip', [self.src_file + '.gz'], options='-v')
  202. self.assertTrue(isinstance(ret, list), six.text_type(ret))
  203. self._assert_artifacts_in_ret(ret, file_only=True)
  204. self._tear_down()
  205. @skipIf(not salt.utils.path.which('zip'), 'Cannot find zip executable')
  206. def test_cmd_zip(self):
  207. '''
  208. Validate using the cmd_zip function
  209. '''
  210. self._set_up(arch_fmt='zip')
  211. # Test create archive
  212. ret = self.run_function('archive.cmd_zip', [self.arch, self.src])
  213. self.assertTrue(isinstance(ret, list), six.text_type(ret))
  214. self._assert_artifacts_in_ret(ret)
  215. self._tear_down()
  216. @skipIf(not salt.utils.path.which('zip'), 'Cannot find zip executable')
  217. @skipIf(not salt.utils.path.which('unzip'), 'Cannot find unzip executable')
  218. def test_cmd_unzip(self):
  219. '''
  220. Validate using the cmd_unzip function
  221. '''
  222. self._set_up(arch_fmt='zip')
  223. self.run_function('archive.cmd_zip', [self.arch, self.src])
  224. # Test create archive
  225. ret = self.run_function('archive.cmd_unzip', [self.arch, self.dst])
  226. self.assertTrue(isinstance(ret, list), six.text_type(ret))
  227. self._assert_artifacts_in_ret(ret)
  228. self._tear_down()
  229. @skipIf(not HAS_ZIPFILE, 'Cannot find zipfile python module')
  230. def test_zip(self):
  231. '''
  232. Validate using the zip function
  233. '''
  234. self._set_up(arch_fmt='zip')
  235. # Test create archive
  236. ret = self.run_function('archive.zip', [self.arch, self.src])
  237. self.assertTrue(isinstance(ret, list), six.text_type(ret))
  238. self._assert_artifacts_in_ret(ret)
  239. self._tear_down()
  240. @skipIf(not HAS_ZIPFILE, 'Cannot find zipfile python module')
  241. def test_unzip(self):
  242. '''
  243. Validate using the unzip function
  244. '''
  245. self._set_up(arch_fmt='zip')
  246. self.run_function('archive.zip', [self.arch, self.src])
  247. # Test create archive
  248. ret = self.run_function('archive.unzip', [self.arch, self.dst])
  249. self.assertTrue(isinstance(ret, list), six.text_type(ret))
  250. self._assert_artifacts_in_ret(ret, unix_sep=False)
  251. self._tear_down()
  252. @skipIf(not salt.utils.path.which('rar'), 'Cannot find rar executable')
  253. def test_rar(self):
  254. '''
  255. Validate using the rar function
  256. '''
  257. self._set_up(arch_fmt='rar')
  258. # Test create archive
  259. ret = self.run_function('archive.rar', [self.arch, self.src])
  260. self.assertTrue(isinstance(ret, list), six.text_type(ret))
  261. self._assert_artifacts_in_ret(ret)
  262. self._tear_down()
  263. @skipIf(not salt.utils.path.which('rar'), 'Cannot find rar executable')
  264. @skipIf(not salt.utils.path.which('unrar'), 'Cannot find unrar executable')
  265. def test_unrar(self):
  266. '''
  267. Validate using the unrar function
  268. '''
  269. self._set_up(arch_fmt='rar')
  270. self.run_function('archive.rar', [self.arch, self.src])
  271. # Test create archive
  272. ret = self.run_function('archive.unrar', [self.arch, self.dst])
  273. self.assertTrue(isinstance(ret, list), six.text_type(ret))
  274. self._assert_artifacts_in_ret(ret)
  275. self._tear_down()