1
0

test_zeromq.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807
  1. """
  2. :codeauthor: Thomas Jackson <jacksontj.89@gmail.com>
  3. """
  4. import ctypes
  5. import multiprocessing
  6. import os
  7. import threading
  8. import time
  9. from concurrent.futures.thread import ThreadPoolExecutor
  10. import salt.config
  11. import salt.exceptions
  12. import salt.ext.tornado.gen
  13. import salt.ext.tornado.ioloop
  14. import salt.log.setup
  15. import salt.transport.client
  16. import salt.transport.server
  17. import salt.utils.platform
  18. import salt.utils.process
  19. import zmq.eventloop.ioloop
  20. from salt.ext import six
  21. from salt.ext.six.moves import range
  22. from salt.ext.tornado.testing import AsyncTestCase
  23. from salt.transport.zeromq import AsyncReqMessageClientPool
  24. from saltfactories.utils.ports import get_unused_localhost_port
  25. from tests.support.helpers import flaky, not_runs_on, slowTest
  26. from tests.support.mixins import AdaptedConfigurationTestCaseMixin
  27. from tests.support.mock import MagicMock, call, patch
  28. from tests.support.runtests import RUNTIME_VARS
  29. from tests.support.unit import TestCase, skipIf
  30. from tests.unit.transport.mixins import (
  31. PubChannelMixin,
  32. ReqChannelMixin,
  33. run_loop_in_thread,
  34. )
