From 175859abd5901ae474eed6ef5bf040eb9023c499 Mon Sep 17 00:00:00 2001 From: Ramakrishnan G Date: Mon, 8 Jun 2015 22:06:15 -0700 Subject: [PATCH] Add support for certificate based authentication This commit adds support for HTTPS certificate based authentication in RIBCL and RIS modules. It adds a new optional argument 'cacert' to IloClient object constructor in which the root cacertificate file may be passed for HTTPS authentication. Change-Id: Iaab525205f4052168e0a37ca5220c26b57f4135c --- proliantutils/ilo/client.py | 8 +- proliantutils/ilo/ribcl.py | 49 ++++-- proliantutils/ilo/ris.py | 74 +++++---- proliantutils/tests/ilo/test_client.py | 25 +++ proliantutils/tests/ilo/test_ribcl.py | 126 ++++++++++++++- proliantutils/tests/ilo/test_ris.py | 215 ++++++++++++------------- requirements.txt | 1 + 7 files changed, 326 insertions(+), 172 deletions(-) diff --git a/proliantutils/ilo/client.py b/proliantutils/ilo/client.py index 353b7e0..48f4c9b 100644 --- a/proliantutils/ilo/client.py +++ b/proliantutils/ilo/client.py @@ -41,10 +41,12 @@ SUPPORTED_RIS_METHODS = [ class IloClient(operations.IloOperations): def __init__(self, host, login, password, timeout=60, port=443, - bios_password=None): + bios_password=None, cacert=None): self.ribcl = ribcl.RIBCLOperations(host, login, password, timeout, - port) - self.ris = ris.RISOperations(host, login, password, bios_password) + port, cacert=cacert) + self.ris = ris.RISOperations(host, login, password, + bios_password=bios_password, + cacert=cacert) self.info = {'address': host, 'username': login, 'password': password} self.model = self.ribcl.get_product_name() diff --git a/proliantutils/ilo/ribcl.py b/proliantutils/ilo/ribcl.py index 404d163..85aae93 100644 --- a/proliantutils/ilo/ribcl.py +++ b/proliantutils/ilo/ribcl.py @@ -21,9 +21,10 @@ import re import xml.etree.ElementTree as etree from oslo_utils import strutils +import requests +from requests.packages import urllib3 +from requests.packages.urllib3 import exceptions as urllib3_exceptions import six -from six.moves.urllib import error as urllib_error -from six.moves.urllib import request from proliantutils import exception from proliantutils.ilo import common @@ -49,12 +50,21 @@ class RIBCLOperations(operations.IloOperations): Implements the base class using RIBCL scripting language to talk to the iLO. """ - def __init__(self, host, login, password, timeout=60, port=443): + def __init__(self, host, login, password, timeout=60, port=443, + cacert=None): self.host = host self.login = login self.password = password self.timeout = timeout self.port = port + self.cacert = cacert + + # By default, requests logs following message if verify=False + # InsecureRequestWarning: Unverified HTTPS request is + # being made. Adding certificate verification is strongly advised. + # Just disable the warning if user intentionally did this. + if self.cacert is None: + urllib3.disable_warnings(urllib3_exceptions.InsecureRequestWarning) def _request_ilo(self, root): """Send RIBCL XML data to iLO. @@ -69,14 +79,19 @@ class RIBCLOperations(operations.IloOperations): else: urlstr = 'https://%s/ribcl' % (self.host) xml = self._serialize_xml(root) + headers = {"Content-length": len(xml)} + kwargs = {'headers': headers, 'data': xml} + if self.cacert is not None: + kwargs['verify'] = self.cacert + else: + kwargs['verify'] = False + try: - req = request.Request(url=urlstr, data=xml.encode('ascii')) - req.add_header("Content-length", len(xml)) - data = request.urlopen(req).read() - except (ValueError, urllib_error.URLError, - urllib_error.HTTPError) as e: + response = requests.post(urlstr, **kwargs) + response.raise_for_status() + except Exception as e: raise exception.IloConnectionError(e) - return data + return response.text def _create_dynamic_xml(self, cmdname, tag_name, mode, subelements=None): """Create RIBCL XML to send to iLO. @@ -528,15 +543,19 @@ class RIBCLOperations(operations.IloOperations): def _request_host(self): """Request host info from the server.""" - urlstr = 'http://%s/xmldata?item=all' % (self.host) + urlstr = 'https://%s/xmldata?item=all' % (self.host) + kwargs = {} + if self.cacert is not None: + kwargs['verify'] = self.cacert + else: + kwargs['verify'] = False try: - req = request.Request(url=urlstr) - xml = request.urlopen(req).read() - except (ValueError, urllib_error.URLError, - urllib_error.HTTPError) as e: + response = requests.get(urlstr, **kwargs) + response.raise_for_status() + except Exception as e: raise IloConnectionError(e) - return xml + return response.text def get_host_uuid(self): """Request host UUID of the server. diff --git a/proliantutils/ilo/ris.py b/proliantutils/ilo/ris.py index 353c974..bd3f999 100755 --- a/proliantutils/ilo/ris.py +++ b/proliantutils/ilo/ris.py @@ -19,8 +19,10 @@ import gzip import hashlib import json +import requests +from requests.packages import urllib3 +from requests.packages.urllib3 import exceptions as urllib3_exceptions import six -from six.moves import http_client from six.moves.urllib import parse as urlparse from proliantutils import exception @@ -35,13 +37,22 @@ TODO : Add rest of the API's that exists in RIBCL. """ class RISOperations(operations.IloOperations): - def __init__(self, host, login, password, bios_password=None): + def __init__(self, host, login, password, bios_password=None, + cacert=None): self.host = host self.login = login self.password = password self.bios_password = bios_password # Message registry support self.message_registries = {} + self.cacert = cacert + + # By default, requests logs following message if verify=False + # InsecureRequestWarning: Unverified HTTPS request is + # being made. Adding certificate verification is strongly advised. + # Just disable the warning if user intentionally did this. + if self.cacert is None: + urllib3.disable_warnings(urllib3_exceptions.InsecureRequestWarning) def _rest_op(self, operation, suburi, request_headers, request_body): """Generic REST Operation handler.""" @@ -62,19 +73,17 @@ class RISOperations(operations.IloOperations): redir_count = 5 while redir_count: - conn = None - if url.scheme == 'https': - conn = http_client.HTTPSConnection(host=url.netloc, - strict=True) - elif url.scheme == 'http': - conn = http_client.HTTPConnection(host=url.netloc, - strict=True) + kwargs = {'headers': request_headers, + 'data': json.dumps(request_body)} + if self.cacert is not None: + kwargs['verify'] = self.cacert + else: + kwargs['verify'] = False + request_method = getattr(requests, operation.lower()) + response = None try: - conn.request(operation, url.path, headers=request_headers, - body=json.dumps(request_body)) - resp = conn.getresponse() - body = resp.read() + response = request_method(url.geturl(), **kwargs) except Exception as e: raise exception.IloConnectionError(e) @@ -86,11 +95,9 @@ class RISOperations(operations.IloOperations): # NOTE: this makes sure the headers names are all lower cases # because HTTP says they are case insensitive - headers = dict((x.lower(), y) for x, y in resp.getheaders()) - # Follow HTTP redirect - if resp.status == 301 and 'location' in headers: - url = urlparse.urlparse(headers['location']) + if response.status_code == 301 and 'location' in response.headers: + url = urlparse.urlparse(response.headers['location']) redir_count -= 1 else: break @@ -100,23 +107,26 @@ class RISOperations(operations.IloOperations): "URL incorrect: %s" % start_url) raise exception.IloConnectionError(msg) - response = dict() - try: - if body: - response = json.loads(body) - except (ValueError, TypeError): - # if it doesn't decode as json - # NOTE: resources may return gzipped content - # try to decode as gzip (we should check the headers for - # Content-Encoding=gzip) + response_body = {} + if response.text: try: - gzipper = gzip.GzipFile(fileobj=six.BytesIO(body)) - uncompressed_string = gzipper.read().decode('UTF-8') - response = json.loads(uncompressed_string) - except Exception as e: - raise exception.IloError(e) + response_body = json.loads(response.text) + except (TypeError, ValueError): + # if it doesn't decode as json + # NOTE: resources may return gzipped content + # try to decode as gzip (we should check the headers for + # Content-Encoding=gzip) + # NOTE: json.loads on python3 raises TypeError when + # response.text is gzipped one. + try: + gzipper = gzip.GzipFile( + fileobj=six.BytesIO(response.text)) + uncompressed_string = gzipper.read().decode('UTF-8') + response_body = json.loads(uncompressed_string) + except Exception as e: + raise exception.IloError(e) - return resp.status, headers, response + return response.status_code, response.headers, response_body def _rest_get(self, suburi, request_headers=None): """REST GET operation. diff --git a/proliantutils/tests/ilo/test_client.py b/proliantutils/tests/ilo/test_client.py index ea66a28..2084dfe 100644 --- a/proliantutils/tests/ilo/test_client.py +++ b/proliantutils/tests/ilo/test_client.py @@ -26,6 +26,31 @@ from proliantutils.ilo import ris from proliantutils.tests.ilo import ribcl_sample_outputs as constants +class IloClientInitTestCase(testtools.TestCase): + + @mock.patch.object(ribcl, 'RIBCLOperations') + @mock.patch.object(ris, 'RISOperations') + def test_init(self, ris_mock, ribcl_mock): + ribcl_obj_mock = mock.MagicMock() + ribcl_mock.return_value = ribcl_obj_mock + ribcl_obj_mock.get_product_name.return_value = 'product' + + c = client.IloClient("1.2.3.4", "admin", "Admin", + timeout=120, port=4430, + bios_password='foo', + cacert='/somewhere') + + ris_mock.assert_called_once_with( + "1.2.3.4", "admin", "Admin", bios_password='foo', + cacert='/somewhere') + ribcl_mock.assert_called_once_with( + "1.2.3.4", "admin", "Admin", 120, 4430, cacert='/somewhere') + self.assertEqual( + {'address': "1.2.3.4", 'username': "admin", 'password': "Admin"}, + c.info) + self.assertEqual('product', c.model) + + class IloClientTestCase(testtools.TestCase): @mock.patch.object(ribcl.RIBCLOperations, 'get_product_name') diff --git a/proliantutils/tests/ilo/test_ribcl.py b/proliantutils/tests/ilo/test_ribcl.py index 9f0ccc0..280adee 100644 --- a/proliantutils/tests/ilo/test_ribcl.py +++ b/proliantutils/tests/ilo/test_ribcl.py @@ -19,6 +19,9 @@ import json import unittest import mock +import requests +from requests.packages import urllib3 +from requests.packages.urllib3 import exceptions as urllib3_exceptions from proliantutils import exception from proliantutils.ilo import common @@ -26,15 +29,93 @@ from proliantutils.ilo import ribcl from proliantutils.tests.ilo import ribcl_sample_outputs as constants +class IloRibclTestCaseInitTestCase(unittest.TestCase): + + @mock.patch.object(urllib3, 'disable_warnings') + def test_init(self, disable_warning_mock): + ribcl_client = ribcl.RIBCLOperations( + "x.x.x.x", "admin", "Admin", 60, 443, cacert='/somepath') + + self.assertEqual(ribcl_client.host, "x.x.x.x") + self.assertEqual(ribcl_client.login, "admin") + self.assertEqual(ribcl_client.password, "Admin") + self.assertEqual(ribcl_client.timeout, 60) + self.assertEqual(ribcl_client.port, 443) + self.assertEqual(ribcl_client.cacert, '/somepath') + + @mock.patch.object(urllib3, 'disable_warnings') + def test_init_without_cacert(self, disable_warning_mock): + ribcl_client = ribcl.RIBCLOperations( + "x.x.x.x", "admin", "Admin", 60, 443) + + self.assertEqual(ribcl_client.host, "x.x.x.x") + self.assertEqual(ribcl_client.login, "admin") + self.assertEqual(ribcl_client.password, "Admin") + self.assertEqual(ribcl_client.timeout, 60) + self.assertEqual(ribcl_client.port, 443) + self.assertIsNone(ribcl_client.cacert) + disable_warning_mock.assert_called_once_with( + urllib3_exceptions.InsecureRequestWarning) + + class IloRibclTestCase(unittest.TestCase): def setUp(self): super(IloRibclTestCase, self).setUp() self.ilo = ribcl.RIBCLOperations("x.x.x.x", "admin", "Admin", 60, 443) - def test__request_ilo_connection_failed(self): + @mock.patch.object(ribcl.RIBCLOperations, '_serialize_xml') + @mock.patch.object(requests, 'post') + def test__request_ilo_without_verify(self, post_mock, serialize_mock): + response_mock = mock.MagicMock(text='returned-text') + serialize_mock.return_value = 'serialized-xml' + post_mock.return_value = response_mock + + retval = self.ilo._request_ilo('xml-obj') + + post_mock.assert_called_once_with( + 'https://x.x.x.x:443/ribcl', + headers={"Content-length": 14}, + data='serialized-xml', + verify=False) + response_mock.raise_for_status.assert_called_once_with() + self.assertEqual('returned-text', retval) + + @mock.patch.object(ribcl.RIBCLOperations, '_serialize_xml') + @mock.patch.object(requests, 'post') + def test__request_ilo_with_verify(self, post_mock, serialize_mock): + self.ilo = ribcl.RIBCLOperations( + "x.x.x.x", "admin", "Admin", 60, 443, + cacert='/somepath') + response_mock = mock.MagicMock(text='returned-text') + serialize_mock.return_value = 'serialized-xml' + post_mock.return_value = response_mock + + retval = self.ilo._request_ilo('xml-obj') + + post_mock.assert_called_once_with( + 'https://x.x.x.x:443/ribcl', + headers={"Content-length": 14}, + data='serialized-xml', + verify='/somepath') + response_mock.raise_for_status.assert_called_once_with() + self.assertEqual('returned-text', retval) + + @mock.patch.object(ribcl.RIBCLOperations, '_serialize_xml') + @mock.patch.object(requests, 'post') + def test__request_ilo_raises(self, post_mock, serialize_mock): + serialize_mock.return_value = 'serialized-xml' + post_mock.side_effect = Exception + self.assertRaises(exception.IloConnectionError, - self.ilo.get_all_licenses) + self.ilo._request_ilo, + 'xml-obj') + + post_mock.assert_called_once_with( + 'https://x.x.x.x:443/ribcl', + headers={"Content-length": 14}, + data='serialized-xml', + verify=False) @mock.patch.object(ribcl.RIBCLOperations, '_request_ilo') def test_login_fail(self, request_ilo_mock): @@ -339,6 +420,43 @@ class IloRibclTestCase(unittest.TestCase): self.assertIn('MINIMUM_POWER_READING', result) self.assertIn('AVERAGE_POWER_READING', result) + @mock.patch.object(requests, 'get') + def test__request_host_with_verify(self, request_mock): + self.ilo = ribcl.RIBCLOperations( + "x.x.x.x", "admin", "Admin", 60, 443, + cacert='/somepath') + response_mock = mock.MagicMock(text='foo') + request_mock.return_value = response_mock + + retval = self.ilo._request_host() + + request_mock.assert_called_once_with( + "https://x.x.x.x/xmldata?item=all", verify='/somepath') + response_mock.raise_for_status.assert_called_once_with() + self.assertEqual('foo', retval) + + @mock.patch.object(requests, 'get') + def test__request_host_without_verify(self, request_mock): + response_mock = mock.MagicMock(text='foo') + request_mock.return_value = response_mock + + retval = self.ilo._request_host() + + request_mock.assert_called_once_with( + "https://x.x.x.x/xmldata?item=all", verify=False) + response_mock.raise_for_status.assert_called_once_with() + self.assertEqual('foo', retval) + + @mock.patch.object(requests, 'get') + def test__request_host_raises(self, request_mock): + request_mock.side_effect = Exception + + self.assertRaises(exception.IloConnectionError, + self.ilo._request_host) + + request_mock.assert_called_once_with( + "https://x.x.x.x/xmldata?item=all", verify=False) + @mock.patch.object(ribcl.RIBCLOperations, '_request_host') def test_get_host_uuid(self, request_host_mock): request_host_mock.return_value = constants.GET_HOST_UUID @@ -543,10 +661,6 @@ class IloRibclTestCaseBeforeRisSupport(unittest.TestCase): super(IloRibclTestCaseBeforeRisSupport, self).setUp() self.ilo = ribcl.IloClient("x.x.x.x", "admin", "Admin", 60, 443) - def test__request_ilo_connection_failed(self): - self.assertRaises(ribcl.IloConnectionError, - self.ilo.get_all_licenses) - @mock.patch.object(ribcl.IloClient, '_request_ilo') def test_login_fail(self, request_ilo_mock): request_ilo_mock.return_value = constants.LOGIN_FAIL_XML diff --git a/proliantutils/tests/ilo/test_ris.py b/proliantutils/tests/ilo/test_ris.py index d942437..f4433bc 100755 --- a/proliantutils/tests/ilo/test_ris.py +++ b/proliantutils/tests/ilo/test_ris.py @@ -19,7 +19,9 @@ import base64 import json import mock -from six.moves import http_client +import requests +from requests.packages import urllib3 +from requests.packages.urllib3 import exceptions as urllib3_exceptions import testtools from proliantutils import exception @@ -28,6 +30,34 @@ from proliantutils.ilo import ris from proliantutils.tests.ilo import ris_sample_outputs as ris_outputs +class IloRisTestCaseInitTestCase(testtools.TestCase): + + @mock.patch.object(urllib3, 'disable_warnings') + def test_init(self, disable_warning_mock): + ris_client = ris.RISOperations( + "x.x.x.x", "admin", "Admin", bios_password='foo', + cacert='/somepath') + + self.assertEqual(ris_client.host, "x.x.x.x") + self.assertEqual(ris_client.login, "admin") + self.assertEqual(ris_client.password, "Admin") + self.assertEqual(ris_client.bios_password, "foo") + self.assertEqual({}, ris_client.message_registries) + self.assertEqual(ris_client.cacert, '/somepath') + + @mock.patch.object(urllib3, 'disable_warnings') + def test_init_without_cacert(self, disable_warning_mock): + ris_client = ris.RISOperations( + "x.x.x.x", "admin", "Admin", bios_password='foo') + + self.assertEqual(ris_client.host, "x.x.x.x") + self.assertEqual(ris_client.login, "admin") + self.assertEqual(ris_client.password, "Admin") + self.assertIsNone(ris_client.cacert) + disable_warning_mock.assert_called_once_with( + urllib3_exceptions.InsecureRequestWarning) + + class IloRisTestCase(testtools.TestCase): def setUp(self): @@ -396,19 +426,15 @@ class TestRISOperationsPrivateMethods(testtools.TestCase): result = self.client._validate_uefi_boot_mode() self.assertFalse(result) - @mock.patch.object(http_client, 'HTTPSConnection') - def test__rest_op_okay(self, https_con_mock): - connection_mock_obj = mock.MagicMock() - response_mock_obj = mock.MagicMock(status=200) - https_con_mock.return_value = connection_mock_obj - connection_mock_obj.getresponse.return_value = response_mock_obj - - sample_response_body = ris_outputs.RESPONSE_BODY_FOR_REST_OP - response_mock_obj.read.return_value = sample_response_body - + @mock.patch.object(requests, 'get') + def test__rest_op_okay(self, request_mock): sample_headers = ris_outputs.HEADERS_FOR_REST_OP - response_mock_obj.getheaders.return_value = sample_headers exp_headers = dict((x.lower(), y) for x, y in sample_headers) + sample_response_body = ris_outputs.RESPONSE_BODY_FOR_REST_OP + response_mock_obj = mock.MagicMock( + status_code=200, text=sample_response_body, + headers=exp_headers) + request_mock.return_value = response_mock_obj status, headers, response = self.client._rest_op( 'GET', '/v1/foo', None, None) @@ -416,152 +442,109 @@ class TestRISOperationsPrivateMethods(testtools.TestCase): self.assertEqual(200, status) self.assertEqual(exp_headers, headers) self.assertEqual(json.loads(sample_response_body), response) - - https_con_mock.assert_called_once_with(host='1.2.3.4', strict=True) - connection_mock_obj.request.assert_called_once_with( - 'GET', '/v1/foo', - # base64 encoded username + password for admin/Admin + request_mock.assert_called_once_with( + 'https://1.2.3.4/v1/foo', headers={'Authorization': 'BASIC YWRtaW46QWRtaW4='}, - body="null") + data="null", verify=False) + + @mock.patch.object(requests, 'get') + def test__rest_op_request_error(self, request_mock): + request_mock.side_effect = RuntimeError("boom") - @mock.patch.object(http_client, 'HTTPSConnection') - def test__rest_op_request_error(self, https_con_mock): - connection_mock_obj = mock.MagicMock() - https_con_mock.return_value = connection_mock_obj - connection_mock_obj.request.side_effect = RuntimeError("boom") exc = self.assertRaises(exception.IloConnectionError, self.client._rest_op, 'GET', '/v1/foo', {}, None) - https_con_mock.assert_called_once_with(host='1.2.3.4', strict=True) - self.assertIn("boom", str(exc)) - @mock.patch.object(http_client, 'HTTPSConnection') - def test__rest_op_get_response_error(self, https_con_mock): - connection_mock_obj = mock.MagicMock() - https_con_mock.return_value = connection_mock_obj - connection_mock_obj.getresponse.side_effect = RuntimeError("boom") - exc = self.assertRaises(exception.IloConnectionError, - self.client._rest_op, - 'GET', '/v1/foo', None, None) - https_con_mock.assert_called_once_with(host='1.2.3.4', strict=True) - connection_mock_obj.request.assert_called_once_with( - 'GET', '/v1/foo', + request_mock.assert_called_once_with( + 'https://1.2.3.4/v1/foo', headers={'Authorization': 'BASIC YWRtaW46QWRtaW4='}, - body="null") + data="null", verify=False) self.assertIn("boom", str(exc)) - @mock.patch.object(http_client, 'HTTPSConnection') - def test__rest_op_response_read_error(self, https_con_mock): - connection_mock_obj = mock.MagicMock() - response_mock_obj = mock.MagicMock(status=200) - https_con_mock.return_value = connection_mock_obj - connection_mock_obj.getresponse.return_value = response_mock_obj - response_mock_obj.read.side_effect = RuntimeError("boom") - exc = self.assertRaises(exception.IloConnectionError, - self.client._rest_op, - 'GET', '/v1/foo', None, None) - self.assertIn("boom", str(exc)) - - @mock.patch.object(http_client, 'HTTPSConnection') - def test__rest_op_continous_redirection(self, https_con_mock): - connection_mock_obj = mock.MagicMock() - response_mock_obj = mock.MagicMock(status=301) - https_con_mock.side_effect = [connection_mock_obj, - connection_mock_obj, - connection_mock_obj, - connection_mock_obj, - connection_mock_obj] - - connection_mock_obj.getresponse.return_value = response_mock_obj - + @mock.patch.object(requests, 'get') + def test__rest_op_continous_redirection(self, request_mock): sample_response_body = ris_outputs.RESPONSE_BODY_FOR_REST_OP - response_mock_obj.read.return_value = sample_response_body - sample_headers = ris_outputs.HEADERS_FOR_REST_OP sample_headers.append(('location', 'https://foo')) - response_mock_obj.getheaders.return_value = sample_headers + exp_headers = dict((x.lower(), y) for x, y in sample_headers) + response_mock_obj = mock.MagicMock( + status_code=301, text=sample_response_body, + headers=exp_headers) + request_mock.side_effect = [response_mock_obj, + response_mock_obj, + response_mock_obj, + response_mock_obj, + response_mock_obj] exc = self.assertRaises(exception.IloConnectionError, self.client._rest_op, 'GET', '/v1/foo', {}, None) - self.assertEqual(5, https_con_mock.call_count) - self.assertEqual(5, connection_mock_obj.request.call_count) + + self.assertEqual(5, request_mock.call_count) self.assertIn('https://1.2.3.4/v1/foo', str(exc)) - @mock.patch.object(http_client, 'HTTPConnection') - @mock.patch.object(http_client, 'HTTPSConnection') - def test__rest_op_one_redirection(self, https_con_mock, - http_con_mock): - connection_mock_obj = mock.MagicMock() - response_mock_obj1 = mock.MagicMock(status=301) - response_mock_obj2 = mock.MagicMock(status=200) - https_con_mock.return_value = connection_mock_obj - http_con_mock.return_value = connection_mock_obj - connection_mock_obj.getresponse.side_effect = [response_mock_obj1, - response_mock_obj2] - + @mock.patch.object(requests, 'get') + def test__rest_op_one_redirection(self, request_mock): sample_response_body = ris_outputs.RESPONSE_BODY_FOR_REST_OP - response_mock_obj1.read.return_value = sample_response_body - response_mock_obj2.read.return_value = sample_response_body - sample_headers1 = ris_outputs.HEADERS_FOR_REST_OP sample_headers2 = ris_outputs.HEADERS_FOR_REST_OP - sample_headers1.append(('location', 'http://5.6.7.8/v1/foo')) - response_mock_obj1.getheaders.return_value = sample_headers1 - response_mock_obj2.getheaders.return_value = sample_headers2 + sample_headers1.append(('location', 'https://5.6.7.8/v1/foo')) + exp_headers1 = dict((x.lower(), y) for x, y in sample_headers1) + exp_headers2 = dict((x.lower(), y) for x, y in sample_headers2) + response_mock_obj1 = mock.MagicMock( + status_code=301, text=sample_response_body, + headers=exp_headers1) + response_mock_obj2 = mock.MagicMock( + status_code=200, text=sample_response_body, + headers=exp_headers2) + request_mock.side_effect = [response_mock_obj1, + response_mock_obj2] status, headers, response = self.client._rest_op( 'GET', '/v1/foo', {}, None) - exp_headers = dict((x.lower(), y) for x, y in sample_headers1) + exp_headers = dict((x.lower(), y) for x, y in sample_headers2) self.assertEqual(200, status) self.assertEqual(exp_headers, headers) self.assertEqual(json.loads(sample_response_body), response) + request_mock.assert_has_calls([ + mock.call('https://1.2.3.4/v1/foo', + headers={'Authorization': 'BASIC YWRtaW46QWRtaW4='}, + data="null", verify=False), + mock.call('https://5.6.7.8/v1/foo', + headers={'Authorization': 'BASIC YWRtaW46QWRtaW4='}, + data="null", verify=False)]) - https_con_mock.assert_any_call(host='1.2.3.4', strict=True) - http_con_mock.assert_any_call(host='5.6.7.8', strict=True) - self.assertEqual(2, connection_mock_obj.request.call_count) - self.assertTrue(response_mock_obj1.read.called) - self.assertTrue(response_mock_obj2.read.called) - - @mock.patch.object(http_client, 'HTTPSConnection') - def test__rest_op_response_decode_error(self, https_con_mock): - connection_mock_obj = mock.MagicMock() - response_mock_obj = mock.MagicMock(status=200) - https_con_mock.return_value = connection_mock_obj - connection_mock_obj.getresponse.return_value = response_mock_obj - + @mock.patch.object(requests, 'get') + def test__rest_op_response_decode_error(self, request_mock): sample_response_body = "{[wrong json" - response_mock_obj.read.return_value = sample_response_body - sample_headers = ris_outputs.HEADERS_FOR_REST_OP - response_mock_obj.getheaders.return_value = sample_headers + exp_headers = dict((x.lower(), y) for x, y in sample_headers) + response_mock_obj = mock.MagicMock( + status_code=200, text=sample_response_body, + headers=exp_headers) + request_mock.return_value = response_mock_obj self.assertRaises(exception.IloError, self.client._rest_op, 'GET', '/v1/foo', {}, None) - https_con_mock.assert_called_once_with(host='1.2.3.4', strict=True) - connection_mock_obj.request.assert_called_once_with( - 'GET', '/v1/foo', - # base64 encoded username + password for admin/Admin + + request_mock.assert_called_once_with( + 'https://1.2.3.4/v1/foo', headers={'Authorization': 'BASIC YWRtaW46QWRtaW4='}, - body="null") - - @mock.patch.object(http_client, 'HTTPSConnection') - def test__rest_op_response_gzipped_response(self, https_con_mock): - connection_mock_obj = mock.MagicMock() - response_mock_obj = mock.MagicMock(status=200) - https_con_mock.return_value = connection_mock_obj - connection_mock_obj.getresponse.return_value = response_mock_obj + data="null", verify=False) + @mock.patch.object(requests, 'get') + def test__rest_op_response_gzipped_response(self, request_mock): sample_response_body = ris_outputs.RESPONSE_BODY_FOR_REST_OP gzipped_response_body = base64.b64decode( ris_outputs.BASE64_GZIPPED_RESPONSE) - response_mock_obj.read.return_value = gzipped_response_body - sample_headers = ris_outputs.HEADERS_FOR_REST_OP - response_mock_obj.getheaders.return_value = sample_headers exp_headers = dict((x.lower(), y) for x, y in sample_headers) + response_mock_obj = mock.MagicMock( + status_code=200, text=gzipped_response_body, + headers=exp_headers) + request_mock.return_value = response_mock_obj status, headers, response = self.client._rest_op( 'GET', '/v1/foo', {}, None) diff --git a/requirements.txt b/requirements.txt index ca607e0..9dccded 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ six>=1.9.0 oslo.concurrency>=1.8.0 oslo.utils>=1.4.0 jsonschema>=2.4.0 +requests>=2.7.0