import datetime import logging import os import signal import subprocess import textwrap import time import pytest import salt.states.file import salt.utils.files import salt.utils.path import salt.utils.platform from import ModuleCase from import ( destructiveTest, flaky, runs_on, skip_if_not_root, slowTest, ) from import skipIf log = logging.getLogger(__name__) @runs_on(kernel="Linux") class SystemModuleTest(ModuleCase): """ Validate the date/time functions in the system module """ _hwclock_has_compare_ = None _systemd_timesyncd_available_ = None @classmethod def setUpClass(cls): cls.fmt_str = "%Y-%m-%d %H:%M:%S" cls._orig_time = None cls._machine_info = True @classmethod def tearDownClass(cls): for name in ("fmt_str", "_orig_time", "_machine_info"): delattr(cls, name) def setUp(self): super().setUp() if self._systemd_timesyncd_available_ is None: SystemModuleTest._systemd_timesyncd_available_ = self.run_function( "service.available", ["systemd-timesyncd"] ) if self._systemd_timesyncd_available_: self.run_function("service.stop", ["systemd-timesyncd"]) def tearDown(self): if self._orig_time is not None: self._restore_time() self._orig_time = None if self._machine_info is not True: self._restore_machine_info() self._machine_info = True if self._systemd_timesyncd_available_: self.run_function("service.start", ["systemd-timesyncd"]) def _save_time(self): self._orig_time = datetime.datetime.utcnow() def _set_time(self, new_time, offset=None): t = new_time.timetuple()[:6] t += (offset,) return self.run_function("system.set_system_date_time", t) def _restore_time(self): result = self._set_time(self._orig_time, "+0000") self.assertTrue(result, msg="Unable to restore time properly") def _same_times(self, t1, t2, seconds_diff=30): """ Helper function to check if two datetime objects are close enough to the same time. """ return abs(t1 - t2) < datetime.timedelta(seconds=seconds_diff) def _hwclock_has_compare(self): """ Some builds of hwclock don't include the `--compare` function needed to test hw/sw clock synchronization. Returns false on systems where it's not present so that we can skip the comparison portion of the test. """ if self._hwclock_has_compare_ is None: res = self.run_function("cmd.run_all", cmd="hwclock -h") SystemModuleTest._hwclock_has_compare_ = ( res["retcode"] == 0 and res["stdout"].find("--compare") > 0 ) return self._hwclock_has_compare_ def _test_hwclock_sync(self): """ Check that hw and sw clocks are sync'd. """ if not self.run_function("system.has_settable_hwclock"): return None if not self._hwclock_has_compare(): return None class CompareTimeout(BaseException): pass def _alrm_handler(sig, frame): log.warning("hwclock --compare failed to produce output after 3 seconds") raise CompareTimeout for _ in range(2): try: orig_handler = signal.signal(signal.SIGALRM, _alrm_handler) signal.alarm(3) rpipeFd, wpipeFd = os.pipe() log.debug("Comparing hwclock to sys clock") with os.fdopen(rpipeFd, "r") as rpipe: with os.fdopen(wpipeFd, "w") as wpipe: with salt.utils.files.fopen(os.devnull, "r") as nulFd: p = subprocess.Popen( args=["hwclock", "--compare"], stdin=nulFd, stdout=wpipeFd, stderr=subprocess.PIPE, ) p.communicate() # read header rpipe.readline() # read first time comparison timeCompStr = rpipe.readline() # stop p.terminate() timeComp = timeCompStr.split() hwTime = float(timeComp[0]) swTime = float(timeComp[1]) diff = abs(hwTime - swTime) self.assertTrue( diff <= 2.0, msg=("hwclock difference too big: " + str(timeCompStr)), ) break except CompareTimeout: p.terminate() finally: signal.alarm(0) signal.signal(signal.SIGALRM, orig_handler) else: log.error("Failed to check hwclock sync") def _save_machine_info(self): if os.path.isfile("/etc/machine-info"): with salt.utils.files.fopen("/etc/machine-info", "r") as mach_info: self._machine_info = else: self._machine_info = False def _restore_machine_info(self): if self._machine_info is not False: with salt.utils.files.fopen("/etc/machine-info", "w") as mach_info: mach_info.write(self._machine_info) else: self.run_function("file.remove", ["/etc/machine-info"]) @slowTest def test_get_system_date_time(self): """ Test we are able to get the correct time """ t1 = res = self.run_function("system.get_system_date_time") t2 = datetime.datetime.strptime(res, self.fmt_str) msg = "Difference in times is too large. Now: {} Fake: {}".format(t1, t2) self.assertTrue(self._same_times(t1, t2, seconds_diff=2), msg=msg) @slowTest def test_get_system_date_time_utc(self): """ Test we are able to get the correct time with utc """ t1 = datetime.datetime.utcnow() res = self.run_function("system.get_system_date_time", utc_offset="+0000") t2 = datetime.datetime.strptime(res, self.fmt_str) msg = "Difference in times is too large. Now: {} Fake: {}".format(t1, t2) self.assertTrue(self._same_times(t1, t2, seconds_diff=2), msg=msg) @destructiveTest @skip_if_not_root @slowTest def test_set_system_date_time(self): """ Test changing the system clock. We are only able to set it up to a resolution of a second so this test may appear to run in negative time. """ self._save_time() cmp_time = - datetime.timedelta(days=7) result = self._set_time(cmp_time) time_now = msg = "Difference in times is too large. Now: {} Fake: {}".format( time_now, cmp_time ) self.assertTrue(result and self._same_times(time_now, cmp_time), msg=msg) self._test_hwclock_sync() @destructiveTest @skip_if_not_root @slowTest def test_set_system_date_time_utc(self): """ Test changing the system clock. We are only able to set it up to a resolution of a second so this test may appear to run in negative time. """ self._save_time() cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7) result = self._set_time(cmp_time, offset="+0000") time_now = datetime.datetime.utcnow() msg = "Difference in times is too large. Now: {} Fake: {}".format( time_now, cmp_time ) self.assertTrue(result) self.assertTrue(self._same_times(time_now, cmp_time), msg=msg) self._test_hwclock_sync() @destructiveTest @skip_if_not_root @slowTest def test_set_system_date_time_utcoffset_east(self): """ Test changing the system clock. We are only able to set it up to a resolution of a second so this test may appear to run in negative time. """ self._save_time() cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7) # 25200 seconds = 7 hours time_to_set = cmp_time - datetime.timedelta(seconds=25200) result = self._set_time(time_to_set, offset="-0700") time_now = datetime.datetime.utcnow() msg = "Difference in times is too large. Now: {} Fake: {}".format( time_now, cmp_time ) self.assertTrue(result) self.assertTrue(self._same_times(time_now, cmp_time), msg=msg) self._test_hwclock_sync() @destructiveTest @skip_if_not_root @slowTest def test_set_system_date_time_utcoffset_west(self): """ Test changing the system clock. We are only able to set it up to a resolution of a second so this test may appear to run in negative time. """ self._save_time() cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7) # 7200 seconds = 2 hours time_to_set = cmp_time + datetime.timedelta(seconds=7200) result = self._set_time(time_to_set, offset="+0200") time_now = datetime.datetime.utcnow() msg = "Difference in times is too large. Now: {} Fake: {}".format( time_now, cmp_time ) self.assertTrue(result) self.assertTrue(self._same_times(time_now, cmp_time), msg=msg) self._test_hwclock_sync() @flaky @destructiveTest @skip_if_not_root @slowTest def test_set_system_time(self): """ Test setting the system time without adjusting the date. """ cmp_time =, minute=5, second=0) self._save_time() result = self.run_function("system.set_system_time", ["10:05:00"]) time_now = msg = "Difference in times is too large. Now: {} Fake: {}".format( time_now, cmp_time ) self.assertTrue(result) self.assertTrue(self._same_times(time_now, cmp_time), msg=msg) self._test_hwclock_sync() @destructiveTest @skip_if_not_root @slowTest def test_set_system_date(self): """ Test setting the system date without adjusting the time. """ cmp_time = - datetime.timedelta(days=7) self._save_time() result = self.run_function( "system.set_system_date", [cmp_time.strftime("%Y-%m-%d")] ) time_now = msg = "Difference in times is too large. Now: {} Fake: {}".format( time_now, cmp_time ) self.assertTrue(result) self.assertTrue(self._same_times(time_now, cmp_time), msg=msg) self._test_hwclock_sync() @skip_if_not_root @slowTest def test_get_computer_desc(self): """ Test getting the system hostname """ res = self.run_function("system.get_computer_desc") hostname_cmd = salt.utils.path.which("hostnamectl") if hostname_cmd: desc = self.run_function("", ["hostnamectl status --pretty"]) self.assertEqual(res, desc) else: if not os.path.isfile("/etc/machine-info"): self.assertFalse(res) else: with salt.utils.files.fopen("/etc/machine-info", "r") as mach_info: data = self.assertIn(res, data.decode("string_escape")) @destructiveTest @skip_if_not_root @slowTest def test_set_computer_desc(self): """ Test setting the computer description """ self._save_machine_info() desc = "test" ret = self.run_function("system.set_computer_desc", [desc]) computer_desc = self.run_function("system.get_computer_desc") self.assertTrue(ret) self.assertIn(desc, computer_desc) @destructiveTest @skip_if_not_root @slowTest def test_set_computer_desc_multiline(self): """ Test setting the computer description with a multiline string with tabs and double-quotes. """ self._save_machine_info() desc = textwrap.dedent( '''\ 'First Line \tSecond Line: 'single-quoted string' \t\tThird Line: "double-quoted string with unicode: питон"''' ) ret = self.run_function("system.set_computer_desc", [desc]) # self.run_function returns the serialized return, we need to convert # back to unicode to compare to desc. in the assertIn below. computer_desc = salt.utils.stringutils.to_unicode( self.run_function("system.get_computer_desc") ) self.assertTrue(ret) self.assertIn(desc, computer_desc) @skip_if_not_root @slowTest def test_has_hwclock(self): """ Verify platform has a settable hardware clock, if possible. """ if self.run_function("grains.get", ["os_family"]) == "NILinuxRT": self.assertTrue(self.run_function("system._has_settable_hwclock")) self.assertTrue(self._hwclock_has_compare()) @runs_on(kernel="Windows") @pytest.mark.windows_whitelisted class WinSystemModuleTest(ModuleCase): """ Validate the date/time functions in the win_system module """ @slowTest def test_get_computer_name(self): """ Test getting the computer name """ ret = self.run_function("system.get_computer_name") self.assertTrue(isinstance(ret, str)) import socket name = socket.gethostname() self.assertEqual(name, ret) @destructiveTest @slowTest def test_set_computer_desc(self): """ Test setting the computer description """ current_desc = self.run_function("system.get_computer_desc") desc = "test description" try: set_desc = self.run_function("system.set_computer_desc", [desc]) self.assertTrue(set_desc) get_desc = self.run_function("system.get_computer_desc") self.assertEqual(set_desc["Computer Description"], get_desc) finally: self.run_function("system.set_computer_desc", [current_desc]) @skipIf(True, "WAR ROOM 7/29/2019, unit test?") def test_get_system_time(self): """ Test getting the system time """ time_now = # We have to do some datetime fu to account for the possibility that the # system time will be obtained just before the minutes increment ret = self.run_function("system.get_system_time", timeout=300) # Split out time and AM/PM sys_time, meridian = ret.split(" ") h, m, s = sys_time.split(":") # Get the current time # Use the system time to generate a datetime object for the system time # with the same date time_sys = time_now.replace(hour=int(h), minute=int(m), second=int(s)) # get_system_time returns a non 24 hour time # Lets make it 24 hour time if meridian == "PM": time_sys = time_sys + datetime.timedelta(hours=12) diff = time_sys - time_now # Timeouts are set to 300 seconds. We're adding a 30 second buffer self.assertTrue(diff.seconds < 330) @slowTest def test_get_system_date(self): """ Test getting system date """ ret = self.run_function("system.get_system_date") date ="%m/%d/%Y") self.assertEqual(date, ret) @runs_on(kernel="Windows") @pytest.mark.windows_whitelisted class WinSystemModuleTimeSettingTest(ModuleCase): """ Validate the date/time functions in the win_system module .. note:: In order for these tests to pass, time sync must be disabled for the VM in the hyper-visor """ @classmethod def setUpClass(cls): if"sc config w32time start= disabled > nul", shell=True) != 0: log.error("Failed to disable w32time service") if"sc stop w32time", shell=True) != 0: log.error("Failed to stop w32time service") @classmethod def tearDownClass(cls): if"sc config w32time start= auto > nul", shell=True) != 0: log.error("Failed to enable w32time service") if"sc start w32time", shell=True) != 0: log.error("Failed to start w32time service") if"w32tm /resync", shell=True) != 0: log.error("Re-syncing time failed") @skipIf(True, "WAR ROOM 7/18/2019, unit test?") @destructiveTest @slowTest def test_set_system_time(self): """ Test setting the system time """ # If the test fails, the hypervisor may be maintaining time sync test_time = "10:55" self.run_function("system.set_system_time", [test_time + " AM"]) time.sleep(0.25) new_time ="%H:%M") self.assertEqual(new_time, test_time) @skipIf(True, "WAR ROOM 7/18/2019, unit test?") @destructiveTest @slowTest def test_set_system_date(self): """ Test setting system date """ # If the test fails, the hypervisor may be maintaining time sync self.run_function("system.set_system_date", ["03/25/2018"]) new_date ="%Y/%m/%d") self.assertEqual(new_date, "2018/03/25")