1
0

test_publish.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, print_function, unicode_literals
  3. import pytest
  4. from tests.support.case import ModuleCase
  5. from tests.support.mixins import SaltReturnAssertsMixin
  6. @pytest.mark.windows_whitelisted
  7. class PublishModuleTest(ModuleCase, SaltReturnAssertsMixin):
  8. """
  9. Validate the publish module
  10. """
  11. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  12. def test_publish(self):
  13. """
  14. publish.publish
  15. """
  16. ret = self.run_function(
  17. "publish.publish", ["minion", "test.ping"], f_timeout=50
  18. )
  19. self.assertEqual(ret, {"minion": True})
  20. ret = self.run_function(
  21. "publish.publish",
  22. ["minion", "test.kwarg"],
  23. f_arg="cheese=spam",
  24. f_timeout=50,
  25. )
  26. ret = ret["minion"]
  27. check_true = (
  28. "cheese",
  29. "__pub_arg",
  30. "__pub_fun",
  31. "__pub_id",
  32. "__pub_jid",
  33. "__pub_ret",
  34. "__pub_tgt",
  35. "__pub_tgt_type",
  36. )
  37. for name in check_true:
  38. if name not in ret:
  39. print(name)
  40. self.assertTrue(name in ret)
  41. self.assertEqual(ret["cheese"], "spam")
  42. self.assertEqual(ret["__pub_arg"], [{"cheese": "spam"}])
  43. self.assertEqual(ret["__pub_id"], "minion")
  44. self.assertEqual(ret["__pub_fun"], "test.kwarg")
  45. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  46. def test_publish_yaml_args(self):
  47. """
  48. test publish.publish yaml args formatting
  49. """
  50. ret = self.run_function(
  51. "publish.publish", ["minion", "test.ping"], f_timeout=50
  52. )
  53. self.assertEqual(ret, {"minion": True})
  54. test_args_list = ["saltines, si", "crackers, nein", "cheese, indeed"]
  55. test_args = '["{args[0]}", "{args[1]}", "{args[2]}"]'.format(
  56. args=test_args_list
  57. )
  58. ret = self.run_function(
  59. "publish.publish", ["minion", "test.arg", test_args], f_timeout=50
  60. )
  61. ret = ret["minion"]
  62. check_true = (
  63. "__pub_arg",
  64. "__pub_fun",
  65. "__pub_id",
  66. "__pub_jid",
  67. "__pub_ret",
  68. "__pub_tgt",
  69. "__pub_tgt_type",
  70. )
  71. for name in check_true:
  72. if name not in ret["kwargs"]:
  73. print(name)
  74. self.assertTrue(name in ret["kwargs"])
  75. self.assertEqual(ret["args"], test_args_list)
  76. self.assertEqual(ret["kwargs"]["__pub_id"], "minion")
  77. self.assertEqual(ret["kwargs"]["__pub_fun"], "test.arg")
  78. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  79. def test_full_data(self):
  80. """
  81. publish.full_data
  82. """
  83. ret = self.run_function(
  84. "publish.full_data", ["minion", "test.fib", 20], f_timeout=50
  85. )
  86. self.assertTrue(ret)
  87. self.assertEqual(ret["minion"]["ret"][0], 6765)
  88. @pytest.mark.slow_test(seconds=60) # Test takes >30 and <=60 seconds
  89. def test_kwarg(self):
  90. """
  91. Verify that the pub data is making it to the minion functions
  92. """
  93. ret = self.run_function(
  94. "publish.full_data",
  95. ["minion", "test.kwarg"],
  96. f_arg="cheese=spam",
  97. f_timeout=50,
  98. )
  99. ret = ret["minion"]["ret"]
  100. check_true = (
  101. "cheese",
  102. "__pub_arg",
  103. "__pub_fun",
  104. "__pub_id",
  105. "__pub_jid",
  106. "__pub_ret",
  107. "__pub_tgt",
  108. "__pub_tgt_type",
  109. )
  110. for name in check_true:
  111. if name not in ret:
  112. print(name)
  113. self.assertTrue(name in ret)
  114. self.assertEqual(ret["cheese"], "spam")
  115. self.assertEqual(ret["__pub_arg"], [{"cheese": "spam"}])
  116. self.assertEqual(ret["__pub_id"], "minion")
  117. self.assertEqual(ret["__pub_fun"], "test.kwarg")
  118. ret = self.run_function(
  119. "publish.full_data", ["minion", "test.kwarg"], cheese="spam", f_timeout=50
  120. )
  121. self.assertIn("The following keyword arguments are not valid", ret)
  122. @pytest.mark.slow_test(seconds=30) # Test takes >10 and <=30 seconds
  123. def test_reject_minion(self):
  124. """
  125. Test bad authentication
  126. """
  127. ret = self.run_function(
  128. "publish.publish", ["minion", "cmd.run", ["echo foo"]], f_timeout=50
  129. )
  130. self.assertEqual(ret, {})