XML de/serialization (bug 928058)

Middleware rewrites incoming XML requests as JSON, and outgoing JSON as
XML, per Accept and Content-Type headers.

Tests assert that core API methods support WADL/XSD specs, and cover
JSON content as well.

Change-Id: I6897971dd745766cbc472fd6e5346b1b34d933b0
This commit is contained in:
Dolph Mathews 2012-02-10 14:52:13 -06:00
parent e23ecc6893
commit 212489084f
10 changed files with 1105 additions and 57 deletions

View File

@ -50,6 +50,9 @@ paste.filter_factory = keystone.middleware:TokenAuthMiddleware.factory
[filter:admin_token_auth]
paste.filter_factory = keystone.middleware:AdminTokenAuthMiddleware.factory
[filter:xml_body]
paste.filter_factory = keystone.middleware:XmlBodyMiddleware.factory
[filter:json_body]
paste.filter_factory = keystone.middleware:JsonBodyMiddleware.factory
@ -66,10 +69,10 @@ paste.app_factory = keystone.service:public_app_factory
paste.app_factory = keystone.service:admin_app_factory
[pipeline:public_api]
pipeline = token_auth admin_token_auth json_body debug ec2_extension public_service
pipeline = token_auth admin_token_auth xml_body json_body debug ec2_extension public_service
[pipeline:admin_api]
pipeline = token_auth admin_token_auth json_body debug ec2_extension crud_extension admin_service
pipeline = token_auth admin_token_auth xml_body json_body debug ec2_extension crud_extension admin_service
[app:public_version_service]
paste.app_factory = keystone.service:public_version_app_factory
@ -78,10 +81,10 @@ paste.app_factory = keystone.service:public_version_app_factory
paste.app_factory = keystone.service:admin_version_app_factory
[pipeline:public_version_api]
pipeline = public_version_service
pipeline = xml_body public_version_service
[pipeline:admin_version_api]
pipeline = admin_version_service
pipeline = xml_body admin_version_service
[composite:main]
use = egg:Paste#urlmap

View File

@ -0,0 +1,197 @@
"""
Dict <--> XML de/serializer.
The identity API prefers attributes over elements, so we serialize that way
by convention, with a few hardcoded exceptions.
"""
from lxml import etree
import re
DOCTYPE = '<?xml version="1.0" encoding="UTF-8"?>'
XMLNS = 'http://docs.openstack.org/identity/api/v2.0'
def from_xml(xml):
"""Deserialize XML to a dictionary."""
if xml is None:
return None
deserializer = XmlDeserializer()
return deserializer(xml)
def to_xml(d, xmlns=None):
"""Serialize a dictionary to XML."""
if d is None:
return None
serialize = XmlSerializer()
return serialize(d, xmlns)
class XmlDeserializer(object):
def __call__(self, xml_str):
"""Returns a dictionary populated by decoding the given xml string."""
dom = etree.fromstring(xml_str.strip())
return self.walk_element(dom)
@staticmethod
def _tag_name(tag):
"""Remove the namespace from the tagname.
TODO(dolph): We might care about the namespace at some point.
>>> XmlDeserializer._tag_name('{xmlNamespace}tagName')
'tagName'
"""
m = re.search('[^}]+$', tag)
return m.string[m.start():]
def walk_element(self, element):
"""Populates a dictionary by walking an etree element."""
values = {}
for k, v in element.attrib.iteritems():
# boolean-looking attributes become booleans in JSON
if k in ['enabled']:
if v in ['true']:
v = True
elif v in ['false']:
v = False
values[k] = v
text = None
if element.text is not None:
text = element.text.strip()
# current spec does not have attributes on an element with text
values = values or text or {}
for child in [self.walk_element(x) for x in element]:
values = dict(values.items() + child.items())
return {XmlDeserializer._tag_name(element.tag): values}
class XmlSerializer(object):
def __call__(self, d, xmlns=None):
"""Returns an xml etree populated by the given dictionary.
Optionally, namespace the etree by specifying an ``xmlns``.
"""
# FIXME(dolph): skipping links for now
for key in d.keys():
if '_links' in key:
d.pop(key)
assert len(d.keys()) == 1, ('Cannot encode more than one root '
'element: %s' % d.keys())
# name the root dom element
name = d.keys()[0]
# only the root dom element gets an xlmns
root = etree.Element(name, xmlns=(xmlns or XMLNS))
self.populate_element(root, d[name])
# TODO(dolph): you can get a doctype from lxml, using ElementTrees
return '%s\n%s' % (DOCTYPE, etree.tostring(root, pretty_print=True))
def _populate_list(self, element, k, v):
"""Populates an element with a key & list value."""
# spec has a lot of inconsistency here!
container = element
if k == 'media-types':
# xsd compliance: <media-types> contains <media-type>s
# find an existing <media-types> element or make one
container = element.find('media-types')
if container is None:
container = etree.Element(k)
element.append(container)
name = k[:-1]
elif k == 'serviceCatalog':
# xsd compliance: <serviceCatalog> contains <service>s
container = etree.Element(k)
element.append(container)
name = 'service'
elif k == 'values' and element.tag[-1] == 's':
# OS convention is to contain lists in a 'values' element,
# so the list itself can have attributes, which is
# unnecessary in XML
name = element.tag[:-1]
elif k[-1] == 's':
name = k[:-1]
else:
name = k
for item in v:
child = etree.Element(name)
self.populate_element(child, item)
container.append(child)
def _populate_dict(self, element, k, v):
"""Populates an element with a key & dictionary value."""
child = etree.Element(k)
self.populate_element(child, v)
element.append(child)
def _populate_bool(self, element, k, v):
"""Populates an element with a key & boolean value."""
# booleans are 'true' and 'false'
element.set(k, unicode(v).lower())
def _populate_str(self, element, k, v):
"""Populates an element with a key & string value."""
if k in ['description']:
# always becomes an element
child = etree.Element(k)
child.text = unicode(v)
element.append(child)
else:
# add attributes to the current element
element.set(k, unicode(v))
def _populate_number(self, element, k, v):
"""Populates an element with a key & numeric value."""
# numbers can be handled as strings
self._populate_str(element, k, v)
def populate_element(self, element, value):
"""Populates an etree with the given value."""
if isinstance(value, list):
self._populate_sequence(element, value)
elif isinstance(value, dict):
self._populate_tree(element, value)
def _populate_sequence(self, element, l):
"""Populates an etree with a sequence of elements, given a list."""
# xsd compliance: child elements are singular: <users> has <user>s
name = element.tag
if element.tag[-1] == 's':
name = element.tag[:-1]
for item in l:
child = etree.Element(name)
self.populate_element(child, item)
element.append(child)
def _populate_tree(self, element, d):
"""Populates an etree with attributes & elements, given a dict."""
for k, v in d.iteritems():
if isinstance(v, dict):
self._populate_dict(element, k, v)
elif isinstance(v, list):
self._populate_list(element, k, v)
elif isinstance(v, bool):
self._populate_bool(element, k, v)
elif isinstance(v, basestring):
self._populate_str(element, k, v)
elif type(v) in [int, float, long, complex]:
self._populate_number(element, k, v)

