Remove support for SOAP protocol

This also allows us to removal lxml as an optional dependency.

Change-Id: I7ce0418012070677e36e41236487f3fabc0763a8
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane 2019-05-01 14:56:13 -06:00
parent 3fb55f7f4f
commit 4c5c051fcb
15 changed files with 15 additions and 1617 deletions

View File

@ -31,7 +31,7 @@ Here is a standalone wsgi example::
def hello(self, who=u'World'):
return u"Hello {0} !".format(who)
ws = MyService(protocols=['restjson', 'restxml', 'soap'])
ws = MyService(protocols=['restjson', 'restxml'])
application = ws.wsgiapp()
With this published at the ``/ws`` path of your application, you can access
@ -42,28 +42,25 @@ your hello function in various protocols:
* - URL
- Returns
* - ``http://<server>/ws/hello.json?who=you``
- ``"Hello you !"``
* - ``http://<server>/ws/hello.xml``
- ``<result>Hello World !</result>``
* - ``http://<server>/ws/api.wsdl``
- A WSDL description for any SOAP client.
Main features
~~~~~~~~~~~~~
- Very simple API.
- Supports user-defined simple and complex types.
- Multi-protocol : REST+Json, REST+XML, SOAP and more to come.
- Multi-protocol : REST+Json, REST+XML and more to come.
- Extensible : easy to add more protocols or more base types.
- Framework independence : adapters are provided to easily integrate
your API in any web framework, for example a wsgi container,
Pecan_, Flask_, ...
- Very few runtime dependencies: webob, simplegeneric. Optionnaly lxml and
- Very few runtime dependencies: webob, simplegeneric. Optionally
simplejson if you need better performances.
- Integration in `Sphinx`_ for making clean documentation with
``wsmeext.sphinxext``.

View File

@ -7,6 +7,7 @@ Changes
* Remove support for turbogears
* Remove support for cornice
* Remove support for ExtDirect
* Remove support for SOAP
* Remove SQLAlchemy support. It has never actually worked to begin with.
0.9.2 (2017-02-14)

View File

