Porting restjson to Python 3.2 (in progress)

This commit is contained in:
Christophe de Vienne
2012-04-24 12:46:31 +02:00
parent 423c422aca
commit 2ef74e85b5
5 changed files with 135 additions and 107 deletions

View File

@@ -1,4 +1,7 @@
import logging
import six
from six import u
from wsme.exc import ClientSideError, UnknownArgument
from wsme.protocols import CallContext, Protocol
@@ -59,6 +62,8 @@ class RestProtocol(Protocol):
else:
if body is None:
body = request.body
if isinstance(body, six.binary_type):
body = body.decode('utf8')
if body:
parsed_args = self.parse_args(body)
else:
@@ -74,5 +79,5 @@ class RestProtocol(Protocol):
kw[arg.name] = self.decode_arg(value, arg)
if parsed_args:
raise UnknownArgument(parsed_args.keys()[0])
raise UnknownArgument(u(', ').join(parsed_args.keys()))
return kw

View File

@@ -4,6 +4,8 @@ REST+Json protocol implementation.
import datetime
import decimal
import six
from simplegeneric import generic
from wsme.protocols.rest import RestProtocol
@@ -44,6 +46,13 @@ def tojson(datatype, value):
return value
@tojson.when_object(six.binary_type)
def bytes_tojson(datatype, value):
if value is None:
return None
return value.decode('ascii')
@tojson.when_type(list)
def array_tojson(datatype, value):
if value is None:
@@ -55,8 +64,7 @@ def array_tojson(datatype, value):
def dict_tojson(datatype, value):
if value is None:
return None
key_type = datatype.keys()[0]
value_type = datatype.values()[0]
key_type, value_type = list(datatype.items())[0]
return dict((
(tojson(key_type, item[0]), tojson(value_type, item[1]))
for item in value.items()
@@ -134,18 +142,17 @@ def array_fromjson(datatype, value):
def dict_fromjson(datatype, value):
if value is None:
return None
key_type = datatype.keys()[0]
value_type = datatype.values()[0]
key_type, value_type = list(datatype.items())[0]
return dict((
(fromjson(key_type, item[0]), fromjson(value_type, item[1]))
for item in value.items()))
@fromjson.when_object(str)
@fromjson.when_object(six.binary_type)
def str_fromjson(datatype, value):
if value is None:
return None
return str(value)
return value.encode('utf8')
@fromjson.when_object(decimal.Decimal)
@@ -206,16 +213,17 @@ class RestJsonProtocol(RestProtocol):
def parse_args(self, body):
raw_args = json.loads(body)
print(raw_args)
return raw_args
def encode_result(self, context, result):
r = tojson(context.funcdef.return_type, result)
if self.nest_result:
r = {'result': r}
return json.dumps(r, ensure_ascii=False).encode('utf8')
return json.dumps(r)
def encode_error(self, context, errordetail):
return json.dumps(errordetail, encoding='utf-8')
return json.dumps(errordetail)
def encode_sample_value(self, datatype, value, format=False):
r = tojson(datatype, value)

View File

@@ -2,7 +2,7 @@ import logging
import sys
import traceback
import weakref
import six
from six import u, b
import webob
@@ -166,7 +166,7 @@ class WSRoot(object):
def _handle_request(self, request):
def default_prepare_response_body(request, results):
return '\n'.join(results)
return u('\n').join(results)
res = webob.Response()
res_content_type = None
@@ -207,10 +207,7 @@ class WSRoot(object):
body = prepare_response_body(request, (
self._do_call(protocol, context)
for context in protocol.iter_calls(request)))
if isinstance(body, six.text_type):
res.unicode_body = body
else:
res.body = body
res.unicode_body = body
if len(request.calls) == 1:
if hasattr(protocol, 'get_response_status'):

View File

@@ -5,6 +5,10 @@ import warnings
import datetime
import decimal
import base64
import sys
import six
from six import u, b
from webtest import TestApp
@@ -19,7 +23,8 @@ binarysample = r'\x00\xff\x43'
try:
1 / 0
except ZeroDivisionError, e:
except ZeroDivisionError:
e = sys.exc_info()[1]
zerodivisionerrormsg = str(e)
@@ -71,13 +76,13 @@ class NestedOuterApi(object):
class ReturnTypes(object):
@expose(str)
@expose(six.binary_type)
def getstr(self):
return "astring"
@expose(unicode)
@expose(six.text_type)
def getunicode(self):
return u""
return u('\xe3\x81\xae')
@expose(int)
def getint(self):
@@ -112,7 +117,7 @@ class ReturnTypes(object):
n = NestedOuter()
return n
@expose([str])
@expose([six.binary_type])
def getstrarray(self):
return ["A", "B", "C"]
@@ -120,7 +125,7 @@ class ReturnTypes(object):
def getnestedarray(self):
return [NestedOuter(), NestedOuter()]
@expose({str: NestedOuter})
@expose({six.binary_type: NestedOuter})
def getnesteddict(self):
return {'a': NestedOuter(), 'b': NestedOuter()}
@@ -134,88 +139,88 @@ class ReturnTypes(object):
class ArgTypes(object):
@expose(str)
@validate(str)
@expose(six.binary_type)
@validate(six.binary_type)
def setstr(self, value):
print repr(value)
assert type(value) == str
print(repr(value))
assert type(value) == six.binary_type
return value
@expose(unicode)
@validate(unicode)
@expose(six.text_type)
@validate(six.text_type)
def setunicode(self, value):
print repr(value)
assert type(value) == unicode
print(repr(value))
assert type(value) == six.text_type
return value
@expose(bool)
@validate(bool)
def setbool(self, value):
print repr(value)
print(repr(value))
assert type(value) == bool
return value
@expose(int)
@validate(int)
def setint(self, value):
print repr(value)
print(repr(value))
assert type(value) == int
return value
@expose(float)
@validate(float)
def setfloat(self, value):
print repr(value)
print(repr(value))
assert type(value) == float
return value
@expose(decimal.Decimal)
@validate(decimal.Decimal)
def setdecimal(self, value):
print repr(value)
print(repr(value))
assert type(value) == decimal.Decimal
return value
@expose(datetime.date)
@validate(datetime.date)
def setdate(self, value):
print repr(value)
print(repr(value))
assert type(value) == datetime.date
return value
@expose(datetime.time)
@validate(datetime.time)
def settime(self, value):
print repr(value)
print(repr(value))
assert type(value) == datetime.time
return value
@expose(datetime.datetime)
@validate(datetime.datetime)
def setdatetime(self, value):
print repr(value)
print(repr(value))
assert type(value) == datetime.datetime
return value
@expose(wsme.types.binary)
@validate(wsme.types.binary)
def setbinary(self, value):
print repr(value)
assert type(value) == str
print(repr(value))
assert type(value) == six.binary_type
return value
@expose([str])
@validate([str])
@expose([six.binary_type])
@validate([six.binary_type])
def setstrarray(self, value):
print repr(value)
print(repr(value))
assert type(value) == list
assert type(value[0]) == str
assert type(value[0]) == six.binary_type, type(value[0])
return value
@expose([datetime.datetime])
@validate([datetime.datetime])
def setdatetimearray(self, value):
print repr(value)
print(repr(value))
assert type(value) == list
assert type(value[0]) == datetime.datetime
return value
@@ -223,38 +228,38 @@ class ArgTypes(object):
@expose(NestedOuter)
@validate(NestedOuter)
def setnested(self, value):
print repr(value)
print(repr(value))
assert type(value) == NestedOuter
return value
@expose([NestedOuter])
@validate([NestedOuter])
def setnestedarray(self, value):
print repr(value)
print(repr(value))
assert type(value) == list
assert type(value[0]) == NestedOuter
return value
@expose({str: NestedOuter})
@validate({str: NestedOuter})
@expose({six.binary_type: NestedOuter})
@validate({six.binary_type: NestedOuter})
def setnesteddict(self, value):
print repr(value)
print(repr(value))
assert type(value) == dict
assert type(value.keys()[0]) == str
assert type(value.values()[0]) == NestedOuter
assert type(list(value.keys())[0]) == six.binary_type
assert type(list(value.values())[0]) == NestedOuter
return value
@expose(myenumtype)
@validate(myenumtype)
def setenum(self, value):
print value
assert type(value) == str
print(value)
assert type(value) == six.binary_type
return value
@expose(NamedAttrsObject)
@validate(NamedAttrsObject)
def setnamedattrsobj(self, value):
print value
print(value)
assert type(value) == NamedAttrsObject
assert value.attr_1 == 10, value.attr_1
assert value.attr_2 == 20, value.attr_2
@@ -302,20 +307,22 @@ class ProtocolTestCase(unittest.TestCase):
def test_invalid_path(self):
try:
res = self.call('invalid_function')
print res
print(res)
assert "No error raised"
except CallException, e:
except CallException:
e = sys.exc_info()[1]
assert e.faultcode == 'Client'
assert e.faultstring.lower() == \
u'unknown function name: invalid_function'
u('unknown function name: invalid_function')
def test_serverside_error(self):
try:
res = self.call('witherrors/divide_by_zero')
print res
print(res)
assert "No error raised"
except CallException, e:
print e
except CallException:
e = sys.exc_info()[1]
print(e)
assert e.faultcode == 'Server'
assert e.faultstring == zerodivisionerrormsg
assert e.debuginfo is not None
@@ -324,10 +331,11 @@ class ProtocolTestCase(unittest.TestCase):
self.root._debug = False
try:
res = self.call('witherrors/divide_by_zero')
print res
print(res)
assert "No error raised"
except CallException, e:
print e
except CallException:
e = sys.exc_info()[1]
print(e)
assert e.faultcode == 'Server'
assert e.faultstring == zerodivisionerrormsg
assert e.debuginfo is None
@@ -342,7 +350,7 @@ class ProtocolTestCase(unittest.TestCase):
def test_return_unicode(self):
r = self.call('returntypes/getunicode')
assert r == u'', r
assert r == u('\xe3\x81\xae'), r
def test_return_int(self):
r = self.call('returntypes/getint')
@@ -378,7 +386,7 @@ class ProtocolTestCase(unittest.TestCase):
assert r == {'inner': {'aint': 0}}, r
def test_return_strarray(self):
r = self.call('returntypes/getstrarray', _rt=[str])
r = self.call('returntypes/getstrarray', _rt=[six.binary_type])
assert r == ['A', 'B', 'C'], r
def test_return_nestedarray(self):
@@ -386,7 +394,7 @@ class ProtocolTestCase(unittest.TestCase):
assert r == [{'inner': {'aint': 0}}, {'inner': {'aint': 0}}], r
def test_return_nesteddict(self):
r = self.call('returntypes/getnesteddict', _rt={str:NestedOuter})
r = self.call('returntypes/getnesteddict', _rt={six.binary_type: NestedOuter})
assert r == {'a': {'inner': {'aint': 0}}, 'b': {'inner': {'aint': 0}}}
def test_return_enum(self):
@@ -398,11 +406,12 @@ class ProtocolTestCase(unittest.TestCase):
assert r == {'attr.1': 5, 'attr.2': 6}
def test_setstr(self):
assert self.call('argtypes/setstr', value='astring') == 'astring'
assert self.call('argtypes/setstr', value=b('astring'),
_rt=six.binary_type) == b('astring')
def test_setunicode(self):
assert self.call('argtypes/setunicode', value=u'',
_rt=unicode) == u''
assert self.call('argtypes/setunicode', value=u('\xe3\x81\xae'),
_rt=six.text_type) == u('\xe3\x81\xae')
def test_setint(self):
r = self.call('argtypes/setint', value=3, _rt=int)
@@ -439,7 +448,7 @@ class ProtocolTestCase(unittest.TestCase):
value = binarysample
r = self.call('argtypes/setbinary', value=(value, wsme.types.binary),
_rt=wsme.types.binary) == value
print r
print(r)
def test_setnested(self):
value = {'inner': {'aint': 54}}
@@ -449,10 +458,10 @@ class ProtocolTestCase(unittest.TestCase):
assert r == value
def test_setstrarray(self):
value = ["1", "2", "three"]
value = [b("1"), b("2"), b("three")]
r = self.call('argtypes/setstrarray',
value=(value, [str]),
_rt=[str])
value=(value, [six.binary_type]),
_rt=[six.binary_type])
assert r == value, r
def test_setdatetimearray(self):
@@ -477,13 +486,13 @@ class ProtocolTestCase(unittest.TestCase):
def test_setnesteddict(self):
value = {
'o1': {'inner': {'aint': 54}},
'o2': {'inner': {'aint': 55}},
b('o1'): {'inner': {'aint': 54}},
b('o2'): {'inner': {'aint': 55}},
}
r = self.call('argtypes/setnesteddict',
value=(value, {str: NestedOuter}),
_rt={str: NestedOuter})
print r
value=(value, {six.binary_type: NestedOuter}),
_rt={six.binary_type: NestedOuter})
print(r)
assert r == value
def test_setenum(self):
@@ -506,12 +515,13 @@ class ProtocolTestCase(unittest.TestCase):
def test_missing_argument(self):
try:
r = self.call('argtypes/setdatetime')
print r
print(r)
assert "No error raised"
except CallException, e:
print e
except CallException:
e = sys.exc_info()[1]
print(e)
assert e.faultcode == 'Client'
assert e.faultstring == u'Missing argument: "value"'
assert e.faultstring == u('Missing argument: "value"')
def test_misc_multiply(self):
assert self.call('misc/multiply', a=5, b=2, _rt=int) == 10

