Files
deb-python-wsme/wsme/rest/args.py
Christophe de Vienne ddd2ba251e Move around the REST implementation : wsme.protocols.commons -> wsme.rest.args, wsme.protocols.rest -> wsme.rest.protocol, wsme.protocols.restxml/json -> wsme.rest.xml/json, wsme.protocols.__init__ -> wsme.protocol.
--HG--
rename : wsme/protocols/__init__.py => wsme/protocol.py
rename : wsme/protocols/commons.py => wsme/rest/args.py
rename : wsme/protocols/restjson.py => wsme/rest/json.py
rename : wsme/protocols/rest.py => wsme/rest/protocol.py
rename : wsme/protocols/restxml.py => wsme/rest/xml.py
2012-11-06 22:34:03 +01:00

189 lines
5.3 KiB
Python

import cgi
import datetime
import re
from simplegeneric import generic
from wsme.exc import UnknownArgument
from wsme.types import iscomplex, list_attributes, Unset
from wsme.types import UserType, ArrayType, DictType, File
from wsme.utils import parse_isodate, parse_isotime, parse_isodatetime
ARRAY_MAX_SIZE = 1000
@generic
def from_param(datatype, value):
return datatype(value) if value else None
@from_param.when_object(datetime.date)
def date_from_param(datatype, value):
return parse_isodate(value) if value else None
@from_param.when_object(datetime.time)
def time_from_param(datatype, value):
return parse_isotime(value) if value else None
@from_param.when_object(datetime.datetime)
def datetime_from_param(datatype, value):
return parse_isodatetime(value) if value else None
@from_param.when_object(File)
def filetype_from_param(datatype, value):
if isinstance(value, cgi.FieldStorage):
return File(fieldstorage=value)
return File(content=value)
@from_param.when_type(UserType)
def usertype_from_param(datatype, value):
return datatype.frombasetype(
from_param(datatype.basetype, value))
@generic
def from_params(datatype, params, path, hit_paths):
if iscomplex(datatype) and datatype is not File:
objfound = False
for key in params:
if key.startswith(path + '.'):
objfound = True
break
if objfound:
r = datatype()
for attrdef in list_attributes(datatype):
value = from_params(attrdef.datatype,
params, '%s.%s' % (path, attrdef.key), hit_paths)
if value is not Unset:
setattr(r, attrdef.key, value)
return r
else:
if path in params:
hit_paths.add(path)
return from_param(datatype, params[path])
return Unset
@from_params.when_type(ArrayType)
def array_from_params(datatype, params, path, hit_paths):
if path in params:
return [
from_param(datatype.item_type, value)
for value in params.getall(path)]
else:
indexes = set()
r = re.compile('^%s\[(?P<index>\d+)\]' % re.escape(path))
for p in params.keys():
m = r.match(p)
if m:
indexes.add(int(m.group('index')))
if not indexes:
return Unset
indexes = list(indexes)
indexes.sort()
return [from_params(datatype.item_type, params,
'%s[%s]' % (path, index), hit_paths)
for index in indexes]
@from_params.when_type(DictType)
def dict_from_params(datatype, params, path, hit_paths):
keys = set()
r = re.compile('^%s\[(?P<key>[a-zA-Z0-9_\.]+)\]' % re.escape(path))
for p in params.keys():
m = r.match(p)
if m:
keys.add(from_param(datatype.key_type, m.group('key')))
if not keys:
return Unset
return dict((
(key, from_params(datatype.value_type,
params, '%s[%s]' % (path, key), hit_paths))
for key in keys))
def args_from_args(funcdef, args, kwargs):
newargs = []
for argdef, arg in zip(funcdef.arguments[:len(args)], args):
newargs.append(from_param(argdef.datatype, arg))
newkwargs = {}
for argname, value in kwargs.items():
newkwargs[argname] = from_param(funcdef.get_arg(argname), value)
return newargs, newkwargs
def args_from_params(funcdef, params):
kw = {}
hit_paths = set()
for argdef in funcdef.arguments:
value = from_params(
argdef.datatype, params, argdef.name, hit_paths)
if value is not Unset:
kw[argdef.name] = value
paths = set(params.keys())
unknown_paths = paths - hit_paths
if unknown_paths:
raise UnknownArgument(', '.join(unknown_paths))
return [], kw
def args_from_body(funcdef, body, mimetype):
from wsme.rest import json as restjson
from wsme.rest import xml as restxml
kw = {}
if funcdef.body_type is not None:
bodydata = None
if mimetype in restjson.RestJsonProtocol.content_types:
if hasattr(body, 'read'):
jsonbody = restjson.json.load(body)
else:
jsonbody = restjson.json.loads(body)
bodydata = restjson.fromjson(funcdef.body_type, jsonbody)
elif mimetype in restxml.RestXmlProtocol.content_types:
if hasattr(body, 'read'):
xmlbody = restxml.et.parse(body)
else:
xmlbody = restxml.et.fromstring(body)
bodydata = restxml.fromxml(funcdef.body_type, xmlbody)
if bodydata:
kw[funcdef.arguments[-1].name] = bodydata
return (), kw
def combine_args(funcdef, *akw):
newargs, newkwargs = [], {}
argindexes = {}
for i, arg in enumerate(funcdef.arguments):
argindexes[arg.name] = i
newargs.append(arg.default)
for args, kwargs in akw:
for i, arg in enumerate(args):
newargs[i] = arg
for name, value in kwargs.iteritems():
newargs[argindexes[name]] = value
return newargs, newkwargs
def get_args(funcdef, args, kwargs, body, mimetype):
return combine_args(
funcdef,
args_from_args(funcdef, args, kwargs),
args_from_body(funcdef, body, mimetype)
)