# test_system.py
import datetime
import logging
import os
import signal
import subprocess
import textwrap
import time

import pytest
import salt.states.file
import salt.utils.files
import salt.utils.path
import salt.utils.platform
import salt.utils.stringutils
from tests.support.case import ModuleCase
from tests.support.helpers import (
    destructiveTest,
    flaky,
    runs_on,
    skip_if_not_root,
    slowTest,
)
from tests.support.unit import skipIf
# Module-level logger used by the test classes and helpers in this file.
log = logging.getLogger(__name__)
  23. @runs_on(kernel="Linux")
  24. class SystemModuleTest(ModuleCase):
  25. """
  26. Validate the date/time functions in the system module
  27. """
  28. _hwclock_has_compare_ = None
  29. _systemd_timesyncd_available_ = None
  30. @classmethod
  31. def setUpClass(cls):
  32. cls.fmt_str = "%Y-%m-%d %H:%M:%S"
  33. cls._orig_time = None
  34. cls._machine_info = True
  35. @classmethod
  36. def tearDownClass(cls):
  37. for name in ("fmt_str", "_orig_time", "_machine_info"):
  38. delattr(cls, name)
  39. def setUp(self):
  40. super().setUp()
  41. if self._systemd_timesyncd_available_ is None:
  42. SystemModuleTest._systemd_timesyncd_available_ = self.run_function(
  43. "service.available", ["systemd-timesyncd"]
  44. )
  45. if self._systemd_timesyncd_available_:
  46. self.run_function("service.stop", ["systemd-timesyncd"])
  47. def tearDown(self):
  48. if self._orig_time is not None:
  49. self._restore_time()
  50. self._orig_time = None
  51. if self._machine_info is not True:
  52. self._restore_machine_info()
  53. self._machine_info = True
  54. if self._systemd_timesyncd_available_:
  55. self.run_function("service.start", ["systemd-timesyncd"])
  56. def _save_time(self):
  57. self._orig_time = datetime.datetime.utcnow()
  58. def _set_time(self, new_time, offset=None):
  59. t = new_time.timetuple()[:6]
  60. t += (offset,)
  61. return self.run_function("system.set_system_date_time", t)
  62. def _restore_time(self):
  63. result = self._set_time(self._orig_time, "+0000")
  64. self.assertTrue(result, msg="Unable to restore time properly")
  65. def _same_times(self, t1, t2, seconds_diff=30):
  66. """
  67. Helper function to check if two datetime objects
  68. are close enough to the same time.
  69. """
  70. return abs(t1 - t2) < datetime.timedelta(seconds=seconds_diff)
  71. def _hwclock_has_compare(self):
  72. """
  73. Some builds of hwclock don't include the `--compare` function
  74. needed to test hw/sw clock synchronization. Returns false on
  75. systems where it's not present so that we can skip the
  76. comparison portion of the test.
  77. """
  78. if self._hwclock_has_compare_ is None:
  79. res = self.run_function("cmd.run_all", cmd="hwclock -h")
  80. SystemModuleTest._hwclock_has_compare_ = (
  81. res["retcode"] == 0 and res["stdout"].find("--compare") > 0
  82. )
  83. return self._hwclock_has_compare_
  84. def _test_hwclock_sync(self):
  85. """
  86. Check that hw and sw clocks are sync'd.
  87. """
  88. if not self.run_function("system.has_settable_hwclock"):
  89. return None
  90. if not self._hwclock_has_compare():
  91. return None
  92. class CompareTimeout(BaseException):
  93. pass
  94. def _alrm_handler(sig, frame):
  95. log.warning("hwclock --compare failed to produce output after 3 seconds")
  96. raise CompareTimeout
  97. for _ in range(2):
  98. try:
  99. orig_handler = signal.signal(signal.SIGALRM, _alrm_handler)
  100. signal.alarm(3)
  101. rpipeFd, wpipeFd = os.pipe()
  102. log.debug("Comparing hwclock to sys clock")
  103. with os.fdopen(rpipeFd, "r") as rpipe:
  104. with os.fdopen(wpipeFd, "w") as wpipe:
  105. with salt.utils.files.fopen(os.devnull, "r") as nulFd:
  106. p = subprocess.Popen(
  107. args=["hwclock", "--compare"],
  108. stdin=nulFd,
  109. stdout=wpipeFd,
  110. stderr=subprocess.PIPE,
  111. )
  112. p.communicate()
  113. # read header
  114. rpipe.readline()
  115. # read first time comparison
  116. timeCompStr = rpipe.readline()
  117. # stop
  118. p.terminate()
  119. timeComp = timeCompStr.split()
  120. hwTime = float(timeComp[0])
  121. swTime = float(timeComp[1])
  122. diff = abs(hwTime - swTime)
  123. self.assertTrue(
  124. diff <= 2.0,
  125. msg=("hwclock difference too big: " + str(timeCompStr)),
  126. )
  127. break
  128. except CompareTimeout:
  129. p.terminate()
  130. finally:
  131. signal.alarm(0)
  132. signal.signal(signal.SIGALRM, orig_handler)
  133. else:
  134. log.error("Failed to check hwclock sync")
  135. def _save_machine_info(self):
  136. if os.path.isfile("/etc/machine-info"):
  137. with salt.utils.files.fopen("/etc/machine-info", "r") as mach_info:
  138. self._machine_info = mach_info.read()
  139. else:
  140. self._machine_info = False
  141. def _restore_machine_info(self):
  142. if self._machine_info is not False:
  143. with salt.utils.files.fopen("/etc/machine-info", "w") as mach_info:
  144. mach_info.write(self._machine_info)
  145. else:
  146. self.run_function("file.remove", ["/etc/machine-info"])
  147. @slowTest
  148. def test_get_system_date_time(self):
  149. """
  150. Test we are able to get the correct time
  151. """
  152. t1 = datetime.datetime.now()
  153. res = self.run_function("system.get_system_date_time")
  154. t2 = datetime.datetime.strptime(res, self.fmt_str)
  155. msg = "Difference in times is too large. Now: {} Fake: {}".format(t1, t2)
  156. self.assertTrue(self._same_times(t1, t2, seconds_diff=2), msg=msg)
  157. @slowTest
  158. def test_get_system_date_time_utc(self):
  159. """
  160. Test we are able to get the correct time with utc
  161. """
  162. t1 = datetime.datetime.utcnow()
  163. res = self.run_function("system.get_system_date_time", utc_offset="+0000")
  164. t2 = datetime.datetime.strptime(res, self.fmt_str)
  165. msg = "Difference in times is too large. Now: {} Fake: {}".format(t1, t2)
  166. self.assertTrue(self._same_times(t1, t2, seconds_diff=2), msg=msg)
  167. @destructiveTest
  168. @skip_if_not_root
  169. @slowTest
  170. def test_set_system_date_time(self):
  171. """
  172. Test changing the system clock. We are only able to set it up to a
  173. resolution of a second so this test may appear to run in negative time.
  174. """
  175. self._save_time()
  176. cmp_time = datetime.datetime.now() - datetime.timedelta(days=7)
  177. result = self._set_time(cmp_time)
  178. time_now = datetime.datetime.now()
  179. msg = "Difference in times is too large. Now: {} Fake: {}".format(
  180. time_now, cmp_time
  181. )
  182. self.assertTrue(result and self._same_times(time_now, cmp_time), msg=msg)
  183. self._test_hwclock_sync()
  184. @destructiveTest
  185. @skip_if_not_root
  186. @slowTest
  187. def test_set_system_date_time_utc(self):
  188. """
  189. Test changing the system clock. We are only able to set it up to a
  190. resolution of a second so this test may appear to run in negative time.
  191. """
  192. self._save_time()
  193. cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7)
  194. result = self._set_time(cmp_time, offset="+0000")
  195. time_now = datetime.datetime.utcnow()
  196. msg = "Difference in times is too large. Now: {} Fake: {}".format(
  197. time_now, cmp_time
  198. )
  199. self.assertTrue(result)
  200. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  201. self._test_hwclock_sync()
  202. @destructiveTest
  203. @skip_if_not_root
  204. @slowTest
  205. def test_set_system_date_time_utcoffset_east(self):
  206. """
  207. Test changing the system clock. We are only able to set it up to a
  208. resolution of a second so this test may appear to run in negative time.
  209. """
  210. self._save_time()
  211. cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7)
  212. # 25200 seconds = 7 hours
  213. time_to_set = cmp_time - datetime.timedelta(seconds=25200)
  214. result = self._set_time(time_to_set, offset="-0700")
  215. time_now = datetime.datetime.utcnow()
  216. msg = "Difference in times is too large. Now: {} Fake: {}".format(
  217. time_now, cmp_time
  218. )
  219. self.assertTrue(result)
  220. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  221. self._test_hwclock_sync()
  222. @destructiveTest
  223. @skip_if_not_root
  224. @slowTest
  225. def test_set_system_date_time_utcoffset_west(self):
  226. """
  227. Test changing the system clock. We are only able to set it up to a
  228. resolution of a second so this test may appear to run in negative time.
  229. """
  230. self._save_time()
  231. cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7)
  232. # 7200 seconds = 2 hours
  233. time_to_set = cmp_time + datetime.timedelta(seconds=7200)
  234. result = self._set_time(time_to_set, offset="+0200")
  235. time_now = datetime.datetime.utcnow()
  236. msg = "Difference in times is too large. Now: {} Fake: {}".format(
  237. time_now, cmp_time
  238. )
  239. self.assertTrue(result)
  240. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  241. self._test_hwclock_sync()
  242. @flaky
  243. @destructiveTest
  244. @skip_if_not_root
  245. @slowTest
  246. def test_set_system_time(self):
  247. """
  248. Test setting the system time without adjusting the date.
  249. """
  250. cmp_time = datetime.datetime.now().replace(hour=10, minute=5, second=0)
  251. self._save_time()
  252. result = self.run_function("system.set_system_time", ["10:05:00"])
  253. time_now = datetime.datetime.now()
  254. msg = "Difference in times is too large. Now: {} Fake: {}".format(
  255. time_now, cmp_time
  256. )
  257. self.assertTrue(result)
  258. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  259. self._test_hwclock_sync()
  260. @destructiveTest
  261. @skip_if_not_root
  262. @slowTest
  263. def test_set_system_date(self):
  264. """
  265. Test setting the system date without adjusting the time.
  266. """
  267. cmp_time = datetime.datetime.now() - datetime.timedelta(days=7)
  268. self._save_time()
  269. result = self.run_function(
  270. "system.set_system_date", [cmp_time.strftime("%Y-%m-%d")]
  271. )
  272. time_now = datetime.datetime.now()
  273. msg = "Difference in times is too large. Now: {} Fake: {}".format(
  274. time_now, cmp_time
  275. )
  276. self.assertTrue(result)
  277. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  278. self._test_hwclock_sync()
  279. @skip_if_not_root
  280. @slowTest
  281. def test_get_computer_desc(self):
  282. """
  283. Test getting the system hostname
  284. """
  285. res = self.run_function("system.get_computer_desc")
  286. hostname_cmd = salt.utils.path.which("hostnamectl")
  287. if hostname_cmd:
  288. desc = self.run_function("cmd.run", ["hostnamectl status --pretty"])
  289. self.assertEqual(res, desc)
  290. else:
  291. if not os.path.isfile("/etc/machine-info"):
  292. self.assertFalse(res)
  293. else:
  294. with salt.utils.files.fopen("/etc/machine-info", "r") as mach_info:
  295. data = mach_info.read()
  296. self.assertIn(res, data.decode("string_escape"))
  297. @destructiveTest
  298. @skip_if_not_root
  299. @slowTest
  300. def test_set_computer_desc(self):
  301. """
  302. Test setting the computer description
  303. """
  304. self._save_machine_info()
  305. desc = "test"
  306. ret = self.run_function("system.set_computer_desc", [desc])
  307. computer_desc = self.run_function("system.get_computer_desc")
  308. self.assertTrue(ret)
  309. self.assertIn(desc, computer_desc)
  310. @destructiveTest
  311. @skip_if_not_root
  312. @slowTest
  313. def test_set_computer_desc_multiline(self):
  314. """
  315. Test setting the computer description with a multiline string with tabs
  316. and double-quotes.
  317. """
  318. self._save_machine_info()
  319. desc = textwrap.dedent(
  320. '''\
  321. 'First Line
  322. \tSecond Line: 'single-quoted string'
  323. \t\tThird Line: "double-quoted string with unicode: питон"'''
  324. )
  325. ret = self.run_function("system.set_computer_desc", [desc])
  326. # self.run_function returns the serialized return, we need to convert
  327. # back to unicode to compare to desc. in the assertIn below.
  328. computer_desc = salt.utils.stringutils.to_unicode(
  329. self.run_function("system.get_computer_desc")
  330. )
  331. self.assertTrue(ret)
  332. self.assertIn(desc, computer_desc)
  333. @skip_if_not_root
  334. @slowTest
  335. def test_has_hwclock(self):
  336. """
  337. Verify platform has a settable hardware clock, if possible.
  338. """
  339. if self.run_function("grains.get", ["os_family"]) == "NILinuxRT":
  340. self.assertTrue(self.run_function("system._has_settable_hwclock"))
  341. self.assertTrue(self._hwclock_has_compare())
  342. @runs_on(kernel="Windows")
  343. @pytest.mark.windows_whitelisted
  344. class WinSystemModuleTest(ModuleCase):
  345. """
  346. Validate the date/time functions in the win_system module
  347. """
  348. @slowTest
  349. def test_get_computer_name(self):
  350. """
  351. Test getting the computer name
  352. """
  353. ret = self.run_function("system.get_computer_name")
  354. self.assertTrue(isinstance(ret, str))
  355. import socket
  356. name = socket.gethostname()
  357. self.assertEqual(name, ret)
  358. @destructiveTest
  359. @slowTest
  360. def test_set_computer_desc(self):
  361. """
  362. Test setting the computer description
  363. """
  364. current_desc = self.run_function("system.get_computer_desc")
  365. desc = "test description"
  366. try:
  367. set_desc = self.run_function("system.set_computer_desc", [desc])
  368. self.assertTrue(set_desc)
  369. get_desc = self.run_function("system.get_computer_desc")
  370. self.assertEqual(set_desc["Computer Description"], get_desc)
  371. finally:
  372. self.run_function("system.set_computer_desc", [current_desc])
  373. @skipIf(True, "WAR ROOM 7/29/2019, unit test?")
  374. def test_get_system_time(self):
  375. """
  376. Test getting the system time
  377. """
  378. time_now = datetime.datetime.now()
  379. # We have to do some datetime fu to account for the possibility that the
  380. # system time will be obtained just before the minutes increment
  381. ret = self.run_function("system.get_system_time", timeout=300)
  382. # Split out time and AM/PM
  383. sys_time, meridian = ret.split(" ")
  384. h, m, s = sys_time.split(":")
  385. # Get the current time
  386. # Use the system time to generate a datetime object for the system time
  387. # with the same date
  388. time_sys = time_now.replace(hour=int(h), minute=int(m), second=int(s))
  389. # get_system_time returns a non 24 hour time
  390. # Lets make it 24 hour time
  391. if meridian == "PM":
  392. time_sys = time_sys + datetime.timedelta(hours=12)
  393. diff = time_sys - time_now
  394. # Timeouts are set to 300 seconds. We're adding a 30 second buffer
  395. self.assertTrue(diff.seconds < 330)
  396. @slowTest
  397. def test_get_system_date(self):
  398. """
  399. Test getting system date
  400. """
  401. ret = self.run_function("system.get_system_date")
  402. date = datetime.datetime.now().strftime("%m/%d/%Y")
  403. self.assertEqual(date, ret)
  404. @runs_on(kernel="Windows")
  405. @pytest.mark.windows_whitelisted
  406. class WinSystemModuleTimeSettingTest(ModuleCase):
  407. """
  408. Validate the date/time functions in the win_system module
  409. .. note::
  410. In order for these tests to pass, time sync must be disabled for the VM
  411. in the hyper-visor
  412. """
  413. @classmethod
  414. def setUpClass(cls):
  415. if subprocess.call("sc config w32time start= disabled > nul", shell=True) != 0:
  416. log.error("Failed to disable w32time service")
  417. if subprocess.call("sc stop w32time", shell=True) != 0:
  418. log.error("Failed to stop w32time service")
  419. @classmethod
  420. def tearDownClass(cls):
  421. if subprocess.call("sc config w32time start= auto > nul", shell=True) != 0:
  422. log.error("Failed to enable w32time service")
  423. if subprocess.call("sc start w32time", shell=True) != 0:
  424. log.error("Failed to start w32time service")
  425. if subprocess.call("w32tm /resync", shell=True) != 0:
  426. log.error("Re-syncing time failed")
  427. @skipIf(True, "WAR ROOM 7/18/2019, unit test?")
  428. @destructiveTest
  429. @slowTest
  430. def test_set_system_time(self):
  431. """
  432. Test setting the system time
  433. """
  434. # If the test fails, the hypervisor may be maintaining time sync
  435. test_time = "10:55"
  436. self.run_function("system.set_system_time", [test_time + " AM"])
  437. time.sleep(0.25)
  438. new_time = datetime.datetime.now().strftime("%H:%M")
  439. self.assertEqual(new_time, test_time)
  440. @skipIf(True, "WAR ROOM 7/18/2019, unit test?")
  441. @destructiveTest
  442. @slowTest
  443. def test_set_system_date(self):
  444. """
  445. Test setting system date
  446. """
  447. # If the test fails, the hypervisor may be maintaining time sync
  448. self.run_function("system.set_system_date", ["03/25/2018"])
  449. new_date = datetime.datetime.now().strftime("%Y/%m/%d")
  450. self.assertEqual(new_date, "2018/03/25")