View File

@@ -3,6 +3,8 @@ import datetime
import decimal
import urllib
import six
import wsme.tests.protocol
try:
@@ -20,9 +22,10 @@ def prepare_value(value, datatype):
if isinstance(datatype, list):
return [prepare_value(item, datatype[0]) for item in value]
if isinstance(datatype, dict):
key_type, value_type = list(datatype.items())[0]
return dict((
(prepare_value(item[0], datatype.keys()[0]),
prepare_value(item[1], datatype.values()[0]))
(prepare_value(item[0], key_type),
prepare_value(item[1], value_type))
for item in value.items()
))
if datatype in (datetime.date, datetime.time, datetime.datetime):
@@ -31,18 +34,21 @@ def prepare_value(value, datatype):
return str(value)
if datatype == wsme.types.binary:
return base64.encodestring(value)
if datatype == six.binary_type:
return value.decode('ascii')
return value
def prepare_result(value, datatype):
print(value, datatype)
if isusertype(datatype):
datatype = datatype.basetype
if isinstance(datatype, list):
return [prepare_result(item, datatype[0]) for item in value]
if isinstance(datatype, dict):
return dict((
(prepare_result(item[0], datatype.keys()[0]),
prepare_result(item[1], datatype.values()[0]))
(prepare_result(item[0], list(datatype.keys())[0]),
prepare_result(item[1], list(datatype.values())[0]))
for item in value.items()
))
if datatype == datetime.date:
@@ -59,6 +65,8 @@ def prepare_result(value, datatype):
return value
if datatype == wsme.types.binary:
return base64.decodestring(value)
if datatype == six.binary_type:
return value.encode('utf8')
if type(value) != datatype:
return datatype(value)
return value
@@ -87,12 +95,12 @@ class TestRestJson(wsme.tests.protocol.ProtocolTestCase):
content,
headers=headers,
expect_errors=True)
print "Received:", res.body
print("Received:", res.body)
if _no_result_decode:
return res
r = json.loads(res.body)
r = json.loads(res.text)
if res.status_int == 200:
if _rt and r:
r = prepare_result(r, _rt)
@@ -103,20 +111,20 @@ class TestRestJson(wsme.tests.protocol.ProtocolTestCase):
r['faultstring'],
r.get('debuginfo'))
return json.loads(res.body)
return json.loads(res.text)
def test_fromjson(self):
assert fromjson(str, None) == None
def test_keyargs(self):
r = self.app.get('/argtypes/setint.json?value=2')
print r
assert json.loads(r.body) == 2
print(r)
assert json.loads(r.text) == 2
nestedarray = 'value[0].inner.aint=54&value[1].inner.aint=55'
r = self.app.get('/argtypes/setnestedarray.json?' + nestedarray)
print r
assert json.loads(r.body) == [
print(r)
assert json.loads(r.text) == [
{'inner': {'aint': 54}},
{'inner': {'aint': 55}}]
@@ -128,9 +136,9 @@ class TestRestJson(wsme.tests.protocol.ProtocolTestCase):
body = urllib.urlencode(params)
r = self.app.post('/argtypes/setnestedarray.json', body,
headers={'Content-Type': 'application/x-www-form-urlencoded'})
print r
print(r)
assert json.loads(r.body) == [
assert json.loads(r.text) == [
{'inner': {'aint': 54}},
{'inner': {'aint': 55}}]
@@ -139,38 +147,38 @@ class TestRestJson(wsme.tests.protocol.ProtocolTestCase):
'{"value": 2}',
headers={"Content-Type": "application/json"},
expect_errors=True)
print r
print(r)
assert r.status_int == 400
assert json.loads(r.body)['faultstring'] == \
assert json.loads(r.text)['faultstring'] == \
"Cannot read parameters from both a body and GET/POST params"
def test_inline_body(self):
params = urllib.urlencode({'body': '{"value": 4}'})
r = self.app.get('/argtypes/setint.json?' + params)
print r
assert json.loads(r.body) == 4
print(r)
assert json.loads(r.text) == 4
def test_empty_body(self):
params = urllib.urlencode({'body': ''})
r = self.app.get('/returntypes/getint.json?' + params)
print r
assert json.loads(r.body) == 2
print(r)
assert json.loads(r.text) == 2
def test_unknown_arg(self):
r = self.app.post('/returntypes/getint.json',
'{"a": 2}',
headers={"Content-Type": "application/json"},
expect_errors=True)
print r
print(r)
assert r.status_int == 400
assert json.loads(r.body)['faultstring'].startswith(
assert json.loads(r.text)['faultstring'].startswith(
"Unknown argument:")
r = self.app.get('/returntypes/getint.json?a=2',
expect_errors=True)
print r
print(r)
assert r.status_int == 400
assert json.loads(r.body)['faultstring'].startswith(
assert json.loads(r.text)['faultstring'].startswith(
"Unknown argument:")
def test_array_tojson(self):
@@ -200,8 +208,8 @@ class TestRestJson(wsme.tests.protocol.ProtocolTestCase):
def test_nest_result(self):
self.root.protocols[0].nest_result = True
r = self.app.get('/returntypes/getint.json')
print r
assert json.loads(r.body) == {"result": 2}
print(r)
assert json.loads(r.text) == {"result": 2}
def test_encode_sample_value(self):
class MyType(object):
@@ -215,7 +223,7 @@ class TestRestJson(wsme.tests.protocol.ProtocolTestCase):
v.astr = 's'
r = self.root.protocols[0].encode_sample_value(MyType, v, True)
print r
print(r)
assert r[0] == ('javascript')
assert r[1] == json.dumps({'aint': 4, 'astr': 's'},
ensure_ascii=False, indent=4, sort_keys=True)