View File

@ -19,6 +19,8 @@ import json
import webob.exc
from keystone import config
from keystone import exception
from keystone.common import serializer
from keystone.common import wsgi
@ -109,7 +111,7 @@ class JsonBodyMiddleware(wsgi.Middleware):
try:
params_parsed = json.loads(params_json)
except ValueError:
msg = "Malformed json in request body"
msg = 'Malformed json in request body'
raise webob.exc.HTTPBadRequest(explanation=msg)
finally:
if not params_parsed:
@ -124,3 +126,30 @@ class JsonBodyMiddleware(wsgi.Middleware):
params[k] = v
request.environ[PARAMS_ENV] = params
class XmlBodyMiddleware(wsgi.Middleware):
"""De/serializes XML to/from JSON."""
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, request):
self.process_request(request)
response = request.get_response(self.application)
self.process_response(request, response)
return response
def process_request(self, request):
"""Transform the request from XML to JSON."""
incoming_xml = 'application/xml' in str(request.content_type)
if incoming_xml and request.body:
request.content_type = 'application/json'
request.body = json.dumps(serializer.from_xml(request.body))
def process_response(self, request, response):
"""Transform the response from JSON to XML."""
outgoing_xml = 'application/xml' in str(request.accept)
if outgoing_xml and response.body:
response.content_type = 'application/xml'
try:
response.body = serializer.to_xml(json.loads(response.body))
except:
raise exception.Error(message=response.body)

View File

@ -163,6 +163,10 @@ class VersionController(wsgi.Application):
"base": "application/json",
"type": "application/vnd.openstack.identity-v2.0"
"+json"
}, {
"base": "application/xml",
"type": "application/vnd.openstack.identity-v2.0"
"+xml"
}]
}]
}

578
tests/test_content_types.py Normal file
View File

