1
0

test_zeromq.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: Thomas Jackson <jacksontj.89@gmail.com>
  4. '''
  5. # Import python libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import os
  8. import time
  9. import threading
  10. import multiprocessing
  11. import ctypes
  12. from concurrent.futures.thread import ThreadPoolExecutor
  13. from distro import linux_distribution
  14. # Import 3rd-party libs
  15. import zmq.eventloop.ioloop
  16. # support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
  17. if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
  18. zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
  19. from salt.ext.tornado.testing import AsyncTestCase
  20. import salt.ext.tornado.gen
  21. # Import Salt libs
  22. import salt.config
  23. import salt.log.setup
  24. from salt.ext import six
  25. import salt.utils.process
  26. import salt.utils.platform
  27. import salt.transport.server
  28. import salt.transport.client
  29. import salt.exceptions
  30. from salt.ext.six.moves import range
  31. from salt.transport.zeromq import AsyncReqMessageClientPool
  32. import salt.ext.tornado.ioloop
  33. # Import test support libs
  34. from tests.support.runtests import RUNTIME_VARS
  35. from tests.support.unit import TestCase, skipIf
  36. from tests.support.helpers import flaky, get_unused_localhost_port
  37. from tests.support.mixins import AdaptedConfigurationTestCaseMixin
  38. from tests.support.mock import MagicMock, patch
  39. from tests.unit.transport.mixins import PubChannelMixin, ReqChannelMixin, run_loop_in_thread
  40. ON_SUSE = False
  41. if 'SuSE' in linux_distribution(full_distribution_name=False):
  42. ON_SUSE = True
  43. class BaseZMQReqCase(TestCase, AdaptedConfigurationTestCaseMixin):
  44. '''
  45. Test the req server/client pair
  46. '''
  47. @classmethod
  48. def setUpClass(cls):
  49. if not hasattr(cls, '_handle_payload'):
  50. return
  51. ret_port = get_unused_localhost_port()
  52. publish_port = get_unused_localhost_port()
  53. tcp_master_pub_port = get_unused_localhost_port()
  54. tcp_master_pull_port = get_unused_localhost_port()
  55. tcp_master_publish_pull = get_unused_localhost_port()
  56. tcp_master_workers = get_unused_localhost_port()
  57. cls.master_config = cls.get_temp_config(
  58. 'master',
  59. **{'transport': 'zeromq',
  60. 'auto_accept': True,
  61. 'ret_port': ret_port,
  62. 'publish_port': publish_port,
  63. 'tcp_master_pub_port': tcp_master_pub_port,
  64. 'tcp_master_pull_port': tcp_master_pull_port,
  65. 'tcp_master_publish_pull': tcp_master_publish_pull,
  66. 'tcp_master_workers': tcp_master_workers}
  67. )
  68. cls.minion_config = cls.get_temp_config(
  69. 'minion',
  70. **{'transport': 'zeromq',
  71. 'master_ip': '127.0.0.1',
  72. 'master_port': ret_port,
  73. 'auth_timeout': 5,
  74. 'auth_tries': 1,
  75. 'master_uri': 'tcp://127.0.0.1:{0}'.format(ret_port)}
  76. )
  77. cls.process_manager = salt.utils.process.ProcessManager(name='ReqServer_ProcessManager')
  78. cls.server_channel = salt.transport.server.ReqServerChannel.factory(cls.master_config)
  79. cls.server_channel.pre_fork(cls.process_manager)
  80. cls.io_loop = salt.ext.tornado.ioloop.IOLoop()
  81. cls.evt = threading.Event()
  82. cls.server_channel.post_fork(cls._handle_payload, io_loop=cls.io_loop)
  83. cls.server_thread = threading.Thread(target=run_loop_in_thread, args=(cls.io_loop, cls.evt))
  84. cls.server_thread.start()
  85. @classmethod
  86. def tearDownClass(cls):
  87. if not hasattr(cls, '_handle_payload'):
  88. return
  89. # Attempting to kill the children hangs the test suite.
  90. # Let the test suite handle this instead.
  91. cls.process_manager.stop_restarting()
  92. cls.process_manager.kill_children()
  93. cls.evt.set()
  94. cls.server_thread.join()
  95. time.sleep(2) # Give the procs a chance to fully close before we stop the io_loop
  96. cls.server_channel.close()
  97. del cls.server_channel
  98. del cls.io_loop
  99. del cls.process_manager
  100. del cls.server_thread
  101. del cls.master_config
  102. del cls.minion_config
  103. @classmethod
  104. def _handle_payload(cls, payload):
  105. '''
  106. TODO: something besides echo
  107. '''
  108. return payload, {'fun': 'send_clear'}
  109. class ClearReqTestCases(BaseZMQReqCase, ReqChannelMixin):
  110. '''
  111. Test all of the clear msg stuff
  112. '''
  113. def setUp(self):
  114. self.channel = salt.transport.client.ReqChannel.factory(self.minion_config, crypt='clear')
  115. def tearDown(self):
  116. self.channel.close()
  117. del self.channel
  118. @classmethod
  119. @salt.ext.tornado.gen.coroutine
  120. def _handle_payload(cls, payload):
  121. '''
  122. TODO: something besides echo
  123. '''
  124. raise salt.ext.tornado.gen.Return((payload, {'fun': 'send_clear'}))
  125. def test_master_uri_override(self):
  126. '''
  127. ensure master_uri kwarg is respected
  128. '''
  129. # minion_config should be 127.0.0.1, we want a different uri that still connects
  130. uri = 'tcp://{master_ip}:{master_port}'.format(master_ip='localhost', master_port=self.minion_config['master_port'])
  131. channel = salt.transport.Channel.factory(self.minion_config, master_uri=uri)
  132. self.assertIn('localhost', channel.master_uri)
  133. del channel
  134. @flaky
  135. @skipIf(ON_SUSE, 'Skipping until https://github.com/saltstack/salt/issues/32902 gets fixed')
  136. class AESReqTestCases(BaseZMQReqCase, ReqChannelMixin):
  137. def setUp(self):
  138. self.channel = salt.transport.client.ReqChannel.factory(self.minion_config)
  139. def tearDown(self):
  140. self.channel.close()
  141. del self.channel
  142. @classmethod
  143. @salt.ext.tornado.gen.coroutine
  144. def _handle_payload(cls, payload):
  145. '''
  146. TODO: something besides echo
  147. '''
  148. raise salt.ext.tornado.gen.Return((payload, {'fun': 'send'}))
  149. # TODO: make failed returns have a specific framing so we can raise the same exception
  150. # on encrypted channels
  151. #
  152. #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  153. #
  154. # WARNING: This test will fail randomly on any system with > 1 CPU core!!!
  155. #
  156. #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  157. def test_badload(self):
  158. '''
  159. Test a variety of bad requests, make sure that we get some sort of error
  160. '''
  161. # TODO: This test should be re-enabled when Jenkins moves to C7.
  162. # Once the version of salt-testing is increased to something newer than the September
  163. # release of salt-testing, the @flaky decorator should be applied to this test.
  164. msgs = ['', [], tuple()]
  165. for msg in msgs:
  166. with self.assertRaises(salt.exceptions.AuthenticationError):
  167. ret = self.channel.send(msg, timeout=5)
  168. class BaseZMQPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin):
  169. '''
  170. Test the req server/client pair
  171. '''
  172. @classmethod
  173. def setUpClass(cls):
  174. ret_port = get_unused_localhost_port()
  175. publish_port = get_unused_localhost_port()
  176. tcp_master_pub_port = get_unused_localhost_port()
  177. tcp_master_pull_port = get_unused_localhost_port()
  178. tcp_master_publish_pull = get_unused_localhost_port()
  179. tcp_master_workers = get_unused_localhost_port()
  180. cls.master_config = cls.get_temp_config(
  181. 'master',
  182. **{'transport': 'zeromq',
  183. 'auto_accept': True,
  184. 'ret_port': ret_port,
  185. 'publish_port': publish_port,
  186. 'tcp_master_pub_port': tcp_master_pub_port,
  187. 'tcp_master_pull_port': tcp_master_pull_port,
  188. 'tcp_master_publish_pull': tcp_master_publish_pull,
  189. 'tcp_master_workers': tcp_master_workers}
  190. )
  191. cls.minion_config = salt.config.minion_config(os.path.join(RUNTIME_VARS.TMP_CONF_DIR, 'minion'))
  192. cls.minion_config = cls.get_temp_config(
  193. 'minion',
  194. **{'transport': 'zeromq',
  195. 'master_ip': '127.0.0.1',
  196. 'master_port': ret_port,
  197. 'master_uri': 'tcp://127.0.0.1:{0}'.format(ret_port)}
  198. )
  199. cls.process_manager = salt.utils.process.ProcessManager(name='ReqServer_ProcessManager')
  200. cls.server_channel = salt.transport.server.PubServerChannel.factory(cls.master_config)
  201. cls.server_channel.pre_fork(cls.process_manager)
  202. # we also require req server for auth
  203. cls.req_server_channel = salt.transport.server.ReqServerChannel.factory(cls.master_config)
  204. cls.req_server_channel.pre_fork(cls.process_manager)
  205. cls._server_io_loop = salt.ext.tornado.ioloop.IOLoop()
  206. cls.evt = threading.Event()
  207. cls.req_server_channel.post_fork(cls._handle_payload, io_loop=cls._server_io_loop)
  208. cls.server_thread = threading.Thread(target=run_loop_in_thread, args=(cls._server_io_loop, cls.evt))
  209. cls.server_thread.start()
  210. @classmethod
  211. def tearDownClass(cls):
  212. cls.process_manager.kill_children()
  213. cls.process_manager.stop_restarting()
  214. time.sleep(2) # Give the procs a chance to fully close before we stop the io_loop
  215. cls.evt.set()
  216. cls.server_thread.join()
  217. cls.req_server_channel.close()
  218. cls.server_channel.close()
  219. cls._server_io_loop.stop()
  220. del cls.server_channel
  221. del cls._server_io_loop
  222. del cls.process_manager
  223. del cls.server_thread
  224. del cls.master_config
  225. del cls.minion_config
  226. @classmethod
  227. def _handle_payload(cls, payload):
  228. '''
  229. TODO: something besides echo
  230. '''
  231. return payload, {'fun': 'send_clear'}
  232. def setUp(self):
  233. super(BaseZMQPubCase, self).setUp()
  234. self._start_handlers = dict(self.io_loop._handlers)
  235. def tearDown(self):
  236. super(BaseZMQPubCase, self).tearDown()
  237. failures = []
  238. for k, v in six.iteritems(self.io_loop._handlers):
  239. if self._start_handlers.get(k) != v:
  240. failures.append((k, v))
  241. del self._start_handlers
  242. if len(failures) > 0:
  243. raise Exception('FDs still attached to the IOLoop: {0}'.format(failures))
  244. @skipIf(True, 'Skip until we can devote time to fix this test')
  245. class AsyncPubChannelTest(BaseZMQPubCase, PubChannelMixin):
  246. '''
  247. Tests around the publish system
  248. '''
  249. def get_new_ioloop(self):
  250. return salt.ext.tornado.ioloop.IOLoop()
  251. class AsyncReqMessageClientPoolTest(TestCase):
  252. def setUp(self):
  253. super(AsyncReqMessageClientPoolTest, self).setUp()
  254. sock_pool_size = 5
  255. with patch('salt.transport.zeromq.AsyncReqMessageClient.__init__', MagicMock(return_value=None)):
  256. self.message_client_pool = AsyncReqMessageClientPool({'sock_pool_size': sock_pool_size},
  257. args=({}, ''))
  258. self.original_message_clients = self.message_client_pool.message_clients
  259. self.message_client_pool.message_clients = [MagicMock() for _ in range(sock_pool_size)]
  260. def tearDown(self):
  261. with patch('salt.transport.zeromq.AsyncReqMessageClient.destroy', MagicMock(return_value=None)):
  262. del self.original_message_clients
  263. super(AsyncReqMessageClientPoolTest, self).tearDown()
  264. def test_send(self):
  265. for message_client_mock in self.message_client_pool.message_clients:
  266. message_client_mock.send_queue = [0, 0, 0]
  267. message_client_mock.send.return_value = []
  268. self.assertEqual([], self.message_client_pool.send())
  269. self.message_client_pool.message_clients[2].send_queue = [0]
  270. self.message_client_pool.message_clients[2].send.return_value = [1]
  271. self.assertEqual([1], self.message_client_pool.send())
  272. def test_destroy(self):
  273. self.message_client_pool.destroy()
  274. self.assertEqual([], self.message_client_pool.message_clients)
  275. class ZMQConfigTest(TestCase):
  276. def test_master_uri(self):
  277. '''
  278. test _get_master_uri method
  279. '''
  280. m_ip = '127.0.0.1'
  281. m_port = 4505
  282. s_ip = '111.1.0.1'
  283. s_port = 4058
  284. m_ip6 = '1234:5678::9abc'
  285. s_ip6 = '1234:5678::1:9abc'
  286. with patch('salt.transport.zeromq.LIBZMQ_VERSION_INFO', (4, 1, 6)), \
  287. patch('salt.transport.zeromq.ZMQ_VERSION_INFO', (16, 0, 1)):
  288. # pass in both source_ip and source_port
  289. assert salt.transport.zeromq._get_master_uri(master_ip=m_ip,
  290. master_port=m_port,
  291. source_ip=s_ip,
  292. source_port=s_port) == 'tcp://{0}:{1};{2}:{3}'.format(s_ip, s_port, m_ip, m_port)
  293. assert salt.transport.zeromq._get_master_uri(master_ip=m_ip6,
  294. master_port=m_port,
  295. source_ip=s_ip6,
  296. source_port=s_port) == 'tcp://[{0}]:{1};[{2}]:{3}'.format(s_ip6, s_port, m_ip6, m_port)
  297. # source ip and source_port empty
  298. assert salt.transport.zeromq._get_master_uri(master_ip=m_ip,
  299. master_port=m_port) == 'tcp://{0}:{1}'.format(m_ip, m_port)
  300. assert salt.transport.zeromq._get_master_uri(master_ip=m_ip6,
  301. master_port=m_port) == 'tcp://[{0}]:{1}'.format(m_ip6, m_port)
  302. # pass in only source_ip
  303. assert salt.transport.zeromq._get_master_uri(master_ip=m_ip,
  304. master_port=m_port,
  305. source_ip=s_ip) == 'tcp://{0}:0;{1}:{2}'.format(s_ip, m_ip, m_port)
  306. assert salt.transport.zeromq._get_master_uri(master_ip=m_ip6,
  307. master_port=m_port,
  308. source_ip=s_ip6) == 'tcp://[{0}]:0;[{1}]:{2}'.format(s_ip6, m_ip6, m_port)
  309. # pass in only source_port
  310. assert salt.transport.zeromq._get_master_uri(master_ip=m_ip,
  311. master_port=m_port,
  312. source_port=s_port) == 'tcp://0.0.0.0:{0};{1}:{2}'.format(s_port, m_ip, m_port)
  313. class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin):
  314. @classmethod
  315. def setUpClass(cls):
  316. ret_port = get_unused_localhost_port()
  317. publish_port = get_unused_localhost_port()
  318. tcp_master_pub_port = get_unused_localhost_port()
  319. tcp_master_pull_port = get_unused_localhost_port()
  320. tcp_master_publish_pull = get_unused_localhost_port()
  321. tcp_master_workers = get_unused_localhost_port()
  322. cls.master_config = cls.get_temp_config(
  323. 'master',
  324. **{'transport': 'zeromq',
  325. 'auto_accept': True,
  326. 'ret_port': ret_port,
  327. 'publish_port': publish_port,
  328. 'tcp_master_pub_port': tcp_master_pub_port,
  329. 'tcp_master_pull_port': tcp_master_pull_port,
  330. 'tcp_master_publish_pull': tcp_master_publish_pull,
  331. 'tcp_master_workers': tcp_master_workers,
  332. 'sign_pub_messages': False,
  333. }
  334. )
  335. salt.master.SMaster.secrets['aes'] = {
  336. 'secret': multiprocessing.Array(
  337. ctypes.c_char,
  338. six.b(salt.crypt.Crypticle.generate_key_string()),
  339. ),
  340. }
  341. cls.minion_config = cls.get_temp_config(
  342. 'minion',
  343. **{'transport': 'zeromq',
  344. 'master_ip': '127.0.0.1',
  345. 'master_port': ret_port,
  346. 'auth_timeout': 5,
  347. 'auth_tries': 1,
  348. 'master_uri': 'tcp://127.0.0.1:{0}'.format(ret_port)}
  349. )
  350. @classmethod
  351. def tearDownClass(cls):
  352. del cls.minion_config
  353. del cls.master_config
  354. def setUp(self):
  355. # Start the event loop, even though we dont directly use this with
  356. # ZeroMQPubServerChannel, having it running seems to increase the
  357. # likely hood of dropped messages.
  358. self.io_loop = salt.ext.tornado.ioloop.IOLoop()
  359. self.io_loop.make_current()
  360. self.io_loop_thread = threading.Thread(target=self.io_loop.start)
  361. self.io_loop_thread.start()
  362. self.process_manager = salt.utils.process.ProcessManager(name='PubServer_ProcessManager')
  363. def tearDown(self):
  364. self.io_loop.add_callback(self.io_loop.stop)
  365. self.io_loop_thread.join()
  366. self.process_manager.stop_restarting()
  367. self.process_manager.kill_children()
  368. del self.io_loop
  369. del self.io_loop_thread
  370. del self.process_manager
  371. @staticmethod
  372. def _gather_results(opts, pub_uri, results, timeout=120, messages=None):
  373. '''
  374. Gather results until then number of seconds specified by timeout passes
  375. without reveiving a message
  376. '''
  377. ctx = zmq.Context()
  378. sock = ctx.socket(zmq.SUB)
  379. sock.setsockopt(zmq.LINGER, -1)
  380. sock.setsockopt(zmq.SUBSCRIBE, b'')
  381. sock.connect(pub_uri)
  382. last_msg = time.time()
  383. serial = salt.payload.Serial(opts)
  384. crypticle = salt.crypt.Crypticle(opts, salt.master.SMaster.secrets['aes']['secret'].value)
  385. while time.time() - last_msg < timeout:
  386. try:
  387. payload = sock.recv(zmq.NOBLOCK)
  388. except zmq.ZMQError:
  389. time.sleep(.01)
  390. else:
  391. if messages:
  392. if messages != 1:
  393. messages -= 1
  394. continue
  395. payload = crypticle.loads(serial.loads(payload)['load'])
  396. if 'stop' in payload:
  397. break
  398. last_msg = time.time()
  399. results.append(payload['jid'])
  400. @skipIf(salt.utils.platform.is_windows(), 'Skip on Windows OS')
  401. def test_publish_to_pubserv_ipc(self):
  402. '''
  403. Test sending 10K messags to ZeroMQPubServerChannel using IPC transport
  404. ZMQ's ipc transport not supported on Windows
  405. '''
  406. opts = dict(self.master_config, ipc_mode='ipc', pub_hwm=0)
  407. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  408. server_channel.pre_fork(self.process_manager, kwargs={
  409. 'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
  410. })
  411. pub_uri = 'tcp://{interface}:{publish_port}'.format(**server_channel.opts)
  412. send_num = 10000
  413. expect = []
  414. results = []
  415. gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
  416. gather.start()
  417. # Allow time for server channel to start, especially on windows
  418. time.sleep(2)
  419. for i in range(send_num):
  420. expect.append(i)
  421. load = {'tgt_type': 'glob', 'tgt': '*', 'jid': i}
  422. server_channel.publish(load)
  423. server_channel.publish(
  424. {'tgt_type': 'glob', 'tgt': '*', 'stop': True}
  425. )
  426. gather.join()
  427. server_channel.pub_close()
  428. assert len(results) == send_num, (len(results), set(expect).difference(results))
  429. def test_zeromq_publish_port(self):
  430. '''
  431. test when connecting that we
  432. use the publish_port set in opts
  433. when its not 4506
  434. '''
  435. opts = dict(self.master_config, ipc_mode='ipc',
  436. pub_hwm=0, recon_randomize=False,
  437. publish_port=455505,
  438. recon_default=1, recon_max=2, master_ip='127.0.0.1',
  439. acceptance_wait_time=5, acceptance_wait_time_max=5)
  440. opts['master_uri'] = 'tcp://{interface}:{publish_port}'.format(**opts)
  441. channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  442. patch_socket = MagicMock(return_value=True)
  443. patch_auth = MagicMock(return_value=True)
  444. with patch.object(channel, '_socket', patch_socket), \
  445. patch.object(channel, 'auth', patch_auth):
  446. channel.connect()
  447. assert str(opts['publish_port']) in patch_socket.mock_calls[0][1][0]
  448. def test_zeromq_zeromq_filtering_decode_message_no_match(self):
  449. '''
  450. test AsyncZeroMQPubChannel _decode_messages when
  451. zmq_filtering enabled and minion does not match
  452. '''
  453. message = [b'4f26aeafdb2367620a393c973eddbe8f8b846eb',
  454. b'\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf'
  455. b'\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2'
  456. b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg'
  457. b'\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d'
  458. b'\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>'
  459. b'\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D']
  460. opts = dict(self.master_config, ipc_mode='ipc',
  461. pub_hwm=0, zmq_filtering=True, recon_randomize=False,
  462. recon_default=1, recon_max=2, master_ip='127.0.0.1',
  463. acceptance_wait_time=5, acceptance_wait_time_max=5)
  464. opts['master_uri'] = 'tcp://{interface}:{publish_port}'.format(**opts)
  465. server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  466. with patch('salt.crypt.AsyncAuth.crypticle',
  467. MagicMock(return_value={'tgt_type': 'glob', 'tgt': '*',
  468. 'jid': 1})) as mock_test:
  469. res = server_channel._decode_messages(message)
  470. assert res.result() is None
  471. def test_zeromq_zeromq_filtering_decode_message(self):
  472. '''
  473. test AsyncZeroMQPubChannel _decode_messages
  474. when zmq_filtered enabled
  475. '''
  476. message = [b'4f26aeafdb2367620a393c973eddbe8f8b846ebd',
  477. b'\x82\xa3enc\xa3aes\xa4load\xda\x00`\xeeR\xcf'
  478. b'\x0eaI#V\x17if\xcf\xae\x05\xa7\xb3bN\xf7\xb2\xe2'
  479. b'\xd0sF\xd1\xd4\xecB\xe8\xaf"/*ml\x80Q3\xdb\xaexg'
  480. b'\x8e\x8a\x8c\xd3l\x03\\,J\xa7\x01i\xd1:]\xe3\x8d'
  481. b'\xf4\x03\x88K\x84\n`\xe8\x9a\xad\xad\xc6\x8ea\x15>'
  482. b'\x92m\x9e\xc7aM\x11?\x18;\xbd\x04c\x07\x85\x99\xa3\xea[\x00D']
  483. opts = dict(self.master_config, ipc_mode='ipc',
  484. pub_hwm=0, zmq_filtering=True, recon_randomize=False,
  485. recon_default=1, recon_max=2, master_ip='127.0.0.1',
  486. acceptance_wait_time=5, acceptance_wait_time_max=5)
  487. opts['master_uri'] = 'tcp://{interface}:{publish_port}'.format(**opts)
  488. server_channel = salt.transport.zeromq.AsyncZeroMQPubChannel(opts)
  489. with patch('salt.crypt.AsyncAuth.crypticle',
  490. MagicMock(return_value={'tgt_type': 'glob', 'tgt': '*',
  491. 'jid': 1})) as mock_test:
  492. res = server_channel._decode_messages(message)
  493. assert res.result()['enc'] == 'aes'
  494. @skipIf(salt.utils.platform.is_windows(), 'Skip on Windows OS')
  495. def test_zeromq_filtering(self):
  496. '''
  497. Test sending messags to publisher using UDP
  498. with zeromq_filtering enabled
  499. '''
  500. opts = dict(self.master_config, ipc_mode='ipc',
  501. pub_hwm=0, zmq_filtering=True, acceptance_wait_time=5)
  502. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  503. server_channel.pre_fork(self.process_manager, kwargs={
  504. 'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
  505. })
  506. pub_uri = 'tcp://{interface}:{publish_port}'.format(**server_channel.opts)
  507. send_num = 1
  508. expect = []
  509. results = []
  510. gather = threading.Thread(target=self._gather_results,
  511. args=(self.minion_config, pub_uri, results,),
  512. kwargs={'messages': 2})
  513. gather.start()
  514. # Allow time for server channel to start, especially on windows
  515. time.sleep(2)
  516. expect.append(send_num)
  517. load = {'tgt_type': 'glob', 'tgt': '*', 'jid': send_num}
  518. with patch('salt.utils.minions.CkMinions.check_minions',
  519. MagicMock(return_value={'minions': ['minion'], 'missing': [],
  520. 'ssh_minions': False})):
  521. server_channel.publish(load)
  522. server_channel.publish(
  523. {'tgt_type': 'glob', 'tgt': '*', 'stop': True}
  524. )
  525. gather.join()
  526. server_channel.pub_close()
  527. assert len(results) == send_num, (len(results), set(expect).difference(results))
  528. def test_publish_to_pubserv_tcp(self):
  529. '''
  530. Test sending 10K messags to ZeroMQPubServerChannel using TCP transport
  531. '''
  532. opts = dict(self.master_config, ipc_mode='tcp', pub_hwm=0)
  533. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  534. server_channel.pre_fork(self.process_manager, kwargs={
  535. 'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
  536. })
  537. pub_uri = 'tcp://{interface}:{publish_port}'.format(**server_channel.opts)
  538. send_num = 10000
  539. expect = []
  540. results = []
  541. gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
  542. gather.start()
  543. # Allow time for server channel to start, especially on windows
  544. time.sleep(2)
  545. for i in range(send_num):
  546. expect.append(i)
  547. load = {'tgt_type': 'glob', 'tgt': '*', 'jid': i}
  548. server_channel.publish(load)
  549. gather.join()
  550. server_channel.pub_close()
  551. assert len(results) == send_num, (len(results), set(expect).difference(results))
  552. @staticmethod
  553. def _send_small(opts, sid, num=10):
  554. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  555. for i in range(num):
  556. load = {'tgt_type': 'glob', 'tgt': '*', 'jid': '{}-{}'.format(sid, i)}
  557. server_channel.publish(load)
  558. server_channel.close()
  559. @staticmethod
  560. def _send_large(opts, sid, num=10, size=250000 * 3):
  561. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  562. for i in range(num):
  563. load = {'tgt_type': 'glob', 'tgt': '*', 'jid': '{}-{}'.format(sid, i), 'xdata': '0' * size}
  564. server_channel.publish(load)
  565. server_channel.close()
  566. def test_issue_36469_tcp(self):
  567. '''
  568. Test sending both large and small messags to publisher using TCP
  569. https://github.com/saltstack/salt/issues/36469
  570. '''
  571. opts = dict(self.master_config, ipc_mode='tcp', pub_hwm=0)
  572. server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts)
  573. server_channel.pre_fork(self.process_manager, kwargs={
  574. 'log_queue': salt.log.setup.get_multiprocessing_logging_queue()
  575. })
  576. send_num = 10 * 4
  577. expect = []
  578. results = []
  579. pub_uri = 'tcp://{interface}:{publish_port}'.format(**opts)
  580. # Allow time for server channel to start, especially on windows
  581. time.sleep(2)
  582. gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,))
  583. gather.start()
  584. with ThreadPoolExecutor(max_workers=4) as executor:
  585. executor.submit(self._send_small, opts, 1)
  586. executor.submit(self._send_small, opts, 2)
  587. executor.submit(self._send_small, opts, 3)
  588. executor.submit(self._send_large, opts, 4)
  589. expect = ['{}-{}'.format(a, b) for a in range(10) for b in (1, 2, 3, 4)]
  590. time.sleep(0.1)
  591. server_channel.publish({'tgt_type': 'glob', 'tgt': '*', 'stop': True})
  592. gather.join()
  593. server_channel.pub_close()
  594. assert len(results) == send_num, (len(results), set(expect).difference(results))