1
0

test_queue.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. # -*- coding: utf-8 -*-
  2. '''
  3. unit tests for the cache runner
  4. '''
  5. # Import Python Libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import os
  8. # Import Salt Testing Libs
  9. from tests.support.runtests import RUNTIME_VARS
  10. from tests.support.mixins import LoaderModuleMockMixin
  11. from tests.support.unit import TestCase
  12. from tests.support.mock import (
  13. MagicMock,
  14. patch
  15. )
  16. # Import Salt Libs
  17. import salt.runners.queue as queue_mod
  18. class QueueTest(TestCase, LoaderModuleMockMixin):
  19. '''
  20. Validate the queue runner
  21. '''
  22. def setup_loader_modules(self):
  23. return {
  24. queue_mod: {
  25. '__opts__': {
  26. 'sock_dir': os.path.join(RUNTIME_VARS.TMP, 'queue-runner-sock-dir'),
  27. 'transport': 'zeromq'
  28. }
  29. }
  30. }
  31. def test_insert_runner(self):
  32. queue_insert = MagicMock(return_value=True)
  33. with patch.object(queue_mod, 'insert', queue_insert):
  34. queue_mod.insert_runner('test.stdout_print', queue='salt')
  35. expected_call = {
  36. 'queue': 'salt',
  37. 'items': {
  38. 'fun': 'test.stdout_print',
  39. 'args': [],
  40. 'kwargs': {},
  41. },
  42. 'backend': 'pgjsonb',
  43. }
  44. queue_insert.assert_called_once_with(**expected_call)
  45. def test_process_runner(self):
  46. ret = [{
  47. 'fun': 'test.stdout_print',
  48. 'args': [],
  49. 'kwargs': {},
  50. }]
  51. queue_pop = MagicMock(return_value=ret)
  52. test_stdout_print = MagicMock(return_value=True)
  53. with patch.dict(queue_mod.__salt__, {'test.stdout_print': test_stdout_print}):
  54. with patch.object(queue_mod, 'pop', queue_pop):
  55. queue_mod.process_runner(queue='salt')
  56. queue_pop.assert_called_once_with(is_runner=True, queue='salt', quantity=1, backend='pgjsonb')
  57. test_stdout_print.assert_called_once_with()
  58. queue_pop.assert_called_once_with(is_runner=True, queue='salt', quantity=1, backend='pgjsonb')