123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- # -*- coding: utf-8 -*-
- """
- Test the ssh module
- """
- from __future__ import absolute_import, print_function, unicode_literals
- import os
- import shutil
- import pytest
- import salt.utils.files
- import salt.utils.platform
- from salt.ext.tornado.httpclient import HTTPClient
- from tests.support.case import ModuleCase
- from tests.support.helpers import skip_if_binaries_missing, slowTest
- from tests.support.runtests import RUNTIME_VARS
- GITHUB_FINGERPRINT = "9d:38:5b:83:a9:17:52:92:56:1a:5e:c4:d4:81:8e:0a:ca:51:a2:64:f1:74:20:11:2e:f8:8a:c3:a1:39:49:8f"
- def check_status():
- """
- Check the status of Github for remote operations
- """
- try:
- return HTTPClient().fetch("http://github.com").code == 200
- except Exception: # pylint: disable=broad-except
- return False
- @skip_if_binaries_missing(["ssh", "ssh-keygen"], check_all=True)
- @pytest.mark.windows_whitelisted
- class SSHModuleTest(ModuleCase):
- """
- Test the ssh module
- """
- @classmethod
- def setUpClass(cls):
- cls.subsalt_dir = os.path.join(RUNTIME_VARS.TMP, "subsalt")
- cls.authorized_keys = os.path.join(cls.subsalt_dir, "authorized_keys")
- cls.known_hosts = os.path.join(cls.subsalt_dir, "known_hosts")
- def setUp(self):
- """
- Set up the ssh module tests
- """
- if not check_status():
- self.skipTest("External source, github.com is down")
- super(SSHModuleTest, self).setUp()
- if not os.path.isdir(self.subsalt_dir):
- os.makedirs(self.subsalt_dir)
- ssh_raw_path = os.path.join(RUNTIME_VARS.FILES, "ssh", "raw")
- with salt.utils.files.fopen(ssh_raw_path) as fd:
- self.key = fd.read().strip()
- def tearDown(self):
- """
- Tear down the ssh module tests
- """
- if os.path.isdir(self.subsalt_dir):
- shutil.rmtree(self.subsalt_dir)
- super(SSHModuleTest, self).tearDown()
- del self.key
- @slowTest
- def test_auth_keys(self):
- """
- test ssh.auth_keys
- """
- shutil.copyfile(
- os.path.join(RUNTIME_VARS.FILES, "ssh", "authorized_keys"),
- self.authorized_keys,
- )
- user = "root"
- if salt.utils.platform.is_windows():
- user = "Administrator"
- ret = self.run_function("ssh.auth_keys", [user, self.authorized_keys])
- self.assertEqual(len(list(ret.items())), 1) # exactly one key is found
- key_data = list(ret.items())[0][1]
- try:
- self.assertEqual(key_data["comment"], "github.com")
- self.assertEqual(key_data["enc"], "ssh-rsa")
- self.assertEqual(
- key_data["options"], ['command="/usr/local/lib/ssh-helper"']
- )
- self.assertEqual(key_data["fingerprint"], GITHUB_FINGERPRINT)
- except AssertionError as exc:
- raise AssertionError(
- "AssertionError: {0}. Function returned: {1}".format(exc, ret)
- )
- @slowTest
- def test_bad_enctype(self):
- """
- test to make sure that bad key encoding types don't generate an
- invalid key entry in authorized_keys
- """
- shutil.copyfile(
- os.path.join(RUNTIME_VARS.FILES, "ssh", "authorized_badkeys"),
- self.authorized_keys,
- )
- ret = self.run_function("ssh.auth_keys", ["root", self.authorized_keys])
- # The authorized_badkeys file contains a key with an invalid ssh key
- # encoding (dsa-sha2-nistp256 instead of ecdsa-sha2-nistp256)
- # auth_keys should skip any keys with invalid encodings. Internally
- # the minion will throw a CommandExecutionError so the
- # user will get an indicator of what went wrong.
- self.assertEqual(len(list(ret.items())), 0) # Zero keys found
- @slowTest
- def test_get_known_host_entries(self):
- """
- Check that known host information is returned from ~/.ssh/config
- """
- shutil.copyfile(
- os.path.join(RUNTIME_VARS.FILES, "ssh", "known_hosts"), self.known_hosts
- )
- arg = ["root", "github.com"]
- kwargs = {"config": self.known_hosts}
- ret = self.run_function("ssh.get_known_host_entries", arg, **kwargs)[0]
- try:
- self.assertEqual(ret["enc"], "ssh-rsa")
- self.assertEqual(ret["key"], self.key)
- self.assertEqual(ret["fingerprint"], GITHUB_FINGERPRINT)
- except AssertionError as exc:
- raise AssertionError(
- "AssertionError: {0}. Function returned: {1}".format(exc, ret)
- )
- @slowTest
- def test_recv_known_host_entries(self):
- """
- Check that known host information is returned from remote host
- """
- ret = self.run_function("ssh.recv_known_host_entries", ["github.com"])
- try:
- self.assertNotEqual(ret, None)
- self.assertEqual(ret[0]["enc"], "ssh-rsa")
- self.assertEqual(ret[0]["key"], self.key)
- self.assertEqual(ret[0]["fingerprint"], GITHUB_FINGERPRINT)
- except AssertionError as exc:
- raise AssertionError(
- "AssertionError: {0}. Function returned: {1}".format(exc, ret)
- )
- @slowTest
- def test_check_known_host_add(self):
- """
- Check known hosts by its fingerprint. File needs to be updated
- """
- arg = ["root", "github.com"]
- kwargs = {"fingerprint": GITHUB_FINGERPRINT, "config": self.known_hosts}
- ret = self.run_function("ssh.check_known_host", arg, **kwargs)
- self.assertEqual(ret, "add")
- @slowTest
- def test_check_known_host_update(self):
- """
- ssh.check_known_host update verification
- """
- shutil.copyfile(
- os.path.join(RUNTIME_VARS.FILES, "ssh", "known_hosts"), self.known_hosts
- )
- arg = ["root", "github.com"]
- kwargs = {"config": self.known_hosts}
- # wrong fingerprint
- ret = self.run_function(
- "ssh.check_known_host", arg, **dict(kwargs, fingerprint="aa:bb:cc:dd")
- )
- self.assertEqual(ret, "update")
- # wrong keyfile
- ret = self.run_function("ssh.check_known_host", arg, **dict(kwargs, key="YQ=="))
- self.assertEqual(ret, "update")
- @slowTest
- def test_check_known_host_exists(self):
- """
- Verify check_known_host_exists
- """
- shutil.copyfile(
- os.path.join(RUNTIME_VARS.FILES, "ssh", "known_hosts"), self.known_hosts
- )
- arg = ["root", "github.com"]
- kwargs = {"config": self.known_hosts}
- # wrong fingerprint
- ret = self.run_function(
- "ssh.check_known_host", arg, **dict(kwargs, fingerprint=GITHUB_FINGERPRINT)
- )
- self.assertEqual(ret, "exists")
- # wrong keyfile
- ret = self.run_function(
- "ssh.check_known_host", arg, **dict(kwargs, key=self.key)
- )
- self.assertEqual(ret, "exists")
- @slowTest
- def test_rm_known_host(self):
- """
- ssh.rm_known_host
- """
- shutil.copyfile(
- os.path.join(RUNTIME_VARS.FILES, "ssh", "known_hosts"), self.known_hosts
- )
- arg = ["root", "github.com"]
- kwargs = {"config": self.known_hosts, "key": self.key}
- # before removal
- ret = self.run_function("ssh.check_known_host", arg, **kwargs)
- self.assertEqual(ret, "exists")
- # remove
- self.run_function("ssh.rm_known_host", arg, config=self.known_hosts)
- # after removal
- ret = self.run_function("ssh.check_known_host", arg, **kwargs)
- self.assertEqual(ret, "add")
- @slowTest
- def test_set_known_host(self):
- """
- ssh.set_known_host
- """
- # add item
- ret = self.run_function(
- "ssh.set_known_host", ["root", "github.com"], config=self.known_hosts
- )
- try:
- self.assertEqual(ret["status"], "updated")
- self.assertEqual(ret["old"], None)
- self.assertEqual(ret["new"][0]["fingerprint"], GITHUB_FINGERPRINT)
- except AssertionError as exc:
- raise AssertionError(
- "AssertionError: {0}. Function returned: {1}".format(exc, ret)
- )
- # check that item does exist
- ret = self.run_function(
- "ssh.get_known_host_entries",
- ["root", "github.com"],
- config=self.known_hosts,
- )[0]
- try:
- self.assertEqual(ret["fingerprint"], GITHUB_FINGERPRINT)
- except AssertionError as exc:
- raise AssertionError(
- "AssertionError: {0}. Function returned: {1}".format(exc, ret)
- )
- # add the same item once again
- ret = self.run_function(
- "ssh.set_known_host", ["root", "github.com"], config=self.known_hosts
- )
- try:
- self.assertEqual(ret["status"], "exists")
- except AssertionError as exc:
- raise AssertionError(
- "AssertionError: {0}. Function returned: {1}".format(exc, ret)
- )
|