1
0

test_win_runas.py 22 KB


  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, unicode_literals
  3. import inspect
  4. import io
  5. import logging
  6. import os
  7. import socket
  8. import subprocess
  9. # Service manager imports
  10. import sys
  11. import textwrap
  12. import threading
  13. import time
  14. import traceback
  15. import salt.ext.six
  16. import salt.utils.files
  17. import salt.utils.win_runas
  18. import yaml
  19. from tests.support.case import ModuleCase
  20. from tests.support.helpers import with_system_user
  21. from tests.support.mock import Mock
  22. from tests.support.runtests import RUNTIME_VARS
  23. from tests.support.unit import skipIf
  24. try:
  25. import win32service
  26. import win32serviceutil
  27. import win32event
  28. import servicemanager
  29. import win32api
  30. CODE_DIR = win32api.GetLongPathName(RUNTIME_VARS.CODE_DIR)
  31. HAS_WIN32 = True
  32. except ImportError:
  33. # Mock win32serviceutil object to avoid
  34. # a stacktrace in the _ServiceManager class
  35. win32serviceutil = Mock()
  36. HAS_WIN32 = False
  37. logger = logging.getLogger(__name__)
  38. PASSWORD = "P@ssW0rd"
  39. NOPRIV_STDERR = "ERROR: Logged-on user does not have administrative privilege.\n"
  40. PRIV_STDOUT = (
  41. "\nINFO: The system global flag 'maintain objects list' needs\n "
  42. "to be enabled to see local opened files.\n See Openfiles "
  43. "/? for more information.\n\n\nFiles opened remotely via local share "
  44. "points:\n---------------------------------------------\n\n"
  45. "INFO: No shared open files found.\n"
  46. )
  47. if HAS_WIN32:
  48. RUNAS_PATH = os.path.abspath(os.path.join(CODE_DIR, "runas.py"))
  49. RUNAS_OUT = os.path.abspath(os.path.join(CODE_DIR, "runas.out"))
  50. def default_target(service, *args, **kwargs):
  51. while service.active:
  52. time.sleep(service.timeout)
  53. class _ServiceManager(win32serviceutil.ServiceFramework):
  54. """
  55. A windows service manager
  56. """
  57. _svc_name_ = "Service Manager"
  58. _svc_display_name_ = "Service Manager"
  59. _svc_description_ = "A Service Manager"
  60. run_in_foreground = False
  61. target = default_target
  62. def __init__(self, args, target=None, timeout=60, active=True):
  63. win32serviceutil.ServiceFramework.__init__(self, args)
  64. self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
  65. self.timeout = timeout
  66. self.active = active
  67. if target is not None:
  68. self.target = target
  69. @classmethod
  70. def log_error(cls, msg):
  71. if cls.run_in_foreground:
  72. logger.error(msg)
  73. servicemanager.LogErrorMsg(msg)
  74. @classmethod
  75. def log_info(cls, msg):
  76. if cls.run_in_foreground:
  77. logger.info(msg)
  78. servicemanager.LogInfoMsg(msg)
  79. @classmethod
  80. def log_exception(cls, msg):
  81. if cls.run_in_foreground:
  82. logger.exception(msg)
  83. exc_info = sys.exc_info()
  84. tb = traceback.format_tb(exc_info[2])
  85. servicemanager.LogErrorMsg("{} {} {}".format(msg, exc_info[1], tb))
  86. @property
  87. def timeout_ms(self):
  88. return self.timeout * 1000
  89. def SvcStop(self):
  90. """
  91. Stop the service by; terminating any subprocess call, notify
  92. windows internals of the stop event, set the instance's active
  93. attribute to 'False' so the run loops stop.
  94. """
  95. self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
  96. win32event.SetEvent(self.hWaitStop)
  97. self.active = False
  98. def SvcDoRun(self):
  99. """
  100. Run the monitor in a separete thread so the main thread is
  101. free to react to events sent to the windows service.
  102. """
  103. servicemanager.LogMsg(
  104. servicemanager.EVENTLOG_INFORMATION_TYPE,
  105. servicemanager.PYS_SERVICE_STARTED,
  106. (self._svc_name_, ""),
  107. )
  108. self.log_info("Starting Service {}".format(self._svc_name_))
  109. monitor_thread = threading.Thread(target=self.target_thread)
  110. monitor_thread.start()
  111. while self.active:
  112. rc = win32event.WaitForSingleObject(self.hWaitStop, self.timeout_ms)
  113. if rc == win32event.WAIT_OBJECT_0:
  114. # Stop signal encountered
  115. self.log_info("Stopping Service")
  116. break
  117. if not monitor_thread.is_alive():
  118. self.log_info("Update Thread Died, Stopping Service")
  119. break
  120. def target_thread(self, *args, **kwargs):
  121. """
  122. Target Thread, handles any exception in the target method and
  123. logs them.
  124. """
  125. self.log_info("Monitor")
  126. try:
  127. self.target(self, *args, **kwargs)
  128. except Exception as exc: # pylint: disable=broad-except
  129. # TODO: Add traceback info to windows event log objects
  130. self.log_exception("Exception In Target")
  131. @classmethod
  132. def install(cls, username=None, password=None, start_type=None):
  133. if hasattr(cls, "_svc_reg_class_"):
  134. svc_class = cls._svc_reg_class_
  135. else:
  136. svc_class = win32serviceutil.GetServiceClassString(cls)
  137. win32serviceutil.InstallService(
  138. svc_class,
  139. cls._svc_name_,
  140. cls._svc_display_name_,
  141. description=cls._svc_description_,
  142. userName=username,
  143. password=password,
  144. startType=start_type,
  145. )
  146. @classmethod
  147. def remove(cls):
  148. win32serviceutil.RemoveService(cls._svc_name_)
  149. @classmethod
  150. def start(cls):
  151. win32serviceutil.StartService(cls._svc_name_)
  152. @classmethod
  153. def restart(cls):
  154. win32serviceutil.RestartService(cls._svc_name_)
  155. @classmethod
  156. def stop(cls):
  157. win32serviceutil.StopService(cls._svc_name_)
  158. def service_class_factory(
  159. cls_name,
  160. name,
  161. target=default_target,
  162. display_name="",
  163. description="",
  164. run_in_foreground=False,
  165. ):
  166. frm = inspect.stack()[1]
  167. mod = inspect.getmodule(frm[0])
  168. if salt.ext.six.PY2:
  169. cls_name = cls_name.encode()
  170. return type(
  171. cls_name,
  172. (_ServiceManager, object),
  173. {
  174. "__module__": mod.__name__,
  175. "_svc_name_": name,
  176. "_svc_display_name_": display_name or name,
  177. "_svc_description_": description,
  178. "run_in_foreground": run_in_foreground,
  179. "target": target,
  180. },
  181. )
  182. if HAS_WIN32:
  183. test_service = service_class_factory("test_service", "test service")
  184. SERVICE_SOURCE = """
  185. from __future__ import absolute_import, unicode_literals
  186. import logging
  187. logger = logging.getLogger()
  188. logging.basicConfig(level=logging.DEBUG, format="%(message)s")
  189. from tests.integration.utils.test_win_runas import service_class_factory
  190. import salt.utils.win_runas
  191. import sys
  192. import yaml
  193. OUTPUT = {}
  194. USERNAME = '{}'
  195. PASSWORD = '{}'
  196. def target(service, *args, **kwargs):
  197. service.log_info("target start")
  198. if PASSWORD:
  199. ret = salt.utils.win_runas.runas(
  200. 'cmd.exe /C OPENFILES',
  201. username=USERNAME,
  202. password=PASSWORD,
  203. )
  204. else:
  205. ret = salt.utils.win_runas.runas(
  206. 'cmd.exe /C OPENFILES',
  207. username=USERNAME,
  208. )
  209. service.log_info("win_runas returned %s" % ret)
  210. with open(OUTPUT, 'w') as fp:
  211. yaml.dump(ret, fp)
  212. service.log_info("target stop")
  213. # This class will get imported and run as the service
  214. test_service = service_class_factory('test_service', 'test service', target=target)
  215. if __name__ == '__main__':
  216. try:
  217. test_service.stop()
  218. except Exception as exc: # pylint: disable=broad-except
  219. logger.debug("stop service failed, this is ok.")
  220. try:
  221. test_service.remove()
  222. except Exception as exc: # pylint: disable=broad-except
  223. logger.debug("remove service failed, this os ok.")
  224. test_service.install()
  225. sys.exit(0)
  226. """
  227. def wait_for_service(name, timeout=200):
  228. start = time.time()
  229. while True:
  230. status = win32serviceutil.QueryServiceStatus(name)
  231. if status[1] == win32service.SERVICE_STOPPED:
  232. break
  233. if time.time() - start > timeout:
  234. raise TimeoutError(
  235. "Timeout waiting for service"
  236. ) # pylint: disable=undefined-variable
  237. time.sleep(0.3)
  238. @skipIf(not HAS_WIN32, "This test runs only on windows.")
  239. class RunAsTest(ModuleCase):
  240. @classmethod
  241. def setUpClass(cls):
  242. super(RunAsTest, cls).setUpClass()
  243. cls.hostname = socket.gethostname()
  244. @with_system_user(
  245. "test-runas", on_existing="delete", delete=True, password=PASSWORD
  246. )
  247. def test_runas(self, username):
  248. ret = salt.utils.win_runas.runas("cmd.exe /C OPENFILES", username, PASSWORD)
  249. self.assertEqual(ret["stdout"], "")
  250. self.assertEqual(ret["stderr"], NOPRIV_STDERR)
  251. self.assertEqual(ret["retcode"], 1)
  252. @with_system_user(
  253. "test-runas", on_existing="delete", delete=True, password=PASSWORD
  254. )
  255. def test_runas_no_pass(self, username):
  256. ret = salt.utils.win_runas.runas("cmd.exe /C OPENFILES", username)
  257. self.assertEqual(ret["stdout"], "")
  258. self.assertEqual(ret["stderr"], NOPRIV_STDERR)
  259. self.assertEqual(ret["retcode"], 1)
  260. @with_system_user(
  261. "test-runas-admin",
  262. on_existing="delete",
  263. delete=True,
  264. password=PASSWORD,
  265. groups=["Administrators"],
  266. )
  267. def test_runas_admin(self, username):
  268. ret = salt.utils.win_runas.runas("cmd.exe /C OPENFILES", username, PASSWORD)
  269. self.assertEqual(ret["stdout"], PRIV_STDOUT)
  270. self.assertEqual(ret["stderr"], "")
  271. self.assertEqual(ret["retcode"], 0)
  272. @with_system_user(
  273. "test-runas-admin",
  274. on_existing="delete",
  275. delete=True,
  276. password=PASSWORD,
  277. groups=["Administrators"],
  278. )
  279. def test_runas_admin_no_pass(self, username):
  280. ret = salt.utils.win_runas.runas("cmd.exe /C OPENFILES", username)
  281. self.assertEqual(ret["stdout"], PRIV_STDOUT)
  282. self.assertEqual(ret["stderr"], "")
  283. self.assertEqual(ret["retcode"], 0)
  284. def test_runas_system_user(self):
  285. ret = salt.utils.win_runas.runas("cmd.exe /C OPENFILES", "SYSTEM")
  286. self.assertEqual(ret["stdout"], PRIV_STDOUT)
  287. self.assertEqual(ret["stderr"], "")
  288. self.assertEqual(ret["retcode"], 0)
  289. def test_runas_network_service(self):
  290. ret = salt.utils.win_runas.runas("cmd.exe /C OPENFILES", "NETWORK SERVICE")
  291. self.assertEqual(ret["stdout"], "")
  292. self.assertEqual(ret["stderr"], NOPRIV_STDERR)
  293. self.assertEqual(ret["retcode"], 1)
  294. def test_runas_local_service(self):
  295. ret = salt.utils.win_runas.runas("cmd.exe /C OPENFILES", "LOCAL SERVICE")
  296. self.assertEqual(ret["stdout"], "")
  297. self.assertEqual(ret["stderr"], NOPRIV_STDERR)
  298. self.assertEqual(ret["retcode"], 1)
  299. @with_system_user(
  300. "test-runas", on_existing="delete", delete=True, password=PASSWORD
  301. )
  302. def test_runas_winrs(self, username):
  303. runaspy = textwrap.dedent(
  304. """
  305. import sys
  306. import salt.utils.win_runas
  307. username = '{}'
  308. password = '{}'
  309. sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username, password)['retcode'])
  310. """.format(
  311. username, PASSWORD
  312. )
  313. )
  314. with salt.utils.files.fopen(RUNAS_PATH, "w") as fp:
  315. fp.write(runaspy)
  316. ret = subprocess.call(
  317. "cmd.exe /C winrs /r:{} python {}".format(self.hostname, RUNAS_PATH),
  318. shell=True,
  319. )
  320. self.assertEqual(ret, 1)
  321. @with_system_user(
  322. "test-runas", on_existing="delete", delete=True, password=PASSWORD
  323. )
  324. def test_runas_winrs_no_pass(self, username):
  325. runaspy = textwrap.dedent(
  326. """
  327. import sys
  328. import salt.utils.win_runas
  329. username = '{}'
  330. sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username)['retcode'])
  331. """.format(
  332. username
  333. )
  334. )
  335. with salt.utils.files.fopen(RUNAS_PATH, "w") as fp:
  336. fp.write(runaspy)
  337. ret = subprocess.call(
  338. "cmd.exe /C winrs /r:{} python {}".format(self.hostname, RUNAS_PATH),
  339. shell=True,
  340. )
  341. self.assertEqual(ret, 1)
  342. @with_system_user(
  343. "test-runas-admin",
  344. on_existing="delete",
  345. delete=True,
  346. password=PASSWORD,
  347. groups=["Administrators"],
  348. )
  349. def test_runas_winrs_admin(self, username):
  350. runaspy = textwrap.dedent(
  351. """
  352. import sys
  353. import salt.utils.win_runas
  354. username = '{}'
  355. password = '{}'
  356. sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username, password)['retcode'])
  357. """.format(
  358. username, PASSWORD
  359. )
  360. )
  361. with salt.utils.files.fopen(RUNAS_PATH, "w") as fp:
  362. fp.write(runaspy)
  363. ret = subprocess.call(
  364. "cmd.exe /C winrs /r:{} python {}".format(self.hostname, RUNAS_PATH),
  365. shell=True,
  366. )
  367. self.assertEqual(ret, 0)
  368. @with_system_user(
  369. "test-runas-admin",
  370. on_existing="delete",
  371. delete=True,
  372. password=PASSWORD,
  373. groups=["Administrators"],
  374. )
  375. def test_runas_winrs_admin_no_pass(self, username):
  376. runaspy = textwrap.dedent(
  377. """
  378. import sys
  379. import salt.utils.win_runas
  380. username = '{}'
  381. sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username)['retcode'])
  382. """.format(
  383. username
  384. )
  385. )
  386. with salt.utils.files.fopen(RUNAS_PATH, "w") as fp:
  387. fp.write(runaspy)
  388. ret = subprocess.call(
  389. "cmd.exe /C winrs /r:{} python {}".format(self.hostname, RUNAS_PATH),
  390. shell=True,
  391. )
  392. self.assertEqual(ret, 0)
  393. def test_runas_winrs_system_user(self):
  394. runaspy = textwrap.dedent(
  395. """
  396. import sys
  397. import salt.utils.win_runas
  398. sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', 'SYSTEM')['retcode'])
  399. """
  400. )
  401. with salt.utils.files.fopen(RUNAS_PATH, "w") as fp:
  402. fp.write(runaspy)
  403. ret = subprocess.call(
  404. "cmd.exe /C winrs /r:{} python {}".format(self.hostname, RUNAS_PATH),
  405. shell=True,
  406. )
  407. self.assertEqual(ret, 0)
  408. def test_runas_winrs_network_service_user(self):
  409. runaspy = textwrap.dedent(
  410. """
  411. import sys
  412. import salt.utils.win_runas
  413. sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', 'NETWORK SERVICE')['retcode'])
  414. """
  415. )
  416. with salt.utils.files.fopen(RUNAS_PATH, "w") as fp:
  417. fp.write(runaspy)
  418. ret = subprocess.call(
  419. "cmd.exe /C winrs /r:{} python {}".format(self.hostname, RUNAS_PATH),
  420. shell=True,
  421. )
  422. self.assertEqual(ret, 1)
  423. def test_runas_winrs_local_service_user(self):
  424. runaspy = textwrap.dedent(
  425. """
  426. import sys
  427. import salt.utils.win_runas
  428. sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', 'LOCAL SERVICE')['retcode'])
  429. """
  430. )
  431. with salt.utils.files.fopen(RUNAS_PATH, "w") as fp:
  432. fp.write(runaspy)
  433. ret = subprocess.call(
  434. "cmd.exe /C winrs /r:{} python {}".format(self.hostname, RUNAS_PATH),
  435. shell=True,
  436. )
  437. self.assertEqual(ret, 1)
  438. @with_system_user(
  439. "test-runas", on_existing="delete", delete=True, password=PASSWORD
  440. )
  441. def test_runas_powershell_remoting(self, username):
  442. psrp_wrap = "powershell Invoke-Command -ComputerName {} -ScriptBlock {{ {} }}"
  443. runaspy = textwrap.dedent(
  444. """
  445. import sys
  446. import salt.utils.win_runas
  447. username = '{}'
  448. password = '{}'
  449. sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username, password)['retcode'])
  450. """.format(
  451. username, PASSWORD
  452. )
  453. )
  454. with salt.utils.files.fopen(RUNAS_PATH, "w") as fp:
  455. fp.write(runaspy)
  456. cmd = "python.exe {}".format(RUNAS_PATH)
  457. ret = subprocess.call(psrp_wrap.format(self.hostname, cmd), shell=True)
  458. self.assertEqual(ret, 1)
  459. @with_system_user(
  460. "test-runas", on_existing="delete", delete=True, password=PASSWORD
  461. )
  462. def test_runas_powershell_remoting_no_pass(self, username):
  463. psrp_wrap = "powershell Invoke-Command -ComputerName {} -ScriptBlock {{ {} }}"
  464. runaspy = textwrap.dedent(
  465. """
  466. import sys
  467. import salt.utils.win_runas
  468. username = '{}'
  469. sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username)['retcode'])
  470. """.format(
  471. username
  472. )
  473. )
  474. with salt.utils.files.fopen(RUNAS_PATH, "w") as fp:
  475. fp.write(runaspy)
  476. cmd = "python.exe {}".format(RUNAS_PATH)
  477. ret = subprocess.call(psrp_wrap.format(self.hostname, cmd), shell=True)
  478. self.assertEqual(ret, 1)
  479. @with_system_user(
  480. "test-runas-admin",
  481. on_existing="delete",
  482. delete=True,
  483. password=PASSWORD,
  484. groups=["Administrators"],
  485. )
  486. def test_runas_powershell_remoting_admin(self, username):
  487. psrp_wrap = "powershell Invoke-Command -ComputerName {} -ScriptBlock {{ {} }}; exit $LASTEXITCODE"
  488. runaspy = textwrap.dedent(
  489. """
  490. import sys
  491. import salt.utils.win_runas
  492. username = '{}'
  493. password = '{}'
  494. ret = salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username, password)
  495. sys.exit(ret['retcode'])
  496. """.format(
  497. username, PASSWORD
  498. )
  499. )
  500. with salt.utils.files.fopen(RUNAS_PATH, "w") as fp:
  501. fp.write(runaspy)
  502. cmd = "python.exe {}; exit $LASTEXITCODE".format(RUNAS_PATH)
  503. ret = subprocess.call(psrp_wrap.format(self.hostname, cmd), shell=True)
  504. self.assertEqual(ret, 0)
  505. @with_system_user(
  506. "test-runas-admin",
  507. on_existing="delete",
  508. delete=True,
  509. password=PASSWORD,
  510. groups=["Administrators"],
  511. )
  512. def test_runas_powershell_remoting_admin_no_pass(self, username):
  513. psrp_wrap = "powershell Invoke-Command -ComputerName {} -ScriptBlock {{ {} }}; exit $LASTEXITCODE"
  514. runaspy = textwrap.dedent(
  515. """
  516. import sys
  517. import salt.utils.win_runas
  518. username = '{}'
  519. sys.exit(salt.utils.win_runas.runas('cmd.exe /C OPENFILES', username)['retcode'])
  520. """.format(
  521. username
  522. )
  523. )
  524. with salt.utils.files.fopen(RUNAS_PATH, "w") as fp:
  525. fp.write(runaspy)
  526. cmd = "python.exe {}; exit $LASTEXITCODE".format(RUNAS_PATH)
  527. ret = subprocess.call(psrp_wrap.format(self.hostname, cmd), shell=True)
  528. self.assertEqual(ret, 0)
  529. @with_system_user(
  530. "test-runas", on_existing="delete", delete=True, password=PASSWORD
  531. )
  532. def test_runas_service(self, username, timeout=200):
  533. if os.path.exists(RUNAS_OUT):
  534. os.remove(RUNAS_OUT)
  535. assert not os.path.exists(RUNAS_OUT)
  536. runaspy = SERVICE_SOURCE.format(repr(RUNAS_OUT), username, PASSWORD)
  537. with io.open(RUNAS_PATH, "w", encoding="utf-8") as fp:
  538. fp.write(runaspy)
  539. cmd = "python.exe {}".format(RUNAS_PATH)
  540. ret = subprocess.call(cmd, shell=True)
  541. self.assertEqual(ret, 0)
  542. win32serviceutil.StartService("test service")
  543. wait_for_service("test service")
  544. with salt.utils.files.fopen(RUNAS_OUT, "r") as fp:
  545. ret = yaml.load(fp)
  546. assert ret["retcode"] == 1, ret
  547. @with_system_user(
  548. "test-runas", on_existing="delete", delete=True, password=PASSWORD
  549. )
  550. def test_runas_service_no_pass(self, username, timeout=200):
  551. if os.path.exists(RUNAS_OUT):
  552. os.remove(RUNAS_OUT)
  553. assert not os.path.exists(RUNAS_OUT)
  554. runaspy = SERVICE_SOURCE.format(repr(RUNAS_OUT), username, "")
  555. with io.open(RUNAS_PATH, "w", encoding="utf-8") as fp:
  556. fp.write(runaspy)
  557. cmd = "python.exe {}".format(RUNAS_PATH)
  558. ret = subprocess.call(cmd, shell=True)
  559. self.assertEqual(ret, 0)
  560. win32serviceutil.StartService("test service")
  561. wait_for_service("test service")
  562. with salt.utils.files.fopen(RUNAS_OUT, "r") as fp:
  563. ret = yaml.load(fp)
  564. assert ret["retcode"] == 1, ret
  565. @with_system_user(
  566. "test-runas-admin",
  567. on_existing="delete",
  568. delete=True,
  569. password=PASSWORD,
  570. groups=["Administrators"],
  571. )
  572. def test_runas_service_admin(self, username, timeout=200):
  573. if os.path.exists(RUNAS_OUT):
  574. os.remove(RUNAS_OUT)
  575. assert not os.path.exists(RUNAS_OUT)
  576. runaspy = SERVICE_SOURCE.format(repr(RUNAS_OUT), username, PASSWORD)
  577. with io.open(RUNAS_PATH, "w", encoding="utf-8") as fp:
  578. fp.write(runaspy)
  579. cmd = "python.exe {}".format(RUNAS_PATH)
  580. ret = subprocess.call(cmd, shell=True)
  581. self.assertEqual(ret, 0)
  582. win32serviceutil.StartService("test service")
  583. wait_for_service("test service")
  584. with salt.utils.files.fopen(RUNAS_OUT, "r") as fp:
  585. ret = yaml.load(fp)
  586. assert ret["retcode"] == 0, ret
  587. @with_system_user(
  588. "test-runas-admin",
  589. on_existing="delete",
  590. delete=True,
  591. password=PASSWORD,
  592. groups=["Administrators"],
  593. )
  594. def test_runas_service_admin_no_pass(self, username, timeout=200):
  595. if os.path.exists(RUNAS_OUT):
  596. os.remove(RUNAS_OUT)
  597. assert not os.path.exists(RUNAS_OUT)
  598. runaspy = SERVICE_SOURCE.format(repr(RUNAS_OUT), username, "")
  599. with io.open(RUNAS_PATH, "w", encoding="utf-8") as fp:
  600. fp.write(runaspy)
  601. cmd = "python.exe {}".format(RUNAS_PATH)
  602. ret = subprocess.call(cmd, shell=True)
  603. self.assertEqual(ret, 0)
  604. win32serviceutil.StartService("test service")
  605. wait_for_service("test service")
  606. with salt.utils.files.fopen(RUNAS_OUT, "r") as fp:
  607. ret = yaml.load(fp)
  608. assert ret["retcode"] == 0, ret
  609. def test_runas_service_system_user(self):
  610. if os.path.exists(RUNAS_OUT):
  611. os.remove(RUNAS_OUT)
  612. assert not os.path.exists(RUNAS_OUT)
  613. runaspy = SERVICE_SOURCE.format(repr(RUNAS_OUT), "SYSTEM", "")
  614. with io.open(RUNAS_PATH, "w", encoding="utf-8") as fp:
  615. fp.write(runaspy)
  616. cmd = "python.exe {}".format(RUNAS_PATH)
  617. ret = subprocess.call(cmd, shell=True)
  618. self.assertEqual(ret, 0)
  619. win32serviceutil.StartService("test service")
  620. wait_for_service("test service")
  621. with salt.utils.files.fopen(RUNAS_OUT, "r") as fp:
  622. ret = yaml.load(fp)
  623. assert ret["retcode"] == 0, ret