test_win_status.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. # -*- coding: utf-8 -*-
  2. # Import python libs
  3. from __future__ import absolute_import, unicode_literals, print_function
  4. import sys
  5. # Import Salt libs
  6. from salt.ext import six
  7. # Import Salt Testing libs
  8. from tests.support.unit import skipIf, TestCase
  9. from tests.support.mock import Mock, patch, ANY
  10. try:
  11. import wmi
  12. except ImportError:
  13. pass
  14. # This is imported late so mock can do its job
  15. import salt.modules.win_status as status
  16. @skipIf(status.HAS_WMI is False, 'This test requires Windows')
  17. class TestProcsBase(TestCase):
  18. def __init__(self, *args, **kwargs):
  19. TestCase.__init__(self, *args, **kwargs)
  20. self.__processes = []
  21. def add_process(
  22. self,
  23. pid=100,
  24. cmd='cmd',
  25. name='name',
  26. user='user',
  27. user_domain='domain',
  28. get_owner_result=0):
  29. process = Mock()
  30. process.GetOwner = Mock(
  31. return_value=(user_domain, get_owner_result, user)
  32. )
  33. process.ProcessId = pid
  34. process.CommandLine = cmd
  35. process.Name = name
  36. self.__processes.append(process)
  37. def call_procs(self):
  38. WMI = Mock()
  39. WMI.win32_process = Mock(return_value=self.__processes)
  40. with patch.object(wmi, 'WMI', Mock(return_value=WMI)):
  41. self.result = status.procs()
  42. class TestProcsCount(TestProcsBase):
  43. def setUp(self):
  44. self.add_process(pid=100)
  45. self.add_process(pid=101)
  46. self.call_procs()
  47. def test_process_count(self):
  48. self.assertEqual(len(self.result), 2)
  49. def test_process_key_is_pid(self):
  50. self.assertSetEqual(set(self.result.keys()), set([100, 101]))
  51. class TestProcsAttributes(TestProcsBase):
  52. def setUp(self):
  53. self._expected_name = 'name'
  54. self._expected_cmd = 'cmd'
  55. self._expected_user = 'user'
  56. self._expected_domain = 'domain'
  57. pid = 100
  58. self.add_process(
  59. pid=pid,
  60. cmd=self._expected_cmd,
  61. user=self._expected_user,
  62. user_domain=self._expected_domain,
  63. get_owner_result=0)
  64. self.call_procs()
  65. self.proc = self.result[pid]
  66. def test_process_cmd_is_set(self):
  67. self.assertEqual(self.proc['cmd'], self._expected_cmd)
  68. def test_process_name_is_set(self):
  69. self.assertEqual(self.proc['name'], self._expected_name)
  70. def test_process_user_is_set(self):
  71. self.assertEqual(self.proc['user'], self._expected_user)
  72. def test_process_user_domain_is_set(self):
  73. self.assertEqual(self.proc['user_domain'], self._expected_domain)
  74. @skipIf(sys.stdin.encoding != 'UTF-8', 'UTF-8 encoding required for this test is not supported')
  75. class TestProcsUnicodeAttributes(TestProcsBase):
  76. def setUp(self):
  77. unicode_str = u'\xc1'
  78. self.ustr = unicode_str.encode('utf8') if six.PY2 else unicode_str
  79. pid = 100
  80. self.add_process(
  81. pid=pid,
  82. user=unicode_str,
  83. user_domain=unicode_str,
  84. cmd=unicode_str,
  85. name=unicode_str)
  86. self.call_procs()
  87. self.proc = self.result[pid]
  88. def test_process_cmd_is_utf8(self):
  89. self.assertEqual(self.proc['cmd'], self.ustr)
  90. def test_process_name_is_utf8(self):
  91. self.assertEqual(self.proc['name'], self.ustr)
  92. def test_process_user_is_utf8(self):
  93. self.assertEqual(self.proc['user'], self.ustr)
  94. def test_process_user_domain_is_utf8(self):
  95. self.assertEqual(self.proc['user_domain'], self.ustr)
  96. class TestProcsWMIGetOwnerAccessDeniedWorkaround(TestProcsBase):
  97. def setUp(self):
  98. self.expected_user = 'SYSTEM'
  99. self.expected_domain = 'NT AUTHORITY'
  100. self.add_process(pid=0, get_owner_result=2)
  101. self.add_process(pid=4, get_owner_result=2)
  102. self.call_procs()
  103. def test_user_is_set(self):
  104. self.assertEqual(self.result[0]['user'], self.expected_user)
  105. self.assertEqual(self.result[4]['user'], self.expected_user)
  106. def test_process_user_domain_is_set(self):
  107. self.assertEqual(self.result[0]['user_domain'], self.expected_domain)
  108. self.assertEqual(self.result[4]['user_domain'], self.expected_domain)
  109. class TestProcsWMIGetOwnerErrorsAreLogged(TestProcsBase):
  110. def setUp(self):
  111. self.expected_error_code = 8
  112. self.add_process(get_owner_result=self.expected_error_code)
  113. def test_error_logged_if_process_get_owner_fails(self):
  114. with patch('salt.modules.win_status.log') as log:
  115. self.call_procs()
  116. log.warning.assert_called_once_with(ANY, ANY, self.expected_error_code)
  117. class TestEmptyCommandLine(TestProcsBase):
  118. def setUp(self):
  119. self.expected_error_code = 8
  120. pid = 100
  121. self.add_process(pid=pid, cmd=None)
  122. self.call_procs()
  123. self.proc = self.result[pid]
  124. def test_cmd_is_empty_string(self):
  125. self.assertEqual(self.proc['cmd'], '')
# NOTE(review): disabled test, kept for reference. It relies on ``call`` and
# ``pythoncom``, neither of which is imported in the visible part of this
# module -- confirm before re-enabling.
# class TestProcsComInitialization(TestProcsBase):
#     def setUp(self):
#         call_count = 5
#         for _ in range(call_count):
#             self.call_procs()
#         self.expected_calls = [call()] * call_count
#
#     def test_initialize_and_uninitialize_called(self):
#         pythoncom.CoInitialize.assert_has_calls(self.expected_calls)
#         pythoncom.CoUninitialize.assert_has_calls(self.expected_calls)