test_zeromq.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Thomas Jackson <jacksontj.89@gmail.com>
  4. '''
  5. # Import python libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import os
  8. import time
  9. import threading
  10. import multiprocessing
  11. import ctypes
  12. from concurrent.futures.thread import ThreadPoolExecutor
  13. # linux_distribution deprecated in py3.7
  14. try:
  15. from platform import linux_distribution
  16. except ImportError:
  17. from distro import linux_distribution
  18. # Import 3rd-party libs
  19. import zmq.eventloop.ioloop
  20. # support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
  21. if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
  22. zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
  23. from tornado.testing import AsyncTestCase
  24. import tornado.gen
  25. # Import Salt libs
  26. import salt.config
  27. import salt.log.setup
  28. from salt.ext import six
  29. import salt.utils.process
  30. import salt.utils.platform
  31. import salt.transport.server
  32. import salt.transport.client
  33. import salt.exceptions
  34. from salt.ext.six.moves import range
  35. from salt.transport.zeromq import AsyncReqMessageClientPool
  36. # Import test support libs
  37. from tests.support.runtests import RUNTIME_VARS
  38. from tests.support.unit import TestCase, skipIf, WAR_ROOM_SKIP
  39. from tests.support.helpers import flaky, get_unused_localhost_port
  40. from tests.support.mixins import AdaptedConfigurationTestCaseMixin
  41. from tests.support.mock import MagicMock, patch
  42. from tests.unit.transport.mixins import PubChannelMixin, ReqChannelMixin
  43. ON_SUSE = False
  44. if 'SuSE' in linux_distribution(full_distribution_name=False):
  45. ON_SUSE = True
  46. class BaseZMQReqCase(TestCase, AdaptedConfigurationTestCaseMixin):
  47. '''
  48. Test the req server/client pair
  49. '''
  50. @classmethod
  51. def setUpClass(cls):
  52. if not hasattr(cls, '_handle_payload'):
  53. return
  54. ret_port = get_unused_localhost_port()
  55. publish_port = get_unused_localhost_port()
  56. tcp_master_pub_port = get_unused_localhost_port()
  57. tcp_master_pull_port = get_unused_localhost_port()
  58. tcp_master_publish_pull = get_unused_localhost_port()
  59. tcp_master_workers = get_unused_localhost_port()
  60. cls.master_config = cls.get_temp_config(
  61. 'master',
  62. **{'transport': 'zeromq',
  63. 'auto_accept': True,
  64. 'ret_port': ret_port,
  65. 'publish_port': publish_port,
  66. 'tcp_master_pub_port': tcp_master_pub_port,
  67. 'tcp_master_pull_port': tcp_master_pull_port,
  68. 'tcp_master_publish_pull': tcp_master_publish_pull,
  69. 'tcp_master_workers': tcp_master_workers}
  70. )
  71. cls.minion_config = cls.get_temp_config(
  72. 'minion',
  73. **{'transport': 'zeromq',
  74. 'master_ip': '127.0.0.1',
  75. 'master_port': ret_port,
  76. 'auth_timeout': 5,
  77. 'auth_tries': 1,
  78. 'master_uri': 'tcp://127.0.0.1:{0}'.format(ret_port)}
  79. )
  80. cls.process_manager = salt.utils.process.ProcessManager(name='ReqServer_ProcessManager')
  81. cls.server_channel = salt.transport.server.ReqServerChannel.factory(cls.master_config)
  82. cls.server_channel.pre_fork(cls.process_manager)
  83. cls.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
  84. cls.io_loop.make_current()
  85. cls.server_channel.post_fork(cls._handle_payload, io_loop=cls.io_loop)
  86. cls.server_thread = threading.Thread(target=cls.io_loop.start)
  87. cls.server_thread.daemon = True
  88. cls.server_thread.start()
  89. @classmethod
  90. def tearDownClass(cls):
  91. if not hasattr(cls, '_handle_payload'):
  92. return
  93. # Attempting to kill the children hangs the test suite.
  94. # Let the test suite handle this instead.
  95. cls.process_manager.stop_restarting()
  96. cls.process_manager.kill_children()
  97. cls.io_loop.add_callback(cls.io_loop.stop)
  98. cls.server_thread.join()
  99. time.sleep(2) # Give the procs a chance to fully close before we stop the io_loop
  100. cls.server_channel.close()
  101. del cls.server_channel
  102. del cls.io_loop
  103. del cls.process_manager
  104. del cls.server_thread
  105. del cls.master_config
  106. del cls.minion_config
  107. @classmethod
  108. def _handle_payload(cls, payload):
  109. '''
  110. TODO: something besides echo
  111. '''
  112. return payload, {'fun': 'send_clear'}
  113. class ClearReqTestCases(BaseZMQReqCase, ReqChannelMixin):
  114. '''
  115. Test all of the clear msg stuff
  116. '''
  117. def setUp(self):
  118. self.channel = salt.transport.client.ReqChannel.factory(self.minion_config, crypt='clear')
  119. def tearDown(self):
  120. del self.channel
  121. @classmethod
  122. @tornado.gen.coroutine
  123. def _handle_payload(cls, payload):
  124. '''
  125. TODO: something besides echo
  126. '''
  127. raise tornado.gen.Return((payload, {'fun': 'send_clear'}))
  128. def test_master_uri_override(self):
  129. '''
  130. ensure master_uri kwarg is respected
  131. '''
  132. # minion_config should be 127.0.0.1, we want a different uri that still connects
  133. uri = 'tcp://{master_ip}:{master_port}'.format(master_ip='localhost', master_port=self.minion_config['master_port'])
  134. channel = salt.transport.Channel.factory(self.minion_config, master_uri=uri)
  135. self.assertIn('localhost', channel.master_uri)
  136. del channel
  137. @flaky
  138. @skipIf(ON_SUSE, 'Skipping until https://github.com/saltstack/salt/issues/32902 gets fixed')
  139. class AESReqTestCases(BaseZMQReqCase, ReqChannelMixin):
  140. def setUp(self):
  141. self.channel = salt.transport.client.ReqChannel.factory(self.minion_config)
  142. def tearDown(self):
  143. del self.channel
  144. @classmethod
  145. @tornado.gen.coroutine
  146. def _handle_payload(cls, payload):
  147. '''
  148. TODO: something besides echo
  149. '''
  150. raise tornado.gen.Return((payload, {'fun': 'send'}))
  151. # TODO: make failed returns have a specific framing so we can raise the same exception
  152. # on encrypted channels
  153. #
  154. #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  155. #
  156. # WARNING: This test will fail randomly on any system with > 1 CPU core!!!
  157. #
  158. #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  159. def test_badload(self):
  160. '''
  161. Test a variety of bad requests, make sure that we get some sort of error
  162. '''
  163. # TODO: This test should be re-enabled when Jenkins moves to C7.
  164. # Once the version of salt-testing is increased to something newer than the September
  165. # release of salt-testing, the @flaky decorator should be applied to this test.
  166. msgs = ['', [], tuple()]
  167. for msg in msgs:
  168. with self.assertRaises(salt.exceptions.AuthenticationError):
  169. ret = self.channel.send(msg, timeout=5)
  170. class BaseZMQPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin):
  171. '''
  172. Test the req server/client pair
  173. '''
  174. @classmethod
  175. def setUpClass(cls):
  176. ret_port = get_unused_localhost_port()
  177. publish_port = get_unused_localhost_port()
  178. tcp_master_pub_port = get_unused_localhost_port()
  179. tcp_master_pull_port = get_unused_localhost_port()
  180. tcp_master_publish_pull = get_unused_localhost_port()
  181. tcp_master_workers = get_unused_localhost_port()
  182. cls.master_config = cls.get_temp_config(
  183. 'master',
  184. **{'transport': 'zeromq',
  185. 'auto_accept': True,
  186. 'ret_port': ret_port,
  187. 'publish_port': publish_port,
  188. 'tcp_master_pub_port': tcp_master_pub_port,
  189. 'tcp_master_pull_port': tcp_master_pull_port,
  190. 'tcp_master_publish_pull': tcp_master_publish_pull,
  191. 'tcp_master_workers': tcp_master_workers}
  192. )
  193. cls.minion_config = salt.config.minion_config(os.path.join(RUNTIME_VARS.TMP_CONF_DIR, 'minion'))
  194. cls.minion_config = cls.get_temp_config(
  195. 'minion',
  196. **{'transport': 'zeromq',
  197. 'master_ip': '127.0.0.1',
  198. 'master_port': ret_port,
  199. 'master_uri': 'tcp://127.0.0.1:{0}'.format(ret_port)}
  200. )
  201. cls.process_manager = salt.utils.process.ProcessManager(name='ReqServer_ProcessManager')
  202. cls.server_channel = salt.transport.server.PubServerChannel.factory(cls.master_config)
  203. cls.server_channel.pre_fork(cls.process_manager)
  204. # we also require req server for auth
  205. cls.req_server_channel = salt.transport.server.ReqServerChannel.factory(cls.master_config)
  206. cls.req_server_channel.pre_fork(cls.process_manager)
  207. cls._server_io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
  208. cls.req_server_channel.post_fork(cls._handle_payload, io_loop=cls._server_io_loop)
  209. cls.server_thread = threading.Thread(target=cls._server_io_loop.start)
  210. cls.server_thread.daemon = True
  211. cls.server_thread.start()
  212. @classmethod
  213. def tearDownClass(cls):
  214. cls.process_manager.kill_children()
  215. cls.process_manager.stop_restarting()
  216. time.sleep(2) # Give the procs a chance to fully close before we stop the io_loop
  217. cls.io_loop.add_callback(cls.io_loop.stop)
  218. cls.server_thread.join()
  219. cls.req_server_channel.close()
  220. cls.server_channel.close()
  221. cls._server_io_loop.stop()
  222. del cls.server_channel
  223. del cls._server_io_loop
  224. del cls.process_manager
  225. del cls.server_thread
  226. del cls.master_config
  227. del cls.minion_config
  228. @classmethod
  229. def _handle_payload(cls, payload):
  230. '''
  231. TODO: something besides echo
  232. '''
  233. return payload, {'fun': 'send_clear'}
  234. def setUp(self):
  235. super(BaseZMQPubCase, self).setUp()
  236. self._start_handlers = dict(self.io_loop._handlers)
  237. def tearDown(self):
  238. super(BaseZMQPubCase, self).tearDown()
  239. failures = []
  240. for k, v in six.iteritems(self.io_loop._handlers):
  241. if self._start_handlers.get(k) != v:
  242. failures.append((k, v))
  243. del self._start_handlers
  244. if failures:
  245. raise Exception('FDs still attached to the IOLoop: {0}'.format(failures))
  246. @skipIf(True, 'Skip until we can devote time to fix this test')
  247. class AsyncPubChannelTest(BaseZMQPubCase, PubChannelMixin):
  248. '''
  249. Tests around the publish system
  250. '''
  251. def get_new_ioloop(self):
  252. return zmq.eventloop.ioloop.ZMQIOLoop()
  253. class AsyncReqMessageClientPoolTest(TestCase):
  254. def setUp(self):
  255. super(AsyncReqMessageClientPoolTest, self).setUp()
  256. sock_pool_size = 5
  257. with patch('salt.transport.zeromq.AsyncReqMessageClient.__init__', MagicMock(return_value=None)):
  258. self.message_client_pool = AsyncReqMessageClientPool({'sock_pool_size': sock_pool_size},
  259. args=({}, ''))
  260. self.original_message_clients = self.message_client_pool.message_clients
  261. self.message_client_pool.message_clients = [MagicMock() for _ in range(sock_pool_size)]
  262. def tearDown(self):
  263. with patch('salt.transport.zeromq.AsyncReqMessageClient.destroy', MagicMock(return_value=None)):
  264. del self.original_message_clients
  265. super(AsyncReqMessageClientPoolTest, self).tearDown()
  266. def test_send(self):
  267. for message_client_mock in self.message_client_pool.message_clients:
  268. message_client_mock.send_queue = [0, 0, 0]
  269. message_client_mock.send.return_value = []
  270. self.assertEqual([], self.message_client_pool.send())
  271. self.message_client_pool.message_clients[2].send_queue = [0]
  272. self.message_client_pool.message_clients[2].send.return_value = [1]
  273. self.assertEqual([1], self.message_client_pool.send())
  274. def test_destroy(self):
  275. self.message_client_pool.destroy()
  276. self.assertEqual([], self.message_client_pool.message_clients)
  277. class ZMQConfigTest(TestCase):
  278. def test_master_uri(self):
  279. '''
  280. test _get_master_uri method
  281. '''
  282. m_ip = '127.0.0.1'
  283. m_port = 4505
  284. s_ip = '111.1.0.1'
  285. s_port = 4058
  286. m_ip6 = '1234:5678::9abc'
  287. s_ip6 = '1234:5678::1:9abc'
  288. with patch('salt.transport.zeromq.LIBZMQ_VERSION_INFO', (4, 1, 6)), \
  289. patch('salt.transport.zeromq.ZMQ_VERSION_INFO', (16, 0, 1)):
  290. # pass in both source_ip and source_port
  291. assert salt.transport.zeromq._get_master_uri(master_ip=m_ip,
  292. master_port=m_port,
  293. source_ip=s_ip,
  294. source_port=s_port) == 'tcp://{0}:{1};{2}:{3}'.format(s_ip, s_port, m_ip, m_port)
  295. assert salt.transport.zeromq._get_master_uri(master_ip=m_ip6,
  296. master_port=m_port,
  297. source_ip=s_ip6,
  298. source_port=s_port) == 'tcp://[{0}]:{1};[{2}]:{3}'.format(s_ip6, s_port, m_ip6, m_port)
  299. # source ip and source_port empty
  300. assert salt.transport.zeromq._get_master_uri(master_ip=m_ip,
  301. master_port=m_port) == 'tcp://{0}:{1}'.format(m_ip, m_port)
  302. assert salt.transport.zeromq._get_master_uri(master_ip=m_ip6,
  303. master_port=m_port) == 'tcp://[{0}]:{1}'.format(m_ip6, m_port)
  304. # pass in only source_ip
  305. assert salt.transport.zeromq._get_master_uri(master_ip=m_ip,
  306. master_port=m_port,
  307. source_ip=s_ip) == 'tcp://{0}:0;{1}:{2}'.format(s_ip, m_ip, m_port)
  308. assert salt.transport.zeromq._get_master_uri(master_ip=m_ip6,
  309. master_port=m_port,
  310. source_ip=s_ip6) == 'tcp://[{0}]:0;[{1}]:{2}'.format(s_ip6, m_ip6, m_port)
  311. # pass in only source_port
  312. assert salt.transport.zeromq._get_master_uri(master_ip=m_ip,
  313. master_port=m_port,
  314. source_port=s_port) == 'tcp://0.0.0.0:{0};{1}:{2}'.format(s_port, m_ip, m_port)
  315. @skipIf(WAR_ROOM_SKIP, 'WAR ROOM TEMPORARY SKIP')
  316. class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
  317. @classmethod
  318. def setUpClass(cls):
  319. ret_port = get_unused_localhost_port()
  320. publish_port = get_unused_localhost_port()
  321. tcp_master_pub_port = get_unused_localhost_port()
  322. tcp_master_pull_port = get_unused_localhost_port()
  323. tcp_master_publish_pull = get_unused_localhost_port()
  324. tcp_master_workers = get_unused_localhost_port()
  325. cls.master_config = cls.get_temp_config(
  326. 'master',
  327. **{'transport': 'zeromq',
  328. 'auto_accept': True,
  329. 'ret_port': ret_port,
  330. 'publish_port': publish_port,
  331. 'tcp_master_pub_port': tcp_master_pub_port,
  332. 'tcp_master_pull_port': tcp_master_pull_port,
  333. 'tcp_master_publish_pull': tcp_master_publish_pull,
  334. 'tcp_master_workers': tcp_master_workers,
  335. 'sign_pub_messages': False,
  336. }
  337. )
  338. salt.master.SMaster.secrets['aes'] = {
  339. 'secret': multiprocessing.Array(
  340. ctypes.c_char,
  341. six.b(salt.crypt.Crypticle.generate_key_string()),
  342. ),
  343. }
  344. cls.minion_config = cls.get_temp_config(
  345. 'minion',
  346. **{'transport': 'zeromq',
  347. 'master_ip': '127.0.0.1',
  348. 'master_port': ret_port,
  349. 'auth_timeout': 5,
  350. 'auth_tries': 1,
  351. 'master_uri': 'tcp://127.0.0.1:{0}'.format(ret_port)}
  352. )
  353. @classmethod
  354. def tearDownClass(cls):
  355. del cls.minion_config
  356. del cls.master_config
  357. def setUp(self):
  358. # Start the event loop, even though we dont directly use this with
  359. # ZeroMQPubServerChannel, having it running seems to increase the
  360. # likely hood of dropped messages.
  361. self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
  362. self.io_loop.make_current()
  363. self.io_loop_thread = threading.Thread(target=self.io_loop.start)
  364. self.io_loop_thread.start()
  365. self.process_manager = salt.utils.process.ProcessManager(name='PubServer_ProcessManager')
  366. def tearDown(self):
  367. self.io_loop.add_callback(self.io_loop.stop)
  368. self.io_loop_thread.join()
  369. self.process_manager.stop_restarting()
  370. self.process_manager.kill_children()
  371. del self.io_loop
  372. del self.io_loop_thread
  373. del self.process_manager
  374. @staticmethod
  375. def _gather_results(opts, pub_uri, results, timeout=120, messages=None):
  376. '''
  377. Gather results until then number of seconds specified by timeout passes
  378. without reveiving a message
  379. '''
  380. ctx = zmq.Context()
  381. sock = ctx.socket(zmq.SUB)
  382. sock.setsockopt(zmq.LINGER, -1)
  383. sock.setsockopt(zmq.SUBSCRIBE, b'')
  384. sock.connect(pub_uri)
  385. last_msg = time.time()
  386. serial = salt.payload.Serial(opts)
  387. crypticle = salt.crypt.Crypticle(opts, salt.master.SMaster.secrets['aes']['secret'].value)
  388. while time.time() - last_msg < timeout:
  389. try:
  390. payload = sock.recv(zmq.NOBLOCK)
  391. except zmq.ZMQError:
  392. time.sleep(.01)
  393. else:
  394. if messages:
  395. if messages != 1:
  396. messages -= 1
  397. continue
  398. payload = crypticle.loads(serial.loads(payload)['load'])
  399. if 'stop' in payload:
  400. break
  401. last_msg = time.time()
  402. results.append(payload['jid'])
  403. return results
  404. @skipIf(salt.utils.platform.is_windows(), 'Skip on Windows OS')
  405. def test_publish_to_pubserv_ipc(self):
  406. '''
  407. Test sending 10K messags to ZeroMQPubServerChannel using IPC transport
  408. ZMQ's ipc transport not supported on Windows
  409. '''
  410. opts = dict(self.master_config, ipc_mode='ipc', pub_hwm=0)
  411. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  412. server_channel.pre_fork(self.process_manager, kwargs={
  413. 'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
  414. })
  415. pub_uri = 'tcp://{interface}:{publish_port}'.format(**server_channel.opts)
  416. send_num = 10000
  417. expect = []
  418. results = []
  419. gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
  420. gather.start()
  421. # Allow time for server channel to start, especially on windows
  422. time.sleep(2)
  423. for i in range(send_num):
  424. expect.append(i)
  425. load = {'tgt_type': 'glob', 'tgt': '*', 'jid': i}
  426. server_channel.publish(load)
  427. server_channel.publish(
  428. {'tgt_type': 'glob', 'tgt': '*', 'stop': True}
  429. )
  430. gather.join()
  431. server_channel.pub_close()
  432. assert len(results) == send_num, (len(results), set(expect).difference(results))
  433. def test_zeromq_zeromq_filtering_decode_message_no_match(self):
  434. '''
  435. test AsyncZeroMQPubChannel _decode_messages when
  436. zmq_filtering enabled and minion does not match
  437. '''
  438. message = [b'4f26aeafdb2367620a393c973eddbe8f8b846eb',
  439. b'\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf'
  440. b'\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2'
  441. b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg'
  442. b'\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d'
  443. b'\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>'
  444. b'\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D']
  445. opts = dict(self.master_config, ipc_mode='ipc',
  446. pub_hwm=0, zmq_filtering=True, recon_randomize=False,
  447. recon_default=1, recon_max=2, master_ip='127.0.0.1',
  448. acceptance_wait_time=5, acceptance_wait_time_max=5)
  449. opts['master_uri'] = 'tcp://{interface}:{publish_port}'.format(**opts)
  450. server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  451. with patch('salt.crypt.AsyncAuth.crypticle',
  452. MagicMock(return_value={'tgt_type': 'glob', 'tgt': '*',
  453. 'jid': 1})) as mock_test:
  454. res = server_channel._decode_messages(message)
  455. assert res.result() is None
  456. def test_zeromq_zeromq_filtering_decode_message(self):
  457. '''
  458. test AsyncZeroMQPubChannel _decode_messages
  459. when zmq_filtered enabled
  460. '''
  461. message = [b'4f26aeafdb2367620a393c973eddbe8f8b846ebd',
  462. b'\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf'
  463. b'\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2'
  464. b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg'
  465. b'\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d'
  466. b'\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>'
  467. b'\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D']
  468. opts = dict(self.master_config, ipc_mode='ipc',
  469. pub_hwm=0, zmq_filtering=True, recon_randomize=False,
  470. recon_default=1, recon_max=2, master_ip='127.0.0.1',
  471. acceptance_wait_time=5, acceptance_wait_time_max=5)
  472. opts['master_uri'] = 'tcp://{interface}:{publish_port}'.format(**opts)
  473. server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  474. with patch('salt.crypt.AsyncAuth.crypticle',
  475. MagicMock(return_value={'tgt_type': 'glob', 'tgt': '*',
  476. 'jid': 1})) as mock_test:
  477. res = server_channel._decode_messages(message)
  478. assert res.result()['enc'] == 'aes'
  479. @skipIf(salt.utils.platform.is_windows(), 'Skip on Windows OS')
  480. def test_zeromq_filtering(self):
  481. '''
  482. Test sending messags to publisher using UDP
  483. with zeromq_filtering enabled
  484. '''
  485. opts = dict(self.master_config, ipc_mode='ipc',
  486. pub_hwm=0, zmq_filtering=True, acceptance_wait_time=5)
  487. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  488. server_channel.pre_fork(self.process_manager, kwargs={
  489. 'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
  490. })
  491. pub_uri = 'tcp://{interface}:{publish_port}'.format(**server_channel.opts)
  492. send_num = 1
  493. expect = []
  494. results = []
  495. gather = threading.Thread(target=self._gather_results,
  496. args=(self.minion_config, pub_uri, results,),
  497. kwargs={'messages': 2})
  498. gather.start()
  499. # Allow time for server channel to start, especially on windows
  500. time.sleep(2)
  501. expect.append(send_num)
  502. load = {'tgt_type': 'glob', 'tgt': '*', 'jid': send_num}
  503. with patch('salt.utils.minions.CkMinions.check_minions',
  504. MagicMock(return_value={'minions': ['minion'], 'missing': [],
  505. 'ssh_minions': False})):
  506. server_channel.publish(load)
  507. server_channel.publish(
  508. {'tgt_type': 'glob', 'tgt': '*', 'stop': True}
  509. )
  510. gather.join()
  511. server_channel.pub_close()
  512. assert len(results) == send_num, (len(results), set(expect).difference(results))
  513. def test_publish_to_pubserv_tcp(self):
  514. '''
  515. Test sending 10K messags to ZeroMQPubServerChannel using TCP transport
  516. '''
  517. opts = dict(self.master_config, ipc_mode='tcp', pub_hwm=0)
  518. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  519. server_channel.pre_fork(self.process_manager, kwargs={
  520. 'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
  521. })
  522. pub_uri = 'tcp://{interface}:{publish_port}'.format(**server_channel.opts)
  523. send_num = 10000
  524. expect = []
  525. results = []
  526. gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
  527. gather.start()
  528. # Allow time for server channel to start, especially on windows
  529. time.sleep(2)
  530. for i in range(send_num):
  531. expect.append(i)
  532. load = {'tgt_type': 'glob', 'tgt': '*', 'jid': i}
  533. server_channel.publish(load)
  534. gather.join()
  535. server_channel.pub_close()
  536. assert len(results) == send_num, (len(results), set(expect).difference(results))
  537. @staticmethod
  538. def _send_small(opts, sid, num=10):
  539. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  540. for i in range(num):
  541. load = {'tgt_type': 'glob', 'tgt': '*', 'jid': '{}-{}'.format(sid, i)}
  542. server_channel.publish(load)
  543. @staticmethod
  544. def _send_large(opts, sid, num=10, size=250000 * 3):
  545. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  546. for i in range(num):
  547. load = {'tgt_type': 'glob', 'tgt': '*', 'jid': '{}-{}'.format(sid, i), 'xdata': '0' * size}
  548. server_channel.publish(load)
  549. def test_issue_36469_tcp(self):
  550. '''
  551. Test sending both large and small messags to publisher using TCP
  552. https://github.com/saltstack/salt/issues/36469
  553. '''
  554. opts = dict(self.master_config, ipc_mode='tcp', pub_hwm=0)
  555. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  556. server_channel.pre_fork(self.process_manager, kwargs={
  557. 'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
  558. })
  559. send_num = 10 * 4
  560. expect = []
  561. results = []
  562. pub_uri = 'tcp://{interface}:{publish_port}'.format(**opts)
  563. # Allow time for server channel to start, especially on windows
  564. time.sleep(2)
  565. gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
  566. gather.start()
  567. with ThreadPoolExecutor(max_workers=4) as executor:
  568. executor.submit(self._send_small, opts, 1)
  569. executor.submit(self._send_small, opts, 2)
  570. executor.submit(self._send_small, opts, 3)
  571. executor.submit(self._send_large, opts, 4)
  572. expect = ['{}-{}'.format(a, b) for a in range(10) for b in (1, 2, 3, 4)]
  573. server_channel.publish({'tgt_type': 'glob', 'tgt': '*', 'stop': True})
  574. gather.join()
  575. server_channel.pub_close()
  576. assert len(results) == send_num, (len(results), set(expect).difference(results))
  577. @skipIf(salt.utils.platform.is_windows(), 'Skip on Windows OS')
  578. def test_issue_36469_udp(self):
  579. '''
  580. Test sending both large and small messags to publisher using UDP
  581. https://github.com/saltstack/salt/issues/36469
  582. '''
  583. opts = dict(self.master_config, ipc_mode='udp', pub_hwm=0)
  584. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  585. server_channel.pre_fork(self.process_manager, kwargs={
  586. 'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
  587. })
  588. send_num = 10 * 4
  589. expect = []
  590. results = []
  591. pub_uri = 'tcp://{interface}:{publish_port}'.format(**opts)
  592. # Allow time for server channel to start, especially on windows
  593. time.sleep(2)
  594. gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
  595. gather.start()
  596. with ThreadPoolExecutor(max_workers=4) as executor:
  597. executor.submit(self._send_small, opts, 1)
  598. executor.submit(self._send_small, opts, 2)
  599. executor.submit(self._send_small, opts, 3)
  600. executor.submit(self._send_large, opts, 4)
  601. expect = ['{}-{}'.format(a, b) for a in range(10) for b in (1, 2, 3, 4)]
  602. time.sleep(0.1)
  603. server_channel.publish({'tgt_type': 'glob', 'tgt': '*', 'stop': True})
  604. gather.join()
  605. server_channel.pub_close()
  606. assert len(results) == send_num, (len(results), set(expect).difference(results))