mock.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. tests.support.mock
  5. ~~~~~~~~~~~~~~~~~~
  6. Helper module that wraps `mock` and provides some fake objects in order to
  7. properly set the function/class decorators and yet skip the test case's
  8. execution.
  9. Note: mock >= 2.0.0 required since unittest.mock does not have
  10. MagicMock.assert_called in Python < 3.6.
  11. '''
  12. # pylint: disable=unused-import,function-redefined,blacklisted-module,blacklisted-external-module
  13. from __future__ import absolute_import
  14. import collections
  15. import copy
  16. import errno
  17. import fnmatch
  18. import sys
  19. # Import salt libs
  20. from salt.ext import six
  21. import salt.utils.stringutils
  22. try:
  23. from mock import (
  24. Mock,
  25. MagicMock,
  26. patch,
  27. sentinel,
  28. DEFAULT,
  29. # ANY and call will be imported further down
  30. create_autospec,
  31. FILTER_DIR,
  32. NonCallableMock,
  33. NonCallableMagicMock,
  34. PropertyMock,
  35. __version__
  36. )
  37. NO_MOCK = False
  38. NO_MOCK_REASON = ''
  39. mock_version = []
  40. for __part in __version__.split('.'):
  41. try:
  42. mock_version.append(int(__part))
  43. except ValueError:
  44. # Non-integer value (ex. '1a')
  45. mock_version.append(__part)
  46. mock_version = tuple(mock_version)
  47. except ImportError as exc:
  48. NO_MOCK = True
  49. NO_MOCK_REASON = 'mock python module is unavailable'
  50. mock_version = (0, 0, 0)
  51. # Let's not fail on imports by providing fake objects and classes
  52. class MagicMock(object):
  53. # __name__ can't be assigned a unicode
  54. __name__ = str('{0}.fakemock').format(__name__) # future lint: disable=blacklisted-function
  55. def __init__(self, *args, **kwargs):
  56. pass
  57. def dict(self, *args, **kwargs):
  58. return self
  59. def multiple(self, *args, **kwargs):
  60. return self
  61. def __call__(self, *args, **kwargs):
  62. return self
  63. Mock = MagicMock
  64. patch = MagicMock()
  65. sentinel = object()
  66. DEFAULT = object()
  67. create_autospec = MagicMock()
  68. FILTER_DIR = True
  69. NonCallableMock = MagicMock()
  70. NonCallableMagicMock = MagicMock()
  71. mock_open = object()
  72. PropertyMock = object()
  73. call = tuple
  74. ANY = object()
  75. if NO_MOCK is False:
  76. try:
  77. from mock import call, ANY
  78. except ImportError:
  79. NO_MOCK = True
  80. NO_MOCK_REASON = 'you need to upgrade your mock version to >= 0.8.0'
  81. class MockFH(object):
  82. def __init__(self, filename, read_data, *args, **kwargs):
  83. self.filename = filename
  84. self.read_data = read_data
  85. try:
  86. self.mode = args[0]
  87. except IndexError:
  88. self.mode = kwargs.get('mode', 'r')
  89. self.binary_mode = 'b' in self.mode
  90. self.read_mode = any(x in self.mode for x in ('r', '+'))
  91. self.write_mode = any(x in self.mode for x in ('w', 'a', '+'))
  92. self.empty_string = b'' if self.binary_mode else ''
  93. self.call = MockCall(filename, *args, **kwargs)
  94. self.read_data_iter = self._iterate_read_data(read_data)
  95. self.read = Mock(side_effect=self._read)
  96. self.readlines = Mock(side_effect=self._readlines)
  97. self.readline = Mock(side_effect=self._readline)
  98. self.write = Mock(side_effect=self._write)
  99. self.writelines = Mock(side_effect=self._writelines)
  100. self.close = Mock()
  101. self.seek = Mock()
  102. self.__loc = 0
  103. self.__read_data_ok = False
  104. def _iterate_read_data(self, read_data):
  105. '''
  106. Helper for mock_open:
  107. Retrieve lines from read_data via a generator so that separate calls to
  108. readline, read, and readlines are properly interleaved
  109. '''
  110. # Newline will always be a bytestring on PY2 because mock_open will have
  111. # normalized it to one.
  112. newline = b'\n' if isinstance(read_data, six.binary_type) else '\n'
  113. read_data = [line + newline for line in read_data.split(newline)]
  114. if read_data[-1] == newline:
  115. # If the last line ended in a newline, the list comprehension will have an
  116. # extra entry that's just a newline. Remove this.
  117. read_data = read_data[:-1]
  118. else:
  119. # If there wasn't an extra newline by itself, then the file being
  120. # emulated doesn't have a newline to end the last line, so remove the
  121. # newline that we added in the list comprehension.
  122. read_data[-1] = read_data[-1][:-1]
  123. for line in read_data:
  124. yield line
  125. @property
  126. def write_calls(self):
  127. '''
  128. Return a list of all calls to the .write() mock
  129. '''
  130. return [x[1][0] for x in self.write.mock_calls]
  131. @property
  132. def writelines_calls(self):
  133. '''
  134. Return a list of all calls to the .writelines() mock
  135. '''
  136. return [x[1][0] for x in self.writelines.mock_calls]
  137. def tell(self):
  138. return self.__loc
  139. def __check_read_data(self):
  140. if not self.__read_data_ok:
  141. if self.binary_mode:
  142. if not isinstance(self.read_data, six.binary_type):
  143. raise TypeError(
  144. '{0} opened in binary mode, expected read_data to be '
  145. 'bytes, not {1}'.format(
  146. self.filename,
  147. type(self.read_data).__name__
  148. )
  149. )
  150. else:
  151. if not isinstance(self.read_data, str):
  152. raise TypeError(
  153. '{0} opened in non-binary mode, expected read_data to '
  154. 'be str, not {1}'.format(
  155. self.filename,
  156. type(self.read_data).__name__
  157. )
  158. )
  159. # No need to repeat this the next time we check
  160. self.__read_data_ok = True
  161. def _read(self, size=0):
  162. self.__check_read_data()
  163. if not self.read_mode:
  164. raise IOError('File not open for reading')
  165. if not isinstance(size, six.integer_types) or size < 0:
  166. raise TypeError('a positive integer is required')
  167. joined = self.empty_string.join(self.read_data_iter)
  168. if not size:
  169. # read() called with no args, return everything
  170. self.__loc += len(joined)
  171. return joined
  172. else:
  173. # read() called with an explicit size. Return a slice matching the
  174. # requested size, but before doing so, reset read_data to reflect
  175. # what we read.
  176. self.read_data_iter = self._iterate_read_data(joined[size:])
  177. ret = joined[:size]
  178. self.__loc += len(ret)
  179. return ret
  180. def _readlines(self, size=None): # pylint: disable=unused-argument
  181. # TODO: Implement "size" argument
  182. self.__check_read_data()
  183. if not self.read_mode:
  184. raise IOError('File not open for reading')
  185. ret = list(self.read_data_iter)
  186. self.__loc += sum(len(x) for x in ret)
  187. return ret
  188. def _readline(self, size=None): # pylint: disable=unused-argument
  189. # TODO: Implement "size" argument
  190. self.__check_read_data()
  191. if not self.read_mode:
  192. raise IOError('File not open for reading')
  193. try:
  194. ret = next(self.read_data_iter)
  195. self.__loc += len(ret)
  196. return ret
  197. except StopIteration:
  198. return self.empty_string
  199. def __iter__(self):
  200. self.__check_read_data()
  201. if not self.read_mode:
  202. raise IOError('File not open for reading')
  203. while True:
  204. try:
  205. ret = next(self.read_data_iter)
  206. self.__loc += len(ret)
  207. yield ret
  208. except StopIteration:
  209. break
  210. def _write(self, content):
  211. if not self.write_mode:
  212. raise IOError('File not open for writing')
  213. if six.PY2:
  214. if isinstance(content, six.text_type):
  215. # encoding intentionally not specified to force a
  216. # UnicodeEncodeError when non-ascii unicode type is passed
  217. content.encode()
  218. else:
  219. content_type = type(content)
  220. if self.binary_mode and content_type is not bytes:
  221. raise TypeError(
  222. 'a bytes-like object is required, not \'{0}\''.format(
  223. content_type.__name__
  224. )
  225. )
  226. elif not self.binary_mode and content_type is not str:
  227. raise TypeError(
  228. 'write() argument must be str, not {0}'.format(
  229. content_type.__name__
  230. )
  231. )
  232. def _writelines(self, lines):
  233. if not self.write_mode:
  234. raise IOError('File not open for writing')
  235. for line in lines:
  236. self._write(line)
  237. def __enter__(self):
  238. return self
  239. def __exit__(self, exc_type, exc_val, exc_tb): # pylint: disable=unused-argument
  240. pass
  241. class MockCall(object):
  242. def __init__(self, *args, **kwargs):
  243. self.args = args
  244. self.kwargs = kwargs
  245. def __repr__(self):
  246. # future lint: disable=blacklisted-function
  247. ret = str('MockCall(')
  248. for arg in self.args:
  249. ret += repr(arg) + str(', ')
  250. if not self.kwargs:
  251. if self.args:
  252. # Remove trailing ', '
  253. ret = ret[:-2]
  254. else:
  255. for key, val in six.iteritems(self.kwargs):
  256. ret += str('{0}={1}').format(
  257. salt.utils.stringutils.to_str(key),
  258. repr(val)
  259. )
  260. ret += str(')')
  261. return ret
  262. # future lint: enable=blacklisted-function
  263. def __str__(self):
  264. return self.__repr__()
  265. def __eq__(self, other):
  266. return self.args == other.args and self.kwargs == other.kwargs
  267. class MockOpen(object):
  268. r'''
  269. This class can be used to mock the use of ``open()``.
  270. ``read_data`` is a string representing the contents of the file to be read.
  271. By default, this is an empty string.
  272. Optionally, ``read_data`` can be a dictionary mapping ``fnmatch.fnmatch()``
  273. patterns to strings (or optionally, exceptions). This allows the mocked
  274. filehandle to serve content for more than one file path.
  275. .. code-block:: python
  276. data = {
  277. '/etc/foo.conf': textwrap.dedent("""\
  278. Foo
  279. Bar
  280. Baz
  281. """),
  282. '/etc/bar.conf': textwrap.dedent("""\
  283. A
  284. B
  285. C
  286. """),
  287. }
  288. with patch('salt.utils.files.fopen', mock_open(read_data=data):
  289. do stuff
  290. If the file path being opened does not match any of the glob expressions,
  291. an IOError will be raised to simulate the file not existing.
  292. Passing ``read_data`` as a string is equivalent to passing it with a glob
  293. expression of "*". That is to say, the below two invocations are
  294. equivalent:
  295. .. code-block:: python
  296. mock_open(read_data='foo\n')
  297. mock_open(read_data={'*': 'foo\n'})
  298. Instead of a string representing file contents, ``read_data`` can map to an
  299. exception, and that exception will be raised if a file matching that
  300. pattern is opened:
  301. .. code-block:: python
  302. data = {
  303. '/etc/*': IOError(errno.EACCES, 'Permission denied'),
  304. '*': 'Hello world!\n',
  305. }
  306. with patch('salt.utils.files.fopen', mock_open(read_data=data)):
  307. do stuff
  308. The above would raise an exception if any files within /etc are opened, but
  309. would produce a mocked filehandle if any other file is opened.
  310. To simulate file contents changing upon subsequent opens, the file contents
  311. can be a list of strings/exceptions. For example:
  312. .. code-block:: python
  313. data = {
  314. '/etc/foo.conf': [
  315. 'before\n',
  316. 'after\n',
  317. ],
  318. '/etc/bar.conf': [
  319. IOError(errno.ENOENT, 'No such file or directory', '/etc/bar.conf'),
  320. 'Hey, the file exists now!',
  321. ],
  322. }
  323. with patch('salt.utils.files.fopen', mock_open(read_data=data):
  324. do stuff
  325. The first open of ``/etc/foo.conf`` would return "before\n" when read,
  326. while the second would return "after\n" when read. For ``/etc/bar.conf``,
  327. the first read would raise an exception, while the second would open
  328. successfully and read the specified string.
  329. Expressions will be attempted in dictionary iteration order (the exception
  330. being ``*`` which is tried last), so if a file path matches more than one
  331. fnmatch expression then the first match "wins". If your use case calls for
  332. overlapping expressions, then an OrderedDict can be used to ensure that the
  333. desired matching behavior occurs:
  334. .. code-block:: python
  335. data = OrderedDict()
  336. data['/etc/foo.conf'] = 'Permission granted!'
  337. data['/etc/*'] = IOError(errno.EACCES, 'Permission denied')
  338. data['*'] = '*': 'Hello world!\n'
  339. with patch('salt.utils.files.fopen', mock_open(read_data=data):
  340. do stuff
  341. The following attributes are tracked for the life of a mock object:
  342. * call_count - Tracks how many fopen calls were attempted
  343. * filehandles - This is a dictionary mapping filenames to lists of MockFH
  344. objects, representing the individual times that a given file was opened.
  345. '''
  346. def __init__(self, read_data=''):
  347. # If the read_data contains lists, we will be popping it. So, don't
  348. # modify the original value passed.
  349. read_data = copy.copy(read_data)
  350. # Normalize read_data, Python 2 filehandles should never produce unicode
  351. # types on read.
  352. if not isinstance(read_data, dict):
  353. read_data = {'*': read_data}
  354. if six.PY2:
  355. # .__class__() used here to preserve the dict class in the event that
  356. # an OrderedDict was used.
  357. new_read_data = read_data.__class__()
  358. for key, val in six.iteritems(read_data):
  359. try:
  360. val = salt.utils.data.decode(val, to_str=True)
  361. except TypeError:
  362. if not isinstance(val, BaseException):
  363. raise
  364. new_read_data[key] = val
  365. read_data = new_read_data
  366. del new_read_data
  367. self.read_data = read_data
  368. self.filehandles = {}
  369. self.calls = []
  370. self.call_count = 0
  371. def __call__(self, name, *args, **kwargs):
  372. '''
  373. Match the file being opened to the patterns in the read_data and spawn
  374. a mocked filehandle with the corresponding file contents.
  375. '''
  376. call = MockCall(name, *args, **kwargs)
  377. self.calls.append(call)
  378. self.call_count += 1
  379. for pat in self.read_data:
  380. if pat == '*':
  381. continue
  382. if fnmatch.fnmatch(name, pat):
  383. matched_pattern = pat
  384. break
  385. else:
  386. # No non-glob match in read_data, fall back to '*'
  387. matched_pattern = '*'
  388. try:
  389. matched_contents = self.read_data[matched_pattern]
  390. try:
  391. # Assuming that the value for the matching expression is a
  392. # list, pop the first element off of it.
  393. file_contents = matched_contents.pop(0)
  394. except AttributeError:
  395. # The value for the matching expression is a string (or exception)
  396. file_contents = matched_contents
  397. except IndexError:
  398. # We've run out of file contents, abort!
  399. raise RuntimeError(
  400. 'File matching expression \'{0}\' opened more times than '
  401. 'expected'.format(matched_pattern)
  402. )
  403. try:
  404. # Raise the exception if the matched file contents are an
  405. # instance of an exception class.
  406. raise file_contents
  407. except TypeError:
  408. # Contents were not an exception, so proceed with creating the
  409. # mocked filehandle.
  410. pass
  411. ret = MockFH(name, file_contents, *args, **kwargs)
  412. self.filehandles.setdefault(name, []).append(ret)
  413. return ret
  414. except KeyError:
  415. # No matching glob in read_data, treat this as a file that does
  416. # not exist and raise the appropriate exception.
  417. raise IOError(errno.ENOENT, 'No such file or directory', name)
  418. def write_calls(self, path=None):
  419. '''
  420. Returns the contents passed to all .write() calls. Use `path` to narrow
  421. the results to files matching a given pattern.
  422. '''
  423. ret = []
  424. for filename, handles in six.iteritems(self.filehandles):
  425. if path is None or fnmatch.fnmatch(filename, path):
  426. for fh_ in handles:
  427. ret.extend(fh_.write_calls)
  428. return ret
  429. def writelines_calls(self, path=None):
  430. '''
  431. Returns the contents passed to all .writelines() calls. Use `path` to
  432. narrow the results to files matching a given pattern.
  433. '''
  434. ret = []
  435. for filename, handles in six.iteritems(self.filehandles):
  436. if path is None or fnmatch.fnmatch(filename, path):
  437. for fh_ in handles:
  438. ret.extend(fh_.writelines_calls)
  439. return ret
  440. # reimplement mock_open to support multiple filehandles
  441. mock_open = MockOpen