mixins.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. # -*- coding: utf-8 -*-
  2. # Import Python libs
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. # Import Salt Libs
  5. import salt.transport.client
  6. # Import 3rd-party libs
  7. from salt.ext import six
  8. import salt.ext.tornado.gen
  9. def run_loop_in_thread(loop, evt):
  10. '''
  11. Run the provided loop until an event is set
  12. '''
  13. loop.make_current()
  14. @salt.ext.tornado.gen.coroutine
  15. def stopper():
  16. while True:
  17. if evt.is_set():
  18. loop.stop()
  19. break
  20. yield salt.ext.tornado.gen.sleep(.3)
  21. loop.add_callback(stopper)
  22. try:
  23. loop.start()
  24. finally:
  25. loop.close()
  26. class ReqChannelMixin(object):
  27. def test_basic(self):
  28. '''
  29. Test a variety of messages, make sure we get the expected responses
  30. '''
  31. msgs = [
  32. {'foo': 'bar'},
  33. {'bar': 'baz'},
  34. {'baz': 'qux', 'list': [1, 2, 3]},
  35. ]
  36. for msg in msgs:
  37. ret = self.channel.send(msg, timeout=2, tries=1)
  38. self.assertEqual(ret['load'], msg)
  39. def test_normalization(self):
  40. '''
  41. Since we use msgpack, we need to test that list types are converted to lists
  42. '''
  43. types = {
  44. 'list': list,
  45. }
  46. msgs = [
  47. {'list': tuple([1, 2, 3])},
  48. ]
  49. for msg in msgs:
  50. ret = self.channel.send(msg, timeout=2, tries=1)
  51. for k, v in six.iteritems(ret['load']):
  52. self.assertEqual(types[k], type(v))
  53. def test_badload(self):
  54. '''
  55. Test a variety of bad requests, make sure that we get some sort of error
  56. '''
  57. msgs = ['', [], tuple()]
  58. for msg in msgs:
  59. ret = self.channel.send(msg, timeout=2, tries=1)
  60. self.assertEqual(ret, 'payload and load must be a dict')
  61. class PubChannelMixin(object):
  62. def test_basic(self):
  63. self.pub = None
  64. def handle_pub(ret):
  65. self.pub = ret
  66. self.stop()
  67. self.pub_channel = salt.transport.client.AsyncPubChannel.factory(self.minion_opts, io_loop=self.io_loop)
  68. connect_future = self.pub_channel.connect()
  69. connect_future.add_done_callback(lambda f: self.stop())
  70. self.wait()
  71. connect_future.result()
  72. self.pub_channel.on_recv(handle_pub)
  73. load = {
  74. 'fun': 'f',
  75. 'arg': 'a',
  76. 'tgt': 't',
  77. 'jid': 'j',
  78. 'ret': 'r',
  79. 'tgt_type': 'glob',
  80. }
  81. self.server_channel.publish(load)
  82. self.wait()
  83. self.assertEqual(self.pub['load'], load)
  84. self.pub_channel.on_recv(None)
  85. self.server_channel.publish(load)
  86. with self.assertRaises(self.failureException):
  87. self.wait(timeout=0.5)
  88. # close our pub_channel, to pass our FD checks
  89. self.pub_channel.close()
  90. del self.pub_channel