# NOTE(review): `x` is not referenced by anything visible in this module; it
# looks like a leftover scratch assignment -- confirm and remove.
x = "fix pre"
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
# When the installed pyzmq exposes no ``ZMQIOLoop``, alias it to ``IOLoop``.
if not hasattr(zmq.eventloop.ioloop, "ZMQIOLoop"):
    zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
  39. class BaseZMQReqCase(TestCase, AdaptedConfigurationTestCaseMixin):
  40. """
  41. Test the req server/client pair
  42. """
  43. @classmethod
  44. def setUpClass(cls):
  45. if not hasattr(cls, "_handle_payload"):
  46. return
  47. ret_port = get_unused_localhost_port()
  48. publish_port = get_unused_localhost_port()
  49. tcp_master_pub_port = get_unused_localhost_port()
  50. tcp_master_pull_port = get_unused_localhost_port()
  51. tcp_master_publish_pull = get_unused_localhost_port()
  52. tcp_master_workers = get_unused_localhost_port()
  53. cls.master_config = cls.get_temp_config(
  54. "master",
  55. **{
  56. "transport": "zeromq",
  57. "auto_accept": True,
  58. "ret_port": ret_port,
  59. "publish_port": publish_port,
  60. "tcp_master_pub_port": tcp_master_pub_port,
  61. "tcp_master_pull_port": tcp_master_pull_port,
  62. "tcp_master_publish_pull": tcp_master_publish_pull,
  63. "tcp_master_workers": tcp_master_workers,
  64. }
  65. )
  66. cls.minion_config = cls.get_temp_config(
  67. "minion",
  68. **{
  69. "transport": "zeromq",
  70. "master_ip": "127.0.0.1",
  71. "master_port": ret_port,
  72. "auth_timeout": 5,
  73. "auth_tries": 1,
  74. "master_uri": "tcp://127.0.0.1:{}".format(ret_port),
  75. }
  76. )
  77. cls.process_manager = salt.utils.process.ProcessManager(
  78. name="ReqServer_ProcessManager"
  79. )
  80. cls.server_channel = salt.transport.server.ReqServerChannel.factory(
  81. cls.master_config
  82. )
  83. cls.server_channel.pre_fork(cls.process_manager)
  84. cls.io_loop = salt.ext.tornado.ioloop.IOLoop()
  85. cls.evt = threading.Event()
  86. cls.server_channel.post_fork(cls._handle_payload, io_loop=cls.io_loop)
  87. cls.server_thread = threading.Thread(
  88. target=run_loop_in_thread, args=(cls.io_loop, cls.evt)
  89. )
  90. cls.server_thread.start()
  91. @classmethod
  92. def tearDownClass(cls):
  93. if not hasattr(cls, "_handle_payload"):
  94. return
  95. # Attempting to kill the children hangs the test suite.
  96. # Let the test suite handle this instead.
  97. cls.process_manager.stop_restarting()
  98. cls.process_manager.kill_children()
  99. cls.evt.set()
  100. cls.server_thread.join()
  101. time.sleep(
  102. 2
  103. ) # Give the procs a chance to fully close before we stop the io_loop
  104. cls.server_channel.close()
  105. del cls.server_channel
  106. del cls.io_loop
  107. del cls.process_manager
  108. del cls.server_thread
  109. del cls.master_config
  110. del cls.minion_config
  111. @classmethod
  112. def _handle_payload(cls, payload):
  113. """
  114. TODO: something besides echo
  115. """
  116. return payload, {"fun": "send_clear"}
  117. class ClearReqTestCases(BaseZMQReqCase, ReqChannelMixin):
  118. """
  119. Test all of the clear msg stuff
  120. """
  121. def setUp(self):
  122. self.channel = salt.transport.client.ReqChannel.factory(
  123. self.minion_config, crypt="clear"
  124. )
  125. def tearDown(self):
  126. self.channel.close()
  127. del self.channel
  128. @classmethod
  129. @salt.ext.tornado.gen.coroutine
  130. def _handle_payload(cls, payload):
  131. """
  132. TODO: something besides echo
  133. """
  134. raise salt.ext.tornado.gen.Return((payload, {"fun": "send_clear"}))
  135. @slowTest
  136. def test_master_uri_override(self):
  137. """
  138. ensure master_uri kwarg is respected
  139. """
  140. # minion_config should be 127.0.0.1, we want a different uri that still connects
  141. uri = "tcp://{master_ip}:{master_port}".format(
  142. master_ip="localhost", master_port=self.minion_config["master_port"]
  143. )
  144. channel = salt.transport.client.ReqChannel.factory(
  145. self.minion_config, master_uri=uri
  146. )
  147. self.assertIn("localhost", channel.master_uri)
  148. del channel
  149. @flaky
  150. @not_runs_on(
  151. kernel="linux",
  152. os_familiy="Suse",
  153. reason="Skipping until https://github.com/saltstack/salt/issues/32902 gets fixed",
  154. )
  155. class AESReqTestCases(BaseZMQReqCase, ReqChannelMixin):
  156. def setUp(self):
  157. self.channel = salt.transport.client.ReqChannel.factory(self.minion_config)
  158. def tearDown(self):
  159. self.channel.close()
  160. del self.channel
  161. @classmethod
  162. @salt.ext.tornado.gen.coroutine
  163. def _handle_payload(cls, payload):
  164. """
  165. TODO: something besides echo
  166. """
  167. raise salt.ext.tornado.gen.Return((payload, {"fun": "send"}))
  168. # TODO: make failed returns have a specific framing so we can raise the same exception
  169. # on encrypted channels
  170. #
  171. #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  172. #
  173. # WARNING: This test will fail randomly on any system with > 1 CPU core!!!
  174. #
  175. #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  176. @slowTest
  177. def test_badload(self):
  178. """
  179. Test a variety of bad requests, make sure that we get some sort of error
  180. """
  181. # TODO: This test should be re-enabled when Jenkins moves to C7.
  182. # Once the version of salt-testing is increased to something newer than the September
  183. # release of salt-testing, the @flaky decorator should be applied to this test.
  184. msgs = ["", [], tuple()]
  185. for msg in msgs:
  186. with self.assertRaises(salt.exceptions.AuthenticationError):
  187. ret = self.channel.send(msg, timeout=5)
  188. class BaseZMQPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin):
  189. """
  190. Test the req server/client pair
  191. """
  192. @classmethod
  193. def setUpClass(cls):
  194. ret_port = get_unused_localhost_port()
  195. publish_port = get_unused_localhost_port()
  196. tcp_master_pub_port = get_unused_localhost_port()
  197. tcp_master_pull_port = get_unused_localhost_port()
  198. tcp_master_publish_pull = get_unused_localhost_port()
  199. tcp_master_workers = get_unused_localhost_port()
  200. cls.master_config = cls.get_temp_config(
  201. "master",
  202. **{
  203. "transport": "zeromq",
  204. "auto_accept": True,
  205. "ret_port": ret_port,
  206. "publish_port": publish_port,
  207. "tcp_master_pub_port": tcp_master_pub_port,
  208. "tcp_master_pull_port": tcp_master_pull_port,
  209. "tcp_master_publish_pull": tcp_master_publish_pull,
  210. "tcp_master_workers": tcp_master_workers,
  211. }
  212. )
  213. cls.minion_config = salt.config.minion_config(
  214. os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "minion")
  215. )
  216. cls.minion_config = cls.get_temp_config(
  217. "minion",
  218. **{
  219. "transport": "zeromq",
  220. "master_ip": "127.0.0.1",
  221. "master_port": ret_port,
  222. "master_uri": "tcp://127.0.0.1:{}".format(ret_port),
  223. }
  224. )
  225. cls.process_manager = salt.utils.process.ProcessManager(
  226. name="ReqServer_ProcessManager"
  227. )
  228. cls.server_channel = salt.transport.server.PubServerChannel.factory(
  229. cls.master_config
  230. )
  231. cls.server_channel.pre_fork(cls.process_manager)
  232. # we also require req server for auth
  233. cls.req_server_channel = salt.transport.server.ReqServerChannel.factory(
  234. cls.master_config
  235. )
  236. cls.req_server_channel.pre_fork(cls.process_manager)
  237. cls._server_io_loop = salt.ext.tornado.ioloop.IOLoop()
  238. cls.evt = threading.Event()
  239. cls.req_server_channel.post_fork(
  240. cls._handle_payload, io_loop=cls._server_io_loop
  241. )
  242. cls.server_thread = threading.Thread(
  243. target=run_loop_in_thread, args=(cls._server_io_loop, cls.evt)
  244. )
  245. cls.server_thread.start()
  246. @classmethod
  247. def tearDownClass(cls):
  248. cls.process_manager.kill_children()
  249. cls.process_manager.stop_restarting()
  250. time.sleep(
  251. 2
  252. ) # Give the procs a chance to fully close before we stop the io_loop
  253. cls.evt.set()
  254. cls.server_thread.join()
  255. cls.req_server_channel.close()
  256. cls.server_channel.close()
  257. cls._server_io_loop.stop()
  258. del cls.server_channel
  259. del cls._server_io_loop
  260. del cls.process_manager
  261. del cls.server_thread
  262. del cls.master_config
  263. del cls.minion_config
  264. @classmethod
  265. def _handle_payload(cls, payload):
  266. """
  267. TODO: something besides echo
  268. """
  269. return payload, {"fun": "send_clear"}
  270. def setUp(self):
  271. super().setUp()
  272. self._start_handlers = dict(self.io_loop._handlers)
  273. def tearDown(self):
  274. super().tearDown()
  275. failures = []
  276. for k, v in self.io_loop._handlers.items():
  277. if self._start_handlers.get(k) != v:
  278. failures.append((k, v))
  279. del self._start_handlers
  280. if len(failures) > 0:
  281. raise Exception("FDs still attached to the IOLoop: {}".format(failures))
  282. @skipIf(True, "Skip until we can devote time to fix this test")
  283. class AsyncPubChannelTest(BaseZMQPubCase, PubChannelMixin):
  284. """
  285. Tests around the publish system
  286. """
  287. def get_new_ioloop(self):
  288. return salt.ext.tornado.ioloop.IOLoop()
  289. class AsyncReqMessageClientPoolTest(TestCase):
  290. def setUp(self):
  291. super().setUp()
  292. sock_pool_size = 5
  293. with patch(
  294. "salt.transport.zeromq.AsyncReqMessageClient.__init__",
  295. MagicMock(return_value=None),
  296. ):
  297. self.message_client_pool = AsyncReqMessageClientPool(
  298. {"sock_pool_size": sock_pool_size}, args=({}, "")
  299. )
  300. self.original_message_clients = self.message_client_pool.message_clients
  301. self.message_client_pool.message_clients = [
  302. MagicMock() for _ in range(sock_pool_size)
  303. ]
  304. def tearDown(self):
  305. del self.original_message_clients
  306. super().tearDown()
  307. def test_send(self):
  308. for message_client_mock in self.message_client_pool.message_clients:
  309. message_client_mock.send_queue = [0, 0, 0]
  310. message_client_mock.send.return_value = []
  311. self.assertEqual([], self.message_client_pool.send())
  312. self.message_client_pool.message_clients[2].send_queue = [0]
  313. self.message_client_pool.message_clients[2].send.return_value = [1]
  314. self.assertEqual([1], self.message_client_pool.send())
  315. class ZMQConfigTest(TestCase):
  316. def test_master_uri(self):
  317. """
  318. test _get_master_uri method
  319. """
  320. m_ip = "127.0.0.1"
  321. m_port = 4505
  322. s_ip = "111.1.0.1"
  323. s_port = 4058
  324. m_ip6 = "1234:5678::9abc"
  325. s_ip6 = "1234:5678::1:9abc"
  326. with patch("salt.transport.zeromq.LIBZMQ_VERSION_INFO", (4, 1, 6)), patch(
  327. "salt.transport.zeromq.ZMQ_VERSION_INFO", (16, 0, 1)
  328. ):
  329. # pass in both source_ip and source_port
  330. assert salt.transport.zeromq._get_master_uri(
  331. master_ip=m_ip, master_port=m_port, source_ip=s_ip, source_port=s_port
  332. ) == "tcp://{}:{};{}:{}".format(s_ip, s_port, m_ip, m_port)
  333. assert salt.transport.zeromq._get_master_uri(
  334. master_ip=m_ip6, master_port=m_port, source_ip=s_ip6, source_port=s_port
  335. ) == "tcp://[{}]:{};[{}]:{}".format(s_ip6, s_port, m_ip6, m_port)
  336. # source ip and source_port empty
  337. assert salt.transport.zeromq._get_master_uri(
  338. master_ip=m_ip, master_port=m_port
  339. ) == "tcp://{}:{}".format(m_ip, m_port)
  340. assert salt.transport.zeromq._get_master_uri(
  341. master_ip=m_ip6, master_port=m_port
  342. ) == "tcp://[{}]:{}".format(m_ip6, m_port)
  343. # pass in only source_ip
  344. assert salt.transport.zeromq._get_master_uri(
  345. master_ip=m_ip, master_port=m_port, source_ip=s_ip
  346. ) == "tcp://{}:0;{}:{}".format(s_ip, m_ip, m_port)
  347. assert salt.transport.zeromq._get_master_uri(
  348. master_ip=m_ip6, master_port=m_port, source_ip=s_ip6
  349. ) == "tcp://[{}]:0;[{}]:{}".format(s_ip6, m_ip6, m_port)
  350. # pass in only source_port
  351. assert salt.transport.zeromq._get_master_uri(
  352. master_ip=m_ip, master_port=m_port, source_port=s_port
  353. ) == "tcp://0.0.0.0:{};{}:{}".format(s_port, m_ip, m_port)
  354. class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
  355. @classmethod
  356. def setUpClass(cls):
  357. ret_port = get_unused_localhost_port()
  358. publish_port = get_unused_localhost_port()
  359. tcp_master_pub_port = get_unused_localhost_port()
  360. tcp_master_pull_port = get_unused_localhost_port()
  361. tcp_master_publish_pull = get_unused_localhost_port()
  362. tcp_master_workers = get_unused_localhost_port()
  363. cls.master_config = cls.get_temp_config(
  364. "master",
  365. **{
  366. "transport": "zeromq",
  367. "auto_accept": True,
  368. "ret_port": ret_port,
  369. "publish_port": publish_port,
  370. "tcp_master_pub_port": tcp_master_pub_port,
  371. "tcp_master_pull_port": tcp_master_pull_port,
  372. "tcp_master_publish_pull": tcp_master_publish_pull,
  373. "tcp_master_workers": tcp_master_workers,
  374. "sign_pub_messages": False,
  375. }
  376. )
  377. salt.master.SMaster.secrets["aes"] = {
  378. "secret": multiprocessing.Array(
  379. ctypes.c_char, six.b(salt.crypt.Crypticle.generate_key_string()),
  380. ),
  381. }
  382. cls.minion_config = cls.get_temp_config(
  383. "minion",
  384. **{
  385. "transport": "zeromq",
  386. "master_ip": "127.0.0.1",
  387. "master_port": ret_port,
  388. "auth_timeout": 5,
  389. "auth_tries": 1,
  390. "master_uri": "tcp://127.0.0.1:{}".format(ret_port),
  391. }
  392. )
  393. @classmethod
  394. def tearDownClass(cls):
  395. del cls.minion_config
  396. del cls.master_config
  397. def setUp(self):
  398. # Start the event loop, even though we don't directly use this with
  399. # ZeroMQPubServerChannel, having it running seems to increase the
  400. # likely hood of dropped messages.
  401. self.io_loop = salt.ext.tornado.ioloop.IOLoop()
  402. self.io_loop.make_current()
  403. self.io_loop_thread = threading.Thread(target=self.io_loop.start)
  404. self.io_loop_thread.start()
  405. self.process_manager = salt.utils.process.ProcessManager(
  406. name="PubServer_ProcessManager"
  407. )
  408. def tearDown(self):
  409. self.io_loop.add_callback(self.io_loop.stop)
  410. self.io_loop_thread.join()
  411. self.process_manager.stop_restarting()
  412. self.process_manager.kill_children()
  413. del self.io_loop
  414. del self.io_loop_thread
  415. del self.process_manager
  416. @staticmethod
  417. def _gather_results(opts, pub_uri, results, timeout=120, messages=None):
  418. """
  419. Gather results until then number of seconds specified by timeout passes
  420. without reveiving a message
  421. """
  422. ctx = zmq.Context()
  423. sock = ctx.socket(zmq.SUB)
  424. sock.setsockopt(zmq.LINGER, -1)
  425. sock.setsockopt(zmq.SUBSCRIBE, b"")
  426. sock.connect(pub_uri)
  427. last_msg = time.time()
  428. serial = salt.payload.Serial(opts)
  429. crypticle = salt.crypt.Crypticle(
  430. opts, salt.master.SMaster.secrets["aes"]["secret"].value
  431. )
  432. while time.time() - last_msg < timeout:
  433. try:
  434. payload = sock.recv(zmq.NOBLOCK)
  435. except zmq.ZMQError:
  436. time.sleep(0.01)
  437. else:
  438. if messages:
  439. if messages != 1:
  440. messages -= 1
  441. continue
  442. payload = crypticle.loads(serial.loads(payload)["load"])
  443. if "stop" in payload:
  444. break
  445. last_msg = time.time()
  446. results.append(payload["jid"])
  447. @skipIf(salt.utils.platform.is_windows(), "Skip on Windows OS")
  448. @slowTest
  449. def test_publish_to_pubserv_ipc(self):
  450. """
  451. Test sending 10K messags to ZeroMQPubServerChannel using IPC transport
  452. ZMQ's ipc transport not supported on Windows
  453. """
  454. opts = dict(self.master_config, ipc_mode="ipc", pub_hwm=0)
  455. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  456. server_channel.pre_fork(
  457. self.process_manager,
  458. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  459. )
  460. pub_uri = "tcp://{interface}:{publish_port}".format(**server_channel.opts)
  461. send_num = 10000
  462. expect = []
  463. results = []
  464. gather = threading.Thread(
  465. target=self._gather_results, args=(self.minion_config, pub_uri, results,)
  466. )
  467. gather.start()
  468. # Allow time for server channel to start, especially on windows
  469. time.sleep(2)
  470. for i in range(send_num):
  471. expect.append(i)
  472. load = {"tgt_type": "glob", "tgt": "*", "jid": i}
  473. server_channel.publish(load)
  474. server_channel.publish({"tgt_type": "glob", "tgt": "*", "stop": True})
  475. gather.join()
  476. server_channel.pub_close()
  477. assert len(results) == send_num, (len(results), set(expect).difference(results))
  478. @skipIf(salt.utils.platform.is_linux(), "Skip on Linux")
  479. @slowTest
  480. def test_zeromq_publish_port(self):
  481. """
  482. test when connecting that we
  483. use the publish_port set in opts
  484. when its not 4506
  485. """
  486. opts = dict(
  487. self.master_config,
  488. ipc_mode="ipc",
  489. pub_hwm=0,
  490. recon_randomize=False,
  491. publish_port=455505,
  492. recon_default=1,
  493. recon_max=2,
  494. master_ip="127.0.0.1",
  495. acceptance_wait_time=5,
  496. acceptance_wait_time_max=5,
  497. )
  498. opts["master_uri"] = "tcp://{interface}:{publish_port}".format(**opts)
  499. channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  500. patch_socket = MagicMock(return_value=True)
  501. patch_auth = MagicMock(return_value=True)
  502. with patch.object(channel, "_socket", patch_socket), patch.object(
  503. channel, "auth", patch_auth
  504. ):
  505. channel.connect()
  506. assert str(opts["publish_port"]) in patch_socket.mock_calls[0][1][0]
  507. @skipIf(salt.utils.platform.is_linux(), "Skip on Linux")
  508. def test_zeromq_zeromq_filtering_decode_message_no_match(self):
  509. """
  510. test AsyncZeroMQPubChannel _decode_messages when
  511. zmq_filtering enabled and minion does not match
  512. """
  513. message = [
  514. b"4f26aeafdb2367620a393c973eddbe8f8b846eb",
  515. b"\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf"
  516. b"\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2"
  517. b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg'
  518. b"\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d"
  519. b"\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>"
  520. b"\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D",
  521. ]
  522. opts = dict(
  523. self.master_config,
  524. ipc_mode="ipc",
  525. pub_hwm=0,
  526. zmq_filtering=True,
  527. recon_randomize=False,
  528. recon_default=1,
  529. recon_max=2,
  530. master_ip="127.0.0.1",
  531. acceptance_wait_time=5,
  532. acceptance_wait_time_max=5,
  533. )
  534. opts["master_uri"] = "tcp://{interface}:{publish_port}".format(**opts)
  535. server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  536. with patch(
  537. "salt.crypt.AsyncAuth.crypticle",
  538. MagicMock(return_value={"tgt_type": "glob", "tgt": "*", "jid": 1}),
  539. ) as mock_test:
  540. res = server_channel._decode_messages(message)
  541. assert res.result() is None
  542. @skipIf(salt.utils.platform.is_linux(), "Skip on Linux")
  543. def test_zeromq_zeromq_filtering_decode_message(self):
  544. """
  545. test AsyncZeroMQPubChannel _decode_messages
  546. when zmq_filtered enabled
  547. """
  548. message = [
  549. b"4f26aeafdb2367620a393c973eddbe8f8b846ebd",
  550. b"\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf"
  551. b"\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2"
  552. b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg'
  553. b"\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d"
  554. b"\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>"
  555. b"\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D",
  556. ]
  557. opts = dict(
  558. self.master_config,
  559. ipc_mode="ipc",
  560. pub_hwm=0,
  561. zmq_filtering=True,
  562. recon_randomize=False,
  563. recon_default=1,
  564. recon_max=2,
  565. master_ip="127.0.0.1",
  566. acceptance_wait_time=5,
  567. acceptance_wait_time_max=5,
  568. )
  569. opts["master_uri"] = "tcp://{interface}:{publish_port}".format(**opts)
  570. server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  571. with patch(
  572. "salt.crypt.AsyncAuth.crypticle",
  573. MagicMock(return_value={"tgt_type": "glob", "tgt": "*", "jid": 1}),
  574. ) as mock_test:
  575. res = server_channel._decode_messages(message)
  576. assert res.result()["enc"] == "aes"
  577. @skipIf(salt.utils.platform.is_windows(), "Skip on Windows OS")
  578. @slowTest
  579. def test_zeromq_filtering(self):
  580. """
  581. Test sending messags to publisher using UDP
  582. with zeromq_filtering enabled
  583. """
  584. opts = dict(
  585. self.master_config,
  586. ipc_mode="ipc",
  587. pub_hwm=0,
  588. zmq_filtering=True,
  589. acceptance_wait_time=5,
  590. )
  591. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  592. server_channel.pre_fork(
  593. self.process_manager,
  594. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  595. )
  596. pub_uri = "tcp://{interface}:{publish_port}".format(**server_channel.opts)
  597. send_num = 1
  598. expect = []
  599. results = []
  600. gather = threading.Thread(
  601. target=self._gather_results,
  602. args=(self.minion_config, pub_uri, results,),
  603. kwargs={"messages": 2},
  604. )
  605. gather.start()
  606. # Allow time for server channel to start, especially on windows
  607. time.sleep(2)
  608. expect.append(send_num)
  609. load = {"tgt_type": "glob", "tgt": "*", "jid": send_num}
  610. with patch(
  611. "salt.utils.minions.CkMinions.check_minions",
  612. MagicMock(
  613. return_value={
  614. "minions": ["minion"],
  615. "missing": [],
  616. "ssh_minions": False,
  617. }
  618. ),
  619. ):
  620. server_channel.publish(load)
  621. server_channel.publish({"tgt_type": "glob", "tgt": "*", "stop": True})
  622. gather.join()
  623. server_channel.pub_close()
  624. assert len(results) == send_num, (len(results), set(expect).difference(results))
  625. @slowTest
  626. def test_publish_to_pubserv_tcp(self):
  627. """
  628. Test sending 10K messags to ZeroMQPubServerChannel using TCP transport
  629. """
  630. opts = dict(self.master_config, ipc_mode="tcp", pub_hwm=0)
  631. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  632. server_channel.pre_fork(
  633. self.process_manager,
  634. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  635. )
  636. pub_uri = "tcp://{interface}:{publish_port}".format(**server_channel.opts)
  637. send_num = 10000
  638. expect = []
  639. results = []
  640. gather = threading.Thread(
  641. target=self._gather_results, args=(self.minion_config, pub_uri, results,)
  642. )
  643. gather.start()
  644. # Allow time for server channel to start, especially on windows
  645. time.sleep(2)
  646. for i in range(send_num):
  647. expect.append(i)
  648. load = {"tgt_type": "glob", "tgt": "*", "jid": i}
  649. server_channel.publish(load)
  650. gather.join()
  651. server_channel.pub_close()
  652. assert len(results) == send_num, (len(results), set(expect).difference(results))
  653. @staticmethod
  654. def _send_small(opts, sid, num=10):
  655. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  656. for i in range(num):
  657. load = {"tgt_type": "glob", "tgt": "*", "jid": "{}-{}".format(sid, i)}
  658. server_channel.publish(load)
  659. server_channel.pub_close()
  660. @staticmethod
  661. def _send_large(opts, sid, num=10, size=250000 * 3):
  662. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  663. for i in range(num):
  664. load = {
  665. "tgt_type": "glob",
  666. "tgt": "*",
  667. "jid": "{}-{}".format(sid, i),
  668. "xdata": "0" * size,
  669. }
  670. server_channel.publish(load)
  671. server_channel.pub_close()
  672. @skipIf(salt.utils.platform.is_freebsd(), "Skip on FreeBSD")
  673. @slowTest
  674. def test_issue_36469_tcp(self):
  675. """
  676. Test sending both large and small messags to publisher using TCP
  677. https://github.com/saltstack/salt/issues/36469
  678. """
  679. opts = dict(self.master_config, ipc_mode="tcp", pub_hwm=0)
  680. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  681. server_channel.pre_fork(
  682. self.process_manager,
  683. kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()},
  684. )
  685. send_num = 10 * 4
  686. expect = []
  687. results = []
  688. pub_uri = "tcp://{interface}:{publish_port}".format(**opts)
  689. # Allow time for server channel to start, especially on windows
  690. time.sleep(2)
  691. gather = threading.Thread(
  692. target=self._gather_results, args=(self.minion_config, pub_uri, results,)
  693. )
  694. gather.start()
  695. with ThreadPoolExecutor(max_workers=4) as executor:
  696. executor.submit(self._send_small, opts, 1)
  697. executor.submit(self._send_small, opts, 2)
  698. executor.submit(self._send_small, opts, 3)
  699. executor.submit(self._send_large, opts, 4)
  700. expect = ["{}-{}".format(a, b) for a in range(10) for b in (1, 2, 3, 4)]
  701. time.sleep(0.1)
  702. server_channel.publish({"tgt_type": "glob", "tgt": "*", "stop": True})
  703. gather.join()
  704. server_channel.pub_close()
  705. assert len(results) == send_num, (len(results), set(expect).difference(results))
  706. class AsyncZeroMQReqChannelTests(TestCase):
  707. def test_force_close_all_instances(self):
  708. zmq1 = MagicMock()
  709. zmq2 = MagicMock()
  710. zmq3 = MagicMock()
  711. zmq_objects = {"zmq": {"1": zmq1, "2": zmq2}, "other_zmq": {"3": zmq3}}
  712. with patch(
  713. "salt.transport.zeromq.AsyncZeroMQReqChannel.instance_map", zmq_objects
  714. ):
  715. salt.transport.zeromq.AsyncZeroMQReqChannel.force_close_all_instances()
  716. self.assertEqual(zmq1.mock_calls, [call.close()])
  717. self.assertEqual(zmq2.mock_calls, [call.close()])
  718. self.assertEqual(zmq3.mock_calls, [call.close()])
  719. # check if instance map changed
  720. self.assertIs(
  721. zmq_objects, salt.transport.zeromq.AsyncZeroMQReqChannel.instance_map
  722. )