The soap test case now fully pass. We are getting closer to a working implementation, but I think I messed up the namespaces.

This commit is contained in:
Christophe de Vienne
2011-09-24 22:04:45 +02:00
parent 6706ff4ee8
commit d9b7e4ef5d
4 changed files with 150 additions and 59 deletions

View File

@@ -71,6 +71,11 @@ class FunctionDefinition(object):
func._wsme_definition = fd
return fd
def get_arg(self, name):
for arg in self.arguments:
if arg.name == name:
return arg
return None
def register_protocol(protocol):
global registered_protocols

View File

@@ -19,10 +19,13 @@ from genshi.builder import tag, Element, Namespace
from genshi.template import MarkupTemplate
from wsme.controller import register_protocol, pexpose
import wsme.types
from wsme import exc
from wsme.utils import *
xsi_ns = 'http://www.w3.org/2001/XMLSchema-instance'
type_qn = '{%s}type' % xsi_ns
nil_qn = '{%s}nil' % xsi_ns
type_registry = {
@@ -77,18 +80,55 @@ def binary_tosoap(datatype, tag, value):
def None_tosoap(datatype, tag, value):
return make_soap_element(datatype, tag, None)
@generic
def fromsoap(datatype, element):
return None
@fromsoap.when_object(int)
def int_fromsoap(datatype, el):
def fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) != 'xsi:int':
raise exc.InvalidInput(el.tag, str(el))
return int(el.text)
soaptype = el.get(type_qn)
if datatype in type_registry:
if soaptype != type_registry[datatype]:
raise exc.InvalidInput(el.tag, et.tostring(el))
value = datatype(el.text)
else:
if soaptype != datatype.__name__:
raise exc.InvalidInput(el.tag, et.tostring(el))
value = datatype()
for name, attr in wsme.types.list_attributes(datatype):
child = el.find('{%s}%s' % (ns['type'], name))
setattr(value, name, fromsoap(attr.datatype, child, ns))
return value
@fromsoap.when_object(datetime.date)
def date_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) != 'xsd:date':
raise exc.InvalidInput(el.tag, et.tostring(el))
return parse_isodate(el.text)
@fromsoap.when_object(datetime.time)
def time_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) != 'xsd:time':
raise exc.InvalidInput(el.tag, et.tostring(el))
return parse_isotime(el.text)
@fromsoap.when_object(datetime.datetime)
def datetime_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) != 'xsd:dateTime':
raise exc.InvalidInput(el.tag, et.tostring(el))
return parse_isodatetime(el.text)
@fromsoap.when_object(wsme.types.binary)
def binary_fromsoap(datatype, el, ns):
if el.get(nil_qn) == 'true':
return None
if el.get(type_qn) != 'xsd:base64Binary':
raise exc.InvalidInput(el.tag, et.tostring(el))
return base64.decodestring(el.text)
class SoapProtocol(object):
name = 'SOAP'
@@ -133,15 +173,35 @@ class SoapProtocol(object):
el = et.fromstring(request.body)
body = el.find('{%(soapenv)s}Body' % self.ns)
# Extract the service name from the tns
fname = list(body)[0].tag
message = list(body)[0]
fname = message.tag
if fname.startswith('{%s}' % self.typenamespace):
fname = fname[len(self.typenamespace)+2:]
print fname
return self.get_name_mapping()[fname]
mapping = self.get_name_mapping()
if fname not in mapping:
raise exc.UnknownFunction(fname)
path = mapping[fname]
request._wsme_soap_message = message
return path
return None
def read_arguments(self, funcdef, request):
return {}
kw = {}
if not hasattr(request, '_wsme_soap_message'):
return kw
msg = request._wsme_soap_message
parameters = msg.find('{%s}parameters' % self.typenamespace)
print parameters
if parameters:
for param in parameters:
name = param.tag[len(self.typenamespace)+2:]
arg = funcdef.get_arg(name)
value = fromsoap(arg.datatype, param, {
'type': self.typenamespace,
})
kw[name] = value
return kw
def soap_response(self, funcdef, result):
r = Element(self.soap_fname(funcdef) + 'Response')

View File

