123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- # -*- coding: utf-8 -*-
- '''
- Test the ssh module
- '''
- # Import python libs
- from __future__ import absolute_import, unicode_literals, print_function
- import os
- import shutil
- # Import Salt Testing libs
- from tests.support.runtime import RUNTIME_VARS
- from tests.support.case import ModuleCase
- # Import salt libs
- import salt.utils.files
- import salt.utils.platform
- # Import 3rd-party libs
- import pytest
- from tornado.httpclient import HTTPClient
# Fingerprint the tests expect for github.com's ssh-rsa host key, written as
# 32 colon-separated hex octets. Split across lines purely for readability;
# adjacent literals are concatenated into a single string.
GITHUB_FINGERPRINT = (
    '9d:38:5b:83:a9:17:52:92:'
    '56:1a:5e:c4:d4:81:8e:0a:'
    'ca:51:a2:64:f1:74:20:11:'
    '2e:f8:8a:c3:a1:39:49:8f'
)
def check_status():
    '''
    Check the status of Github for remote operations

    Issues a blocking HTTP GET against ``http://github.com`` and reports
    whether the final response code was 200.

    :return: ``True`` when the request completed with status 200, ``False``
        when it completed with any other status or when anything at all
        went wrong (client construction, DNS, connection, timeout, HTTP
        error).
    :rtype: bool
    '''
    client = None
    try:
        # Build the client inside the ``try`` so that a failure to construct
        # it is reported as "down" instead of erroring the calling test.
        client = HTTPClient()
        return client.fetch('http://github.com').code == 200
    except Exception:  # pylint: disable=broad-except
        # Deliberately broad: any failure just means "skip the remote tests".
        return False
    finally:
        # The blocking client holds its own resources; release them so that
        # calling this probe repeatedly does not leak them.
        if client is not None:
            client.close()