@ -0,0 +1,578 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import httplib
import json
from lxml import etree
import nose.exc
from keystone import test
from keystone.common import serializer
import default_fixtures
class RestfulTestCase(test.TestCase):
"""Performs restful tests against the WSGI app over HTTP.
This class launches public & admin WSGI servers for every test, which can
be accessed by calling ``public_request()`` or ``admin_request()``,
respectfully.
``restful_request()`` and ``request()`` methods are also exposed if you
need to bypass restful conventions or access HTTP details in your test
implementation.
Two new asserts are provided:
* ``assertResponseSuccessful``: called automatically for every request
unless an ``expected_status`` is provided
* ``assertResponseStatus``: called instead of ``assertResponseSuccessful``,
if an ``expected_status`` is provided
Requests are automatically serialized according to the defined
``content_type``. Responses are automatically deserialized as well, and
available in the ``response.body`` attribute. The original body content is
available in the ``response.raw`` attribute.
"""
# default content type to test
content_type = 'json'
def setUp(self):
super(RestfulTestCase, self).setUp()
self.load_backends()
self.load_fixtures(default_fixtures)
self.public_server = self.serveapp('keystone', name='main')
self.admin_server = self.serveapp('keystone', name='admin')
# TODO(termie): is_admin is being deprecated once the policy stuff
# is all working
# TODO(termie): add an admin user to the fixtures and use that user
# override the fixtures, for now
self.metadata_foobar = self.identity_api.update_metadata(
self.user_foo['id'],
self.tenant_bar['id'],
dict(roles=['keystone_admin'], is_admin='1'))
def tearDown(self):
"""Kill running servers and release references to avoid leaks."""
self.public_server.kill()
self.admin_server.kill()
self.public_server = None
self.admin_server = None
super(RestfulTestCase, self).tearDown()
def request(self, host='0.0.0.0', port=80, method='GET', path='/',
headers=None, body=None, expected_status=None):
"""Perform request and fetch httplib.HTTPResponse from the server."""
# Initialize headers dictionary
headers = {} if not headers else headers
connection = httplib.HTTPConnection(host, port, timeout=10)
# Perform the request
connection.request(method, path, body, headers)
# Retrieve the response so we can close the connection
response = connection.getresponse()
response.body = response.read()
# Close the connection
connection.close()
# Automatically assert HTTP status code
if expected_status:
self.assertResponseStatus(response, expected_status)
else:
self.assertResponseSuccessful(response)
# Contains the response headers, body, etc
return response
def assertResponseSuccessful(self, response):
"""Asserts that a status code lies inside the 2xx range.
:param response: :py:class:`httplib.HTTPResponse` to be
verified to have a status code between 200 and 299.
example::
>>> self.assertResponseSuccessful(response, 203)
"""
self.assertTrue(response.status >= 200 and response.status <= 299,
'Status code %d is outside of the expected range (2xx)\n\n%s' %
(response.status, response.body))
def assertResponseStatus(self, response, expected_status):
"""Asserts a specific status code on the response.
:param response: :py:class:`httplib.HTTPResponse`
:param assert_status: The specific ``status`` result expected
example::
>>> self.assertResponseStatus(response, 203)
"""
self.assertEqual(response.status, expected_status,
'Status code %s is not %s, as expected)\n\n%s' %
(response.status, expected_status, response.body))
def _to_content_type(self, body, headers, content_type=None):
"""Attempt to encode JSON and XML automatically."""
content_type = content_type or self.content_type
if content_type == 'json':
headers['Accept'] = 'application/json'
if body:
headers['Content-Type'] = 'application/json'
return json.dumps(body)
elif content_type == 'xml':
headers['Accept'] = 'application/xml'
if body:
headers['Content-Type'] = 'application/xml'
return serializer.to_xml(body)
def _from_content_type(self, response, content_type=None):
"""Attempt to decode JSON and XML automatically, if detected."""
content_type = content_type or self.content_type
# make the original response body available, for convenience
response.raw = response.body
if response.body is not None and response.body.strip():
# if a body is provided, a Content-Type is also expected
header = response.getheader('Content-Type', None)
self.assertIn(self.content_type, header)
if self.content_type == 'json':
response.body = json.loads(response.body)
elif self.content_type == 'xml':
response.body = etree.fromstring(response.body)
def restful_request(self, headers=None, body=None, token=None, **kwargs):
"""Serializes/deserializes json/xml as request/response body.
.. WARNING::
* Existing Accept header will be overwritten.
* Existing Content-Type header will be overwritten.
"""
# Initialize headers dictionary
headers = {} if not headers else headers
if token is not None:
headers['X-Auth-Token'] = token
body = self._to_content_type(body, headers)
# Perform the HTTP request/response
response = self.request(headers=headers, body=body, **kwargs)
self._from_content_type(response)
# we can save some code & improve coverage by always doing this
if response.status >= 400:
self.assertValidErrorResponse(response)
# Contains the decoded response.body
return response
def _get_port(self, server):
return server.socket_info['socket'][1]
def _public_port(self):
return self._get_port(self.public_server)
def _admin_port(self):
return self._get_port(self.admin_server)
def public_request(self, port=None, **kwargs):
kwargs['port'] = port or self._public_port()
return self.restful_request(**kwargs)
def admin_request(self, port=None, **kwargs):
kwargs['port'] = port or self._admin_port()
return self.restful_request(**kwargs)
def get_scoped_token(self):
"""Convenience method so that we can test authenticated requests."""
r = self.public_request(method='POST', path='/v2.0/tokens', body={
'auth': {
'passwordCredentials': {
'username': self.user_foo['name'],
'password': self.user_foo['password'],
},
'tenantId': self.tenant_bar['id'],
},
})
return self._get_token_id(r)
def _get_token_id(self, r):
"""Helper method to return a token ID from a response.
This needs to be overridden by child classes for on their content type.
"""
raise NotImplementedError()
class CoreApiTests(object):
def assertValidError(self, error):
"""Applicable to XML and JSON."""
try:
print error.attrib
except:
pass
self.assertIsNotNone(error.get('code'))
self.assertIsNotNone(error.get('title'))
self.assertIsNotNone(error.get('message'))
def assertValidTenant(self, tenant):
"""Applicable to XML and JSON."""
self.assertIsNotNone(tenant.get('id'))
self.assertIsNotNone(tenant.get('name'))
def assertValidUser(self, user):
"""Applicable to XML and JSON."""
self.assertIsNotNone(user.get('id'))
self.assertIsNotNone(user.get('name'))
def assertValidRole(self, tenant):
"""Applicable to XML and JSON."""
self.assertIsNotNone(tenant.get('id'))
self.assertIsNotNone(tenant.get('name'))
def test_public_multiple_choice(self):
r = self.public_request(path='/', expected_status=300)
self.assertValidMultipleChoiceResponse(r)
def test_admin_multiple_choice(self):
r = self.admin_request(path='/', expected_status=300)
self.assertValidMultipleChoiceResponse(r)
def test_public_version(self):
raise nose.exc.SkipTest('Blocked by bug 925548')
r = self.public_request(path='/v2.0/')
self.assertValidVersionResponse(r)
def test_admin_version(self):
raise nose.exc.SkipTest('Blocked by bug 925548')
r = self.admin_request(path='/v2.0/')
self.assertValidVersionResponse(r)
def test_public_extensions(self):
raise nose.exc.SkipTest('Blocked by bug 928054')
self.public_request(path='/v2.0/extensions',)
def test_admin_extensions(self):
raise nose.exc.SkipTest('Blocked by bug 928054')
self.admin_request(path='/v2.0/extensions',)
def test_authenticate(self):
r = self.public_request(method='POST', path='/v2.0/tokens', body={
'auth': {
'passwordCredentials': {
'username': self.user_foo['name'],
'password': self.user_foo['password'],
},
'tenantId': self.tenant_bar['id'],
},
},
# TODO(dolph): creating a token should result in a 201 Created
expected_status=200)
self.assertValidAuthenticationResponse(r)
def test_get_tenants_for_token(self):
r = self.public_request(path='/v2.0/tenants',
token=self.get_scoped_token())
self.assertValidTenantListResponse(r)
def test_validate_token(self):
token = self.get_scoped_token()
r = self.admin_request(path='/v2.0/tokens/%(token_id)s' % {
'token_id': token,
},
token=token)
self.assertValidAuthenticationResponse(r)
def test_validate_token_head(self):
"""The same call as above, except using HEAD.
There's no response to validate here, but this is included for the
sake of completely covering the core API.
"""
raise nose.exc.SkipTest('Blocked by bug 933587')
token = self.get_scoped_token()
self.admin_request(method='HEAD', path='/v2.0/tokens/%(token_id)s' % {
'token_id': token,
},
token=token)
def test_endpoints(self):
raise nose.exc.SkipTest('Blocked by bug 933555')
token = self.get_scoped_token()
r = self.admin_request(path='/v2.0/tokens/%(token_id)s/endpoints' % {
'token_id': token,
},
token=token)
self.assertValidTokenCatalogResponse(r)
def test_get_tenant(self):
token = self.get_scoped_token()
r = self.admin_request(path='/v2.0/tenants/%(tenant_id)s' % {
'tenant_id': self.tenant_bar['id'],
},
token=token)
self.assertValidTenantResponse(r)
def test_get_user_roles(self):
raise nose.exc.SkipTest('Blocked by bug 933565')
token = self.get_scoped_token()
r = self.admin_request(path='/v2.0/users/%(user_id)s/roles' % {
'user_id': self.user_foo['id'],
},
token=token)
self.assertValidRoleListResponse(r)
def test_get_user_roles_with_tenant(self):
token = self.get_scoped_token()
r = self.admin_request(
path='/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % {
'tenant_id': self.tenant_bar['id'],
'user_id': self.user_foo['id'],
},
token=token)
self.assertValidRoleListResponse(r)
def test_get_user(self):
token = self.get_scoped_token()
r = self.admin_request(path='/v2.0/users/%(user_id)s' % {
'user_id': self.user_foo['id'],
},
token=token)
self.assertValidUserResponse(r)
def test_error_response(self):
"""This triggers assertValidErrorResponse by convention."""
self.public_request(path='/v2.0/tenants', expected_status=401)
class JsonTestCase(RestfulTestCase, CoreApiTests):
content_type = 'json'
def _get_token_id(self, r):
"""Applicable only to JSON."""
return r.body['access']['token']['id']
def assertValidErrorResponse(self, r):
self.assertIsNotNone(r.body.get('error'))
self.assertValidError(r.body['error'])
self.assertEqual(r.body['error']['code'], r.status)
def assertValidAuthenticationResponse(self, r):
self.assertIsNotNone(r.body.get('access'))
self.assertIsNotNone(r.body['access'].get('token'))
self.assertIsNotNone(r.body['access'].get('user'))
# validate token
self.assertIsNotNone(r.body['access']['token'].get('id'))
self.assertIsNotNone(r.body['access']['token'].get('expires'))
tenant = r.body['access']['token'].get('tenant')
if tenant is not None:
# validate tenant
self.assertIsNotNone(tenant.get('id'))
self.assertIsNotNone(tenant.get('name'))
# validate user
self.assertIsNotNone(r.body['access']['user'].get('id'))
self.assertIsNotNone(r.body['access']['user'].get('name'))
# validate service catalog
if r.body['access'].get('serviceCatalog') is not None:
self.assertTrue(len(r.body['access']['serviceCatalog']))
for service in r.body['access']['serviceCatalog']:
# validate service
self.assertIsNotNone(service.get('name'))
self.assertIsNotNone(service.get('type'))
# services contain at least one endpoint
self.assertIsNotNone(service.get('endpoints'))
self.assertTrue(len(service['endpoints']))
for endpoint in service['endpoints']:
# validate service endpoint
self.assertIsNotNone(endpoint.get('publicURL'))
def assertValidTenantListResponse(self, r):
self.assertIsNotNone(r.body.get('tenants'))
self.assertTrue(len(r.body['tenants']))
for tenant in r.body['tenants']:
self.assertValidTenant(tenant)
self.assertIsNotNone(tenant.get('enabled'))
self.assertIn(tenant.get('enabled'), [True, False])
def assertValidUserResponse(self, r):
self.assertIsNotNone(r.body.get('user'))
self.assertValidUser(r.body['user'])
def assertValidTenantResponse(self, r):
self.assertIsNotNone(r.body.get('tenant'))
self.assertValidTenant(r.body['tenant'])
def assertValidRoleListResponse(self, r):
self.assertIsNotNone(r.body.get('roles'))
self.assertTrue(len(r.body['roles']))
for role in r.body['roles']:
self.assertValidRole(role)
def assertValidMultipleChoiceResponse(self, r):
self.assertIsNotNone(r.body.get('versions'))
self.assertIsNotNone(r.body['versions'].get('values'))
self.assertTrue(len(r.body['versions']['values']))
for version in r.body['versions']['values']:
self.assertIsNotNone(version.get('id'))
self.assertIsNotNone(version.get('status'))
self.assertIsNotNone(version.get('updated'))
self.assertIsNotNone(version.get('links'))
self.assertTrue(len(version.get('links')))
for link in version.get('links'):
self.assertIsNotNone(link.get('rel'))
self.assertIsNotNone(link.get('href'))
self.assertIsNotNone(version.get('media-types'))
self.assertTrue(len(version.get('media-types')))
for media in version.get('media-types'):
self.assertIsNotNone(media.get('base'))
self.assertIsNotNone(media.get('type'))
def assertValidVersionResponse(self, r):
raise NotImplementedError()
class XmlTestCase(RestfulTestCase, CoreApiTests):
xmlns = 'http://docs.openstack.org/identity/api/v2.0'
content_type = 'xml'
def _get_token_id(self, r):
return r.body.find(self._tag('token')).get('id')
def _tag(self, tag_name, xmlns=None):
"""Helper method to build an namespaced element name."""
return '{%(ns)s}%(tag)s' % {'ns': xmlns or self.xmlns, 'tag': tag_name}
def assertValidErrorResponse(self, r):
xml = r.body
self.assertEqual(xml.tag, self._tag('error'))
self.assertValidError(xml)
self.assertEqual(xml.get('code'), str(r.status))
def assertValidMultipleChoiceResponse(self, r):
xml = r.body
self.assertEqual(xml.tag, self._tag('versions'))
self.assertTrue(len(xml.findall(self._tag('version'))))
for version in xml.findall(self._tag('version')):
# validate service endpoint
self.assertIsNotNone(version.get('id'))
self.assertIsNotNone(version.get('status'))
self.assertIsNotNone(version.get('updated'))
self.assertTrue(len(version.findall(self._tag('link'))))
for link in version.findall(self._tag('link')):
self.assertIsNotNone(link.get('rel'))
self.assertIsNotNone(link.get('href'))
media_types = version.find(self._tag('media-types'))
self.assertIsNotNone(media_types)
self.assertTrue(len(media_types.findall(self._tag('media-type'))))
for media in media_types.findall(self._tag('media-type')):
self.assertIsNotNone(media.get('base'))
self.assertIsNotNone(media.get('type'))
def assertValidVersionResponse(self, r):
raise NotImplementedError()
def assertValidTokenCatalogResponse(self, r):
xml = r.body
self.assertEqual(xml.tag, self._tag('endpoints'))
self.assertTrue(len(xml.findall(self._tag('endpoint'))))
for endpoint in xml.findall(self._tag('endpoint')):
self.assertIsNotNone(endpoint.get('publicUrl'))
def assertValidTenantResponse(self, r):
xml = r.body
self.assertEqual(xml.tag, self._tag('tenant'))
self.assertValidTenant(xml)
def assertValidUserResponse(self, r):
xml = r.body
self.assertEqual(xml.tag, self._tag('user'))
self.assertValidUser(xml)
def assertValidRoleListResponse(self, r):
xml = r.body
self.assertEqual(xml.tag, self._tag('roles'))
self.assertTrue(len(r.body.findall(self._tag('role'))))
for role in r.body.findall(self._tag('role')):
self.assertValidRole(role)
def assertValidAuthenticationResponse(self, r):
xml = r.body
self.assertEqual(xml.tag, self._tag('access'))
# validate token
token = xml.find(self._tag('token'))
self.assertIsNotNone(token)
self.assertIsNotNone(token.get('id'))
self.assertIsNotNone(token.get('expires'))
tenant = token.find(self._tag('tenant'))
if tenant is not None:
# validate tenant
self.assertValidTenant(tenant)
self.assertIn(tenant.get('enabled'), ['true', 'false'])
user = xml.find(self._tag('user'))
self.assertIsNotNone(user)
self.assertIsNotNone(user.get('id'))
self.assertIsNotNone(user.get('name'))
serviceCatalog = xml.find(self._tag('serviceCatalog'))
if serviceCatalog is not None:
self.assertTrue(len(serviceCatalog.findall(self._tag('service'))))
for service in serviceCatalog.findall(self._tag('service')):
# validate service
self.assertIsNotNone(service.get('name'))
self.assertIsNotNone(service.get('type'))
# services contain at least one endpoint
self.assertTrue(len(service))
for endpoint in service.findall(self._tag('endpoint')):
# validate service endpoint
self.assertIsNotNone(endpoint.get('publicURL'))
def assertValidTenantListResponse(self, r):
xml = r.body
self.assertEqual(xml.tag, self._tag('tenants'))
self.assertTrue(len(r.body))
for tenant in r.body.findall(self._tag('tenant')):
self.assertValidTenant(tenant)
self.assertIn(tenant.get('enabled'), ['true', 'false'])

