123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396 |
- # -*- coding: utf-8 -*-
- """
- :codeauthor: Pedro Algarvio (pedro@algarvio.me)
- tests.unit.utils.test_vt
- ~~~~~~~~~~~~~~~~~~~~~~~~
- VirtualTerminal tests
- """
- # Import Python libs
- from __future__ import absolute_import, print_function, unicode_literals
- import functools
- import io
- import os
- import random
- import subprocess
- import sys
- import time
- # Import Salt libs
- import salt.utils
- import salt.utils.files
- import salt.utils.platform
- import salt.utils.stringutils
- import salt.utils.vt
- # Import 3rd-party libs
- from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
- # Import Salt Testing libs
- from tests.support.paths import CODE_DIR
- from tests.support.unit import TestCase, skipIf
def stdout_fileno_available():
    """
    Tests if sys.stdout.fileno is available in this testing environment

    :return: ``True`` when ``sys.stdout.fileno()`` can be called without
        raising, ``False`` otherwise.
    """
    try:
        sys.stdout.fileno()
    except (io.UnsupportedOperation, AttributeError, ValueError):
        # io.UnsupportedOperation: an in-memory stream with no descriptor.
        # AttributeError: stdout was replaced by an object that has no
        #   fileno() at all (or by None).
        # ValueError: the stream has been closed.
        return False
    return True
def fixStdOutErrFileNoIfNeeded(func):
    """
    Decorator that sets stdout and stderr to their original objects if
    sys.stdout.fileno() doesn't work and restores them after running the
    decorated function. This doesn't check if the original objects actually
    work. If they don't then the test environment is too broken to test
    the VT.

    :param func: the callable to wrap
    :return: a wrapper that forwards all arguments to ``func`` and returns
        whatever ``func`` returns
    """

    @functools.wraps(func)
    def wrapper_fixStdOutErrFileNoIfNeeded(*args, **kwargs):
        # Remember whatever streams are installed right now so they can be
        # put back no matter how the wrapped function exits.
        original_stdout = sys.stdout
        original_stderr = sys.stderr
        if not stdout_fileno_available():
            # Both streams are swapped together, even though only stdout
            # was probed.
            sys.stdout = sys.__stdout__
            sys.stderr = sys.__stderr__
        try:
            return func(*args, **kwargs)
        finally:
            sys.stdout = original_stdout
            sys.stderr = original_stderr

    return wrapper_fixStdOutErrFileNoIfNeeded
