Handle [SSL: CERTIFICATE_VERIFY_FAILED] exception

This patch removes all the duplicated code blocks
related to HTTP requests, improves the way that
the exceptions are handled and adds a new base class
for all the metadata providers which use HTTP(S)
protocol in order to serve information.

Change-Id: Ib36b0cf592310bb1e6cc4aec4fc2b9b5f29f1759
Co-Authored-By: Alexandru Tudose <atudose@cloudbasesolutions.com>
This commit is contained in:
Alexandru Coman 2016-07-13 20:29:35 +03:00
parent 6c6ef0e11a
commit 7078b58038
No known key found for this signature in database
GPG Key ID: 07D0D48680C70E83
15 changed files with 289 additions and 313 deletions

View File

@ -31,6 +31,14 @@ class CloudStackOptions(conf_base.Options):
help="The base URL where the service looks for metadata",
deprecated_name="cloudstack_metadata_ip",
deprecated_group="DEFAULT"),
cfg.BoolOpt(
"https_allow_insecure", default=False,
help="Whether to disable the validation of HTTPS "
"certificates."),
cfg.StrOpt(
"https_ca_bundle", default=None,
help="The path to a CA_BUNDLE file or directory with "
"certificates of trusted CAs."),
]
def register(self):

View File

@ -36,6 +36,14 @@ class EC2Options(conf_base.Options):
help="Add a route for the metadata ip address to the gateway",
deprecated_name="ec2_add_metadata_private_ip_route",
deprecated_group="DEFAULT"),
cfg.BoolOpt(
"https_allow_insecure", default=False,
help="Whether to disable the validation of HTTPS "
"certificates."),
cfg.StrOpt(
"https_ca_bundle", default=None,
help="The path to a CA_BUNDLE file or directory with "
"certificates of trusted CAs."),
]
def register(self):

View File

@ -46,6 +46,14 @@ class MAASOptions(conf_base.Options):
help="The MaaS OAuth token secret",
deprecated_name="maas_oauth_token_secret",
deprecated_group="DEFAULT"),
cfg.BoolOpt(
"https_allow_insecure", default=False,
help="Whether to disable the validation of HTTPS "
"certificates."),
cfg.StrOpt(
"https_ca_bundle", default=None,
help="The path to a CA_BUNDLE file or directory with "
"certificates of trusted CAs."),
]
def register(self):

View File

@ -34,6 +34,14 @@ class OpenStackOptions(conf_base.Options):
"add_metadata_private_ip_route", default=True,
help="Add a route for the metadata ip address to the gateway",
deprecated_group="DEFAULT"),
cfg.BoolOpt(
"https_allow_insecure", default=False,
help="Whether to disable the validation of HTTPS "
"certificates."),
cfg.StrOpt(
"https_ca_bundle", default=None,
help="The path to a CA_BUNDLE file or directory with "
"certificates of trusted CAs."),
]
def register(self):

View File

@ -23,6 +23,25 @@ class ItemNotFoundException(CloudbaseInitException):
pass
class ServiceException(Exception):
"""Base exception for all the metadata services related errors."""
pass
class CertificateVerifyFailed(ServiceException):
"""The received certificate is not valid.
In order to avoid the current exception the validation of the SSL
certificate should be disabled for the metadata provider. In order
to do that the `https_allow_insecure` config option should be set.
"""
pass
class WindowsCloudbaseInitException(CloudbaseInitException):
def __init__(self, msg="%r", error_code=None):

View File

