wsme/wsmeext/tg1.py

174 lines
5.1 KiB
Python

try:
import json
except ImportError:
import simplejson as json # noqa
import functools
import sys
import cherrypy
import webob
from turbogears import expose, util
import turbogears.view
from wsme.rest import validate as wsvalidate
import wsme.api
import wsme.rest
import wsme.rest.args
import wsme.rest.json
from wsme.utils import is_valid_code
import inspect
APIPATH_MAXLEN = 50
__all__ = ['wsexpose', 'wsvalidate']
def wsexpose(*args, **kwargs):
tg_json_expose = expose(
'wsmejson:',
accept_format='application/json',
content_type='application/json',
tg_format='json'
)
tg_altjson_expose = expose(
'wsmejson:',
accept_format='text/javascript',
content_type='application/json'
)
tg_xml_expose = expose(
'wsmexml:',
accept_format='text/xml',
content_type='text/xml',
tg_format='xml'
)
sig = wsme.signature(*args, **kwargs)
def decorate(f):
sig(f)
funcdef = wsme.api.FunctionDefinition.get(f)
@functools.wraps(f)
def callfunction(self, *args, **kwargs):
args, kwargs = wsme.rest.args.get_args(
funcdef, args, kwargs,
cherrypy.request.params, None,
cherrypy.request.body,
cherrypy.request.headers['Content-Type']
)
if funcdef.pass_request:
kwargs[funcdef.pass_request] = cherrypy.request
try:
result = f(self, *args, **kwargs)
except Exception:
try:
exception_info = sys.exc_info()
orig_exception = exception_info[1]
if isinstance(orig_exception, cherrypy.HTTPError):
orig_code = getattr(orig_exception, 'status', None)
else:
orig_code = getattr(orig_exception, 'code', None)
data = wsme.api.format_exception(exception_info)
finally:
del exception_info
cherrypy.response.status = 500
if data['faultcode'] == 'client':
cherrypy.response.status = 400
elif orig_code and is_valid_code(orig_code):
cherrypy.response.status = orig_code
accept = cherrypy.request.headers.get('Accept', "").lower()
accept = util.simplify_http_accept_header(accept)
decorators = {'text/xml': wsme.rest.xml.encode_error}
return decorators.get(
accept,
wsme.rest.json.encode_error
)(None, data)
return dict(
datatype=funcdef.return_type,
result=result
)
callfunction = tg_xml_expose(callfunction)
callfunction = tg_altjson_expose(callfunction)
callfunction = tg_json_expose(callfunction)
callfunction._wsme_original_function = f
return callfunction
return decorate
class AutoJSONTemplate(object):
def __init__(self, extra_vars_func=None, options=None):
pass
def render(self, info, format="json", fragment=False, template=None):
"Renders the template to a string using the provided info."
return wsme.rest.json.encode_result(
info['result'], info['datatype']
)
def get_content_type(self, user_agent):
return "application/json"
class AutoXMLTemplate(object):
def __init__(self, extra_vars_func=None, options=None):
pass
def render(self, info, format="json", fragment=False, template=None):
"Renders the template to a string using the provided info."
return wsme.rest.xml.encode_result(
info['result'], info['datatype']
)
def get_content_type(self, user_agent):
return "text/xml"
turbogears.view.engines['wsmejson'] = AutoJSONTemplate(turbogears.view.stdvars)
turbogears.view.engines['wsmexml'] = AutoXMLTemplate(turbogears.view.stdvars)
class Controller(object):
def __init__(self, wsroot):
self._wsroot = wsroot
@expose()
def default(self, *args, **kw):
req = webob.Request(cherrypy.request.wsgi_environ)
res = self._wsroot._handle_request(req)
cherrypy.response.header_list = res.headerlist
cherrypy.response.status = res.status
return res.body
def _scan_api(controller, path=[], objects=[]):
"""
Recursively iterate a controller api entries.
"""
for name in dir(controller):
if name.startswith('_'):
continue
a = getattr(controller, name)
if a in objects:
continue
if inspect.ismethod(a):
if wsme.api.iswsmefunction(a):
yield path + [name], a._wsme_original_function, [controller]
elif inspect.isclass(a):
continue
else:
if len(path) > APIPATH_MAXLEN:
raise ValueError("Path is too long: " + str(path))
for i in _scan_api(a, path + [name], objects + [a]):
yield i
def scan_api(root=None):
return _scan_api(cherrypy.root)