1
0

test_system.py 16 KB


  1. # -*- coding: utf-8 -*-
# Import Python libs
from __future__ import absolute_import, unicode_literals, print_function
import datetime
import logging
import os
import signal
import subprocess
import textwrap

# Import Salt Testing libs
from tests.support.case import ModuleCase
from tests.support.unit import skipIf
from tests.support.helpers import destructiveTest, skip_if_not_root, flaky

# Import Salt libs
import salt.utils.files
import salt.utils.path
import salt.utils.platform
import salt.utils.stringutils
import salt.states.file
from salt.ext.six.moves import range
from salt.ext import six
  21. log = logging.getLogger(__name__)
  22. @skipIf(not salt.utils.platform.is_linux(),
  23. 'These tests can only be run on linux')
  24. class SystemModuleTest(ModuleCase):
  25. '''
  26. Validate the date/time functions in the system module
  27. '''
  28. fmt_str = "%Y-%m-%d %H:%M:%S"
  29. def __init__(self, arg):
  30. super(self.__class__, self).__init__(arg)
  31. self._orig_time = None
  32. self._machine_info = True
  33. def setUp(self):
  34. super(SystemModuleTest, self).setUp()
  35. os_grain = self.run_function('grains.item', ['kernel'])
  36. if os_grain['kernel'] not in 'Linux':
  37. self.skipTest(
  38. 'Test not applicable to \'{kernel}\' kernel'.format(
  39. **os_grain
  40. )
  41. )
  42. if self.run_function('service.available', ['systemd-timesyncd']):
  43. self.run_function('service.stop', ['systemd-timesyncd'])
  44. def tearDown(self):
  45. if self._orig_time is not None:
  46. self._restore_time()
  47. self._orig_time = None
  48. if self._machine_info is not True:
  49. self._restore_machine_info()
  50. self._machine_info = True
  51. if self.run_function('service.available', ['systemd-timesyncd']):
  52. self.run_function('service.start', ['systemd-timesyncd'])
  53. def _save_time(self):
  54. self._orig_time = datetime.datetime.utcnow()
  55. def _set_time(self, new_time, offset=None):
  56. t = new_time.timetuple()[:6]
  57. t += (offset,)
  58. return self.run_function('system.set_system_date_time', t)
  59. def _restore_time(self):
  60. result = self._set_time(self._orig_time, "+0000")
  61. self.assertTrue(result, msg="Unable to restore time properly")
  62. def _same_times(self, t1, t2, seconds_diff=30):
  63. '''
  64. Helper function to check if two datetime objects
  65. are close enough to the same time.
  66. '''
  67. return abs(t1 - t2) < datetime.timedelta(seconds=seconds_diff)
  68. def _hwclock_has_compare(self):
  69. '''
  70. Some builds of hwclock don't include the `--compare` function
  71. needed to test hw/sw clock synchronization. Returns false on
  72. systems where it's not present so that we can skip the
  73. comparison portion of the test.
  74. '''
  75. res = self.run_function('cmd.run_all', cmd='hwclock -h')
  76. return res['retcode'] == 0 and res['stdout'].find('--compare') > 0
  77. def _test_hwclock_sync(self):
  78. '''
  79. Check that hw and sw clocks are sync'd.
  80. '''
  81. if not self.run_function('system.has_settable_hwclock'):
  82. return None
  83. if not self._hwclock_has_compare():
  84. return None
  85. class CompareTimeout(BaseException):
  86. pass
  87. def _alrm_handler(sig, frame):
  88. log.warning('hwclock --compare failed to produce output after 3 seconds')
  89. raise CompareTimeout
  90. for _ in range(2):
  91. try:
  92. orig_handler = signal.signal(signal.SIGALRM, _alrm_handler)
  93. signal.alarm(3)
  94. rpipeFd, wpipeFd = os.pipe()
  95. log.debug('Comparing hwclock to sys clock')
  96. with os.fdopen(rpipeFd, "r") as rpipe:
  97. with os.fdopen(wpipeFd, "w") as wpipe:
  98. with salt.utils.files.fopen(os.devnull, "r") as nulFd:
  99. p = subprocess.Popen(args=['hwclock', '--compare'],
  100. stdin=nulFd, stdout=wpipeFd, stderr=subprocess.PIPE)
  101. p.communicate()
  102. # read header
  103. rpipe.readline()
  104. # read first time comparison
  105. timeCompStr = rpipe.readline()
  106. # stop
  107. p.terminate()
  108. timeComp = timeCompStr.split()
  109. hwTime = float(timeComp[0])
  110. swTime = float(timeComp[1])
  111. diff = abs(hwTime - swTime)
  112. self.assertTrue(diff <= 2.0,
  113. msg=("hwclock difference too big: " + six.text_type(timeCompStr)))
  114. break
  115. except CompareTimeout:
  116. p.terminate()
  117. finally:
  118. signal.alarm(0)
  119. signal.signal(signal.SIGALRM, orig_handler)
  120. else:
  121. log.error('Failed to check hwclock sync')
  122. def _save_machine_info(self):
  123. if os.path.isfile('/etc/machine-info'):
  124. with salt.utils.files.fopen('/etc/machine-info', 'r') as mach_info:
  125. self._machine_info = mach_info.read()
  126. else:
  127. self._machine_info = False
  128. def _restore_machine_info(self):
  129. if self._machine_info is not False:
  130. with salt.utils.files.fopen('/etc/machine-info', 'w') as mach_info:
  131. mach_info.write(self._machine_info)
  132. else:
  133. self.run_function('file.remove', ['/etc/machine-info'])
  134. def test_get_system_date_time(self):
  135. '''
  136. Test we are able to get the correct time
  137. '''
  138. t1 = datetime.datetime.now()
  139. res = self.run_function('system.get_system_date_time')
  140. t2 = datetime.datetime.strptime(res, self.fmt_str)
  141. msg = ("Difference in times is too large. Now: {0} Fake: {1}"
  142. .format(t1, t2))
  143. self.assertTrue(self._same_times(t1, t2, seconds_diff=2), msg=msg)
  144. def test_get_system_date_time_utc(self):
  145. '''
  146. Test we are able to get the correct time with utc
  147. '''
  148. t1 = datetime.datetime.utcnow()
  149. res = self.run_function('system.get_system_date_time',
  150. utc_offset="+0000")
  151. t2 = datetime.datetime.strptime(res, self.fmt_str)
  152. msg = ("Difference in times is too large. Now: {0} Fake: {1}"
  153. .format(t1, t2))
  154. self.assertTrue(self._same_times(t1, t2, seconds_diff=2), msg=msg)
  155. @destructiveTest
  156. @skip_if_not_root
  157. def test_set_system_date_time(self):
  158. '''
  159. Test changing the system clock. We are only able to set it up to a
  160. resolution of a second so this test may appear to run in negative time.
  161. '''
  162. self._save_time()
  163. cmp_time = datetime.datetime.now() - datetime.timedelta(days=7)
  164. result = self._set_time(cmp_time)
  165. time_now = datetime.datetime.now()
  166. msg = ("Difference in times is too large. Now: {0} Fake: {1}"
  167. .format(time_now, cmp_time))
  168. self.assertTrue(result and self._same_times(time_now, cmp_time),
  169. msg=msg)
  170. self._test_hwclock_sync()
  171. @destructiveTest
  172. @skip_if_not_root
  173. def test_set_system_date_time_utc(self):
  174. '''
  175. Test changing the system clock. We are only able to set it up to a
  176. resolution of a second so this test may appear to run in negative time.
  177. '''
  178. self._save_time()
  179. cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7)
  180. result = self._set_time(cmp_time, offset="+0000")
  181. time_now = datetime.datetime.utcnow()
  182. msg = ("Difference in times is too large. Now: {0} Fake: {1}"
  183. .format(time_now, cmp_time))
  184. self.assertTrue(result)
  185. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  186. self._test_hwclock_sync()
  187. @destructiveTest
  188. @skip_if_not_root
  189. def test_set_system_date_time_utcoffset_east(self):
  190. '''
  191. Test changing the system clock. We are only able to set it up to a
  192. resolution of a second so this test may appear to run in negative time.
  193. '''
  194. self._save_time()
  195. cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7)
  196. # 25200 seconds = 7 hours
  197. time_to_set = cmp_time - datetime.timedelta(seconds=25200)
  198. result = self._set_time(time_to_set, offset='-0700')
  199. time_now = datetime.datetime.utcnow()
  200. msg = ("Difference in times is too large. Now: {0} Fake: {1}"
  201. .format(time_now, cmp_time))
  202. self.assertTrue(result)
  203. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  204. self._test_hwclock_sync()
  205. @destructiveTest
  206. @skip_if_not_root
  207. def test_set_system_date_time_utcoffset_west(self):
  208. '''
  209. Test changing the system clock. We are only able to set it up to a
  210. resolution of a second so this test may appear to run in negative time.
  211. '''
  212. self._save_time()
  213. cmp_time = datetime.datetime.utcnow() - datetime.timedelta(days=7)
  214. # 7200 seconds = 2 hours
  215. time_to_set = cmp_time + datetime.timedelta(seconds=7200)
  216. result = self._set_time(time_to_set, offset='+0200')
  217. time_now = datetime.datetime.utcnow()
  218. msg = ("Difference in times is too large. Now: {0} Fake: {1}"
  219. .format(time_now, cmp_time))
  220. self.assertTrue(result)
  221. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  222. self._test_hwclock_sync()
  223. @flaky
  224. @destructiveTest
  225. @skip_if_not_root
  226. def test_set_system_time(self):
  227. '''
  228. Test setting the system time without adjusting the date.
  229. '''
  230. cmp_time = datetime.datetime.now().replace(hour=10, minute=5, second=0)
  231. self._save_time()
  232. result = self.run_function('system.set_system_time', ["10:05:00"])
  233. time_now = datetime.datetime.now()
  234. msg = ("Difference in times is too large. Now: {0} Fake: {1}"
  235. .format(time_now, cmp_time))
  236. self.assertTrue(result)
  237. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  238. self._test_hwclock_sync()
  239. @destructiveTest
  240. @skip_if_not_root
  241. def test_set_system_date(self):
  242. '''
  243. Test setting the system date without adjusting the time.
  244. '''
  245. cmp_time = datetime.datetime.now() - datetime.timedelta(days=7)
  246. self._save_time()
  247. result = self.run_function(
  248. 'system.set_system_date',
  249. [cmp_time.strftime('%Y-%m-%d')]
  250. )
  251. time_now = datetime.datetime.now()
  252. msg = ("Difference in times is too large. Now: {0} Fake: {1}"
  253. .format(time_now, cmp_time))
  254. self.assertTrue(result)
  255. self.assertTrue(self._same_times(time_now, cmp_time), msg=msg)
  256. self._test_hwclock_sync()
  257. @skip_if_not_root
  258. def test_get_computer_desc(self):
  259. '''
  260. Test getting the system hostname
  261. '''
  262. res = self.run_function('system.get_computer_desc')
  263. hostname_cmd = salt.utils.path.which('hostnamectl')
  264. if hostname_cmd:
  265. desc = self.run_function('cmd.run', ["hostnamectl status --pretty"])
  266. self.assertEqual(res, desc)
  267. else:
  268. if not os.path.isfile('/etc/machine-info'):
  269. self.assertFalse(res)
  270. else:
  271. with salt.utils.files.fopen('/etc/machine-info', 'r') as mach_info:
  272. data = mach_info.read()
  273. self.assertIn(res, data.decode('string_escape'))
  274. @destructiveTest
  275. @skip_if_not_root
  276. def test_set_computer_desc(self):
  277. '''
  278. Test setting the computer description
  279. '''
  280. self._save_machine_info()
  281. desc = "test"
  282. ret = self.run_function('system.set_computer_desc', [desc])
  283. computer_desc = self.run_function('system.get_computer_desc')
  284. self.assertTrue(ret)
  285. self.assertIn(desc, computer_desc)
  286. @destructiveTest
  287. @skip_if_not_root
  288. def test_set_computer_desc_multiline(self):
  289. '''
  290. Test setting the computer description with a multiline string with tabs
  291. and double-quotes.
  292. '''
  293. self._save_machine_info()
  294. desc = textwrap.dedent('''\
  295. 'First Line
  296. \tSecond Line: 'single-quoted string'
  297. \t\tThird Line: "double-quoted string with unicode: питон"''')
  298. ret = self.run_function('system.set_computer_desc', [desc])
  299. # self.run_function returns the serialized return, we need to convert
  300. # back to unicode to compare to desc. in the assertIn below.
  301. computer_desc = salt.utils.stringutils.to_unicode(
  302. self.run_function('system.get_computer_desc')
  303. )
  304. self.assertTrue(ret)
  305. self.assertIn(desc, computer_desc)
  306. @skip_if_not_root
  307. def test_has_hwclock(self):
  308. '''
  309. Verify platform has a settable hardware clock, if possible.
  310. '''
  311. if self.run_function('grains.get', ['os_family']) == 'NILinuxRT':
  312. self.assertTrue(self.run_function('system._has_settable_hwclock'))
  313. self.assertTrue(self._hwclock_has_compare())
  314. @skipIf(not salt.utils.platform.is_windows(),
  315. 'These tests can only be run on windows')
  316. class WinSystemModuleTest(ModuleCase):
  317. '''
  318. Validate the date/time functions in the win_system module
  319. '''
  320. @classmethod
  321. def tearDownClass(cls):
  322. if subprocess.call('w32tm /resync', shell=True) != 0:
  323. log.error("Re-syncing time failed")
  324. def test_get_computer_name(self):
  325. '''
  326. Test getting the computer name
  327. '''
  328. ret = self.run_function('system.get_computer_name')
  329. self.assertTrue(isinstance(ret, six.text_type))
  330. import socket
  331. name = socket.gethostname()
  332. self.assertEqual(name, ret)
  333. @destructiveTest
  334. def test_set_computer_desc(self):
  335. '''
  336. Test setting the computer description
  337. '''
  338. current_desc = self.run_function('system.get_computer_desc')
  339. desc = 'test description'
  340. try:
  341. set_desc = self.run_function('system.set_computer_desc', [desc])
  342. self.assertTrue(set_desc)
  343. get_desc = self.run_function('system.get_computer_desc')
  344. self.assertEqual(set_desc['Computer Description'], get_desc)
  345. finally:
  346. self.run_function('system.set_computer_desc', [current_desc])
  347. def test_get_system_time(self):
  348. '''
  349. Test getting the system time
  350. '''
  351. ret = self.run_function('system.get_system_time')
  352. now = datetime.datetime.now()
  353. self.assertEqual(now.strftime("%I:%M"), ret.rsplit(':', 1)[0])
  354. @destructiveTest
  355. def test_set_system_time(self):
  356. '''
  357. Test setting the system time
  358. .. note::
  359. In order for this test to pass, time sync must be disabled for the
  360. VM in the hypervisor
  361. '''
  362. self.run_function('service.stop', ['w32time'])
  363. test_time = '10:55'
  364. current_time = self.run_function('system.get_system_time')
  365. try:
  366. self.run_function('system.set_system_time', [test_time + ' AM'])
  367. get_time = self.run_function('system.get_system_time').rsplit(':', 1)[0]
  368. self.assertEqual(get_time, test_time)
  369. finally:
  370. self.run_function('system.set_system_time', [current_time])
  371. self.run_function('service.start', ['w32time'])
  372. def test_get_system_date(self):
  373. '''
  374. Test getting system date
  375. '''
  376. ret = self.run_function('system.get_system_date')
  377. date = datetime.datetime.now().date().strftime("%m/%d/%Y")
  378. self.assertEqual(date, ret)
  379. @destructiveTest
  380. def test_set_system_date(self):
  381. '''
  382. Test setting system date
  383. .. note::
  384. In order for this test to pass, time sync must be disabled for the
  385. VM in the hypervisor
  386. '''
  387. self.run_function('service.stop', ['w32time'])
  388. current_date = self.run_function('system.get_system_date')
  389. try:
  390. self.run_function('system.set_system_date', ['03/25/2018'])
  391. ret = self.run_function('system.get_system_date')
  392. self.assertEqual(ret, '03/25/2018')
  393. finally:
  394. self.run_function('system.set_system_date', [current_date])
  395. self.run_function('service.start', ['w32time'])