test_tcp.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. # -*- coding: utf-8 -*-
  2. """
  3. :codeauthor: Thomas Jackson <jacksontj.89@gmail.com>
  4. """
  5. from __future__ import absolute_import, print_function, unicode_literals
  6. import logging
  7. import socket
  8. import threading
  9. import salt.config
  10. import salt.exceptions
  11. import salt.ext.tornado.concurrent
  12. import salt.ext.tornado.gen
  13. import salt.ext.tornado.ioloop
  14. import salt.transport.client
  15. import salt.transport.server
  16. import salt.utils.platform
  17. import salt.utils.process
  18. from salt.ext import six
  19. from salt.ext.six.moves import range
  20. from salt.ext.tornado.testing import AsyncTestCase, gen_test
  21. from salt.transport.tcp import (
  22. SaltMessageClient,
  23. SaltMessageClientPool,
  24. TCPPubServerChannel,
  25. )
  26. from saltfactories.utils.ports import get_unused_localhost_port
  27. from tests.support.helpers import flaky, slowTest
  28. from tests.support.mixins import AdaptedConfigurationTestCaseMixin
  29. from tests.support.mock import MagicMock, patch
  30. from tests.support.unit import TestCase, skipIf
  31. from tests.unit.transport.mixins import (
  32. PubChannelMixin,
  33. ReqChannelMixin,
  34. run_loop_in_thread,
  35. )
  36. log = logging.getLogger(__name__)
  37. class BaseTCPReqCase(TestCase, AdaptedConfigurationTestCaseMixin):
  38. """
  39. Test the req server/client pair
  40. """
  41. @classmethod
  42. def setUpClass(cls):
  43. if not hasattr(cls, "_handle_payload"):
  44. return
  45. ret_port = get_unused_localhost_port()
  46. publish_port = get_unused_localhost_port()
  47. tcp_master_pub_port = get_unused_localhost_port()
  48. tcp_master_pull_port = get_unused_localhost_port()
  49. tcp_master_publish_pull = get_unused_localhost_port()
  50. tcp_master_workers = get_unused_localhost_port()
  51. cls.master_config = cls.get_temp_config(
  52. "master",
  53. **{
  54. "transport": "tcp",
  55. "auto_accept": True,
  56. "ret_port": ret_port,
  57. "publish_port": publish_port,
  58. "tcp_master_pub_port": tcp_master_pub_port,
  59. "tcp_master_pull_port": tcp_master_pull_port,
  60. "tcp_master_publish_pull": tcp_master_publish_pull,
  61. "tcp_master_workers": tcp_master_workers,
  62. }
  63. )
  64. cls.minion_config = cls.get_temp_config(
  65. "minion",
  66. **{
  67. "transport": "tcp",
  68. "master_ip": "127.0.0.1",
  69. "master_port": ret_port,
  70. "master_uri": "tcp://127.0.0.1:{0}".format(ret_port),
  71. }
  72. )
  73. cls.process_manager = salt.utils.process.ProcessManager(
  74. name="ReqServer_ProcessManager"
  75. )
  76. cls.server_channel = salt.transport.server.ReqServerChannel.factory(
  77. cls.master_config
  78. )
  79. cls.server_channel.pre_fork(cls.process_manager)
  80. cls.io_loop = salt.ext.tornado.ioloop.IOLoop()
  81. cls.stop = threading.Event()
  82. cls.server_channel.post_fork(cls._handle_payload, io_loop=cls.io_loop)
  83. cls.server_thread = threading.Thread(
  84. target=run_loop_in_thread, args=(cls.io_loop, cls.stop,),
  85. )
  86. cls.server_thread.start()
  87. @classmethod
  88. def tearDownClass(cls):
  89. cls.server_channel.close()
  90. cls.stop.set()
  91. cls.server_thread.join()
  92. cls.process_manager.kill_children()
  93. del cls.server_channel
  94. @classmethod
  95. @salt.ext.tornado.gen.coroutine
  96. def _handle_payload(cls, payload):
  97. """
  98. TODO: something besides echo
  99. """
  100. raise salt.ext.tornado.gen.Return((payload, {"fun": "send_clear"}))
  101. @skipIf(salt.utils.platform.is_darwin(), "hanging test suite on MacOS")
  102. class ClearReqTestCases(BaseTCPReqCase, ReqChannelMixin):
  103. """
  104. Test all of the clear msg stuff
  105. """
  106. def setUp(self):
  107. self.channel = salt.transport.client.ReqChannel.factory(
  108. self.minion_config, crypt="clear"
  109. )
  110. def tearDown(self):
  111. self.channel.close()
  112. del self.channel
  113. @classmethod
  114. @salt.ext.tornado.gen.coroutine
  115. def _handle_payload(cls, payload):
  116. """
  117. TODO: something besides echo
  118. """
  119. raise salt.ext.tornado.gen.Return((payload, {"fun": "send_clear"}))
  120. @skipIf(salt.utils.platform.is_darwin(), "hanging test suite on MacOS")
  121. class AESReqTestCases(BaseTCPReqCase, ReqChannelMixin):
  122. def setUp(self):
  123. self.channel = salt.transport.client.ReqChannel.factory(self.minion_config)
  124. def tearDown(self):
  125. self.channel.close()
  126. del self.channel
  127. @classmethod
  128. @salt.ext.tornado.gen.coroutine
  129. def _handle_payload(cls, payload):
  130. """
  131. TODO: something besides echo
  132. """
  133. raise salt.ext.tornado.gen.Return((payload, {"fun": "send"}))
  134. # TODO: make failed returns have a specific framing so we can raise the same exception
  135. # on encrypted channels
  136. @flaky
  137. @slowTest
  138. def test_badload(self):
  139. """
  140. Test a variety of bad requests, make sure that we get some sort of error
  141. """
  142. msgs = ["", [], tuple()]
  143. for msg in msgs:
  144. with self.assertRaises(salt.exceptions.AuthenticationError):
  145. ret = self.channel.send(msg)
  146. class BaseTCPPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin):
  147. """
  148. Test the req server/client pair
  149. """
  150. @classmethod
  151. def setUpClass(cls):
  152. ret_port = get_unused_localhost_port()
  153. publish_port = get_unused_localhost_port()
  154. tcp_master_pub_port = get_unused_localhost_port()
  155. tcp_master_pull_port = get_unused_localhost_port()
  156. tcp_master_publish_pull = get_unused_localhost_port()
  157. tcp_master_workers = get_unused_localhost_port()
  158. cls.master_config = cls.get_temp_config(
  159. "master",
  160. **{
  161. "transport": "tcp",
  162. "auto_accept": True,
  163. "ret_port": ret_port,
  164. "publish_port": publish_port,
  165. "tcp_master_pub_port": tcp_master_pub_port,
  166. "tcp_master_pull_port": tcp_master_pull_port,
  167. "tcp_master_publish_pull": tcp_master_publish_pull,
  168. "tcp_master_workers": tcp_master_workers,
  169. }
  170. )
  171. cls.minion_config = cls.get_temp_config(
  172. "minion",
  173. **{
  174. "transport": "tcp",
  175. "master_ip": "127.0.0.1",
  176. "auth_timeout": 1,
  177. "master_port": ret_port,
  178. "master_uri": "tcp://127.0.0.1:{0}".format(ret_port),
  179. }
  180. )
  181. cls.process_manager = salt.utils.process.ProcessManager(
  182. name="ReqServer_ProcessManager"
  183. )
  184. cls.server_channel = salt.transport.server.PubServerChannel.factory(
  185. cls.master_config
  186. )
  187. cls.server_channel.pre_fork(cls.process_manager)
  188. # we also require req server for auth
  189. cls.req_server_channel = salt.transport.server.ReqServerChannel.factory(
  190. cls.master_config
  191. )
  192. cls.req_server_channel.pre_fork(cls.process_manager)
  193. cls.io_loop = salt.ext.tornado.ioloop.IOLoop()
  194. cls.stop = threading.Event()
  195. cls.req_server_channel.post_fork(cls._handle_payload, io_loop=cls.io_loop)
  196. cls.server_thread = threading.Thread(
  197. target=run_loop_in_thread, args=(cls.io_loop, cls.stop,),
  198. )
  199. cls.server_thread.start()
  200. @classmethod
  201. def _handle_payload(cls, payload):
  202. """
  203. TODO: something besides echo
  204. """
  205. return payload, {"fun": "send_clear"}
  206. @classmethod
  207. def tearDownClass(cls):
  208. cls.req_server_channel.close()
  209. cls.server_channel.close()
  210. cls.stop.set()
  211. cls.server_thread.join()
  212. cls.process_manager.kill_children()
  213. del cls.req_server_channel
  214. def setUp(self):
  215. super(BaseTCPPubCase, self).setUp()
  216. self._start_handlers = dict(self.io_loop._handlers)
  217. def tearDown(self):
  218. super(BaseTCPPubCase, self).tearDown()
  219. failures = []
  220. for k, v in six.iteritems(self.io_loop._handlers):
  221. if self._start_handlers.get(k) != v:
  222. failures.append((k, v))
  223. if failures:
  224. raise Exception("FDs still attached to the IOLoop: {0}".format(failures))
  225. del self.channel
  226. del self._start_handlers
  227. class AsyncTCPPubChannelTest(AsyncTestCase, AdaptedConfigurationTestCaseMixin):
  228. @slowTest
  229. def test_connect_publish_port(self):
  230. """
  231. test when publish_port is not 4506
  232. """
  233. opts = self.get_temp_config("master")
  234. opts["master_uri"] = ""
  235. opts["master_ip"] = "127.0.0.1"
  236. opts["publish_port"] = 1234
  237. channel = salt.transport.tcp.AsyncTCPPubChannel(opts)
  238. patch_auth = MagicMock(return_value=True)
  239. patch_client = MagicMock(spec=SaltMessageClientPool)
  240. with patch("salt.crypt.AsyncAuth.gen_token", patch_auth), patch(
  241. "salt.crypt.AsyncAuth.authenticated", patch_auth
  242. ), patch("salt.transport.tcp.SaltMessageClientPool", patch_client):
  243. channel.connect()
  244. assert patch_client.call_args[0][0]["publish_port"] == opts["publish_port"]
  245. @skipIf(True, "Skip until we can devote time to fix this test")
  246. class AsyncPubChannelTest(BaseTCPPubCase, PubChannelMixin):
  247. """
  248. Tests around the publish system
  249. """
  250. class SaltMessageClientPoolTest(AsyncTestCase):
  251. def setUp(self):
  252. super(SaltMessageClientPoolTest, self).setUp()
  253. sock_pool_size = 5
  254. with patch(
  255. "salt.transport.tcp.SaltMessageClient.__init__",
  256. MagicMock(return_value=None),
  257. ):
  258. self.message_client_pool = SaltMessageClientPool(
  259. {"sock_pool_size": sock_pool_size}, args=({}, "", 0)
  260. )
  261. self.original_message_clients = self.message_client_pool.message_clients
  262. self.message_client_pool.message_clients = [
  263. MagicMock() for _ in range(sock_pool_size)
  264. ]
  265. def tearDown(self):
  266. with patch(
  267. "salt.transport.tcp.SaltMessageClient.close", MagicMock(return_value=None)
  268. ):
  269. del self.original_message_clients
  270. super(SaltMessageClientPoolTest, self).tearDown()
  271. def test_send(self):
  272. for message_client_mock in self.message_client_pool.message_clients:
  273. message_client_mock.send_queue = [0, 0, 0]
  274. message_client_mock.send.return_value = []
  275. self.assertEqual([], self.message_client_pool.send())
  276. self.message_client_pool.message_clients[2].send_queue = [0]
  277. self.message_client_pool.message_clients[2].send.return_value = [1]
  278. self.assertEqual([1], self.message_client_pool.send())
  279. def test_write_to_stream(self):
  280. for message_client_mock in self.message_client_pool.message_clients:
  281. message_client_mock.send_queue = [0, 0, 0]
  282. message_client_mock._stream.write.return_value = []
  283. self.assertEqual([], self.message_client_pool.write_to_stream(""))
  284. self.message_client_pool.message_clients[2].send_queue = [0]
  285. self.message_client_pool.message_clients[2]._stream.write.return_value = [1]
  286. self.assertEqual([1], self.message_client_pool.write_to_stream(""))
  287. def test_close(self):
  288. self.message_client_pool.close()
  289. self.assertEqual([], self.message_client_pool.message_clients)
  290. def test_on_recv(self):
  291. for message_client_mock in self.message_client_pool.message_clients:
  292. message_client_mock.on_recv.return_value = None
  293. self.message_client_pool.on_recv()
  294. for message_client_mock in self.message_client_pool.message_clients:
  295. self.assertTrue(message_client_mock.on_recv.called)
  296. def test_connect_all(self):
  297. @gen_test
  298. def test_connect(self):
  299. yield self.message_client_pool.connect()
  300. for message_client_mock in self.message_client_pool.message_clients:
  301. future = salt.ext.tornado.concurrent.Future()
  302. future.set_result("foo")
  303. message_client_mock.connect.return_value = future
  304. self.assertIsNone(test_connect(self))
  305. def test_connect_partial(self):
  306. @gen_test(timeout=0.1)
  307. def test_connect(self):
  308. yield self.message_client_pool.connect()
  309. for idx, message_client_mock in enumerate(
  310. self.message_client_pool.message_clients
  311. ):
  312. future = salt.ext.tornado.concurrent.Future()
  313. if idx % 2 == 0:
  314. future.set_result("foo")
  315. message_client_mock.connect.return_value = future
  316. with self.assertRaises(salt.ext.tornado.ioloop.TimeoutError):
  317. test_connect(self)
  318. class SaltMessageClientCleanupTest(TestCase, AdaptedConfigurationTestCaseMixin):
  319. def setUp(self):
  320. self.listen_on = "127.0.0.1"
  321. self.port = get_unused_localhost_port()
  322. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  323. self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  324. self.sock.bind((self.listen_on, self.port))
  325. self.sock.listen(1)
  326. def tearDown(self):
  327. self.sock.close()
  328. del self.sock
  329. def test_message_client(self):
  330. """
  331. test message client cleanup on close
  332. """
  333. orig_loop = salt.ext.tornado.ioloop.IOLoop()
  334. orig_loop.make_current()
  335. opts = self.get_temp_config("master")
  336. client = SaltMessageClient(opts, self.listen_on, self.port)
  337. # Mock the io_loop's stop method so we know when it has been called.
  338. orig_loop.real_stop = orig_loop.stop
  339. orig_loop.stop_called = False
  340. def stop(*args, **kwargs):
  341. orig_loop.stop_called = True
  342. orig_loop.real_stop()
  343. orig_loop.stop = stop
  344. try:
  345. assert client.io_loop == orig_loop
  346. client.io_loop.run_sync(client.connect)
  347. # Ensure we are testing the _read_until_future and io_loop teardown
  348. assert client._stream is not None
  349. assert client._read_until_future is not None
  350. assert orig_loop.stop_called is True
  351. # The run_sync call will set stop_called, reset it
  352. orig_loop.stop_called = False
  353. client.close()
  354. # Stop should be called again, client's io_loop should be None
  355. assert orig_loop.stop_called is True
  356. assert client.io_loop is None
  357. finally:
  358. orig_loop.stop = orig_loop.real_stop
  359. del orig_loop.real_stop
  360. del orig_loop.stop_called
  361. class TCPPubServerChannelTest(TestCase, AdaptedConfigurationTestCaseMixin):
  362. @patch("salt.master.SMaster.secrets")
  363. @patch("salt.crypt.Crypticle")
  364. @patch("salt.utils.asynchronous.SyncWrapper")
  365. def test_publish_filtering(self, sync_wrapper, crypticle, secrets):
  366. opts = self.get_temp_config("master")
  367. opts["sign_pub_messages"] = False
  368. channel = TCPPubServerChannel(opts)
  369. wrap = MagicMock()
  370. crypt = MagicMock()
  371. crypt.dumps.return_value = {"test": "value"}
  372. secrets.return_value = {"aes": {"secret": None}}
  373. crypticle.return_value = crypt
  374. sync_wrapper.return_value = wrap
  375. # try simple publish with glob tgt_type
  376. channel.publish({"test": "value", "tgt_type": "glob", "tgt": "*"})
  377. payload = wrap.send.call_args[0][0]
  378. # verify we send it without any specific topic
  379. assert "topic_lst" not in payload
  380. # try simple publish with list tgt_type
  381. channel.publish({"test": "value", "tgt_type": "list", "tgt": ["minion01"]})
  382. payload = wrap.send.call_args[0][0]
  383. # verify we send it with correct topic
  384. assert "topic_lst" in payload
  385. self.assertEqual(payload["topic_lst"], ["minion01"])
  386. # try with syndic settings
  387. opts["order_masters"] = True
  388. channel.publish({"test": "value", "tgt_type": "list", "tgt": ["minion01"]})
  389. payload = wrap.send.call_args[0][0]
  390. # verify we send it without topic for syndics
  391. assert "topic_lst" not in payload
  392. @patch("salt.utils.minions.CkMinions.check_minions")
  393. @patch("salt.master.SMaster.secrets")
  394. @patch("salt.crypt.Crypticle")
  395. @patch("salt.utils.asynchronous.SyncWrapper")
  396. def test_publish_filtering_str_list(
  397. self, sync_wrapper, crypticle, secrets, check_minions
  398. ):
  399. opts = self.get_temp_config("master")
  400. opts["sign_pub_messages"] = False
  401. channel = TCPPubServerChannel(opts)
  402. wrap = MagicMock()
  403. crypt = MagicMock()
  404. crypt.dumps.return_value = {"test": "value"}
  405. secrets.return_value = {"aes": {"secret": None}}
  406. crypticle.return_value = crypt
  407. sync_wrapper.return_value = wrap
  408. check_minions.return_value = {"minions": ["minion02"]}
  409. # try simple publish with list tgt_type
  410. channel.publish({"test": "value", "tgt_type": "list", "tgt": "minion02"})
  411. payload = wrap.send.call_args[0][0]
  412. # verify we send it with correct topic
  413. assert "topic_lst" in payload
  414. self.assertEqual(payload["topic_lst"], ["minion02"])
  415. # verify it was correctly calling check_minions
  416. check_minions.assert_called_with("minion02", tgt_type="list")