test_app.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727
  1. import os
  2. import threading
  3. import time
  4. import pytest
  5. import salt.utils.json
  6. import salt.utils.stringutils
  7. from salt.netapi.rest_tornado import saltnado
  8. from salt.utils.versions import StrictVersion
  9. from salt.utils.zeromq import ZMQDefaultLoop as ZMQIOLoop
  10. from salt.utils.zeromq import zmq
  11. from tests.support.helpers import TstSuiteLoggingHandler, flaky, slowTest
  12. from tests.support.unit import skipIf
  13. from tests.unit.netapi.test_rest_tornado import SaltnadoTestCase
  14. HAS_ZMQ_IOLOOP = bool(zmq)
  15. class _SaltnadoIntegrationTestCase(SaltnadoTestCase): # pylint: disable=abstract-method
  16. @property
  17. def opts(self):
  18. return self.get_config("client_config", from_scratch=True)
  19. @property
  20. def mod_opts(self):
  21. return self.get_config("minion", from_scratch=True)
  22. @skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
  23. @skipIf(
  24. StrictVersion(zmq.__version__) < StrictVersion("14.0.1"),
  25. "PyZMQ must be >= 14.0.1 to run these tests.",
  26. )
  27. @pytest.mark.usefixtures("salt_sub_minion")
  28. class TestSaltAPIHandler(_SaltnadoIntegrationTestCase):
  29. def setUp(self):
  30. super().setUp()
  31. os.environ["ASYNC_TEST_TIMEOUT"] = "300"
  32. def get_app(self):
  33. urls = [("/", saltnado.SaltAPIHandler)]
  34. application = self.build_tornado_app(urls)
  35. application.event_listener = saltnado.EventListener({}, self.opts)
  36. self.application = application
  37. return application
  38. def test_root(self):
  39. """
  40. Test the root path which returns the list of clients we support
  41. """
  42. response = self.fetch("/", connect_timeout=30, request_timeout=30,)
  43. self.assertEqual(response.code, 200)
  44. response_obj = salt.utils.json.loads(response.body)
  45. self.assertEqual(
  46. sorted(response_obj["clients"]),
  47. ["local", "local_async", "runner", "runner_async"],
  48. )
  49. self.assertEqual(response_obj["return"], "Welcome")
  50. @slowTest
  51. def test_post_no_auth(self):
  52. """
  53. Test post with no auth token, should 401
  54. """
  55. # get a token for this test
  56. low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
  57. response = self.fetch(
  58. "/",
  59. method="POST",
  60. body=salt.utils.json.dumps(low),
  61. headers={"Content-Type": self.content_type_map["json"]},
  62. follow_redirects=False,
  63. connect_timeout=30,
  64. request_timeout=30,
  65. )
  66. self.assertEqual(response.code, 302)
  67. self.assertEqual(response.headers["Location"], "/login")
  68. # Local client tests
  69. @slowTest
  70. def test_regression_49572(self):
  71. with TstSuiteLoggingHandler() as handler:
  72. GATHER_JOB_TIMEOUT = 1
  73. self.application.opts["gather_job_timeout"] = GATHER_JOB_TIMEOUT
  74. low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
  75. fetch_kwargs = {
  76. "method": "POST",
  77. "body": salt.utils.json.dumps(low),
  78. "headers": {
  79. "Content-Type": self.content_type_map["json"],
  80. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  81. },
  82. "connect_timeout": 30,
  83. "request_timeout": 30,
  84. }
  85. self.fetch("/", **fetch_kwargs)
  86. time.sleep(GATHER_JOB_TIMEOUT + 0.1) # ick
  87. # While the traceback is in the logs after the sleep without this
  88. # follow up fetch, the logging handler doesn't see it in its list
  89. # of messages unless something else runs.
  90. self.fetch("/", **fetch_kwargs)
  91. for message in handler.messages:
  92. if "TypeError: 'NoneType' object is not iterable" in message:
  93. raise AssertionError(
  94. "#49572: regression: set_result on completed event"
  95. )
  96. def test_simple_local_post(self):
  97. """
  98. Test a basic API of /
  99. """
  100. low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
  101. response = self.fetch(
  102. "/",
  103. method="POST",
  104. body=salt.utils.json.dumps(low),
  105. headers={
  106. "Content-Type": self.content_type_map["json"],
  107. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  108. },
  109. connect_timeout=30,
  110. request_timeout=30,
  111. )
  112. response_obj = salt.utils.json.loads(response.body)
  113. self.assertEqual(len(response_obj["return"]), 1)
  114. # If --proxy is set, it will cause an extra minion_id to be in the
  115. # response. Since there's not a great way to know if the test
  116. # runner's proxy minion is running, and we're not testing proxy
  117. # minions here anyway, just remove it from the response.
  118. response_obj["return"][0].pop("proxytest", None)
  119. self.assertEqual(
  120. response_obj["return"][0], {"minion": True, "sub_minion": True}
  121. )
  122. def test_simple_local_post_no_tgt(self):
  123. """
  124. POST job with invalid tgt
  125. """
  126. low = [{"client": "local", "tgt": "minion_we_dont_have", "fun": "test.ping"}]
  127. response = self.fetch(
  128. "/",
  129. method="POST",
  130. body=salt.utils.json.dumps(low),
  131. headers={
  132. "Content-Type": self.content_type_map["json"],
  133. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  134. },
  135. connect_timeout=30,
  136. request_timeout=30,
  137. )
  138. response_obj = salt.utils.json.loads(response.body)
  139. self.assertEqual(
  140. response_obj["return"],
  141. [
  142. "No minions matched the target. No command was sent, no jid was assigned."
  143. ],
  144. )
  145. # local client request body test
  146. def test_simple_local_post_only_dictionary_request(self):
  147. """
  148. Test a basic API of /
  149. """
  150. low = {
  151. "client": "local",
  152. "tgt": "*",
  153. "fun": "test.ping",
  154. }
  155. response = self.fetch(
  156. "/",
  157. method="POST",
  158. body=salt.utils.json.dumps(low),
  159. headers={
  160. "Content-Type": self.content_type_map["json"],
  161. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  162. },
  163. connect_timeout=30,
  164. request_timeout=30,
  165. )
  166. response_obj = salt.utils.json.loads(response.body)
  167. self.assertEqual(len(response_obj["return"]), 1)
  168. # If --proxy is set, it will cause an extra minion_id to be in the
  169. # response. Since there's not a great way to know if the test
  170. # runner's proxy minion is running, and we're not testing proxy
  171. # minions here anyway, just remove it from the response.
  172. response_obj["return"][0].pop("proxytest", None)
  173. self.assertEqual(
  174. response_obj["return"][0], {"minion": True, "sub_minion": True}
  175. )
  176. def test_simple_local_post_invalid_request(self):
  177. """
  178. Test a basic API of /
  179. """
  180. low = ["invalid request"]
  181. response = self.fetch(
  182. "/",
  183. method="POST",
  184. body=salt.utils.json.dumps(low),
  185. headers={
  186. "Content-Type": self.content_type_map["json"],
  187. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  188. },
  189. connect_timeout=30,
  190. request_timeout=30,
  191. )
  192. self.assertEqual(response.code, 400)
  193. # local_async tests
  194. def test_simple_local_async_post(self):
  195. low = [{"client": "local_async", "tgt": "*", "fun": "test.ping"}]
  196. response = self.fetch(
  197. "/",
  198. method="POST",
  199. body=salt.utils.json.dumps(low),
  200. headers={
  201. "Content-Type": self.content_type_map["json"],
  202. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  203. },
  204. )
  205. response_obj = salt.utils.json.loads(response.body)
  206. ret = response_obj["return"]
  207. ret[0]["minions"] = sorted(ret[0]["minions"])
  208. try:
  209. # If --proxy is set, it will cause an extra minion_id to be in the
  210. # response. Since there's not a great way to know if the test
  211. # runner's proxy minion is running, and we're not testing proxy
  212. # minions here anyway, just remove it from the response.
  213. ret[0]["minions"].remove("proxytest")
  214. except ValueError:
  215. pass
  216. # TODO: verify pub function? Maybe look at how we test the publisher
  217. self.assertEqual(len(ret), 1)
  218. self.assertIn("jid", ret[0])
  219. self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
  220. def test_multi_local_async_post(self):
  221. low = [
  222. {"client": "local_async", "tgt": "*", "fun": "test.ping"},
  223. {"client": "local_async", "tgt": "*", "fun": "test.ping"},
  224. ]
  225. response = self.fetch(
  226. "/",
  227. method="POST",
  228. body=salt.utils.json.dumps(low),
  229. headers={
  230. "Content-Type": self.content_type_map["json"],
  231. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  232. },
  233. )
  234. response_obj = salt.utils.json.loads(response.body)
  235. ret = response_obj["return"]
  236. ret[0]["minions"] = sorted(ret[0]["minions"])
  237. ret[1]["minions"] = sorted(ret[1]["minions"])
  238. try:
  239. # If --proxy is set, it will cause an extra minion_id to be in the
  240. # response. Since there's not a great way to know if the test
  241. # runner's proxy minion is running, and we're not testing proxy
  242. # minions here anyway, just remove it from the response.
  243. ret[0]["minions"].remove("proxytest")
  244. ret[1]["minions"].remove("proxytest")
  245. except ValueError:
  246. pass
  247. self.assertEqual(len(ret), 2)
  248. self.assertIn("jid", ret[0])
  249. self.assertIn("jid", ret[1])
  250. self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
  251. self.assertEqual(ret[1]["minions"], sorted(["minion", "sub_minion"]))
  252. @slowTest
  253. def test_multi_local_async_post_multitoken(self):
  254. low = [
  255. {"client": "local_async", "tgt": "*", "fun": "test.ping"},
  256. {
  257. "client": "local_async",
  258. "tgt": "*",
  259. "fun": "test.ping",
  260. "token": self.token[
  261. "token"
  262. ], # send a different (but still valid token)
  263. },
  264. {
  265. "client": "local_async",
  266. "tgt": "*",
  267. "fun": "test.ping",
  268. "token": "BAD_TOKEN", # send a bad token
  269. },
  270. ]
  271. response = self.fetch(
  272. "/",
  273. method="POST",
  274. body=salt.utils.json.dumps(low),
  275. headers={
  276. "Content-Type": self.content_type_map["json"],
  277. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  278. },
  279. )
  280. response_obj = salt.utils.json.loads(response.body)
  281. ret = response_obj["return"]
  282. ret[0]["minions"] = sorted(ret[0]["minions"])
  283. ret[1]["minions"] = sorted(ret[1]["minions"])
  284. try:
  285. # If --proxy is set, it will cause an extra minion_id to be in the
  286. # response. Since there's not a great way to know if the test
  287. # runner's proxy minion is running, and we're not testing proxy
  288. # minions here anyway, just remove it from the response.
  289. ret[0]["minions"].remove("proxytest")
  290. ret[1]["minions"].remove("proxytest")
  291. except ValueError:
  292. pass
  293. self.assertEqual(len(ret), 3) # make sure we got 3 responses
  294. self.assertIn("jid", ret[0]) # the first 2 are regular returns
  295. self.assertIn("jid", ret[1])
  296. self.assertIn("Failed to authenticate", ret[2]) # bad auth
  297. self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
  298. self.assertEqual(ret[1]["minions"], sorted(["minion", "sub_minion"]))
  299. @slowTest
  300. def test_simple_local_async_post_no_tgt(self):
  301. low = [
  302. {"client": "local_async", "tgt": "minion_we_dont_have", "fun": "test.ping"}
  303. ]
  304. response = self.fetch(
  305. "/",
  306. method="POST",
  307. body=salt.utils.json.dumps(low),
  308. headers={
  309. "Content-Type": self.content_type_map["json"],
  310. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  311. },
  312. )
  313. response_obj = salt.utils.json.loads(response.body)
  314. self.assertEqual(response_obj["return"], [{}])
  315. @skipIf(True, "Undetermined race condition in test. Temporarily disabled.")
  316. def test_simple_local_post_only_dictionary_request_with_order_masters(self):
  317. """
  318. Test a basic API of /
  319. """
  320. low = {
  321. "client": "local",
  322. "tgt": "*",
  323. "fun": "test.ping",
  324. }
  325. self.application.opts["order_masters"] = True
  326. self.application.opts["syndic_wait"] = 5
  327. response = self.fetch(
  328. "/",
  329. method="POST",
  330. body=salt.utils.json.dumps(low),
  331. headers={
  332. "Content-Type": self.content_type_map["json"],
  333. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  334. },
  335. connect_timeout=30,
  336. request_timeout=30,
  337. )
  338. response_obj = salt.utils.json.loads(response.body)
  339. self.application.opts["order_masters"] = []
  340. self.application.opts["syndic_wait"] = 5
  341. # If --proxy is set, it will cause an extra minion_id to be in the
  342. # response. Since there's not a great way to know if the test runner's
  343. # proxy minion is running, and we're not testing proxy minions here
  344. # anyway, just remove it from the response.
  345. response_obj[0]["return"].pop("proxytest", None)
  346. self.assertEqual(response_obj["return"], [{"minion": True, "sub_minion": True}])
  347. # runner tests
  348. @slowTest
  349. def test_simple_local_runner_post(self):
  350. low = [{"client": "runner", "fun": "manage.up"}]
  351. response = self.fetch(
  352. "/",
  353. method="POST",
  354. body=salt.utils.json.dumps(low),
  355. headers={
  356. "Content-Type": self.content_type_map["json"],
  357. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  358. },
  359. connect_timeout=30,
  360. request_timeout=300,
  361. )
  362. response_obj = salt.utils.json.loads(response.body)
  363. self.assertEqual(len(response_obj["return"]), 1)
  364. try:
  365. # If --proxy is set, it will cause an extra minion_id to be in the
  366. # response. Since there's not a great way to know if the test
  367. # runner's proxy minion is running, and we're not testing proxy
  368. # minions here anyway, just remove it from the response.
  369. response_obj["return"][0].remove("proxytest")
  370. except ValueError:
  371. pass
  372. self.assertEqual(
  373. sorted(response_obj["return"][0]), sorted(["minion", "sub_minion"])
  374. )
  375. # runner_async tests
  376. def test_simple_local_runner_async_post(self):
  377. low = [{"client": "runner_async", "fun": "manage.up"}]
  378. response = self.fetch(
  379. "/",
  380. method="POST",
  381. body=salt.utils.json.dumps(low),
  382. headers={
  383. "Content-Type": self.content_type_map["json"],
  384. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  385. },
  386. connect_timeout=10,
  387. request_timeout=10,
  388. )
  389. response_obj = salt.utils.json.loads(response.body)
  390. self.assertIn("return", response_obj)
  391. self.assertEqual(1, len(response_obj["return"]))
  392. self.assertIn("jid", response_obj["return"][0])
  393. self.assertIn("tag", response_obj["return"][0])
  394. @flaky
  395. @skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
  396. class TestMinionSaltAPIHandler(_SaltnadoIntegrationTestCase):
  397. def get_app(self):
  398. urls = [
  399. (r"/minions/(.*)", saltnado.MinionSaltAPIHandler),
  400. (r"/minions", saltnado.MinionSaltAPIHandler),
  401. ]
  402. application = self.build_tornado_app(urls)
  403. application.event_listener = saltnado.EventListener({}, self.opts)
  404. return application
  405. def test_get_no_mid(self):
  406. response = self.fetch(
  407. "/minions",
  408. method="GET",
  409. headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
  410. follow_redirects=False,
  411. )
  412. response_obj = salt.utils.json.loads(response.body)
  413. self.assertEqual(len(response_obj["return"]), 1)
  414. # one per minion
  415. self.assertEqual(len(response_obj["return"][0]), 2)
  416. # check a single grain
  417. for minion_id, grains in response_obj["return"][0].items():
  418. self.assertEqual(minion_id, grains["id"])
  419. @slowTest
  420. def test_get(self):
  421. response = self.fetch(
  422. "/minions/minion",
  423. method="GET",
  424. headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
  425. follow_redirects=False,
  426. )
  427. response_obj = salt.utils.json.loads(response.body)
  428. self.assertEqual(len(response_obj["return"]), 1)
  429. self.assertEqual(len(response_obj["return"][0]), 1)
  430. # check a single grain
  431. self.assertEqual(response_obj["return"][0]["minion"]["id"], "minion")
  432. def test_post(self):
  433. low = [{"tgt": "*minion", "fun": "test.ping"}]
  434. response = self.fetch(
  435. "/minions",
  436. method="POST",
  437. body=salt.utils.json.dumps(low),
  438. headers={
  439. "Content-Type": self.content_type_map["json"],
  440. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  441. },
  442. )
  443. response_obj = salt.utils.json.loads(response.body)
  444. ret = response_obj["return"]
  445. ret[0]["minions"] = sorted(ret[0]["minions"])
  446. # TODO: verify pub function? Maybe look at how we test the publisher
  447. self.assertEqual(len(ret), 1)
  448. self.assertIn("jid", ret[0])
  449. self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
  450. @slowTest
  451. def test_post_with_client(self):
  452. # get a token for this test
  453. low = [{"client": "local_async", "tgt": "*minion", "fun": "test.ping"}]
  454. response = self.fetch(
  455. "/minions",
  456. method="POST",
  457. body=salt.utils.json.dumps(low),
  458. headers={
  459. "Content-Type": self.content_type_map["json"],
  460. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  461. },
  462. )
  463. response_obj = salt.utils.json.loads(response.body)
  464. ret = response_obj["return"]
  465. ret[0]["minions"] = sorted(ret[0]["minions"])
  466. # TODO: verify pub function? Maybe look at how we test the publisher
  467. self.assertEqual(len(ret), 1)
  468. self.assertIn("jid", ret[0])
  469. self.assertEqual(ret[0]["minions"], sorted(["minion", "sub_minion"]))
  470. @slowTest
  471. def test_post_with_incorrect_client(self):
  472. """
  473. The /minions endpoint is asynchronous only, so if you try something else
  474. make sure you get an error
  475. """
  476. # get a token for this test
  477. low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
  478. response = self.fetch(
  479. "/minions",
  480. method="POST",
  481. body=salt.utils.json.dumps(low),
  482. headers={
  483. "Content-Type": self.content_type_map["json"],
  484. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  485. },
  486. )
  487. self.assertEqual(response.code, 400)
  488. @skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
  489. class TestJobsSaltAPIHandler(_SaltnadoIntegrationTestCase):
  490. def get_app(self):
  491. urls = [
  492. (r"/jobs/(.*)", saltnado.JobsSaltAPIHandler),
  493. (r"/jobs", saltnado.JobsSaltAPIHandler),
  494. ]
  495. application = self.build_tornado_app(urls)
  496. application.event_listener = saltnado.EventListener({}, self.opts)
  497. return application
  498. @slowTest
  499. def test_get(self):
  500. # test with no JID
  501. self.http_client.fetch(
  502. self.get_url("/jobs"),
  503. self.stop,
  504. method="GET",
  505. headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
  506. follow_redirects=False,
  507. )
  508. response = self.wait(timeout=30)
  509. response_obj = salt.utils.json.loads(response.body)["return"][0]
  510. try:
  511. for jid, ret in response_obj.items():
  512. self.assertIn("Function", ret)
  513. self.assertIn("Target", ret)
  514. self.assertIn("Target-type", ret)
  515. self.assertIn("User", ret)
  516. self.assertIn("StartTime", ret)
  517. self.assertIn("Arguments", ret)
  518. except AttributeError as attribute_error:
  519. print(salt.utils.json.loads(response.body))
  520. raise
  521. # test with a specific JID passed in
  522. jid = next(iter(response_obj.keys()))
  523. self.http_client.fetch(
  524. self.get_url("/jobs/{}".format(jid)),
  525. self.stop,
  526. method="GET",
  527. headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
  528. follow_redirects=False,
  529. )
  530. response = self.wait(timeout=30)
  531. response_obj = salt.utils.json.loads(response.body)["return"][0]
  532. self.assertIn("Function", response_obj)
  533. self.assertIn("Target", response_obj)
  534. self.assertIn("Target-type", response_obj)
  535. self.assertIn("User", response_obj)
  536. self.assertIn("StartTime", response_obj)
  537. self.assertIn("Arguments", response_obj)
  538. self.assertIn("Result", response_obj)
  539. # TODO: run all the same tests from the root handler, but for now since they are
  540. # the same code, we'll just sanity check
  541. @skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
  542. class TestRunSaltAPIHandler(_SaltnadoIntegrationTestCase):
  543. def get_app(self):
  544. urls = [
  545. ("/run", saltnado.RunSaltAPIHandler),
  546. ]
  547. application = self.build_tornado_app(urls)
  548. application.event_listener = saltnado.EventListener({}, self.opts)
  549. return application
  550. @slowTest
  551. def test_get(self):
  552. low = [{"client": "local", "tgt": "*", "fun": "test.ping"}]
  553. response = self.fetch(
  554. "/run",
  555. method="POST",
  556. body=salt.utils.json.dumps(low),
  557. headers={
  558. "Content-Type": self.content_type_map["json"],
  559. saltnado.AUTH_TOKEN_HEADER: self.token["token"],
  560. },
  561. )
  562. response_obj = salt.utils.json.loads(response.body)
  563. self.assertEqual(response_obj["return"], [{"minion": True, "sub_minion": True}])
  564. @skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
  565. class TestEventsSaltAPIHandler(_SaltnadoIntegrationTestCase):
  566. def get_app(self):
  567. urls = [
  568. (r"/events", saltnado.EventsSaltAPIHandler),
  569. ]
  570. application = self.build_tornado_app(urls)
  571. application.event_listener = saltnado.EventListener({}, self.opts)
  572. # store a reference, for magic later!
  573. self.application = application
  574. self.events_to_fire = 0
  575. return application
  576. @slowTest
  577. def test_get(self):
  578. self.events_to_fire = 5
  579. response = self.fetch(
  580. "/events",
  581. headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
  582. streaming_callback=self.on_event,
  583. )
  584. def _stop(self):
  585. self.stop()
  586. def on_event(self, event):
  587. event = event.decode("utf-8")
  588. if self.events_to_fire > 0:
  589. self.application.event_listener.event.fire_event(
  590. {"foo": "bar", "baz": "qux"}, "salt/netapi/test"
  591. )
  592. self.events_to_fire -= 1
  593. # once we've fired all the events, lets call it a day
  594. else:
  595. # wait so that we can ensure that the next future is ready to go
  596. # to make sure we don't explode if the next one is ready
  597. ZMQIOLoop.current().add_timeout(time.time() + 0.5, self._stop)
  598. event = event.strip()
  599. # if we got a retry, just continue
  600. if event != "retry: 400":
  601. tag, data = event.splitlines()
  602. self.assertTrue(tag.startswith("tag: "))
  603. self.assertTrue(data.startswith("data: "))
  604. @skipIf(HAS_ZMQ_IOLOOP is False, "PyZMQ version must be >= 14.0.1 to run these tests.")
  605. class TestWebhookSaltAPIHandler(_SaltnadoIntegrationTestCase):
  606. def get_app(self):
  607. urls = [
  608. (r"/hook(/.*)?", saltnado.WebhookSaltAPIHandler),
  609. ]
  610. application = self.build_tornado_app(urls)
  611. self.application = application
  612. application.event_listener = saltnado.EventListener({}, self.opts)
  613. return application
  614. @skipIf(True, "Skipping until we can devote more resources to debugging this test.")
  615. def test_post(self):
  616. self._future_resolved = threading.Event()
  617. try:
  618. def verify_event(future):
  619. """
  620. Notify the threading event that the future is resolved
  621. """
  622. self._future_resolved.set()
  623. self._finished = (
  624. False # TODO: remove after some cleanup of the event listener
  625. )
  626. # get an event future
  627. future = self.application.event_listener.get_event(
  628. self, tag="salt/netapi/hook", callback=verify_event
  629. )
  630. # fire the event
  631. response = self.fetch(
  632. "/hook",
  633. method="POST",
  634. body="foo=bar",
  635. headers={saltnado.AUTH_TOKEN_HEADER: self.token["token"]},
  636. )
  637. response_obj = salt.utils.json.loads(response.body)
  638. self.assertTrue(response_obj["success"])
  639. resolve_future_timeout = 60
  640. self._future_resolved.wait(resolve_future_timeout)
  641. try:
  642. event = future.result()
  643. except Exception as exc: # pylint: disable=broad-except
  644. self.fail(
  645. "Failed to resolve future under {} secs: {}".format(
  646. resolve_future_timeout, exc
  647. )
  648. )
  649. self.assertEqual(event["tag"], "salt/netapi/hook")
  650. self.assertIn("headers", event["data"])
  651. self.assertEqual(event["data"]["post"], {"foo": "bar"})
  652. finally:
  653. self._future_resolved.clear()
  654. del self._future_resolved