Adding Unit Test Cases for cloudpulse api
Partially implements: blueprint unit-tests Change-Id: Ib912b24efeba0ad3a54c6b3fafbb8f7c7e6fab43
This commit is contained in:
@@ -22,6 +22,7 @@ import datetime
|
|||||||
|
|
||||||
import pecan
|
import pecan
|
||||||
from pecan import rest
|
from pecan import rest
|
||||||
|
from webob import exc
|
||||||
import wsme
|
import wsme
|
||||||
from wsme import types as wtypes
|
from wsme import types as wtypes
|
||||||
import wsmeext.pecan as wsme_pecan
|
import wsmeext.pecan as wsme_pecan
|
||||||
@@ -29,6 +30,17 @@ import wsmeext.pecan as wsme_pecan
|
|||||||
from cloudpulse.api.controllers import link
|
from cloudpulse.api.controllers import link
|
||||||
from cloudpulse.api.controllers.v1 import cpulse
|
from cloudpulse.api.controllers.v1 import cpulse
|
||||||
|
|
||||||
|
BASE_VERSION = 1
|
||||||
|
|
||||||
|
MIN_VER_STR = '1.1'
|
||||||
|
|
||||||
|
# v1.1: Add API changelog here
|
||||||
|
MAX_VER_STR = '1.1'
|
||||||
|
|
||||||
|
|
||||||
|
MIN_VER = '1.1'
|
||||||
|
MAX_VER = '1.1'
|
||||||
|
|
||||||
|
|
||||||
class APIBase(wtypes.Base):
|
class APIBase(wtypes.Base):
|
||||||
|
|
||||||
@@ -112,4 +124,25 @@ class Controller(rest.RestController):
|
|||||||
# the request object to make the links.
|
# the request object to make the links.
|
||||||
return V1.convert()
|
return V1.convert()
|
||||||
|
|
||||||
|
def _check_version(self, version, headers=None):
|
||||||
|
if headers is None:
|
||||||
|
headers = {}
|
||||||
|
# ensure that major version in the URL matches the header
|
||||||
|
if version.major != BASE_VERSION:
|
||||||
|
raise exc.HTTPNotAcceptable(_(
|
||||||
|
"Mutually exclusive versions requested. Version %(ver)s "
|
||||||
|
"requested but not supported by this service."
|
||||||
|
"The supported version range is: "
|
||||||
|
"[%(min)s, %(max)s].") % {'ver': version,
|
||||||
|
'min': MIN_VER_STR,
|
||||||
|
'max': MAX_VER_STR},
|
||||||
|
headers=headers)
|
||||||
|
# ensure the minor version is within the supported range
|
||||||
|
if version < MIN_VER or version > MAX_VER:
|
||||||
|
raise exc.HTTPNotAcceptable(_(
|
||||||
|
"Version %(ver)s was requested but the minor version is not "
|
||||||
|
"supported by this service. The supported version range is: "
|
||||||
|
"[%(min)s, %(max)s].") % {'ver': version, 'min': MIN_VER_STR,
|
||||||
|
'max': MAX_VER_STR}, headers=headers)
|
||||||
|
|
||||||
__all__ = (Controller)
|
__all__ = (Controller)
|
||||||
|
@@ -16,6 +16,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import copy
|
import copy
|
||||||
|
import os
|
||||||
|
|
||||||
from cloudpulse.common import context as cpulse_context
|
from cloudpulse.common import context as cpulse_context
|
||||||
from cloudpulse.objects import base as objects_base
|
from cloudpulse.objects import base as objects_base
|
||||||
@@ -84,3 +85,19 @@ class TestCase(base.BaseTestCase):
|
|||||||
group = kw.pop('group', None)
|
group = kw.pop('group', None)
|
||||||
for k, v in kw.iteritems():
|
for k, v in kw.iteritems():
|
||||||
CONF.set_override(k, v, group)
|
CONF.set_override(k, v, group)
|
||||||
|
|
||||||
|
def path_get(self, project_file=None):
|
||||||
|
"""Get the absolute path to a file. Used for testing the API.
|
||||||
|
|
||||||
|
:param project_file: File whose path to return. Default: None.
|
||||||
|
:returns: path to the specified file, or path to project root.
|
||||||
|
"""
|
||||||
|
root = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||||
|
'..',
|
||||||
|
'..',
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if project_file:
|
||||||
|
return os.path.join(root, project_file)
|
||||||
|
else:
|
||||||
|
return root
|
||||||
|
@@ -14,12 +14,8 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import fixtures
|
|
||||||
from oslo_config import cfg
|
|
||||||
|
|
||||||
from cloudpulse.common import config
|
from cloudpulse.common import config
|
||||||
|
import fixtures
|
||||||
cfg.CONF.register_opt(cfg.StrOpt('host', default='localhost', help='host'))
|
|
||||||
|
|
||||||
|
|
||||||
class ConfFixture(fixtures.Fixture):
|
class ConfFixture(fixtures.Fixture):
|
||||||
|
93
cloudpulse/tests/fakes.py
Normal file
93
cloudpulse/tests/fakes.py
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import mock
|
||||||
|
|
||||||
|
fakeAuthTokenHeaders = {'X-User-Id': u'773a902f022949619b5c2f32cd89d419',
|
||||||
|
'X-Roles': u'admin, ResellerAdmin, _member_',
|
||||||
|
'X-Project-Id': u'5588aebbcdc24e17a061595f80574376',
|
||||||
|
'X-Project-Name': 'test',
|
||||||
|
'X-User-Name': 'test',
|
||||||
|
'X-Auth-Token': u'5588aebbcdc24e17a061595f80574376',
|
||||||
|
'X-Forwarded-For': u'10.10.10.10, 11.11.11.11',
|
||||||
|
'X-Service-Catalog': u'{test: 12345}',
|
||||||
|
'X-Auth-Url': 'fake_auth_url',
|
||||||
|
'X-Identity-Status': 'Confirmed',
|
||||||
|
'X-User-Domain-Name': 'domain',
|
||||||
|
'X-Project-Domain-Id': 'project_domain_id',
|
||||||
|
'X-User-Domain-Id': 'user_domain_id',
|
||||||
|
'X-OpenStack-Cloudpulse-API-Version': '1.0'
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class FakePecanRequest(mock.Mock):
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
super(FakePecanRequest, self).__init__(**kwargs)
|
||||||
|
self.host_url = 'http://test_url:8080/test'
|
||||||
|
self.context = {}
|
||||||
|
self.body = ''
|
||||||
|
self.content_type = 'text/unicode'
|
||||||
|
self.params = {}
|
||||||
|
self.path = '/v1/services'
|
||||||
|
self.headers = fakeAuthTokenHeaders
|
||||||
|
self.environ = {}
|
||||||
|
self.version = (1, 0)
|
||||||
|
|
||||||
|
def __setitem__(self, index, value):
|
||||||
|
setattr(self, index, value)
|
||||||
|
|
||||||
|
|
||||||
|
class FakePecanResponse(mock.Mock):
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
super(FakePecanResponse, self).__init__(**kwargs)
|
||||||
|
self.status = None
|
||||||
|
|
||||||
|
|
||||||
|
class FakeApp(object):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class FakeService(mock.Mock):
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
super(FakeService, self).__init__(**kwargs)
|
||||||
|
self.__tablename__ = 'service'
|
||||||
|
self.__resource__ = 'services'
|
||||||
|
self.user_id = 'fake user id'
|
||||||
|
self.project_id = 'fake project id'
|
||||||
|
self.uuid = 'test_uuid'
|
||||||
|
self.id = 8
|
||||||
|
self.name = 'james'
|
||||||
|
self.service_type = 'not_this'
|
||||||
|
self.description = 'amazing'
|
||||||
|
self.tags = ['this', 'and that']
|
||||||
|
self.read_only = True
|
||||||
|
|
||||||
|
def as_dict(self):
|
||||||
|
return dict(service_type=self.service_type,
|
||||||
|
user_id=self.user_id,
|
||||||
|
project_id=self.project_id,
|
||||||
|
uuid=self.uuid,
|
||||||
|
id=self.id,
|
||||||
|
name=self.name,
|
||||||
|
tags=self.tags,
|
||||||
|
read_only=self.read_only,
|
||||||
|
description=self.description)
|
||||||
|
|
||||||
|
|
||||||
|
class FakeAuthProtocol(mock.Mock):
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
super(FakeAuthProtocol, self).__init__(**kwargs)
|
||||||
|
self.app = FakeApp()
|
||||||
|
self.config = ''
|
0
cloudpulse/tests/unit/api/__init__.py
Normal file
0
cloudpulse/tests/unit/api/__init__.py
Normal file
249
cloudpulse/tests/unit/api/base.py
Normal file
249
cloudpulse/tests/unit/api/base.py
Normal file
@@ -0,0 +1,249 @@
|
|||||||
|
# Copyright 2015 Cisco Inc
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
"""Base classes for API tests."""
|
||||||
|
|
||||||
|
# NOTE: Ported from ceilometer/tests/api.py (subsequently moved to
|
||||||
|
# ceilometer/tests/api/__init__.py). This should be oslo'ified:
|
||||||
|
# https://bugs.launchpad.net/ironic/+bug/1255115.
|
||||||
|
|
||||||
|
# NOTE(deva): import auth_token so we can override a config option
|
||||||
|
from keystonemiddleware import auth_token # noqa
|
||||||
|
import mock
|
||||||
|
from oslo_config import cfg
|
||||||
|
import pecan
|
||||||
|
import pecan.testing
|
||||||
|
from six.moves.urllib import parse as urlparse
|
||||||
|
|
||||||
|
from cloudpulse.api import hooks
|
||||||
|
from cloudpulse.db import api as dbapi
|
||||||
|
from cloudpulse.tests.unit.db import base
|
||||||
|
|
||||||
|
PATH_PREFIX = '/v1'
|
||||||
|
|
||||||
|
|
||||||
|
class FunctionalTest(base.DbTestCase):
|
||||||
|
"""Used for functional tests of Pecan controllers where you need to
|
||||||
|
|
||||||
|
test your literal application and its integration with the
|
||||||
|
framework.
|
||||||
|
"""
|
||||||
|
|
||||||
|
SOURCE_DATA = {'test_source': {'somekey': '666'}}
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(FunctionalTest, self).setUp()
|
||||||
|
cfg.CONF.set_override("auth_version", "v2.0",
|
||||||
|
group='keystone_authtoken')
|
||||||
|
cfg.CONF.set_override("admin_user", "admin",
|
||||||
|
group='keystone_authtoken')
|
||||||
|
self.app = self._make_app()
|
||||||
|
self.dbapi = dbapi.get_instance()
|
||||||
|
|
||||||
|
def reset_pecan():
|
||||||
|
pecan.set_config({}, overwrite=True)
|
||||||
|
|
||||||
|
self.addCleanup(reset_pecan)
|
||||||
|
|
||||||
|
p = mock.patch('cloudpulse.api.controllers.v1' +
|
||||||
|
'.Controller._check_version')
|
||||||
|
self._check_version = p.start()
|
||||||
|
self.addCleanup(p.stop)
|
||||||
|
|
||||||
|
def _make_app(self, enable_acl=False):
|
||||||
|
# Determine where we are so we can set up paths in the config
|
||||||
|
root_dir = self.path_get()
|
||||||
|
|
||||||
|
self.config = {
|
||||||
|
'app': {
|
||||||
|
'root': 'cloudpulse.api.controllers.root.RootController',
|
||||||
|
'modules': ['cloudpulse.api'],
|
||||||
|
'static_root': '%s/public' % root_dir,
|
||||||
|
'template_path': '%s/api/templates' % root_dir,
|
||||||
|
'enable_acl': enable_acl,
|
||||||
|
'acl_public_routes': ['/', '/v1'],
|
||||||
|
'hooks': [
|
||||||
|
hooks.ContextHook(),
|
||||||
|
hooks.RPCHook(),
|
||||||
|
hooks.NoExceptionTracebackHook(),
|
||||||
|
],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return pecan.testing.load_test_app(self.config)
|
||||||
|
|
||||||
|
def _request_json(self, path, params, expect_errors=False, headers=None,
|
||||||
|
method="post", extra_environ=None, status=None,
|
||||||
|
path_prefix=PATH_PREFIX):
|
||||||
|
"""Sends simulated HTTP request to Pecan test app.
|
||||||
|
|
||||||
|
:param path: url path of target service
|
||||||
|
:param params: content for wsgi.input of request
|
||||||
|
:param expect_errors: Boolean value; whether an error is expected based
|
||||||
|
on request
|
||||||
|
:param headers: a dictionary of headers to send along with the request
|
||||||
|
:param method: Request method type. Appropriate method function call
|
||||||
|
should be used rather than passing attribute in.
|
||||||
|
:param extra_environ: a dictionary of environ variables to send along
|
||||||
|
with the request
|
||||||
|
:param status: expected status code of response
|
||||||
|
:param path_prefix: prefix of the url path
|
||||||
|
"""
|
||||||
|
full_path = path_prefix + path
|
||||||
|
print('%s: %s %s' % (method.upper(), full_path, params))
|
||||||
|
response = getattr(self.app, "%s_json" % method)(
|
||||||
|
str(full_path),
|
||||||
|
params=params,
|
||||||
|
headers=headers,
|
||||||
|
status=status,
|
||||||
|
extra_environ=extra_environ,
|
||||||
|
expect_errors=expect_errors
|
||||||
|
)
|
||||||
|
print('GOT:%s' % response)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def put_json(self, path, params, expect_errors=False, headers=None,
|
||||||
|
extra_environ=None, status=None):
|
||||||
|
"""Sends simulated HTTP PUT request to Pecan test app.
|
||||||
|
|
||||||
|
:param path: url path of target service
|
||||||
|
:param params: content for wsgi.input of request
|
||||||
|
:param expect_errors: Boolean value; whether an error is expected based
|
||||||
|
on request
|
||||||
|
:param headers: a dictionary of headers to send along with the request
|
||||||
|
:param extra_environ: a dictionary of environ variables to send along
|
||||||
|
with the request
|
||||||
|
:param status: expected status code of response
|
||||||
|
"""
|
||||||
|
return self._request_json(path=path, params=params,
|
||||||
|
expect_errors=expect_errors,
|
||||||
|
headers=headers, extra_environ=extra_environ,
|
||||||
|
status=status, method="put")
|
||||||
|
|
||||||
|
def post_json(self, path, params, expect_errors=False, headers=None,
|
||||||
|
extra_environ=None, status=None):
|
||||||
|
"""Sends simulated HTTP POST request to Pecan test app.
|
||||||
|
|
||||||
|
:param path: url path of target service
|
||||||
|
:param params: content for wsgi.input of request
|
||||||
|
:param expect_errors: Boolean value; whether an error is expected based
|
||||||
|
on request
|
||||||
|
:param headers: a dictionary of headers to send along with the request
|
||||||
|
:param extra_environ: a dictionary of environ variables to send along
|
||||||
|
with the request
|
||||||
|
:param status: expected status code of response
|
||||||
|
"""
|
||||||
|
return self._request_json(path=path, params=params,
|
||||||
|
expect_errors=expect_errors,
|
||||||
|
headers=headers, extra_environ=extra_environ,
|
||||||
|
status=status, method="post")
|
||||||
|
|
||||||
|
def patch_json(self, path, params, expect_errors=False, headers=None,
|
||||||
|
extra_environ=None, status=None):
|
||||||
|
"""Sends simulated HTTP PATCH request to Pecan test app.
|
||||||
|
|
||||||
|
:param path: url path of target service
|
||||||
|
:param params: content for wsgi.input of request
|
||||||
|
:param expect_errors: Boolean value; whether an error is expected based
|
||||||
|
on request
|
||||||
|
:param headers: a dictionary of headers to send along with the request
|
||||||
|
:param extra_environ: a dictionary of environ variables to send along
|
||||||
|
with the request
|
||||||
|
:param status: expected status code of response
|
||||||
|
"""
|
||||||
|
return self._request_json(path=path, params=params,
|
||||||
|
expect_errors=expect_errors,
|
||||||
|
headers=headers, extra_environ=extra_environ,
|
||||||
|
status=status, method="patch")
|
||||||
|
|
||||||
|
def delete(self, path, expect_errors=False, headers=None,
|
||||||
|
extra_environ=None, status=None, path_prefix=PATH_PREFIX):
|
||||||
|
"""Sends simulated HTTP DELETE request to Pecan test app.
|
||||||
|
|
||||||
|
:param path: url path of target service
|
||||||
|
:param expect_errors: Boolean value; whether an error is expected based
|
||||||
|
on request
|
||||||
|
:param headers: a dictionary of headers to send along with the request
|
||||||
|
:param extra_environ: a dictionary of environ variables to send along
|
||||||
|
with the request
|
||||||
|
:param status: expected status code of response
|
||||||
|
:param path_prefix: prefix of the url path
|
||||||
|
"""
|
||||||
|
full_path = path_prefix + path
|
||||||
|
print('DELETE: %s' % (full_path))
|
||||||
|
response = self.app.delete(str(full_path),
|
||||||
|
headers=headers,
|
||||||
|
status=status,
|
||||||
|
extra_environ=extra_environ,
|
||||||
|
expect_errors=expect_errors)
|
||||||
|
print('GOT:%s' % response)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def get_json(self, path, expect_errors=False, headers=None,
|
||||||
|
extra_environ=None, q=None, path_prefix=PATH_PREFIX,
|
||||||
|
**params):
|
||||||
|
"""Sends simulated HTTP GET request to Pecan test app.
|
||||||
|
|
||||||
|
:param path: url path of target service
|
||||||
|
:param expect_errors: Boolean value;whether an error is expected based
|
||||||
|
on request
|
||||||
|
:param headers: a dictionary of headers to send along with the request
|
||||||
|
:param extra_environ: a dictionary of environ variables to send along
|
||||||
|
with the request
|
||||||
|
:param q: list of queries consisting of: field, value, op, and type
|
||||||
|
keys
|
||||||
|
:param path_prefix: prefix of the url path
|
||||||
|
:param params: content for wsgi.input of request
|
||||||
|
"""
|
||||||
|
if q is None:
|
||||||
|
q = []
|
||||||
|
full_path = path_prefix + path
|
||||||
|
query_params = {'q.field': [],
|
||||||
|
'q.value': [],
|
||||||
|
'q.op': [],
|
||||||
|
}
|
||||||
|
for query in q:
|
||||||
|
for name in ['field', 'op', 'value']:
|
||||||
|
query_params['q.%s' % name].append(query.get(name, ''))
|
||||||
|
all_params = {}
|
||||||
|
all_params.update(params)
|
||||||
|
if q:
|
||||||
|
all_params.update(query_params)
|
||||||
|
print('GET: %s %r' % (full_path, all_params))
|
||||||
|
response = self.app.get(full_path,
|
||||||
|
params=all_params,
|
||||||
|
headers=headers,
|
||||||
|
extra_environ=extra_environ,
|
||||||
|
expect_errors=expect_errors)
|
||||||
|
if not expect_errors:
|
||||||
|
response = response.json
|
||||||
|
print('GOT:%s' % response)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def validate_link(self, link, bookmark=False):
|
||||||
|
"""Checks if the given link can get correct data."""
|
||||||
|
# removes the scheme and net location parts of the link
|
||||||
|
url_parts = list(urlparse.urlparse(link))
|
||||||
|
url_parts[0] = url_parts[1] = ''
|
||||||
|
|
||||||
|
# bookmark link should not have the version in the URL
|
||||||
|
if bookmark and url_parts[2].startswith(PATH_PREFIX):
|
||||||
|
return False
|
||||||
|
|
||||||
|
full_path = urlparse.urlunparse(url_parts)
|
||||||
|
try:
|
||||||
|
self.get_json(full_path, path_prefix='')
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
return False
|
33
cloudpulse/tests/unit/api/test_app.py
Normal file
33
cloudpulse/tests/unit/api/test_app.py
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
# Copyright 2015
|
||||||
|
# Cisco Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||||
|
# use this file except in compliance with the License. You may obtain a copy
|
||||||
|
# of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from cloudpulse.api import app as api_app
|
||||||
|
from cloudpulse.api import config as api_config
|
||||||
|
from cloudpulse.api import hooks
|
||||||
|
from cloudpulse.tests import base
|
||||||
|
|
||||||
|
|
||||||
|
class TestAppConfig(base.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(TestAppConfig, self).setUp()
|
||||||
|
|
||||||
|
def test_get_pecan_config(self):
|
||||||
|
config = api_app.get_pecan_config()
|
||||||
|
|
||||||
|
config_d = dict(config.app)
|
||||||
|
|
||||||
|
self.assertEqual(config_d['modules'], api_config.app['modules'])
|
||||||
|
self.assertEqual(config_d['root'], api_config.app['root'])
|
||||||
|
self.assertIsInstance(config_d['hooks'][0], hooks.ContextHook)
|
41
cloudpulse/tests/unit/api/test_auth.py
Normal file
41
cloudpulse/tests/unit/api/test_auth.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
# Copyright 2015
|
||||||
|
# Cisco, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||||
|
# use this file except in compliance with the License. You may obtain a copy
|
||||||
|
# of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import mock
|
||||||
|
from oslo_config import fixture
|
||||||
|
|
||||||
|
from cloudpulse.api import auth
|
||||||
|
from cloudpulse.tests import base
|
||||||
|
from cloudpulse.tests import fakes
|
||||||
|
|
||||||
|
|
||||||
|
@mock.patch('cloudpulse.api.middleware.auth_token.AuthTokenMiddleware',
|
||||||
|
new_callable=fakes.FakeAuthProtocol)
|
||||||
|
class TestAuth(base.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestAuth, self).setUp()
|
||||||
|
self.CONF = self.useFixture(fixture.Config())
|
||||||
|
self.app = fakes.FakeApp()
|
||||||
|
|
||||||
|
def test_check_auth_option_enabled(self, mock_auth):
|
||||||
|
self.CONF.config(enable_authentication=True)
|
||||||
|
result = auth.install(self.app, self.CONF.conf, ['/'])
|
||||||
|
self.assertIsInstance(result, fakes.FakeAuthProtocol)
|
||||||
|
|
||||||
|
def test_check_auth_option_disabled(self, mock_auth):
|
||||||
|
self.CONF.config(enable_authentication=False)
|
||||||
|
result = auth.install(self.app, self.CONF.conf, ['/'])
|
||||||
|
self.assertIsInstance(result, fakes.FakeApp)
|
139
cloudpulse/tests/unit/api/test_hooks.py
Normal file
139
cloudpulse/tests/unit/api/test_hooks.py
Normal file
@@ -0,0 +1,139 @@
|
|||||||
|
# Copyright 2014
|
||||||
|
# The Cloudscaling Group, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||||
|
# use this file except in compliance with the License. You may obtain a copy
|
||||||
|
# of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
import mock
|
||||||
|
from oslo_config import cfg
|
||||||
|
import oslo_messaging as messaging
|
||||||
|
|
||||||
|
from cloudpulse.api.controllers import root
|
||||||
|
from cloudpulse.api import hooks
|
||||||
|
from cloudpulse.common import context as cloudpulse_context
|
||||||
|
from cloudpulse.tests import base
|
||||||
|
from cloudpulse.tests import fakes
|
||||||
|
from cloudpulse.tests.unit.api import base as api_base
|
||||||
|
|
||||||
|
|
||||||
|
class TestContextHook(base.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestContextHook, self).setUp()
|
||||||
|
self.app = fakes.FakeApp()
|
||||||
|
|
||||||
|
def test_context_hook_before_method(self):
|
||||||
|
state = mock.Mock(request=fakes.FakePecanRequest())
|
||||||
|
hook = hooks.ContextHook()
|
||||||
|
hook.before(state)
|
||||||
|
ctx = state.request.context
|
||||||
|
self.assertIsInstance(ctx, cloudpulse_context.RequestContext)
|
||||||
|
self.assertEqual(ctx.auth_token,
|
||||||
|
fakes.fakeAuthTokenHeaders['X-Auth-Token'])
|
||||||
|
self.assertEqual(ctx.project_id,
|
||||||
|
fakes.fakeAuthTokenHeaders['X-Project-Id'])
|
||||||
|
self.assertEqual(ctx.user_id,
|
||||||
|
fakes.fakeAuthTokenHeaders['X-User-Id'])
|
||||||
|
self.assertEqual(ctx.auth_url,
|
||||||
|
fakes.fakeAuthTokenHeaders['X-Auth-Url'])
|
||||||
|
self.assertEqual(ctx.domain_name,
|
||||||
|
fakes.fakeAuthTokenHeaders['X-User-Domain-Name'])
|
||||||
|
self.assertEqual(ctx.domain_id,
|
||||||
|
fakes.fakeAuthTokenHeaders['X-User-Domain-Id'])
|
||||||
|
|
||||||
|
def test_context_hook_before_method_auth_info(self):
|
||||||
|
state = mock.Mock(request=fakes.FakePecanRequest())
|
||||||
|
state.request.environ['keystone.token_info'] = 'assert_this'
|
||||||
|
hook = hooks.ContextHook()
|
||||||
|
hook.before(state)
|
||||||
|
ctx = state.request.context
|
||||||
|
self.assertIsInstance(ctx, cloudpulse_context.RequestContext)
|
||||||
|
self.assertEqual(fakes.fakeAuthTokenHeaders['X-Auth-Token'],
|
||||||
|
ctx.auth_token)
|
||||||
|
self.assertEqual('assert_this', ctx.auth_token_info)
|
||||||
|
|
||||||
|
|
||||||
|
class TestNoExceptionTracebackHook(api_base.FunctionalTest):
|
||||||
|
|
||||||
|
TRACE = [u'Traceback (most recent call last):',
|
||||||
|
u' File "/opt/stack/cloudpulse/cloudpulse/openstack',
|
||||||
|
u'/common/rpc/amqp.py",',
|
||||||
|
' line 434, in _process_data\\n **args)',
|
||||||
|
u' File "/opt/stack/cloudpulse/cloudpulse/openstack/common/rpc/'
|
||||||
|
'dispatcher.py", line 172, in dispatch\\n result ='
|
||||||
|
' getattr(proxyobj, method)(context, **kwargs)']
|
||||||
|
MSG_WITHOUT_TRACE = "Test exception message."
|
||||||
|
MSG_WITH_TRACE = MSG_WITHOUT_TRACE + "\n" + "\n".join(TRACE)
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestNoExceptionTracebackHook, self).setUp()
|
||||||
|
p = mock.patch.object(root.Root, 'convert')
|
||||||
|
self.root_convert_mock = p.start()
|
||||||
|
self.addCleanup(p.stop)
|
||||||
|
|
||||||
|
def test_hook_exception_success(self):
|
||||||
|
self.root_convert_mock.side_effect = Exception(self.MSG_WITH_TRACE)
|
||||||
|
|
||||||
|
response = self.get_json('/', path_prefix='', expect_errors=True)
|
||||||
|
|
||||||
|
actual_msg = json.loads(response.json['error_message'])['faultstring']
|
||||||
|
self.assertEqual(self.MSG_WITHOUT_TRACE, actual_msg)
|
||||||
|
|
||||||
|
def test_hook_remote_error_success(self):
|
||||||
|
test_exc_type = 'TestException'
|
||||||
|
self.root_convert_mock.side_effect = messaging.rpc.RemoteError(
|
||||||
|
test_exc_type, self.MSG_WITHOUT_TRACE, self.TRACE)
|
||||||
|
|
||||||
|
response = self.get_json('/', path_prefix='', expect_errors=True)
|
||||||
|
|
||||||
|
# NOTE(max_lobur): For RemoteError the client message will still have
|
||||||
|
# some garbage because in RemoteError traceback is serialized as a list
|
||||||
|
# instead of'\n'.join(trace). But since RemoteError is kind of very
|
||||||
|
# rare thing (happens due to wrong deserialization settings etc.)
|
||||||
|
# we don't care about this garbage.
|
||||||
|
expected_msg = ("Remote error: %s %s"
|
||||||
|
% (test_exc_type, self.MSG_WITHOUT_TRACE) + "\n[u'")
|
||||||
|
actual_msg = json.loads(response.json['error_message'])['faultstring']
|
||||||
|
self.assertEqual(expected_msg, actual_msg)
|
||||||
|
|
||||||
|
def test_hook_without_traceback(self):
|
||||||
|
msg = "Error message without traceback \n but \n multiline"
|
||||||
|
self.root_convert_mock.side_effect = Exception(msg)
|
||||||
|
|
||||||
|
response = self.get_json('/', path_prefix='', expect_errors=True)
|
||||||
|
|
||||||
|
actual_msg = json.loads(response.json['error_message'])['faultstring']
|
||||||
|
self.assertEqual(msg, actual_msg)
|
||||||
|
|
||||||
|
def test_hook_server_debug_on_serverfault(self):
|
||||||
|
cfg.CONF.set_override('debug', True)
|
||||||
|
self.root_convert_mock.side_effect = Exception(self.MSG_WITH_TRACE)
|
||||||
|
|
||||||
|
response = self.get_json('/', path_prefix='', expect_errors=True)
|
||||||
|
|
||||||
|
actual_msg = json.loads(
|
||||||
|
response.json['error_message'])['faultstring']
|
||||||
|
self.assertEqual(self.MSG_WITHOUT_TRACE, actual_msg)
|
||||||
|
|
||||||
|
def test_hook_server_debug_on_clientfault(self):
|
||||||
|
cfg.CONF.set_override('debug', True)
|
||||||
|
client_error = Exception(self.MSG_WITH_TRACE)
|
||||||
|
client_error.code = 400
|
||||||
|
self.root_convert_mock.side_effect = client_error
|
||||||
|
|
||||||
|
response = self.get_json('/', path_prefix='', expect_errors=True)
|
||||||
|
|
||||||
|
actual_msg = json.loads(
|
||||||
|
response.json['error_message'])['faultstring']
|
||||||
|
self.assertEqual(self.MSG_WITH_TRACE, actual_msg)
|
@@ -25,6 +25,10 @@ oslo.i18n>=1.5.0,<1.6.0 # Apache-2.0
|
|||||||
paramiko>=1.13.0
|
paramiko>=1.13.0
|
||||||
pecan>=0.8.0
|
pecan>=0.8.0
|
||||||
pycrypto>=2.6.1
|
pycrypto>=2.6.1
|
||||||
|
python-cinderclient>=1.3.1
|
||||||
|
python-glanceclient>=0.18.0
|
||||||
|
python-neutronclient>=2.3.11,<3
|
||||||
|
python-novaclient>=2.22.0
|
||||||
python-keystoneclient>=1.1.0
|
python-keystoneclient>=1.1.0
|
||||||
singledispatch==3.4.0.3
|
singledispatch==3.4.0.3
|
||||||
six>=1.9.0
|
six>=1.9.0
|
||||||
|
Reference in New Issue
Block a user