Add unit tests for Left Hand client

This commit has unit test cases for Left hand client
to increase code coverage of client.

Change-Id: Ia8576ce971d9ef7554446d0fa028954565f1bff0
This commit is contained in:
Stack
2016-09-27 01:34:37 -07:00
parent db9773f0f9
commit 64cb39b9da
7 changed files with 339 additions and 5 deletions

View File

@@ -1,6 +1,6 @@
nose
nose-testconfig
flask
flask==0.10.1
Werkzeug
mock
flake8 # Used by tox

View File

@@ -1,6 +1,6 @@
nose
nose-testconfig
flask
flask==0.10.1
Werkzeug
mock
flake8 # Used by tox

View File

@@ -193,7 +193,7 @@ def delete_server(server_id):
@app.route('/lhos/servers', methods=['GET'])
def get_server():
def get_servers():
debugRequest(request)
server_name = None
server_name = request.args.get('name')
@@ -209,6 +209,17 @@ def get_server():
return resp
@app.route('/lhos/servers/<server_id>', methods=['GET'])
def get_server(server_id):
debugRequest(request)
for server in servers['members']:
if server['id'] == int(server_id):
resp = make_response(json.dumps(server), 200)
return resp
throw_error(500, 'SERVER_ERROR',
"The server id '%s' does not exists." % server_id)
# VOLUMES & SNAPSHOTS #
@@ -303,7 +314,7 @@ def handle_volume_actions(volume_id):
@app.route('/lhos/volumes', methods=['GET'])
def get_volume():
def get_volumes():
debugRequest(request)
volume_name = None
volume_name = request.args.get('name')
@@ -331,8 +342,34 @@ def get_volume():
return resp
@app.route('/lhos/volumes/<volume_id>', methods=['GET'])
def get_volume(volume_id):
debugRequest(request)
for volume in volumes['members']:
if volume['id'] == int(volume_id):
resp = make_response(json.dumps(volume), 200)
return resp
throw_error(500, 'SERVER_ERROR',
"The volume id '%s' does not exists." % volume_id)
@app.route('/lhos/volumes/<volume_id>', methods=['PUT'])
def modify_volume(volume_id):
debugRequest(request)
data = json.loads(request.data.decode('utf-8'))
for volume in volumes['members']:
if volume['id'] == int(volume_id):
for key in data.keys():
volume[key] = data[key]
return make_response("", 200)
throw_error(500, 'SERVER_ERROR',
"The volume id '%s' does not exists." % volume_id)
@app.route('/lhos/snapshots', methods=['GET'])
def get_snapshot():
def get_snapshots():
debugRequest(request)
snapshot_name = None
snapshot_name = request.args.get('name')
@@ -350,6 +387,18 @@ def get_snapshot():
return resp
@app.route('/lhos/snapshots/<snapshot_id>', methods=['GET'])
def get_snapshot(snapshot_id):
debugRequest(request)
for snapshot in snapshots['members']:
if snapshot['id'] == int(snapshot_id):
resp = make_response(json.dumps(snapshot), 200)
return resp
throw_error(500, 'SERVER_ERROR',
"The snapshot id '%s' does not exists." % snapshot_id)
@app.route('/lhos/volumes', methods=['POST'])
def create_volumes():
debugRequest(request)

View File

