Add XML templates.

Creates the concept of an XML template, which is a description of
how to serialize an object into XML.  XML templates are split into
master and slave templates, and slave templates can be attached to
master templates to augment the serialization.  The expected use
case is with extensions, allowing extensions to not only add data
to the object, but to also ensure that the new data gets
serialized into the output XML representation.

Also includes lazy serialization, for use by extensions.

Change-Id: Ifd8493be04c73bbb2a850080b687c5e799c48239
This commit is contained in:
Kevin L. Mitchell 2011-10-17 14:19:25 -05:00
parent edf3e39cd3
commit 9a15c0d070
24 changed files with 2541 additions and 380 deletions

View File

@ -75,9 +75,9 @@ use = call:nova.api.openstack.urlmap:urlmap_factory
/v1.1: openstackapi11
[pipeline:openstackapi11]
pipeline = faultwrap noauth ratelimit extensions osapiapp11
pipeline = faultwrap noauth ratelimit serialize extensions osapiapp11
# NOTE(vish): use the following pipeline for deprecated auth
# pipeline = faultwrap auth ratelimit extensions osapiapp11
# pipeline = faultwrap auth ratelimit serialize extensions osapiapp11
[filter:faultwrap]
paste.filter_factory = nova.api.openstack:FaultWrapper.factory
@ -91,6 +91,9 @@ paste.filter_factory = nova.api.openstack.auth:NoAuthMiddleware.factory
[filter:ratelimit]
paste.filter_factory = nova.api.openstack.limits:RateLimitingMiddleware.factory
[filter:serialize]
paste.filter_factory = nova.api.openstack.wsgi:LazySerializationMiddleware.factory
[filter:extensions]
paste.filter_factory = nova.api.openstack.extensions:ExtensionMiddleware.factory

View File

@ -22,6 +22,7 @@ from nova import log as logging
from nova.auth import manager
from nova.api.openstack import faults
from nova.api.openstack import wsgi
from nova.api.openstack import xmlutil
FLAGS = flags.FLAGS
@ -80,15 +81,25 @@ class Controller(object):
return dict(account=_translate_keys(account))
def create_resource():
metadata = {
"attributes": {
"account": ["id", "name", "description", "manager"],
},
}
class AccountTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('account', selector='account')
root.set('id', 'id')
root.set('name', 'name')
root.set('description', 'description')
root.set('manager', 'manager')
return xmlutil.MasterTemplate(root, 1)
class AccountXMLSerializer(xmlutil.XMLTemplateSerializer):
def default(self):
return AccountTemplate()
def create_resource():
body_serializers = {
'application/xml': wsgi.XMLDictSerializer(metadata=metadata),
'application/xml': AccountXMLSerializer(),
}
serializer = wsgi.ResponseSerializer(body_serializers)
return wsgi.Resource(Controller(), serializer=serializer)

View File

@ -356,52 +356,51 @@ class MetadataHeadersSerializer(wsgi.ResponseHeadersSerializer):
response.status_int = 204
class MetadataXMLSerializer(wsgi.XMLDictSerializer):
metadata_nsmap = {None: xmlutil.XMLNS_V11}
NSMAP = {None: xmlutil.XMLNS_V11}
def __init__(self, xmlns=wsgi.XMLNS_V11):
super(MetadataXMLSerializer, self).__init__(xmlns=xmlns)
class MetaItemTemplate(xmlutil.TemplateBuilder):
def construct(self):
sel = xmlutil.Selector('meta', xmlutil.get_items, 0)
root = xmlutil.TemplateElement('meta', selector=sel)
root.set('key', 0)
root.text = 1
return xmlutil.MasterTemplate(root, 1, nsmap=metadata_nsmap)
def populate_metadata(self, metadata_elem, meta_dict):
for (key, value) in meta_dict.items():
elem = etree.SubElement(metadata_elem, 'meta')
elem.set('key', str(key))
elem.text = value
def _populate_meta_item(self, meta_elem, meta_item_dict):
"""Populate a meta xml element from a dict."""
(key, value) = meta_item_dict.items()[0]
meta_elem.set('key', str(key))
meta_elem.text = value
class MetadataTemplateElement(xmlutil.TemplateElement):
def will_render(self, datum):
return True
def index(self, metadata_dict):
metadata = etree.Element('metadata', nsmap=self.NSMAP)
self.populate_metadata(metadata, metadata_dict.get('metadata', {}))
return self._to_xml(metadata)
def create(self, metadata_dict):
metadata = etree.Element('metadata', nsmap=self.NSMAP)
self.populate_metadata(metadata, metadata_dict.get('metadata', {}))
return self._to_xml(metadata)
class MetadataTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = MetadataTemplateElement('metadata', selector='metadata')
elem = xmlutil.SubTemplateElement(root, 'meta',
selector=xmlutil.get_items)
elem.set('key', 0)
elem.text = 1
return xmlutil.MasterTemplate(root, 1, nsmap=metadata_nsmap)
def update_all(self, metadata_dict):
metadata = etree.Element('metadata', nsmap=self.NSMAP)
self.populate_metadata(metadata, metadata_dict.get('metadata', {}))
return self._to_xml(metadata)
def show(self, meta_item_dict):
meta = etree.Element('meta', nsmap=self.NSMAP)
self._populate_meta_item(meta, meta_item_dict['meta'])
return self._to_xml(meta)
class MetadataXMLSerializer(xmlutil.XMLTemplateSerializer):
def index(self):
return MetadataTemplate()
def update(self, meta_item_dict):
meta = etree.Element('meta', nsmap=self.NSMAP)
self._populate_meta_item(meta, meta_item_dict['meta'])
return self._to_xml(meta)
def create(self):
return MetadataTemplate()
def default(self, *args, **kwargs):
return ''
def update_all(self):
return MetadataTemplate()
def show(self):
return MetaItemTemplate()
def update(self):
return MetaItemTemplate()
def default(self):
return xmlutil.MasterTemplate(None, 1)
def check_snapshots_enabled(f):

View File

@ -21,6 +21,7 @@ import webob
from nova import console
from nova import exception
from nova.api.openstack import wsgi
from nova.api.openstack import xmlutil
def _translate_keys(cons):
@ -89,5 +90,51 @@ class Controller(object):
return webob.Response(status_int=202)
class ConsoleTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('console', selector='console')
id_elem = xmlutil.SubTemplateElement(root, 'id', selector='id')
id_elem.text = xmlutil.Selector()
port_elem = xmlutil.SubTemplateElement(root, 'port', selector='port')
port_elem.text = xmlutil.Selector()
host_elem = xmlutil.SubTemplateElement(root, 'host', selector='host')
host_elem.text = xmlutil.Selector()
passwd_elem = xmlutil.SubTemplateElement(root, 'password',
selector='password')
passwd_elem.text = xmlutil.Selector()
constype_elem = xmlutil.SubTemplateElement(root, 'console_type',
selector='console_type')
constype_elem.text = xmlutil.Selector()
return xmlutil.MasterTemplate(root, 1)
class ConsolesTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('consoles')
console = xmlutil.SubTemplateElement(root, 'console',
selector='consoles')
console.append(ConsoleTemplate())
return xmlutil.MasterTemplate(root, 1)
class ConsoleXMLSerializer(xmlutil.XMLTemplateSerializer):
def index(self):
return ConsolesTemplate()
def show(self):
return ConsoleTemplate()
def create_resource():
return wsgi.Resource(Controller())
body_serializers = {
'application/xml': ConsoleXMLSerializer(),
}
serializer = wsgi.ResponseSerializer(body_serializers)
return wsgi.Resource(Controller(), serializer=serializer)

View File

@ -30,6 +30,7 @@ import webob.exc
from nova import exception
from nova import flags
from nova import log as logging
from nova import utils
from nova import wsgi as base_wsgi
import nova.api.openstack
from nova.api.openstack import common
@ -159,9 +160,20 @@ class RequestExtensionController(object):
def process(self, req, *args, **kwargs):
res = req.get_response(self.application)
# Deserialize the response body, if any
body = None
if res.body:
body = utils.loads(res.body)
# currently request handlers are un-ordered
for handler in self.handlers:
res = handler(req, res)
res = handler(req, res, body)
# Reserialize the response body
if body is not None:
res.body = utils.dumps(body)
return res

View File

@ -81,50 +81,54 @@ class Controller(object):
return views.flavors.ViewBuilder(base_url, project_id)
class FlavorXMLSerializer(wsgi.XMLDictSerializer):
def make_flavor(elem, detailed=False):
elem.set('name')
elem.set('id')
if detailed:
elem.set('ram')
elem.set('disk')
NSMAP = {None: xmlutil.XMLNS_V11, 'atom': xmlutil.XMLNS_ATOM}
for attr in ("vcpus", "swap", "rxtx_quota", "rxtx_cap"):
elem.set(attr, xmlutil.EmptyStringSelector(attr))
def __init__(self):
super(FlavorXMLSerializer, self).__init__(xmlns=wsgi.XMLNS_V11)
xmlutil.make_links(elem, 'links')
def _populate_flavor(self, flavor_elem, flavor_dict, detailed=False):
"""Populate a flavor xml element from a dict."""
flavor_elem.set('name', flavor_dict['name'])
flavor_elem.set('id', str(flavor_dict['id']))
if detailed:
flavor_elem.set('ram', str(flavor_dict['ram']))
flavor_elem.set('disk', str(flavor_dict['disk']))
flavor_nsmap = {None: xmlutil.XMLNS_V11, 'atom': xmlutil.XMLNS_ATOM}
for attr in ("vcpus", "swap", "rxtx_quota", "rxtx_cap"):
flavor_elem.set(attr, str(flavor_dict.get(attr, "")))
for link in flavor_dict.get('links', []):
elem = etree.SubElement(flavor_elem,
'{%s}link' % xmlutil.XMLNS_ATOM)
elem.set('rel', link['rel'])
elem.set('href', link['href'])
return flavor_elem
class FlavorTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('flavor', selector='flavor')
make_flavor(root, detailed=True)
return xmlutil.MasterTemplate(root, 1, nsmap=flavor_nsmap)
def show(self, flavor_container):
flavor = etree.Element('flavor', nsmap=self.NSMAP)
self._populate_flavor(flavor, flavor_container['flavor'], True)
return self._to_xml(flavor)
def detail(self, flavors_dict):
flavors = etree.Element('flavors', nsmap=self.NSMAP)
for flavor_dict in flavors_dict['flavors']:
flavor = etree.SubElement(flavors, 'flavor')
self._populate_flavor(flavor, flavor_dict, True)
return self._to_xml(flavors)
class MinimalFlavorsTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('flavors')
elem = xmlutil.SubTemplateElement(root, 'flavor', selector='flavors')
make_flavor(elem)
return xmlutil.MasterTemplate(root, 1, nsmap=flavor_nsmap)
def index(self, flavors_dict):
flavors = etree.Element('flavors', nsmap=self.NSMAP)
for flavor_dict in flavors_dict['flavors']:
flavor = etree.SubElement(flavors, 'flavor')
self._populate_flavor(flavor, flavor_dict, False)
return self._to_xml(flavors)
class FlavorsTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('flavors')
elem = xmlutil.SubTemplateElement(root, 'flavor', selector='flavors')
make_flavor(elem, detailed=True)
return xmlutil.MasterTemplate(root, 1, nsmap=flavor_nsmap)
class FlavorXMLSerializer(xmlutil.XMLTemplateSerializer):
def show(self):
return FlavorTemplate()
def detail(self):
return FlavorsTemplate()
def index(self):
return MinimalFlavorsTemplate()
def create_resource():

View File

