123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650 |
- # -*- coding: utf-8 -*-
- """
- :codeauthor: :email:`Bo Maryniuk <bo@suse.de>`
- """
- from __future__ import absolute_import, print_function, unicode_literals
- import datetime
- import salt.utils.ssdp as ssdp
- import salt.utils.stringutils
- from salt.ext import six
- from salt.ext.six.moves import zip
- from tests.support.mock import MagicMock, patch
- from tests.support.unit import TestCase, skipIf
- try:
- import pytest
- except ImportError:
- pytest = None
- class Mocks(object):
- def get_socket_mock(self, expected_ip, expected_hostname):
- """
- Get a mock of a socket
- :return:
- """
- sck = MagicMock()
- sck.getsockname = MagicMock(return_value=(expected_ip, 123456))
- sock_mock = MagicMock()
- sock_mock.socket = MagicMock(return_value=sck)
- sock_mock.gethostname = MagicMock(return_value=expected_hostname)
- sock_mock.gethostbyname = MagicMock(return_value=expected_ip)
- return sock_mock
- def get_ssdp_factory(self, expected_ip=None, expected_hostname=None, **config):
- if expected_ip is None:
- expected_ip = "127.0.0.1"
- if expected_hostname is None:
- expected_hostname = "localhost"
- sock_mock = self.get_socket_mock(expected_ip, expected_hostname)
- with patch("salt.utils.ssdp.socket", sock_mock):
- factory = ssdp.SSDPFactory(**config)
- return factory
- def get_ssdp_discovery_client(
- self, expected_ip=None, expected_hostname=None, **config
- ):
- if expected_ip is None:
- expected_ip = "127.0.0.1"
- if expected_hostname is None:
- expected_hostname = "localhost"
- sock_mock = self.get_socket_mock(expected_ip, expected_hostname)
- with patch("salt.utils.ssdp.socket", sock_mock):
- factory = ssdp.SSDPDiscoveryClient(**config)
- return factory
- def get_ssdp_discovery_server(
- self, expected_ip=None, expected_hostname=None, **config
- ):
- if expected_ip is None:
- expected_ip = "127.0.0.1"
- if expected_hostname is None:
- expected_hostname = "localhost"
- sock_mock = self.get_socket_mock(expected_ip, expected_hostname)
- with patch("salt.utils.ssdp.socket", sock_mock):
- factory = ssdp.SSDPDiscoveryServer(**config)
- return factory
- @skipIf(pytest is None, "PyTest is missing")
- class SSDPBaseTestCase(TestCase, Mocks):
- """
- TestCase for SSDP-related parts.
- """
- @staticmethod
- def exception_generic(*args, **kwargs):
- """
- Side effect
- :return:
- """
- raise Exception("some network error")
- @staticmethod
- def exception_attr_error(*args, **kwargs):
- """
- Side effect
- :return:
- """
- raise AttributeError("attribute error: {0}. {1}".format(args, kwargs))
- @patch("salt.utils.ssdp._json", None)
- @patch("salt.utils.ssdp.asyncio", None)
- def test_base_avail(self):
- """
- Test SSDP base class availability method.
- :return:
- """
- base = ssdp.SSDPBase()
- assert not base._is_available()
- with patch("salt.utils.ssdp._json", True):
- assert not base._is_available()
- with patch("salt.utils.ssdp.asyncio", True):
- assert not base._is_available()
- with patch("salt.utils.ssdp._json", True), patch(
- "salt.utils.ssdp.asyncio", True
- ):
- assert base._is_available()
- def test_base_protocol_settings(self):
- """
- Tests default constants data.
- :return:
- """
- base = ssdp.SSDPBase()
- v_keys = ["signature", "answer", "port", "listen_ip", "timeout"]
- v_vals = ["__salt_master_service", {}, 4520, "0.0.0.0", 3]
- for key in v_keys:
- assert key in base.DEFAULTS
- for key in base.DEFAULTS:
- assert key in v_keys
- for key, value in zip(v_keys, v_vals):
- assert base.DEFAULTS[key] == value
- def test_base_self_ip(self):
- """
- Test getting self IP method.
- :return:
- """
- base = ssdp.SSDPBase()
- expected_ip = "192.168.1.10"
- expected_host = "oxygen"
- sock_mock = self.get_socket_mock(expected_ip, expected_host)
- with patch("salt.utils.ssdp.socket", sock_mock):
- assert base.get_self_ip() == expected_ip
- sock_mock.socket().getsockname.side_effect = SSDPBaseTestCase.exception_generic
- with patch("salt.utils.ssdp.socket", sock_mock):
- assert base.get_self_ip() == expected_ip
- @skipIf(pytest is None, "PyTest is missing")
- class SSDPFactoryTestCase(TestCase, Mocks):
- """
- Test socket protocol
- """
- def test_attr_check(self):
- """
- Tests attributes are set to the base class
- :return:
- """
- config = {
- ssdp.SSDPBase.SIGNATURE: "-signature-",
- ssdp.SSDPBase.ANSWER: {"this-is": "the-answer"},
- }
- expected_ip = "10.10.10.10"
- factory = self.get_ssdp_factory(expected_ip=expected_ip, **config)
- for attr in [ssdp.SSDPBase.SIGNATURE, ssdp.SSDPBase.ANSWER]:
- assert hasattr(factory, attr)
- assert getattr(factory, attr) == config[attr]
- assert not factory.disable_hidden
- assert factory.my_ip == expected_ip
- def test_transport_sendto_success(self):
- """
- Test transport send_to.
- :return:
- """
- transport = MagicMock()
- log = MagicMock()
- factory = self.get_ssdp_factory()
- with patch.object(factory, "transport", transport), patch.object(
- factory, "log", log
- ):
- data = {"some": "data"}
- addr = "10.10.10.10"
- factory._sendto(data=data, addr=addr)
- assert factory.transport.sendto.called
- assert factory.transport.sendto.mock_calls[0][1][0]["some"] == "data"
- assert factory.transport.sendto.mock_calls[0][2]["addr"] == "10.10.10.10"
- assert factory.log.debug.called
- assert factory.log.debug.mock_calls[0][1][0] == "Sent successfully"
- def test_transport_sendto_retry(self):
- """
- Test transport send_to.
- :return:
- """
- with patch("salt.utils.ssdp.time.sleep", MagicMock()):
- transport = MagicMock()
- transport.sendto = MagicMock(
- side_effect=SSDPBaseTestCase.exception_attr_error
- )
- log = MagicMock()
- factory = self.get_ssdp_factory()
- with patch.object(factory, "transport", transport), patch.object(
- factory, "log", log
- ):
- data = {"some": "data"}
- addr = "10.10.10.10"
- factory._sendto(data=data, addr=addr)
- assert factory.transport.sendto.called
- assert ssdp.time.sleep.called
- assert (
- ssdp.time.sleep.call_args[0][0] > 0
- and ssdp.time.sleep.call_args[0][0] < 0.5
- )
- assert factory.log.debug.called
- assert "Permission error" in factory.log.debug.mock_calls[0][1][0]
- def test_datagram_signature_bad(self):
- """
- Test datagram_received on bad signature
- :return:
- """
- factory = self.get_ssdp_factory()
- data = "nonsense"
- addr = "10.10.10.10", "foo.suse.de"
- with patch.object(factory, "log", MagicMock()):
- factory.datagram_received(data=data, addr=addr)
- assert factory.log.debug.called
- assert "Received bad signature from" in factory.log.debug.call_args[0][0]
- assert factory.log.debug.call_args[0][1] == addr[0]
- assert factory.log.debug.call_args[0][2] == addr[1]
- def test_datagram_signature_wrong_timestamp_quiet(self):
- """
- Test datagram receives a wrong timestamp (no reply).
- :return:
- """
- factory = self.get_ssdp_factory()
- data = "{}nonsense".format(ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE])
- addr = "10.10.10.10", "foo.suse.de"
- with patch.object(factory, "log", MagicMock()), patch.object(
- factory, "_sendto", MagicMock()
- ):
- factory.datagram_received(data=data, addr=addr)
- assert factory.log.debug.called
- assert (
- "Received invalid timestamp in package"
- in factory.log.debug.call_args[0][0]
- )
- assert not factory._sendto.called
- def test_datagram_signature_wrong_timestamp_reply(self):
- """
- Test datagram receives a wrong timestamp.
- :return:
- """
- factory = self.get_ssdp_factory()
- factory.disable_hidden = True
- signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
- data = "{}nonsense".format(signature)
- addr = "10.10.10.10", "foo.suse.de"
- with patch.object(factory, "log", MagicMock()), patch.object(
- factory, "_sendto", MagicMock()
- ):
- factory.datagram_received(data=data, addr=addr)
- assert factory.log.debug.called
- assert (
- "Received invalid timestamp in package"
- in factory.log.debug.call_args[0][0]
- )
- assert factory._sendto.called
- assert (
- "{}:E:Invalid timestamp".format(signature)
- == factory._sendto.call_args[0][0]
- )
- def test_datagram_signature_outdated_timestamp_quiet(self):
- """
- Test if datagram processing reacts on outdated message (more than 20 seconds). Quiet mode.
- :return:
- """
- factory = self.get_ssdp_factory()
- signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
- data = "{}{}".format(signature, "1516623820")
- addr = "10.10.10.10", "foo.suse.de"
- ahead_dt = datetime.datetime.fromtimestamp(1516623841)
- curnt_dt = datetime.datetime.fromtimestamp(1516623820)
- delta = datetime.timedelta(0, 20)
- with patch.object(factory, "log", MagicMock()), patch.object(
- factory, "_sendto"
- ), patch("salt.utils.ssdp.datetime.datetime", MagicMock()), patch(
- "salt.utils.ssdp.datetime.datetime.now", MagicMock(return_value=ahead_dt)
- ), patch(
- "salt.utils.ssdp.datetime.datetime.fromtimestamp",
- MagicMock(return_value=curnt_dt),
- ), patch(
- "salt.utils.ssdp.datetime.timedelta", MagicMock(return_value=delta)
- ):
- factory.datagram_received(data=data, addr=addr)
- assert factory.log.debug.called
- assert not factory.disable_hidden
- assert not factory._sendto.called
- assert "Received outdated package" in factory.log.debug.call_args[0][0]
- def test_datagram_signature_outdated_timestamp_reply(self):
- """
- Test if datagram processing reacts on outdated message (more than 20 seconds). Reply mode.
- :return:
- """
- factory = self.get_ssdp_factory()
- factory.disable_hidden = True
- signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
- data = "{}{}".format(signature, "1516623820")
- addr = "10.10.10.10", "foo.suse.de"
- ahead_dt = datetime.datetime.fromtimestamp(1516623841)
- curnt_dt = datetime.datetime.fromtimestamp(1516623820)
- delta = datetime.timedelta(0, 20)
- with patch.object(factory, "log", MagicMock()), patch.object(
- factory, "_sendto"
- ), patch("salt.utils.ssdp.datetime.datetime", MagicMock()), patch(
- "salt.utils.ssdp.datetime.datetime.now", MagicMock(return_value=ahead_dt)
- ), patch(
- "salt.utils.ssdp.datetime.datetime.fromtimestamp",
- MagicMock(return_value=curnt_dt),
- ), patch(
- "salt.utils.ssdp.datetime.timedelta", MagicMock(return_value=delta)
- ):
- factory.datagram_received(data=data, addr=addr)
- assert factory.log.debug.called
- assert factory.disable_hidden
- assert factory._sendto.called
- assert factory._sendto.call_args[0][
- 0
- ] == "{}:E:Timestamp is too old".format(signature)
- assert "Received outdated package" in factory.log.debug.call_args[0][0]
- def test_datagram_signature_correct_timestamp_reply(self):
- """
- Test if datagram processing sends out correct reply within 20 seconds.
- :return:
- """
- factory = self.get_ssdp_factory()
- factory.disable_hidden = True
- signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
- data = "{}{}".format(signature, "1516623820")
- addr = "10.10.10.10", "foo.suse.de"
- ahead_dt = datetime.datetime.fromtimestamp(1516623840)
- curnt_dt = datetime.datetime.fromtimestamp(1516623820)
- delta = datetime.timedelta(0, 20)
- with patch.object(factory, "log", MagicMock()), patch.object(
- factory, "_sendto"
- ), patch("salt.utils.ssdp.datetime.datetime", MagicMock()), patch(
- "salt.utils.ssdp.datetime.datetime.now", MagicMock(return_value=ahead_dt)
- ), patch(
- "salt.utils.ssdp.datetime.datetime.fromtimestamp",
- MagicMock(return_value=curnt_dt),
- ), patch(
- "salt.utils.ssdp.datetime.timedelta", MagicMock(return_value=delta)
- ):
- factory.datagram_received(data=data, addr=addr)
- assert factory.log.debug.called
- assert factory.disable_hidden
- assert factory._sendto.called
- assert factory._sendto.call_args[0][0] == salt.utils.stringutils.to_bytes(
- "{}:@:{{}}".format(signature)
- )
- assert 'Received "%s" from %s:%s' in factory.log.debug.call_args[0][0]
- @skipIf(pytest is None, "PyTest is missing")
- class SSDPServerTestCase(TestCase, Mocks):
- """
- Server-related test cases
- """
- def test_config_detached(self):
- """
- Test if configuration is not a reference.
- :return:
- """
- old_ip = "10.10.10.10"
- new_ip = "20.20.20.20"
- config = {"answer": {"master": old_ip}}
- with patch(
- "salt.utils.ssdp.SSDPDiscoveryServer.get_self_ip",
- MagicMock(return_value=new_ip),
- ):
- srv = ssdp.SSDPDiscoveryServer(**config)
- assert srv._config["answer"]["master"] == new_ip
- assert config["answer"]["master"] == old_ip
- def test_run(self):
- """
- Test server runner.
- :return:
- """
- with patch("salt.utils.ssdp.SSDPFactory", MagicMock()):
- config = {
- "answer": {"master": "10.10.10.10"},
- ssdp.SSDPBase.LISTEN_IP: "10.10.10.10",
- ssdp.SSDPBase.PORT: 12345,
- }
- srv = self.get_ssdp_discovery_server(**config)
- srv.create_datagram_endpoint = MagicMock()
- srv.log = MagicMock()
- trnsp = MagicMock()
- proto = MagicMock()
- loop = MagicMock()
- loop.run_until_complete = MagicMock(return_value=(trnsp, proto))
- io = MagicMock()
- io.ported = False
- io.get_event_loop = MagicMock(return_value=loop)
- with patch("salt.utils.ssdp.asyncio", io):
- srv.run()
- cde_args = io.get_event_loop().create_datagram_endpoint.call_args[1]
- cfg_ip_addr, cfg_port = cde_args["local_addr"]
- assert io.get_event_loop.called
- assert io.get_event_loop().run_until_complete.called
- assert io.get_event_loop().create_datagram_endpoint.called
- assert io.get_event_loop().run_forever.called
- assert trnsp.close.called
- assert loop.close.called
- assert srv.log.info.called
- assert (
- srv.log.info.call_args[0][0]
- == "Stopping service discovery listener."
- )
- assert "allow_broadcast" in cde_args
- assert cde_args["allow_broadcast"]
- assert "local_addr" in cde_args
- assert (
- not cfg_ip_addr == ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.LISTEN_IP]
- and cfg_ip_addr == "10.10.10.10"
- )
- assert (
- not cfg_port == ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.PORT]
- and cfg_port == 12345
- )
- @skipIf(pytest is None, "PyTest is missing")
- class SSDPClientTestCase(TestCase, Mocks):
- """
- Client-related test cases
- """
- class Resource(object):
- """
- Fake network reader
- """
- def __init__(self):
- self.pool = [
- ("some", "10.10.10.10"),
- ("data", "20.20.20.20"),
- ("data", "10.10.10.10"),
- (None, None),
- ]
- def read(self, *args, **kwargs):
- return self.pool.pop(0)
- def test_config_passed(self):
- """
- Test if the configuration is passed.
- :return:
- """
- config = {
- ssdp.SSDPBase.SIGNATURE: "SUSE Enterprise Server",
- ssdp.SSDPBase.TIMEOUT: 5,
- ssdp.SSDPBase.PORT: 12345,
- }
- clnt = self.get_ssdp_discovery_client(**config)
- assert clnt._config[ssdp.SSDPBase.SIGNATURE] == config[ssdp.SSDPBase.SIGNATURE]
- assert clnt._config[ssdp.SSDPBase.PORT] == config[ssdp.SSDPBase.PORT]
- assert clnt._config[ssdp.SSDPBase.TIMEOUT] == config[ssdp.SSDPBase.TIMEOUT]
- def test_config_detached(self):
- """
- Test if the passed configuration is not a reference.
- :return:
- """
- config = {
- ssdp.SSDPBase.SIGNATURE: "SUSE Enterprise Server",
- }
- clnt = self.get_ssdp_discovery_client(**config)
- clnt._config["foo"] = "bar"
- assert "foo" in clnt._config
- assert "foo" not in config
- def test_query(self):
- """
- Test if client queries the broadcast
- :return:
- """
- config = {
- ssdp.SSDPBase.SIGNATURE: "SUSE Enterprise Server",
- ssdp.SSDPBase.PORT: 4000,
- }
- f_time = 1111
- _socket = MagicMock()
- with patch("salt.utils.ssdp.socket", _socket), patch(
- "salt.utils.ssdp.time.time", MagicMock(return_value=f_time)
- ):
- clnt = ssdp.SSDPDiscoveryClient(**config)
- clnt._query()
- assert clnt._socket.sendto.called
- message, target = clnt._socket.sendto.call_args[0]
- assert message == salt.utils.stringutils.to_bytes(
- "{}{}".format(config[ssdp.SSDPBase.SIGNATURE], f_time)
- )
- assert target[0] == "<broadcast>"
- assert target[1] == config[ssdp.SSDPBase.PORT]
- def test_get_masters_map(self):
- """
- Test getting map of the available masters on the network
- :return:
- """
- _socket = MagicMock()
- response = {}
- with patch("salt.utils.ssdp.socket", _socket):
- clnt = ssdp.SSDPDiscoveryClient()
- clnt._socket.recvfrom = SSDPClientTestCase.Resource().read
- clnt.log = MagicMock()
- clnt._collect_masters_map(response=response)
- assert "10.10.10.10" in response
- assert "20.20.20.20" in response
- assert response["10.10.10.10"] == ["some", "data"]
- assert response["20.20.20.20"] == ["data"]
- def test_get_masters_map_error_handling(self):
- """
- Test getting map handles timeout network exception
- :return:
- """
- _socket = MagicMock()
- response = {}
- error_msg = "fake testing timeout just had happened"
- with patch("salt.utils.ssdp.socket", _socket):
- clnt = ssdp.SSDPDiscoveryClient()
- clnt._socket.recvfrom = MagicMock(side_effect=Exception(error_msg))
- clnt.log = MagicMock()
- clnt._collect_masters_map(response=response)
- assert clnt.log.error.called
- assert (
- "Discovery master collection failure" in clnt.log.error.call_args[0][0]
- )
- assert error_msg == six.text_type(clnt.log.error.call_args[0][1])
- assert not response
- def test_discover_no_masters(self):
- """
- Test discover available master on the network (none found).
- :return:
- """
- clnt = self.get_ssdp_discovery_client()
- clnt._query = MagicMock()
- clnt._collect_masters_map = MagicMock()
- clnt.log = MagicMock()
- clnt.discover()
- assert clnt.log.info.called
- assert clnt.log.info.call_args[0][0] == "No master has been discovered."
- def test_discover_general_error(self):
- """
- Test discover available master on the network (erroneous found)
- :return:
- """
- _socket = MagicMock()
- error = "Admins on strike due to broken coffee machine"
- signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
- fake_resource = SSDPClientTestCase.Resource()
- fake_resource.pool = [
- ("{}:E:{}".format(signature, error), "10.10.10.10"),
- (None, None),
- ]
- with patch("salt.utils.ssdp.socket", _socket):
- clnt = ssdp.SSDPDiscoveryClient()
- clnt._socket.recvfrom = fake_resource.read
- clnt._query = MagicMock()
- clnt.log = MagicMock()
- clnt.discover()
- assert len(clnt.log.error.mock_calls) == 1
- assert (
- "Error response from the service publisher"
- in clnt.log.error.call_args[0][0]
- )
- assert "10.10.10.10" == clnt.log.error.call_args[0][1]
- assert clnt.log.error.call_args[1] == {}
- assert clnt.log.error.call_args[0][2] == error
- def test_discover_timestamp_error(self):
- """
- Test discover available master on the network (outdated timestamp)
- :return:
- """
- _socket = MagicMock()
- error = "We only support a 1200 bps connection. Routing timestamp problems on neural net."
- signature = ssdp.SSDPBase.DEFAULTS[ssdp.SSDPBase.SIGNATURE]
- fake_resource = SSDPClientTestCase.Resource()
- fake_resource.pool = [
- ("{}:E:{}".format(signature, error), "10.10.10.10"),
- (None, None),
- ]
- with patch("salt.utils.ssdp.socket", _socket):
- clnt = ssdp.SSDPDiscoveryClient()
- clnt._socket.recvfrom = fake_resource.read
- clnt._query = MagicMock()
- clnt.log = MagicMock()
- clnt.discover()
- assert len(clnt.log.error.mock_calls) == 2
- assert (
- "Error response from the service publisher"
- in clnt.log.error.mock_calls[0][1][0]
- )
- assert clnt.log.error.mock_calls[0][1][2] == error
- assert clnt.log.error.mock_calls[0][2] == {}
- assert (
- "Publisher sent shifted timestamp" in clnt.log.error.mock_calls[1][1][0]
- )
- assert (
- clnt.log.error.mock_calls[1][1][1]
- == clnt.log.error.mock_calls[0][1][1]
- == "10.10.10.10"
- )
|