Remove the soap protocol (I am moving it to wsme-soap

This commit is contained in:
Christophe de Vienne 2011-10-13 16:59:34 +02:00
parent 05d62db7bd
commit 3de7a3a1e7
8 changed files with 2 additions and 672 deletions

View File

@ -4,8 +4,10 @@ syntax: glob
*.pyo *.pyo
*.egg-info *.egg-info
*.swp *.swp
*.orig
~* ~*
.coverage .coverage
.noseids
syntax: regexp syntax: regexp

View File

@ -33,7 +33,6 @@ setup(
'wsme.protocols': [ 'wsme.protocols': [
'restjson = wsme.protocols.restjson:RestJsonProtocol', 'restjson = wsme.protocols.restjson:RestJsonProtocol',
'restxml = wsme.protocols.restxml:RestXmlProtocol', 'restxml = wsme.protocols.restxml:RestXmlProtocol',
'soap = wsme.protocols.soap:SoapProtocol',
] ]
}, },
) )

View File

@ -1,335 +0,0 @@
"""
A SOAP implementation for wsme.
Parts of the code were taken from the tgwebservices soap implmentation.
"""
import pkg_resources
import datetime
import decimal
import base64
import logging
from simplegeneric import generic
try:
from xml.etree import cElementTree as et
except ImportError:
import cElementTree as et
from genshi.builder import tag, Element, Namespace
from genshi.template import MarkupTemplate
from wsme.controller import pexpose
import wsme.types
from wsme import exc
from wsme.utils import *
log = logging.getLogger(__name__)
xsi_ns = 'http://www.w3.org/2001/XMLSchema-instance'
type_qn = '{%s}type' % xsi_ns
nil_qn = '{%s}nil' % xsi_ns
type_registry = {
basestring: 'xsd:string',
str: 'xsd:string',
unicode: 'xsd:string',
int: 'xsd:int',
long: "xsd:long",
float: "xsd:float",
bool: "xsd:boolean",
#unsigned: "xsd:unsignedInt",
datetime.datetime: "xsd:dateTime",
datetime.date: "xsd:date",
datetime.time: "xsd:time",
decimal.Decimal: "xsd:decimal",
wsme.types.binary: "xsd:base64Binary",
}
array_registry = {
basestring: "String_Array",
str: "String_Array",
unicode: "String_Array",
int: "Int_Array",
long: "Long_Array",
float: "Float_Array",
bool: "Boolean_Array",
}
def soap_array(datatype):
if datatype in array_registry:
return array_registry[datatype]
return 'types:' + datatype.__name__ + '_Array'
def soap_type(datatype):
if type(datatype) == list:
return soap_array(datatype[0])
if datatype in type_registry:
return type_registry[datatype]
if wsme.types.iscomplex(datatype):
return "types:%s" % datatype.__name__
def soap_fname(funcdef):
return "%s%s" % (
"".join((i.capitalize() for i in funcdef.path)),
funcdef.name.capitalize())
def make_soap_element(datatype, tag, value):
el = Element(tag)
if value is None:
el(**{'xsi:nil': 'true'})
elif wsme.types.iscomplex(datatype):
el(**{'xsi:type': datatype.__name__})
for name, attrdef in wsme.types.list_attributes(datatype):
el.append(
tosoap(attrdef.datatype, name, getattr(value, name)))
else:
el(value, **{'xsi:type': type_registry.get(datatype)})
return el
@generic
def tosoap(datatype, tag, value):
"""Converts a value into xml Element objects for inclusion in the SOAP
response output (after adding the type to the type_registry).
If a non-complex user specific type is to be used in the api,
a specific toxml should be added::
from wsme.protocol.soap import tosoap, make_soap_element, type_registry
class MySpecialType(object):
pass
type_registry[MySpecialType] = 'xsd:MySpecialType'
@tosoap.when_object(MySpecialType)
def myspecialtype_tosoap(datatype, tag, value):
return make_soap_element(datatype, tag, str(value))
"""
return make_soap_element(datatype, tag, value)
@tosoap.when_type(list)
def array_tosoap(datatype, tag, value):
el = Element(tag)
el(**{'xsi:type': soap_array(datatype[0])})
if value is None:
el(**{'xsi:nil': 'true'})
for item in value:
el.append(tosoap(datatype[0], 'item', item))
return el
@tosoap.when_object(datetime.datetime)
def datetime_tosoap(datatype, tag, value):
return make_soap_element(datatype, tag,
value is not None and value.isoformat() or None)
@tosoap.when_object(wsme.types.binary)
def binary_tosoap(datatype, tag, value):
return make_soap_element(datatype, tag,
value is not None and base64.encodestring(value)
or None)
@tosoap.when_object(None)
def None_tosoap(datatype, tag, value):
return make_soap_element(datatype, tag, None)
@generic
def fromsoap(datatype, el, ns):
"""
A generic converter from soap elements to python datatype.
If a non-complex user specific type is to be used in the api,
a specific fromsoap should be added.
"""
if el.get(nil_qn) == 'true':
return None
soaptype = el.get(type_qn)
if datatype in type_registry:
if soaptype != type_registry[datatype]:
raise exc.InvalidInput(el.tag, et.tostring(el))
value = datatype(el.text)
else:
if soaptype != datatype.__name__:
raise exc.InvalidInput(el.tag, et.tostring(el))
value = datatype()
for name, attr in wsme.types.list_attributes(datatype):
child = el.find('{%s}%s' % (ns['type'], name))
setattr(value, name, fromsoap(attr.datatype, child, ns))
return value
@fromsoap.when_type(list)
def array_fromsoap(datatype, el, ns):
return [fromsoap(datatype[0], child, ns) for child in el]
@fromsoap.when_object(datetime.date)
def date_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) != 'xsd:date':
raise exc.InvalidInput(el.tag, et.tostring(el))
return parse_isodate(el.text)
@fromsoap.when_object(datetime.time)
def time_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) != 'xsd:time':
raise exc.InvalidInput(el.tag, et.tostring(el))
return parse_isotime(el.text)
@fromsoap.when_object(datetime.datetime)
def datetime_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) != 'xsd:dateTime':
raise exc.InvalidInput(el.tag, et.tostring(el))
return parse_isodatetime(el.text)
@fromsoap.when_object(wsme.types.binary)
def binary_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) != 'xsd:base64Binary':
raise exc.InvalidInput(el.tag, et.tostring(el))
return base64.decodestring(el.text)
class SoapProtocol(object):
"""
SOAP protocol.
.. autoattribute:: name
.. autoattribute:: content_types
"""
name = 'soap'
content_types = ['application/soap+xml']
ns = {
"soap": "http://www.w3.org/2001/12/soap-envelope",
"soapenv": "http://schemas.xmlsoap.org/soap/envelope/",
"soapenc": "http://schemas.xmlsoap.org/soap/encoding/",
}
def __init__(self, tns=None,
typenamespace=None,
baseURL=None):
self.tns = tns
self.typenamespace = typenamespace
self.servicename = 'MyApp'
self.baseURL = baseURL
self._name_mapping = {}
def get_name_mapping(self, service=None):
if service not in self._name_mapping:
self._name_mapping[service] = dict(
(soap_fname(f), f.path + [f.name])
for f in self.root.getapi()
if service is None or (f.path and f.path[0] == service))
return self._name_mapping[service]
def accept(self, req):
if req.path.endswith('.wsdl'):
return True
for ct in self.content_types:
if req.headers['Content-Type'].startswith(ct):
return True
if req.headers.get("Soapaction"):
return True
return False
def extract_path(self, request):
if request.path.endswith('.wsdl'):
return ['_protocol', self.name, 'api_wsdl']
el = et.fromstring(request.body)
body = el.find('{%(soapenv)s}Body' % self.ns)
# Extract the service name from the tns
message = list(body)[0]
fname = message.tag
if fname.startswith('{%s}' % self.typenamespace):
fname = fname[len(self.typenamespace) + 2:]
mapping = self.get_name_mapping()
if fname not in mapping:
raise exc.UnknownFunction(fname)
path = mapping[fname]
request._wsme_soap_message = message
return path
return None
def read_arguments(self, funcdef, request):
kw = {}
if not hasattr(request, '_wsme_soap_message'):
return kw
msg = request._wsme_soap_message
parameters = msg.find('{%s}parameters' % self.typenamespace)
if parameters:
for param in parameters:
name = param.tag[len(self.typenamespace) + 2:]
arg = funcdef.get_arg(name)
value = fromsoap(arg.datatype, param, {
'type': self.typenamespace,
})
kw[name] = value
return kw
def soap_response(self, funcdef, result):
r = Element(soap_fname(funcdef) + 'Response')
r.append(tosoap(funcdef.return_type, 'result', result))
return r
def encode_result(self, funcdef, result):
envelope = self.render_template('soap',
typenamespace=self.typenamespace,
result=result,
funcdef=funcdef,
soap_response=self.soap_response)
return envelope
def get_template(self, name):
return pkg_resources.resource_string(
__name__, 'templates/%s.html' % name)
def render_template(self, name, **kw):
tmpl = MarkupTemplate(self.get_template(name))
stream = tmpl.generate(**kw)
return stream.render('xml')
def encode_error(self, infos):
return self.render_template('fault',
typenamespace=self.typenamespace,
**infos)
@pexpose(contenttype="text/xml")
def api_wsdl(self, service=None):
if service is None:
servicename = self.servicename
else:
servicename = self.servicename + service.capitalize()
return self.render_template('wsdl',
tns=self.tns,
typenamespace=self.typenamespace,
soapenc=self.ns['soapenc'],
service_name=servicename,
complex_types=(t() for t in wsme.types.complex_types),
funclist=self.root.getapi(),
arrays=wsme.types.array_types,
list_attributes=wsme.types.list_attributes,
baseURL=self.baseURL,
soap_array=soap_array,
soap_type=soap_type,
soap_fname=soap_fname,
)