class VTTestCase(TestCase):
    """
    Tests for ``salt.utils.vt.Terminal``.
    """

    @skipIf(
        True,
        "Disabled until we can figure out why this fails when whole test suite runs.",
    )
    def test_vt_size(self):
        """Confirm that the terminal size is being set"""
        if not sys.stdin.isatty():
            self.skipTest("Not attached to a TTY. The test would fail.")
        cols = random.randrange(80, 250)
        terminal = salt.utils.vt.Terminal(
            'echo "Foo!"',
            shell=True,
            cols=cols,
            rows=24,
            stream_stdout=False,
            stream_stderr=False,
        )
        try:
            # First the assertion
            self.assertEqual(terminal.getwinsize(), (24, cols))
            # Then wait for the terminal child to exit
            terminal.wait()
        finally:
            # Close the terminal even when the size assertion fails
            terminal.close()

    @skipIf(
        True,
        "Disabled until we can find out why this kills the tests suite with an exit code of 134",
    )
    def test_issue_10404_ptys_not_released(self):
        """
        Spawn many terminals and check that the number of open PTYs reported
        by the system does not keep growing.
        """
        n_executions = 15

        def current_pty_count():
            """
            Return the number of PTYs currently open on the system.

            Reads ``/proc/sys/kernel/pty/nr`` when it exists and falls back
            to ``sysctl`` otherwise. If the number cannot be determined the
            test is skipped on Darwin and failed everywhere else.
            """
            try:
                if os.path.exists("/proc/sys/kernel/pty/nr"):
                    with salt.utils.files.fopen("/proc/sys/kernel/pty/nr") as fh_:
                        return int(fh_.read().strip())

                proc = subprocess.Popen(
                    "sysctl -a 2> /dev/null | grep pty.nr | awk '{print $3}'",
                    shell=True,
                    stdout=subprocess.PIPE,
                )
                stdout, _ = proc.communicate()
                return int(stdout.strip())
            except (ValueError, OSError, IOError):
                if salt.utils.platform.is_darwin():
                    # We're unable to find out how many PTY's are open
                    self.skipTest(
                        "Unable to find out how many PTY's are open on Darwin - "
                        "Skipping for now"
                    )
                self.fail("Unable to find out how many PTY's are open")

        nr_ptys = current_pty_count()
        # The count is an integer, so comparing against the floored half is
        # equivalent to comparing against ``n_executions / 2``.
        max_open_ptys = nr_ptys + (n_executions // 2)

        def assert_ptys_released():
            # current_pty_count() already turns lookup errors into a
            # fail/skip, so no extra exception handling is needed here.
            if current_pty_count() > max_open_ptys:
                self.fail("VT is not cleaning up PTY's")

        # Using context manager's
        for idx in range(nr_ptys + n_executions):
            try:
                with salt.utils.vt.Terminal(
                    'echo "Run {0}"'.format(idx),
                    shell=True,
                    stream_stdout=False,
                    stream_stderr=False,
                ) as terminal:
                    terminal.wait()
            except Exception as exc:  # pylint: disable=broad-except
                if "out of pty devices" in str(exc):
                    # We're not cleaning up
                    raise
                # We're pushing the system resources, let's keep going
                continue
            # Checked outside the broad ``except`` above so that a failure
            # (or skip) raised here is reported instead of being swallowed.
            assert_ptys_released()

        # Not using context manager's
        for idx in range(nr_ptys + n_executions):
            try:
                # NOTE(review): no explicit close() here; presumably this
                # loop checks that PTYs are released when the Terminal
                # object is simply dropped - confirm against issue 10404.
                terminal = salt.utils.vt.Terminal(
                    'echo "Run {0}"'.format(idx),
                    shell=True,
                    stream_stdout=False,
                    stream_stderr=False,
                )
                terminal.wait()
            except Exception as exc:  # pylint: disable=broad-except
                if "out of pty devices" in str(exc):
                    # We're not cleaning up
                    raise
                # We're pushing the system resources, let's keep going
                continue
            assert_ptys_released()

    def _assert_dead_after_output(self, command, expected_data, stream, slow=False):
        """
        Run ``command`` in a terminal, drain it and check ``isalive()``.

        :param command: shell command line to execute
        :param expected_data: text that must have been collected on ``stream``
            once the terminal reports no more unread data
        :param stream: ``"stdout"`` or ``"stderr"``, the stream to compare
            against ``expected_data``
        :param slow: when true, additionally assert on every iteration that
            the child is still alive while ``stream`` is incomplete, and
            sleep briefly between reads
        """
        term = salt.utils.vt.Terminal(
            command, shell=True, stream_stdout=False, stream_stderr=False
        )
        buffers = {"stdout": "", "stderr": ""}
        try:
            while term.has_unread_data:
                stdout, stderr = term.recv()
                if stdout:
                    buffers["stdout"] += stdout
                if stderr:
                    buffers["stderr"] += stderr

                # Nothing at all came back on either stream: the child is
                # expected to have exited by now
                if stdout is None and stderr is None:
                    self.assertFalse(term.isalive())

                if slow:
                    if buffers[stream] != expected_data:
                        self.assertTrue(term.isalive())
                    # Don't spin
                    time.sleep(0.1)

            # term should be dead now
            self.assertEqual(buffers[stream], expected_data)
            self.assertFalse(term.isalive())

            # One more read after the drain must yield nothing
            stdout, stderr = term.recv()
            self.assertFalse(term.isalive())
            self.assertIsNone(stderr)
            self.assertIsNone(stdout)
        finally:
            term.close(terminate=True, kill=True)

    @skipIf(True, "Disabled until we can figure out how to make this more reliable.")
    def test_isalive_while_theres_data_to_read(self):
        """
        Check ``isalive()`` against the output read from the terminal for a
        command writing to stdout, one writing to stderr, and one that
        pauses between two writes to stdout.
        """
        self._assert_dead_after_output('echo "Alive!"', "Alive!\n", "stdout")
        self._assert_dead_after_output('echo "Alive!" 1>&2', "Alive!\n", "stderr")
        self._assert_dead_after_output(
            'echo "Alive!"; sleep 5; echo "Alive!"',
            "Alive!\nAlive!\n",
            "stdout",
            slow=True,
        )

    def _assert_split_multibyte_characters(
        self, encoding, stdout_generator_name, stderr_generator_name
    ):
        """
        Have a child python process write generated multibyte payloads to
        stdout and stderr, then check the terminal decodes both correctly.

        :param encoding: encoding passed as ``force_receive_encoding`` and
            used to decode the expected payloads
        :param stdout_generator_name: name of the ``VTTestCase`` static method
            producing the stdout payload
        :param stderr_generator_name: name of the ``VTTestCase`` static method
            producing the stderr payload

        The child process looks the generators up by name on this class, so
        it produces the same bytes the expectations are built from.
        """
        block_size = 1024
        stdout_content = getattr(VTTestCase, stdout_generator_name)(block_size)
        # stderr is offset by one byte to guarantee a split character in
        # one of the output streams
        stderr_content = getattr(VTTestCase, stderr_generator_name)(block_size)
        expected_stdout = salt.utils.stringutils.to_unicode(stdout_content, encoding)
        expected_stderr = salt.utils.stringutils.to_unicode(stderr_content, encoding)
        python_command = "\n".join(
            (
                "import sys",
                "import os",
                "import tests.unit.utils.test_vt as test_vt",
                "os.write(sys.stdout.fileno(), test_vt.VTTestCase.{0}({1}))".format(
                    stdout_generator_name, block_size
                ),
                "os.write(sys.stderr.fileno(), test_vt.VTTestCase.{0}({1}))".format(
                    stderr_generator_name, block_size
                ),
            )
        )
        term = salt.utils.vt.Terminal(
            args=[sys.executable, "-c", '"' + python_command + '"'],
            shell=True,
            cwd=CODE_DIR,
            stream_stdout=False,
            stream_stderr=False,
            force_receive_encoding=encoding,
        )
        buffer_o = buffer_e = salt.utils.stringutils.to_unicode("")
        try:
            while term.has_unread_data:
                stdout, stderr = term.recv(block_size)
                if stdout:
                    buffer_o += stdout
                if stderr:
                    buffer_e += stderr

            self.assertEqual(buffer_o, expected_stdout)
            self.assertEqual(buffer_e, expected_stderr)
        finally:
            term.close(terminate=True, kill=True)

    @staticmethod
    def generate_multibyte_stdout_unicode(block_size):
        """Return ``4 * block_size`` copies of a 3-byte UTF-8 sequence."""
        return b"\xE2\x80\xA6" * 4 * block_size

    @staticmethod
    def generate_multibyte_stderr_unicode(block_size):
        """Return the stdout payload prefixed with one single-byte character."""
        return b"\x2E" + VTTestCase.generate_multibyte_stdout_unicode(block_size)

    @skipIf(
        salt.utils.platform.is_windows(), "Skip VT tests on windows, due to issue 54290"
    )
    @fixStdOutErrFileNoIfNeeded
    def test_split_multibyte_characters_unicode(self):
        """
        Tests that the vt correctly handles multibyte characters that are
        split between blocks of transmitted data.
        """
        self._assert_split_multibyte_characters(
            "utf-8",
            "generate_multibyte_stdout_unicode",
            "generate_multibyte_stderr_unicode",
        )

    @staticmethod
    def generate_multibyte_stdout_shiftjis(block_size):
        """Return ``4 * block_size`` copies of a 2-byte sequence."""
        return b"\x8B\x80" * 4 * block_size

    @staticmethod
    def generate_multibyte_stderr_shiftjis(block_size):
        """Return the stdout payload prefixed with one single-byte character."""
        return b"\x2E" + VTTestCase.generate_multibyte_stdout_shiftjis(block_size)

    @skipIf(
        salt.utils.platform.is_windows(), "Skip VT tests on windows, due to issue 54290"
    )
    @fixStdOutErrFileNoIfNeeded
    def test_split_multibyte_characters_shiftjis(self):
        """
        Tests that the vt correctly handles multibyte characters that are
        split between blocks of transmitted data.
        Uses shift-jis encoding to make sure code doesn't assume unicode.
        """
        self._assert_split_multibyte_characters(
            "shift-jis",
            "generate_multibyte_stdout_shiftjis",
            "generate_multibyte_stderr_shiftjis",
        )
|