123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492 |
- # -*- coding: utf-8 -*-
- # Import Python libs
- from __future__ import absolute_import, unicode_literals, print_function
- import datetime
- import logging
- import os
- import signal
- import subprocess
- import time
- import textwrap
- import pytest
- # Import Salt Testing libs
- from tests.support.case import ModuleCase
- from tests.support.unit import skipIf
- from tests.support.helpers import flaky
- # Import Salt libs
- import salt.utils.files
- import salt.utils.path
- import salt.utils.platform
- import salt.states.file
- from salt.ext.six.moves import range
- from salt.ext import six
- log = logging.getLogger(__name__)
- @skipIf(not salt.utils.platform.is_linux(),
- 'These tests can only be run on linux')
- @pytest.mark.windows_whitelisted
- class SystemModuleTest(ModuleCase):
- '''
- Validate the date/time functions in the system module
- '''
- fmt_str = "%Y-%m-%d %H:%M:%S"
- def __init__(self, arg):
- super(self.__class__, self).__init__(arg)
- self._orig_time = None
- self._machine_info = True
- def setUp(self):
- super(SystemModuleTest, self).setUp()
- os_grain = self.run_function('grains.item', ['kernel'])
- if os_grain['kernel'] not in 'Linux':
- self.skipTest(
- 'Test not applicable to \'{kernel}\' kernel'.format(
- **os_grain
- )
- )
- if self.run_function('service.available', ['systemd-timesyncd']):
- self.run_function('service.stop', ['systemd-timesyncd'])
- def tearDown(self):
- if self._orig_time is not None:
- self._restore_time()
- self._orig_time = None
- if self._machine_info is not True:
- self._restore_machine_info()
- self._machine_info = True
- if self.run_function('service.available', ['systemd-timesyncd']):
- self.run_function('service.start', ['systemd-timesyncd'])
- def _save_time(self):
- self._orig_time = datetime.datetime.utcnow()
- def _set_time(self, new_time, offset=None):
- t = new_time.timetuple()[:6]
- t += (offset,)
- return self.run_function('system.set_system_date_time', t)
- def _restore_time(self):
- result = self._set_time(self._orig_time, "+0000")
- self.assertTrue(result, msg="Unable to restore time properly")
- def _same_times(self, t1, t2, seconds_diff=30):
- '''
- Helper function to check if two datetime objects
- are close enough to the same time.
- '''
- return abs(t1 - t2) < datetime.timedelta(seconds=seconds_diff)
- def _hwclock_has_compare(self):
- '''
- Some builds of hwclock don't include the `--compare` function
- needed to test hw/sw clock synchronization. Returns false on
- systems where it's not present so that we can skip the
- comparison portion of the test.
- '''
- res = self.run_function('cmd.run_all', cmd='hwclock -h')
- return res['retcode'] == 0 and res['stdout'].find('--compare') > 0
- def _test_hwclock_sync(self):
- '''
- Check that hw and sw clocks are sync'd.
- '''
- if not self.run_function('system.has_settable_hwclock'):
- return None
- if not self._hwclock_has_compare():
- return None
- class CompareTimeout(BaseException):
- pass
- def _alrm_handler(sig, frame):
- log.warning('hwclock --compare failed to produce output after 3 seconds')
- raise CompareTimeout
- for _ in range(2):
- try:
- orig_handler = signal.signal(signal.SIGALRM, _alrm_handler)
- signal.alarm(3)
- rpipeFd, wpipeFd = os.pipe()
- log.debug('Comparing hwclock to sys clock')
- with os.fdopen(rpipeFd, "r") as rpipe:
- with os.fdopen(wpipeFd, "w") as wpipe:
- with salt.utils.files.fopen(os.devnull, "r") as nulFd:
- p = subprocess.Popen(args=['hwclock', '--compare'],
- stdin=nulFd, stdout=wpipeFd, stderr=subprocess.PIPE)
- p.communicate()
- # read header
- rpipe.readline()
- # read first time comparison
- timeCompStr = rpipe.readline()
- # stop
- p.terminate()
- timeComp = timeCompStr.split()
- hwTime = float(timeComp[0])
- swTime = float(timeComp[1])
- diff = abs(hwTime - swTime)
- self.assertTrue(diff <= 2.0,
- msg=("hwclock difference too big: " + six.text_type(timeCompStr)))
- break
- except CompareTimeout:
- p.terminate()
- finally:
- signal.alarm(0)
- signal.signal(signal.SIGALRM, orig_handler)
- else:
- log.error('Failed to check hwclock sync')
- def _save_machine_info(self):
- if os.path.isfile('/etc/machine-info'):
- with salt.utils.files.fopen('/etc/machine-info', 'r') as mach_info:
- self._machine_info = mach_info.read()
- else:
- self._machine_info = False
- def _restore_machine_info(self):
- if self._machine_info is not False:
- with salt.utils.files.fopen('/etc/machine-info', 'w') as mach_info:
- mach_info.write(self._machine_info)
- else:
- self.run_function('file.remove', ['/etc/machine-info'])
- def test_get_system_date_time(self):
- '''
- Test we are able to get the correct time
- '''
- t1 = datetime.datetime.now()
- res = self.run_function('system.get_system_date_time')
- t2 = datetime.datetime.strptime(res, self.fmt_str)
- msg = ("Difference in times is too large. Now: {0} Fake: {1}"
- .format(t1, t2))
- self.assertTrue(self._same_times(t1, t2, seconds_diff=2), msg=msg)
- def test_get_system_date_time_utc(self):
- '''
- Test we are able to get the correct time with utc
- '''
- t1 = datetime.datetime.utcnow()
- res = self.run_function('system.get_system_date_time',
- utc_offset="+0000")
- t2 = datetime.datetime.strptime(res, self.fmt_str)
- msg = ("Difference in times is too large. Now: {0} Fake: {1}"
- .format(t1, t2))
- self.assertTrue(self._same_times(t1, t2, seconds_diff=2), msg=msg)
- @pytest.mark.destructive_test
- @pytest.mark.skip_if_not_root
- def test_set_system_date_time(self):
- '''
- Test changing the system clock. We are only able to set it up to a
- resolution of a second so this test may appear to run in negative time.
- '''
- self._save_time()
- cmp_time = datetime.datetime.now() - datetime.timedelta(days=7)
- result = self._set_time(cmp_time)
- time_now = datetime.datetime.now()
- msg = ("Difference in times is too large. Now: {0} Fake: {1}"
- .format(time_now, cmp_time))
- self.assertTrue(result and self._same_times(time_now, cmp_time),
- msg=msg)
- self._test_hwclock_sync()
- @pytest.mark.destructive_test
- @pytest.mark.skip_if_not_root
- def test_set_system_date_time_utc(self):
- '''
- Test changing the system clock. We are only able to set it up to a
- resolution of a second so this test may appear to run in negative time.
- '''
- self._save_time()
- cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7)
- result = self._set_time(cmp_time, offset="+0000")
- time_now = datetime.datetime.utcnow()
- msg = ("Difference in times is too large. Now: {0} Fake: {1}"
- .format(time_now, cmp_time))
- self.assertTrue(result)
- self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
- self._test_hwclock_sync()
- @pytest.mark.destructive_test
- @pytest.mark.skip_if_not_root
- def test_set_system_date_time_utcoffset_east(self):
- '''
- Test changing the system clock. We are only able to set it up to a
- resolution of a second so this test may appear to run in negative time.
- '''
- self._save_time()
- cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7)
- # 25200 seconds = 7 hours
- time_to_set = cmp_time - datetime.timedelta(seconds=25200)
- result = self._set_time(time_to_set, offset='-0700')
- time_now = datetime.datetime.utcnow()
- msg = ("Difference in times is too large. Now: {0} Fake: {1}"
- .format(time_now, cmp_time))
- self.assertTrue(result)
- self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
- self._test_hwclock_sync()
- @pytest.mark.destructive_test
- @pytest.mark.skip_if_not_root
- def test_set_system_date_time_utcoffset_west(self):
- '''
- Test changing the system clock. We are only able to set it up to a
- resolution of a second so this test may appear to run in negative time.
- '''
- self._save_time()
- cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7)
- # 7200 seconds = 2 hours
- time_to_set = cmp_time + datetime.timedelta(seconds=7200)
- result = self._set_time(time_to_set, offset='+0200')
- time_now = datetime.datetime.utcnow()
- msg = ("Difference in times is too large. Now: {0} Fake: {1}"
- .format(time_now, cmp_time))
- self.assertTrue(result)
- self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
- self._test_hwclock_sync()
- @flaky
- @pytest.mark.destructive_test
- @pytest.mark.skip_if_not_root
- def test_set_system_time(self):
- '''
- Test setting the system time without adjusting the date.
- '''
- cmp_time = datetime.datetime.now().replace(hour=10, minute=5, second=0)
- self._save_time()
- result = self.run_function('system.set_system_time', ["10:05:00"])
- time_now = datetime.datetime.now()
- msg = ("Difference in times is too large. Now: {0} Fake: {1}"
- .format(time_now, cmp_time))
- self.assertTrue(result)
- self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
- self._test_hwclock_sync()
- @pytest.mark.destructive_test
- @pytest.mark.skip_if_not_root
- def test_set_system_date(self):
- '''
- Test setting the system date without adjusting the time.
- '''
- cmp_time = datetime.datetime.now() - datetime.timedelta(days=7)
- self._save_time()
- result = self.run_function(
- 'system.set_system_date',
- [cmp_time.strftime('%Y-%m-%d')]
- )
- time_now = datetime.datetime.now()
- msg = ("Difference in times is too large. Now: {0} Fake: {1}"
- .format(time_now, cmp_time))
- self.assertTrue(result)
- self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
- self._test_hwclock_sync()
- @pytest.mark.skip_if_not_root
- def test_get_computer_desc(self):
- '''
- Test getting the system hostname
- '''
- res = self.run_function('system.get_computer_desc')
- hostname_cmd = salt.utils.path.which('hostnamectl')
- if hostname_cmd:
- desc = self.run_function('cmd.run', ["hostnamectl status --pretty"])
- self.assertEqual(res, desc)
- else:
- if not os.path.isfile('/etc/machine-info'):
- self.assertFalse(res)
- else:
- with salt.utils.files.fopen('/etc/machine-info', 'r') as mach_info:
- data = mach_info.read()
- self.assertIn(res, data.decode('string_escape'))
- @pytest.mark.destructive_test
- @pytest.mark.skip_if_not_root
- def test_set_computer_desc(self):
- '''
- Test setting the computer description
- '''
- self._save_machine_info()
- desc = "test"
- ret = self.run_function('system.set_computer_desc', [desc])
- computer_desc = self.run_function('system.get_computer_desc')
- self.assertTrue(ret)
- self.assertIn(desc, computer_desc)
- @pytest.mark.destructive_test
- @pytest.mark.skip_if_not_root
- def test_set_computer_desc_multiline(self):
- '''
- Test setting the computer description with a multiline string with tabs
- and double-quotes.
- '''
- self._save_machine_info()
- desc = textwrap.dedent('''\
- 'First Line
- \tSecond Line: 'single-quoted string'
- \t\tThird Line: "double-quoted string with unicode: питон"''')
- ret = self.run_function('system.set_computer_desc', [desc])
- # self.run_function returns the serialized return, we need to convert
- # back to unicode to compare to desc. in the assertIn below.
- computer_desc = salt.utils.stringutils.to_unicode(
- self.run_function('system.get_computer_desc')
- )
- self.assertTrue(ret)
- self.assertIn(desc, computer_desc)
- @pytest.mark.skip_if_not_root
- def test_has_hwclock(self):
- '''
- Verify platform has a settable hardware clock, if possible.
- '''
- if self.run_function('grains.get', ['os_family']) == 'NILinuxRT':
- self.assertTrue(self.run_function('system._has_settable_hwclock'))
- self.assertTrue(self._hwclock_has_compare())
- @skipIf(not salt.utils.platform.is_windows(),
- 'These tests can only be run on windows')
- @pytest.mark.windows_whitelisted
- class WinSystemModuleTest(ModuleCase):
- '''
- Validate the date/time functions in the win_system module
- '''
- @classmethod
- def setUpClass(cls):
- if subprocess.call('net stop w32time', shell=True) != 0:
- log.error('Failed to stop w32time service')
- @classmethod
- def tearDownClass(cls):
- if subprocess.call('net start w32time', shell=True) != 0:
- log.error('Failed to start w32time service')
- if subprocess.call('w32tm /resync', shell=True) != 0:
- log.error("Re-syncing time failed")
- def test_get_computer_name(self):
- '''
- Test getting the computer name
- '''
- ret = self.run_function('system.get_computer_name')
- self.assertTrue(isinstance(ret, six.text_type))
- import socket
- name = socket.gethostname()
- self.assertEqual(name, ret)
- @pytest.mark.destructive_test
- def test_set_computer_desc(self):
- '''
- Test setting the computer description
- '''
- current_desc = self.run_function('system.get_computer_desc')
- desc = 'test description'
- try:
- set_desc = self.run_function('system.set_computer_desc', [desc])
- self.assertTrue(set_desc)
- get_desc = self.run_function('system.get_computer_desc')
- self.assertEqual(set_desc['Computer Description'], get_desc)
- finally:
- self.run_function('system.set_computer_desc', [current_desc])
- @skipIf(True, 'WAR ROOM 7/29/2019, unit test?')
- def test_get_system_time(self):
- '''
- Test getting the system time
- '''
- time_now = datetime.datetime.now()
- # We have to do some datetime fu to account for the possibility that the
- # system time will be obtained just before the minutes increment
- ret = self.run_function('system.get_system_time', timeout=300)
- # Split out time and AM/PM
- sys_time, meridian = ret.split(' ')
- h, m, s = sys_time.split(':')
- # Get the current time
- # Use the system time to generate a datetime object for the system time
- # with the same date
- time_sys = time_now.replace(hour=int(h), minute=int(m), second=int(s))
- # get_system_time returns a non 24 hour time
- # Lets make it 24 hour time
- if meridian == 'PM':
- time_sys = time_sys + datetime.timedelta(hours=12)
- diff = time_sys - time_now
- # Timeouts are set to 300 seconds. We're adding a 30 second buffer
- self.assertTrue(diff.seconds < 330)
- @skipIf(True, 'WAR ROOM 7/18/2019, unit test?')
- @pytest.mark.destructive_test
- def test_set_system_time(self):
- '''
- Test setting the system time
- .. note::
- In order for this test to pass, time sync must be disabled for the
- VM in the hypervisor
- '''
- self.run_function('service.stop', ['w32time'])
- try:
- current_time = datetime.datetime.now().strftime('%H:%M:%S')
- test_time = '10:55'
- self.run_function('system.set_system_time', [test_time + ' AM'])
- time.sleep(.25)
- new_time = datetime.datetime.now().strftime('%H:%M')
- self.assertEqual(new_time, test_time)
- finally:
- self.run_function('system.set_system_time', [current_time])
- self.run_function('service.start', ['w32time'])
- def test_get_system_date(self):
- '''
- Test getting system date
- '''
- ret = self.run_function('system.get_system_date')
- date = datetime.datetime.now().strftime("%m/%d/%Y")
- self.assertEqual(date, ret)
- @skipIf(True, 'WAR ROOM 7/18/2019, unit test?')
- @pytest.mark.destructive_test
- def test_set_system_date(self):
- '''
- Test setting system date
- .. note::
- In order for this test to pass, time sync must be disabled for the
- VM in the hypervisor
- '''
- self.run_function('service.stop', ['w32time'])
- try:
- # If the test still fails, the hypervisor may be maintaining time
- # sync
- current_date = datetime.datetime.now().strftime('%Y/%m/%d')
- self.run_function('system.set_system_date', ['03/25/2018'])
- new_date = datetime.datetime.now().strftime('%Y/%m/%d')
- self.assertEqual(new_date, '2018/03/25')
- finally:
- self.run_function('system.set_system_date', [current_date])
- self.run_function('service.start', ['w32time'])
|