123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- # -*- coding: utf-8 -*-
- '''
- Integration tests for DigitalOcean APIv2
- '''
- # Import Python Libs
- from __future__ import absolute_import, print_function, unicode_literals
- import base64
- import hashlib
- # Import Salt Testing Libs
- from tests.integration.cloud.helpers.cloud_test_base import CloudTest, TIMEOUT
- # Import Salt Libs
- from salt.ext.six.moves import range
- import salt.crypt
- import salt.utils.stringutils
- class DigitalOceanTest(CloudTest):
- '''
- Integration tests for the DigitalOcean cloud provider in Salt-Cloud
- '''
- PROVIDER = 'digitalocean'
- REQUIRED_PROVIDER_CONFIG_ITEMS = ('personal_access_token', 'ssh_key_file', 'ssh_key_name')
- def test_list_images(self):
- '''
- Tests the return of running the --list-images command for digitalocean
- '''
- image_list = self.run_cloud('--list-images {0}'.format(self.PROVIDER))
- self.assertIn(
- '14.04.5 x64',
- [i.strip() for i in image_list]
- )
- def test_list_locations(self):
- '''
- Tests the return of running the --list-locations command for digitalocean
- '''
- _list_locations = self.run_cloud('--list-locations {0}'.format(self.PROVIDER))
- self.assertIn(
- 'San Francisco 2',
- [i.strip() for i in _list_locations]
- )
- def test_list_sizes(self):
- '''
- Tests the return of running the --list-sizes command for digitalocean
- '''
- _list_sizes = self.run_cloud('--list-sizes {0}'.format(self.PROVIDER))
- self.assertIn(
- '16gb',
- [i.strip() for i in _list_sizes]
- )
- def test_key_management(self):
- '''
- Test key management
- '''
- do_key_name = self.instance_name + '-key'
- # generate key and fingerprint
- if salt.crypt.HAS_M2:
- rsa_key = salt.crypt.RSA.gen_key(4096, 65537, lambda: None)
- pub = b'ssh-rsa %s' % base64.b64encode(
- b'\x00\x00\x00\x07ssh-rsa%s%s' % rsa_key.pub())
- else:
- ssh_key = salt.crypt.RSA.generate(4096)
- pub = ssh_key.publickey().exportKey("OpenSSH")
- pub = salt.utils.stringutils.to_str(pub)
- key_hex = hashlib.md5(base64.b64decode(pub.strip().split()[1].encode())).hexdigest()
- finger_print = ':'.join([key_hex[x:x+2] for x in range(0, len(key_hex), 2)])
- try:
- _key = self.run_cloud('-f create_key {0} name="{1}" public_key="{2}"'.format(self.PROVIDER,
- do_key_name, pub))
- # Upload public key
- self.assertIn(
- finger_print,
- [i.strip() for i in _key]
- )
- # List all keys
- list_keypairs = self.run_cloud('-f list_keypairs {0}'.format(self.PROVIDER))
- self.assertIn(
- finger_print,
- [i.strip() for i in list_keypairs]
- )
- # List key
- show_keypair = self.run_cloud('-f show_keypair {0} keyname={1}'.format(self.PROVIDER, do_key_name))
- self.assertIn(
- finger_print,
- [i.strip() for i in show_keypair]
- )
- except AssertionError:
- # Delete the public key if the above assertions fail
- self.run_cloud('-f remove_key {0} id={1}'.format(self.PROVIDER, finger_print))
- raise
- finally:
- # Delete public key
- self.assertTrue(self.run_cloud('-f remove_key {0} id={1}'.format(self.PROVIDER, finger_print)))
- def test_instance(self):
- '''
- Test creating an instance on DigitalOcean
- '''
- # check if instance with salt installed returned
- ret_str = self.run_cloud('-p digitalocean-test {0}'.format(self.instance_name), timeout=TIMEOUT)
- self.assertInstanceExists(ret_str)
- self.assertDestroyInstance()
|