test_payload.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. # -*- coding: utf-8 -*-
  2. """
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. tests.unit.payload_test
  5. ~~~~~~~~~~~~~~~~~~~~~~~
  6. """
  7. # Import python libs
  8. from __future__ import absolute_import, print_function, unicode_literals
  9. import datetime
  10. import errno
  11. import logging
  12. import threading
  13. import time
  14. import pytest
  15. import salt.exceptions
  16. import salt.payload
  17. # Import 3rd-party libs
  18. import zmq
  19. from salt.ext import six
  20. # Import Salt libs
  21. from salt.utils import immutabletypes
  22. from salt.utils.odict import OrderedDict
  23. # Import Salt Testing libs
  24. from tests.support.unit import TestCase, skipIf
  25. log = logging.getLogger(__name__)
  26. class PayloadTestCase(TestCase):
  27. def assertNoOrderedDict(self, data):
  28. if isinstance(data, OrderedDict):
  29. raise AssertionError("Found an ordered dictionary")
  30. if isinstance(data, dict):
  31. for value in six.itervalues(data):
  32. self.assertNoOrderedDict(value)
  33. elif isinstance(data, (list, tuple)):
  34. for chunk in data:
  35. self.assertNoOrderedDict(chunk)
  36. def test_list_nested_odicts(self):
  37. payload = salt.payload.Serial("msgpack")
  38. idata = {"pillar": [OrderedDict(environment="dev")]}
  39. odata = payload.loads(payload.dumps(idata.copy()))
  40. self.assertNoOrderedDict(odata)
  41. self.assertEqual(idata, odata)
  42. def test_datetime_dump_load(self):
  43. """
  44. Check the custom datetime handler can understand itself
  45. """
  46. payload = salt.payload.Serial("msgpack")
  47. dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
  48. idata = {dtvalue: dtvalue}
  49. sdata = payload.dumps(idata.copy())
  50. odata = payload.loads(sdata)
  51. self.assertEqual(
  52. sdata,
  53. b"\x81\xc7\x18N20010203T04:05:06.000007\xc7\x18N20010203T04:05:06.000007",
  54. )
  55. self.assertEqual(idata, odata)
  56. def test_verylong_dump_load(self):
  57. """
  58. Test verylong encoder/decoder
  59. """
  60. payload = salt.payload.Serial("msgpack")
  61. idata = {"jid": 20180227140750302662}
  62. sdata = payload.dumps(idata.copy())
  63. odata = payload.loads(sdata)
  64. idata["jid"] = "{0}".format(idata["jid"])
  65. self.assertEqual(idata, odata)
  66. def test_immutable_dict_dump_load(self):
  67. """
  68. Test immutable dict encoder/decoder
  69. """
  70. payload = salt.payload.Serial("msgpack")
  71. idata = {"dict": {"key": "value"}}
  72. sdata = payload.dumps({"dict": immutabletypes.ImmutableDict(idata["dict"])})
  73. odata = payload.loads(sdata)
  74. self.assertEqual(idata, odata)
  75. def test_immutable_list_dump_load(self):
  76. """
  77. Test immutable list encoder/decoder
  78. """
  79. payload = salt.payload.Serial("msgpack")
  80. idata = {"list": [1, 2, 3]}
  81. sdata = payload.dumps({"list": immutabletypes.ImmutableList(idata["list"])})
  82. odata = payload.loads(sdata)
  83. self.assertEqual(idata, odata)
  84. def test_immutable_set_dump_load(self):
  85. """
  86. Test immutable set encoder/decoder
  87. """
  88. payload = salt.payload.Serial("msgpack")
  89. idata = {"set": ["red", "green", "blue"]}
  90. sdata = payload.dumps({"set": immutabletypes.ImmutableSet(idata["set"])})
  91. odata = payload.loads(sdata)
  92. self.assertEqual(idata, odata)
  93. def test_odict_dump_load(self):
  94. """
  95. Test odict just works. It wasn't until msgpack 0.2.0
  96. """
  97. payload = salt.payload.Serial("msgpack")
  98. data = OrderedDict()
  99. data["a"] = "b"
  100. data["y"] = "z"
  101. data["j"] = "k"
  102. data["w"] = "x"
  103. sdata = payload.dumps({"set": data})
  104. odata = payload.loads(sdata)
  105. self.assertEqual({"set": dict(data)}, odata)
  106. def test_mixed_dump_load(self):
  107. """
  108. Test we can handle all exceptions at once
  109. """
  110. payload = salt.payload.Serial("msgpack")
  111. dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
  112. od = OrderedDict()
  113. od["a"] = "b"
  114. od["y"] = "z"
  115. od["j"] = "k"
  116. od["w"] = "x"
  117. idata = {
  118. dtvalue: dtvalue, # datetime
  119. "jid": 20180227140750302662, # long int
  120. "dict": immutabletypes.ImmutableDict({"key": "value"}), # immutable dict
  121. "list": immutabletypes.ImmutableList([1, 2, 3]), # immutable list
  122. "set": immutabletypes.ImmutableSet(
  123. ("red", "green", "blue")
  124. ), # immutable set
  125. "odict": od, # odict
  126. }
  127. edata = {
  128. dtvalue: dtvalue, # datetime, == input
  129. "jid": "20180227140750302662", # string repr of long int
  130. "dict": {"key": "value"}, # builtin dict
  131. "list": [1, 2, 3], # builtin list
  132. "set": ["red", "green", "blue"], # builtin set
  133. "odict": dict(od), # builtin dict
  134. }
  135. sdata = payload.dumps(idata)
  136. odata = payload.loads(sdata)
  137. self.assertEqual(edata, odata)
  138. def test_recursive_dump_load(self):
  139. """
  140. Test recursive payloads are (mostly) serialized
  141. """
  142. payload = salt.payload.Serial("msgpack")
  143. data = {"name": "roscivs"}
  144. data["data"] = data # Data all the things!
  145. sdata = payload.dumps(data)
  146. odata = payload.loads(sdata)
  147. self.assertTrue("recursion" in odata["data"].lower())
  148. class SREQTestCase(TestCase):
  149. port = 8845 # TODO: dynamically assign a port?
  150. @classmethod
  151. def setUpClass(cls):
  152. """
  153. Class to set up zmq echo socket
  154. """
  155. def echo_server():
  156. """
  157. A server that echos the message sent to it over zmq
  158. Optional "sleep" can be sent to delay response
  159. """
  160. context = zmq.Context()
  161. socket = context.socket(zmq.REP)
  162. socket.bind("tcp://*:{0}".format(SREQTestCase.port))
  163. payload = salt.payload.Serial("msgpack")
  164. while SREQTestCase.thread_running.is_set():
  165. try:
  166. # Wait for next request from client
  167. message = socket.recv(zmq.NOBLOCK)
  168. msg_deserialized = payload.loads(message)
  169. log.info("Echo server received message: %s", msg_deserialized)
  170. if isinstance(msg_deserialized["load"], dict) and msg_deserialized[
  171. "load"
  172. ].get("sleep"):
  173. log.info(
  174. "Test echo server sleeping for %s seconds",
  175. msg_deserialized["load"]["sleep"],
  176. )
  177. time.sleep(msg_deserialized["load"]["sleep"])
  178. socket.send(message)
  179. except zmq.ZMQError as exc:
  180. if exc.errno == errno.EAGAIN:
  181. continue
  182. raise
  183. SREQTestCase.thread_running = threading.Event()
  184. SREQTestCase.thread_running.set()
  185. SREQTestCase.echo_server = threading.Thread(target=echo_server)
  186. SREQTestCase.echo_server.start()
  187. @classmethod
  188. def tearDownClass(cls):
  189. """
  190. Remove echo server
  191. """
  192. # kill the thread
  193. SREQTestCase.thread_running.clear()
  194. SREQTestCase.echo_server.join()
  195. def get_sreq(self):
  196. return salt.payload.SREQ("tcp://127.0.0.1:{0}".format(SREQTestCase.port))
  197. @pytest.mark.slow_test(seconds=5) # Test takes >1 and <=5 seconds
  198. def test_send_auto(self):
  199. """
  200. Test creation, send/rect
  201. """
  202. sreq = self.get_sreq()
  203. # check default of empty load and enc clear
  204. assert sreq.send_auto({}) == {"enc": "clear", "load": {}}
  205. # check that the load always gets passed
  206. assert sreq.send_auto({"load": "foo"}) == {"load": "foo", "enc": "clear"}
  207. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  208. def test_send(self):
  209. sreq = self.get_sreq()
  210. assert sreq.send("clear", "foo") == {"enc": "clear", "load": "foo"}
  211. @skipIf(True, "Disabled until we can figure out how to make this more reliable.")
  212. def test_timeout(self):
  213. """
  214. Test SREQ Timeouts
  215. """
  216. sreq = self.get_sreq()
  217. # client-side timeout
  218. start = time.time()
  219. # This is a try/except instead of an assertRaises because of a possible
  220. # subtle bug in zmq wherein a timeout=0 actually exceutes a single poll
  221. # before the timeout is reached.
  222. log.info("Sending tries=0, timeout=0")
  223. try:
  224. sreq.send("clear", "foo", tries=0, timeout=0)
  225. except salt.exceptions.SaltReqTimeoutError:
  226. pass
  227. assert time.time() - start < 1 # ensure we didn't wait
  228. # server-side timeout
  229. log.info("Sending tries=1, timeout=1")
  230. start = time.time()
  231. with self.assertRaises(salt.exceptions.SaltReqTimeoutError):
  232. sreq.send("clear", {"sleep": 2}, tries=1, timeout=1)
  233. assert time.time() - start >= 1 # ensure we actually tried once (1s)
  234. # server-side timeout with retries
  235. log.info("Sending tries=2, timeout=1")
  236. start = time.time()
  237. with self.assertRaises(salt.exceptions.SaltReqTimeoutError):
  238. sreq.send("clear", {"sleep": 2}, tries=2, timeout=1)
  239. assert time.time() - start >= 2 # ensure we actually tried twice (2s)
  240. # test a regular send afterwards (to make sure sockets aren't in a twist
  241. log.info("Sending regular send")
  242. assert sreq.send("clear", "foo") == {"enc": "clear", "load": "foo"}
  243. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  244. def test_destroy(self):
  245. """
  246. Test the __del__ capabilities
  247. """
  248. sreq = self.get_sreq()
  249. # ensure no exceptions when we go to destroy the sreq, since __del__
  250. # swallows exceptions, we have to call destroy directly
  251. sreq.destroy()
  252. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  253. def test_raw_vs_encoding_none(self):
  254. """
  255. Test that we handle the new raw parameter in 5.0.2 correctly based on
  256. encoding. When encoding is None loads should return bytes
  257. """
  258. payload = salt.payload.Serial("msgpack")
  259. dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
  260. idata = {dtvalue: "strval"}
  261. sdata = payload.dumps(idata.copy())
  262. odata = payload.loads(sdata, encoding=None)
  263. assert isinstance(odata[dtvalue], six.string_types)
  264. @pytest.mark.slow_test(seconds=1) # Test takes >0.1 and <=1 seconds
  265. def test_raw_vs_encoding_utf8(self):
  266. """
  267. Test that we handle the new raw parameter in 5.0.2 correctly based on
  268. encoding. When encoding is utf-8 loads should return unicode
  269. """
  270. payload = salt.payload.Serial("msgpack")
  271. dtvalue = datetime.datetime(2001, 2, 3, 4, 5, 6, 7)
  272. idata = {dtvalue: "strval"}
  273. sdata = payload.dumps(idata.copy())
  274. odata = payload.loads(sdata, encoding="utf-8")
  275. assert isinstance(odata[dtvalue], six.text_type)