
322 lines
9.5 KiB

"""REST+Json protocol implementation."""
import datetime
import decimal
import json
from simplegeneric import generic
import six
import wsme.exc
import wsme.types
from wsme.types import Unset
import wsme.utils
content_type = 'application/json'
accept_content_types = [
ENUM_TRUE = ('true', 't', 'yes', 'y', 'on', '1')
ENUM_FALSE = ('false', 'f', 'no', 'n', 'off', '0')
def tojson(datatype, value):
A generic converter from python to jsonify-able datatypes.
If a non-complex user specific type is to be used in the api,
a specific tojson should be added::
from wsme.protocol.restjson import tojson
myspecialtype = object()
def myspecialtype_tojson(datatype, value):
return str(value)
if value is None:
return None
if wsme.types.iscomplex(datatype):
d = dict()
for attr in wsme.types.list_attributes(datatype):
attr_value = getattr(value, attr.key)
if attr_value is not Unset:
d[] = tojson(attr.datatype, attr_value)
return d
elif wsme.types.isusertype(datatype):
return tojson(datatype.basetype, datatype.tobasetype(value))
return value
def bytes_tojson(datatype, value):
if value is None:
return None
return value.decode('ascii')
def array_tojson(datatype, value):
if value is None:
return None
return [tojson(datatype.item_type, item) for item in value]
def dict_tojson(datatype, value):
if value is None:
return None
return dict((
(tojson(datatype.key_type, item[0]),
tojson(datatype.value_type, item[1]))
for item in value.items()
def decimal_tojson(datatype, value):
if value is None:
return None
return str(value)
def date_tojson(datatype, value):
if value is None:
return None
return value.isoformat()
def time_tojson(datatype, value):
if value is None:
return None
return value.isoformat()
def datetime_tojson(datatype, value):
if value is None:
return None
return value.isoformat()
def fromjson(datatype, value):
"""A generic converter from json base types to python datatype.
If a non-complex user specific type is to be used in the api,
a specific fromjson should be added::
from wsme.protocol.restjson import fromjson
class MySpecialType(object):
def myspecialtype_fromjson(datatype, value):
return MySpecialType(value)
if value is None:
return None
if wsme.types.iscomplex(datatype):
obj = datatype()
attributes = wsme.types.list_attributes(datatype)
# Here we check that all the attributes in the value are also defined
# in our type definition, otherwise we raise an Error.
v_keys = set(value.keys())
a_keys = set( for adef in attributes)
if not v_keys <= a_keys:
raise wsme.exc.UnknownAttribute(None, v_keys - a_keys)
for attrdef in attributes:
if in value:
val_fromjson = fromjson(attrdef.datatype,
except wsme.exc.UnknownAttribute as e:
if getattr(attrdef, 'readonly', False):
raise wsme.exc.InvalidInput(, val_fromjson,
"Cannot set read only field.")
setattr(obj, attrdef.key, val_fromjson)
elif attrdef.mandatory:
raise wsme.exc.InvalidInput(, None,
"Mandatory field missing.")
return wsme.types.validate_value(datatype, obj)
elif wsme.types.isusertype(datatype):
value = datatype.frombasetype(
fromjson(datatype.basetype, value))
return value
def array_fromjson(datatype, value):
if value is None:
return None
if not isinstance(value, list):
raise ValueError("Value not a valid list: %s" % value)
return [fromjson(datatype.item_type, item) for item in value]
def dict_fromjson(datatype, value):
if value is None:
return None
if not isinstance(value, dict):
raise ValueError("Value not a valid dict: %s" % value)
return dict((
(fromjson(datatype.key_type, item[0]),
fromjson(datatype.value_type, item[1]))
for item in value.items()))
def str_fromjson(datatype, value):
if (isinstance(value, six.string_types) or
isinstance(value, six.integer_types) or
isinstance(value, float)):
return six.text_type(value).encode('utf8')
def text_fromjson(datatype, value):
if value is not None and isinstance(value, wsme.types.bytes):
return wsme.types.text(value)
return value
@fromjson.when_object(*six.integer_types + (float,))
def numeric_fromjson(datatype, value):
"""Convert string object to built-in types int, long or float."""
if value is None:
return None
return datatype(value)
def bool_fromjson(datatype, value):
"""Convert to bool, restricting strings to just unambiguous values."""
if value is None:
return None
if isinstance(value, six.integer_types + (bool,)):
return bool(value)
if value in ENUM_TRUE:
return True
if value in ENUM_FALSE:
return False
raise ValueError("Value not an unambiguous boolean: %s" % value)
def decimal_fromjson(datatype, value):
if value is None:
return None
return decimal.Decimal(value)
def date_fromjson(datatype, value):
if value is None:
return None
return wsme.utils.parse_isodate(value)
def time_fromjson(datatype, value):
if value is None:
return None
return wsme.utils.parse_isotime(value)
def datetime_fromjson(datatype, value):
if value is None:
return None
return wsme.utils.parse_isodatetime(value)
def parse(s, datatypes, bodyarg, encoding='utf8'):
jload = json.load
if not hasattr(s, 'read'):
if six.PY3 and isinstance(s, six.binary_type):
s = s.decode(encoding)
jload = json.loads
jdata = jload(s)
except ValueError:
raise wsme.exc.ClientSideError("Request is not in valid JSON format")
if bodyarg:
argname = list(datatypes.keys())[0]
kw = {argname: fromjson(datatypes[argname], jdata)}
except ValueError as e:
raise wsme.exc.InvalidInput(argname, jdata, e.args[0])
except wsme.exc.UnknownAttribute as e:
# We only know the fieldname at this level, not in the
# called function. We fill in this information here.
kw = {}
extra_args = []
if not isinstance(jdata, dict):
raise wsme.exc.ClientSideError("Request must be a JSON dict")
for key in jdata:
if key not in datatypes:
kw[key] = fromjson(datatypes[key], jdata[key])
except ValueError as e:
raise wsme.exc.InvalidInput(key, jdata[key], e.args[0])
except wsme.exc.UnknownAttribute as e:
# We only know the fieldname at this level, not in the
# called function. We fill in this information here.
if extra_args:
raise wsme.exc.UnknownArgument(', '.join(extra_args))
return kw
def encode_result(value, datatype, **options):
jsondata = tojson(datatype, value)
if options.get('nest_result', False):
jsondata = {options.get('nested_result_attrname', 'result'): jsondata}
return json.dumps(jsondata)
def encode_error(context, errordetail):
return json.dumps(errordetail)
def encode_sample_value(datatype, value, format=False):
r = tojson(datatype, value)
content = json.dumps(r, ensure_ascii=False, indent=4 if format else 0,
return ('javascript', content)
def encode_sample_params(params, format=False):
kw = {}
for name, datatype, value in params:
kw[name] = tojson(datatype, value)
content = json.dumps(kw, ensure_ascii=False, indent=4 if format else 0,
return ('javascript', content)
def encode_sample_result(datatype, value, format=False):
r = tojson(datatype, value)
content = json.dumps(r, ensure_ascii=False, indent=4 if format else 0,
return ('javascript', content)