1
0

test_engines.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. # -*- coding: utf-8 -*-
  2. """
  3. unit tests for the Salt engines
  4. """
  5. from __future__ import absolute_import, print_function, unicode_literals
  6. import logging
  7. import salt.config
  8. import salt.engines as engines
  9. import salt.utils.process
  10. from salt.ext import six
  11. from tests.support.helpers import slowTest
  12. from tests.support.mixins import LoaderModuleMockMixin
  13. from tests.support.mock import patch
  14. from tests.support.unit import TestCase
  15. log = logging.getLogger(__name__)
  16. class EngineTestCase(TestCase, LoaderModuleMockMixin):
  17. """
  18. Test cases for salt.engine.sqs_events
  19. """
  20. def setup_loader_modules(self):
  21. return {engines: {}}
  22. @slowTest
  23. def test_engine_module(self):
  24. """
  25. Test
  26. """
  27. mock_opts = salt.config.DEFAULT_MINION_OPTS.copy()
  28. mock_opts["__role"] = "minion"
  29. mock_opts["engines"] = [
  30. {"test_one": {"engine_module": "test"}},
  31. {"test_two": {"engine_module": "test"}},
  32. ]
  33. process_manager = salt.utils.process.ProcessManager()
  34. with patch.dict(engines.__opts__, mock_opts):
  35. salt.engines.start_engines(mock_opts, process_manager)
  36. process_map = process_manager._process_map
  37. count = 0
  38. for proc in six.iterkeys(process_map):
  39. count += 1
  40. fun = process_map[proc]["Process"].fun
  41. # Ensure function is start from the test engine
  42. self.assertEqual(fun, "test.start")
  43. # Ensure there were two engine started
  44. self.assertEqual(count, len(mock_opts["engines"]))