123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488 |
- # -*- coding: utf-8 -*-
- # This code assumes vboxapi.py from VirtualBox distribution
- # being in PYTHONPATH, or installed system-wide
- # Import Python Libs
- from __future__ import absolute_import, print_function, unicode_literals
- import os
- import logging
- import socket
- # Import Salt Testing Libs
- import tests.integration as integration
- from tests.support.unit import TestCase, skipIf
- from tests.integration.cloud.helpers.virtualbox import (VirtualboxTestCase,
- VirtualboxCloudTestCase,
- CONFIG_NAME,
- PROVIDER_NAME,
- PROFILE_NAME,
- BASE_BOX_NAME,
- INSTANCE_NAME,
- BOOTABLE_BASE_BOX_NAME,
- DEPLOY_PROFILE_NAME)
- # Import Salt Libs
- from salt.ext import six
- from salt.ext.six.moves import range
- from salt.config import cloud_providers_config, vm_profiles_config
- from salt.utils.virtualbox import (vb_xpcom_to_attribute_dict,
- vb_clone_vm,
- vb_destroy_machine,
- vb_create_machine,
- vb_get_box,
- vb_machine_exists,
- XPCOM_ATTRIBUTES,
- vb_start_vm,
- vb_stop_vm,
- vb_get_network_addresses,
- vb_wait_for_network_address,
- machine_get_machinestate_str,
- HAS_LIBS)
- log = logging.getLogger(__name__)
- # As described in the documentation of list_nodes (this may change with time)
- MINIMAL_MACHINE_ATTRIBUTES = [
- "id",
- "image",
- "size",
- "state",
- "private_ips",
- "public_ips",
- ]
- class VirtualboxProviderTest(VirtualboxCloudTestCase):
- """
- Integration tests for the Virtualbox cloud provider using the Virtualbox driver
- """
- def run_cloud_destroy(self, machine_name):
- """
- Calls salt-cloud to destroy a machine and returns the destroyed machine object (should be None)
- @param machine_name:
- @type str:
- @return:
- @rtype: dict
- """
- output = self.run_cloud('-d {0} --assume-yes --log-level=debug'.format(machine_name))
- return output.get(CONFIG_NAME, {}).get(PROVIDER_NAME, {})
- def setUp(self):
- """
- Sets up the test requirements
- """
- super(VirtualboxProviderTest, self).setUp()
- # check if appropriate cloud provider and profile files are present
- profile_str = 'virtualbox-config'
- providers = self.run_cloud('--list-providers')
- log.debug("providers: %s", providers)
- if profile_str not in providers:
- self.skipTest(
- 'Configuration file for {0} was not found. Check {0}.conf files '
- 'in tests/integration/files/conf/cloud.*.d/ to run these tests.'.format(PROVIDER_NAME)
- )
- # check if personal access token, ssh_key_file, and ssh_key_names are present
- config_path = os.path.join(
- integration.FILES,
- 'conf',
- 'cloud.providers.d',
- PROVIDER_NAME + '.conf'
- )
- log.debug("config_path: %s", config_path)
- providers = cloud_providers_config(config_path)
- log.debug("config: %s", providers)
- config_path = os.path.join(
- integration.FILES,
- 'conf',
- 'cloud.profiles.d',
- PROVIDER_NAME + '.conf'
- )
- profiles = vm_profiles_config(config_path, providers)
- profile = profiles.get(PROFILE_NAME)
- if not profile:
- self.skipTest(
- 'Profile {0} was not found. Check {1}.conf files '
- 'in tests/integration/files/conf/cloud.profiles.d/ to run these tests.'.format(PROFILE_NAME,
- PROVIDER_NAME)
- )
- base_box_name = profile.get("clonefrom")
- if base_box_name != BASE_BOX_NAME:
- self.skipTest(
- 'Profile {0} does not have a base box to clone from. Check {1}.conf files '
- 'in tests/integration/files/conf/cloud.profiles.d/ to run these tests.'
- 'And add a "clone_from: {2}" to the profile'.format(PROFILE_NAME, PROVIDER_NAME, BASE_BOX_NAME)
- )
- @classmethod
- def setUpClass(cls):
- vb_create_machine(BASE_BOX_NAME)
- @classmethod
- def tearDownClass(cls):
- vb_destroy_machine(BASE_BOX_NAME)
- def test_cloud_create(self):
- """
- Simply create a machine and make sure it was created
- """
- machines = self.run_cloud('-p {0} {1} --log-level=debug'.format(PROFILE_NAME, INSTANCE_NAME))
- self.assertIn(INSTANCE_NAME, machines.keys())
- def test_cloud_list(self):
- """
- List all machines in virtualbox and make sure the requested attributes are included
- """
- machines = self.run_cloud_function('list_nodes')
- expected_attributes = MINIMAL_MACHINE_ATTRIBUTES
- names = machines.keys()
- self.assertGreaterEqual(len(names), 1, "No machines found")
- for name, machine in six.iteritems(machines):
- if six.PY3:
- self.assertCountEqual(expected_attributes, machine.keys())
- else:
- self.assertItemsEqual(expected_attributes, machine.keys())
- self.assertIn(BASE_BOX_NAME, names)
- def test_cloud_list_full(self):
- """
- List all machines and make sure full information in included
- """
- machines = self.run_cloud_function('list_nodes_full')
- expected_minimal_attribute_count = len(MINIMAL_MACHINE_ATTRIBUTES)
- names = machines.keys()
- self.assertGreaterEqual(len(names), 1, "No machines found")
- for name, machine in six.iteritems(machines):
- self.assertGreaterEqual(len(machine.keys()), expected_minimal_attribute_count)
- self.assertIn(BASE_BOX_NAME, names)
- def test_cloud_list_select(self):
- """
- List selected attributes of all machines
- """
- machines = self.run_cloud_function('list_nodes_select')
- # TODO find out how to get query.selection from the "cloud" config
- expected_attributes = ["id"]
- names = machines.keys()
- self.assertGreaterEqual(len(names), 1, "No machines found")
- for name, machine in six.iteritems(machines):
- if six.PY3:
- self.assertCountEqual(expected_attributes, machine.keys())
- else:
- self.assertItemsEqual(expected_attributes, machine.keys())
- self.assertIn(BASE_BOX_NAME, names)
- def test_cloud_destroy(self):
- """
- Test creating an instance on virtualbox with the virtualbox driver
- """
- # check if instance with salt installed returned
- self.test_cloud_create()
- ret = self.run_cloud_destroy(INSTANCE_NAME)
- # destroy the instance
- self.assertIn(INSTANCE_NAME, ret.keys())
- def test_function_show_instance(self):
- kw_function_args = {
- "image": BASE_BOX_NAME
- }
- machines = self.run_cloud_function('show_image', kw_function_args, timeout=30)
- expected_minimal_attribute_count = len(MINIMAL_MACHINE_ATTRIBUTES)
- self.assertIn(BASE_BOX_NAME, machines)
- machine = machines[BASE_BOX_NAME]
- self.assertGreaterEqual(len(machine.keys()), expected_minimal_attribute_count)
- def tearDown(self):
- """
- Clean up after tests
- """
- if vb_machine_exists(INSTANCE_NAME):
- vb_destroy_machine(INSTANCE_NAME)
- @skipIf(HAS_LIBS and vb_machine_exists(BOOTABLE_BASE_BOX_NAME) is False,
- "Bootable VM '{0}' not found. Cannot run tests.".format(BOOTABLE_BASE_BOX_NAME)
- )
- class VirtualboxProviderHeavyTests(VirtualboxCloudTestCase):
- """
- Tests that include actually booting a machine and doing operations on it that might be lengthy.
- """
- def assertIsIpAddress(self, ip_str):
- """
- Is it either a IPv4 or IPv6 address
- @param ip_str:
- @type ip_str: str
- @raise AssertionError
- """
- try:
- socket.inet_aton(ip_str)
- except Exception:
- try:
- socket.inet_pton(socket.AF_INET6, ip_str)
- except Exception:
- self.fail("{0} is not a valid IP address".format(ip_str))
- def setUp(self):
- """
- Sets up the test requirements
- """
- # check if appropriate cloud provider and profile files are present
- provider_str = CONFIG_NAME
- providers = self.run_cloud('--list-providers')
- log.debug("providers: %s", providers)
- if provider_str not in providers:
- self.skipTest(
- 'Configuration file for {0} was not found. Check {0}.conf files '
- 'in tests/integration/files/conf/cloud.*.d/ to run these tests.'.format(PROVIDER_NAME)
- )
- # check if personal access token, ssh_key_file, and ssh_key_names are present
- config_path = os.path.join(
- integration.FILES,
- 'conf',
- 'cloud.providers.d',
- PROVIDER_NAME + '.conf'
- )
- log.debug("config_path: %s", config_path)
- providers = cloud_providers_config(config_path)
- log.debug("config: %s", providers)
- config_path = os.path.join(
- integration.FILES,
- 'conf',
- 'cloud.profiles.d',
- PROVIDER_NAME + '.conf'
- )
- profiles = vm_profiles_config(config_path, providers)
- profile = profiles.get(DEPLOY_PROFILE_NAME)
- if not profile:
- self.skipTest(
- 'Profile {0} was not found. Check {1}.conf files '
- 'in tests/integration/files/conf/cloud.profiles.d/ to run these tests.'.format(DEPLOY_PROFILE_NAME,
- PROVIDER_NAME)
- )
- base_box_name = profile.get("clonefrom")
- if base_box_name != BOOTABLE_BASE_BOX_NAME:
- self.skipTest(
- 'Profile {0} does not have a base box to clone from. Check {1}.conf files '
- 'in tests/integration/files/conf/cloud.profiles.d/ to run these tests.'
- 'And add a "clone_from: {2}" to the profile'.format(PROFILE_NAME, PROVIDER_NAME, BOOTABLE_BASE_BOX_NAME)
- )
- def tearDown(self):
- try:
- vb_stop_vm(BOOTABLE_BASE_BOX_NAME)
- except Exception:
- pass
- if vb_machine_exists(INSTANCE_NAME):
- try:
- vb_stop_vm(INSTANCE_NAME)
- vb_destroy_machine(INSTANCE_NAME)
- except Exception as e:
- log.warning("Possibly dirty state after exception", exc_info=True)
- def test_deploy(self):
- machines = self.run_cloud('-p {0} {1} --log-level=debug'.format(DEPLOY_PROFILE_NAME, INSTANCE_NAME))
- self.assertIn(INSTANCE_NAME, machines.keys())
- machine = machines[INSTANCE_NAME]
- self.assertIn("deployed", machine)
- self.assertTrue(machine["deployed"], "Machine wasn't deployed :(")
- def test_start_stop_action(self):
- res = self.run_cloud_action("start", BOOTABLE_BASE_BOX_NAME, timeout=10)
- log.info(res)
- machine = res.get(BOOTABLE_BASE_BOX_NAME)
- self.assertIsNotNone(machine)
- expected_state = "Running"
- state = machine.get("state")
- self.assertEqual(state, expected_state)
- res = self.run_cloud_action("stop", BOOTABLE_BASE_BOX_NAME, timeout=10)
- log.info(res)
- machine = res.get(BOOTABLE_BASE_BOX_NAME)
- self.assertIsNotNone(machine)
- expected_state = "PoweredOff"
- state = machine.get("state")
- self.assertEqual(state, expected_state)
- def test_restart_action(self):
- pass
- def test_network_addresses(self):
- # Machine is off
- ip_addresses = vb_get_network_addresses(machine_name=BOOTABLE_BASE_BOX_NAME)
- network_count = len(ip_addresses)
- self.assertEqual(network_count, 0)
- # Machine is up again
- vb_start_vm(BOOTABLE_BASE_BOX_NAME)
- ip_addresses = vb_wait_for_network_address(20, machine_name=BOOTABLE_BASE_BOX_NAME)
- network_count = len(ip_addresses)
- self.assertGreater(network_count, 0)
- for ip_address in ip_addresses:
- self.assertIsIpAddress(ip_address)
- @skipIf(HAS_LIBS is False, 'The \'vboxapi\' library is not available')
- class BaseVirtualboxTests(TestCase):
- def test_get_manager(self):
- self.assertIsNotNone(vb_get_box())
- class CreationDestructionVirtualboxTests(VirtualboxTestCase):
- def setUp(self):
- super(CreationDestructionVirtualboxTests, self).setUp()
- def test_vm_creation_and_destruction(self):
- vm_name = BASE_BOX_NAME
- vb_create_machine(vm_name)
- self.assertMachineExists(vm_name)
- vb_destroy_machine(vm_name)
- self.assertMachineDoesNotExist(vm_name)
- class CloneVirtualboxTests(VirtualboxTestCase):
- def setUp(self):
- self.vbox = vb_get_box()
- self.name = "SaltCloudVirtualboxTestVM"
- vb_create_machine(self.name)
- self.assertMachineExists(self.name)
- def tearDown(self):
- vb_destroy_machine(self.name)
- self.assertMachineDoesNotExist(self.name)
- def test_create_machine(self):
- vb_name = "NewTestMachine"
- machine = vb_clone_vm(
- name=vb_name,
- clone_from=self.name
- )
- self.assertEqual(machine.get("name"), vb_name)
- self.assertMachineExists(vb_name)
- vb_destroy_machine(vb_name)
- self.assertMachineDoesNotExist(vb_name)
- @skipIf(HAS_LIBS and vb_machine_exists(BOOTABLE_BASE_BOX_NAME) is False,
- "Bootable VM '{0}' not found. Cannot run tests.".format(BOOTABLE_BASE_BOX_NAME)
- )
- class BootVirtualboxTests(VirtualboxTestCase):
- def test_start_stop(self):
- for i in range(2):
- machine = vb_start_vm(BOOTABLE_BASE_BOX_NAME, 20000)
- self.assertEqual(machine_get_machinestate_str(machine), "Running")
- machine = vb_stop_vm(BOOTABLE_BASE_BOX_NAME)
- self.assertEqual(machine_get_machinestate_str(machine), "PoweredOff")
- class XpcomConversionTests(TestCase):
- @classmethod
- def _mock_xpcom_object(cls, interface_name=None, attributes=None):
- class XPCOM(object):
- def __str__(self):
- return "<XPCOM component '<unknown>' (implementing {0})>".format(interface_name)
- o = XPCOM()
- if attributes and isinstance(attributes, dict):
- for key, value in six.iteritems(attributes):
- setattr(o, key, value)
- return o
- def test_unknown_object(self):
- xpcom = XpcomConversionTests._mock_xpcom_object()
- ret = vb_xpcom_to_attribute_dict(xpcom)
- self.assertDictEqual(ret, dict())
- def test_imachine_object_default(self):
- interface = "IMachine"
- imachine = XpcomConversionTests._mock_xpcom_object(interface)
- ret = vb_xpcom_to_attribute_dict(imachine, interface_name=interface)
- expected_attributes = XPCOM_ATTRIBUTES[interface]
- self.assertIsNotNone(expected_attributes, "%s is unknown")
- for key in ret:
- self.assertIn(key, expected_attributes)
- def test_override_attributes(self):
- expected_dict = {
- "herp": "derp",
- "lol": "rofl",
- "something": 12345
- }
- xpc = XpcomConversionTests._mock_xpcom_object(attributes=expected_dict)
- ret = vb_xpcom_to_attribute_dict(xpc, attributes=expected_dict.keys())
- self.assertDictEqual(ret, expected_dict)
- def test_extra_attributes(self):
- interface = "IMachine"
- expected_extras = {
- "extra": "extra",
- }
- expected_machine = dict([(attribute, attribute) for attribute in XPCOM_ATTRIBUTES[interface]])
- expected_machine.update(expected_extras)
- imachine = XpcomConversionTests._mock_xpcom_object(interface, attributes=expected_machine)
- ret = vb_xpcom_to_attribute_dict(
- imachine,
- interface_name=interface,
- extra_attributes=expected_extras.keys()
- )
- self.assertDictEqual(ret, expected_machine)
- ret_keys = ret.keys()
- for key in expected_extras:
- self.assertIn(key, ret_keys)
- def test_extra_nonexistent_attributes(self):
- expected_extra_dict = {
- "nonexistent": ""
- }
- xpcom = XpcomConversionTests._mock_xpcom_object()
- ret = vb_xpcom_to_attribute_dict(xpcom, extra_attributes=expected_extra_dict.keys())
- self.assertDictEqual(ret, expected_extra_dict)
- def test_extra_nonexistent_attribute_with_default(self):
- expected_extras = [("nonexistent", list)]
- expected_extra_dict = {
- "nonexistent": []
- }
- xpcom = XpcomConversionTests._mock_xpcom_object()
- ret = vb_xpcom_to_attribute_dict(xpcom, extra_attributes=expected_extras)
- self.assertDictEqual(ret, expected_extra_dict)
|