View File

@ -1,14 +0,0 @@
<soapenv:Envelope
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:py="http://genshi.edgewall.org/"
py:attrs="{'xmlns' : typenamespace}">
<soapenv:Body>
<soapenv:Fault>
<soapenv:faultcode>${faultcode}</soapenv:faultcode>
<soapenv:faultstring>${faultstring}</soapenv:faultstring>
<soapenv:detail py:if="debuginfo is not None">${debuginfo}</soapenv:detail>
</soapenv:Fault>
</soapenv:Body>
</soapenv:Envelope>

View File

@ -1,10 +0,0 @@
<soapenv:Envelope
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:py="http://genshi.edgewall.org/"
py:attrs="{'xmlns' : typenamespace}">
<soapenv:Body>
${soap_response(funcdef, result)}
</soapenv:Body>
</soapenv:Envelope>

View File

@ -1,101 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<wsdl:definitions name="${service_name}"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:py="http://genshi.edgewall.org/"
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
py:attrs="{'xmlns:tns' : tns, 'xmlns:types' : typenamespace, 'targetNamespace' : tns, 'xmlns:soapenc' : soapenc }">
<wsdl:types>
<xsd:schema elementFormDefault="qualified"
targetNamespace="${typenamespace}">
<!-- !Declare all of the complex types (user classes). -->
<xsd:complexType py:for="cls in complex_types"
name="${cls.__name__}">
<xsd:sequence>
<xsd:element py:for="attr, attrdef in list_attributes(cls)" name="${attr}"
type="${soap_type(attrdef.datatype)}"
minOccurs="${attrdef.mandatory and 1 or 0}"
maxOccurs="1"/>
</xsd:sequence>
</xsd:complexType>
<!-- !Declare the request/response pairs for each function -->
<py:for each="funcdef in funclist">
<xsd:element name="${soap_fname(funcdef)}">
<xsd:complexType>
<xsd:sequence>
<xsd:element py:for="farg in funcdef.arguments"
name="${farg.name}"
type="${soap_type(farg.datatype)}"
py:attrs="{'minOccurs': not farg.mandatory and '0' or None}"/>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
<xsd:element name="${soap_fname(funcdef)}Response">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="result"
type="${soap_type(funcdef.return_type)}"/>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
</py:for>
<!-- !Declare all of the array types. -->
<xsd:complexType py:for="array in arrays" name="${soap_array(array)}">
<xsd:sequence>
<xsd:element maxOccurs="unbounded" name="item" nillable="true"
type="${soap_type(array)}"/>
</xsd:sequence>
</xsd:complexType>
</xsd:schema>
</wsdl:types>
<!-- !Declare the request and response messages used for each func. -->
<py:for each="funcdef in funclist">
<wsdl:message name="${soap_fname(funcdef)}Request" py:attrs="{'xmlns':typenamespace}">
<wsdl:part name="parameters" element="types:${soap_fname(funcdef)}"/>
</wsdl:message>
<wsdl:message name="${soap_fname(funcdef)}Response" py:attrs="{'xmlns':typenamespace}">
<wsdl:part name="parameters" element="types:${soap_fname(funcdef)}Response"/>
</wsdl:message>
</py:for>
<!-- !Define the PortType for the service and all of the operations
(functions) declared in the service. -->
<wsdl:portType name="${service_name}_PortType">
<wsdl:operation py:for="funcdef in funclist" name="${soap_fname(funcdef)}">
<wsdl:documentation py:if="funcdef.doc">${funcdef.doc}</wsdl:documentation>
<wsdl:input message="tns:${soap_fname(funcdef)}Request"/>
<wsdl:output message="tns:${soap_fname(funcdef)}Response"/>
</wsdl:operation>
</wsdl:portType>
<!-- !Declare our service as a document/literal style service with http
transport. -->
<wsdl:binding name="${service_name}_Binding" type="tns:${service_name}_PortType">
<soap:binding style="document"
transport="http://schemas.xmlsoap.org/soap/http"/>
<wsdl:operation py:for="funcdef in funclist" name="${soap_fname(funcdef)}">
<soap:operation soapAction="${soap_fname(funcdef)}"/>
<wsdl:input>
<soap:body use="literal"/>
</wsdl:input>
<wsdl:output>
<soap:body use="literal"/>
</wsdl:output>
</wsdl:operation>
</wsdl:binding>
<!-- !Define our service's location. -->
<wsdl:service name="${service_name}">
<wsdl:documentation>WSDL File for ${service_name}</wsdl:documentation>
<wsdl:port binding="tns:${service_name}_Binding" name="${service_name}_PortType">
<soap:address
location="${baseURL}"/>
</wsdl:port>
</wsdl:service>
</wsdl:definitions>

