123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- # -*- coding: utf-8 -*-
- # Import Python libs
- from __future__ import absolute_import, print_function, unicode_literals
- # Import Salt Libs
- import salt.transport.client
- # Import 3rd-party libs
- from salt.ext import six
- import salt.ext.tornado.gen
- def run_loop_in_thread(loop, evt):
- '''
- Run the provided loop until an event is set
- '''
- loop.make_current()
- @salt.ext.tornado.gen.coroutine
- def stopper():
- while True:
- if evt.is_set():
- loop.stop()
- break
- yield salt.ext.tornado.gen.sleep(.3)
- loop.add_callback(stopper)
- try:
- loop.start()
- finally:
- loop.close()
- class ReqChannelMixin(object):
- def test_basic(self):
- '''
- Test a variety of messages, make sure we get the expected responses
- '''
- msgs = [
- {'foo': 'bar'},
- {'bar': 'baz'},
- {'baz': 'qux', 'list': [1, 2, 3]},
- ]
- for msg in msgs:
- ret = self.channel.send(msg, timeout=2, tries=1)
- self.assertEqual(ret['load'], msg)
- def test_normalization(self):
- '''
- Since we use msgpack, we need to test that list types are converted to lists
- '''
- types = {
- 'list': list,
- }
- msgs = [
- {'list': tuple([1, 2, 3])},
- ]
- for msg in msgs:
- ret = self.channel.send(msg, timeout=2, tries=1)
- for k, v in six.iteritems(ret['load']):
- self.assertEqual(types[k], type(v))
- def test_badload(self):
- '''
- Test a variety of bad requests, make sure that we get some sort of error
- '''
- msgs = ['', [], tuple()]
- for msg in msgs:
- ret = self.channel.send(msg, timeout=2, tries=1)
- self.assertEqual(ret, 'payload and load must be a dict')
- class PubChannelMixin(object):
- def test_basic(self):
- self.pub = None
- def handle_pub(ret):
- self.pub = ret
- self.stop()
- self.pub_channel = salt.transport.client.AsyncPubChannel.factory(self.minion_opts, io_loop=self.io_loop)
- connect_future = self.pub_channel.connect()
- connect_future.add_done_callback(lambda f: self.stop())
- self.wait()
- connect_future.result()
- self.pub_channel.on_recv(handle_pub)
- load = {
- 'fun': 'f',
- 'arg': 'a',
- 'tgt': 't',
- 'jid': 'j',
- 'ret': 'r',
- 'tgt_type': 'glob',
- }
- self.server_channel.publish(load)
- self.wait()
- self.assertEqual(self.pub['load'], load)
- self.pub_channel.on_recv(None)
- self.server_channel.publish(load)
- with self.assertRaises(self.failureException):
- self.wait(timeout=0.5)
- # close our pub_channel, to pass our FD checks
- self.pub_channel.close()
- del self.pub_channel
|