Rewriting the argument mapping to handle keyword arguments, stop dropping unused URL parts, and prevent GET/POST params from overriding URL parts
This commit is contained in:
@@ -111,37 +111,42 @@ class Pecan(object):
|
||||
for hook in hooks:
|
||||
getattr(hook, hook_type)(*args)
|
||||
|
||||
def get_params(self, all_params, remainder, argspec, im_self):
|
||||
valid_params = dict()
|
||||
positional_params = []
|
||||
def get_args(self, all_params, remainder, argspec, im_self):
|
||||
args = []
|
||||
kwargs = dict()
|
||||
valid_args = argspec[0][1:]
|
||||
|
||||
if im_self is not None:
|
||||
positional_params.append(im_self)
|
||||
args.append(im_self)
|
||||
|
||||
# grab the routing args from nested REST controllers
|
||||
if 'routing_args' in request.context:
|
||||
remainder = request.context.pop('routing_args') + list(remainder)
|
||||
|
||||
# handle params that are POST or GET variables first
|
||||
for param_name, param_value in all_params.iteritems():
|
||||
if param_name in argspec[0]:
|
||||
valid_params[param_name] = param_value
|
||||
|
||||
# handle positional arguments
|
||||
used = set()
|
||||
for i, value in enumerate(remainder):
|
||||
if len(argspec[0]) > (i+1):
|
||||
if valid_params.get(argspec[0][i+1]) is None:
|
||||
used.add(i)
|
||||
valid_params[argspec[0][i+1]] = value
|
||||
if valid_args and remainder:
|
||||
args.extend(remainder[:len(valid_args)])
|
||||
remainder = remainder[len(valid_args):]
|
||||
valid_args = valid_args[len(args):]
|
||||
|
||||
# handle unconsumed positional arguments
|
||||
if len(used) < len(remainder) and argspec[1] is not None:
|
||||
for i, value in enumerate(remainder):
|
||||
if i not in used:
|
||||
positional_params.append(value)
|
||||
# handle wildcard arguments
|
||||
if remainder:
|
||||
if not argspec[1]:
|
||||
abort(404)
|
||||
args.extend(remainder)
|
||||
|
||||
return valid_params, positional_params
|
||||
# handle positional GET/POST params
|
||||
for name in valid_args:
|
||||
if name in all_params:
|
||||
args.append(all_params.pop(name))
|
||||
|
||||
# handle wildcard GET/POST params
|
||||
if argspec[2]:
|
||||
for name, value in all_params.iteritems():
|
||||
if name not in argspec[0]:
|
||||
kwargs[name] = value
|
||||
|
||||
return args, kwargs
|
||||
|
||||
def validate(self, schema, params=None, json=False):
|
||||
to_validate = params
|
||||
@@ -193,12 +198,7 @@ class Pecan(object):
|
||||
self.handle_security(controller)
|
||||
|
||||
# fetch and validate any parameters
|
||||
params, positional_params = self.get_params(
|
||||
dict(state.request.str_params),
|
||||
remainder,
|
||||
controller.pecan['argspec'],
|
||||
im_self
|
||||
)
|
||||
params = dict(state.request.str_params)
|
||||
if 'schema' in controller.pecan:
|
||||
request.validation_error = None
|
||||
try:
|
||||
@@ -213,8 +213,16 @@ class Pecan(object):
|
||||
redirect(controller.pecan['error_handler'], internal=True)
|
||||
if controller.pecan['validate_json']: params = dict(data=params)
|
||||
|
||||
# fetch the arguments for the controller
|
||||
args, kwargs = self.get_args(
|
||||
params,
|
||||
remainder,
|
||||
controller.pecan['argspec'],
|
||||
im_self
|
||||
)
|
||||
|
||||
# get the result from the controller
|
||||
result = controller(*positional_params, **params)
|
||||
result = controller(*args, **kwargs)
|
||||
|
||||
# a controller can return the response object which means they've taken
|
||||
# care of filling it out
|
||||
|
||||
@@ -99,6 +99,320 @@ class TestBase(object):
|
||||
assert r.status_int == 200
|
||||
assert r.body == '/100/name'
|
||||
|
||||
def test_controller_args(self):
|
||||
class RootController(object):
|
||||
@expose()
|
||||
def index(self, id):
|
||||
return 'index: %s' % id
|
||||
|
||||
@expose()
|
||||
def multiple(self, one, two):
|
||||
return 'multiple: %s, %s' % (one, two)
|
||||
|
||||
@expose()
|
||||
def optional(self, id=None):
|
||||
return 'optional: %s' % str(id)
|
||||
|
||||
@expose()
|
||||
def variable_args(self, *args):
|
||||
return 'variable_args: %s' % ', '.join(args)
|
||||
|
||||
@expose()
|
||||
def variable_kwargs(self, **kwargs):
|
||||
data = ['%s=%s' % (key, kwargs[key]) for key in sorted(kwargs.keys())]
|
||||
return 'variable_kwargs: %s' % ', '.join(data)
|
||||
|
||||
@expose()
|
||||
def variable_all(self, *args, **kwargs):
|
||||
data = ['%s=%s' % (key, kwargs[key]) for key in sorted(kwargs.keys())]
|
||||
return 'variable_all: %s' % ', '.join(list(args) + data)
|
||||
|
||||
@expose()
|
||||
def eater(self, id, dummy=None, *args, **kwargs):
|
||||
data = ['%s=%s' % (key, kwargs[key]) for key in sorted(kwargs.keys())]
|
||||
return 'eater: %s, %s, %s' % (id, dummy, ', '.join(list(args) + data))
|
||||
|
||||
@expose()
|
||||
def _route(self, args):
|
||||
if hasattr(self, args[0]):
|
||||
return getattr(self, args[0]), args[1:]
|
||||
else:
|
||||
return self.index, args
|
||||
|
||||
app = TestApp(Pecan(RootController()))
|
||||
|
||||
# required arg
|
||||
|
||||
try:
|
||||
r = app.get('/')
|
||||
assert r.status_int != 200
|
||||
except Exception, ex:
|
||||
assert type(ex) == TypeError
|
||||
assert ex.args[0] == 'index() takes exactly 2 arguments (1 given)'
|
||||
|
||||
r = app.get('/1')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'index: 1'
|
||||
|
||||
r = app.get('/1/dummy', status=404)
|
||||
assert r.status_int == 404
|
||||
|
||||
r = app.get('/?id=2')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'index: 2'
|
||||
|
||||
r = app.get('/3?id=three')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'index: 3'
|
||||
|
||||
r = app.post('/', {'id': '4'})
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'index: 4'
|
||||
|
||||
r = app.post('/4', {'id': 'four'})
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'index: 4'
|
||||
|
||||
r = app.get('/?id=5&dummy=dummy')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'index: 5'
|
||||
|
||||
r = app.post('/', {'id': '6', 'dummy': 'dummy'})
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'index: 6'
|
||||
|
||||
# multiple args
|
||||
|
||||
r = app.get('/multiple/one/two')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'multiple: one, two'
|
||||
|
||||
r = app.get('/multiple?one=three&two=four')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'multiple: three, four'
|
||||
|
||||
r = app.post('/multiple', {'one': 'five', 'two': 'six'})
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'multiple: five, six'
|
||||
|
||||
# optional arg
|
||||
|
||||
r = app.get('/optional')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'optional: None'
|
||||
|
||||
r = app.get('/optional/1')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'optional: 1'
|
||||
|
||||
r = app.get('/optional/2/dummy', status=404)
|
||||
assert r.status_int == 404
|
||||
|
||||
r = app.get('/optional?id=2')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'optional: 2'
|
||||
|
||||
r = app.get('/optional/3?id=three')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'optional: 3'
|
||||
|
||||
r = app.post('/optional', {'id': '4'})
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'optional: 4'
|
||||
|
||||
r = app.post('/optional/5', {'id': 'five'})
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'optional: 5'
|
||||
|
||||
r = app.get('/optional?id=6&dummy=dummy')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'optional: 6'
|
||||
|
||||
r = app.post('/optional', {'id': '7', 'dummy': 'dummy'})
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'optional: 7'
|
||||
|
||||
# variable args
|
||||
|
||||
r = app.get('/variable_args')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_args: '
|
||||
|
||||
r = app.get('/variable_args/1/dummy')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_args: 1, dummy'
|
||||
|
||||
r = app.get('/variable_args?id=2&dummy=dummy')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_args: '
|
||||
|
||||
r = app.post('/variable_args', {'id': '3', 'dummy': 'dummy'})
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_args: '
|
||||
|
||||
# variable keyword args
|
||||
|
||||
r = app.get('/variable_kwargs')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_kwargs: '
|
||||
|
||||
r = app.get('/variable_kwargs/1/dummy', status=404)
|
||||
assert r.status_int == 404
|
||||
|
||||
r = app.get('/variable_kwargs?id=2&dummy=dummy')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_kwargs: dummy=dummy, id=2'
|
||||
|
||||
r = app.post('/variable_kwargs', {'id': '3', 'dummy': 'dummy'})
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_kwargs: dummy=dummy, id=3'
|
||||
|
||||
# variable args & keyword args
|
||||
|
||||
r = app.get('/variable_all')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_all: '
|
||||
|
||||
r = app.get('/variable_all/1')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_all: 1'
|
||||
|
||||
r = app.get('/variable_all/2/dummy')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_all: 2, dummy'
|
||||
|
||||
r = app.get('/variable_all/3?month=1&day=12')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_all: 3, day=12, month=1'
|
||||
|
||||
r = app.get('/variable_all/4?id=four&month=1&day=12')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_all: 4, day=12, id=four, month=1'
|
||||
|
||||
r = app.post('/variable_all/5/dummy')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_all: 5, dummy'
|
||||
|
||||
r = app.post('/variable_all/6', {'month': '1', 'day': '12'})
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_all: 6, day=12, month=1'
|
||||
|
||||
r = app.post('/variable_all/7', {'id': 'seven', 'month': '1', 'day': '12'})
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'variable_all: 7, day=12, id=seven, month=1'
|
||||
|
||||
# the "everything" controller
|
||||
|
||||
try:
|
||||
r = app.get('/eater')
|
||||
assert r.status_int != 200
|
||||
except Exception, ex:
|
||||
assert type(ex) == TypeError
|
||||
assert ex.args[0] == 'eater() takes at least 2 arguments (1 given)'
|
||||
|
||||
r = app.get('/eater/1')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'eater: 1, None, '
|
||||
|
||||
r = app.get('/eater/2/dummy')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'eater: 2, dummy, '
|
||||
|
||||
r = app.get('/eater/3/dummy/foo/bar')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'eater: 3, dummy, foo, bar'
|
||||
|
||||
r = app.get('/eater/4?month=1&day=12')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'eater: 4, None, day=12, month=1'
|
||||
|
||||
r = app.get('/eater/5?id=five&month=1&day=12&dummy=dummy')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'eater: 5, dummy, day=12, month=1'
|
||||
|
||||
r = app.post('/eater/6')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'eater: 6, None, '
|
||||
|
||||
r = app.post('/eater/7/dummy')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'eater: 7, dummy, '
|
||||
|
||||
r = app.post('/eater/8/dummy/foo/bar')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'eater: 8, dummy, foo, bar'
|
||||
|
||||
r = app.post('/eater/9', {'month': '1', 'day': '12'})
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'eater: 9, None, day=12, month=1'
|
||||
|
||||
r = app.post('/eater/10', {'id': 'ten', 'month': '1', 'day': '12', 'dummy': 'dummy'})
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'eater: 10, dummy, day=12, month=1'
|
||||
|
||||
def test_abort(self):
|
||||
class RootController(object):
|
||||
@expose()
|
||||
def index(self):
|
||||
abort(404)
|
||||
|
||||
app = TestApp(Pecan(RootController()))
|
||||
r = app.get('/', status=404)
|
||||
assert r.status_int == 404
|
||||
|
||||
def test_redirect(self):
|
||||
class RootController(object):
|
||||
@expose()
|
||||
def index(self):
|
||||
redirect('/testing')
|
||||
|
||||
@expose()
|
||||
def permanent(self):
|
||||
redirect('/testing', code=301)
|
||||
|
||||
@expose()
|
||||
def testing(self):
|
||||
return 'it worked!'
|
||||
|
||||
app = TestApp(Pecan(RootController()))
|
||||
r = app.get('/')
|
||||
assert r.status_int == 302
|
||||
r = r.follow()
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'it worked!'
|
||||
|
||||
r = app.get('/permanent')
|
||||
assert r.status_int == 301
|
||||
r = r.follow()
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'it worked!'
|
||||
|
||||
def test_streaming_response(self):
|
||||
import StringIO
|
||||
class RootController(object):
|
||||
@expose(content_type='text/plain')
|
||||
def test(self, foo):
|
||||
if foo == 'stream':
|
||||
# mimic large file
|
||||
contents = StringIO.StringIO('stream')
|
||||
response.content_type='application/octet-stream'
|
||||
contents.seek(0, os.SEEK_END)
|
||||
response.content_length = contents.tell()
|
||||
contents.seek(0, os.SEEK_SET)
|
||||
response.app_iter = contents
|
||||
return response
|
||||
else:
|
||||
return 'plain text'
|
||||
|
||||
app = TestApp(Pecan(RootController()))
|
||||
r = app.get('/test/stream')
|
||||
assert r.content_type == 'application/octet-stream'
|
||||
assert r.body == 'stream'
|
||||
|
||||
r = app.get('/test/plain')
|
||||
assert r.content_type == 'text/plain'
|
||||
assert r.body == 'plain text'
|
||||
|
||||
def test_request_state_cleanup(self):
|
||||
"""
|
||||
After a request, the state local() should be totally clean
|
||||
@@ -193,140 +507,3 @@ class TestEngines(object):
|
||||
assert r.status_int == 200
|
||||
result = dict(loads(r.body))
|
||||
assert result == expected_result
|
||||
|
||||
def test_controller_parameters(self):
|
||||
class RootController(object):
|
||||
@expose('json')
|
||||
def index(self, argument=None):
|
||||
assert argument == 'value'
|
||||
return dict()
|
||||
|
||||
# arguments should get passed appropriately
|
||||
app = TestApp(Pecan(RootController()))
|
||||
r = app.get('/?argument=value')
|
||||
assert r.status_int == 200
|
||||
|
||||
# extra arguments get stripped off
|
||||
r = app.get('/?argument=value&extra=not')
|
||||
assert r.status_int == 200
|
||||
|
||||
def test_abort(self):
|
||||
class RootController(object):
|
||||
@expose()
|
||||
def index(self):
|
||||
abort(404)
|
||||
|
||||
app = TestApp(Pecan(RootController()))
|
||||
r = app.get('/', status=404)
|
||||
assert r.status_int == 404
|
||||
|
||||
def test_redirect(self):
|
||||
class RootController(object):
|
||||
@expose()
|
||||
def index(self):
|
||||
redirect('/testing')
|
||||
|
||||
@expose()
|
||||
def permanent(self):
|
||||
redirect('/testing', code=301)
|
||||
|
||||
@expose()
|
||||
def testing(self):
|
||||
return 'it worked!'
|
||||
|
||||
app = TestApp(Pecan(RootController()))
|
||||
r = app.get('/')
|
||||
assert r.status_int == 302
|
||||
r = r.follow()
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'it worked!'
|
||||
|
||||
r = app.get('/permanent')
|
||||
assert r.status_int == 301
|
||||
r = r.follow()
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'it worked!'
|
||||
|
||||
def test_uri_to_parameter_mapping(self):
|
||||
class RootController(object):
|
||||
@expose()
|
||||
def test(self, one, two):
|
||||
assert one == '1'
|
||||
assert two == '2'
|
||||
return 'it worked'
|
||||
|
||||
app = TestApp(Pecan(RootController()))
|
||||
r = app.get('/test/1/2')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'it worked'
|
||||
|
||||
def test_uri_to_parameter_mapping_with_validation(self):
|
||||
class TestSchema(Schema):
|
||||
one = validators.Int(not_empty=True)
|
||||
two = validators.Int(not_empty=True)
|
||||
|
||||
class RootController(object):
|
||||
@expose(schema=TestSchema())
|
||||
def test(self, one, two):
|
||||
assert request.validation_error is None
|
||||
assert one == 1
|
||||
assert two == 2
|
||||
return 'it worked'
|
||||
|
||||
@expose(schema=TestSchema())
|
||||
def fail(self, one, two):
|
||||
assert request.validation_error is not None
|
||||
assert one == 'one'
|
||||
assert two == 'two'
|
||||
return 'it failed'
|
||||
|
||||
app = TestApp(Pecan(RootController()))
|
||||
r = app.get('/test/1/2')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'it worked'
|
||||
|
||||
r = app.get('/fail/one/two')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'it failed'
|
||||
|
||||
def test_uri_to_parameter_mapping_with_varargs(self):
|
||||
class RootController(object):
|
||||
@expose()
|
||||
def test(self, *args):
|
||||
assert len(args) == 4
|
||||
assert args[0] == '1'
|
||||
assert args[1] == '2'
|
||||
assert args[2] == '3'
|
||||
assert args[3] == '4'
|
||||
return 'it worked'
|
||||
|
||||
app = TestApp(Pecan(RootController()))
|
||||
r = app.get('/test/1/2/3/4')
|
||||
assert r.status_int == 200
|
||||
assert r.body == 'it worked'
|
||||
|
||||
def test_streaming_response(self):
|
||||
import StringIO
|
||||
class RootController(object):
|
||||
@expose(content_type='text/plain')
|
||||
def test(self, foo):
|
||||
if foo == 'stream':
|
||||
# mimic large file
|
||||
contents = StringIO.StringIO('stream')
|
||||
response.content_type='application/octet-stream'
|
||||
contents.seek(0, os.SEEK_END)
|
||||
response.content_length = contents.tell()
|
||||
contents.seek(0, os.SEEK_SET)
|
||||
response.app_iter = contents
|
||||
return response
|
||||
else:
|
||||
return 'plain text'
|
||||
|
||||
app = TestApp(Pecan(RootController()))
|
||||
r = app.get('/test/stream/')
|
||||
assert r.content_type == 'application/octet-stream'
|
||||
assert r.body == 'stream'
|
||||
|
||||
r = app.get('/test/plain/')
|
||||
assert r.content_type == 'text/plain'
|
||||
assert r.body == 'plain text'
|
||||
|
||||
Reference in New Issue
Block a user