test_app.py 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, print_function, unicode_literals
  3. import os
  4. import threading
  5. import time
  6. import pytest
  7. import salt.utils.json
  8. import salt.utils.stringutils
  9. from salt.ext import six
  10. from salt.netapi.rest_tornado import saltnado
  11. from salt.utils.versions import StrictVersion
  12. from salt.utils.zeromq import ZMQDefaultLoop as ZMQIOLoop
  13. from salt.utils.zeromq import zmq
  14. from tests.support.helpers import flaky, slowTest
  15. from tests.support.unit import skipIf
  16. from tests.unit.netapi.test_rest_tornado import SaltnadoTestCase
# Truthiness of the ``zmq`` name imported from salt.utils.zeromq; the
# class-level ``skipIf`` decorators in this module key off this flag.
HAS_ZMQ_IOLOOP = bool(zmq)
  18. class _SaltnadoIntegrationTestCase(SaltnadoTestCase): # pylint: disable=abstract-method
  19. @property
  20. def opts(self):
  21. return self.get_config("client_config", from_scratch=True)
  22. @property
  23. def mod_opts(self):
  24. return self.get_config("minion", from_scratch=True)
  25. @skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
  26. @skipIf(
  27. StrictVersion(zmq.__version__) < StrictVersion("14.0.1"),
  28. "PyZMQ must be >= 14.0.1 to run these tests.",
  29. )
  30. @pytest.mark.usefixtures("salt_sub_minion")
  31. class TestSaltAPIHandler(_SaltnadoIntegrationTestCase):
  32. def setUp(self):
  33. super(TestSaltAPIHandler, self).setUp()
  34. os.environ["ASYNC_TEST_TIMEOUT"] = "300"
  35. def get_app(self):
  36. urls = [("/", saltnado.SaltAPIHandler)]
  37. application = self.build_tornado_app(urls)
  38. application.event_listener = saltnado.EventListener({}, self.opts)
  39. self.application = application
  40. return application
  41. def test_root(self):
  42. """
  43. Test the root path which returns the list of clients we support
  44. """
  45. response = self.fetch("/", connect_timeout=30, request_timeout=30,)
  46. self.assertEqual(response.code, 200)
  47. response_obj = salt.utils.json.loads(response.body)
  48. self.assertEqual(
  49. sorted(response_obj["clients"]),
  50. ["local", "local_async", "runner", "runner_async"],
  51. )
  52. self.assertEqual(response_obj["return"], "Welcome")
  53. @slowTest
  54. def test_post_no_auth(self):
  55. """
  56. Test post with no auth token, should 401
  57. """
  58. # get a token for this test
  59. low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
  60. response = self.fetch(
  61. "/",
  62. method="POST",
  63. body=salt.utils.json.dumps(low),
  64. headers={"Content-Type": self.content_type_map["json"]},
  65. follow_redirects=False,
  66. connect_timeout=30,
  67. request_timeout=30,
  68. )
  69. self.assertEqual(response.code, 302)
  70. self.assertEqual(response.headers["Location"], "/login")
  71. # Local client tests
  72. @skipIf(True, "to be re-enabled when #23623 is merged")
  73. def test_simple_local_post(self):
  74. """
  75. Test a basic API of /
  76. """
  77. low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
  78. response = self.fetch(
  79. "/",
  80. method="POST",
  81. body=salt.utils.json.dumps(low),
  82. headers={
  83. "Content-Type": self.content_type_map["json"],
  84. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  85. },
  86. connect_timeout=30,
  87. request_timeout=30,
  88. )
  89. response_obj = salt.utils.json.loads(response.body)
  90. self.assertEqual(len(response_obj["return"]), 1)
  91. # If --proxy is set, it will cause an extra minion_id to be in the
  92. # response. Since there's not a great way to know if the test
  93. # runner's proxy minion is running, and we're not testing proxy
  94. # minions here anyway, just remove it from the response.
  95. response_obj["return"][0].pop("proxytest", None)
  96. self.assertEqual(
  97. response_obj["return"][0], {"minion": True, "sub_minion": True}
  98. )
  99. def test_simple_local_post_no_tgt(self):
  100. """
  101. POST job with invalid tgt
  102. """
  103. low = [{"client": "local", "tgt": "minion_we_dont_have", "fun": "test.ping"}]
  104. response = self.fetch(
  105. "/",
  106. method="POST",
  107. body=salt.utils.json.dumps(low),
  108. headers={
  109. "Content-Type": self.content_type_map["json"],
  110. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  111. },
  112. connect_timeout=30,
  113. request_timeout=30,
  114. )
  115. response_obj = salt.utils.json.loads(response.body)
  116. self.assertEqual(
  117. response_obj["return"],
  118. [
  119. "No minions matched the target. No command was sent, no jid was assigned."
  120. ],
  121. )
  122. # local client request body test
  123. @skipIf(True, "Undetermined race condition in test. Temporarily disabled.")
  124. def test_simple_local_post_only_dictionary_request(self):
  125. """
  126. Test a basic API of /
  127. """
  128. low = {
  129. "client": "local",
  130. "tgt": "*",
  131. "fun": "test.ping",
  132. }
  133. response = self.fetch(
  134. "/",
  135. method="POST",
  136. body=salt.utils.json.dumps(low),
  137. headers={
  138. "Content-Type": self.content_type_map["json"],
  139. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  140. },
  141. connect_timeout=30,
  142. request_timeout=30,
  143. )
  144. response_obj = salt.utils.json.loads(response.body)
  145. self.assertEqual(len(response_obj["return"]), 1)
  146. # If --proxy is set, it will cause an extra minion_id to be in the
  147. # response. Since there's not a great way to know if the test
  148. # runner's proxy minion is running, and we're not testing proxy
  149. # minions here anyway, just remove it from the response.
  150. response_obj["return"][0].pop("proxytest", None)
  151. self.assertEqual(
  152. response_obj["return"][0], {"minion": True, "sub_minion": True}
  153. )
  154. def test_simple_local_post_invalid_request(self):
  155. """
  156. Test a basic API of /
  157. """
  158. low = ["invalid request"]
  159. response = self.fetch(
  160. "/",
  161. method="POST",
  162. body=salt.utils.json.dumps(low),
  163. headers={
  164. "Content-Type": self.content_type_map["json"],
  165. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  166. },
  167. connect_timeout=30,
  168. request_timeout=30,
  169. )
  170. self.assertEqual(response.code, 400)
  171. # local_async tests
  172. def test_simple_local_async_post(self):
  173. low = [{"client": "local_async", "tgt": "*", "fun": "test.ping"}]
  174. response = self.fetch(
  175. "/",
  176. method="POST",
  177. body=salt.utils.json.dumps(low),
  178. headers={
  179. "Content-Type": self.content_type_map["json"],
  180. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  181. },
  182. )
  183. response_obj = salt.utils.json.loads(response.body)
  184. ret = response_obj["return"]
  185. ret[0]["minions"] = sorted(ret[0]["minions"])
  186. try:
  187. # If --proxy is set, it will cause an extra minion_id to be in the
  188. # response. Since there's not a great way to know if the test
  189. # runner's proxy minion is running, and we're not testing proxy
  190. # minions here anyway, just remove it from the response.
  191. ret[0]["minions"].remove("proxytest")
  192. except ValueError:
  193. pass
  194. # TODO: verify pub function? Maybe look at how we test the publisher
  195. self.assertEqual(len(ret), 1)
  196. self.assertIn("jid", ret[0])
  197. self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
  198. def test_multi_local_async_post(self):
  199. low = [
  200. {"client": "local_async", "tgt": "*", "fun": "test.ping"},
  201. {"client": "local_async", "tgt": "*", "fun": "test.ping"},
  202. ]
  203. response = self.fetch(
  204. "/",
  205. method="POST",
  206. body=salt.utils.json.dumps(low),
  207. headers={
  208. "Content-Type": self.content_type_map["json"],
  209. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  210. },
  211. )
  212. response_obj = salt.utils.json.loads(response.body)
  213. ret = response_obj["return"]
  214. ret[0]["minions"] = sorted(ret[0]["minions"])
  215. ret[1]["minions"] = sorted(ret[1]["minions"])
  216. try:
  217. # If --proxy is set, it will cause an extra minion_id to be in the
  218. # response. Since there's not a great way to know if the test
  219. # runner's proxy minion is running, and we're not testing proxy
  220. # minions here anyway, just remove it from the response.
  221. ret[0]["minions"].remove("proxytest")
  222. ret[1]["minions"].remove("proxytest")
  223. except ValueError:
  224. pass
  225. self.assertEqual(len(ret), 2)
  226. self.assertIn("jid", ret[0])
  227. self.assertIn("jid", ret[1])
  228. self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
  229. self.assertEqual(ret[1]["minions"], sorted(["minion", "sub_minion"]))
  230. @slowTest
  231. def test_multi_local_async_post_multitoken(self):
  232. low = [
  233. {"client": "local_async", "tgt": "*", "fun": "test.ping"},
  234. {
  235. "client": "local_async",
  236. "tgt": "*",
  237. "fun": "test.ping",
  238. "token": self.token[
  239. "token"
  240. ], # send a different (but still valid token)
  241. },
  242. {
  243. "client": "local_async",
  244. "tgt": "*",
  245. "fun": "test.ping",
  246. "token": "BAD_TOKEN", # send a bad token
  247. },
  248. ]
  249. response = self.fetch(
  250. "/",
  251. method="POST",
  252. body=salt.utils.json.dumps(low),
  253. headers={
  254. "Content-Type": self.content_type_map["json"],
  255. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  256. },
  257. )
  258. response_obj = salt.utils.json.loads(response.body)
  259. ret = response_obj["return"]
  260. ret[0]["minions"] = sorted(ret[0]["minions"])
  261. ret[1]["minions"] = sorted(ret[1]["minions"])
  262. try:
  263. # If --proxy is set, it will cause an extra minion_id to be in the
  264. # response. Since there's not a great way to know if the test
  265. # runner's proxy minion is running, and we're not testing proxy
  266. # minions here anyway, just remove it from the response.
  267. ret[0]["minions"].remove("proxytest")
  268. ret[1]["minions"].remove("proxytest")
  269. except ValueError:
  270. pass
  271. self.assertEqual(len(ret), 3) # make sure we got 3 responses
  272. self.assertIn("jid", ret[0]) # the first 2 are regular returns
  273. self.assertIn("jid", ret[1])
  274. self.assertIn("Failed to authenticate", ret[2]) # bad auth
  275. self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
  276. self.assertEqual(ret[1]["minions"], sorted(["minion", "sub_minion"]))
  277. @slowTest
  278. def test_simple_local_async_post_no_tgt(self):
  279. low = [
  280. {"client": "local_async", "tgt": "minion_we_dont_have", "fun": "test.ping"}
  281. ]
  282. response = self.fetch(
  283. "/",
  284. method="POST",
  285. body=salt.utils.json.dumps(low),
  286. headers={
  287. "Content-Type": self.content_type_map["json"],
  288. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  289. },
  290. )
  291. response_obj = salt.utils.json.loads(response.body)
  292. self.assertEqual(response_obj["return"], [{}])
  293. @skipIf(True, "Undetermined race condition in test. Temporarily disabled.")
  294. def test_simple_local_post_only_dictionary_request_with_order_masters(self):
  295. """
  296. Test a basic API of /
  297. """
  298. low = {
  299. "client": "local",
  300. "tgt": "*",
  301. "fun": "test.ping",
  302. }
  303. self.application.opts["order_masters"] = True
  304. self.application.opts["syndic_wait"] = 5
  305. response = self.fetch(
  306. "/",
  307. method="POST",
  308. body=salt.utils.json.dumps(low),
  309. headers={
  310. "Content-Type": self.content_type_map["json"],
  311. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  312. },
  313. connect_timeout=30,
  314. request_timeout=30,
  315. )
  316. response_obj = salt.utils.json.loads(response.body)
  317. self.application.opts["order_masters"] = []
  318. self.application.opts["syndic_wait"] = 5
  319. # If --proxy is set, it will cause an extra minion_id to be in the
  320. # response. Since there's not a great way to know if the test runner's
  321. # proxy minion is running, and we're not testing proxy minions here
  322. # anyway, just remove it from the response.
  323. response_obj[0]["return"].pop("proxytest", None)
  324. self.assertEqual(response_obj["return"], [{"minion": True, "sub_minion": True}])
  325. # runner tests
  326. @slowTest
  327. def test_simple_local_runner_post(self):
  328. low = [{"client": "runner", "fun": "manage.up"}]
  329. response = self.fetch(
  330. "/",
  331. method="POST",
  332. body=salt.utils.json.dumps(low),
  333. headers={
  334. "Content-Type": self.content_type_map["json"],
  335. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  336. },
  337. connect_timeout=30,
  338. request_timeout=300,
  339. )
  340. response_obj = salt.utils.json.loads(response.body)
  341. self.assertEqual(len(response_obj["return"]), 1)
  342. try:
  343. # If --proxy is set, it will cause an extra minion_id to be in the
  344. # response. Since there's not a great way to know if the test
  345. # runner's proxy minion is running, and we're not testing proxy
  346. # minions here anyway, just remove it from the response.
  347. response_obj["return"][0].remove("proxytest")
  348. except ValueError:
  349. pass
  350. self.assertEqual(
  351. sorted(response_obj["return"][0]), sorted(["minion", "sub_minion"])
  352. )
  353. # runner_async tests
  354. def test_simple_local_runner_async_post(self):
  355. low = [{"client": "runner_async", "fun": "manage.up"}]
  356. response = self.fetch(
  357. "/",
  358. method="POST",
  359. body=salt.utils.json.dumps(low),
  360. headers={
  361. "Content-Type": self.content_type_map["json"],
  362. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  363. },
  364. connect_timeout=10,
  365. request_timeout=10,
  366. )
  367. response_obj = salt.utils.json.loads(response.body)
  368. self.assertIn("return", response_obj)
  369. self.assertEqual(1, len(response_obj["return"]))
  370. self.assertIn("jid", response_obj["return"][0])
  371. self.assertIn("tag", response_obj["return"][0])
  372. @flaky
  373. @skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
  374. class TestMinionSaltAPIHandler(_SaltnadoIntegrationTestCase):
  375. def get_app(self):
  376. urls = [
  377. (r"/minions/(.*)", saltnado.MinionSaltAPIHandler),
  378. (r"/minions", saltnado.MinionSaltAPIHandler),
  379. ]
  380. application = self.build_tornado_app(urls)
  381. application.event_listener = saltnado.EventListener({}, self.opts)
  382. return application
  383. @skipIf(True, "issue #34753")
  384. def test_get_no_mid(self):
  385. response = self.fetch(
  386. "/minions",
  387. method="GET",
  388. headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
  389. follow_redirects=False,
  390. )
  391. response_obj = salt.utils.json.loads(response.body)
  392. self.assertEqual(len(response_obj["return"]), 1)
  393. # one per minion
  394. self.assertEqual(len(response_obj["return"][0]), 2)
  395. # check a single grain
  396. for minion_id, grains in six.iteritems(response_obj["return"][0]):
  397. self.assertEqual(minion_id, grains["id"])
  398. @skipIf(True, "to be re-enabled when #23623 is merged")
  399. @slowTest
  400. def test_get(self):
  401. response = self.fetch(
  402. "/minions/minion",
  403. method="GET",
  404. headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
  405. follow_redirects=False,
  406. )
  407. response_obj = salt.utils.json.loads(response.body)
  408. self.assertEqual(len(response_obj["return"]), 1)
  409. self.assertEqual(len(response_obj["return"][0]), 1)
  410. # check a single grain
  411. self.assertEqual(response_obj["return"][0]["minion"]["id"], "minion")
  412. def test_post(self):
  413. low = [{"tgt": "*minion", "fun": "test.ping"}]
  414. response = self.fetch(
  415. "/minions",
  416. method="POST",
  417. body=salt.utils.json.dumps(low),
  418. headers={
  419. "Content-Type": self.content_type_map["json"],
  420. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  421. },
  422. )
  423. response_obj = salt.utils.json.loads(response.body)
  424. ret = response_obj["return"]
  425. ret[0]["minions"] = sorted(ret[0]["minions"])
  426. # TODO: verify pub function? Maybe look at how we test the publisher
  427. self.assertEqual(len(ret), 1)
  428. self.assertIn("jid", ret[0])
  429. self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
  430. @slowTest
  431. def test_post_with_client(self):
  432. # get a token for this test
  433. low = [{"client": "local_async", "tgt": "*minion", "fun": "test.ping"}]
  434. response = self.fetch(
  435. "/minions",
  436. method="POST",
  437. body=salt.utils.json.dumps(low),
  438. headers={
  439. "Content-Type": self.content_type_map["json"],
  440. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  441. },
  442. )
  443. response_obj = salt.utils.json.loads(response.body)
  444. ret = response_obj["return"]
  445. ret[0]["minions"] = sorted(ret[0]["minions"])
  446. # TODO: verify pub function? Maybe look at how we test the publisher
  447. self.assertEqual(len(ret), 1)
  448. self.assertIn("jid", ret[0])
  449. self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
  450. @slowTest
  451. def test_post_with_incorrect_client(self):
  452. """
  453. The /minions endpoint is asynchronous only, so if you try something else
  454. make sure you get an error
  455. """
  456. # get a token for this test
  457. low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
  458. response = self.fetch(
  459. "/minions",
  460. method="POST",
  461. body=salt.utils.json.dumps(low),
  462. headers={
  463. "Content-Type": self.content_type_map["json"],
  464. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  465. },
  466. )
  467. self.assertEqual(response.code, 400)
  468. @skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
  469. class TestJobsSaltAPIHandler(_SaltnadoIntegrationTestCase):
  470. def get_app(self):
  471. urls = [
  472. (r"/jobs/(.*)", saltnado.JobsSaltAPIHandler),
  473. (r"/jobs", saltnado.JobsSaltAPIHandler),
  474. ]
  475. application = self.build_tornado_app(urls)
  476. application.event_listener = saltnado.EventListener({}, self.opts)
  477. return application
  478. @skipIf(True, "to be re-enabled when #23623 is merged")
  479. @slowTest
  480. def test_get(self):
  481. # test with no JID
  482. self.http_client.fetch(
  483. self.get_url("/jobs"),
  484. self.stop,
  485. method="GET",
  486. headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
  487. follow_redirects=False,
  488. )
  489. response = self.wait(timeout=30)
  490. response_obj = salt.utils.json.loads(response.body)["return"][0]
  491. try:
  492. for jid, ret in six.iteritems(response_obj):
  493. self.assertIn("Function", ret)
  494. self.assertIn("Target", ret)
  495. self.assertIn("Target-type", ret)
  496. self.assertIn("User", ret)
  497. self.assertIn("StartTime", ret)
  498. self.assertIn("Arguments", ret)
  499. except AttributeError as attribute_error:
  500. print(salt.utils.json.loads(response.body))
  501. raise
  502. # test with a specific JID passed in
  503. jid = next(six.iterkeys(response_obj))
  504. self.http_client.fetch(
  505. self.get_url("/jobs/{0}".format(jid)),
  506. self.stop,
  507. method="GET",
  508. headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
  509. follow_redirects=False,
  510. )
  511. response = self.wait(timeout=30)
  512. response_obj = salt.utils.json.loads(response.body)["return"][0]
  513. self.assertIn("Function", response_obj)
  514. self.assertIn("Target", response_obj)
  515. self.assertIn("Target-type", response_obj)
  516. self.assertIn("User", response_obj)
  517. self.assertIn("StartTime", response_obj)
  518. self.assertIn("Arguments", response_obj)
  519. self.assertIn("Result", response_obj)
  520. # TODO: run all the same tests from the root handler, but for now since they are
  521. # the same code, we'll just sanity check
  522. @skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
  523. class TestRunSaltAPIHandler(_SaltnadoIntegrationTestCase):
  524. def get_app(self):
  525. urls = [
  526. ("/run", saltnado.RunSaltAPIHandler),
  527. ]
  528. application = self.build_tornado_app(urls)
  529. application.event_listener = saltnado.EventListener({}, self.opts)
  530. return application
  531. @skipIf(True, "to be re-enabled when #23623 is merged")
  532. @slowTest
  533. def test_get(self):
  534. low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
  535. response = self.fetch(
  536. "/run",
  537. method="POST",
  538. body=salt.utils.json.dumps(low),
  539. headers={
  540. "Content-Type": self.content_type_map["json"],
  541. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  542. },
  543. )
  544. response_obj = salt.utils.json.loads(response.body)
  545. self.assertEqual(response_obj["return"], [{"minion": True, "sub_minion": True}])
  546. @skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
  547. class TestEventsSaltAPIHandler(_SaltnadoIntegrationTestCase):
  548. def get_app(self):
  549. urls = [
  550. (r"/events", saltnado.EventsSaltAPIHandler),
  551. ]
  552. application = self.build_tornado_app(urls)
  553. application.event_listener = saltnado.EventListener({}, self.opts)
  554. # store a reference, for magic later!
  555. self.application = application
  556. self.events_to_fire = 0
  557. return application
  558. @slowTest
  559. def test_get(self):
  560. self.events_to_fire = 5
  561. response = self.fetch(
  562. "/events",
  563. headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
  564. streaming_callback=self.on_event,
  565. )
  566. def _stop(self):
  567. self.stop()
  568. def on_event(self, event):
  569. if six.PY3:
  570. event = event.decode("utf-8")
  571. if self.events_to_fire > 0:
  572. self.application.event_listener.event.fire_event(
  573. {"foo": "bar", "baz": "qux"}, "salt/netapi/test"
  574. )
  575. self.events_to_fire -= 1
  576. # once we've fired all the events, lets call it a day
  577. else:
  578. # wait so that we can ensure that the next future is ready to go
  579. # to make sure we don't explode if the next one is ready
  580. ZMQIOLoop.current().add_timeout(time.time() + 0.5, self._stop)
  581. event = event.strip()
  582. # if we got a retry, just continue
  583. if event != "retry: 400":
  584. tag, data = event.splitlines()
  585. self.assertTrue(tag.startswith("tag: "))
  586. self.assertTrue(data.startswith("data: "))
  587. @skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
  588. class TestWebhookSaltAPIHandler(_SaltnadoIntegrationTestCase):
  589. def get_app(self):
  590. urls = [
  591. (r"/hook(/.*)?", saltnado.WebhookSaltAPIHandler),
  592. ]
  593. application = self.build_tornado_app(urls)
  594. self.application = application
  595. application.event_listener = saltnado.EventListener({}, self.opts)
  596. return application
  597. @skipIf(True, "Skipping until we can devote more resources to debugging this test.")
  598. def test_post(self):
  599. self._future_resolved = threading.Event()
  600. try:
  601. def verify_event(future):
  602. """
  603. Notify the threading event that the future is resolved
  604. """
  605. self._future_resolved.set()
  606. self._finished = (
  607. False # TODO: remove after some cleanup of the event listener
  608. )
  609. # get an event future
  610. future = self.application.event_listener.get_event(
  611. self, tag="salt/netapi/hook", callback=verify_event
  612. )
  613. # fire the event
  614. response = self.fetch(
  615. "/hook",
  616. method="POST",
  617. body="foo=bar",
  618. headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
  619. )
  620. response_obj = salt.utils.json.loads(response.body)
  621. self.assertTrue(response_obj["success"])
  622. resolve_future_timeout = 60
  623. self._future_resolved.wait(resolve_future_timeout)
  624. try:
  625. event = future.result()
  626. except Exception as exc: # pylint: disable=broad-except
  627. self.fail(
  628. "Failed to resolve future under {} secs: {}".format(
  629. resolve_future_timeout, exc
  630. )
  631. )
  632. self.assertEqual(event["tag"], "salt/netapi/hook")
  633. self.assertIn("headers", event["data"])
  634. self.assertEqual(
  635. event["data"]["post"], {"foo": salt.utils.stringutils.to_bytes("bar")}
  636. )
  637. finally:
  638. self._future_resolved.clear()
  639. del self._future_resolved