f91767bf1e
Change-Id: I2adc6a5cbd76edd16e6079046b67b8fc10bf1671
244 lines
7.1 KiB
Python
244 lines
7.1 KiB
Python
import base64
|
|
import datetime
|
|
import decimal
|
|
|
|
try:
|
|
import simplejson as json
|
|
except ImportError:
|
|
import json # noqa
|
|
|
|
import wsme.tests.protocol
|
|
from wsme.utils import parse_isodatetime, parse_isodate, parse_isotime
|
|
from wsme.types import isarray, isdict, isusertype
|
|
|
|
import six
|
|
|
|
if six.PY3:
|
|
from urllib.parse import urlencode
|
|
else:
|
|
from urllib import urlencode # noqa
|
|
|
|
|
|
def encode_arg(value):
|
|
if isinstance(value, tuple):
|
|
value, datatype = value
|
|
else:
|
|
datatype = type(value)
|
|
|
|
if isinstance(datatype, list):
|
|
value = [encode_arg((item, datatype[0])) for item in value]
|
|
elif isinstance(datatype, dict):
|
|
key_type, value_type = list(datatype.items())[0]
|
|
value = dict((
|
|
(encode_arg((key, key_type)),
|
|
encode_arg((value, value_type)))
|
|
for key, value in value.items()
|
|
))
|
|
elif datatype in (datetime.date, datetime.time, datetime.datetime):
|
|
value = value.isoformat()
|
|
elif datatype == wsme.types.binary:
|
|
value = base64.encodestring(value).decode('ascii')
|
|
elif datatype == wsme.types.bytes:
|
|
value = value.decode('ascii')
|
|
elif datatype == decimal.Decimal:
|
|
value = str(value)
|
|
return value
|
|
|
|
|
|
def decode_result(value, datatype):
|
|
if value is None:
|
|
return None
|
|
if datatype == wsme.types.binary:
|
|
value = base64.decodestring(value.encode('ascii'))
|
|
return value
|
|
if isusertype(datatype):
|
|
datatype = datatype.basetype
|
|
if isinstance(datatype, list):
|
|
value = [decode_result(item, datatype[0]) for item in value]
|
|
elif isarray(datatype):
|
|
value = [decode_result(item, datatype.item_type) for item in value]
|
|
elif isinstance(datatype, dict):
|
|
key_type, value_type = list(datatype.items())[0]
|
|
value = dict((
|
|
(decode_result(key, key_type),
|
|
decode_result(value, value_type))
|
|
for key, value in value.items()
|
|
))
|
|
elif isdict(datatype):
|
|
key_type, value_type = datatype.key_type, datatype.value_type
|
|
value = dict((
|
|
(decode_result(key, key_type),
|
|
decode_result(value, value_type))
|
|
for key, value in value.items()
|
|
))
|
|
elif datatype == datetime.time:
|
|
value = parse_isotime(value)
|
|
elif datatype == datetime.date:
|
|
value = parse_isodate(value)
|
|
elif datatype == datetime.datetime:
|
|
value = parse_isodatetime(value)
|
|
elif hasattr(datatype, '_wsme_attributes'):
|
|
for attr in datatype._wsme_attributes:
|
|
if attr.key not in value:
|
|
continue
|
|
value[attr.key] = decode_result(value[attr.key], attr.datatype)
|
|
elif datatype == decimal.Decimal:
|
|
value = decimal.Decimal(value)
|
|
elif datatype == wsme.types.bytes:
|
|
value = value.encode('ascii')
|
|
elif datatype is not None and type(value) != datatype:
|
|
value = datatype(value)
|
|
return value
|
|
|
|
|
|
class TestExtDirectProtocol(wsme.tests.protocol.ProtocolTestCase):
|
|
protocol = 'extdirect'
|
|
protocol_options = {
|
|
'namespace': 'MyNS.api',
|
|
'nsfolder': 'app'
|
|
}
|
|
|
|
def call(self, fname, _rt=None, _no_result_decode=False, _accept=None,
|
|
**kw):
|
|
path = fname.split('/')
|
|
try:
|
|
func, funcdef, args = self.root._lookup_function(path)
|
|
arguments = funcdef.arguments
|
|
except:
|
|
arguments = []
|
|
if len(path) == 1:
|
|
ns, action, fname = '', '', path[0]
|
|
elif len(path) == 2:
|
|
ns, action, fname = '', path[0], path[1]
|
|
else:
|
|
ns, action, fname = '.'.join(path[:-2]), path[-2], path[-1]
|
|
print(kw)
|
|
|
|
args = [
|
|
dict(
|
|
(arg.name, encode_arg(kw[arg.name]))
|
|
for arg in arguments if arg.name in kw
|
|
)
|
|
]
|
|
print("args =", args)
|
|
data = json.dumps({
|
|
'type': 'rpc',
|
|
'tid': 0,
|
|
'action': action,
|
|
'method': fname,
|
|
'data': args,
|
|
})
|
|
print(data)
|
|
headers = {'Content-Type': 'application/json'}
|
|
if _accept:
|
|
headers['Accept'] = _accept
|
|
res = self.app.post('/extdirect/router/%s' % ns, data, headers=headers,
|
|
expect_errors=True)
|
|
|
|
print(res.body)
|
|
|
|
if _no_result_decode:
|
|
return res
|
|
|
|
data = json.loads(res.text)
|
|
if data['type'] == 'rpc':
|
|
r = data['result']
|
|
return decode_result(r, _rt)
|
|
elif data['type'] == 'exception':
|
|
faultcode, faultstring = data['message'].split(': ', 1)
|
|
debuginfo = data.get('where')
|
|
raise wsme.tests.protocol.CallException(
|
|
faultcode, faultstring, debuginfo)
|
|
|
|
def test_api_alias(self):
|
|
assert self.root._get_protocol('extdirect').api_alias == '/app/api.js'
|
|
|
|
def test_get_api(self):
|
|
res = self.app.get('/app/api.js')
|
|
print(res.body)
|
|
assert res.body
|
|
|
|
def test_positional(self):
|
|
self.root._get_protocol('extdirect').default_params_notation = \
|
|
'positional'
|
|
|
|
data = json.dumps({
|
|
'type': 'rpc',
|
|
'tid': 0,
|
|
'action': 'misc',
|
|
'method': 'multiply',
|
|
'data': [2, 5],
|
|
})
|
|
headers = {'Content-Type': 'application/json'}
|
|
res = self.app.post('/extdirect/router', data, headers=headers)
|
|
|
|
print(res.body)
|
|
|
|
data = json.loads(res.text)
|
|
assert data['type'] == 'rpc'
|
|
r = data['result']
|
|
assert r == 10
|
|
|
|
def test_batchcall(self):
|
|
data = json.dumps([{
|
|
'type': 'rpc',
|
|
'tid': 1,
|
|
'action': 'argtypes',
|
|
'method': 'setdate',
|
|
'data': [{'value': '2011-04-06'}],
|
|
}, {
|
|
'type': 'rpc',
|
|
'tid': 2,
|
|
'action': 'returntypes',
|
|
'method': 'getbytes',
|
|
'data': []
|
|
}])
|
|
print(data)
|
|
headers = {'Content-Type': 'application/json'}
|
|
res = self.app.post('/extdirect/router', data, headers=headers)
|
|
|
|
print(res.body)
|
|
|
|
rdata = json.loads(res.text)
|
|
|
|
assert len(rdata) == 2
|
|
|
|
assert rdata[0]['tid'] == 1
|
|
assert rdata[0]['result'] == '2011-04-06'
|
|
assert rdata[1]['tid'] == 2
|
|
assert rdata[1]['result'] == 'astring'
|
|
|
|
def test_form_call(self):
|
|
params = {
|
|
'value[0].inner.aint': 54,
|
|
'value[1].inner.aint': 55,
|
|
'extType': 'rpc',
|
|
'extTID': 1,
|
|
'extAction': 'argtypes',
|
|
'extMethod': 'setnestedarray',
|
|
}
|
|
|
|
body = urlencode(params)
|
|
r = self.app.post(
|
|
'/extdirect/router',
|
|
body,
|
|
headers={'Content-Type': 'application/x-www-form-urlencoded'}
|
|
)
|
|
print (r)
|
|
|
|
assert json.loads(r.text) == {
|
|
"tid": "1",
|
|
"action": "argtypes",
|
|
"type": "rpc",
|
|
"method": "setnestedarray",
|
|
"result": [{
|
|
"inner": {
|
|
"aint": 54
|
|
}
|
|
}, {
|
|
"inner": {
|
|
"aint": 55
|
|
}
|
|
}]
|
|
}
|