# -*- coding: utf-8 -*- # Import Python libs from __future__ import absolute_import, print_function, unicode_literals # Import Salt Libs import salt.transport.client # Import 3rd-party libs from salt.ext import six import salt.ext.tornado.gen def run_loop_in_thread(loop, evt): ''' Run the provided loop until an event is set ''' loop.make_current() @salt.ext.tornado.gen.coroutine def stopper(): while True: if evt.is_set(): loop.stop() break yield salt.ext.tornado.gen.sleep(.3) loop.add_callback(stopper) try: loop.start() finally: loop.close() class ReqChannelMixin(object): def test_basic(self): ''' Test a variety of messages, make sure we get the expected responses ''' msgs = [ {'foo': 'bar'}, {'bar': 'baz'}, {'baz': 'qux', 'list': [1, 2, 3]}, ] for msg in msgs: ret = self.channel.send(msg, timeout=2, tries=1) self.assertEqual(ret['load'], msg) def test_normalization(self): ''' Since we use msgpack, we need to test that list types are converted to lists ''' types = { 'list': list, } msgs = [ {'list': tuple([1, 2, 3])}, ] for msg in msgs: ret = self.channel.send(msg, timeout=2, tries=1) for k, v in six.iteritems(ret['load']): self.assertEqual(types[k], type(v)) def test_badload(self): ''' Test a variety of bad requests, make sure that we get some sort of error ''' msgs = ['', [], tuple()] for msg in msgs: ret = self.channel.send(msg, timeout=2, tries=1) self.assertEqual(ret, 'payload and load must be a dict') class PubChannelMixin(object): def test_basic(self): self.pub = None def handle_pub(ret): self.pub = ret self.stop() self.pub_channel = salt.transport.client.AsyncPubChannel.factory(self.minion_opts, io_loop=self.io_loop) connect_future = self.pub_channel.connect() connect_future.add_done_callback(lambda f: self.stop()) self.wait() connect_future.result() self.pub_channel.on_recv(handle_pub) load = { 'fun': 'f', 'arg': 'a', 'tgt': 't', 'jid': 'j', 'ret': 'r', 'tgt_type': 'glob', } self.server_channel.publish(load) self.wait() self.assertEqual(self.pub['load'], load) self.pub_channel.on_recv(None) self.server_channel.publish(load) with self.assertRaises(self.failureException): self.wait(timeout=0.5) # close our pub_channel, to pass our FD checks self.pub_channel.close() del self.pub_channel