@ -20,9 +20,11 @@ import io
import time
from oslo_log import log as oslo_logging
import requests
import six
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import exception
from cloudbaseinit.utils import encoding
CONF = cloudbaseinit_conf.CONF
@ -180,3 +182,74 @@ class BaseMetadataService(object):
is True.
"""
return False
class BaseHTTPMetadataService(BaseMetadataService):
"""Contract class for metadata services that are ussing HTTP(S)."""
def __init__(self, base_url, https_allow_insecure=False,
https_ca_bundle=None):
"""Setup a new metadata service.
:param https_allow_insecure:
Whether to disable the validation of HTTPS certificates
(default False).
:param base_url:
The base URL where the service looks for metadata.
:param https_ca_bundle:
The path to a CA_BUNDLE file or directory with certificates
of trusted CAs.
.. note ::
If `https_ca_bundle` is set to a path to a directory, the
directory must have been processed using the c_rehash utility
supplied with OpenSSL.
"""
super(BaseHTTPMetadataService, self).__init__()
self._https_allow_insecure = https_allow_insecure
self._https_ca_bundle = https_ca_bundle
self._base_url = base_url
def _verify_https_request(self):
"""Whether to disable the validation of HTTPS certificates.
When this option is `True` the SSL certificate validation for the
current metadata provider will be disabled (please don't use it if
you don't know the implications of this behaviour).
"""
if self._https_ca_bundle:
return self._https_ca_bundle
else:
return self._https_allow_insecure
def _http_request(self, url, data=None, headers=None):
"""Get content for received url."""
if not url.startswith("http"):
url = requests.compat.urljoin(self._base_url, url)
request_action = requests.get if not data else requests.post
if not data:
LOG.debug('Getting metadata from: %s', url)
else:
LOG.debug('Posting data to %s', url)
response = request_action(url=url, data=data, headers=headers,
verify=self._verify_https_request())
response.raise_for_status()
return response.content
def _get_data(self, path):
"""Getting the required information ussing metadata service."""
try:
response = self._http_request(path)
except requests.HTTPError as exc:
if exc.response.status_code == 404:
raise NotExistingMetadataException(
getattr(exc, "message", str(exc)))
raise
except requests.exceptions.SSLError as exc:
LOG.exception(exc)
raise exception.CertificateVerifyFailed(
"HTTPS certificate validation failed.")
return response

View File

@ -24,7 +24,6 @@ from cloudbaseinit.metadata.services import base
from cloudbaseinit.osutils import factory as osutils_factory
from cloudbaseinit.utils import encoding
CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__)
@ -33,12 +32,17 @@ SAVED_PASSWORD = b"saved_password"
TIMEOUT = 10
class CloudStack(base.BaseMetadataService):
class CloudStack(base.BaseHTTPMetadataService):
def __init__(self):
super(CloudStack, self).__init__()
self.osutils = osutils_factory.get_os_utils()
self._metadata_url = None
# Note: The base url used by the current metadata service will be
# updated later by the `_test_api` method.
super(CloudStack, self).__init__(
base_url=None,
https_allow_insecure=CONF.cloudstack.https_allow_insecure,
https_ca_bundle=CONF.cloudstack.https_ca_bundle)
self._osutils = osutils_factory.get_os_utils()
self._router_ip = None
def _get_path(self, resource, version="latest"):
@ -48,7 +52,7 @@ class CloudStack(base.BaseMetadataService):
def _test_api(self, metadata_url):
"""Test if the CloudStack API is responding properly."""
self._metadata_url = metadata_url
self._base_url = metadata_url
try:
response = self._get_data(self._get_path("service-offering"))
except urllib.error.HTTPError as exc:
@ -63,7 +67,8 @@ class CloudStack(base.BaseMetadataService):
return False
LOG.debug('Available services: %s', response)
self._router_ip = urllib.parse.urlparse(metadata_url).netloc
netloc = urllib.parse.urlparse(metadata_url).netloc
self._router_ip = netloc.split(":")[0]
return True
def load(self):
@ -72,7 +77,7 @@ class CloudStack(base.BaseMetadataService):
if self._test_api(CONF.cloudstack.metadata_base_url):
return True
dhcp_servers = self.osutils.get_dhcp_hosts_in_use()
dhcp_servers = self._osutils.get_dhcp_hosts_in_use()
if not dhcp_servers:
LOG.debug('No DHCP server was found.')
return False
@ -83,24 +88,6 @@ class CloudStack(base.BaseMetadataService):
return False
def _http_request(self, url, **kwargs):
"""Get content for received url."""
LOG.debug('Getting metadata from: %s', url)
request = urllib.request.Request(url, **kwargs)
response = urllib.request.urlopen(request)
return response.read()
def _get_data(self, path):
"""Getting required metadata using CloudStack metadata API."""
metadata_url = urllib.parse.urljoin(self._metadata_url, path)
try:
content = self._http_request(metadata_url)
except urllib.error.HTTPError as exc:
if exc.code == 404:
raise base.NotExistingMetadataException()
raise
return content
def get_instance_id(self):
"""Instance name of the virtual machine."""
return self._get_cache_data(self._get_path("instance-id"),
@ -147,7 +134,6 @@ class CloudStack(base.BaseMetadataService):
with contextlib.closing(http_client.HTTPConnection(
self._router_ip, 8080, timeout=TIMEOUT)) as connection:
for _ in range(CONF.retry_count):
try:
connection.request("GET", "/", headers=headers)

View File

@ -13,26 +13,24 @@
# License for the specific language governing permissions and limitations
# under the License.
import posixpath
from oslo_log import log as oslo_logging
from six.moves.urllib import error
from six.moves.urllib import request
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.metadata.services import base
from cloudbaseinit.utils import network
CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__)
class EC2Service(base.BaseMetadataService):
class EC2Service(base.BaseHTTPMetadataService):
_metadata_version = '2009-04-04'
def __init__(self):
super(EC2Service, self).__init__()
super(EC2Service, self).__init__(
base_url=CONF.ec2.metadata_base_url,
https_allow_insecure=CONF.ec2.https_allow_insecure,
https_ca_bundle=CONF.ec2.https_ca_bundle)
self._enable_retry = True
def load(self):
@ -49,24 +47,6 @@ class EC2Service(base.BaseMetadataService):
CONF.ec2.metadata_base_url)
return False
def _get_response(self, req):
try:
return request.urlopen(req)
except error.HTTPError as ex:
if ex.code == 404:
raise base.NotExistingMetadataException()
else:
raise
def _get_data(self, path):
norm_path = posixpath.join(CONF.ec2.metadata_base_url, path)
LOG.debug('Getting metadata from: %(norm_path)s',
{'norm_path': norm_path})
req = request.Request(norm_path)
response = self._get_response(req)
return response.read()
def get_host_name(self):
return self._get_cache_data('%s/meta-data/local-hostname' %
self._metadata_version, decode=True)

View File

@ -12,26 +12,26 @@
# License for the specific language governing permissions and limitations
# under the License.
import posixpath
from oslo_log import log as oslo_logging
from six.moves.urllib import error
from six.moves.urllib import request
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.metadata.services import base
from cloudbaseinit.metadata.services import baseopenstackservice
from cloudbaseinit.metadata.services import baseopenstackservice as baseos
from cloudbaseinit.utils import network
CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__)
class HttpService(baseopenstackservice.BaseOpenStackService):
class HttpService(base.BaseHTTPMetadataService, baseos.BaseOpenStackService):
_POST_PASSWORD_MD_VER = '2013-04-04'
def __init__(self):
super(HttpService, self).__init__()
super(HttpService, self).__init__(
base_url=CONF.openstack.metadata_base_url,
https_allow_insecure=CONF.openstack.https_allow_insecure,
https_ca_bundle=CONF.openstack.https_ca_bundle)
self._enable_retry = True
def load(self):
@ -47,27 +47,8 @@ class HttpService(baseopenstackservice.BaseOpenStackService):
CONF.openstack.metadata_base_url)
return False
def _get_response(self, req):
try:
return request.urlopen(req)
except error.HTTPError as ex:
if ex.code == 404:
raise base.NotExistingMetadataException()
else:
raise
def _get_data(self, path):
norm_path = posixpath.join(CONF.openstack.metadata_base_url, path)
LOG.debug('Getting metadata from: %s', norm_path)
req = request.Request(norm_path)
response = self._get_response(req)
return response.read()
def _post_data(self, path, data):
norm_path = posixpath.join(CONF.openstack.metadata_base_url, path)
LOG.debug('Posting metadata to: %s', norm_path)
req = request.Request(norm_path, data=data)
self._get_response(req)
self._http_request(path, data=data)
return True
def _get_password_path(self):

View File

@ -12,13 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import posixpath
import re
from oauthlib import oauth1
from oslo_log import log as oslo_logging
from six.moves.urllib import error
from six.moves.urllib import request
import requests
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.metadata.services import base
@ -39,11 +37,14 @@ class _Realm(str):
__nonzero__ = __bool__
class MaaSHttpService(base.BaseMetadataService):
class MaaSHttpService(base.BaseHTTPMetadataService):
_METADATA_2012_03_01 = '2012-03-01'
def __init__(self):
super(MaaSHttpService, self).__init__()
super(MaaSHttpService, self).__init__(
base_url=CONF.maas.metadata_base_url,
https_allow_insecure=CONF.maas.https_allow_insecure,
https_ca_bundle=CONF.maas.https_ca_bundle)
self._enable_retry = True
self._metadata_version = self._METADATA_2012_03_01
@ -62,16 +63,8 @@ class MaaSHttpService(base.BaseMetadataService):
CONF.maas.metadata_base_url)
return False
def _get_response(self, req):
try:
return request.urlopen(req)
except error.HTTPError as ex:
if ex.code == 404:
raise base.NotExistingMetadataException()
else:
raise
def _get_oauth_headers(self, url):
LOG.debug("Getting authorization headers for %s.", url)
client = oauth1.Client(
CONF.maas.oauth_consumer_key,
client_secret=CONF.maas.oauth_consumer_secret,
@ -82,15 +75,14 @@ class MaaSHttpService(base.BaseMetadataService):
headers = client.sign(url, realm=realm)[1]
return headers
def _get_data(self, path):
norm_path = posixpath.join(CONF.maas.metadata_base_url, path)
oauth_headers = self._get_oauth_headers(norm_path)
def _http_request(self, url, data=None, headers=None):
"""Get content for received url."""
if not url.startswith("http"):
url = requests.compat.urljoin(self._base_url, url)
headers = {} if headers is None else headers
headers.update(self._get_oauth_headers(url))
LOG.debug('Getting metadata from: %(norm_path)s',
{'norm_path': norm_path})
req = request.Request(norm_path, headers=oauth_headers)
response = self._get_response(req)
return response.read()
return super(MaaSHttpService, self)._http_request(url, data, headers)
def get_host_name(self):
return self._get_cache_data('%s/meta-data/local-hostname' %

View File

@ -12,8 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
import requests
import unittest
from cloudbaseinit import exception
from cloudbaseinit.metadata.services import base
@ -35,3 +38,117 @@ class TestBase(unittest.TestCase):
def test_get_decoded_user_data(self):
userdata = self._service.get_decoded_user_data()
self.assertEqual(b"of course it works", userdata)
class TestBaseHTTPMetadataService(unittest.TestCase):
def setUp(self):
self._mock_base_url = "http://metadata.mock/"
self._service = base.BaseHTTPMetadataService("http://metadata.mock/")
def _test_verify_https_request(self, https_ca_bundle=None):
mock_service = base.BaseHTTPMetadataService(
base_url=mock.sentinel.url,
https_allow_insecure=mock.sentinel.allow_insecure,
https_ca_bundle=https_ca_bundle)
response = mock_service._verify_https_request()
if not https_ca_bundle:
self.assertTrue(mock.sentinel.allow_insecure)
else:
self.assertEqual(response, https_ca_bundle)
def test_verify_https_request(self):
self._test_verify_https_request()
def test_verify_https_request_with_ca_bundle(self):
self._test_verify_https_request(https_ca_bundle="/path/to/resource")
@mock.patch('requests.post')
@mock.patch('requests.get')
@mock.patch("cloudbaseinit.metadata.services.base.BaseHTTPMetadataService."
"_verify_https_request")
def _test_http_request(self, mock_verify, mock_get, mock_post,
mock_url, mock_data=None, mock_headers=None):
if not mock_url.startswith('http'):
mock_url = requests.compat.urljoin(self._mock_base_url, mock_url)
mock_response = mock.Mock()
mock_response_status = mock.Mock()
mock_response.raise_for_status = mock_response_status
mock_response.content = mock.sentinel.content
mock_get.return_value = mock_response
mock_post.return_value = mock_response
mock_verify.return_value = mock.sentinel.verify
response = self._service._http_request(url=mock_url, data=mock_data,
headers=mock_headers)
if mock_data:
mock_post.assert_called_once_with(
url=mock_url, data=mock_data, headers=mock_headers,
verify=mock.sentinel.verify
)
else:
mock_get.assert_called_once_with(
url=mock_url, data=mock_data, headers=mock_headers,
verify=mock.sentinel.verify
)
mock_response_status.assert_called_once_with()
self.assertEqual(response, mock.sentinel.content)
def test_http_get_request(self):
self._test_http_request(mock_url="/path/to/resource",
mock_data=None,
mock_headers={})
def test_http_post_request(self):
self._test_http_request(mock_url="/path/to/resource",
mock_data={"X-Cloudbase-Init", True},
mock_headers={})
@mock.patch('requests.compat.urljoin')
@mock.patch("cloudbaseinit.metadata.services.base."
"BaseHTTPMetadataService._http_request")
def _test_get_data(self, mock_http_request, mock_urljoin,
expected_response, expected_value):
fake_base_url = mock.Mock()
http_service = base.BaseHTTPMetadataService(fake_base_url)
mock_request = mock.Mock()
mock_urljoin.return_value = 'some_url'
mock_http_request.side_effect = [expected_response]
if expected_value:
self.assertRaises(expected_value, http_service._get_data,
mock_request)
else:
response = http_service._get_data(mock_request)
self.assertEqual(expected_response, response)
def test_get_response(self):
self._test_get_data(expected_response='fake response',
expected_value=False)
def test_get_response_not_found(self):
fake_response = mock.Mock()
fake_response.status_code = 404
http_error = requests.HTTPError()
http_error.response = fake_response
http_error.message = mock.Mock()
self._test_get_data(expected_response=http_error,
expected_value=base.NotExistingMetadataException)
def test_get_response_http_error(self):
fake_response = mock.Mock()
fake_response.status_code = 400
http_error = requests.HTTPError()
http_error.response = fake_response
self._test_get_data(expected_response=http_error,
expected_value=requests.HTTPError)
def test_get_response_ssl_error(self):
ssl_error = requests.exceptions.SSLError()
self._test_get_data(expected_response=ssl_error,
expected_value=exception.CertificateVerifyFailed)

View File

@ -64,8 +64,8 @@ class CloudStackTest(unittest.TestCase):
@mock.patch('cloudbaseinit.metadata.services.cloudstack.CloudStack'
'._test_api')
def test_load(self, mock_test_api, mock_os_util):
self._service.osutils.get_dhcp_hosts_in_use = mock.Mock()
self._service.osutils.get_dhcp_hosts_in_use.side_effect = [
self._service._osutils.get_dhcp_hosts_in_use = mock.Mock()
self._service._osutils.get_dhcp_hosts_in_use.side_effect = [
[(mock.sentinel.mac_address, '10.10.0.1'),
(mock.sentinel.mac_address, '10.10.0.2'),
(mock.sentinel.mac_address, '10.10.0.3')]
@ -89,7 +89,7 @@ class CloudStackTest(unittest.TestCase):
@mock.patch('cloudbaseinit.metadata.services.cloudstack.CloudStack'
'._test_api')
def test_load_fail(self, mock_test_api, mock_os_util):
self._service.osutils.get_dhcp_hosts_in_use.side_effect = [None]
self._service._osutils.get_dhcp_hosts_in_use.side_effect = [None]
mock_test_api.side_effect = [False]
self.assertFalse(self._service.load()) # No DHCP server was found.
@ -100,8 +100,8 @@ class CloudStackTest(unittest.TestCase):
@mock.patch('cloudbaseinit.metadata.services.cloudstack.CloudStack'
'._test_api')
def test_load_no_service(self, mock_test_api, mock_os_util):
self._service.osutils.get_dhcp_hosts_in_use = mock.Mock()
self._service.osutils.get_dhcp_hosts_in_use.side_effect = [
self._service._osutils.get_dhcp_hosts_in_use = mock.Mock()
self._service._osutils.get_dhcp_hosts_in_use.side_effect = [
[(mock.sentinel.mac_address, CONF.cloudstack.metadata_base_url)]
]
mock_test_api.side_effect = [False, False]
@ -110,29 +110,6 @@ class CloudStackTest(unittest.TestCase):
self.assertFalse(self._service.load())
self.assertEqual(2, mock_test_api.call_count)
@mock.patch('cloudbaseinit.metadata.services.cloudstack.CloudStack'
'._http_request')
def test_get_data(self, mock_http_request):
metadata = '/latest/meta-data/service-offering'
mock_http_request.side_effect = [
mock.sentinel.ok,
urllib.error.HTTPError(url=metadata, code=404, hdrs={}, fp=None,
msg='Testing 404 Not Found.'),
urllib.error.HTTPError(url=metadata, code=427, hdrs={}, fp=None,
msg='Testing 429 Too Many Requests.')
]
for status in (200, 404, 427):
if status == 200:
response = self._service._get_data(metadata)
self.assertEqual(mock.sentinel.ok, response)
elif status == 404:
self.assertRaises(base.NotExistingMetadataException,
self._service._get_data, metadata)
else:
self.assertRaises(urllib.error.HTTPError,
self._service._get_data, metadata)
@mock.patch('cloudbaseinit.metadata.services.cloudstack.CloudStack'
'._get_data')
def test_get_cache_data(self, mock_get_data):
@ -193,24 +170,6 @@ class CloudStackTest(unittest.TestCase):
response = self._service.get_public_keys()
self.assertEqual([], response)
@mock.patch('six.moves.urllib.request')
def test__http_request(self, mock_urllib_request):
mock_urllib_request.Request.return_value = mock.sentinel.request
with testutils.LogSnatcher('cloudbaseinit.metadata.services.'
'cloudstack') as snatcher:
self._service._http_request(mock.sentinel.url)
expected_logging = [
'Getting metadata from: %s' % mock.sentinel.url,
]
mock_urllib_request.Request.assert_called_once_with(
mock.sentinel.url)
mock_urllib_request.urlopen.assert_called_once_with(
mock.sentinel.request)
mock_urlopen = mock_urllib_request.urlopen.return_value
mock_urlopen.read.assert_called_once_with()
self.assertEqual(expected_logging, snatcher.output)
@mock.patch('six.moves.http_client.HTTPConnection')
def test_get_password(self, mock_http_connection):
headers = {"DomU_Request": "send_my_password"}

View File

@ -12,17 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import posixpath
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from six.moves.urllib import error
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.metadata.services import base
from cloudbaseinit.metadata.services import ec2service
from cloudbaseinit.tests import testutils
@ -58,46 +55,6 @@ class EC2ServiceTest(unittest.TestCase):
def test_load_exception(self):
self._test_load(side_effect=Exception)
@mock.patch('six.moves.urllib.request.urlopen')
def _test_get_response(self, mock_urlopen, ret_value):
req = mock.MagicMock()
mock_urlopen.side_effect = [ret_value]
is_instance = isinstance(ret_value, error.HTTPError)
if is_instance and ret_value.code == 404:
self.assertRaises(base.NotExistingMetadataException,
self._service._get_response, req)
elif is_instance and ret_value.code != 404:
self.assertRaises(error.HTTPError,
self._service._get_response, req)
else:
response = self._service._get_response(req)
self.assertEqual(ret_value, response)
mock_urlopen.assert_called_once_with(req)
def test_get_response(self):
self._test_get_response(ret_value=None)
def test_get_response_error_404(self):
err = error.HTTPError("http://169.254.169.254/", 404,
'test error 404', {}, None)
self._test_get_response(ret_value=err)
def test_get_response_error_other(self):
err = error.HTTPError("http://169.254.169.254/", 409,
'test error 409', {}, None)
self._test_get_response(ret_value=err)
@mock.patch('six.moves.urllib.request.Request')
@mock.patch('cloudbaseinit.metadata.services.ec2service.EC2Service'
'._get_response')
def test_get_data(self, mock_get_response, mock_Request):
response = self._service._get_data('fake')
fake_path = posixpath.join(CONF.ec2.metadata_base_url, 'fake')
mock_Request.assert_called_once_with(fake_path)
mock_get_response.assert_called_once_with(mock_Request())
self.assertEqual(mock_get_response.return_value.read.return_value,
response)
@mock.patch('cloudbaseinit.metadata.services.ec2service.EC2Service'
'._get_cache_data')
def test_get_host_name(self, mock_get_cache_data):

View File

@ -22,7 +22,6 @@ except ImportError:
from six.moves.urllib import error
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.metadata.services import base
from cloudbaseinit.metadata.services import httpservice
CONF = cloudbaseinit_conf.CONF
@ -54,83 +53,16 @@ class HttpServiceTest(unittest.TestCase):
def test_load_exception(self):
self._test_load(side_effect=Exception)
@mock.patch('six.moves.urllib.request.urlopen')
def _test_get_response(self, mock_urlopen, side_effect):
mock_req = mock.MagicMock
if side_effect and side_effect.code is 404:
mock_urlopen.side_effect = [side_effect]
self.assertRaises(base.NotExistingMetadataException,
self._httpservice._get_response,
mock_req)
elif side_effect and side_effect.code:
mock_urlopen.side_effect = [side_effect]
if side_effect.code == 404:
self.assertRaises(base.NotExistingMetadataException,
self._httpservice._get_response,
mock_req)
else:
self.assertRaises(error.HTTPError)
else:
mock_urlopen.return_value = 'fake url'
response = self._httpservice._get_response(mock_req)
self.assertEqual('fake url', response)
def test_get_response_fail_HTTPError(self):
err = error.HTTPError("http://169.254.169.254/", 404,
'test error 404', {}, None)
self._test_get_response(side_effect=err)
def test_get_response_fail_other_exception(self):
err = error.HTTPError("http://169.254.169.254/", 409,
'test error 409', {}, None)
self._test_get_response(side_effect=err)
def test_get_response(self):
self._test_get_response(side_effect=None)
@mock.patch('cloudbaseinit.metadata.services.httpservice.HttpService'
'._get_response')
@mock.patch('posixpath.join')
@mock.patch('six.moves.urllib.request.Request')
def test_get_data(self, mock_Request, mock_posix_join,
mock_get_response):
fake_path = os.path.join('fake', 'path')
mock_data = mock.MagicMock()
mock_norm_path = mock.MagicMock()
mock_req = mock.MagicMock()
mock_get_response.return_value = mock_data
mock_posix_join.return_value = mock_norm_path
mock_Request.return_value = mock_req
response = self._httpservice._get_data(fake_path)
mock_posix_join.assert_called_with(CONF.openstack.metadata_base_url,
fake_path)
mock_Request.assert_called_once_with(mock_norm_path)
mock_get_response.assert_called_once_with(mock_req)
self.assertEqual(mock_data.read.return_value, response)
@mock.patch('cloudbaseinit.metadata.services.httpservice.HttpService'
'._get_response')
@mock.patch('posixpath.join')
@mock.patch('six.moves.urllib.request.Request')
def test_post_data(self, mock_Request, mock_posix_join,
mock_get_response):
'._http_request')
def test_post_data(self, mock_http_request):
fake_path = os.path.join('fake', 'path')
fake_data = 'fake data'
mock_data = mock.MagicMock()
mock_norm_path = mock.MagicMock()
mock_req = mock.MagicMock()
mock_get_response.return_value = mock_data
mock_posix_join.return_value = mock_norm_path
mock_Request.return_value = mock_req
mock_http_request.return_value = mock_data
response = self._httpservice._post_data(fake_path, fake_data)
mock_posix_join.assert_called_with(CONF.openstack.metadata_base_url,
fake_path)
mock_Request.assert_called_once_with(mock_norm_path, data=fake_data)
mock_get_response.assert_called_once_with(mock_req)
mock_http_request.assert_called_once_with(fake_path, data=fake_data)
self.assertTrue(response)
def test_get_password_path(self):

View File

@ -12,18 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import posixpath
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from six.moves.urllib import error
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.metadata.services import base
from cloudbaseinit.metadata.services import maasservice
from cloudbaseinit.tests import testutils
from cloudbaseinit.utils import x509constants
@ -68,34 +64,6 @@ class MaaSHttpServiceTest(unittest.TestCase):
def test_load_get_cache_data_fails(self):
self._test_load(ip='196.254.196.254', cache_data_fails=True)
@mock.patch('six.moves.urllib.request.urlopen')
def _test_get_response(self, mock_urlopen, ret_val):
mock_request = mock.MagicMock()
mock_urlopen.side_effect = [ret_val]
if isinstance(ret_val, error.HTTPError) and ret_val.code == 404:
self.assertRaises(base.NotExistingMetadataException,
self._maasservice._get_response, mock_request)
elif isinstance(ret_val, error.HTTPError) and ret_val.code != 404:
self.assertRaises(error.HTTPError,
self._maasservice._get_response, mock_request)
else:
response = self._maasservice._get_response(req=mock_request)
mock_urlopen.assert_called_once_with(mock_request)
self.assertEqual(ret_val, response)
def test_get_response(self):
self._test_get_response(ret_val='fake response')
def test_get_response_error_404(self):
err = error.HTTPError("http://169.254.169.254/", 404,
'test error 404', {}, None)
self._test_get_response(ret_val=err)
def test_get_response_error_not_404(self):
err = error.HTTPError("http://169.254.169.254/", 409,
'test other error', {}, None)
self._test_get_response(ret_val=err)
@testutils.ConfPatcher('oauth_consumer_key', 'consumer_key', "maas")
@testutils.ConfPatcher('oauth_consumer_secret', 'consumer_secret', "maas")
@testutils.ConfPatcher('oauth_token_key', 'token_key', "maas")
@ -123,26 +91,6 @@ class MaaSHttpServiceTest(unittest.TestCase):
self.assertEqual('"consumer_secret%26token_secret"',
auth_parts['oauth_signature'])
@mock.patch("cloudbaseinit.metadata.services.maasservice.MaaSHttpService"
"._get_oauth_headers")
@mock.patch("six.moves.urllib.request.Request")
@mock.patch("cloudbaseinit.metadata.services.maasservice.MaaSHttpService"
"._get_response")
def test_get_data(self, mock_get_response, mock_Request,
mock_get_oauth_headers):
with testutils.ConfPatcher('metadata_base_url', '196.254.196.254',
'maas'):
fake_path = os.path.join('fake', 'path')
mock_get_oauth_headers.return_value = 'fake headers'
response = self._maasservice._get_data(path=fake_path)
norm_path = posixpath.join(CONF.maas.metadata_base_url, fake_path)
mock_get_oauth_headers.assert_called_once_with(norm_path)
mock_Request.assert_called_once_with(norm_path,
headers='fake headers')
mock_get_response.assert_called_once_with(mock_Request())
self.assertEqual(mock_get_response.return_value.read.return_value,
response)
@mock.patch("cloudbaseinit.metadata.services.maasservice.MaaSHttpService"
"._get_cache_data")
def test_get_host_name(self, mock_get_cache_data):