1
0

test_system.py 18 KB


  1. # -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

import datetime
import logging
import os
import signal
import subprocess
import textwrap
import time

import pytest
import salt.states.file
import salt.utils.files
import salt.utils.path
import salt.utils.platform
import salt.utils.stringutils
from salt.ext import six
from salt.ext.six.moves import range
from tests.support.case import ModuleCase
from tests.support.helpers import (
    destructiveTest,
    flaky,
    runs_on,
    skip_if_not_root,
    slowTest,
)
from tests.support.unit import skipIf
  26. log = logging.getLogger(__name__)
  27. @runs_on(kernel="Linux")
  28. class SystemModuleTest(ModuleCase):
  29. """
  30. Validate the date/time functions in the system module
  31. """
  32. _hwclock_has_compare_ = None
  33. _systemd_timesyncd_available_ = None
  34. @classmethod
  35. def setUpClass(cls):
  36. cls.fmt_str = "%Y-%m-%d %H:%M:%S"
  37. cls._orig_time = None
  38. cls._machine_info = True
  39. @classmethod
  40. def tearDownClass(cls):
  41. for name in ("fmt_str", "_orig_time", "_machine_info"):
  42. delattr(cls, name)
  43. def setUp(self):
  44. super(SystemModuleTest, self).setUp()
  45. if self._systemd_timesyncd_available_ is None:
  46. SystemModuleTest._systemd_timesyncd_available_ = self.run_function(
  47. "service.available", ["systemd-timesyncd"]
  48. )
  49. if self._systemd_timesyncd_available_:
  50. self.run_function("service.stop", ["systemd-timesyncd"])
  51. def tearDown(self):
  52. if self._orig_time is not None:
  53. self._restore_time()
  54. self._orig_time = None
  55. if self._machine_info is not True:
  56. self._restore_machine_info()
  57. self._machine_info = True
  58. if self._systemd_timesyncd_available_:
  59. self.run_function("service.start", ["systemd-timesyncd"])
  60. def _save_time(self):
  61. self._orig_time = datetime.datetime.utcnow()
  62. def _set_time(self, new_time, offset=None):
  63. t = new_time.timetuple()[:6]
  64. t += (offset,)
  65. return self.run_function("system.set_system_date_time", t)
  66. def _restore_time(self):
  67. result = self._set_time(self._orig_time, "+0000")
  68. self.assertTrue(result, msg="Unable to restore time properly")
  69. def _same_times(self, t1, t2, seconds_diff=30):
  70. """
  71. Helper function to check if two datetime objects
  72. are close enough to the same time.
  73. """
  74. return abs(t1 - t2) < datetime.timedelta(seconds=seconds_diff)
  75. def _hwclock_has_compare(self):
  76. """
  77. Some builds of hwclock don't include the `--compare` function
  78. needed to test hw/sw clock synchronization. Returns false on
  79. systems where it's not present so that we can skip the
  80. comparison portion of the test.
  81. """
  82. if self._hwclock_has_compare_ is None:
  83. res = self.run_function("cmd.run_all", cmd="hwclock -h")
  84. SystemModuleTest._hwclock_has_compare_ = (
  85. res["retcode"] == 0 and res["stdout"].find("--compare") > 0
  86. )
  87. return self._hwclock_has_compare_
  88. def _test_hwclock_sync(self):
  89. """
  90. Check that hw and sw clocks are sync'd.
  91. """
  92. if not self.run_function("system.has_settable_hwclock"):
  93. return None
  94. if not self._hwclock_has_compare():
  95. return None
  96. class CompareTimeout(BaseException):
  97. pass
  98. def _alrm_handler(sig, frame):
  99. log.warning("hwclock --compare failed to produce output after 3 seconds")
  100. raise CompareTimeout
  101. for _ in range(2):
  102. try:
  103. orig_handler = signal.signal(signal.SIGALRM, _alrm_handler)
  104. signal.alarm(3)
  105. rpipeFd, wpipeFd = os.pipe()
  106. log.debug("Comparing hwclock to sys clock")
  107. with os.fdopen(rpipeFd, "r") as rpipe:
  108. with os.fdopen(wpipeFd, "w") as wpipe:
  109. with salt.utils.files.fopen(os.devnull, "r") as nulFd:
  110. p = subprocess.Popen(
  111. args=["hwclock", "--compare"],
  112. stdin=nulFd,
  113. stdout=wpipeFd,
  114. stderr=subprocess.PIPE,
  115. )
  116. p.communicate()
  117. # read header
  118. rpipe.readline()
  119. # read first time comparison
  120. timeCompStr = rpipe.readline()
  121. # stop
  122. p.terminate()
  123. timeComp = timeCompStr.split()
  124. hwTime = float(timeComp[0])
  125. swTime = float(timeComp[1])
  126. diff = abs(hwTime - swTime)
  127. self.assertTrue(
  128. diff <= 2.0,
  129. msg=(
  130. "hwclock difference too big: "
  131. + six.text_type(timeCompStr)
  132. ),
  133. )
  134. break
  135. except CompareTimeout:
  136. p.terminate()
  137. finally:
  138. signal.alarm(0)
  139. signal.signal(signal.SIGALRM, orig_handler)
  140. else:
  141. log.error("Failed to check hwclock sync")
  142. def _save_machine_info(self):
  143. if os.path.isfile("/etc/machine-info"):
  144. with salt.utils.files.fopen("/etc/machine-info", "r") as mach_info:
  145. self._machine_info = mach_info.read()
  146. else:
  147. self._machine_info = False
  148. def _restore_machine_info(self):
  149. if self._machine_info is not False:
  150. with salt.utils.files.fopen("/etc/machine-info", "w") as mach_info:
  151. mach_info.write(self._machine_info)
  152. else:
  153. self.run_function("file.remove", ["/etc/machine-info"])
  154. @slowTest
  155. def test_get_system_date_time(self):
  156. """
  157. Test we are able to get the correct time
  158. """
  159. t1 = datetime.datetime.now()
  160. res = self.run_function("system.get_system_date_time")
  161. t2 = datetime.datetime.strptime(res, self.fmt_str)
  162. msg = "Difference in times is too large. Now: {0} Fake: {1}".format(t1, t2)
  163. self.assertTrue(self._same_times(t1, t2, seconds_diff=2), msg=msg)
  164. @slowTest
  165. def test_get_system_date_time_utc(self):
  166. """
  167. Test we are able to get the correct time with utc
  168. """
  169. t1 = datetime.datetime.utcnow()
  170. res = self.run_function("system.get_system_date_time", utc_offset="+0000")
  171. t2 = datetime.datetime.strptime(res, self.fmt_str)
  172. msg = "Difference in times is too large. Now: {0} Fake: {1}".format(t1, t2)
  173. self.assertTrue(self._same_times(t1, t2, seconds_diff=2), msg=msg)
  174. @destructiveTest
  175. @skip_if_not_root
  176. @slowTest
  177. def test_set_system_date_time(self):
  178. """
  179. Test changing the system clock. We are only able to set it up to a
  180. resolution of a second so this test may appear to run in negative time.
  181. """
  182. self._save_time()
  183. cmp_time = datetime.datetime.now() - datetime.timedelta(days=7)
  184. result = self._set_time(cmp_time)
  185. time_now = datetime.datetime.now()
  186. msg = "Difference in times is too large. Now: {0} Fake: {1}".format(
  187. time_now, cmp_time
  188. )
  189. self.assertTrue(result and self._same_times(time_now, cmp_time), msg=msg)
  190. self._test_hwclock_sync()
  191. @destructiveTest
  192. @skip_if_not_root
  193. @slowTest
  194. def test_set_system_date_time_utc(self):
  195. """
  196. Test changing the system clock. We are only able to set it up to a
  197. resolution of a second so this test may appear to run in negative time.
  198. """
  199. self._save_time()
  200. cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7)
  201. result = self._set_time(cmp_time, offset="+0000")
  202. time_now = datetime.datetime.utcnow()
  203. msg = "Difference in times is too large. Now: {0} Fake: {1}".format(
  204. time_now, cmp_time
  205. )
  206. self.assertTrue(result)
  207. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  208. self._test_hwclock_sync()
  209. @destructiveTest
  210. @skip_if_not_root
  211. @slowTest
  212. def test_set_system_date_time_utcoffset_east(self):
  213. """
  214. Test changing the system clock. We are only able to set it up to a
  215. resolution of a second so this test may appear to run in negative time.
  216. """
  217. self._save_time()
  218. cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7)
  219. # 25200 seconds = 7 hours
  220. time_to_set = cmp_time - datetime.timedelta(seconds=25200)
  221. result = self._set_time(time_to_set, offset="-0700")
  222. time_now = datetime.datetime.utcnow()
  223. msg = "Difference in times is too large. Now: {0} Fake: {1}".format(
  224. time_now, cmp_time
  225. )
  226. self.assertTrue(result)
  227. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  228. self._test_hwclock_sync()
  229. @destructiveTest
  230. @skip_if_not_root
  231. @slowTest
  232. def test_set_system_date_time_utcoffset_west(self):
  233. """
  234. Test changing the system clock. We are only able to set it up to a
  235. resolution of a second so this test may appear to run in negative time.
  236. """
  237. self._save_time()
  238. cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7)
  239. # 7200 seconds = 2 hours
  240. time_to_set = cmp_time + datetime.timedelta(seconds=7200)
  241. result = self._set_time(time_to_set, offset="+0200")
  242. time_now = datetime.datetime.utcnow()
  243. msg = "Difference in times is too large. Now: {0} Fake: {1}".format(
  244. time_now, cmp_time
  245. )
  246. self.assertTrue(result)
  247. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  248. self._test_hwclock_sync()
  249. @flaky
  250. @destructiveTest
  251. @skip_if_not_root
  252. @slowTest
  253. def test_set_system_time(self):
  254. """
  255. Test setting the system time without adjusting the date.
  256. """
  257. cmp_time = datetime.datetime.now().replace(hour=10, minute=5, second=0)
  258. self._save_time()
  259. result = self.run_function("system.set_system_time", ["10:05:00"])
  260. time_now = datetime.datetime.now()
  261. msg = "Difference in times is too large. Now: {0} Fake: {1}".format(
  262. time_now, cmp_time
  263. )
  264. self.assertTrue(result)
  265. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  266. self._test_hwclock_sync()
  267. @destructiveTest
  268. @skip_if_not_root
  269. @slowTest
  270. def test_set_system_date(self):
  271. """
  272. Test setting the system date without adjusting the time.
  273. """
  274. cmp_time = datetime.datetime.now() - datetime.timedelta(days=7)
  275. self._save_time()
  276. result = self.run_function(
  277. "system.set_system_date", [cmp_time.strftime("%Y-%m-%d")]
  278. )
  279. time_now = datetime.datetime.now()
  280. msg = "Difference in times is too large. Now: {0} Fake: {1}".format(
  281. time_now, cmp_time
  282. )
  283. self.assertTrue(result)
  284. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  285. self._test_hwclock_sync()
  286. @skip_if_not_root
  287. @slowTest
  288. def test_get_computer_desc(self):
  289. """
  290. Test getting the system hostname
  291. """
  292. res = self.run_function("system.get_computer_desc")
  293. hostname_cmd = salt.utils.path.which("hostnamectl")
  294. if hostname_cmd:
  295. desc = self.run_function("cmd.run", ["hostnamectl status --pretty"])
  296. self.assertEqual(res, desc)
  297. else:
  298. if not os.path.isfile("/etc/machine-info"):
  299. self.assertFalse(res)
  300. else:
  301. with salt.utils.files.fopen("/etc/machine-info", "r") as mach_info:
  302. data = mach_info.read()
  303. self.assertIn(res, data.decode("string_escape"))
  304. @destructiveTest
  305. @skip_if_not_root
  306. @slowTest
  307. def test_set_computer_desc(self):
  308. """
  309. Test setting the computer description
  310. """
  311. self._save_machine_info()
  312. desc = "test"
  313. ret = self.run_function("system.set_computer_desc", [desc])
  314. computer_desc = self.run_function("system.get_computer_desc")
  315. self.assertTrue(ret)
  316. self.assertIn(desc, computer_desc)
  317. @destructiveTest
  318. @skip_if_not_root
  319. @slowTest
  320. def test_set_computer_desc_multiline(self):
  321. """
  322. Test setting the computer description with a multiline string with tabs
  323. and double-quotes.
  324. """
  325. self._save_machine_info()
  326. desc = textwrap.dedent(
  327. '''\
  328. 'First Line
  329. \tSecond Line: 'single-quoted string'
  330. \t\tThird Line: "double-quoted string with unicode: питон"'''
  331. )
  332. ret = self.run_function("system.set_computer_desc", [desc])
  333. # self.run_function returns the serialized return, we need to convert
  334. # back to unicode to compare to desc. in the assertIn below.
  335. computer_desc = salt.utils.stringutils.to_unicode(
  336. self.run_function("system.get_computer_desc")
  337. )
  338. self.assertTrue(ret)
  339. self.assertIn(desc, computer_desc)
  340. @skip_if_not_root
  341. @slowTest
  342. def test_has_hwclock(self):
  343. """
  344. Verify platform has a settable hardware clock, if possible.
  345. """
  346. if self.run_function("grains.get", ["os_family"]) == "NILinuxRT":
  347. self.assertTrue(self.run_function("system._has_settable_hwclock"))
  348. self.assertTrue(self._hwclock_has_compare())
  349. @runs_on(kernel="Windows")
  350. @pytest.mark.windows_whitelisted
  351. class WinSystemModuleTest(ModuleCase):
  352. """
  353. Validate the date/time functions in the win_system module
  354. """
  355. @slowTest
  356. def test_get_computer_name(self):
  357. """
  358. Test getting the computer name
  359. """
  360. ret = self.run_function("system.get_computer_name")
  361. self.assertTrue(isinstance(ret, six.text_type))
  362. import socket
  363. name = socket.gethostname()
  364. self.assertEqual(name, ret)
  365. @destructiveTest
  366. @slowTest
  367. def test_set_computer_desc(self):
  368. """
  369. Test setting the computer description
  370. """
  371. current_desc = self.run_function("system.get_computer_desc")
  372. desc = "test description"
  373. try:
  374. set_desc = self.run_function("system.set_computer_desc", [desc])
  375. self.assertTrue(set_desc)
  376. get_desc = self.run_function("system.get_computer_desc")
  377. self.assertEqual(set_desc["Computer Description"], get_desc)
  378. finally:
  379. self.run_function("system.set_computer_desc", [current_desc])
  380. @skipIf(True, "WAR ROOM 7/29/2019, unit test?")
  381. def test_get_system_time(self):
  382. """
  383. Test getting the system time
  384. """
  385. time_now = datetime.datetime.now()
  386. # We have to do some datetime fu to account for the possibility that the
  387. # system time will be obtained just before the minutes increment
  388. ret = self.run_function("system.get_system_time", timeout=300)
  389. # Split out time and AM/PM
  390. sys_time, meridian = ret.split(" ")
  391. h, m, s = sys_time.split(":")
  392. # Get the current time
  393. # Use the system time to generate a datetime object for the system time
  394. # with the same date
  395. time_sys = time_now.replace(hour=int(h), minute=int(m), second=int(s))
  396. # get_system_time returns a non 24 hour time
  397. # Lets make it 24 hour time
  398. if meridian == "PM":
  399. time_sys = time_sys + datetime.timedelta(hours=12)
  400. diff = time_sys - time_now
  401. # Timeouts are set to 300 seconds. We're adding a 30 second buffer
  402. self.assertTrue(diff.seconds < 330)
  403. @slowTest
  404. def test_get_system_date(self):
  405. """
  406. Test getting system date
  407. """
  408. ret = self.run_function("system.get_system_date")
  409. date = datetime.datetime.now().strftime("%m/%d/%Y")
  410. self.assertEqual(date, ret)
  411. @runs_on(kernel="Windows")
  412. @pytest.mark.windows_whitelisted
  413. class WinSystemModuleTimeSettingTest(ModuleCase):
  414. """
  415. Validate the date/time functions in the win_system module
  416. .. note::
  417. In order for these tests to pass, time sync must be disabled for the VM
  418. in the hyper-visor
  419. """
  420. @classmethod
  421. def setUpClass(cls):
  422. if subprocess.call("sc config w32time start= disabled > nul", shell=True) != 0:
  423. log.error("Failed to disable w32time service")
  424. if subprocess.call("sc stop w32time", shell=True) != 0:
  425. log.error("Failed to stop w32time service")
  426. @classmethod
  427. def tearDownClass(cls):
  428. if subprocess.call("sc config w32time start= auto > nul", shell=True) != 0:
  429. log.error("Failed to enable w32time service")
  430. if subprocess.call("sc start w32time", shell=True) != 0:
  431. log.error("Failed to start w32time service")
  432. if subprocess.call("w32tm /resync", shell=True) != 0:
  433. log.error("Re-syncing time failed")
  434. @skipIf(True, "WAR ROOM 7/18/2019, unit test?")
  435. @destructiveTest
  436. @slowTest
  437. def test_set_system_time(self):
  438. """
  439. Test setting the system time
  440. """
  441. # If the test fails, the hypervisor may be maintaining time sync
  442. test_time = "10:55"
  443. self.run_function("system.set_system_time", [test_time + " AM"])
  444. time.sleep(0.25)
  445. new_time = datetime.datetime.now().strftime("%H:%M")
  446. self.assertEqual(new_time, test_time)
  447. @skipIf(True, "WAR ROOM 7/18/2019, unit test?")
  448. @destructiveTest
  449. @slowTest
  450. def test_set_system_date(self):
  451. """
  452. Test setting system date
  453. """
  454. # If the test fails, the hypervisor may be maintaining time sync
  455. self.run_function("system.set_system_date", ["03/25/2018"])
  456. new_date = datetime.datetime.now().strftime("%Y/%m/%d")
  457. self.assertEqual(new_date, "2018/03/25")