test_payload.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. tests.unit.payload_test
  5. ~~~~~~~~~~~~~~~~~~~~~~~
  6. '''
  7. # Import python libs
  8. from __future__ import absolute_import, print_function, unicode_literals
  9. import time
  10. import errno
  11. import threading
  12. import datetime
  13. # Import Salt Testing libs
  14. from tests.support.unit import skipIf, TestCase
  15. # Import Salt libs
  16. from salt.utils import immutabletypes
  17. from salt.utils.odict import OrderedDict
  18. import salt.exceptions
  19. import salt.payload
  20. # Import 3rd-party libs
  21. import zmq
  22. from salt.ext import six
  23. import logging
  24. log = logging.getLogger(__name__)
  class PayloadTestCase(TestCase):
      '''
      Round-trip tests for ``salt.payload.Serial('msgpack')``: every test dumps
      a structure, loads it back and compares the result with what is expected.
      '''

      def assertNoOrderedDict(self, data):
          '''
          Walk ``data`` recursively (through dicts, lists and tuples) and raise
          ``AssertionError`` as soon as an ``OrderedDict`` is encountered.
          '''
          if isinstance(data, OrderedDict):
              raise AssertionError(
                  'Found an ordered dictionary'
              )
          # Pick the children to descend into; scalars have none.
          if isinstance(data, dict):
              children = six.itervalues(data)
          elif isinstance(data, (list, tuple)):
              children = data
          else:
              return
          for child in children:
              self.assertNoOrderedDict(child)

      def test_list_nested_odicts(self):
          '''
          An OrderedDict nested inside a list must come back as a plain dict
          that still compares equal to the input
          '''
          serial = salt.payload.Serial('msgpack')
          expected = {'pillar': [OrderedDict(environment='dev')]}
          roundtripped = serial.loads(serial.dumps(expected.copy()))
          self.assertNoOrderedDict(roundtripped)
          self.assertEqual(expected, roundtripped)

      def test_datetime_dump_load(self):
          '''
          Check the custom datetime handler can understand itself
          '''
          serial = salt.payload.Serial('msgpack')
          stamp = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
          original = {stamp: stamp}
          packed = serial.dumps(original.copy())
          unpacked = serial.loads(packed)
          # Exact bytes the serializer is expected to emit for this payload
          wire = b'\x81\xc7\x18N20010203T04:05:06.000007\xc7\x18N20010203T04:05:06.000007'
          self.assertEqual(packed, wire)
          self.assertEqual(original, unpacked)

      def test_verylong_dump_load(self):
          '''
          Test verylong encoder/decoder: the oversized integer is expected
          back as its string representation
          '''
          serial = salt.payload.Serial('msgpack')
          jid = 20180227140750302662
          unpacked = serial.loads(serial.dumps({'jid': jid}))
          self.assertEqual({'jid': '{0}'.format(jid)}, unpacked)

      def test_immutable_dict_dump_load(self):
          '''
          Test immutable dict encoder/decoder
          '''
          serial = salt.payload.Serial('msgpack')
          plain = {'key': 'value'}
          packed = serial.dumps({'dict': immutabletypes.ImmutableDict(plain)})
          self.assertEqual({'dict': plain}, serial.loads(packed))

      def test_immutable_list_dump_load(self):
          '''
          Test immutable list encoder/decoder
          '''
          serial = salt.payload.Serial('msgpack')
          plain = [1, 2, 3]
          packed = serial.dumps({'list': immutabletypes.ImmutableList(plain)})
          self.assertEqual({'list': plain}, serial.loads(packed))

      def test_immutable_set_dump_load(self):
          '''
          Test immutable set encoder/decoder
          '''
          serial = salt.payload.Serial('msgpack')
          plain = ['red', 'green', 'blue']
          packed = serial.dumps({'set': immutabletypes.ImmutableSet(plain)})
          self.assertEqual({'set': plain}, serial.loads(packed))

      def test_odict_dump_load(self):
          '''
          Test that an odict just works (it did not until msgpack 0.2.0)
          '''
          serial = salt.payload.Serial('msgpack')
          ordered = OrderedDict()
          for key, value in (('a', 'b'), ('y', 'z'), ('j', 'k'), ('w', 'x')):
              ordered[key] = value
          unpacked = serial.loads(serial.dumps({'set': ordered}))
          self.assertEqual({'set': dict(ordered)}, unpacked)

      def test_mixed_dump_load(self):
          '''
          Test we can handle all exceptions at once
          '''
          serial = salt.payload.Serial('msgpack')
          stamp = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
          ordered = OrderedDict()
          for key, value in (('a', 'b'), ('y', 'z'), ('j', 'k'), ('w', 'x')):
              ordered[key] = value

          # One entry per specially-handled type
          source = {
              stamp: stamp,                                                   # datetime
              'jid': 20180227140750302662,                                    # long int
              'dict': immutabletypes.ImmutableDict({'key': 'value'}),         # immutable dict
              'list': immutabletypes.ImmutableList([1, 2, 3]),                # immutable list
              'set': immutabletypes.ImmutableSet(('red', 'green', 'blue')),   # immutable set
              'odict': ordered,                                               # odict
          }
          # What each of those entries is expected to look like after loading
          expected = {
              stamp: stamp,                        # datetime, == input
              'jid': '20180227140750302662',       # string repr of long int
              'dict': {'key': 'value'},            # builtin dict
              'list': [1, 2, 3],                   # builtin list
              'set': ['red', 'green', 'blue'],     # compared as a plain list
              'odict': dict(ordered),              # builtin dict
          }
          unpacked = serial.loads(serial.dumps(source))
          self.assertEqual(expected, unpacked)
  class SREQTestCase(TestCase):
      '''
      Tests for ``salt.payload.SREQ`` run against a zmq echo server that lives
      in a background thread for the lifetime of the test class.
      '''
      port = 8845  # TODO: dynamically assign a port?

      @classmethod
      def setUpClass(cls):
          '''
          Start the zmq echo server thread used by every test in this class
          '''
          def echo_server():
              '''
              A server that echoes the message sent to it over zmq

              Optional "sleep" can be sent to delay response
              '''
              context = zmq.Context()
              socket = context.socket(zmq.REP)
              try:
                  socket.bind("tcp://*:{0}".format(cls.port))
                  payload = salt.payload.Serial('msgpack')

                  while cls.thread_running.is_set():
                      # Wait up to 100 ms for a request instead of spinning on
                      # a non-blocking recv; the short timeout keeps the loop
                      # responsive to thread_running being cleared.
                      if not socket.poll(100):
                          continue
                      try:
                          # Fetch the request that poll() reported
                          message = socket.recv(zmq.NOBLOCK)
                          msg_deserialized = payload.loads(message)
                          log.info('Echo server received message: %s', msg_deserialized)
                          load = msg_deserialized['load']
                          if isinstance(load, dict) and load.get('sleep'):
                              log.info('Test echo server sleeping for %s seconds',
                                       load['sleep'])
                              time.sleep(load['sleep'])
                          socket.send(message)
                      except zmq.ZMQError as exc:
                          # EAGAIN only means nothing was ready after all
                          if exc.errno == errno.EAGAIN:
                              continue
                          raise
              finally:
                  # Release the zmq resources when the thread stops; linger=0
                  # discards unsent replies so term() cannot block on them.
                  socket.close(linger=0)
                  context.term()

          cls.thread_running = threading.Event()
          cls.thread_running.set()
          cls.echo_server = threading.Thread(target=echo_server)
          cls.echo_server.start()

      @classmethod
      def tearDownClass(cls):
          '''
          Remove echo server
          '''
          # Ask the server loop to exit, then wait for the thread to finish
          cls.thread_running.clear()
          cls.echo_server.join()

      def get_sreq(self):
          '''
          Return a new ``salt.payload.SREQ`` pointed at the local echo server
          '''
          return salt.payload.SREQ('tcp://127.0.0.1:{0}'.format(self.port))

      def test_send_auto(self):
          '''
          Test creation, send/recv
          '''
          sreq = self.get_sreq()
          # check default of empty load and enc clear
          self.assertEqual(sreq.send_auto({}), {'enc': 'clear', 'load': {}})

          # check that the load always gets passed
          self.assertEqual(sreq.send_auto({'load': 'foo'}), {'load': 'foo', 'enc': 'clear'})

      def test_send(self):
          '''
          Test that an explicit send is echoed back with its enc and load
          '''
          sreq = self.get_sreq()
          self.assertEqual(sreq.send('clear', 'foo'), {'enc': 'clear', 'load': 'foo'})

      @skipIf(True, 'Disabled until we can figure out how to make this more reliable.')
      def test_timeout(self):
          '''
          Test SREQ Timeouts
          '''
          sreq = self.get_sreq()
          # client-side timeout
          start = time.time()
          # This is a try/except instead of an assertRaises because of a possible
          # subtle bug in zmq wherein a timeout=0 actually executes a single poll
          # before the timeout is reached.
          log.info('Sending tries=0, timeout=0')
          try:
              sreq.send('clear', 'foo', tries=0, timeout=0)
          except salt.exceptions.SaltReqTimeoutError:
              pass
          self.assertLess(time.time() - start, 1)  # ensure we didn't wait

          # server-side timeout
          log.info('Sending tries=1, timeout=1')
          start = time.time()
          with self.assertRaises(salt.exceptions.SaltReqTimeoutError):
              sreq.send('clear', {'sleep': 2}, tries=1, timeout=1)
          self.assertGreaterEqual(time.time() - start, 1)  # ensure we actually tried once (1s)

          # server-side timeout with retries
          log.info('Sending tries=2, timeout=1')
          start = time.time()
          with self.assertRaises(salt.exceptions.SaltReqTimeoutError):
              sreq.send('clear', {'sleep': 2}, tries=2, timeout=1)
          self.assertGreaterEqual(time.time() - start, 2)  # ensure we actually tried twice (2s)

          # test a regular send afterwards (to make sure sockets aren't in a twist)
          log.info('Sending regular send')
          self.assertEqual(sreq.send('clear', 'foo'), {'enc': 'clear', 'load': 'foo'})

      def test_destroy(self):
          '''
          Test the __del__ capabilities
          '''
          sreq = self.get_sreq()
          # ensure no exceptions when we go to destroy the sreq, since __del__
          # swallows exceptions, we have to call destroy directly
          sreq.destroy()

      def test_raw_vs_encoding_none(self):
          '''
          Test that we handle the new raw parameter in 5.0.2 correctly based on
          encoding. When encoding is None loads should return bytes
          '''
          # NOTE(review): the docstring says "bytes" but the assertion below
          # checks six.string_types -- confirm which one is intended.
          payload = salt.payload.Serial('msgpack')
          dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
          idata = {dtvalue: 'strval'}
          sdata = payload.dumps(idata.copy())
          odata = payload.loads(sdata, encoding=None)
          self.assertIsInstance(odata[dtvalue], six.string_types)

      def test_raw_vs_encoding_utf8(self):
          '''
          Test that we handle the new raw parameter in 5.0.2 correctly based on
          encoding. When encoding is utf-8 loads should return unicode
          '''
          payload = salt.payload.Serial('msgpack')
          dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
          idata = {dtvalue: 'strval'}
          sdata = payload.dumps(idata.copy())
          odata = payload.loads(sdata, encoding='utf-8')
          self.assertIsInstance(odata[dtvalue], six.text_type)