Rajaram/Vinkesh | Copied tests for wsgi from nova. Added default content/accept types in Request which can be overridden by projects. Copied tests for XML serialization of Extension Controller's action from nova

This commit is contained in:
Rajaram Mallya
2011-09-08 18:29:40 +05:30
parent 87dceb46c4
commit 02c95aeb2f
6 changed files with 700 additions and 134 deletions

View File

@@ -139,5 +139,9 @@ class OpenstackException(Exception):
return self._error_string
class MalformedRequestBody(OpenstackException):
message = "Malformed message body: %(reason)s"
class InvalidContentType(OpenstackException):
message = "Invalid content type %(content_type)s"

View File

@@ -27,8 +27,9 @@ from lxml import etree
from openstack.common import exception
from openstack.common import wsgi
LOG = logging.getLogger('extensions')
DEFAULT_XMLNS = "http://docs.openstack.org/"
XMLNS_ATOM = "http://www.w3.org/2005/Atom"
class ExtensionDescriptor(object):
@@ -166,6 +167,9 @@ class ExtensionsResource(wsgi.Resource):
def __init__(self, extension_manager):
self.extension_manager = extension_manager
body_serializers = {'application/xml': ExtensionsXMLSerializer()}
serializer = wsgi.ResponseSerializer(body_serializers=body_serializers)
super(ExtensionsResource, self).__init__(self, None, serializer)
def _translate(self, ext):
ext_data = {}
@@ -342,8 +346,11 @@ class ExtensionManager(object):
def get_resources(self):
"""Returns a list of ResourceExtension objects."""
resources = []
resources.append(ResourceExtension('extensions',
ExtensionsResource(self)))
extension_resource = ExtensionsResource(self)
res_ext = ResourceExtension('extensions',
extension_resource,
serializer=extension_resource.serializer)
resources.append(res_ext)
for alias, ext in self.extensions.iteritems():
try:
resources.extend(ext.get_resources())
@@ -486,7 +493,7 @@ class ResourceExtension(object):
class ExtensionsXMLSerializer(wsgi.XMLDictSerializer):
# NSMAP = {None: xmlutil.XMLNS_V11, 'atom': xmlutil.XMLNS_ATOM}
NSMAP = {None: DEFAULT_XMLNS, 'atom': XMLNS_ATOM}
def show(self, ext_dict):
ext = etree.Element('extension', nsmap=self.NSMAP)
@@ -511,7 +518,7 @@ class ExtensionsXMLSerializer(wsgi.XMLDictSerializer):
desc.text = ext_dict['description']
ext_elem.append(desc)
for link in ext_dict.get('links', []):
elem = etree.SubElement(ext_elem, '{%s}link' % xmlutil.XMLNS_ATOM)
elem = etree.SubElement(ext_elem, '{%s}link' % XMLNS_ATOM)
elem.set('rel', link['rel'])
elem.set('href', link['href'])
elem.set('type', link['type'])

View File

