Refactor Tacker unit tests to remove xml support

In part of test refactoring, removing xml support form tacker/tests/unit
and tacker/api

Change-Id: Ia23564990fe6070c5f9163618688bfcab4dc87de
Partial-Bug: #1579167
This commit is contained in:
dharmendra 2016-09-07 13:22:42 +09:00
parent e7a60fd2d6
commit dc5ee73140
13 changed files with 44 additions and 887 deletions

View File

@ -25,7 +25,6 @@ import six
import webob.dec
import webob.exc
from tacker.api.v1 import attributes
from tacker.common import exceptions
import tacker.extensions
from tacker import policy
@ -486,19 +485,9 @@ class ExtensionManager(object):
attr_map[resource].update(resource_attrs)
else:
attr_map[resource] = resource_attrs
if extended_attrs:
attributes.EXT_NSES[ext.get_alias()] = (
ext.get_namespace())
except AttributeError:
LOG.exception(_("Error fetching extended attributes for "
"extension '%s'"), ext.get_name())
try:
comp_map = ext.get_alias_namespace_compatibility_map()
attributes.EXT_NSES_BC.update(comp_map)
except AttributeError:
LOG.info(_("Extension '%s' provides no backward "
"compatibility map for extended attributes"),
ext.get_name())
processed_exts.add(ext_name)
del exts_to_process[ext_name]
if len(processed_exts) == processed_ext_count:

View File

@ -20,7 +20,6 @@ from oslo_log import log as logging
from oslo_utils import uuidutils
from six import iteritems
from tacker.common import constants
from tacker.common import exceptions as n_exc
@ -612,16 +611,3 @@ RESOURCE_ATTRIBUTE_MAP = {}
RESOURCE_FOREIGN_KEYS = {}
PLURALS = {'extensions': 'extension'}
EXT_NSES = {}
# Namespaces to be added for backward compatibility
# when existing extended resource attributes are
# provided by other extension than original one.
EXT_NSES_BC = {}
def get_attr_metadata():
return {'plurals': PLURALS,
'xmlns': constants.XML_NS_V10,
constants.EXT_NS: EXT_NSES,
constants.EXT_NS_COMP: EXT_NSES_BC}

View File

