# -*- coding: utf-8 -*- ''' :codeauthor: :email:`Bo Maryniuk ` ''' from __future__ import absolute_import, print_function, unicode_literals import datetime from tests.support.unit import TestCase, skipIf from tests.support.mock import ( MagicMock, patch) from salt.ext.six.moves import zip from salt.ext import six import salt.utils.ssdp as ssdp import salt.utils.stringutils try: import pytest except ImportError: pytest = None class Mocks(object): def get_socket_mock(self, expected_ip, expected_hostname): ''' Get a mock of a socket :return: ''' sck = MagicMock() sck.getsockname = MagicMock(return_value=(expected_ip, 123456)) sock_mock = MagicMock() sock_mock.socket = MagicMock(return_value=sck) sock_mock.gethostname = MagicMock(return_value=expected_hostname) sock_mock.gethostbyname = MagicMock(return_value=expected_ip) return sock_mock def get_ssdp_factory(self, expected_ip=None, expected_hostname=None, **config): if expected_ip is None: expected_ip = '127.0.0.1' if expected_hostname is None: expected_hostname = 'localhost' sock_mock = self.get_socket_mock(expected_ip, expected_hostname) with patch('salt.utils.ssdp.socket', sock_mock): factory = ssdp.SSDPFactory(**config) return factory def get_ssdp_discovery_client(self, expected_ip=None, expected_hostname=None, **config): if expected_ip is None: expected_ip = '127.0.0.1' if expected_hostname is None: expected_hostname = 'localhost' sock_mock = self.get_socket_mock(expected_ip, expected_hostname) with patch('salt.utils.ssdp.socket', sock_mock): factory = ssdp.SSDPDiscoveryClient(**config) return factory def get_ssdp_discovery_server(self, expected_ip=None, expected_hostname=None, **config): if expected_ip is None: expected_ip = '127.0.0.1' if expected_hostname is None: expected_hostname = 'localhost' sock_mock = self.get_socket_mock(expected_ip, expected_hostname) with patch('salt.utils.ssdp.socket', sock_mock): factory = ssdp.SSDPDiscoveryServer(**config) return factory @skipIf(pytest is None, 'PyTest is missing') class SSDPBaseTestCase(TestCase, Mocks): ''' TestCase for SSDP-related parts. ''' @staticmethod def exception_generic(*args, **kwargs): ''' Side effect :return: ''' raise Exception('some network error') @staticmethod def exception_attr_error(*args, **kwargs): ''' Side effect :return: ''' raise AttributeError('attribute error: {0}. {1}'.format(args, kwargs)) @patch('salt.utils.ssdp._json', None) @patch('salt.utils.ssdp.asyncio', None) def test_base_avail(self): ''' Test SSDP base class availability method. :return: ''' base = ssdp.SSDPBase() assert not base._is_available() with patch('salt.utils.ssdp._json', True): assert not base._is_available() with patch('salt.utils.ssdp.asyncio', True): assert not base._is_available() with patch('salt.utils.ssdp._json', True), patch('salt.utils.ssdp.asyncio', True): assert base._is_available() def test_base_protocol_settings(self): ''' Tests default constants data. :return: ''' base = ssdp.SSDPBase() v_keys = ['signature', 'answer', 'port', 'listen_ip', 'timeout'] v_vals = ['__salt_master_service', {}, 4520, '0.0.0.0', 3] for key in v_keys: assert key in base.DEFAULTS for key in base.DEFAULTS: assert key in v_keys for key, value in zip(v_keys, v_vals): assert base.DEFAULTS[key] == value def test_base_self_ip(self): ''' Test getting self IP method. :return: ''' base = ssdp.SSDPBase() expected_ip = '192.168.1.10' expected_host = 'oxygen' sock_mock = self.get_socket_mock(expected_ip, expected_host) with patch('salt.utils.ssdp.socket', sock_mock): assert base.get_self_ip() == expected_ip sock_mock.socket().getsockname.side_effect = SSDPBaseTestCase.exception_generic with patch('salt.utils.ssdp.socket', sock_mock): assert base.get_self_ip() == expected_ip @skipIf(pytest is None, 'PyTest is missing') class SSDPFactoryTestCase(TestCase, Mocks): ''' Test socket protocol ''' def test_attr_check(self): ''' Tests attributes are set to the base class :return: ''' config = { ssdp.SSDPBase.SIGNATURE: '-signature-', ssdp.SSDPBase.ANSWER: {'this-is': 'the-answer'} } expected_ip = '10.10.10.10' factory = self.get_ssdp_factory(expected_ip=expected_ip, **config) for attr in [ssdp.SSDPBase.SIGNATURE, ssdp.SSDPBase.ANSWER]: assert hasattr(factory, attr) assert getattr(factory, attr) == config[attr] assert not factory.disable_hidden assert factory.my_ip == expected_ip def test_transport_sendto_success(self): ''' Test transport send_to. :return: ''' transport = MagicMock() log = MagicMock() factory = self.get_ssdp_factory() with patch.object(factory, 'transport', transport), patch.object(factory, 'log', log): data = {'some': 'data'} addr = '10.10.10.10' factory._sendto(data=data, addr=addr) assert factory.transport.sendto.called assert factory.transport.sendto.mock_calls[0][1][0]['some'] == 'data' assert factory.transport.sendto.mock_calls[0][2]['addr'] == '10.10.10.10' assert factory.log.debug.called assert factory.log.debug.mock_calls[0][1][0] == 'Sent successfully' def test_transport_sendto_retry(self): ''' Test transport send_to. :return: ''' with patch('salt.utils.ssdp.time.sleep', MagicMock()): transport = MagicMock() transport.sendto = MagicMock(side_effect=SSDPBaseTestCase.exception_attr_error) log = MagicMock() factory = self.get_ssdp_factory() with patch.object(factory, 'transport', transport), patch.object(factory, 'log', log): data = {'some': 'data'} addr = '10.10.10.10' factory._sendto(data=data, addr=addr) assert factory.transport.sendto.called assert ssdp.time.sleep.called assert ssdp.time.sleep.call_args[0][0] > 0 and ssdp.time.sleep.call_args[0][0] < 0.5 assert factory.log.debug.called assert 'Permission error' in factory.log.debug.mock_calls[0][1][0] def test_datagram_signature_bad(self): ''' Test datagram_received on bad signature :return: ''' factory = self.get_ssdp_factory() data = 'nonsense' addr = '10.10.10.10', 'foo.suse.de' with patch.object(factory, 'log', MagicMock()): factory.datagram_received(data=data, addr=addr) assert factory.log.debug.called assert 'Received bad signature from' in factory.log.debug.call_args[0][0] assert factory.log.debug.call_args[0][1] == addr[0] assert factory.log.debug.call_args[0][2] == addr[1] def test_datagram_signature_wrong_timestamp_quiet(self): ''' Test datagram receives a wrong timestamp (no reply). :return: ''' factory = self.get_ssdp_factory() data = '{}nonsense'.format(ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]) addr = '10.10.10.10', 'foo.suse.de' with patch.object(factory, 'log', MagicMock()), patch.object(factory, '_sendto', MagicMock()): factory.datagram_received(data=data, addr=addr) assert factory.log.debug.called assert 'Received invalid timestamp in package' in factory.log.debug.call_args[0][0] assert not factory._sendto.called def test_datagram_signature_wrong_timestamp_reply(self): ''' Test datagram receives a wrong timestamp. :return: ''' factory = self.get_ssdp_factory() factory.disable_hidden = True signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE] data = '{}nonsense'.format(signature) addr = '10.10.10.10', 'foo.suse.de' with patch.object(factory, 'log', MagicMock()), patch.object(factory, '_sendto', MagicMock()): factory.datagram_received(data=data, addr=addr) assert factory.log.debug.called assert 'Received invalid timestamp in package' in factory.log.debug.call_args[0][0] assert factory._sendto.called assert '{}:E:Invalid timestamp'.format(signature) == factory._sendto.call_args[0][0] def test_datagram_signature_outdated_timestamp_quiet(self): ''' Test if datagram processing reacts on outdated message (more than 20 seconds). Quiet mode. :return: ''' factory = self.get_ssdp_factory() signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE] data = '{}{}'.format(signature, '1516623820') addr = '10.10.10.10', 'foo.suse.de' ahead_dt = datetime.datetime.fromtimestamp(1516623841) curnt_dt = datetime.datetime.fromtimestamp(1516623820) delta = datetime.timedelta(0, 20) with patch.object(factory, 'log', MagicMock()), patch.object(factory, '_sendto'), \ patch('salt.utils.ssdp.datetime.datetime', MagicMock()), \ patch('salt.utils.ssdp.datetime.datetime.now', MagicMock(return_value=ahead_dt)), \ patch('salt.utils.ssdp.datetime.datetime.fromtimestamp', MagicMock(return_value=curnt_dt)), \ patch('salt.utils.ssdp.datetime.timedelta', MagicMock(return_value=delta)): factory.datagram_received(data=data, addr=addr) assert factory.log.debug.called assert not factory.disable_hidden assert not factory._sendto.called assert 'Received outdated package' in factory.log.debug.call_args[0][0] def test_datagram_signature_outdated_timestamp_reply(self): ''' Test if datagram processing reacts on outdated message (more than 20 seconds). Reply mode. :return: ''' factory = self.get_ssdp_factory() factory.disable_hidden = True signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE] data = '{}{}'.format(signature, '1516623820') addr = '10.10.10.10', 'foo.suse.de' ahead_dt = datetime.datetime.fromtimestamp(1516623841) curnt_dt = datetime.datetime.fromtimestamp(1516623820) delta = datetime.timedelta(0, 20) with patch.object(factory, 'log', MagicMock()), patch.object(factory, '_sendto'), \ patch('salt.utils.ssdp.datetime.datetime', MagicMock()), \ patch('salt.utils.ssdp.datetime.datetime.now', MagicMock(return_value=ahead_dt)), \ patch('salt.utils.ssdp.datetime.datetime.fromtimestamp', MagicMock(return_value=curnt_dt)), \ patch('salt.utils.ssdp.datetime.timedelta', MagicMock(return_value=delta)): factory.datagram_received(data=data, addr=addr) assert factory.log.debug.called assert factory.disable_hidden assert factory._sendto.called assert factory._sendto.call_args[0][0] == '{}:E:Timestamp is too old'.format(signature) assert 'Received outdated package' in factory.log.debug.call_args[0][0] def test_datagram_signature_correct_timestamp_reply(self): ''' Test if datagram processing sends out correct reply within 20 seconds. :return: ''' factory = self.get_ssdp_factory() factory.disable_hidden = True signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE] data = '{}{}'.format(signature, '1516623820') addr = '10.10.10.10', 'foo.suse.de' ahead_dt = datetime.datetime.fromtimestamp(1516623840) curnt_dt = datetime.datetime.fromtimestamp(1516623820) delta = datetime.timedelta(0, 20) with patch.object(factory, 'log', MagicMock()), patch.object(factory, '_sendto'), \ patch('salt.utils.ssdp.datetime.datetime', MagicMock()), \ patch('salt.utils.ssdp.datetime.datetime.now', MagicMock(return_value=ahead_dt)), \ patch('salt.utils.ssdp.datetime.datetime.fromtimestamp', MagicMock(return_value=curnt_dt)), \ patch('salt.utils.ssdp.datetime.timedelta', MagicMock(return_value=delta)): factory.datagram_received(data=data, addr=addr) assert factory.log.debug.called assert factory.disable_hidden assert factory._sendto.called assert factory._sendto.call_args[0][0] == salt.utils.stringutils.to_bytes("{}:@:{{}}".format(signature)) assert 'Received "%s" from %s:%s' in factory.log.debug.call_args[0][0] @skipIf(pytest is None, 'PyTest is missing') class SSDPServerTestCase(TestCase, Mocks): ''' Server-related test cases ''' def test_config_detached(self): ''' Test if configuration is not a reference. :return: ''' old_ip = '10.10.10.10' new_ip = '20.20.20.20' config = {'answer': {'master': old_ip}} with patch('salt.utils.ssdp.SSDPDiscoveryServer.get_self_ip', MagicMock(return_value=new_ip)): srv = ssdp.SSDPDiscoveryServer(**config) assert srv._config['answer']['master'] == new_ip assert config['answer']['master'] == old_ip def test_run(self): ''' Test server runner. :return: ''' with patch('salt.utils.ssdp.SSDPFactory', MagicMock()): config = {'answer': {'master': '10.10.10.10'}, ssdp.SSDPBase.LISTEN_IP: '10.10.10.10', ssdp.SSDPBase.PORT: 12345} srv = self.get_ssdp_discovery_server(**config) srv.create_datagram_endpoint = MagicMock() srv.log = MagicMock() trnsp = MagicMock() proto = MagicMock() loop = MagicMock() loop.run_until_complete = MagicMock(return_value=(trnsp, proto)) io = MagicMock() io.ported = False io.get_event_loop = MagicMock(return_value=loop) with patch('salt.utils.ssdp.asyncio', io): srv.run() cde_args = io.get_event_loop().create_datagram_endpoint.call_args[1] cfg_ip_addr, cfg_port = cde_args['local_addr'] assert io.get_event_loop.called assert io.get_event_loop().run_until_complete.called assert io.get_event_loop().create_datagram_endpoint.called assert io.get_event_loop().run_forever.called assert trnsp.close.called assert loop.close.called assert srv.log.info.called assert srv.log.info.call_args[0][0] == 'Stopping service discovery listener.' assert 'allow_broadcast' in cde_args assert cde_args['allow_broadcast'] assert 'local_addr' in cde_args assert not cfg_ip_addr == ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.LISTEN_IP] and cfg_ip_addr == '10.10.10.10' assert not cfg_port == ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.PORT] and cfg_port == 12345 @skipIf(pytest is None, 'PyTest is missing') class SSDPClientTestCase(TestCase, Mocks): ''' Client-related test cases ''' class Resource(object): ''' Fake network reader ''' def __init__(self): self.pool = [('some', '10.10.10.10'), ('data', '20.20.20.20'), ('data', '10.10.10.10'), (None, None)] def read(self, *args, **kwargs): return self.pool.pop(0) def test_config_passed(self): ''' Test if the configuration is passed. :return: ''' config = {ssdp.SSDPBase.SIGNATURE: 'SUSE Enterprise Server', ssdp.SSDPBase.TIMEOUT: 5, ssdp.SSDPBase.PORT: 12345} clnt = self.get_ssdp_discovery_client(**config) assert clnt._config[ssdp.SSDPBase.SIGNATURE] == config[ssdp.SSDPBase.SIGNATURE] assert clnt._config[ssdp.SSDPBase.PORT] == config[ssdp.SSDPBase.PORT] assert clnt._config[ssdp.SSDPBase.TIMEOUT] == config[ssdp.SSDPBase.TIMEOUT] def test_config_detached(self): ''' Test if the passed configuration is not a reference. :return: ''' config = {ssdp.SSDPBase.SIGNATURE: 'SUSE Enterprise Server', } clnt = self.get_ssdp_discovery_client(**config) clnt._config['foo'] = 'bar' assert 'foo' in clnt._config assert 'foo' not in config def test_query(self): ''' Test if client queries the broadcast :return: ''' config = {ssdp.SSDPBase.SIGNATURE: 'SUSE Enterprise Server', ssdp.SSDPBase.PORT: 4000} f_time = 1111 _socket = MagicMock() with patch('salt.utils.ssdp.socket', _socket),\ patch('salt.utils.ssdp.time.time', MagicMock(return_value=f_time)): clnt = ssdp.SSDPDiscoveryClient(**config) clnt._query() assert clnt._socket.sendto.called message, target = clnt._socket.sendto.call_args[0] assert message == salt.utils.stringutils.to_bytes( '{}{}'.format(config[ssdp.SSDPBase.SIGNATURE], f_time) ) assert target[0] == '' assert target[1] == config[ssdp.SSDPBase.PORT] def test_get_masters_map(self): ''' Test getting map of the available masters on the network :return: ''' _socket = MagicMock() response = {} with patch('salt.utils.ssdp.socket', _socket): clnt = ssdp.SSDPDiscoveryClient() clnt._socket.recvfrom = SSDPClientTestCase.Resource().read clnt.log = MagicMock() clnt._collect_masters_map(response=response) assert '10.10.10.10' in response assert '20.20.20.20' in response assert response['10.10.10.10'] == ['some', 'data'] assert response['20.20.20.20'] == ['data'] def test_get_masters_map_error_handling(self): ''' Test getting map handles timeout network exception :return: ''' _socket = MagicMock() response = {} error_msg = 'fake testing timeout just had happened' with patch('salt.utils.ssdp.socket', _socket): clnt = ssdp.SSDPDiscoveryClient() clnt._socket.recvfrom = MagicMock(side_effect=Exception(error_msg)) clnt.log = MagicMock() clnt._collect_masters_map(response=response) assert clnt.log.error.called assert 'Discovery master collection failure' in clnt.log.error.call_args[0][0] assert error_msg == six.text_type(clnt.log.error.call_args[0][1]) assert not response def test_discover_no_masters(self): ''' Test discover available master on the network (none found). :return: ''' clnt = self.get_ssdp_discovery_client() clnt._query = MagicMock() clnt._collect_masters_map = MagicMock() clnt.log = MagicMock() clnt.discover() assert clnt.log.info.called assert clnt.log.info.call_args[0][0] == 'No master has been discovered.' def test_discover_general_error(self): ''' Test discover available master on the network (erroneous found) :return: ''' _socket = MagicMock() error = 'Admins on strike due to broken coffee machine' signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE] fake_resource = SSDPClientTestCase.Resource() fake_resource.pool = [('{}:E:{}'.format(signature, error), '10.10.10.10'), (None, None)] with patch('salt.utils.ssdp.socket', _socket): clnt = ssdp.SSDPDiscoveryClient() clnt._socket.recvfrom = fake_resource.read clnt._query = MagicMock() clnt.log = MagicMock() clnt.discover() assert len(clnt.log.error.mock_calls) == 1 assert 'Error response from the service publisher' in clnt.log.error.call_args[0][0] assert '10.10.10.10' == clnt.log.error.call_args[0][1] assert clnt.log.error.call_args[1] == {} assert clnt.log.error.call_args[0][2] == error def test_discover_timestamp_error(self): ''' Test discover available master on the network (outdated timestamp) :return: ''' _socket = MagicMock() error = 'We only support a 1200 bps connection. Routing timestamp problems on neural net.' signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE] fake_resource = SSDPClientTestCase.Resource() fake_resource.pool = [('{}:E:{}'.format(signature, error), '10.10.10.10'), (None, None)] with patch('salt.utils.ssdp.socket', _socket): clnt = ssdp.SSDPDiscoveryClient() clnt._socket.recvfrom = fake_resource.read clnt._query = MagicMock() clnt.log = MagicMock() clnt.discover() assert len(clnt.log.error.mock_calls) == 2 assert 'Error response from the service publisher' in clnt.log.error.mock_calls[0][1][0] assert clnt.log.error.mock_calls[0][1][2] == error assert clnt.log.error.mock_calls[0][2] == {} assert 'Publisher sent shifted timestamp' in clnt.log.error.mock_calls[1][1][0] assert clnt.log.error.mock_calls[1][1][1] == clnt.log.error.mock_calls[0][1][1] == '10.10.10.10'