Adding more tests
* Created tests for some modules that werent covered * Updated model/service a bit to do some failure casing * Started and commented a failure test
This commit is contained in:
parent
d3c1cc0937
commit
952f160187
1
.gitignore
vendored
1
.gitignore
vendored
@ -10,3 +10,4 @@ reddwarf/vcsversion.py
|
||||
*py*.egg
|
||||
.coverage
|
||||
covhtml/
|
||||
.DS_Store
|
||||
|
@ -45,19 +45,3 @@ class Config(object):
|
||||
@classmethod
|
||||
def get(cls, key, default=None):
|
||||
return cls.instance.get(key, default)
|
||||
|
||||
@classmethod
|
||||
def get_params_group(cls, group_key):
|
||||
group_key = group_key + "_"
|
||||
return dict((key.replace(group_key, "", 1), cls.instance.get(key))
|
||||
for key in cls.instance
|
||||
if key.startswith(group_key))
|
||||
|
||||
|
||||
def load_app_environment(oparser):
|
||||
add_common_options(oparser)
|
||||
add_log_options(oparser)
|
||||
(options, args) = parse_options(oparser)
|
||||
conf = Config.load_paste_config('reddwarf', options, args)
|
||||
setup_logging(options=options, conf=conf)
|
||||
return conf
|
||||
|
@ -37,7 +37,6 @@ class ReddwarfContext(context.RequestContext):
|
||||
super(ReddwarfContext, self).__init__(**kwargs)
|
||||
|
||||
def to_dict(self):
|
||||
# self.auth_tok = auth_tok
|
||||
return {'user': self.user,
|
||||
'tenant': self.tenant,
|
||||
'is_admin': self.is_admin,
|
||||
|
@ -130,7 +130,7 @@ class Instance(RemoteModelBase):
|
||||
except nova_exceptions.NotFound, e:
|
||||
raise rd_exceptions.NotFound(uuid=uuid)
|
||||
except nova_exceptions.ClientException, e:
|
||||
raise rd_exceptions.ReddwarfError()
|
||||
raise rd_exceptions.ReddwarfError(str(e))
|
||||
else:
|
||||
self._data_object = server
|
||||
|
||||
|
@ -79,7 +79,13 @@ class InstanceController(BaseController):
|
||||
context = rd_context.ReddwarfContext(
|
||||
auth_tok=req.headers["X-Auth-Token"],
|
||||
tenant=tenant_id)
|
||||
server = models.Instance(context=context, uuid=id).data()
|
||||
try:
|
||||
# TODO(hub-cap): start testing the failure cases here
|
||||
server = models.Instance(context=context, uuid=id).data()
|
||||
except exception.ReddwarfError, e:
|
||||
# TODO(hub-cap): come up with a better way than
|
||||
# this to get the message
|
||||
return wsgi.Result(str(e), 404)
|
||||
# TODO(cp16net): need to set the return code correctly
|
||||
return wsgi.Result(views.InstanceView(server).data(), 201)
|
||||
|
||||
@ -92,8 +98,6 @@ class InstanceController(BaseController):
|
||||
# TODO(cp16net) : need to handle exceptions here if the delete fails
|
||||
models.Instance.delete(context=context, uuid=id)
|
||||
|
||||
# TODO(hub-cap): fixgure out why the result is coming back as None
|
||||
# LOG.info("result of delete %s" % result)
|
||||
# TODO(cp16net): need to set the return code correctly
|
||||
return wsgi.Result(202)
|
||||
|
||||
|
@ -54,19 +54,21 @@ class Query(object):
|
||||
def delete(self):
|
||||
db_api.delete_all(self._query_func, self._model, **self._conditions)
|
||||
|
||||
def limit(self, limit=200, marker=None, marker_column=None):
|
||||
return db_api.find_all_by_limit(self._query_func,
|
||||
self._model,
|
||||
self._conditions,
|
||||
limit=limit,
|
||||
marker=marker,
|
||||
marker_column=marker_column)
|
||||
|
||||
def paginated_collection(self, limit=200, marker=None, marker_column=None):
|
||||
collection = self.limit(int(limit) + 1, marker, marker_column)
|
||||
if len(collection) > int(limit):
|
||||
return (collection[0:-1], collection[-2]['id'])
|
||||
return (collection, None)
|
||||
#TODO(hub-cap): Reenable pagination when we have a need for it
|
||||
# def limit(self, limit=200, marker=None, marker_column=None):
|
||||
# return db_api.find_all_by_limit(self._query_func,
|
||||
# self._model,
|
||||
# self._conditions,
|
||||
# limit=limit,
|
||||
# marker=marker,
|
||||
# marker_column=marker_column)
|
||||
#
|
||||
# def paginated_collection(self, limit=200, marker=None,
|
||||
# marker_column=None):
|
||||
# collection = self.limit(int(limit) + 1, marker, marker_column)
|
||||
# if len(collection) > int(limit):
|
||||
# return (collection[0:-1], collection[-2]['id'])
|
||||
# return (collection, None)
|
||||
|
||||
|
||||
class Queryable(object):
|
||||
|
@ -25,37 +25,10 @@ from reddwarf.common import wsgi
|
||||
from reddwarf.db import db_api
|
||||
|
||||
|
||||
def sanitize(data):
|
||||
serializer = wsgi.JSONDictSerializer()
|
||||
return json.loads(serializer.serialize(data))
|
||||
|
||||
|
||||
class StubConfig():
|
||||
|
||||
def __init__(self, **options):
|
||||
self.options = options
|
||||
|
||||
def __enter__(self):
|
||||
self.actual_config = config.Config.instance
|
||||
temp_config = self.actual_config.copy()
|
||||
temp_config.update(self.options)
|
||||
config.Config.instance = temp_config
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
config.Config.instance = self.actual_config
|
||||
|
||||
|
||||
class StubTime(object):
|
||||
|
||||
def __init__(self, time):
|
||||
self.time = time
|
||||
|
||||
def __enter__(self):
|
||||
self.actual_provider = utils.utcnow
|
||||
utils.utcnow = lambda: self.time
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
utils.utcnow = self.actual_provider
|
||||
# TODO(hub-cap): we will probably use this later
|
||||
# def sanitize(data):
|
||||
# serializer = wsgi.JSONDictSerializer()
|
||||
# return json.loads(serializer.serialize(data))
|
||||
|
||||
|
||||
class TestApp(webtest.TestApp):
|
||||
|
52
reddwarf/tests/unit/test_context.py
Normal file
52
reddwarf/tests/unit/test_context.py
Normal file
@ -0,0 +1,52 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import unittest
|
||||
|
||||
from reddwarf.common import context
|
||||
|
||||
AUTH_TOK = "auth-token"
|
||||
LOG = logging.getLogger(__name__)
|
||||
TENANT = "tenant"
|
||||
USER = "user"
|
||||
|
||||
|
||||
class ContextTest(unittest.TestCase):
|
||||
|
||||
def test_get_context_as_dict(self):
|
||||
ctx = context.ReddwarfContext(user=USER, tenant=TENANT,
|
||||
is_admin=True, show_deleted=True,
|
||||
read_only=True, auth_tok=AUTH_TOK)
|
||||
ctx_dict = ctx.to_dict()
|
||||
self.assertEqual(ctx_dict['user'], USER)
|
||||
self.assertEqual(ctx_dict['tenant'], TENANT)
|
||||
self.assertEqual(ctx_dict['is_admin'], True)
|
||||
self.assertEqual(ctx_dict['show_deleted'], True)
|
||||
self.assertEqual(ctx_dict['read_only'], True)
|
||||
self.assertEqual(ctx_dict['auth_tok'], AUTH_TOK)
|
||||
|
||||
def test_creating_context(self):
|
||||
tmp_ctx_dict = {'user': USER, 'tenant': TENANT, 'is_admin': True,
|
||||
'show_deleted': True, 'read_only': True,
|
||||
'auth_tok': AUTH_TOK }
|
||||
tmp_ctx = context.ReddwarfContext.from_dict(tmp_ctx_dict)
|
||||
self.assertEqual(tmp_ctx.user, USER)
|
||||
self.assertEqual(tmp_ctx.tenant, TENANT)
|
||||
self.assertEqual(tmp_ctx.is_admin, True)
|
||||
self.assertEqual(tmp_ctx.show_deleted, True)
|
||||
self.assertEqual(tmp_ctx.read_only, True)
|
||||
self.assertEqual(tmp_ctx.auth_tok, AUTH_TOK)
|
@ -40,15 +40,6 @@ class ControllerTestBase(tests.BaseTest):
|
||||
self.app = unit.TestApp(reddwarf_app)
|
||||
|
||||
|
||||
class DummyApp(wsgi.Router):
|
||||
|
||||
def __init__(self, controller):
|
||||
mapper = routes.Mapper()
|
||||
mapper.resource("resource", "/resources",
|
||||
controller=controller.create_resource())
|
||||
super(DummyApp, self).__init__(mapper)
|
||||
|
||||
|
||||
class TestInstanceController(ControllerTestBase):
|
||||
|
||||
DUMMY_INSTANCE_ID = "123"
|
||||
@ -65,11 +56,12 @@ class TestInstanceController(ControllerTestBase):
|
||||
self.instances_path = "/tenant/instances"
|
||||
super(TestInstanceController, self).setUp()
|
||||
|
||||
def test_show(self):
|
||||
response = self.app.get("%s/%s" % (self.instances_path,
|
||||
self.DUMMY_INSTANCE_ID),
|
||||
headers={'X-Auth-Token': '123'})
|
||||
self.assertEqual(response.status_int, 404)
|
||||
# TODO(hub-cap): Start testing the failure cases
|
||||
# def test_show_broken(self):
|
||||
# response = self.app.get("%s/%s" % (self.instances_path,
|
||||
# self.DUMMY_INSTANCE_ID),
|
||||
# headers={'X-Auth-Token': '123'})
|
||||
# self.assertEqual(response.status_int, 404)
|
||||
|
||||
def test_show(self):
|
||||
self.mock.StubOutWithMock(models.Instance, 'data')
|
||||
|
36
reddwarf/tests/unit/test_exception.py
Normal file
36
reddwarf/tests/unit/test_exception.py
Normal file
@ -0,0 +1,36 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import unittest
|
||||
|
||||
from reddwarf.common import exception
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ExceptionTest(unittest.TestCase):
|
||||
|
||||
def test_exception_with_message_no_args(self):
|
||||
test_message = "test message no args"
|
||||
exc = exception.ReddwarfError(test_message)
|
||||
self.assertEqual(str(exc), test_message)
|
||||
|
||||
def test_exception_with_message_args(self):
|
||||
test_message = "test message %(one)s %(two)s"
|
||||
test_args = {'one': 1, 'two': 2 }
|
||||
exc = exception.ReddwarfError(test_message, one=1, two=2)
|
||||
self.assertEqual(str(exc), test_message % test_args)
|
99
reddwarf/tests/unit/test_utils.py
Normal file
99
reddwarf/tests/unit/test_utils.py
Normal file
@ -0,0 +1,99 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from reddwarf.common import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestMethodInspector(unittest.TestCase):
|
||||
|
||||
def test_method_without_optional_args(self):
|
||||
def foo(bar):
|
||||
"""This is a method"""
|
||||
|
||||
method = utils.MethodInspector(foo)
|
||||
|
||||
self.assertEqual(method.required_args, ['bar'])
|
||||
self.assertEqual(method.optional_args, [])
|
||||
|
||||
def test_method_with_optional_args(self):
|
||||
def foo(bar, baz=1):
|
||||
"""This is a method"""
|
||||
|
||||
method = utils.MethodInspector(foo)
|
||||
|
||||
self.assertEqual(method.required_args, ['bar'])
|
||||
self.assertEqual(method.optional_args, [('baz', 1)])
|
||||
|
||||
def test_instance_method_with_optional_args(self):
|
||||
class Foo():
|
||||
def bar(self, baz, qux=2):
|
||||
"""This is a method"""
|
||||
|
||||
method = utils.MethodInspector(Foo().bar)
|
||||
|
||||
self.assertEqual(method.required_args, ['baz'])
|
||||
self.assertEqual(method.optional_args, [('qux', 2)])
|
||||
|
||||
def test_method_without_args(self):
|
||||
def foo():
|
||||
"""This is a method"""
|
||||
|
||||
method = utils.MethodInspector(foo)
|
||||
|
||||
self.assertEqual(method.required_args, [])
|
||||
self.assertEqual(method.optional_args, [])
|
||||
|
||||
def test_instance_method_without_args(self):
|
||||
class Foo():
|
||||
def bar(self):
|
||||
"""This is a method"""
|
||||
|
||||
method = utils.MethodInspector(Foo().bar)
|
||||
|
||||
self.assertEqual(method.required_args, [])
|
||||
self.assertEqual(method.optional_args, [])
|
||||
|
||||
def test_method_str(self):
|
||||
class Foo():
|
||||
def bar(self, baz, qux=None):
|
||||
"""This is a method"""
|
||||
|
||||
method = utils.MethodInspector(Foo().bar)
|
||||
|
||||
self.assertEqual(str(method), "bar baz=<baz> [qux=<qux>]")
|
||||
|
||||
|
||||
class StringifyExcludeTest(unittest.TestCase):
|
||||
|
||||
def test_empty_stringify_keys(self):
|
||||
self.assertEqual(utils.stringify_keys(None), None)
|
||||
|
||||
def test_empty_exclude(self):
|
||||
self.assertEqual(utils.exclude(None), None)
|
||||
|
||||
def test_exclude_keys(self):
|
||||
exclude_keys = ['one']
|
||||
key_values = {'one': 1, 'two': 2 }
|
||||
new_keys = utils.exclude(key_values, *exclude_keys)
|
||||
self.assertEqual(len(new_keys), 1)
|
||||
self.assertEqual(new_keys, {'two': 2 })
|
320
reddwarf/tests/unit/test_wsgi.py
Normal file
320
reddwarf/tests/unit/test_wsgi.py
Normal file
@ -0,0 +1,320 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
""" Taken from melange. """
|
||||
import routes
|
||||
import webob
|
||||
import webob.exc
|
||||
import webtest
|
||||
|
||||
from reddwarf.common import wsgi
|
||||
from reddwarf import tests
|
||||
|
||||
|
||||
class StubApp(object):
|
||||
|
||||
def __init__(self):
|
||||
self.called = False
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
self.environ = environ
|
||||
self.start_response = start_response
|
||||
self.called = True
|
||||
|
||||
|
||||
class StubUrlMap(StubApp, dict):
|
||||
|
||||
def __init__(self, dictionary):
|
||||
self.update(dictionary)
|
||||
super(StubUrlMap, self).__init__()
|
||||
|
||||
|
||||
class VersionedURLMapTest(tests.BaseTest):
|
||||
|
||||
def setUp(self):
|
||||
self.v1_app = StubApp()
|
||||
self.v2_app = StubApp()
|
||||
self.root_app = StubApp()
|
||||
self.urlmap = StubUrlMap({'/v2.0': self.v2_app,
|
||||
'/v1.0': self.v1_app,
|
||||
'/': self.root_app})
|
||||
self.versioned_urlmap = wsgi.VersionedURLMap(self.urlmap)
|
||||
super(VersionedURLMapTest, self).setUp()
|
||||
|
||||
def test_chooses_app_based_on_accept_version(self):
|
||||
environ = {'HTTP_ACCEPT': "application/vnd.openstack.reddwarf+xml;"
|
||||
"version=1.0",
|
||||
'PATH_INFO': "/resource"}
|
||||
self.versioned_urlmap(environ=environ, start_response=None)
|
||||
|
||||
self.assertTrue(self.v1_app.called)
|
||||
|
||||
def test_delegates_to_urlmapper_when_accept_header_is_absent(self):
|
||||
self.versioned_urlmap(environ={'PATH_INFO': "/resource"},
|
||||
start_response=None)
|
||||
|
||||
self.assertTrue(self.urlmap.called)
|
||||
|
||||
def test_delegates_to_urlmapper_for_std_accept_headers_with_version(self):
|
||||
environ = {
|
||||
'HTTP_ACCEPT': "application/json;version=1.0",
|
||||
'PATH_INFO': "/resource",
|
||||
}
|
||||
|
||||
self.versioned_urlmap(environ=environ, start_response=None)
|
||||
|
||||
self.assertTrue(self.urlmap.called)
|
||||
|
||||
def test_delegates_to_urlmapper_for_nonexistant_version_of_app(self):
|
||||
environ = {
|
||||
'HTTP_ACCEPT': "application/vnd.openstack.reddwarf+xml;"
|
||||
"version=9.0", 'REQUEST_METHOD': "GET",
|
||||
'PATH_INFO': "/resource.xml",
|
||||
}
|
||||
|
||||
def assert_status(status, *args):
|
||||
self.assertEqual(status, "406 Not Acceptable")
|
||||
|
||||
self.versioned_urlmap(environ=environ, start_response=assert_status)
|
||||
|
||||
def test_delegates_to_urlmapper_when_url_versioned(self):
|
||||
environ = {
|
||||
'HTTP_ACCEPT': "application/vnd.openstack.reddwarf+xml;"
|
||||
"version=2.0",
|
||||
'PATH_INFO': "/v1.0/resource",
|
||||
}
|
||||
|
||||
self.versioned_urlmap(environ=environ, start_response=None)
|
||||
|
||||
self.assertTrue(self.urlmap.called)
|
||||
|
||||
|
||||
class RequestTest(tests.BaseTest):
|
||||
|
||||
def test_content_type_from_accept_header(self):
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Accept"] = "application/xml"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/xml")
|
||||
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Accept"] = "application/json"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/json")
|
||||
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Accept"] = "application/xml, application/json"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/json")
|
||||
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Accept"] = \
|
||||
"application/json; q=0.3, application/xml; q=0.9"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/xml")
|
||||
|
||||
def test_content_type_from_accept_header_with_versioned_mimetype(self):
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Accept"] = \
|
||||
"application/vnd.openstack.reddwarf+xml;version=66.0"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/xml")
|
||||
|
||||
request = wsgi.Request.blank('/tests/123')
|
||||
request.headers["Accept"] = \
|
||||
"application/vnd.openstack.reddwarf+json;version=96.0"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/json")
|
||||
|
||||
def test_content_type_from_query_extension(self):
|
||||
request = wsgi.Request.blank('/tests/123.xml')
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/xml")
|
||||
|
||||
request = wsgi.Request.blank('/tests/123.json')
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/json")
|
||||
|
||||
request = wsgi.Request.blank('/tests/123.invalid')
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/json")
|
||||
|
||||
def test_content_type_accept_and_query_extension(self):
|
||||
request = wsgi.Request.blank('/tests/123.xml')
|
||||
request.headers["Accept"] = "application/json"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/xml")
|
||||
|
||||
def test_content_type_accept_default(self):
|
||||
request = wsgi.Request.blank('/tests/123.unsupported')
|
||||
request.headers["Accept"] = "application/unsupported1"
|
||||
result = request.best_match_content_type()
|
||||
self.assertEqual(result, "application/json")
|
||||
|
||||
def test_accept_version_for_custom_mime_type(self):
|
||||
environ = {'HTTP_ACCEPT': "application/vnd.openstack.reddwarf+xml;"
|
||||
"version=1.0"}
|
||||
request = wsgi.Request(environ=environ)
|
||||
|
||||
self.assertEqual(request.accept_version, "1.0")
|
||||
|
||||
def test_accept_version_from_first_custom_mime_type(self):
|
||||
environ = {'HTTP_ACCEPT': "application/json;version=2.0, "
|
||||
"application/vnd.openstack.reddwarf+xml;version=1.0, "
|
||||
"application/vnd.openstack.reddwarf+json;version=4.0"}
|
||||
request = wsgi.Request(environ=environ)
|
||||
|
||||
self.assertEqual(request.accept_version, "1.0")
|
||||
|
||||
def test_accept_version_is_none_for_standard_mime_type(self):
|
||||
environ = {'HTTP_ACCEPT': "application/json;"
|
||||
"version=1.0"}
|
||||
request = wsgi.Request(environ=environ)
|
||||
|
||||
self.assertIsNone(request.accept_version)
|
||||
|
||||
def test_accept_version_is_none_for_invalid_mime_type(self):
|
||||
environ = {'HTTP_ACCEPT': "glibberish;"
|
||||
"version=1.0"}
|
||||
request = wsgi.Request(environ=environ)
|
||||
|
||||
self.assertIsNone(request.accept_version)
|
||||
|
||||
def test_accept_version_none_when_mime_type_doesnt_specify_version(self):
|
||||
environ = {'HTTP_ACCEPT': "application/vnd.openstack.reddwarf+xml"}
|
||||
request = wsgi.Request(environ=environ)
|
||||
|
||||
self.assertIsNone(request.accept_version)
|
||||
|
||||
def test_accept_version_is_none_when_accept_header_is_absent(self):
|
||||
request = wsgi.Request(environ={})
|
||||
|
||||
self.assertIsNone(request.accept_version)
|
||||
|
||||
def test_accept_version_is_none_for_mime_type_with_invalid_version(self):
|
||||
environ = {'HTTP_ACCEPT': "application/vnd.openstack.reddwarf+xml;"
|
||||
"version=foo.bar"}
|
||||
request = wsgi.Request(environ=environ)
|
||||
|
||||
self.assertIsNone(request.accept_version)
|
||||
|
||||
def test_url_version_for_versioned_url(self):
|
||||
request = wsgi.Request.blank("/v1.0/resource")
|
||||
|
||||
self.assertEqual(request.url_version, "1.0")
|
||||
|
||||
def test_url_version_for_non_versioned_url_is_none(self):
|
||||
request = wsgi.Request.blank("/resource")
|
||||
|
||||
self.assertIsNone(request.url_version)
|
||||
|
||||
def test_request_params_returns_non_unicode_strings(self):
|
||||
request = wsgi.Request.blank("/resource?x=y&a=b")
|
||||
for key in request.params:
|
||||
self.assertEqual(type(key), str)
|
||||
|
||||
|
||||
class DummyApp(wsgi.Router):
|
||||
|
||||
def __init__(self):
|
||||
mapper = routes.Mapper()
|
||||
controller = StubController()
|
||||
mapper.resource("resource", "/resources",
|
||||
controller=controller.create_resource())
|
||||
super(DummyApp, self).__init__(mapper)
|
||||
|
||||
|
||||
class StubController(wsgi.Controller):
|
||||
|
||||
def index(self, request, format=None):
|
||||
return {'fort': 'knox'}
|
||||
|
||||
|
||||
class TestController(tests.BaseTest):
|
||||
|
||||
def test_response_content_type_matches_accept_header(self):
|
||||
app = webtest.TestApp(DummyApp())
|
||||
|
||||
response = app.get("/resources", headers={'Accept': "application/xml"})
|
||||
|
||||
self.assertEqual(response.content_type, "application/xml")
|
||||
self.assertEqual(response.xml.tag, "fort")
|
||||
self.assertEqual(response.xml.text.strip(), "knox")
|
||||
|
||||
def test_response_content_type_matches_url_format_over_accept_header(self):
|
||||
app = webtest.TestApp(DummyApp())
|
||||
|
||||
response = app.get("/resources.json",
|
||||
headers={'Accept': "application/xml"})
|
||||
|
||||
self.assertEqual(response.content_type, "application/json")
|
||||
self.assertEqual(response.json, {'fort': 'knox'})
|
||||
|
||||
def test_returns_404_if_action_not_implemented(self):
|
||||
app = webtest.TestApp(DummyApp())
|
||||
|
||||
response = app.get("/resources/new", status='*')
|
||||
|
||||
self.assertEqual(response.status_int, 404)
|
||||
|
||||
|
||||
class TestFault(tests.BaseTest):
|
||||
|
||||
def test_fault_wraps_webob_exception(self):
|
||||
app = webtest.TestApp(wsgi.Fault(webob.exc.HTTPNotFound("some error")))
|
||||
response = app.get("/", status="*")
|
||||
self.assertEqual(response.status_int, 404)
|
||||
self.assertEqual(response.content_type, "application/json")
|
||||
self.assertEqual(response.json['NotFound'],
|
||||
dict(code=404,
|
||||
message="The resource could not be found.",
|
||||
detail="some error"))
|
||||
|
||||
def test_fault_gives_back_xml(self):
|
||||
app = webtest.TestApp(wsgi.Fault(
|
||||
webob.exc.HTTPBadRequest("some error")))
|
||||
response = app.get("/x.xml", status="*")
|
||||
self.assertEqual(response.content_type, "application/xml")
|
||||
self.assertEqual(response.xml.tag, 'BadRequest')
|
||||
self.assertEqual(response.xml.attrib['code'], '400')
|
||||
self.assertEqual(response.xml.find('detail').text.strip(),
|
||||
'some error')
|
||||
|
||||
|
||||
class TestResult(tests.BaseTest):
|
||||
|
||||
class TestData(object):
|
||||
|
||||
def data_for_json(self):
|
||||
return {'foo': "bar", 'foo2': "bar2"}
|
||||
|
||||
def data_for_xml(self):
|
||||
return {'foos': [{'foo': "bar"}, {'foo2': "bar2"}]}
|
||||
|
||||
def test_data_returns_back_input_data(self):
|
||||
self.assertEqual(wsgi.Result("blah").data("application/json"), "blah")
|
||||
self.assertEqual(wsgi.Result({'x': "blah"}).data("application/json"),
|
||||
{'x': "blah"})
|
||||
self.assertEqual(wsgi.Result(["x", "blah"]).data("application/xml"),
|
||||
["x", "blah"])
|
||||
|
||||
def test_data_returns_json_specific_input_data(self):
|
||||
self.assertEqual(wsgi.Result(self.TestData()).data("application/json"),
|
||||
{'foo': "bar", 'foo2': "bar2"})
|
||||
|
||||
def test_data_returns_xml_specific_input_data(self):
|
||||
self.assertEqual(wsgi.Result(self.TestData()).data("application/xml"),
|
||||
{'foos': [{'foo': "bar"}, {'foo2': "bar2"}]})
|
Loading…
x
Reference in New Issue
Block a user