123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- # -*- coding: utf-8 -*-
- '''
- :codeauthor: Pedro Algarvio (pedro@algarvio.me)
- tests.integration.shell.syndic
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- '''
- # Import python libs
- from __future__ import absolute_import
- import logging
- from collections import OrderedDict
- # Import Salt Testing libs
- from tests.support.case import ShellCase
- from tests.support.mixins import ShellCaseCommonTestsMixin
- from tests.support.unit import skipIf
- from tests.integration.utils import testprogram
- # Import salt libs
- import salt.utils.files
- import salt.utils.yaml
- import salt.utils.platform
- # Import 3rd-party libs
- import psutil
- import pytest
- log = logging.getLogger(__name__)
#@pytest.fixture(scope='module', autouse=True)
def session_salt_syndic(request, session_salt_master_of_masters, session_salt_syndic):
    '''
    Register the syndic daemons in the session process stats, hand the syndic
    daemon to the consumer, then unregister and terminate both daemons.

    NOTE(review): the ``pytest.fixture`` decorator above is commented out, so
    as written this is a plain generator function; confirm whether the fixture
    is meant to stay disabled.

    :param request: object exposing ``session.stats_processes``
                    (presumably the pytest ``request`` fixture; verify)
    :param session_salt_master_of_masters: daemon object with ``pid`` and
                                           ``terminate()``
    :param session_salt_syndic: daemon object with ``pid`` and ``terminate()``
    :return: generator yielding ``session_salt_syndic`` exactly once
    '''
    request.session.stats_processes.update(OrderedDict((
        ('Salt Syndic Master', psutil.Process(session_salt_master_of_masters.pid)),
        ('Salt Syndic', psutil.Process(session_salt_syndic.pid)),
    )).items())
    yield session_salt_syndic
    request.session.stats_processes.pop('Salt Syndic Master')
    request.session.stats_processes.pop('Salt Syndic')
    # Stop the daemons now (they would otherwise be stopped at the end of the
    # test run session)
    for daemon in (session_salt_syndic, session_salt_master_of_masters):
        try:
            daemon.terminate()
        except Exception:  # pylint: disable=broad-except
            # Best-effort teardown: keep going, but record the traceback so
            # the failure cause is not silently lost.
            log.warning(
                'Failed to terminate daemon: %s',
                daemon.__class__.__name__,
                exc_info=True
            )
@pytest.mark.windows_whitelisted
class SyndicTest(ShellCase, testprogram.TestProgramCase, ShellCaseCommonTestsMixin):
    '''
    Exit status checks for the salt-syndic command
    '''

    _call_binary_ = 'salt-syndic'

    @skipIf(salt.utils.platform.is_windows(), 'Skip on Windows OS')
    def test_exit_status_unknown_user(self):
        '''
        A syndic configured to run as a user that does not exist must exit
        with ``EX_NOUSER``.

        Skipped on windows because daemonization not supported
        '''
        daemon = testprogram.TestDaemonSaltSyndic(
            name='unknown_user',
            config_base={'user': 'some_unknown_user_xyz'},
            parent_dir=self._test_dir,
        )
        # The config and script have to exist before the daemon is run
        daemon.setup()
        result = daemon.run(
            args=['-d'],
            catch_stderr=True,
            with_retcode=True,
        )
        out, err, retcode = result
        try:
            self.assert_exit_status(
                retcode, 'EX_NOUSER',
                message='unknown user not on system',
                stdout=out, stderr=err
            )
        finally:
            # Start-up is expected to fail, but shutdown() still has to be
            # called: it sets the internal _shutdown flag, which keeps the
            # registered atexit calls from causing timeout exceptions and
            # their tracebacks.
            daemon.shutdown()

    # pylint: disable=invalid-name
    @skipIf(salt.utils.platform.is_windows(), 'Skip on Windows OS')
    def test_exit_status_unknown_argument(self):
        '''
        Passing an unrecognised command line argument to salt-syndic must
        make it exit with ``EX_USAGE``.

        Skipped on windows because daemonization not supported
        '''
        daemon = testprogram.TestDaemonSaltSyndic(
            name='unknown_argument',
            parent_dir=self._test_dir,
        )
        # The config and script have to exist before the daemon is run
        daemon.setup()
        result = daemon.run(
            args=['-d', '--unknown-argument'],
            catch_stderr=True,
            with_retcode=True,
        )
        out, err, retcode = result
        try:
            self.assert_exit_status(
                retcode, 'EX_USAGE',
                message='unknown argument',
                stdout=out, stderr=err
            )
        finally:
            # Start-up is expected to fail, but shutdown() still has to be
            # called: it sets the internal _shutdown flag, which keeps the
            # registered atexit calls from causing timeout exceptions and
            # their tracebacks.
            daemon.shutdown()

    @skipIf(salt.utils.platform.is_windows(), 'Skip on Windows OS')
    def test_exit_status_correct_usage(self):
        '''
        A salt-syndic that is started with valid arguments must exit with
        ``EX_OK``.

        Skipped on windows because daemonization not supported
        '''
        daemon = testprogram.TestDaemonSaltSyndic(
            name='correct_usage',
            parent_dir=self._test_dir,
        )
        # The config and script have to exist before the daemon is run
        daemon.setup()
        result = daemon.run(
            args=['-d', '-l', 'debug'],
            catch_stderr=True,
            with_retcode=True,
        )
        out, err, retcode = result
        try:
            self.assert_exit_status(
                retcode, 'EX_OK',
                message='correct usage',
                stdout=out, stderr=err
            )
        finally:
            # NOTE(review): wait_for_orphans=3 presumably allows time for
            # orphaned child processes to exit; confirm against testprogram.
            daemon.shutdown(wait_for_orphans=3)
|