1
0

test_win_status.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. # -*- coding: utf-8 -*-
  2. # Import python libs
  3. from __future__ import absolute_import, print_function, unicode_literals
  4. import sys
  5. # This is imported late so mock can do its job
  6. import salt.modules.win_status as status
  7. # Import Salt libs
  8. from salt.ext import six
  9. from tests.support.mock import ANY, Mock, patch
  10. # Import Salt Testing libs
  11. from tests.support.unit import TestCase, skipIf
  12. try:
  13. import wmi
  14. except ImportError:
  15. pass
  16. @skipIf(status.HAS_WMI is False, "This test requires Windows")
  17. class TestProcsBase(TestCase):
  18. def __init__(self, *args, **kwargs):
  19. TestCase.__init__(self, *args, **kwargs)
  20. self.__processes = []
  21. def add_process(
  22. self,
  23. pid=100,
  24. cmd="cmd",
  25. name="name",
  26. user="user",
  27. user_domain="domain",
  28. get_owner_result=0,
  29. ):
  30. process = Mock()
  31. process.GetOwner = Mock(return_value=(user_domain, get_owner_result, user))
  32. process.ProcessId = pid
  33. process.CommandLine = cmd
  34. process.Name = name
  35. self.__processes.append(process)
  36. def call_procs(self):
  37. WMI = Mock()
  38. WMI.win32_process = Mock(return_value=self.__processes)
  39. with patch.object(wmi, "WMI", Mock(return_value=WMI)):
  40. self.result = status.procs()
  41. class TestProcsCount(TestProcsBase):
  42. def setUp(self):
  43. self.add_process(pid=100)
  44. self.add_process(pid=101)
  45. self.call_procs()
  46. def test_process_count(self):
  47. self.assertEqual(len(self.result), 2)
  48. def test_process_key_is_pid(self):
  49. self.assertSetEqual(set(self.result.keys()), set([100, 101]))
  50. class TestProcsAttributes(TestProcsBase):
  51. def setUp(self):
  52. self._expected_name = "name"
  53. self._expected_cmd = "cmd"
  54. self._expected_user = "user"
  55. self._expected_domain = "domain"
  56. pid = 100
  57. self.add_process(
  58. pid=pid,
  59. cmd=self._expected_cmd,
  60. user=self._expected_user,
  61. user_domain=self._expected_domain,
  62. get_owner_result=0,
  63. )
  64. self.call_procs()
  65. self.proc = self.result[pid]
  66. def test_process_cmd_is_set(self):
  67. self.assertEqual(self.proc["cmd"], self._expected_cmd)
  68. def test_process_name_is_set(self):
  69. self.assertEqual(self.proc["name"], self._expected_name)
  70. def test_process_user_is_set(self):
  71. self.assertEqual(self.proc["user"], self._expected_user)
  72. def test_process_user_domain_is_set(self):
  73. self.assertEqual(self.proc["user_domain"], self._expected_domain)
  74. @skipIf(
  75. sys.stdin.encoding != "UTF-8",
  76. "UTF-8 encoding required for this test is not supported",
  77. )
  78. class TestProcsUnicodeAttributes(TestProcsBase):
  79. def setUp(self):
  80. unicode_str = "\xc1"
  81. self.ustr = unicode_str.encode("utf8") if six.PY2 else unicode_str
  82. pid = 100
  83. self.add_process(
  84. pid=pid,
  85. user=unicode_str,
  86. user_domain=unicode_str,
  87. cmd=unicode_str,
  88. name=unicode_str,
  89. )
  90. self.call_procs()
  91. self.proc = self.result[pid]
  92. def test_process_cmd_is_utf8(self):
  93. self.assertEqual(self.proc["cmd"], self.ustr)
  94. def test_process_name_is_utf8(self):
  95. self.assertEqual(self.proc["name"], self.ustr)
  96. def test_process_user_is_utf8(self):
  97. self.assertEqual(self.proc["user"], self.ustr)
  98. def test_process_user_domain_is_utf8(self):
  99. self.assertEqual(self.proc["user_domain"], self.ustr)
  100. class TestProcsWMIGetOwnerAccessDeniedWorkaround(TestProcsBase):
  101. def setUp(self):
  102. self.expected_user = "SYSTEM"
  103. self.expected_domain = "NT AUTHORITY"
  104. self.add_process(pid=0, get_owner_result=2)
  105. self.add_process(pid=4, get_owner_result=2)
  106. self.call_procs()
  107. def test_user_is_set(self):
  108. self.assertEqual(self.result[0]["user"], self.expected_user)
  109. self.assertEqual(self.result[4]["user"], self.expected_user)
  110. def test_process_user_domain_is_set(self):
  111. self.assertEqual(self.result[0]["user_domain"], self.expected_domain)
  112. self.assertEqual(self.result[4]["user_domain"], self.expected_domain)
  113. class TestProcsWMIGetOwnerErrorsAreLogged(TestProcsBase):
  114. def setUp(self):
  115. self.expected_error_code = 8
  116. self.add_process(get_owner_result=self.expected_error_code)
  117. def test_error_logged_if_process_get_owner_fails(self):
  118. with patch("salt.modules.win_status.log") as log:
  119. self.call_procs()
  120. log.warning.assert_called_once_with(ANY, ANY, self.expected_error_code)
  121. class TestEmptyCommandLine(TestProcsBase):
  122. def setUp(self):
  123. self.expected_error_code = 8
  124. pid = 100
  125. self.add_process(pid=pid, cmd=None)
  126. self.call_procs()
  127. self.proc = self.result[pid]
  128. def test_cmd_is_empty_string(self):
  129. self.assertEqual(self.proc["cmd"], "")
# NOTE: the test case below is disabled. It refers to ``call`` and
# ``pythoncom``, neither of which is imported in the visible part of this file.
#
# class TestProcsComInitialization(TestProcsBase):
#     def setUp(self):
#         call_count = 5
#         for _ in range(call_count):
#             self.call_procs()
#         self.expected_calls = [call()] * call_count
#
#     def test_initialize_and_uninitialize_called(self):
#         pythoncom.CoInitialize.assert_has_calls(self.expected_calls)
#         pythoncom.CoUninitialize.assert_has_calls(self.expected_calls)