@@ -210,31 +210,47 @@ class Router(object):
class Request(webob.Request):
"""Add some Openstack API-specific logic to the base webob.Request."""
default_request_content_types = ('application/json', 'application/xml')
default_accept_types = ('application/json', 'application/xml')
default_accept_type = 'application/json'
def best_match_content_type(self, supported_content_types=None):
"""Determine the requested response content-type."""
supported_content_types = (supported_content_types
or ("application/xml",
"application/json",))
"""Determine the requested response content-type.
Based on the query extension then the Accept header.
Defaults to default_accept_type if we don't find a preference
"""
supported_content_types = (supported_content_types or
self.default_accept_types)
parts = self.path.rsplit('.', 1)
if len(parts) > 1:
ctype = 'application/{0}'.format(parts[1])
if ctype in supported_content_types:
return ctype
bm = self.accept.best_match(supported_content_types)
return bm or 'application/json'
return bm or self.default_accept_type
def get_content_type(self, allowed_content_types=None):
"""Determine content type of the request body."""
"""Determine content type of the request body.
allowed_content_types = allowed_content_types or ("application/xml",
"application/json",)
Does not do any body introspection, only checks header
"""
if not "Content-Type" in self.headers:
raise exception.InvalidContentType(content_type=None)
return None
content_type = self.content_type
allowed_content_types = (allowed_content_types or
self.default_request_content_types)
if content_type not in allowed_content_types:
raise exception.InvalidContentType(content_type=content_type)
else:
return content_type
return content_type
class Resource(object):
@@ -269,11 +285,19 @@ class Resource(object):
@webob.dec.wsgify(RequestClass=Request)
def __call__(self, request):
"""WSGI method that controls (de)serialization and method dispatch."""
action, action_args, accept = self.deserialize_request(request)
try:
action, action_args, accept = self.deserialize_request(request)
except exception.InvalidContentType:
msg = _("Unsupported Content-Type")
return webob.exc.HTTPUnsupportedMediaType(explanation=msg)
except exception.MalformedRequestBody:
msg = _("Malformed request body")
return webob.exc.HTTPBadRequest(explanation=msg)
action_result = self.execute_action(action, request, **action_args)
try:
return self.serialize_response(action, action_result, accept)
# return unserializable result (typically a webob exc)
except Exception:
return action_result
@@ -329,107 +353,6 @@ class ActionDispatcher(object):
raise NotImplementedError()
class TextDeserializer(ActionDispatcher):
"""Default request body deserialization"""
def deserialize(self, datastring, action='default'):
return self.dispatch(datastring, action=action)
def default(self, datastring):
return {}
class JSONDeserializer(TextDeserializer):
def _from_json(self, datastring):
try:
return json.loads(datastring)
except ValueError:
msg = _("cannot understand JSON")
raise exception.MalformedRequestBody(reason=msg)
def default(self, datastring):
return {'body': self._from_json(datastring)}
class XMLDeserializer(TextDeserializer):
def __init__(self, metadata=None):
"""
:param metadata: information needed to deserialize xml into
a dictionary.
"""
super(XMLDeserializer, self).__init__()
self.metadata = metadata or {}
def _from_xml(self, datastring):
plurals = set(self.metadata.get('plurals', {}))
try:
node = minidom.parseString(datastring).childNodes[0]
return {node.nodeName: self._from_xml_node(node, plurals)}
except expat.ExpatError:
msg = _("cannot understand XML")
raise exception.MalformedRequestBody(reason=msg)
def _from_xml_node(self, node, listnames):
"""Convert a minidom node to a simple Python type.
:param listnames: list of XML node names whose subnodes should
be considered list items.
"""
if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3:
return node.childNodes[0].nodeValue
elif node.nodeName in listnames:
return [self._from_xml_node(n, listnames) for n in node.childNodes]
else:
result = dict()
for attr in node.attributes.keys():
result[attr] = node.attributes[attr].nodeValue
for child in node.childNodes:
if child.nodeType != node.TEXT_NODE:
result[child.nodeName] = self._from_xml_node(child,
listnames)
return result
def find_first_child_named(self, parent, name):
"""Search a nodes children for the first child with a given name"""
for node in parent.childNodes:
if node.nodeName == name:
return node
return None
def find_children_named(self, parent, name):
"""Return all of a nodes children who have the given name"""
for node in parent.childNodes:
if node.nodeName == name:
yield node
def extract_text(self, node):
"""Get the text field contained by the given node"""
if len(node.childNodes) == 1:
child = node.childNodes[0]
if child.nodeType == child.TEXT_NODE:
return child.nodeValue
return ""
def default(self, datastring):
return {'body': self._from_xml(datastring)}
class MetadataXMLDeserializer(XMLDeserializer):
def extract_metadata(self, metadata_node):
"""Marshal the metadata attribute of a parsed request"""
metadata = {}
if metadata_node is not None:
for meta_node in self.find_children_named(metadata_node, "meta"):
key = meta_node.getAttribute("key")
metadata[key] = self.extract_text(meta_node)
return metadata
class DictSerializer(ActionDispatcher):
"""Default request body serialization"""
@@ -619,8 +542,7 @@ class RequestDeserializer(object):
def __init__(self, body_deserializers=None, headers_deserializer=None,
supported_content_types=None):
self.supported_content_types = supported_content_types or \
('application/json', 'application/xml')
self.supported_content_types = supported_content_types
self.body_deserializers = {
'application/xml': XMLDeserializer(),
@@ -662,7 +584,7 @@ class RequestDeserializer(object):
content_type = request.get_content_type()
except exception.InvalidContentType:
LOG.debug(_("Unrecognized Content-Type provided in request"))
return {}
raise
if content_type is None:
LOG.debug(_("No Content-Type provided in request"))
@@ -703,3 +625,93 @@ class RequestDeserializer(object):
pass
return args
class TextDeserializer(ActionDispatcher):
"""Default request body deserialization"""
def deserialize(self, datastring, action='default'):
return self.dispatch(datastring, action=action)
def default(self, datastring):
return {}
class JSONDeserializer(TextDeserializer):
def _from_json(self, datastring):
try:
return json.loads(datastring)
except ValueError:
msg = _("cannot understand JSON")
raise exception.MalformedRequestBody(reason=msg)
def default(self, datastring):
return {'body': self._from_json(datastring)}
class XMLDeserializer(TextDeserializer):
def __init__(self, metadata=None):
"""
:param metadata: information needed to deserialize xml into
a dictionary.
"""
super(XMLDeserializer, self).__init__()
self.metadata = metadata or {}
def _from_xml(self, datastring):
plurals = set(self.metadata.get('plurals', {}))
try:
node = minidom.parseString(datastring).childNodes[0]
return {node.nodeName: self._from_xml_node(node, plurals)}
except expat.ExpatError:
msg = _("cannot understand XML")
raise exception.MalformedRequestBody(reason=msg)
def _from_xml_node(self, node, listnames):
"""Convert a minidom node to a simple Python type.
:param listnames: list of XML node names whose subnodes should
be considered list items.
"""
if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3:
return node.childNodes[0].nodeValue
elif node.nodeName in listnames:
return [self._from_xml_node(n, listnames) for n in node.childNodes]
else:
result = dict()
for attr in node.attributes.keys():
result[attr] = node.attributes[attr].nodeValue
for child in node.childNodes:
if child.nodeType != node.TEXT_NODE:
result[child.nodeName] = self._from_xml_node(child,
listnames)
return result
def find_first_child_named(self, parent, name):
"""Search a nodes children for the first child with a given name"""
for node in parent.childNodes:
if node.nodeName == name:
return node
return None
def find_children_named(self, parent, name):
"""Return all of a nodes children who have the given name"""
for node in parent.childNodes:
if node.nodeName == name:
yield node
def extract_text(self, node):
"""Get the text field contained by the given node"""
if len(node.childNodes) == 1:
child = node.childNodes[0]
if child.nodeType == child.TEXT_NODE:
return child.nodeValue
return ""
def default(self, datastring):
return {'body': self._from_xml(datastring)}

View File

@@ -46,7 +46,7 @@ class StubBaseAppController(object):
def show(self, request, id):
return {'fort': 'knox'}
def update(self, request, id):
def update(self, request, id, body=None):
return {'uneditable': 'original_value'}
def create_resource(self):

View File

@@ -15,11 +15,14 @@
# under the License.
import json
from lxml import etree
import os.path
import routes
import unittest
from webtest import TestApp
from openstack.common import wsgi
from openstack.common import config
from openstack.common import extensions
@@ -33,6 +36,9 @@ test_conf_file = os.path.join(os.path.dirname(__file__), os.pardir,
os.pardir, 'etc', 'openstack-common.conf.test')
extensions_path = os.path.join(os.path.dirname(__file__), "extensions")
NS = "{http://docs.openstack.org/}"
ATOMNS = "{http://www.w3.org/2005/Atom}"
class ExtensionsTestApp(wsgi.Router):
@@ -254,19 +260,21 @@ class RequestExtensionTest(unittest.TestCase):
def _update_handler(req, res):
data = json.loads(res.body)
data['uneditable'] = req.params['uneditable']
data['uneditable'] = json.loads(req.body)['uneditable']
res.body = json.dumps(data)
return res
base_app = TestApp(setup_base_app())
response = base_app.put("/dummy_resources/1",
{'uneditable': "new_value"})
json.dumps({'uneditable': "new_value"}),
headers={'Content-Type': "application/json"})
self.assertEqual(response.json['uneditable'], "original_value")
ext_app = self._setup_app_with_request_handler(_update_handler,
'PUT')
ext_response = ext_app.put("/dummy_resources/1",
{'uneditable': "new_value"})
json.dumps({'uneditable': "new_value"}),
headers={'Content-Type': "application/json"})
self.assertEqual(ext_response.json['uneditable'], "new_value")
def _setup_app_with_request_handler(self, handler, verb):
@@ -306,23 +314,145 @@ class ExtensionControllerTest(unittest.TestCase):
response = self.test_app.get("/extensions")
foxnsox = response.json["extensions"][0]
self.assertEqual(foxnsox["alias"], "FOXNSOX")
self.assertEqual(foxnsox["namespace"],
"http://www.fox.in.socks/api/ext/pie/v1.0")
self.assertEqual(foxnsox, {
'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0',
'name': 'Fox In Socks',
'updated': '2011-01-22T13:25:27-06:00',
'description': 'The Fox In Socks Extension',
'alias': 'FOXNSOX',
'links': []
}
)
def test_extension_can_be_accessed_by_alias(self):
json_response = self.test_app.get("/extensions/FOXNSOX").json
foxnsox_extension = json_response['extension']
foxnsox = json_response['extension']
self.assertEqual(foxnsox_extension["alias"], "FOXNSOX")
self.assertEqual(foxnsox_extension["namespace"],
"http://www.fox.in.socks/api/ext/pie/v1.0")
self.assertEqual(foxnsox, {
'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0',
'name': 'Fox In Socks',
'updated': '2011-01-22T13:25:27-06:00',
'description': 'The Fox In Socks Extension',
'alias': 'FOXNSOX',
'links': []
}
)
def test_show_returns_not_found_for_non_existant_extension(self):
response = self.test_app.get("/extensions/non_existant", status="*")
self.assertEqual(response.status_int, 404)
def test_list_extensions_xml(self):
response = self.test_app.get("/extensions.xml")
self.assertEqual(200, response.status_int)
root = etree.XML(response.body)
self.assertEqual(root.tag.split('extensions')[0], NS)
# Make sure that Fox in Sox extension is correct.
exts = root.findall('{0}extension'.format(NS))
fox_ext = exts[0]
self.assertEqual(fox_ext.get('name'), 'Fox In Socks')
self.assertEqual(fox_ext.get('namespace'),
'http://www.fox.in.socks/api/ext/pie/v1.0')
self.assertEqual(fox_ext.get('updated'), '2011-01-22T13:25:27-06:00')
self.assertEqual(fox_ext.findtext('{0}description'.format(NS)),
'The Fox In Socks Extension')
def test_get_extension_xml(self):
response = self.test_app.get("/extensions/FOXNSOX.xml")
self.assertEqual(200, response.status_int)
xml = response.body
root = etree.XML(xml)
self.assertEqual(root.tag.split('extension')[0], NS)
self.assertEqual(root.get('alias'), 'FOXNSOX')
self.assertEqual(root.get('name'), 'Fox In Socks')
self.assertEqual(root.get('namespace'),
'http://www.fox.in.socks/api/ext/pie/v1.0')
self.assertEqual(root.get('updated'), '2011-01-22T13:25:27-06:00')
self.assertEqual(root.findtext('{0}description'.format(NS)),
'The Fox In Socks Extension')
class ExtensionsXMLSerializerTest(unittest.TestCase):
def test_serialize_extenstion(self):
serializer = extensions.ExtensionsXMLSerializer()
data = {'extension': {
'name': 'ext1',
'namespace': 'http://docs.rack.com/servers/api/ext/pie/v1.0',
'alias': 'RS-PIE',
'updated': '2011-01-22T13:25:27-06:00',
'description': 'Adds the capability to share an image.',
'links': [{'rel': 'describedby',
'type': 'application/pdf',
'href': 'http://docs.rack.com/servers/api/ext/cs.pdf'},
{'rel': 'describedby',
'type': 'application/vnd.sun.wadl+xml',
'href': 'http://docs.rack.com/servers/api/ext/cs.wadl'}]}}
xml = serializer.serialize(data, 'show')
root = etree.XML(xml)
ext_dict = data['extension']
self.assertEqual(root.findtext('{0}description'.format(NS)),
ext_dict['description'])
for key in ['name', 'namespace', 'alias', 'updated']:
self.assertEqual(root.get(key), ext_dict[key])
link_nodes = root.findall('{0}link'.format(ATOMNS))
self.assertEqual(len(link_nodes), 2)
for i, link in enumerate(ext_dict['links']):
for key, value in link.items():
self.assertEqual(link_nodes[i].get(key), value)
def test_serialize_extensions(self):
serializer = extensions.ExtensionsXMLSerializer()
data = {"extensions": [{
"name": "Public Image Extension",
"namespace": "http://foo.com/api/ext/pie/v1.0",
"alias": "RS-PIE",
"updated": "2011-01-22T13:25:27-06:00",
"description": "Adds the capability to share an image.",
"links": [{"rel": "describedby",
"type": "application/pdf",
"type": "application/vnd.sun.wadl+xml",
"href": "http://foo.com/api/ext/cs-pie.pdf"},
{"rel": "describedby",
"type": "application/vnd.sun.wadl+xml",
"href": "http://foo.com/api/ext/cs-pie.wadl"}]},
{"name": "Cloud Block Storage",
"namespace": "http://foo.com/api/ext/cbs/v1.0",
"alias": "RS-CBS",
"updated": "2011-01-12T11:22:33-06:00",
"description": "Allows mounting cloud block storage.",
"links": [{"rel": "describedby",
"type": "application/pdf",
"href": "http://foo.com/api/ext/cs-cbs.pdf"},
{"rel": "describedby",
"type": "application/vnd.sun.wadl+xml",
"href": "http://foo.com/api/ext/cs-cbs.wadl"}]}]}
xml = serializer.serialize(data, 'index')
root = etree.XML(xml)
ext_elems = root.findall('{0}extension'.format(NS))
self.assertEqual(len(ext_elems), 2)
for i, ext_elem in enumerate(ext_elems):
ext_dict = data['extensions'][i]
self.assertEqual(ext_elem.findtext('{0}description'.format(NS)),
ext_dict['description'])
for key in ['name', 'namespace', 'alias', 'updated']:
self.assertEqual(ext_elem.get(key), ext_dict[key])
link_nodes = ext_elem.findall('{0}link'.format(ATOMNS))
self.assertEqual(len(link_nodes), 2)
for i, link in enumerate(ext_dict['links']):
for key, value in link.items():
self.assertEqual(link_nodes[i].get(key), value)
def app_factory(global_conf, **local_conf):
conf = global_conf.copy()

