test_payload.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. tests.unit.payload_test
  5. ~~~~~~~~~~~~~~~~~~~~~~~
  6. '''
  7. # Import python libs
  8. from __future__ import absolute_import, print_function, unicode_literals
  9. import time
  10. import errno
  11. import threading
  12. import datetime
  13. # Import Salt Testing libs
  14. from tests.support.unit import skipIf, TestCase
  15. # Import Salt libs
  16. from salt.utils import immutabletypes
  17. from salt.utils.odict import OrderedDict
  18. import salt.exceptions
  19. import salt.payload
  20. # Import 3rd-party libs
  21. import zmq
  22. from salt.ext import six
  23. import logging
  24. log = logging.getLogger(__name__)
  25. class PayloadTestCase(TestCase):
  26. def assertNoOrderedDict(self, data):
  27. if isinstance(data, OrderedDict):
  28. raise AssertionError(
  29. 'Found an ordered dictionary'
  30. )
  31. if isinstance(data, dict):
  32. for value in six.itervalues(data):
  33. self.assertNoOrderedDict(value)
  34. elif isinstance(data, (list, tuple)):
  35. for chunk in data:
  36. self.assertNoOrderedDict(chunk)
  37. def test_list_nested_odicts(self):
  38. payload = salt.payload.Serial('msgpack')
  39. idata = {'pillar': [OrderedDict(environment='dev')]}
  40. odata = payload.loads(payload.dumps(idata.copy()))
  41. self.assertNoOrderedDict(odata)
  42. self.assertEqual(idata, odata)
  43. def test_datetime_dump_load(self):
  44. '''
  45. Check the custom datetime handler can understand itself
  46. '''
  47. payload = salt.payload.Serial('msgpack')
  48. dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
  49. idata = {dtvalue: dtvalue}
  50. sdata = payload.dumps(idata.copy())
  51. odata = payload.loads(sdata)
  52. self.assertEqual(
  53. sdata,
  54. b'\x81\xc7\x18N20010203T04:05:06.000007\xc7\x18N20010203T04:05:06.000007')
  55. self.assertEqual(idata, odata)
  56. def test_verylong_dump_load(self):
  57. '''
  58. Test verylong encoder/decoder
  59. '''
  60. payload = salt.payload.Serial('msgpack')
  61. idata = {'jid': 20180227140750302662}
  62. sdata = payload.dumps(idata.copy())
  63. odata = payload.loads(sdata)
  64. idata['jid'] = '{0}'.format(idata['jid'])
  65. self.assertEqual(idata, odata)
  66. def test_immutable_dict_dump_load(self):
  67. '''
  68. Test immutable dict encoder/decoder
  69. '''
  70. payload = salt.payload.Serial('msgpack')
  71. idata = {'dict': {'key': 'value'}}
  72. sdata = payload.dumps({'dict': immutabletypes.ImmutableDict(idata['dict'])})
  73. odata = payload.loads(sdata)
  74. self.assertEqual(idata, odata)
  75. def test_immutable_list_dump_load(self):
  76. '''
  77. Test immutable list encoder/decoder
  78. '''
  79. payload = salt.payload.Serial('msgpack')
  80. idata = {'list': [1, 2, 3]}
  81. sdata = payload.dumps({'list': immutabletypes.ImmutableList(idata['list'])})
  82. odata = payload.loads(sdata)
  83. self.assertEqual(idata, odata)
  84. def test_immutable_set_dump_load(self):
  85. '''
  86. Test immutable set encoder/decoder
  87. '''
  88. payload = salt.payload.Serial('msgpack')
  89. idata = {'set': ['red', 'green', 'blue']}
  90. sdata = payload.dumps({'set': immutabletypes.ImmutableSet(idata['set'])})
  91. odata = payload.loads(sdata)
  92. self.assertEqual(idata, odata)
  93. def test_odict_dump_load(self):
  94. '''
  95. Test odict just works. It wasn't until msgpack 0.2.0
  96. '''
  97. payload = salt.payload.Serial('msgpack')
  98. data = OrderedDict()
  99. data['a'] = 'b'
  100. data['y'] = 'z'
  101. data['j'] = 'k'
  102. data['w'] = 'x'
  103. sdata = payload.dumps({'set': data})
  104. odata = payload.loads(sdata)
  105. self.assertEqual({'set': dict(data)}, odata)
  106. def test_mixed_dump_load(self):
  107. '''
  108. Test we can handle all exceptions at once
  109. '''
  110. payload = salt.payload.Serial('msgpack')
  111. dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
  112. od = OrderedDict()
  113. od['a'] = 'b'
  114. od['y'] = 'z'
  115. od['j'] = 'k'
  116. od['w'] = 'x'
  117. idata = {dtvalue: dtvalue, # datetime
  118. 'jid': 20180227140750302662, # long int
  119. 'dict': immutabletypes.ImmutableDict({'key': 'value'}), # immutable dict
  120. 'list': immutabletypes.ImmutableList([1, 2, 3]), # immutable list
  121. 'set': immutabletypes.ImmutableSet(('red', 'green', 'blue')), # immutable set
  122. 'odict': od, # odict
  123. }
  124. edata = {dtvalue: dtvalue, # datetime, == input
  125. 'jid': '20180227140750302662', # string repr of long int
  126. 'dict': {'key': 'value'}, # builtin dict
  127. 'list': [1, 2, 3], # builtin list
  128. 'set': ['red', 'green', 'blue'], # builtin set
  129. 'odict': dict(od), # builtin dict
  130. }
  131. sdata = payload.dumps(idata)
  132. odata = payload.loads(sdata)
  133. self.assertEqual(edata, odata)
  134. def test_recursive_dump_load(self):
  135. '''
  136. Test recursive payloads are (mostly) serialized
  137. '''
  138. payload = salt.payload.Serial('msgpack')
  139. data = {'name': 'roscivs'}
  140. data['data'] = data # Data all the things!
  141. sdata = payload.dumps(data)
  142. odata = payload.loads(sdata)
  143. self.assertTrue('recursion' in odata['data'].lower())
  144. class SREQTestCase(TestCase):
  145. port = 8845 # TODO: dynamically assign a port?
  146. @classmethod
  147. def setUpClass(cls):
  148. '''
  149. Class to set up zmq echo socket
  150. '''
  151. def echo_server():
  152. '''
  153. A server that echos the message sent to it over zmq
  154. Optional "sleep" can be sent to delay response
  155. '''
  156. context = zmq.Context()
  157. socket = context.socket(zmq.REP)
  158. socket.bind("tcp://*:{0}".format(SREQTestCase.port))
  159. payload = salt.payload.Serial('msgpack')
  160. while SREQTestCase.thread_running.is_set():
  161. try:
  162. # Wait for next request from client
  163. message = socket.recv(zmq.NOBLOCK)
  164. msg_deserialized = payload.loads(message)
  165. log.info('Echo server received message: %s', msg_deserialized)
  166. if isinstance(msg_deserialized['load'], dict) and msg_deserialized['load'].get('sleep'):
  167. log.info('Test echo server sleeping for %s seconds',
  168. msg_deserialized['load']['sleep'])
  169. time.sleep(msg_deserialized['load']['sleep'])
  170. socket.send(message)
  171. except zmq.ZMQError as exc:
  172. if exc.errno == errno.EAGAIN:
  173. continue
  174. raise
  175. SREQTestCase.thread_running = threading.Event()
  176. SREQTestCase.thread_running.set()
  177. SREQTestCase.echo_server = threading.Thread(target=echo_server)
  178. SREQTestCase.echo_server.start()
  179. @classmethod
  180. def tearDownClass(cls):
  181. '''
  182. Remove echo server
  183. '''
  184. # kill the thread
  185. SREQTestCase.thread_running.clear()
  186. SREQTestCase.echo_server.join()
  187. def get_sreq(self):
  188. return salt.payload.SREQ('tcp://127.0.0.1:{0}'.format(SREQTestCase.port))
  189. def test_send_auto(self):
  190. '''
  191. Test creation, send/rect
  192. '''
  193. sreq = self.get_sreq()
  194. # check default of empty load and enc clear
  195. assert sreq.send_auto({}) == {'enc': 'clear', 'load': {}}
  196. # check that the load always gets passed
  197. assert sreq.send_auto({'load': 'foo'}) == {'load': 'foo', 'enc': 'clear'}
  198. def test_send(self):
  199. sreq = self.get_sreq()
  200. assert sreq.send('clear', 'foo') == {'enc': 'clear', 'load': 'foo'}
  201. @skipIf(True, 'Disabled until we can figure out how to make this more reliable.')
  202. def test_timeout(self):
  203. '''
  204. Test SREQ Timeouts
  205. '''
  206. sreq = self.get_sreq()
  207. # client-side timeout
  208. start = time.time()
  209. # This is a try/except instead of an assertRaises because of a possible
  210. # subtle bug in zmq wherein a timeout=0 actually exceutes a single poll
  211. # before the timeout is reached.
  212. log.info('Sending tries=0, timeout=0')
  213. try:
  214. sreq.send('clear', 'foo', tries=0, timeout=0)
  215. except salt.exceptions.SaltReqTimeoutError:
  216. pass
  217. assert time.time() - start < 1 # ensure we didn't wait
  218. # server-side timeout
  219. log.info('Sending tries=1, timeout=1')
  220. start = time.time()
  221. with self.assertRaises(salt.exceptions.SaltReqTimeoutError):
  222. sreq.send('clear', {'sleep': 2}, tries=1, timeout=1)
  223. assert time.time() - start >= 1 # ensure we actually tried once (1s)
  224. # server-side timeout with retries
  225. log.info('Sending tries=2, timeout=1')
  226. start = time.time()
  227. with self.assertRaises(salt.exceptions.SaltReqTimeoutError):
  228. sreq.send('clear', {'sleep': 2}, tries=2, timeout=1)
  229. assert time.time() - start >= 2 # ensure we actually tried twice (2s)
  230. # test a regular send afterwards (to make sure sockets aren't in a twist
  231. log.info('Sending regular send')
  232. assert sreq.send('clear', 'foo') == {'enc': 'clear', 'load': 'foo'}
  233. def test_destroy(self):
  234. '''
  235. Test the __del__ capabilities
  236. '''
  237. sreq = self.get_sreq()
  238. # ensure no exceptions when we go to destroy the sreq, since __del__
  239. # swallows exceptions, we have to call destroy directly
  240. sreq.destroy()
  241. def test_raw_vs_encoding_none(self):
  242. '''
  243. Test that we handle the new raw parameter in 5.0.2 correctly based on
  244. encoding. When encoding is None loads should return bytes
  245. '''
  246. payload = salt.payload.Serial('msgpack')
  247. dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
  248. idata = {dtvalue: 'strval'}
  249. sdata = payload.dumps(idata.copy())
  250. odata = payload.loads(sdata, encoding=None)
  251. assert isinstance(odata[dtvalue], six.string_types)
  252. def test_raw_vs_encoding_utf8(self):
  253. '''
  254. Test that we handle the new raw parameter in 5.0.2 correctly based on
  255. encoding. When encoding is utf-8 loads should return unicode
  256. '''
  257. payload = salt.payload.Serial('msgpack')
  258. dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
  259. idata = {dtvalue: 'strval'}
  260. sdata = payload.dumps(idata.copy())
  261. odata = payload.loads(sdata, encoding='utf-8')
  262. assert isinstance(odata[dtvalue], six.text_type)