@ -148,82 +148,63 @@ class Controller(object):
raise webob.exc.HTTPMethodNotAllowed()
class ImageXMLSerializer(wsgi.XMLDictSerializer):
def make_image(elem, detailed=False):
elem.set('name')
elem.set('id')
NSMAP = {None: xmlutil.XMLNS_V11, 'atom': xmlutil.XMLNS_ATOM}
if detailed:
elem.set('updated')
elem.set('created')
elem.set('status')
elem.set('progress')
elem.set('minRam')
elem.set('minDisk')
def __init__(self):
self.metadata_serializer = common.MetadataXMLSerializer()
server = xmlutil.SubTemplateElement(elem, 'server', selector='server')
server.set('id')
xmlutil.make_links(server, 'links')
def _create_metadata_node(self, metadata_dict):
metadata_elem = etree.Element('metadata', nsmap=self.NSMAP)
self.metadata_serializer.populate_metadata(metadata_elem,
metadata_dict)
return metadata_elem
elem.append(common.MetadataTemplate())
def _create_server_node(self, server_dict):
server_elem = etree.Element('server', nsmap=self.NSMAP)
server_elem.set('id', str(server_dict['id']))
for link in server_dict.get('links', []):
elem = etree.SubElement(server_elem,
'{%s}link' % xmlutil.XMLNS_ATOM)
elem.set('rel', link['rel'])
elem.set('href', link['href'])
return server_elem
xmlutil.make_links(elem, 'links')
def _populate_image(self, image_elem, image_dict, detailed=False):
"""Populate an image xml element from a dict."""
image_elem.set('name', image_dict['name'])
image_elem.set('id', str(image_dict['id']))
if detailed:
image_elem.set('updated', str(image_dict['updated']))
image_elem.set('created', str(image_dict['created']))
image_elem.set('status', str(image_dict['status']))
if 'progress' in image_dict:
image_elem.set('progress', str(image_dict['progress']))
if 'minRam' in image_dict:
image_elem.set('minRam', str(image_dict['minRam']))
if 'minDisk' in image_dict:
image_elem.set('minDisk', str(image_dict['minDisk']))
if 'server' in image_dict:
server_elem = self._create_server_node(image_dict['server'])
image_elem.append(server_elem)
image_nsmap = {None: xmlutil.XMLNS_V11, 'atom': xmlutil.XMLNS_ATOM}
meta_elem = self._create_metadata_node(
image_dict.get('metadata', {}))
image_elem.append(meta_elem)
self._populate_links(image_elem, image_dict.get('links', []))
class ImageTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('image', selector='image')
make_image(root, detailed=True)
return xmlutil.MasterTemplate(root, 1, nsmap=image_nsmap)
def _populate_links(self, parent, links):
for link in links:
elem = etree.SubElement(parent, '{%s}link' % xmlutil.XMLNS_ATOM)
elem.set('rel', link['rel'])
if 'type' in link:
elem.set('type', link['type'])
elem.set('href', link['href'])
def index(self, images_dict):
images = etree.Element('images', nsmap=self.NSMAP)
for image_dict in images_dict['images']:
image = etree.SubElement(images, 'image')
self._populate_image(image, image_dict, False)
class MinimalImagesTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('images')
elem = xmlutil.SubTemplateElement(root, 'image', selector='images')
make_image(elem)
xmlutil.make_links(root, 'images_links')
return xmlutil.MasterTemplate(root, 1, nsmap=image_nsmap)
self._populate_links(images, images_dict.get('images_links', []))
return self._to_xml(images)
def detail(self, images_dict):
images = etree.Element('images', nsmap=self.NSMAP)
for image_dict in images_dict['images']:
image = etree.SubElement(images, 'image')
self._populate_image(image, image_dict, True)
return self._to_xml(images)
class ImagesTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('images')
elem = xmlutil.SubTemplateElement(root, 'image', selector='images')
make_image(elem, detailed=True)
return xmlutil.MasterTemplate(root, 1, nsmap=image_nsmap)
def show(self, image_dict):
image = etree.Element('image', nsmap=self.NSMAP)
self._populate_image(image, image_dict['image'], True)
return self._to_xml(image)
class ImageXMLSerializer(xmlutil.XMLTemplateSerializer):
def index(self):
return MinimalImagesTemplate()
def detail(self):
return ImagesTemplate()
def show(self):
return ImageTemplate()
def create_resource():

View File

@ -74,37 +74,40 @@ class Controller(object):
return views_addresses.ViewBuilder()
class IPXMLSerializer(wsgi.XMLDictSerializer):
def make_network(elem):
elem.set('id', 0)
NSMAP = {None: xmlutil.XMLNS_V11}
ip = xmlutil.SubTemplateElement(elem, 'ip', selector=1)
ip.set('version')
ip.set('addr')
def __init__(self, xmlns=wsgi.XMLNS_V11):
super(IPXMLSerializer, self).__init__(xmlns=xmlns)
def populate_addresses_node(self, addresses_elem, addresses_dict):
for (network_id, ip_dicts) in addresses_dict.items():
network_elem = self._create_network_node(network_id, ip_dicts)
addresses_elem.append(network_elem)
network_nsmap = {None: xmlutil.XMLNS_V11}
def _create_network_node(self, network_id, ip_dicts):
network_elem = etree.Element('network', nsmap=self.NSMAP)
network_elem.set('id', str(network_id))
for ip_dict in ip_dicts:
ip_elem = etree.SubElement(network_elem, 'ip')
ip_elem.set('version', str(ip_dict['version']))
ip_elem.set('addr', ip_dict['addr'])
return network_elem
def show(self, network_dict):
(network_id, ip_dicts) = network_dict.items()[0]
network = self._create_network_node(network_id, ip_dicts)
return self._to_xml(network)
class NetworkTemplate(xmlutil.TemplateBuilder):
def construct(self):
sel = xmlutil.Selector(xmlutil.get_items, 0)
root = xmlutil.TemplateElement('network', selector=sel)
make_network(root)
return xmlutil.MasterTemplate(root, 1, nsmap=network_nsmap)
def index(self, addresses_dict):
addresses = etree.Element('addresses', nsmap=self.NSMAP)
self.populate_addresses_node(addresses,
addresses_dict.get('addresses', {}))
return self._to_xml(addresses)
class AddressesTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('addresses', selector='addresses')
elem = xmlutil.SubTemplateElement(root, 'network',
selector=xmlutil.get_items)
make_network(elem)
return xmlutil.MasterTemplate(root, 1, nsmap=network_nsmap)
class IPXMLSerializer(xmlutil.XMLTemplateSerializer):
def show(self):
return NetworkTemplate()
def index(self):
return AddressesTemplate()
def create_resource():

View File

@ -68,53 +68,37 @@ class LimitsController(object):
return limits_views.ViewBuilder()
class LimitsXMLSerializer(wsgi.XMLDictSerializer):
limits_nsmap = {None: xmlutil.XMLNS_V11, 'atom': xmlutil.XMLNS_ATOM}
xmlns = wsgi.XMLNS_V11
NSMAP = {None: xmlutil.XMLNS_V11, 'atom': xmlutil.XMLNS_ATOM}
class LimitsTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('limits', selector='limits')
def __init__(self):
pass
rates = xmlutil.SubTemplateElement(root, 'rates')
rate = xmlutil.SubTemplateElement(rates, 'rate', selector='rate')
rate.set('uri', 'uri')
rate.set('regex', 'regex')
limit = xmlutil.SubTemplateElement(rate, 'limit', selector='limit')
limit.set('value', 'value')
limit.set('verb', 'verb')
limit.set('remaining', 'remaining')
limit.set('unit', 'unit')
limit.set('next-available', 'next-available')
def _create_rates_node(self, rates):
rates_elem = etree.Element('rates', nsmap=self.NSMAP)
for rate in rates:
rate_node = etree.SubElement(rates_elem, 'rate')
rate_node.set('uri', rate['uri'])
rate_node.set('regex', rate['regex'])
for limit in rate['limit']:
limit_elem = etree.SubElement(rate_node, 'limit')
limit_elem.set('value', str(limit['value']))
limit_elem.set('verb', str(limit['verb']))
limit_elem.set('remaining', str(limit['remaining']))
limit_elem.set('unit', str(limit['unit']))
limit_elem.set('next-available', str(limit['next-available']))
return rates_elem
absolute = xmlutil.SubTemplateElement(root, 'absolute',
selector='absolute')
limit = xmlutil.SubTemplateElement(absolute, 'limit',
selector=xmlutil.get_items)
limit.set('name', 0)
limit.set('value', 1)
def _create_absolute_node(self, absolute_dict):
absolute_elem = etree.Element('absolute', nsmap=self.NSMAP)
for key, value in absolute_dict.items():
limit_elem = etree.SubElement(absolute_elem, 'limit')
limit_elem.set('name', str(key))
limit_elem.set('value', str(value))
return absolute_elem
return xmlutil.MasterTemplate(root, 1, nsmap=limits_nsmap)
def _populate_limits(self, limits_elem, limits_dict):
"""Populate a limits xml element from a dict."""
rates_elem = self._create_rates_node(
limits_dict.get('rate', []))
limits_elem.append(rates_elem)
absolutes_elem = self._create_absolute_node(
limits_dict.get('absolute', {}))
limits_elem.append(absolutes_elem)
def index(self, limits_dict):
limits = etree.Element('limits', nsmap=self.NSMAP)
self._populate_limits(limits, limits_dict['limits'])
return self._to_xml(limits)
class LimitsXMLSerializer(xmlutil.XMLTemplateSerializer):
def index(self):
return LimitsTemplate()
def create_resource():

View File

@ -988,129 +988,107 @@ class HeadersSerializer(wsgi.ResponseHeadersSerializer):
response.status_int = 202
class ServerXMLSerializer(wsgi.XMLDictSerializer):
class SecurityGroupsTemplateElement(xmlutil.TemplateElement):
def will_render(self, datum):
return 'security_groups' in datum
NSMAP = {None: xmlutil.XMLNS_V11, 'atom': xmlutil.XMLNS_ATOM}
def __init__(self):
self.metadata_serializer = common.MetadataXMLSerializer()
self.addresses_serializer = ips.IPXMLSerializer()
def make_server(elem, detailed=False):
elem.set('name')
elem.set('id')
def _create_metadata_node(self, metadata_dict):
metadata_elem = etree.Element('metadata', nsmap=self.NSMAP)
self.metadata_serializer.populate_metadata(metadata_elem,
metadata_dict)
return metadata_elem
if detailed:
elem.set('uuid')
elem.set('userId', 'user_id')
elem.set('tenantId', 'tenant_id')
elem.set('updated')
elem.set('created')
elem.set('hostId')
elem.set('accessIPv4')
elem.set('accessIPv6')
elem.set('status')
elem.set('progress')
def _create_image_node(self, image_dict):
image_elem = etree.Element('image', nsmap=self.NSMAP)
image_elem.set('id', str(image_dict['id']))
for link in image_dict.get('links', []):
elem = etree.SubElement(image_elem,
'{%s}link' % xmlutil.XMLNS_ATOM)
elem.set('rel', link['rel'])
elem.set('href', link['href'])
return image_elem
# Attach image node
image = xmlutil.SubTemplateElement(elem, 'image', selector='image')
image.set('id')
xmlutil.make_links(image, 'links')
def _create_flavor_node(self, flavor_dict):
flavor_elem = etree.Element('flavor', nsmap=self.NSMAP)
flavor_elem.set('id', str(flavor_dict['id']))
for link in flavor_dict.get('links', []):
elem = etree.SubElement(flavor_elem,
'{%s}link' % xmlutil.XMLNS_ATOM)
elem.set('rel', link['rel'])
elem.set('href', link['href'])
return flavor_elem
# Attach flavor node
flavor = xmlutil.SubTemplateElement(elem, 'flavor', selector='flavor')
flavor.set('id')
xmlutil.make_links(flavor, 'links')
def _create_addresses_node(self, addresses_dict):
addresses_elem = etree.Element('addresses', nsmap=self.NSMAP)
self.addresses_serializer.populate_addresses_node(addresses_elem,
addresses_dict)
return addresses_elem
# Attach metadata node
elem.append(common.MetadataTemplate())
def _populate_server(self, server_elem, server_dict, detailed=False):
"""Populate a server xml element from a dict."""
# Attach addresses node
elem.append(ips.AddressesTemplate())
server_elem.set('name', server_dict['name'])
server_elem.set('id', str(server_dict['id']))
if detailed:
server_elem.set('uuid', str(server_dict['uuid']))
server_elem.set('userId', str(server_dict['user_id']))
server_elem.set('tenantId', str(server_dict['tenant_id']))
server_elem.set('updated', str(server_dict['updated']))
server_elem.set('created', str(server_dict['created']))
server_elem.set('hostId', str(server_dict['hostId']))
server_elem.set('accessIPv4', str(server_dict['accessIPv4']))
server_elem.set('accessIPv6', str(server_dict['accessIPv6']))
server_elem.set('status', str(server_dict['status']))
if 'progress' in server_dict:
server_elem.set('progress', str(server_dict['progress']))
image_elem = self._create_image_node(server_dict['image'])
server_elem.append(image_elem)
# Attach security groups node
secgrps = SecurityGroupsTemplateElement('security_groups')
elem.append(secgrps)
secgrp = xmlutil.SubTemplateElement(secgrps, 'security_group',
selector='security_groups')
secgrp.set('name')
flavor_elem = self._create_flavor_node(server_dict['flavor'])
server_elem.append(flavor_elem)
xmlutil.make_links(elem, 'links')
meta_elem = self._create_metadata_node(
server_dict.get('metadata', {}))
server_elem.append(meta_elem)
addresses_elem = self._create_addresses_node(
server_dict.get('addresses', {}))
server_elem.append(addresses_elem)
groups = server_dict.get('security_groups')
if groups:
groups_elem = etree.SubElement(server_elem, 'security_groups')
for group in groups:
group_elem = etree.SubElement(groups_elem,
'security_group')
group_elem.set('name', group['name'])
server_nsmap = {None: xmlutil.XMLNS_V11, 'atom': xmlutil.XMLNS_ATOM}
self._populate_links(server_elem, server_dict.get('links', []))
def _populate_links(self, parent, links):
for link in links:
elem = etree.SubElement(parent,
'{%s}link' % xmlutil.XMLNS_ATOM)
elem.set('rel', link['rel'])
elem.set('href', link['href'])
class ServerTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('server', selector='server')
make_server(root, detailed=True)
return xmlutil.MasterTemplate(root, 1, nsmap=server_nsmap)
def index(self, servers_dict):
servers = etree.Element('servers', nsmap=self.NSMAP)
for server_dict in servers_dict['servers']:
server = etree.SubElement(servers, 'server')
self._populate_server(server, server_dict, False)
self._populate_links(servers, servers_dict.get('servers_links', []))
return self._to_xml(servers)
class MinimalServersTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('servers')
elem = xmlutil.SubTemplateElement(root, 'server', selector='servers')
make_server(elem)
xmlutil.make_links(root, 'servers_links')
return xmlutil.MasterTemplate(root, 1, nsmap=server_nsmap)
def detail(self, servers_dict):
servers = etree.Element('servers', nsmap=self.NSMAP)
for server_dict in servers_dict['servers']:
server = etree.SubElement(servers, 'server')
self._populate_server(server, server_dict, True)
return self._to_xml(servers)
def show(self, server_dict):
server = etree.Element('server', nsmap=self.NSMAP)
self._populate_server(server, server_dict['server'], True)
return self._to_xml(server)
class ServersTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('servers')
elem = xmlutil.SubTemplateElement(root, 'server', selector='servers')
make_server(elem, detailed=True)
return xmlutil.MasterTemplate(root, 1, nsmap=server_nsmap)
def create(self, server_dict):
server = etree.Element('server', nsmap=self.NSMAP)
self._populate_server(server, server_dict['server'], True)
server.set('adminPass', server_dict['server']['adminPass'])
return self._to_xml(server)
def action(self, server_dict):
#NOTE(bcwaldon): We need a way to serialize actions individually. This
# assumes all actions return a server entity
return self.create(server_dict)
class ServerAdminPassTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('server')
root.set('adminPass')
return xmlutil.SlaveTemplate(root, 1, nsmap=server_nsmap)
def update(self, server_dict):
server = etree.Element('server', nsmap=self.NSMAP)
self._populate_server(server, server_dict['server'], True)
return self._to_xml(server)
class ServerXMLSerializer(xmlutil.XMLTemplateSerializer):
def index(self):
return MinimalServersTemplate()
def detail(self):
return ServersTemplate()
def show(self):
return ServerTemplate()
def update(self):
return ServerTemplate()
def create(self):
master = ServerTemplate()
master.attach(ServerAdminPassTemplate())
return master
def action(self):
return self.create()
class ServerXMLDeserializer(wsgi.MetadataXMLDeserializer):

