123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- # -*- coding: utf-8 -*-
- """
- Integration tests for nilrt_ip
- """
- from __future__ import absolute_import, print_function, unicode_literals
- import re
- import shutil
- import time
- import salt.modules.nilrt_ip as ip
- import salt.utils.files
- import salt.utils.platform
- from salt.ext import six
- from salt.ext.six.moves import configparser
- from tests.support.case import ModuleCase
- from tests.support.helpers import (
- destructiveTest,
- requires_system_grains,
- runs_on,
- skip_if_not_root,
- )
- from tests.support.unit import skipIf
- try:
- import pyiface
- from pyiface.ifreqioctls import IFF_LOOPBACK, IFF_RUNNING
- except ImportError:
- pyiface = None
- try:
- from requests.structures import CaseInsensitiveDict
- except ImportError:
- CaseInsensitiveDict = None
@skip_if_not_root
@destructiveTest
@skipIf(not pyiface, "The python pyiface package is not installed")
@skipIf(not CaseInsensitiveDict, "The python package requests is not installed")
@runs_on(os_family="NILinuxRT", reason="Tests applicable only to NILinuxRT")
class NilrtIpModuleTest(ModuleCase):
    """
    Validate the nilrt_ip module

    Every test reconfigures the non-loopback interfaces through the ``ip.*``
    execution functions and then checks what ``ip.get_interfaces_details``
    reports. ``setUp`` moves the network configuration to ``/tmp`` and
    ``tearDown`` moves it back, so the tests modify the running system.
    """

    # ``classmethod`` must be the outermost decorator so that
    # ``requires_system_grains`` wraps the plain function; a ``classmethod``
    # object is not callable.
    # NOTE(review): assumes requires_system_grains calls the wrapped object
    # directly and passes ``grains`` to it - confirm in tests.support.helpers.
    @classmethod
    @requires_system_grains
    def setUpClass(cls, grains):  # pylint: disable=arguments-differ
        """
        Store the system grains on the class so the tests can read them

        :param grains: the grains handed in by ``requires_system_grains``
        """
        cls.initialState = {}
        cls.grains = grains

    @classmethod
    def tearDownClass(cls):
        """
        Drop the class level references created by ``setUpClass``
        """
        cls.initialState = cls.grains = None

    @staticmethod
    def setup_loader_modules():
        """
        Setup loader modules
        """
        return {ip: {}}

    def setUp(self):
        """
        Move the current network configuration to /tmp

        ``/etc/natinst/share/ni-rt.ini`` is moved when the
        ``lsb_distrib_id`` grain is ``nilrt``, ``/var/lib/connman`` otherwise.
        """
        super(NilrtIpModuleTest, self).setUp()
        if self.grains["lsb_distrib_id"] == "nilrt":
            shutil.move("/etc/natinst/share/ni-rt.ini", "/tmp/ni-rt.ini")
        else:
            shutil.move("/var/lib/connman", "/tmp/connman")

    def tearDown(self):
        """
        Reset to original settings

        Moves the configuration saved by ``setUp`` back, restarts the matching
        network service and brings every non-loopback interface up again.
        """
        # restore files
        if self.grains["lsb_distrib_id"] == "nilrt":
            shutil.move("/tmp/ni-rt.ini", "/etc/natinst/share/ni-rt.ini")
            self.run_function("cmd.run", ["/etc/init.d/networking restart"])
        else:
            shutil.move("/tmp/connman", "/var/lib/connman")
            self.run_function("service.restart", ["connman"])
        time.sleep(10)  # wait 10 seconds for connman to be fully loaded
        for interface in self.__interfaces():
            self.run_function("ip.up", [interface.name])

    @staticmethod
    def __connected(interface):
        """
        Check if an interface is up or down

        :param interface: pyiface.Interface object
        :return: True, if interface is up, otherwise False.
        """
        return (interface.flags & IFF_RUNNING) != 0

    @staticmethod
    def __interfaces():
        """
        Return the list of all interfaces without loopback
        """
        return [
            interface
            for interface in pyiface.getIfaces()
            if (interface.flags & IFF_LOOPBACK) == 0
        ]

    def __check_ethercat(self):
        """
        Check if ethercat is installed.

        Reads the ``AdditionalNetworkProtocols`` option of the ``lvrt``
        section in ``/etc/natinst/share/ni-rt.ini``.

        :return: True if ethercat is installed, otherwise False.
        """
        if self.grains["lsb_distrib_id"] != "nilrt":
            return False
        with salt.utils.files.fopen("/etc/natinst/share/ni-rt.ini", "r") as config_file:
            config_parser = configparser.RawConfigParser(dict_type=CaseInsensitiveDict)
            if six.PY2:
                config_parser.readfp(config_file)
            else:
                # readfp() is deprecated on Python 3 and was removed in 3.12
                config_parser.read_file(config_file)
        # has_option() is False for a missing section as well as a missing
        # option, so one code path serves both Python 2 and Python 3.
        if not config_parser.has_option("lvrt", "AdditionalNetworkProtocols"):
            return False
        protocols = config_parser.get("lvrt", "AdditionalNetworkProtocols")
        return "ethercat" in protocols.lower()

    def test_down(self):
        """
        Test ip.down function
        """
        for interface in self.__interfaces():
            result = self.run_function("ip.down", [interface.name])
            self.assertTrue(result)
        info = self.run_function("ip.get_interfaces_details", timeout=300)
        for interface in info["interfaces"]:
            self.assertEqual(interface["adapter_mode"], "disabled")
            self.assertFalse(
                self.__connected(pyiface.Interface(name=interface["connectionid"]))
            )

    def test_up(self):
        """
        Test ip.up function
        """
        interfaces = self.__interfaces()
        # first down all interfaces
        for interface in interfaces:
            self.run_function("ip.down", [interface.name])
            self.assertFalse(self.__connected(interface))
        # up interfaces
        for interface in interfaces:
            result = self.run_function("ip.up", [interface.name])
            self.assertTrue(result)
        info = self.run_function("ip.get_interfaces_details", timeout=300)
        for interface in info["interfaces"]:
            self.assertEqual(interface["adapter_mode"], "tcpip")

    def test_set_dhcp_linklocal_all(self):
        """
        Test ip.set_dhcp_linklocal_all function
        """
        for interface in self.__interfaces():
            result = self.run_function("ip.set_dhcp_linklocal_all", [interface.name])
            self.assertTrue(result)
        info = self.run_function("ip.get_interfaces_details", timeout=300)
        for interface in info["interfaces"]:
            self.assertEqual(interface["ipv4"]["requestmode"], "dhcp_linklocal")
            self.assertEqual(interface["adapter_mode"], "tcpip")

    def test_set_dhcp_only_all(self):
        """
        Test ip.set_dhcp_only_all function

        Skipped unless the ``lsb_distrib_id`` grain is ``nilrt``.
        """
        if self.grains["lsb_distrib_id"] != "nilrt":
            self.skipTest("Test not applicable to newer nilrt")
        for interface in self.__interfaces():
            result = self.run_function("ip.set_dhcp_only_all", [interface.name])
            self.assertTrue(result)
        info = self.run_function("ip.get_interfaces_details", timeout=300)
        for interface in info["interfaces"]:
            self.assertEqual(interface["ipv4"]["requestmode"], "dhcp_only")
            self.assertEqual(interface["adapter_mode"], "tcpip")

    def test_set_linklocal_only_all(self):
        """
        Test ip.set_linklocal_only_all function

        Skipped unless the ``lsb_distrib_id`` grain is ``nilrt``.
        """
        if self.grains["lsb_distrib_id"] != "nilrt":
            self.skipTest("Test not applicable to newer nilrt")
        for interface in self.__interfaces():
            result = self.run_function("ip.set_linklocal_only_all", [interface.name])
            self.assertTrue(result)
        info = self.run_function("ip.get_interfaces_details", timeout=300)
        for interface in info["interfaces"]:
            self.assertEqual(interface["ipv4"]["requestmode"], "linklocal_only")
            self.assertEqual(interface["adapter_mode"], "tcpip")

    def test_static_all(self):
        """
        Test ip.set_static_all function
        """
        for interface in self.__interfaces():
            result = self.run_function(
                "ip.set_static_all",
                [
                    interface.name,
                    "192.168.10.4",
                    "255.255.255.0",
                    "192.168.10.1",
                    "8.8.4.4 8.8.8.8",
                ],
            )
            self.assertTrue(result)
        info = self.run_function("ip.get_interfaces_details", timeout=300)
        for interface in info["interfaces"]:
            self.assertEqual(interface["adapter_mode"], "tcpip")
            # The expected DNS list differs between the two variants: only
            # the first server is expected when lsb_distrib_id is "nilrt".
            if self.grains["lsb_distrib_id"] != "nilrt":
                self.assertIn("8.8.4.4", interface["ipv4"]["dns"])
                self.assertIn("8.8.8.8", interface["ipv4"]["dns"])
            else:
                self.assertEqual(interface["ipv4"]["dns"], ["8.8.4.4"])
            self.assertEqual(interface["ipv4"]["requestmode"], "static")
            self.assertEqual(interface["ipv4"]["address"], "192.168.10.4")
            self.assertEqual(interface["ipv4"]["netmask"], "255.255.255.0")
            self.assertEqual(interface["ipv4"]["gateway"], "192.168.10.1")

    def test_supported_adapter_modes(self):
        """
        Test supported adapter modes for each interface
        """
        interface_pattern = re.compile("^eth[0-9]+$")
        info = self.run_function("ip.get_interfaces_details", timeout=300)
        for interface in info["interfaces"]:
            if interface["connectionid"] == "eth0":
                self.assertEqual(interface["supported_adapter_modes"], ["tcpip"])
            else:
                self.assertIn("tcpip", interface["supported_adapter_modes"])
                if not interface_pattern.match(interface["connectionid"]):
                    self.assertNotIn("ethercat", interface["supported_adapter_modes"])
                elif self.__check_ethercat():
                    self.assertIn("ethercat", interface["supported_adapter_modes"])

    def test_ethercat(self):
        """
        Test ip.set_ethercat function

        Switches eth1 to ethercat with master id 19, verifies it, then
        switches it back to dhcp_linklocal and verifies that as well.
        """
        if not self.__check_ethercat():
            self.skipTest("Test is just for systems with Ethercat")
        self.assertTrue(self.run_function("ip.set_ethercat", ["eth1", 19]))
        info = self.run_function("ip.get_interfaces_details", timeout=300)
        for interface in info["interfaces"]:
            if interface["connectionid"] == "eth1":
                self.assertEqual(interface["adapter_mode"], "ethercat")
                self.assertEqual(int(interface["ethercat"]["masterid"]), 19)
                break
        else:
            # Without this the test would pass with no assertion executed.
            self.fail("eth1 is missing from ip.get_interfaces_details output")
        self.assertTrue(self.run_function("ip.set_dhcp_linklocal_all", ["eth1"]))
        info = self.run_function("ip.get_interfaces_details", timeout=300)
        for interface in info["interfaces"]:
            if interface["connectionid"] == "eth1":
                self.assertEqual(interface["adapter_mode"], "tcpip")
                self.assertEqual(interface["ipv4"]["requestmode"], "dhcp_linklocal")
                break
        else:
            self.fail("eth1 is missing from ip.get_interfaces_details output")
|