View File

@ -14,6 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import webob
from keystone import config
@ -25,15 +27,23 @@ CONF = config.CONF
def make_request(**kwargs):
accept = kwargs.pop('accept', None)
method = kwargs.pop('method', 'GET')
body = kwargs.pop('body', None)
req = webob.Request.blank('/', **kwargs)
req.method = method
if body is not None:
req.body = body
if accept is not None:
req.accept = accept
return req
def make_response(**kwargs):
body = kwargs.pop('body', None)
return webob.Response(body)
class TokenAuthMiddlewareTest(test.TestCase):
def test_request(self):
req = make_request()
@ -97,3 +107,51 @@ class JsonBodyMiddlewareTest(test.TestCase):
middleware.JsonBodyMiddleware(None).process_request(req)
params = req.environ.get(middleware.PARAMS_ENV, {})
self.assertEqual(params, {})
class XmlBodyMiddlewareTest(test.TestCase):
def test_client_wants_xml_back(self):
"""Clients requesting XML should get what they ask for."""
body = '{"container": {"attribute": "value"}}'
req = make_request(body=body, method='POST', accept='application/xml')
middleware.XmlBodyMiddleware(None).process_request(req)
resp = make_response(body=body)
middleware.XmlBodyMiddleware(None).process_response(req, resp)
self.assertEqual(resp.content_type, 'application/xml')
def test_client_wants_json_back(self):
"""Clients requesting JSON should definitely not get XML back."""
body = '{"container": {"attribute": "value"}}'
req = make_request(body=body, method='POST', accept='application/json')
middleware.XmlBodyMiddleware(None).process_request(req)
resp = make_response(body=body)
middleware.XmlBodyMiddleware(None).process_response(req, resp)
self.assertNotIn('application/xml', resp.content_type)
def test_client_fails_to_specify_accept(self):
"""If client does not specify an Accept header, default to JSON."""
body = '{"container": {"attribute": "value"}}'
req = make_request(body=body, method='POST')
middleware.XmlBodyMiddleware(None).process_request(req)
resp = make_response(body=body)
middleware.XmlBodyMiddleware(None).process_response(req, resp)
self.assertNotIn('application/xml', resp.content_type)
def test_xml_replaced_by_json(self):
"""XML requests should be replaced by JSON requests."""
req = make_request(
body='<container><element attribute="value" /></container>',
content_type='application/xml',
method='POST')
middleware.XmlBodyMiddleware(None).process_request(req)
self.assertTrue(req.content_type, 'application/json')
self.assertTrue(json.loads(req.body))
def test_json_unnaffected(self):
"""JSON-only requests should be unnaffected by the XML middleware."""
content_type = 'application/json'
body = '{"container": {"attribute": "value"}}'
req = make_request(body=body, content_type=content_type, method='POST')
middleware.XmlBodyMiddleware(None).process_request(req)
self.assertEqual(req.body, body)
self.assertEqual(req.content_type, content_type)

