Add support for certificate based authentication

This commit adds support for HTTPS certificate based
authentication in RIBCL and RIS modules. It adds
a new optional argument 'cacert' to IloClient
object constructor in which the root cacertificate
file may be passed for HTTPS authentication.

Change-Id: Iaab525205f4052168e0a37ca5220c26b57f4135c
This commit is contained in:
Ramakrishnan G 2015-06-08 22:06:15 -07:00
parent 32542bb1cd
commit 175859abd5
7 changed files with 326 additions and 172 deletions

View File

@ -41,10 +41,12 @@ SUPPORTED_RIS_METHODS = [
class IloClient(operations.IloOperations):
def __init__(self, host, login, password, timeout=60, port=443,
bios_password=None):
bios_password=None, cacert=None):
self.ribcl = ribcl.RIBCLOperations(host, login, password, timeout,
port)
self.ris = ris.RISOperations(host, login, password, bios_password)
port, cacert=cacert)
self.ris = ris.RISOperations(host, login, password,
bios_password=bios_password,
cacert=cacert)
self.info = {'address': host, 'username': login, 'password': password}
self.model = self.ribcl.get_product_name()

View File

@ -21,9 +21,10 @@ import re
import xml.etree.ElementTree as etree
from oslo_utils import strutils
import requests
from requests.packages import urllib3
from requests.packages.urllib3 import exceptions as urllib3_exceptions
import six
from six.moves.urllib import error as urllib_error
from six.moves.urllib import request
from proliantutils import exception
from proliantutils.ilo import common
@ -49,12 +50,21 @@ class RIBCLOperations(operations.IloOperations):
Implements the base class using RIBCL scripting language to talk
to the iLO.
"""
def __init__(self, host, login, password, timeout=60, port=443):
def __init__(self, host, login, password, timeout=60, port=443,
cacert=None):
self.host = host
self.login = login
self.password = password
self.timeout = timeout
self.port = port
self.cacert = cacert
# By default, requests logs following message if verify=False
# InsecureRequestWarning: Unverified HTTPS request is
# being made. Adding certificate verification is strongly advised.
# Just disable the warning if user intentionally did this.
if self.cacert is None:
urllib3.disable_warnings(urllib3_exceptions.InsecureRequestWarning)
def _request_ilo(self, root):
"""Send RIBCL XML data to iLO.
@ -69,14 +79,19 @@ class RIBCLOperations(operations.IloOperations):
else:
urlstr = 'https://%s/ribcl' % (self.host)
xml = self._serialize_xml(root)
headers = {"Content-length": len(xml)}
kwargs = {'headers': headers, 'data': xml}
if self.cacert is not None:
kwargs['verify'] = self.cacert
else:
kwargs['verify'] = False
try:
req = request.Request(url=urlstr, data=xml.encode('ascii'))
req.add_header("Content-length", len(xml))
data = request.urlopen(req).read()
except (ValueError, urllib_error.URLError,
urllib_error.HTTPError) as e:
response = requests.post(urlstr, **kwargs)
response.raise_for_status()
except Exception as e:
raise exception.IloConnectionError(e)
return data
return response.text
def _create_dynamic_xml(self, cmdname, tag_name, mode, subelements=None):
"""Create RIBCL XML to send to iLO.
@ -528,15 +543,19 @@ class RIBCLOperations(operations.IloOperations):
def _request_host(self):
"""Request host info from the server."""
urlstr = 'http://%s/xmldata?item=all' % (self.host)
urlstr = 'https://%s/xmldata?item=all' % (self.host)
kwargs = {}
if self.cacert is not None:
kwargs['verify'] = self.cacert
else:
kwargs['verify'] = False
try:
req = request.Request(url=urlstr)
xml = request.urlopen(req).read()
except (ValueError, urllib_error.URLError,
urllib_error.HTTPError) as e:
response = requests.get(urlstr, **kwargs)
response.raise_for_status()
except Exception as e:
raise IloConnectionError(e)
return xml
return response.text
def get_host_uuid(self):
"""Request host UUID of the server.

View File

