test_zeromq.py 28 KB


  1. """
  2. :codeauthor: Thomas Jackson <jacksontj.89@gmail.com>
  3. """
  4. import ctypes
  5. import multiprocessing
  6. import os
  7. import threading
  8. import time
  9. from concurrent.futures.thread import ThreadPoolExecutor
  10. import salt.config
  11. import salt.exceptions
  12. import salt.ext.tornado.gen
  13. import salt.ext.tornado.ioloop
  14. import salt.log.setup
  15. import salt.transport.client
  16. import salt.transport.server
  17. import salt.utils.platform
  18. import salt.utils.process
  19. import zmq.eventloop.ioloop
  20. from salt.ext import six
  21. from salt.ext.six.moves import range
  22. from salt.ext.tornado.testing import AsyncTestCase
  23. from salt.transport.zeromq import AsyncReqMessageClientPool
  24. from saltfactories.utils.ports import get_unused_localhost_port
  25. from tests.support.helpers import flaky, not_runs_on, slowTest
  26. from tests.support.mixins import AdaptedConfigurationTestCaseMixin
  27. from tests.support.mock import MagicMock, patch
  28. from tests.support.runtests import RUNTIME_VARS
  29. from tests.support.unit import TestCase, skipIf
  30. from tests.unit.transport.mixins import (
  31. PubChannelMixin,
  32. ReqChannelMixin,
  33. run_loop_in_thread,
  34. )
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
# When the installed pyzmq has no ``ZMQIOLoop`` attribute on its ioloop
# module, alias it to that module's ``IOLoop`` so the name always resolves.
if not hasattr(zmq.eventloop.ioloop, "ZMQIOLoop"):
    zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
  38. class BaseZMQReqCase(TestCase, AdaptedConfigurationTestCaseMixin):
  39. """
  40. Test the req server/client pair
  41. """
  42. @classmethod
  43. def setUpClass(cls):
  44. if not hasattr(cls, "_handle_payload"):
  45. return
  46. ret_port = get_unused_localhost_port()
  47. publish_port = get_unused_localhost_port()
  48. tcp_master_pub_port = get_unused_localhost_port()
  49. tcp_master_pull_port = get_unused_localhost_port()
  50. tcp_master_publish_pull = get_unused_localhost_port()
  51. tcp_master_workers = get_unused_localhost_port()
  52. cls.master_config = cls.get_temp_config(
  53. "master",
  54. **{
  55. "transport": "zeromq",
  56. "auto_accept": True,
  57. "ret_port": ret_port,
  58. "publish_port": publish_port,
  59. "tcp_master_pub_port": tcp_master_pub_port,
  60. "tcp_master_pull_port": tcp_master_pull_port,
  61. "tcp_master_publish_pull": tcp_master_publish_pull,
  62. "tcp_master_workers": tcp_master_workers,
  63. }
  64. )
  65. cls.minion_config = cls.get_temp_config(
  66. "minion",
  67. **{
  68. "transport": "zeromq",
  69. "master_ip": "127.0.0.1",
  70. "master_port": ret_port,
  71. "auth_timeout": 5,
  72. "auth_tries": 1,
  73. "master_uri": "tcp://127.0.0.1:{}".format(ret_port),
  74. }
  75. )
  76. cls.process_manager = salt.utils.process.ProcessManager(
  77. name="ReqServer_ProcessManager"
  78. )
  79. cls.server_channel = salt.transport.server.ReqServerChannel.factory(
  80. cls.master_config
  81. )
  82. cls.server_channel.pre_fork(cls.process_manager)
  83. cls.io_loop = salt.ext.tornado.ioloop.IOLoop()
  84. cls.evt = threading.Event()
  85. cls.server_channel.post_fork(cls._handle_payload, io_loop=cls.io_loop)
  86. cls.server_thread = threading.Thread(
  87. target=run_loop_in_thread, args=(cls.io_loop, cls.evt)
  88. )
  89. cls.server_thread.start()
  90. @classmethod
  91. def tearDownClass(cls):
  92. if not hasattr(cls, "_handle_payload"):
  93. return
  94. # Attempting to kill the children hangs the test suite.
  95. # Let the test suite handle this instead.
  96. cls.process_manager.stop_restarting()
  97. cls.process_manager.kill_children()
  98. cls.evt.set()
  99. cls.server_thread.join()
  100. time.sleep(
  101. 2
  102. ) # Give the procs a chance to fully close before we stop the io_loop
  103. cls.server_channel.close()
  104. del cls.server_channel
  105. del cls.io_loop
  106. del cls.process_manager
  107. del cls.server_thread
  108. del cls.master_config
  109. del cls.minion_config
  110. @classmethod
  111. def _handle_payload(cls, payload):
  112. """
  113. TODO: something besides echo
  114. """
  115. return payload, {"fun": "send_clear"}
  116. class ClearReqTestCases(BaseZMQReqCase, ReqChannelMixin):
  117. """
  118. Test all of the clear msg stuff
  119. """
  120. def setUp(self):
  121. self.channel = salt.transport.client.ReqChannel.factory(
  122. self.minion_config, crypt="clear"
  123. )
  124. def tearDown(self):
  125. self.channel.close()
  126. del self.channel
  127. @classmethod
  128. @salt.ext.tornado.gen.coroutine
  129. def _handle_payload(cls, payload):
  130. """
  131. TODO: something besides echo
  132. """
  133. raise salt.ext.tornado.gen.Return((payload, {"fun": "send_clear"}))
  134. @slowTest
  135. def test_master_uri_override(self):
  136. """
  137. ensure master_uri kwarg is respected
  138. """
  139. # minion_config should be 127.0.0.1, we want a different uri that still connects
  140. uri = "tcp://{master_ip}:{master_port}".format(
  141. master_ip="localhost", master_port=self.minion_config["master_port"]
  142. )
  143. channel = salt.transport.client.ReqChannel.factory(
  144. self.minion_config, master_uri=uri
  145. )
  146. self.assertIn("localhost", channel.master_uri)
  147. del channel
  148. @flaky
  149. @not_runs_on(
  150. kernel="linux",
  151. os_familiy="Suse",
  152. reason="Skipping until https://github.com/saltstack/salt/issues/32902 gets fixed",
  153. )
  154. class AESReqTestCases(BaseZMQReqCase, ReqChannelMixin):
  155. def setUp(self):
  156. self.channel = salt.transport.client.ReqChannel.factory(self.minion_config)
  157. def tearDown(self):
  158. self.channel.close()
  159. del self.channel
  160. @classmethod
  161. @salt.ext.tornado.gen.coroutine
  162. def _handle_payload(cls, payload):
  163. """
  164. TODO: something besides echo
  165. """
  166. raise salt.ext.tornado.gen.Return((payload, {"fun": "send"}))
  167. # TODO: make failed returns have a specific framing so we can raise the same exception
  168. # on encrypted channels
  169. #
  170. #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  171. #
  172. # WARNING: This test will fail randomly on any system with > 1 CPU core!!!
  173. #
  174. #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  175. @slowTest
  176. def test_badload(self):
  177. """
  178. Test a variety of bad requests, make sure that we get some sort of error
  179. """
  180. # TODO: This test should be re-enabled when Jenkins moves to C7.
  181. # Once the version of salt-testing is increased to something newer than the September
  182. # release of salt-testing, the @flaky decorator should be applied to this test.
  183. msgs = ["", [], tuple()]
  184. for msg in msgs:
  185. with self.assertRaises(salt.exceptions.AuthenticationError):
  186. ret = self.channel.send(msg, timeout=5)
  187. class BaseZMQPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin):
  188. """
  189. Test the req server/client pair
  190. """
  191. @classmethod
  192. def setUpClass(cls):
  193. ret_port = get_unused_localhost_port()
  194. publish_port = get_unused_localhost_port()
  195. tcp_master_pub_port = get_unused_localhost_port()
  196. tcp_master_pull_port = get_unused_localhost_port()
  197. tcp_master_publish_pull = get_unused_localhost_port()
  198. tcp_master_workers = get_unused_localhost_port()
  199. cls.master_config = cls.get_temp_config(
  200. "master",
  201. **{
  202. "transport": "zeromq",
  203. "auto_accept": True,
  204. "ret_port": ret_port,
  205. "publish_port": publish_port,
  206. "tcp_master_pub_port": tcp_master_pub_port,
  207. "tcp_master_pull_port": tcp_master_pull_port,
  208. "tcp_master_publish_pull": tcp_master_publish_pull,
  209. "tcp_master_workers": tcp_master_workers,
  210. }
  211. )
  212. cls.minion_config = salt.config.minion_config(
  213. os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "minion")
  214. )
  215. cls.minion_config = cls.get_temp_config(
  216. "minion",
  217. **{
  218. "transport": "zeromq",
  219. "master_ip": "127.0.0.1",
  220. "master_port": ret_port,
  221. "master_uri": "tcp://127.0.0.1:{}".format(ret_port),
  222. }
  223. )
  224. cls.process_manager = salt.utils.process.ProcessManager(
  225. name="ReqServer_ProcessManager"
  226. )
  227. cls.server_channel = salt.transport.server.PubServerChannel.factory(
  228. cls.master_config
  229. )
  230. cls.server_channel.pre_fork(cls.process_manager)
  231. # we also require req server for auth
  232. cls.req_server_channel = salt.transport.server.ReqServerChannel.factory(
  233. cls.master_config
  234. )
  235. cls.req_server_channel.pre_fork(cls.process_manager)
  236. cls._server_io_loop = salt.ext.tornado.ioloop.IOLoop()
  237. cls.evt = threading.Event()
  238. cls.req_server_channel.post_fork(
  239. cls._handle_payload, io_loop=cls._server_io_loop
  240. )
  241. cls.server_thread = threading.Thread(
  242. target=run_loop_in_thread, args=(cls._server_io_loop, cls.evt)
  243. )
  244. cls.server_thread.start()
  245. @classmethod
  246. def tearDownClass(cls):
  247. cls.process_manager.kill_children()
  248. cls.process_manager.stop_restarting()
  249. time.sleep(
  250. 2
  251. ) # Give the procs a chance to fully close before we stop the io_loop
  252. cls.evt.set()
  253. cls.server_thread.join()
  254. cls.req_server_channel.close()
  255. cls.server_channel.close()
  256. cls._server_io_loop.stop()
  257. del cls.server_channel
  258. del cls._server_io_loop
  259. del cls.process_manager
  260. del cls.server_thread
  261. del cls.master_config
  262. del cls.minion_config
  263. @classmethod
  264. def _handle_payload(cls, payload):
  265. """
  266. TODO: something besides echo
  267. """
  268. return payload, {"fun": "send_clear"}
  269. def setUp(self):
  270. super().setUp()
  271. self._start_handlers = dict(self.io_loop._handlers)
  272. def tearDown(self):
  273. super().tearDown()
  274. failures = []
  275. for k, v in self.io_loop._handlers.items():
  276. if self._start_handlers.get(k) != v:
  277. failures.append((k, v))
  278. del self._start_handlers
  279. if len(failures) > 0:
  280. raise Exception("FDs still attached to the IOLoop: {}".format(failures))
  281. @skipIf(True, "Skip until we can devote time to fix this test")
  282. class AsyncPubChannelTest(BaseZMQPubCase, PubChannelMixin):
  283. """
  284. Tests around the publish system
  285. """
  286. def get_new_ioloop(self):
  287. return salt.ext.tornado.ioloop.IOLoop()
  288. class AsyncReqMessageClientPoolTest(TestCase):
  289. def setUp(self):
  290. super().setUp()
  291. sock_pool_size = 5
  292. with patch(
  293. "salt.transport.zeromq.AsyncReqMessageClient.__init__",
  294. MagicMock(return_value=None),
  295. ):
  296. self.message_client_pool = AsyncReqMessageClientPool(
  297. {"sock_pool_size": sock_pool_size}, args=({}, "")
  298. )
  299. self.original_message_clients = self.message_client_pool.message_clients
  300. self.message_client_pool.message_clients = [
  301. MagicMock() for _ in range(sock_pool_size)
  302. ]
  303. def tearDown(self):
  304. del self.original_message_clients
  305. super().tearDown()
  306. def test_send(self):
  307. for message_client_mock in self.message_client_pool.message_clients:
  308. message_client_mock.send_queue = [0, 0, 0]
  309. message_client_mock.send.return_value = []
  310. self.assertEqual([], self.message_client_pool.send())
  311. self.message_client_pool.message_clients[2].send_queue = [0]
  312. self.message_client_pool.message_clients[2].send.return_value = [1]
  313. self.assertEqual([1], self.message_client_pool.send())
  314. class ZMQConfigTest(TestCase):
  315. def test_master_uri(self):
  316. """
  317. test _get_master_uri method
  318. """
  319. m_ip = "127.0.0.1"
  320. m_port = 4505
  321. s_ip = "111.1.0.1"
  322. s_port = 4058
  323. m_ip6 = "1234:5678::9abc"
  324. s_ip6 = "1234:5678::1:9abc"
  325. with patch("salt.transport.zeromq.LIBZMQ_VERSION_INFO", (4, 1, 6)), patch(
  326. "salt.transport.zeromq.ZMQ_VERSION_INFO", (16, 0, 1)
  327. ):
  328. # pass in both source_ip and source_port
  329. assert salt.transport.zeromq._get_master_uri(
  330. master_ip=m_ip, master_port=m_port, source_ip=s_ip, source_port=s_port
  331. ) == "tcp://{}:{};{}:{}".format(s_ip, s_port, m_ip, m_port)
  332. assert salt.transport.zeromq._get_master_uri(
  333. master_ip=m_ip6, master_port=m_port, source_ip=s_ip6, source_port=s_port
  334. ) == "tcp://[{}]:{};[{}]:{}".format(s_ip6, s_port, m_ip6, m_port)
  335. # source ip and source_port empty
  336. assert salt.transport.zeromq._get_master_uri(
  337. master_ip=m_ip, master_port=m_port
  338. ) == "tcp://{}:{}".format(m_ip, m_port)
  339. assert salt.transport.zeromq._get_master_uri(
  340. master_ip=m_ip6, master_port=m_port
  341. ) == "tcp://[{}]:{}".format(m_ip6, m_port)
  342. # pass in only source_ip
  343. assert salt.transport.zeromq._get_master_uri(
  344. master_ip=m_ip, master_port=m_port, source_ip=s_ip
  345. ) == "tcp://{}:0;{}:{}".format(s_ip, m_ip, m_port)
  346. assert salt.transport.zeromq._get_master_uri(
  347. master_ip=m_ip6, master_port=m_port, source_ip=s_ip6
  348. ) == "tcp://[{}]:0;[{}]:{}".format(s_ip6, m_ip6, m_port)
  349. # pass in only source_port
  350. assert salt.transport.zeromq._get_master_uri(
  351. master_ip=m_ip, master_port=m_port, source_port=s_port
  352. ) == "tcp://0.0.0.0:{};{}:{}".format(s_port, m_ip, m_port)
  353. class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
  354. @classmethod
  355. def setUpClass(cls):
  356. ret_port = get_unused_localhost_port()
  357. publish_port = get_unused_localhost_port()
  358. tcp_master_pub_port = get_unused_localhost_port()
  359. tcp_master_pull_port = get_unused_localhost_port()
  360. tcp_master_publish_pull = get_unused_localhost_port()
  361. tcp_master_workers = get_unused_localhost_port()
  362. cls.master_config = cls.get_temp_config(
  363. "master",
  364. **{
  365. "transport": "zeromq",
  366. "auto_accept": True,
  367. "ret_port": ret_port,
  368. "publish_port": publish_port,
  369. "tcp_master_pub_port": tcp_master_pub_port,
  370. "tcp_master_pull_port": tcp_master_pull_port,
  371. "tcp_master_publish_pull": tcp_master_publish_pull,
  372. "tcp_master_workers": tcp_master_workers,
  373. "sign_pub_messages": False,
  374. }
  375. )
  376. salt.master.SMaster.secrets["aes"] = {
  377. "secret": multiprocessing.Array(
  378. ctypes.c_char, six.b(salt.crypt.Crypticle.generate_key_string()),
  379. ),
  380. }
  381. cls.minion_config = cls.get_temp_config(
  382. "minion",
  383. **{
  384. "transport": "zeromq",
  385. "master_ip": "127.0.0.1",
  386. "master_port": ret_port,
  387. "auth_timeout": 5,
  388. "auth_tries": 1,
  389. "master_uri": "tcp://127.0.0.1:{}".format(ret_port),
  390. }
  391. )
  392. @classmethod
  393. def tearDownClass(cls):
  394. del cls.minion_config
  395. del cls.master_config
  396. def setUp(self):
  397. # Start the event loop, even though we don't directly use this with
  398. # ZeroMQPubServerChannel, having it running seems to increase the
  399. # likely hood of dropped messages.
  400. self.io_loop = salt.ext.tornado.ioloop.IOLoop()
  401. self.io_loop.make_current()
  402. self.io_loop_thread = threading.Thread(target=self.io_loop.start)
  403. self.io_loop_thread.start()
  404. self.process_manager = salt.utils.process.ProcessManager(
  405. name="PubServer_ProcessManager"
  406. )
  407. def tearDown(self):
  408. self.io_loop.add_callback(self.io_loop.stop)
  409. self.io_loop_thread.join()
  410. self.process_manager.stop_restarting()
  411. self.process_manager.kill_children()
  412. del self.io_loop
  413. del self.io_loop_thread
  414. del self.process_manager
  415. @staticmethod
  416. def _gather_results(opts, pub_uri, results, timeout=120, messages=None):
  417. """
  418. Gather results until then number of seconds specified by timeout passes
  419. without reveiving a message
  420. """
  421. ctx = zmq.Context()
  422. sock = ctx.socket(zmq.SUB)
  423. sock.setsockopt(zmq.LINGER, -1)
  424. sock.setsockopt(zmq.SUBSCRIBE, b"")
  425. sock.connect(pub_uri)
  426. last_msg = time.time()
  427. serial = salt.payload.Serial(opts)
  428. crypticle = salt.crypt.Crypticle(
  429. opts, salt.master.SMaster.secrets["aes"]["secret"].value
  430. )
  431. while time.time() - last_msg < timeout:
  432. try:
  433. payload = sock.recv(zmq.NOBLOCK)
  434. except zmq.ZMQError:
  435. time.sleep(0.01)
  436. else:
  437. if messages:
  438. if messages != 1:
  439. messages -= 1
  440. continue
  441. payload = crypticle.loads(serial.loads(payload)["load"])
  442. if "stop" in payload:
  443. break
  444. last_msg = time.time()
  445. results.append(payload["jid"])
  446. @skipIf(salt.utils.platform.is_windows(), "Skip on Windows OS")
  447. @slowTest
  448. def test_publish_to_pubserv_ipc(self):
  449. """
  450. Test sending 10K messags to ZeroMQPubServerChannel using IPC transport
  451. ZMQ's ipc transport not supported on Windows
  452. """
  453. opts = dict(self.master_config, ipc_mode="ipc", pub_hwm=0)
  454. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  455. server_channel.pre_fork(
  456. self.process_manager,
  457. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  458. )
  459. pub_uri = "tcp://{interface}:{publish_port}".format(**server_channel.opts)
  460. send_num = 10000
  461. expect = []
  462. results = []
  463. gather = threading.Thread(
  464. target=self._gather_results, args=(self.minion_config, pub_uri, results,)
  465. )
  466. gather.start()
  467. # Allow time for server channel to start, especially on windows
  468. time.sleep(2)
  469. for i in range(send_num):
  470. expect.append(i)
  471. load = {"tgt_type": "glob", "tgt": "*", "jid": i}
  472. server_channel.publish(load)
  473. server_channel.publish({"tgt_type": "glob", "tgt": "*", "stop": True})
  474. gather.join()
  475. server_channel.pub_close()
  476. assert len(results) == send_num, (len(results), set(expect).difference(results))
  477. @skipIf(salt.utils.platform.is_linux(), "Skip on Linux")
  478. @slowTest
  479. def test_zeromq_publish_port(self):
  480. """
  481. test when connecting that we
  482. use the publish_port set in opts
  483. when its not 4506
  484. """
  485. opts = dict(
  486. self.master_config,
  487. ipc_mode="ipc",
  488. pub_hwm=0,
  489. recon_randomize=False,
  490. publish_port=455505,
  491. recon_default=1,
  492. recon_max=2,
  493. master_ip="127.0.0.1",
  494. acceptance_wait_time=5,
  495. acceptance_wait_time_max=5,
  496. )
  497. opts["master_uri"] = "tcp://{interface}:{publish_port}".format(**opts)
  498. channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  499. patch_socket = MagicMock(return_value=True)
  500. patch_auth = MagicMock(return_value=True)
  501. with patch.object(channel, "_socket", patch_socket), patch.object(
  502. channel, "auth", patch_auth
  503. ):
  504. channel.connect()
  505. assert str(opts["publish_port"]) in patch_socket.mock_calls[0][1][0]
  506. @skipIf(salt.utils.platform.is_linux(), "Skip on Linux")
  507. def test_zeromq_zeromq_filtering_decode_message_no_match(self):
  508. """
  509. test AsyncZeroMQPubChannel _decode_messages when
  510. zmq_filtering enabled and minion does not match
  511. """
  512. message = [
  513. b"4f26aeafdb2367620a393c973eddbe8f8b846eb",
  514. b"\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf"
  515. b"\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2"
  516. b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg'
  517. b"\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d"
  518. b"\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>"
  519. b"\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D",
  520. ]
  521. opts = dict(
  522. self.master_config,
  523. ipc_mode="ipc",
  524. pub_hwm=0,
  525. zmq_filtering=True,
  526. recon_randomize=False,
  527. recon_default=1,
  528. recon_max=2,
  529. master_ip="127.0.0.1",
  530. acceptance_wait_time=5,
  531. acceptance_wait_time_max=5,
  532. )
  533. opts["master_uri"] = "tcp://{interface}:{publish_port}".format(**opts)
  534. server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  535. with patch(
  536. "salt.crypt.AsyncAuth.crypticle",
  537. MagicMock(return_value={"tgt_type": "glob", "tgt": "*", "jid": 1}),
  538. ) as mock_test:
  539. res = server_channel._decode_messages(message)
  540. assert res.result() is None
  541. @skipIf(salt.utils.platform.is_linux(), "Skip on Linux")
  542. def test_zeromq_zeromq_filtering_decode_message(self):
  543. """
  544. test AsyncZeroMQPubChannel _decode_messages
  545. when zmq_filtered enabled
  546. """
  547. message = [
  548. b"4f26aeafdb2367620a393c973eddbe8f8b846ebd",
  549. b"\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf"
  550. b"\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2"
  551. b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg'
  552. b"\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d"
  553. b"\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>"
  554. b"\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D",
  555. ]
  556. opts = dict(
  557. self.master_config,
  558. ipc_mode="ipc",
  559. pub_hwm=0,
  560. zmq_filtering=True,
  561. recon_randomize=False,
  562. recon_default=1,
  563. recon_max=2,
  564. master_ip="127.0.0.1",
  565. acceptance_wait_time=5,
  566. acceptance_wait_time_max=5,
  567. )
  568. opts["master_uri"] = "tcp://{interface}:{publish_port}".format(**opts)
  569. server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  570. with patch(
  571. "salt.crypt.AsyncAuth.crypticle",
  572. MagicMock(return_value={"tgt_type": "glob", "tgt": "*", "jid": 1}),
  573. ) as mock_test:
  574. res = server_channel._decode_messages(message)
  575. assert res.result()["enc"] == "aes"
  576. @skipIf(salt.utils.platform.is_windows(), "Skip on Windows OS")
  577. @slowTest
  578. def test_zeromq_filtering(self):
  579. """
  580. Test sending messags to publisher using UDP
  581. with zeromq_filtering enabled
  582. """
  583. opts = dict(
  584. self.master_config,
  585. ipc_mode="ipc",
  586. pub_hwm=0,
  587. zmq_filtering=True,
  588. acceptance_wait_time=5,
  589. )
  590. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  591. server_channel.pre_fork(
  592. self.process_manager,
  593. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  594. )
  595. pub_uri = "tcp://{interface}:{publish_port}".format(**server_channel.opts)
  596. send_num = 1
  597. expect = []
  598. results = []
  599. gather = threading.Thread(
  600. target=self._gather_results,
  601. args=(self.minion_config, pub_uri, results,),
  602. kwargs={"messages": 2},
  603. )
  604. gather.start()
  605. # Allow time for server channel to start, especially on windows
  606. time.sleep(2)
  607. expect.append(send_num)
  608. load = {"tgt_type": "glob", "tgt": "*", "jid": send_num}
  609. with patch(
  610. "salt.utils.minions.CkMinions.check_minions",
  611. MagicMock(
  612. return_value={
  613. "minions": ["minion"],
  614. "missing": [],
  615. "ssh_minions": False,
  616. }
  617. ),
  618. ):
  619. server_channel.publish(load)
  620. server_channel.publish({"tgt_type": "glob", "tgt": "*", "stop": True})
  621. gather.join()
  622. server_channel.pub_close()
  623. assert len(results) == send_num, (len(results), set(expect).difference(results))
  624. @slowTest
  625. def test_publish_to_pubserv_tcp(self):
  626. """
  627. Test sending 10K messags to ZeroMQPubServerChannel using TCP transport
  628. """
  629. opts = dict(self.master_config, ipc_mode="tcp", pub_hwm=0)
  630. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  631. server_channel.pre_fork(
  632. self.process_manager,
  633. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  634. )
  635. pub_uri = "tcp://{interface}:{publish_port}".format(**server_channel.opts)
  636. send_num = 10000
  637. expect = []
  638. results = []
  639. gather = threading.Thread(
  640. target=self._gather_results, args=(self.minion_config, pub_uri, results,)
  641. )
  642. gather.start()
  643. # Allow time for server channel to start, especially on windows
  644. time.sleep(2)
  645. for i in range(send_num):
  646. expect.append(i)
  647. load = {"tgt_type": "glob", "tgt": "*", "jid": i}
  648. server_channel.publish(load)
  649. gather.join()
  650. server_channel.pub_close()
  651. assert len(results) == send_num, (len(results), set(expect).difference(results))
  652. @staticmethod
  653. def _send_small(opts, sid, num=10):
  654. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  655. for i in range(num):
  656. load = {"tgt_type": "glob", "tgt": "*", "jid": "{}-{}".format(sid, i)}
  657. server_channel.publish(load)
  658. server_channel.pub_close()
  659. @staticmethod
  660. def _send_large(opts, sid, num=10, size=250000 * 3):
  661. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  662. for i in range(num):
  663. load = {
  664. "tgt_type": "glob",
  665. "tgt": "*",
  666. "jid": "{}-{}".format(sid, i),
  667. "xdata": "0" * size,
  668. }
  669. server_channel.publish(load)
  670. server_channel.pub_close()
  671. @skipIf(salt.utils.platform.is_freebsd(), "Skip on FreeBSD")
  672. @slowTest
  673. def test_issue_36469_tcp(self):
  674. """
  675. Test sending both large and small messags to publisher using TCP
  676. https://github.com/saltstack/salt/issues/36469
  677. """
  678. opts = dict(self.master_config, ipc_mode="tcp", pub_hwm=0)
  679. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  680. server_channel.pre_fork(
  681. self.process_manager,
  682. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  683. )
  684. send_num = 10 * 4
  685. expect = []
  686. results = []
  687. pub_uri = "tcp://{interface}:{publish_port}".format(**opts)
  688. # Allow time for server channel to start, especially on windows
  689. time.sleep(2)
  690. gather = threading.Thread(
  691. target=self._gather_results, args=(self.minion_config, pub_uri, results,)
  692. )
  693. gather.start()
  694. with ThreadPoolExecutor(max_workers=4) as executor:
  695. executor.submit(self._send_small, opts, 1)
  696. executor.submit(self._send_small, opts, 2)
  697. executor.submit(self._send_small, opts, 3)
  698. executor.submit(self._send_large, opts, 4)
  699. expect = ["{}-{}".format(a, b) for a in range(10) for b in (1, 2, 3, 4)]
  700. time.sleep(0.1)
  701. server_channel.publish({"tgt_type": "glob", "tgt": "*", "stop": True})
  702. gather.join()
  703. server_channel.pub_close()
  704. assert len(results) == send_num, (len(results), set(expect).difference(results))