155
tests/test_serializer.py Normal file
View File

@ -0,0 +1,155 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import re
from keystone import test
from keystone.common import serializer
class XmlSerializerTestCase(test.TestCase):
def assertEqualIgnoreWhitespace(self, a, b):
"""Splits two strings into lists and compares them.
This provides easy-to-read failures from nose.
"""
try:
self.assertEqual(a, b)
except:
a = re.sub('[ \n]+', ' ', a).strip().split()
b = re.sub('[ \n]+', ' ', b).strip().split()
self.assertEqual(a, b)
def assertSerializeDeserialize(self, d, xml, xmlns=None):
self.assertEqualIgnoreWhitespace(serializer.to_xml(d, xmlns), xml)
self.assertEqual(serializer.from_xml(xml), d)
# operations should be invertable
self.assertEqual(
serializer.from_xml(serializer.to_xml(d, xmlns)),
d)
self.assertEqualIgnoreWhitespace(
serializer.to_xml(serializer.from_xml(xml), xmlns),
xml)
def test_none(self):
d = None
xml = None
self.assertSerializeDeserialize(d, xml)
def test_auth_request(self):
d = {
"auth": {
"passwordCredentials": {
"username": "test_user",
"password": "mypass"
},
"tenantName": "customer-x"
}
}
xml = """
<?xml version="1.0" encoding="UTF-8"?>
<auth xmlns="http://docs.openstack.org/identity/api/v2.0"
tenantName="customer-x">
<passwordCredentials
username="test_user"
password="mypass"/>
</auth>
"""
self.assertSerializeDeserialize(d, xml)
def test_role_crud(self):
d = {
"role": {
"id": "123",
"name": "Guest",
"description": "Guest Access"
}
}
# TODO(dolph): examples show this description as an attribute?
xml = """
<?xml version="1.0" encoding="UTF-8"?>
<role xmlns="http://docs.openstack.org/identity/api/v2.0"
id="123"
name="Guest">
<description>Guest Access</description>
</role>
"""
self.assertSerializeDeserialize(d, xml)
def test_service_crud(self):
xmlns = "http://docs.openstack.org/identity/api/ext/OS-KSADM/v1.0"
d = {
# FIXME(dolph): should be...
# "OS-KSADM:service": {
"service": {
"id": "123",
"name": "nova",
"type": "compute",
"description": "OpenStack Compute Service"
}
}
# TODO(dolph): examples show this description as an attribute?
xml = """
<?xml version="1.0" encoding="UTF-8"?>
<service
xmlns="%(xmlns)s"
type="compute"
id="123"
name="nova">
<description>OpenStack Compute Service</description>
</service>
""" % {'xmlns': xmlns}
self.assertSerializeDeserialize(d, xml, xmlns=xmlns)
def test_tenant_crud(self):
d = {
"tenant": {
"id": "1234",
"name": "ACME corp",
"description": "A description...",
"enabled": True
}
}
xml = """
<?xml version="1.0" encoding="UTF-8"?>
<tenant
xmlns="http://docs.openstack.org/identity/api/v2.0"
enabled="true"
id="1234"
name="ACME corp">
<description>A description...</description>
</tenant>
"""
self.assertSerializeDeserialize(d, xml)
def test_values_list(self):
d = {
"objects": {
"values": [{
"attribute": "value1",
}, {
"attribute": "value2",
}]
}
}
xml = """
<?xml version="1.0" encoding="UTF-8"?>
<objects xmlns="http://docs.openstack.org/identity/api/v2.0">
<object attribute="value1"/>
<object attribute="value2"/>
</objects>
"""
self.assertEqualIgnoreWhitespace(serializer.to_xml(d), xml)