@@ -210,3 +210,118 @@ class HPELeftHandClientMockSSHTestCase(test_HPELeftHandClient_base
self.assertRaises(exceptions.SSHException,
ssh_client.was_command_successful,
cmd_out)
def test_connect_with_password(self):
ssh_client = ssh.HPELeftHandSSHClient(ip, user, password,
known_hosts_file=None,
missing_key_policy=paramiko.
AutoAddPolicy)
ssh_client.san_password = 'my_san_pwd'
ssh_client.san_ip = '0.0.0.0'
ssh_client.san_ssh_port = '1234'
ssh_client.san_login = 'my_login'
ssh_client.ssh_conn_timeout = '100'
ssh_client.ssh.connect = mock.Mock()
ssh_client._connect(ssh_client.ssh)
ssh_client.ssh.connect.assert_called_with('0.0.0.0',
port='1234',
username='my_login',
password='my_san_pwd',
timeout='100')
def test_connect_with_private_key(self):
ssh_client = ssh.HPELeftHandSSHClient(ip, user, password,
known_hosts_file=None,
missing_key_policy=paramiko.
AutoAddPolicy)
mock_expand_user = mock.Mock()
mock_expand_user.return_value = 'my_user'
mock_from_private_key_file = mock.Mock()
mock_from_private_key_file.return_value = 'my_private_key'
ssh_client.san_password = None
ssh_client.san_privatekey = 'my_san_key'
ssh_client.san_ip = '0.0.0.0'
ssh_client.san_ssh_port = '1234'
ssh_client.san_login = 'my_login'
ssh_client.ssh_conn_timeout = '100'
ssh_client.ssh.connect = mock.Mock()
with mock.patch('os.path.expanduser',
mock_expand_user, create=True):
with mock.patch('paramiko.RSAKey.from_private_key_file',
mock_from_private_key_file, create=True):
ssh_client._connect(ssh_client.ssh)
mock_expand_user.assert_called_with('my_san_key')
mock_from_private_key_file.assert_called_with('my_user')
ssh_client.ssh.connect.assert_called_with('0.0.0.0',
port='1234',
username='my_login',
pkey='my_private_key',
timeout='100')
def test_connect_without_password_and_private_key(self):
ssh_client = ssh.HPELeftHandSSHClient(ip, user, password,
known_hosts_file=None,
missing_key_policy=paramiko.
AutoAddPolicy)
mock_expand_user = mock.Mock()
mock_expand_user.return_value = 'my_user'
mock_from_private_key_file = mock.Mock()
mock_from_private_key_file.return_value = 'my_private_key'
ssh_client.san_password = None
ssh_client.san_privatekey = None
ssh_client.ssh = mock.Mock()
ssh_client.ssh.get_transport.return_value = False
self.assertRaises(paramiko.SSHException, ssh_client.open)
def test_run_ssh_with_exception(self):
ssh_client = ssh.HPELeftHandSSHClient(ip, user, password,
known_hosts_file=None,
missing_key_policy=paramiko.
AutoAddPolicy)
ssh_client.check_ssh_injection = mock.Mock()
ssh_client.check_ssh_injection.return_value = True
ssh_client._ssh_execute = mock.Mock()
ssh_client._ssh_execute.side_effect = Exception('End this here')
ssh_client._create_ssh = mock.Mock()
ssh_client._create_ssh.return_value = True
ssh_client.ssh = mock.Mock()
ssh_client.ssh.get_transport.is_alive.return_value = True
command = ['fake']
self.assertRaises(exceptions.SSHException, ssh_client._run_ssh,
command, attempts=1)
ssh_client.check_ssh_injection.assert_called_once()
ssh_client._ssh_execute.assert_called_once()
ssh_client._create_ssh.assert_not_called()
def test_run_ssh_exceeds_attempts(self):
ssh_client = ssh.HPELeftHandSSHClient(ip, user, password,
known_hosts_file=None,
missing_key_policy=paramiko.
AutoAddPolicy)
ssh_client.check_ssh_injection = mock.Mock()
ssh_client.check_ssh_injection.return_value = True
ssh_client._ssh_execute = mock.Mock()
ssh_client._ssh_execute.side_effect = Exception('End this here')
ssh_client._create_ssh = mock.Mock()
ssh_client._create_ssh.return_value = True
ssh_client.ssh = mock.Mock()
ssh_client.ssh.get_transport.side_effect = Exception('End here')
command = ['fake']
self.assertRaises(Exception, ssh_client._run_ssh, command, attempts=1)
ssh_client.check_ssh_injection.assert_called_once()
ssh_client._ssh_execute.assert_called_once()
ssh_client._create_ssh.assert_not_called()

View File

@@ -342,6 +342,16 @@ class HPELeftHandClientServerTestCase(test_HPELeftHandClient_base.
self.printFooter('get_server_by_name')
def test_4_get_server_by_id(self):
self.printHeader('get_server_by_id')
self.cl.createServer(SERVER_NAME1, IQN1)
result = self.cl.getServers()
server_info = self.cl.getServer(result['members'][0]['id'])
self.assertEqual(result['members'][0]['id'], server_info['id'])
self.printFooter('get_server_by_id')
def test_4_get_server_by_name_missing_server(self):
self.printHeader('get_server_by_name_missing_server')

View File

@@ -25,6 +25,7 @@ from hpelefthandclient import exceptions
VOLUME_NAME1 = 'VOLUME1_UNIT_TEST_' + test_HPELeftHandClient_base.TIME
VOLUME_NAME2 = 'VOLUME2_UNIT_TEST_' + test_HPELeftHandClient_base.TIME
VOLUME_NAME3 = 'VOLUME3_UNIT_TEST_' + test_HPELeftHandClient_base.TIME
CLONE_NAME1 = 'CLONE_UNIT_TEST_' + test_HPELeftHandClient_base.TIME
SNAP_NAME1 = 'SNAP_UNIT_TEST1_' + test_HPELeftHandClient_base.TIME
SNAP_NAME2 = 'SNAP_UNIT_TEST2_' + test_HPELeftHandClient_base.TIME
SKIP_FLASK_RCOPY_MESSAGE = ("Remote copy is not configured to be tested "
@@ -114,6 +115,11 @@ class HPELeftHandClientVolumeTestCase(test_HPELeftHandClient_base.
self.cl.deleteVolume(volume_info['id'])
except Exception:
pass
try:
volume_info = self.cl.getVolumeByName(CLONE_NAME1)
self.cl.deleteVolume(volume_info['id'])
except Exception:
pass
try:
self.secondary_cl.deleteRemoteSnapshotSchedule(
REMOTE_SNAP_SCHED_NAME + "_Rmt")
@@ -266,6 +272,20 @@ class HPELeftHandClientVolumeTestCase(test_HPELeftHandClient_base.
self.printFooter('get_volumes')
def test_2_get_volume_by_id(self):
self.printHeader('get_volumes')
self.cl.createVolume(VOLUME_NAME1, self.cluster_id,
self.GB_TO_BYTES)
self.cl.createVolume(VOLUME_NAME2, self.cluster_id,
self.GB_TO_BYTES)
vols = self.cl.getVolumes()
vol_info = self.cl.getVolume(vols['members'][0]['id'])
self.assertEqual(vols['members'][0]['id'], vol_info['id'])
self.printFooter('get_volumes')
def test_2_get_volumes_query(self):
self.printHeader('get_volumes')
@@ -287,6 +307,37 @@ class HPELeftHandClientVolumeTestCase(test_HPELeftHandClient_base.
self.printFooter('get_volumes')
def test_2_modify_volume(self):
self.printHeader('modify_volume')
self.cl.createVolume(VOLUME_NAME1, self.cluster_id,
self.GB_TO_BYTES)
volume_info = self.cl.getVolumeByName(VOLUME_NAME1)
new_options = {'description': 'test volume'}
self.cl.modifyVolume(volume_info['id'], new_options)
new_volume_info = self.cl.getVolumeByName(VOLUME_NAME1)
self.assertIn('description', new_volume_info)
self.assertEqual(new_options['description'],
new_volume_info['description'])
self.printFooter('modify_volume')
@unittest.skipIf(not is_live_test(), "Only runs on live array.")
def test_2_clone_volume(self):
self.printHeader('clone_volume')
self.cl.createVolume(VOLUME_NAME1, self.cluster_id,
self.GB_TO_BYTES)
volume_info = self.cl.getVolumeByName(VOLUME_NAME1)
self.cl.cloneVolume(CLONE_NAME1, volume_info['id'])
clone_volume_info = self.cl.getVolumeByName(CLONE_NAME1)
self.assertEqual(CLONE_NAME1,
clone_volume_info['name'])
self.printFooter('clone_volume')
def test_3_delete_volume_nonExist(self):
self.printHeader('delete_volume_nonExist')
@@ -521,6 +572,21 @@ class HPELeftHandClientVolumeTestCase(test_HPELeftHandClient_base.
self.printFooter('get_snapshot_parent_volume_snapshot_invalid')
def test_6_get_snapshot_by_id(self):
self.printHeader('get_snapshot_by_id')
self.cl.createVolume(VOLUME_NAME1, self.cluster_id,
self.GB_TO_BYTES)
volume_info = self.cl.getVolumeByName(VOLUME_NAME1)
option = {'inheritAccess': True}
self.cl.createSnapshot(SNAP_NAME1, volume_info['id'], option)
snaps = self.cl.getSnapshots()
snapshot_info = self.cl.getSnapshot(snaps['members'][0]['id'])
self.assertEqual(snaps['members'][0]['id'], snapshot_info['id'])
self.printFooter('get_snapshot_by_id')
def test_7_get_ip_from_cluster(self):
self.printHeader('get_ip_from_cluster')

View File

@@ -0,0 +1,94 @@
# (c) Copyright 2015 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test HTTPJSONRESTClient class of LeftHand Client(http)"""
import unittest
import mock
import requests
from hpelefthandclient import exceptions
from hpelefthandclient import http
class HTTPJSONRESTClientTestCase(unittest.TestCase):
http = None
def setUp(self):
fake_url = 'http://fake-url:0000'
self.http = http.HTTPJSONRESTClient(fake_url, secure=False,
http_log_debug=True,
suppress_ssl_warnings=False,
timeout=None)
def tearDown(self):
self.http = None
def test_cs_request(self):
url = "fake-url"
method = 'GET'
# Test for HTTPUnauthorized
self.http._time_request = mock.Mock()
ex = exceptions.HTTPUnauthorized()
self.http._time_request.side_effect = ex
self.http._do_reauth = mock.Mock()
resp = 'fake_response'
body = 'fake_body'
self.http._do_reauth.return_value = (resp, body)
self.http._cs_request(url, method)
self.http._time_request.assert_called()
self.http._do_reauth.assert_called_with(url, method, ex)
# Test for HTTPForbidden
ex = exceptions.HTTPForbidden()
self.http._time_request.side_effect = ex
self.http._cs_request(url, method)
self.http._time_request.assert_called()
self.http._do_reauth.assert_called_with(url, method, ex)
def test_do_reauth_exception(self):
url = "fake-url"
method = 'GET'
ex = exceptions.HTTPUnauthorized
self.http.auth_try = 2
self.http._reauth = mock.Mock()
self.http._time_request = mock.Mock()
self.http._time_request.side_effect = ex
self.assertRaises(ex, self.http._do_reauth, url, method, ex)
self.http._reauth.assert_called()
def test_do_reauth_with_auth_try_condition_false(self):
ex = exceptions.HTTPUnauthorized
url = "fake-url"
method = 'GET'
self.http.auth_try = 1
self.assertRaises(ex, self.http._do_reauth, url, method, ex)
def test_request_with_exception(self):
self.http._http_log_req = mock.Mock()
self.http.timeout = 100
retest = mock.Mock()
http_method = 'fake this'
http_url = 'http://fake-url:0000'
with mock.patch('requests.request',
retest, create=True):
retest.side_effect = requests.exceptions.SSLError
self.assertRaises(exceptions.SSLCertFailed, self.http.request,
http_url, http_method)
self.assertEqual(self.http.timeout, 100)