View File

@ -1,211 +0,0 @@
import decimal
import datetime
import base64
import wsme.tests.protocol
try:
import xml.etree.ElementTree as et
except:
import cElementTree as et
from wsme.protocols.soap import SoapProtocol
import wsme.utils
tns = "http://foo.bar.baz/soap/"
typenamespace = "http://foo.bar.baz/types/"
soapenv_ns = 'http://schemas.xmlsoap.org/soap/envelope/'
xsi_ns = 'http://www.w3.org/2001/XMLSchema-instance'
body_qn = '{%s}Body' % soapenv_ns
fault_qn = '{%s}Fault' % soapenv_ns
faultcode_qn = '{%s}faultcode' % soapenv_ns
faultstring_qn = '{%s}faultstring' % soapenv_ns
faultdetail_qn = '{%s}detail' % soapenv_ns
type_qn = '{%s}type' % xsi_ns
nil_qn = '{%s}nil' % xsi_ns
def build_soap_message(method, params=""):
message = """<?xml version="1.0"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<soap:Body xmlns="%(typenamespace)s">
<%(method)s>
%(params)s
</%(method)s>
</soap:Body>
</soap:Envelope>
""" % dict(method=method,
params=params,
typenamespace=typenamespace)
return message
python_types = {
int: ('xsd:int', str),
float: ('xsd:float', str),
str: ('xsd:string', str),
unicode: ('xsd:string', unicode),
wsme.types.binary: ('xsd:base64Binary', base64.encodestring),
decimal.Decimal: ('xsd:decimal', str),
datetime.date: ('xsd:date', datetime.date.isoformat),
datetime.time: ('xsd:time', datetime.time.isoformat),
datetime.datetime: ('xsd:dateTime', datetime.datetime.isoformat),
}
array_types = {
basestring: "String_Array",
str: "String_Array",
unicode: "String_Array",
int: "Int_Array",
long: "Long_Array",
float: "Float_Array",
bool: "Boolean_Array",
}
def tosoap(tag, value):
if isinstance(value, tuple):
value, datatype = value
else:
datatype = type(value)
el = et.Element(tag)
if value is None:
el.set('xsi:nil', True)
elif isinstance(datatype, list):
if datatype[0] in array_types:
el.set('xsi:type', array_types[datatype[0]])
else:
el.set('xsi:type', 'types:' + datatype[0].__name__)
for item in value:
el.append(tosoap('item', (item, datatype[0])))
elif datatype in python_types:
stype, conv = python_types[datatype]
el.text = conv(value)
el.set('xsi:type', stype)
else:
el.set('xsi:type', datatype.__name__)
for name, attr in datatype._wsme_attributes:
if name in value:
el.append(tosoap(name, (value[name], attr.datatype)))
return el
def read_bool(value):
return value == 'true'
soap_types = {
'xsd:string': unicode,
'xsd:int': int,
'xsd:long': long,
'xsd:float': float,
'xsd:decimal': decimal.Decimal,
'xsd:boolean': read_bool,
'xsd:date': wsme.utils.parse_isodate,
'xsd:time': wsme.utils.parse_isotime,
'xsd:dateTime': wsme.utils.parse_isodatetime,
'xsd:base64Binary': base64.decodestring,
}
def fromsoap(el):
if el.get(nil_qn) == 'true':
return None
t = el.get(type_qn)
if t in soap_types:
return soap_types[t](el.text)
elif t and t.endswith('_Array'):
return [fromsoap(i) for i in el]
else:
d = {}
for child in el:
name = child.tag
assert name.startswith('{%s}' % typenamespace)
name = name[len(typenamespace) + 2:]
d[name] = fromsoap(child)
return d
class TestSOAP(wsme.tests.protocol.ProtocolTestCase):
protocol = SoapProtocol(
tns=tns, typenamespace=typenamespace)
def test_simple_call(self):
message = build_soap_message('Touch')
print message
res = self.app.post('/', message,
headers={"Content-Type": "application/soap+xml; charset=utf-8"},
expect_errors=True)
print res.body
assert res.status.startswith('200')
def call(self, fpath, _rt=None, _accept=None,
_no_result_decode=False, **kw):
path = fpath.strip('/').split('/')
# get the actual definition so we can build the adequate request
if kw:
el = et.Element('parameters')
for key, value in kw.items():
el.append(tosoap(key, value))
params = et.tostring(el)
else:
params = ""
methodname = ''.join((i.capitalize() for i in path))
message = build_soap_message(methodname, params)
print message
headers = {"Content-Type": "application/soap+xml; charset=utf-8"}
if _accept is not None:
headers['Accept'] = _accept
res = self.app.post('/', message,
headers=headers,
expect_errors=True)
print "Status: ", res.status, "Received:", res.body
if _no_result_decode:
return res
el = et.fromstring(res.body)
body = el.find(body_qn)
print body
if res.status_int == 200:
r = body.find('{%s}%sResponse' % (typenamespace, methodname))
result = r.find('{%s}result' % typenamespace)
print "Result element: ", result
return fromsoap(result)
elif res.status_int == 400:
fault = body.find(fault_qn)
raise wsme.tests.protocol.CallException(
fault.find(faultcode_qn).text,
fault.find(faultstring_qn).text,
"")
elif res.status_int == 500:
fault = body.find(fault_qn)
raise wsme.tests.protocol.CallException(
fault.find(faultcode_qn).text,
fault.find(faultstring_qn).text,
fault.find(faultdetail_qn).text)
if el.tag == 'error':
raise wsme.tests.protocol.CallException(
el.find('faultcode').text,
el.find('faultstring').text,
el.find('debuginfo') is not None and
el.find('debuginfo').text or None)
else:
return loadxml(et.fromstring(res.body))
def test_wsdl(self):
res = self.app.get('/api.wsdl')
print res.body
assert res.body.find('NestedOuter_Array') != -1
assert 'ReturntypesGetunicode' in res.body