@ -19,8 +19,10 @@ import gzip
import hashlib
import json
import requests
from requests.packages import urllib3
from requests.packages.urllib3 import exceptions as urllib3_exceptions
import six
from six.moves import http_client
from six.moves.urllib import parse as urlparse
from proliantutils import exception
@ -35,13 +37,22 @@ TODO : Add rest of the API's that exists in RIBCL. """
class RISOperations(operations.IloOperations):
def __init__(self, host, login, password, bios_password=None):
def __init__(self, host, login, password, bios_password=None,
cacert=None):
self.host = host
self.login = login
self.password = password
self.bios_password = bios_password
# Message registry support
self.message_registries = {}
self.cacert = cacert
# By default, requests logs following message if verify=False
# InsecureRequestWarning: Unverified HTTPS request is
# being made. Adding certificate verification is strongly advised.
# Just disable the warning if user intentionally did this.
if self.cacert is None:
urllib3.disable_warnings(urllib3_exceptions.InsecureRequestWarning)
def _rest_op(self, operation, suburi, request_headers, request_body):
"""Generic REST Operation handler."""
@ -62,19 +73,17 @@ class RISOperations(operations.IloOperations):
redir_count = 5
while redir_count:
conn = None
if url.scheme == 'https':
conn = http_client.HTTPSConnection(host=url.netloc,
strict=True)
elif url.scheme == 'http':
conn = http_client.HTTPConnection(host=url.netloc,
strict=True)
kwargs = {'headers': request_headers,
'data': json.dumps(request_body)}
if self.cacert is not None:
kwargs['verify'] = self.cacert
else:
kwargs['verify'] = False
request_method = getattr(requests, operation.lower())
response = None
try:
conn.request(operation, url.path, headers=request_headers,
body=json.dumps(request_body))
resp = conn.getresponse()
body = resp.read()
response = request_method(url.geturl(), **kwargs)
except Exception as e:
raise exception.IloConnectionError(e)
@ -86,11 +95,9 @@ class RISOperations(operations.IloOperations):
# NOTE: this makes sure the headers names are all lower cases
# because HTTP says they are case insensitive
headers = dict((x.lower(), y) for x, y in resp.getheaders())
# Follow HTTP redirect
if resp.status == 301 and 'location' in headers:
url = urlparse.urlparse(headers['location'])
if response.status_code == 301 and 'location' in response.headers:
url = urlparse.urlparse(response.headers['location'])
redir_count -= 1
else:
break
@ -100,23 +107,26 @@ class RISOperations(operations.IloOperations):
"URL incorrect: %s" % start_url)
raise exception.IloConnectionError(msg)
response = dict()
try:
if body:
response = json.loads(body)
except (ValueError, TypeError):
# if it doesn't decode as json
# NOTE: resources may return gzipped content
# try to decode as gzip (we should check the headers for
# Content-Encoding=gzip)
response_body = {}
if response.text:
try:
gzipper = gzip.GzipFile(fileobj=six.BytesIO(body))
uncompressed_string = gzipper.read().decode('UTF-8')
response = json.loads(uncompressed_string)
except Exception as e:
raise exception.IloError(e)
response_body = json.loads(response.text)
except (TypeError, ValueError):
# if it doesn't decode as json
# NOTE: resources may return gzipped content
# try to decode as gzip (we should check the headers for
# Content-Encoding=gzip)
# NOTE: json.loads on python3 raises TypeError when
# response.text is gzipped one.
try:
gzipper = gzip.GzipFile(
fileobj=six.BytesIO(response.text))
uncompressed_string = gzipper.read().decode('UTF-8')
response_body = json.loads(uncompressed_string)
except Exception as e:
raise exception.IloError(e)
return resp.status, headers, response
return response.status_code, response.headers, response_body
def _rest_get(self, suburi, request_headers=None):
"""REST GET operation.

View File

@ -26,6 +26,31 @@ from proliantutils.ilo import ris
from proliantutils.tests.ilo import ribcl_sample_outputs as constants
class IloClientInitTestCase(testtools.TestCase):
@mock.patch.object(ribcl, 'RIBCLOperations')
@mock.patch.object(ris, 'RISOperations')
def test_init(self, ris_mock, ribcl_mock):
ribcl_obj_mock = mock.MagicMock()
ribcl_mock.return_value = ribcl_obj_mock
ribcl_obj_mock.get_product_name.return_value = 'product'
c = client.IloClient("1.2.3.4", "admin", "Admin",
timeout=120, port=4430,
bios_password='foo',
cacert='/somewhere')
ris_mock.assert_called_once_with(
"1.2.3.4", "admin", "Admin", bios_password='foo',
cacert='/somewhere')
ribcl_mock.assert_called_once_with(
"1.2.3.4", "admin", "Admin", 120, 4430, cacert='/somewhere')
self.assertEqual(
{'address': "1.2.3.4", 'username': "admin", 'password': "Admin"},
c.info)
self.assertEqual('product', c.model)
class IloClientTestCase(testtools.TestCase):
@mock.patch.object(ribcl.RIBCLOperations, 'get_product_name')

View File

@ -19,6 +19,9 @@ import json
import unittest
import mock
import requests
from requests.packages import urllib3
from requests.packages.urllib3 import exceptions as urllib3_exceptions
from proliantutils import exception
from proliantutils.ilo import common
@ -26,15 +29,93 @@ from proliantutils.ilo import ribcl
from proliantutils.tests.ilo import ribcl_sample_outputs as constants
class IloRibclTestCaseInitTestCase(unittest.TestCase):
@mock.patch.object(urllib3, 'disable_warnings')
def test_init(self, disable_warning_mock):
ribcl_client = ribcl.RIBCLOperations(
"x.x.x.x", "admin", "Admin", 60, 443, cacert='/somepath')
self.assertEqual(ribcl_client.host, "x.x.x.x")
self.assertEqual(ribcl_client.login, "admin")
self.assertEqual(ribcl_client.password, "Admin")
self.assertEqual(ribcl_client.timeout, 60)
self.assertEqual(ribcl_client.port, 443)
self.assertEqual(ribcl_client.cacert, '/somepath')
@mock.patch.object(urllib3, 'disable_warnings')
def test_init_without_cacert(self, disable_warning_mock):
ribcl_client = ribcl.RIBCLOperations(
"x.x.x.x", "admin", "Admin", 60, 443)
self.assertEqual(ribcl_client.host, "x.x.x.x")
self.assertEqual(ribcl_client.login, "admin")
self.assertEqual(ribcl_client.password, "Admin")
self.assertEqual(ribcl_client.timeout, 60)
self.assertEqual(ribcl_client.port, 443)
self.assertIsNone(ribcl_client.cacert)
disable_warning_mock.assert_called_once_with(
urllib3_exceptions.InsecureRequestWarning)
class IloRibclTestCase(unittest.TestCase):
def setUp(self):
super(IloRibclTestCase, self).setUp()
self.ilo = ribcl.RIBCLOperations("x.x.x.x", "admin", "Admin", 60, 443)
def test__request_ilo_connection_failed(self):
@mock.patch.object(ribcl.RIBCLOperations, '_serialize_xml')
@mock.patch.object(requests, 'post')
def test__request_ilo_without_verify(self, post_mock, serialize_mock):
response_mock = mock.MagicMock(text='returned-text')
serialize_mock.return_value = 'serialized-xml'
post_mock.return_value = response_mock
retval = self.ilo._request_ilo('xml-obj')
post_mock.assert_called_once_with(
'https://x.x.x.x:443/ribcl',
headers={"Content-length": 14},
data='serialized-xml',
verify=False)
response_mock.raise_for_status.assert_called_once_with()
self.assertEqual('returned-text', retval)
@mock.patch.object(ribcl.RIBCLOperations, '_serialize_xml')
@mock.patch.object(requests, 'post')
def test__request_ilo_with_verify(self, post_mock, serialize_mock):
self.ilo = ribcl.RIBCLOperations(
"x.x.x.x", "admin", "Admin", 60, 443,
cacert='/somepath')
response_mock = mock.MagicMock(text='returned-text')
serialize_mock.return_value = 'serialized-xml'
post_mock.return_value = response_mock
retval = self.ilo._request_ilo('xml-obj')
post_mock.assert_called_once_with(
'https://x.x.x.x:443/ribcl',
headers={"Content-length": 14},
data='serialized-xml',
verify='/somepath')
response_mock.raise_for_status.assert_called_once_with()
self.assertEqual('returned-text', retval)
@mock.patch.object(ribcl.RIBCLOperations, '_serialize_xml')
@mock.patch.object(requests, 'post')
def test__request_ilo_raises(self, post_mock, serialize_mock):
serialize_mock.return_value = 'serialized-xml'
post_mock.side_effect = Exception
self.assertRaises(exception.IloConnectionError,
self.ilo.get_all_licenses)
self.ilo._request_ilo,
'xml-obj')
post_mock.assert_called_once_with(
'https://x.x.x.x:443/ribcl',
headers={"Content-length": 14},
data='serialized-xml',
verify=False)
@mock.patch.object(ribcl.RIBCLOperations, '_request_ilo')
def test_login_fail(self, request_ilo_mock):
@ -339,6 +420,43 @@ class IloRibclTestCase(unittest.TestCase):
self.assertIn('MINIMUM_POWER_READING', result)
self.assertIn('AVERAGE_POWER_READING', result)
@mock.patch.object(requests, 'get')
def test__request_host_with_verify(self, request_mock):
self.ilo = ribcl.RIBCLOperations(
"x.x.x.x", "admin", "Admin", 60, 443,
cacert='/somepath')
response_mock = mock.MagicMock(text='foo')
request_mock.return_value = response_mock
retval = self.ilo._request_host()
request_mock.assert_called_once_with(
"https://x.x.x.x/xmldata?item=all", verify='/somepath')
response_mock.raise_for_status.assert_called_once_with()
self.assertEqual('foo', retval)
@mock.patch.object(requests, 'get')
def test__request_host_without_verify(self, request_mock):
response_mock = mock.MagicMock(text='foo')
request_mock.return_value = response_mock
retval = self.ilo._request_host()
request_mock.assert_called_once_with(
"https://x.x.x.x/xmldata?item=all", verify=False)
response_mock.raise_for_status.assert_called_once_with()
self.assertEqual('foo', retval)
@mock.patch.object(requests, 'get')
def test__request_host_raises(self, request_mock):
request_mock.side_effect = Exception
self.assertRaises(exception.IloConnectionError,
self.ilo._request_host)
request_mock.assert_called_once_with(
"https://x.x.x.x/xmldata?item=all", verify=False)
@mock.patch.object(ribcl.RIBCLOperations, '_request_host')
def test_get_host_uuid(self, request_host_mock):
request_host_mock.return_value = constants.GET_HOST_UUID
@ -543,10 +661,6 @@ class IloRibclTestCaseBeforeRisSupport(unittest.TestCase):
super(IloRibclTestCaseBeforeRisSupport, self).setUp()
self.ilo = ribcl.IloClient("x.x.x.x", "admin", "Admin", 60, 443)
def test__request_ilo_connection_failed(self):
self.assertRaises(ribcl.IloConnectionError,
self.ilo.get_all_licenses)
@mock.patch.object(ribcl.IloClient, '_request_ilo')
def test_login_fail(self, request_ilo_mock):
request_ilo_mock.return_value = constants.LOGIN_FAIL_XML

View File

@ -19,7 +19,9 @@ import base64
import json
import mock
from six.moves import http_client
import requests
from requests.packages import urllib3
from requests.packages.urllib3 import exceptions as urllib3_exceptions
import testtools
from proliantutils import exception
@ -28,6 +30,34 @@ from proliantutils.ilo import ris
from proliantutils.tests.ilo import ris_sample_outputs as ris_outputs
class IloRisTestCaseInitTestCase(testtools.TestCase):
@mock.patch.object(urllib3, 'disable_warnings')
def test_init(self, disable_warning_mock):
ris_client = ris.RISOperations(
"x.x.x.x", "admin", "Admin", bios_password='foo',
cacert='/somepath')
self.assertEqual(ris_client.host, "x.x.x.x")
self.assertEqual(ris_client.login, "admin")
self.assertEqual(ris_client.password, "Admin")
self.assertEqual(ris_client.bios_password, "foo")
self.assertEqual({}, ris_client.message_registries)
self.assertEqual(ris_client.cacert, '/somepath')
@mock.patch.object(urllib3, 'disable_warnings')
def test_init_without_cacert(self, disable_warning_mock):
ris_client = ris.RISOperations(
"x.x.x.x", "admin", "Admin", bios_password='foo')
self.assertEqual(ris_client.host, "x.x.x.x")
self.assertEqual(ris_client.login, "admin")
self.assertEqual(ris_client.password, "Admin")
self.assertIsNone(ris_client.cacert)
disable_warning_mock.assert_called_once_with(
urllib3_exceptions.InsecureRequestWarning)
class IloRisTestCase(testtools.TestCase):
def setUp(self):
@ -396,19 +426,15 @@ class TestRISOperationsPrivateMethods(testtools.TestCase):
result = self.client._validate_uefi_boot_mode()
self.assertFalse(result)
@mock.patch.object(http_client, 'HTTPSConnection')
def test__rest_op_okay(self, https_con_mock):
connection_mock_obj = mock.MagicMock()
response_mock_obj = mock.MagicMock(status=200)
https_con_mock.return_value = connection_mock_obj
connection_mock_obj.getresponse.return_value = response_mock_obj
sample_response_body = ris_outputs.RESPONSE_BODY_FOR_REST_OP
response_mock_obj.read.return_value = sample_response_body
@mock.patch.object(requests, 'get')
def test__rest_op_okay(self, request_mock):
sample_headers = ris_outputs.HEADERS_FOR_REST_OP
response_mock_obj.getheaders.return_value = sample_headers
exp_headers = dict((x.lower(), y) for x, y in sample_headers)
sample_response_body = ris_outputs.RESPONSE_BODY_FOR_REST_OP
response_mock_obj = mock.MagicMock(
status_code=200, text=sample_response_body,
headers=exp_headers)
request_mock.return_value = response_mock_obj
status, headers, response = self.client._rest_op(
'GET', '/v1/foo', None, None)
@ -416,152 +442,109 @@ class TestRISOperationsPrivateMethods(testtools.TestCase):
self.assertEqual(200, status)
self.assertEqual(exp_headers, headers)
self.assertEqual(json.loads(sample_response_body), response)
https_con_mock.assert_called_once_with(host='1.2.3.4', strict=True)
connection_mock_obj.request.assert_called_once_with(
'GET', '/v1/foo',
# base64 encoded username + password for admin/Admin
request_mock.assert_called_once_with(
'https://1.2.3.4/v1/foo',
headers={'Authorization': 'BASIC YWRtaW46QWRtaW4='},
body="null")
data="null", verify=False)
@mock.patch.object(requests, 'get')
def test__rest_op_request_error(self, request_mock):
request_mock.side_effect = RuntimeError("boom")
@mock.patch.object(http_client, 'HTTPSConnection')
def test__rest_op_request_error(self, https_con_mock):
connection_mock_obj = mock.MagicMock()
https_con_mock.return_value = connection_mock_obj
connection_mock_obj.request.side_effect = RuntimeError("boom")
exc = self.assertRaises(exception.IloConnectionError,
self.client._rest_op,
'GET', '/v1/foo', {}, None)
https_con_mock.assert_called_once_with(host='1.2.3.4', strict=True)
self.assertIn("boom", str(exc))
@mock.patch.object(http_client, 'HTTPSConnection')
def test__rest_op_get_response_error(self, https_con_mock):
connection_mock_obj = mock.MagicMock()
https_con_mock.return_value = connection_mock_obj
connection_mock_obj.getresponse.side_effect = RuntimeError("boom")
exc = self.assertRaises(exception.IloConnectionError,
self.client._rest_op,
'GET', '/v1/foo', None, None)
https_con_mock.assert_called_once_with(host='1.2.3.4', strict=True)
connection_mock_obj.request.assert_called_once_with(
'GET', '/v1/foo',
request_mock.assert_called_once_with(
'https://1.2.3.4/v1/foo',
headers={'Authorization': 'BASIC YWRtaW46QWRtaW4='},
body="null")
data="null", verify=False)
self.assertIn("boom", str(exc))
@mock.patch.object(http_client, 'HTTPSConnection')
def test__rest_op_response_read_error(self, https_con_mock):
connection_mock_obj = mock.MagicMock()
response_mock_obj = mock.MagicMock(status=200)
https_con_mock.return_value = connection_mock_obj
connection_mock_obj.getresponse.return_value = response_mock_obj
response_mock_obj.read.side_effect = RuntimeError("boom")
exc = self.assertRaises(exception.IloConnectionError,
self.client._rest_op,
'GET', '/v1/foo', None, None)
self.assertIn("boom", str(exc))
@mock.patch.object(http_client, 'HTTPSConnection')
def test__rest_op_continous_redirection(self, https_con_mock):
connection_mock_obj = mock.MagicMock()
response_mock_obj = mock.MagicMock(status=301)
https_con_mock.side_effect = [connection_mock_obj,
connection_mock_obj,
connection_mock_obj,
connection_mock_obj,
connection_mock_obj]
connection_mock_obj.getresponse.return_value = response_mock_obj
@mock.patch.object(requests, 'get')
def test__rest_op_continous_redirection(self, request_mock):
sample_response_body = ris_outputs.RESPONSE_BODY_FOR_REST_OP
response_mock_obj.read.return_value = sample_response_body
sample_headers = ris_outputs.HEADERS_FOR_REST_OP
sample_headers.append(('location', 'https://foo'))
response_mock_obj.getheaders.return_value = sample_headers
exp_headers = dict((x.lower(), y) for x, y in sample_headers)
response_mock_obj = mock.MagicMock(
status_code=301, text=sample_response_body,
headers=exp_headers)
request_mock.side_effect = [response_mock_obj,
response_mock_obj,
response_mock_obj,
response_mock_obj,
response_mock_obj]
exc = self.assertRaises(exception.IloConnectionError,
self.client._rest_op,
'GET', '/v1/foo', {}, None)
self.assertEqual(5, https_con_mock.call_count)
self.assertEqual(5, connection_mock_obj.request.call_count)
self.assertEqual(5, request_mock.call_count)
self.assertIn('https://1.2.3.4/v1/foo', str(exc))
@mock.patch.object(http_client, 'HTTPConnection')
@mock.patch.object(http_client, 'HTTPSConnection')
def test__rest_op_one_redirection(self, https_con_mock,
http_con_mock):
connection_mock_obj = mock.MagicMock()
response_mock_obj1 = mock.MagicMock(status=301)
response_mock_obj2 = mock.MagicMock(status=200)
https_con_mock.return_value = connection_mock_obj
http_con_mock.return_value = connection_mock_obj
connection_mock_obj.getresponse.side_effect = [response_mock_obj1,
response_mock_obj2]
@mock.patch.object(requests, 'get')
def test__rest_op_one_redirection(self, request_mock):
sample_response_body = ris_outputs.RESPONSE_BODY_FOR_REST_OP
response_mock_obj1.read.return_value = sample_response_body
response_mock_obj2.read.return_value = sample_response_body
sample_headers1 = ris_outputs.HEADERS_FOR_REST_OP
sample_headers2 = ris_outputs.HEADERS_FOR_REST_OP
sample_headers1.append(('location', 'http://5.6.7.8/v1/foo'))
response_mock_obj1.getheaders.return_value = sample_headers1
response_mock_obj2.getheaders.return_value = sample_headers2
sample_headers1.append(('location', 'https://5.6.7.8/v1/foo'))
exp_headers1 = dict((x.lower(), y) for x, y in sample_headers1)
exp_headers2 = dict((x.lower(), y) for x, y in sample_headers2)
response_mock_obj1 = mock.MagicMock(
status_code=301, text=sample_response_body,
headers=exp_headers1)
response_mock_obj2 = mock.MagicMock(
status_code=200, text=sample_response_body,
headers=exp_headers2)
request_mock.side_effect = [response_mock_obj1,
response_mock_obj2]
status, headers, response = self.client._rest_op(
'GET', '/v1/foo', {}, None)
exp_headers = dict((x.lower(), y) for x, y in sample_headers1)
exp_headers = dict((x.lower(), y) for x, y in sample_headers2)
self.assertEqual(200, status)
self.assertEqual(exp_headers, headers)
self.assertEqual(json.loads(sample_response_body), response)
request_mock.assert_has_calls([
mock.call('https://1.2.3.4/v1/foo',
headers={'Authorization': 'BASIC YWRtaW46QWRtaW4='},
data="null", verify=False),
mock.call('https://5.6.7.8/v1/foo',
headers={'Authorization': 'BASIC YWRtaW46QWRtaW4='},
data="null", verify=False)])
https_con_mock.assert_any_call(host='1.2.3.4', strict=True)
http_con_mock.assert_any_call(host='5.6.7.8', strict=True)
self.assertEqual(2, connection_mock_obj.request.call_count)
self.assertTrue(response_mock_obj1.read.called)
self.assertTrue(response_mock_obj2.read.called)
@mock.patch.object(http_client, 'HTTPSConnection')
def test__rest_op_response_decode_error(self, https_con_mock):
connection_mock_obj = mock.MagicMock()
response_mock_obj = mock.MagicMock(status=200)
https_con_mock.return_value = connection_mock_obj
connection_mock_obj.getresponse.return_value = response_mock_obj
@mock.patch.object(requests, 'get')
def test__rest_op_response_decode_error(self, request_mock):
sample_response_body = "{[wrong json"
response_mock_obj.read.return_value = sample_response_body
sample_headers = ris_outputs.HEADERS_FOR_REST_OP
response_mock_obj.getheaders.return_value = sample_headers
exp_headers = dict((x.lower(), y) for x, y in sample_headers)
response_mock_obj = mock.MagicMock(
status_code=200, text=sample_response_body,
headers=exp_headers)
request_mock.return_value = response_mock_obj
self.assertRaises(exception.IloError,
self.client._rest_op,
'GET', '/v1/foo', {}, None)
https_con_mock.assert_called_once_with(host='1.2.3.4', strict=True)
connection_mock_obj.request.assert_called_once_with(
'GET', '/v1/foo',
# base64 encoded username + password for admin/Admin
request_mock.assert_called_once_with(
'https://1.2.3.4/v1/foo',
headers={'Authorization': 'BASIC YWRtaW46QWRtaW4='},
body="null")
@mock.patch.object(http_client, 'HTTPSConnection')
def test__rest_op_response_gzipped_response(self, https_con_mock):
connection_mock_obj = mock.MagicMock()
response_mock_obj = mock.MagicMock(status=200)
https_con_mock.return_value = connection_mock_obj
connection_mock_obj.getresponse.return_value = response_mock_obj
data="null", verify=False)
@mock.patch.object(requests, 'get')
def test__rest_op_response_gzipped_response(self, request_mock):
sample_response_body = ris_outputs.RESPONSE_BODY_FOR_REST_OP
gzipped_response_body = base64.b64decode(
ris_outputs.BASE64_GZIPPED_RESPONSE)
response_mock_obj.read.return_value = gzipped_response_body
sample_headers = ris_outputs.HEADERS_FOR_REST_OP
response_mock_obj.getheaders.return_value = sample_headers
exp_headers = dict((x.lower(), y) for x, y in sample_headers)
response_mock_obj = mock.MagicMock(
status_code=200, text=gzipped_response_body,
headers=exp_headers)
request_mock.return_value = response_mock_obj
status, headers, response = self.client._rest_op(
'GET', '/v1/foo', {}, None)

View File

@ -2,3 +2,4 @@ six>=1.9.0
oslo.concurrency>=1.8.0
oslo.utils>=1.4.0
jsonschema>=2.4.0
requests>=2.7.0