test_payload.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. tests.unit.payload_test
  5. ~~~~~~~~~~~~~~~~~~~~~~~
  6. '''
  7. # Import python libs
  8. from __future__ import absolute_import, print_function, unicode_literals
  9. import time
  10. import errno
  11. import threading
  12. import datetime
  13. # Import Salt Testing libs
  14. from tests.support.unit import skipIf, TestCase
  15. from tests.support.mock import NO_MOCK, NO_MOCK_REASON
  16. # Import Salt libs
  17. from salt.utils import immutabletypes
  18. from salt.utils.odict import OrderedDict
  19. import salt.exceptions
  20. import salt.payload
  21. # Import 3rd-party libs
  22. import zmq
  23. from salt.ext import six
  24. import logging
  25. log = logging.getLogger(__name__)
  26. @skipIf(NO_MOCK, NO_MOCK_REASON)
  27. class PayloadTestCase(TestCase):
  28. def assertNoOrderedDict(self, data):
  29. if isinstance(data, OrderedDict):
  30. raise AssertionError(
  31. 'Found an ordered dictionary'
  32. )
  33. if isinstance(data, dict):
  34. for value in six.itervalues(data):
  35. self.assertNoOrderedDict(value)
  36. elif isinstance(data, (list, tuple)):
  37. for chunk in data:
  38. self.assertNoOrderedDict(chunk)
  39. def test_list_nested_odicts(self):
  40. payload = salt.payload.Serial('msgpack')
  41. idata = {'pillar': [OrderedDict(environment='dev')]}
  42. odata = payload.loads(payload.dumps(idata.copy()))
  43. self.assertNoOrderedDict(odata)
  44. self.assertEqual(idata, odata)
  45. def test_datetime_dump_load(self):
  46. '''
  47. Check the custom datetime handler can understand itself
  48. '''
  49. payload = salt.payload.Serial('msgpack')
  50. dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
  51. idata = {dtvalue: dtvalue}
  52. sdata = payload.dumps(idata.copy())
  53. odata = payload.loads(sdata)
  54. self.assertEqual(
  55. sdata,
  56. b'\x81\xc7\x18N20010203T04:05:06.000007\xc7\x18N20010203T04:05:06.000007')
  57. self.assertEqual(idata, odata)
  58. def test_verylong_dump_load(self):
  59. '''
  60. Test verylong encoder/decoder
  61. '''
  62. payload = salt.payload.Serial('msgpack')
  63. idata = {'jid': 20180227140750302662}
  64. sdata = payload.dumps(idata.copy())
  65. odata = payload.loads(sdata)
  66. idata['jid'] = '{0}'.format(idata['jid'])
  67. self.assertEqual(idata, odata)
  68. def test_immutable_dict_dump_load(self):
  69. '''
  70. Test immutable dict encoder/decoder
  71. '''
  72. payload = salt.payload.Serial('msgpack')
  73. idata = {'dict': {'key': 'value'}}
  74. sdata = payload.dumps({'dict': immutabletypes.ImmutableDict(idata['dict'])})
  75. odata = payload.loads(sdata)
  76. self.assertEqual(idata, odata)
  77. def test_immutable_list_dump_load(self):
  78. '''
  79. Test immutable list encoder/decoder
  80. '''
  81. payload = salt.payload.Serial('msgpack')
  82. idata = {'list': [1, 2, 3]}
  83. sdata = payload.dumps({'list': immutabletypes.ImmutableList(idata['list'])})
  84. odata = payload.loads(sdata)
  85. self.assertEqual(idata, odata)
  86. def test_immutable_set_dump_load(self):
  87. '''
  88. Test immutable set encoder/decoder
  89. '''
  90. payload = salt.payload.Serial('msgpack')
  91. idata = {'set': ['red', 'green', 'blue']}
  92. sdata = payload.dumps({'set': immutabletypes.ImmutableSet(idata['set'])})
  93. odata = payload.loads(sdata)
  94. self.assertEqual(idata, odata)
  95. def test_odict_dump_load(self):
  96. '''
  97. Test odict just works. It wasn't until msgpack 0.2.0
  98. '''
  99. payload = salt.payload.Serial('msgpack')
  100. data = OrderedDict()
  101. data['a'] = 'b'
  102. data['y'] = 'z'
  103. data['j'] = 'k'
  104. data['w'] = 'x'
  105. sdata = payload.dumps({'set': data})
  106. odata = payload.loads(sdata)
  107. self.assertEqual({'set': dict(data)}, odata)
  108. def test_mixed_dump_load(self):
  109. '''
  110. Test we can handle all exceptions at once
  111. '''
  112. payload = salt.payload.Serial('msgpack')
  113. dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
  114. od = OrderedDict()
  115. od['a'] = 'b'
  116. od['y'] = 'z'
  117. od['j'] = 'k'
  118. od['w'] = 'x'
  119. idata = {dtvalue: dtvalue, # datetime
  120. 'jid': 20180227140750302662, # long int
  121. 'dict': immutabletypes.ImmutableDict({'key': 'value'}), # immutable dict
  122. 'list': immutabletypes.ImmutableList([1, 2, 3]), # immutable list
  123. 'set': immutabletypes.ImmutableSet(('red', 'green', 'blue')), # immutable set
  124. 'odict': od, # odict
  125. }
  126. edata = {dtvalue: dtvalue, # datetime, == input
  127. 'jid': '20180227140750302662', # string repr of long int
  128. 'dict': {'key': 'value'}, # builtin dict
  129. 'list': [1, 2, 3], # builtin list
  130. 'set': ['red', 'green', 'blue'], # builtin set
  131. 'odict': dict(od), # builtin dict
  132. }
  133. sdata = payload.dumps(idata)
  134. odata = payload.loads(sdata)
  135. self.assertEqual(edata, odata)
  136. def test_recursive_dump_load(self):
  137. '''
  138. Test recursive payloads are (mostly) serialized
  139. '''
  140. payload = salt.payload.Serial('msgpack')
  141. data = {'name': 'roscivs'}
  142. data['data'] = data # Data all the things!
  143. sdata = payload.dumps(data)
  144. odata = payload.loads(sdata)
  145. self.assertTrue('recursion' in odata['data'].lower())
  146. class SREQTestCase(TestCase):
  147. port = 8845 # TODO: dynamically assign a port?
  148. @classmethod
  149. def setUpClass(cls):
  150. '''
  151. Class to set up zmq echo socket
  152. '''
  153. def echo_server():
  154. '''
  155. A server that echos the message sent to it over zmq
  156. Optional "sleep" can be sent to delay response
  157. '''
  158. context = zmq.Context()
  159. socket = context.socket(zmq.REP)
  160. socket.bind("tcp://*:{0}".format(SREQTestCase.port))
  161. payload = salt.payload.Serial('msgpack')
  162. while SREQTestCase.thread_running.is_set():
  163. try:
  164. # Wait for next request from client
  165. message = socket.recv(zmq.NOBLOCK)
  166. msg_deserialized = payload.loads(message)
  167. log.info('Echo server received message: %s', msg_deserialized)
  168. if isinstance(msg_deserialized['load'], dict) and msg_deserialized['load'].get('sleep'):
  169. log.info('Test echo server sleeping for %s seconds',
  170. msg_deserialized['load']['sleep'])
  171. time.sleep(msg_deserialized['load']['sleep'])
  172. socket.send(message)
  173. except zmq.ZMQError as exc:
  174. if exc.errno == errno.EAGAIN:
  175. continue
  176. raise
  177. SREQTestCase.thread_running = threading.Event()
  178. SREQTestCase.thread_running.set()
  179. SREQTestCase.echo_server = threading.Thread(target=echo_server)
  180. SREQTestCase.echo_server.start()
  181. @classmethod
  182. def tearDownClass(cls):
  183. '''
  184. Remove echo server
  185. '''
  186. # kill the thread
  187. SREQTestCase.thread_running.clear()
  188. SREQTestCase.echo_server.join()
  189. def get_sreq(self):
  190. return salt.payload.SREQ('tcp://127.0.0.1:{0}'.format(SREQTestCase.port))
  191. def test_send_auto(self):
  192. '''
  193. Test creation, send/rect
  194. '''
  195. sreq = self.get_sreq()
  196. # check default of empty load and enc clear
  197. assert sreq.send_auto({}) == {'enc': 'clear', 'load': {}}
  198. # check that the load always gets passed
  199. assert sreq.send_auto({'load': 'foo'}) == {'load': 'foo', 'enc': 'clear'}
  200. def test_send(self):
  201. sreq = self.get_sreq()
  202. assert sreq.send('clear', 'foo') == {'enc': 'clear', 'load': 'foo'}
  203. @skipIf(True, 'Disabled until we can figure out how to make this more reliable.')
  204. def test_timeout(self):
  205. '''
  206. Test SREQ Timeouts
  207. '''
  208. sreq = self.get_sreq()
  209. # client-side timeout
  210. start = time.time()
  211. # This is a try/except instead of an assertRaises because of a possible
  212. # subtle bug in zmq wherein a timeout=0 actually exceutes a single poll
  213. # before the timeout is reached.
  214. log.info('Sending tries=0, timeout=0')
  215. try:
  216. sreq.send('clear', 'foo', tries=0, timeout=0)
  217. except salt.exceptions.SaltReqTimeoutError:
  218. pass
  219. assert time.time() - start < 1 # ensure we didn't wait
  220. # server-side timeout
  221. log.info('Sending tries=1, timeout=1')
  222. start = time.time()
  223. with self.assertRaises(salt.exceptions.SaltReqTimeoutError):
  224. sreq.send('clear', {'sleep': 2}, tries=1, timeout=1)
  225. assert time.time() - start >= 1 # ensure we actually tried once (1s)
  226. # server-side timeout with retries
  227. log.info('Sending tries=2, timeout=1')
  228. start = time.time()
  229. with self.assertRaises(salt.exceptions.SaltReqTimeoutError):
  230. sreq.send('clear', {'sleep': 2}, tries=2, timeout=1)
  231. assert time.time() - start >= 2 # ensure we actually tried twice (2s)
  232. # test a regular send afterwards (to make sure sockets aren't in a twist
  233. log.info('Sending regular send')
  234. assert sreq.send('clear', 'foo') == {'enc': 'clear', 'load': 'foo'}
  235. def test_destroy(self):
  236. '''
  237. Test the __del__ capabilities
  238. '''
  239. sreq = self.get_sreq()
  240. # ensure no exceptions when we go to destroy the sreq, since __del__
  241. # swallows exceptions, we have to call destroy directly
  242. sreq.destroy()
  243. def test_raw_vs_encoding_none(self):
  244. '''
  245. Test that we handle the new raw parameter in 5.0.2 correctly based on
  246. encoding. When encoding is None loads should return bytes
  247. '''
  248. payload = salt.payload.Serial('msgpack')
  249. dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
  250. idata = {dtvalue: 'strval'}
  251. sdata = payload.dumps(idata.copy())
  252. odata = payload.loads(sdata, encoding=None)
  253. assert isinstance(odata[dtvalue], six.string_types)
  254. def test_raw_vs_encoding_utf8(self):
  255. '''
  256. Test that we handle the new raw parameter in 5.0.2 correctly based on
  257. encoding. When encoding is utf-8 loads should return unicode
  258. '''
  259. payload = salt.payload.Serial('msgpack')
  260. dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
  261. idata = {dtvalue: 'strval'}
  262. sdata = payload.dumps(idata.copy())
  263. odata = payload.loads(sdata, encoding='utf-8')
  264. assert isinstance(odata[dtvalue], six.text_type)