# Copyright (c) 2014 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
|
|
|
|
import lxml.etree
|
|
from urllib import quote
|
|
from copy import deepcopy
|
|
from pkg_resources import resource_stream # pylint: disable-msg=E0611
|
|
import sys
|
|
|
|
from swift3.exception import S3Exception
|
|
from swift3.utils import LOGGER, camel_to_snake, utf8encode, utf8decode
|
|
|
|
# Namespace URI of the S3 XML documents.  cleanup_namespaces() strips it from
# parsed tags and tostring() installs it as the default namespace on output.
XMLNS_S3 = 'http://s3.amazonaws.com/doc/2006-03-01/'
# XML Schema instance namespace URI.  Not referenced inside this module;
# presumably consumed by callers that emit xsi:* attributes -- TODO confirm.
XMLNS_XSI = 'http://www.w3.org/2001/XMLSchema-instance'
|
|
|
|
|
|
class XMLSyntaxError(S3Exception):
    """
    Raised by fromstring() when lxml reports that the input text is not
    well-formed XML (wraps lxml.etree.XMLSyntaxError).
    """
|
|
|
|
|
|
class DocumentInvalid(S3Exception):
    """
    Raised by fromstring() when the parsed document fails RelaxNG
    validation (wraps lxml.etree.DocumentInvalid).
    """
|
|
|
|
|
|
def cleanup_namespaces(elem):
    """
    Strip namespace prefixes from the tag of elem and of all its descendants.

    The tags are rewritten in place.  Two prefixes are removed when present:
    the S3 namespace (XMLNS_S3) and whatever default namespace is declared
    in the element's own nsmap.  Nothing is returned.
    """
    if not isinstance(elem.tag, basestring):
        # elem is a comment element (its tag is not a string); leave it alone.
        return

    def strip_prefix(name, uri):
        # Drop a leading '{uri}' qualifier from name, if there is one.
        qualifier = '{%s}' % uri
        if name.startswith(qualifier):
            return name[len(qualifier):]
        return name

    # remove s3 namespace
    elem.tag = strip_prefix(elem.tag, XMLNS_S3)

    # remove default namespace
    declared = elem.nsmap
    if declared and None in declared:
        elem.tag = strip_prefix(elem.tag, declared[None])

    for child in elem.iterchildren():
        cleanup_namespaces(child)
|
|
|
|
|
|
def fromstring(text, root_tag=None):
    """
    Parse XML text into an element tree and optionally validate it.

    The text is parsed with the module-level ``parser`` and the namespaces
    of the resulting tree are stripped with cleanup_namespaces().

    :param text: XML document to parse
    :param root_tag: when not None, the document is validated against the
                     RelaxNG schema ``schema/<snake_case(root_tag)>.rng``
                     loaded as a package resource
    :returns: the root element of the parsed document
    :raises XMLSyntaxError: if lxml cannot parse the text
    :raises DocumentInvalid: if the document does not satisfy the schema
    :raises IOError: if the schema resource cannot be read
    """
    try:
        elem = lxml.etree.fromstring(text, parser)
    except lxml.etree.XMLSyntaxError as e:
        LOGGER.debug(e)
        raise XMLSyntaxError(e)

    cleanup_namespaces(elem)

    if root_tag is not None:
        # validate XML
        try:
            path = 'schema/%s.rng' % camel_to_snake(root_tag)
            with resource_stream(__name__, path) as rng:
                lxml.etree.RelaxNG(file=rng).assertValid(elem)
        except IOError as e:
            # Probably, the schema file doesn't exist.
            LOGGER.error(e)
            # A bare raise re-raises the exception being handled with its
            # original traceback and, unlike the three-expression form of
            # raise, is valid syntax on both Python 2 and Python 3.
            raise
        except lxml.etree.DocumentInvalid as e:
            LOGGER.debug(e)
            raise DocumentInvalid(e)

    return elem
|
|
|
|
|
|
def tostring(tree, encoding_type=None, use_s3ns=True):
    """
    Serialize an element tree to a UTF-8 XML document with a declaration.

    The tree passed in is never modified; any rewriting is done on a copy.

    :param tree: root element to serialize
    :param encoding_type: when equal to 'url', the text of every element is
                          passed through urllib's quote(), except for the
                          elements listed in the blacklist below
    :param use_s3ns: when true, the root is rebuilt with XMLNS_S3 as its
                     default namespace
    :returns: the serialized document as produced by lxml.etree.tostring
    """
    if use_s3ns:
        nsmap = tree.nsmap.copy()
        nsmap[None] = XMLNS_S3

        # Namespace declarations are fixed when an element is created, so
        # build a fresh root and move copies of the children under it.
        root = Element(tree.tag, attrib=tree.attrib, nsmap=nsmap)
        root.text = tree.text
        root.extend(deepcopy(list(tree)))
        tree = root

    if encoding_type == 'url':
        tree = deepcopy(tree)
        # Some elements are not url-encoded even when we specify
        # encoding_type=url.
        blacklist = ('LastModified', 'ID', 'DisplayName', 'Initiated')
        for e in tree.iter():
            if e.tag not in blacklist and isinstance(e.text, basestring):
                e.text = quote(e.text)

    return lxml.etree.tostring(tree, xml_declaration=True, encoding='UTF-8')
|
|
|
|
|
|
class _Element(lxml.etree.ElementBase):
    """
    Wrapper Element class of lxml.etree.Element to support
    a utf-8 encoded non-ascii string as a text.

    Why do we need this?:
    The original lxml.etree.Element supports only unicode for the text.
    That hurts maintainability because we would have to call a lot of
    encode/decode methods to apply account/container/object names
    (i.e. PATH_INFO) to each Element instance.  With this class, that
    redundant code can be removed from the swift3 middleware.
    """
    def __init__(self, *args, **kwargs):
        # pylint: disable-msg=E1002
        super(_Element, self).__init__(*args, **kwargs)

    @property
    def text(self):
        """
        utf-8 wrapper property of lxml.etree.Element.text

        Reads the text through the base class property and returns it
        after passing it through utf8encode().
        """
        raw_text = lxml.etree.ElementBase.text.__get__(self)
        return utf8encode(raw_text)

    @text.setter
    def text(self, new_text):
        # Pass the value through utf8decode() before handing it to the
        # base class property.
        decoded = utf8decode(new_text)
        lxml.etree.ElementBase.text.__set__(self, decoded)
|
|
|
|
|
|
# Make the parser build _Element instances so that elements created through
# it expose the utf-8 aware ``text`` property.
parser_lookup = lxml.etree.ElementDefaultClassLookup(element=_Element)
# fromstring() feeds caller-supplied text to this parser (presumably client
# request bodies -- confirm against callers), so entity expansion is turned
# off and network access is explicitly forbidden to guard against XML
# external entity (XXE) attacks.
parser = lxml.etree.XMLParser(resolve_entities=False, no_network=True)
parser.set_element_class_lookup(parser_lookup)

# Public factory aliases: Element builds a root through the parser above,
# SubElement is lxml's stock child factory.
Element = parser.makeelement
SubElement = lxml.etree.SubElement
|