test_ssh.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. # -*- coding: utf-8 -*-
  2. '''
  3. :codeauthor: :email:`Daniel Wallace <dwallace@saltstack.com>`
  4. '''
  5. # Import python libs
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import os
  8. import shutil
  9. import tempfile
  10. # Import Salt Testing libs
  11. from tests.support.runtests import RUNTIME_VARS
  12. from tests.support.unit import skipIf, TestCase
  13. from tests.support.case import ShellCase
  14. from tests.support.mock import NO_MOCK, NO_MOCK_REASON, patch, MagicMock
  15. # Import Salt libs
  16. import salt.config
  17. import salt.roster
  18. import salt.utils.files
  19. import salt.utils.path
  20. import salt.utils.thin
  21. import salt.utils.yaml
  22. from salt.client import ssh
  23. ROSTER = '''
  24. localhost:
  25. host: 127.0.0.1
  26. port: 2827
  27. self:
  28. host: 0.0.0.0
  29. port: 42
  30. '''
  31. @skipIf(NO_MOCK, NO_MOCK_REASON)
  32. @skipIf(not salt.utils.path.which('ssh'), "No ssh binary found in path")
  33. class SSHPasswordTests(ShellCase):
  34. def test_password_failure(self):
  35. '''
  36. Check password failures when trying to deploy keys
  37. '''
  38. opts = salt.config.client_config(self.get_config_file_path('master'))
  39. opts['list_hosts'] = False
  40. opts['argv'] = ['test.ping']
  41. opts['selected_target_option'] = 'glob'
  42. opts['tgt'] = 'localhost'
  43. opts['arg'] = []
  44. roster = os.path.join(self.config_dir, 'roster')
  45. handle_ssh_ret = [
  46. {'localhost': {'retcode': 255, 'stderr': u'Permission denied (publickey).\r\n', 'stdout': ''}},
  47. ]
  48. expected = {'localhost': 'Permission denied (publickey)'}
  49. display_output = MagicMock()
  50. with patch('salt.roster.get_roster_file', MagicMock(return_value=roster)), \
  51. patch('salt.client.ssh.SSH.handle_ssh', MagicMock(return_value=handle_ssh_ret)), \
  52. patch('salt.client.ssh.SSH.key_deploy', MagicMock(return_value=expected)), \
  53. patch('salt.output.display_output', display_output):
  54. client = ssh.SSH(opts)
  55. ret = next(client.run_iter())
  56. with self.assertRaises(SystemExit):
  57. client.run()
  58. display_output.assert_called_once_with(expected, 'nested', opts)
  59. self.assertIs(ret, handle_ssh_ret[0])
  60. class SSHRosterDefaults(TestCase):
  61. def test_roster_defaults_flat(self):
  62. '''
  63. Test Roster Defaults on the flat roster
  64. '''
  65. tempdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  66. expected = {
  67. 'self': {
  68. 'host': '0.0.0.0',
  69. 'user': 'daniel',
  70. 'port': 42,
  71. },
  72. 'localhost': {
  73. 'host': '127.0.0.1',
  74. 'user': 'daniel',
  75. 'port': 2827,
  76. },
  77. }
  78. try:
  79. root_dir = os.path.join(tempdir, 'foo', 'bar')
  80. os.makedirs(root_dir)
  81. fpath = os.path.join(root_dir, 'config')
  82. with salt.utils.files.fopen(fpath, 'w') as fp_:
  83. fp_.write(
  84. '''
  85. roster_defaults:
  86. user: daniel
  87. '''
  88. )
  89. opts = salt.config.master_config(fpath)
  90. with patch('salt.roster.get_roster_file', MagicMock(return_value=ROSTER)):
  91. with patch('salt.template.compile_template', MagicMock(return_value=salt.utils.yaml.safe_load(ROSTER))):
  92. roster = salt.roster.Roster(opts=opts)
  93. self.assertEqual(roster.targets('*', 'glob'), expected)
  94. finally:
  95. if os.path.isdir(tempdir):
  96. shutil.rmtree(tempdir)
  97. @skipIf(NO_MOCK, NO_MOCK_REASON)
  98. class SSHSingleTests(TestCase):
  99. def setUp(self):
  100. self.tmp_cachedir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
  101. def test_single_opts(self):
  102. ''' Sanity check for ssh.Single options
  103. '''
  104. argv = ['ssh.set_auth_key', 'root', 'hobn+amNAXSBTiOXEqlBjGB...rsa root@master']
  105. opts = {
  106. 'argv': argv,
  107. '__role': 'master',
  108. 'cachedir': self.tmp_cachedir,
  109. 'extension_modules': os.path.join(self.tmp_cachedir, 'extmods'),
  110. }
  111. target = {
  112. 'passwd': 'abc123',
  113. 'ssh_options': None,
  114. 'sudo': False,
  115. 'identities_only': False,
  116. 'host': 'login1',
  117. 'user': 'root',
  118. 'timeout': 65,
  119. 'remote_port_forwards': None,
  120. 'sudo_user': '',
  121. 'port': '22',
  122. 'priv': '/etc/salt/pki/master/ssh/salt-ssh.rsa'
  123. }
  124. single = ssh.Single(
  125. opts,
  126. opts['argv'],
  127. 'localhost',
  128. mods={},
  129. fsclient=None,
  130. thin=salt.utils.thin.thin_path(opts['cachedir']),
  131. mine=False,
  132. **target)
  133. self.assertEqual(single.shell._ssh_opts(), '')
  134. self.assertEqual(single.shell._cmd_str('date +%s'), 'ssh login1 '
  135. '-o KbdInteractiveAuthentication=no -o '
  136. 'PasswordAuthentication=yes -o ConnectTimeout=65 -o Port=22 '
  137. '-o IdentityFile=/etc/salt/pki/master/ssh/salt-ssh.rsa '
  138. '-o User=root date +%s')