test_vt.py 14 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. :codeauthor: Pedro Algarvio (pedro@algarvio.me)
  4. tests.unit.utils.vt_test
  5. ~~~~~~~~~~~~~~~~~~~~~~~~
  6. VirtualTerminal tests
  7. """
  8. # Import Python libs
  9. from __future__ import absolute_import, print_function, unicode_literals
  10. import functools
  11. import io
  12. import os
  13. import random
  14. import subprocess
  15. import sys
  16. import time
  17. # Import Salt libs
  18. import salt.utils
  19. import salt.utils.files
  20. import salt.utils.platform
  21. import salt.utils.stringutils
  22. import salt.utils.vt
  23. # Import 3rd-party libs
  24. from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
  25. # Import Salt Testing libs
  26. from tests.support.paths import CODE_DIR
  27. from tests.support.unit import TestCase, skipIf
  28. def stdout_fileno_available():
  29. """
  30. Tests if sys.stdout.fileno is available in this testing environment
  31. """
  32. try:
  33. sys.stdout.fileno()
  34. return True
  35. except io.UnsupportedOperation:
  36. return False
  37. def fixStdOutErrFileNoIfNeeded(func):
  38. """
  39. Decorator that sets stdout and stderr to their original objects if
  40. sys.stdout.fileno() doesn't work and restores them after running the
  41. decorated function. This doesn't check if the original objects actually
  42. work. If they don't then the test environment is too broken to test
  43. the VT.
  44. """
  45. @functools.wraps(func)
  46. def wrapper_fixStdOutErrFileNoIfNeeded(*args, **kwargs):
  47. original_stdout = os.sys.stdout
  48. original_stderr = os.sys.stderr
  49. if not stdout_fileno_available():
  50. os.sys.stdout = os.sys.__stdout__
  51. os.sys.stderr = os.sys.__stderr__
  52. try:
  53. return func(*args, **kwargs)
  54. finally:
  55. os.sys.stdout = original_stdout
  56. os.sys.stderr = original_stderr
  57. return wrapper_fixStdOutErrFileNoIfNeeded
  58. class VTTestCase(TestCase):
  59. @skipIf(
  60. True,
  61. "Disabled until we can figure out why this fails when whole test suite runs.",
  62. )
  63. def test_vt_size(self):
  64. """Confirm that the terminal size is being set"""
  65. if not sys.stdin.isatty():
  66. self.skipTest("Not attached to a TTY. The test would fail.")
  67. cols = random.choice(range(80, 250))
  68. terminal = salt.utils.vt.Terminal(
  69. 'echo "Foo!"',
  70. shell=True,
  71. cols=cols,
  72. rows=24,
  73. stream_stdout=False,
  74. stream_stderr=False,
  75. )
  76. # First the assertion
  77. self.assertEqual(terminal.getwinsize(), (24, cols))
  78. # Then wait for the terminal child to exit
  79. terminal.wait()
  80. terminal.close()
  81. @skipIf(
  82. True,
  83. "Disabled until we can find out why this kills the tests suite with an exit code of 134",
  84. )
  85. def test_issue_10404_ptys_not_released(self):
  86. n_executions = 15
  87. def current_pty_count():
  88. # Get current number of PTY's
  89. try:
  90. if os.path.exists("/proc/sys/kernel/pty/nr"):
  91. with salt.utils.files.fopen("/proc/sys/kernel/pty/nr") as fh_:
  92. return int(fh_.read().strip())
  93. proc = subprocess.Popen(
  94. "sysctl -a 2> /dev/null | grep pty.nr | awk '{print $3}'",
  95. shell=True,
  96. stdout=subprocess.PIPE,
  97. )
  98. stdout, _ = proc.communicate()
  99. return int(stdout.strip())
  100. except (ValueError, OSError, IOError):
  101. if salt.utils.platform.is_darwin():
  102. # We're unable to findout how many PTY's are open
  103. self.skipTest(
  104. "Unable to find out how many PTY's are open on Darwin - "
  105. "Skipping for now"
  106. )
  107. self.fail("Unable to find out how many PTY's are open")
  108. nr_ptys = current_pty_count()
  109. # Using context manager's
  110. for idx in range(0, nr_ptys + n_executions):
  111. try:
  112. with salt.utils.vt.Terminal(
  113. 'echo "Run {0}"'.format(idx),
  114. shell=True,
  115. stream_stdout=False,
  116. stream_stderr=False,
  117. ) as terminal:
  118. terminal.wait()
  119. try:
  120. if current_pty_count() > (nr_ptys + (n_executions / 2)):
  121. self.fail("VT is not cleaning up PTY's")
  122. except (ValueError, OSError, IOError):
  123. self.fail("Unable to find out how many PTY's are open")
  124. except Exception as exc: # pylint: disable=broad-except
  125. if "out of pty devices" in str(exc):
  126. # We're not cleaning up
  127. raise
  128. # We're pushing the system resources, let's keep going
  129. continue
  130. # Not using context manager's
  131. for idx in range(0, nr_ptys + n_executions):
  132. try:
  133. terminal = salt.utils.vt.Terminal(
  134. 'echo "Run {0}"'.format(idx),
  135. shell=True,
  136. stream_stdout=False,
  137. stream_stderr=False,
  138. )
  139. terminal.wait()
  140. try:
  141. if current_pty_count() > (nr_ptys + (n_executions / 2)):
  142. self.fail("VT is not cleaning up PTY's")
  143. except (ValueError, OSError, IOError):
  144. self.fail("Unable to find out how many PTY's are open")
  145. except Exception as exc: # pylint: disable=broad-except
  146. if "out of pty devices" in str(exc):
  147. # We're not cleaning up
  148. raise
  149. # We're pushing the system resources, let's keep going
  150. continue
  151. @skipIf(True, "Disabled until we can figure out how to make this more reliable.")
  152. def test_isalive_while_theres_data_to_read(self):
  153. expected_data = "Alive!\n"
  154. term = salt.utils.vt.Terminal(
  155. 'echo "Alive!"', shell=True, stream_stdout=False, stream_stderr=False
  156. )
  157. buffer_o = buffer_e = ""
  158. try:
  159. while term.has_unread_data:
  160. stdout, stderr = term.recv()
  161. if stdout:
  162. buffer_o += stdout
  163. if stderr:
  164. buffer_e += stderr
  165. # While there's data to be read, the process is alive
  166. if stdout is None and stderr is None:
  167. self.assertFalse(term.isalive())
  168. # term should be dead now
  169. self.assertEqual(buffer_o, expected_data)
  170. self.assertFalse(term.isalive())
  171. stdout, stderr = term.recv()
  172. self.assertFalse(term.isalive())
  173. self.assertIsNone(stderr)
  174. self.assertIsNone(stdout)
  175. finally:
  176. term.close(terminate=True, kill=True)
  177. expected_data = "Alive!\n"
  178. term = salt.utils.vt.Terminal(
  179. 'echo "Alive!" 1>&2', shell=True, stream_stdout=False, stream_stderr=False
  180. )
  181. buffer_o = buffer_e = ""
  182. try:
  183. while term.has_unread_data:
  184. stdout, stderr = term.recv()
  185. if stdout:
  186. buffer_o += stdout
  187. if stderr:
  188. buffer_e += stderr
  189. # While there's data to be read, the process is alive
  190. if stdout is None and stderr is None:
  191. self.assertFalse(term.isalive())
  192. # term should be dead now
  193. self.assertEqual(buffer_e, expected_data)
  194. self.assertFalse(term.isalive())
  195. stdout, stderr = term.recv()
  196. self.assertFalse(term.isalive())
  197. self.assertIsNone(stderr)
  198. self.assertIsNone(stdout)
  199. finally:
  200. term.close(terminate=True, kill=True)
  201. expected_data = "Alive!\nAlive!\n"
  202. term = salt.utils.vt.Terminal(
  203. 'echo "Alive!"; sleep 5; echo "Alive!"',
  204. shell=True,
  205. stream_stdout=False,
  206. stream_stderr=False,
  207. )
  208. buffer_o = buffer_e = ""
  209. try:
  210. while term.has_unread_data:
  211. stdout, stderr = term.recv()
  212. if stdout:
  213. buffer_o += stdout
  214. if stderr:
  215. buffer_e += stderr
  216. # While there's data to be read, the process is alive
  217. if stdout is None and stderr is None:
  218. self.assertFalse(term.isalive())
  219. if buffer_o != expected_data:
  220. self.assertTrue(term.isalive())
  221. # Don't spin
  222. time.sleep(0.1)
  223. # term should be dead now
  224. self.assertEqual(buffer_o, expected_data)
  225. self.assertFalse(term.isalive())
  226. stdout, stderr = term.recv()
  227. self.assertFalse(term.isalive())
  228. self.assertIsNone(stderr)
  229. self.assertIsNone(stdout)
  230. finally:
  231. term.close(terminate=True, kill=True)
  232. @staticmethod
  233. def generate_multibyte_stdout_unicode(block_size):
  234. return b"\xE2\x80\xA6" * 4 * block_size
  235. @staticmethod
  236. def generate_multibyte_stderr_unicode(block_size):
  237. return b"\x2E" + VTTestCase.generate_multibyte_stdout_unicode(block_size)
  238. @skipIf(
  239. salt.utils.platform.is_windows(), "Skip VT tests on windows, due to issue 54290"
  240. )
  241. @fixStdOutErrFileNoIfNeeded
  242. def test_split_multibyte_characters_unicode(self):
  243. """
  244. Tests that the vt correctly handles multibyte characters that are
  245. split between blocks of transmitted data.
  246. """
  247. block_size = 1024
  248. encoding = "utf-8"
  249. stdout_content = VTTestCase.generate_multibyte_stdout_unicode(block_size)
  250. # stderr is offset by one byte to guarentee a split character in
  251. # one of the output streams
  252. stderr_content = VTTestCase.generate_multibyte_stderr_unicode(block_size)
  253. expected_stdout = salt.utils.stringutils.to_unicode(stdout_content, encoding)
  254. expected_stderr = salt.utils.stringutils.to_unicode(stderr_content, encoding)
  255. python_command = "\n".join(
  256. (
  257. "import sys",
  258. "import os",
  259. "import tests.unit.utils.test_vt as test_vt",
  260. (
  261. "os.write(sys.stdout.fileno(), "
  262. "test_vt.VTTestCase.generate_multibyte_stdout_unicode("
  263. + str(block_size)
  264. + "))"
  265. ),
  266. (
  267. "os.write(sys.stderr.fileno(), "
  268. "test_vt.VTTestCase.generate_multibyte_stderr_unicode("
  269. + str(block_size)
  270. + "))"
  271. ),
  272. )
  273. )
  274. term = salt.utils.vt.Terminal(
  275. args=[sys.executable, "-c", '"' + python_command + '"'],
  276. shell=True,
  277. cwd=CODE_DIR,
  278. stream_stdout=False,
  279. stream_stderr=False,
  280. force_receive_encoding=encoding,
  281. )
  282. buffer_o = buffer_e = salt.utils.stringutils.to_unicode("")
  283. try:
  284. while term.has_unread_data:
  285. stdout, stderr = term.recv(block_size)
  286. if stdout:
  287. buffer_o += stdout
  288. if stderr:
  289. buffer_e += stderr
  290. self.assertEqual(buffer_o, expected_stdout)
  291. self.assertEqual(buffer_e, expected_stderr)
  292. finally:
  293. term.close(terminate=True, kill=True)
  294. @staticmethod
  295. def generate_multibyte_stdout_shiftjis(block_size):
  296. return b"\x8B\x80" * 4 * block_size
  297. @staticmethod
  298. def generate_multibyte_stderr_shiftjis(block_size):
  299. return b"\x2E" + VTTestCase.generate_multibyte_stdout_shiftjis(block_size)
  300. @skipIf(
  301. salt.utils.platform.is_windows(), "Skip VT tests on windows, due to issue 54290"
  302. )
  303. @fixStdOutErrFileNoIfNeeded
  304. def test_split_multibyte_characters_shiftjis(self):
  305. """
  306. Tests that the vt correctly handles multibyte characters that are
  307. split between blocks of transmitted data.
  308. Uses shift-jis encoding to make sure code doesn't assume unicode.
  309. """
  310. block_size = 1024
  311. encoding = "shift-jis"
  312. stdout_content = VTTestCase.generate_multibyte_stdout_shiftjis(block_size)
  313. stderr_content = VTTestCase.generate_multibyte_stderr_shiftjis(block_size)
  314. expected_stdout = salt.utils.stringutils.to_unicode(stdout_content, encoding)
  315. expected_stderr = salt.utils.stringutils.to_unicode(stderr_content, encoding)
  316. python_command = "\n".join(
  317. (
  318. "import sys",
  319. "import os",
  320. "import tests.unit.utils.test_vt as test_vt",
  321. (
  322. "os.write(sys.stdout.fileno(), "
  323. "test_vt.VTTestCase.generate_multibyte_stdout_shiftjis("
  324. + str(block_size)
  325. + "))"
  326. ),
  327. (
  328. "os.write(sys.stderr.fileno(), "
  329. "test_vt.VTTestCase.generate_multibyte_stderr_shiftjis("
  330. + str(block_size)
  331. + "))"
  332. ),
  333. )
  334. )
  335. term = salt.utils.vt.Terminal(
  336. args=[sys.executable, "-c", '"' + python_command + '"'],
  337. shell=True,
  338. cwd=CODE_DIR,
  339. stream_stdout=False,
  340. stream_stderr=False,
  341. force_receive_encoding=encoding,
  342. )
  343. buffer_o = buffer_e = salt.utils.stringutils.to_unicode("")
  344. try:
  345. while term.has_unread_data:
  346. stdout, stderr = term.recv(block_size)
  347. if stdout:
  348. buffer_o += stdout
  349. if stderr:
  350. buffer_e += stderr
  351. self.assertEqual(buffer_o, expected_stdout)
  352. self.assertEqual(buffer_e, expected_stderr)
  353. finally:
  354. term.close(terminate=True, kill=True)