mixins.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # -*- coding: utf-8 -*-
  2. # Import Python libs
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. import salt.ext.tornado.gen
  5. # Import Salt Libs
  6. import salt.transport.client
  7. # Import 3rd-party libs
  8. from salt.ext import six
  9. def run_loop_in_thread(loop, evt):
  10. """
  11. Run the provided loop until an event is set
  12. """
  13. loop.make_current()
  14. @salt.ext.tornado.gen.coroutine
  15. def stopper():
  16. while True:
  17. if evt.is_set():
  18. loop.stop()
  19. break
  20. yield salt.ext.tornado.gen.sleep(0.3)
  21. loop.add_callback(stopper)
  22. try:
  23. loop.start()
  24. finally:
  25. loop.close()
  26. class ReqChannelMixin(object):
  27. def test_basic(self):
  28. """
  29. Test a variety of messages, make sure we get the expected responses
  30. """
  31. msgs = [
  32. {"foo": "bar"},
  33. {"bar": "baz"},
  34. {"baz": "qux", "list": [1, 2, 3]},
  35. ]
  36. for msg in msgs:
  37. ret = self.channel.send(msg, timeout=2, tries=1)
  38. self.assertEqual(ret["load"], msg)
  39. def test_normalization(self):
  40. """
  41. Since we use msgpack, we need to test that list types are converted to lists
  42. """
  43. types = {
  44. "list": list,
  45. }
  46. msgs = [
  47. {"list": tuple([1, 2, 3])},
  48. ]
  49. for msg in msgs:
  50. ret = self.channel.send(msg, timeout=2, tries=1)
  51. for k, v in six.iteritems(ret["load"]):
  52. self.assertEqual(types[k], type(v))
  53. def test_badload(self):
  54. """
  55. Test a variety of bad requests, make sure that we get some sort of error
  56. """
  57. msgs = ["", [], tuple()]
  58. for msg in msgs:
  59. ret = self.channel.send(msg, timeout=2, tries=1)
  60. self.assertEqual(ret, "payload and load must be a dict")
  61. class PubChannelMixin(object):
  62. def test_basic(self):
  63. self.pub = None
  64. def handle_pub(ret):
  65. self.pub = ret
  66. self.stop()
  67. self.pub_channel = salt.transport.client.AsyncPubChannel.factory(
  68. self.minion_opts, io_loop=self.io_loop
  69. )
  70. connect_future = self.pub_channel.connect()
  71. connect_future.add_done_callback(lambda f: self.stop())
  72. self.wait()
  73. connect_future.result()
  74. self.pub_channel.on_recv(handle_pub)
  75. load = {
  76. "fun": "f",
  77. "arg": "a",
  78. "tgt": "t",
  79. "jid": "j",
  80. "ret": "r",
  81. "tgt_type": "glob",
  82. }
  83. self.server_channel.publish(load)
  84. self.wait()
  85. self.assertEqual(self.pub["load"], load)
  86. self.pub_channel.on_recv(None)
  87. self.server_channel.publish(load)
  88. with self.assertRaises(self.failureException):
  89. self.wait(timeout=0.5)
  90. # close our pub_channel, to pass our FD checks
  91. self.pub_channel.close()
  92. del self.pub_channel