test_engines.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. # -*- coding: utf-8 -*-
  2. '''
  3. unit tests for the Salt engines
  4. '''
  5. # Import Python libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. # Import Salt Testing Libs
  8. from tests.support.mixins import LoaderModuleMockMixin
  9. from tests.support.unit import skipIf, TestCase
  10. from tests.support.mock import (
  11. NO_MOCK,
  12. NO_MOCK_REASON,
  13. patch)
  14. # Import Salt Libs
  15. import salt.engines as engines
  16. import salt.config
  17. import salt.utils.process
  18. # Import 3rd-party libs
  19. from salt.ext import six
  20. import logging
  21. log = logging.getLogger(__name__)
  22. @skipIf(NO_MOCK, NO_MOCK_REASON)
  23. class EngineTestCase(TestCase, LoaderModuleMockMixin):
  24. '''
  25. Test cases for salt.engine.sqs_events
  26. '''
  27. def setup_loader_modules(self):
  28. return {engines: {}}
  29. def test_engine_module(self):
  30. '''
  31. Test
  32. '''
  33. mock_opts = salt.config.DEFAULT_MINION_OPTS.copy()
  34. mock_opts['__role'] = 'minion'
  35. mock_opts['engines'] = [{'test_one': {'engine_module': 'test'}},
  36. {'test_two': {'engine_module': 'test'}}]
  37. process_manager = salt.utils.process.ProcessManager()
  38. with patch.dict(engines.__opts__, mock_opts):
  39. salt.engines.start_engines(mock_opts, process_manager)
  40. process_map = process_manager._process_map
  41. count = 0
  42. for proc in six.iterkeys(process_map):
  43. count += 1
  44. fun = process_map[proc]['Process'].fun
  45. # Ensure function is start from the test engine
  46. self.assertEqual(fun, 'test.start')
  47. # Ensure there were two engine started
  48. self.assertEqual(count, len(mock_opts['engines']))