1
0

test_queue.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. # -*- coding: utf-8 -*-
  2. '''
  3. unit tests for the queue runner
  4. '''
  5. # Import Python Libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import os
  8. # Import Salt Testing Libs
  9. from tests.support.mixins import LoaderModuleMockMixin
  10. from tests.support.paths import TMP
  11. from tests.support.unit import skipIf, TestCase
  12. from tests.support.mock import (
  13. NO_MOCK,
  14. NO_MOCK_REASON,
  15. MagicMock,
  16. patch
  17. )
  18. # Import Salt Libs
  19. import salt.runners.queue as queue_mod
  20. @skipIf(NO_MOCK, NO_MOCK_REASON)
  21. class QueueTest(TestCase, LoaderModuleMockMixin):
  22. '''
  23. Validate the queue runner
  24. '''
  25. def setup_loader_modules(self):
  26. return {
  27. queue_mod: {
  28. '__opts__': {
  29. 'sock_dir': os.path.join(TMP, 'queue-runner-sock-dir'),
  30. 'transport': 'zeromq'
  31. }
  32. }
  33. }
  34. def test_insert_runner(self):
  35. queue_insert = MagicMock(return_value=True)
  36. with patch.object(queue_mod, 'insert', queue_insert):
  37. queue_mod.insert_runner('test.stdout_print', queue='salt')
  38. expected_call = {
  39. 'queue': 'salt',
  40. 'items': {
  41. 'fun': 'test.stdout_print',
  42. 'args': [],
  43. 'kwargs': {},
  44. },
  45. 'backend': 'pgjsonb',
  46. }
  47. queue_insert.assert_called_once_with(**expected_call)
  48. def test_process_runner(self):
  49. ret = [{
  50. 'fun': 'test.stdout_print',
  51. 'args': [],
  52. 'kwargs': {},
  53. }]
  54. queue_pop = MagicMock(return_value=ret)
  55. test_stdout_print = MagicMock(return_value=True)
  56. with patch.dict(queue_mod.__salt__, {'test.stdout_print': test_stdout_print}):
  57. with patch.object(queue_mod, 'pop', queue_pop):
  58. queue_mod.process_runner(queue='salt')
  59. queue_pop.assert_called_once_with(is_runner=True, queue='salt', quantity=1, backend='pgjsonb')
  60. test_stdout_print.assert_called_once_with()
  61. queue_pop.assert_called_once_with(is_runner=True, queue='salt', quantity=1, backend='pgjsonb')