View File

@ -20,6 +20,7 @@ from nova import flags
from nova import log as logging
from nova.api.openstack import common
from nova.api.openstack import wsgi
from nova.api.openstack import xmlutil
from nova.auth import manager
@ -97,15 +98,40 @@ class Controller(object):
return dict(user=_translate_keys(self.manager.get_user(id)))
def create_resource():
metadata = {
"attributes": {
"user": ["id", "name", "access", "secret", "admin"],
},
}
def make_user(elem):
elem.set('id')
elem.set('name')
elem.set('access')
elem.set('secret')
elem.set('admin')
class UserTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('user', selector='user')
make_user(root)
return xmlutil.MasterTemplate(root, 1)
class UsersTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('users')
elem = xmlutil.SubTemplateElement(root, 'user', selector='users')
make_user(elem)
return xmlutil.MasterTemplate(root, 1)
class UserXMLSerializer(xmlutil.XMLTemplateSerializer):
def index(self):
return UsersTemplate()
def default(self):
return UserTemplate()
def create_resource():
body_serializers = {
'application/xml': wsgi.XMLDictSerializer(metadata=metadata),
'application/xml': UserXMLSerializer(),
}
serializer = wsgi.ResponseSerializer(body_serializers)

View File

@ -449,7 +449,8 @@ class ResponseSerializer(object):
self.headers_serializer = headers_serializer or \
ResponseHeadersSerializer()
def serialize(self, response_data, content_type, action='default'):
def serialize(self, request, response_data, content_type,
action='default'):
"""Serialize a dict into a string and wrap in a wsgi.Request object.
:param response_data: dict produced by the Controller
@ -458,17 +459,28 @@ class ResponseSerializer(object):
"""
response = webob.Response()
self.serialize_headers(response, response_data, action)
self.serialize_body(response, response_data, content_type, action)
self.serialize_body(request, response, response_data, content_type,
action)
return response
def serialize_headers(self, response, data, action):
self.headers_serializer.serialize(response, data, action)
def serialize_body(self, response, data, content_type, action):
def serialize_body(self, request, response, data, content_type, action):
response.headers['Content-Type'] = content_type
if data is not None:
serializer = self.get_body_serializer(content_type)
response.body = serializer.serialize(data, action)
lazy_serialize = request.environ.get('nova.lazy_serialize', False)
if lazy_serialize:
response.body = utils.dumps(data)
request.environ['nova.serializer'] = serializer
request.environ['nova.action'] = action
if (hasattr(serializer, 'get_template') and
'nova.template' not in request.environ):
template = serializer.get_template(action)
request.environ['nova.template'] = template
else:
response.body = serializer.serialize(data, action)
def get_body_serializer(self, content_type):
try:
@ -478,6 +490,32 @@ class ResponseSerializer(object):
raise exception.InvalidContentType(content_type=content_type)
class LazySerializationMiddleware(wsgi.Middleware):
"""Lazy serialization middleware."""
@webob.dec.wsgify(RequestClass=Request)
def __call__(self, req):
# Request lazy serialization
req.environ['nova.lazy_serialize'] = True
response = req.get_response(self.application)
# See if there's a serializer...
serializer = req.environ.get('nova.serializer')
if serializer is None:
return response
# OK, build up the arguments for the serialize() method
kwargs = dict(action=req.environ['nova.action'])
if 'nova.template' in req.environ:
kwargs['template'] = req.environ['nova.template']
# Re-serialize the body
response.body = serializer.serialize(utils.loads(response.body),
**kwargs)
return response
class Resource(wsgi.Application):
"""WSGI app that handles (de)serialization and controller dispatch.
@ -531,7 +569,8 @@ class Resource(wsgi.Application):
action_result = faults.Fault(ex)
if type(action_result) is dict or action_result is None:
response = self.serializer.serialize(action_result,
response = self.serializer.serialize(request,
action_result,
accept,
action=action)
else:

View File

@ -20,6 +20,7 @@ import os.path
from lxml import etree
from nova import utils
from nova.api.openstack import wsgi
XMLNS_V10 = 'http://docs.rackspacecloud.com/servers/api/v1.0'
@ -38,3 +39,857 @@ def validate_schema(xml, schema_name):
schema_doc = etree.parse(schema_path)
relaxng = etree.RelaxNG(schema_doc)
relaxng.assertValid(xml)
class Selector(object):
"""Selects datum to operate on from an object."""
def __init__(self, *chain):
"""Initialize the selector.
Each argument is a subsequent index into the object.
"""
self.chain = chain
def __repr__(self):
"""Return a representation of the selector."""
return "Selector" + repr(self.chain)
def __call__(self, obj, do_raise=False):
"""Select a datum to operate on.
Selects the relevant datum within the object.
:param obj: The object from which to select the object.
:param do_raise: If False (the default), return None if the
indexed datum does not exist. Otherwise,
raise a KeyError.
"""
# Walk the selector list
for elem in self.chain:
# If it's callable, call it
if callable(elem):
obj = elem(obj)
else:
# Use indexing
try:
obj = obj[elem]
except (KeyError, IndexError):
# No sense going any further
if do_raise:
# Convert to a KeyError, for consistency
raise KeyError(elem)
return None
# Return the finally-selected object
return obj
def get_items(obj):
"""Get items in obj."""
return list(obj.items())
class EmptyStringSelector(Selector):
"""Returns the empty string if Selector would return None."""
def __call__(self, obj, do_raise=False):
"""Returns empty string if the selected value does not exist."""
try:
return super(EmptyStringSelector, self).__call__(obj, True)
except KeyError:
return ""
class ConstantSelector(object):
"""Returns a constant."""
def __init__(self, value):
"""Initialize the selector.
:param value: The value to return.
"""
self.value = value
def __repr__(self):
"""Return a representation of the selector."""
return repr(self.value)
def __call__(self, _obj, _do_raise=False):
"""Select a datum to operate on.
Returns a constant value. Compatible with
Selector.__call__().
"""
return self.value
class TemplateElement(object):
"""Represent an element in the template."""
def __init__(self, tag, attrib=None, selector=None, **extra):
"""Initialize an element.
Initializes an element in the template. Keyword arguments
specify attributes to be set on the element; values must be
callables. See TemplateElement.set() for more information.
:param tag: The name of the tag to create.
:param attrib: An optional dictionary of element attributes.
:param selector: An optional callable taking an object and
optional boolean do_raise indicator and
returning the object bound to the element.
"""
# Convert selector into a Selector
if selector is None:
selector = Selector()
elif not callable(selector):
selector = Selector(selector)
self.tag = tag
self.selector = selector
self.attrib = {}
self._text = None
self._children = []
self._childmap = {}
# Run the incoming attributes through set() so that they
# become selectorized
if not attrib:
attrib = {}
attrib.update(extra)
for k, v in attrib.items():
self.set(k, v)
def __repr__(self):
"""Return a representation of the template element."""
return ('<%s.%s %r at %#x>' %
(self.__class__.__module__, self.__class__.__name__,
self.tag, id(self)))
def __len__(self):
"""Return the number of child elements."""
return len(self._children)
def __contains__(self, key):
"""Determine whether a child node named by key exists."""
return key in self._childmap
def __getitem__(self, idx):
"""Retrieve a child node by index or name."""
if isinstance(idx, basestring):
# Allow access by node name
return self._childmap[idx]
else:
return self._children[idx]
def append(self, elem):
"""Append a child to the element."""
# Unwrap templates...
elem = elem.unwrap()
# Avoid duplications
if elem.tag in self._childmap:
raise KeyError(elem.tag)
self._children.append(elem)
self._childmap[elem.tag] = elem
def extend(self, elems):
"""Append children to the element."""
# Pre-evaluate the elements
elemmap = {}
elemlist = []
for elem in elems:
# Unwrap templates...
elem = elem.unwrap()
# Avoid duplications
if elem.tag in self._childmap or elem.tag in elemmap:
raise KeyError(elem.tag)
elemmap[elem.tag] = elem
elemlist.append(elem)
# Update the children
self._children.extend(elemlist)
self._childmap.update(elemmap)
def insert(self, idx, elem):
"""Insert a child element at the given index."""
# Unwrap templates...
elem = elem.unwrap()
# Avoid duplications
if elem.tag in self._childmap:
raise KeyError(elem.tag)
self._children.insert(idx, elem)
self._childmap[elem.tag] = elem
def remove(self, elem):
"""Remove a child element."""
# Unwrap templates...
elem = elem.unwrap()
# Check if element exists
if elem.tag not in self._childmap or self._childmap[elem.tag] != elem:
raise ValueError(_('element is not a child'))
self._children.remove(elem)
del self._childmap[elem.tag]
def get(self, key):
"""Get an attribute.
Returns a callable which performs datum selection.
:param key: The name of the attribute to get.
"""
return self.attrib[key]
def set(self, key, value=None):
"""Set an attribute.
:param key: The name of the attribute to set.
:param value: A callable taking an object and optional boolean
do_raise indicator and returning the datum bound
to the attribute. If None, a Selector() will be
constructed from the key. If a string, a
Selector() will be constructed from the string.
"""
# Convert value to a selector
if value is None:
value = Selector(key)
elif not callable(value):
value = Selector(value)
self.attrib[key] = value
def keys(self):
"""Return the attribute names."""
return self.attrib.keys()
def items(self):
"""Return the attribute names and values."""
return self.attrib.items()
def unwrap(self):
"""Unwraps a template to return a template element."""
# We are a template element
return self
def wrap(self):
"""Wraps a template element to return a template."""
# Wrap in a basic Template
return Template(self)
def apply(self, elem, obj):
"""Apply text and attributes to an etree.Element.
Applies the text and attribute instructions in the template
element to an etree.Element instance.
:param elem: An etree.Element instance.
:param obj: The base object associated with this template
element.
"""
# Start with the text...
if self.text is not None:
elem.text = unicode(self.text(obj))
# Now set up all the attributes...
for key, value in self.attrib.items():
try:
elem.set(key, unicode(value(obj, True)))
except KeyError:
# Attribute has no value, so don't include it
pass
def _render(self, parent, datum, patches, nsmap):
"""Internal rendering.
Renders the template node into an etree.Element object.
Returns the etree.Element object.
:param parent: The parent etree.Element instance.
:param datum: The datum associated with this template element.
:param patches: A list of other template elements that must
also be applied.
:param nsmap: An optional namespace dictionary to be
associated with the etree.Element instance.
"""
# Allocate a node
if callable(self.tag):
tagname = self.tag(datum)
else:
tagname = self.tag
elem = etree.Element(tagname, nsmap=nsmap)
# If we have a parent, append the node to the parent
if parent is not None:
parent.append(elem)
# If the datum is None, do nothing else
if datum is None:
return elem
# Apply this template element to the element
self.apply(elem, datum)
# Additionally, apply the patches
for patch in patches:
patch.apply(elem, datum)
# We have fully rendered the element; return it
return elem
def render(self, parent, obj, patches=[], nsmap=None):
"""Render an object.
Renders an object against this template node. Returns a list
of two-item tuples, where the first item is an etree.Element
instance and the second item is the datum associated with that
instance.
:param parent: The parent for the etree.Element instances.
:param obj: The object to render this template element
against.
:param patches: A list of other template elements to apply
when rendering this template element.
:param nsmap: An optional namespace dictionary to attach to
the etree.Element instances.
"""
# First, get the datum we're rendering
data = None if obj is None else self.selector(obj)
# Check if we should render at all
if not self.will_render(data):
return []
elif data is None:
return [(self._render(parent, None, patches, nsmap), None)]
# Make the data into a list if it isn't already
if not isinstance(data, list):
data = [data]
elif parent is None:
raise ValueError(_('root element selecting a list'))
# Render all the elements
elems = []
for datum in data:
elems.append((self._render(parent, datum, patches, nsmap), datum))
# Return all the elements rendered, as well as the
# corresponding datum for the next step down the tree
return elems
def will_render(self, datum):
"""Hook method.
An overridable hook method to determine whether this template
element will be rendered at all. By default, returns False
(inhibiting rendering) if the datum is None.
:param datum: The datum associated with this template element.
"""
# Don't render if datum is None
return datum is not None
def _text_get(self):
"""Template element text.
Either None or a callable taking an object and optional
boolean do_raise indicator and returning the datum bound to
the text of the template element.
"""
return self._text
def _text_set(self, value):
# Convert value to a selector
if value is not None and not callable(value):
value = Selector(value)
self._text = value
def _text_del(self):
self._text = None
text = property(_text_get, _text_set, _text_del)
def tree(self):
"""Return string representation of the template tree.
Returns a representation of the template rooted at this
element as a string, suitable for inclusion in debug logs.
"""
# Build the inner contents of the tag...
contents = [self.tag, '!selector=%r' % self.selector]
# Add the text...
if self.text is not None:
contents.append('!text=%r' % self.text)
# Add all the other attributes
for key, value in self.attrib.items():
contents.append('%s=%r' % (key, value))
# If there are no children, return it as a closed tag
if len(self) == 0:
return '<%s/>' % ' '.join(contents)
# OK, recurse to our children
children = [c.tree() for c in self]
# Return the result
return ('<%s>%s</%s>' %
(' '.join(contents), ''.join(children), self.tag))
def SubTemplateElement(parent, tag, attrib=None, selector=None, **extra):
"""Create a template element as a child of another.
Corresponds to the etree.SubElement interface. Parameters are as
for TemplateElement, with the addition of the parent.
"""
# Convert attributes
attrib = attrib or {}
attrib.update(extra)
# Get a TemplateElement
elem = TemplateElement(tag, attrib=attrib, selector=selector)
# Append the parent safely
if parent is not None:
parent.append(elem)
return elem
class Template(object):
"""Represent a template."""
def __init__(self, root, nsmap=None):
"""Initialize a template.
:param root: The root element of the template.
:param nsmap: An optional namespace dictionary to be
associated with the root element of the
template.
"""
self.root = root.unwrap() if root is not None else None
self.nsmap = nsmap or {}
def _serialize(self, parent, obj, siblings, nsmap=None):
"""Internal serialization.
Recursive routine to build a tree of etree.Element instances
from an object based on the template. Returns the first
etree.Element instance rendered, or None.
:param parent: The parent etree.Element instance. Can be
None.
:param obj: The object to render.
:param siblings: The TemplateElement instances against which
to render the object.
:param nsmap: An optional namespace dictionary to be
associated with the etree.Element instance
rendered.
"""
# First step, render the element
elems = siblings[0].render(parent, obj, siblings[1:], nsmap)
# Now, recurse to all child elements
seen = set()
for idx, sibling in enumerate(siblings):
for child in sibling:
# Have we handled this child already?
if child.tag in seen:
continue
seen.add(child.tag)
# Determine the child's siblings
nieces = [child]
for sib in siblings[idx + 1:]:
if child.tag in sib:
nieces.append(sib[child.tag])
# Now we recurse for every data element
for elem, datum in elems:
self._serialize(elem, datum, nieces)
# Return the first element; at the top level, this will be the
# root element
if elems:
return elems[0][0]
def serialize(self, obj, *args, **kwargs):
"""Serialize an object.
Serializes an object against the template. Returns a string
with the serialized XML. Positional and keyword arguments are
passed to etree.tostring().
:param obj: The object to serialize.
"""
elem = self.make_tree(obj)
if elem is None:
return ''
# Serialize it into XML
return etree.tostring(elem, *args, **kwargs)
def make_tree(self, obj):
"""Create a tree.
Serializes an object against the template. Returns an Element
node with appropriate children.
:param obj: The object to serialize.
"""
# If the template is empty, return the empty string
if self.root is None:
return None
# Get the siblings and nsmap of the root element
siblings = self._siblings()
nsmap = self._nsmap()
# Form the element tree
return self._serialize(None, obj, siblings, nsmap)
def _siblings(self):
"""Hook method for computing root siblings.
An overridable hook method to return the siblings of the root
element. By default, this is the root element itself.
"""
return [self.root]
def _nsmap(self):
"""Hook method for computing the namespace dictionary.
An overridable hook method to return the namespace dictionary.
"""
return self.nsmap.copy()
def unwrap(self):
"""Unwraps a template to return a template element."""
# Return the root element
return self.root
def wrap(self):
"""Wraps a template element to return a template."""
# We are a template
return self
def apply(self, master):
"""Hook method for determining slave applicability.
An overridable hook method used to determine if this template
is applicable as a slave to a given master template.
:param master: The master template to test.
"""
return True
def tree(self):
"""Return string representation of the template tree.
Returns a representation of the template as a string, suitable
for inclusion in debug logs.
"""
return "%r: %s" % (self, self.root.tree())
class MasterTemplate(Template):
"""Represent a master template.
Master templates are versioned derivatives of templates that
additionally allow slave templates to be attached. Slave
templates allow modification of the serialized result without
directly changing the master.
"""
def __init__(self, root, version, nsmap=None):
"""Initialize a master template.
:param root: The root element of the template.
:param version: The version number of the template.
:param nsmap: An optional namespace dictionary to be
associated with the root element of the
template.
"""
super(MasterTemplate, self).__init__(root, nsmap)
self.version = version
self.slaves = []
def __repr__(self):
"""Return string representation of the template."""
return ("<%s.%s object version %s at %#x>" %
(self.__class__.__module__, self.__class__.__name__,
self.version, id(self)))
def _siblings(self):
"""Hook method for computing root siblings.
An overridable hook method to return the siblings of the root
element. This is the root element plus the root elements of
all the slave templates.
"""
return [self.root] + [slave.root for slave in self.slaves]
def _nsmap(self):
"""Hook method for computing the namespace dictionary.
An overridable hook method to return the namespace dictionary.
The namespace dictionary is computed by taking the master
template's namespace dictionary and updating it from all the
slave templates.
"""
nsmap = self.nsmap.copy()
for slave in self.slaves:
nsmap.update(slave._nsmap())
return nsmap
def attach(self, *slaves):
"""Attach one or more slave templates.
Attaches one or more slave templates to the master template.
Slave templates must have a root element with the same tag as
the master template. The slave template's apply() method will
be called to determine if the slave should be applied to this
master; if it returns False, that slave will be skipped.
(This allows filtering of slaves based on the version of the
master template.)
"""
slave_list = []
for slave in slaves:
slave = slave.wrap()
# Make sure we have a tree match
if slave.root.tag != self.root.tag:
slavetag = slave.root.tag
mastertag = self.root.tag
msg = _("Template tree mismatch; adding slave %(slavetag)s "
"to master %(mastertag)s") % locals()
raise ValueError(msg)
# Make sure slave applies to this template
if not slave.apply(self):
continue
slave_list.append(slave)
# Add the slaves
self.slaves.extend(slave_list)
def copy(self):
"""Return a copy of this master template."""
# Return a copy of the MasterTemplate
tmp = self.__class__(self.root, self.version, self.nsmap)
tmp.slaves = self.slaves[:]
return tmp
class SlaveTemplate(Template):
"""Represent a slave template.
Slave templates are versioned derivatives of templates. Each
slave has a minimum version and optional maximum version of the
master template to which they can be attached.
"""
def __init__(self, root, min_vers, max_vers=None, nsmap=None):
"""Initialize a slave template.
:param root: The root element of the template.
:param min_vers: The minimum permissible version of the master
template for this slave template to apply.
:param max_vers: An optional upper bound for the master
template version.
:param nsmap: An optional namespace dictionary to be
associated with the root element of the
template.
"""
super(SlaveTemplate, self).__init__(root, nsmap)
self.min_vers = min_vers
self.max_vers = max_vers
def __repr__(self):
"""Return string representation of the template."""
return ("<%s.%s object versions %s-%s at %#x>" %
(self.__class__.__module__, self.__class__.__name__,
self.min_vers, self.max_vers, id(self)))
def apply(self, master):
"""Hook method for determining slave applicability.
An overridable hook method used to determine if this template
is applicable as a slave to a given master template. This
version requires the master template to have a version number
between min_vers and max_vers.
:param master: The master template to test.
"""
# Does the master meet our minimum version requirement?
if master.version < self.min_vers:
return False
# How about our maximum version requirement?
if self.max_vers is not None and master.version > self.max_vers:
return False
return True
class TemplateBuilder(object):
"""Template builder.
This class exists to allow templates to be lazily built without
having to build them each time they are needed. It must be
subclassed, and the subclass must implement the construct()
method, which must return a Template (or subclass) instance. The
constructor will always return the template returned by
construct(), or, if it has a copy() method, a copy of that
template.
"""
_tmpl = None
def __new__(cls, copy=True):
"""Construct and return a template.
:param copy: If True (the default), a copy of the template
will be constructed and returned, if possible.
"""
# Do we need to construct the template?
if cls._tmpl is None:
tmp = super(TemplateBuilder, cls).__new__(cls)
# Construct the template
cls._tmpl = tmp.construct()
# If the template has a copy attribute, return the result of
# calling it
if copy and hasattr(cls._tmpl, 'copy'):
return cls._tmpl.copy()
# Return the template
return cls._tmpl
def construct(self):
"""Construct a template.
Called to construct a template instance, which it must return.
Only called once.
"""
raise NotImplementedError(_("subclasses must implement construct()!"))
class XMLTemplateSerializer(wsgi.ActionDispatcher):
"""Template-based XML serializer.
Data serializer that uses templates to perform its serialization.
"""
def get_template(self, action='default'):
"""Retrieve the template to use for serialization."""
return self.dispatch(action=action)
def serialize(self, data, action='default', template=None):
"""Serialize data.
:param data: The data to serialize.
:param action: The action, for identifying the template to
use. If no template is provided,
get_template() will be called with this action
to retrieve the template.
:param template: The template to use in serialization.
"""
# No template provided, look one up
if template is None:
template = self.get_template(action)
# Still couldn't find a template; try the base
# XMLDictSerializer
if template is None:
serial = wsgi.XMLDictSerializer()
return serial.serialize(data, action=action)
# Serialize the template
return template.serialize(data, encoding='UTF-8',
xml_declaration=True)
def default(self):
"""Retrieve the default template to use."""
return None
def make_links(parent, selector=None):
"""
Attach an Atom <links> element to the parent.
"""
elem = SubTemplateElement(parent, '{%s}link' % XMLNS_ATOM,
selector=selector)
elem.set('rel')
elem.set('type')
elem.set('href')
# Just for completeness...
return elem

View File

@ -27,6 +27,7 @@ from nova.scheduler import api
from nova.api.openstack import common
from nova.api.openstack import servers
from nova.api.openstack import xmlutil
from nova.api.openstack import wsgi
@ -143,16 +144,70 @@ class Controller(object):
return cooked
def create_resource():
metadata = {
"attributes": {
"zone": ["id", "api_url", "name", "capabilities"],
},
}
class CapabilitySelector(object):
def __call__(self, obj, do_raise=False):
return [(k, v) for k, v in obj.items()
if k not in ('id', 'api_url', 'name', 'capabilities')]
def make_zone(elem):
#elem = xmlutil.SubTemplateElement(parent, 'zone', selector=selector)
elem.set('id')
elem.set('api_url')
elem.set('name')
elem.set('capabilities')
cap = xmlutil.SubTemplateElement(elem, xmlutil.Selector(0),
selector=CapabilitySelector())
cap.text = 1
zone_nsmap = {None: wsgi.XMLNS_V10}
class ZoneTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('zone', selector='zone')
make_zone(root)
return xmlutil.MasterTemplate(root, 1, nsmap=zone_nsmap)
class ZonesTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('zones')
elem = xmlutil.SubTemplateElement(root, 'zone', selector='zones')
make_zone(elem)
return xmlutil.MasterTemplate(root, 1, nsmap=zone_nsmap)
class WeightsTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('weights')
weight = xmlutil.SubTemplateElement(root, 'weight', selector='weights')
blob = xmlutil.SubTemplateElement(weight, 'blob')
blob.text = 'blob'
inner_weight = xmlutil.SubTemplateElement(weight, 'weight')
inner_weight.text = 'weight'
return xmlutil.MasterTemplate(root, 1, nsmap=zone_nsmap)
class ZonesXMLSerializer(xmlutil.XMLTemplateSerializer):
def index(self):
return ZonesTemplate()
def detail(self):
return ZonesTemplate()
def select(self):
return WeightsTemplate()
def default(self):
return ZoneTemplate()
def create_resource():
body_serializers = {
'application/xml': wsgi.XMLDictSerializer(xmlns=wsgi.XMLNS_V11,
metadata=metadata),
'application/xml': ZonesXMLSerializer(),
}
serializer = wsgi.ResponseSerializer(body_serializers)

View File

@ -64,12 +64,10 @@ class Foxinsocks(object):
def get_request_extensions(self):
request_exts = []
def _goose_handler(req, res):
def _goose_handler(req, res, body):
#NOTE: This only handles JSON responses.
# You can use content type header to test for XML.
data = json.loads(res.body)
data['flavor']['googoose'] = req.GET.get('chewing')
res.body = json.dumps(data)
body['flavor']['googoose'] = req.GET.get('chewing')
return res
req_ext1 = extensions.RequestExtension('GET',
@ -77,12 +75,10 @@ class Foxinsocks(object):
_goose_handler)
request_exts.append(req_ext1)
def _bands_handler(req, res):
def _bands_handler(req, res, body):
#NOTE: This only handles JSON responses.
# You can use content type header to test for XML.
data = json.loads(res.body)
data['big_bands'] = 'Pig Bands!'
res.body = json.dumps(data)
body['big_bands'] = 'Pig Bands!'
return res
req_ext2 = extensions.RequestExtension('GET',

View File

@ -29,9 +29,10 @@ from nova.api import openstack
from nova.api import auth as api_auth
from nova.api.openstack import auth
from nova.api.openstack import extensions
from nova.api.openstack import versions
from nova.api.openstack import limits
from nova.api.openstack import urlmap
from nova.api.openstack import versions
from nova.api.openstack import wsgi as os_wsgi
from nova.auth.manager import User, Project
import nova.image.fake
from nova.tests.glance import stubs as glance_stubs
@ -65,7 +66,8 @@ def fake_wsgi(self, req):
return self.application
def wsgi_app(inner_app11=None, fake_auth=True, fake_auth_context=None):
def wsgi_app(inner_app11=None, fake_auth=True, fake_auth_context=None,
serialization=os_wsgi.LazySerializationMiddleware):
if not inner_app11:
inner_app11 = openstack.APIRouter()
@ -76,11 +78,13 @@ def wsgi_app(inner_app11=None, fake_auth=True, fake_auth_context=None):
ctxt = context.RequestContext('fake', 'fake', auth_token=True)
api11 = openstack.FaultWrapper(api_auth.InjectContext(ctxt,
limits.RateLimitingMiddleware(
extensions.ExtensionMiddleware(inner_app11))))
serialization(
extensions.ExtensionMiddleware(inner_app11)))))
else:
api11 = openstack.FaultWrapper(auth.AuthMiddleware(
limits.RateLimitingMiddleware(
extensions.ExtensionMiddleware(inner_app11))))
serialization(
extensions.ExtensionMiddleware(inner_app11)))))
Auth = auth
mapper = urlmap.URLMap()
mapper['/v1.1'] = api11

View File

@ -16,6 +16,7 @@
import json
from lxml import etree
import webob
from nova import test
@ -59,10 +60,21 @@ class AccountsTest(test.TestCase):
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(res_dict['account']['id'], 'test1')
self.assertEqual(res_dict['account']['name'], 'test1')
self.assertEqual(res_dict['account']['manager'], 'id1')
def test_get_account_xml(self):
req = webob.Request.blank('/v1.1/fake/accounts/test1.xml')
res = req.get_response(fakes.wsgi_app())
res_tree = etree.fromstring(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual('account', res_tree.tag)
self.assertEqual('test1', res_tree.get('id'))
self.assertEqual('test1', res_tree.get('name'))
self.assertEqual('id1', res_tree.get('manager'))
def test_account_delete(self):
req = webob.Request.blank('/v1.1/fake/accounts/test1')
@ -91,6 +103,27 @@ class AccountsTest(test.TestCase):
fakes.FakeAuthManager.projects)
self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 3)
def test_account_create_xml(self):
body = dict(account=dict(description='test account',
manager='id1'))
req = webob.Request.blank('/v1.1/fake/accounts/newacct.xml')
req.headers["Content-Type"] = "application/json"
req.method = 'PUT'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_tree = etree.fromstring(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(res_tree.tag, 'account')
self.assertEqual(res_tree.get('id'), 'newacct')
self.assertEqual(res_tree.get('name'), 'newacct')
self.assertEqual(res_tree.get('description'), 'test account')
self.assertEqual(res_tree.get('manager'), 'id1')
self.assertTrue('newacct' in
fakes.FakeAuthManager.projects)
self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 3)
def test_account_update(self):
body = dict(account=dict(description='test account',
manager='id2'))
@ -108,3 +141,22 @@ class AccountsTest(test.TestCase):
self.assertEqual(res_dict['account']['description'], 'test account')
self.assertEqual(res_dict['account']['manager'], 'id2')
self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 2)
def test_account_update_xml(self):
body = dict(account=dict(description='test account',
manager='id2'))
req = webob.Request.blank('/v1.1/fake/accounts/test1.xml')
req.headers["Content-Type"] = "application/json"
req.method = 'PUT'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_tree = etree.fromstring(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(res_tree.tag, 'account')
self.assertEqual(res_tree.get('id'), 'test1')
self.assertEqual(res_tree.get('name'), 'test1')
self.assertEqual(res_tree.get('description'), 'test account')
self.assertEqual(res_tree.get('manager'), 'id2')
self.assertEqual(len(fakes.FakeAuthManager.projects.values()), 2)

View File

@ -18,6 +18,8 @@
import datetime
import json
from lxml import etree
import webob
from nova.api.openstack import consoles
@ -142,6 +144,30 @@ class ConsolesTest(test.TestCase):
res_dict = json.loads(res.body)
self.assertDictMatch(res_dict, expected)
def test_show_console_xml(self):
def fake_get_console(cons_self, context, instance_id, console_id):
self.assertEqual(instance_id, 10)
self.assertEqual(console_id, 20)
pool = dict(console_type='fake_type',
public_hostname='fake_hostname')
return dict(id=console_id, password='fake_password',
port='fake_port', pool=pool)
self.stubs.Set(console.API, 'get_console', fake_get_console)
req = webob.Request.blank('/v1.1/fake/servers/10/consoles/20.xml')
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
res_tree = etree.fromstring(res.body)
self.assertEqual(res_tree.tag, 'console')
self.assertEqual(res_tree.xpath('id')[0].text, '20')
self.assertEqual(res_tree.xpath('port')[0].text, 'fake_port')
self.assertEqual(res_tree.xpath('host')[0].text, 'fake_hostname')
self.assertEqual(res_tree.xpath('password')[0].text, 'fake_password')
self.assertEqual(res_tree.xpath('console_type')[0].text,
'fake_type')
def test_show_console_unknown_console(self):
def fake_get_console(cons_self, context, instance_id, console_id):
raise exception.ConsoleNotFound(console_id=console_id)
@ -188,6 +214,46 @@ class ConsolesTest(test.TestCase):
res_dict = json.loads(res.body)
self.assertDictMatch(res_dict, expected)
def test_list_consoles_xml(self):
def fake_get_consoles(cons_self, context, instance_id):
self.assertEqual(instance_id, 10)
pool1 = dict(console_type='fake_type',
public_hostname='fake_hostname')
cons1 = dict(id=10, password='fake_password',
port='fake_port', pool=pool1)
pool2 = dict(console_type='fake_type2',
public_hostname='fake_hostname2')
cons2 = dict(id=11, password='fake_password2',
port='fake_port2', pool=pool2)
return [cons1, cons2]
expected = {'consoles':
[{'console': {'id': 10, 'console_type': 'fake_type'}},
{'console': {'id': 11, 'console_type': 'fake_type2'}}]}
self.stubs.Set(console.API, 'get_consoles', fake_get_consoles)
req = webob.Request.blank('/v1.1/fake/servers/10/consoles.xml')
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
res_tree = etree.fromstring(res.body)
self.assertEqual(res_tree.tag, 'consoles')
self.assertEqual(len(res_tree), 2)
self.assertEqual(res_tree[0].tag, 'console')
self.assertEqual(res_tree[1].tag, 'console')
self.assertEqual(len(res_tree[0]), 1)
self.assertEqual(res_tree[0][0].tag, 'console')
self.assertEqual(len(res_tree[1]), 1)
self.assertEqual(res_tree[1][0].tag, 'console')
self.assertEqual(res_tree[0][0].xpath('id')[0].text, '10')
self.assertEqual(res_tree[1][0].xpath('id')[0].text, '11')
self.assertEqual(res_tree[0][0].xpath('console_type')[0].text,
'fake_type')
self.assertEqual(res_tree[1][0].xpath('console_type')[0].text,
'fake_type2')
def test_delete_console(self):
def fake_get_console(cons_self, context, instance_id, console_id):
self.assertEqual(instance_id, 10)

View File

@ -22,6 +22,7 @@ from lxml import etree
from nova import context
from nova import test
from nova import wsgi as base_wsgi
from nova.api import openstack
from nova.api.openstack import extensions
from nova.api.openstack import flavors
@ -111,8 +112,9 @@ class ExtensionControllerTest(test.TestCase):
def test_list_extensions_json(self):
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app)
ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/123/extensions")
response = request.get_response(ext_midware)
response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
# Make sure we have all the extensions.
@ -137,8 +139,9 @@ class ExtensionControllerTest(test.TestCase):
def test_get_extension_json(self):
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app)
ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/123/extensions/FOXNSOX")
response = request.get_response(ext_midware)
response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
data = json.loads(response.body)
@ -160,9 +163,10 @@ class ExtensionControllerTest(test.TestCase):
def test_list_extensions_xml(self):
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app)
ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/123/extensions")
request.accept = "application/xml"
response = request.get_response(ext_midware)
response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
print response.body
@ -187,9 +191,10 @@ class ExtensionControllerTest(test.TestCase):
def test_get_extension_xml(self):
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app)
ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/123/extensions/FOXNSOX")
request.accept = "application/xml"
response = request.get_response(ext_midware)
response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
xml = response.body
print xml
@ -218,8 +223,9 @@ class ResourceExtensionTest(test.TestCase):
manager = StubExtensionManager(None)
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app, manager)
ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/blah")
response = request.get_response(ext_midware)
response = request.get_response(ser_midware)
self.assertEqual(404, response.status_int)
def test_get_resources(self):
@ -228,8 +234,9 @@ class ResourceExtensionTest(test.TestCase):
manager = StubExtensionManager(res_ext)
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app, manager)
ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/123/tweedles")
response = request.get_response(ext_midware)
response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
self.assertEqual(response_body, response.body)
@ -239,8 +246,9 @@ class ResourceExtensionTest(test.TestCase):
manager = StubExtensionManager(res_ext)
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app, manager)
ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/123/tweedles")
response = request.get_response(ext_midware)
response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
self.assertEqual(response_body, response.body)
@ -263,12 +271,15 @@ class ExtensionManagerTest(test.TestCase):
def test_get_resources(self):
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app)
ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/123/foxnsocks")
response = request.get_response(ext_midware)
response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
self.assertEqual(response_body, response.body)
def test_invalid_extensions(self):
# Don't need the serialization middleware here because we're
# not testing any serialization
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app)
ext_mgr = ext_midware.ext_mgr
@ -287,11 +298,12 @@ class ActionExtensionTest(test.TestCase):
def _send_server_action_request(self, url, body):
app = openstack.APIRouter()
ext_midware = extensions.ExtensionMiddleware(app)
ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank(url)
request.method = 'POST'
request.content_type = 'application/json'
request.body = json.dumps(body)
response = request.get_response(ext_midware)
response = request.get_response(ser_midware)
return response
def test_extended_action(self):
@ -328,11 +340,9 @@ class RequestExtensionTest(test.TestCase):
def test_get_resources_with_stub_mgr(self):
def _req_handler(req, res):
def _req_handler(req, res, body):
# only handle JSON responses
data = json.loads(res.body)
data['flavor']['googoose'] = req.GET.get('chewing')
res.body = json.dumps(data)
body['flavor']['googoose'] = req.GET.get('chewing')
return res
req_ext = extensions.RequestExtension('GET',
@ -340,22 +350,24 @@ class RequestExtensionTest(test.TestCase):
_req_handler)
manager = StubExtensionManager(None, None, req_ext)
app = fakes.wsgi_app()
app = fakes.wsgi_app(serialization=base_wsgi.Middleware)
ext_midware = extensions.ExtensionMiddleware(app, manager)
ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/v1.1/123/flavors/1?chewing=bluegoo")
request.environ['api.version'] = '1.1'
response = request.get_response(ext_midware)
response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
response_data = json.loads(response.body)
self.assertEqual('bluegoo', response_data['flavor']['googoose'])
def test_get_resources_with_mgr(self):
app = fakes.wsgi_app()
app = fakes.wsgi_app(serialization=base_wsgi.Middleware)
ext_midware = extensions.ExtensionMiddleware(app)
ser_midware = wsgi.LazySerializationMiddleware(ext_midware)
request = webob.Request.blank("/v1.1/123/flavors/1?chewing=newblue")
request.environ['api.version'] = '1.1'
response = request.get_response(ext_midware)
response = request.get_response(ser_midware)
self.assertEqual(200, response.status_int)
response_data = json.loads(response.body)
self.assertEqual('newblue', response_data['flavor']['googoose'])

View File

@ -30,6 +30,7 @@ from xml.dom import minidom
import nova.context
from nova.api.openstack import limits
from nova.api.openstack import views
from nova.api.openstack import wsgi
from nova.api.openstack import xmlutil
from nova import test
@ -80,7 +81,8 @@ class LimitsControllerTest(BaseLimitTestSuite):
def setUp(self):
"""Run before each test."""
BaseLimitTestSuite.setUp(self)
self.controller = limits.create_resource()
self.controller = wsgi.LazySerializationMiddleware(
limits.create_resource())
self.maxDiff = None
def _get_index_request(self, accept_header="application/json"):

View File

@ -15,6 +15,7 @@
import json
from lxml import etree
import webob
from nova import test
@ -63,6 +64,19 @@ class UsersTest(test.TestCase):
self.assertEqual(res.status_int, 200)
self.assertEqual(len(res_dict['users']), 2)
def test_get_user_list_xml(self):
req = webob.Request.blank('/v1.1/fake/users.xml')
res = req.get_response(fakes.wsgi_app())
res_tree = etree.fromstring(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(res_tree.tag, 'users')
self.assertEqual(len(res_tree), 2)
self.assertEqual(res_tree[0].tag, 'user')
self.assertEqual(res_tree[0].get('id'), 'id1')
self.assertEqual(res_tree[1].tag, 'user')
self.assertEqual(res_tree[1].get('id'), 'id2')
def test_get_user_by_id(self):
req = webob.Request.blank('/v1.1/fake/users/id2')
res = req.get_response(fakes.wsgi_app())
@ -74,6 +88,18 @@ class UsersTest(test.TestCase):
self.assertEqual(res_dict['user']['admin'], True)
self.assertEqual(res.status_int, 200)
def test_get_user_by_id_xml(self):
req = webob.Request.blank('/v1.1/fake/users/id2.xml')
res = req.get_response(fakes.wsgi_app())
res_tree = etree.fromstring(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(res_tree.tag, 'user')
self.assertEqual(res_tree.get('id'), 'id2')
self.assertEqual(res_tree.get('name'), 'guy2')
self.assertEqual(res_tree.get('secret'), 'secret2')
self.assertEqual(res_tree.get('admin'), 'True')
def test_user_delete(self):
# Check the user exists
req = webob.Request.blank('/v1.1/fake/users/id1')
@ -125,6 +151,35 @@ class UsersTest(test.TestCase):
fakes.FakeAuthManager.auth_data])
self.assertEqual(len(fakes.FakeAuthManager.auth_data), 3)
def test_user_create_xml(self):
secret = utils.generate_password()
body = dict(user=dict(name='test_guy',
access='acc3',
secret=secret,
admin=True))
req = webob.Request.blank('/v1.1/fake/users.xml')
req.headers["Content-Type"] = "application/json"
req.method = 'POST'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_tree = etree.fromstring(res.body)
self.assertEqual(res.status_int, 200)
# NOTE(justinsb): This is a questionable assertion in general
# fake sets id=name, but others might not...
self.assertEqual(res_tree.tag, 'user')
self.assertEqual(res_tree.get('id'), 'test_guy')
self.assertEqual(res_tree.get('name'), 'test_guy')
self.assertEqual(res_tree.get('access'), 'acc3')
self.assertEqual(res_tree.get('secret'), secret)
self.assertEqual(res_tree.get('admin'), 'True')
self.assertTrue('test_guy' in [u.id for u in
fakes.FakeAuthManager.auth_data])
self.assertEqual(len(fakes.FakeAuthManager.auth_data), 3)
def test_user_update(self):
new_secret = utils.generate_password()
body = dict(user=dict(name='guy2',
@ -144,3 +199,24 @@ class UsersTest(test.TestCase):
self.assertEqual(res_dict['user']['access'], 'acc2')
self.assertEqual(res_dict['user']['secret'], new_secret)
self.assertEqual(res_dict['user']['admin'], True)
def test_user_update_xml(self):
new_secret = utils.generate_password()
body = dict(user=dict(name='guy2',
access='acc2',
secret=new_secret))
req = webob.Request.blank('/v1.1/fake/users/id2.xml')
req.headers["Content-Type"] = "application/json"
req.method = 'PUT'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_tree = etree.fromstring(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(res_tree.tag, 'user')
self.assertEqual(res_tree.get('id'), 'id2')
self.assertEqual(res_tree.get('name'), 'guy2')
self.assertEqual(res_tree.get('access'), 'acc2')
self.assertEqual(res_tree.get('secret'), new_secret)
self.assertEqual(res_tree.get('admin'), 'True')

View File

@ -215,20 +215,23 @@ class RequestHeadersDeserializerTest(test.TestCase):
self.assertEqual(deserializer.deserialize(req, 'update'), {'a': 'b'})
class JSONSerializer(object):
def serialize(self, data, action='default'):
return 'pew_json'
class XMLSerializer(object):
def serialize(self, data, action='default'):
return 'pew_xml'
class HeadersSerializer(object):
def serialize(self, response, data, action):
response.status_int = 404
class ResponseSerializerTest(test.TestCase):
def setUp(self):
class JSONSerializer(object):
def serialize(self, data, action='default'):
return 'pew_json'
class XMLSerializer(object):
def serialize(self, data, action='default'):
return 'pew_xml'
class HeadersSerializer(object):
def serialize(self, response, data, action):
response.status_int = 404
self.body_serializers = {
'application/json': JSONSerializer(),
'application/xml': XMLSerializer(),
@ -253,7 +256,8 @@ class ResponseSerializerTest(test.TestCase):
def test_serialize_response_json(self):
for content_type in ('application/json',
'application/vnd.openstack.compute+json'):
response = self.serializer.serialize({}, content_type)
request = wsgi.Request.blank('/')
response = self.serializer.serialize(request, {}, content_type)
self.assertEqual(response.headers['Content-Type'], content_type)
self.assertEqual(response.body, 'pew_json')
self.assertEqual(response.status_int, 404)
@ -261,21 +265,72 @@ class ResponseSerializerTest(test.TestCase):
def test_serialize_response_xml(self):
for content_type in ('application/xml',
'application/vnd.openstack.compute+xml'):
response = self.serializer.serialize({}, content_type)
request = wsgi.Request.blank('/')
response = self.serializer.serialize(request, {}, content_type)
self.assertEqual(response.headers['Content-Type'], content_type)
self.assertEqual(response.body, 'pew_xml')
self.assertEqual(response.status_int, 404)
def test_serialize_response_None(self):
response = self.serializer.serialize(None, 'application/json')
request = wsgi.Request.blank('/')
response = self.serializer.serialize(request, None, 'application/json')
self.assertEqual(response.headers['Content-Type'], 'application/json')
self.assertEqual(response.body, '')
self.assertEqual(response.status_int, 404)
def test_serialize_response_dict_to_unknown_content_type(self):
request = wsgi.Request.blank('/')
self.assertRaises(exception.InvalidContentType,
self.serializer.serialize,
{}, 'application/unknown')
request, {}, 'application/unknown')
class LazySerializationTest(test.TestCase):
def setUp(self):
self.body_serializers = {
'application/json': JSONSerializer(),
'application/xml': XMLSerializer(),
}
self.serializer = wsgi.ResponseSerializer(self.body_serializers,
HeadersSerializer())
def tearDown(self):
pass
def test_serialize_response_json(self):
for content_type in ('application/json',
'application/vnd.openstack.compute+json'):
request = wsgi.Request.blank('/')
request.environ['nova.lazy_serialize'] = True
response = self.serializer.serialize(request, {}, content_type)
self.assertEqual(response.headers['Content-Type'], content_type)
self.assertEqual(response.status_int, 404)
body = json.loads(response.body)
self.assertEqual(body, {})
serializer = request.environ['nova.serializer']
self.assertEqual(serializer.serialize(body), 'pew_json')
def test_serialize_response_xml(self):
for content_type in ('application/xml',
'application/vnd.openstack.compute+xml'):
request = wsgi.Request.blank('/')
request.environ['nova.lazy_serialize'] = True
response = self.serializer.serialize(request, {}, content_type)
self.assertEqual(response.headers['Content-Type'], content_type)
self.assertEqual(response.status_int, 404)
body = json.loads(response.body)
self.assertEqual(body, {})
serializer = request.environ['nova.serializer']
self.assertEqual(serializer.serialize(body), 'pew_xml')
def test_serialize_response_None(self):
request = wsgi.Request.blank('/')
request.environ['nova.lazy_serialize'] = True
response = self.serializer.serialize(request, None, 'application/json')
self.assertEqual(response.headers['Content-Type'], 'application/json')
self.assertEqual(response.status_int, 404)
self.assertEqual(response.body, '')
class RequestDeserializerTest(test.TestCase):

View File

@ -0,0 +1,763 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from lxml import etree
from nova import test
from nova.api.openstack import xmlutil
class SelectorTest(test.TestCase):
obj_for_test = {
'test': {
'name': 'test',
'values': [1, 2, 3],
'attrs': {
'foo': 1,
'bar': 2,
'baz': 3,
},
},
}
def test_empty_selector(self):
sel = xmlutil.Selector()
self.assertEqual(len(sel.chain), 0)
self.assertEqual(sel(self.obj_for_test), self.obj_for_test)
def test_dict_selector(self):
sel = xmlutil.Selector('test')
self.assertEqual(len(sel.chain), 1)
self.assertEqual(sel.chain[0], 'test')
self.assertEqual(sel(self.obj_for_test),
self.obj_for_test['test'])
def test_datum_selector(self):
sel = xmlutil.Selector('test', 'name')
self.assertEqual(len(sel.chain), 2)
self.assertEqual(sel.chain[0], 'test')
self.assertEqual(sel.chain[1], 'name')
self.assertEqual(sel(self.obj_for_test), 'test')
def test_list_selector(self):
sel = xmlutil.Selector('test', 'values', 0)
self.assertEqual(len(sel.chain), 3)
self.assertEqual(sel.chain[0], 'test')
self.assertEqual(sel.chain[1], 'values')
self.assertEqual(sel.chain[2], 0)
self.assertEqual(sel(self.obj_for_test), 1)
def test_items_selector(self):
sel = xmlutil.Selector('test', 'attrs', xmlutil.get_items)
self.assertEqual(len(sel.chain), 3)
self.assertEqual(sel.chain[2], xmlutil.get_items)
for key, val in sel(self.obj_for_test):
self.assertEqual(self.obj_for_test['test']['attrs'][key], val)
def test_missing_key_selector(self):
sel = xmlutil.Selector('test2', 'attrs')
self.assertEqual(sel(self.obj_for_test), None)
self.assertRaises(KeyError, sel, self.obj_for_test, True)
def test_constant_selector(self):
sel = xmlutil.ConstantSelector('Foobar')
self.assertEqual(sel.value, 'Foobar')
self.assertEqual(sel(self.obj_for_test), 'Foobar')
class TemplateElementTest(test.TestCase):
def test_element_initial_attributes(self):
# Create a template element with some attributes
elem = xmlutil.TemplateElement('test', attrib=dict(a=1, b=2, c=3),
c=4, d=5, e=6)
# Verify all the attributes are as expected
expected = dict(a=1, b=2, c=4, d=5, e=6)
for k, v in expected.items():
self.assertEqual(elem.attrib[k].chain[0], v)
def test_element_get_attributes(self):
expected = dict(a=1, b=2, c=3)
# Create a template element with some attributes
elem = xmlutil.TemplateElement('test', attrib=expected)
# Verify that get() retrieves the attributes
for k, v in expected.items():
self.assertEqual(elem.get(k).chain[0], v)
def test_element_set_attributes(self):
attrs = dict(a=None, b='foo', c=xmlutil.Selector('foo', 'bar'))
# Create a bare template element with no attributes
elem = xmlutil.TemplateElement('test')
# Set the attribute values
for k, v in attrs.items():
elem.set(k, v)
# Now verify what got set
self.assertEqual(len(elem.attrib['a'].chain), 1)
self.assertEqual(elem.attrib['a'].chain[0], 'a')
self.assertEqual(len(elem.attrib['b'].chain), 1)
self.assertEqual(elem.attrib['b'].chain[0], 'foo')
self.assertEqual(elem.attrib['c'], attrs['c'])
def test_element_attribute_keys(self):
attrs = dict(a=1, b=2, c=3, d=4)
expected = set(attrs.keys())
# Create a template element with some attributes
elem = xmlutil.TemplateElement('test', attrib=attrs)
# Now verify keys
self.assertEqual(set(elem.keys()), expected)
def test_element_attribute_items(self):
expected = dict(a=xmlutil.Selector(1),
b=xmlutil.Selector(2),
c=xmlutil.Selector(3))
keys = set(expected.keys())
# Create a template element with some attributes
elem = xmlutil.TemplateElement('test', attrib=expected)
# Now verify items
for k, v in elem.items():
self.assertEqual(expected[k], v)
keys.remove(k)
# Did we visit all keys?
self.assertEqual(len(keys), 0)
def test_element_selector_none(self):
# Create a template element with no selector
elem = xmlutil.TemplateElement('test')
self.assertEqual(len(elem.selector.chain), 0)
def test_element_selector_string(self):
# Create a template element with a string selector
elem = xmlutil.TemplateElement('test', selector='test')
self.assertEqual(len(elem.selector.chain), 1)
self.assertEqual(elem.selector.chain[0], 'test')
def test_element_selector(self):
sel = xmlutil.Selector('a', 'b')
# Create a template element with an explicit selector
elem = xmlutil.TemplateElement('test', selector=sel)
self.assertEqual(elem.selector, sel)
def test_element_append_child(self):
# Create an element
elem = xmlutil.TemplateElement('test')
# Make sure the element starts off empty
self.assertEqual(len(elem), 0)
# Create a child element
child = xmlutil.TemplateElement('child')
# Append the child to the parent
elem.append(child)
# Verify that the child was added
self.assertEqual(len(elem), 1)
self.assertEqual(elem[0], child)
self.assertEqual('child' in elem, True)
self.assertEqual(elem['child'], child)
# Ensure that multiple children of the same name are rejected
child2 = xmlutil.TemplateElement('child')
self.assertRaises(KeyError, elem.append, child2)
def test_element_extend_children(self):
# Create an element
elem = xmlutil.TemplateElement('test')
# Make sure the element starts off empty
self.assertEqual(len(elem), 0)
# Create a few children
children = [
xmlutil.TemplateElement('child1'),
xmlutil.TemplateElement('child2'),
xmlutil.TemplateElement('child3'),
]
# Extend the parent by those children
elem.extend(children)
# Verify that the children were added
self.assertEqual(len(elem), 3)
for idx in range(len(elem)):
self.assertEqual(children[idx], elem[idx])
self.assertEqual(children[idx].tag in elem, True)
self.assertEqual(elem[children[idx].tag], children[idx])
# Ensure that multiple children of the same name are rejected
children2 = [
xmlutil.TemplateElement('child4'),
xmlutil.TemplateElement('child1'),
]
self.assertRaises(KeyError, elem.extend, children2)
# Also ensure that child4 was not added
self.assertEqual(len(elem), 3)
self.assertEqual(elem[-1].tag, 'child3')
def test_element_insert_child(self):
# Create an element
elem = xmlutil.TemplateElement('test')
# Make sure the element starts off empty
self.assertEqual(len(elem), 0)
# Create a few children
children = [
xmlutil.TemplateElement('child1'),
xmlutil.TemplateElement('child2'),
xmlutil.TemplateElement('child3'),
]
# Extend the parent by those children
elem.extend(children)
# Create a child to insert
child = xmlutil.TemplateElement('child4')
# Insert it
elem.insert(1, child)
# Ensure the child was inserted in the right place
self.assertEqual(len(elem), 4)
children.insert(1, child)
for idx in range(len(elem)):
self.assertEqual(children[idx], elem[idx])
self.assertEqual(children[idx].tag in elem, True)
self.assertEqual(elem[children[idx].tag], children[idx])
# Ensure that multiple children of the same name are rejected
child2 = xmlutil.TemplateElement('child2')
self.assertRaises(KeyError, elem.insert, 2, child2)
def test_element_remove_child(self):
# Create an element
elem = xmlutil.TemplateElement('test')
# Make sure the element starts off empty
self.assertEqual(len(elem), 0)
# Create a few children
children = [
xmlutil.TemplateElement('child1'),
xmlutil.TemplateElement('child2'),
xmlutil.TemplateElement('child3'),
]
# Extend the parent by those children
elem.extend(children)
# Create a test child to remove
child = xmlutil.TemplateElement('child2')
# Try to remove it
self.assertRaises(ValueError, elem.remove, child)
# Ensure that no child was removed
self.assertEqual(len(elem), 3)
# Now remove a legitimate child
elem.remove(children[1])
# Ensure that the child was removed
self.assertEqual(len(elem), 2)
self.assertEqual(elem[0], children[0])
self.assertEqual(elem[1], children[2])
self.assertEqual('child2' in elem, False)
# Ensure the child cannot be retrieved by name
def get_key(elem, key):
return elem[key]
self.assertRaises(KeyError, get_key, elem, 'child2')
def test_element_text(self):
# Create an element
elem = xmlutil.TemplateElement('test')
# Ensure that it has no text
self.assertEqual(elem.text, None)
# Try setting it to a string and ensure it becomes a selector
elem.text = 'test'
self.assertEqual(hasattr(elem.text, 'chain'), True)
self.assertEqual(len(elem.text.chain), 1)
self.assertEqual(elem.text.chain[0], 'test')
# Try resetting the text to None
elem.text = None
self.assertEqual(elem.text, None)
# Now make up a selector and try setting the text to that
sel = xmlutil.Selector()
elem.text = sel
self.assertEqual(elem.text, sel)
# Finally, try deleting the text and see what happens
del elem.text
self.assertEqual(elem.text, None)
def test_apply_attrs(self):
# Create a template element
attrs = dict(attr1=xmlutil.ConstantSelector(1),
attr2=xmlutil.ConstantSelector(2))
tmpl_elem = xmlutil.TemplateElement('test', attrib=attrs)
# Create an etree element
elem = etree.Element('test')
# Apply the template to the element
tmpl_elem.apply(elem, None)
# Now, verify the correct attributes were set
for k, v in elem.items():
self.assertEqual(str(attrs[k].value), v)
def test_apply_text(self):
# Create a template element
tmpl_elem = xmlutil.TemplateElement('test')
tmpl_elem.text = xmlutil.ConstantSelector(1)
# Create an etree element
elem = etree.Element('test')
# Apply the template to the element
tmpl_elem.apply(elem, None)
# Now, verify the text was set
self.assertEqual(str(tmpl_elem.text.value), elem.text)
def test__render(self):
attrs = dict(attr1=xmlutil.ConstantSelector(1),
attr2=xmlutil.ConstantSelector(2),
attr3=xmlutil.ConstantSelector(3))
# Create a master template element
master_elem = xmlutil.TemplateElement('test', attr1=attrs['attr1'])
# Create a couple of slave template element
slave_elems = [
xmlutil.TemplateElement('test', attr2=attrs['attr2']),
xmlutil.TemplateElement('test', attr3=attrs['attr3']),
]
# Try the render
elem = master_elem._render(None, None, slave_elems, None)
# Verify the particulars of the render
self.assertEqual(elem.tag, 'test')
self.assertEqual(len(elem.nsmap), 0)
for k, v in elem.items():
self.assertEqual(str(attrs[k].value), v)
# Create a parent for the element to be rendered
parent = etree.Element('parent')
# Try the render again...
elem = master_elem._render(parent, None, slave_elems, dict(a='foo'))
# Verify the particulars of the render
self.assertEqual(len(parent), 1)
self.assertEqual(parent[0], elem)
self.assertEqual(len(elem.nsmap), 1)
self.assertEqual(elem.nsmap['a'], 'foo')
def test_render(self):
# Create a template element
tmpl_elem = xmlutil.TemplateElement('test')
tmpl_elem.text = xmlutil.Selector()
# Create the object we're going to render
obj = ['elem1', 'elem2', 'elem3', 'elem4']
# Try a render with no object
elems = tmpl_elem.render(None, None)
self.assertEqual(len(elems), 0)
# Try a render with one object
elems = tmpl_elem.render(None, 'foo')
self.assertEqual(len(elems), 1)
self.assertEqual(elems[0][0].text, 'foo')
self.assertEqual(elems[0][1], 'foo')
# Now, try rendering an object with multiple entries
parent = etree.Element('parent')
elems = tmpl_elem.render(parent, obj)
self.assertEqual(len(elems), 4)
# Check the results
for idx in range(len(obj)):
self.assertEqual(elems[idx][0].text, obj[idx])
self.assertEqual(elems[idx][1], obj[idx])
def test_subelement(self):
# Try the SubTemplateElement constructor
parent = xmlutil.SubTemplateElement(None, 'parent')
self.assertEqual(parent.tag, 'parent')
self.assertEqual(len(parent), 0)
# Now try it with a parent element
child = xmlutil.SubTemplateElement(parent, 'child')
self.assertEqual(child.tag, 'child')
self.assertEqual(len(parent), 1)
self.assertEqual(parent[0], child)
def test_wrap(self):
# These are strange methods, but they make things easier
elem = xmlutil.TemplateElement('test')
self.assertEqual(elem.unwrap(), elem)
self.assertEqual(elem.wrap().root, elem)
def test_dyntag(self):
obj = ['a', 'b', 'c']
# Create a template element with a dynamic tag
tmpl_elem = xmlutil.TemplateElement(xmlutil.Selector())
# Try the render
parent = etree.Element('parent')
elems = tmpl_elem.render(parent, obj)
# Verify the particulars of the render
self.assertEqual(len(elems), len(obj))
for idx in range(len(obj)):
self.assertEqual(elems[idx][0].tag, obj[idx])
class TemplateTest(test.TestCase):
def test_wrap(self):
# These are strange methods, but they make things easier
elem = xmlutil.TemplateElement('test')
tmpl = xmlutil.Template(elem)
self.assertEqual(tmpl.unwrap(), elem)
self.assertEqual(tmpl.wrap(), tmpl)
def test__siblings(self):
# Set up a basic template
elem = xmlutil.TemplateElement('test')
tmpl = xmlutil.Template(elem)
# Check that we get the right siblings
siblings = tmpl._siblings()
self.assertEqual(len(siblings), 1)
self.assertEqual(siblings[0], elem)
def test__nsmap(self):
# Set up a basic template
elem = xmlutil.TemplateElement('test')
tmpl = xmlutil.Template(elem, nsmap=dict(a="foo"))
# Check out that we get the right namespace dictionary
nsmap = tmpl._nsmap()
self.assertNotEqual(id(nsmap), id(tmpl.nsmap))
self.assertEqual(len(nsmap), 1)
self.assertEqual(nsmap['a'], 'foo')
def test_master_attach(self):
# Set up a master template
elem = xmlutil.TemplateElement('test')
tmpl = xmlutil.MasterTemplate(elem, 1)
# Make sure it has a root but no slaves
self.assertEqual(tmpl.root, elem)
self.assertEqual(len(tmpl.slaves), 0)
# Try to attach an invalid slave
bad_elem = xmlutil.TemplateElement('test2')
self.assertRaises(ValueError, tmpl.attach, bad_elem)
self.assertEqual(len(tmpl.slaves), 0)
# Try to attach an invalid and a valid slave
good_elem = xmlutil.TemplateElement('test')
self.assertRaises(ValueError, tmpl.attach, good_elem, bad_elem)
self.assertEqual(len(tmpl.slaves), 0)
# Try to attach an inapplicable template
class InapplicableTemplate(xmlutil.Template):
def apply(self, master):
return False
inapp_tmpl = InapplicableTemplate(good_elem)
tmpl.attach(inapp_tmpl)
self.assertEqual(len(tmpl.slaves), 0)
# Now try attaching an applicable template
tmpl.attach(good_elem)
self.assertEqual(len(tmpl.slaves), 1)
self.assertEqual(tmpl.slaves[0].root, good_elem)
def test_master_copy(self):
# Construct a master template
elem = xmlutil.TemplateElement('test')
tmpl = xmlutil.MasterTemplate(elem, 1, nsmap=dict(a='foo'))
# Give it a slave
slave = xmlutil.TemplateElement('test')
tmpl.attach(slave)
# Construct a copy
copy = tmpl.copy()
# Check to see if we actually managed a copy
self.assertNotEqual(tmpl, copy)
self.assertEqual(tmpl.root, copy.root)
self.assertEqual(tmpl.version, copy.version)
self.assertEqual(id(tmpl.nsmap), id(copy.nsmap))
self.assertNotEqual(id(tmpl.slaves), id(copy.slaves))
self.assertEqual(len(tmpl.slaves), len(copy.slaves))
self.assertEqual(tmpl.slaves[0], copy.slaves[0])
def test_slave_apply(self):
# Construct a master template
elem = xmlutil.TemplateElement('test')
master = xmlutil.MasterTemplate(elem, 3)
# Construct a slave template with applicable minimum version
slave = xmlutil.SlaveTemplate(elem, 2)
self.assertEqual(slave.apply(master), True)
# Construct a slave template with equal minimum version
slave = xmlutil.SlaveTemplate(elem, 3)
self.assertEqual(slave.apply(master), True)
# Construct a slave template with inapplicable minimum version
slave = xmlutil.SlaveTemplate(elem, 4)
self.assertEqual(slave.apply(master), False)
# Construct a slave template with applicable version range
slave = xmlutil.SlaveTemplate(elem, 2, 4)
self.assertEqual(slave.apply(master), True)
# Construct a slave template with low version range
slave = xmlutil.SlaveTemplate(elem, 1, 2)
self.assertEqual(slave.apply(master), False)
# Construct a slave template with high version range
slave = xmlutil.SlaveTemplate(elem, 4, 5)
self.assertEqual(slave.apply(master), False)
# Construct a slave template with matching version range
slave = xmlutil.SlaveTemplate(elem, 3, 3)
self.assertEqual(slave.apply(master), True)
def test__serialize(self):
# Our test object to serialize
obj = {
'test': {
'name': 'foobar',
'values': [1, 2, 3, 4],
'attrs': {
'a': 1,
'b': 2,
'c': 3,
'd': 4,
},
'image': {
'name': 'image_foobar',
'id': 42,
},
},
}
# Set up our master template
root = xmlutil.TemplateElement('test', selector='test',
name='name')
value = xmlutil.SubTemplateElement(root, 'value', selector='values')
value.text = xmlutil.Selector()
attrs = xmlutil.SubTemplateElement(root, 'attrs', selector='attrs')
xmlutil.SubTemplateElement(attrs, 'attr', selector=xmlutil.get_items,
key=0, value=1)
master = xmlutil.MasterTemplate(root, 1, nsmap=dict(f='foo'))
# Set up our slave template
root_slave = xmlutil.TemplateElement('test', selector='test')
image = xmlutil.SubTemplateElement(root_slave, 'image',
selector='image', id='id')
image.text = xmlutil.Selector('name')
slave = xmlutil.SlaveTemplate(root_slave, 1, nsmap=dict(b='bar'))
# Attach the slave to the master...
master.attach(slave)
# Try serializing our object
siblings = master._siblings()
nsmap = master._nsmap()
result = master._serialize(None, obj, siblings, nsmap)
# Now we get to manually walk the element tree...
self.assertEqual(result.tag, 'test')
self.assertEqual(len(result.nsmap), 2)
self.assertEqual(result.nsmap['f'], 'foo')
self.assertEqual(result.nsmap['b'], 'bar')
self.assertEqual(result.get('name'), obj['test']['name'])
for idx, val in enumerate(obj['test']['values']):
self.assertEqual(result[idx].tag, 'value')
self.assertEqual(result[idx].text, str(val))
idx += 1
self.assertEqual(result[idx].tag, 'attrs')
for attr in result[idx]:
self.assertEqual(attr.tag, 'attr')
self.assertEqual(attr.get('value'),
str(obj['test']['attrs'][attr.get('key')]))
idx += 1
self.assertEqual(result[idx].tag, 'image')
self.assertEqual(result[idx].get('id'),
str(obj['test']['image']['id']))
self.assertEqual(result[idx].text, obj['test']['image']['name'])
class MasterTemplateBuilder(xmlutil.TemplateBuilder):
def construct(self):
elem = xmlutil.TemplateElement('test')
return xmlutil.MasterTemplate(elem, 1)
class SlaveTemplateBuilder(xmlutil.TemplateBuilder):
def construct(self):
elem = xmlutil.TemplateElement('test')
return xmlutil.SlaveTemplate(elem, 1)
class TemplateBuilderTest(test.TestCase):
def test_master_template_builder(self):
# Make sure the template hasn't been built yet
self.assertEqual(MasterTemplateBuilder._tmpl, None)
# Now, construct the template
tmpl1 = MasterTemplateBuilder()
# Make sure that there is a template cached...
self.assertNotEqual(MasterTemplateBuilder._tmpl, None)
# Make sure it wasn't what was returned...
self.assertNotEqual(MasterTemplateBuilder._tmpl, tmpl1)
# Make sure it doesn't get rebuilt
cached = MasterTemplateBuilder._tmpl
tmpl2 = MasterTemplateBuilder()
self.assertEqual(MasterTemplateBuilder._tmpl, cached)
# Make sure we're always getting fresh copies
self.assertNotEqual(tmpl1, tmpl2)
# Make sure we can override the copying behavior
tmpl3 = MasterTemplateBuilder(False)
self.assertEqual(MasterTemplateBuilder._tmpl, tmpl3)
def test_slave_template_builder(self):
# Make sure the template hasn't been built yet
self.assertEqual(SlaveTemplateBuilder._tmpl, None)
# Now, construct the template
tmpl1 = SlaveTemplateBuilder()
# Make sure there is a template cached...
self.assertNotEqual(SlaveTemplateBuilder._tmpl, None)
# Make sure it was what was returned...
self.assertEqual(SlaveTemplateBuilder._tmpl, tmpl1)
# Make sure it doesn't get rebuilt
tmpl2 = SlaveTemplateBuilder()
self.assertEqual(SlaveTemplateBuilder._tmpl, tmpl1)
# Make sure we're always getting the cached copy
self.assertEqual(tmpl1, tmpl2)
class SerializerTest(xmlutil.XMLTemplateSerializer):
def test(self):
root = xmlutil.TemplateElement('servers')
a = xmlutil.SubTemplateElement(root, 'a', selector='servers')
a.text = xmlutil.Selector('a')
return xmlutil.MasterTemplate(root, 1, nsmap={None: "asdf"})
class XMLTemplateSerializerTest(test.TestCase):
def setUp(self):
self.tmpl_serializer = SerializerTest()
self.data = dict(servers=dict(a=(2, 3)))
self.data_multi = dict(servers=[dict(a=(2, 3)), dict(a=(3, 4))])
super(XMLTemplateSerializerTest, self).setUp()
def test_get_template(self):
# First, check what happens when we fall back on the default
# option
self.assertEqual(self.tmpl_serializer.get_template(), None)
self.assertEqual(self.tmpl_serializer.get_template('nosuch'), None)
# Now, check that we get back a template
tmpl = self.tmpl_serializer.get_template('test')
self.assertNotEqual(tmpl, None)
self.assertEqual(tmpl.root.tag, 'servers')
def test_serialize_default(self):
expected_xml = '<servers><a>(2,3)</a></servers>'
result = self.tmpl_serializer.serialize(self.data)
result = result.replace('\n', '').replace(' ', '')
self.assertEqual(result, expected_xml)
def test_serialize_multi_default(self):
expected_xml = ('<servers><server><a>(2,3)</a></server>'
'<server><a>(3,4)</a></server></servers>')
result = self.tmpl_serializer.serialize(self.data_multi)
result = result.replace('\n', '').replace(' ', '')
self.assertEqual(result, expected_xml)
def test_serialize_explicit(self):
expected_xml = ("<?xmlversion='1.0'encoding='UTF-8'?>"
'<serversxmlns="asdf"><a>(2,3)</a></servers>')
tmpl = self.tmpl_serializer.get_template('test')
result = self.tmpl_serializer.serialize(self.data, template=tmpl)
result = result.replace('\n', '').replace(' ', '')
self.assertEqual(result, expected_xml)
def test_serialize_multi_explicit(self):
expected_xml = ("<?xmlversion='1.0'encoding='UTF-8'?>"
'<serversxmlns="asdf"><a>(2,3)</a>'
'<a>(3,4)</a></servers>')
tmpl = self.tmpl_serializer.get_template('test')
result = self.tmpl_serializer.serialize(self.data_multi, template=tmpl)
result = result.replace('\n', '').replace(' ', '')
self.assertEqual(result, expected_xml)
def test_serialize(self):
expected_xml = ("<?xmlversion='1.0'encoding='UTF-8'?>"
'<serversxmlns="asdf"><a>(2,3)</a></servers>')
result = self.tmpl_serializer.serialize(self.data, 'test')
result = result.replace('\n', '').replace(' ', '')
self.assertEqual(result, expected_xml)
def test_serialize_multi(self):
expected_xml = ("<?xmlversion='1.0'encoding='UTF-8'?>"
'<serversxmlns="asdf"><a>(2,3)</a>'
'<a>(3,4)</a></servers>')
result = self.tmpl_serializer.serialize(self.data_multi, 'test')
result = result.replace('\n', '').replace(' ', '')
self.assertEqual(result, expected_xml)

View File

@ -14,15 +14,18 @@
# under the License.
import json
from lxml import etree
import stubout
import webob
import json
import nova.db
from nova import context
from nova import crypto
from nova import flags
from nova import test
from nova.api.openstack import xmlutil
from nova.api.openstack import zones
from nova.tests.api.openstack import fakes
from nova.scheduler import api
@ -112,6 +115,18 @@ class ZonesTest(test.TestCase):
self.assertEqual(res.status_int, 200)
self.assertEqual(len(res_dict['zones']), 2)
def test_get_zone_list_scheduler_xml(self):
self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler)
req = webob.Request.blank('/v1.1/fake/zones.xml')
res = req.get_response(fakes.wsgi_app())
res_tree = etree.fromstring(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(res_tree.tag, '{%s}zones' % xmlutil.XMLNS_V10)
self.assertEqual(len(res_tree), 2)
self.assertEqual(res_tree[0].tag, '{%s}zone' % xmlutil.XMLNS_V10)
self.assertEqual(res_tree[1].tag, '{%s}zone' % xmlutil.XMLNS_V10)
def test_get_zone_list_db(self):
self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler_empty)
self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db)
@ -123,6 +138,20 @@ class ZonesTest(test.TestCase):
res_dict = json.loads(res.body)
self.assertEqual(len(res_dict['zones']), 2)
def test_get_zone_list_db_xml(self):
self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler_empty)
self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db)
req = webob.Request.blank('/v1.1/fake/zones.xml')
req.headers["Content-Type"] = "application/json"
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
res_tree = etree.fromstring(res.body)
self.assertEqual(res_tree.tag, '{%s}zones' % xmlutil.XMLNS_V10)
self.assertEqual(len(res_tree), 2)
self.assertEqual(res_tree[0].tag, '{%s}zone' % xmlutil.XMLNS_V10)
self.assertEqual(res_tree[1].tag, '{%s}zone' % xmlutil.XMLNS_V10)
def test_get_zone_by_id(self):
req = webob.Request.blank('/v1.1/fake/zones/1')
req.headers["Content-Type"] = "application/json"
@ -134,6 +163,18 @@ class ZonesTest(test.TestCase):
self.assertEqual(res_dict['zone']['api_url'], 'http://example.com')
self.assertFalse('password' in res_dict['zone'])
def test_get_zone_by_id_xml(self):
req = webob.Request.blank('/v1.1/fake/zones/1.xml')
req.headers["Content-Type"] = "application/json"
res = req.get_response(fakes.wsgi_app())
res_tree = etree.fromstring(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10)
self.assertEqual(res_tree.get('id'), '1')
self.assertEqual(res_tree.get('api_url'), 'http://example.com')
self.assertEqual(res_tree.get('password'), None)
def test_zone_delete(self):
req = webob.Request.blank('/v1.1/fake/zones/1')
req.headers["Content-Type"] = "application/json"
@ -157,6 +198,23 @@ class ZonesTest(test.TestCase):
self.assertEqual(res_dict['zone']['api_url'], 'http://example.com')
self.assertFalse('username' in res_dict['zone'])
def test_zone_create_xml(self):
body = dict(zone=dict(api_url='http://example.com', username='fred',
password='fubar'))
req = webob.Request.blank('/v1.1/fake/zones.xml')
req.headers["Content-Type"] = "application/json"
req.method = 'POST'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
res_tree = etree.fromstring(res.body)
self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10)
self.assertEqual(res_tree.get('id'), '1')
self.assertEqual(res_tree.get('api_url'), 'http://example.com')
self.assertEqual(res_tree.get('username'), None)
def test_zone_update(self):
body = dict(zone=dict(username='zeb', password='sneaky'))
req = webob.Request.blank('/v1.1/fake/zones/1')
@ -172,6 +230,22 @@ class ZonesTest(test.TestCase):
self.assertEqual(res_dict['zone']['api_url'], 'http://example.com')
self.assertFalse('username' in res_dict['zone'])
def test_zone_update_xml(self):
body = dict(zone=dict(username='zeb', password='sneaky'))
req = webob.Request.blank('/v1.1/fake/zones/1.xml')
req.headers["Content-Type"] = "application/json"
req.method = 'PUT'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
res_tree = etree.fromstring(res.body)
self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10)
self.assertEqual(res_tree.get('id'), '1')
self.assertEqual(res_tree.get('api_url'), 'http://example.com')
self.assertEqual(res_tree.get('username'), None)
def test_zone_info(self):
caps = ['cap1=a;b', 'cap2=c;d']
self.flags(zone_name='darksecret', zone_capabilities=caps)
@ -187,6 +261,28 @@ class ZonesTest(test.TestCase):
self.assertEqual(res_dict['zone']['cap1'], 'a;b')
self.assertEqual(res_dict['zone']['cap2'], 'c;d')
def test_zone_info_xml(self):
caps = ['cap1=a;b', 'cap2=c;d']
self.flags(zone_name='darksecret', zone_capabilities=caps)
self.stubs.Set(api, '_call_scheduler', zone_capabilities)
body = dict(zone=dict(username='zeb', password='sneaky'))
req = webob.Request.blank('/v1.1/fake/zones/info.xml')
res = req.get_response(fakes.wsgi_app())
res_tree = etree.fromstring(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(res_tree.tag, '{%s}zone' % xmlutil.XMLNS_V10)
self.assertEqual(res_tree.get('name'), 'darksecret')
for elem in res_tree:
self.assertEqual(elem.tag in ('{%s}cap1' % xmlutil.XMLNS_V10,
'{%s}cap2' % xmlutil.XMLNS_V10),
True)
if elem.tag == '{%s}cap1' % xmlutil.XMLNS_V10:
self.assertEqual(elem.text, 'a;b')
elif elem.tag == '{%s}cap2' % xmlutil.XMLNS_V10:
self.assertEqual(elem.text, 'c;d')
def test_zone_select(self):
key = 'c286696d887c9aa0611bbb3e2025a45a'
self.flags(build_plan_encryption_key=key)
@ -220,3 +316,45 @@ class ZonesTest(test.TestCase):
self.assertTrue(found)
self.assertEqual(len(item), 2)
self.assertTrue('weight' in item)
def test_zone_select_xml(self):
key = 'c286696d887c9aa0611bbb3e2025a45a'
self.flags(build_plan_encryption_key=key)
self.stubs.Set(api, 'select', zone_select)
req = webob.Request.blank('/v1.1/fake/zones/select.xml')
req.method = 'POST'
req.headers["Content-Type"] = "application/json"
# Select queries end up being JSON encoded twice.
# Once to a string and again as an HTTP POST Body
req.body = json.dumps(json.dumps({}))
res = req.get_response(fakes.wsgi_app())
res_tree = etree.fromstring(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(res_tree.tag, '{%s}weights' % xmlutil.XMLNS_V10)
for item in res_tree:
self.assertEqual(item.tag, '{%s}weight' % xmlutil.XMLNS_V10)
blob = None
weight = None
for chld in item:
if chld.tag.endswith('blob'):
blob = chld.text
elif chld.tag.endswith('weight'):
weight = chld.text
decrypt = crypto.decryptor(FLAGS.build_plan_encryption_key)
secret_item = json.loads(decrypt(blob))
found = False
for original_item in GLOBAL_BUILD_PLAN:
if original_item['name'] != secret_item['name']:
continue
found = True
for key in ('weight', 'ip', 'zone'):
self.assertEqual(secret_item[key], original_item[key])
self.assertTrue(found)
self.assertEqual(len(item), 2)
self.assertTrue(weight)