test_zeromq.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809
  1. """
  2. :codeauthor: Thomas Jackson <jacksontj.89@gmail.com>
  3. """
  4. import ctypes
  5. import multiprocessing
  6. import os
  7. import threading
  8. import time
  9. from concurrent.futures.thread import ThreadPoolExecutor
  10. import salt.config
  11. import salt.exceptions
  12. import salt.ext.tornado.gen
  13. import salt.ext.tornado.ioloop
  14. import salt.log.setup
  15. import salt.transport.client
  16. import salt.transport.server
  17. import salt.utils.platform
  18. import salt.utils.process
  19. import salt.utils.stringutils
  20. import zmq.eventloop.ioloop
  21. from salt.ext.tornado.testing import AsyncTestCase
  22. from salt.transport.zeromq import AsyncReqMessageClientPool
  23. from saltfactories.utils.ports import get_unused_localhost_port
  24. from tests.support.helpers import flaky, not_runs_on, slowTest
  25. from tests.support.mixins import AdaptedConfigurationTestCaseMixin
  26. from tests.support.mock import MagicMock, call, patch
  27. from tests.support.runtests import RUNTIME_VARS
  28. from tests.support.unit import TestCase, skipIf
  29. from tests.unit.transport.mixins import (
  30. PubChannelMixin,
  31. ReqChannelMixin,
  32. run_loop_in_thread,
  33. )
  34. x = "fix pre"
  35. # support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
  36. if not hasattr(zmq.eventloop.ioloop, "ZMQIOLoop"):
  37. zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
  38. class BaseZMQReqCase(TestCase, AdaptedConfigurationTestCaseMixin):
  39. """
  40. Test the req server/client pair
  41. """
  42. @classmethod
  43. def setUpClass(cls):
  44. if not hasattr(cls, "_handle_payload"):
  45. return
  46. ret_port = get_unused_localhost_port()
  47. publish_port = get_unused_localhost_port()
  48. tcp_master_pub_port = get_unused_localhost_port()
  49. tcp_master_pull_port = get_unused_localhost_port()
  50. tcp_master_publish_pull = get_unused_localhost_port()
  51. tcp_master_workers = get_unused_localhost_port()
  52. cls.master_config = cls.get_temp_config(
  53. "master",
  54. **{
  55. "transport": "zeromq",
  56. "auto_accept": True,
  57. "ret_port": ret_port,
  58. "publish_port": publish_port,
  59. "tcp_master_pub_port": tcp_master_pub_port,
  60. "tcp_master_pull_port": tcp_master_pull_port,
  61. "tcp_master_publish_pull": tcp_master_publish_pull,
  62. "tcp_master_workers": tcp_master_workers,
  63. }
  64. )
  65. cls.minion_config = cls.get_temp_config(
  66. "minion",
  67. **{
  68. "transport": "zeromq",
  69. "master_ip": "127.0.0.1",
  70. "master_port": ret_port,
  71. "auth_timeout": 5,
  72. "auth_tries": 1,
  73. "master_uri": "tcp://127.0.0.1:{}".format(ret_port),
  74. }
  75. )
  76. cls.process_manager = salt.utils.process.ProcessManager(
  77. name="ReqServer_ProcessManager"
  78. )
  79. cls.server_channel = salt.transport.server.ReqServerChannel.factory(
  80. cls.master_config
  81. )
  82. cls.server_channel.pre_fork(cls.process_manager)
  83. cls.io_loop = salt.ext.tornado.ioloop.IOLoop()
  84. cls.evt = threading.Event()
  85. cls.server_channel.post_fork(cls._handle_payload, io_loop=cls.io_loop)
  86. cls.server_thread = threading.Thread(
  87. target=run_loop_in_thread, args=(cls.io_loop, cls.evt)
  88. )
  89. cls.server_thread.start()
  90. @classmethod
  91. def tearDownClass(cls):
  92. if not hasattr(cls, "_handle_payload"):
  93. return
  94. # Attempting to kill the children hangs the test suite.
  95. # Let the test suite handle this instead.
  96. cls.process_manager.stop_restarting()
  97. cls.process_manager.kill_children()
  98. cls.evt.set()
  99. cls.server_thread.join()
  100. time.sleep(
  101. 2
  102. ) # Give the procs a chance to fully close before we stop the io_loop
  103. cls.server_channel.close()
  104. del cls.server_channel
  105. del cls.io_loop
  106. del cls.process_manager
  107. del cls.server_thread
  108. del cls.master_config
  109. del cls.minion_config
  110. @classmethod
  111. def _handle_payload(cls, payload):
  112. """
  113. TODO: something besides echo
  114. """
  115. return payload, {"fun": "send_clear"}
  116. class ClearReqTestCases(BaseZMQReqCase, ReqChannelMixin):
  117. """
  118. Test all of the clear msg stuff
  119. """
  120. def setUp(self):
  121. self.channel = salt.transport.client.ReqChannel.factory(
  122. self.minion_config, crypt="clear"
  123. )
  124. def tearDown(self):
  125. self.channel.close()
  126. del self.channel
  127. @classmethod
  128. @salt.ext.tornado.gen.coroutine
  129. def _handle_payload(cls, payload):
  130. """
  131. TODO: something besides echo
  132. """
  133. raise salt.ext.tornado.gen.Return((payload, {"fun": "send_clear"}))
  134. @slowTest
  135. def test_master_uri_override(self):
  136. """
  137. ensure master_uri kwarg is respected
  138. """
  139. # minion_config should be 127.0.0.1, we want a different uri that still connects
  140. uri = "tcp://{master_ip}:{master_port}".format(
  141. master_ip="localhost", master_port=self.minion_config["master_port"]
  142. )
  143. channel = salt.transport.client.ReqChannel.factory(
  144. self.minion_config, master_uri=uri
  145. )
  146. self.assertIn("localhost", channel.master_uri)
  147. del channel
  148. @flaky
  149. @not_runs_on(
  150. kernel="linux",
  151. os_familiy="Suse",
  152. reason="Skipping until https://github.com/saltstack/salt/issues/32902 gets fixed",
  153. )
  154. class AESReqTestCases(BaseZMQReqCase, ReqChannelMixin):
  155. def setUp(self):
  156. self.channel = salt.transport.client.ReqChannel.factory(self.minion_config)
  157. def tearDown(self):
  158. self.channel.close()
  159. del self.channel
  160. @classmethod
  161. @salt.ext.tornado.gen.coroutine
  162. def _handle_payload(cls, payload):
  163. """
  164. TODO: something besides echo
  165. """
  166. raise salt.ext.tornado.gen.Return((payload, {"fun": "send"}))
  167. # TODO: make failed returns have a specific framing so we can raise the same exception
  168. # on encrypted channels
  169. #
  170. #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  171. #
  172. # WARNING: This test will fail randomly on any system with > 1 CPU core!!!
  173. #
  174. #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  175. @slowTest
  176. def test_badload(self):
  177. """
  178. Test a variety of bad requests, make sure that we get some sort of error
  179. """
  180. # TODO: This test should be re-enabled when Jenkins moves to C7.
  181. # Once the version of salt-testing is increased to something newer than the September
  182. # release of salt-testing, the @flaky decorator should be applied to this test.
  183. msgs = ["", [], tuple()]
  184. for msg in msgs:
  185. with self.assertRaises(salt.exceptions.AuthenticationError):
  186. ret = self.channel.send(msg, timeout=5)
  187. class BaseZMQPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin):
  188. """
  189. Test the req server/client pair
  190. """
  191. @classmethod
  192. def setUpClass(cls):
  193. ret_port = get_unused_localhost_port()
  194. publish_port = get_unused_localhost_port()
  195. tcp_master_pub_port = get_unused_localhost_port()
  196. tcp_master_pull_port = get_unused_localhost_port()
  197. tcp_master_publish_pull = get_unused_localhost_port()
  198. tcp_master_workers = get_unused_localhost_port()
  199. cls.master_config = cls.get_temp_config(
  200. "master",
  201. **{
  202. "transport": "zeromq",
  203. "auto_accept": True,
  204. "ret_port": ret_port,
  205. "publish_port": publish_port,
  206. "tcp_master_pub_port": tcp_master_pub_port,
  207. "tcp_master_pull_port": tcp_master_pull_port,
  208. "tcp_master_publish_pull": tcp_master_publish_pull,
  209. "tcp_master_workers": tcp_master_workers,
  210. }
  211. )
  212. cls.minion_config = salt.config.minion_config(
  213. os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "minion")
  214. )
  215. cls.minion_config = cls.get_temp_config(
  216. "minion",
  217. **{
  218. "transport": "zeromq",
  219. "master_ip": "127.0.0.1",
  220. "master_port": ret_port,
  221. "master_uri": "tcp://127.0.0.1:{}".format(ret_port),
  222. }
  223. )
  224. cls.process_manager = salt.utils.process.ProcessManager(
  225. name="ReqServer_ProcessManager"
  226. )
  227. cls.server_channel = salt.transport.server.PubServerChannel.factory(
  228. cls.master_config
  229. )
  230. cls.server_channel.pre_fork(cls.process_manager)
  231. # we also require req server for auth
  232. cls.req_server_channel = salt.transport.server.ReqServerChannel.factory(
  233. cls.master_config
  234. )
  235. cls.req_server_channel.pre_fork(cls.process_manager)
  236. cls._server_io_loop = salt.ext.tornado.ioloop.IOLoop()
  237. cls.evt = threading.Event()
  238. cls.req_server_channel.post_fork(
  239. cls._handle_payload, io_loop=cls._server_io_loop
  240. )
  241. cls.server_thread = threading.Thread(
  242. target=run_loop_in_thread, args=(cls._server_io_loop, cls.evt)
  243. )
  244. cls.server_thread.start()
  245. @classmethod
  246. def tearDownClass(cls):
  247. cls.process_manager.kill_children()
  248. cls.process_manager.stop_restarting()
  249. time.sleep(
  250. 2
  251. ) # Give the procs a chance to fully close before we stop the io_loop
  252. cls.evt.set()
  253. cls.server_thread.join()
  254. cls.req_server_channel.close()
  255. cls.server_channel.close()
  256. cls._server_io_loop.stop()
  257. del cls.server_channel
  258. del cls._server_io_loop
  259. del cls.process_manager
  260. del cls.server_thread
  261. del cls.master_config
  262. del cls.minion_config
  263. @classmethod
  264. def _handle_payload(cls, payload):
  265. """
  266. TODO: something besides echo
  267. """
  268. return payload, {"fun": "send_clear"}
  269. def setUp(self):
  270. super().setUp()
  271. self._start_handlers = dict(self.io_loop._handlers)
  272. def tearDown(self):
  273. super().tearDown()
  274. failures = []
  275. for k, v in self.io_loop._handlers.items():
  276. if self._start_handlers.get(k) != v:
  277. failures.append((k, v))
  278. del self._start_handlers
  279. if len(failures) > 0:
  280. raise Exception("FDs still attached to the IOLoop: {}".format(failures))
  281. @skipIf(True, "Skip until we can devote time to fix this test")
  282. class AsyncPubChannelTest(BaseZMQPubCase, PubChannelMixin):
  283. """
  284. Tests around the publish system
  285. """
  286. def get_new_ioloop(self):
  287. return salt.ext.tornado.ioloop.IOLoop()
  288. class AsyncReqMessageClientPoolTest(TestCase):
  289. def setUp(self):
  290. super().setUp()
  291. sock_pool_size = 5
  292. with patch(
  293. "salt.transport.zeromq.AsyncReqMessageClient.__init__",
  294. MagicMock(return_value=None),
  295. ):
  296. self.message_client_pool = AsyncReqMessageClientPool(
  297. {"sock_pool_size": sock_pool_size}, args=({}, "")
  298. )
  299. self.original_message_clients = self.message_client_pool.message_clients
  300. self.message_client_pool.message_clients = [
  301. MagicMock() for _ in range(sock_pool_size)
  302. ]
  303. def tearDown(self):
  304. del self.original_message_clients
  305. super().tearDown()
  306. def test_send(self):
  307. for message_client_mock in self.message_client_pool.message_clients:
  308. message_client_mock.send_queue = [0, 0, 0]
  309. message_client_mock.send.return_value = []
  310. self.assertEqual([], self.message_client_pool.send())
  311. self.message_client_pool.message_clients[2].send_queue = [0]
  312. self.message_client_pool.message_clients[2].send.return_value = [1]
  313. self.assertEqual([1], self.message_client_pool.send())
  314. class ZMQConfigTest(TestCase):
  315. def test_master_uri(self):
  316. """
  317. test _get_master_uri method
  318. """
  319. m_ip = "127.0.0.1"
  320. m_port = 4505
  321. s_ip = "111.1.0.1"
  322. s_port = 4058
  323. m_ip6 = "1234:5678::9abc"
  324. s_ip6 = "1234:5678::1:9abc"
  325. with patch("salt.transport.zeromq.LIBZMQ_VERSION_INFO", (4, 1, 6)), patch(
  326. "salt.transport.zeromq.ZMQ_VERSION_INFO", (16, 0, 1)
  327. ):
  328. # pass in both source_ip and source_port
  329. assert salt.transport.zeromq._get_master_uri(
  330. master_ip=m_ip, master_port=m_port, source_ip=s_ip, source_port=s_port
  331. ) == "tcp://{}:{};{}:{}".format(s_ip, s_port, m_ip, m_port)
  332. assert salt.transport.zeromq._get_master_uri(
  333. master_ip=m_ip6, master_port=m_port, source_ip=s_ip6, source_port=s_port
  334. ) == "tcp://[{}]:{};[{}]:{}".format(s_ip6, s_port, m_ip6, m_port)
  335. # source ip and source_port empty
  336. assert salt.transport.zeromq._get_master_uri(
  337. master_ip=m_ip, master_port=m_port
  338. ) == "tcp://{}:{}".format(m_ip, m_port)
  339. assert salt.transport.zeromq._get_master_uri(
  340. master_ip=m_ip6, master_port=m_port
  341. ) == "tcp://[{}]:{}".format(m_ip6, m_port)
  342. # pass in only source_ip
  343. assert salt.transport.zeromq._get_master_uri(
  344. master_ip=m_ip, master_port=m_port, source_ip=s_ip
  345. ) == "tcp://{}:0;{}:{}".format(s_ip, m_ip, m_port)
  346. assert salt.transport.zeromq._get_master_uri(
  347. master_ip=m_ip6, master_port=m_port, source_ip=s_ip6
  348. ) == "tcp://[{}]:0;[{}]:{}".format(s_ip6, m_ip6, m_port)
  349. # pass in only source_port
  350. assert salt.transport.zeromq._get_master_uri(
  351. master_ip=m_ip, master_port=m_port, source_port=s_port
  352. ) == "tcp://0.0.0.0:{};{}:{}".format(s_port, m_ip, m_port)
  353. class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
  354. @classmethod
  355. def setUpClass(cls):
  356. ret_port = get_unused_localhost_port()
  357. publish_port = get_unused_localhost_port()
  358. tcp_master_pub_port = get_unused_localhost_port()
  359. tcp_master_pull_port = get_unused_localhost_port()
  360. tcp_master_publish_pull = get_unused_localhost_port()
  361. tcp_master_workers = get_unused_localhost_port()
  362. cls.master_config = cls.get_temp_config(
  363. "master",
  364. **{
  365. "transport": "zeromq",
  366. "auto_accept": True,
  367. "ret_port": ret_port,
  368. "publish_port": publish_port,
  369. "tcp_master_pub_port": tcp_master_pub_port,
  370. "tcp_master_pull_port": tcp_master_pull_port,
  371. "tcp_master_publish_pull": tcp_master_publish_pull,
  372. "tcp_master_workers": tcp_master_workers,
  373. "sign_pub_messages": False,
  374. }
  375. )
  376. salt.master.SMaster.secrets["aes"] = {
  377. "secret": multiprocessing.Array(
  378. ctypes.c_char,
  379. salt.utils.stringutils.to_bytes(
  380. salt.crypt.Crypticle.generate_key_string()
  381. ),
  382. ),
  383. }
  384. cls.minion_config = cls.get_temp_config(
  385. "minion",
  386. **{
  387. "transport": "zeromq",
  388. "master_ip": "127.0.0.1",
  389. "master_port": ret_port,
  390. "auth_timeout": 5,
  391. "auth_tries": 1,
  392. "master_uri": "tcp://127.0.0.1:{}".format(ret_port),
  393. }
  394. )
  395. @classmethod
  396. def tearDownClass(cls):
  397. del cls.minion_config
  398. del cls.master_config
  399. def setUp(self):
  400. # Start the event loop, even though we don't directly use this with
  401. # ZeroMQPubServerChannel, having it running seems to increase the
  402. # likely hood of dropped messages.
  403. self.io_loop = salt.ext.tornado.ioloop.IOLoop()
  404. self.io_loop.make_current()
  405. self.io_loop_thread = threading.Thread(target=self.io_loop.start)
  406. self.io_loop_thread.start()
  407. self.process_manager = salt.utils.process.ProcessManager(
  408. name="PubServer_ProcessManager"
  409. )
  410. def tearDown(self):
  411. self.io_loop.add_callback(self.io_loop.stop)
  412. self.io_loop_thread.join()
  413. self.process_manager.stop_restarting()
  414. self.process_manager.kill_children()
  415. del self.io_loop
  416. del self.io_loop_thread
  417. del self.process_manager
  418. @staticmethod
  419. def _gather_results(opts, pub_uri, results, timeout=120, messages=None):
  420. """
  421. Gather results until then number of seconds specified by timeout passes
  422. without reveiving a message
  423. """
  424. ctx = zmq.Context()
  425. sock = ctx.socket(zmq.SUB)
  426. sock.setsockopt(zmq.LINGER, -1)
  427. sock.setsockopt(zmq.SUBSCRIBE, b"")
  428. sock.connect(pub_uri)
  429. last_msg = time.time()
  430. serial = salt.payload.Serial(opts)
  431. crypticle = salt.crypt.Crypticle(
  432. opts, salt.master.SMaster.secrets["aes"]["secret"].value
  433. )
  434. while time.time() - last_msg < timeout:
  435. try:
  436. payload = sock.recv(zmq.NOBLOCK)
  437. except zmq.ZMQError:
  438. time.sleep(0.01)
  439. else:
  440. if messages:
  441. if messages != 1:
  442. messages -= 1
  443. continue
  444. payload = crypticle.loads(serial.loads(payload)["load"])
  445. if "stop" in payload:
  446. break
  447. last_msg = time.time()
  448. results.append(payload["jid"])
  449. @skipIf(salt.utils.platform.is_windows(), "Skip on Windows OS")
  450. @slowTest
  451. def test_publish_to_pubserv_ipc(self):
  452. """
  453. Test sending 10K messags to ZeroMQPubServerChannel using IPC transport
  454. ZMQ's ipc transport not supported on Windows
  455. """
  456. opts = dict(self.master_config, ipc_mode="ipc", pub_hwm=0)
  457. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  458. server_channel.pre_fork(
  459. self.process_manager,
  460. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  461. )
  462. pub_uri = "tcp://{interface}:{publish_port}".format(**server_channel.opts)
  463. send_num = 10000
  464. expect = []
  465. results = []
  466. gather = threading.Thread(
  467. target=self._gather_results, args=(self.minion_config, pub_uri, results,)
  468. )
  469. gather.start()
  470. # Allow time for server channel to start, especially on windows
  471. time.sleep(2)
  472. for i in range(send_num):
  473. expect.append(i)
  474. load = {"tgt_type": "glob", "tgt": "*", "jid": i}
  475. server_channel.publish(load)
  476. server_channel.publish({"tgt_type": "glob", "tgt": "*", "stop": True})
  477. gather.join()
  478. server_channel.pub_close()
  479. assert len(results) == send_num, (len(results), set(expect).difference(results))
  480. @skipIf(salt.utils.platform.is_linux(), "Skip on Linux")
  481. @slowTest
  482. def test_zeromq_publish_port(self):
  483. """
  484. test when connecting that we
  485. use the publish_port set in opts
  486. when its not 4506
  487. """
  488. opts = dict(
  489. self.master_config,
  490. ipc_mode="ipc",
  491. pub_hwm=0,
  492. recon_randomize=False,
  493. publish_port=455505,
  494. recon_default=1,
  495. recon_max=2,
  496. master_ip="127.0.0.1",
  497. acceptance_wait_time=5,
  498. acceptance_wait_time_max=5,
  499. )
  500. opts["master_uri"] = "tcp://{interface}:{publish_port}".format(**opts)
  501. channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  502. patch_socket = MagicMock(return_value=True)
  503. patch_auth = MagicMock(return_value=True)
  504. with patch.object(channel, "_socket", patch_socket), patch.object(
  505. channel, "auth", patch_auth
  506. ):
  507. channel.connect()
  508. assert str(opts["publish_port"]) in patch_socket.mock_calls[0][1][0]
  509. @skipIf(salt.utils.platform.is_linux(), "Skip on Linux")
  510. def test_zeromq_zeromq_filtering_decode_message_no_match(self):
  511. """
  512. test AsyncZeroMQPubChannel _decode_messages when
  513. zmq_filtering enabled and minion does not match
  514. """
  515. message = [
  516. b"4f26aeafdb2367620a393c973eddbe8f8b846eb",
  517. b"\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf"
  518. b"\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2"
  519. b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg'
  520. b"\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d"
  521. b"\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>"
  522. b"\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D",
  523. ]
  524. opts = dict(
  525. self.master_config,
  526. ipc_mode="ipc",
  527. pub_hwm=0,
  528. zmq_filtering=True,
  529. recon_randomize=False,
  530. recon_default=1,
  531. recon_max=2,
  532. master_ip="127.0.0.1",
  533. acceptance_wait_time=5,
  534. acceptance_wait_time_max=5,
  535. )
  536. opts["master_uri"] = "tcp://{interface}:{publish_port}".format(**opts)
  537. server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  538. with patch(
  539. "salt.crypt.AsyncAuth.crypticle",
  540. MagicMock(return_value={"tgt_type": "glob", "tgt": "*", "jid": 1}),
  541. ) as mock_test:
  542. res = server_channel._decode_messages(message)
  543. assert res.result() is None
  544. @skipIf(salt.utils.platform.is_linux(), "Skip on Linux")
  545. def test_zeromq_zeromq_filtering_decode_message(self):
  546. """
  547. test AsyncZeroMQPubChannel _decode_messages
  548. when zmq_filtered enabled
  549. """
  550. message = [
  551. b"4f26aeafdb2367620a393c973eddbe8f8b846ebd",
  552. b"\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf"
  553. b"\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2"
  554. b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg'
  555. b"\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d"
  556. b"\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>"
  557. b"\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D",
  558. ]
  559. opts = dict(
  560. self.master_config,
  561. ipc_mode="ipc",
  562. pub_hwm=0,
  563. zmq_filtering=True,
  564. recon_randomize=False,
  565. recon_default=1,
  566. recon_max=2,
  567. master_ip="127.0.0.1",
  568. acceptance_wait_time=5,
  569. acceptance_wait_time_max=5,
  570. )
  571. opts["master_uri"] = "tcp://{interface}:{publish_port}".format(**opts)
  572. server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  573. with patch(
  574. "salt.crypt.AsyncAuth.crypticle",
  575. MagicMock(return_value={"tgt_type": "glob", "tgt": "*", "jid": 1}),
  576. ) as mock_test:
  577. res = server_channel._decode_messages(message)
  578. assert res.result()["enc"] == "aes"
  579. @skipIf(salt.utils.platform.is_windows(), "Skip on Windows OS")
  580. @slowTest
  581. def test_zeromq_filtering(self):
  582. """
  583. Test sending messags to publisher using UDP
  584. with zeromq_filtering enabled
  585. """
  586. opts = dict(
  587. self.master_config,
  588. ipc_mode="ipc",
  589. pub_hwm=0,
  590. zmq_filtering=True,
  591. acceptance_wait_time=5,
  592. )
  593. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  594. server_channel.pre_fork(
  595. self.process_manager,
  596. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  597. )
  598. pub_uri = "tcp://{interface}:{publish_port}".format(**server_channel.opts)
  599. send_num = 1
  600. expect = []
  601. results = []
  602. gather = threading.Thread(
  603. target=self._gather_results,
  604. args=(self.minion_config, pub_uri, results,),
  605. kwargs={"messages": 2},
  606. )
  607. gather.start()
  608. # Allow time for server channel to start, especially on windows
  609. time.sleep(2)
  610. expect.append(send_num)
  611. load = {"tgt_type": "glob", "tgt": "*", "jid": send_num}
  612. with patch(
  613. "salt.utils.minions.CkMinions.check_minions",
  614. MagicMock(
  615. return_value={
  616. "minions": ["minion"],
  617. "missing": [],
  618. "ssh_minions": False,
  619. }
  620. ),
  621. ):
  622. server_channel.publish(load)
  623. server_channel.publish({"tgt_type": "glob", "tgt": "*", "stop": True})
  624. gather.join()
  625. server_channel.pub_close()
  626. assert len(results) == send_num, (len(results), set(expect).difference(results))
  627. @slowTest
  628. def test_publish_to_pubserv_tcp(self):
  629. """
  630. Test sending 10K messags to ZeroMQPubServerChannel using TCP transport
  631. """
  632. opts = dict(self.master_config, ipc_mode="tcp", pub_hwm=0)
  633. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  634. server_channel.pre_fork(
  635. self.process_manager,
  636. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  637. )
  638. pub_uri = "tcp://{interface}:{publish_port}".format(**server_channel.opts)
  639. send_num = 10000
  640. expect = []
  641. results = []
  642. gather = threading.Thread(
  643. target=self._gather_results, args=(self.minion_config, pub_uri, results,)
  644. )
  645. gather.start()
  646. # Allow time for server channel to start, especially on windows
  647. time.sleep(2)
  648. for i in range(send_num):
  649. expect.append(i)
  650. load = {"tgt_type": "glob", "tgt": "*", "jid": i}
  651. server_channel.publish(load)
  652. gather.join()
  653. server_channel.pub_close()
  654. assert len(results) == send_num, (len(results), set(expect).difference(results))
  655. @staticmethod
  656. def _send_small(opts, sid, num=10):
  657. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  658. for i in range(num):
  659. load = {"tgt_type": "glob", "tgt": "*", "jid": "{}-{}".format(sid, i)}
  660. server_channel.publish(load)
  661. server_channel.pub_close()
  662. @staticmethod
  663. def _send_large(opts, sid, num=10, size=250000 * 3):
  664. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  665. for i in range(num):
  666. load = {
  667. "tgt_type": "glob",
  668. "tgt": "*",
  669. "jid": "{}-{}".format(sid, i),
  670. "xdata": "0" * size,
  671. }
  672. server_channel.publish(load)
  673. server_channel.pub_close()
  674. @skipIf(salt.utils.platform.is_freebsd(), "Skip on FreeBSD")
  675. @slowTest
  676. def test_issue_36469_tcp(self):
  677. """
  678. Test sending both large and small messags to publisher using TCP
  679. https://github.com/saltstack/salt/issues/36469
  680. """
  681. opts = dict(self.master_config, ipc_mode="tcp", pub_hwm=0)
  682. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  683. server_channel.pre_fork(
  684. self.process_manager,
  685. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  686. )
  687. send_num = 10 * 4
  688. expect = []
  689. results = []
  690. pub_uri = "tcp://{interface}:{publish_port}".format(**opts)
  691. # Allow time for server channel to start, especially on windows
  692. time.sleep(2)
  693. gather = threading.Thread(
  694. target=self._gather_results, args=(self.minion_config, pub_uri, results,)
  695. )
  696. gather.start()
  697. with ThreadPoolExecutor(max_workers=4) as executor:
  698. executor.submit(self._send_small, opts, 1)
  699. executor.submit(self._send_small, opts, 2)
  700. executor.submit(self._send_small, opts, 3)
  701. executor.submit(self._send_large, opts, 4)
  702. expect = ["{}-{}".format(a, b) for a in range(10) for b in (1, 2, 3, 4)]
  703. time.sleep(0.1)
  704. server_channel.publish({"tgt_type": "glob", "tgt": "*", "stop": True})
  705. gather.join()
  706. server_channel.pub_close()
  707. assert len(results) == send_num, (len(results), set(expect).difference(results))
  708. class AsyncZeroMQReqChannelTests(TestCase):
  709. def test_force_close_all_instances(self):
  710. zmq1 = MagicMock()
  711. zmq2 = MagicMock()
  712. zmq3 = MagicMock()
  713. zmq_objects = {"zmq": {"1": zmq1, "2": zmq2}, "other_zmq": {"3": zmq3}}
  714. with patch(
  715. "salt.transport.zeromq.AsyncZeroMQReqChannel.instance_map", zmq_objects
  716. ):
  717. salt.transport.zeromq.AsyncZeroMQReqChannel.force_close_all_instances()
  718. self.assertEqual(zmq1.mock_calls, [call.close()])
  719. self.assertEqual(zmq2.mock_calls, [call.close()])
  720. self.assertEqual(zmq3.mock_calls, [call.close()])
  721. # check if instance map changed
  722. self.assertIs(
  723. zmq_objects, salt.transport.zeromq.AsyncZeroMQReqChannel.instance_map
  724. )