View File

@ -41,32 +41,43 @@ class VersionTestCase(test.TestCase):
data = json.loads(resp.body)
expected = {
"versions": {
"values": [{
"id": "v2.0",
"status": "beta",
"updated": "2011-11-19T00:00:00Z",
"links": [{
"rel": "self",
"href": ("http://localhost:%s/v2.0/" %
CONF.public_port),
}, {
"rel": "describedby",
"type": "text/html",
"href": "http://docs.openstack.org/api/openstack-"
"identity-service/2.0/content/"
}, {
"rel": "describedby",
"type": "application/pdf",
"href": "http://docs.openstack.org/api/openstack-"
"identity-service/2.0/identity-dev-guide-"
"2.0.pdf"
}],
"media-types": [{
"base": "application/json",
"type": "application/vnd.openstack.identity-v2.0"
"+json"
}]
}]
"values": [
{
"id": "v2.0",
"status": "beta",
"updated": "2011-11-19T00:00:00Z",
"links": [
{
"rel": "self",
"href": "http://localhost:%s/v2.0/" %
CONF.public_port,
}, {
"rel": "describedby",
"type": "text/html",
"href": "http://docs.openstack.org/api/"
"openstack-identity-service/2.0/"
"content/"
}, {
"rel": "describedby",
"type": "application/pdf",
"href": "http://docs.openstack.org/api/"
"openstack-identity-service/2.0/"
"identity-dev-guide-2.0.pdf"
}
],
"media-types": [
{
"base": "application/json",
"type": "application/"
"vnd.openstack.identity-v2.0+json"
}, {
"base": "application/xml",
"type": "application/"
"vnd.openstack.identity-v2.0+xml"
}
]
}
]
}
}
self.assertEqual(data, expected)
@ -78,32 +89,43 @@ class VersionTestCase(test.TestCase):
data = json.loads(resp.body)
expected = {
"versions": {
"values": [{
"id": "v2.0",
"status": "beta",
"updated": "2011-11-19T00:00:00Z",
"links": [{
"rel": "self",
"href": ("http://localhost:%s/v2.0/" %
CONF.admin_port),
}, {
"rel": "describedby",
"type": "text/html",
"href": "http://docs.openstack.org/api/openstack-"
"identity-service/2.0/content/"
}, {
"rel": "describedby",
"type": "application/pdf",
"href": "http://docs.openstack.org/api/openstack-"
"identity-service/2.0/identity-dev-guide-"
"2.0.pdf"
}],
"media-types": [{
"base": "application/json",
"type": "application/vnd.openstack.identity-v2.0"
"+json"
}]
}]
"values": [
{
"id": "v2.0",
"status": "beta",
"updated": "2011-11-19T00:00:00Z",
"links": [
{
"rel": "self",
"href": "http://localhost:%s/v2.0/" %
CONF.admin_port,
}, {
"rel": "describedby",
"type": "text/html",
"href": "http://docs.openstack.org/api/"
"openstack-identity-service/2.0/"
"content/"
}, {
"rel": "describedby",
"type": "application/pdf",
"href": "http://docs.openstack.org/api/"
"openstack-identity-service/2.0/"
"identity-dev-guide-2.0.pdf"
}
],
"media-types": [
{
"base": "application/json",
"type": "application/"
"vnd.openstack.identity-v2.0+json"
}, {
"base": "application/xml",
"type": "application/"
"vnd.openstack.identity-v2.0+xml"
}
]
}
]
}
}
self.assertEqual(data, expected)

View File

@ -8,6 +8,7 @@ routes
sqlalchemy
sqlalchemy-migrate
passlib
lxml
# for python-novaclient
prettytable

View File

@ -9,6 +9,7 @@ sqlalchemy
sqlalchemy-migrate
passlib
python-memcached
lxml
# keystonelight testing dependencies
nose