@ -243,8 +243,8 @@ man_pages = [
autodoc_member_order = 'bysource'
wsme_protocols = [
'restjson', 'restxml', 'soap',
'restjson', 'restxml',
]
intersphinx_mapping = {
'python': ('http://docs.python.org/', None),

View File

@ -33,8 +33,7 @@ Config values
.. confval:: wsme_protocols
A list of strings that are WSME protocol names. If provided by an
additional package (for example WSME-Soap), that package must be installed.
A list of strings that are WSME protocol names.
The types and services generated documentation will include code samples
for each of these protocols.
@ -60,7 +59,7 @@ Directives
To set it globally, see :confval:`wsme_root`.
A ``webpath`` option allows override of :confval:`wsme_webpath`.
Example:
.. code-block:: rst
@ -102,9 +101,9 @@ Example
:type: int
.. wsme:service:: name/space/SampleService
.. wsme:function:: doit
- .. wsme:root:: wsmeext.sphinxext.SampleService
:webpath: /api
@ -115,9 +114,9 @@ Example
:type: int
.. wsme:service:: name/space/SampleService
.. wsme:function:: getType
Returns a :wsme:type:`MyType <MyType>`

View File

@ -7,9 +7,8 @@ You can find it in the examples directory of the source distribution.
.. literalinclude:: ../examples/demo/demo.py
:language: python
When running this example, the following soap client can interrogate
When running this example, the following REST client can interrogate
the web services:
.. literalinclude:: ../examples/demo/client.py
:language: python

View File

@ -233,23 +233,3 @@ and 'debuginfo' subelements:
<faultcode>Client</faultcode>
<faultstring>id is missing</faultstring>
</error>
SOAP
----
:name: ``'soap'``
Implements the SOAP protocol.
A wsdl definition of the webservice is available at the 'api.wsdl' subpath.
(``/ws/api.wsdl`` in our example).
The protocol is selected if the request matches one of the following condition:
- The Content-Type is 'application/soap+xml'
- A header 'Soapaction' is present
Options
~~~~~~~
:tns: Type namespace

View File

@ -1,32 +0,0 @@
from suds.client import Client
url = 'http://127.0.0.1:8080/ws/api.wsdl'
client = Client(url, cache=None)
print client
print client.service.multiply(4, 5)
print client.service.helloworld()
print client.service.getperson()
p = client.service.listpersons()
print repr(p)
p = client.service.setpersons(p)
print repr(p)
p = client.factory.create('ns0:Person')
p.id = 4
print p
a = client.factory.create('ns0:Person_Array')
print a
a = client.service.setpersons(a)
print repr(a)
a.item.append(p)
print repr(a)
a = client.service.setpersons(a)
print repr(a)

View File

@ -90,12 +90,6 @@ class DemoRoot(WSRoot):
root = DemoRoot(webpath='/ws')
root.addprotocol('soap',
tns='http://example.com/demo',
typenamespace='http://example.com/demo/types',
baseURL='http://127.0.0.1:8080/ws/',
)
root.addprotocol('restjson')
bottle.mount('/ws/', root.wsgiapp())

View File

@ -32,7 +32,6 @@ wsme.protocols =
rest = wsme.rest.protocol:RestProtocol
restjson = wsme.rest.protocol:RestProtocol
restxml = wsme.rest.protocol:RestProtocol
soap = wsmeext.soap:SoapProtocol
[files]
packages =

231
tox.ini
View File

@ -1,5 +1,5 @@
[tox]
envlist = py27,py27-nolxml,pypy,coverage,py36,py35,py36-nolxml,py35-nolxml,pecan-dev27,pecan-dev35,pecan-dev36,pep8
envlist = py27,py35,py36,pypy,coverage,pep8,pecan-dev27,pecan-dev35,pecan-dev36
[common]
testtools =
@ -24,7 +24,6 @@ setenv =
deps =
{[common]testtools}
transaction
suds-jurko
https://github.com/pecan/pecan/zipball/master
[testenv:pecan-dev27]
@ -80,17 +79,6 @@ deps =
oslo.config
oslotest
[testenv:py27-lxml-json]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
basepython = python2.7
[testenv:py27]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
@ -98,20 +86,6 @@ commands =
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
basepython = python2.7
[testenv:py27-lxml-simplejson]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
simplejson
basepython = python2.7
[testenv:py27-simplejson]
@ -121,53 +95,9 @@ commands =
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
simplejson
basepython = python2.7
[testenv:py27-nolxml-json]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
basepython = python2.7
[testenv:py27-nolxml]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
basepython = python2.7
[testenv:py27-nolxml-simplejson]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
simplejson
basepython = python2.7
[testenv:py35-lxml-json]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
basepython = python3.5
[testenv:py35]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
@ -175,20 +105,6 @@ commands =
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
basepython = python3.5
[testenv:py35-lxml-simplejson]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
simplejson
basepython = python3.5
[testenv:py35-simplejson]
@ -198,53 +114,9 @@ commands =
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
simplejson
basepython = python3.5
[testenv:py35-nolxml-json]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
basepython = python3.5
[testenv:py35-nolxml]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
basepython = python3.5
[testenv:py35-nolxml-simplejson]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
simplejson
basepython = python3.5
[testenv:py36-lxml-json]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
basepython = python3.6
[testenv:py36]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
@ -252,20 +124,6 @@ commands =
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
basepython = python3.6
[testenv:py36-lxml-simplejson]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
simplejson
basepython = python3.6
[testenv:py36-simplejson]
@ -275,52 +133,9 @@ commands =
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
simplejson
basepython = python3.6
[testenv:py36-nolxml-json]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
basepython = python3.6
[testenv:py36-nolxml]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
basepython = python3.6
[testenv:py36-nolxml-simplejson]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
simplejson
basepython = python3.6
[testenv:pypy-lxml-json]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
[testenv:pypy]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
@ -328,19 +143,6 @@ commands =
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
[testenv:pypy-lxml-simplejson]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
simplejson
[testenv:pypy-simplejson]
commands =
@ -349,35 +151,4 @@ commands =
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
lxml
simplejson
[testenv:pypy-nolxml-json]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
[testenv:pypy-nolxml]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
[testenv:pypy-nolxml-simplejson]
commands =
{envbindir}/coverage run {envbindir}/nosetests --nologcapture --with-xunit --xunit-file nosetests-{envname}.xml wsme/tests wsmeext/tests tests/pecantest tests/test_sphinxext.py tests/test_flask.py --verbose {posargs}
{envbindir}/coverage report --show-missing wsme/*.py wsme/rest/*.py wsmeext/*.py
deps =
{[common]testtools}
{[common]basedeps}
suds-jurko
simplejson

View File

@ -1,5 +0,0 @@
from __future__ import absolute_import
from wsmeext.soap.protocol import SoapProtocol
__all__ = ['SoapProtocol']

View File

@ -1,478 +0,0 @@
"""
A SOAP implementation for wsme.
Parts of the code were taken from the tgwebservices soap implmentation.
"""
from __future__ import absolute_import
import pkg_resources
import datetime
import decimal
import base64
import logging
import six
from wsmeext.soap.simplegeneric import generic
from wsmeext.soap.wsdl import WSDLGenerator
try:
from lxml import etree as ET
use_lxml = True
except ImportError:
from xml.etree import cElementTree as ET # noqa
use_lxml = False
from wsme.protocol import CallContext, Protocol, expose
import wsme.types
import wsme.runtime
from wsme import exc
from wsme.utils import parse_isodate, parse_isotime, parse_isodatetime
log = logging.getLogger(__name__)
xsd_ns = 'http://www.w3.org/2001/XMLSchema'
xsi_ns = 'http://www.w3.org/2001/XMLSchema-instance'
soapenv_ns = 'http://schemas.xmlsoap.org/soap/envelope/'
if not use_lxml:
ET.register_namespace('soap', soapenv_ns)
type_qn = '{%s}type' % xsi_ns
nil_qn = '{%s}nil' % xsi_ns
Envelope_qn = '{%s}Envelope' % soapenv_ns
Body_qn = '{%s}Body' % soapenv_ns
Fault_qn = '{%s}Fault' % soapenv_ns
faultcode_qn = '{%s}faultcode' % soapenv_ns
faultstring_qn = '{%s}faultstring' % soapenv_ns
detail_qn = '{%s}detail' % soapenv_ns
type_registry = {
wsme.types.bytes: 'xs:string',
wsme.types.text: 'xs:string',
int: 'xs:int',
float: "xs:float",
bool: "xs:boolean",
datetime.datetime: "xs:dateTime",
datetime.date: "xs:date",
datetime.time: "xs:time",
decimal.Decimal: "xs:decimal",
wsme.types.binary: "xs:base64Binary",
}
if not six.PY3:
type_registry[long] = "xs:long" # noqa
array_registry = {
wsme.types.text: "String_Array",
wsme.types.bytes: "String_Array",
int: "Int_Array",
float: "Float_Array",
bool: "Boolean_Array",
}
if not six.PY3:
array_registry[long] = "Long_Array" # noqa
def soap_array(datatype, ns):
if datatype.item_type in array_registry:
name = array_registry[datatype.item_type]
else:
name = soap_type(datatype.item_type, False) + '_Array'
if ns:
name = 'types:' + name
return name
def soap_type(datatype, ns):
name = None
if wsme.types.isarray(datatype):
return soap_array(datatype, ns)
if wsme.types.isdict(datatype):
return None
if datatype in type_registry:
stype = type_registry[datatype]
if not ns:
stype = stype[3:]
return stype
if wsme.types.iscomplex(datatype):
name = datatype.__name__
if name and ns:
name = 'types:' + name
return name
if wsme.types.isusertype(datatype):
return soap_type(datatype.basetype, ns)
def soap_fname(path, funcdef):
return "".join([path[0]] + [i.capitalize() for i in path[1:]])
class SoapEncoder(object):
def __init__(self, types_ns):
self.types_ns = types_ns
def make_soap_element(self, datatype, tag, value, xsitype=None):
el = ET.Element(tag)
if value is None:
el.set(nil_qn, 'true')
elif xsitype is not None:
el.set(type_qn, xsitype)
el.text = value
elif wsme.types.isusertype(datatype):
return self.tosoap(datatype.basetype, tag,
datatype.tobasetype(value))
elif wsme.types.iscomplex(datatype):
el.set(type_qn, 'types:%s' % (datatype.__name__))
for attrdef in wsme.types.list_attributes(datatype):
attrvalue = getattr(value, attrdef.key)
if attrvalue is not wsme.types.Unset:
el.append(self.tosoap(
attrdef.datatype,
'{%s}%s' % (self.types_ns, attrdef.name),
attrvalue
))
else:
el.set(type_qn, type_registry.get(datatype))
if not isinstance(value, wsme.types.text):
value = wsme.types.text(value)
el.text = value
return el
@generic
def tosoap(self, datatype, tag, value):
"""Converts a value into xml Element objects for inclusion in the SOAP
response output (after adding the type to the type_registry).
If a non-complex user specific type is to be used in the api,
a specific toxml should be added::
from wsme.protocol.soap import tosoap, make_soap_element, \
type_registry
class MySpecialType(object):
pass
type_registry[MySpecialType] = 'xs:MySpecialType'
@tosoap.when_object(MySpecialType)
def myspecialtype_tosoap(datatype, tag, value):
return make_soap_element(datatype, tag, str(value))
"""
return self.make_soap_element(datatype, tag, value)
@tosoap.when_type(wsme.types.ArrayType)
def array_tosoap(self, datatype, tag, value):
el = ET.Element(tag)
el.set(type_qn, soap_array(datatype, self.types_ns))
if value is None:
el.set(nil_qn, 'true')
elif len(value) == 0:
el.append(ET.Element('item'))
else:
for item in value:
el.append(self.tosoap(datatype.item_type, 'item', item))
return el
@tosoap.when_object(bool)
def bool_tosoap(self, datatype, tag, value):
return self.make_soap_element(
datatype,
tag,
'true' if value is True else 'false' if value is False else None
)
@tosoap.when_object(wsme.types.bytes)
def bytes_tosoap(self, datatype, tag, value):
log.debug('(bytes_tosoap, %s, %s, %s, %s)', datatype,
tag, value, type(value))
if isinstance(value, wsme.types.bytes):
value = value.decode('ascii')
return self.make_soap_element(datatype, tag, value)
@tosoap.when_object(datetime.datetime)
def datetime_tosoap(self, datatype, tag, value):
return self.make_soap_element(
datatype,
tag,
value is not None and value.isoformat() or None
)
@tosoap.when_object(wsme.types.binary)
def binary_tosoap(self, datatype, tag, value):
log.debug("(%s, %s, %s)", datatype, tag, value)
value = base64.encodestring(value) if value is not None else None
if six.PY3:
value = value.decode('ascii')
return self.make_soap_element(
datatype.basetype, tag, value, 'xs:base64Binary'
)
@tosoap.when_object(None)
def None_tosoap(self, datatype, tag, value):
return self.make_soap_element(datatype, tag, None)
@generic
def fromsoap(datatype, el, ns):
"""
A generic converter from soap elements to python datatype.
If a non-complex user specific type is to be used in the api,
a specific fromsoap should be added.
"""
if el.get(nil_qn) == 'true':
return None
if datatype in type_registry:
value = datatype(el.text)
elif wsme.types.isusertype(datatype):
value = datatype.frombasetype(
fromsoap(datatype.basetype, el, ns))
else:
value = datatype()
for attr in wsme.types.list_attributes(datatype):
child = el.find('{%s}%s' % (ns['type'], attr.name))
if child is not None:
setattr(value, attr.key, fromsoap(attr.datatype, child, ns))
return value
@fromsoap.when_type(wsme.types.ArrayType)
def array_fromsoap(datatype, el, ns):
if len(el) == 1:
if datatype.item_type \
not in wsme.types.pod_types + wsme.types.dt_types \
and len(el[0]) == 0:
return []
return [fromsoap(datatype.item_type, child, ns) for child in el]
@fromsoap.when_object(wsme.types.bytes)
def bytes_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) not in (None, 'xs:string'):
raise exc.InvalidInput(el.tag, ET.tostring(el))
return el.text.encode('ascii') if el.text else six.b('')
@fromsoap.when_object(wsme.types.text)
def text_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) not in (None, 'xs:string'):
raise exc.InvalidInput(el.tag, ET.tostring(el))
return datatype(el.text if el.text else '')
@fromsoap.when_object(bool)
def bool_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) not in (None, 'xs:boolean'):
raise exc.InvalidInput(el.tag, ET.tostring(el))
return el.text.lower() != 'false'
@fromsoap.when_object(datetime.date)
def date_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) not in (None, 'xs:date'):
raise exc.InvalidInput(el.tag, ET.tostring(el))
return parse_isodate(el.text)
@fromsoap.when_object(datetime.time)
def time_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) not in (None, 'xs:time'):
raise exc.InvalidInput(el.tag, ET.tostring(el))
return parse_isotime(el.text)
@fromsoap.when_object(datetime.datetime)
def datetime_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) not in (None, 'xs:dateTime'):
raise exc.InvalidInput(el.tag, ET.tostring(el))
return parse_isodatetime(el.text)
@fromsoap.when_object(wsme.types.binary)
def binary_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) not in (None, 'xs:base64Binary'):
raise exc.InvalidInput(el.tag, ET.tostring(el))
return base64.decodestring(el.text.encode('ascii'))
class SoapProtocol(Protocol):
"""
SOAP protocol.
.. autoattribute:: name
.. autoattribute:: content_types
"""
name = 'soap'
displayname = 'SOAP'
content_types = ['application/soap+xml']
ns = {
"soap": "http://www.w3.org/2001/12/soap-envelope",
"soapenv": "http://schemas.xmlsoap.org/soap/envelope/",
"soapenc": "http://schemas.xmlsoap.org/soap/encoding/",
}
def __init__(self, tns=None, typenamespace=None, baseURL=None,
servicename='MyApp'):
self.tns = tns
self.typenamespace = typenamespace
self.servicename = servicename
self.baseURL = baseURL
self._name_mapping = {}
self.encoder = SoapEncoder(typenamespace)
def get_name_mapping(self, service=None):
if service not in self._name_mapping:
self._name_mapping[service] = dict(
(soap_fname(path, f), path)
for path, f in self.root.getapi()
if service is None or (path and path[0] == service)
)
return self._name_mapping[service]
def accept(self, req):
for ct in self.content_types:
if req.headers['Content-Type'].startswith(ct):
return True
if req.headers.get("Soapaction"):
return True
return False
def iter_calls(self, request):
yield CallContext(request)
def extract_path(self, context):
request = context.request
el = ET.fromstring(request.body)
body = el.find('{%(soapenv)s}Body' % self.ns)
# Extract the service name from the tns
message = list(body)[0]
fname = message.tag
if fname.startswith('{%s}' % self.typenamespace):
fname = fname[len(self.typenamespace) + 2:]
mapping = self.get_name_mapping()
if fname not in mapping:
raise exc.UnknownFunction(fname)
path = mapping[fname]
context.soap_message = message
return path
return None
def read_arguments(self, context):
kw = {}
if not hasattr(context, 'soap_message'):
return kw
msg = context.soap_message
for param in msg:
# FIX for python2.6 (only for lxml)
if use_lxml and isinstance(param, ET._Comment):
continue
name = param.tag[len(self.typenamespace) + 2:]
arg = context.funcdef.get_arg(name)
value = fromsoap(arg.datatype, param, {
'type': self.typenamespace,
})
kw[name] = value
wsme.runtime.check_arguments(context.funcdef, (), kw)
return kw
def soap_response(self, path, funcdef, result):
r = ET.Element('{%s}%sResponse' % (
self.typenamespace, soap_fname(path, funcdef)
))
log.debug('(soap_response, %s, %s)', funcdef.return_type, result)
r.append(self.encoder.tosoap(
funcdef.return_type, '{%s}result' % self.typenamespace, result
))
return r
def encode_result(self, context, result):
log.debug('(encode_result, %s)', result)
if use_lxml:
envelope = ET.Element(
Envelope_qn,
nsmap={'xs': xsd_ns, 'types': self.typenamespace}
)
else:
envelope = ET.Element(Envelope_qn, {
'xmlns:xs': xsd_ns,
'xmlns:types': self.typenamespace
})
body = ET.SubElement(envelope, Body_qn)
body.append(self.soap_response(context.path, context.funcdef, result))
s = ET.tostring(envelope)
return s
def get_template(self, name):
return pkg_resources.resource_string(
__name__, '%s.html' % name)
def encode_error(self, context, infos):
envelope = ET.Element(Envelope_qn)
body = ET.SubElement(envelope, Body_qn)
fault = ET.SubElement(body, Fault_qn)
ET.SubElement(fault, faultcode_qn).text = infos['faultcode']
ET.SubElement(fault, faultstring_qn).text = infos['faultstring']
if 'debuginfo' in infos:
ET.SubElement(fault, detail_qn).text = infos['debuginfo']
s = ET.tostring(envelope)
return s
@expose('/api.wsdl', 'text/xml')
def api_wsdl(self, service=None):
if service is None:
servicename = self.servicename
else:
servicename = self.servicename + service.capitalize()
return WSDLGenerator(
tns=self.tns,
types_ns=self.typenamespace,
soapenc=self.ns['soapenc'],
service_name=servicename,
complex_types=self.root.__registry__.complex_types,
funclist=self.root.getapi(),
arrays=self.root.__registry__.array_types,
baseURL=self.baseURL,
soap_array=soap_array,
soap_type=soap_type,
soap_fname=soap_fname,
).generate(True)
def encode_sample_value(self, datatype, value, format=False):
r = self.encoder.make_soap_element(datatype, 'value', value)
if format:
xml_indent(r)
return ('xml', six.text_type(r))
def xml_indent(elem, level=0):
i = "\n" + level * " "
if len(elem):
if not elem.text or not elem.text.strip():
elem.text = i + " "
for e in elem:
xml_indent(e, level + 1)
if not e.tail or not e.tail.strip():
e.tail = i
if level and (not elem.tail or not elem.tail.strip()):
elem.tail = i

View File

@ -1,107 +0,0 @@
import inspect
__all__ = ["generic"]
try:
from types import ClassType, InstanceType
classtypes = type, ClassType
except ImportError:
classtypes = type
InstanceType = None
def generic(func, argpos=None):
"""Create a simple generic function"""
if argpos is None:
if hasattr(func, 'argpos'):
argpos = func.argpos
else:
argnames = inspect.getargspec(func)[0]
if argnames and argnames[0] == 'self':
argpos = 1
else:
argpos = 0
_sentinel = object()
def _by_class(*args, **kw):
cls = args[argpos].__class__
for t in type(cls.__name__, (cls, object), {}).__mro__:
f = _gbt(t, _sentinel)
if f is not _sentinel:
return f(*args, **kw)
else:
return func(*args, **kw)
_by_type = {object: func, InstanceType: _by_class}
_gbt = _by_type.get
def when_type(*types):
"""Decorator to add a method that will be called for the given types"""
for t in types:
if not isinstance(t, classtypes):
raise TypeError(
"%r is not a type or class" % (t,)
)
def decorate(f):
for t in types:
if _by_type.setdefault(t, f) is not f:
raise TypeError(
"%r already has method for type %r" % (func, t)
)
return f
return decorate
_by_object = {}
_gbo = _by_object.get
def when_object(*obs):
"""Decorator to add a method to be called for the given object(s)"""
def decorate(f):
for o in obs:
if _by_object.setdefault(id(o), (o, f))[1] is not f:
raise TypeError(
"%r already has method for object %r" % (func, o)
)
return f
return decorate
def dispatch(*args, **kw):
f = _gbo(id(args[argpos]), _sentinel)
if f is _sentinel:
for t in type(args[argpos]).__mro__:
f = _gbt(t, _sentinel)
if f is not _sentinel:
return f(*args, **kw)
else:
return func(*args, **kw)
else:
return f[1](*args, **kw)
dispatch.__name__ = func.__name__
dispatch.__dict__ = func.__dict__.copy()
dispatch.__doc__ = func.__doc__
dispatch.__module__ = func.__module__
dispatch.when_type = when_type
dispatch.when_object = when_object
dispatch.default = func
dispatch.has_object = lambda o: id(o) in _by_object
dispatch.has_type = lambda t: t in _by_type
dispatch.argpos = argpos
return dispatch
def test_suite():
import doctest
return doctest.DocFileSuite(
'README.txt',
optionflags=doctest.ELLIPSIS | doctest.REPORT_ONLY_FIRST_FAILURE,
)
if __name__ == '__main__':
import unittest
r = unittest.TextTestRunner()
r.run(test_suite())

View File

@ -1,297 +0,0 @@
import six
import wsme.types
try:
from lxml import etree as ET
use_lxml = True
except ImportError:
from xml.etree import cElementTree as ET # noqa
use_lxml = False
def xml_tostring(el, pretty_print=False):
if use_lxml:
return ET.tostring(el, pretty_print=pretty_print)
return ET.tostring(el)
class NS(object):
def __init__(self, url):
self.url = url
def __call__(self, name):
return self.qn(name)
def __str__(self):
return self.url
def qn(self, name):
return '{%s}%s' % (self.url, name)
wsdl_ns = NS("http://schemas.xmlsoap.org/wsdl/")
soap_ns = NS("http://schemas.xmlsoap.org/wsdl/soap/")
xs_ns = NS("http://www.w3.org/2001/XMLSchema")
soapenc_ns = NS("http://schemas.xmlsoap.org/soap/encoding/")
class WSDLGenerator(object):
def __init__(
self,
tns,
types_ns,
soapenc,
service_name,
complex_types,
funclist,
arrays,
baseURL,
soap_array,
soap_type,
soap_fname):
self.tns = NS(tns)
self.types_ns = NS(types_ns)
self.soapenc = soapenc
self.service_name = service_name
self.complex_types = complex_types
self.funclist = funclist
self.arrays = arrays
self.baseURL = baseURL or ''
self.soap_array = soap_array
self.soap_fname = soap_fname
self.soap_type = soap_type
def gen_complex_type(self, cls):
complexType = ET.Element(xs_ns('complexType'))
complexType.set('name', cls.__name__)
sequence = ET.SubElement(complexType, xs_ns('sequence'))
for attrdef in wsme.types.list_attributes(cls):
soap_type = self.soap_type(attrdef.datatype, str(self.types_ns))
if soap_type is None:
continue
element = ET.SubElement(sequence, xs_ns('element'))
element.set('name', attrdef.name)
element.set('type', soap_type)
element.set('minOccurs', '1' if attrdef.mandatory else '0')
element.set('maxOccurs', '1')
return complexType
def gen_array(self, array):
complexType = ET.Element(xs_ns('complexType'))
complexType.set('name', self.soap_array(array, False))
ET.SubElement(
ET.SubElement(complexType, xs_ns('sequence')),
xs_ns('element'),
name='item',
maxOccurs='unbounded',
nillable='true',
type=self.soap_type(array.item_type, self.types_ns)
)
return complexType
def gen_function_types(self, path, funcdef):
args_el = ET.Element(
xs_ns('element'),
name=self.soap_fname(path, funcdef)
)
sequence = ET.SubElement(
ET.SubElement(args_el, xs_ns('complexType')),
xs_ns('sequence')
)
for farg in funcdef.arguments:
t = self.soap_type(farg.datatype, True)
if t is None:
continue
element = ET.SubElement(
sequence, xs_ns('element'),
name=farg.name,
type=self.soap_type(farg.datatype, True)
)
if not farg.mandatory:
element.set('minOccurs', '0')
response_el = ET.Element(
xs_ns('element'),
name=self.soap_fname(path, funcdef) + 'Response'
)
element = ET.SubElement(
ET.SubElement(
ET.SubElement(
response_el,
xs_ns('complexType')
),
xs_ns('sequence')
),
xs_ns('element'),
name='result'
)
return_soap_type = self.soap_type(funcdef.return_type, True)
if return_soap_type is not None:
element.set('type', return_soap_type)
return args_el, response_el
def gen_types(self):
types = ET.Element(wsdl_ns('types'))
schema = ET.SubElement(types, xs_ns('schema'))
schema.set('elementFormDefault', 'qualified')
schema.set('targetNamespace', str(self.types_ns))
for cls in self.complex_types:
schema.append(self.gen_complex_type(cls))
for array in self.arrays:
schema.append(self.gen_array(array))
for path, funcdef in self.funclist:
schema.extend(self.gen_function_types(path, funcdef))
return types
def gen_functions(self):
messages = []
binding = ET.Element(
wsdl_ns('binding'),
name='%s_Binding' % self.service_name,
type='tns:%s_PortType' % self.service_name
)
ET.SubElement(
binding,
soap_ns('binding'),
style='document',
transport='http://schemas.xmlsoap.org/soap/http'
)
portType = ET.Element(
wsdl_ns('portType'),
name='%s_PortType' % self.service_name
)
for path, funcdef in self.funclist:
soap_fname = self.soap_fname(path, funcdef)
# message
req_message = ET.Element(
wsdl_ns('message'),
name=soap_fname + 'Request',
xmlns=str(self.types_ns)
)
ET.SubElement(
req_message,
wsdl_ns('part'),
name='parameters',
element='types:%s' % soap_fname
)
messages.append(req_message)
res_message = ET.Element(
wsdl_ns('message'),
name=soap_fname + 'Response',
xmlns=str(self.types_ns)
)
ET.SubElement(
res_message,
wsdl_ns('part'),
name='parameters',
element='types:%sResponse' % soap_fname
)
messages.append(res_message)
# portType/operation
operation = ET.SubElement(
portType,
wsdl_ns('operation'),
name=soap_fname
)
if funcdef.doc:
ET.SubElement(
operation,
wsdl_ns('documentation')
).text = funcdef.doc
ET.SubElement(
operation, wsdl_ns('input'),
message='tns:%sRequest' % soap_fname
)
ET.SubElement(
operation, wsdl_ns('output'),
message='tns:%sResponse' % soap_fname
)
# binding/operation
operation = ET.SubElement(
binding,
wsdl_ns('operation'),
name=soap_fname
)
ET.SubElement(
operation,
soap_ns('operation'),
soapAction=soap_fname
)
ET.SubElement(
ET.SubElement(
operation,
wsdl_ns('input')
),
soap_ns('body'),
use='literal'
)
ET.SubElement(
ET.SubElement(
operation,
wsdl_ns('output')
),
soap_ns('body'),
use='literal'
)
return messages + [portType, binding]
def gen_service(self):
service = ET.Element(wsdl_ns('service'), name=self.service_name)
ET.SubElement(
service,
wsdl_ns('documentation')
).text = six.u('WSDL File for %s') % self.service_name
ET.SubElement(
ET.SubElement(
service,
wsdl_ns('port'),
binding='tns:%s_Binding' % self.service_name,
name='%s_PortType' % self.service_name
),
soap_ns('address'),
location=self.baseURL
)
return service
def gen_definitions(self):
attrib = {
'name': self.service_name,
'targetNamespace': str(self.tns)
}
if use_lxml:
definitions = ET.Element(
wsdl_ns('definitions'),
attrib=attrib,
nsmap={
'xs': str(xs_ns),
'soap': str(soap_ns),
'types': str(self.types_ns),
'tns': str(self.tns)
}
)
else:
definitions = ET.Element(wsdl_ns('definitions'), **attrib)
definitions.set('xmlns:types', str(self.types_ns))
definitions.set('xmlns:tns', str(self.tns))
definitions.set('name', self.service_name)
definitions.append(self.gen_types())
definitions.extend(self.gen_functions())
definitions.append(self.gen_service())
return definitions
def generate(self, format=False):
return xml_tostring(self.gen_definitions(), pretty_print=format)

View File

@ -1,423 +0,0 @@
import decimal
import datetime
import base64
import six
import wsme.tests.protocol
try:
import xml.etree.ElementTree as et
except ImportError:
import cElementTree as et # noqa
import suds.cache
import suds.client
import suds.transport
import wsme.utils
class XDecimal(suds.xsd.sxbuiltin.XBuiltin):
def translate(self, value, topython=True):
if topython:
if isinstance(value, six.string_types) and len(value):
return decimal.Decimal(value)
else:
if isinstance(value, (decimal.Decimal, int, float)):
return str(value)
return value
suds.xsd.sxbuiltin.Factory.tags['decimal'] = XDecimal
class WebtestSudsTransport(suds.transport.Transport):
def __init__(self, app):
suds.transport.Transport.__init__(self)
self.app = app
def open(self, request):
res = self.app.get(request.url, headers=request.headers)
return six.BytesIO(res.body)
def send(self, request):
res = self.app.post(
request.url,
request.message,
headers=dict((
(key, str(value)) for key, value in request.headers.items()
)),
expect_errors=True
)
return suds.transport.Reply(
res.status_int,
dict(res.headers),
res.body
)
class SudsCache(suds.cache.Cache):
def __init__(self):
self.d = {}
def get(self, id):
return self.d.get(id)
def getf(self, id):
b = self.get(id)
if b is not None:
return six.StringIO(self.get(id))
def put(self, id, bfr):
self.d[id] = bfr
def putf(self, id, fp):
self.put(id, fp.read())
def purge(self, id):
try:
del self.d[id]
except KeyError:
pass
def clear(self, id):
self.d = {}
sudscache = SudsCache()
tns = "http://foo.bar.baz/soap/"
typenamespace = "http://foo.bar.baz/types/"
soapenv_ns = 'http://schemas.xmlsoap.org/soap/envelope/'
xsi_ns = 'http://www.w3.org/2001/XMLSchema-instance'
body_qn = '{%s}Body' % soapenv_ns
fault_qn = '{%s}Fault' % soapenv_ns
faultcode_qn = '{%s}faultcode' % soapenv_ns
faultstring_qn = '{%s}faultstring' % soapenv_ns
faultdetail_qn = '{%s}detail' % soapenv_ns
type_qn = '{%s}type' % xsi_ns
nil_qn = '{%s}nil' % xsi_ns
def build_soap_message(method, params=""):
message = """<?xml version="1.0"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<soap:Body xmlns="%(typenamespace)s">
<%(method)s>
%(params)s
</%(method)s>
</soap:Body>
</soap:Envelope>
""" % dict(method=method, params=params, typenamespace=typenamespace)
return message
python_types = {
int: ('xs:int', str),
float: ('xs:float', str),
bool: ('xs:boolean', str),
wsme.types.bytes: (
'xs:string',
lambda x: x.decode('ascii') if isinstance(x, wsme.types.bytes) else x
),
wsme.types.text: ('xs:string', wsme.types.text),
wsme.types.binary: (
'xs:base64Binary',
lambda x: base64.encodestring(x).decode('ascii')
),
decimal.Decimal: ('xs:decimal', str),
datetime.date: ('xs:date', datetime.date.isoformat),
datetime.time: ('xs:time', datetime.time.isoformat),
datetime.datetime: ('xs:dateTime', datetime.datetime.isoformat),
}
array_types = {
wsme.types.bytes: "String_Array",
wsme.types.text: "String_Array",
int: "Int_Array",
float: "Float_Array",
bool: "Boolean_Array",
datetime.datetime: "dateTime_Array"
}
if not six.PY3:
array_types[long] = "Long_Array" # noqa
def tosoap(tag, value):
el = et.Element(tag)
if isinstance(value, tuple):
value, datatype = value
else:
datatype = type(value)
if value is None:
el.set('xsi:nil', 'true')
return el
if datatype in python_types:
stype, conv = python_types[datatype]
el.text = conv(value)
el.set('xsi:type', stype)
el.text = str(value)
return el
def tosuds(client, value):
if value is None:
return None
if isinstance(value, tuple):
value, datatype = value
else:
datatype = type(value)
if value is None:
return None
if isinstance(datatype, list):
if datatype[0] in array_types:
tname = array_types[datatype[0]]
else:
tname = datatype[0].__name__ + '_Array'
o = client.factory.create('types:' + tname)
o.item = [tosuds(client, (item, datatype[0])) for item in value]
return o
elif datatype in python_types:
return python_types[datatype][1](value)
else:
o = client.factory.create('types:' + datatype.__name__)
for attr in datatype._wsme_attributes:
if attr.name in value:
setattr(
o, attr.name,
tosuds(client, (value[attr.name], attr.datatype))
)
return o
def read_bool(value):
return value == 'true'
soap_types = {
'xs:string': wsme.types.text,
'xs:int': int,
'xs:long': int if six.PY3 else long, # noqa
'xs:float': float,
'xs:decimal': decimal.Decimal,
'xs:boolean': read_bool,
'xs:date': wsme.utils.parse_isodate,
'xs:time': wsme.utils.parse_isotime,
'xs:dateTime': wsme.utils.parse_isodatetime,
'xs:base64Binary': base64.decodestring,
}
def fromsoap(el):
if el.get(nil_qn) == 'true':
return None
t = el.get(type_qn)
if t == 'xs:string':
return wsme.types.text(el.text if el.text else '')
if t in soap_types:
return soap_types[t](el.text)
elif t and t.endswith('_Array'):
return [fromsoap(i) for i in el]
else:
d = {}
for child in el:
name = child.tag
assert name.startswith('{%s}' % typenamespace), name
name = name[len(typenamespace) + 2:]
d[name] = fromsoap(child)
return d
def tobytes(value):
if isinstance(value, wsme.types.text):
value = value.encode()
return value
def tobin(value):
value = base64.decodestring(value.encode())
return value
fromsuds_types = {
wsme.types.binary: tobin,
wsme.types.bytes: tobytes,
decimal.Decimal: decimal.Decimal,
}
def fromsuds(dt, value):
if value is None:
return None
if isinstance(dt, list):
return [fromsuds(dt[0], item) for item in value.item]
if wsme.types.isarray(dt):
return [fromsuds(dt.item_type, item) for item in value.item]
if wsme.types.isusertype(dt) and dt not in fromsuds_types:
dt = dt.basetype
if dt in fromsuds_types:
print(dt, value)
value = fromsuds_types[dt](value)
print(value)
return value
if wsme.types.iscomplex(dt):
d = {}
for attrdef in dt._wsme_attributes:
if not hasattr(value, attrdef.name):
continue
d[attrdef.name] = fromsuds(
attrdef.datatype, getattr(value, attrdef.name)
)
return d
return value
class TestSOAP(wsme.tests.protocol.ProtocolTestCase):
protocol = 'soap'
protocol_options = dict(tns=tns, typenamespace=typenamespace)
ws_path = '/'
_sudsclient = None
def setUp(self):
wsme.tests.protocol.ProtocolTestCase.setUp(self)
def test_simple_call(self):
message = build_soap_message('touch')
print(message)
res = self.app.post(
self.ws_path,
message,
headers={"Content-Type": "application/soap+xml; charset=utf-8"},
expect_errors=True
)
print(res.body)
assert res.status.startswith('200')
def call(self, fpath, _rt=None, _accept=None, _no_result_decode=False,
**kw):
if _no_result_decode or _accept or self._testMethodName in (
'test_missing_argument', 'test_invalid_path', 'test_settext_empty',
'test_settext_none'
):
return self.raw_call(fpath, _rt, _accept, _no_result_decode, **kw)
path = fpath.strip('/').split('/')
methodname = ''.join([path[0]] + [i.capitalize() for i in path[1:]])
m = getattr(self.sudsclient.service, methodname)
kw = dict((
(key, tosuds(self.sudsclient, value)) for key, value in kw.items()
))
print(kw)
try:
return fromsuds(_rt, m(**kw))
except suds.WebFault as exc:
raise wsme.tests.protocol.CallException(
exc.fault.faultcode,
exc.fault.faultstring,
getattr(exc.fault, 'detail', None) or None
)
def raw_call(self, fpath, _rt=None, _accept=None, _no_result_decode=False,
**kw):
path = fpath.strip('/').split('/')
methodname = ''.join([path[0]] + [i.capitalize() for i in path[1:]])
# get the actual definition so we can build the adequate request
if kw:
el = et.Element('parameters')
for key, value in kw.items():
el.append(tosoap(key, value))
params = six.b("\n").join(et.tostring(el) for el in el)
else:
params = ""
methodname = ''.join([path[0]] + [i.capitalize() for i in path[1:]])
message = build_soap_message(methodname, params)
print(message)
headers = {"Content-Type": "application/soap+xml; charset=utf-8"}
if _accept is not None:
headers['Accept'] = _accept
res = self.app.post(
self.ws_path,
message,
headers=headers,
expect_errors=True
)
print("Status: ", res.status, "Received:", res.body)
if _no_result_decode:
return res
el = et.fromstring(res.body)
body = el.find(body_qn)
print(body)
if res.status_int == 200:
response_tag = '{%s}%sResponse' % (typenamespace, methodname)
r = body.find(response_tag)
result = r.find('{%s}result' % typenamespace)
print("Result element: ", result)
return fromsoap(result)
elif res.status_int == 400:
fault = body.find(fault_qn)
raise wsme.tests.protocol.CallException(
fault.find(faultcode_qn).text,
fault.find(faultstring_qn).text,
"")
elif res.status_int == 500:
fault = body.find(fault_qn)
raise wsme.tests.protocol.CallException(
fault.find(faultcode_qn).text,
fault.find(faultstring_qn).text,
fault.find(faultdetail_qn) is not None and
fault.find(faultdetail_qn).text or None)
@property
def sudsclient(self):
if self._sudsclient is None:
self._sudsclient = suds.client.Client(
self.ws_path + 'api.wsdl',
transport=WebtestSudsTransport(self.app),
cache=sudscache
)
return self._sudsclient
def test_wsdl(self):
c = self.sudsclient
assert c.wsdl.tns[1] == tns, c.wsdl.tns
sd = c.sd[0]
assert len(sd.ports) == 1
port, methods = sd.ports[0]
self.assertEqual(len(methods), 51)
methods = dict(methods)
assert 'returntypesGettext' in methods
print(methods)
assert methods['argtypesSettime'][0][0] == 'value'
def test_return_nesteddict(self):
pass
def test_setnesteddict(self):
pass
def test_return_objectdictattribute(self):
pass
def test_setnested_nullobj(self):
pass # TODO write a soap adapted version of this test.