test_zeromq.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781
  1. # -*- coding: utf-8 -*-
  2. """
  3. :codeauthor: Thomas Jackson <jacksontj.89@gmail.com>
  4. """
  5. from __future__ import absolute_import, print_function, unicode_literals
  6. import ctypes
  7. import multiprocessing
  8. import os
  9. import threading
  10. import time
  11. from concurrent.futures.thread import ThreadPoolExecutor
  12. import salt.config
  13. import salt.exceptions
  14. import salt.ext.tornado.gen
  15. import salt.ext.tornado.ioloop
  16. import salt.log.setup
  17. import salt.transport.client
  18. import salt.transport.server
  19. import salt.utils.platform
  20. import salt.utils.process
  21. import zmq.eventloop.ioloop
  22. from salt.ext import six
  23. from salt.ext.six.moves import range
  24. from salt.ext.tornado.testing import AsyncTestCase
  25. from salt.transport.zeromq import AsyncReqMessageClientPool
  26. from saltfactories.utils.ports import get_unused_localhost_port
  27. from tests.support.helpers import flaky, not_runs_on, slowTest
  28. from tests.support.mixins import AdaptedConfigurationTestCaseMixin
  29. from tests.support.mock import MagicMock, patch
  30. from tests.support.runtests import RUNTIME_VARS
  31. from tests.support.unit import TestCase, skipIf
  32. from tests.unit.transport.mixins import (
  33. PubChannelMixin,
  34. ReqChannelMixin,
  35. run_loop_in_thread,
  36. )