@@ -196,7 +196,7 @@ class ProtocolTestCase(unittest.TestCase):
assert "No error raised"
except CallException, e:
assert e.faultcode == 'Client'
assert e.faultstring == u'Unknown function name: invalid_function'
assert e.faultstring.lower() == u'unknown function name: invalid_function'
def test_serverside_error(self):
try:
@@ -253,42 +253,50 @@ class ProtocolTestCase(unittest.TestCase):
assert r == {'inner': {'aint': 0}} or r == {'inner': {'aint': '0'}}, r
def test_setstr(self):
assert self.call('argtypes/setstr', value='astring') in ('astring',)
assert self.call('argtypes/setstr', value='astring') == 'astring'
def test_setunicode(self):
assert self.call('argtypes/setunicode', value=u'') in (u'',)
assert self.call('argtypes/setunicode', value=u'',
_rt=unicode) == u''
def test_setint(self):
assert self.call('argtypes/setint', value=3) in (3, '3')
r = self.call('argtypes/setint', value=3)
assert r == 3, r
def test_setfloat(self):
assert self.call('argtypes/setfloat', value=3.54) in (3.54, '3.54')
assert self.call('argtypes/setfloat', value=3.54,
_rt=float) == 3.54
def test_setdecimal(self):
assert self.call('argtypes/setdecimal', value='3.14') in (
'3.14', decimal.Decimal('3.14'))
value = decimal.Decimal('3.14')
assert self.call('argtypes/setdecimal', value=value,
_rt=decimal.Decimal) == value
def test_setdate(self):
assert self.call('argtypes/setdate', value='2008-04-06') in (
datetime.date(2008, 4, 6), '2008-04-06')
value = datetime.date(2008, 4, 6)
r = self.call('argtypes/setdate', value=value,
_rt=datetime.date)
assert r == value
def test_settime(self):
assert self.call('argtypes/settime', value='12:12:15') \
in ('12:12:15', datetime.time(12, 12, 15))
value = datetime.time(12, 12, 15)
r = self.call('argtypes/settime', value=value,
_rt=datetime.time)
assert r == datetime.time(12, 12, 15)
def test_setdatetime(self):
assert self.call('argtypes/setdatetime', value='2008-04-06T12:12:15') \
in ('2008-04-06T12:12:15',
datetime.datetime(2008, 4, 6, 12, 12, 15))
value = datetime.datetime(2008, 4, 6, 12, 12, 15)
r = self.call('argtypes/setdatetime', value=value,
_rt=datetime.datetime)
assert r == datetime.datetime(2008, 4, 6, 12, 12, 15)
def test_setbinary(self):
r = self.call('argtypes/setbinary',
value=base64.encodestring(binarysample))
assert r == binarysample or r == base64.encodestring(binarysample), r
value = binarysample
r = self.call('argtypes/setbinary', value=(value, wsme.types.binary),
_rt=wsme.types.binary) == value
def test_setnested(self):
value = {'inner': {'aint': 54}}
assert self.call('argtypes/setnested',
value={'inner': {'aint': 54}}) in (
{'inner': {'aint': 54}},
{'inner': {'aint': '54'}},
)
value=(value, NestedOuter),
_rt=NestedOuter) == value

View File

@@ -45,28 +45,38 @@ soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
return message
def dumpxml(key, obj):
el = et.Element(key)
if isinstance(obj, basestring):
el.text = obj
elif type(obj) in (int, float, decimal.Decimal):
el.text = str(obj)
elif type(obj) in (datetime.date, datetime.time, datetime.datetime):
el.text = obj.isoformat()
elif type(obj) == dict:
for key, obj in obj.items():
el.append(dumpxml(key, obj))
return el
python_types = {
int: ('xsd:int', str),
float: ('xsd:float', str),
str: ('xsd:string', str),
unicode: ('xsd:string', unicode),
wsme.types.binary: ('xsd:base64Binary', base64.encodestring),
decimal.Decimal: ('xsd:decimal', str),
datetime.date: ('xsd:date', datetime.date.isoformat),
datetime.time: ('xsd:time', datetime.time.isoformat),
datetime.datetime: ('xsd:dateTime', datetime.datetime.isoformat),
}
def loadxml(el):
if len(el):
d = {}
for child in el:
d[child.tag] = loadxml(child)
return d
def tosoap(tag, value):
if isinstance(value, tuple):
value, datatype = value
else:
return el.text
datatype = type(value)
print datatype
el = et.Element(tag)
if value is None:
el.set('xsi:nil', True)
elif datatype in python_types:
stype, conv = python_types[datatype]
el.text = conv(value)
el.set('xsi:type', stype)
else:
el.set('xsi:type', datatype.__name__)
for name, attr in datatype._wsme_attributes:
if name in value:
el.append(tosoap(name, (value[name], attr.datatype)))
return el
def read_bool(value):
return value == 'true'
@@ -117,12 +127,20 @@ class TestSOAP(wsme.tests.protocol.ProtocolTestCase):
print res.body
assert res.status.startswith('200')
def call(self, fpath, **kw):
def call(self, fpath, _rt=None, **kw):
path = fpath.strip('/').split('/')
# get the actual definition so we can build the adequate request
params = ""
if kw:
el = et.Element('parameters')
for key, value in kw.items():
el.append(tosoap(key, value))
params = et.tostring(el)
else:
params = ""
methodname = ''.join((i.capitalize() for i in path))
message = build_soap_message(methodname, params)
print message
res = self.app.post('/', message,
headers={
"Content-Type": "application/soap+xml; charset=utf-8"
@@ -148,9 +166,9 @@ class TestSOAP(wsme.tests.protocol.ProtocolTestCase):
elif res.status_int == 500:
fault = body.find(fault_qn)
raise wsme.tests.protocol.CallException(
fault.find('faultcode').text,
fault.find('faultstring').text,
fault.find('detail').text)
fault.find(faultcode_qn).text,
fault.find(faultstring_qn).text,
fault.find(faultdetail_qn).text)
if el.tag == 'error':