123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- # coding: utf-8
- # Python libs
- from __future__ import absolute_import
- import datetime
- import logging
- # Salt testing libs
- from tests.support.unit import skipIf, TestCase
- from tests.support.mock import patch, MagicMock, mock_open
- from tests.support.mixins import LoaderModuleMockMixin
- # Salt libs
- import salt.beacons.wtmp as wtmp
- from salt.ext import six
- # pylint: disable=import-error
- try:
- import dateutil.parser as dateutil_parser # pylint: disable=unused-import
- _TIME_SUPPORTED = True
- except ImportError:
- _TIME_SUPPORTED = False
- raw = b'\x07\x00\x00\x00H\x18\x00\x00pts/14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00s/14gareth\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00::1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x13I\xc5YZf\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- pack = (7, 6216, b'pts/14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', b's/14', b'gareth\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', b'::1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 0, 0, 0, 1506101523, 353882, 0, 0, 0, 16777216)
- log = logging.getLogger(__name__)
- class WTMPBeaconTestCase(TestCase, LoaderModuleMockMixin):
- '''
- Test case for salt.beacons.[s]
- '''
- def setup_loader_modules(self):
- return {
- wtmp: {
- '__context__': {'wtmp.loc': 2},
- '__salt__': {},
- }
- }
- def test_non_list_config(self):
- config = {}
- ret = wtmp.validate(config)
- self.assertEqual(ret, (False, 'Configuration for wtmp beacon must'
- ' be a list.'))
- def test_empty_config(self):
- config = [{}]
- ret = wtmp.validate(config)
- self.assertEqual(ret, (True, 'Valid beacon configuration'))
- def test_no_match(self):
- config = [{'users': {'gareth': {'time_range': {'end': '09-22-2017 5pm',
- 'start': '09-22-2017 3pm'}}}}
- ]
- ret = wtmp.validate(config)
- self.assertEqual(ret, (True, 'Valid beacon configuration'))
- with patch('salt.utils.files.fopen', mock_open(b'')) as m_open:
- ret = wtmp.beacon(config)
- call_args = next(six.itervalues(m_open.filehandles))[0].call.args
- assert call_args == (wtmp.WTMP, 'rb'), call_args
- assert ret == [], ret
- def test_invalid_users(self):
- config = [{'users': ['gareth']}]
- ret = wtmp.validate(config)
- self.assertEqual(ret, (False, 'User configuration for wtmp beacon must be a dictionary.'))
- def test_invalid_groups(self):
- config = [{'groups': ['docker']}]
- ret = wtmp.validate(config)
- self.assertEqual(ret, (False, 'Group configuration for wtmp beacon must be a dictionary.'))
- def test_default_invalid_time_range(self):
- config = [{'defaults': {'time_range': {'start': '3pm'}}}]
- ret = wtmp.validate(config)
- self.assertEqual(ret, (False, 'The time_range parameter for wtmp beacon must contain start & end options.'))
- def test_users_invalid_time_range(self):
- config = [{'users': {'gareth': {'time_range': {'start': '3pm'}}}}]
- ret = wtmp.validate(config)
- self.assertEqual(ret, (False, 'The time_range parameter for wtmp beacon must contain start & end options.'))
- def test_groups_invalid_time_range(self):
- config = [{'groups': {'docker': {'time_range': {'start': '3pm'}}}}]
- ret = wtmp.validate(config)
- self.assertEqual(ret, (False, 'The time_range parameter for wtmp beacon must contain start & end options.'))
- def test_match(self):
- with patch('salt.utils.files.fopen',
- mock_open(read_data=raw)):
- with patch('struct.unpack',
- MagicMock(return_value=pack)):
- config = [{'users': {'gareth': {}}}]
- ret = wtmp.validate(config)
- self.assertEqual(ret, (True, 'Valid beacon configuration'))
- _expected = [{'PID': 6216,
- 'action': 'login',
- 'line': 'pts/14',
- 'session': 0,
- 'time': 0,
- 'exit_status': 0,
- 'inittab': 's/14',
- 'type': 7,
- 'addr': 1506101523,
- 'hostname': '::1',
- 'user': 'gareth'}]
- ret = wtmp.beacon(config)
- log.debug('{}'.format(ret))
- self.assertEqual(ret, _expected)
- @skipIf(not _TIME_SUPPORTED, 'dateutil.parser is missing.')
- def test_match_time(self):
- with patch('salt.utils.files.fopen',
- mock_open(read_data=raw)):
- mock_now = datetime.datetime(2017, 9, 22, 16, 0, 0, 0)
- with patch('datetime.datetime', MagicMock()), \
- patch('datetime.datetime.now',
- MagicMock(return_value=mock_now)):
- with patch('struct.unpack',
- MagicMock(return_value=pack)):
- config = [{'users': {'gareth': {'time': {'end': '09-22-2017 5pm',
- 'start': '09-22-2017 3pm'}}}}
- ]
- ret = wtmp.validate(config)
- self.assertEqual(ret, (True, 'Valid beacon configuration'))
- _expected = [{'PID': 6216,
- 'action': 'login',
- 'line': 'pts/14',
- 'session': 0,
- 'time': 0,
- 'exit_status': 0,
- 'inittab': 's/14',
- 'type': 7,
- 'addr': 1506101523,
- 'hostname': '::1',
- 'user': 'gareth'}]
- ret = wtmp.beacon(config)
- self.assertEqual(ret, _expected)
- def test_match_group(self):
- for groupadd in ('salt.modules.aix_group',
- 'salt.modules.mac_group',
- 'salt.modules.pw_group',
- 'salt.modules.solaris_group',
- 'salt.modules.win_groupadd'):
- mock_group_info = {'passwd': 'x',
- 'gid': 100,
- 'name': 'users',
- 'members': ['gareth']}
- with patch('salt.utils.files.fopen',
- mock_open(read_data=raw)):
- with patch('time.time',
- MagicMock(return_value=1506121200)):
- with patch('struct.unpack',
- MagicMock(return_value=pack)):
- with patch('{0}.info'.format(groupadd),
- new=MagicMock(return_value=mock_group_info)):
- config = [{'group': {'users': {'time': {'end': '09-22-2017 5pm',
- 'start': '09-22-2017 3pm'}}}}
- ]
- ret = wtmp.validate(config)
- self.assertEqual(ret,
- (True, 'Valid beacon configuration'))
- _expected = [{'PID': 6216,
- 'action': 'login',
- 'line': 'pts/14',
- 'session': 0,
- 'time': 0,
- 'exit_status': 0,
- 'inittab': 's/14',
- 'type': 7,
- 'addr': 1506101523,
- 'hostname': '::1',
- 'user': 'gareth'}]
- ret = wtmp.beacon(config)
- self.assertEqual(ret, _expected)
|