# Support pyzmq 13.0.x: when the ioloop module has no ``ZMQIOLoop`` attribute,
# alias it to the module's ``IOLoop`` so later lookups of ``ZMQIOLoop`` succeed.
# TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, "ZMQIOLoop"):
    zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
  40. class BaseZMQReqCase(TestCase, AdaptedConfigurationTestCaseMixin):
  41. """
  42. Test the req server/client pair
  43. """
  44. @classmethod
  45. def setUpClass(cls):
  46. if not hasattr(cls, "_handle_payload"):
  47. return
  48. ret_port = get_unused_localhost_port()
  49. publish_port = get_unused_localhost_port()
  50. tcp_master_pub_port = get_unused_localhost_port()
  51. tcp_master_pull_port = get_unused_localhost_port()
  52. tcp_master_publish_pull = get_unused_localhost_port()
  53. tcp_master_workers = get_unused_localhost_port()
  54. cls.master_config = cls.get_temp_config(
  55. "master",
  56. **{
  57. "transport": "zeromq",
  58. "auto_accept": True,
  59. "ret_port": ret_port,
  60. "publish_port": publish_port,
  61. "tcp_master_pub_port": tcp_master_pub_port,
  62. "tcp_master_pull_port": tcp_master_pull_port,
  63. "tcp_master_publish_pull": tcp_master_publish_pull,
  64. "tcp_master_workers": tcp_master_workers,
  65. }
  66. )
  67. cls.minion_config = cls.get_temp_config(
  68. "minion",
  69. **{
  70. "transport": "zeromq",
  71. "master_ip": "127.0.0.1",
  72. "master_port": ret_port,
  73. "auth_timeout": 5,
  74. "auth_tries": 1,
  75. "master_uri": "tcp://127.0.0.1:{0}".format(ret_port),
  76. }
  77. )
  78. cls.process_manager = salt.utils.process.ProcessManager(
  79. name="ReqServer_ProcessManager"
  80. )
  81. cls.server_channel = salt.transport.server.ReqServerChannel.factory(
  82. cls.master_config
  83. )
  84. cls.server_channel.pre_fork(cls.process_manager)
  85. cls.io_loop = salt.ext.tornado.ioloop.IOLoop()
  86. cls.evt = threading.Event()
  87. cls.server_channel.post_fork(cls._handle_payload, io_loop=cls.io_loop)
  88. cls.server_thread = threading.Thread(
  89. target=run_loop_in_thread, args=(cls.io_loop, cls.evt)
  90. )
  91. cls.server_thread.start()
  92. @classmethod
  93. def tearDownClass(cls):
  94. if not hasattr(cls, "_handle_payload"):
  95. return
  96. # Attempting to kill the children hangs the test suite.
  97. # Let the test suite handle this instead.
  98. cls.process_manager.stop_restarting()
  99. cls.process_manager.kill_children()
  100. cls.evt.set()
  101. cls.server_thread.join()
  102. time.sleep(
  103. 2
  104. ) # Give the procs a chance to fully close before we stop the io_loop
  105. cls.server_channel.close()
  106. del cls.server_channel
  107. del cls.io_loop
  108. del cls.process_manager
  109. del cls.server_thread
  110. del cls.master_config
  111. del cls.minion_config
  112. @classmethod
  113. def _handle_payload(cls, payload):
  114. """
  115. TODO: something besides echo
  116. """
  117. return payload, {"fun": "send_clear"}
  118. class ClearReqTestCases(BaseZMQReqCase, ReqChannelMixin):
  119. """
  120. Test all of the clear msg stuff
  121. """
  122. def setUp(self):
  123. self.channel = salt.transport.client.ReqChannel.factory(
  124. self.minion_config, crypt="clear"
  125. )
  126. def tearDown(self):
  127. self.channel.close()
  128. del self.channel
  129. @classmethod
  130. @salt.ext.tornado.gen.coroutine
  131. def _handle_payload(cls, payload):
  132. """
  133. TODO: something besides echo
  134. """
  135. raise salt.ext.tornado.gen.Return((payload, {"fun": "send_clear"}))
  136. @slowTest
  137. def test_master_uri_override(self):
  138. """
  139. ensure master_uri kwarg is respected
  140. """
  141. # minion_config should be 127.0.0.1, we want a different uri that still connects
  142. uri = "tcp://{master_ip}:{master_port}".format(
  143. master_ip="localhost", master_port=self.minion_config["master_port"]
  144. )
  145. channel = salt.transport.client.ReqChannel.factory(
  146. self.minion_config, master_uri=uri
  147. )
  148. self.assertIn("localhost", channel.master_uri)
  149. del channel
  150. @flaky
  151. @not_runs_on(
  152. kernel="linux",
  153. os_familiy="Suse",
  154. reason="Skipping until https://github.com/saltstack/salt/issues/32902 gets fixed",
  155. )
  156. class AESReqTestCases(BaseZMQReqCase, ReqChannelMixin):
  157. def setUp(self):
  158. self.channel = salt.transport.client.ReqChannel.factory(self.minion_config)
  159. def tearDown(self):
  160. self.channel.close()
  161. del self.channel
  162. @classmethod
  163. @salt.ext.tornado.gen.coroutine
  164. def _handle_payload(cls, payload):
  165. """
  166. TODO: something besides echo
  167. """
  168. raise salt.ext.tornado.gen.Return((payload, {"fun": "send"}))
  169. # TODO: make failed returns have a specific framing so we can raise the same exception
  170. # on encrypted channels
  171. #
  172. #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  173. #
  174. # WARNING: This test will fail randomly on any system with > 1 CPU core!!!
  175. #
  176. #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  177. @slowTest
  178. def test_badload(self):
  179. """
  180. Test a variety of bad requests, make sure that we get some sort of error
  181. """
  182. # TODO: This test should be re-enabled when Jenkins moves to C7.
  183. # Once the version of salt-testing is increased to something newer than the September
  184. # release of salt-testing, the @flaky decorator should be applied to this test.
  185. msgs = ["", [], tuple()]
  186. for msg in msgs:
  187. with self.assertRaises(salt.exceptions.AuthenticationError):
  188. ret = self.channel.send(msg, timeout=5)
  189. class BaseZMQPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin):
  190. """
  191. Test the req server/client pair
  192. """
  193. @classmethod
  194. def setUpClass(cls):
  195. ret_port = get_unused_localhost_port()
  196. publish_port = get_unused_localhost_port()
  197. tcp_master_pub_port = get_unused_localhost_port()
  198. tcp_master_pull_port = get_unused_localhost_port()
  199. tcp_master_publish_pull = get_unused_localhost_port()
  200. tcp_master_workers = get_unused_localhost_port()
  201. cls.master_config = cls.get_temp_config(
  202. "master",
  203. **{
  204. "transport": "zeromq",
  205. "auto_accept": True,
  206. "ret_port": ret_port,
  207. "publish_port": publish_port,
  208. "tcp_master_pub_port": tcp_master_pub_port,
  209. "tcp_master_pull_port": tcp_master_pull_port,
  210. "tcp_master_publish_pull": tcp_master_publish_pull,
  211. "tcp_master_workers": tcp_master_workers,
  212. }
  213. )
  214. cls.minion_config = salt.config.minion_config(
  215. os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "minion")
  216. )
  217. cls.minion_config = cls.get_temp_config(
  218. "minion",
  219. **{
  220. "transport": "zeromq",
  221. "master_ip": "127.0.0.1",
  222. "master_port": ret_port,
  223. "master_uri": "tcp://127.0.0.1:{0}".format(ret_port),
  224. }
  225. )
  226. cls.process_manager = salt.utils.process.ProcessManager(
  227. name="ReqServer_ProcessManager"
  228. )
  229. cls.server_channel = salt.transport.server.PubServerChannel.factory(
  230. cls.master_config
  231. )
  232. cls.server_channel.pre_fork(cls.process_manager)
  233. # we also require req server for auth
  234. cls.req_server_channel = salt.transport.server.ReqServerChannel.factory(
  235. cls.master_config
  236. )
  237. cls.req_server_channel.pre_fork(cls.process_manager)
  238. cls._server_io_loop = salt.ext.tornado.ioloop.IOLoop()
  239. cls.evt = threading.Event()
  240. cls.req_server_channel.post_fork(
  241. cls._handle_payload, io_loop=cls._server_io_loop
  242. )
  243. cls.server_thread = threading.Thread(
  244. target=run_loop_in_thread, args=(cls._server_io_loop, cls.evt)
  245. )
  246. cls.server_thread.start()
  247. @classmethod
  248. def tearDownClass(cls):
  249. cls.process_manager.kill_children()
  250. cls.process_manager.stop_restarting()
  251. time.sleep(
  252. 2
  253. ) # Give the procs a chance to fully close before we stop the io_loop
  254. cls.evt.set()
  255. cls.server_thread.join()
  256. cls.req_server_channel.close()
  257. cls.server_channel.close()
  258. cls._server_io_loop.stop()
  259. del cls.server_channel
  260. del cls._server_io_loop
  261. del cls.process_manager
  262. del cls.server_thread
  263. del cls.master_config
  264. del cls.minion_config
  265. @classmethod
  266. def _handle_payload(cls, payload):
  267. """
  268. TODO: something besides echo
  269. """
  270. return payload, {"fun": "send_clear"}
  271. def setUp(self):
  272. super(BaseZMQPubCase, self).setUp()
  273. self._start_handlers = dict(self.io_loop._handlers)
  274. def tearDown(self):
  275. super(BaseZMQPubCase, self).tearDown()
  276. failures = []
  277. for k, v in six.iteritems(self.io_loop._handlers):
  278. if self._start_handlers.get(k) != v:
  279. failures.append((k, v))
  280. del self._start_handlers
  281. if len(failures) > 0:
  282. raise Exception("FDs still attached to the IOLoop: {0}".format(failures))
  283. @skipIf(True, "Skip until we can devote time to fix this test")
  284. class AsyncPubChannelTest(BaseZMQPubCase, PubChannelMixin):
  285. """
  286. Tests around the publish system
  287. """
  288. def get_new_ioloop(self):
  289. return salt.ext.tornado.ioloop.IOLoop()
  290. class AsyncReqMessageClientPoolTest(TestCase):
  291. def setUp(self):
  292. super(AsyncReqMessageClientPoolTest, self).setUp()
  293. sock_pool_size = 5
  294. with patch(
  295. "salt.transport.zeromq.AsyncReqMessageClient.__init__",
  296. MagicMock(return_value=None),
  297. ):
  298. self.message_client_pool = AsyncReqMessageClientPool(
  299. {"sock_pool_size": sock_pool_size}, args=({}, "")
  300. )
  301. self.original_message_clients = self.message_client_pool.message_clients
  302. self.message_client_pool.message_clients = [
  303. MagicMock() for _ in range(sock_pool_size)
  304. ]
  305. def tearDown(self):
  306. del self.original_message_clients
  307. super(AsyncReqMessageClientPoolTest, self).tearDown()
  308. def test_send(self):
  309. for message_client_mock in self.message_client_pool.message_clients:
  310. message_client_mock.send_queue = [0, 0, 0]
  311. message_client_mock.send.return_value = []
  312. self.assertEqual([], self.message_client_pool.send())
  313. self.message_client_pool.message_clients[2].send_queue = [0]
  314. self.message_client_pool.message_clients[2].send.return_value = [1]
  315. self.assertEqual([1], self.message_client_pool.send())
  316. class ZMQConfigTest(TestCase):
  317. def test_master_uri(self):
  318. """
  319. test _get_master_uri method
  320. """
  321. m_ip = "127.0.0.1"
  322. m_port = 4505
  323. s_ip = "111.1.0.1"
  324. s_port = 4058
  325. m_ip6 = "1234:5678::9abc"
  326. s_ip6 = "1234:5678::1:9abc"
  327. with patch("salt.transport.zeromq.LIBZMQ_VERSION_INFO", (4, 1, 6)), patch(
  328. "salt.transport.zeromq.ZMQ_VERSION_INFO", (16, 0, 1)
  329. ):
  330. # pass in both source_ip and source_port
  331. assert salt.transport.zeromq._get_master_uri(
  332. master_ip=m_ip, master_port=m_port, source_ip=s_ip, source_port=s_port
  333. ) == "tcp://{0}:{1};{2}:{3}".format(s_ip, s_port, m_ip, m_port)
  334. assert salt.transport.zeromq._get_master_uri(
  335. master_ip=m_ip6, master_port=m_port, source_ip=s_ip6, source_port=s_port
  336. ) == "tcp://[{0}]:{1};[{2}]:{3}".format(s_ip6, s_port, m_ip6, m_port)
  337. # source ip and source_port empty
  338. assert salt.transport.zeromq._get_master_uri(
  339. master_ip=m_ip, master_port=m_port
  340. ) == "tcp://{0}:{1}".format(m_ip, m_port)
  341. assert salt.transport.zeromq._get_master_uri(
  342. master_ip=m_ip6, master_port=m_port
  343. ) == "tcp://[{0}]:{1}".format(m_ip6, m_port)
  344. # pass in only source_ip
  345. assert salt.transport.zeromq._get_master_uri(
  346. master_ip=m_ip, master_port=m_port, source_ip=s_ip
  347. ) == "tcp://{0}:0;{1}:{2}".format(s_ip, m_ip, m_port)
  348. assert salt.transport.zeromq._get_master_uri(
  349. master_ip=m_ip6, master_port=m_port, source_ip=s_ip6
  350. ) == "tcp://[{0}]:0;[{1}]:{2}".format(s_ip6, m_ip6, m_port)
  351. # pass in only source_port
  352. assert salt.transport.zeromq._get_master_uri(
  353. master_ip=m_ip, master_port=m_port, source_port=s_port
  354. ) == "tcp://0.0.0.0:{0};{1}:{2}".format(s_port, m_ip, m_port)
  355. class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
  356. @classmethod
  357. def setUpClass(cls):
  358. ret_port = get_unused_localhost_port()
  359. publish_port = get_unused_localhost_port()
  360. tcp_master_pub_port = get_unused_localhost_port()
  361. tcp_master_pull_port = get_unused_localhost_port()
  362. tcp_master_publish_pull = get_unused_localhost_port()
  363. tcp_master_workers = get_unused_localhost_port()
  364. cls.master_config = cls.get_temp_config(
  365. "master",
  366. **{
  367. "transport": "zeromq",
  368. "auto_accept": True,
  369. "ret_port": ret_port,
  370. "publish_port": publish_port,
  371. "tcp_master_pub_port": tcp_master_pub_port,
  372. "tcp_master_pull_port": tcp_master_pull_port,
  373. "tcp_master_publish_pull": tcp_master_publish_pull,
  374. "tcp_master_workers": tcp_master_workers,
  375. "sign_pub_messages": False,
  376. }
  377. )
  378. salt.master.SMaster.secrets["aes"] = {
  379. "secret": multiprocessing.Array(
  380. ctypes.c_char, six.b(salt.crypt.Crypticle.generate_key_string()),
  381. ),
  382. }
  383. cls.minion_config = cls.get_temp_config(
  384. "minion",
  385. **{
  386. "transport": "zeromq",
  387. "master_ip": "127.0.0.1",
  388. "master_port": ret_port,
  389. "auth_timeout": 5,
  390. "auth_tries": 1,
  391. "master_uri": "tcp://127.0.0.1:{0}".format(ret_port),
  392. }
  393. )
  394. @classmethod
  395. def tearDownClass(cls):
  396. del cls.minion_config
  397. del cls.master_config
  398. def setUp(self):
  399. # Start the event loop, even though we don't directly use this with
  400. # ZeroMQPubServerChannel, having it running seems to increase the
  401. # likely hood of dropped messages.
  402. self.io_loop = salt.ext.tornado.ioloop.IOLoop()
  403. self.io_loop.make_current()
  404. self.io_loop_thread = threading.Thread(target=self.io_loop.start)
  405. self.io_loop_thread.start()
  406. self.process_manager = salt.utils.process.ProcessManager(
  407. name="PubServer_ProcessManager"
  408. )
  409. def tearDown(self):
  410. self.io_loop.add_callback(self.io_loop.stop)
  411. self.io_loop_thread.join()
  412. self.process_manager.stop_restarting()
  413. self.process_manager.kill_children()
  414. del self.io_loop
  415. del self.io_loop_thread
  416. del self.process_manager
  417. @staticmethod
  418. def _gather_results(opts, pub_uri, results, timeout=120, messages=None):
  419. """
  420. Gather results until then number of seconds specified by timeout passes
  421. without reveiving a message
  422. """
  423. ctx = zmq.Context()
  424. sock = ctx.socket(zmq.SUB)
  425. sock.setsockopt(zmq.LINGER, -1)
  426. sock.setsockopt(zmq.SUBSCRIBE, b"")
  427. sock.connect(pub_uri)
  428. last_msg = time.time()
  429. serial = salt.payload.Serial(opts)
  430. crypticle = salt.crypt.Crypticle(
  431. opts, salt.master.SMaster.secrets["aes"]["secret"].value
  432. )
  433. while time.time() - last_msg < timeout:
  434. try:
  435. payload = sock.recv(zmq.NOBLOCK)
  436. except zmq.ZMQError:
  437. time.sleep(0.01)
  438. else:
  439. if messages:
  440. if messages != 1:
  441. messages -= 1
  442. continue
  443. payload = crypticle.loads(serial.loads(payload)["load"])
  444. if "stop" in payload:
  445. break
  446. last_msg = time.time()
  447. results.append(payload["jid"])
  448. @skipIf(salt.utils.platform.is_windows(), "Skip on Windows OS")
  449. @slowTest
  450. def test_publish_to_pubserv_ipc(self):
  451. """
  452. Test sending 10K messags to ZeroMQPubServerChannel using IPC transport
  453. ZMQ's ipc transport not supported on Windows
  454. """
  455. opts = dict(self.master_config, ipc_mode="ipc", pub_hwm=0)
  456. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  457. server_channel.pre_fork(
  458. self.process_manager,
  459. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  460. )
  461. pub_uri = "tcp://{interface}:{publish_port}".format(**server_channel.opts)
  462. send_num = 10000
  463. expect = []
  464. results = []
  465. gather = threading.Thread(
  466. target=self._gather_results, args=(self.minion_config, pub_uri, results,)
  467. )
  468. gather.start()
  469. # Allow time for server channel to start, especially on windows
  470. time.sleep(2)
  471. for i in range(send_num):
  472. expect.append(i)
  473. load = {"tgt_type": "glob", "tgt": "*", "jid": i}
  474. server_channel.publish(load)
  475. server_channel.publish({"tgt_type": "glob", "tgt": "*", "stop": True})
  476. gather.join()
  477. server_channel.pub_close()
  478. assert len(results) == send_num, (len(results), set(expect).difference(results))
  479. @slowTest
  480. def test_zeromq_publish_port(self):
  481. """
  482. test when connecting that we
  483. use the publish_port set in opts
  484. when its not 4506
  485. """
  486. opts = dict(
  487. self.master_config,
  488. ipc_mode="ipc",
  489. pub_hwm=0,
  490. recon_randomize=False,
  491. publish_port=455505,
  492. recon_default=1,
  493. recon_max=2,
  494. master_ip="127.0.0.1",
  495. acceptance_wait_time=5,
  496. acceptance_wait_time_max=5,
  497. )
  498. opts["master_uri"] = "tcp://{interface}:{publish_port}".format(**opts)
  499. channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  500. patch_socket = MagicMock(return_value=True)
  501. patch_auth = MagicMock(return_value=True)
  502. with patch.object(channel, "_socket", patch_socket), patch.object(
  503. channel, "auth", patch_auth
  504. ):
  505. channel.connect()
  506. assert str(opts["publish_port"]) in patch_socket.mock_calls[0][1][0]
  507. def test_zeromq_zeromq_filtering_decode_message_no_match(self):
  508. """
  509. test AsyncZeroMQPubChannel _decode_messages when
  510. zmq_filtering enabled and minion does not match
  511. """
  512. message = [
  513. b"4f26aeafdb2367620a393c973eddbe8f8b846eb",
  514. b"\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf"
  515. b"\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2"
  516. b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg'
  517. b"\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d"
  518. b"\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>"
  519. b"\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D",
  520. ]
  521. opts = dict(
  522. self.master_config,
  523. ipc_mode="ipc",
  524. pub_hwm=0,
  525. zmq_filtering=True,
  526. recon_randomize=False,
  527. recon_default=1,
  528. recon_max=2,
  529. master_ip="127.0.0.1",
  530. acceptance_wait_time=5,
  531. acceptance_wait_time_max=5,
  532. )
  533. opts["master_uri"] = "tcp://{interface}:{publish_port}".format(**opts)
  534. server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  535. with patch(
  536. "salt.crypt.AsyncAuth.crypticle",
  537. MagicMock(return_value={"tgt_type": "glob", "tgt": "*", "jid": 1}),
  538. ) as mock_test:
  539. res = server_channel._decode_messages(message)
  540. assert res.result() is None
  541. def test_zeromq_zeromq_filtering_decode_message(self):
  542. """
  543. test AsyncZeroMQPubChannel _decode_messages
  544. when zmq_filtered enabled
  545. """
  546. message = [
  547. b"4f26aeafdb2367620a393c973eddbe8f8b846ebd",
  548. b"\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf"
  549. b"\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2"
  550. b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg'
  551. b"\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d"
  552. b"\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>"
  553. b"\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D",
  554. ]
  555. opts = dict(
  556. self.master_config,
  557. ipc_mode="ipc",
  558. pub_hwm=0,
  559. zmq_filtering=True,
  560. recon_randomize=False,
  561. recon_default=1,
  562. recon_max=2,
  563. master_ip="127.0.0.1",
  564. acceptance_wait_time=5,
  565. acceptance_wait_time_max=5,
  566. )
  567. opts["master_uri"] = "tcp://{interface}:{publish_port}".format(**opts)
  568. server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  569. with patch(
  570. "salt.crypt.AsyncAuth.crypticle",
  571. MagicMock(return_value={"tgt_type": "glob", "tgt": "*", "jid": 1}),
  572. ) as mock_test:
  573. res = server_channel._decode_messages(message)
  574. assert res.result()["enc"] == "aes"
  575. @skipIf(salt.utils.platform.is_windows(), "Skip on Windows OS")
  576. @slowTest
  577. def test_zeromq_filtering(self):
  578. """
  579. Test sending messags to publisher using UDP
  580. with zeromq_filtering enabled
  581. """
  582. opts = dict(
  583. self.master_config,
  584. ipc_mode="ipc",
  585. pub_hwm=0,
  586. zmq_filtering=True,
  587. acceptance_wait_time=5,
  588. )
  589. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  590. server_channel.pre_fork(
  591. self.process_manager,
  592. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  593. )
  594. pub_uri = "tcp://{interface}:{publish_port}".format(**server_channel.opts)
  595. send_num = 1
  596. expect = []
  597. results = []
  598. gather = threading.Thread(
  599. target=self._gather_results,
  600. args=(self.minion_config, pub_uri, results,),
  601. kwargs={"messages": 2},
  602. )
  603. gather.start()
  604. # Allow time for server channel to start, especially on windows
  605. time.sleep(2)
  606. expect.append(send_num)
  607. load = {"tgt_type": "glob", "tgt": "*", "jid": send_num}
  608. with patch(
  609. "salt.utils.minions.CkMinions.check_minions",
  610. MagicMock(
  611. return_value={
  612. "minions": ["minion"],
  613. "missing": [],
  614. "ssh_minions": False,
  615. }
  616. ),
  617. ):
  618. server_channel.publish(load)
  619. server_channel.publish({"tgt_type": "glob", "tgt": "*", "stop": True})
  620. gather.join()
  621. server_channel.pub_close()
  622. assert len(results) == send_num, (len(results), set(expect).difference(results))
  623. @slowTest
  624. def test_publish_to_pubserv_tcp(self):
  625. """
  626. Test sending 10K messags to ZeroMQPubServerChannel using TCP transport
  627. """
  628. opts = dict(self.master_config, ipc_mode="tcp", pub_hwm=0)
  629. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  630. server_channel.pre_fork(
  631. self.process_manager,
  632. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  633. )
  634. pub_uri = "tcp://{interface}:{publish_port}".format(**server_channel.opts)
  635. send_num = 10000
  636. expect = []
  637. results = []
  638. gather = threading.Thread(
  639. target=self._gather_results, args=(self.minion_config, pub_uri, results,)
  640. )
  641. gather.start()
  642. # Allow time for server channel to start, especially on windows
  643. time.sleep(2)
  644. for i in range(send_num):
  645. expect.append(i)
  646. load = {"tgt_type": "glob", "tgt": "*", "jid": i}
  647. server_channel.publish(load)
  648. gather.join()
  649. server_channel.pub_close()
  650. assert len(results) == send_num, (len(results), set(expect).difference(results))
  651. @staticmethod
  652. def _send_small(opts, sid, num=10):
  653. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  654. for i in range(num):
  655. load = {"tgt_type": "glob", "tgt": "*", "jid": "{}-{}".format(sid, i)}
  656. server_channel.publish(load)
  657. server_channel.close()
  658. @staticmethod
  659. def _send_large(opts, sid, num=10, size=250000 * 3):
  660. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  661. for i in range(num):
  662. load = {
  663. "tgt_type": "glob",
  664. "tgt": "*",
  665. "jid": "{}-{}".format(sid, i),
  666. "xdata": "0" * size,
  667. }
  668. server_channel.publish(load)
  669. server_channel.close()
  670. @slowTest
  671. def test_issue_36469_tcp(self):
  672. """
  673. Test sending both large and small messags to publisher using TCP
  674. https://github.com/saltstack/salt/issues/36469
  675. """
  676. opts = dict(self.master_config, ipc_mode="tcp", pub_hwm=0)
  677. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  678. server_channel.pre_fork(
  679. self.process_manager,
  680. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  681. )
  682. send_num = 10 * 4
  683. expect = []
  684. results = []
  685. pub_uri = "tcp://{interface}:{publish_port}".format(**opts)
  686. # Allow time for server channel to start, especially on windows
  687. time.sleep(2)
  688. gather = threading.Thread(
  689. target=self._gather_results, args=(self.minion_config, pub_uri, results,)
  690. )
  691. gather.start()
  692. with ThreadPoolExecutor(max_workers=4) as executor:
  693. executor.submit(self._send_small, opts, 1)
  694. executor.submit(self._send_small, opts, 2)
  695. executor.submit(self._send_small, opts, 3)
  696. executor.submit(self._send_large, opts, 4)
  697. expect = ["{}-{}".format(a, b) for a in range(10) for b in (1, 2, 3, 4)]
  698. time.sleep(0.1)
  699. server_channel.publish({"tgt_type": "glob", "tgt": "*", "stop": True})
  700. gather.join()
  701. server_channel.pub_close()
  702. assert len(results) == send_num, (len(results), set(expect).difference(results))