@pytest.mark.skip_if_binaries_missing(['ssh', 'ssh-keygen'], check_all=True)
class SSHModuleTest(ModuleCase):
    '''
    Test the ssh module

    Every test works on scratch copies of the fixture files placed under
    ``<RUNTIME_VARS.TMP>/subsalt``. The directory is created in ``setUp`` and
    removed in ``tearDown``. Tests are skipped when github.com cannot be
    reached (see ``setUp``).
    '''

    @classmethod
    def setUpClass(cls):
        '''
        Compute the scratch paths shared by all tests in this class
        '''
        cls.subsalt_dir = os.path.join(RUNTIME_VARS.TMP, 'subsalt')
        cls.authorized_keys = os.path.join(cls.subsalt_dir, 'authorized_keys')
        cls.known_hosts = os.path.join(cls.subsalt_dir, 'known_hosts')

    def setUp(self):
        '''
        Set up the ssh module tests

        Skips the test when github.com is unreachable, makes sure the scratch
        directory exists and loads the expected key from the ``ssh/raw``
        fixture into ``self.key``.
        '''
        if not check_status():
            self.skipTest('External source, github.com is down')
        super(SSHModuleTest, self).setUp()
        if not os.path.isdir(self.subsalt_dir):
            os.makedirs(self.subsalt_dir)
        ssh_raw_path = os.path.join(RUNTIME_VARS.FILES, 'ssh', 'raw')
        with salt.utils.files.fopen(ssh_raw_path) as fd:
            self.key = fd.read().strip()

    def tearDown(self):
        '''
        Tear down the ssh module tests

        Removes the scratch directory and drops the key loaded in ``setUp``.
        '''
        if os.path.isdir(self.subsalt_dir):
            shutil.rmtree(self.subsalt_dir)
        super(SSHModuleTest, self).tearDown()
        del self.key

    def _install_fixture(self, name, destination):
        '''
        Copy the fixture file ``ssh/<name>`` to ``destination``
        '''
        shutil.copyfile(
            os.path.join(RUNTIME_VARS.FILES, 'ssh', name),
            destination)

    def _raise_with_return(self, exc, ret):
        '''
        Re-raise a failed assertion with the function's return value attached

        Must be called from inside an ``except AssertionError`` block.

        :param exc: the ``AssertionError`` that was caught
        :param ret: the value returned by the function under test
        :raises AssertionError: always
        '''
        raise AssertionError(
            'AssertionError: {0}. Function returned: {1}'.format(exc, ret)
        )

    def test_auth_keys(self):
        '''
        test ssh.auth_keys
        '''
        self._install_fixture('authorized_keys', self.authorized_keys)
        user = 'Administrator' if salt.utils.platform.is_windows() else 'root'
        ret = self.run_function('ssh.auth_keys', [user, self.authorized_keys])
        # Anything other than a dict (presumably an error string from the
        # minion) should fail here with the returned value in the message.
        self.assertIsInstance(ret, dict)
        self.assertEqual(len(ret), 1)  # exactly one key is found
        key_data = next(iter(ret.values()))
        try:
            self.assertEqual(key_data['comment'], 'github.com')
            self.assertEqual(key_data['enc'], 'ssh-rsa')
            self.assertEqual(
                key_data['options'], ['command="/usr/local/lib/ssh-helper"']
            )
            self.assertEqual(key_data['fingerprint'], GITHUB_FINGERPRINT)
        except AssertionError as exc:
            self._raise_with_return(exc, ret)

    def test_bad_enctype(self):
        '''
        test to make sure that bad key encoding types don't generate an
        invalid key entry in authorized_keys
        '''
        self._install_fixture('authorized_badkeys', self.authorized_keys)
        ret = self.run_function('ssh.auth_keys', ['root', self.authorized_keys])

        # The authorized_badkeys file contains a key with an invalid ssh key
        # encoding (dsa-sha2-nistp256 instead of ecdsa-sha2-nistp256)
        # auth_keys should skip any keys with invalid encodings. Internally
        # the minion will throw a CommandExecutionError so the
        # user will get an indicator of what went wrong.
        self.assertIsInstance(ret, dict)
        self.assertEqual(len(ret), 0)  # Zero keys found

    def test_get_known_host_entries(self):
        '''
        Check that known host information is returned from the known_hosts
        file passed via ``config``
        '''
        self._install_fixture('known_hosts', self.known_hosts)
        arg = ['root', 'github.com']
        kwargs = {'config': self.known_hosts}
        ret = self.run_function('ssh.get_known_host_entries', arg, **kwargs)[0]
        try:
            self.assertEqual(ret['enc'], 'ssh-rsa')
            self.assertEqual(ret['key'], self.key)
            self.assertEqual(ret['fingerprint'], GITHUB_FINGERPRINT)
        except AssertionError as exc:
            self._raise_with_return(exc, ret)

    def test_recv_known_host_entries(self):
        '''
        Check that known host information is returned from remote host
        '''
        ret = self.run_function('ssh.recv_known_host_entries', ['github.com'])
        try:
            self.assertIsNotNone(ret)
            self.assertEqual(ret[0]['enc'], 'ssh-rsa')
            self.assertEqual(ret[0]['key'], self.key)
            self.assertEqual(ret[0]['fingerprint'], GITHUB_FINGERPRINT)
        except AssertionError as exc:
            self._raise_with_return(exc, ret)

    def test_check_known_host_add(self):
        '''
        Check known hosts by its fingerprint. No known_hosts fixture is
        installed for this test, so the entry is reported as needing to be
        added.
        '''
        arg = ['root', 'github.com']
        kwargs = {'fingerprint': GITHUB_FINGERPRINT, 'config': self.known_hosts}
        ret = self.run_function('ssh.check_known_host', arg, **kwargs)
        self.assertEqual(ret, 'add')

    def test_check_known_host_update(self):
        '''
        ssh.check_known_host update verification
        '''
        self._install_fixture('known_hosts', self.known_hosts)
        arg = ['root', 'github.com']
        kwargs = {'config': self.known_hosts}
        # wrong fingerprint
        ret = self.run_function('ssh.check_known_host', arg,
                                **dict(kwargs, fingerprint='aa:bb:cc:dd'))
        self.assertEqual(ret, 'update')
        # wrong key
        ret = self.run_function('ssh.check_known_host', arg,
                                **dict(kwargs, key='YQ=='))
        self.assertEqual(ret, 'update')

    def test_check_known_host_exists(self):
        '''
        Verify check_known_host_exists
        '''
        self._install_fixture('known_hosts', self.known_hosts)
        arg = ['root', 'github.com']
        kwargs = {'config': self.known_hosts}
        # matching fingerprint
        ret = self.run_function('ssh.check_known_host', arg,
                                **dict(kwargs, fingerprint=GITHUB_FINGERPRINT))
        self.assertEqual(ret, 'exists')
        # matching key
        ret = self.run_function('ssh.check_known_host', arg,
                                **dict(kwargs, key=self.key))
        self.assertEqual(ret, 'exists')

    def test_rm_known_host(self):
        '''
        ssh.rm_known_host
        '''
        self._install_fixture('known_hosts', self.known_hosts)
        arg = ['root', 'github.com']
        kwargs = {'config': self.known_hosts, 'key': self.key}
        # before removal
        ret = self.run_function('ssh.check_known_host', arg, **kwargs)
        self.assertEqual(ret, 'exists')
        # remove
        self.run_function('ssh.rm_known_host', arg, config=self.known_hosts)
        # after removal
        ret = self.run_function('ssh.check_known_host', arg, **kwargs)
        self.assertEqual(ret, 'add')

    def test_set_known_host(self):
        '''
        ssh.set_known_host
        '''
        # add item
        ret = self.run_function('ssh.set_known_host', ['root', 'github.com'],
                                config=self.known_hosts)
        try:
            self.assertEqual(ret['status'], 'updated')
            self.assertIsNone(ret['old'])
            self.assertEqual(ret['new'][0]['fingerprint'], GITHUB_FINGERPRINT)
        except AssertionError as exc:
            self._raise_with_return(exc, ret)
        # check that item does exist
        ret = self.run_function('ssh.get_known_host_entries', ['root', 'github.com'],
                                config=self.known_hosts)[0]
        try:
            self.assertEqual(ret['fingerprint'], GITHUB_FINGERPRINT)
        except AssertionError as exc:
            self._raise_with_return(exc, ret)
        # add the same item once again
        ret = self.run_function('ssh.set_known_host', ['root', 'github.com'],
                                config=self.known_hosts)
        try:
            self.assertEqual(ret['status'], 'exists')
        except AssertionError as exc:
            self._raise_with_return(exc, ret)
|