@ -26,7 +26,6 @@ import six
import webob.dec
import webob.exc
from tacker.api.v1 import attributes
from tacker.common import exceptions
from tacker import wsgi
@ -44,14 +43,9 @@ def Resource(controller, faults=None, deserializers=None, serializers=None):
Represents an API entity resource and the associated serialization and
deserialization logic
"""
xml_deserializer = wsgi.XMLDeserializer(attributes.get_attr_metadata())
default_deserializers = {'application/xml': xml_deserializer,
'application/json': wsgi.JSONDeserializer()}
xml_serializer = wsgi.XMLDictSerializer(attributes.get_attr_metadata())
default_serializers = {'application/xml': xml_serializer,
'application/json': wsgi.JSONDictSerializer()}
format_types = {'xml': 'application/xml',
'json': 'application/json'}
default_deserializers = {'application/json': wsgi.JSONDeserializer()}
default_serializers = {'application/json': wsgi.JSONDictSerializer()}
format_types = {'json': 'application/json'}
action_status = dict(create=201, delete=204)
default_deserializers.update(deserializers or {})

View File

@ -31,9 +31,7 @@ class Index(wsgi.Application):
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
metadata = {'application/xml': {'attributes': {
'resource': ['name', 'collection'],
'link': ['href', 'rel']}}}
metadata = {}
layout = []
for name, collection in iteritems(self.resources):

View File

@ -46,14 +46,7 @@ class Versions(object):
builder = versions_view.get_view_builder(req)
versions = [builder.build(version) for version in version_objs]
response = dict(versions=versions)
metadata = {
"application/xml": {
"attributes": {
"version": ["status", "id"],
"link": ["rel", "href"],
}
}
}
metadata = {}
content_type = req.best_match_content_type()
body = (wsgi.Serializer(metadata=metadata).

View File

@ -16,19 +16,6 @@
# TODO(salv-orlando): Verify if a single set of operational
# status constants is achievable
EXT_NS_COMP = '_backward_comp_e_ns'
EXT_NS = '_extension_ns'
XML_NS_V10 = 'http://openstack.org/tacker/api/v1.0'
XSI_NAMESPACE = "http://www.w3.org/2001/XMLSchema-instance"
XSI_ATTR = "xsi:nil"
XSI_NIL_ATTR = "xmlns:xsi"
ATOM_NAMESPACE = "http://www.w3.org/2005/Atom"
ATOM_XMLNS = "xmlns:atom"
ATOM_LINK_NOTATION = "{%s}link" % ATOM_NAMESPACE
TYPE_XMLNS = "xmlns:tacker"
TYPE_ATTR = "tacker:type"
VIRTUAL_ROOT_KEY = "_v_root"
TYPE_BOOL = "bool"
TYPE_INT = "int"
TYPE_LONG = "long"

View File

@ -261,7 +261,3 @@ class TackerExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
def test_device_delete(self):
self._test_entity_delete(self._DEVICE)
class TackerExtensionTestCaseXML(TackerExtensionTestCase):
fmt = 'xml'

View File

@ -1205,12 +1205,6 @@ class SubresourceTest(base.BaseTestCase):
network_id='id1')
# Note: since all resources use the same controller and validation
# logic, we actually get really good coverage from testing just networks.
class XMLV2TestCase(JSONV2TestCase):
fmt = 'xml'
class V2Views(base.BaseTestCase):
def _view(self, keys, collection, resource):
data = dict((key, 'value') for key in keys)

View File

@ -42,29 +42,27 @@ class RequestTestCase(base.BaseTestCase):
self.assertEqual("application/json", result)
def test_content_type_from_accept(self):
for content_type in ('application/xml',
'application/json'):
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = content_type
result = request.best_match_content_type()
self.assertEqual(content_type, result)
content_type = 'application/json'
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = content_type
result = request.best_match_content_type()
self.assertEqual(result, content_type)
def test_content_type_from_accept_best(self):
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = "application/xml, application/json"
request.headers["Accept"] = "application/json"
result = request.best_match_content_type()
self.assertEqual("application/json", result)
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = ("application/json; q=0.3, "
"application/xml; q=0.9")
request.headers["Accept"] = ("application/json; q=0.3, ")
result = request.best_match_content_type()
self.assertEqual("application/xml", result)
self.assertEqual("application/json", result)
def test_content_type_from_query_extension(self):
request = wsgi.Request.blank('/tests/123.xml')
request = wsgi.Request.blank('/tests/123.json')
result = request.best_match_content_type()
self.assertEqual("application/xml", result)
self.assertEqual("application/json", result)
request = wsgi.Request.blank('/tests/123.json')
result = request.best_match_content_type()
@ -75,10 +73,10 @@ class RequestTestCase(base.BaseTestCase):
self.assertEqual("application/json", result)
def test_content_type_accept_and_query_extension(self):
request = wsgi.Request.blank('/tests/123.xml')
request = wsgi.Request.blank('/tests/123.json')
request.headers["Accept"] = "application/json"
result = request.best_match_content_type()
self.assertEqual("application/xml", result)
self.assertEqual("application/json", result)
def test_content_type_accept_default(self):
request = wsgi.Request.blank('/tests/123.unsupported')
@ -141,28 +139,6 @@ class ResourceTestCase(base.BaseTestCase):
self.assertEqual(expected_res,
wsgi.JSONDeserializer().deserialize(res.body))
def test_unmapped_tacker_error_with_xml(self):
msg = u'\u7f51\u7edc'
class TestException(n_exc.TackerException):
message = msg
expected_res = {'body': {
'TackerError': {
'type': 'TestException',
'message': msg,
'detail': ''}}}
controller = mock.MagicMock()
controller.test.side_effect = TestException()
resource = webtest.TestApp(wsgi_resource.Resource(controller))
environ = {'wsgiorg.routing_args': (None, {'action': 'test',
'format': 'xml'})}
res = resource.get('', extra_environ=environ, expect_errors=True)
self.assertEqual(exc.HTTPInternalServerError.code, res.status_int)
self.assertEqual(expected_res,
wsgi.XMLDeserializer().deserialize(res.body))
@mock.patch('oslo_i18n.translate')
def test_unmapped_tacker_error_localized(self, mock_translation):
oslo_i18n.install('blaa')
@ -209,30 +185,6 @@ class ResourceTestCase(base.BaseTestCase):
self.assertEqual(expected_res,
wsgi.JSONDeserializer().deserialize(res.body))
def test_mapped_tacker_error_with_xml(self):
msg = u'\u7f51\u7edc'
class TestException(n_exc.TackerException):
message = msg
expected_res = {'body': {
'TackerError': {
'type': 'TestException',
'message': msg,
'detail': ''}}}
controller = mock.MagicMock()
controller.test.side_effect = TestException()
faults = {TestException: exc.HTTPGatewayTimeout}
resource = webtest.TestApp(wsgi_resource.Resource(controller,
faults=faults))
environ = {'wsgiorg.routing_args': (None, {'action': 'test',
'format': 'xml'})}
res = resource.get('', extra_environ=environ, expect_errors=True)
self.assertEqual(exc.HTTPGatewayTimeout.code, res.status_int)
self.assertEqual(expected_res,
wsgi.XMLDeserializer().deserialize(res.body))
@mock.patch('oslo_i18n.translate')
def test_mapped_tacker_error_localized(self, mock_translation):
oslo_i18n.install('blaa')
@ -283,22 +235,6 @@ class ResourceTestCase(base.BaseTestCase):
self.assertEqual(expected_res,
wsgi.JSONDeserializer().deserialize(res.body))
def test_unhandled_error_with_xml(self):
expected_res = {'body': {'TackerError':
_('Request Failed: internal server error '
'while processing your request.')}}
controller = mock.MagicMock()
controller.test.side_effect = Exception()
resource = webtest.TestApp(wsgi_resource.Resource(controller))
environ = {'wsgiorg.routing_args': (None, {'action': 'test',
'format': 'xml'})}
res = resource.get('', extra_environ=environ, expect_errors=True)
self.assertEqual(exc.HTTPInternalServerError.code, res.status_int)
self.assertEqual(expected_res,
wsgi.XMLDeserializer().deserialize(res.body))
def test_status_200(self):
controller = mock.MagicMock()
controller.test = lambda request: {'foo': 'bar'}

View File

@ -617,10 +617,6 @@ class ExtensionControllerTest(testlib_api.WebTestCase):
self.assertEqual(404, response.status_int)
class ExtensionControllerTestXML(ExtensionControllerTest):
fmt = 'xml'
def app_factory(global_conf, **local_conf):
conf = global_conf.copy()
conf.update(local_conf)

View File

@ -23,8 +23,6 @@ import testtools
import webob
import webob.exc
from tacker.api.v1 import attributes
from tacker.common import constants
from tacker.common import exceptions as exception
from tacker.tests import base
from tacker import wsgi
@ -150,33 +148,16 @@ class SerializerTest(base.BaseTestCase):
"""Test serialize with content type json."""
input_data = {'servers': ['test=pass']}
content_type = 'application/json'
serializer = wsgi.Serializer(default_xmlns="fake")
serializer = wsgi.Serializer()
result = serializer.serialize(input_data, content_type)
self.assertEqual('{"servers": ["test=pass"]}', result)
def test_serialize_content_type_xml(self):
"""Test serialize with content type xml."""
input_data = {'servers': ['test=pass']}
content_type = 'application/xml'
serializer = wsgi.Serializer(default_xmlns="fake")
result = serializer.serialize(input_data, content_type)
expected = (
'<?xml version=\'1.0\''
' encoding=\'UTF-8\'?>\n'
'<servers xmlns="http://openstack.org/quantum/api/v2.0" '
'xmlns:quantum="http://openstack.org/quantum/api/v2.0" '
'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">'
'<server>test=pass</server></servers>'
)
self.assertEqual(expected, result)
def test_deserialize_raise_bad_request(self):
"""Test serialize verifies that exception is raises."""
content_type = 'application/unknown'
data_string = 'test'
serializer = wsgi.Serializer(default_xmlns="fake")
serializer = wsgi.Serializer()
self.assertRaises(
webob.exc.HTTPBadRequest,
@ -186,100 +167,11 @@ class SerializerTest(base.BaseTestCase):
"""Test Serializer.deserialize with content type json."""
content_type = 'application/json'
data_string = '{"servers": ["test=pass"]}'
serializer = wsgi.Serializer(default_xmlns="fake")
serializer = wsgi.Serializer()
result = serializer.deserialize(data_string, content_type)
self.assertEqual({'body': {u'servers': [u'test=pass']}}, result)
def test_deserialize_xml_content_type(self):
"""Test deserialize with content type xml."""
content_type = 'application/xml'
data_string = (
'<servers xmlns="fake">'
'<server>test=pass</server>'
'</servers>'
)
serializer = wsgi.Serializer(
default_xmlns="fake", metadata={'xmlns': 'fake'})
result = serializer.deserialize(data_string, content_type)
expected = {'body': {'servers': {'server': 'test=pass'}}}
self.assertEqual(expected, result)
def test_deserialize_xml_content_type_with_meta(self):
"""Test deserialize with content type xml with meta."""
content_type = 'application/xml'
data_string = (
'<servers>'
'<server name="s1">'
'<test test="a">passed</test>'
'</server>'
'</servers>'
)
metadata = {'plurals': {'servers': 'server'}, 'xmlns': 'fake'}
serializer = wsgi.Serializer(
default_xmlns="fake", metadata=metadata)
result = serializer.deserialize(data_string, content_type)
expected = {'body': {'servers': [{'name': 's1', 'test': 'passed'}]}}
self.assertEqual(expected, result)
def test_serialize_xml_root_key_is_dict(self):
"""Test Serializer.serialize with content type xml with meta dict."""
content_type = 'application/xml'
data = {'servers': {'network': (2, 3)}}
metadata = {'xmlns': 'fake'}
serializer = wsgi.Serializer(default_xmlns="fake", metadata=metadata)
result = serializer.serialize(data, content_type)
result = result.replace('\n', '')
expected = (
'<?xml version=\'1.0\' encoding=\'UTF-8\'?>'
'<servers xmlns="fake" xmlns:quantum="fake" '
'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">'
'<network>(2, 3)</network></servers>'
)
self.assertEqual(expected, result)
def test_serialize_xml_root_key_is_list(self):
"""Test serialize with content type xml with meta list."""
input_dict = {'servers': ['test=pass']}
content_type = 'application/xml'
metadata = {'application/xml': {
'xmlns': 'fake'}}
serializer = wsgi.Serializer(default_xmlns="fake", metadata=metadata)
result = serializer.serialize(input_dict, content_type)
result = result.replace('\n', '').replace(' ', '')
expected = (
'<?xmlversion=\'1.0\''
'encoding=\'UTF-8\'?>'
'<serversxmlns="http://openstack.org/quantum/api/v2.0"'
'xmlns:quantum="http://openstack.org/quantum/api/v2.0"'
'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">'
'<server>test=pass</server></servers>'
)
self.assertEqual(expected, result)
def test_serialize_xml_root_is_None(self):
input_dict = {'test': 'pass'}
content_type = 'application/xml'
serializer = wsgi.Serializer(default_xmlns="fake")
result = serializer.serialize(input_dict, content_type)
result = result.replace('\n', '').replace(' ', '')
expected = (
'<?xmlversion=\'1.0\''
'encoding=\'UTF-8\'?>'
'<testxmlns="http://openstack.org/quantum/api/v2.0"'
'xmlns:quantum="http://openstack.org/quantum/api/v2.0"'
'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">'
'pass</test>'
)
self.assertEqual(expected, result)
class RequestDeserializerTest(testtools.TestCase):
def setUp(self):
@ -289,29 +181,17 @@ class RequestDeserializerTest(testtools.TestCase):
def deserialize(self, data, action='default'):
return 'pew_json'
class XMLDeserializer(object):
def deserialize(self, data, action='default'):
return 'pew_xml'
self.body_deserializers = {
'application/json': JSONDeserializer(),
'application/xml': XMLDeserializer()}
self.body_deserializers = {'application/json': JSONDeserializer()}
self.deserializer = wsgi.RequestDeserializer(self.body_deserializers)
def test_get_deserializer(self):
"""Test RequestDeserializer.get_body_deserializer."""
expected_json_serializer = self.deserializer.get_body_deserializer(
'application/json')
expected_xml_serializer = self.deserializer.get_body_deserializer(
'application/xml')
self.assertEqual(
expected_json_serializer,
self.body_deserializers['application/json'])
self.assertEqual(
expected_xml_serializer,
self.body_deserializers['application/xml'])
def test_get_expected_content_type(self):
"""Test RequestDeserializer.get_expected_content_type."""
@ -341,9 +221,9 @@ class RequestDeserializerTest(testtools.TestCase):
self.deserializer, 'get_action_args') as mock_method:
mock_method.return_value = {'action': 'create'}
request = wsgi.Request.blank('/')
request.headers['Accept'] = 'application/xml'
request.headers['Accept'] = 'application/json'
deserialized = self.deserializer.deserialize(request)
expected = ('create', {}, 'application/xml')
expected = ('create', {}, 'application/json')
self.assertEqual(expected, deserialized)
@ -364,17 +244,11 @@ class ResponseSerializerTest(testtools.TestCase):
def serialize(self, data, action='default'):
return 'pew_json'
class XMLSerializer(object):
def serialize(self, data, action='default'):
return 'pew_xml'
class HeadersSerializer(object):
def serialize(self, response, data, action):
response.status_int = 404
self.body_serializers = {
'application/json': JSONSerializer(),
'application/xml': XMLSerializer()}
self.body_serializers = {'application/json': JSONSerializer()}
self.serializer = wsgi.ResponseSerializer(
self.body_serializers, HeadersSerializer())
@ -406,13 +280,6 @@ class ResponseSerializerTest(testtools.TestCase):
self.assertEqual('pew_json', response.body)
self.assertEqual(404, response.status_int)
def test_serialize_xml_response(self):
response = self.serializer.serialize({}, 'application/xml')
self.assertEqual('application/xml', response.headers['Content-Type'])
self.assertEqual('pew_xml', response.body)
self.assertEqual(404, response.status_int)
def test_serialize_response_None(self):
response = self.serializer.serialize(
None, 'application/json')
@ -452,10 +319,10 @@ class RequestTest(base.BaseTestCase):
def test_content_type_from_accept(self):
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = "application/xml"
request.headers["Accept"] = "application/json"
result = request.best_match_content_type()
self.assertEqual("application/xml", result)
self.assertEqual("application/json", result)
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = "application/json"
@ -464,24 +331,12 @@ class RequestTest(base.BaseTestCase):
self.assertEqual("application/json", result)
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = "application/xml, application/json"
request.headers["Accept"] = "application/json"
result = request.best_match_content_type()
self.assertEqual("application/json", result)
request = wsgi.Request.blank('/tests/123')
request.headers["Accept"] = ("application/json; q=0.3, "
"application/xml; q=0.9")
result = request.best_match_content_type()
self.assertEqual("application/xml", result)
def test_content_type_from_query_extension(self):
request = wsgi.Request.blank('/tests/123.xml')
result = request.best_match_content_type()
self.assertEqual("application/xml", result)
request = wsgi.Request.blank('/tests/123.json')
result = request.best_match_content_type()
@ -493,11 +348,11 @@ class RequestTest(base.BaseTestCase):
self.assertEqual("application/json", result)
def test_content_type_accept_and_query_extension(self):
request = wsgi.Request.blank('/tests/123.xml')
request = wsgi.Request.blank('/tests/123.json')
request.headers["Accept"] = "application/json"
result = request.best_match_content_type()
self.assertEqual("application/xml", result)
self.assertEqual("application/json", result)
def test_content_type_accept_default(self):
request = wsgi.Request.blank('/tests/123.unsupported')
@ -658,39 +513,6 @@ class JSONDeserializerTest(base.BaseTestCase):
as_dict, deserializer.deserialize(data))
class XMLDeserializerTest(base.BaseTestCase):
def test_xml_empty(self):
xml = '<a></a>'
as_dict = {'body': {'a': ''}}
deserializer = wsgi.XMLDeserializer()
self.assertEqual(
as_dict, deserializer.deserialize(xml))
def test_initialization(self):
xml = '<a><b>test</b></a>'
deserializer = wsgi.XMLDeserializer()
self.assertEqual(
{'body': {u'a': {u'b': u'test'}}}, deserializer(xml))
def test_default_raise_Malformed_Exception(self):
"""Verify that exception MalformedRequestBody is raised."""
data_string = ""
deserializer = wsgi.XMLDeserializer()
self.assertRaises(
exception.MalformedRequestBody, deserializer.default, data_string)
def test_xml_with_utf8(self):
xml = '<a>\xe7\xbd\x91\xe7\xbb\x9c</a>'
as_dict = {'body': {'a': u'\u7f51\u7edc'}}
deserializer = wsgi.XMLDeserializer()
self.assertEqual(
as_dict, deserializer.deserialize(xml))
class RequestHeadersDeserializerTest(base.BaseTestCase):
def test_default(self):
@ -807,7 +629,7 @@ class ResourceTest(base.BaseTestCase):
return 'off'
resource = wsgi.Resource(Controller(), my_fault_body_function)
request = wsgi.Request.blank(
"/", method='POST', headers={'Content-Type': "xml"})
"/", method='POST', headers={'Content-Type': "json"})
response = resource.dispatch(
request, action='index', action_args='test')
@ -825,7 +647,7 @@ class ResourceTest(base.BaseTestCase):
def __init__(self):
self.url = 'http://where.no'
self.environ = 'environ'
self.body = '{"Content-Type": "xml"}'
self.body = '{"Content-Type": "json"}'
def method(self):
pass
@ -863,212 +685,6 @@ class FaultTest(base.BaseTestCase):
self.assertEqual(415, response.status_int)
class XMLDictSerializerTest(base.BaseTestCase):
def test_xml(self):
NETWORK = {'network': {'test': None,
'tenant_id': 'test-tenant',
'name': 'net1',
'admin_state_up': True,
'subnets': [],
'dict': {},
'int': 3,
'long': 4L,
'float': 5.0,
'prefix:external': True,
'tests': [{'test1': 'value1'},
{'test2': 2, 'test3': 3}]}}
# XML is:
# <network xmlns="http://openstack.org/quantum/api/v2.0"
# xmlns:prefix="http://xxxx.yy.com"
# xmlns:quantum="http://openstack.org/quantum/api/v2.0"
# xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
# <subnets quantum:type="list" /> # Empty List
# <int quantum:type="int">3</int> # Integer text
# <int quantum:type="long">4</int> # Long text
# <int quantum:type="float">5.0</int> # Float text
# <dict quantum:type="dict" /> # Empty Dict
# <name>net1</name>
# <admin_state_up quantum:type="bool">True</admin_state_up> # Bool
# <test xsi:nil="true" /> # None
# <tenant_id>test-tenant</tenant_id>
# # We must have a namespace defined in root for prefix:external
# <prefix:external quantum:type="bool">True</prefix:external>
# <tests> # List
# <test><test1>value1</test1></test>
# <test><test3 quantum:type="int">3</test3>
# <test2 quantum:type="int">2</test2>
# </test></tests>
# </network>
metadata = attributes.get_attr_metadata()
ns = {'prefix': 'http://xxxx.yy.com'}
metadata[constants.EXT_NS] = ns
metadata['plurals'] = {'tests': 'test'}
serializer = wsgi.XMLDictSerializer(metadata)
result = serializer.serialize(NETWORK)
deserializer = wsgi.XMLDeserializer(metadata)
new_net = deserializer.deserialize(result)['body']
self.assertEqual(NETWORK, new_net)
def test_None(self):
data = None
# Since it is None, we use xsi:nil='true'.
# In addition, we use an
# virtual XML root _v_root to wrap the XML doc.
# XML is:
# <_v_root xsi:nil="true"
# xmlns="http://openstack.org/quantum/api/v2.0"
# xmlns:quantum="http://openstack.org/quantum/api/v2.0"
# xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" />
serializer = wsgi.XMLDictSerializer(attributes.get_attr_metadata())
result = serializer.serialize(data)
deserializer = wsgi.XMLDeserializer(attributes.get_attr_metadata())
new_data = deserializer.deserialize(result)['body']
self.assertIsNone(new_data)
def test_empty_dic_xml(self):
data = {}
# Since it is an empty dict, we use quantum:type='dict' and
# an empty XML element to represent it. In addition, we use an
# virtual XML root _v_root to wrap the XML doc.
# XML is:
# <_v_root quantum:type="dict"
# xmlns="http://openstack.org/quantum/api/v2.0"
# xmlns:quantum="http://openstack.org/quantum/api/v2.0"
# xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" />
serializer = wsgi.XMLDictSerializer(attributes.get_attr_metadata())
result = serializer.serialize(data)
deserializer = wsgi.XMLDeserializer(attributes.get_attr_metadata())
new_data = deserializer.deserialize(result)['body']
self.assertEqual(data, new_data)
def test_non_root_one_item_dic_xml(self):
data = {'test1': 1}
# We have a key in this dict, and its value is an integer.
# XML is:
# <test1 quantum:type="int"
# xmlns="http://openstack.org/quantum/api/v2.0"
# xmlns:quantum="http://openstack.org/quantum/api/v2.0"
# xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
# 1</test1>
serializer = wsgi.XMLDictSerializer(attributes.get_attr_metadata())
result = serializer.serialize(data)
deserializer = wsgi.XMLDeserializer(attributes.get_attr_metadata())
new_data = deserializer.deserialize(result)['body']
self.assertEqual(data, new_data)
def test_non_root_two_items_dic_xml(self):
data = {'test1': 1, 'test2': '2'}
# We have no root element in this data, We will use a virtual
# root element _v_root to wrap the doct.
# The XML is:
# <_v_root xmlns="http://openstack.org/quantum/api/v2.0"
# xmlns:quantum="http://openstack.org/quantum/api/v2.0"
# xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
# <test1 quantum:type="int">1</test1><test2>2</test2>
# </_v_root>
serializer = wsgi.XMLDictSerializer(attributes.get_attr_metadata())
result = serializer.serialize(data)
deserializer = wsgi.XMLDeserializer(attributes.get_attr_metadata())
new_data = deserializer.deserialize(result)['body']
self.assertEqual(data, new_data)
def test_xml_root_key_is_list(self):
input_dict = {'servers': ['test-pass']}
serializer = wsgi.XMLDictSerializer(xmlns="fake")
result = serializer.default(input_dict)
result = result.replace('\n', '').replace(' ', '')
expected = (
'<?xmlversion=\'1.0\'encoding=\'UTF-8\'?>'
'<serversxmlns="fake"xmlns:quantum="fake"'
'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">'
'<server>test-pass</server></servers>'
)
self.assertEqual(expected, result)
def test_xml_meta_contains_node_name_list(self):
input_dict = {'servers': ['test-pass']}
servers = {'nodename': 'test',
'item_name': 'test',
'item_key': 'test'}
metadata = {'list_collections': {'servers': servers}}
serializer = wsgi.XMLDictSerializer(xmlns="fake", metadata=metadata)
result = serializer.default(input_dict)
result = result.replace('\n', '').replace(' ', '')
expected = (
'<?xmlversion=\'1.0\'encoding=\'UTF-8\'?>'
'<serversxmlns="fake"xmlns:quantum="fake"'
'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">'
'<server>test-pass</server></servers>'
)
self.assertEqual(expected, result)
def test_xml_meta_contains_node_name_dict(self):
input_dict = {'servers': {'a': {'2': '3'}}}
servers = {'servers': {
'nodename': 'test',
'item_name': 'test',
'item_key': 'test'}}
metadata = {'dict_collections': servers}
serializer = wsgi.XMLDictSerializer(xmlns="fake", metadata=metadata)
result = serializer.default(input_dict)
result = result.replace('\n', '').replace(' ', '')
expected = (
'<?xmlversion=\'1.0\'encoding=\'UTF-8\'?>'
'<serversxmlns="fake"xmlns:quantum="fake"'
'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">'
'<a><2>3</2></a></servers>'
)
self.assertEqual(expected, result)
def test_call(self):
data = {'servers': {'a': {'2': '3'}}}
serializer = wsgi.XMLDictSerializer()
expected = (
'<?xmlversion=\'1.0\'encoding=\'UTF-8\'?>'
'<serversxmlns="http://openstack.org/quantum/api/v2.0"'
'xmlns:quantum="http://openstack.org/quantum/api/v2.0"'
'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">'
'<a><2>3</2></a></servers>'
)
result = serializer(data)
result = result.replace('\n', '').replace(' ', '')
self.assertEqual(expected, result)
def test_xml_with_utf8(self):
data = {'servers': '\xe7\xbd\x91\xe7\xbb\x9c'}
serializer = wsgi.XMLDictSerializer()
expected = (
'<?xmlversion=\'1.0\'encoding=\'UTF-8\'?>'
'<serversxmlns="http://openstack.org/quantum/api/v2.0"'
'xmlns:quantum="http://openstack.org/quantum/api/v2.0"'
'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">'
'\xe7\xbd\x91\xe7\xbb\x9c</servers>'
)
result = serializer(data)
result = result.replace('\n', '').replace(' ', '')
self.assertEqual(expected, result)
def test_xml_with_unicode(self):
data = {'servers': u'\u7f51\u7edc'}
serializer = wsgi.XMLDictSerializer()
expected = (
'<?xmlversion=\'1.0\'encoding=\'UTF-8\'?>'
'<serversxmlns="http://openstack.org/quantum/api/v2.0"'
'xmlns:quantum="http://openstack.org/quantum/api/v2.0"'
'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">'
'\xe7\xbd\x91\xe7\xbb\x9c</servers>'
)
result = serializer(data)
result = result.replace('\n', '').replace(' ', '')
self.assertEqual(expected, result)
class TestWSGIServerWithSSL(base.BaseTestCase):
"""WSGI server tests."""

View File

@ -15,7 +15,6 @@
import testtools
from tacker.api.v1 import attributes
from tacker.tests import base
from tacker import wsgi
@ -55,11 +54,8 @@ class WebTestCase(base.BaseTestCase):
def setUp(self):
super(WebTestCase, self).setUp()
json_deserializer = wsgi.JSONDeserializer()
xml_deserializer = wsgi.XMLDeserializer(
attributes.get_attr_metadata())
self._deserializers = {
'application/json': json_deserializer,
'application/xml': xml_deserializer,
}
def deserialize(self, response):
@ -69,8 +65,7 @@ class WebTestCase(base.BaseTestCase):
def serialize(self, data):
ctype = 'application/%s' % self.fmt
result = wsgi.Serializer(
attributes.get_attr_metadata()).serialize(data, ctype)
result = wsgi.Serializer().serialize(data, ctype)
return result

View File

@ -24,8 +24,6 @@ import socket
import ssl
import sys
import time
from xml.etree import ElementTree as etree
from xml.parsers import expat
import eventlet.wsgi
# eventlet.patcher.monkey_patch(all=False, socket=True, thread=True)
@ -41,7 +39,6 @@ import six
import webob.dec
import webob.exc
from tacker.common import constants
from tacker.common import exceptions as exception
from tacker import context
from tacker.db import api
@ -325,7 +322,7 @@ class Request(webob.Request):
"""Determine the most acceptable content-type.
Based on:
1) URI extension (.json/.xml)
1) URI extension (.json)
2) Content-type header
3) Accept* headers
"""
@ -333,21 +330,21 @@ class Request(webob.Request):
parts = self.path.rsplit('.', 1)
if len(parts) > 1:
_format = parts[1]
if _format in ['json', 'xml']:
if _format in ['json']:
return 'application/{0}'.format(_format)
# Then look up content header
type_from_header = self.get_content_type()
if type_from_header:
return type_from_header
ctypes = ['application/json', 'application/xml']
ctypes = ['application/json']
# Finally search in Accept-* headers
bm = self.accept.best_match(ctypes)
return bm or 'application/json'
def get_content_type(self):
allowed_types = ("application/xml", "application/json")
allowed_types = ("application/json")
if "Content-Type" not in self.headers:
LOG.debug(_("Missing Content-Type"))
return None
@ -406,154 +403,6 @@ class JSONDictSerializer(DictSerializer):
return jsonutils.dumps(data, default=sanitizer)
class XMLDictSerializer(DictSerializer):
def __init__(self, metadata=None, xmlns=None):
"""Object initialization.
:param metadata: information needed to deserialize xml into
a dictionary.
:param xmlns: XML namespace to include with serialized xml
"""
super(XMLDictSerializer, self).__init__()
self.metadata = metadata or {}
if not xmlns:
xmlns = self.metadata.get('xmlns')
if not xmlns:
xmlns = constants.XML_NS_V10
self.xmlns = xmlns
def default(self, data):
"""Return data as XML string.
:param data: expect data to contain a single key as XML root, or
contain another '*_links' key as atom links. Other
case will use 'VIRTUAL_ROOT_KEY' as XML root.
"""
try:
links = None
has_atom = False
if data is None:
root_key = constants.VIRTUAL_ROOT_KEY
root_value = None
else:
link_keys = [k for k in data or []
if k.endswith('_links')]
if link_keys:
links = data.pop(link_keys[0], None)
has_atom = True
root_key = (len(data) == 1 and
list(data.keys())[0] or constants.VIRTUAL_ROOT_KEY)
root_value = data.get(root_key, data)
doc = etree.Element("_temp_root")
used_prefixes = []
self._to_xml_node(doc, self.metadata, root_key,
root_value, used_prefixes)
if links:
self._create_link_nodes(list(doc)[0], links)
return self.to_xml_string(list(doc)[0], used_prefixes, has_atom)
except AttributeError as e:
LOG.exception(str(e))
return ''
def __call__(self, data):
# Provides a migration path to a cleaner WSGI layer, this
# "default" stuff and extreme extensibility isn't being used
# like originally intended
return self.default(data)
def to_xml_string(self, node, used_prefixes, has_atom=False):
self._add_xmlns(node, used_prefixes, has_atom)
return etree.tostring(node, encoding='UTF-8')
# NOTE (ameade): the has_atom should be removed after all of the
# xml serializers and view builders have been updated to the current
# spec that required all responses include the xmlns:atom, the has_atom
# flag is to prevent current tests from breaking
def _add_xmlns(self, node, used_prefixes, has_atom=False):
node.set('xmlns', self.xmlns)
node.set(constants.TYPE_XMLNS, self.xmlns)
if has_atom:
node.set(constants.ATOM_XMLNS, constants.ATOM_NAMESPACE)
node.set(constants.XSI_NIL_ATTR, constants.XSI_NAMESPACE)
ext_ns = self.metadata.get(constants.EXT_NS, {})
ext_ns_bc = self.metadata.get(constants.EXT_NS_COMP, {})
for prefix in used_prefixes:
if prefix in ext_ns:
node.set('xmlns:' + prefix, ext_ns[prefix])
if prefix in ext_ns_bc:
node.set('xmlns:' + prefix, ext_ns_bc[prefix])
def _to_xml_node(self, parent, metadata, nodename, data, used_prefixes):
"""Recursive method to convert data members to XML nodes."""
result = etree.SubElement(parent, nodename)
if ":" in nodename:
used_prefixes.append(nodename.split(":", 1)[0])
# TODO(bcwaldon): accomplish this without a type-check
if isinstance(data, list):
if not data:
result.set(
constants.TYPE_ATTR,
constants.TYPE_LIST)
return result
singular = metadata.get('plurals', {}).get(nodename, None)
if singular is None:
if nodename.endswith('s'):
singular = nodename[:-1]
else:
singular = 'item'
for item in data:
self._to_xml_node(result, metadata, singular, item,
used_prefixes)
# TODO(bcwaldon): accomplish this without a type-check
elif isinstance(data, dict):
if not data:
result.set(
constants.TYPE_ATTR,
constants.TYPE_DICT)
return result
attrs = metadata.get('attributes', {}).get(nodename, {})
for k, v in data.items():
if k in attrs:
result.set(k, str(v))
else:
self._to_xml_node(result, metadata, k, v,
used_prefixes)
elif data is None:
result.set(constants.XSI_ATTR, 'true')
else:
if isinstance(data, bool):
result.set(
constants.TYPE_ATTR,
constants.TYPE_BOOL)
elif isinstance(data, int):
result.set(
constants.TYPE_ATTR,
constants.TYPE_INT)
elif isinstance(data, long):
result.set(
constants.TYPE_ATTR,
constants.TYPE_LONG)
elif isinstance(data, float):
result.set(
constants.TYPE_ATTR,
constants.TYPE_FLOAT)
LOG.debug(_("Data %(data)s type is %(type)s"),
{'data': data,
'type': type(data)})
if isinstance(data, str):
result.text = six.text_type(data, encoding='utf-8')
else:
result.text = six.text_type(data)
return result
def _create_link_nodes(self, xml_doc, links):
for link in links:
link_node = etree.SubElement(xml_doc, 'atom:link')
link_node.set('rel', link['rel'])
link_node.set('href', link['href'])
class ResponseHeaderSerializer(ActionDispatcher):
"""Default response headers serialization."""
@ -569,7 +418,6 @@ class ResponseSerializer(object):
def __init__(self, body_serializers=None, headers_serializer=None):
self.body_serializers = {
'application/xml': XMLDictSerializer(),
'application/json': JSONDictSerializer(),
}
self.body_serializers.update(body_serializers or {})
@ -628,156 +476,6 @@ class JSONDeserializer(TextDeserializer):
return {'body': self._from_json(datastring)}
class ProtectedXMLParser(etree.XMLParser):
def __init__(self, *args, **kwargs):
etree.XMLParser.__init__(self, *args, **kwargs)
self._parser.StartDoctypeDeclHandler = self.start_doctype_decl
def start_doctype_decl(self, name, sysid, pubid, internal):
raise ValueError(_("Inline DTD forbidden"))
def doctype(self, name, pubid, system):
raise ValueError(_("Inline DTD forbidden"))
class XMLDeserializer(TextDeserializer):
def __init__(self, metadata=None):
"""Object initialization.
:param metadata: information needed to deserialize xml into
a dictionary.
"""
super(XMLDeserializer, self).__init__()
self.metadata = metadata or {}
xmlns = self.metadata.get('xmlns')
if not xmlns:
xmlns = constants.XML_NS_V10
self.xmlns = xmlns
def _get_key(self, tag):
tags = tag.split("}", 1)
if len(tags) == 2:
ns = tags[0][1:]
bare_tag = tags[1]
ext_ns = self.metadata.get(constants.EXT_NS, {})
if ns == self.xmlns:
return bare_tag
for prefix, _ns in ext_ns.items():
if ns == _ns:
return prefix + ":" + bare_tag
ext_ns_bc = self.metadata.get(constants.EXT_NS_COMP, {})
for prefix, _ns in ext_ns_bc.items():
if ns == _ns:
return prefix + ":" + bare_tag
else:
return tag
def _get_links(self, root_tag, node):
link_nodes = node.findall(constants.ATOM_LINK_NOTATION)
root_tag = self._get_key(node.tag)
link_key = "%s_links" % root_tag
link_list = []
for link in link_nodes:
link_list.append({'rel': link.get('rel'),
'href': link.get('href')})
# Remove link node in order to avoid link node process as
# an item in _from_xml_node
node.remove(link)
return link_list and {link_key: link_list} or {}
def _parseXML(self, text):
parser = ProtectedXMLParser()
parser.feed(text)
return parser.close()
def _from_xml(self, datastring):
if datastring is None:
return None
plurals = set(self.metadata.get('plurals', {}))
try:
node = self._parseXML(datastring)
root_tag = self._get_key(node.tag)
# Deserialize link node was needed by unit test for verifying
# the request's response
links = self._get_links(root_tag, node)
result = self._from_xml_node(node, plurals)
# root_tag = constants.VIRTUAL_ROOT_KEY and links is not None
# is not possible because of the way data are serialized.
if root_tag == constants.VIRTUAL_ROOT_KEY:
return result
return dict({root_tag: result}, **links)
except Exception as e:
with excutils.save_and_reraise_exception():
parseError = False
# Python2.7
if (hasattr(etree, 'ParseError') and
isinstance(e, getattr(etree, 'ParseError'))):
parseError = True
# Python2.6
elif isinstance(e, expat.ExpatError):
parseError = True
if parseError:
msg = _("Cannot understand XML")
raise exception.MalformedRequestBody(reason=msg)
def _from_xml_node(self, node, listnames):
"""Convert a minidom node to a simple Python type.
:param listnames: list of XML node names whose subnodes should
be considered list items.
"""
attrNil = node.get(str(etree.QName(constants.XSI_NAMESPACE, "nil")))
attrType = node.get(str(etree.QName(
self.metadata.get('xmlns'), "type")))
if (attrNil and attrNil.lower() == 'true'):
return None
elif not len(node) and not node.text:
if (attrType and attrType == constants.TYPE_DICT):
return {}
elif (attrType and attrType == constants.TYPE_LIST):
return []
else:
return ''
elif (len(node) == 0 and node.text):
converters = {constants.TYPE_BOOL:
lambda x: x.lower() == 'true',
constants.TYPE_INT:
lambda x: int(x),
constants.TYPE_LONG:
lambda x: long(x),
constants.TYPE_FLOAT:
lambda x: float(x)}
if attrType and attrType in converters:
return converters[attrType](node.text)
else:
return node.text
elif self._get_key(node.tag) in listnames:
return [self._from_xml_node(n, listnames) for n in node]
else:
result = dict()
for attr in node.keys():
if (attr == 'xmlns' or
attr.startswith('xmlns:') or
attr == constants.XSI_ATTR or
attr == constants.TYPE_ATTR):
continue
result[self._get_key(attr)] = node.get(attr)
children = list(node)
for child in children:
result[self._get_key(child.tag)] = self._from_xml_node(
child, listnames)
return result
def default(self, datastring):
return {'body': self._from_xml(datastring)}
def __call__(self, datastring):
# Adding a migration path to allow us to remove unncessary classes
return self.default(datastring)
class RequestHeadersDeserializer(ActionDispatcher):
"""Default request headers deserializer."""
@ -793,7 +491,6 @@ class RequestDeserializer(object):
def __init__(self, body_deserializers=None, headers_deserializer=None):
self.body_deserializers = {
'application/xml': XMLDeserializer(),
'application/json': JSONDeserializer(),
}
self.body_deserializers.update(body_deserializers or {})
@ -1069,10 +766,6 @@ class Resource(Application):
self.deserializer = deserializer or RequestDese