1
0

test_payload.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. tests.unit.payload_test
  5. ~~~~~~~~~~~~~~~~~~~~~~~
  6. '''
  7. # Import python libs
  8. from __future__ import absolute_import, print_function, unicode_literals
  9. import time
  10. import errno
  11. import threading
  12. import datetime
  13. # Import Salt Testing libs
  14. from tests.support.unit import skipIf, TestCase
  15. from tests.support.mock import NO_MOCK, NO_MOCK_REASON
  16. # Import Salt libs
  17. from salt.utils import immutabletypes
  18. from salt.utils.odict import OrderedDict
  19. import salt.exceptions
  20. import salt.payload
  21. # Import 3rd-party libs
  22. import zmq
  23. from salt.ext import six
  24. import logging
  25. log = logging.getLogger(__name__)
  26. @skipIf(NO_MOCK, NO_MOCK_REASON)
  27. class PayloadTestCase(TestCase):
  28. def assertNoOrderedDict(self, data):
  29. if isinstance(data, OrderedDict):
  30. raise AssertionError(
  31. 'Found an ordered dictionary'
  32. )
  33. if isinstance(data, dict):
  34. for value in six.itervalues(data):
  35. self.assertNoOrderedDict(value)
  36. elif isinstance(data, (list, tuple)):
  37. for chunk in data:
  38. self.assertNoOrderedDict(chunk)
  39. def test_list_nested_odicts(self):
  40. payload = salt.payload.Serial('msgpack')
  41. idata = {'pillar': [OrderedDict(environment='dev')]}
  42. odata = payload.loads(payload.dumps(idata.copy()))
  43. self.assertNoOrderedDict(odata)
  44. self.assertEqual(idata, odata)
  45. def test_datetime_dump_load(self):
  46. '''
  47. Check the custom datetime handler can understand itself
  48. '''
  49. payload = salt.payload.Serial('msgpack')
  50. dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
  51. idata = {dtvalue: dtvalue}
  52. sdata = payload.dumps(idata.copy())
  53. odata = payload.loads(sdata)
  54. self.assertEqual(
  55. sdata,
  56. b'\x81\xc7\x18N20010203T04:05:06.000007\xc7\x18N20010203T04:05:06.000007')
  57. self.assertEqual(idata, odata)
  58. def test_verylong_dump_load(self):
  59. '''
  60. Test verylong encoder/decoder
  61. '''
  62. payload = salt.payload.Serial('msgpack')
  63. idata = {'jid': 20180227140750302662}
  64. sdata = payload.dumps(idata.copy())
  65. odata = payload.loads(sdata)
  66. idata['jid'] = '{0}'.format(idata['jid'])
  67. self.assertEqual(idata, odata)
  68. def test_immutable_dict_dump_load(self):
  69. '''
  70. Test immutable dict encoder/decoder
  71. '''
  72. payload = salt.payload.Serial('msgpack')
  73. idata = {'dict': {'key': 'value'}}
  74. sdata = payload.dumps({'dict': immutabletypes.ImmutableDict(idata['dict'])})
  75. odata = payload.loads(sdata)
  76. self.assertEqual(idata, odata)
  77. def test_immutable_list_dump_load(self):
  78. '''
  79. Test immutable list encoder/decoder
  80. '''
  81. payload = salt.payload.Serial('msgpack')
  82. idata = {'list': [1, 2, 3]}
  83. sdata = payload.dumps({'list': immutabletypes.ImmutableList(idata['list'])})
  84. odata = payload.loads(sdata)
  85. self.assertEqual(idata, odata)
  86. def test_immutable_set_dump_load(self):
  87. '''
  88. Test immutable set encoder/decoder
  89. '''
  90. payload = salt.payload.Serial('msgpack')
  91. idata = {'set': ['red', 'green', 'blue']}
  92. sdata = payload.dumps({'set': immutabletypes.ImmutableSet(idata['set'])})
  93. odata = payload.loads(sdata)
  94. self.assertEqual(idata, odata)
  95. def test_odict_dump_load(self):
  96. '''
  97. Test odict just works. It wasn't until msgpack 0.2.0
  98. '''
  99. payload = salt.payload.Serial('msgpack')
  100. data = OrderedDict()
  101. data['a'] = 'b'
  102. data['y'] = 'z'
  103. data['j'] = 'k'
  104. data['w'] = 'x'
  105. sdata = payload.dumps({'set': data})
  106. odata = payload.loads(sdata)
  107. self.assertEqual({'set': dict(data)}, odata)
  108. def test_mixed_dump_load(self):
  109. '''
  110. Test we can handle all exceptions at once
  111. '''
  112. payload = salt.payload.Serial('msgpack')
  113. dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
  114. od = OrderedDict()
  115. od['a'] = 'b'
  116. od['y'] = 'z'
  117. od['j'] = 'k'
  118. od['w'] = 'x'
  119. idata = {dtvalue: dtvalue, # datetime
  120. 'jid': 20180227140750302662, # long int
  121. 'dict': immutabletypes.ImmutableDict({'key': 'value'}), # immutable dict
  122. 'list': immutabletypes.ImmutableList([1, 2, 3]), # immutable list
  123. 'set': immutabletypes.ImmutableSet(('red', 'green', 'blue')), # immutable set
  124. 'odict': od, # odict
  125. }
  126. edata = {dtvalue: dtvalue, # datetime, == input
  127. 'jid': '20180227140750302662', # string repr of long int
  128. 'dict': {'key': 'value'}, # builtin dict
  129. 'list': [1, 2, 3], # builtin list
  130. 'set': ['red', 'green', 'blue'], # builtin set
  131. 'odict': dict(od), # builtin dict
  132. }
  133. sdata = payload.dumps(idata)
  134. odata = payload.loads(sdata)
  135. self.assertEqual(edata, odata)
  136. class SREQTestCase(TestCase):
  137. port = 8845 # TODO: dynamically assign a port?
  138. @classmethod
  139. def setUpClass(cls):
  140. '''
  141. Class to set up zmq echo socket
  142. '''
  143. def echo_server():
  144. '''
  145. A server that echos the message sent to it over zmq
  146. Optional "sleep" can be sent to delay response
  147. '''
  148. context = zmq.Context()
  149. socket = context.socket(zmq.REP)
  150. socket.bind("tcp://*:{0}".format(SREQTestCase.port))
  151. payload = salt.payload.Serial('msgpack')
  152. while SREQTestCase.thread_running.is_set():
  153. try:
  154. # Wait for next request from client
  155. message = socket.recv(zmq.NOBLOCK)
  156. msg_deserialized = payload.loads(message)
  157. log.info('Echo server received message: %s', msg_deserialized)
  158. if isinstance(msg_deserialized['load'], dict) and msg_deserialized['load'].get('sleep'):
  159. log.info('Test echo server sleeping for %s seconds',
  160. msg_deserialized['load']['sleep'])
  161. time.sleep(msg_deserialized['load']['sleep'])
  162. socket.send(message)
  163. except zmq.ZMQError as exc:
  164. if exc.errno == errno.EAGAIN:
  165. continue
  166. raise
  167. SREQTestCase.thread_running = threading.Event()
  168. SREQTestCase.thread_running.set()
  169. SREQTestCase.echo_server = threading.Thread(target=echo_server)
  170. SREQTestCase.echo_server.start()
  171. @classmethod
  172. def tearDownClass(cls):
  173. '''
  174. Remove echo server
  175. '''
  176. # kill the thread
  177. SREQTestCase.thread_running.clear()
  178. SREQTestCase.echo_server.join()
  179. def get_sreq(self):
  180. return salt.payload.SREQ('tcp://127.0.0.1:{0}'.format(SREQTestCase.port))
  181. def test_send_auto(self):
  182. '''
  183. Test creation, send/rect
  184. '''
  185. sreq = self.get_sreq()
  186. # check default of empty load and enc clear
  187. assert sreq.send_auto({}) == {'enc': 'clear', 'load': {}}
  188. # check that the load always gets passed
  189. assert sreq.send_auto({'load': 'foo'}) == {'load': 'foo', 'enc': 'clear'}
  190. def test_send(self):
  191. sreq = self.get_sreq()
  192. assert sreq.send('clear', 'foo') == {'enc': 'clear', 'load': 'foo'}
  193. @skipIf(True, 'Disabled until we can figure out how to make this more reliable.')
  194. def test_timeout(self):
  195. '''
  196. Test SREQ Timeouts
  197. '''
  198. sreq = self.get_sreq()
  199. # client-side timeout
  200. start = time.time()
  201. # This is a try/except instead of an assertRaises because of a possible
  202. # subtle bug in zmq wherein a timeout=0 actually exceutes a single poll
  203. # before the timeout is reached.
  204. log.info('Sending tries=0, timeout=0')
  205. try:
  206. sreq.send('clear', 'foo', tries=0, timeout=0)
  207. except salt.exceptions.SaltReqTimeoutError:
  208. pass
  209. assert time.time() - start < 1 # ensure we didn't wait
  210. # server-side timeout
  211. log.info('Sending tries=1, timeout=1')
  212. start = time.time()
  213. with self.assertRaises(salt.exceptions.SaltReqTimeoutError):
  214. sreq.send('clear', {'sleep': 2}, tries=1, timeout=1)
  215. assert time.time() - start >= 1 # ensure we actually tried once (1s)
  216. # server-side timeout with retries
  217. log.info('Sending tries=2, timeout=1')
  218. start = time.time()
  219. with self.assertRaises(salt.exceptions.SaltReqTimeoutError):
  220. sreq.send('clear', {'sleep': 2}, tries=2, timeout=1)
  221. assert time.time() - start >= 2 # ensure we actually tried twice (2s)
  222. # test a regular send afterwards (to make sure sockets aren't in a twist
  223. log.info('Sending regular send')
  224. assert sreq.send('clear', 'foo') == {'enc': 'clear', 'load': 'foo'}
  225. def test_destroy(self):
  226. '''
  227. Test the __del__ capabilities
  228. '''
  229. sreq = self.get_sreq()
  230. # ensure no exceptions when we go to destroy the sreq, since __del__
  231. # swallows exceptions, we have to call destroy directly
  232. sreq.destroy()