This commit adds unit test cases for the LeftHand client to increase the client's code coverage. Change-Id: Ia8576ce971d9ef7554446d0fa028954565f1bff0
328 lines
14 KiB
Python
328 lines
14 KiB
Python
# (c) Copyright 2015 Hewlett Packard Enterprise Development LP
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import mock
|
|
import paramiko
|
|
import unittest
|
|
|
|
import test_HPELeftHandClient_base
|
|
from hpelefthandclient import exceptions
|
|
from hpelefthandclient import ssh
|
|
|
|
# Python 3+ override: the ``basestring`` builtin is not defined there, so
# the bare name lookup below raises NameError and we alias it to ``str``.
# Where the builtin does exist, it is left untouched.
try:
    basestring
except NameError:
    basestring = str
|
|
|
|
# Placeholder credentials and endpoint shared by every test in this module.
user, password = "u", "p"
ip = "10.10.22.241"
api_url = "http://10.10.22.241:8008/api/v1"
|
|
|
|
|
|
class HPELeftHandClientMockSSHTestCase(test_HPELeftHandClient_base
                                       .HPELeftHandClientBaseTestCase):
    """Mock-based tests for the LeftHand client's SSH handling.

    The first group of tests checks that the ``known_hosts_file`` and
    ``missing_key_policy`` options given to ``setSSHOptions()`` are
    forwarded through ``HPELeftHandSSHClient`` and ``_create_ssh()`` down
    to paramiko.  The remaining tests exercise ``HPELeftHandSSHClient``
    directly (``was_command_successful``, ``_connect``, ``open`` and
    ``_run_ssh``) with the underlying paramiko objects replaced by mocks.
    """

    def _new_ssh_client(self):
        """Return an HPELeftHandSSHClient built with the module test values.

        The client is created with no known-hosts file and with the
        ``paramiko.AutoAddPolicy`` class as its missing-key policy, exactly
        as the individual tests previously did inline.
        """
        return ssh.HPELeftHandSSHClient(
            ip, user, password,
            known_hosts_file=None,
            missing_key_policy=paramiko.AutoAddPolicy)

    def mock_paramiko(self, known_hosts_file, missing_key_policy):
        """Verify that these params get into paramiko.

        :param known_hosts_file: value passed to ``setSSHOptions()``; when
            it is None the system host keys are expected to be loaded,
            otherwise this file is expected to be loaded instead.
        :param missing_key_policy: None, a policy class name (string) or a
            policy instance; compared by class name with the object handed
            to ``set_missing_host_key_policy``.
        :raises paramiko.SSHException: re-raised when ``setSSHOptions()``
            rejects the policy with 'Invalid missing_key_policy'.
        """
        load_host_keys = mock.Mock()
        load_system_host_keys = mock.Mock()
        # Raising from the last paramiko call we care about stops
        # setSSHOptions() before it can go any further.
        set_policy = mock.Mock()
        set_policy.side_effect = Exception("Let's end this here")

        # Creating the patchers does not apply them; that happens when the
        # ``with`` statement enters them, in the order listed.
        patch_system_keys = mock.patch(
            'paramiko.client.SSHClient.load_system_host_keys',
            load_system_host_keys, create=True)
        patch_host_keys = mock.patch(
            'paramiko.client.SSHClient.load_host_keys',
            load_host_keys, create=True)
        patch_policy = mock.patch(
            'paramiko.client.SSHClient.'
            'set_missing_host_key_policy',
            set_policy, create=True)

        with patch_system_keys, patch_host_keys, patch_policy:
            try:
                self.cl.setSSHOptions(
                    ip, user, password,
                    known_hosts_file=known_hosts_file,
                    missing_key_policy=missing_key_policy)
            except paramiko.SSHException as e:
                # Only the policy-validation failure is of interest to the
                # caller; a bare raise keeps the original traceback.
                if 'Invalid missing_key_policy' in str(e):
                    raise
            except Exception:
                # Expected: this is the exception injected via set_policy
                # above (or any other early stop); the mocks have already
                # recorded what we need.
                pass

        if known_hosts_file is None:
            load_host_keys.assert_not_called()
            load_system_host_keys.assert_called_with()
        else:
            load_host_keys.assert_called_with(known_hosts_file)
            load_system_host_keys.assert_not_called()

        # Fail with a readable message instead of a TypeError on
        # ``None[0]`` if the policy was never handed to paramiko.
        self.assertTrue(set_policy.called,
                        "set_missing_host_key_policy was never called")
        actual = set_policy.call_args[0][0].__class__.__name__
        if missing_key_policy is None:
            # If missing, it should be called with our
            # default which is an AutoAddPolicy
            expected = paramiko.AutoAddPolicy().__class__.__name__
        elif isinstance(missing_key_policy, basestring):
            expected = missing_key_policy
        else:
            expected = missing_key_policy.__class__.__name__
        self.assertEqual(actual, expected)

    def do_mock_create_ssh(self, known_hosts_file, missing_key_policy):
        """Verify that params are getting forwarded to _create_ssh().

        :param known_hosts_file: value expected as the
            ``known_hosts_file`` keyword of ``_create_ssh()``.
        :param missing_key_policy: value expected as the
            ``missing_key_policy`` keyword of ``_create_ssh()``.
        """
        create_ssh = mock.Mock()
        path = 'hpelefthandclient.ssh.HPELeftHandSSHClient._create_ssh'
        with mock.patch(path, create_ssh, create=True):
            self.cl.setSSHOptions(ip, user, password,
                                  known_hosts_file=known_hosts_file,
                                  missing_key_policy=missing_key_policy)

            create_ssh.assert_called_with(
                missing_key_policy=missing_key_policy,
                known_hosts_file=known_hosts_file)

        # Create a mocked ssh object for the client so that it can be
        # "closed" during a logout.
        self.cl.ssh = mock.MagicMock()

    @mock.patch('hpelefthandclient.ssh.HPELeftHandSSHClient')
    def do_mock_ssh(self, known_hosts_file, missing_key_policy,
                    mock_ssh_client):
        """Verify that params are getting forwarded to HPELeftHandSSHClient.

        :param known_hosts_file: value passed to ``setSSHOptions()``.
        :param missing_key_policy: value passed to ``setSSHOptions()``.
        :param mock_ssh_client: replacement for ``HPELeftHandSSHClient``,
            injected by the ``mock.patch`` decorator; callers do not
            supply it.
        """
        self.cl.setSSHOptions(ip, user, password,
                              known_hosts_file=known_hosts_file,
                              missing_key_policy=missing_key_policy)

        # NOTE(review): 16022, None, None are presumably the client's
        # default SSH port, connection timeout and private key -- confirm
        # against setSSHOptions().
        mock_ssh_client.assert_called_with(
            ip, user, password, 16022, None, None,
            missing_key_policy=missing_key_policy,
            known_hosts_file=known_hosts_file)

    def base(self, known_hosts_file, missing_key_policy):
        """Run all three forwarding checks for one option combination.

        :param known_hosts_file: known-hosts file option under test.
        :param missing_key_policy: missing-key policy option under test.
        """
        test_id = unittest.TestCase.id(self)
        self.printHeader("%s : known_hosts_file=%s missing_key_policy=%s" %
                         (test_id, known_hosts_file, missing_key_policy))
        self.do_mock_ssh(known_hosts_file, missing_key_policy)
        self.do_mock_create_ssh(known_hosts_file, missing_key_policy)
        self.mock_paramiko(known_hosts_file, missing_key_policy)
        self.printFooter(test_id)

    def test_auto_add_policy(self):
        """A known-hosts file plus the 'AutoAddPolicy' name are forwarded."""
        self.base("test_bogus_known_hosts_file", "AutoAddPolicy")

    def test_warning_policy(self):
        """A known-hosts file plus the 'WarningPolicy' name are forwarded."""
        self.base("test_bogus_known_hosts_file", "WarningPolicy")

    def test_reject_policy(self):
        """A known-hosts file plus the 'RejectPolicy' name are forwarded."""
        self.base("test_bogus_known_hosts_file", "RejectPolicy")

    def test_known_hosts_file_is_none(self):
        """No known-hosts file, with the policy given as an instance."""
        self.base(None, paramiko.RejectPolicy())

    def test_both_settings_are_none(self):
        """Both options left as None."""
        self.base(None, None)

    def test_bogus_missing_key_policy(self):
        """An unknown policy name must surface as paramiko.SSHException."""
        self.assertRaises(paramiko.SSHException,
                          self.base,
                          None,
                          "bogus")

    def test_create_ssh_except(self):
        """Make sure that SSH exceptions are not quietly eaten."""
        self.cl.setSSHOptions(ip,
                              user,
                              password,
                              known_hosts_file=None,
                              missing_key_policy=paramiko.AutoAddPolicy)

        # Replace the underlying SSH object and make opening a shell fail.
        self.cl.ssh.ssh = mock.Mock()
        self.cl.ssh.ssh.invoke_shell.side_effect = Exception('boom')

        cmd = ['fake']
        self.assertRaises(exceptions.SSHException, self.cl.ssh._run_ssh, cmd)

        expected_calls = [
            mock.call.get_transport(),
            mock.call.get_transport().is_alive(),
            mock.call.invoke_shell(),
            mock.call.get_transport(),
            mock.call.get_transport().is_alive(),
        ]
        self.cl.ssh.ssh.assert_has_calls(expected_calls)

    def test_was_command_successful_true(self):
        """Output ending in 'result 0' is reported as success."""
        cmd_out = ['',
                   'HP StoreVirtual LeftHand OS Command Line Interface',
                   '(C) Copyright 2007-2013',
                   '',
                   'RESPONSE',
                   ' result 0']
        ssh_client = ssh.HPELeftHandSSHClient('foo', 'bar', 'biz')
        self.assertTrue(ssh_client.was_command_successful(cmd_out))

    def test_was_command_successful_false(self):
        """Unparseable output and a non-zero result both raise."""
        # Create our fake SSH client
        ssh_client = ssh.HPELeftHandSSHClient('foo', 'bar', 'biz')

        # Test invalid command output
        cmd_out = ['invalid']
        self.assertRaises(exceptions.SSHException,
                          ssh_client.was_command_successful,
                          cmd_out)

        # Test valid command output, but command failed
        cmd_out = ['',
                   'HP StoreVirtual LeftHand OS Command Line Interface',
                   '(C) Copyright 2007-2013',
                   '',
                   'RESPONSE',
                   ' result 8080878378']
        self.assertRaises(exceptions.SSHException,
                          ssh_client.was_command_successful,
                          cmd_out)

    def test_connect_with_password(self):
        """_connect() passes the stored password to the SSH connect call."""
        ssh_client = self._new_ssh_client()

        ssh_client.san_password = 'my_san_pwd'
        ssh_client.san_ip = '0.0.0.0'
        ssh_client.san_ssh_port = '1234'
        ssh_client.san_login = 'my_login'
        ssh_client.ssh_conn_timeout = '100'

        ssh_client.ssh.connect = mock.Mock()
        ssh_client._connect(ssh_client.ssh)
        ssh_client.ssh.connect.assert_called_with('0.0.0.0',
                                                  port='1234',
                                                  username='my_login',
                                                  password='my_san_pwd',
                                                  timeout='100')

    def test_connect_with_private_key(self):
        """Without a password, _connect() loads and uses the private key."""
        ssh_client = self._new_ssh_client()

        mock_expand_user = mock.Mock()
        mock_expand_user.return_value = 'my_user'

        mock_from_private_key_file = mock.Mock()
        mock_from_private_key_file.return_value = 'my_private_key'

        ssh_client.san_password = None
        ssh_client.san_privatekey = 'my_san_key'
        ssh_client.san_ip = '0.0.0.0'
        ssh_client.san_ssh_port = '1234'
        ssh_client.san_login = 'my_login'
        ssh_client.ssh_conn_timeout = '100'

        ssh_client.ssh.connect = mock.Mock()

        patch_expand_user = mock.patch('os.path.expanduser',
                                       mock_expand_user, create=True)
        patch_key_loader = mock.patch('paramiko.RSAKey.from_private_key_file',
                                      mock_from_private_key_file, create=True)
        with patch_expand_user, patch_key_loader:
            ssh_client._connect(ssh_client.ssh)

        # The key path is expanded first, and the expanded path is what
        # gets loaded as the key handed to connect().
        mock_expand_user.assert_called_with('my_san_key')
        mock_from_private_key_file.assert_called_with('my_user')
        ssh_client.ssh.connect.assert_called_with('0.0.0.0',
                                                  port='1234',
                                                  username='my_login',
                                                  pkey='my_private_key',
                                                  timeout='100')

    def test_connect_without_password_and_private_key(self):
        """open() raises when neither a password nor a key is configured."""
        ssh_client = self._new_ssh_client()

        ssh_client.san_password = None
        ssh_client.san_privatekey = None

        ssh_client.ssh = mock.Mock()
        ssh_client.ssh.get_transport.return_value = False
        self.assertRaises(paramiko.SSHException, ssh_client.open)

    def test_run_ssh_with_exception(self):
        """A failing _ssh_execute surfaces as exceptions.SSHException."""
        ssh_client = self._new_ssh_client()

        ssh_client.check_ssh_injection = mock.Mock()
        ssh_client.check_ssh_injection.return_value = True
        ssh_client._ssh_execute = mock.Mock()
        ssh_client._ssh_execute.side_effect = Exception('End this here')
        ssh_client._create_ssh = mock.Mock()
        ssh_client._create_ssh.return_value = True
        ssh_client.ssh = mock.Mock()
        # Configure is_alive() on the transport that get_transport()
        # *returns*; setting it on get_transport itself would never be
        # consulted by a get_transport().is_alive() call.
        transport = ssh_client.ssh.get_transport.return_value
        transport.is_alive.return_value = True

        command = ['fake']
        self.assertRaises(exceptions.SSHException, ssh_client._run_ssh,
                          command, attempts=1)
        ssh_client.check_ssh_injection.assert_called_once()
        ssh_client._ssh_execute.assert_called_once()
        ssh_client._create_ssh.assert_not_called()

    def test_run_ssh_exceeds_attempts(self):
        """_run_ssh raises when get_transport() fails after the attempt."""
        ssh_client = self._new_ssh_client()

        ssh_client.check_ssh_injection = mock.Mock()
        ssh_client.check_ssh_injection.return_value = True
        ssh_client._ssh_execute = mock.Mock()
        ssh_client._ssh_execute.side_effect = Exception('End this here')
        ssh_client._create_ssh = mock.Mock()
        ssh_client._create_ssh.return_value = True
        ssh_client.ssh = mock.Mock()
        ssh_client.ssh.get_transport.side_effect = Exception('End here')

        command = ['fake']
        # NOTE(review): asserting on the base Exception accepts any error;
        # tighten once the exact type raised by _run_ssh here is confirmed.
        self.assertRaises(Exception, ssh_client._run_ssh, command, attempts=1)
        ssh_client.check_ssh_injection.assert_called_once()
        ssh_client._ssh_execute.assert_called_once()
        ssh_client._create_ssh.assert_not_called()
|