413
tests/unit/test_wsgi.py Normal file
View File

@@ -0,0 +1,413 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import webob
from openstack.common import exception
from openstack.common import wsgi
class RequestTest(unittest.TestCase):
def test_content_type_missing(self):
request = wsgi.Request.blank('/tests/123', method='POST')
request.body = "<body />"
self.assertEqual(None, request.get_content_type())
def test_content_type_unsupported(self):
request = wsgi.Request.blank('/tests/123', method='POST')
request.headers["Content-Type"] = "text/html"
request.body = "asdf<br />"
self.assertRaises(exception.InvalidContentType,
request.get_content_type)
def test_content_type_with_charset(self):
request = wsgi.Request.blank('/tests/123')
request.headers["Content-Type"] = "application/json; charset=UTF-8"
result = request.get_content_type()
self.assertEqual(result, "application/json")
def test_content_type_with_given_content_types(self):
request = wsgi.Request.blank('/tests/123')
request.headers["Content-Type"] = "application/new-type;"
result = request.get_content_type(["application/json",
"application/new-type"])
self.assertEqual(result, "application/new-type")
def test_content_type_from_accept_xml(self):
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = "application/xml"
result = request.best_match_content_type()
self.assertEqual(result, "application/xml")
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = "application/json"
result = request.best_match_content_type()
self.assertEqual(result, "application/json")
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = "application/xml, application/json"
result = request.best_match_content_type()
self.assertEqual(result, "application/json")
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = \
"application/json; q=0.3, application/xml; q=0.9"
result = request.best_match_content_type()
self.assertEqual(result, "application/xml")
def test_content_type_from_query_extension(self):
request = wsgi.Request.blank('/tests/123.xml')
result = request.best_match_content_type()
self.assertEqual(result, "application/xml")
request = wsgi.Request.blank('/tests/123.json')
result = request.best_match_content_type()
self.assertEqual(result, "application/json")
request = wsgi.Request.blank('/tests/123.invalid')
result = request.best_match_content_type()
self.assertEqual(result, "application/json")
def test_content_type_accept_and_query_extension(self):
request = wsgi.Request.blank('/tests/123.xml')
request.headers["Accept"] = "application/json"
result = request.best_match_content_type()
self.assertEqual(result, "application/xml")
def test_content_type_accept_default(self):
request = wsgi.Request.blank('/tests/123.unsupported')
request.headers["Accept"] = "application/unsupported1"
result = request.best_match_content_type()
self.assertEqual(result, "application/json")
def test_content_type_accept_with_given_content_types(self):
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = "application/new_type"
result = request.best_match_content_type(["application/new_type"])
self.assertEqual(result, "application/new_type")
class ActionDispatcherTest(unittest.TestCase):
def test_dispatch(self):
serializer = wsgi.ActionDispatcher()
serializer.create = lambda x: x
self.assertEqual(serializer.dispatch('pants', action='create'),
'pants')
def test_dispatch_action_None(self):
serializer = wsgi.ActionDispatcher()
serializer.create = lambda x: x + ' pants'
serializer.default = lambda x: x + ' trousers'
self.assertEqual(serializer.dispatch('Two', action=None),
'Two trousers')
def test_dispatch_default(self):
serializer = wsgi.ActionDispatcher()
serializer.create = lambda x: x + ' pants'
serializer.default = lambda x: x + ' trousers'
self.assertEqual(serializer.dispatch('Two', action='update'),
'Two trousers')
class ResponseHeadersSerializerTest(unittest.TestCase):
def test_default(self):
serializer = wsgi.ResponseHeadersSerializer()
response = webob.Response()
serializer.serialize(response, {'v': '123'}, 'asdf')
self.assertEqual(response.status_int, 200)
def test_custom(self):
class Serializer(wsgi.ResponseHeadersSerializer):
def update(self, response, data):
response.status_int = 404
response.headers['X-Custom-Header'] = data['v']
serializer = Serializer()
response = webob.Response()
serializer.serialize(response, {'v': '123'}, 'update')
self.assertEqual(response.status_int, 404)
self.assertEqual(response.headers['X-Custom-Header'], '123')
class DictSerializerTest(unittest.TestCase):
def test_dispatch_default(self):
serializer = wsgi.DictSerializer()
self.assertEqual(serializer.serialize({}, 'NonExistantAction'), '')
class XMLDictSerializerTest(unittest.TestCase):
def test_xml(self):
input_dict = dict(servers=dict(a=(2, 3)))
expected_xml = """<servers xmlns="asdf">
<a>(2,3)</a>
</servers>"""
serializer = wsgi.XMLDictSerializer(xmlns="asdf")
result = serializer.serialize(input_dict)
result = result.replace('\n', '').replace(' ', '')
expected_xml = expected_xml.replace('\n', '').replace(' ', '')
self.assertEqual(result, expected_xml)
class JSONDictSerializerTest(unittest.TestCase):
def test_json(self):
input_dict = dict(servers=dict(a=(2, 3)))
expected_json = '{"servers":{"a":[2,3]}}'
serializer = wsgi.JSONDictSerializer()
result = serializer.serialize(input_dict)
result = result.replace('\n', '').replace(' ', '')
self.assertEqual(result, expected_json)
class TextDeserializerTest(unittest.TestCase):
def test_dispatch_default(self):
deserializer = wsgi.TextDeserializer()
self.assertEqual(deserializer.deserialize({}, 'update'), {})
class JSONDeserializerTest(unittest.TestCase):
def test_json(self):
data = """{"a": {
"a1": "1",
"a2": "2",
"bs": ["1", "2", "3", {"c": {"c1": "1"}}],
"d": {"e": "1"},
"f": "1"}}"""
as_dict = {
'body': {
'a': {
'a1': '1',
'a2': '2',
'bs': ['1', '2', '3', {'c': {'c1': '1'}}],
'd': {'e': '1'},
'f': '1',
},
},
}
deserializer = wsgi.JSONDeserializer()
self.assertEqual(deserializer.deserialize(data), as_dict)
class XMLDeserializerTest(unittest.TestCase):
def test_xml(self):
xml = """
<a a1="1" a2="2">
<bs><b>1</b><b>2</b><b>3</b><b><c c1="1"/></b></bs>
<d><e>1</e></d>
<f>1</f>
</a>
""".strip()
as_dict = {
'body': {
'a': {
'a1': '1',
'a2': '2',
'bs': ['1', '2', '3', {'c': {'c1': '1'}}],
'd': {'e': '1'},
'f': '1',
},
},
}
metadata = {'plurals': {'bs': 'b', 'ts': 't'}}
deserializer = wsgi.XMLDeserializer(metadata=metadata)
self.assertEqual(deserializer.deserialize(xml), as_dict)
def test_xml_empty(self):
xml = '<a></a>'
as_dict = {"body": {"a": {}}}
deserializer = wsgi.XMLDeserializer()
self.assertEqual(deserializer.deserialize(xml), as_dict)
class RequestHeadersDeserializerTest(unittest.TestCase):
def test_default(self):
deserializer = wsgi.RequestHeadersDeserializer()
req = wsgi.Request.blank('/')
self.assertEqual(deserializer.deserialize(req, 'nonExistant'), {})
def test_custom(self):
class Deserializer(wsgi.RequestHeadersDeserializer):
def update(self, request):
return {'a': request.headers['X-Custom-Header']}
deserializer = Deserializer()
req = wsgi.Request.blank('/')
req.headers['X-Custom-Header'] = 'b'
self.assertEqual(deserializer.deserialize(req, 'update'), {'a': 'b'})
class ResponseSerializerTest(unittest.TestCase):
def setUp(self):
class JSONSerializer(object):
def serialize(self, data, action='default'):
return 'pew_json'
class XMLSerializer(object):
def serialize(self, data, action='default'):
return 'pew_xml'
class HeadersSerializer(object):
def serialize(self, response, data, action):
response.status_int = 404
self.body_serializers = {
'application/json': JSONSerializer(),
'application/XML': XMLSerializer(),
}
self.serializer = wsgi.ResponseSerializer(self.body_serializers,
HeadersSerializer())
def tearDown(self):
pass
def test_get_serializer(self):
ctype = 'application/json'
self.assertEqual(self.serializer.get_body_serializer(ctype),
self.body_serializers[ctype])
def test_get_serializer_unknown_content_type(self):
self.assertRaises(exception.InvalidContentType,
self.serializer.get_body_serializer,
'application/unknown')
def test_serialize_response(self):
response = self.serializer.serialize({}, 'application/json')
self.assertEqual(response.headers['Content-Type'], 'application/json')
self.assertEqual(response.body, 'pew_json')
self.assertEqual(response.status_int, 404)
def test_serialize_response_None(self):
response = self.serializer.serialize(None, 'application/json')
self.assertEqual(response.headers['Content-Type'], 'application/json')
self.assertEqual(response.body, '')
self.assertEqual(response.status_int, 404)
def test_serialize_response_dict_to_unknown_content_type(self):
self.assertRaises(exception.InvalidContentType,
self.serializer.serialize,
{}, 'application/unknown')
class RequestDeserializerTest(unittest.TestCase):
def setUp(self):
class JSONDeserializer(object):
def deserialize(self, data, action='default'):
return 'pew_json'
class XMLDeserializer(object):
def deserialize(self, data, action='default'):
return 'pew_xml'
self.body_deserializers = {
'application/json': JSONDeserializer(),
'application/XML': XMLDeserializer(),
}
self.deserializer = wsgi.RequestDeserializer(self.body_deserializers)
def test_get_deserializer(self):
expected = self.deserializer.get_body_deserializer('application/json')
self.assertEqual(expected, self.body_deserializers['application/json'])
def test_get_deserializer_unknown_content_type(self):
self.assertRaises(exception.InvalidContentType,
self.deserializer.get_body_deserializer,
'application/unknown')
def test_get_expected_content_type(self):
request = wsgi.Request.blank('/')
request.headers['Accept'] = 'application/json'
self.assertEqual(self.deserializer.get_expected_content_type(request),
'application/json')
def test_get_action_args(self):
env = {
'wsgiorg.routing_args': [None, {
'controller': None,
'format': None,
'action': 'update',
'id': 12,
}],
}
expected = {'action': 'update', 'id': 12}
self.assertEqual(self.deserializer.get_action_args(env), expected)
def test_deserialize(self):
def fake_get_routing_args(request):
return {'action': 'create'}
self.deserializer.get_action_args = fake_get_routing_args
request = wsgi.Request.blank('/')
request.headers['Accept'] = 'application/xml'
deserialized = self.deserializer.deserialize(request)
expected = ('create', {}, 'application/xml')
self.assertEqual(expected, deserialized)
class ResourceTest(unittest.TestCase):
def test_dispatch(self):
class Controller(object):
def index(self, req, pants=None):
return pants
resource = wsgi.Resource(Controller())
actual = resource.dispatch(resource.controller,
'index', None, pants='off')
expected = 'off'
self.assertEqual(actual, expected)
def test_dispatch_unknown_controller_action(self):
class Controller(object):
def index(self, req, pants=None):
return pants
resource = wsgi.Resource(Controller())
self.assertRaises(AttributeError, resource.dispatch,
resource.controller, 'create', None, {})
def test_malformed_request_body_throws_bad_request(self):
resource = wsgi.Resource(None)
request = wsgi.Request.blank("/", body="{mal:formed", method='POST',
headers={'Content-Type': "application/json"})
response = resource(request)
self.assertEqual(response.status, '400 Bad Request')
def test_wrong_content_type_throws_unsupported_media_type_error(self):
resource = wsgi.Resource(None)
request = wsgi.Request.blank("/", body="{some:json}", method='POST',
headers={'Content-Type': "xxx"})
response = resource(request)
self.assertEqual(response.status, '415 Unsupported Media Type')