123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- # -*- coding: utf-8 -*-
- # Import Python libs
- from __future__ import absolute_import, print_function, unicode_literals
- import salt.ext.tornado.gen
- # Import Salt Libs
- import salt.transport.client
- # Import 3rd-party libs
- from salt.ext import six
- def run_loop_in_thread(loop, evt):
- """
- Run the provided loop until an event is set
- """
- loop.make_current()
- @salt.ext.tornado.gen.coroutine
- def stopper():
- while True:
- if evt.is_set():
- loop.stop()
- break
- yield salt.ext.tornado.gen.sleep(0.3)
- loop.add_callback(stopper)
- try:
- loop.start()
- finally:
- loop.close()
- class ReqChannelMixin(object):
- def test_basic(self):
- """
- Test a variety of messages, make sure we get the expected responses
- """
- msgs = [
- {"foo": "bar"},
- {"bar": "baz"},
- {"baz": "qux", "list": [1, 2, 3]},
- ]
- for msg in msgs:
- ret = self.channel.send(msg, timeout=2, tries=1)
- self.assertEqual(ret["load"], msg)
- def test_normalization(self):
- """
- Since we use msgpack, we need to test that list types are converted to lists
- """
- types = {
- "list": list,
- }
- msgs = [
- {"list": tuple([1, 2, 3])},
- ]
- for msg in msgs:
- ret = self.channel.send(msg, timeout=2, tries=1)
- for k, v in six.iteritems(ret["load"]):
- self.assertEqual(types[k], type(v))
- def test_badload(self):
- """
- Test a variety of bad requests, make sure that we get some sort of error
- """
- msgs = ["", [], tuple()]
- for msg in msgs:
- ret = self.channel.send(msg, timeout=2, tries=1)
- self.assertEqual(ret, "payload and load must be a dict")
- class PubChannelMixin(object):
- def test_basic(self):
- self.pub = None
- def handle_pub(ret):
- self.pub = ret
- self.stop()
- self.pub_channel = salt.transport.client.AsyncPubChannel.factory(
- self.minion_opts, io_loop=self.io_loop
- )
- connect_future = self.pub_channel.connect()
- connect_future.add_done_callback(lambda f: self.stop())
- self.wait()
- connect_future.result()
- self.pub_channel.on_recv(handle_pub)
- load = {
- "fun": "f",
- "arg": "a",
- "tgt": "t",
- "jid": "j",
- "ret": "r",
- "tgt_type": "glob",
- }
- self.server_channel.publish(load)
- self.wait()
- self.assertEqual(self.pub["load"], load)
- self.pub_channel.on_recv(None)
- self.server_channel.publish(load)
- with self.assertRaises(self.failureException):
- self.wait(timeout=0.5)
- # close our pub_channel, to pass our FD checks
- self.pub_channel.close()
- del self.pub_channel
|