# mixins.py
  1. # -*- coding: utf-8 -*-
  2. # Import Python libs
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. # Import Salt Libs
  5. import salt.transport.client
  6. # Import 3rd-party libs
  7. from salt.ext import six
  8. class ReqChannelMixin(object):
  9. def test_basic(self):
  10. '''
  11. Test a variety of messages, make sure we get the expected responses
  12. '''
  13. msgs = [
  14. {'foo': 'bar'},
  15. {'bar': 'baz'},
  16. {'baz': 'qux', 'list': [1, 2, 3]},
  17. ]
  18. for msg in msgs:
  19. ret = self.channel.send(msg, timeout=2, tries=1)
  20. self.assertEqual(ret['load'], msg)
  21. def test_normalization(self):
  22. '''
  23. Since we use msgpack, we need to test that list types are converted to lists
  24. '''
  25. types = {
  26. 'list': list,
  27. }
  28. msgs = [
  29. {'list': tuple([1, 2, 3])},
  30. ]
  31. for msg in msgs:
  32. ret = self.channel.send(msg, timeout=2, tries=1)
  33. for k, v in six.iteritems(ret['load']):
  34. self.assertEqual(types[k], type(v))
  35. def test_badload(self):
  36. '''
  37. Test a variety of bad requests, make sure that we get some sort of error
  38. '''
  39. msgs = ['', [], tuple()]
  40. for msg in msgs:
  41. ret = self.channel.send(msg, timeout=2, tries=1)
  42. self.assertEqual(ret, 'payload and load must be a dict')
  43. class PubChannelMixin(object):
  44. def test_basic(self):
  45. self.pub = None
  46. def handle_pub(ret):
  47. self.pub = ret
  48. self.stop()
  49. self.pub_channel = salt.transport.client.AsyncPubChannel.factory(self.minion_opts, io_loop=self.io_loop)
  50. connect_future = self.pub_channel.connect()
  51. connect_future.add_done_callback(lambda f: self.stop())
  52. self.wait()
  53. connect_future.result()
  54. self.pub_channel.on_recv(handle_pub)
  55. load = {
  56. 'fun': 'f',
  57. 'arg': 'a',
  58. 'tgt': 't',
  59. 'jid': 'j',
  60. 'ret': 'r',
  61. 'tgt_type': 'glob',
  62. }
  63. self.server_channel.publish(load)
  64. self.wait()
  65. self.assertEqual(self.pub['load'], load)
  66. self.pub_channel.on_recv(None)
  67. self.server_channel.publish(load)
  68. with self.assertRaises(self.failureException):
  69. self.wait(timeout=0.5)
  70. # close our pub_channel, to pass our FD checks
  71. del self.pub_channel